Kafka的配置和使用
目录
1.服务器用docker安装kafka
2.springboot集成kafka实现生产者和消费者
1.服务器用docker安装kafka
①、安装docker(docker类似于linux的软件商店,下载所有应用都能从docker去下载)
a、自动安装
curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun
b、启动docker
sudo systemctl start docker
c、 通过运行hello-world镜像来验证是否正确安装了Docker Engine-Community。
// 拉取镜像
sudo docker pull hello-world
// 执行
hello-world sudo docker run hello-world
d、安装成功
②、zookeeper
a、docker search zookeeper
b、docker pull zookeeper
③、安装kafka
a、docker search kafka
b、docker pull wurstmeister/kafka
④、运行zookeeper
a、docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime zookeeper
⑤、运行kafka
a、 docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=42.194.238.131:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://42.194.238.131:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka
b、参数说明
参数说明:
-e KAFKA_BROKER_ID=0 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己
-e KAFKA_ZOOKEEPER_CONNECT=172.21.10.10:2181/kafka 配置zookeeper管理kafka的路径172.21.10.10:2181/kafka
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.21.10.10:9092 把kafka的地址端口注册给zookeeper,如果是远程访问要改成外网IP,类如Java程序访问出现无法连接。
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置kafka的监听端口
-v /etc/localtime:/etc/localtime 容器时间同步虚拟机的时间
⑥、检验kafka是否可以使用
docker exec -it kafka bash
cd /opt/kafka_2.13-2.8.1/
cd bin
a、运行kafka生产者并发送消息
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
b、在开一个页面,运行kafka消费者发送消息
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
⑦、结果是这个样子的
⑧、每条消息都有一个主题,消费者指定监听哪个主题的消息,如果进来消息队列的是我们指定监听的主题,就消费,否则不消费(topic这里指定的生产和消费的主题)
⑨、消费者宕掉了,生产者接着发,消息不会丢,消费者重启之后会重新接收到宕机之后发的所有消息
2.springboot集成kafka实现生产者和消费者
①、在pom中创建依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.8</version>
</dependency>
②、配置kafka
a、在 application.yml 文件中添加以下配置:(注:yml中两个相同名字的会报错,比如两个spring)
spring:
kafka:
#自己的kafka所在的ip地址和端口号
bootstrap-servers: localhost:9092
consumer:
#一个group-id代表一个消费组,一个消息可以被几个消费组消费
group-id: my-group
auto-offset-reset: earliest
producer: #序列化
value-serializer: org.apache.kafka.common.serialization.StringSerializer
key-serializer: org.apache.kafka.common.serialization.StringSerializer
b、创建一个生产者
@Configuration public class KafkaProducerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Beanpublic Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}@Beanpublic ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}}
sendMessage 方法,用于发送消息到 Kafka。
@RestController public class KafkaController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@PostMapping("/send")public void sendMessage(@RequestBody String message) {kafkaTemplate.send("my-topic", message);}}
c、 创建一个消费者
@Configuration @EnableKafka public class KafkaConsumerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}@Beanpublic ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());return factory;}}
@KafkaListener 注解声明了一个消费者方法,用于接收从
my-topic 主题中读取的消息
@Service public class KafkaConsumer {@KafkaListener(topics = "my-topic", groupId = "my-group-id")public void consume(String message) {System.out.println("Received message: " + message);}}
相关文章:

Kafka的配置和使用
目录 1.服务器用docker安装kafka 2.springboot集成kafka实现生产者和消费者 1.服务器用docker安装kafka ①、安装docker(docker类似于linux的软件商店,下载所有应用都能从docker去下载) a、自动安装 curl -fsSL https://get.docker.com | b…...
【C++】unordered_map在Windows和Linux上的不同行为
我目前手头上的项目,需要编译在板端Linux上运行,但是日常daily调试多在Windows上开发。这就涉及到同一份代码在多平台上的编译个运行。有一次遇到了一个奇怪的现象:跑同样的一份代码,Windows和Linux出来的结果是不一致的。最终确定…...

Apipost三方消息通知,接口变更不用愁
Apipost致力于为开发者提供更全面的API管理功能。而最近,Apipost又新增了一个非常实用的功能:第三方消息推送。这个功能可以帮助开发人员及时了解API的变更情况,从而更好地管理和优化自己的API。 具体来说,Apipost的第三方消息推…...

C语言 用数组名作函数参数
当用数组名作函数参数时,如果形参数组中各元素的值发生变化,实参数组元素的值随之变化。 1.数组元素做实参的情况: 如果已经定义一个函数,其原型为 void swap(int x,int y);假设函数的作用是将两个形参(x,y…...
每日一题(980. 不同路径 III)-回溯
题目 980. 不同路径 III 题解思路 表格中值为1的为起始点值为0 的是可以经过的点,但是只能经过一次值为2 的是终点,计算从起点到终点一共有多少种路径 计算出值为0的方格个数,同时找到起点位置当位于终点时候且经过所有的方格为0的点 即为…...
【Python:json常用函数,用于加载和保存json文件】load(), loads(), dump(), dumps()
文章目录 1、load()2、loads()3、dump()4、dumps() json文件为javascript object Notation文件,属于轻量级的数据交换格式,可以用于存储和交换数据。json文件是由类似{ }的key-value映射组成。 1、load() 把json文件加载为Python的数据格式,…...

Flink State 和 Fault Tolerance详解
有状态操作或者操作算子在处理DataStream的元素或者事件的时候需要存储计算的中间状态,这就使得状态在整个Flink的精细化计算中有着非常重要的地位: 记录数据从某一个过去时间点到当前时间的状态信息。以每分钟/小时/天汇总事件时,状态将保留…...

小红书2023“家生活”趋势白皮书
关于报告的所有内容,公众【营销人星球】获取下载查看 核心观点 近年来,年轻人与家的关系愈发紧密。 在小红书上,我们观察到了家居家装内容的蓬勃生长,3 年来相关内容的笔记规模增长了6倍,相关品类的搜索量增加的 3.…...

使用 LangChain 搭建基于 Amazon DynamoDB 的大语言模型应用
LangChain 是一个旨在简化使用大型语言模型创建应用程序的框架。作为语言模型集成框架,在这个应用场景中,LangChain 将与 Amazon DynamoDB 紧密结合,构建一个完整的基于大语言模型的聊天应用。 本次活动,我们特意邀请了亚马逊云科…...
210. 课程表 II Python
文章目录 一、题目描述示例 1示例 2示例 3 二、代码三、解题思路 一、题目描述 现在你总共有 numCourses 门课需要选,记为 0 到 numCourses - 1。给你一个数组 prerequisites ,其中 prerequisites[i] [ai, bi] ,表示在选修课程 ai 前 必须 …...
【LeetCode 算法】Linked List Cycle II 环形链表 II
文章目录 Linked List Cycle II 环形链表 II问题描述:分析代码哈希快慢指针 Tag Linked List Cycle II 环形链表 II 问题描述: 给定一个链表的头节点 head ,返回链表开始入环的第一个节点。 如果链表无环,则返回 null。 如果链…...

蒸散发与植被总初级生产力估算
目标 熟悉蒸散发ET及其组分(植被蒸腾Ec、土壤蒸发Es、冠层截留Ei)、植被总初级生产力GPP的概念和碳水耦合的基本原理;掌握利用Python与ArcGIS工具进行课程相关的操作;熟练掌握国际上流行的Penman-Monteith模型,并能够…...

uniapp微信小程序底部弹窗自定义组件
基础弹窗效果组件 <template><view><viewclass"tui-actionsheet-class tui-actionsheet":class"[show ? tui-actionsheet-show : ]"><view class"regional-selection">底部弹窗</view></view><!-- 遮罩…...

人工智能的最新进展:2024年将会发生什么?
文章目录 2024年AI最新发展2024年AI具体应用2024年AI的具体预测 ✍创作者:全栈弄潮儿 🏡 个人主页: 全栈弄潮儿的个人主页 🏙️ 个人社区,欢迎你的加入:全栈弄潮儿的个人社区 📙 专栏地址&#…...

使用Golang实现一套流程可配置,适用于广告、推荐系统的业务性框架——组合应用
在《使用Golang实现一套流程可配置,适用于广告、推荐系统的业务性框架——简单应用》中,我们看到了各种组合Handler的组件,如HandlerGroup和Layer。这些组件下面的子模块又是不同组件,比如LayerCenter的子组件是Layer。如果此时我…...
DNS入门学习:DNS缓存的原理和作用(中科三方)
在实际业务场景中,DNS解析过程并不总是严格遵循从根域名服务器、顶级域名服务器再到权威域名服务器的一级级查询过程,这只是一个标准状态。为了节省全球查询的时间,同时减轻各级服务器的解析压力,DNS系统中引入了缓存机制。本文中…...

Linux虚拟机安装tomcat(图文详解)
目录 第一章、xshell工具和xftp的使用1.1)xshell下载与安装1.2)xshell连接1.3)xftp下载安装和连接 第二章、安装tomcat1.1)关闭防火墙,传输tomcat压缩包到Linux虚拟机12)启动tomcat 第一章、xshell工具和xf…...

Matlab对TMS320F28335编程--SVPWM配置互补PWM输出
前言 F28335中断 目的:FOC的核心算法及SVPWM输出,SVPWM的载波频率10kHz,SVPWM的每个周期都会触发ADC中断采集相电流,SVPWM为芯片ePWM4、5、6通道,配置死区 1、配置中断SVPWM进ADC中断,查上表知CPU1,PIE1 …...

MySQL数据库——多表操作
文章目录 前言多表关系一对一关系一对多/多对一关系多对多关系 外键约束创建外键约束插入数据删除带有外键约束的表的数据删除外键约束 多表联合查询数据准备交叉连接查询内连接查询外连接查询左外连接查询右外连接查询满外连接查询 子查询子查询关键字ALL 关键字ANY 和 SOME 关…...

Java版本spring cloud + spring boot企业电子招投标系统源代码 tbms
功能模块: 待办消息,招标公告,中标公告,信息发布 描述: 全过程数字化采购管理,打造从供应商管理到采购招投标、采购合同、采购执行的全过程数字化管理。通供应商门户具备内外协同的能力,为…...

龙虎榜——20250610
上证指数放量收阴线,个股多数下跌,盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型,指数短线有调整的需求,大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的:御银股份、雄帝科技 驱动…...

观成科技:隐蔽隧道工具Ligolo-ng加密流量分析
1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具,该工具基于TUN接口实现其功能,利用反向TCP/TLS连接建立一条隐蔽的通信信道,支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式,适应复杂网…...

手游刚开服就被攻击怎么办?如何防御DDoS?
开服初期是手游最脆弱的阶段,极易成为DDoS攻击的目标。一旦遭遇攻击,可能导致服务器瘫痪、玩家流失,甚至造成巨大经济损失。本文为开发者提供一套简洁有效的应急与防御方案,帮助快速应对并构建长期防护体系。 一、遭遇攻击的紧急应…...
Spring Boot 实现流式响应(兼容 2.7.x)
在实际开发中,我们可能会遇到一些流式数据处理的场景,比如接收来自上游接口的 Server-Sent Events(SSE) 或 流式 JSON 内容,并将其原样中转给前端页面或客户端。这种情况下,传统的 RestTemplate 缓存机制会…...
vue3 字体颜色设置的多种方式
在Vue 3中设置字体颜色可以通过多种方式实现,这取决于你是想在组件内部直接设置,还是在CSS/SCSS/LESS等样式文件中定义。以下是几种常见的方法: 1. 内联样式 你可以直接在模板中使用style绑定来设置字体颜色。 <template><div :s…...

UR 协作机器人「三剑客」:精密轻量担当(UR7e)、全能协作主力(UR12e)、重型任务专家(UR15)
UR协作机器人正以其卓越性能在现代制造业自动化中扮演重要角色。UR7e、UR12e和UR15通过创新技术和精准设计满足了不同行业的多样化需求。其中,UR15以其速度、精度及人工智能准备能力成为自动化领域的重要突破。UR7e和UR12e则在负载规格和市场定位上不断优化…...

第 86 场周赛:矩阵中的幻方、钥匙和房间、将数组拆分成斐波那契序列、猜猜这个单词
Q1、[中等] 矩阵中的幻方 1、题目描述 3 x 3 的幻方是一个填充有 从 1 到 9 的不同数字的 3 x 3 矩阵,其中每行,每列以及两条对角线上的各数之和都相等。 给定一个由整数组成的row x col 的 grid,其中有多少个 3 3 的 “幻方” 子矩阵&am…...

3-11单元格区域边界定位(End属性)学习笔记
返回一个Range 对象,只读。该对象代表包含源区域的区域上端下端左端右端的最后一个单元格。等同于按键 End 向上键(End(xlUp))、End向下键(End(xlDown))、End向左键(End(xlToLeft)End向右键(End(xlToRight)) 注意:它移动的位置必须是相连的有内容的单元格…...

项目部署到Linux上时遇到的错误(Redis,MySQL,无法正确连接,地址占用问题)
Redis无法正确连接 在运行jar包时出现了这样的错误 查询得知问题核心在于Redis连接失败,具体原因是客户端发送了密码认证请求,但Redis服务器未设置密码 1.为Redis设置密码(匹配客户端配置) 步骤: 1).修…...

有限自动机到正规文法转换器v1.0
1 项目简介 这是一个功能强大的有限自动机(Finite Automaton, FA)到正规文法(Regular Grammar)转换器,它配备了一个直观且完整的图形用户界面,使用户能够轻松地进行操作和观察。该程序基于编译原理中的经典…...