Kafka的配置和使用
目录
1.服务器用docker安装kafka
2.springboot集成kafka实现生产者和消费者
1.服务器用docker安装kafka
①、安装docker(docker类似于linux的软件商店,下载所有应用都能从docker去下载)
a、自动安装
curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun
b、启动docker
sudo systemctl start docker
c、 通过运行hello-world镜像来验证是否正确安装了Docker Engine-Community。
// 拉取镜像
sudo docker pull hello-world
// 执行
hello-world sudo docker run hello-world
d、安装成功

②、zookeeper
a、docker search zookeeper
b、docker pull zookeeper
③、安装kafka
a、docker search kafka
b、docker pull wurstmeister/kafka
④、运行zookeeper
a、docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime zookeeper
⑤、运行kafka
a、 docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=42.194.238.131:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://42.194.238.131:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka
b、参数说明
参数说明:
-e KAFKA_BROKER_ID=0 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己
-e KAFKA_ZOOKEEPER_CONNECT=172.21.10.10:2181/kafka 配置zookeeper管理kafka的路径172.21.10.10:2181/kafka
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.21.10.10:9092 把kafka的地址端口注册给zookeeper,如果是远程访问要改成外网IP,类如Java程序访问出现无法连接。
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置kafka的监听端口
-v /etc/localtime:/etc/localtime 容器时间同步虚拟机的时间
⑥、检验kafka是否可以使用
docker exec -it kafka bash
cd /opt/kafka_2.13-2.8.1/
cd bin
a、运行kafka生产者并发送消息
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
b、在开一个页面,运行kafka消费者发送消息
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
⑦、结果是这个样子的

⑧、每条消息都有一个主题,消费者指定监听哪个主题的消息,如果进来消息队列的是我们指定监听的主题,就消费,否则不消费(topic这里指定的生产和消费的主题)

⑨、消费者宕掉了,生产者接着发,消息不会丢,消费者重启之后会重新接收到宕机之后发的所有消息
2.springboot集成kafka实现生产者和消费者
①、在pom中创建依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.8</version>
</dependency>
②、配置kafka
a、在 application.yml 文件中添加以下配置:(注:yml中两个相同名字的会报错,比如两个spring)
spring:
kafka:
#自己的kafka所在的ip地址和端口号
bootstrap-servers: localhost:9092
consumer:
#一个group-id代表一个消费组,一个消息可以被几个消费组消费
group-id: my-group
auto-offset-reset: earliest
producer: #序列化
value-serializer: org.apache.kafka.common.serialization.StringSerializer
key-serializer: org.apache.kafka.common.serialization.StringSerializer
b、创建一个生产者
@Configuration
public class KafkaProducerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Beanpublic Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}@Beanpublic ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}}
sendMessage 方法,用于发送消息到 Kafka。
@RestController
public class KafkaController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@PostMapping("/send")public void sendMessage(@RequestBody String message) {kafkaTemplate.send("my-topic", message);}}
c、 创建一个消费者
@Configuration
@EnableKafka
public class KafkaConsumerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}@Beanpublic ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());return factory;}}
@KafkaListener 注解声明了一个消费者方法,用于接收从
my-topic 主题中读取的消息
@Service
public class KafkaConsumer {@KafkaListener(topics = "my-topic", groupId = "my-group-id")public void consume(String message) {System.out.println("Received message: " + message);}}
相关文章:
Kafka的配置和使用
目录 1.服务器用docker安装kafka 2.springboot集成kafka实现生产者和消费者 1.服务器用docker安装kafka ①、安装docker(docker类似于linux的软件商店,下载所有应用都能从docker去下载) a、自动安装 curl -fsSL https://get.docker.com | b…...
【C++】unordered_map在Windows和Linux上的不同行为
我目前手头上的项目,需要编译在板端Linux上运行,但是日常daily调试多在Windows上开发。这就涉及到同一份代码在多平台上的编译个运行。有一次遇到了一个奇怪的现象:跑同样的一份代码,Windows和Linux出来的结果是不一致的。最终确定…...
Apipost三方消息通知,接口变更不用愁
Apipost致力于为开发者提供更全面的API管理功能。而最近,Apipost又新增了一个非常实用的功能:第三方消息推送。这个功能可以帮助开发人员及时了解API的变更情况,从而更好地管理和优化自己的API。 具体来说,Apipost的第三方消息推…...
C语言 用数组名作函数参数
当用数组名作函数参数时,如果形参数组中各元素的值发生变化,实参数组元素的值随之变化。 1.数组元素做实参的情况: 如果已经定义一个函数,其原型为 void swap(int x,int y);假设函数的作用是将两个形参(x,y…...
每日一题(980. 不同路径 III)-回溯
题目 980. 不同路径 III 题解思路 表格中值为1的为起始点值为0 的是可以经过的点,但是只能经过一次值为2 的是终点,计算从起点到终点一共有多少种路径 计算出值为0的方格个数,同时找到起点位置当位于终点时候且经过所有的方格为0的点 即为…...
【Python:json常用函数,用于加载和保存json文件】load(), loads(), dump(), dumps()
文章目录 1、load()2、loads()3、dump()4、dumps() json文件为javascript object Notation文件,属于轻量级的数据交换格式,可以用于存储和交换数据。json文件是由类似{ }的key-value映射组成。 1、load() 把json文件加载为Python的数据格式,…...
Flink State 和 Fault Tolerance详解
有状态操作或者操作算子在处理DataStream的元素或者事件的时候需要存储计算的中间状态,这就使得状态在整个Flink的精细化计算中有着非常重要的地位: 记录数据从某一个过去时间点到当前时间的状态信息。以每分钟/小时/天汇总事件时,状态将保留…...
小红书2023“家生活”趋势白皮书
关于报告的所有内容,公众【营销人星球】获取下载查看 核心观点 近年来,年轻人与家的关系愈发紧密。 在小红书上,我们观察到了家居家装内容的蓬勃生长,3 年来相关内容的笔记规模增长了6倍,相关品类的搜索量增加的 3.…...
使用 LangChain 搭建基于 Amazon DynamoDB 的大语言模型应用
LangChain 是一个旨在简化使用大型语言模型创建应用程序的框架。作为语言模型集成框架,在这个应用场景中,LangChain 将与 Amazon DynamoDB 紧密结合,构建一个完整的基于大语言模型的聊天应用。 本次活动,我们特意邀请了亚马逊云科…...
210. 课程表 II Python
文章目录 一、题目描述示例 1示例 2示例 3 二、代码三、解题思路 一、题目描述 现在你总共有 numCourses 门课需要选,记为 0 到 numCourses - 1。给你一个数组 prerequisites ,其中 prerequisites[i] [ai, bi] ,表示在选修课程 ai 前 必须 …...
【LeetCode 算法】Linked List Cycle II 环形链表 II
文章目录 Linked List Cycle II 环形链表 II问题描述:分析代码哈希快慢指针 Tag Linked List Cycle II 环形链表 II 问题描述: 给定一个链表的头节点 head ,返回链表开始入环的第一个节点。 如果链表无环,则返回 null。 如果链…...
蒸散发与植被总初级生产力估算
目标 熟悉蒸散发ET及其组分(植被蒸腾Ec、土壤蒸发Es、冠层截留Ei)、植被总初级生产力GPP的概念和碳水耦合的基本原理;掌握利用Python与ArcGIS工具进行课程相关的操作;熟练掌握国际上流行的Penman-Monteith模型,并能够…...
uniapp微信小程序底部弹窗自定义组件
基础弹窗效果组件 <template><view><viewclass"tui-actionsheet-class tui-actionsheet":class"[show ? tui-actionsheet-show : ]"><view class"regional-selection">底部弹窗</view></view><!-- 遮罩…...
人工智能的最新进展:2024年将会发生什么?
文章目录 2024年AI最新发展2024年AI具体应用2024年AI的具体预测 ✍创作者:全栈弄潮儿 🏡 个人主页: 全栈弄潮儿的个人主页 🏙️ 个人社区,欢迎你的加入:全栈弄潮儿的个人社区 📙 专栏地址&#…...
使用Golang实现一套流程可配置,适用于广告、推荐系统的业务性框架——组合应用
在《使用Golang实现一套流程可配置,适用于广告、推荐系统的业务性框架——简单应用》中,我们看到了各种组合Handler的组件,如HandlerGroup和Layer。这些组件下面的子模块又是不同组件,比如LayerCenter的子组件是Layer。如果此时我…...
DNS入门学习:DNS缓存的原理和作用(中科三方)
在实际业务场景中,DNS解析过程并不总是严格遵循从根域名服务器、顶级域名服务器再到权威域名服务器的一级级查询过程,这只是一个标准状态。为了节省全球查询的时间,同时减轻各级服务器的解析压力,DNS系统中引入了缓存机制。本文中…...
Linux虚拟机安装tomcat(图文详解)
目录 第一章、xshell工具和xftp的使用1.1)xshell下载与安装1.2)xshell连接1.3)xftp下载安装和连接 第二章、安装tomcat1.1)关闭防火墙,传输tomcat压缩包到Linux虚拟机12)启动tomcat 第一章、xshell工具和xf…...
Matlab对TMS320F28335编程--SVPWM配置互补PWM输出
前言 F28335中断 目的:FOC的核心算法及SVPWM输出,SVPWM的载波频率10kHz,SVPWM的每个周期都会触发ADC中断采集相电流,SVPWM为芯片ePWM4、5、6通道,配置死区 1、配置中断SVPWM进ADC中断,查上表知CPU1,PIE1 …...
MySQL数据库——多表操作
文章目录 前言多表关系一对一关系一对多/多对一关系多对多关系 外键约束创建外键约束插入数据删除带有外键约束的表的数据删除外键约束 多表联合查询数据准备交叉连接查询内连接查询外连接查询左外连接查询右外连接查询满外连接查询 子查询子查询关键字ALL 关键字ANY 和 SOME 关…...
Java版本spring cloud + spring boot企业电子招投标系统源代码 tbms
功能模块: 待办消息,招标公告,中标公告,信息发布 描述: 全过程数字化采购管理,打造从供应商管理到采购招投标、采购合同、采购执行的全过程数字化管理。通供应商门户具备内外协同的能力,为…...
老板与员工:分钟理解 Subagent 架构甘
一、项目背景与核心价值 1. 解决的核心痛点 Navicat的数据库连接密码并非明文存储,而是通过AES算法加密后写入.ncx格式的XML配置文件中。一旦用户忘记密码,常规方式只能重新配置连接,效率极低。本项目只作为学习研究使用,不做其他…...
告别环境冲突!用Anaconda虚拟环境搞定QGIS 3.18二次开发(附Pycharm代码补全修复)
告别环境冲突!用Anaconda虚拟环境搞定QGIS 3.18二次开发(附Pycharm代码补全修复) 当你在深夜调试QGIS插件时,突然发现昨天还能运行的脚本今天报了一堆依赖错误——这种场景对GIS开发者来说再熟悉不过了。环境冲突、版本不匹配、ID…...
【深入解析】数字电路核心组合逻辑芯片实战应用指南
1. 74系列组合逻辑芯片基础认知 第一次接触74系列芯片时,我盯着实验室抽屉里那些标着74HC138、74HC148的黑色小方块完全无从下手。直到导师扔给我一块面包板和几个LED灯,才真正理解这些芯片就像乐高积木里的基础模块——通过不同组合能搭建出千变万化的数…...
小白也能玩转AI视觉定位:Qwen2.5-VL Chord模型保姆级安装教程
小白也能玩转AI视觉定位:Qwen2.5-VL Chord模型保姆级安装教程 1. 前言:什么是视觉定位? 想象一下,你有一张全家福照片,想快速找到照片中穿红色衣服的表妹在哪里。传统方法可能需要你手动查看每个角落,而A…...
Llama-3.2V-11B-cot实战教程:自定义提示词模板提升CoT推理结构化程度
Llama-3.2V-11B-cot实战教程:自定义提示词模板提升CoT推理结构化程度 1. 工具概览与核心价值 Llama-3.2V-11B-cot是基于Meta最新多模态大模型开发的专业级视觉推理工具,特别针对双卡RTX 4090环境进行了深度优化。这个工具最突出的特点是实现了开箱即用…...
搜索引擎Elasticsearch
Elasticsearch:大数据时代的智能搜索利器 在信息爆炸的今天,如何快速、精准地检索海量数据成为企业和开发者的核心需求。Elasticsearch作为一款开源的分布式搜索引擎,凭借其高性能、可扩展性和易用性,成为全球范围内广泛应用的搜…...
如何快速掌握抗体序列分析:ANARCI完整入门指南
如何快速掌握抗体序列分析:ANARCI完整入门指南 【免费下载链接】ANARCI Antibody Numbering and Antigen Receptor ClassIfication 项目地址: https://gitcode.com/gh_mirrors/an/ANARCI 抗体序列编号是抗体研究和药物开发中的关键环节,而ANARCI&…...
无需Root!Termux+Samba三步搭建手机NAS,跨平台文件共享无忧
1. 为什么你需要手机NAS? 每次用微信传文件都要忍受压缩画质?电脑和手机互传文件还得找数据线?家里多台设备共享电影资源只能靠U盘来回倒腾?这些问题我都遇到过,直到发现用旧手机搭建NAS这个神器方案。最让我惊喜的是…...
FRCRN常见错误代码排查手册:从403 Forbidden到CUDA错误
FRCRN常见错误代码排查手册:从403 Forbidden到CUDA错误 部署和使用FRCRN进行语音降噪时,遇到各种报错是常有的事。这些错误信息往往让人一头雾水,从网络权限的“403 Forbidden”到让人头疼的CUDA问题,每一个都可能让你卡上半天。…...
Alibaba DASD-4B Thinking 对话工具在时序预测中的应用:结合LSTM模型的分析与报告生成
Alibaba DASD-4B Thinking 对话工具在时序预测中的应用:结合LSTM模型的分析与报告生成 1. 引言 想象一下这个场景:你的团队刚刚用LSTM模型跑完了下个季度的销量预测,屏幕上那条起伏的曲线清晰地告诉你,三月份会有一个销售高峰&a…...
