【C/C++】基于 Docker 容器运行的 Kafka + C++ 练手项目
文章目录
- 基于 Docker 容器运行的 Kafka + C++ 练手项目
- 1 项目目的
- 2 项目框架
- 3 代码
- 4 编译运行
- 5 功能与接口说明
- 5.1 Producer 接口:`producer.cpp`
- 关键调用流程
- 参数说明
- 5.2 Consumer 接口:`consumer.cpp`
- 关键调用流程
- 消费流程中注意
- 5.3 工程技术点
基于 Docker 容器运行的 Kafka + C++ 练手项目
使用 C++ 语言调用 Kafka 接口的示例项目,通过容器化部署 Kafka + Zookeeper 环境,实现了 Kafka 生产者与消费者的基本功能。
1 项目目的
- 学习如何用 C++ 操作 Kafka(使用 librdkafka 的 C++ 封装)
- 实践分布式消息队列的基本使用模式:生产者-消费者
- 通过 Docker 快速部署 Kafka + Zookeeper 环境
- 为将来构建中间件(如日志系统、异步任务系统、RPC 框架)奠定基础
2 项目框架
cpp-kafka-project/
├── docker-compose.yml # Kafka + Zookeeper + 开发环境容器定义
├── cpp_kafka_code/
│ ├── CMakeLists.txt
│ ├── producer.cpp
│ ├── consumer.cpp
│ └── create_topic.sh # 创建 topic 的脚本
关键技术点
-
Kafka + Zookeeper 容器化
-
使用 Confluent 提供的官方镜像:
confluentinc/cp-kafka
和cp-zookeeper
-
通过
docker-compose.yml
启动三个容器:zookeeper
:协调 Kafka Brokerkafka
:消息代理cpp_dev
:Ubuntu 开发容器,内含 C++ 源码和构建环境
-
-
Kafka C++ 客户端库
- 使用
librdkafka
的 C++ 封装接口rdkafkacpp.h
- 动态链接
librdkafka++
和librdkafka
- 使用
-
CMake 构建系统
- 自动查找和链接 Kafka 所需的库与头文件
- 支持分离构建(out-of-source)
3 代码
docker-compose.yml
version: "3.8"services:zookeeper:image: confluentinc/cp-zookeeper:7.5.0container_name: zookeeperports:- "2181:2181"environment:ZOOKEEPER_CLIENT_PORT: 2181ZOOKEEPER_TICK_TIME: 2000kafka:image: confluentinc/cp-kafka:7.5.0container_name: kafkaports:- "9092:9092"environment:KAFKA_BROKER_ID: 1KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1depends_on:- zookeeperdev:image: ubuntu:22.04container_name: cpp_devtty: truestdin_open: truecommand: /bin/bashworking_dir: /home/dev/codevolumes:- ./cpp_kafka_code:/home/dev/codedepends_on:- kafka
create-topic.sh
#!/bin/bashdocker exec kafka kafka-topics \--create \--topic test_topic \--bootstrap-server localhost:9092 \--partitions 1 \--replication-factor 1
加执行权限
chmod +x create_topic.sh
CMakeLists.txt
cmake_minimum_required(VERSION 3.10)
project(cpp_kafka_example)set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)find_package(PkgConfig REQUIRED)
pkg_check_modules(RDKAFKA REQUIRED IMPORTED_TARGET rdkafka++)add_executable(producer producer.cpp)
target_link_libraries(producer PkgConfig::RDKAFKA)add_executable(consumer consumer.cpp)
target_link_libraries(consumer PkgConfig::RDKAFKA)
producer.cpp
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <csignal>
#include <memory>class ExampleEventCb : public RdKafka::EventCb {void event_cb(RdKafka::Event &event) override {if (event.type() == RdKafka::Event::EVENT_ERROR) {std::cerr << "Kafka Error: " << event.str() << std::endl;}}
};int main() {std::string brokers = "kafka:9092";std::string topic_str = "test_topic";std::string errstr;// 配置ExampleEventCb event_cb;std::unique_ptr<RdKafka::Conf> conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));conf->set("bootstrap.servers", brokers, errstr);conf->set("event_cb", &event_cb, errstr);// 创建 producerstd::unique_ptr<RdKafka::Producer> producer(RdKafka::Producer::create(conf.get(), errstr));if (!producer) {std::cerr << "Failed to create producer: " << errstr << std::endl;return 1;}// 创建 Topicstd::unique_ptr<RdKafka::Topic> topic(RdKafka::Topic::create(producer.get(), topic_str, nullptr, errstr));if (!topic) {std::cerr << "Failed to create topic: " << errstr << std::endl;return 1;}std::string message = "Hello from C++ Kafka Producer!";RdKafka::ErrorCode resp = producer->produce(topic.get(), // topic ptrRdKafka::Topic::PARTITION_UA, // partitionRdKafka::Producer::RK_MSG_COPY, // message flagsconst_cast<char *>(message.c_str()), // payloadmessage.size(), // payload sizenullptr, // optional keynullptr); // opaqueif (resp != RdKafka::ERR_NO_ERROR) {std::cerr << "Produce failed: " << RdKafka::err2str(resp) << std::endl;} else {std::cout << "Message sent successfully\n";}producer->flush(3000);return 0;
}
consumer.cpp
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <csignal>
#include <memory>bool running = true;void signal_handler(int) {running = false;
}class ExampleEventCb : public RdKafka::EventCb {void event_cb(RdKafka::Event &event) override {if (event.type() == RdKafka::Event::EVENT_ERROR) {std::cerr << "Kafka Error: " << event.str() << std::endl;}}
};int main() {signal(SIGINT, signal_handler);std::string brokers = "kafka:9092";std::string topic = "test_topic";std::string group_id = "cpp_consumer_group";std::string errstr;ExampleEventCb event_cb;auto conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);conf->set("bootstrap.servers", brokers, errstr);conf->set("group.id", group_id, errstr);conf->set("auto.offset.reset", "earliest", errstr);conf->set("event_cb", &event_cb, errstr);auto consumer = RdKafka::KafkaConsumer::create(conf, errstr);if (!consumer) {std::cerr << "Failed to create consumer: " << errstr << std::endl;return 1;}consumer->subscribe({topic});std::cout << "Consuming messages from topic " << topic << std::endl;while (running) {auto msg = consumer->consume(1000);if (msg->err() == RdKafka::ERR_NO_ERROR) {std::string message(reinterpret_cast<const char*>(msg->payload()), msg->len());std::cout << "Received message: " << message << std::endl;}}delete msg;}consumer->close();delete consumer;return 0;
}
4 编译运行
- 启动容器
# docker-compose.yml所在目录下
docker-compose up -d
- 安装依赖并编译
docker exec -it cpp_dev /bin/bash
# 以下在容器内执行
apt update && apt install -y g++ cmake pkg-config librdkafka-dev librdkafka++1
mkdir -p build && cd build
cmake ..
make
- 创建kafka topic:
# 宿主机下
./create_topic.sh
- cpp_dev容器下运行consumer和producer
./consumer &
./producer
输出
/home/dev/code/build# ./consumer &
[1] 4069
/home/dev/code/build# Consuming messages from topic test_topic/home/dev/code/build# ./producer
Message sent successfully
Received message: Hello from C++ Kafka Producer!
5 功能与接口说明
5.1 Producer 接口:producer.cpp
功能:向指定的 topic(如 test_topic
)持续发送消息。
关键调用流程
RdKafka::Conf::create(...) // 创建配置对象
conf->set(...) // 设置 broker 等参数
RdKafka::Producer::create(...) // 创建 Producer 实例
producer->produce(...) // 发送消息
参数说明
参数 | 说明 |
---|---|
topic | 目标 topic 名称 |
partition | 使用 RdKafka::Topic::PARTITION_UA 表示由 Kafka 自动分配 |
message flags | 通常为 RK_MSG_COPY |
payload | 消息数据(char*) |
payload length | 消息长度(size_t) |
5.2 Consumer 接口:consumer.cpp
功能:从指定的 topic 订阅并消费消息。
关键调用流程
RdKafka::Conf::create(...) // 创建全局配置
conf->set(...) // 设置 group.id 等参数
RdKafka::KafkaConsumer::create(...) // 创建 KafkaConsumer 实例
consumer->subscribe(...) // 订阅 topic
consumer->consume(...) // 拉取消息
消费流程中注意
msg->payload()
需要转换为char*
后构造成字符串打印- 使用
msg->err()
判断是否正常收到消息
5.3 工程技术点
技术点 | 描述 |
---|---|
容器部署 | 无需本机安装 Kafka,快速启动测试环境 |
Kafka 消费模型 | 使用 KafkaConsumer 拉模式消费,便于理解 |
CMake 模块化 | 可轻松扩展更多模块(如 logger、metrics) |
中间件模板 | 可作为日志系统、消息队列、调度中心等中间件的原型 |
相关文章:
【C/C++】基于 Docker 容器运行的 Kafka + C++ 练手项目
文章目录 基于 Docker 容器运行的 Kafka C 练手项目1 项目目的2 项目框架3 代码4 编译运行5 功能与接口说明5.1 Producer 接口:producer.cpp关键调用流程参数说明 5.2 Consumer 接口:consumer.cpp关键调用流程消费流程中注意 5.3 工程技术点 基于 Docke…...

Linux系统管理与编程24:基础条件准备-混搭“本地+阿里云”yum源
兰生幽谷,不为莫服而不芳; 君子行义,不为莫知而止休。 1.添加宿主机共享文件夹 Linux虚拟机可以和宿主机共享文件夹,这样有利于工具文件的共享。具体操作如下: 1)vmware workstation共享文件夹 虚拟机…...
新一代Python管理UV完全使用指南|附实际体验与效果对比
简介 uv是新一代的Python项目管理工具,具备开发一个完整项目的所有功能点: 功能点描述包管理完全替代pip的功能,支持包的安装、升级、卸载等操作虚拟环境管理内置虚拟环境创建和管理,无需额外安装virtualenv或venv依赖解析与锁定…...

如何在 Windows 10 PC 上获取 iPhone短信
您可以轻松地将媒体数据从 iPhone 传输到 Windows 计算机,并直接访问计算机上的数据。但是,您可以在 Windows 10 PC 上接收 iPhone 短信吗?有什么功能或工具支持它吗?如果您发现在 Windows 10 PC 上接收 iPhone 消息很困难&#x…...
STM32程序运行不了,仿真功能也异常,连断点和复位都异常了
先检查有没有出现复位引脚rst短接0的情况 在检查是否出现明明没配置该外设你却偏偏要使用的情况,比如串口没配置你却偏要发送,引脚没配置你却偏要读取 这几个可能最好的办法就是从开头一行一行注释再运行看看能不能跑起来 还可以用以下方法 检查硬…...

Linux 系统中的软链接与硬链接
目录 一、什么是软链接? 1. 创建软链接 2. 软链接的特性 3. 软链接的用途 二、什么是硬链接? 1. 创建硬链接 2. 硬链接的特性 3. 硬链接的用途 4. 目录硬链接的特殊性 编辑 三、软链接与硬链接的区别 1. inode 编号 2. 路径依赖 3. 删除行…...

Python爬虫第22节- 结合Selenium识别滑动验证码实战
目录 一、引言 二、滑动验证码原理与反爬机制 2.1 验证码原理 2.2 反爬机制 三、工程实战:滑动验证码识别全流程 3.1 工程准备 3.1.1 环境依赖 3.1.2 目标网站与验证码识别案例 3.2 核心破解流程 3.2.1 自动化打开网页与登录 3.2.2 获取验证码图片&#…...
【C/C++】chrono简单使用场景
chrono使用场景举例 1 输出格式化字符串 示例代码 auto now std::chrono::system_clock::now(); auto t std::chrono::system_clock::to_time_t(now); auto ms std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()) % 1000;std::ostrin…...

Escrcpy(安卓手机投屏软件) v1.29.6 中文绿色版
在数字设备日益普及的今天,用户对于设备的控制和管理需求也在不断增加。对于Android设备用户来说,Escrcpy这款强大的工具无疑是一个福音。它不仅提供了直观的图形化界面,让用户能够轻松显示和控制自己的Android设备,还以完全免费开…...
Oracle MOVE ONLINE 实现原理
Oracle MOVE ONLINE 实现原理 Oracle 的 MOVE ONLINE 操作是一种在线重组表的技术,允许在不中断业务的情况下重新组织表数据。以下是其实现原理的详细分析: 基本概念 MOVE ONLINE 是 Oracle 12c 引入的特性,用于替代传统的 ALTER TABLE ..…...

Linux:深入理解网络层
网络层在复杂的网络环境中确定一个合适的路径.传输到指定的网络中 一、网络层的理解 问题1:为什么要有网络层的概念呢?? ——>我们先来讲一个故事: 假设我在学校里被誉为数学大神,是因为我的数学有考满分的能力&…...
【设计模式】简单工厂模式,工厂模式,抽象工厂模式,单例,代理,go案例区分总结
工厂模式三种类型: 一、简单工厂模式(Simple Factory) 定义: 用一个工厂类,根据传入的参数决定创建哪一种具体产品类实例。 面试说法: 由一个统一的工厂创建所有对象,增加新产品时需要修改工…...

Linux_编辑器Vim基本使用
✨✨ 欢迎大家来到小伞的大讲堂✨✨ 🎈🎈养成好习惯,先赞后看哦~🎈🎈 所属专栏:LInux_st 小伞的主页:xiaosan_blog 制作不易!点个赞吧!!谢谢喵!&a…...

vue展示修改前后对比,并显示修改标注diff
动态父组件 <template><el-buttontype"primary"size"small"plainclick"showDiffDialog(subItem)">查看修改内容</el-button><TextDiffDialogv-model:visible"diffDialogVisible":before"currentDiffItem?.…...

LiveWallpaperMacOS:让你的 Mac 桌面动起来
随着桌面美化需求的不断提升,用户对于桌面壁纸的要求已经不再局限于静态图片。越来越多的 Mac 用户希望桌面能像 Windows 一样,拥有动态壁纸,展现个性、提升体验。LiveWallpaperMacOS 正是这样一款让你的 Mac 桌面焕发活力的开源项目。 本文将详细介绍 LiveWallpaperMacOS …...
[预训练]Encoder-only架构的预训练任务核心机制
原创文章1FFN前馈网络与激活函数技术解析:Transformer模型中的关键模块2Transformer掩码技术全解析:分类、原理与应用场景3【大模型技术】Attention注意力机制详解一4Transformer核心技术解析LCPO方法:精准控制推理长度的新突破5Transformer模…...

07-后端Web实战(部门管理)
5. 修改部门 对于任何业务的修改功能来说,一般都会分为两步进行:查询回显、修改数据。 5.1 查询回显 5.1.1 需求 当我们点击 "编辑" 的时候,需要根据ID查询部门数据,然后用于页面回显展示。 5.1.2 接口描述 参照参照…...

mysql ACID 原理
序言:ACID 是一组数据库设计原则,他是业务数据和关键业务程序的可靠性保障。 1、atomicity(原子性) 依赖如下能力 autocommit commit rollback2、一致性 2.1 double write buffer 1、定义:double write buffer 是…...

[Rust_1] 环境配置 | vs golang | 程序运行 | 包管理
目录 Rust 环境安装 GoLang和Rust 关于Go 关于Rust Rust vs. Go,优缺点 GoLang的优点 GoLang的缺点 Rust的优点 Rust的缺点 数据告诉我们什么? Rust和Go的主要区别 (1) 性能 (2) 并发性 (3) 内存安全性 (4) 开发速度 (5) 开发者体验 Ru…...

二十五、面向对象底层逻辑-SpringMVC九大组件之HandlerMapping接口设计
一、引言:MVC架构的交通枢纽 在Spring MVC框架中,HandlerMapping接口扮演着"请求导航仪"的关键角色,它决定了HTTP请求如何被路由到对应的Controller处理器。作为MVC模式的核心组件之一,HandlerMapping在请求处理的生命…...
构建安全高效的邮件网关ngx_mail_ssl_module
一、快速上手:最小配置示例 worker_processes auto;mail {server {# 监听 IMAP over TLSlisten 993 ssl;protocol imap;# TLS 协议与密码套件ssl_protocols TLSv1.2 TLSv1.3;ssl_ciphers HIGH:!aNULL:!MD5;# 证书与私钥ssl_…...

HUAWEI交换机配置镜像口验证(eNSP)
技术术语: 流量观察口:就是我们常说的镜像口,被观察的流量的引流目的端口 流量源端口:企业生产端口,作为观察口观察对象。 命令介绍: [核心交换机]observe-port [观察端口ID或编号(数字&am…...

前端vue3实现图片懒加载
场景和指令用法 场景:电商网站的首页通常会很长,用户不一定能访问到页面靠下面的图片,这类图片通过懒加载优化手段可以做到只有进入视口区域才发送图片请求 核心原理:图片进入视口才发送资源请求 首先:我们需要定义一个全局的指令&#x…...
网站每天几点更新,更新频率是否影响网站收录
1. 每天几点更新网站最合适?总怕时间选错影响收录? 刚开始搞网站的时候,是不是老纠结啥时候更新合适?早上刚上班?半夜没人的时候?选不对时间,总担心搜索引擎爬虫来了没抓到新内容,影…...
主流Markdown编辑器的综合评测与推荐
根据2025年最新资料,结合功能特性、用户体验和技术适配性,以下是对主流Markdown编辑器的综合评测与推荐: 一、核心对比维度与评估方法 功能完整性:支持数学公式、流程图、代码高亮等复杂格式。跨平台兼容性:Windows/m…...

计算机网络-MPLS VPN应用场景与组网
上一篇文章我们通过一个基础实验实现了企业分支间的MPLS VPN互联,如果还不理解的可以多看几遍前面的文章或者多敲下实验。今天来学习几种常见的MPLS VPN应用场景与这些场景下MPLS VPN的部署方法。 一、MPLS VPN典型应用 目前,MPLS VPN的主要应用包括企…...
AugmentFree:解除 AugmentCode 限制的终极方案 如何快速清理vscode和AugmentCode缓存—windows端
AugmentFree1.0工具包:解除 AugmentCode 免费试用限制的终极方案 Augment VIP 是一个专为 VS Code 用户设计的实用工具包,旨在帮助用户管理和清理 VS Code 数据库,解除 AugmentCode 免费试用账户的限制。 augment从根源上解决免费额度限制问…...
WPF【11_7】WPF实战-重构与美化(ViewModel的嵌套与分解、海量数据不要Join)
11-12 【重构】ViewModel的嵌套与分解 目前我们的代码中有一个不易发现的致命问题,如果工作中这样写代码大概率会被打回去重做。那么这个问题是什么呢? --\ViewModels\MainViewModel.cs 视图模型中的 LoadCustomers() 方法,考虑一下在这里我…...

Linux 的编辑器--vim
1.Linux编辑器-vim使⽤ vi/vim的区别简单点来说,它们都是多模式编辑器,不同的是vim是vi的升级版本,它不仅兼容vi的所有指令,⽽且还有⼀些新的特性在⾥⾯。例如语法加亮,可视化操作不仅可以在终端运⾏,也可以…...
Oracle 慢sql排查
Oracle慢sql排查步骤 1.1. 前言 Oracle 慢查询的排查方向包括以下几个方向 : 基准测试 (吞吐量): 包括 Oracle 本身吞吐量和磁盘 I/O 吞吐量硬件分析 (资源情况): 包括查看服务器 CPU , 硬盘的使用情况SQL分析:分析 SQL 中是否存在慢查询 , 是否命中索引配置优化…...