confluent-kafka入门教程
文章目录
- 官方文档
- 与kafka-python的对比
- 配置
- 文档
- 配置项
- Producer代码示例
- Consumer代码示例
官方文档
confluent_kafka API — confluent-kafka 2.8.0 documentation
Quick Start for Confluent Cloud | Confluent Documentation
与kafka-python的对比
| 对比维度 | confluent-kafka | kafka-python |
|---|---|---|
| 性能表现 | 基于librdkafka构建,处理大规模消息时,吞吐量高、延迟低,性能出色 | 纯Python实现,受GIL限制,处理大量并发任务时存在性能瓶颈,高负载下消息处理速度较慢 |
| 功能特性 | 具备丰富高级特性,如细粒度配置、复杂分区控制、消息压缩、安全认证等 | 功能相对基础,能满足常见Kafka使用场景,高级特性支持不足 |
| 易用性 | 功能丰富、配置选项多,学习曲线较陡,初学者上手难度大 | 接口设计简单直观,易于理解和使用,初学者能快速上手 |
| 社区支持 | 由Confluent公司维护,有专业团队和丰富资源,更新维护及时,社区活跃度高 | 开源社区项目,社区较活跃,但资源和支持力度相对较弱 |
| 兼容性 | 依赖librdkafka,在不同操作系统和环境中可能存在兼容性问题,需额外配置安装 | 纯Python实现,兼容性好,在各种Python环境中可方便使用,无需额外依赖 |
| 适用场景 | 适用于对性能要求高、需高级特性的大规模生产环境,如金融交易系统、实时数据处理平台 | 适合开发和测试环境,以及对性能要求不高的小型项目,如简单日志收集系统、数据监控工具 |
配置
文档
https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
配置项
字典类型, 配置字段如下:
bootstrap.servers: kafka服务地址, 以逗号分隔
statistics.interval.ms: 发送统计间隔, 需要注册一个统计毁掉函数, 默认值为0,表示禁用统计, 粒度为1000ms
security.protocol: 可选值为“plaintext, ssl, sasl_plaintext, sasl_ssl”, 默认值为“plaintext”, 交互协议
sasl.mechanisms: 用于认证的SASL机制, 支持“GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER”, 默认为“GSSAPI”
sasl.username: SASL用户名, 支持的认证机制为“PLAIN and SASL-SCRAM-…”
sasl.password: SASL密码, 支持的认证机制同sasl.username
group.id: 消费者组id
group.instance.id: 启用静态组成员关系, 静态组成员允许在配置的session.timeout.ms时间范围内, 离开或重新加入组而不会引起组内再平衡。这个使用时候最好配置一个大点的session.timeout.ms值, 可以避免组内再平衡引起的短暂的服务不可用。需要kafka server端版本号>=2.3.0
session.timeout.ms: 客户端组回话或检测失败超时时间, 默认值为45000ms
group.protocol: 使用的组协议, 可选值为“classic”和“consumer”, 当前默认值为classic, 以后版本更新默认值会改为consumer
max.poll.interval.ms: 默认值为30000, 高级消费者调用消费消息函数(例如 rd_kafka_consumer_poll ())之间允许的最大时间间隔。如果超过此时间间隔,消费者将被视为失败,并且消费者组将进行重新平衡,以便将分区重新分配给另一个消费者组成员。警告:此时可能无法进行偏移提交。注意:建议为长时间处理应用程序设置 enable.auto.offset.store=false,然后在消息处理后显式存储偏移(使用 offsets_store()),以确保在处理完成之前不会自动提交偏移。
enable.auto.commit: 默认值为true, 在后台自动并定期提交偏移量。注意:设置为false不会阻止使用者获取先前提交的起始偏移量。为了规避此行为,请在调用assign()时设置每个分区的特定起始偏移量。
auto.commit.interval.ms: 默认值为5000ms, 消费者偏移量被提交(写入)到偏移量存储的毫秒级频率。(0 = 禁用)。此设置由高级消费者使用。
enable.auto.offset.store: 默认值为true, 自动存储提供给应用程序的最后一条消息的偏移量。偏移量存储是每个分区下一个要(自动)提交的偏移量的内存存储。
linger.ms: 默认值为5ms, 在将消息发送前,等待生产者队列中的消息累积以构建消息批次(消息集)的毫秒级延迟。较高的值允许更大、更有效的(开销更小、压缩更好)消息批次累积,但会增加消息传递延迟。
retries: 默认值为2147483647, 消息发送失败的重试次数
retry.backoff.ms: 默认值为100,重试时间间隔, 指数级增长, 受 retry.backoff.max.ms 的限制。
batch.size: 默认值为1000000字节数, 消息批次的最大字节数
Producer代码示例
from random import choice
from confluent_kafka import Producerif __name__ == '__main__':config = {# User-specific properties that you must set'bootstrap.servers': '<BOOTSTRAP SERVERS>','sasl.username': '<CLUSTER API KEY>','sasl.password': '<CLUSTER API SECRET>',# Fixed properties'security.protocol': 'SASL_SSL','sasl.mechanisms': 'PLAIN','acks': 'all'}# Create Producer instanceproducer = Producer(config)# Optional per-message delivery callback (triggered by poll() or flush())# when a message has been successfully delivered or permanently# failed delivery (after retries).def delivery_callback(err, msg):if err:print('ERROR: Message failed delivery: {}'.format(err))else:print("Produced event to topic {topic}: key = {key:12} value = {value:12}".format(topic=msg.topic(), key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))# Produce data by selecting random values from these lists.topic = "purchases"user_ids = ['eabara', 'jsmith', 'sgarcia', 'jbernard', 'htanaka', 'awalther']products = ['book', 'alarm clock', 't-shirts', 'gift card', 'batteries']count = 0for _ in range(10):user_id = choice(user_ids)product = choice(products)producer.produce(topic, product, user_id, callback=delivery_callback)count += 1# Block until the messages are sent.producer.poll(10000)producer.flush()
Consumer代码示例
from confluent_kafka import Consumerif __name__ == '__main__':config = {# User-specific properties that you must set'bootstrap.servers': '<BOOTSTRAP SERVERS>','sasl.username': '<CLUSTER API KEY>','sasl.password': '<CLUSTER API SECRET>',# Fixed properties'security.protocol': 'SASL_SSL','sasl.mechanisms': 'PLAIN','group.id': 'kafka-python-getting-started','auto.offset.reset': 'earliest'}# Create Consumer instanceconsumer = Consumer(config)# Subscribe to topictopic = "purchases"consumer.subscribe([topic])# Poll for new messages from Kafka and print them.try:while True:# 如果一次想拉取多个消息, 可以用consumer.consume方法, 该方法返回的是一个Message列表# msg_list = consumer.consume(num_messages=消息数量, timeout=如果没有消息, 最长等待的超时时间msg = consumer.poll(1.0)if msg is None:# Initial message consumption may take up to# `session.timeout.ms` for the consumer group to# rebalance and start consumingprint("Waiting...")elif msg.error():print("ERROR: %s".format(msg.error()))else:# Extract the (optional) key and value, and print.print("Consumed event from topic {topic}: key = {key:12} value = {value:12}".format(topic=msg.topic(), key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))except KeyboardInterrupt:passfinally:# Leave group and commit final offsetsconsumer.close()
相关文章:
confluent-kafka入门教程
文章目录 官方文档与kafka-python的对比配置文档配置项 Producer代码示例Consumer代码示例 官方文档 confluent_kafka API — confluent-kafka 2.8.0 documentation Quick Start for Confluent Cloud | Confluent Documentation 与kafka-python的对比 对比维度confluent-ka…...
江苏广电HC2910-创维代工-Hi3798cv200-2+8G-海美迪安卓7.0-强刷包
江苏广电HC2910-创维代工-Hi3798cv200-28G-海美迪安卓7.0-强刷包 说明 1、由于原机的融合网关路由不能设置,原网口无法使用,需要用usb2.0的RJ45usb网卡接入。 通过usb接口网卡联网可以实现百兆网口连接。原机usb3.0的接口可以以接入硬盘,播放…...
如何提高前端应用的性能?
如何提高前端应用的性能? 提高前端应用性能的方法可以从以下几个方面入手: 1. **代码优化** - 使用代码分割(Code Splitting)按需加载资源 - 减少DOM操作,使用虚拟DOM技术 - 避免深层嵌套的数据结构 - 使用Web Worker…...
python 库 下载 ,整合在一个小程序 UIUIUI
上图 import os import time import threading import requests import subprocess import importlib import tkinter as tk from tkinter import ttk, messagebox, scrolledtext from concurrent.futures import ThreadPoolExecutor, as_completed from urllib.parse import…...
Python爬虫-爬取猫眼演出数据
前言 本文是该专栏的第53篇,后面会持续分享python爬虫干货知识,记得关注。 猫眼平台除了有影院信息之外,它还涵盖了演出信息,比如说“演唱会,音乐节,话剧音乐剧,脱口秀,音乐会,戏曲艺术,相声”等等各种演出相关信息。 而本文,笔者将以猫眼平台为例,基于Python爬虫…...
nvm切换node版本后,解决npm找不到的问题
解决方法如下 命令行查看node版本 node -v找到node版本所对应的npm版本 点击进入node版本 npm对应版本下载 点击进入npm版本 下载Windows 压缩包 下载完成后,解压,文件改名为npm 复制到你nvm对应版本的node_modules 下面 将下载的npm /bin 目录…...
Windows系统安装MySQL安装实战分享
以下是在 Windows 系统上安装 MySQL 的详细实战步骤,涵盖下载、安装、配置及常见问题处理。 一、准备工作 下载 MySQL 安装包 访问 MySQL 官网。选择 MySQL Community Server(免费版本)。根据系统位数(32/64位)下载 …...
Vue 人看 React useRef:它不只是替代 ref
如果你是从 Vue 转到 React 的开发者,初见 useRef 可能会想:这不就是 React 版的 ref 吗?但真相是 —— 它能做的,比你想象得多得多。 👀 Vue 人初见 useRef 在 Vue 中,ref 是我们访问 DOM 或响应式数据的…...
零成本自建企业级SD-WAN!用Panabit手搓iWAN实战
我们前面提到过,最开始了解到Panabit,是因为他的SD-WAN产品(误以为是外国货?这家国产SD-WAN神器竟能免费白嫖,附Panabit免费版体验全记录);现在发现,其SD-WAN的技术基础是iWAN&#…...
Unity-微信截图功能简单复刻-03绘制空心矩形
思路-绘制空心矩形 拓展UGUI的Graphic类,实现拖拽接口。 开始拖拽时记录鼠标位置, 使用拖拽中的鼠标位置和记录的位置,计算矩形顶点,绘制矩形。 两个三角形合并为一个矩形,作为空心矩形的一条边,四个边合并为空心矩形…...
国产品牌芯洲科技100V降压芯片系列
SCT2A25采用带集成环路补偿的恒导通时间(COT)模式控制,大大简化了转换器的片外配置。SCT2A25具有典型的140uA低静态电流,采用脉冲频率调制(PFM)模式,它使转换器在轻载或空载条件下实现高转换效率。 芯洲科技100V降压芯片系列提供丰富的48V系…...
研一自救指南 - 07. CSS面向面试学习
最近的前端面试多多少少都会遇到css的提问,感觉还是要把重点内容记记背背。这里基于b站和我自己面试的情况整理。 20250418更新: 1. BFC Block Formatting Context,一个块级的盒子,可以创建多个。里面有很多个块,他们…...
图灵奖得主LeCun:DeepSeek开源在产品层是一种竞争,但在基础方法层更像是一种合作;新一代AI将情感化
图片来源:This is World 来源 | Z Potential Z Highlights: 新型的AI系统是以深度学习为基础,能够理解物理世界并且拥有记忆、推理和规划能力的。一旦成功构建这样的系统,它们可能会有类似情感的反应,但这些情感是基…...
从GET到POST:HTTP请求的攻防实战与CTF挑战解析
初探HTTP请求:当浏览器遇见服务器 基础协议差异可视化 # 典型GET请求 GET /login.php?username=admin&password=p@ssw0rd HTTP/1.1 Host: example.com User-Agent: Mozilla/5.0# 典型POST请求 POST /login.php HTTP/1.1 Host: example.com Content-Type: application/x…...
SQL-exists和in核心区别、 性能对比、适用场景
EXISTS和IN的基本区别。IN用于检查某个值是否在子查询返回的结果集中,而EXISTS用于检查子 查询是否至少返回了一行数据。通常来说,EXISTS在子查询结果集较大时表现更好,因为一旦找 到匹配项就会停止搜索,而IN则需要遍历整个结果集。 在 SQL 中,EXISTS 和 IN 都可以用于…...
Charles 安装与使用详解:实现 App 与小程序 HTTPS 抓包
在日常的移动端开发、接口调试或逆向分析中,我们经常需要抓取移动 App 或微信小程序的 HTTP/HTTPS 请求。Charles 是一款经典强大的代理抓包工具,凭借简单的界面和强大的功能,成为了 macOS 抓包的首选工具之一。 本文将详细介绍 Charles 的安…...
使用Redis5.X部署一个集群
文章目录 1.用Redis5.x来创建Cluste2. 查看节点信息 nodes3. 添加节点 add-node4.删除节点 del-node5.手动指定从节点 replicate6.检查集群健康状态 check 建议使用5.x版本。 首先,下载Redis,根据自己的环境选择版本。 一键启动Redis集群文件配置。 ech…...
Ubuntu Linux 中文输入法默认使用英文标点
ubuntu从wayland切换到x11, 然后安装fcitx(是fcitx4版本)和 fcitx-googlepinyin, 再sudo dpkg -i 安装百度输入法deb包. 在fcitx配置中, 附加组件,打勾高级, 取消打勾标点支持和全角字符. 百度输入法就可以默认用英文标点了. 而google拼音输入法的问题是字体大小没法保存,每…...
Mermaid 是什么,为什么适合AI模型和markdown
什么是 Mermaid? Mermaid 是一个基于 JavaScript 的开源绘图和图表工具,允许用户通过简单的文本语法创建图表。它支持生成流程图、时序图、类图、甘特图等多种类型的可视化内容,并直接从类似 Markdown 的代码中渲染。Mermaid 因其与 Markdow…...
Java漏洞原理与实战
一、基本概念 1、序列化与反序列化 (1)序列化:将对象写入IO流中,ObjectOutputStream类的writeobject()方法可以实现序列化 (2)反序列化:从IO流中恢复对象,ObjectinputStream类的readObject()方法用于反序列化 (3)意义:序列化机制允许将实现序列化的J…...
第十届团体程序设计天梯赛-上理赛点随笔
2025.4.19来到军工路580号上海理工大学赛点参加cccc 校内环境挺好的,校内氛围也不错;临走前还用晚餐券顺走一袋橘子 再来说说比赛 首先是举办方服务器爆了,导致前10分钟刷不出题,一个多小时还上交不了代码 然后就是我用py总有几…...
考公:数字推理
文章目录 1.真题12 312 530 756 ()-3 3 1 12 17 ()356 342 333 324 ()30 28 27 25 () 2215105 1494 1383 1272 ()2 3 8 21 46 ()4/25 1/4 4/9 1 ()39 416 630 848 ()5 8 11 17 () 10714 21 40 77 () 229 2.数字推理方法2.1 差值法2.2 比值法(乘法关系)2.…...
树莓派超全系列教程文档--(32)config.txt常用音频配置
config.txt常用音频配置 板载模拟音频(3.5mm耳机插孔)audio_pwm_modedisable_audio_ditherenable_audio_ditherpwm_sample_bits HDMI音频 文章来源: http://raspberry.dns8844.cn/documentation 原文网址 板载模拟音频(3.5mm耳机…...
面试专栏-02-MySQL知识点(第二部分)
6、锁 1、分类: 全局锁:锁住数据库中的所有表表级锁:每次操作锁住整张表行级锁:每次操作锁住对应行的数据 2、全局锁 加锁后,整个实例只能进行读取操作,从而保证数据的完成性和一致性。 特点ÿ…...
55、⾸屏加载⽩屏怎么进⾏优化
答: (1)使⽤CDN 减⼩代码体积,加快请求速度; (2)SSR通过服务端把所有数据全部渲染完成再返回给客⼾端; (3) 路由懒加载,当⽤⼾访问的时候,再加载相应模块; (4) 使⽤外…...
python函数之间嵌套使用yield
假设一种场景,函数 A 可以在获得函数 B 的返回值(即一个生成器对象)后,再次对其进行 yield 操作。这是因为 Python 的生成器是可迭代的,你可以在一个生成器中迭代另一个生成器,并将其结果逐个 yield 出去。…...
【MySQL数据库入门到精通】
文章目录 一、SQL分类二、DDL-数据库操作1.查询2.创建数据库3.删除数据库4.使用数据库 三、DDL-表操作1.查询 一、SQL分类 根据功能主要分为DDL DML DQL DCL DDL:Date Definition Language数据定义语言:定义数据库,表和字段 DML:Date Manipulatin Lan…...
[Swift]pod install成功后运行项目报错问题error: Sandbox: bash(84760) deny(1)
操作: platform :ios, 14.0target ZKMKAPP do# Comment the next line if you dont want to use dynamic frameworksuse_frameworks!# Pods for ZKMKAPPpod Moyaend pod install成功后运行报错 报错: error: Sandbox: bash(84760) deny(1) file-writ…...
游戏引擎学习第233天
原地归并排序地方很蒙圈 game_render_group.cpp:注意当前的SortEntries函数是O(n^2),并引入一个提前退出的条件 其实我们不太讨论这些话题,因为我并没有深入研究过计算机科学,所以我也没有太多内容可以分享。但希望在过去几天里…...
卷积神经网络基础(二)
停更好久的卷积神经网络基础知识终于开始更新了哈哈,今天主要介绍的是误差反向传播法。 目录 一、计算图 1.1 用计算图求解 1.2 局部计算 1.3 为什么采用计算图 二、链式法则 2.1 计算图的反向传播 2.2 链式法则 2.3 链式法则和计算图 三、反向传播 3.1 …...
