使用Linux部署Kafka教程
目录
一、部署Zookeeper
1 拉取Zookeeper镜像
2 运行Zookeeper
二、部署Kafka
1 拉取Kafka镜像
2 运行Kafka
三、验证是否部署成功
1 进入到kafka容器中
2 创建topic 生产者
3 生产者发送消息
4 消费者消费消息
四、搭建kafka管理平台
五、SpringBoot整合Kafka
1、导入依赖
2、修改配置
3、生产者
4、消费者
5、测试发送消息
6、测试收到消息
一、部署Zookeeper
1 拉取Zookeeper镜像
docker pull wurstmeister/zookeeper
- 1
2 运行Zookeeper
docker run --restart=always --name zookeeper \
--log-driver json-file \
--log-opt max-size=100m \
--log-opt max-file=2 \
-p 2181:2181 \
-v /etc/localtime:/etc/localtime \
-d wurstmeister/zookeeper
二、部署Kafka
1 拉取Kafka镜像
docker pull wurstmeister/kafka
2 运行Kafka
docker run --restart=always --name kafka \
--log-driver json-file \
--log-opt max-size=100m \
--log-opt max-file=2 \-p 9092:9092 \-e KAFKA_BROKER_ID=0 \-e KAFKA_ZOOKEEPER_CONNECT=192.168.8.102:2181 \-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.8.102:9092 \-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \-v /etc/localtime:/etc/localtime \-d wurstmeister/kafka
参数说明:
-e KAFKA_BROKER_ID=0 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己
-e KAFKA_ZOOKEEPER_CONNECT=172.16.0.13:2181/kafka 配置zookeeper管理kafka的路径172.16.0.13:2181/kafka
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.16.0.13:9092 把kafka的地址端口注册给zookeeper,如果是远程访问要改成外网IP,类如Java程序访问出现无法连接。
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置kafka的监听端口
-v /etc/localtime:/etc/localtime 容器时间同步虚拟机的时间
三、验证是否部署成功
1 进入到kafka容器中
docker exec -it kafka /bin/sh
2 创建topic 生产者
cd opt/kafka_2.13-2.8.1bin/kafka-topics.sh --create --zookeeper 192.168.8.102:2181 --replication-factor 1 --partitions 1 --topic partopic

3 生产者发送消息
bin/kafka-console-producer.sh --broker-list 192.168.8.102:9092 --topic partopic

4 消费者消费消息
- 新打开个ssh窗口
- 跟前面步骤一样进入到容器
bin/kafka-console-consumer.sh --bootstrap-server 192.168.8.102:9092 --topic partopic --from-beginning

四、搭建kafka管理平台
docker search kafdrop

docker run -d --rm -p 9000:9000 \-e JVM_OPTS="-Xms32M -Xmx64M" \-e KAFKA_BROKERCONNECT=<host:port,host:port> \-e SERVER_SERVLET_CONTEXTPATH="/" \obsidiandynamics/kafdrop<host:port,host:port> 为 外网集群地址 多个用逗号分隔 例如xxx.xxx.xxx.xxx:9092,yyy.yyy.yyy.yyy:9092 尖角号不留上面的命令是百度的以下是我自己尝试的
docker run -d --name kafdrop -p 9001:9001 \-e JVM_OPTS="-Xms32M -Xmx64M -Dserver.port=9001" \-e KAFKA_BROKERCONNECT=192.168.58.130:9092 \-e SERVER_SERVLET_CONTEXTPATH="/" \obsidiandynamics/kafdrop因为我docker启动了其他东西占用了9001端口,而这个kafdrop其实就是一个springboot项目,以jar命令的形式启动
访问地址:Kafdrop: Broker List

五、SpringBoot整合Kafka
1、导入依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
2、修改配置
spring:kafka:bootstrap-servers: 192.168.58.130:9092 #部署linux的kafka的ip地址和端口号producer:# 发生错误后,消息重发的次数。retries: 1#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。batch-size: 16384# 设置生产者内存缓冲区的大小。buffer-memory: 33554432# 键的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer# 值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializer# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。acks: 1consumer:# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5Dauto-commit-interval: 1S# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录auto-offset-reset: earliest# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量enable-auto-commit: false# 键的反序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 值的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:# 在侦听器容器中运行的线程数。concurrency: 5#listner负责ack,每调用一次,就立即commitack-mode: manual_immediatemissing-topics-fatal: false
本次测试:linux地址:192.168.58.130
spring.kafka.bootstrap-servers=192.168.58.130:9092
advertised.listeners=192.168.58.130:9092
3、生产者
import com.alibaba.fastjson.JSON;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;/*** 事件的生产者*/
@Slf4j
@Component
public class KafkaProducer {@Autowiredpublic KafkaTemplate kafkaTemplate;/** 主题 */public static final String TOPIC_TEST = "Test";/** 消费者组 */public static final String TOPIC_GROUP = "test-consumer-group";public void send(Object obj){String obj2String = JSON.toJSONString(obj);log.info("准备发送消息为:{}",obj2String);//发送消息ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_TEST, obj);//回调future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable ex) {//发送失败的处理log.info(TOPIC_TEST + " - 生产者 发送消息失败:" + ex.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> result) {//成功的处理log.info(TOPIC_TEST + " - 生产者 发送消息成功:" + result.toString());}});}}
4、消费者
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import java.util.Optional;/*** 事件消费者*/
@Component
public class KafkaConsumer {private Logger logger = LoggerFactory.getLogger(org.apache.kafka.clients.consumer.KafkaConsumer.class);@KafkaListener(topics = KafkaProducer.TOPIC_TEST,groupId = KafkaProducer.TOPIC_GROUP)public void topicTest(ConsumerRecord<?,?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic){Optional<?> message = Optional.ofNullable(record.value());if (message.isPresent()) {Object msg = message.get();logger.info("topic_test 消费了: Topic:" + topic + ",Message:" + msg);ack.acknowledge();}}
}
5、测试发送消息
@Testvoid kafkaTest(){kafkaProducer.send("Hello Kafka");}
6、测试收到消息
![]()
相关文章:
使用Linux部署Kafka教程
目录 一、部署Zookeeper 1 拉取Zookeeper镜像 2 运行Zookeeper 二、部署Kafka 1 拉取Kafka镜像 2 运行Kafka 三、验证是否部署成功 1 进入到kafka容器中 2 创建topic 生产者 3 生产者发送消息 4 消费者消费消息 四、搭建kafka管理平台 五、SpringBoot整合Kafka 1…...
pyechart笔记:opts.AxisOpts
定制化图表的轴线(x轴和y轴)的样式和设置 0 不设置坐标轴 c1(Bar().add_xaxis([力量,智力,敏捷]).add_yaxis(全能骑士,# 系列名称,用于 tooltip 的显示,legend 的图例筛选。[429,321,296],#系列数据).add_yaxis(猴子,[352,236,4…...
深度思考rpc框架面经之五:rpc熔断限流、rpc复用连接机制
11 RPC框架如何实现限流和熔断 推荐文章:RPC实现原理之核心技术-限流熔断 11.1 为什么Dubbo要做服务的限流?(根本原因是服务端进行自我保护) 限流是一种常见的系统保护手段。在分布式系统和微服务架构中,一个接口的过度使用可能会导致资源…...
Go 数组
数组用于在单个变量中存储相同类型的多个值,而不是为每个值声明单独的变量。 声明数组 在Go中,有两种声明数组的方式: 使用var关键字: 语法 var array_name [length]datatype{values} // 这里定义了长度 或者 var array_n…...
04架构管理之分支管理实践-一种git分支管理最佳实践
专栏说明:针对于企业的架构管理岗位,分享架构管理岗位的职责,工作内容,指导架构师如何完成架构管理工作,完成架构师到架构管理者的转变。计划以10篇博客阐述清楚架构管理工作,专栏名称:架构管理…...
D.OASIS City 和 Warrix 在The Sandbox 庆祝 Rise of the 10th Legend十周年
D.OASIS 首次展示了变革性娱乐 D.OASIS City,正如它与 WARRIX 一起承诺的那样。WARRIX 是获得泰国国家队球衣生产授权的标志性运动服装品牌。 这款激动人心的游戏冒险游戏于今天推出,让用户能够投入 D.OASIS City x WARRIX:Rise of the 10th…...
Git基本操作(Idea版)
第一次发布项目(本地->远程) 方式一 通过push的方式推送本地库到远程库(远程已创建好仓库) 这种方式需要提前创建好仓库。 右键点击项目,可以将当前分支的内容 push 到 GitHub 的远程仓库中。 注意:…...
NSS [羊城杯 2020]easyser
NSS [羊城杯 2020]easyser 开题。很容易让人觉得环境坏了。 不要慌,无从下手时。看源码、扫目录、抓包。一套操作下来,发现几个可以下手的路由。 /index.php /robots.txt 访问 /star1.php,一说到百度,就猜测是否存在SSRF。 源码中…...
理解底层— —Golang的log库,二开实现自定义Logger
理解底层— —Golang的log库,实现自定义Logger 1 分析实现思路 基于golang中自带的log库实现:对日志实现设置日志级别,每天生成一个文件,同时添加上前缀以及展示文件名等 日志级别,通过添加prefix:[INFO]、…...
RabbitMQ---Spring AMQP
Spring AMQP 1. 简介 Spring有很多不同的项目,其中就有对AMQP的支持: Spring AMQP的页面:http://spring.io/projects/spring-amqp 注意这里一段描述: Spring-amqp是对AMQP协议的抽象实现,而spring-rabbit 是对协…...
C语言练习题解析:挑战与突破,开启编程新篇章!(2)
💓博客主页:江池俊的博客⏩收录专栏:C语言刷题专栏👉专栏推荐:✅C语言初阶之路 ✅C语言进阶之路💻代码仓库:江池俊的代码仓库🎉欢迎大家点赞👍评论📝收藏⭐ 文…...
sqlite3 加密访问
关于sqlite3 加密 一、相关加密用到的sqlcipher 1.1 sqlcipher 是一个数据库加密的开源库 sqlcipher开源地址 我这边是使用的docker镜像,镜像地址: https://hub.docker.com/r/pallocchi/sqlcipher 加密格式 docker run -v <workdir>:/sqlcip…...
clickhouse 系列1:clickhouse v21.7.5.29 源码编译
1.gcc10安装 安装依赖 yum update yum install -y gcc gcc-c++ yum install -y bzip2 下载gcc 源码包并解压 wget -P /data/base https://mirrors.aliyun.com/gnu/gcc/gcc-10.2.0/gcc-10.2.0.tar.gz cd /data/base && tar -xzvf /data/base/gcc-...
servlet初体验之环境搭建!!!
我们需要用到tomcat服务器,咩有下载的小伙伴看过来:如何正确下载tomcat???_明天更新的博客-CSDN博客 1. 创建普通的Java项目,并在项目中创建libs目录存放第三方的jar包。 建立普通项目 创建libs目录存放第三…...
宁芝 NIZ 键盘开机需要重新插拔 USB 线才能使用
宁芝 NIZ 键盘开机需要重新插拔 USB 线才能使用 问题描述 宁芝 NIZ 键盘开机后无法识别到键盘,需要重新插拔 USB 线才能使用。 解决方法 按住 Fn BackSpaceE 键 5 秒,键盘会切换模式, 状态灯闪 1 次为 USB 接口;状态灯闪 2 次为 PS / 2 …...
R编程教程_编程入门自学教程_菜鸟教程-免费教程分享
教程简介 R是用于统计分析、绘图的语言和操作环境。R是属于GNU系统的一个自由、免费、源代码开放的软件,它是一个用于统计计算和统计制图的优秀工具。R语言的核心是解释计算机语言,其允许分支和循环以及使用函数的模块化编程。 R语言允许与以Cÿ…...
[CMake教程] CMake列表 - list
目录 零、简介一、Reading二、Search三、Modification四、Ordering 零、简介 列表在CMake中大量使用。初始化列表语法如下: set(myList a b c) # Creates the list "a;b;c"归根结底,列表只是一个由分号分隔列表项的单个字符串,这…...
报错 - net::ERR_ABORTED 500 (Internal Server Error)
报错:net::ERR_ABORTED 500 (Internal Server Error) 根据提示找到对应文件 解决:检查代码,根据高亮颜色判断,发现箭头函数漏了一个>。 报错:Uncaught TypeError: Assignment to constant variable. 原因&#x…...
【Java Easypoi Apache poi】 Word导入与导出
引入依赖 <dependency><groupId>cn.afterturn</groupId><artifactId>easypoi-spring-boot-starter</artifactId> </dependency> <!-- 下面的版本需要对应上面依赖中的版本 否则可能会起冲突 --> <!-- 下面的依赖主要是为了使用A…...
Java稀疏数组
目录 1.稀疏数组 2.稀疏数组的使用 2.1 二维数组转换为稀疏数组 2.2 稀疏数组转换为二维数组 1.稀疏数组 稀疏数组(Sparse Array):当一个数组中的大部分元素为相同的值,可使用稀疏数组来保存该数组,可以将稀疏数组…...
【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型
摘要 拍照搜题系统采用“三层管道(多模态 OCR → 语义检索 → 答案渲染)、两级检索(倒排 BM25 向量 HNSW)并以大语言模型兜底”的整体框架: 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后,分别用…...
R语言AI模型部署方案:精准离线运行详解
R语言AI模型部署方案:精准离线运行详解 一、项目概述 本文将构建一个完整的R语言AI部署解决方案,实现鸢尾花分类模型的训练、保存、离线部署和预测功能。核心特点: 100%离线运行能力自包含环境依赖生产级错误处理跨平台兼容性模型版本管理# 文件结构说明 Iris_AI_Deployme…...
鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个医院查看报告小程序
一、开发环境准备 工具安装: 下载安装DevEco Studio 4.0(支持HarmonyOS 5)配置HarmonyOS SDK 5.0确保Node.js版本≥14 项目初始化: ohpm init harmony/hospital-report-app 二、核心功能模块实现 1. 报告列表…...
C# 类和继承(抽象类)
抽象类 抽象类是指设计为被继承的类。抽象类只能被用作其他类的基类。 不能创建抽象类的实例。抽象类使用abstract修饰符声明。 抽象类可以包含抽象成员或普通的非抽象成员。抽象类的成员可以是抽象成员和普通带 实现的成员的任意组合。抽象类自己可以派生自另一个抽象类。例…...
uniapp微信小程序视频实时流+pc端预览方案
方案类型技术实现是否免费优点缺点适用场景延迟范围开发复杂度WebSocket图片帧定时拍照Base64传输✅ 完全免费无需服务器 纯前端实现高延迟高流量 帧率极低个人demo测试 超低频监控500ms-2s⭐⭐RTMP推流TRTC/即构SDK推流❌ 付费方案 (部分有免费额度&#x…...
【论文阅读28】-CNN-BiLSTM-Attention-(2024)
本文把滑坡位移序列拆开、筛优质因子,再用 CNN-BiLSTM-Attention 来动态预测每个子序列,最后重构出总位移,预测效果超越传统模型。 文章目录 1 引言2 方法2.1 位移时间序列加性模型2.2 变分模态分解 (VMD) 具体步骤2.3.1 样本熵(S…...
.Net Framework 4/C# 关键字(非常用,持续更新...)
一、is 关键字 is 关键字用于检查对象是否于给定类型兼容,如果兼容将返回 true,如果不兼容则返回 false,在进行类型转换前,可以先使用 is 关键字判断对象是否与指定类型兼容,如果兼容才进行转换,这样的转换是安全的。 例如有:首先创建一个字符串对象,然后将字符串对象隐…...
Linux离线(zip方式)安装docker
目录 基础信息操作系统信息docker信息 安装实例安装步骤示例 遇到的问题问题1:修改默认工作路径启动失败问题2 找不到对应组 基础信息 操作系统信息 OS版本:CentOS 7 64位 内核版本:3.10.0 相关命令: uname -rcat /etc/os-rele…...
Linux nano命令的基本使用
参考资料 GNU nanoを使いこなすnano基础 目录 一. 简介二. 文件打开2.1 普通方式打开文件2.2 只读方式打开文件 三. 文件查看3.1 打开文件时,显示行号3.2 翻页查看 四. 文件编辑4.1 Ctrl K 复制 和 Ctrl U 粘贴4.2 Alt/Esc U 撤回 五. 文件保存与退出5.1 Ctrl …...
CppCon 2015 学习:Time Programming Fundamentals
Civil Time 公历时间 特点: 共 6 个字段: Year(年)Month(月)Day(日)Hour(小时)Minute(分钟)Second(秒) 表示…...
