Windows10配置C++版本的Kafka,并进行发布和订阅测试
配置的环境为:Release x64下的环境
完整项目:https://gitee.com/jiajingong/kafka-publisher
1、首先下载相应的库文件(.lib,.dll)
参考链接:
GitHub - eStreamSoftware/delphi-kafka
GitHub - cloader/KafkaCPP-win32-dll: KafkaCpp-win32-dll
2、新建一个新的命令行C++工程
建完工程后,选择Release x64,并在生成中执行重新生成解决方案,这样会在项目目录下生成x64/Release文件夹
3、通过VS2017配置附加库目录和附加依赖项
所有的.lib、.dll等库文件均在下图x64/Release目录下
附加依赖项加入:librdkafka.lib;librdkafkacpp.lib,如下图:
4、发布端:
将主函数的CPP文件改为:
#include <iostream>
#include <thread>
#include "rdkafkacpp.h"int main()
{std::string brokers = "172.18.4.96:9092";std::string errorStr;RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);if (!conf) {std::cout << "Create RdKafka Conf failed" << std::endl;return -1;}conf->set("message.max.bytes", "10240000", errorStr); //最大字节数conf->set("replica.fetch.max.bytes", "20485760", errorStr);conf->set("bootstrap.servers", brokers, errorStr);RdKafka::Producer *producer = RdKafka::Producer::create(conf, errorStr);if (!producer) {std::cout << "Create Producer failed" << std::endl;return -1;}//创建TopicRdKafka::Topic *topic = RdKafka::Topic::create(producer, "koala-stqf-03", tconf, errorStr);if (!topic) {std::cout << "Create Topic failed" << std::endl;}int count = 0;while (true){ //发送消息RdKafka::ErrorCode resCode = producer->produce(topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY, (char *)"123456789", 10, nullptr, nullptr);std::cout << "Count:" << count << ",has publish:" << (char *)"123456789" << std::endl;if (resCode != RdKafka::ERR_NO_ERROR) {std::cerr << "Produce failed: " << RdKafka::err2str(resCode) << std::endl;}count += 1;std::this_thread::sleep_for(std::chrono::seconds(1));}delete conf;delete tconf;delete topic;delete producer;RdKafka::wait_destroyed(5000);return 0;
}
5、订阅端
新建一个同样的订阅端工程,同样将主函数的代码改为:
#include "rdkafkacpp.h"
#include <chrono>
#include <time.h>
#include <sstream>
#include <iomanip>
#include <iostream>
#include <algorithm>
#include <iterator>void consume_cb(RdKafka::Message &message, void *opaque)
{switch (message.err()) {case RdKafka::ERR__TIMED_OUT:std::cout << "RdKafka::ERR__TIMED_OUT" << std::endl;break;case RdKafka::ERR_NO_ERROR:/* Real message */RdKafka::MessageTimestamp ts;ts = message.timestamp();if (ts.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_NOT_AVAILABLE) {std::string timeprefix;if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_CREATE_TIME) {timeprefix = "created time";}else if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_LOG_APPEND_TIME) {timeprefix = "log append time";}unsigned long long milli = ts.timestamp + (unsigned long long)8 * 60 * 60 * 1000;//此处转化为东八区北京时间,如果是其它时区需要按需求修改auto mTime = std::chrono::milliseconds(milli);auto tp = std::chrono::time_point<std::chrono::system_clock, std::chrono::milliseconds>(mTime);auto tt = std::chrono::system_clock::to_time_t(tp);tm timeinfo;::gmtime_s(&timeinfo, &tt);//char s[60]{ 0 };//::sprintf(s, "%04d-%02d-%02d %02d:%02d:%02d", timeinfo.tm_year + 1900, timeinfo.tm_mon + 1, timeinfo.tm_mday, timeinfo.tm_hour, timeinfo.tm_min, timeinfo.tm_sec);// std::cout << timeinfo.tm_year + 1900 << "-" << timeinfo.tm_mon + 1 << "-" << timeinfo.tm_mday << " " << timeinfo.tm_hour << ":" << timeinfo.tm_min << ":" << timeinfo.tm_sec << std::endl;
#if 0std::stringstream ss;std::string dateStr;ss << timeinfo.tm_year + 1900 << "-"<< timeinfo.tm_mon + 1 << "-"<< timeinfo.tm_mday;ss >> dateStr;ss.clear();ss << timeinfo.tm_hour << ":"<< timeinfo.tm_min << ":"<< timeinfo.tm_sec;std::string timeStr;ss >> timeStr;std::string dateTimeStr;dateTimeStr += dateStr;dateTimeStr.push_back(' ');dateTimeStr += timeStr;
#endif // 0//std::cout << "TimeStamp" << timeprefix << " " << s << std::endl;std::cout << "TimeStamp " << timeinfo.tm_year + 1900 << "-" << timeinfo.tm_mon + 1 << "-" << timeinfo.tm_mday << " " << timeinfo.tm_hour << ":" << timeinfo.tm_min << ":" << timeinfo.tm_sec << std::endl;}std::cout << message.topic_name() << " offset" << message.offset() << " partion " << message.partition() << " message: " << reinterpret_cast<char*>(message.payload()) << std::endl;break;case RdKafka::ERR__PARTITION_EOF:/* Last message */std::cout << "EOF reached for" << std::endl;break;case RdKafka::ERR__UNKNOWN_TOPIC:case RdKafka::ERR__UNKNOWN_PARTITION:std::cout << "Consume failed: " << message.errstr();break;default:/* Errors */std::cout << "Consume failed: " << message.errstr();break;}
}
int main()
{std::string brokers = "172.18.4.96:9092";std::string errstr;std::vector<std::string> topics{ "koala-stqf-03","klai-seim-alert-koala-test-03"};std::string group_id = "whl-consumer-group";RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);if (conf->set("group.id", group_id, errstr)) {std::cout << errstr << std::endl;return -1;}conf->set("bootstrap.servers", brokers, errstr);conf->set("max.partition.fetch.bytes", "1024000", errstr);//conf->set("enable-auto-commit", "true", errstr);RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);tconf->set("auto.offset.reset", "latest", errstr);conf->set("default_topic_conf", tconf, errstr);RdKafka::KafkaConsumer *m_consumer = RdKafka::KafkaConsumer::create(conf, errstr);if (!m_consumer) {std::cout << "failed to create consumer " << errstr << std::endl;return -1;}#if 0 //从上一次消费结束的位置开始消费RdKafka::ErrorCode err = m_consumer->subscribe(topics);if (err != RdKafka::ERR_NO_ERROR) {std::cout << RdKafka::err2str(err) << std::endl;return -1;}
#else //指定每个topic的每个分区开始消费的位置//基本思路为先获取server端的状态信息,将与订阅相关的topic找出来,根据分区,创建TopicPartion;最后使用assign消费RdKafka::Metadata *metadataMap{ nullptr };RdKafka::ErrorCode err = m_consumer->metadata(true, nullptr, &metadataMap, 2000);if (err != RdKafka::ERR_NO_ERROR) {std::cout << RdKafka::err2str(err) << std::endl;}const RdKafka::Metadata::TopicMetadataVector *topicList = metadataMap->topics();std::cout << "broker topic size: " << topicList->size() << std::endl;RdKafka::Metadata::TopicMetadataVector subTopicMetaVec;std::copy_if(topicList->begin(), topicList->end(), std::back_inserter(subTopicMetaVec), [&topics](const RdKafka::TopicMetadata* data) {return std::find_if(topics.begin(), topics.end(), [data](const std::string &tname) {return data->topic() == tname; }) != topics.end();});std::vector<RdKafka::TopicPartition*> topicpartions;std::for_each(subTopicMetaVec.begin(), subTopicMetaVec.end(), [&topicpartions](const RdKafka::TopicMetadata* data) {auto parVec = data->partitions();std::for_each(parVec->begin(), parVec->end(), [&](const RdKafka::PartitionMetadata *value) {std::cout << data->topic() << " has partion: " << value->id() << " Leader is : " << value->leader() << std::endl;topicpartions.push_back(RdKafka::TopicPartition::create(data->topic(), value->id(), RdKafka::Topic::OFFSET_END));});});m_consumer->assign(topicpartions);
#endif // 0RdKafka::ErrorCode errccc = m_consumer->subscribe(topics);if (errccc != RdKafka::ERR_NO_ERROR) {std::cout << RdKafka::err2str(errccc) << std::endl;return -1;}while (true){RdKafka::Message *msg = m_consumer->consume(6000);consume_cb(*msg, nullptr); //消息一条消息delete msg;}return 0;
}
6、发布 订阅展示:
相关文章:

Windows10配置C++版本的Kafka,并进行发布和订阅测试
配置的环境为:Release x64下的环境 完整项目:https://gitee.com/jiajingong/kafka-publisher 1、首先下载相应的库文件(.lib,.dll) 参考链接: GitHub - eStreamSoftware/delphi-kafka GitHub - cloade…...
vue3 下载文件 responseType-blob 或者 a标签
在 Vue 3 中,你可以使用 axios 或 fetch 来下载文件,并将 responseType 设置为 blob 以处理二进制数据。以下是一个使用 axios 的示例: 使用 axios 下载文件 首先,确保你已经安装了 axios: npm install axios然后在你…...

【Gin-Web】Bluebell社区项目梳理6:限流策略-漏桶与令牌桶
本文目录 一、限流二、漏桶三、令牌桶算法四、Gin框架中实现令牌桶限流 一、限流 限流又称为流量控制,也就是流控,通常是指限制到达系统的并发请求数。 限流虽然会影响部分用户的使用体验,但是能一定程度上保证系统的稳定性,不至…...

51单片机-AT24CXX存储器工作原理
1、AT24CXX存储器工作原理 1.1、特点: 与400KHz,I2C总线兼容1.8到6.0伏工作电压范围低功耗CMOS技术写保护功能当WP为高电平时进入写保护状态页写缓冲器自定时擦写周期100万次编程/擦除周期可保存数据100年8脚DIP SOIC或TSSOP封装温度范围商业级和工业级…...

突破性能极限:DeepSeek开源FlashMLA解码内核技术解析
引言:大模型时代的推理加速革命 在生成式AI大行其道的今天,如何提升大语言模型的推理效率已成为行业焦点。DeepSeek团队最新开源的FlashMLA项目凭借其惊人的性能表现引发关注——在H800 GPU上实现580 TFLOPS计算性能,这正是大模型推理优化的…...

点击修改按钮图片显示有问题
问题可能出在表单数据的初始化上。在 ave-form.vue 中,我们需要处理一下从后端返回的图片数据,因为它们可能是 JSON 字符串格式。 vue:src/views/tools/fake-strategy/components/ave-form.vue// ... existing code ...Watch(value)watchValue(v: any) …...

[AI]从零开始的树莓派运行DeepSeek模型教程
一、前言 在前面的教程中,教了大家如何在windows中使用llama.cpp来运行DeepSeek模型。根据前面的教程中,我们也了解到了,我们只需要编译好llama.cpp就可以运行DeepSeek以及类似的LLM模型。那么本次教程就来教大家如何使用树莓派来运行大模型。…...

2024-2025 学年广东省职业院校技能大赛 “信息安全管理与评估”赛项 技能测试试卷(二)
2024-2025 学年广东省职业院校技能大赛 “信息安全管理与评估”赛项 技能测试试卷(二) 第一部分:网络平台搭建与设备安全防护任务书第二部分:网络安全事件响应、数字取证调查、应用程序安全任务书任务 1:应急响应&…...

Open WebUI本地部署教程
文章目录 1、系统环境配置2、源码下载2.1 github源码地址下载 3、环境启动3.1 后端环境3.2 前端环境 4、问题4.1 浏览器跨域问题4.2 all-MiniLM-L6-v2模型文件下载失败问题4.3 单独部署backend启动报错问题 1、系统环境配置 操作系统:windows/linux/macos Python版…...

Missing required prop: “maxlength“
背景: 封装一个使用功能相同使用频率较高的input公共组件作为子组件,大多数长度要求为200,且实时显示统计子数,部分input有输入提示。 代码实现如下: <template><el-input v-model"inputValue" t…...

dify本地部署
安装docker。 在官网安装docker。 如果遇到wsl报错,就使用 wsl --updata 进行更新。如果问题解决,进入docker应该是如下界面: 克隆 在自己创建的文件内使用 git clone gitgithub.com:langgenius/dify.git 或 git clone https://github.com…...
python学习一
学习网络安全为什么要学python? 1、在实际的渗透测试过程中,面对复杂多变的网络环境,当常用工 具不能满足实际需求的时候,往往需要对现有工具进行扩展,或者 编写符合我们要求的工具、自动化脚本,这个时候就需要具备一定 的编程能力。 2、python是一门编程语言经常用它…...
git branch
文章目录 1.简介2.格式3.选项4.示例参考文献 1.简介 git branch 用于管理分支,包括查看、创建、删除、重命名和关联。 git branch 是 Git 版本控制系统中用于管理分支的命令。分支是 Git 的核心功能之一,允许开发者在同一个代码库中并行开发不同的功能…...

算法-图-数据结构(邻接矩阵)-BFS广度优先遍历
邻接矩阵广度优先遍历(BFS)是一种用于遍历或搜索图的算法,以下是具体介绍: 1. 基本概念 图是一种非线性的数据结构,由顶点和边组成,可分为无向图、有向图、加权图、无权图等。邻接矩阵是表示图的一种数…...
数学建模之数学模型—2:非线性规划
文章目录 非线性规划基本概念与结论凸集与凸函数极值条件无约束条件的极值判断条件有约束条件的极值判断条件 无约束非线性规划一维搜索算法步骤示例特点代码模板 最速下降法算法详细步骤 代码实现示例最优步长的求解 黄金分割法斐波那契法牛顿法阻尼牛顿法模式搜索法Powell方法…...

unity学习51:所有UI的父物体:canvas画布
目录 1 下载资源 1.1 在window / Asset store下下载一套免费的UI资源 1.2 下载,导入import 1.3 导入后在 project / Asset下面可以看到 2 画布canvas,UI的父物体 2.1 创建canvas 2.1.1 画布的下面是 event system是UI相关的事件系统 2.2 canvas…...

ctfshow做题笔记—栈溢出—pwn57~pwn60
目录 前言 一、pwn57(先了解一下简单的64位shellcode吧) 二、pwn58 三、pwn59(64位 无限制) 四、pwn60(入门难度shellcode) 前言 往前写了几道题,与shellcode有关,关于shellc…...

数据结构 1-2 线性表的链式存储-链表
1 原理 顺序表的缺点: 插入和删除移动大量元素数组的大小不好控制占用一大段连续的存储空间,造成很多碎片 链表规避了上述顺序表缺点 逻辑上相邻的两个元素在物理位置上不相邻 头结点 L:头指针 头指针:链表中第一个结点的存储…...

ArcGIS Pro进行坡度与坡向分析
在地理信息系统中,坡度分析是一项至关重要的空间分析方法,旨在精确计算地表或地形的坡度,为地形特征识别、土地资源规划、环境保护、灾害预警等领域提供科学依据。本文将详细介绍如何利用ArcGIS Pro这一强大的地理信息系统软件,进…...

My first Android application
界面元素组成: 功能代码: /*实现功能:当输入内容后,欢迎文本发生相应改变,并清除掉文本域内容当未输入任何内容时,弹出提示文本以警告用户*/val greetingText findViewById<TextView>(R.id.printer)…...

(十)学生端搭建
本次旨在将之前的已完成的部分功能进行拼装到学生端,同时完善学生端的构建。本次工作主要包括: 1.学生端整体界面布局 2.模拟考场与部分个人画像流程的串联 3.整体学生端逻辑 一、学生端 在主界面可以选择自己的用户角色 选择学生则进入学生登录界面…...

智慧工地云平台源码,基于微服务架构+Java+Spring Cloud +UniApp +MySql
智慧工地管理云平台系统,智慧工地全套源码,java版智慧工地源码,支持PC端、大屏端、移动端。 智慧工地聚焦建筑行业的市场需求,提供“平台网络终端”的整体解决方案,提供劳务管理、视频管理、智能监测、绿色施工、安全管…...
连锁超市冷库节能解决方案:如何实现超市降本增效
在连锁超市冷库运营中,高能耗、设备损耗快、人工管理低效等问题长期困扰企业。御控冷库节能解决方案通过智能控制化霜、按需化霜、实时监控、故障诊断、自动预警、远程控制开关六大核心技术,实现年省电费15%-60%,且不改动原有装备、安装快捷、…...

1.3 VSCode安装与环境配置
进入网址Visual Studio Code - Code Editing. Redefined下载.deb文件,然后打开终端,进入下载文件夹,键入命令 sudo dpkg -i code_1.100.3-1748872405_amd64.deb 在终端键入命令code即启动vscode 需要安装插件列表 1.Chinese简化 2.ros …...

高等数学(下)题型笔记(八)空间解析几何与向量代数
目录 0 前言 1 向量的点乘 1.1 基本公式 1.2 例题 2 向量的叉乘 2.1 基础知识 2.2 例题 3 空间平面方程 3.1 基础知识 3.2 例题 4 空间直线方程 4.1 基础知识 4.2 例题 5 旋转曲面及其方程 5.1 基础知识 5.2 例题 6 空间曲面的法线与切平面 6.1 基础知识 6.2…...

ETLCloud可能遇到的问题有哪些?常见坑位解析
数据集成平台ETLCloud,主要用于支持数据的抽取(Extract)、转换(Transform)和加载(Load)过程。提供了一个简洁直观的界面,以便用户可以在不同的数据源之间轻松地进行数据迁移和转换。…...
【git】把本地更改提交远程新分支feature_g
创建并切换新分支 git checkout -b feature_g 添加并提交更改 git add . git commit -m “实现图片上传功能” 推送到远程 git push -u origin feature_g...

PL0语法,分析器实现!
简介 PL/0 是一种简单的编程语言,通常用于教学编译原理。它的语法结构清晰,功能包括常量定义、变量声明、过程(子程序)定义以及基本的控制结构(如条件语句和循环语句)。 PL/0 语法规范 PL/0 是一种教学用的小型编程语言,由 Niklaus Wirth 设计,用于展示编译原理的核…...
WEB3全栈开发——面试专业技能点P2智能合约开发(Solidity)
一、Solidity合约开发 下面是 Solidity 合约开发 的概念、代码示例及讲解,适合用作学习或写简历项目背景说明。 🧠 一、概念简介:Solidity 合约开发 Solidity 是一种专门为 以太坊(Ethereum)平台编写智能合约的高级编…...

Spring数据访问模块设计
前面我们已经完成了IoC和web模块的设计,聪明的码友立马就知道了,该到数据访问模块了,要不就这俩玩个6啊,查库势在必行,至此,它来了。 一、核心设计理念 1、痛点在哪 应用离不开数据(数据库、No…...