Kafka的配置和使用
目录
1.服务器用docker安装kafka
2.springboot集成kafka实现生产者和消费者
1.服务器用docker安装kafka
①、安装docker(docker类似于linux的软件商店,下载所有应用都能从docker去下载)
a、自动安装
curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun
b、启动docker
sudo systemctl start docker
c、 通过运行hello-world镜像来验证是否正确安装了Docker Engine-Community。
// 拉取镜像
sudo docker pull hello-world
// 执行
hello-world sudo docker run hello-world
d、安装成功

②、zookeeper
a、docker search zookeeper
b、docker pull zookeeper
③、安装kafka
a、docker search kafka
b、docker pull wurstmeister/kafka
④、运行zookeeper
a、docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime zookeeper
⑤、运行kafka
a、 docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=42.194.238.131:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://42.194.238.131:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka
b、参数说明
参数说明:
-e KAFKA_BROKER_ID=0 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己
-e KAFKA_ZOOKEEPER_CONNECT=172.21.10.10:2181/kafka 配置zookeeper管理kafka的路径172.21.10.10:2181/kafka
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.21.10.10:9092 把kafka的地址端口注册给zookeeper,如果是远程访问要改成外网IP,类如Java程序访问出现无法连接。
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置kafka的监听端口
-v /etc/localtime:/etc/localtime 容器时间同步虚拟机的时间
⑥、检验kafka是否可以使用
docker exec -it kafka bash
cd /opt/kafka_2.13-2.8.1/
cd bin
a、运行kafka生产者并发送消息
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
b、在开一个页面,运行kafka消费者发送消息
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
⑦、结果是这个样子的

⑧、每条消息都有一个主题,消费者指定监听哪个主题的消息,如果进来消息队列的是我们指定监听的主题,就消费,否则不消费(topic这里指定的生产和消费的主题)

⑨、消费者宕掉了,生产者接着发,消息不会丢,消费者重启之后会重新接收到宕机之后发的所有消息
2.springboot集成kafka实现生产者和消费者
①、在pom中创建依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.8</version>
</dependency>
②、配置kafka
a、在 application.yml 文件中添加以下配置:(注:yml中两个相同名字的会报错,比如两个spring)
spring:
kafka:
#自己的kafka所在的ip地址和端口号
bootstrap-servers: localhost:9092
consumer:
#一个group-id代表一个消费组,一个消息可以被几个消费组消费
group-id: my-group
auto-offset-reset: earliest
producer: #序列化
value-serializer: org.apache.kafka.common.serialization.StringSerializer
key-serializer: org.apache.kafka.common.serialization.StringSerializer
b、创建一个生产者
@Configuration
public class KafkaProducerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Beanpublic Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}@Beanpublic ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}}
sendMessage 方法,用于发送消息到 Kafka。
@RestController
public class KafkaController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@PostMapping("/send")public void sendMessage(@RequestBody String message) {kafkaTemplate.send("my-topic", message);}}
c、 创建一个消费者
@Configuration
@EnableKafka
public class KafkaConsumerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}@Beanpublic ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());return factory;}}
@KafkaListener 注解声明了一个消费者方法,用于接收从
my-topic 主题中读取的消息
@Service
public class KafkaConsumer {@KafkaListener(topics = "my-topic", groupId = "my-group-id")public void consume(String message) {System.out.println("Received message: " + message);}}
相关文章:
Kafka的配置和使用
目录 1.服务器用docker安装kafka 2.springboot集成kafka实现生产者和消费者 1.服务器用docker安装kafka ①、安装docker(docker类似于linux的软件商店,下载所有应用都能从docker去下载) a、自动安装 curl -fsSL https://get.docker.com | b…...
【C++】unordered_map在Windows和Linux上的不同行为
我目前手头上的项目,需要编译在板端Linux上运行,但是日常daily调试多在Windows上开发。这就涉及到同一份代码在多平台上的编译个运行。有一次遇到了一个奇怪的现象:跑同样的一份代码,Windows和Linux出来的结果是不一致的。最终确定…...
Apipost三方消息通知,接口变更不用愁
Apipost致力于为开发者提供更全面的API管理功能。而最近,Apipost又新增了一个非常实用的功能:第三方消息推送。这个功能可以帮助开发人员及时了解API的变更情况,从而更好地管理和优化自己的API。 具体来说,Apipost的第三方消息推…...
C语言 用数组名作函数参数
当用数组名作函数参数时,如果形参数组中各元素的值发生变化,实参数组元素的值随之变化。 1.数组元素做实参的情况: 如果已经定义一个函数,其原型为 void swap(int x,int y);假设函数的作用是将两个形参(x,y…...
每日一题(980. 不同路径 III)-回溯
题目 980. 不同路径 III 题解思路 表格中值为1的为起始点值为0 的是可以经过的点,但是只能经过一次值为2 的是终点,计算从起点到终点一共有多少种路径 计算出值为0的方格个数,同时找到起点位置当位于终点时候且经过所有的方格为0的点 即为…...
【Python:json常用函数,用于加载和保存json文件】load(), loads(), dump(), dumps()
文章目录 1、load()2、loads()3、dump()4、dumps() json文件为javascript object Notation文件,属于轻量级的数据交换格式,可以用于存储和交换数据。json文件是由类似{ }的key-value映射组成。 1、load() 把json文件加载为Python的数据格式,…...
Flink State 和 Fault Tolerance详解
有状态操作或者操作算子在处理DataStream的元素或者事件的时候需要存储计算的中间状态,这就使得状态在整个Flink的精细化计算中有着非常重要的地位: 记录数据从某一个过去时间点到当前时间的状态信息。以每分钟/小时/天汇总事件时,状态将保留…...
小红书2023“家生活”趋势白皮书
关于报告的所有内容,公众【营销人星球】获取下载查看 核心观点 近年来,年轻人与家的关系愈发紧密。 在小红书上,我们观察到了家居家装内容的蓬勃生长,3 年来相关内容的笔记规模增长了6倍,相关品类的搜索量增加的 3.…...
使用 LangChain 搭建基于 Amazon DynamoDB 的大语言模型应用
LangChain 是一个旨在简化使用大型语言模型创建应用程序的框架。作为语言模型集成框架,在这个应用场景中,LangChain 将与 Amazon DynamoDB 紧密结合,构建一个完整的基于大语言模型的聊天应用。 本次活动,我们特意邀请了亚马逊云科…...
210. 课程表 II Python
文章目录 一、题目描述示例 1示例 2示例 3 二、代码三、解题思路 一、题目描述 现在你总共有 numCourses 门课需要选,记为 0 到 numCourses - 1。给你一个数组 prerequisites ,其中 prerequisites[i] [ai, bi] ,表示在选修课程 ai 前 必须 …...
【LeetCode 算法】Linked List Cycle II 环形链表 II
文章目录 Linked List Cycle II 环形链表 II问题描述:分析代码哈希快慢指针 Tag Linked List Cycle II 环形链表 II 问题描述: 给定一个链表的头节点 head ,返回链表开始入环的第一个节点。 如果链表无环,则返回 null。 如果链…...
蒸散发与植被总初级生产力估算
目标 熟悉蒸散发ET及其组分(植被蒸腾Ec、土壤蒸发Es、冠层截留Ei)、植被总初级生产力GPP的概念和碳水耦合的基本原理;掌握利用Python与ArcGIS工具进行课程相关的操作;熟练掌握国际上流行的Penman-Monteith模型,并能够…...
uniapp微信小程序底部弹窗自定义组件
基础弹窗效果组件 <template><view><viewclass"tui-actionsheet-class tui-actionsheet":class"[show ? tui-actionsheet-show : ]"><view class"regional-selection">底部弹窗</view></view><!-- 遮罩…...
人工智能的最新进展:2024年将会发生什么?
文章目录 2024年AI最新发展2024年AI具体应用2024年AI的具体预测 ✍创作者:全栈弄潮儿 🏡 个人主页: 全栈弄潮儿的个人主页 🏙️ 个人社区,欢迎你的加入:全栈弄潮儿的个人社区 📙 专栏地址&#…...
使用Golang实现一套流程可配置,适用于广告、推荐系统的业务性框架——组合应用
在《使用Golang实现一套流程可配置,适用于广告、推荐系统的业务性框架——简单应用》中,我们看到了各种组合Handler的组件,如HandlerGroup和Layer。这些组件下面的子模块又是不同组件,比如LayerCenter的子组件是Layer。如果此时我…...
DNS入门学习:DNS缓存的原理和作用(中科三方)
在实际业务场景中,DNS解析过程并不总是严格遵循从根域名服务器、顶级域名服务器再到权威域名服务器的一级级查询过程,这只是一个标准状态。为了节省全球查询的时间,同时减轻各级服务器的解析压力,DNS系统中引入了缓存机制。本文中…...
Linux虚拟机安装tomcat(图文详解)
目录 第一章、xshell工具和xftp的使用1.1)xshell下载与安装1.2)xshell连接1.3)xftp下载安装和连接 第二章、安装tomcat1.1)关闭防火墙,传输tomcat压缩包到Linux虚拟机12)启动tomcat 第一章、xshell工具和xf…...
Matlab对TMS320F28335编程--SVPWM配置互补PWM输出
前言 F28335中断 目的:FOC的核心算法及SVPWM输出,SVPWM的载波频率10kHz,SVPWM的每个周期都会触发ADC中断采集相电流,SVPWM为芯片ePWM4、5、6通道,配置死区 1、配置中断SVPWM进ADC中断,查上表知CPU1,PIE1 …...
MySQL数据库——多表操作
文章目录 前言多表关系一对一关系一对多/多对一关系多对多关系 外键约束创建外键约束插入数据删除带有外键约束的表的数据删除外键约束 多表联合查询数据准备交叉连接查询内连接查询外连接查询左外连接查询右外连接查询满外连接查询 子查询子查询关键字ALL 关键字ANY 和 SOME 关…...
Java版本spring cloud + spring boot企业电子招投标系统源代码 tbms
功能模块: 待办消息,招标公告,中标公告,信息发布 描述: 全过程数字化采购管理,打造从供应商管理到采购招投标、采购合同、采购执行的全过程数字化管理。通供应商门户具备内外协同的能力,为…...
后进先出(LIFO)详解
LIFO 是 Last In, First Out 的缩写,中文译为后进先出。这是一种数据结构的工作原则,类似于一摞盘子或一叠书本: 最后放进去的元素最先出来 -想象往筒状容器里放盘子: (1)你放进的最后一个盘子(…...
React 第五十五节 Router 中 useAsyncError的使用详解
前言 useAsyncError 是 React Router v6.4 引入的一个钩子,用于处理异步操作(如数据加载)中的错误。下面我将详细解释其用途并提供代码示例。 一、useAsyncError 用途 处理异步错误:捕获在 loader 或 action 中发生的异步错误替…...
51c自动驾驶~合集58
我自己的原文哦~ https://blog.51cto.com/whaosoft/13967107 #CCA-Attention 全局池化局部保留,CCA-Attention为LLM长文本建模带来突破性进展 琶洲实验室、华南理工大学联合推出关键上下文感知注意力机制(CCA-Attention),…...
【力扣数据库知识手册笔记】索引
索引 索引的优缺点 优点1. 通过创建唯一性索引,可以保证数据库表中每一行数据的唯一性。2. 可以加快数据的检索速度(创建索引的主要原因)。3. 可以加速表和表之间的连接,实现数据的参考完整性。4. 可以在查询过程中,…...
2021-03-15 iview一些问题
1.iview 在使用tree组件时,发现没有set类的方法,只有get,那么要改变tree值,只能遍历treeData,递归修改treeData的checked,发现无法更改,原因在于check模式下,子元素的勾选状态跟父节…...
C++ 基础特性深度解析
目录 引言 一、命名空间(namespace) C 中的命名空间 与 C 语言的对比 二、缺省参数 C 中的缺省参数 与 C 语言的对比 三、引用(reference) C 中的引用 与 C 语言的对比 四、inline(内联函数…...
python爬虫:Newspaper3k 的详细使用(好用的新闻网站文章抓取和解析的Python库)
更多内容请见: 爬虫和逆向教程-专栏介绍和目录 文章目录 一、Newspaper3k 概述1.1 Newspaper3k 介绍1.2 主要功能1.3 典型应用场景1.4 安装二、基本用法2.2 提取单篇文章的内容2.2 处理多篇文档三、高级选项3.1 自定义配置3.2 分析文章情感四、实战案例4.1 构建新闻摘要聚合器…...
企业如何增强终端安全?
在数字化转型加速的今天,企业的业务运行越来越依赖于终端设备。从员工的笔记本电脑、智能手机,到工厂里的物联网设备、智能传感器,这些终端构成了企业与外部世界连接的 “神经末梢”。然而,随着远程办公的常态化和设备接入的爆炸式…...
【数据分析】R版IntelliGenes用于生物标志物发现的可解释机器学习
禁止商业或二改转载,仅供自学使用,侵权必究,如需截取部分内容请后台联系作者! 文章目录 介绍流程步骤1. 输入数据2. 特征选择3. 模型训练4. I-Genes 评分计算5. 输出结果 IntelliGenesR 安装包1. 特征选择2. 模型训练和评估3. I-Genes 评分计…...
【Java学习笔记】BigInteger 和 BigDecimal 类
BigInteger 和 BigDecimal 类 二者共有的常见方法 方法功能add加subtract减multiply乘divide除 注意点:传参类型必须是类对象 一、BigInteger 1. 作用:适合保存比较大的整型数 2. 使用说明 创建BigInteger对象 传入字符串 3. 代码示例 import j…...
