kafka安装说明以及在项目中使用
一、window 安装
1.1、下载安装包
- 下载kafka 地址,其中官方版内置zk, kafka_2.12-3.4.0.tgz
- 其中这个名称的意思是 kafka3.4.0 版本 ,所用语言 scala 版本为 2.12

1.2、安装配置
1、解压刚刚下载的配置文件,解压后如下,其中 data和kafka-logs 这两个文件是没有的

2、修改配置:进入到config目录,
- 修改service.properties里面log.dirs路径未 log.dirs=F:\kafka\installSurround\kafka3.4.0\kafka-logs,该目录是kafka的数据存储目录

- 修改zookeeper.properties里面dataDir路径为 dataDir=F:\kafka\installSurround\kafka3.4.0\data,该目录是 zookeeper存储的kafka的数据目录

3、server.properties说明
| 属性 | 说明 |
|---|---|
| log.dirs | 指定Broker需要使用的若干个文件目录路径,没有默认值,必须指定。在生产环境中一定要为log.dirs配置多个路径,如果条件允许,需要保证目录被挂载到不同的物理磁盘上。优势在于,提升读写性能,多块物理磁盘同时读写数据具有更高的吞吐量;能够实现故障转移(Failover),Kafka 1.1版本引入Failover功能,坏掉磁盘上的数据会自动地转移到其它正常的磁盘上,而且Broker还能正常工作,基于Failover机制,Kafka可以舍弃RAID方案。 |
| zookeeper.connect | CS格式参数,可以指定值为zk1:2181,zk2:2181,zk3:2181,不同Kafka集群可以指定:zk1:2181,zk2:2181,zk3:2181/kafka1,chroot只需要写一次。 |
| listeners | 设置内网访问Kafka服务的监听器。 |
| advertised.listeners | 设置外网访问Kafka服务的监听器。 |
| auto.create.topics.enable | 是否允许自动创建Topic。 |
| unclean.leader.election.enable | 是否允许Unclean Leader 选举。 |
| auto.leader.rebalance.enable | 是否允许定期进行Leader选举,生产环境中建议设置成false。 |
| log.retention.{hours | minutes |
| log.retention.bytes | 指定Broker为消息保存的总磁盘容量大小。message.max.bytes:控制Broker能够接收的最大消息大小。 |
1.3、启动
1、 启动脚本都在bin目录的window目录下,一定要先启动 zookeeper,再启动kafka
如果是linux,不使用window下的命令即可,使用对应的 xxxx.sh 即可
2、首先启动zookeeper
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

3、在启动kafka
.\bin\windows\kafka-server-start.bat .\config\server.properties

二、linux 安装
暂略
三、docker 安装
暂略
四、docker 安装
暂略
五、命令行使用
5.1、topic 命令
1、关于topic,这里用window 来示例
bin\windows\kafka-topics.bat

2、创建 first topic,五个分区,1个副本
bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --create --partitions 5 --replication-factor 1 --topic first

3、查看当前服务器中的所有 topic
bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092

4、查看 first 主题的详情
bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic first

5、修改分区数**(注意:分区数只能增加,不能减少)**
bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --alter --topic first --partitions 6

6、删除 topic,该操作在winodw,会出现文件授权问题,日志可以在kafka的启动命令窗口中查看,只需要修改文件权限即可,如果出现这个问题,我们需要清空之前配置的 data和kafka-logs 这两个文件中的内容,再次重新启动即可。
bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --delete --topic first

5.2、生产者命令行操作
1、关于查看操作生产者命令参数,这里用window 来示例
.\bin\windows\kafka-console-producer.bat

2、发送消息,这里发送了2次的数据,第一次是hello,第二次是world
.\bin\windows\kafka-console-producer.bat --bootstrap-server localhost:9092 --topic first

5.3、消费者命令行操作
1、关于查看操作生产者命令参数,这里用window 来示例
.\bin\windows\kafka-console-consumer.bat


2、接受消息,因为前面我们在发送消息的时候,消费者没有启动,所以第一次发的数据这里是收不到的,并没有存储到topic中
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic first


3、把主题中所有的数据都读取出来(包括历史数据),可以看到我们获取到了从消费者没有上线之前到上线之后的所有数据,一共6条。
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --from-beginning --topic first

5.4、脚本说明
| 项目 | Value |
|---|---|
| connect-standalone.sh | 用于启动单节点的Standalone模式的Kafka Connect组件。 |
| connect-distributed.sh | 用于启动多节点的Distributed模式的Kafka Connect组件。 |
| kafka-acls.sh | 脚本用于设置Kafka权限,比如设置哪些用户可以访问Kafka的哪些TOPIC的权限。 |
| kafka-delegation-tokens.sh | 用于管理Delegation Token。基于Delegation Token的认证是一种轻量级的认证机制,是对SASL认证机制的补充。 |
| kafka-topics.sh | 用于管理所有TOPIC。 |
| kafka-console-producer.sh | 用于生产消息。 |
| kafka-console-consumer.sh | 用于消费消息。 |
| kafka-producer-perf-test.sh | 用于生产者性能测试。 |
| kafka-consumer-perf-test.sh | 用于消费者性能测试。 |
| kafka-delete-records.sh | 用于删除Kafka的分区消息,由于Kafka有自己的自动消息删除策略,使用率不高。 |
| kafka-dump-log.sh | 用于查看Kafka消息文件的内容,包括消息的各种元数据信息、消息体数据。 |
| kafka-log-dirs.sh | 用于查询各个Broker上的各个日志路径的磁盘占用情况。 |
| kafka-mirror-maker.sh | 用于在Kafka集群间实现数据镜像。 |
| kafka-preferred-replica-election.sh | 用于执行Preferred Leader选举,可以为指定的主题执行更换Leader的操作。 |
| kafka-reassign-partitions.sh | 用于执行分区副本迁移以及副本文件路径迁移。 |
| kafka-run-class.sh | 用于执行任何带main方法的Kafka类。 |
| kafka-server-start.sh | 用于启动Broker进程。 |
| kafka-server-stop.sh | 用于停止Broker进程。 |
| kafka-streams-application-reset.sh | 用于给Kafka Streams应用程序重设位移,以便重新消费数据。 |
| kafka-verifiable-producer.sh | 用于测试验证生产者的功能。 |
| kafka-verifiable-consumer.sh | 用于测试验证消费者功能。 |
| trogdor.sh | 是Kafka的测试框架,用于执行各种基准测试和负载测试。 |
| kafka-broker-api-versions.sh | 脚本主要用于验证不同Kafka版本之间服务器和客户端的适配性 |
5.5、关闭kafka
1、一定要先关闭 kafka,再关闭zookeeper,否则容易出现数据错乱
如果出现数据错错乱,最简单的方法就是清空data和kafka-logs 这两个文件下的内容,重新启动即可
2、关闭
.\bin\windows\kafka-server-stop.bat
.\bin\windows\zookeeper-server-stop.bat

5.6、选择分区数及kafka性能测试
1、主要工具是 kafka-producer-perf-test.bat 和 kafka-consumer-perf-test.bat 两个脚本,可以参考 kafka如何选择分区数及kafka性能测试
六、java 使用
6.1、使用原生客户端
1、依赖
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.4.0</version></dependency>
2、发送和消费消息,具体代码如下:
public class KafkaConfig {public static void main(String[] args) {// 声明主题String topic = "first";// 创建消费者Properties consumerConfig = new Properties();consumerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.189.128:9092,92.168.189.128:9093,192.168.189.128:9094");consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"boot-kafka");consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer kafkaConsumer = new KafkaConsumer(consumerConfig);// 订阅主题并循环拉取消息kafkaConsumer.subscribe(Arrays.asList(topic));new Thread(new Runnable() {@Overridepublic void run() {while (true){ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10000));for(ConsumerRecord<String, String> record:records){System.out.println(record.value());}}}}).start();// 创建生产者Properties producerConfig = new Properties();producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.189.128:9092,92.168.189.128:9093,192.168.189.128:9094");producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG,"boot-kafka-client");producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer producer = new KafkaProducer<>(producerConfig);// 给主题发送消息producer.send(new ProducerRecord<>(topic, "hello,"+System.currentTimeMillis()));}
}
6.2、使用springBoot
1、依赖
<!-- 不使用kafka的原始客户端,使用spring集成的,这样比较方便 --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><!-- 可以不用指定,springBoot 会帮我们选择,如果有特殊需求,可以更改 --><!-- <version>3.0.2</version>--></dependency>
2、配置文件
server:port: 7280servlet:context-path: /thermal-emqx2kafkashutdown: gracefulspring:application:name: thermal-api-demonstration-tdenginelifecycle:timeout-per-shutdown-phase: 30smvc:pathmatch:matching-strategy: ant_path_matcher # 不然spring boot 2.6以后的版本 和 swagger 会出现 问题,可以参考 https://blog.csdn.net/qq_41027259/article/details/125747298kafka:bootstrap-servers: 127.0.0.1:9092 # 192.168.189.128:9092,92.168.189.128:9093,192.168.189.128:9094 连接的 Kafka Broker 主机名称和端口号#properties.key-serializer: # 用于配置客户端的附加属性,对于生产者和消费者都是通用的,。 org.apache.kafka.common.serialization.StringSerializerproducer: # 生产者retries: 3 # 重试次数#acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)#batch-size: 16384 # 一次最多发送数据量#buffer-memory: 33554432 # 生产端缓冲区大小key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer: # 消费者group-id: test-consumer-group #默认的消费组ID,在Kafka的/config/consumer.properties中查看和修改#enable-auto-commit: true # 是否自动提交offset#auto-commit-interval: 100 # 提交offset延时(接收到消息后多久提交offset)#auto-offset-reset: latest #earliest,latestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3、发送消息
package cn.jt.thermalemqx2kafka.kafka.controller;import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.HashMap;
import java.util.Map;/*** @author GXM* @version 1.0.0* @Description TODO* @createTime 2023年08月17日*/
@Slf4j
@RestController
@RequestMapping("/test")
public class TestController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@GetMapping("/mock")public String sendKafkaMessage() {Map<String, Object> data = new HashMap<>(2);data.put("id", 1);data.put("name", "gkj");kafkaTemplate.send("first", JSON.toJSONString(data));return "ok";}
}
4、接受消息
package cn.jt.thermalemqx2kafka.kafka.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;/*** @author GXM* @version 1.0.0* @Description TODO* @createTime 2023年08月17日*/
@Slf4j
@Component
public class KafkaListener {@org.springframework.kafka.annotation.KafkaListener(topics = "first")private void handler(String content) {log.info("consumer received: {} ", content);}
}相关文章:
kafka安装说明以及在项目中使用
一、window 安装 1.1、下载安装包 下载kafka 地址,其中官方版内置zk, kafka_2.12-3.4.0.tgz其中这个名称的意思是 kafka3.4.0 版本 ,所用语言 scala 版本为 2.12 1.2、安装配置 1、解压刚刚下载的配置文件,解压后如下&#x…...
二叉树搜索
✅<1>主页:我的代码爱吃辣📃<2>知识讲解:数据结构——二叉搜索树☂️<3>开发环境 :Visual Studio 2022💬<4>前言:在之前的我们已经学过了普通二叉树,了解了基本的二叉树…...
【先进PID控制算法(ADRC,TD,ESO)加入永磁同步电机发电控制仿真模型研究(Matlab代码实现)
💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…...
k8s集群生产环境的问题处理
2 k8s上的服务均无法访问 执行命令kubectl get pods -ALL,k8s集群中的服务均是running状态 1 kuboard 网页无法访问 kuboard无法通过浏览器访问,但是查看端口是被占用的...
serve : 无法将“serve”项识别为 cmdlet、函数、脚本文件或可运行程序的名称。
1、在学习webpack打包的时候,需要 serve用来启动开发服务器来部署代码查看效果的。安装完之后运行出现以下错误: 2、使用命令查看安装目录: npm list -g我们已经安装过了 3、解决: 我们看到上图路径在:C:\Users\qiy…...
【LVS】2、部署LVS-DR群集
LVS-DR数据包的流向分析 1.客户端发送请求到负载均衡器,请求的数据报文到达内核空间; 2.负载均衡服务器和正式服务器在同一个网络中,数据通过二层数据链路层来传输; 3.内核空间判断数据包的目标IP是本机VIP,此时IP虚…...
设计模式 -- 单例模式(传统面向对象与JavaScript 的对比实现)
单例模式 – 传统面向对象与JavaScript 的对比实现 文章目录 单例模式 -- 传统面向对象与JavaScript 的对比实现传统的面向对象的实现定义实现思路初级实现缺点 透明的单例模式实现目的(实现效果)实现缺点 用代理实现单例模式优点 JavaScript 中的单例模…...
YOLOX算法调试记录
YOLOX是在YOLOv3基础上改进而来,具有与YOLOv5相媲美的性能,其模型结构如下: 由于博主只是要用YOLOX做对比试验,因此并不需要对模型的结构太过了解。 先前博主调试过YOLOv5,YOLOv7,YOLOv8,相比而言,YOLOX的环…...
基于小程序的汽车俱乐部系统的设计与实现(论文+源码)_kaic
目录 前 言 1 系统概述 1.1 系统主要功能 1.2 开发及运行环境 2 系统分析和总体设计 2.1 需求分析 2.2 可行性分析 2.3 设计目标 2.4 项目规划 2.5 系统开发语言简介 2.6 系统功能模块图 3 系统数据库设计 3.1 数据库开发工具简介 3.2 数据库需求分析 3.3 数据库…...
ProgrammingArduino物联网
programming_arduino_ed2 IO 延时闪灯 void setup() {pinMode(13, OUTPUT); }void loop() {digitalWrite(13, HIGH);delay(500);digitalWrite(13, LOW);delay(500); }// sketch 03-02 加入变量 int ledPin 13; int delayPeriod 500;void setup() {pinMode(ledPin, OUTPUT)…...
SSM框架的学习与应用(Spring + Spring MVC + MyBatis)-Java EE企业级应用开发学习记录(第一天)Mybatis的学习
SSM框架的学习与应用(Spring Spring MVC MyBatis)-Java EE企业级应用开发学习记录(第一天)Mybatis的学习 一、当前的主流框架介绍(这就是后期我会发出来的框架学习) Spring框架 Spring是一个开源框架,是为了解决企业应用程序开发复杂…...
Programming abstractions in C阅读笔记: p118-p122
《Programming Abstractions In C》学习第49天,p118-p122,总结如下: 一、技术总结 1.随机数 (1)seed p119,“The initial value–the value that is used to get the entire process start–is call a seed for the random ge…...
2023国赛数学建模思路 - 案例:ID3-决策树分类算法
文章目录 0 赛题思路1 算法介绍2 FP树表示法3 构建FP树4 实现代码 建模资料 0 赛题思路 (赛题出来以后第一时间在CSDN分享) https://blog.csdn.net/dc_sinor?typeblog 1 算法介绍 FP-Tree算法全称是FrequentPattern Tree算法,就是频繁模…...
selenium 选定ul-li下拉选项中某个指定选项
场景:selenium的下拉选项是ul-li模式,选定某个指定的选项。 from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC # 显示等待def select_li(self, text, *ul_locator):"…...
回归预测 | MATLAB实现FA-SVM萤火虫算法优化支持向量机多输入单输出回归预测(多指标,多图)
回归预测 | MATLAB实现FA-SVM萤火虫算法优化支持向量机多输入单输出回归预测(多指标,多图) 目录 回归预测 | MATLAB实现FA-SVM萤火虫算法优化支持向量机多输入单输出回归预测(多指标,多图)效果一览基本介绍…...
使用pytorch 的Transformer进行中英文翻译训练
下面是一个使用torch.nn.Transformer进行序列到序列(Sequence-to-Sequence)的机器翻译任务的示例代码,包括数据加载、模型搭建和训练过程。 import torch import torch.nn as nn from torch.nn import Transformer from torch.utils.data im…...
解决element的select组件创建新的选项可多选且opitions数据源中有数据的情况下,回车不能自动选中创建的问题
前言 最近开发项目使用element-plus库内的select组件,其中有提供一个创建新的选项的用法,但是发现一些小问题,在此记录 版本 “element-plus”: “^2.3.9”, “vue”: “^3.3.4”, 问题 1、在options数据源中无数据的时候,在输入框…...
人工智能大模型加速数据库存储模型发展 行列混合存储下的破局
数据存储模型 专栏内容: postgresql内核源码分析手写数据库toadb并发编程toadb开源库 个人主页:我的主页 座右铭:天行健,君子以自强不息;地势坤,君子以厚德载物. 概述 在数据库的发展过程中,关…...
K8S用户管理体系介绍
1 K8S账户体系介绍 在k8s中,有两类用户,service account和user,我们可以通过创建role或clusterrole,再将账户和role或clusterrole进行绑定来给账号赋予权限,实现权限控制,两类账户的作用如下。 server acc…...
实现chatGPT 聊天样式
效果图 代码: <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Chat Example</title&g…...
多云管理“拦路虎”:深入解析网络互联、身份同步与成本可视化的技术复杂度
一、引言:多云环境的技术复杂性本质 企业采用多云策略已从技术选型升维至生存刚需。当业务系统分散部署在多个云平台时,基础设施的技术债呈现指数级积累。网络连接、身份认证、成本管理这三大核心挑战相互嵌套:跨云网络构建数据…...
三维GIS开发cesium智慧地铁教程(5)Cesium相机控制
一、环境搭建 <script src"../cesium1.99/Build/Cesium/Cesium.js"></script> <link rel"stylesheet" href"../cesium1.99/Build/Cesium/Widgets/widgets.css"> 关键配置点: 路径验证:确保相对路径.…...
(二)原型模式
原型的功能是将一个已经存在的对象作为源目标,其余对象都是通过这个源目标创建。发挥复制的作用就是原型模式的核心思想。 一、源型模式的定义 原型模式是指第二次创建对象可以通过复制已经存在的原型对象来实现,忽略对象创建过程中的其它细节。 📌 核心特点: 避免重复初…...
【配置 YOLOX 用于按目录分类的图片数据集】
现在的图标点选越来越多,如何一步解决,采用 YOLOX 目标检测模式则可以轻松解决 要在 YOLOX 中使用按目录分类的图片数据集(每个目录代表一个类别,目录下是该类别的所有图片),你需要进行以下配置步骤&#x…...
Unit 1 深度强化学习简介
Deep RL Course ——Unit 1 Introduction 从理论和实践层面深入学习深度强化学习。学会使用知名的深度强化学习库,例如 Stable Baselines3、RL Baselines3 Zoo、Sample Factory 和 CleanRL。在独特的环境中训练智能体,比如 SnowballFight、Huggy the Do…...
Fabric V2.5 通用溯源系统——增加图片上传与下载功能
fabric-trace项目在发布一年后,部署量已突破1000次,为支持更多场景,现新增支持图片信息上链,本文对图片上传、下载功能代码进行梳理,包含智能合约、后端、前端部分。 一、智能合约修改 为了增加图片信息上链溯源,需要对底层数据结构进行修改,在此对智能合约中的农产品数…...
RSS 2025|从说明书学习复杂机器人操作任务:NUS邵林团队提出全新机器人装配技能学习框架Manual2Skill
视觉语言模型(Vision-Language Models, VLMs),为真实环境中的机器人操作任务提供了极具潜力的解决方案。 尽管 VLMs 取得了显著进展,机器人仍难以胜任复杂的长时程任务(如家具装配),主要受限于人…...
GO协程(Goroutine)问题总结
在使用Go语言来编写代码时,遇到的一些问题总结一下 [参考文档]:https://www.topgoer.com/%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B/goroutine.html 1. main()函数默认的Goroutine 场景再现: 今天在看到这个教程的时候,在自己的电…...
数学建模-滑翔伞伞翼面积的设计,运动状态计算和优化 !
我们考虑滑翔伞的伞翼面积设计问题以及运动状态描述。滑翔伞的性能主要取决于伞翼面积、气动特性以及飞行员的重量。我们的目标是建立数学模型来描述滑翔伞的运动状态,并优化伞翼面积的设计。 一、问题分析 滑翔伞在飞行过程中受到重力、升力和阻力的作用。升力和阻力与伞翼面…...
深度剖析 DeepSeek 开源模型部署与应用:策略、权衡与未来走向
在人工智能技术呈指数级发展的当下,大模型已然成为推动各行业变革的核心驱动力。DeepSeek 开源模型以其卓越的性能和灵活的开源特性,吸引了众多企业与开发者的目光。如何高效且合理地部署与运用 DeepSeek 模型,成为释放其巨大潜力的关键所在&…...

