使用Linux部署Kafka教程
目录
一、部署Zookeeper
1 拉取Zookeeper镜像
2 运行Zookeeper
二、部署Kafka
1 拉取Kafka镜像
2 运行Kafka
三、验证是否部署成功
1 进入到kafka容器中
2 创建topic 生产者
3 生产者发送消息
4 消费者消费消息
四、搭建kafka管理平台
五、SpringBoot整合Kafka
1、导入依赖
2、修改配置
3、生产者
4、消费者
5、测试发送消息
6、测试收到消息
一、部署Zookeeper
1 拉取Zookeeper镜像
docker pull wurstmeister/zookeeper
- 1
2 运行Zookeeper
docker run --restart=always --name zookeeper \
--log-driver json-file \
--log-opt max-size=100m \
--log-opt max-file=2 \
-p 2181:2181 \
-v /etc/localtime:/etc/localtime \
-d wurstmeister/zookeeper
二、部署Kafka
1 拉取Kafka镜像
docker pull wurstmeister/kafka
2 运行Kafka
docker run --restart=always --name kafka \
--log-driver json-file \
--log-opt max-size=100m \
--log-opt max-file=2 \-p 9092:9092 \-e KAFKA_BROKER_ID=0 \-e KAFKA_ZOOKEEPER_CONNECT=192.168.8.102:2181 \-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.8.102:9092 \-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \-v /etc/localtime:/etc/localtime \-d wurstmeister/kafka
参数说明:
-e KAFKA_BROKER_ID=0 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己
-e KAFKA_ZOOKEEPER_CONNECT=172.16.0.13:2181/kafka 配置zookeeper管理kafka的路径172.16.0.13:2181/kafka
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.16.0.13:9092 把kafka的地址端口注册给zookeeper,如果是远程访问要改成外网IP,类如Java程序访问出现无法连接。
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置kafka的监听端口
-v /etc/localtime:/etc/localtime 容器时间同步虚拟机的时间
三、验证是否部署成功
1 进入到kafka容器中
docker exec -it kafka /bin/sh
2 创建topic 生产者
cd opt/kafka_2.13-2.8.1bin/kafka-topics.sh --create --zookeeper 192.168.8.102:2181 --replication-factor 1 --partitions 1 --topic partopic
3 生产者发送消息
bin/kafka-console-producer.sh --broker-list 192.168.8.102:9092 --topic partopic
4 消费者消费消息
- 新打开个ssh窗口
- 跟前面步骤一样进入到容器
bin/kafka-console-consumer.sh --bootstrap-server 192.168.8.102:9092 --topic partopic --from-beginning
四、搭建kafka管理平台
docker search kafdrop
docker run -d --rm -p 9000:9000 \-e JVM_OPTS="-Xms32M -Xmx64M" \-e KAFKA_BROKERCONNECT=<host:port,host:port> \-e SERVER_SERVLET_CONTEXTPATH="/" \obsidiandynamics/kafdrop<host:port,host:port> 为 外网集群地址 多个用逗号分隔 例如xxx.xxx.xxx.xxx:9092,yyy.yyy.yyy.yyy:9092 尖角号不留上面的命令是百度的以下是我自己尝试的
docker run -d --name kafdrop -p 9001:9001 \-e JVM_OPTS="-Xms32M -Xmx64M -Dserver.port=9001" \-e KAFKA_BROKERCONNECT=192.168.58.130:9092 \-e SERVER_SERVLET_CONTEXTPATH="/" \obsidiandynamics/kafdrop因为我docker启动了其他东西占用了9001端口,而这个kafdrop其实就是一个springboot项目,以jar命令的形式启动
访问地址:Kafdrop: Broker List
五、SpringBoot整合Kafka
1、导入依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
2、修改配置
spring:kafka:bootstrap-servers: 192.168.58.130:9092 #部署linux的kafka的ip地址和端口号producer:# 发生错误后,消息重发的次数。retries: 1#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。batch-size: 16384# 设置生产者内存缓冲区的大小。buffer-memory: 33554432# 键的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer# 值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializer# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。acks: 1consumer:# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5Dauto-commit-interval: 1S# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录auto-offset-reset: earliest# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量enable-auto-commit: false# 键的反序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 值的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:# 在侦听器容器中运行的线程数。concurrency: 5#listner负责ack,每调用一次,就立即commitack-mode: manual_immediatemissing-topics-fatal: false
本次测试:linux地址:192.168.58.130
spring.kafka.bootstrap-servers=192.168.58.130:9092
advertised.listeners=192.168.58.130:9092
3、生产者
import com.alibaba.fastjson.JSON;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;/*** 事件的生产者*/
@Slf4j
@Component
public class KafkaProducer {@Autowiredpublic KafkaTemplate kafkaTemplate;/** 主题 */public static final String TOPIC_TEST = "Test";/** 消费者组 */public static final String TOPIC_GROUP = "test-consumer-group";public void send(Object obj){String obj2String = JSON.toJSONString(obj);log.info("准备发送消息为:{}",obj2String);//发送消息ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_TEST, obj);//回调future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable ex) {//发送失败的处理log.info(TOPIC_TEST + " - 生产者 发送消息失败:" + ex.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> result) {//成功的处理log.info(TOPIC_TEST + " - 生产者 发送消息成功:" + result.toString());}});}}
4、消费者
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import java.util.Optional;/*** 事件消费者*/
@Component
public class KafkaConsumer {private Logger logger = LoggerFactory.getLogger(org.apache.kafka.clients.consumer.KafkaConsumer.class);@KafkaListener(topics = KafkaProducer.TOPIC_TEST,groupId = KafkaProducer.TOPIC_GROUP)public void topicTest(ConsumerRecord<?,?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic){Optional<?> message = Optional.ofNullable(record.value());if (message.isPresent()) {Object msg = message.get();logger.info("topic_test 消费了: Topic:" + topic + ",Message:" + msg);ack.acknowledge();}}
}
5、测试发送消息
@Testvoid kafkaTest(){kafkaProducer.send("Hello Kafka");}
6、测试收到消息
相关文章:

使用Linux部署Kafka教程
目录 一、部署Zookeeper 1 拉取Zookeeper镜像 2 运行Zookeeper 二、部署Kafka 1 拉取Kafka镜像 2 运行Kafka 三、验证是否部署成功 1 进入到kafka容器中 2 创建topic 生产者 3 生产者发送消息 4 消费者消费消息 四、搭建kafka管理平台 五、SpringBoot整合Kafka 1…...

pyechart笔记:opts.AxisOpts
定制化图表的轴线(x轴和y轴)的样式和设置 0 不设置坐标轴 c1(Bar().add_xaxis([力量,智力,敏捷]).add_yaxis(全能骑士,# 系列名称,用于 tooltip 的显示,legend 的图例筛选。[429,321,296],#系列数据).add_yaxis(猴子,[352,236,4…...
深度思考rpc框架面经之五:rpc熔断限流、rpc复用连接机制
11 RPC框架如何实现限流和熔断 推荐文章:RPC实现原理之核心技术-限流熔断 11.1 为什么Dubbo要做服务的限流?(根本原因是服务端进行自我保护) 限流是一种常见的系统保护手段。在分布式系统和微服务架构中,一个接口的过度使用可能会导致资源…...
Go 数组
数组用于在单个变量中存储相同类型的多个值,而不是为每个值声明单独的变量。 声明数组 在Go中,有两种声明数组的方式: 使用var关键字: 语法 var array_name [length]datatype{values} // 这里定义了长度 或者 var array_n…...

04架构管理之分支管理实践-一种git分支管理最佳实践
专栏说明:针对于企业的架构管理岗位,分享架构管理岗位的职责,工作内容,指导架构师如何完成架构管理工作,完成架构师到架构管理者的转变。计划以10篇博客阐述清楚架构管理工作,专栏名称:架构管理…...

D.OASIS City 和 Warrix 在The Sandbox 庆祝 Rise of the 10th Legend十周年
D.OASIS 首次展示了变革性娱乐 D.OASIS City,正如它与 WARRIX 一起承诺的那样。WARRIX 是获得泰国国家队球衣生产授权的标志性运动服装品牌。 这款激动人心的游戏冒险游戏于今天推出,让用户能够投入 D.OASIS City x WARRIX:Rise of the 10th…...

Git基本操作(Idea版)
第一次发布项目(本地->远程) 方式一 通过push的方式推送本地库到远程库(远程已创建好仓库) 这种方式需要提前创建好仓库。 右键点击项目,可以将当前分支的内容 push 到 GitHub 的远程仓库中。 注意:…...

NSS [羊城杯 2020]easyser
NSS [羊城杯 2020]easyser 开题。很容易让人觉得环境坏了。 不要慌,无从下手时。看源码、扫目录、抓包。一套操作下来,发现几个可以下手的路由。 /index.php /robots.txt 访问 /star1.php,一说到百度,就猜测是否存在SSRF。 源码中…...

理解底层— —Golang的log库,二开实现自定义Logger
理解底层— —Golang的log库,实现自定义Logger 1 分析实现思路 基于golang中自带的log库实现:对日志实现设置日志级别,每天生成一个文件,同时添加上前缀以及展示文件名等 日志级别,通过添加prefix:[INFO]、…...

RabbitMQ---Spring AMQP
Spring AMQP 1. 简介 Spring有很多不同的项目,其中就有对AMQP的支持: Spring AMQP的页面:http://spring.io/projects/spring-amqp 注意这里一段描述: Spring-amqp是对AMQP协议的抽象实现,而spring-rabbit 是对协…...

C语言练习题解析:挑战与突破,开启编程新篇章!(2)
💓博客主页:江池俊的博客⏩收录专栏:C语言刷题专栏👉专栏推荐:✅C语言初阶之路 ✅C语言进阶之路💻代码仓库:江池俊的代码仓库🎉欢迎大家点赞👍评论📝收藏⭐ 文…...
sqlite3 加密访问
关于sqlite3 加密 一、相关加密用到的sqlcipher 1.1 sqlcipher 是一个数据库加密的开源库 sqlcipher开源地址 我这边是使用的docker镜像,镜像地址: https://hub.docker.com/r/pallocchi/sqlcipher 加密格式 docker run -v <workdir>:/sqlcip…...
clickhouse 系列1:clickhouse v21.7.5.29 源码编译
1.gcc10安装 安装依赖 yum update yum install -y gcc gcc-c++ yum install -y bzip2 下载gcc 源码包并解压 wget -P /data/base https://mirrors.aliyun.com/gnu/gcc/gcc-10.2.0/gcc-10.2.0.tar.gz cd /data/base && tar -xzvf /data/base/gcc-...

servlet初体验之环境搭建!!!
我们需要用到tomcat服务器,咩有下载的小伙伴看过来:如何正确下载tomcat???_明天更新的博客-CSDN博客 1. 创建普通的Java项目,并在项目中创建libs目录存放第三方的jar包。 建立普通项目 创建libs目录存放第三…...
宁芝 NIZ 键盘开机需要重新插拔 USB 线才能使用
宁芝 NIZ 键盘开机需要重新插拔 USB 线才能使用 问题描述 宁芝 NIZ 键盘开机后无法识别到键盘,需要重新插拔 USB 线才能使用。 解决方法 按住 Fn BackSpaceE 键 5 秒,键盘会切换模式, 状态灯闪 1 次为 USB 接口;状态灯闪 2 次为 PS / 2 …...
R编程教程_编程入门自学教程_菜鸟教程-免费教程分享
教程简介 R是用于统计分析、绘图的语言和操作环境。R是属于GNU系统的一个自由、免费、源代码开放的软件,它是一个用于统计计算和统计制图的优秀工具。R语言的核心是解释计算机语言,其允许分支和循环以及使用函数的模块化编程。 R语言允许与以Cÿ…...
[CMake教程] CMake列表 - list
目录 零、简介一、Reading二、Search三、Modification四、Ordering 零、简介 列表在CMake中大量使用。初始化列表语法如下: set(myList a b c) # Creates the list "a;b;c"归根结底,列表只是一个由分号分隔列表项的单个字符串,这…...

报错 - net::ERR_ABORTED 500 (Internal Server Error)
报错:net::ERR_ABORTED 500 (Internal Server Error) 根据提示找到对应文件 解决:检查代码,根据高亮颜色判断,发现箭头函数漏了一个>。 报错:Uncaught TypeError: Assignment to constant variable. 原因&#x…...
【Java Easypoi Apache poi】 Word导入与导出
引入依赖 <dependency><groupId>cn.afterturn</groupId><artifactId>easypoi-spring-boot-starter</artifactId> </dependency> <!-- 下面的版本需要对应上面依赖中的版本 否则可能会起冲突 --> <!-- 下面的依赖主要是为了使用A…...

Java稀疏数组
目录 1.稀疏数组 2.稀疏数组的使用 2.1 二维数组转换为稀疏数组 2.2 稀疏数组转换为二维数组 1.稀疏数组 稀疏数组(Sparse Array):当一个数组中的大部分元素为相同的值,可使用稀疏数组来保存该数组,可以将稀疏数组…...
云原生核心技术 (7/12): K8s 核心概念白话解读(上):Pod 和 Deployment 究竟是什么?
大家好,欢迎来到《云原生核心技术》系列的第七篇! 在上一篇,我们成功地使用 Minikube 或 kind 在自己的电脑上搭建起了一个迷你但功能完备的 Kubernetes 集群。现在,我们就像一个拥有了一块崭新数字土地的农场主,是时…...
Ubuntu系统下交叉编译openssl
一、参考资料 OpenSSL&&libcurl库的交叉编译 - hesetone - 博客园 二、准备工作 1. 编译环境 宿主机:Ubuntu 20.04.6 LTSHost:ARM32位交叉编译器:arm-linux-gnueabihf-gcc-11.1.0 2. 设置交叉编译工具链 在交叉编译之前&#x…...

黑马Mybatis
Mybatis 表现层:页面展示 业务层:逻辑处理 持久层:持久数据化保存 在这里插入图片描述 Mybatis快速入门 
Springcloud:Eureka 高可用集群搭建实战(服务注册与发现的底层原理与避坑指南)
引言:为什么 Eureka 依然是存量系统的核心? 尽管 Nacos 等新注册中心崛起,但金融、电力等保守行业仍有大量系统运行在 Eureka 上。理解其高可用设计与自我保护机制,是保障分布式系统稳定的必修课。本文将手把手带你搭建生产级 Eur…...

【Java_EE】Spring MVC
目录 Spring Web MVC 编辑注解 RestController RequestMapping RequestParam RequestParam RequestBody PathVariable RequestPart 参数传递 注意事项 编辑参数重命名 RequestParam 编辑编辑传递集合 RequestParam 传递JSON数据 编辑RequestBody …...

NLP学习路线图(二十三):长短期记忆网络(LSTM)
在自然语言处理(NLP)领域,我们时刻面临着处理序列数据的核心挑战。无论是理解句子的结构、分析文本的情感,还是实现语言的翻译,都需要模型能够捕捉词语之间依时序产生的复杂依赖关系。传统的神经网络结构在处理这种序列依赖时显得力不从心,而循环神经网络(RNN) 曾被视为…...

排序算法总结(C++)
目录 一、稳定性二、排序算法选择、冒泡、插入排序归并排序随机快速排序堆排序基数排序计数排序 三、总结 一、稳定性 排序算法的稳定性是指:同样大小的样本 **(同样大小的数据)**在排序之后不会改变原始的相对次序。 稳定性对基础类型对象…...

AI+无人机如何守护濒危物种?YOLOv8实现95%精准识别
【导读】 野生动物监测在理解和保护生态系统中发挥着至关重要的作用。然而,传统的野生动物观察方法往往耗时耗力、成本高昂且范围有限。无人机的出现为野生动物监测提供了有前景的替代方案,能够实现大范围覆盖并远程采集数据。尽管具备这些优势…...
Caliper 配置文件解析:fisco-bcos.json
config.yaml 文件 config.yaml 是 Caliper 的主配置文件,通常包含以下内容: test:name: fisco-bcos-test # 测试名称description: Performance test of FISCO-BCOS # 测试描述workers:type: local # 工作进程类型number: 5 # 工作进程数量monitor:type: - docker- pro…...

pgsql:还原数据库后出现重复序列导致“more than one owned sequence found“报错问题的解决
问题: pgsql数据库通过备份数据库文件进行还原时,如果表中有自增序列,还原后可能会出现重复的序列,此时若向表中插入新行时会出现“more than one owned sequence found”的报错提示。 点击菜单“其它”-》“序列”,…...