kafka 命令脚本说明以及在java中使用
一、命令行使用
1.1、topic 命令
1、关于topic,这里用window 来示例
bin\windows\kafka-topics.bat
2、创建 first topic,五个分区,1个副本
bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --create --partitions 5 --replication-factor 1 --topic first
3、查看当前服务器中的所有 topic
bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
4、查看 first 主题的详情
bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic first
5、修改分区数**(注意:分区数只能增加,不能减少)**
bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --alter --topic first --partitions 6
6、删除 topic,该操作在winodw,会出现文件授权问题,日志可以在kafka的启动命令窗口中查看,只需要修改文件权限即可,如果出现这个问题,我们需要清空之前配置的 data
和kafka-logs
这两个文件中的内容,再次重新启动即可。
bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --delete --topic first
1.2、生产者命令行操作
1、关于查看操作生产者命令参数,这里用window 来示例
.\bin\windows\kafka-console-producer.bat
2、发送消息,这里发送了2次的数据,第一次是hello,第二次是world
.\bin\windows\kafka-console-producer.bat --bootstrap-server localhost:9092 --topic first
1.3、消费者命令行操作
1、关于查看操作生产者命令参数,这里用window 来示例
.\bin\windows\kafka-console-consumer.bat
2、接受消息,因为前面我们在发送消息的时候,消费者没有启动,所以第一次发的数据这里是收不到的,并没有存储到topic中
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic first
3、把主题中所有的数据都读取出来(包括历史数据),可以看到我们获取到了从消费者没有上线之前到上线之后的所有数据,一共6条。
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --from-beginning --topic first
1.4、脚本说明
项目 | Value |
---|---|
connect-standalone.sh | 用于启动单节点的Standalone模式的Kafka Connect组件。 |
connect-distributed.sh | 用于启动多节点的Distributed模式的Kafka Connect组件。 |
kafka-acls.sh | 脚本用于设置Kafka权限,比如设置哪些用户可以访问Kafka的哪些TOPIC的权限。 |
kafka-delegation-tokens.sh | 用于管理Delegation Token。基于Delegation Token的认证是一种轻量级的认证机制,是对SASL认证机制的补充。 |
kafka-topics.sh | 用于管理所有TOPIC。 |
kafka-console-producer.sh | 用于生产消息。 |
kafka-console-consumer.sh | 用于消费消息。 |
kafka-producer-perf-test.sh | 用于生产者性能测试。 |
kafka-consumer-perf-test.sh | 用于消费者性能测试。 |
kafka-delete-records.sh | 用于删除Kafka的分区消息,由于Kafka有自己的自动消息删除策略,使用率不高。 |
kafka-dump-log.sh | 用于查看Kafka消息文件的内容,包括消息的各种元数据信息、消息体数据。 |
kafka-log-dirs.sh | 用于查询各个Broker上的各个日志路径的磁盘占用情况。 |
kafka-mirror-maker.sh | 用于在Kafka集群间实现数据镜像。 |
kafka-preferred-replica-election.sh | 用于执行Preferred Leader选举,可以为指定的主题执行更换Leader的操作。 |
kafka-reassign-partitions.sh | 用于执行分区副本迁移以及副本文件路径迁移。 |
kafka-run-class.sh | 用于执行任何带main方法的Kafka类。 |
kafka-server-start.sh | 用于启动Broker进程。 |
kafka-server-stop.sh | 用于停止Broker进程。 |
kafka-streams-application-reset.sh | 用于给Kafka Streams应用程序重设位移,以便重新消费数据。 |
kafka-verifiable-producer.sh | 用于测试验证生产者的功能。 |
kafka-verifiable-consumer.sh | 用于测试验证消费者功能。 |
trogdor.sh | 是Kafka的测试框架,用于执行各种基准测试和负载测试。 |
kafka-broker-api-versions.sh | 脚本主要用于验证不同Kafka版本之间服务器和客户端的适配性 |
1.5、关闭kafka
1、一定要先关闭 kafka,再关闭zookeeper,否则容易出现数据错乱
如果出现数据错错乱,最简单的方法就是清空data和kafka-logs 这两个文件下的内容,重新启动即可
2、关闭
.\bin\windows\kafka-server-stop.bat
.\bin\windows\zookeeper-server-stop.bat
1.6、选择分区数及kafka性能测试
1、主要工具是 kafka-producer-perf-test.bat
和 kafka-consumer-perf-test.bat
两个脚本,可以参考 kafka如何选择分区数及kafka性能测试
二、java 使用
2.1、使用原生客户端
1、依赖
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.4.0</version></dependency>
2、发送和消费消息,具体代码如下:
public class KafkaConfig {public static void main(String[] args) {// 声明主题String topic = "first";// 创建消费者Properties consumerConfig = new Properties();consumerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.189.128:9092,92.168.189.128:9093,192.168.189.128:9094");consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"boot-kafka");consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer kafkaConsumer = new KafkaConsumer(consumerConfig);// 订阅主题并循环拉取消息kafkaConsumer.subscribe(Arrays.asList(topic));new Thread(new Runnable() {@Overridepublic void run() {while (true){ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10000));for(ConsumerRecord<String, String> record:records){System.out.println(record.value());}}}}).start();// 创建生产者Properties producerConfig = new Properties();producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.189.128:9092,92.168.189.128:9093,192.168.189.128:9094");producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG,"boot-kafka-client");producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer producer = new KafkaProducer<>(producerConfig);// 给主题发送消息producer.send(new ProducerRecord<>(topic, "hello,"+System.currentTimeMillis()));}
}
2.2、使用springBoot
1、依赖
<!-- 不使用kafka的原始客户端,使用spring集成的,这样比较方便 --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><!-- 可以不用指定,springBoot 会帮我们选择,如果有特殊需求,可以更改 --><!-- <version>3.0.2</version>--></dependency>
2、配置文件
server:port: 7280servlet:context-path: /thermal-emqx2kafkashutdown: gracefulspring:application:name: thermal-api-demonstration-tdenginelifecycle:timeout-per-shutdown-phase: 30smvc:pathmatch:matching-strategy: ant_path_matcher # 不然spring boot 2.6以后的版本 和 swagger 会出现 问题,可以参考 https://blog.csdn.net/qq_41027259/article/details/125747298kafka:bootstrap-servers: 127.0.0.1:9092 # 192.168.189.128:9092,92.168.189.128:9093,192.168.189.128:9094 连接的 Kafka Broker 主机名称和端口号#properties.key-serializer: # 用于配置客户端的附加属性,对于生产者和消费者都是通用的,。 org.apache.kafka.common.serialization.StringSerializerproducer: # 生产者retries: 3 # 重试次数#acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)#batch-size: 16384 # 一次最多发送数据量#buffer-memory: 33554432 # 生产端缓冲区大小key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer: # 消费者group-id: test-consumer-group #默认的消费组ID,在Kafka的/config/consumer.properties中查看和修改#enable-auto-commit: true # 是否自动提交offset#auto-commit-interval: 100 # 提交offset延时(接收到消息后多久提交offset)#auto-offset-reset: latest #earliest,latestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3、发送消息
package cn.jt.thermalemqx2kafka.kafka.controller;import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.HashMap;
import java.util.Map;/*** @author GXM* @version 1.0.0* @Description TODO* @createTime 2023年08月17日*/
@Slf4j
@RestController
@RequestMapping("/test")
public class TestController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@GetMapping("/mock")public String sendKafkaMessage() {Map<String, Object> data = new HashMap<>(2);data.put("id", 1);data.put("name", "gkj");kafkaTemplate.send("first", JSON.toJSONString(data));return "ok";}
}
4、接受消息
package cn.jt.thermalemqx2kafka.kafka.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;/*** @author GXM* @version 1.0.0* @Description TODO* @createTime 2023年08月17日*/
@Slf4j
@Component
public class KafkaListener {@org.springframework.kafka.annotation.KafkaListener(topics = "first")private void handler(String content) {log.info("consumer received: {} ", content);}
}
相关文章:

kafka 命令脚本说明以及在java中使用
一、命令行使用 1.1、topic 命令 1、关于topic,这里用window 来示例 bin\windows\kafka-topics.bat2、创建 first topic,五个分区,1个副本 bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --create --partitions 5 --replication-factor 1 -…...

Qt应用开发(基础篇)——文件选择对话框 QFileDialog
一、前言 QFileDialog类继承于QDialog,提供了一个允许用户选择文件或目录的对话框。 对话框窗口 QDialog QFileDialog文件选择对话框允许用户在当前文件系统中选择一个或者多个文件或者文件路径,使用静态函数创建是很简便的方式,比如…...

图像OCR转文字,验证码识别技术太疯狂-UI软件自动化
现在用PYTHON识别图片文字,PaddleOCR,Tesseract,Opencv等很多开源技术。知识大爆炸年代,几年不学习就跟不上时代了。 以前早的时候一个验证码图片上有4个不同颜色字符,带一些杂点,我写点代码按颜色最多的进行提取&…...

Docker:自定义镜像
(总结自b站黑马程序员课程) 环环相扣,跳过部分章节和知识点是不可取的。 一、镜像结构 镜像是分层结构,每一层称为一个Layer。 ①BaseImage层:包含基本的系统函数库、环境变量、文件系统。 ②Entrypoint࿱…...
【Nginx22】Nginx学习:FastCGI模块(四)错误处理及其它
Nginx学习:FastCGI模块(四)错误处理及其它 FastCGI 最后一篇,我们将学习完剩下的所有配置指令。在这里,错误处理还是单独拿出来成为一个小节了,而剩下的内容都放到其它中进行学习。不要感觉是其它的就没用了…...
轮毂电机单位换算-米每秒/转每分
先前写了一篇度/S和RPM的关系 这次补全一点 假设轮毂电机直径20CM 0.2M 周长为0.628M 0.2*3.14 轮子转一圈走0.628M 1RPM的单位是转/分 换成转/S 就除以60 也就是轮子转一圈的速度0.628/60 m/S 0.010467m/S 所以换算如下: 1RPM0.010467 m/S 那么1m/S1/(0.010467) RPM95.5RPM 如…...

博流RISC-V芯片BL616开发环境搭建
文章目录 1、工具安装2、代码下载3、环境变量配置4、下载交叉编译器5、编译与下载运行6、使用ninja编译 本文分别介绍博流RISC-V芯片 BL616 在 Windows和Linux 下开发环境搭建,本文同时适用BL618,BL602,BL702,BL808系列芯片。 1、…...

Weblogic漏洞(三)之 Weblogic 弱口令、任意文件读取漏洞
Weblogic 弱口令、任意文件读取漏洞 环境安装 此次我们实验的靶场,是vnlhub中的Weblogic漏洞中的weak_password靶场,我们 cd 到weak_password,然后输入以下命令启动靶场环境: docker-compose up -d输入以下的命令可以查看当前启…...

15 mysql tiny/meidum/long blob/text 的数据存储
前言 这里主要是 由于之前的一个 datetime 存储的时间 导致的问题的衍生出来的探究 探究的主要内容为 int 类类型的存储, 浮点类类型的存储, char 类类型的存储, blob 类类型的存储, enum/json/set/bit 类类型的存储 本文主要 的相关内容是 tiny/medium/long blob/text 类…...

【方案】基于视频与AI智能分析技术的城市轨道交通视频监控建设方案
一、背景分析 地铁作为重要的公共场所交通枢纽,流动性非常高、人员大量聚集,轨道交通需要利用视频监控系统来实现全程、全方位的安全防范,这也是保证地铁行车组织和安全的重要手段。调度员和车站值班员通过系统监管列车运行、客流情况、变电…...
mysql8 修改数据存储位置
1、停止MySQL服务 systemctl stop mysqld2、复制现有的数据库目录到新的位置 默认情况下,MySQL的数据库目录位于/var/lib/mysql。假设您想将数据库目录更改为/home/mysql,您可以使用以下命令来复制数据库目录 cp -R /var/lib/mysql /home/mysql3、修改…...
Qt QSlider样式
滑块控件QSlider,如果设置的垂直样式,其进度颜色和剩余颜色,刚好和横向样式的颜色相反的,不确定这个是否是Qt的BUG,Qt456都是这个现象 QSlider::groove:horizontal{ height:8px; background:#FF0000; }QSlider::add-p…...

Redis五大数据类型
Redis五大数据类型 Redis-Key 官网:https://www.redis.net.cn/order/ 序号命令语法描述1DEL key该命令用于在 key 存在时删除 key2DUMP key序列化给定 key ,并返回被序列化的值3EXISTS key检查给定 key 是否存在,存在返回1,否则返…...

chatGPT训练过程
强化学习基础 强化学习是指智能体在不确定环境中最大化其获得的奖励从而达到自主决策的目的。其执行过程为:智能体依据策略决策从而执行动作,然后感知环境获取环境的状态,进而得到奖励(以便下次再到相同状态时能采取更优的动作),…...
原神角色数据分析项目说明文档
---项目涉及--- 前端html语言,flask框架,excel,MySQL,DataFrame数组,numpy,pyecharts ---实现方式--- 将所有角色数据存储在excel表格中,在需要时读取,当用户想要查看某一项时&…...
【Qt】QML-04:自定义变量(属性)property
1、property 1.1 介绍 property用来自定义属性。 什么是属性?面向对象中,类由方法和属性构成。对于从C语言的过来人,更喜欢称之为变量。 之所以说“自定义”,是因为QML语言本身已有默认定义好的属性,这些属性不可以…...

基于Cadence Allegro无盘设计操作流程
无盘设计 1.因为过孔具有电容效应,无盘设计能最大限度保证阻抗连续性,从而减小反射与插损; 2.减缓走线压力,降低产品成本与风险; SetupConstraintsModelSpacing Models勾选Hole to line SetupUnused Pads Su…...

微信小程序 - 2023年最新版手机号快捷登录详细教程
前言 最近开发公司手机快捷登录的功能,花费了不少时间,这里附上详细教程。 这里以海底捞小程序的图片为例,如有侵权请联系小编删除。 代码如下 <button open-type"getPhoneNumber" getphonenumber"getPhoneNumber"…...
Spring_Bean的自动装配
目录 三种配置机制 测试搭建 byName byType 使用注解 Autowire Qualifer Resource Autowire和Resource的不同 自动装配是使用spring满足bean依赖的一种条件 三种配置机制 在xml中显式配置;在java中显式配置;隐式的bean发现机制和自动装配。 …...

使用boost::geometry::union_ 合并边界(内、外)- 方案一
使用boost::geometry::union_ 合并边界(内、外):方案一 结合 boost::geometry::read_wkt() 函数 #include <iostream> #include <vector>#include <boost/geometry.hpp> #include <boost/geometry/geometries/point_x…...
后进先出(LIFO)详解
LIFO 是 Last In, First Out 的缩写,中文译为后进先出。这是一种数据结构的工作原则,类似于一摞盘子或一叠书本: 最后放进去的元素最先出来 -想象往筒状容器里放盘子: (1)你放进的最后一个盘子(…...
脑机新手指南(八):OpenBCI_GUI:从环境搭建到数据可视化(下)
一、数据处理与分析实战 (一)实时滤波与参数调整 基础滤波操作 60Hz 工频滤波:勾选界面右侧 “60Hz” 复选框,可有效抑制电网干扰(适用于北美地区,欧洲用户可调整为 50Hz)。 平滑处理&…...

UDP(Echoserver)
网络命令 Ping 命令 检测网络是否连通 使用方法: ping -c 次数 网址ping -c 3 www.baidu.comnetstat 命令 netstat 是一个用来查看网络状态的重要工具. 语法:netstat [选项] 功能:查看网络状态 常用选项: n 拒绝显示别名&#…...

【SQL学习笔记1】增删改查+多表连接全解析(内附SQL免费在线练习工具)
可以使用Sqliteviz这个网站免费编写sql语句,它能够让用户直接在浏览器内练习SQL的语法,不需要安装任何软件。 链接如下: sqliteviz 注意: 在转写SQL语法时,关键字之间有一个特定的顺序,这个顺序会影响到…...
第25节 Node.js 断言测试
Node.js的assert模块主要用于编写程序的单元测试时使用,通过断言可以提早发现和排查出错误。 稳定性: 5 - 锁定 这个模块可用于应用的单元测试,通过 require(assert) 可以使用这个模块。 assert.fail(actual, expected, message, operator) 使用参数…...

从零开始打造 OpenSTLinux 6.6 Yocto 系统(基于STM32CubeMX)(九)
设备树移植 和uboot设备树修改的内容同步到kernel将设备树stm32mp157d-stm32mp157daa1-mx.dts复制到内核源码目录下 源码修改及编译 修改arch/arm/boot/dts/st/Makefile,新增设备树编译 stm32mp157f-ev1-m4-examples.dtb \stm32mp157d-stm32mp157daa1-mx.dtb修改…...

NLP学习路线图(二十三):长短期记忆网络(LSTM)
在自然语言处理(NLP)领域,我们时刻面临着处理序列数据的核心挑战。无论是理解句子的结构、分析文本的情感,还是实现语言的翻译,都需要模型能够捕捉词语之间依时序产生的复杂依赖关系。传统的神经网络结构在处理这种序列依赖时显得力不从心,而循环神经网络(RNN) 曾被视为…...

Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决
Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决 问题背景 在一个基于 Spring Cloud Gateway WebFlux 构建的微服务项目中,新增了一个本地验证码接口 /code,使用函数式路由(RouterFunction)和 Hutool 的 Circle…...

【VLNs篇】07:NavRL—在动态环境中学习安全飞行
项目内容论文标题NavRL: 在动态环境中学习安全飞行 (NavRL: Learning Safe Flight in Dynamic Environments)核心问题解决无人机在包含静态和动态障碍物的复杂环境中进行安全、高效自主导航的挑战,克服传统方法和现有强化学习方法的局限性。核心算法基于近端策略优化…...
【学习笔记】erase 删除顺序迭代器后迭代器失效的解决方案
目录 使用 erase 返回值继续迭代使用索引进行遍历 我们知道类似 vector 的顺序迭代器被删除后,迭代器会失效,因为顺序迭代器在内存中是连续存储的,元素删除后,后续元素会前移。 但一些场景中,我们又需要在执行删除操作…...