当前位置: 首页 > article >正文

confluent-kafka入门教程

文章目录

  • 官方文档
  • 与kafka-python的对比
  • 配置
    • 文档
    • 配置项
  • Producer代码示例
  • Consumer代码示例

官方文档

confluent_kafka API — confluent-kafka 2.8.0 documentation

Quick Start for Confluent Cloud | Confluent Documentation

与kafka-python的对比

对比维度confluent-kafkakafka-python
性能表现基于librdkafka构建,处理大规模消息时,吞吐量高、延迟低,性能出色纯Python实现,受GIL限制,处理大量并发任务时存在性能瓶颈,高负载下消息处理速度较慢
功能特性具备丰富高级特性,如细粒度配置、复杂分区控制、消息压缩、安全认证等功能相对基础,能满足常见Kafka使用场景,高级特性支持不足
易用性功能丰富、配置选项多,学习曲线较陡,初学者上手难度大接口设计简单直观,易于理解和使用,初学者能快速上手
社区支持由Confluent公司维护,有专业团队和丰富资源,更新维护及时,社区活跃度高开源社区项目,社区较活跃,但资源和支持力度相对较弱
兼容性依赖librdkafka,在不同操作系统和环境中可能存在兼容性问题,需额外配置安装纯Python实现,兼容性好,在各种Python环境中可方便使用,无需额外依赖
适用场景适用于对性能要求高、需高级特性的大规模生产环境,如金融交易系统、实时数据处理平台适合开发和测试环境,以及对性能要求不高的小型项目,如简单日志收集系统、数据监控工具

配置

文档

https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md

配置项

字典类型, 配置字段如下:

bootstrap.servers: kafka服务地址, 以逗号分隔

statistics.interval.ms: 发送统计间隔, 需要注册一个统计毁掉函数, 默认值为0,表示禁用统计, 粒度为1000ms

security.protocol: 可选值为“plaintext, ssl, sasl_plaintext, sasl_ssl”, 默认值为“plaintext”, 交互协议

sasl.mechanisms: 用于认证的SASL机制, 支持“GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER”, 默认为“GSSAPI”

sasl.username: SASL用户名, 支持的认证机制为“PLAIN and SASL-SCRAM-…”

sasl.password: SASL密码, 支持的认证机制同sasl.username

group.id: 消费者组id

group.instance.id: 启用静态组成员关系, 静态组成员允许在配置的session.timeout.ms时间范围内, 离开或重新加入组而不会引起组内再平衡。这个使用时候最好配置一个大点的session.timeout.ms值, 可以避免组内再平衡引起的短暂的服务不可用。需要kafka server端版本号>=2.3.0

session.timeout.ms: 客户端组回话或检测失败超时时间, 默认值为45000ms

group.protocol: 使用的组协议, 可选值为“classic”和“consumer”, 当前默认值为classic, 以后版本更新默认值会改为consumer

max.poll.interval.ms: 默认值为30000, 高级消费者调用消费消息函数(例如 rd_kafka_consumer_poll ())之间允许的最大时间间隔。如果超过此时间间隔,消费者将被视为失败,并且消费者组将进行重新平衡,以便将分区重新分配给另一个消费者组成员。警告:此时可能无法进行偏移提交。注意:建议为长时间处理应用程序设置 enable.auto.offset.store=false,然后在消息处理后显式存储偏移(使用 offsets_store()),以确保在处理完成之前不会自动提交偏移。

enable.auto.commit: 默认值为true, 在后台自动并定期提交偏移量。注意:设置为false不会阻止使用者获取先前提交的起始偏移量。为了规避此行为,请在调用assign()时设置每个分区的特定起始偏移量。

auto.commit.interval.ms: 默认值为5000ms, 消费者偏移量被提交(写入)到偏移量存储的毫秒级频率。(0 = 禁用)。此设置由高级消费者使用。

enable.auto.offset.store: 默认值为true, 自动存储提供给应用程序的最后一条消息的偏移量。偏移量存储是每个分区下一个要(自动)提交的偏移量的内存存储。

linger.ms: 默认值为5ms, 在将消息发送前,等待生产者队列中的消息累积以构建消息批次(消息集)的毫秒级延迟。较高的值允许更大、更有效的(开销更小、压缩更好)消息批次累积,但会增加消息传递延迟。

retries: 默认值为2147483647, 消息发送失败的重试次数

retry.backoff.ms: 默认值为100,重试时间间隔, 指数级增长, 受 retry.backoff.max.ms 的限制。

batch.size: 默认值为1000000字节数, 消息批次的最大字节数

Producer代码示例

from random import choice
from confluent_kafka import Producerif __name__ == '__main__':config = {# User-specific properties that you must set'bootstrap.servers': '<BOOTSTRAP SERVERS>','sasl.username':     '<CLUSTER API KEY>','sasl.password':     '<CLUSTER API SECRET>',# Fixed properties'security.protocol': 'SASL_SSL','sasl.mechanisms':   'PLAIN','acks':              'all'}# Create Producer instanceproducer = Producer(config)# Optional per-message delivery callback (triggered by poll() or flush())# when a message has been successfully delivered or permanently# failed delivery (after retries).def delivery_callback(err, msg):if err:print('ERROR: Message failed delivery: {}'.format(err))else:print("Produced event to topic {topic}: key = {key:12} value = {value:12}".format(topic=msg.topic(), key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))# Produce data by selecting random values from these lists.topic = "purchases"user_ids = ['eabara', 'jsmith', 'sgarcia', 'jbernard', 'htanaka', 'awalther']products = ['book', 'alarm clock', 't-shirts', 'gift card', 'batteries']count = 0for _ in range(10):user_id = choice(user_ids)product = choice(products)producer.produce(topic, product, user_id, callback=delivery_callback)count += 1# Block until the messages are sent.producer.poll(10000)producer.flush()

Consumer代码示例

from confluent_kafka import Consumerif __name__ == '__main__':config = {# User-specific properties that you must set'bootstrap.servers': '<BOOTSTRAP SERVERS>','sasl.username':     '<CLUSTER API KEY>','sasl.password':     '<CLUSTER API SECRET>',# Fixed properties'security.protocol': 'SASL_SSL','sasl.mechanisms':   'PLAIN','group.id':          'kafka-python-getting-started','auto.offset.reset': 'earliest'}# Create Consumer instanceconsumer = Consumer(config)# Subscribe to topictopic = "purchases"consumer.subscribe([topic])# Poll for new messages from Kafka and print them.try:while True:# 如果一次想拉取多个消息, 可以用consumer.consume方法, 该方法返回的是一个Message列表# msg_list = consumer.consume(num_messages=消息数量, timeout=如果没有消息, 最长等待的超时时间msg = consumer.poll(1.0)if msg is None:# Initial message consumption may take up to# `session.timeout.ms` for the consumer group to# rebalance and start consumingprint("Waiting...")elif msg.error():print("ERROR: %s".format(msg.error()))else:# Extract the (optional) key and value, and print.print("Consumed event from topic {topic}: key = {key:12} value = {value:12}".format(topic=msg.topic(), key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))except KeyboardInterrupt:passfinally:# Leave group and commit final offsetsconsumer.close()

相关文章:

confluent-kafka入门教程

文章目录 官方文档与kafka-python的对比配置文档配置项 Producer代码示例Consumer代码示例 官方文档 confluent_kafka API — confluent-kafka 2.8.0 documentation Quick Start for Confluent Cloud | Confluent Documentation 与kafka-python的对比 对比维度confluent-ka…...

江苏广电HC2910-创维代工-Hi3798cv200-2+8G-海美迪安卓7.0-强刷包

江苏广电HC2910-创维代工-Hi3798cv200-28G-海美迪安卓7.0-强刷包 说明 1、由于原机的融合网关路由不能设置&#xff0c;原网口无法使用&#xff0c;需要用usb2.0的RJ45usb网卡接入。 通过usb接口网卡联网可以实现百兆网口连接。原机usb3.0的接口可以以接入硬盘&#xff0c;播放…...

如何提高前端应用的性能?

如何提高前端应用的性能&#xff1f; 提高前端应用性能的方法可以从以下几个方面入手&#xff1a; 1. **代码优化** - 使用代码分割&#xff08;Code Splitting&#xff09;按需加载资源 - 减少DOM操作&#xff0c;使用虚拟DOM技术 - 避免深层嵌套的数据结构 - 使用Web Worker…...

python 库 下载 ,整合在一个小程序 UIUIUI

上图 import os import time import threading import requests import subprocess import importlib import tkinter as tk from tkinter import ttk, messagebox, scrolledtext from concurrent.futures import ThreadPoolExecutor, as_completed from urllib.parse import…...

Python爬虫-爬取猫眼演出数据

前言 本文是该专栏的第53篇,后面会持续分享python爬虫干货知识,记得关注。 猫眼平台除了有影院信息之外,它还涵盖了演出信息,比如说“演唱会,音乐节,话剧音乐剧,脱口秀,音乐会,戏曲艺术,相声”等等各种演出相关信息。 而本文,笔者将以猫眼平台为例,基于Python爬虫…...

nvm切换node版本后,解决npm找不到的问题

解决方法如下 命令行查看node版本 node -v找到node版本所对应的npm版本 点击进入node版本 npm对应版本下载 点击进入npm版本 下载Windows 压缩包 下载完成后&#xff0c;解压&#xff0c;文件改名为npm 复制到你nvm对应版本的node_modules 下面 将下载的npm /bin 目录…...

Windows系统安装MySQL安装实战分享

以下是在 Windows 系统上安装 MySQL 的详细实战步骤&#xff0c;涵盖下载、安装、配置及常见问题处理。 一、准备工作 下载 MySQL 安装包 访问 MySQL 官网。选择 MySQL Community Server&#xff08;免费版本&#xff09;。根据系统位数&#xff08;32/64位&#xff09;下载 …...

Vue 人看 React useRef:它不只是替代 ref

如果你是从 Vue 转到 React 的开发者&#xff0c;初见 useRef 可能会想&#xff1a;这不就是 React 版的 ref 吗&#xff1f;但真相是 —— 它能做的&#xff0c;比你想象得多得多。 &#x1f440; Vue 人初见 useRef 在 Vue 中&#xff0c;ref 是我们访问 DOM 或响应式数据的…...

零成本自建企业级SD-WAN!用Panabit手搓iWAN实战

我们前面提到过&#xff0c;最开始了解到Panabit&#xff0c;是因为他的SD-WAN产品&#xff08;误以为是外国货&#xff1f;这家国产SD-WAN神器竟能免费白嫖&#xff0c;附Panabit免费版体验全记录&#xff09;&#xff1b;现在发现&#xff0c;其SD-WAN的技术基础是iWAN&#…...

Unity-微信截图功能简单复刻-03绘制空心矩形

思路-绘制空心矩形 拓展UGUI的Graphic类,实现拖拽接口。 开始拖拽时记录鼠标位置&#xff0c; 使用拖拽中的鼠标位置和记录的位置&#xff0c;计算矩形顶点&#xff0c;绘制矩形。 两个三角形合并为一个矩形&#xff0c;作为空心矩形的一条边&#xff0c;四个边合并为空心矩形…...

国产品牌芯洲科技100V降压芯片系列

SCT2A25采用带集成环路补偿的恒导通时间(COT)模式控制&#xff0c;大大简化了转换器的片外配置。SCT2A25具有典型的140uA低静态电流&#xff0c;采用脉冲频率调制(PFM)模式&#xff0c;它使转换器在轻载或空载条件下实现高转换效率。 芯洲科技100V降压芯片系列提供丰富的48V系…...

研一自救指南 - 07. CSS面向面试学习

最近的前端面试多多少少都会遇到css的提问&#xff0c;感觉还是要把重点内容记记背背。这里基于b站和我自己面试的情况整理。 20250418更新&#xff1a; 1. BFC Block Formatting Context&#xff0c;一个块级的盒子&#xff0c;可以创建多个。里面有很多个块&#xff0c;他们…...

图灵奖得主LeCun:DeepSeek开源在产品层是一种竞争,但在基础方法层更像是一种合作;新一代AI将情感化

图片来源&#xff1a;This is World 来源 | Z Potential Z Highlights&#xff1a; 新型的AI系统是以深度学习为基础&#xff0c;能够理解物理世界并且拥有记忆、推理和规划能力的。一旦成功构建这样的系统&#xff0c;它们可能会有类似情感的反应&#xff0c;但这些情感是基…...

从GET到POST:HTTP请求的攻防实战与CTF挑战解析

初探HTTP请求:当浏览器遇见服务器 基础协议差异可视化 # 典型GET请求 GET /login.php?username=admin&password=p@ssw0rd HTTP/1.1 Host: example.com User-Agent: Mozilla/5.0# 典型POST请求 POST /login.php HTTP/1.1 Host: example.com Content-Type: application/x…...

SQL-exists和in核心区别​、 性能对比​、适用场景​

EXISTS和IN的基本区别。IN用于检查某个值是否在子查询返回的结果集中,而EXISTS用于检查子 查询是否至少返回了一行数据。通常来说,EXISTS在子查询结果集较大时表现更好,因为一旦找 到匹配项就会停止搜索,而IN则需要遍历整个结果集。 在 SQL 中,EXISTS 和 IN 都可以用于…...

Charles 安装与使用详解:实现 App 与小程序 HTTPS 抓包

在日常的移动端开发、接口调试或逆向分析中&#xff0c;我们经常需要抓取移动 App 或微信小程序的 HTTP/HTTPS 请求。Charles 是一款经典强大的代理抓包工具&#xff0c;凭借简单的界面和强大的功能&#xff0c;成为了 macOS 抓包的首选工具之一。 本文将详细介绍 Charles 的安…...

使用Redis5.X部署一个集群

文章目录 1.用Redis5.x来创建Cluste2. 查看节点信息 nodes3. 添加节点 add-node4.删除节点 del-node5.手动指定从节点 replicate6.检查集群健康状态 check 建议使用5.x版本。 首先&#xff0c;下载Redis&#xff0c;根据自己的环境选择版本。 一键启动Redis集群文件配置。 ech…...

Ubuntu Linux 中文输入法默认使用英文标点

ubuntu从wayland切换到x11, 然后安装fcitx(是fcitx4版本)和 fcitx-googlepinyin, 再sudo dpkg -i 安装百度输入法deb包. 在fcitx配置中, 附加组件,打勾高级, 取消打勾标点支持和全角字符. 百度输入法就可以默认用英文标点了. 而google拼音输入法的问题是字体大小没法保存,每…...

Mermaid 是什么,为什么适合AI模型和markdown

什么是 Mermaid&#xff1f; Mermaid 是一个基于 JavaScript 的开源绘图和图表工具&#xff0c;允许用户通过简单的文本语法创建图表。它支持生成流程图、时序图、类图、甘特图等多种类型的可视化内容&#xff0c;并直接从类似 Markdown 的代码中渲染。Mermaid 因其与 Markdow…...

Java漏洞原理与实战

一、基本概念 1、序列化与反序列化 (1)序列化:将对象写入IO流中&#xff0c;ObjectOutputStream类的writeobject()方法可以实现序列化 (2)反序列化:从IO流中恢复对象&#xff0c;ObjectinputStream类的readObject()方法用于反序列化 (3)意义:序列化机制允许将实现序列化的J…...

第十届团体程序设计天梯赛-上理赛点随笔

2025.4.19来到军工路580号上海理工大学赛点参加cccc 校内环境挺好的&#xff0c;校内氛围也不错&#xff1b;临走前还用晚餐券顺走一袋橘子 再来说说比赛 首先是举办方服务器爆了&#xff0c;导致前10分钟刷不出题&#xff0c;一个多小时还上交不了代码 然后就是我用py总有几…...

考公:数字推理

文章目录 1.真题12 312 530 756 ()-3 3 1 12 17 ()356 342 333 324 ()30 28 27 25 () 2215105 1494 1383 1272 ()2 3 8 21 46 ()4/25 1/4 4/9 1 ()39 416 630 848 ()5 8 11 17 () 10714 21 40 77 () 229 2.数字推理方法2.1 差值法2.2 比值法&#xff08;乘法关系&#xff09;2.…...

树莓派超全系列教程文档--(32)config.txt常用音频配置

config.txt常用音频配置 板载模拟音频&#xff08;3.5mm耳机插孔&#xff09;audio_pwm_modedisable_audio_ditherenable_audio_ditherpwm_sample_bits HDMI音频 文章来源&#xff1a; http://raspberry.dns8844.cn/documentation 原文网址 板载模拟音频&#xff08;3.5mm耳机…...

面试专栏-02-MySQL知识点(第二部分)

6、锁 1、分类&#xff1a; 全局锁&#xff1a;锁住数据库中的所有表表级锁&#xff1a;每次操作锁住整张表行级锁&#xff1a;每次操作锁住对应行的数据 2、全局锁 加锁后&#xff0c;整个实例只能进行读取操作&#xff0c;从而保证数据的完成性和一致性。 特点&#xff…...

55、⾸屏加载⽩屏怎么进⾏优化

答&#xff1a; &#xff08;1&#xff09;使⽤CDN 减⼩代码体积&#xff0c;加快请求速度&#xff1b; (2)SSR通过服务端把所有数据全部渲染完成再返回给客⼾端&#xff1b; (3) 路由懒加载&#xff0c;当⽤⼾访问的时候&#xff0c;再加载相应模块&#xff1b; (4) 使⽤外…...

python函数之间嵌套使用yield

假设一种场景&#xff0c;函数 A 可以在获得函数 B 的返回值&#xff08;即一个生成器对象&#xff09;后&#xff0c;再次对其进行 yield 操作。这是因为 Python 的生成器是可迭代的&#xff0c;你可以在一个生成器中迭代另一个生成器&#xff0c;并将其结果逐个 yield 出去。…...

【MySQL数据库入门到精通】

文章目录 一、SQL分类二、DDL-数据库操作1.查询2.创建数据库3.删除数据库4.使用数据库 三、DDL-表操作1.查询 一、SQL分类 根据功能主要分为DDL DML DQL DCL DDL:Date Definition Language数据定义语言&#xff1a;定义数据库&#xff0c;表和字段 DML:Date Manipulatin Lan…...

[Swift]pod install成功后运行项目报错问题error: Sandbox: bash(84760) deny(1)

操作&#xff1a; platform :ios, 14.0target ZKMKAPP do# Comment the next line if you dont want to use dynamic frameworksuse_frameworks!# Pods for ZKMKAPPpod Moyaend pod install成功后运行报错 报错&#xff1a; error: Sandbox: bash(84760) deny(1) file-writ…...

游戏引擎学习第233天

原地归并排序地方很蒙圈 game_render_group.cpp&#xff1a;注意当前的SortEntries函数是O(n^2)&#xff0c;并引入一个提前退出的条件 其实我们不太讨论这些话题&#xff0c;因为我并没有深入研究过计算机科学&#xff0c;所以我也没有太多内容可以分享。但希望在过去几天里…...

卷积神经网络基础(二)

停更好久的卷积神经网络基础知识终于开始更新了哈哈&#xff0c;今天主要介绍的是误差反向传播法。 目录 一、计算图 1.1 用计算图求解 1.2 局部计算 1.3 为什么采用计算图 二、链式法则 2.1 计算图的反向传播 2.2 链式法则 2.3 链式法则和计算图 三、反向传播 3.1 …...