当前位置: 首页 > news >正文

Windows10配置C++版本的Kafka,并进行发布和订阅测试

配置的环境为:Release x64下的环境

完整项目:https://gitee.com/jiajingong/kafka-publisher

1、首先下载相应的库文件(.lib,.dll)

参考链接:

GitHub - eStreamSoftware/delphi-kafka

GitHub - cloader/KafkaCPP-win32-dll: KafkaCpp-win32-dll

2、新建一个新的命令行C++工程

建完工程后,选择Release x64,并在生成中执行重新生成解决方案,这样会在项目目录下生成x64/Release文件夹

3、通过VS2017配置附加库目录和附加依赖项

所有的.lib、.dll等库文件均在下图x64/Release目录下

附加依赖项加入:librdkafka.lib;librdkafkacpp.lib,如下图:

4、发布端:

将主函数的CPP文件改为:

#include <iostream>
#include <thread>
#include "rdkafkacpp.h"int main()
{std::string brokers = "172.18.4.96:9092";std::string errorStr;RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);if (!conf) {std::cout << "Create RdKafka Conf failed" << std::endl;return -1;}conf->set("message.max.bytes", "10240000", errorStr); //最大字节数conf->set("replica.fetch.max.bytes", "20485760", errorStr);conf->set("bootstrap.servers", brokers, errorStr);RdKafka::Producer *producer = RdKafka::Producer::create(conf, errorStr);if (!producer) {std::cout << "Create Producer failed" << std::endl;return -1;}//创建TopicRdKafka::Topic *topic = RdKafka::Topic::create(producer, "koala-stqf-03", tconf, errorStr);if (!topic) {std::cout << "Create Topic failed" << std::endl;}int count = 0;while (true){   //发送消息RdKafka::ErrorCode resCode = producer->produce(topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY, (char *)"123456789", 10, nullptr, nullptr);std::cout << "Count:" << count << ",has publish:" << (char *)"123456789" << std::endl;if (resCode != RdKafka::ERR_NO_ERROR) {std::cerr << "Produce failed: " << RdKafka::err2str(resCode) << std::endl;}count += 1;std::this_thread::sleep_for(std::chrono::seconds(1));}delete conf;delete tconf;delete topic;delete producer;RdKafka::wait_destroyed(5000);return 0;
}

5、订阅端

新建一个同样的订阅端工程,同样将主函数的代码改为:

#include "rdkafkacpp.h"
#include <chrono>
#include <time.h>
#include <sstream>
#include <iomanip>
#include <iostream>
#include <algorithm>
#include <iterator>void consume_cb(RdKafka::Message &message, void *opaque)
{switch (message.err()) {case RdKafka::ERR__TIMED_OUT:std::cout << "RdKafka::ERR__TIMED_OUT" << std::endl;break;case RdKafka::ERR_NO_ERROR:/* Real message */RdKafka::MessageTimestamp ts;ts = message.timestamp();if (ts.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_NOT_AVAILABLE) {std::string timeprefix;if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_CREATE_TIME) {timeprefix = "created time";}else if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_LOG_APPEND_TIME) {timeprefix = "log append time";}unsigned long long milli = ts.timestamp + (unsigned long long)8 * 60 * 60 * 1000;//此处转化为东八区北京时间,如果是其它时区需要按需求修改auto mTime = std::chrono::milliseconds(milli);auto tp = std::chrono::time_point<std::chrono::system_clock, std::chrono::milliseconds>(mTime);auto tt = std::chrono::system_clock::to_time_t(tp);tm timeinfo;::gmtime_s(&timeinfo, &tt);//char s[60]{ 0 };//::sprintf(s, "%04d-%02d-%02d %02d:%02d:%02d", timeinfo.tm_year + 1900, timeinfo.tm_mon + 1, timeinfo.tm_mday, timeinfo.tm_hour, timeinfo.tm_min, timeinfo.tm_sec);// std::cout << timeinfo.tm_year + 1900 << "-" << timeinfo.tm_mon + 1 << "-" << timeinfo.tm_mday << " " << timeinfo.tm_hour << ":" << timeinfo.tm_min << ":" << timeinfo.tm_sec << std::endl;
#if 0std::stringstream ss;std::string dateStr;ss << timeinfo.tm_year + 1900 << "-"<< timeinfo.tm_mon + 1 << "-"<< timeinfo.tm_mday;ss >> dateStr;ss.clear();ss << timeinfo.tm_hour << ":"<< timeinfo.tm_min << ":"<< timeinfo.tm_sec;std::string timeStr;ss >> timeStr;std::string dateTimeStr;dateTimeStr += dateStr;dateTimeStr.push_back(' ');dateTimeStr += timeStr;
#endif // 0//std::cout << "TimeStamp" << timeprefix << " " << s << std::endl;std::cout << "TimeStamp   " << timeinfo.tm_year + 1900 << "-" << timeinfo.tm_mon + 1 << "-" << timeinfo.tm_mday << " " << timeinfo.tm_hour << ":" << timeinfo.tm_min << ":" << timeinfo.tm_sec << std::endl;}std::cout << message.topic_name() << " offset" << message.offset() << "  partion " << message.partition() << " message: " << reinterpret_cast<char*>(message.payload()) << std::endl;break;case RdKafka::ERR__PARTITION_EOF:/* Last message */std::cout << "EOF reached for" << std::endl;break;case RdKafka::ERR__UNKNOWN_TOPIC:case RdKafka::ERR__UNKNOWN_PARTITION:std::cout << "Consume failed: " << message.errstr();break;default:/* Errors */std::cout << "Consume failed: " << message.errstr();break;}
}
int main()
{std::string brokers = "172.18.4.96:9092";std::string errstr;std::vector<std::string> topics{ "koala-stqf-03","klai-seim-alert-koala-test-03"};std::string group_id = "whl-consumer-group";RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);if (conf->set("group.id", group_id, errstr)) {std::cout << errstr << std::endl;return -1;}conf->set("bootstrap.servers", brokers, errstr);conf->set("max.partition.fetch.bytes", "1024000", errstr);//conf->set("enable-auto-commit", "true", errstr);RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);tconf->set("auto.offset.reset", "latest", errstr);conf->set("default_topic_conf", tconf, errstr);RdKafka::KafkaConsumer *m_consumer = RdKafka::KafkaConsumer::create(conf, errstr);if (!m_consumer) {std::cout << "failed to create consumer " << errstr << std::endl;return -1;}#if 0 //从上一次消费结束的位置开始消费RdKafka::ErrorCode err = m_consumer->subscribe(topics);if (err != RdKafka::ERR_NO_ERROR) {std::cout << RdKafka::err2str(err) << std::endl;return -1;}
#else //指定每个topic的每个分区开始消费的位置//基本思路为先获取server端的状态信息,将与订阅相关的topic找出来,根据分区,创建TopicPartion;最后使用assign消费RdKafka::Metadata *metadataMap{ nullptr };RdKafka::ErrorCode err = m_consumer->metadata(true, nullptr, &metadataMap, 2000);if (err != RdKafka::ERR_NO_ERROR) {std::cout << RdKafka::err2str(err) << std::endl;}const RdKafka::Metadata::TopicMetadataVector *topicList = metadataMap->topics();std::cout << "broker topic size: " << topicList->size() << std::endl;RdKafka::Metadata::TopicMetadataVector subTopicMetaVec;std::copy_if(topicList->begin(), topicList->end(), std::back_inserter(subTopicMetaVec), [&topics](const RdKafka::TopicMetadata* data) {return std::find_if(topics.begin(), topics.end(), [data](const std::string &tname) {return data->topic() == tname; }) != topics.end();});std::vector<RdKafka::TopicPartition*> topicpartions;std::for_each(subTopicMetaVec.begin(), subTopicMetaVec.end(), [&topicpartions](const RdKafka::TopicMetadata* data) {auto parVec = data->partitions();std::for_each(parVec->begin(), parVec->end(), [&](const RdKafka::PartitionMetadata *value) {std::cout << data->topic() << " has partion: " << value->id() << " Leader is : " << value->leader() << std::endl;topicpartions.push_back(RdKafka::TopicPartition::create(data->topic(), value->id(), RdKafka::Topic::OFFSET_END));});});m_consumer->assign(topicpartions);
#endif // 0RdKafka::ErrorCode errccc = m_consumer->subscribe(topics);if (errccc != RdKafka::ERR_NO_ERROR) {std::cout << RdKafka::err2str(errccc) << std::endl;return -1;}while (true){RdKafka::Message *msg = m_consumer->consume(6000);consume_cb(*msg, nullptr); //消息一条消息delete msg;}return 0;
}

6、发布 订阅展示:

相关文章:

Windows10配置C++版本的Kafka,并进行发布和订阅测试

配置的环境为&#xff1a;Release x64下的环境 完整项目&#xff1a;https://gitee.com/jiajingong/kafka-publisher 1、首先下载相应的库文件&#xff08;.lib&#xff0c;.dll&#xff09; 参考链接&#xff1a; GitHub - eStreamSoftware/delphi-kafka GitHub - cloade…...

vue3 下载文件 responseType-blob 或者 a标签

在 Vue 3 中&#xff0c;你可以使用 axios 或 fetch 来下载文件&#xff0c;并将 responseType 设置为 blob 以处理二进制数据。以下是一个使用 axios 的示例&#xff1a; 使用 axios 下载文件 首先&#xff0c;确保你已经安装了 axios&#xff1a; npm install axios然后在你…...

【Gin-Web】Bluebell社区项目梳理6:限流策略-漏桶与令牌桶

本文目录 一、限流二、漏桶三、令牌桶算法四、Gin框架中实现令牌桶限流 一、限流 限流又称为流量控制&#xff0c;也就是流控&#xff0c;通常是指限制到达系统的并发请求数。 限流虽然会影响部分用户的使用体验&#xff0c;但是能一定程度上保证系统的稳定性&#xff0c;不至…...

51单片机-AT24CXX存储器工作原理

1、AT24CXX存储器工作原理 1.1、特点&#xff1a; 与400KHz&#xff0c;I2C总线兼容1.8到6.0伏工作电压范围低功耗CMOS技术写保护功能当WP为高电平时进入写保护状态页写缓冲器自定时擦写周期100万次编程/擦除周期可保存数据100年8脚DIP SOIC或TSSOP封装温度范围商业级和工业级…...

突破性能极限:DeepSeek开源FlashMLA解码内核技术解析

引言&#xff1a;大模型时代的推理加速革命 在生成式AI大行其道的今天&#xff0c;如何提升大语言模型的推理效率已成为行业焦点。DeepSeek团队最新开源的FlashMLA项目凭借其惊人的性能表现引发关注——在H800 GPU上实现580 TFLOPS计算性能&#xff0c;这正是大模型推理优化的…...

点击修改按钮图片显示有问题

问题可能出在表单数据的初始化上。在 ave-form.vue 中&#xff0c;我们需要处理一下从后端返回的图片数据&#xff0c;因为它们可能是 JSON 字符串格式。 vue:src/views/tools/fake-strategy/components/ave-form.vue// ... existing code ...Watch(value)watchValue(v: any) …...

[AI]从零开始的树莓派运行DeepSeek模型教程

一、前言 在前面的教程中&#xff0c;教了大家如何在windows中使用llama.cpp来运行DeepSeek模型。根据前面的教程中&#xff0c;我们也了解到了&#xff0c;我们只需要编译好llama.cpp就可以运行DeepSeek以及类似的LLM模型。那么本次教程就来教大家如何使用树莓派来运行大模型。…...

2024-2025 学年广东省职业院校技能大赛 “信息安全管理与评估”赛项 技能测试试卷(二)

2024-2025 学年广东省职业院校技能大赛 “信息安全管理与评估”赛项 技能测试试卷&#xff08;二&#xff09; 第一部分&#xff1a;网络平台搭建与设备安全防护任务书第二部分&#xff1a;网络安全事件响应、数字取证调查、应用程序安全任务书任务 1&#xff1a;应急响应&…...

Open WebUI本地部署教程

文章目录 1、系统环境配置2、源码下载2.1 github源码地址下载 3、环境启动3.1 后端环境3.2 前端环境 4、问题4.1 浏览器跨域问题4.2 all-MiniLM-L6-v2模型文件下载失败问题4.3 单独部署backend启动报错问题 1、系统环境配置 操作系统&#xff1a;windows/linux/macos Python版…...

Missing required prop: “maxlength“

背景&#xff1a; 封装一个使用功能相同使用频率较高的input公共组件作为子组件&#xff0c;大多数长度要求为200&#xff0c;且实时显示统计子数&#xff0c;部分input有输入提示。 代码实现如下&#xff1a; <template><el-input v-model"inputValue" t…...

dify本地部署

安装docker。 在官网安装docker。 如果遇到wsl报错&#xff0c;就使用 wsl --updata 进行更新。如果问题解决&#xff0c;进入docker应该是如下界面&#xff1a; 克隆 在自己创建的文件内使用 git clone gitgithub.com:langgenius/dify.git 或 git clone https://github.com…...

python学习一

学习网络安全为什么要学python? 1、在实际的渗透测试过程中,面对复杂多变的网络环境,当常用工 具不能满足实际需求的时候,往往需要对现有工具进行扩展,或者 编写符合我们要求的工具、自动化脚本,这个时候就需要具备一定 的编程能力。 2、python是一门编程语言经常用它…...

git branch

文章目录 1.简介2.格式3.选项4.示例参考文献 1.简介 git branch 用于管理分支&#xff0c;包括查看、创建、删除、重命名和关联。 git branch 是 Git 版本控制系统中用于管理分支的命令。分支是 Git 的核心功能之一&#xff0c;允许开发者在同一个代码库中并行开发不同的功能…...

算法-图-数据结构(邻接矩阵)-BFS广度优先遍历

邻接矩阵广度优先遍历&#xff08;BFS&#xff09;是一种用于遍历或搜索图的算法&#xff0c;以下是具体介绍&#xff1a; 1. 基本概念 图是一种非线性的数据结构&#xff0c;由顶点和边组成&#xff0c;可分为无向图、有向图、加权图、无权图等。邻接矩阵是表示图的一种数…...

数学建模之数学模型—2:非线性规划

文章目录 非线性规划基本概念与结论凸集与凸函数极值条件无约束条件的极值判断条件有约束条件的极值判断条件 无约束非线性规划一维搜索算法步骤示例特点代码模板 最速下降法算法详细步骤 代码实现示例最优步长的求解 黄金分割法斐波那契法牛顿法阻尼牛顿法模式搜索法Powell方法…...

unity学习51:所有UI的父物体:canvas画布

目录 1 下载资源 1.1 在window / Asset store下下载一套免费的UI资源 1.2 下载&#xff0c;导入import 1.3 导入后在 project / Asset下面可以看到 2 画布canvas&#xff0c;UI的父物体 2.1 创建canvas 2.1.1 画布的下面是 event system是UI相关的事件系统 2.2 canvas…...

ctfshow做题笔记—栈溢出—pwn57~pwn60

目录 前言 一、pwn57&#xff08;先了解一下简单的64位shellcode吧&#xff09; 二、pwn58 三、pwn59&#xff08;64位 无限制&#xff09; 四、pwn60&#xff08;入门难度shellcode&#xff09; 前言 往前写了几道题&#xff0c;与shellcode有关&#xff0c;关于shellc…...

数据结构 1-2 线性表的链式存储-链表

1 原理 顺序表的缺点&#xff1a; 插入和删除移动大量元素数组的大小不好控制占用一大段连续的存储空间&#xff0c;造成很多碎片 链表规避了上述顺序表缺点 逻辑上相邻的两个元素在物理位置上不相邻 头结点 L&#xff1a;头指针 头指针&#xff1a;链表中第一个结点的存储…...

ArcGIS Pro进行坡度与坡向分析

在地理信息系统中&#xff0c;坡度分析是一项至关重要的空间分析方法&#xff0c;旨在精确计算地表或地形的坡度&#xff0c;为地形特征识别、土地资源规划、环境保护、灾害预警等领域提供科学依据。本文将详细介绍如何利用ArcGIS Pro这一强大的地理信息系统软件&#xff0c;进…...

My first Android application

界面元素组成&#xff1a; 功能代码&#xff1a; /*实现功能&#xff1a;当输入内容后&#xff0c;欢迎文本发生相应改变&#xff0c;并清除掉文本域内容当未输入任何内容时&#xff0c;弹出提示文本以警告用户*/val greetingText findViewById<TextView>(R.id.printer)…...

如何在看板中体现优先级变化

在看板中有效体现优先级变化的关键措施包括&#xff1a;采用颜色或标签标识优先级、设置任务排序规则、使用独立的优先级列或泳道、结合自动化规则同步优先级变化、建立定期的优先级审查流程。其中&#xff0c;设置任务排序规则尤其重要&#xff0c;因为它让看板视觉上直观地体…...

vscode(仍待补充)

写于2025 6.9 主包将加入vscode这个更权威的圈子 vscode的基本使用 侧边栏 vscode还能连接ssh&#xff1f; debug时使用的launch文件 1.task.json {"tasks": [{"type": "cppbuild","label": "C/C: gcc.exe 生成活动文件"…...

基于当前项目通过npm包形式暴露公共组件

1.package.sjon文件配置 其中xh-flowable就是暴露出去的npm包名 2.创建tpyes文件夹&#xff0c;并新增内容 3.创建package文件夹...

【论文笔记】若干矿井粉尘检测算法概述

总的来说&#xff0c;传统机器学习、传统机器学习与深度学习的结合、LSTM等算法所需要的数据集来源于矿井传感器测量的粉尘浓度&#xff0c;通过建立回归模型来预测未来矿井的粉尘浓度。传统机器学习算法性能易受数据中极端值的影响。YOLO等计算机视觉算法所需要的数据集来源于…...

2025 后端自学UNIAPP【项目实战:旅游项目】6、我的收藏页面

代码框架视图 1、先添加一个获取收藏景点的列表请求 【在文件my_api.js文件中添加】 // 引入公共的请求封装 import http from ./my_http.js// 登录接口&#xff08;适配服务端返回 Token&#xff09; export const login async (code, avatar) > {const res await http…...

sqlserver 根据指定字符 解析拼接字符串

DECLARE LotNo NVARCHAR(50)A,B,C DECLARE xml XML ( SELECT <x> REPLACE(LotNo, ,, </x><x>) </x> ) DECLARE ErrorCode NVARCHAR(50) -- 提取 XML 中的值 SELECT value x.value(., VARCHAR(MAX))…...

Python爬虫(一):爬虫伪装

一、网站防爬机制概述 在当今互联网环境中&#xff0c;具有一定规模或盈利性质的网站几乎都实施了各种防爬措施。这些措施主要分为两大类&#xff1a; 身份验证机制&#xff1a;直接将未经授权的爬虫阻挡在外反爬技术体系&#xff1a;通过各种技术手段增加爬虫获取数据的难度…...

.Net Framework 4/C# 关键字(非常用,持续更新...)

一、is 关键字 is 关键字用于检查对象是否于给定类型兼容,如果兼容将返回 true,如果不兼容则返回 false,在进行类型转换前,可以先使用 is 关键字判断对象是否与指定类型兼容,如果兼容才进行转换,这样的转换是安全的。 例如有:首先创建一个字符串对象,然后将字符串对象隐…...

项目部署到Linux上时遇到的错误(Redis,MySQL,无法正确连接,地址占用问题)

Redis无法正确连接 在运行jar包时出现了这样的错误 查询得知问题核心在于Redis连接失败&#xff0c;具体原因是客户端发送了密码认证请求&#xff0c;但Redis服务器未设置密码 1.为Redis设置密码&#xff08;匹配客户端配置&#xff09; 步骤&#xff1a; 1&#xff09;.修…...

JavaScript 数据类型详解

JavaScript 数据类型详解 JavaScript 数据类型分为 原始类型&#xff08;Primitive&#xff09; 和 对象类型&#xff08;Object&#xff09; 两大类&#xff0c;共 8 种&#xff08;ES11&#xff09;&#xff1a; 一、原始类型&#xff08;7种&#xff09; 1. undefined 定…...