当前位置: 首页 > news >正文

Kafka:安装和配置

 producer:发布消息的对象,称为消息产生者 (Kafka topic producer)

topic:Kafka将消息分门别类,每一个消息称为一个主题(topic)

consumer:订阅消息并处理发布消息的对象称为消费者(consumer)

broker:已发布的消息保存在一组服务器中,称为kafka集群,集群中的每一个服务器都是一个代理(broker),消费者(consumer)可以订阅一个或者多个主题(topic),并从broker中拉取数据,从而消费这些已发布的信息。

1、Kafka对zookeeper是一个强依赖,保存Kafka相关的节点数据,所以安装kafka之前要先安装zookeeper

下载镜像

docker pull zookeeper:3.4.14

创建容器

docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14

下载镜像

docker pull wurstmeister/kafka:2.12-2.3.1

创建容器

docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:2.12-2.3.1

2、入门案例

①创建kafka-demo工程并引入依赖

        <!--kafka--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency>

②创建ProducerQuickStart生产者类并实现

package com.heima.kafkademo.sample;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*生产者*/
public class ProducerQuickStart {public static void main(String[] args) {/*1、kafka配置信息*/Properties properties = new Properties();//kafka连接地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//发送失败,失败的重试次数properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,5);//key和value的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");/*2、生产对象*/KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);//封装发送消息的对象ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima-topic","100001","hello kafka");/*3、发送消息*/producer.send(record);/*4、关闭通道,负责消息发送不成功*/producer.close();}
}

③创建ConsumerQuickStart消费者类并实现

package com.heima.kafkademo.sample;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;/*消费者*/
public class ConsumerQuickStart {public static void main(String[] args) {/*1、kafka配置信息*/Properties properties = new Properties();//kafka连接地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//发送失败,失败的重试次数properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,5);//key和value的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");/*2、消费者对象*/KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);/*3、订阅主题*/consumer.subscribe(Collections.singletonList("itheima-topic"));//当前线程处于一直监听状态while (true){//4、获取消息ConsumerRecords<String,String> consumerRecords =consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : consumerRecords) {System.out.println(record.key());System.out.println(record.value());}}}
}

④运行测试

        成功接收到消息

总结

  • 生产者发送消息,多个消费者订阅同一个主题,只能有一个消费者收到消息(一对一)

  • 生产者发送消息,多个消费者订阅同一个主题,所有消费者都能收到消息(一对多)

下一篇: springboot集成kafka收发消息

相关文章:

Kafka:安装和配置

producer&#xff1a;发布消息的对象&#xff0c;称为消息产生者 &#xff08;Kafka topic producer&#xff09; topic&#xff1a;Kafka将消息分门别类&#xff0c;每一个消息称为一个主题&#xff08;topic&#xff09; consumer&#xff1a;订阅消息并处理发布消息的对象…...

786. 第k个数

文章目录 QuestionIdeasCode Question 给定一个长度为 n 的整数数列&#xff0c;以及一个整数 k &#xff0c;请用快速选择算法求出数列从小到大排序后的第 k 个数。 输入格式 第一行包含两个整数 n 和 k 。 第二行包含 n 个整数&#xff08;所有整数均在 1∼109 范围内&…...

用友-NC-Cloud远程代码执行漏洞[2023-HW]

用友-NC-Cloud远程代码执行漏洞[2023-HW] 一、漏洞介绍二、资产搜索三、漏洞复现PoC小龙POC检测脚本: 四、修复建议 免责声明&#xff1a;请勿利用文章内的相关技术从事非法测试&#xff0c;由于传播、利用此文所提供的信息或者工具而造成的任何直接或者间接的后果及损失&#…...

Transformer(二)(VIT,TNT)(基于视觉CV)

目录 1.视觉中的Attention 2.VIT框架&#xff08;图像分类&#xff0c;不需要decoder&#xff09; 2.1整体框架 2.2.CNN和Transformer遇到的问题 2.3.1CNN 2.3.2Transformer 2.3.3二者对比 2.4.公式理解 3TNT 参考文献 1.视觉中的Attention 对于人类而言看到一幅图可以立…...

Scratch 详解 之 线性→代数之——求两线段交点坐标

可能有人要问&#xff1a;求交点坐标有什么用呢&#xff1f;而且为啥要用线代来求&#xff1f;直线方程不行吗&#xff1f;&#xff1f;&#xff1f; 这个问题&#xff0c;我只能说&#xff0c;直线方程计算的次数过多了&#xff0c;而且动不动就要考虑线的方向&#xff0c;90的…...

Python-组合数据类型

今天要介绍的是Python的组合数据类型 整理不易&#xff0c;希望得到大家的支持&#xff0c;欢迎各位读者评论点赞收藏 感谢&#xff01; 目录 知识点知识导图1、组合数据类型的基本概念1.1 组合数据类型1.2 集合类型概述1.3 序列类型概述1.4 映射类型概述 2、列表类型2.1 列表的…...

vue3+vue-simple-uploader实现大文件上传

vue-simple-uploader本身是基于vue2实现,如果要使用vue3会报错。如何在vue3中使用,可参考我的另一篇文章:解决vue3中不能使用vue-simple-uploader__Jyann_的博客-CSDN博客 一.实现思路 使用vue-simple-uploader组件的uploader组件,设置自动上传为false,即可开启手动上传。…...

自适应变异麻雀搜索算法及其Matlab实现

麻雀搜索算法( sparrow search algorithm&#xff0c;SSA) 是2020 年新提出的一种元启发式算法[1]&#xff0c;它是受麻雀种群的觅食和反捕食行为启发&#xff0c;将搜索群体分为发现者、加入者和侦察者 3 部分&#xff0c;其相互分工寻找最优值&#xff0c;通过 19 个标准测试…...

ETL技术入门之ETLCloud初认识

首先ETL是什么&#xff1f; ETL代表“Extract, Transform, Load”&#xff0c;是一种用于数据集成和转换的过程。它在数据管理和分析中扮演着重要的角色。下面我们将分解每个步骤&#xff1a; Extract&#xff08;抽取&#xff09;&#xff1a; 这一步骤涉及从多个不同的数据源…...

uniapp项目如何运行在微信小程序模拟器上

在HbuilderX中的小程序写完后自己一定要保存&#xff0c;否则会出不来效果 那么怎么让uniapp项目运行在微信小程序开发工具中呢 1 在hbuilderx中点击运行到小程序模拟器 2 然后在项目目录中会生成一个文件夹 在微信小程序开发软件中的工具>安全设置>打开端口 或者在微…...

数据挖掘全流程解析

数据挖掘全流程解析 数据指标选择 在这一阶段&#xff0c;使用直方图和柱状图的方式对数据进行分析&#xff0c;观察什么数据属性对于因变量会产生更加明显的结果。 如何绘制直方图和条形统计图 数据清洗 观察数据是否存在数据缺失或者离群点的情况。 数据异常的两种情况…...

详细介绍如何对音乐信息进行检索和音频节拍跟踪

在本文中,我们将了解节拍的概念,以及我们在尝试跟踪节拍时面临的挑战。然后我们将介绍解决问题的方法以及业界最先进的解决方案。 介绍 音乐就在我们身边。每当我们听到任何与我们的心灵和思想相关的音乐时,我们就会迷失其中。我们下意识地随着听到的节拍而敲击。您一定已…...

Java课题笔记~ HTTP协议(请求和响应)

Servlet最主要的作用就是处理客户端请求&#xff0c;并向客户端做出响应。为此&#xff0c;针对Servlet的每次请求&#xff0c;Web服务器在调用service()方法之前&#xff0c;都会创建两个对象 分别是HttpServletRequest和HttpServletResponse。 其中HttpServletRequest用于封…...

在x86下运行的Ubuntu系统上部署QEMU用于模拟RISC-V硬件环境

1.配置工作环境 sudo apt install gcc bison flex libncurses-dev ninja-build \pkg-config build-essential zlib1g-dev pkg-config libglib2.0-dev \binutils-dev libboost-all-dev autoconf libtool libssl-dev \libpixman-1-dev python-capstone virtualenv software-prop…...

网络爬虫选择代理IP的标准

Hey&#xff0c;小伙伴们&#xff01;作为一家http代理产品供应商&#xff0c;我知道网络爬虫在选择代理IP时可能会遇到些问题&#xff0c;毕竟市面上有很多选择。别担心&#xff01;今天我要给大家分享一些实用的建议&#xff0c;帮助你们选择适合网络爬虫的代理IP。一起来看看…...

RxJava 复刻简版之三,map 多次中转数据

案例代码&#xff1a;https://gitee.com/bobidali/lite-rx-java/commit/292e9227a5491f7ec6a07f395292ef8e6ff69290 RxJava 的调用第一步是封装了观察者接受了数据的处理&#xff0c;进一步就是使用 map 将数据操作传递给上下游 1、类似Observer.create 创建一个简单的观察者…...

06 Word2Vec模型(第一个专门做词向量的模型,CBOW和Skip-gram)

博客配套视频链接: https://space.bilibili.com/383551518?spm_id_from=333.1007.0.0 b 站直接看 配套 github 链接:https://github.com/nickchen121/Pre-training-language-model 配套博客链接:https://www.cnblogs.com/nickchen121/p/15105048.html 神经网络语言模型(NNL…...

Axure RP9小白安装教程

第一步&#xff1a; 打开&#xff1a;Axure中文学习网 第二步&#xff1a; 鼠标移动软件下载&#xff0c;点击Axure RP 9下载既可 第三步&#xff1a; 注意&#xff1a;Axure RP 9 MAC正式版为苹果版本&#xff0c;Axure RP 9 WIN正式版为Windows版本 中文汉化包&#xff…...

腾讯云CVM服务器2核2g1m带宽支持多少人访问?

腾讯云2核2g1m的服务器支持多少人同时访问&#xff1f;2核2g1m云服务器短板是在1M公网带宽上&#xff0c;腾讯云服务器网以网站应用为例&#xff0c;当大规模用户同时访问网站时&#xff0c;很大概率会卡在公网带宽上&#xff0c;所以压根就谈不上2核2G的CPU内存计算性能是否够…...

8.12学习笔记

在PyTorch中&#xff0c;Dataset和DataLoader是用于处理数据的两个重要类。Dataset类是一个抽象类&#xff0c;用于表示数据集。它的主要作用是将数据加载到内存中&#xff0c;并提供一种统一的方式来访问数据。为了使用Dataset类&#xff0c;你需要继承它并实现两个方法&#…...

【Axure高保真原型】引导弹窗

今天和大家中分享引导弹窗的原型模板&#xff0c;载入页面后&#xff0c;会显示引导弹窗&#xff0c;适用于引导用户使用页面&#xff0c;点击完成后&#xff0c;会显示下一个引导弹窗&#xff0c;直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…...

以下是对华为 HarmonyOS NETX 5属性动画(ArkTS)文档的结构化整理,通过层级标题、表格和代码块提升可读性:

一、属性动画概述NETX 作用&#xff1a;实现组件通用属性的渐变过渡效果&#xff0c;提升用户体验。支持属性&#xff1a;width、height、backgroundColor、opacity、scale、rotate、translate等。注意事项&#xff1a; 布局类属性&#xff08;如宽高&#xff09;变化时&#…...

生成 Git SSH 证书

&#x1f511; 1. ​​生成 SSH 密钥对​​ 在终端&#xff08;Windows 使用 Git Bash&#xff0c;Mac/Linux 使用 Terminal&#xff09;执行命令&#xff1a; ssh-keygen -t rsa -b 4096 -C "your_emailexample.com" ​​参数说明​​&#xff1a; -t rsa&#x…...

第一篇:Agent2Agent (A2A) 协议——协作式人工智能的黎明

AI 领域的快速发展正在催生一个新时代&#xff0c;智能代理&#xff08;agents&#xff09;不再是孤立的个体&#xff0c;而是能够像一个数字团队一样协作。然而&#xff0c;当前 AI 生态系统的碎片化阻碍了这一愿景的实现&#xff0c;导致了“AI 巴别塔问题”——不同代理之间…...

STM32HAL库USART源代码解析及应用

STM32HAL库USART源代码解析 前言STM32CubeIDE配置串口USART和UART的选择使用模式参数设置GPIO配置DMA配置中断配置硬件流控制使能生成代码解析和使用方法串口初始化__UART_HandleTypeDef结构体浅析HAL库代码实际使用方法使用轮询方式发送使用轮询方式接收使用中断方式发送使用中…...

PostgreSQL——环境搭建

一、Linux # 安装 PostgreSQL 15 仓库 sudo dnf install -y https://download.postgresql.org/pub/repos/yum/reporpms/EL-$(rpm -E %{rhel})-x86_64/pgdg-redhat-repo-latest.noarch.rpm# 安装之前先确认是否已经存在PostgreSQL rpm -qa | grep postgres# 如果存在&#xff0…...

c++第七天 继承与派生2

这一篇文章主要内容是 派生类构造函数与析构函数 在派生类中重写基类成员 以及多继承 第一部分&#xff1a;派生类构造函数与析构函数 当创建一个派生类对象时&#xff0c;基类成员是如何初始化的&#xff1f; 1.当派生类对象创建的时候&#xff0c;基类成员的初始化顺序 …...

WebRTC从入门到实践 - 零基础教程

WebRTC从入门到实践 - 零基础教程 目录 WebRTC简介 基础概念 工作原理 开发环境搭建 基础实践 三个实战案例 常见问题解答 1. WebRTC简介 1.1 什么是WebRTC&#xff1f; WebRTC&#xff08;Web Real-Time Communication&#xff09;是一个支持网页浏览器进行实时语音…...

6.9-QT模拟计算器

源码: 头文件: widget.h #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QMouseEvent>QT_BEGIN_NAMESPACE namespace Ui { class Widget; } QT_END_NAMESPACEclass Widget : public QWidget {Q_OBJECTpublic:Widget(QWidget *parent nullptr);…...

ArcPy扩展模块的使用(3)

管理工程项目 arcpy.mp模块允许用户管理布局、地图、报表、文件夹连接、视图等工程项目。例如&#xff0c;可以更新、修复或替换图层数据源&#xff0c;修改图层的符号系统&#xff0c;甚至自动在线执行共享要托管在组织中的工程项。 以下代码展示了如何更新图层的数据源&…...