【C/C++】记录一次麻烦的Kafka+Json体验
文章目录
- 麻烦的Kafka+Json体验
- 1 目标
- 2 工程搭建
- 2.1 docker配置
- 2.2 代码
- 2.3 工程压缩包
- 3 执行结果
麻烦的Kafka+Json体验
1 目标
初心:结合kafka + json + docker,验证基本的数据生产/消费。
Kafka 配合 JSON 工具,主要是为了数据的序列化和反序列化,以及便于消息内容的格式化、解析和处理(让消息内容可读、结构化、标准化,方便发送、接收和处理)。
-
数据格式标准化
Kafka 本身是一个消息队列系统,消息的内容可以是任意字节流。使用 JSON 作为消息格式,能够让消息结构清晰、规范,方便发送方和接收方统一约定消息格式。 -
跨语言和跨平台兼容
JSON 是一种轻量级数据交换格式,几乎所有编程语言都支持 JSON 的解析和生成。这让 Kafka 发送的消息可以被不同语言和平台的消费者方便地处理。 -
方便调试和监控
JSON 格式是文本格式,易于阅读和打印,方便开发和运维人员在调试、日志查看时快速理解消息内容。 -
灵活的消息结构
JSON 支持嵌套结构、数组、键值对等灵活的数据组织方式,适合表达复杂的数据模型。 -
序列化/反序列化工具支持
常见的 Kafka 客户端库通常提供 JSON 序列化器(Serializer)和反序列化器(Deserializer),让你可以方便地将业务对象转换成 JSON 字符串发送到 Kafka,或从 JSON 字符串转换回业务对象。 -
与 schema 注册中心配合
虽然 JSON 本身没有强类型约束,但通过结合 JSON Schema 和 schema 注册中心(如 Confluent Schema Registry),可以保证数据格式的一致性和兼容性。
2 工程搭建
2.1 docker配置
docker-compose.yml
version: "3.8"services:zookeeper:image: confluentinc/cp-zookeeper:7.5.0container_name: zookeeperports:- "2181:2181"environment:ZOOKEEPER_CLIENT_PORT: 2181ZOOKEEPER_TICK_TIME: 2000kafka:image: confluentinc/cp-kafka:7.5.0container_name: kafkaports:- "9092:9092"environment:KAFKA_BROKER_ID: 1KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1depends_on:- zookeeperdev:build:context: .dockerfile: Dockerfile.dev # ✅ 使用构建好的开发镜像container_name: cpp_devtty: truestdin_open: trueworking_dir: /home/dev/codevolumes:- ./cpp_kafka_code:/home/dev/codedepends_on:- kafka
Dockerfile.dev
FROM ubuntu:22.04ENV DEBIAN_FRONTEND=noninteractiveRUN apt-get update && apt-get install -y \build-essential \cmake \git \curl \wget \pkg-config \vim \librdkafka-dev \libssl-dev \libzstd-dev \libjsoncpp-dev \ca-certificates \&& rm -rf /var/lib/apt/lists/*# 安装 nlohmann/json(如未预装)
# RUN wget https://github.com/nlohmann/json/releases/download/v3.11.3/json.tar.xz && \
# tar -xf json.tar.xz && \
# cp -r single_include/nlohmann /usr/include/ && \
# rm -rf json.tar.xz single_include
RUN apt-get update && apt-get install -y \nlohmann-json3-dev# 创建开发用户
RUN useradd -ms /bin/bash dev
USER dev
WORKDIR /home/dev/code
2.2 代码
cpp_kafka_code/
├── CMakeLists.txt
├── include/
│ ├── KafkaProducer.h
│ ├── KafkaConsumer.h
│ ├── ConfigManager.h
│ └── Message.h
├── src/
│ ├── KafkaProducer.cpp
│ ├── KafkaConsumer.cpp
│ ├── ConfigManager.cpp
│ └── Message.cpp
├── config/
│ └── kafka_config.json
├── test/
│ ├── producer_test.cpp
│ └── consumer_test.cpp
CMakeLists.txt
cmake_minimum_required(VERSION 3.14)
project(KafkaModule)set(CMAKE_CXX_STANDARD 17)set(ENV{PKG_CONFIG_PATH} "/usr/lib/x86_64-linux-gnu/pkgconfig")# 查找依赖库
find_package(PkgConfig REQUIRED)
pkg_check_modules(RDKAFKA REQUIRED rdkafka)# 添加头文件路径
include_directories(${CMAKE_SOURCE_DIR}/include${RDKAFKA_INCLUDE_DIRS}/usr/include/librdkafka
)link_directories(${RDKAFKA_LIBRARY_DIRS})# 源码文件
file(GLOB SOURCESsrc/*.cpp
)# 可执行测试文件
add_executable(producer_test test/producer_test.cpp ${SOURCES})
add_executable(consumer_test test/consumer_test.cpp ${SOURCES})# 链接依赖库
# target_link_libraries(producer_test PkgConfig::RDKAFKA)
# target_link_libraries(consumer_test PkgConfig::RDKAFKA)
target_link_libraries(producer_test ${RDKAFKA_LIBRARIES})
target_link_libraries(consumer_test ${RDKAFKA_LIBRARIES})
config/kafka_config.json
{"bootstrap.servers": "kafka:9092","group.id": "my-group","auto.offset.reset": "earliest","enable.auto.commit": "false"
}
include/ConfigManager.h
#pragma once
#include <string>
#include <nlohmann/json.hpp>struct KafkaConfig {std::string brokers;std::string groupId;bool enableIdempotence = true;int batchSize = 100;int lingerMs = 5;
};class ConfigManager {
public:static KafkaConfig loadConfig(const std::string& filename);
};
include/KafkaConsumer.h
#pragma once#include <string>
#include <librdkafka/rdkafka.h>class KafkaConsumer {
public:KafkaConsumer(const std::string& configFile, const std::string& topic);~KafkaConsumer();bool poll(std::string& outMessage);private:rd_kafka_t* rk_;rd_kafka_conf_t* conf_;rd_kafka_topic_partition_list_t* topics_;
};
include/KafkaProducer.h
#pragma once
#include <string>
#include <rdkafka.h>class KafkaProducer {
public:explicit KafkaProducer(const std::string& configFile);bool send(const std::string& topic, const std::string& message);~KafkaProducer();private:rd_kafka_t* producer_ = nullptr;rd_kafka_conf_t* conf_ = nullptr;
};
include/Message.h
#pragma once
#include <string>
#include <nlohmann/json.hpp>class Message {
public:Message() = default;explicit Message(const std::string& jsonStr);std::string toJson() const;void setField(const std::string& key, const std::string& value);std::string getField(const std::string& key) const;private:nlohmann::json data_;
};
src/ConfigManager.cpp
#include "ConfigManager.h"
#include <fstream>KafkaConfig ConfigManager::loadConfig(const std::string& filename) {std::ifstream in(filename);nlohmann::json j;in >> j;KafkaConfig cfg;cfg.brokers = j["bootstrap.servers"];cfg.groupId = j["group.id"];cfg.enableIdempotence = j.value("enable_idempotence", true);cfg.batchSize = j.value("batch_size", 100);cfg.lingerMs = j.value("linger_ms", 5);return cfg;
}
src/KafkaConsumer.cpp
#include "KafkaConsumer.h"
#include <fstream>
#include <iostream>
#include <nlohmann/json.hpp>KafkaConsumer::KafkaConsumer(const std::string& configFile, const std::string& topic) {std::ifstream file(configFile);if (!file.is_open()) {std::cerr << "Failed to open config file: " << configFile << "\n";exit(1);}nlohmann::json configJson;file >> configJson;char errstr[512];conf_ = rd_kafka_conf_new();// for (auto& el : configJson.items()) {// std::string valueStr = el.value().dump();// if (rd_kafka_conf_set(conf_, el.key().c_str(), valueStr.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {// std::cerr << "Config error: " << errstr << "\n";// }// }for (auto& el : configJson.items()) {std::string valStr;if (el.value().is_string()) {valStr = el.value().get<std::string>();} else {valStr = el.value().dump(); // 数字或布尔等用dump转换成字符串}if (rd_kafka_conf_set(conf_, el.key().c_str(), valStr.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {std::cerr << "Config error: " << errstr << "\n";}}rk_ = rd_kafka_new(RD_KAFKA_CONSUMER, conf_, errstr, sizeof(errstr));if (!rk_) {std::cerr << "Failed to create consumer: " << errstr << "\n";exit(1);}// Subscribe to topictopics_ = rd_kafka_topic_partition_list_new(1);rd_kafka_topic_partition_list_add(topics_, topic.c_str(), -1);rd_kafka_subscribe(rk_, topics_);
}KafkaConsumer::~KafkaConsumer() {rd_kafka_consumer_close(rk_);rd_kafka_destroy(rk_);rd_kafka_topic_partition_list_destroy(topics_);
}bool KafkaConsumer::poll(std::string& outMessage) {rd_kafka_message_t* msg = rd_kafka_consumer_poll(rk_, 1000);if (!msg) return false;if (msg->err) {std::cerr << "Consumer error: " << rd_kafka_message_errstr(msg) << "\n";rd_kafka_message_destroy(msg);return false;}std::cout << "Raw message hex: ";for (unsigned int i = 0; i < msg->len; ++i) {printf("%02X ", ((unsigned char*)msg->payload)[i]);}std::cout << std::endl;outMessage = std::string((char*)msg->payload, msg->len);// if (!outMessage.empty()) {// std::cout << "Test outMessage: " << outMessage << std::endl;// }rd_kafka_message_destroy(msg);return true;
}
src/KafkaProducer.cpp
#include "KafkaProducer.h"
#include "ConfigManager.h"
#include <iostream>KafkaProducer::KafkaProducer(const std::string& configFile) {KafkaConfig cfg = ConfigManager::loadConfig(configFile);conf_ = rd_kafka_conf_new();rd_kafka_conf_set(conf_, "bootstrap.servers", cfg.brokers.c_str(), nullptr, 0);if (cfg.enableIdempotence) {rd_kafka_conf_set(conf_, "enable.idempotence", "true", nullptr, 0);}producer_ = rd_kafka_new(RD_KAFKA_PRODUCER, conf_, nullptr, 0);
}bool KafkaProducer::send(const std::string& topic, const std::string& message) {for (unsigned char c : message) {printf("%02X ", c);}printf("\n");rd_kafka_resp_err_t err = rd_kafka_producev(producer_,RD_KAFKA_V_TOPIC(topic.c_str()),RD_KAFKA_V_VALUE(const_cast<char*>(message.c_str()), message.size()),RD_KAFKA_V_END);return err == RD_KAFKA_RESP_ERR_NO_ERROR;
}KafkaProducer::~KafkaProducer() {rd_kafka_flush(producer_, 3000);rd_kafka_destroy(producer_);
}
src/Message.cpp
#include "Message.h"
#include <iostream>Message::Message(const std::string& jsonStr) {data_ = nlohmann::json::parse(jsonStr);
}std::string Message::toJson() const {if (data_.empty()) {std::cerr << "[Message::toJson] Warning: data_ is empty.\n";}return data_.dump();
}void Message::setField(const std::string& key, const std::string& value) {data_[key] = value;
}std::string Message::getField(const std::string& key) const {return data_.value(key, "");
}
test/consumer_test.cpp
#include "KafkaConsumer.h"
#include <iostream>int main() {KafkaConsumer consumer("../config/kafka_config.json", "test_topic");std::cout << "Consumer started. Waiting for messages...\n";while (true) {std::string msg;if (consumer.poll(msg)) {std::cout << "Received: " << msg << std::endl;}}return 0;
}
test/producer_test.cpp
#include "KafkaProducer.h"
#include "Message.h"
#include <iostream>int main() {Message msg;msg.setField("type", "test");msg.setField("payload", "hello");std::string jsonStr = msg.toJson();std::cout << "序列化结果: " << jsonStr << std::endl;Message parsed(jsonStr);std::cout << "解析 payload: " << parsed.getField("payload") << std::endl;// KafkaProducer producer("config/kafka_config.json");Message msg1;msg1.setField("type", "test");msg1.setField("payload", "hello from producer");std::cout << "Send message: " << msg1.toJson() << std::endl;KafkaProducer producer("../config/kafka_config.json");// if (producer.send("test_topic", msg1.toJson())) {// std::cout << "Message sent successfully!\n";// } else {// std::cout << "Message send failed.\n";// }std::string testStr = R"({"type":"test","payload":"hello from producer"})";producer.send("test_topic", testStr);return 0;
}
2.3 工程压缩包
3 执行结果
当前还存在一点bug,produce的数据,consumer接收时,前面总是出现乱码,应该时序列化/反序列化导致的,但是当前还未找到原因,
等我调试好后的好消息吧!!!!
相关文章:
【C/C++】记录一次麻烦的Kafka+Json体验
文章目录 麻烦的KafkaJson体验1 目标2 工程搭建2.1 docker配置2.2 代码2.3 工程压缩包 3 执行结果 麻烦的KafkaJson体验 1 目标 初心:结合kafka json docker,验证基本的数据生产/消费。 Kafka 配合 JSON 工具,主要是为了数据的序列化和反…...

Linux系列-2 Shell常用命令收集
背景 本文用于收集Linux常用命令(基于Centos7),是一个持续更新的博客,建议收藏,编写shell时遇到问题可以随时查阅。 1.Shell类型 shell是用C语言编写的程序,作为命令解释器连接着用户和操作系统内核。常见的shell有sh(Bourne She…...

MATLAB使用多个扇形颜色变化表示空间一个点的多种数值
MATLAB使用多个扇形颜色变化表示空间一个点的多种数值 excel中表格中数据格式,多行 lonlatdata1data2data3117380.11100 clear;close all; figure(Position,[100 100 800 800]);num_points 14; [num,txt,raw] xlsread(test.xlsx); x num(:,1); y num(:,2);d…...
mysql:MVCC机制
MVCC机制 MVCC机制主要是mysql的多版本并发控制的一个机制,它主要是允许mysql去保存同一时间对同一份数据的不同历史版本的,从而避免读写之间的锁竞争,从而去提高并发的性能。 像传统的锁机制(读写互斥锁(Read-Write …...
Vue3 + Element Plus 实现树形结构的“单选 + 只选叶子节点 + 默认选中第一个子节点”
在 Vue 项目中,我们常使用树形结构组件来展示层级数据。本文将介绍如何使用 Element Plus 的 <el-tree> 组件,在 Vue3 中实现以下需求: ✅ 只能勾选叶子节点 ✅ 每次只能选中一个节点(单选) ✅ 页面加载时默认…...

CAD精简多段线顶点、优化、删除多余、重复顶点——CAD c#二次开发
附部分代码如下: public static void Pl精简(){Document doc Autodesk.AutoCAD.ApplicationServices.Application.DocumentManager.MdiActiveDocument;Database db doc.Database;Editor ed doc.Editor;var plOrigon db.SelectCurve("\n选择多段线:");…...

输电线路的“智慧之眼”:全天候可视化监测如何赋能电网安全运维
在电力需求持续攀升、电网规模日益庞大的今天,输电线路的安全稳定运行面临着前所未有的挑战。线路跨越地形复杂多变,尤其是在偏远山区、铁路沿线及恶劣天气条件下,传统的人工巡检方式显得力不从心——效率低、风险高、覆盖有限。如何实现更智…...
Spring 核心知识点补充
Spring 核心知识点补充 1. IoC(控制反转) 核心思想:将对象的创建和依赖管理交给容器,而非在代码中直接控制实现方式: XML 配置:<bean> 标签定义对象注解:Component, Service, Repositor…...

两阶段法目标检测发展脉络
模式识别期末展示大作业,做个记录,希望大家喜欢。 R-CNN Fast R-CNN R-FCN 整个过程可以分解为以下几个步骤: 输入图像 (image) 和初步特征提取 (conv, feature maps): 首先,输入一张原始图像,经过一系列…...
Flannel 支持的后端
Flannel 是一个为 Kubernetes 设计的容器网络解决方案,支持多种后端(backend)来处理节点间的数据包转发。根据官方文档和其他可靠来源,以下是 Flannel 支持的后端类型及其说明: VXLAN(推荐) 描述…...

小白的进阶之路系列之六----人工智能从初步到精通pytorch数据集与数据加载器
本文将介绍以下内容: 数据集与数据加载器 数据迁移 如何建立神经网络 数据集与数据加载器 处理数据样本的代码可能会变得混乱且难以维护;理想情况下,我们希望我们的数据集代码与模型训练代码解耦,以获得更好的可读性和模块化。PyTorch提供了两个数据原语:torch.utils…...
SQL进阶之旅 Day 5: 常用函数与表达式
【SQL进阶之旅 Day 5】常用函数与表达式 在SQL的进阶学习中,掌握常用函数和表达式是提升查询效率、解决复杂业务问题的关键。本篇文章将深入探讨聚合函数、日期函数、条件表达式等核心内容,并结合实际案例分析其应用价值。通过理论讲解、代码示例和性能…...

NestJS——重构日志、数据库、配置
个人简介 👀个人主页: 前端杂货铺 🙋♂️学习方向: 主攻前端方向,正逐渐往全干发展 📃个人状态: 研发工程师,现效力于中国工业软件事业 🚀人生格言: 积跬步…...

c++数据结构8——二叉树的性质
一、二叉树的基本性质 示图1: 性质1:层节点数上限 在一棵二叉树中,第i层至多有2^{i-1}个节点(首层是第1层) 这个性质可以通过数学归纳法证明: 第1层:2^{1-1}2^01个节点(根节点&am…...

Window Server 2019--08 网络负载均衡与Web Farm
本章要点 1、了解网络负载均衡技术 2、掌握Web Farm核心原理 3、掌握如何使用Windows NLB搭建Web Farm环境 网络负载均衡技术将外部计算机发送的连接请求均匀的分配到服务器集群中的每台服务器上,接受到请求的服务器独立地响应客户的请求。 网络负载均衡技术还…...
arcgis字段计算器中计算矢量面的每个点坐标
python脚本 函数 def ExportCoordinates(feat):coors = []partnum = 0partcount = feat.partCountwhile partnum < partcount:part = feat.getPart(partnum)pnt = part.next()while pnt:coors.append("({}, {})".format(pnt.X,pnt.Y))pnt = part.next()if not p…...

SpringBoot:统一功能处理、拦截器、适配器模式
文章目录 拦截器什么是拦截器?为什么要使用拦截器?拦截器的使用拦截路径执行流程典型应用场景DispatcherServlet源码分析 适配器模式适配器模式定义适配器模式角色适配器模式的实现适配器模式应用场景 统⼀数据返回格式优点 统一处理异常总结 拦截器 什…...

AI Agent工具全景解析:从Coze到RAGflow,探索智能体自动化未来!
在人工智能技术持续深入行业应用的背景下,越来越多的企业和个人寻求通过自动化技术来提高效率和减少重复性劳动,AI Agent的崛起已经成为了不可忽视的趋势。AI Agent,即人工智能代理,是一种基于先进的人工智能技术,特别…...
GitLab CI流水线权限隔离
方案概述 本方案实现在GitLab CI/CD中根据不同人员的权限级别执行不同的流水线步骤,主要基于GitLab的以下特性: rules 条件判断variables 变量传递only/except 条件限制用户权限API查询 基础权限模型设计 1. 用户角色定义 角色描述对应GitLab权限De…...
xcode卡死问题,无论打开什么程序xcode总是在转菊花,重启电脑,卸载重装都不行
很可能是因为我们上次没有正常关闭Xcode,而Xcode保留了上次错误的一些记录,而这次打开Xcode依然去加载错误的记录,所以必须完全删除这些记录Xcode才能加载正常的项目。 那么也就是说,我们是不是只需要删除这部分错误记录文件就可以…...

Onvif协议:IPC客户端开发-IPC相机控制(c语言版)
前言: 本博文主要是借鉴OceanStar大神的博文,在他的博文的基础之上做了一部分修改与简化。 博文链接: Onvif协议:IPC客户端开发之鉴权_onvif鉴权方式-CSDN博客 Onvif协议:IPC客户端开发之PTZ控制_onvif ptz-CSDN博客…...

如何最简单、通俗地理解Pytorch?神经网络中的“梯度”是怎么自动求出来的?PyTorch的动态计算图是如何实现即时执行的?
PyTorch是一门科学——现代深度学习工程中的一把锋利利器。它的简洁、优雅、强大,正在让越来越多的AI研究者、开发者深度应用。 1. PyTorch到底是什么?为什么它重要? PyTorch是一个开源的深度学习框架,由Facebook AI Research(FAIR)于2016年发布,它的名字由两个部分组成…...

QT+opecv如何更改图片的拍摄路径
如何更改相机拍摄图片的路径 前言:基础夯实:效果展示:实现功能:遇到问题:未解决: 核心代码: 前言: 最近在项目开发中遇到需要让用户更改相机拍摄路径的问题,用户可自己选…...
WebSocket学习总结
WebSocket 是一种基于TCP的网络通信协议,允许浏览器和服务器之间进行全双工、实时、低延迟的双向数据传输。它突破了传统HTTP协议的限制(请求-响应模式),特别适合需要实时通信的场景(如聊天、实时数据推送、游戏等&…...

秋招Day11 - JVM - 类加载机制
了解类的加载机制吗? JVM是运行Java字节码,也就是运行.class文件的虚拟机,JVM把.class文件中描述类的数据结构加载到内存中,并对数据进行校验,解析和初始化,最终转化为JVM可以使用的类型(Klass…...

Webug4.0靶场通关笔记03- 第3关SQL注入之时间盲注(手注法+脚本法 两种方法)
目录 一、源码分析 1.分析闭合 2.分析输出 (1)查询成功 (2)查询失败 (3)SQL语句执行报错 二、第03关 延时注入 1.打开靶场 2.SQL手注 (1)盲注分析 (2…...
PostgreSQL 数据完整性检查工具对比:amcheck 与 pg_checksums
PostgreSQL 数据完整性检查工具对比:amcheck 与 pg_checksums PostgreSQL 提供了两种重要的数据完整性检查机制:amcheck 扩展和 pg_checksums 工具。它们在功能定位、检查层次和使用场景上有显著区别。 核心对比概览 特性amcheckpg_checksums检查对象…...

Vert.x学习笔记-什么是Handler
Vert.x学习笔记 在Vert.x中,Handler是一个核心概念,用于处理异步事件和回调。它是Vert.x响应式编程模型的核心组件之一,通过函数式接口的方式简化了异步编程的复杂性。 1. Handler的定义 Handler是一个函数式接口,定义如下&#…...
浏览器游戏的次世代革命:WebAssembly 3.0 实战指南
破局开篇:开发者必须跨越的性能鸿沟 在2025年,WebAssembly(WASM)技术已经成为高性能Web应用的核心驱动力。特别是WASM3引擎的广泛应用,使得在浏览器中实现主机级游戏画质成为可能。本文将深入探讨WASM3的关键特性、性…...
Java设计模式之工厂模式与策略模式简单案例学习
目录 1.前言2.工厂模式2.1 简单工厂方法2.2 静态工厂方法2.3 抽象工厂方法 3.策略模式4.区别与联系4.1定义与核心意图4.2 UML 结构对比4.3 关键组成对比4.4 应用场景对比 1.前言 最近接手的项目真的是太无语了,经历了多数人的编写,什么牛马鬼神写法都有&…...