day08_Kafka
文章目录
- day08_Kafka课程笔记
- 一、今日课程内容
- 一、消息队列(了解)
- **为什么消息队列就像是“数据的快递员”?**
- **实际意义**
- 1、产生背景
- 2、消息队列介绍
- 2.1 常见的消息队列产品
- 2.2 应用场景
- 2.3 消息队列中两种消息模型
- 二、Kafka的基本介绍
- 1、Kafka基本介绍
- **为什么 Kafka 就像是“数据的高速公路”?**
- **实际意义**
- 2、回顾zookeeper知识
- 回顾启动zookeeper服务
- 回顾zookeeper工具连接
- 3、Kafka的架构(掌握)
- 三、Kafka的集群搭建(操作)
- 1、软件安装
- 2、安装易错点
- 3、配置Kafka的一键化启动
- 4、启动服务
- 5、操作kafka的多种方式
- 四、Kafka的shell命令使用
- topics操作
- producer和consumer操作
- bootstrap-server和zookeeper以及broker-list的区别:
- 五、kafka tools工具使用(熟悉)
- 3-1 连接配置
- 3-2 创建主题
- 3-3 删除主题
- 3-4 主题下的数据查看
- 3-5 数据发送和接收
- 六、Kafka的Python API的操作(熟悉)
- 模块安装
- 模块使用
- 3.1 完成生产者代码
- 3.2 完成消费者代码
- 01_生产者代码入门.py
- 02_消费者代码入门.py
day08_Kafka课程笔记
一、今日课程内容
- 1- 消息队列(了解)
- 2- Kafka的基本介绍(掌握架构,其他了解)
- 3- Kafka的集群搭建(操作)
- 4- Kafka的相关使用(掌握kafka常用shell命令)
今日目的:掌握Kafka架构
一、消息队列(了解)
简单来说:消息队列就像是“数据的快递员”,在应用程序之间传递消息,确保数据能够可靠、高效地传输和处理。
具体而言:
- 核心概念:
- 消息:需要传递的数据单元,可以是文本、JSON、二进制等格式。
- 队列:存储消息的容器,遵循先进先出(FIFO)的原则。
- 生产者:发送消息的应用程序。
- 消费者:接收和处理消息的应用程序。
- 特点:
- 异步通信:生产者和消费者不需要同时在线,消息可以暂存于队列中。
- 解耦:生产者和消费者之间无需直接交互,降低了系统耦合度。
- 可靠性:消息队列通常支持持久化,确保消息不会丢失。
- 扩展性:支持多个生产者和消费者,便于系统扩展。
实际生产场景:
- 在电商系统中,使用消息队列处理订单,确保订单数据可靠传输。
- 在日志收集中,使用消息队列缓冲日志数据,避免数据丢失。
- 在微服务架构中,使用消息队列实现服务间的异步通信。
总之:消息队列是分布式系统中重要的组件,通过异步通信和解耦,提高了系统的可靠性、扩展性和灵活性。
为什么消息队列就像是“数据的快递员”?
-
传递数据:可靠传输
- 快递员:将包裹从发件人送到收件人,确保包裹安全到达。
- 消息队列:将消息从生产者传递到消费者,确保数据可靠传输。
-
异步通信:无需实时交互
- 快递员:发件人和收件人不需要同时在场,包裹可以暂存在快递点。
- 消息队列:生产者和消费者不需要同时在线,消息可以暂存于队列中,实现异步通信。
-
解耦系统:降低依赖
- 快递员:发件人和收件人无需直接联系,通过快递员完成交互。
- 消息队列:生产者和消费者之间无需直接交互,降低了系统耦合度。
-
可靠性:确保数据不丢失
- 快递员:通过物流系统确保包裹不丢失。
- 消息队列:通过持久化机制确保消息不丢失,即使系统故障也能恢复。
-
扩展性:支持多对多通信
- 快递员:可以同时为多个发件人和收件人服务。
- 消息队列:支持多个生产者和消费者,便于系统扩展和负载均衡。
实际意义
消息队列就像“数据的快递员”,通过异步通信、解耦系统、可靠传输和扩展性,确保数据能够在分布式系统中高效、可靠地传递和处理。
1、产生背景
消息队列:指的数据在一个容器中,从容器中一端传递到另一端的过程
消息(message): 指的是数据,只不过这个数据存在一定流动状态
队列(queue): 指的容器,可以存储数据,只不过这个容器具备FIFO(先进先出)特性
思考: 公共容器需要具备什么特点?
1- 公共性: 各个程序都可以与之对接
2- FIFO特性: 先进先出
3- 具备高效的并发能力: 能够承载海量数据
4- 具备一定的容错能力: 比如支持重新读取消息方案
2、消息队列介绍
2.1 常见的消息队列产品
MQ:message queue消息队列
activeMQ: 出现时期比较早的一款消息队列的中间件产品,在早期使用人群是非常多,目前整个社区活跃度严重下降,使用人群很少了
rabbitMQ: 此款是目前使用人群比较多的一款消息队列的中间件的产品,社区活跃度比较高,主要是应用传统业务领域中
rocketMQ: 是阿里推出的一款消息队列的中间件的产品,目前主要是在阿里系环境中使用,目前支持的客户端比较少,主要是Java中应用较多
Kafka: Apache旗下的顶级开源项目,是一款消息队列的中间件产品项目来源于领英,是大数据体系中目前为止最为常用的一款消息队列的产品
2.2 应用场景
- 应用解耦合
- 异步处理
- 限流削峰
- 消息驱动系统
2.3 消息队列中两种消息模型
在Java中, 为了能够集成消息队列的产品, 专门提供了一个消息队列的协议: JMS(Java Message Server) java消息服务消息队列中两个角色: 生产者(producer) 和 消费者(consumer)
生产者: 生产/发送消息到消息队列中
消费者: 从消息队列中获取消息在JMS规范中, 专门规定了两种消息消费模型:
1- 点对点消费模型: 指的一条消息最终只能被一个消费者所消费。微信聊天的私聊
2- 发布订阅消费模型: 指的一条消息最终被多个消费者所消费。微信聊天的群聊
二、Kafka的基本介绍
1、Kafka基本介绍
Kafka是一款消息队列的中间件产品, 来源于领英公司, 后期贡献给了Apache, 目前是Aapche旗下的顶级开源项目, 采用语言是Scala
官方地址: http://kafka.apache.org
kafka的特点:
- 可靠性:Kafka集群是分布式的,并且有多副本的机制。数据可以自动复制
- 可扩展性:Kafka集群可以灵活的调整,在线扩容
- 耐用性:Kafka数据保存在磁盘上面,数据并且有多副本的机制。数据持久化,而且可以一定程度上防止数据丢失
- 高性能:Kafka可以存储海量的数据,虽然是使用磁盘进行数据存储,但是Kafka有各种优化手段(例如:磁盘的顺序读写、零拷贝等)提高数据的读写速度(吞吐量)
简单来说:Kafka是一个分布式的流数据平台,就像是“数据的高速公路”,能够高效地处理实时数据流,支持数据的发布、订阅、存储和处理。
具体而言:
- 核心概念:
- Topic:数据流的分类,类似于消息队列的主题。
- Producer:数据生产者,将数据发布到Topic。
- Consumer:数据消费者,从Topic订阅数据。
- Broker:Kafka集群中的服务器节点,负责存储和转发数据。
- Partition:Topic的分区,支持并行处理和水平扩展。
- 特点:
- 高吞吐量:支持每秒处理数百万条消息。
- 持久化:数据持久化到磁盘,支持数据重放。
- 分布式:支持集群部署,具备高可用性和容错能力。
- 实时性:支持实时数据流的处理和分析。
实际生产场景:
- 在日志收集中,使用Kafka收集和传输分布式系统的日志数据。
- 在实时推荐系统中,使用Kafka处理用户行为数据,实时更新推荐结果。
- 在金融领域,使用Kafka处理交易数据,进行实时风险监控。
总之:Kafka是一个强大的分布式流数据平台,广泛应用于实时数据处理、日志收集和消息传递等场景,为大数据生态系统提供了高效、可靠的数据传输和处理能力。
为什么 Kafka 就像是“数据的高速公路”?
-
高速传输:高效处理数据流
- 高速公路:车辆可以快速、高效地通行。
- Kafka:支持高吞吐量,每秒可以处理数百万条消息,确保数据流的高速传输。
-
多车道并行:分区与扩展
- 高速公路:多车道设计,支持车辆并行通行。
- Kafka:通过Partition(分区)实现数据的并行处理和水平扩展,提高数据处理能力。
-
持久化存储:数据不丢失
- 高速公路:有完善的基础设施,确保车辆安全通行。
- Kafka:数据持久化到磁盘,支持数据重放,确保数据不丢失。
-
分布式架构:高可用性与容错
- 高速公路:有多条备用路线,避免交通堵塞或事故中断。
- Kafka:支持集群部署,具备高可用性和容错能力,即使部分节点故障,数据仍能正常传输。
-
实时性:快速响应
- 高速公路:车辆可以快速到达目的地。
- Kafka:支持实时数据流的处理和分析,确保数据能够快速传递到消费者。
实际意义
Kafka就像“数据的高速公路”,通过高吞吐量、分区并行、持久化存储和分布式架构,确保数据流能够高效、可靠地传输和处理,为实时数据应用提供了强大的支持。
2、回顾zookeeper知识
Kafka需要使用到zookeeper服务!
回顾启动zookeeper服务
# 三台都需要启动zookeeper服务
[root@node1 ~]# /export/server/zookeeper/bin/zkServer.sh start
[root@node2 ~]# /export/server/zookeeper/bin/zkServer.sh start
[root@node3 ~]# /export/server/zookeeper/bin/zkServer.sh start
回顾zookeeper工具连接
使用hadoop阶段发的ZooInspector软件,双击zookeeper-dev-ZooInspector.jar启动
3、Kafka的架构(掌握)
回顾HDFS写入过程:
Kafka架构:
1- Kafka中集群节点叫broker,节点和节点之间没有主从之分,地位是完全一样
2- Topic:主题/话题,是业务层面对消息进行分类的。
3- 一个Topic可以设置多个Partition分区。
4- 同一个Partition分区可以设置多个副本,但是副本数不能超过(>)集群broker节点的个数
5- 虽然broker节点间没有主从之分,但是同一个Partition分区的不同副本间有主从之分,分为了Leader主副本和Follower从副本
6- 生产者将数据首先发送给到Leader主副本,接着是Leader主副本主动的往Follower从副本上同步消息
7- Zookeeper用来管理集群,以及管理元数据信息
8- ISR同步列表。该列表中存放的是与Leader主副本消息同步程度最接近的Follower从副本,也就是消息最小的一个列表。该列表作用,当Leader主副本无法对外提供服务的时候,会从该ISR列表中选择一个Follower从副本变成Leader主副本,对外提供服务相关名词:
Kafka Cluster: Kafka集群
Topic: 主题/话题
Broker: Kafka中的节点
Producer: 生产者,负责生产/发送消息到Kafka中
Consumer: 消费者,负责从Kafka中获取消息
Partition: 分区。一个Topic可以设置多个分区,没有数量限制
三、Kafka的集群搭建(操作)
1、软件安装
环境搭建,参考【Spark课程阶段_部署文档.doc】的9.1章节内容。
2、安装易错点
-
1- 配置文件中监听地址前面的注释,记得打开。也就是删除最前面的#
-
2- 分发之后,记得要修改每个server.sql的 id 和 监听地址
-
3- 分发之后,记得source /etc/profile让环境变量生效
-
4- 没有启动zookeeper,或者仅仅启动了其中一台
-
5- 启动的时候server.sql中路径,不要写错了
3、配置Kafka的一键化启动
注意:使用一键化脚本,也得需要先启动zookeeper
环境搭建,参考【Spark课程阶段_部署文档.doc】的9.4章节内容。
4、启动服务
方式1: 正常启动
# 1.先在三台机器都输入以下命令,启动ZooKeeper
/export/server/zookeeper/bin/zkServer.sh start# 2.再在三台集群上都输入以下命令,启动Kafka
# 注意:下面是一条命令!!!
nohup /export/server/kafka/bin/kafka-server-start.sh /export/server/kafka/config/server.sql 2>&1 &
方式2: 使用kafka的onekey脚本
# 1.先在三台机器都输入以下命令,启动ZooKeeper
/export/server/zookeeper/bin/zkServer.sh start# 2.只在node1上一键启动所有kafka服务
/export/onekey/start-kafka.sh
5、操作kafka的多种方式
四、Kafka的shell命令使用
Kafka本质上就是一个消息队列的中间件的产品,主要负责消息数据的传递。也就说学习Kafka 也就是学习如何使用Kafka生产数据,以及如何使用Kafka来消费数据
topics操作
注意:
创建topic不指定分区数和副本数,默认都是1个
分区数可以后期通过alter增大,但是不能减小
副本数一旦确定,不能修改!
参数如下:
cd /export/server/kafka/bin./kafka-topics.sh 参数说明:--bootstrap-server: Kafka集群中broker服务器--topic: 指定Topic名称--partitions: 设置Topic的分区数,可以省略不写--replication-factor: 设置Topic分区的副本数,可以省略不写--create: 指定操作类型。这里是新建Topic--delete: 指定操作类型。这里是删除Topic--alter: 指定操作类型。这里是修改Topic--list: 指定操作类型。这里是查看所有Topic列表--describe: 指定操作类型。这里是查看详细且具体的Topic信息
- 1- 创建Topic
# 创建topic,默认1个分区,1个副本
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic itcast # 可以使用以下参数提前查看是否是默认1个
--list: 指定操作类型。这里是查看所有Topic列表
--describe: 指定操作类型。这里是查看详细且具体的Topic信息
# 注意: 如果副本数超过了集群broker节点个数,就会报错
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic itheima --partitions 4 --replication-factor 4
# 把replication-factor改成3以内就能创建成功了
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic itheima --partitions 4 --replication-factor 3
- 2- 查看Topic
# --list查看所有topic 只有名称信息
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --list
# --describe 可以查看详细Topic信息
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --describe # --describe 可以查看具体Topic信息
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --describe --topic itheima
当然也可使用zookeeper客户端查看
-
3- 修改Topic
本质就是扩容分区!!!
因为分区不能减小,副本不能修改
# 增大topic分区
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --alter --topic itcast --partitions 4
# 注意: partitions分区,只能增大,不能减小。而且没有数量限制
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --alter --topic itcast --partitions 1
# 注意: 副本既不能增大,也不能减小
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --alter --topic itcast --partitions 4 --replication-factor 2
…
- 4- 删除Topic
# 再创建一个spark主题
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic spark/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --list# 删除spark主题/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --delete --topic spark/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --list
producer和consumer操作
消费者要和生产者指定是同一个topic主题,才能接收到消息
参数如下:
cd /export/server/kafka/bin./kafka-console-producer.sh 参数说明--broker-list: Kafka集群中broker服务器--topic: 指定Topic./kafka-console-consumer.sh 参数说明--bootstrap-server: Kafka集群中broker连接信息--topic: 指定Topiclatest: 消费者(默认)从最新的地方开始消费--from-beginning: 指定该参数以后,会从最旧的地方开始消费--max-messages: 最多消费的条数。
- 1- 模拟生产者Producer
# 为了方便演示再创建一个topic,名称为spark
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic spark# 模拟生产者给spark发送消息
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic spark-- 注意: 上述命令执行完后,出现 >,可以输入对应的消息了
-
2- 模拟消费者Consumer
注意: 可以右键CRT客户端连接->克隆会话来模拟多个消费者
# 模拟消费者从spark获取消息,默认每次拿最新的
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic spark -- 注意: 输入完上述命令后,自动接收最新的消息(因为默认latest),还可以持续接收...# --from-beginning 会从最旧的地方开始消费
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic spark --from-beginning-- 注意: 输入完上述命令后,自动接收了生产者发送的所有消息,还可以持续接收...# --max-messages x 可以设置从最旧的地方最大消费次数x
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic spark --from-beginning --max-messages 2-- 注意: 输入完上述命令后,只接收了前2个消息就结束了,不会持续接收
注意:
我们有时候发现消费者打印出来的消息和生产者生产的顺序不一致,是乱序的。原因如下:
topic有多个分区,底层是多线程来读取数据并进行打印输出。因此会存在乱序现象
bootstrap-server和zookeeper以及broker-list的区别:
旧版(<v2.2): kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181/kafka --create --topic ..
注意: 旧版用--zookeeper参数,主机名(或IP)和端口用ZooKeeper的2181,也就是server.sql文件中zookeeper.connect属性的配置值.新版(>v2.2): kafka-topics.sh --bootstrap-server node1:9092 --create --topic ..
注意: 新版用--bootstrap-server参数,主机名(或IP)和端口用某个节点的即可,即主机名(或主机IP):9092。9092是Kafka的监听端口broker-list:broker指的是kafka的服务端,可以是一个服务器也可以是一个集群。producer和consumer都相当于这个服务端的客户端。一般我们在使用console producer的时候,broker-list参数是必备参数,另外一个必备的参数是topicbootstrap-servers: 指的是kafka集群的服务器地址,这个和broker-list功能是一样的,只不过我们在console producer要求用broker-list,其他地方都采用bootstrap-servers。
简单来说:
bootstrap-server
、zookeeper
和broker-list
是Kafka中用于连接和协调的核心概念,分别像是“导航员”、“协调员”和“服务列表”,各自承担不同的角色和功能。具体而言:
bootstrap-server:
- 功能:用于客户端(Producer和Consumer)连接Kafka集群的入口点。客户端通过
bootstrap-server
获取集群的元数据(如Topic分区信息),并自动发现其他Broker。- 特点:支持自动发现集群中的其他Broker,简化了配置和管理。
- 使用场景:适用于Kafka 0.9及以上版本,推荐在新版本中使用。
- 示例:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
zookeeper:
- 功能:在早期版本(Kafka 0.8及以前)中,Zookeeper用于管理Kafka集群的元数据(如Broker状态、Topic分区信息)和消费者偏移量(offset)。
- 特点:依赖Zookeeper会增加系统复杂性和性能开销,因此在Kafka 0.9及以后版本中,逐渐被
bootstrap-server
取代。- 使用场景:适用于旧版本Kafka,或需要与Zookeeper集成的场景。
- 示例:
kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
broker-list:
- 功能:指定Kafka集群中的一个或多个Broker地址,主要用于Producer连接集群并发送消息。
- 特点:需要手动配置Broker地址列表,不支持自动发现其他Broker。
- 使用场景:适用于Producer的配置,尤其是在旧版本或特定场景下。
- 示例:
kafka-console-producer.sh --broker-list localhost:9092 --topic test
实际生产场景:
- 在新版Kafka中,推荐使用
bootstrap-server
,因为它简化了配置并支持自动发现集群中的其他Broker。- 在旧版Kafka中,可能需要使用
zookeeper
来管理消费者偏移量和集群元数据。- 对于Producer,
broker-list
仍然是一个常用的配置参数,尤其是在需要手动指定Broker地址时。总之:
bootstrap-server
、zookeeper
和broker-list
在Kafka中各有其作用,bootstrap-server
是新版本的推荐选择,zookeeper
适用于旧版本,而broker-list
主要用于Producer的配置。根据Kafka版本和具体需求选择合适的配置方式,可以提高系统的性能和可维护性。如果需要更详细的信息,可以参考相关文档或搜索来源。
五、kafka tools工具使用(熟悉)
可以在可视化的工具通过点击来操作kafka完成主题的创建,分区等操作
注意: 安装完后桌面不会有快捷方式,需要去电脑上搜索,或者去自己选的安装位置找到发送快捷方式到桌面!
3-1 连接配置
-
修改工具的数据显示类型字符串类型
3-2 创建主题
3-3 删除主题
3-4 主题下的数据查看
3-5 数据发送和接收
- 3-5 发送消息数据到kafka
六、Kafka的Python API的操作(熟悉)
模块安装
纯Python的方式操作Kafka。
准备工作:在node1的节点上安装一个python用于操作Kafka的库
安装kafka-python 模模块 ,模块中提供了操作kafka的方法
在线安装
在node1上安装就可以,需要保证服务器能够连接网络
安装命令: python -m pip install kafka-python -i https://pypi.tuna.tsinghua.edu.cn/simple
离线安装
将kafka_python-2.0.2-py2.py3-none-any.whl安装包上传服务器software目录下进行安装
安装命令: pip install kafka_python-2.0.2-py2.py3-none-any.whl
模块使用
API使用的参考文档: https://kafka-python.readthedocs.io/en/master/usage.html#kafkaproducer
模块中封装了两个类,
一个是生成者类KafkaProducer,提供了向kafka写数据的方法
另一个是消费者类KafkaConsumer,提供了读取kafka数据的方法
3.1 完成生产者代码
生成者类KafkaProducer,提供了向kafka写数据的方法
send(topic,valu)方法: 发送消息
topic参数:指定向哪个主题发送消息
value参数:指定发送的消息数据 ,数据类型要求是bytes类型
示例:
# 导包
from kafka import KafkaProducer# 编写代码
if __name__ == '__main__':# 创建生产者对象并指定对应服务器producer = KafkaProducer(bootstrap_servers=['node1:9092'])# 发送消息for i in range(1,101):future = producer.send('kafka', f'hi_kafka_{i}'.encode())# 获取元数据record_metadata = future.get()# 从元数据中获取主题,分区,偏移print(record_metadata.topic)print(record_metadata.partition)print(record_metadata.offset)
3.2 完成消费者代码
消费者类KafkaConsumer,提供了读取kafka数据的方法
KafkaConsumer(topic,bootstrap_servers)
第一个参数:指定消费者连接的主题,
第二个参数:指定消费者连接的kafka服务器
示例:
# 导包
from kafka import KafkaConsumer# 编写代码
if __name__ == '__main__':# 创建消费者对象consumer = KafkaConsumer('kafka',bootstrap_servers=['node1:9092'])# 遍历对象for message in consumer:# 格式化打印,设置相关参数# 因为value是二进制,需要decode解码print ("主题:%s,分区:%d,偏移:%d : key=%s value=%s"% (message.topic, message.partition,message.offset, message.key, message.value.decode('utf8')))
可能遇到的错误:
原因: 服务器环境有问题。是因为服务器上既安装了kafka-python的第三方依赖,同时还安装kafka的第三方依赖。可以通过pip list | grep kafka进行确定
解决办法: 先将这两个第三方依赖全部卸载,然后再重新执行如下命令
python -m pip install kafka-python -i https://pypi.tuna.tsinghua.edu.cn/simple
01_生产者代码入门.py
# 导包
import timefrom kafka import KafkaProducer# 异步发送:生产者发送消息后不会等待 Kafka 的确认,这种方式可以提高吞吐量,但可能会牺牲一定的可靠性,因为生产者无法立即知道消息是否成功发送。通常可以通过回调函数来处理发送结果。
def yibu():# 创建生产者对象producer = KafkaProducer(bootstrap_servers=['node1:9092'])# 发送消息到kafka# 发送1到10数字到test_python主题中for i in range(1, 11):# 如果主题不存在就会自动创建对应主题(默认分区数是1),存在就使用producer.send(topic='python_test2', value=f'你好啊~{i}'.encode('utf-8'))# 最后一定用close()释放资源一次性把对应消息发送到kafka中或者让程序多等待一会儿# producer.close()time.sleep(3)# 同步发送:生产者发送消息后会阻塞等待 Kafka 确认消息已成功写入,这种方式可以确保消息发送的可靠性,但会降低吞吐量。
def tongbu():# 创建生产者对象producer = KafkaProducer(bootstrap_servers=['node1:9092'])# 发送消息到kafka# 发送1到10数字到test_python主题中for i in range(1, 11):# 如果主题不存在就会自动创建对应主题(默认分区数是1),存在就使用f = producer.send(topic='python_test2', value=f'测试积压问题5~{i}'.encode('utf-8'))# 获取消息的元数据meta = f.get(timeout=10)print(f"主题:{meta.topic},分区:{meta.partition},消息偏移量:{meta.offset}")# main程序入口
if __name__ == '__main__':# 抽取方法的快捷键: 先选中代码, 然后按住ctrl+alt+M,最后起名字确认就行tongbu()
02_消费者代码入门.py
# 导包
from kafka import KafkaConsumer# main程序入口
if __name__ == '__main__':# 创建消费者对象consumer = KafkaConsumer('python_test2', group_id='g_1', bootstrap_servers=['node1:9092'], auto_offset_reset='earliest')# 获取消息# 注意: 默认是获取最新的消息for msg in consumer:print(f"主题:{msg.topic},分区:{msg.partition},消息内容:{msg.value.decode('utf-8')}")
相关文章:

day08_Kafka
文章目录 day08_Kafka课程笔记一、今日课程内容一、消息队列(了解)**为什么消息队列就像是“数据的快递员”?****实际意义**1、产生背景2、消息队列介绍2.1 常见的消息队列产品2.2 应用场景2.3 消息队列中两种消息模型 二、Kafka的基本介绍1、…...

安装conda 环境
conda create -n my_unet5 python3.8 (必须设置3.8版本) conda activate my_unet5...

【dockerros2】ROS2节点通信:docker容器之间/docker容器与宿主机之间
🌀 一个中大型ROS项目常需要各个人员分别完成特定的功能,而后再组合部署,而各人员完成的功能常常依赖于一定的环境,而我们很难确保这些环境之间不会相互冲突,特别是涉及深度学习环境时。这就给团队项目的部署落地带来了…...
使用外网访问在群晖中搭建思源docker
还是要折腾,之前发现用公网IP可以访问就没有折腾,今天ip变了,用不了了,一搜,发现有方法可以用域名访问,哎,太好了! 原文:分享我在 群晖 docker 部署 思源笔记 步骤 - 链…...

深度学习中的EMA技术:原理、实现与实验分析
深度学习中的EMA技术:原理、实现与实验分析 1. 引言 指数移动平均(Exponential Moving Average, EMA)是深度学习中一种重要的模型参数平滑技术。本文将通过理论分析和实验结果,深入探讨EMA的实现和效果。 深度学习中的EMA技术:原理、实现与…...

win32汇编环境,窗口程序中对按钮控件常用操作的示例
;运行效果 ;win32汇编环境,窗口程序中对按钮控件常用操作的示例 ;常用的操作,例如创建按钮控件,使其无效,改变文本,得到文本等。 ;将代码复制进radasm软件里,直接就可以编译运行。重点部分加备注。 ;>&g…...
CentOS 7.9 通过 yum 安装 Docker
文章目录 前言一、删除已安装的 Docker二、网络设置三、设置 yum 源,并安装依赖四、设置 Docker 仓库五、安装及使用 Docker六、镜像仓库总结 前言 CentOS 7.9 过了维护期,Docker 官方文档没有了相关的安装文档。记录一下,备用! …...

【开源免费】基于Vue和SpringBoot的英语知识应用网站(附论文)
本文项目编号 T 138 ,文末自助获取源码 \color{red}{T138,文末自助获取源码} T138,文末自助获取源码 目录 一、系统介绍二、数据库设计三、配套教程3.1 启动教程3.2 讲解视频3.3 二次开发教程 四、功能截图五、文案资料5.1 选题背景5.2 国内…...

工具推荐:PDFgear——免费且强大的PDF编辑工具 v2.1.12
PDFgear——免费且强大的PDF编辑工具 v2.1.12 软件简介 PDFgear 是一款 完全免费的 PDF 软件,支持 阅读、编辑、转换、合并 以及 跨设备签署 PDF 文件,无需注册即可使用。它提供了丰富的 PDF 处理功能,极大提升了 PDF 文件管理的便捷性和效…...

Web渗透测试之XSS跨站脚本 防御[WAF]绕过手法
目录 XSS防御绕过汇总 参考这篇文章绕过 XSS payload XSS防御绕过汇总 服务端知道有网络攻击或者xss攻 Html...

MMDetection框架下的常见目标检测与分割模型综述与实践指南
目录 综述与实践指南 SSD (Single Shot MultiBox Detector) 基本配置和使用代码 RetinaNet 基本配置和使用代码 Faster R-CNN 基本配置和使用代码 Mask R-CNN 基本配置和使用代码 Cascade R-CNN 基本配置和使用代码 总结 综述与实践指南 MMDetection是一个基于Py…...

怎么实现Redis的高可用?
大家好,我是锋哥。今天分享关于【怎么实现Redis的高可用?】面试题。希望对大家有帮助; 怎么实现Redis的高可用? 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 为了实现 Redis 的高可用性,我们需要保证在发…...

OpenCV实现Kuwahara滤波
Kuwahara滤波是一种非线性的平滑滤波技术,其基本原理在于通过计算图像模板中邻域内的均值和方差,选择图像灰度值较为均匀的区域的均值来替代模板中心像素的灰度值。以下是Kuwahara滤波的详细原理说明: 一、基本思想 Kuwahara滤波的基本思想…...

WINFORM - DevExpress -> DevExpress总结[安装、案例]
安装devexpress软件 路径尽量不换,后面破解不容易出问题 vs工具箱添加控件例如: ①使用控制台进入DevExpress安装目录: cd C:\Program Files (x86)\DevExpress 20.1\Components\Tools ②添加DevExpress控件: ToolboxCreator.exe/ini:toolboxcreator…...
Golang学习笔记_22——Reader示例
Golang学习笔记_19——Stringer Golang学习笔记_20——error Golang学习笔记_21——Reader 文章目录 io.Reader 示例从字符串中读取从文件中读取从HTTP响应中读取从内存的字节切片中读取自定义io.Reader实现 源码 io.Reader 示例 从字符串中读取 func ReadFromStrDemo() {str…...

【2024年华为OD机试】(A卷,100分)- 猜字谜(Java JS PythonC/C++)
一、问题描述 小王设计了一个简单的猜字谜游戏,游戏的谜面是一个错误的单词,比如 nesw,玩家需要猜出谜底库中正确的单词。猜中的要求如下: 对于某个谜面和谜底单词,满足下面任一条件都表示猜中: 变换顺序…...

iostat命令详解
iostat 命令是 I/O statistics(输入/输出统计)的缩写,用来报告系统的 CPU 统计信息和块设备及其分区的 IO 统计信息。iostat 是 sysstat 工具集的一个工具,在 Ubuntu 系统中默认是不带 iostat 命令的,需要自行安装: $ sudo apt in…...

Linux:操作系统简介
前言: 在本片文章,小编将带大家理解冯诺依曼体系以及简单理解操作喜欢,并且本篇文章将围绕什么以及为什么两个话题进行展开说明。 冯诺依曼体系: 是什么: 冯诺依曼体系(Von Neumann architectureÿ…...
企业级信息系统开发讲课笔记4.12 Spring Boot默认缓存管理
文章目录 1. Spring Boot默认缓存管理2. Spring的缓存机制2.1 缓存机制概述2.2 缓存接口和缓存管理接口3. 声明式缓存注解3.1 @EnableCaching注解3.2 @Cacheable注解3.2.1 value/cacheNames属性3.2.2 key属性3.2.3 keyGenerator属性3.2.4 cacheManager/cacheResolver属性3.2.5 …...

2025制定一个高级java开发路线:分布式系统、多线程编程、高并发经验
1-熟悉分布式系统的设计和应用,熟悉分布式、缓存、消息、负载均衡等机制和实现者优先。 2-熟悉多线程编程,具备高并发经验优先。 技术学习规划:熟悉分布式系统和高并发技术 以下是针对目标要求的系统性学习规划,分为 阶段目标 和…...

第19节 Node.js Express 框架
Express 是一个为Node.js设计的web开发框架,它基于nodejs平台。 Express 简介 Express是一个简洁而灵活的node.js Web应用框架, 提供了一系列强大特性帮助你创建各种Web应用,和丰富的HTTP工具。 使用Express可以快速地搭建一个完整功能的网站。 Expre…...

docker详细操作--未完待续
docker介绍 docker官网: Docker:加速容器应用程序开发 harbor官网:Harbor - Harbor 中文 使用docker加速器: Docker镜像极速下载服务 - 毫秒镜像 是什么 Docker 是一种开源的容器化平台,用于将应用程序及其依赖项(如库、运行时环…...
Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以?
Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以? 在 Golang 的面试中,map 类型的使用是一个常见的考点,其中对 key 类型的合法性 是一道常被提及的基础却很容易被忽视的问题。本文将带你深入理解 Golang 中…...

2.Vue编写一个app
1.src中重要的组成 1.1main.ts // 引入createApp用于创建应用 import { createApp } from "vue"; // 引用App根组件 import App from ./App.vue;createApp(App).mount(#app)1.2 App.vue 其中要写三种标签 <template> <!--html--> </template>…...

376. Wiggle Subsequence
376. Wiggle Subsequence 代码 class Solution { public:int wiggleMaxLength(vector<int>& nums) {int n nums.size();int res 1;int prediff 0;int curdiff 0;for(int i 0;i < n-1;i){curdiff nums[i1] - nums[i];if( (prediff > 0 && curdif…...
sqlserver 根据指定字符 解析拼接字符串
DECLARE LotNo NVARCHAR(50)A,B,C DECLARE xml XML ( SELECT <x> REPLACE(LotNo, ,, </x><x>) </x> ) DECLARE ErrorCode NVARCHAR(50) -- 提取 XML 中的值 SELECT value x.value(., VARCHAR(MAX))…...

Linux 内存管理实战精讲:核心原理与面试常考点全解析
Linux 内存管理实战精讲:核心原理与面试常考点全解析 Linux 内核内存管理是系统设计中最复杂但也最核心的模块之一。它不仅支撑着虚拟内存机制、物理内存分配、进程隔离与资源复用,还直接决定系统运行的性能与稳定性。无论你是嵌入式开发者、内核调试工…...

20个超级好用的 CSS 动画库
分享 20 个最佳 CSS 动画库。 它们中的大多数将生成纯 CSS 代码,而不需要任何外部库。 1.Animate.css 一个开箱即用型的跨浏览器动画库,可供你在项目中使用。 2.Magic Animations CSS3 一组简单的动画,可以包含在你的网页或应用项目中。 3.An…...

接口自动化测试:HttpRunner基础
相关文档 HttpRunner V3.x中文文档 HttpRunner 用户指南 使用HttpRunner 3.x实现接口自动化测试 HttpRunner介绍 HttpRunner 是一个开源的 API 测试工具,支持 HTTP(S)/HTTP2/WebSocket/RPC 等网络协议,涵盖接口测试、性能测试、数字体验监测等测试类型…...
Linux系统部署KES
1、安装准备 1.版本说明V008R006C009B0014 V008:是version产品的大版本。 R006:是release产品特性版本。 C009:是通用版 B0014:是build开发过程中的构建版本2.硬件要求 #安全版和企业版 内存:1GB 以上 硬盘…...