kafka之操作示例
一、常用shell命令
#1、创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replications 1 --topic test#2、查看创建的topic
bin/kafka-topics.sh --list --zookeeper localhost:2181#3、生产者发布消息命令
(执行完此命令后在控制台输入要发送的消息,回车即可)
bin/kafka-console-producer.sh --broker-list 192.168.91.231:9092,192.168.91.231:9093,192.168.91.231:9094 --topic test#4、消费者接受消息命令
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning#5、kafka启动
首先启动zookeeper zkServer.sh start(相当于一个server,kafka会连接这个server)
bin/kafka-server-start.sh config/server.properties # 启动kafka#6、查看kafka节点数目
在zookeeper中查看,登录客户端bin/zkCli.sh 执行ls /brokers/ids 查看节点数目及节点ID,[0,1,2]#7、kafka中的概念
生产者 Producer、代理Broker、消费者Consumer、主题Topic、分区 Partition、消费者组 Consumer Group#8、查看主颗信息
bin/kafka-topics.sh --zookeeper 192.168.91.231:2181 [加其他选项]eg:
bin/kafka-topics.sh --zookeeper 192.168.91.231:2181 --describe
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test#9、为主题创建分区
一共创建八个分区,编号分别为0~7
bin/kafka-topics.sh --zookeeper 192.168.91.231:2181 --alter -partitions 8 -topic test#10、查看kafka进程
ps -eflgrep server.properties
ps -eflgrep server-1.properties
ps -eflgrep server-2.properties#11、kafka宕机重启后,消息不会丢失#12、kafka其中一个broker宕机后,对消费者和生产者影响很小(命令行下测试)
消费者会尝试连接,连接不到,返回java.net.ConnectException:Connection refused异常 生产者可能会在发送消息的时候报异常,但会很快连接到其他broker,继续正常使用#13.查看kafka消息队列的积压情况
bin/kafka-consumer-groups.sh --zookeeper 192.168.91.231:2181 --describe --group console-consumer-37289#14.kafka 中查看所有的group列表信息
bin/kafka-consumer-groups.sh --zookeeper 192.168.91.231:2181 --list
二、python操作kafka
本地安装与启动(基于Docker)
#1、下载zookeeper镜像与kafka镜像:
docker pull registry.cn-shanghai.aliyuncs.com/egon-k8s-test/kafka-zookeeper:3.4.6
docker pull registry.cn-shanghai.aliyuncs.com/egon-k8s-test/wurstmeister-kafka:2.13-2.8.1
#2、本地启动zookeeper
docker run -d --name zookeeper -p 2181:2181 -t registry.cn-shanghai.aliyuncs.com/egon-k8s-test/kafka-zookeeper:3.4.6
#3、本地启动kafka(注意下述代码,将kafka启动在9092端口)
docker run -d --name kafka --publish 9092:9092 --link zookeeper \
--enV KAFKA ZO0KEEPER CONNECT=zookeeper:2181 \
--enV KAFKA ADVERTISED HOST NAME=192.168.71.113 \
--enV KAFKA ADVERTISED PORT=9092 \
registry.cn-shanghai.aliyuncs.com/egon-k8s-test/wurstmeister-kafka:2.13-2.8.1
上面写的localhost没有影响,查看端口如下
# netstat -tuanlp | grep 9092
tcp 0 0 0.0.0.0:9092 0.0.0.0:*LISTEN 102483/docker-proxy
tcp6 00:::9092 :::* LISTEN 102487/docker-proxy
#4、进入kafka bash
docker exec it kafka bash
cd /opt/kafka/bin
#5、创建Topic,分区为2,Topic name为'kafka_demo'
kafka-topics.sh --create --zookeeper zookeeper:2181 \
--replication-factor 1 --partitions 2 --topic kafka_demo
kafka-topics.sh --create --zookeeper zookeeper:2181 \
--replication-factor 1 --partitions 2 --topic egon
数据存在哪里
[root@web02 ~]# docker exec -it kafka bash
bash-5.1#
bash-5.1#
bash-5.1#
bash-5.1# ls /kafka/
kafka-logs-f33383f9c414
bash-5.1#
bash-5.1#
bash-5.1#
bash-5.1# 1s /kafka/kafka-logs-f33383f9c414/
kafka_demo-0 kafka_demo-1
egon-0 egon-1
.........
bash-5.1#
bash-5.1#
bash-5.1#
bash-5.1# ls /kafka/kafka-logs-f33383f9c414/egon-0
00000000000000000000.index0000000000000000000.timeindex
00000000800000080000.1og leader-epoch-checkpoint
#6、查看当前所有topic
kafka-topics.sh --zookeeper zookeeper:2181 --list
#7、命令行操作
$docker exec -ti kafka sh
/ # cd /opt/kafka/bin
/ # kafka-console-producer.sh --bootstrap-server 192.168.71.113:9092 --topic test_topic
然后一行行输入,回车即发送一条消息
>111
>222
>333
另外一个终端
$ docker exec -ti kafka sh
/ # cd /opt/kafka/bin
/ # kafka-console-consumer.sh --bootstrap-server 192.168.71.113:9092 --topic test_topic --from-beginning可以收到消息
111
222
333
#8、安装kafka-python
pip install kafka-python
代码示例:
# pip3 install kafka-python # 版本是2.0.2
from kafka import KafkaProducer, KafkaConsumer
import json
import threading
import time# Kafka broker address
bootstrap_servers = '192.168.71.113:9092'# Topic name
topic = 'test_topic'# Producer function
def kafka_producer():producer = KafkaProducer(bootstrap_servers=bootstrap_servers,value_serializer=lambda v: json.dumps(v).encode('utf-8'))try:for i in range(10):message = {'message': f'Hello Kafka! Message {i}'}producer.send(topic, value=message)print(f"Sent: {message}")time.sleep(1)else:print("发送完成")except Exception as ex:print(f"Exception occurred: {ex}")finally:producer.close()# Consumer function
def kafka_consumer():consumer = KafkaConsumer(topic,bootstrap_servers=bootstrap_servers,auto_offset_reset='earliest',consumer_timeout_ms=5000) # 设置超时时间为1秒try:for message in consumer:print(f"Received: {message.value}")else:print("消费完毕,等5000毫秒超时即可结束,执行finally内的代码")except Exception as ex:print(f"Exception occurred: {ex}")finally:print("消费者结束")consumer.close()# Create threads for producer and consumer
producer_thread = threading.Thread(target=kafka_producer)
consumer_thread = threading.Thread(target=kafka_consumer)# Start both threads
producer_thread.start()
consumer_thread.start()# Wait for threads to complete
producer_thread.join()
consumer_thread.join()print("Kafka producer and consumer threads have finished.")
执行结果:
相关文章:

kafka之操作示例
一、常用shell命令 #1、创建topic bin/kafka-topics.sh --create --zookeeper localhost:2181 --replications 1 --topic test#2、查看创建的topic bin/kafka-topics.sh --list --zookeeper localhost:2181#3、生产者发布消息命令 (执行完此命令后在控制台输入要发…...
MySQL问题:MySQL中使用索引一定有效吗?如何排查索引效果
不一定有效,当查询条件中不包含索引列或查询条件复杂且不匹配索引顺序 对于一些小表,MySQL可能选择全表扫描而非使用索引,因为全表扫描的开销可能更小 最终是否用上索引是根据MySQL成本计算决定的,评估CPU和I/O成本 排查索引效…...
OpenSSL 签名验证详解:PKCS7* p7、cafile 与 RSA 验签实现
OpenSSL 签名验证详解:PKCS7* p7、cafile 与 RSA 验签实现 摘要 本文深入剖析 OpenSSL 中 PKCS7* p7 数据结构和 cafile 的作用及相互关系,详细讲解基于 OpenSSL 的 RSA 验签字符串的 C 语言实现,涵盖签名解析、证书加载、验证流程及关键要…...
利用 `ngx_http_xslt_module` 实现 NGINX 的 XML → HTML 转换
一、模块简介 模块名称:ngx_http_xslt_module 首次引入版本:0.7.8 功能:在回传给客户端之前,用指定的 XSLT 样式表对 XML 响应进行转换。 依赖: libxml2libxslt 编译选项:需在 NGINX 编译时添加 --with…...
C语言队列详解
一、什么是队列? 队列(Queue)是一种先进先出(FIFO, First In First Out)的线性数据结构。它只允许在一端插入数据(队尾),在另一端删除数据(队头)。常见于排队…...
Qt中的智能指针
Qt中的智能指针 Qt中提供了多种智能指针,用于管理自动分配的内存,避免内存泄漏和悬挂指针的问题。以下是Qt中常见的智能指针及其功能和使用场景: 1. QSharedPointer QSharedPointer 是 Qt 框架中用于管理动态分配对象的智能指针,类似于 C1…...

车载网关策略 --- 车载网关通信故障处理机制深度解析
我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 钝感力的“钝”,不是木讷、迟钝,而是直面困境的韧劲和耐力,是面对外界噪音的通透淡然。 生活中有两种人,一种人格外在意别人的眼光;另一种人无论…...

三天掌握PyTorch精髓:从感知机到ResNet的快速进阶方法论
本文较长,建议点赞收藏,以免遗失。更多AI大模型应用开发学习视频及资料,尽在聚客AI学院。 一、分析式AI基础与深度学习核心概念 1.1 深度学习三要素 数学基础: f(x;W,b)σ(Wxb)(单层感知机) 1.2 PyTorch核心组件 张量操作示例…...
Python爬虫实战:研究Selenium框架相关技术
1. 引言 1.1 研究背景与意义 随着互联网的快速发展,网页数据量呈爆炸式增长。从网页中提取有价值的信息成为数据挖掘、舆情分析、商业智能等领域的重要基础工作。然而,现代网页技术不断演进,越来越多的网页采用 JavaScript 动态加载内容,传统的基于 HTTP 请求的爬虫技术难…...

分布式缓存:三万字详解Redis
文章目录 缓存全景图PreRedis 整体认知框架一、Redis 简介二、核心特性三、性能模型四、持久化详解五、复制与高可用六、集群与分片方案 Redis 核心数据类型概述1. String2. List3. Set4. Sorted Set(有序集合)5. Hash6. Bitmap7. Geo8. HyperLogLog Red…...

BiLSTM与Transformer:位置编码的隐式vs显式之争
BiLSTM 与使用位置编码的LLM(如Transformer)的核心区别 一、架构原理对比 维度BiLSTM带位置编码的LLM(如Transformer)基础单元LSTM单元(记忆细胞、门控机制)自注意力机制(Self-Attention)信息传递双向链式传播(前向+后向LSTM)并行多头注意力,全局上下文关联位置信息…...

html5视频播放器和微信小程序如何实现视频的自动播放功能
在HTML5中实现视频自动播放需设置autoplay和muted属性(浏览器策略要求静音才能自动播放),并可添加loop循环播放、playsinline同层播放等优化属性。微信小程序通过<video>组件的autoplay属性实现自动播放,同时支持全屏按钮、…...
【QT】QString和QStringList去掉空格的方法总结
目录 一、QString去掉空格 1. 移除字符串首尾的空格(trimmed) 2. 移除字符串中的所有空格(remove) 3. 仅移除左侧(开头)或右侧(结尾)空格 4. 替换多个连续空格为单个空格 5. 移…...
58同城大数据面试题及参考答案
ROW_NUMBER、RANK、DENSE_RANK 函数的区别是什么? 这三个函数均为窗口函数,用于为结果集分区中的行生成序号,但核心逻辑存在显著差异,具体表现如下: 数据分布与排序规则 假设存在分区内分数数据为 [90, 85, 85, 80],按分数降序排序: ROW_NUMBER:为分区内每行分配唯一序…...
25.5.27学习总结
快速读入: inline int read() {int x 0, f 1;char ch getchar();while (ch < 0 || ch > 9) { // 跳过非数字字符if (ch -) f -1; // 处理负号ch getchar();}while (ch > 0 && ch < 9) {x x * 10 ch - 0; // 逐字符转数字ch ge…...

关于vue结合elementUI输入框回车刷新问题
问题 vue2项目结合elementUI,使用el-form表单时,第一次打开浏览器url辞职,并且是第一次打开带有这个表单的页面时,输入框输入内容,回车后会意外触发页面自动刷新。 原因 当前 el-form 表单只有一个输入框࿰…...

vue项目表格甘特图开发
🧩 甘特图可以管理项目进度,生产进度等信息,管理者可以更直观的查看内容。 1. 基础环境搭建 引入 dhtmlx-gantt 插件引入插件样式 dhtmlxgantt.css引入必要的扩展模块(如 markers、tooltip)创建 Vue 组件并挂载 DOM 容器初始化 gantt 图表配置2. 数据准备与处理 定义任务…...

Spark 中,创建 DataFrame 的方式(Scala语言)
在 Spark 中,创建 DataFrame 的方式多种多样,可根据数据来源、结构特性及性能需求灵活选择。 一、创建 DataFrame 的 12 种核心方式 1. 从 RDD 转换(需定义 Schema) import org.apache.spark.sql.{Row, SparkSession} import o…...

Python----目标检测(MS COCO数据集)
一、MS COCO数据集 COCO 是一个大规模的对象检测、分割和图像描述数据集。COCO有几个 特点: Object segmentation:目标级的分割(实例分割) Recognition in context:上下文中的识别(图像情景识别࿰…...

塔能科技:有哪些国内工业节能标杆案例?
在国内工业领域,节能降耗不仅是响应国家绿色发展号召、践行社会责任的必要之举,更是企业降低运营成本、提升核心竞争力的关键策略。塔能科技在这一浪潮中脱颖而出,凭借前沿技术与创新方案,成功打造了多个极具代表性的工业标杆案例…...
图论:floyed算法
Floyd 算法是一种用于寻找加权图中所有顶点对之间最短路径的经典算法,它能够处理负权边,但不能处理负权环。即如果边权有负数,切负权边与其他边构成了环就不能用该算法。该算法的时间复杂度为 \(O(V^3)\),其中 V 是图中顶点的数量…...
嵌入式系统C语言编程常用设计模式---参数表驱动设计
参数表驱动设计是一种软件开发和系统设计中常用的方法,它通过参数表来控制程序的行为和流程,提高系统的灵活性、可维护性和可扩展性。它将系统的行为逻辑与具体参数分离,通过表格形式集中管理配置信息。这种模式在嵌入式系统、工业控制和自动…...

OpenCV CUDA模块图像过滤------创建一个行方向的一维积分(Sum)滤波器函数createRowSumFilter()
操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 cv::cuda::createRowSumFilter 是 OpenCV CUDA 模块中的一个函数,用于创建一个行方向的一维积分(Sum)滤波器。…...

Frequent values/gcd区间
Frequent values 思路: 这题它的数据是递增的,ST表,它的最多的个数只会在在两个区间本身就是最多的或中间地方产生,所以我用map数组储存每个值的左右临界点,在ST表时比较多一个比较中间值的个数就Ok了。 #define _…...

08SpringBoot高级--自动化配置
目录 Spring Boot Starter 依赖管理解释 一、核心概念 二、工作原理 依赖传递: 自动配置: 版本管理: 三、核心流程 四、常用 Starter 示例 五、自定义 Starter 步骤 创建配置类: 配置属性: 注册自动配置&a…...

Deep Evidential Regression
摘要 翻译: 确定性神经网络(NNs)正日益部署在安全关键领域,其中校准良好、鲁棒且高效的不确定性度量至关重要。本文提出一种新颖方法,用于训练非贝叶斯神经网络以同时估计连续目标值及其关联证据,从而学习…...

「Python教案」循环语句的使用
课程目标 1.知识目标 能使用for循环和while循环设计程序。能使用循环控制语句,break、continue、else设计程序。能使用循环实际问题。 2.能力目标 能根据需求合适的选择循环结构。能对嵌套循环代码进行调试和优化。能利用循环语句设计&am…...

linux快速入门-VMware安装linux,配置静态ip,使用服务器连接工具连接,快照和克隆以及修改相关配置信息
安装VMWare 省略,自己检索 安装操作系统-linux 注意:需要修改的我会给出标题,不要修改的直接点击下一步就可以 选择自定义配置 选择稍后安装操作系统 选择合适的内存 选择NAT模式 仅主机模式 虚拟机只能和主机通信,不能上网…...
用户配置文件(Profile)
2.4.5 用户配置文件(Profile) 用户配置文件由以下组件构成: 一个运营商安全域(MNO-SD) 辅助安全域(SSD)和CASD Applets 应用程序(如NFC应用) 网络接入应用ÿ…...
ubuntu 制作 ssl 证书
安装 openssl sudo apt install openssl 生成 SSL 证书 # 生成私钥 (Private Key) openssl genrsa -out private.key 2048 在当前目录生成 private.key # 生成证书签名请求 (CSR - Certificate Signing Request) openssl req -new -key private.key -out certificate.csr -…...