【C/C++】从零开始掌握Kafka
文章目录
- 从零开始掌握Kafka
- 一、Kafka 基础知识理解(理论)
- 1. 核心组件与架构
- 2. 重点概念解析
- 二、Kafka 面试重点知识梳理
- 三、C++ 使用 Kafka 的实践(librdkafka)
- 1. librdkafka 简介
- 2. 安装 librdkafka
- 四、实战:高吞吐生产者与消费者
- 1. 生产者示例(Producer.cpp)
- 2. 消费者示例(Consumer.cpp)
- 五、Kafka 开发相关 C++ 能力
- 六、推荐资料与开源项目
从零开始掌握Kafka
一、Kafka 基础知识理解(理论)
1. 核心组件与架构
组件 | 作用 |
---|---|
Broker | Kafka 节点,负责存储消息 |
Topic | 消息主题,逻辑上的分类 |
Partition | 一个 Topic 的分片,支持并发与扩展性 |
Producer | 负责发送消息 |
Consumer | 负责消费消息 |
Consumer Group | 多消费者协作消费 |
Zookeeper / KRaft | 负责元数据与协调(未来版本转向 KRaft 模式) |
2. 重点概念解析
-
Partition:分片,支持水平扩展(每个 partition 是一个有序日志)。
-
副本机制(Replication):每个 Partition 有一个 leader + N 个 follower,保证高可用。
-
消费者组(Consumer Group):Kafka 实现广播和负载均衡消费的机制。
-
offset 管理:
- 自动提交(enable.auto.commit)
- 手动提交(commitSync / commitAsync)
- Kafka 默认 offset 存在
__consumer_offsets
topic 中。
二、Kafka 面试重点知识梳理
面试点 | 说明 |
---|---|
消息顺序性 | 同一个 partition 内有顺序,跨 partition 无法保证 |
幂等性生产 | 使用 enable.idempotence=true ,避免 producer 重试造成重复发送 |
分布式一致性 | ISR 机制,消息写入需同步到 follower;ACK=all 实现强一致 |
消费位点提交 | 手动提交 offset 是保证消费语义精确一次的关键 |
Rebalance 原理 | 消费者上下线会触发 Rebalance,导致 partition 分配变化 |
三、C++ 使用 Kafka 的实践(librdkafka)
1. librdkafka 简介
-
官方提供的高性能 C/C++ Kafka 客户端库。
-
GitHub 地址:https://github.com/edenhill/librdkafka
-
支持:
- 高吞吐的生产与消费
- offset 提交
- topic/partition 管理
- 幂等发送、压缩、批处理
2. 安装 librdkafka
# Ubuntu
sudo apt-get install librdkafka-dev# Or from source
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure
make
sudo make install
四、实战:高吞吐生产者与消费者
此处只是简单介绍,完整工程见kafka简单工程
1. 生产者示例(Producer.cpp)
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <csignal>
#include <memory>class ExampleEventCb : public RdKafka::EventCb {void event_cb(RdKafka::Event &event) override {if (event.type() == RdKafka::Event::EVENT_ERROR) {std::cerr << "Kafka Error: " << event.str() << std::endl;}}
};int main() {std::string brokers = "kafka:9092";std::string topic_str = "test_topic";std::string errstr;// 配置ExampleEventCb event_cb;std::unique_ptr<RdKafka::Conf> conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));conf->set("bootstrap.servers", brokers, errstr);conf->set("event_cb", &event_cb, errstr);// 创建 producerstd::unique_ptr<RdKafka::Producer> producer(RdKafka::Producer::create(conf.get(), errstr));if (!producer) {std::cerr << "Failed to create producer: " << errstr << std::endl;return 1;}// 创建 Topicstd::unique_ptr<RdKafka::Topic> topic(RdKafka::Topic::create(producer.get(), topic_str, nullptr, errstr));if (!topic) {std::cerr << "Failed to create topic: " << errstr << std::endl;return 1;}std::string message = "Hello from C++ Kafka Producer!";RdKafka::ErrorCode resp = producer->produce(topic.get(), // topic ptrRdKafka::Topic::PARTITION_UA, // partitionRdKafka::Producer::RK_MSG_COPY, // message flagsconst_cast<char *>(message.c_str()), // payloadmessage.size(), // payload sizenullptr, // optional keynullptr); // opaqueif (resp != RdKafka::ERR_NO_ERROR) {std::cerr << "Produce failed: " << RdKafka::err2str(resp) << std::endl;} else {std::cout << "Message sent successfully\n";}producer->flush(3000);return 0;
}}
2. 消费者示例(Consumer.cpp)
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <csignal>
#include <memory>bool running = true;void signal_handler(int) {running = false;
}class ExampleEventCb : public RdKafka::EventCb {void event_cb(RdKafka::Event &event) override {if (event.type() == RdKafka::Event::EVENT_ERROR) {std::cerr << "Kafka Error: " << event.str() << std::endl;}}
};int main() {signal(SIGINT, signal_handler);std::string brokers = "kafka:9092";std::string topic = "test_topic";std::string group_id = "cpp_consumer_group";std::string errstr;ExampleEventCb event_cb;auto conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);conf->set("bootstrap.servers", brokers, errstr);conf->set("group.id", group_id, errstr);conf->set("auto.offset.reset", "earliest", errstr);conf->set("event_cb", &event_cb, errstr);auto consumer = RdKafka::KafkaConsumer::create(conf, errstr);if (!consumer) {std::cerr << "Failed to create consumer: " << errstr << std::endl;return 1;}consumer->subscribe({topic});std::cout << "Consuming messages from topic " << topic << std::endl;while (running) {auto msg = consumer->consume(1000);if (msg->err() == RdKafka::ERR_NO_ERROR) {std::string message(reinterpret_cast<const char*>(msg->payload()), msg->len());std::cout << "Received message: " << message << std::endl;}}delete msg;}consumer->close();delete consumer;return 0;
}
五、Kafka 开发相关 C++ 能力
-
熟练使用 RAII、智能指针、异常处理
-
理解线程安全、异步模型(poll, callback)
-
能够结合 JSON/XML 配置 Kafka 客户端
-
编写模块化、高性能的消息收发组件
-
构建系统:CMake
-
日志:spdlog 或 glog
-
单元测试:gtest
-
JSON:nlohmann/json
六、推荐资料与开源项目
-
📚 Kafka 权威指南(原书第2版)
-
📘 librdkafka 文档
-
📖 Apache Kafka 官方文档
-
💻 开源项目参考:
- confluent-kafka-cpp
- cppkafka(封装更现代 C++)
相关文章:
【C/C++】从零开始掌握Kafka
文章目录 从零开始掌握Kafka一、Kafka 基础知识理解(理论)1. 核心组件与架构2. 重点概念解析 二、Kafka 面试重点知识梳理三、C 使用 Kafka 的实践(librdkafka)1. librdkafka 简介2. 安装 librdkafka 四、实战:高吞吐生…...
02_redis分布式锁原理
文章目录 一、redis如何实现分布式锁1. 使用 SETNX 命令2. 设置过期时间3. 释放锁4. 注意事项5. 示例代码 二、Java中分布式锁如何设置超时时间1. Redis分布式锁2. 基于Zookeeper的分布式锁3. 基于数据库的分布式锁注意事项 一、redis如何实现分布式锁 Redis 实现分布式锁是一…...

简单血条于小怪攻击模板
创建一个2d正方形(9-Sliced)命名为Player,在Player下面新建一个画布(Canvas)命名为PlayerHealthUI,在画布下面新建一个滑动条(Slider)命名为HealthBar 把PlayerHealthUI脚本挂载到Pl…...
Win11 系统登入时绑定微软邮箱导致用户名欠缺
Win11 系统登入时绑定微软邮箱导致用户名欠缺 解决思路 -> 解绑当前微软邮箱和用户名 -> 断网离线建立本地账户 -> 设置本地账户为Admin权限 -> 注销当前账户,登入新建的用户 -> 联网绑定微软邮箱 -> 删除旧的用户命令步骤 管理员权限打开…...

代码随想录算法训练营第四十六四十七天
卡码网题目: 110. 字符串接龙105. 有向图的完全联通106. 岛屿的周长107. 寻找存在的路径 其他: 今日总结 往期打卡 110. 字符串接龙 跳转: 110. 字符串接龙 学习: 代码随想录公开讲解 问题: 字典 strList 中从字符串 beginStr 和 endStr 的转换序列是一个按下述规格形成的序…...

华硕FL8000U加装16G+32G=48G内存条
华硕FL8000U加装16G32G48G内存条 一、华硕FL8000U加装内存条endl 一、华硕FL8000U加装内存条 相关视频链接: https://www.bilibili.com/video/BV1gw4dePED8/ endl...
前后端联调实战指南:Axios拦截器、CORS与JWT身份验证全解析
前言 在现代Web开发中,前后端分离架构已成为主流,而前后端联调则是开发过程中不可避免的关键环节。本文将深入探讨前后端联调中的三大核心技术:Axios拦截器的灵活运用、CORS跨域问题的全面解决方案以及JWT身份验证的安全实现。通过本文&…...
java高级 -Junit单元测试
Junit单元测试就是针对最小的功能:方法,编写测试代码对其进行正确性测试。用main方法进行测试的弊端是一个方法测试失败可能会影响别的方法的测试,也无法得到测试报告,需要我们自己观察数据是否正确。 此时,我们就需要…...
在 UVM验证环境中,验证 Out-of-Order或 Interleaving机制
在 UVM验证环境中,验证 Out-of-Order或 Interleaving机制 摘要:在 UVM (Universal Verification Methodology) 验证环境中,验证 Out-of-Order (乱序) 或 Interleaving (交错) 机制是验证复杂 SoC (System on Chip) 设计的重要任务,尤其是在验证高速接口(如 PCIe、AXI)、缓…...
V9数据库替换授权
文章目录 环境文档用途详细信息 环境 系统平台:Linux x86-64 Red Hat Enterprise Linux 7 版本:9.0 文档用途 1、本文档用于指导V9数据库替换授权。 2、V9数据库授权文件为license.dat。 详细信息 1、上传新的授权文件到服务器并修改授权文件属主为…...

勇闯Chromium—— Chromium的多进程架构
问题 构建一个永不崩溃或挂起的渲染引擎几乎是不可能的,构建一个绝对安全的渲染引擎也几乎是不可能的。 从某种程度上来说,2006 年左右的网络浏览器状态与过去单用户、协作式多任务操作系统的状况类似。正如在这样的操作系统中,一个行为不端的应用程序可能导致整个系统崩溃…...
Go语言中常量的命名规则详解
1. 常量的基本命名规则 1.1. 命名格式 1. 使用const关键字声明; 2. 命名格式:const 常量名 [类型] 值; 3. 类型可以省略,由编译器推断; 1.2. 命名风格 大小写规则: 1. 首字母大写:导出常…...

软件质量保证与测试实验
课程 软件质量保证与测试 目的:练习软件测试中白盒测试方法 内容: 测试如下程序段: #include <stdio.h>int main() {int i 1, n1 0, n2 0;float sum 0.0;float average;float score[100];printf("请输入分…...

历年华东师范大学保研上机真题
2025华东师范大学保研上机真题 2024华东师范大学保研上机真题 2023华东师范大学保研上机真题 在线测评链接:https://pgcode.cn/school?classification1 简单一位数代数式计算 题目描述 给一个小学生都会算的1位数与1位数运算的代数式,请你求出这个表…...
【C++】什么是静态库?什么是动态库?
静态库与动态库详解 静态库和动态库是软件开发中两种不同的代码共享和重用机制,它们在链接方式、内存使用和部署方式上有显著区别。 一、静态库(Static Library) 基本概念 静态库是在编译期间被完整复制到最终可执行文件中的预编译代码集合。 主要特点 链接时…...
项目阅读:Instruction Defense
总目录 大模型安全相关研究:https://blog.csdn.net/WhiffeYF/article/details/142132328 https://learnprompting.org/docs/prompt_hacking/defensive_measures/instruction https://www.doubao.com/chat/6945469301219586 速览 指令防御(Instructio…...
springboot中拦截器配置使用
文章目录 前置拦截器代码拦截器注册疑问 前置 你使用 javaspringboot 常用在: 身份验证与授权,使用拦截器检查用户的身份验证状态和权限级别,确保只有经过验证且有适当权限的用户能够访问特定资源日志记录与审计性能分析与监控࿰…...
用 Python 构建自动驾驶的实时通信系统:让车辆“交流”起来!
用 Python 构建自动驾驶的实时通信系统:让车辆“交流”起来! 自动驾驶技术正加速变革全球交通体系,它不仅是机器学习与计算机视觉的胜利,更是一场 高效通信架构的革命。自动驾驶汽车需要实时交换信息,比如: 传感器数据(雷达、激光雷达、摄像头)V2V(车与车通信)V2X(…...

在机器学习中,L2正则化为什么能够缓过拟合?为何正则化等机制能够使一个“过度拟合训练集”的模型展现出更优的泛化性能?正则化
在现代机器学习的发展历程中,过拟合(Overfitting)始终是亟需克服的重要挑战。其表现如同在训练数据上构建过度复杂的映射函数,虽能实现近乎完美的拟合,但其泛化能力却显著受限,导致模型在测试集或实际应用中…...
day36 python神经网络训练
目录 一、数据准备与预处理 二、数据集划分与归一化 三、构建神经网络模型 四、定义损失函数和优化器 五、训练模型 六、评估模型 在机器学习和深度学习的实践中,信贷风险评估是一个非常重要的应用场景。通过构建神经网络模型,我们可以对客户的信用…...

k8s部署ELK补充篇:kubernetes-event-exporter收集Kubernetes集群中的事件
k8s部署ELK补充篇:kubernetes-event-exporter收集Kubernetes集群中的事件 文章目录 k8s部署ELK补充篇:kubernetes-event-exporter收集Kubernetes集群中的事件一、kubernetes-event-exporter简介二、kubernetes-event-exporter实战部署1. 创建Namespace&a…...
【Excel VBA 】窗体控件分类
一、Excel 窗体控件分类 Excel 中的窗体控件分为两大类型,适用于不同的开发需求: 类型所在选项卡特点表单控件开发工具 → 插入 → 表单控件简单易用,直接绑定宏,兼容性好,适合基础自动化操作。ActiveX 控件开发工具…...

C++性能相关的部分内容
C性能相关的部分内容 与底层硬件紧密结合 大端存储和小端存储(硬件概念) C在不同硬件上运行的结果可能不同 比如:输入01234567,对于大端存储的硬件会先在较大地址上先进行存储,而对于小端存储的硬件会先在较小地址上…...
Spring Boot 项目中常用的 ORM 框架 (JPA/Hibernate) 在性能方面有哪些需要注意的点?
在 Spring Boot 项目中使用 JPA (Java Persistence API) / Hibernate (作为 JPA 的默认实现) 时,性能是一个非常关键的考量点。虽然 ORM 极大地简化了数据库交互,但如果不注意,很容易引入性能瓶颈。以下是一些关键的性能注意事项:…...
基于大模型的大肠癌全流程预测与诊疗方案研究报告
目录 一、引言 1.1 研究背景与意义 1.2 研究目的与创新点 二、大模型技术概述 2.1 大模型原理与架构 2.2 大模型在医疗领域的应用现状 三、术前风险预测与准备 3.1 术前风险预测指标 3.2 大模型预测方法与结果 3.3 基于预测结果的术前准备方案 四、术中风险预测与应…...
解决DeepSeek部署难题:提升效率与稳定性的关键策略
DeepSeek 部署中常见问题及对应解决方案 随着大模型技术的快速发展,DeepSeek 作为国内领先的大语言模型之一,广泛应用于自然语言处理、智能客服、内容生成等多个领域。 然而,在实际部署过程中,许多开发者和企业会遇到一系列挑战&a…...

AI进行提问、改写、生图、联网搜索资料,嘎嘎方便!
极客侧边栏-AI板块 目前插件内已接入DeepSeek-R1满血版、Qwen3满血版 、豆包/智谱最新发布的推理模型以及各种顶尖AI大模型,并且目前全都可以免费不限次数使用,秒回不卡顿,联网效果超好! 相比于市面上很多AI产品,极客…...

GStreamer开发笔记(四):ubuntu搭建GStreamer基础开发环境以及基础Demo
若该文为原创文章,转载请注明原文出处 本文章博客地址:https://blog.csdn.net/qq21497936/article/details/147714800 长沙红胖子Qt(长沙创微智科)博文大全:开发技术集合(包含Qt实用技术、树莓派、三维、O…...

2021年认证杯SPSSPRO杯数学建模A题(第二阶段)医学图像的配准全过程文档及程序
2021年认证杯SPSSPRO杯数学建模 A题 医学图像的配准 原题再现: 图像的配准是图像处理领域中的一个典型问题和技术难点,其目的在于比较或融合同一对象在不同条件下获取的图像。例如为了更好地综合多种信息来辨识不同组织或病变,医生可能使用…...

CV中常用Backbone-3:Clip/SAM原理以及代码操作
前面已经介绍了简单的视觉编码器,这里主要介绍多模态中使用比较多的两种backbone:1、Clip;2、SAM。对于这两个backbone简单介绍基本原理,主要是讨论使用这个backbone。 1、CV中常用Backbone-2:ConvNeXt模型详解 2、CV中…...