当前位置: 首页 > news >正文

Windows10配置C++版本的Kafka,并进行发布和订阅测试

配置的环境为:Release x64下的环境

完整项目:https://gitee.com/jiajingong/kafka-publisher

1、首先下载相应的库文件(.lib,.dll)

参考链接:

GitHub - eStreamSoftware/delphi-kafka

GitHub - cloader/KafkaCPP-win32-dll: KafkaCpp-win32-dll

2、新建一个新的命令行C++工程

建完工程后,选择Release x64,并在生成中执行重新生成解决方案,这样会在项目目录下生成x64/Release文件夹

3、通过VS2017配置附加库目录和附加依赖项

所有的.lib、.dll等库文件均在下图x64/Release目录下

附加依赖项加入:librdkafka.lib;librdkafkacpp.lib,如下图:

4、发布端:

将主函数的CPP文件改为:

#include <iostream>
#include <thread>
#include "rdkafkacpp.h"int main()
{std::string brokers = "172.18.4.96:9092";std::string errorStr;RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);if (!conf) {std::cout << "Create RdKafka Conf failed" << std::endl;return -1;}conf->set("message.max.bytes", "10240000", errorStr); //最大字节数conf->set("replica.fetch.max.bytes", "20485760", errorStr);conf->set("bootstrap.servers", brokers, errorStr);RdKafka::Producer *producer = RdKafka::Producer::create(conf, errorStr);if (!producer) {std::cout << "Create Producer failed" << std::endl;return -1;}//创建TopicRdKafka::Topic *topic = RdKafka::Topic::create(producer, "koala-stqf-03", tconf, errorStr);if (!topic) {std::cout << "Create Topic failed" << std::endl;}int count = 0;while (true){   //发送消息RdKafka::ErrorCode resCode = producer->produce(topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY, (char *)"123456789", 10, nullptr, nullptr);std::cout << "Count:" << count << ",has publish:" << (char *)"123456789" << std::endl;if (resCode != RdKafka::ERR_NO_ERROR) {std::cerr << "Produce failed: " << RdKafka::err2str(resCode) << std::endl;}count += 1;std::this_thread::sleep_for(std::chrono::seconds(1));}delete conf;delete tconf;delete topic;delete producer;RdKafka::wait_destroyed(5000);return 0;
}

5、订阅端

新建一个同样的订阅端工程,同样将主函数的代码改为:

#include "rdkafkacpp.h"
#include <chrono>
#include <time.h>
#include <sstream>
#include <iomanip>
#include <iostream>
#include <algorithm>
#include <iterator>void consume_cb(RdKafka::Message &message, void *opaque)
{switch (message.err()) {case RdKafka::ERR__TIMED_OUT:std::cout << "RdKafka::ERR__TIMED_OUT" << std::endl;break;case RdKafka::ERR_NO_ERROR:/* Real message */RdKafka::MessageTimestamp ts;ts = message.timestamp();if (ts.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_NOT_AVAILABLE) {std::string timeprefix;if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_CREATE_TIME) {timeprefix = "created time";}else if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_LOG_APPEND_TIME) {timeprefix = "log append time";}unsigned long long milli = ts.timestamp + (unsigned long long)8 * 60 * 60 * 1000;//此处转化为东八区北京时间,如果是其它时区需要按需求修改auto mTime = std::chrono::milliseconds(milli);auto tp = std::chrono::time_point<std::chrono::system_clock, std::chrono::milliseconds>(mTime);auto tt = std::chrono::system_clock::to_time_t(tp);tm timeinfo;::gmtime_s(&timeinfo, &tt);//char s[60]{ 0 };//::sprintf(s, "%04d-%02d-%02d %02d:%02d:%02d", timeinfo.tm_year + 1900, timeinfo.tm_mon + 1, timeinfo.tm_mday, timeinfo.tm_hour, timeinfo.tm_min, timeinfo.tm_sec);// std::cout << timeinfo.tm_year + 1900 << "-" << timeinfo.tm_mon + 1 << "-" << timeinfo.tm_mday << " " << timeinfo.tm_hour << ":" << timeinfo.tm_min << ":" << timeinfo.tm_sec << std::endl;
#if 0std::stringstream ss;std::string dateStr;ss << timeinfo.tm_year + 1900 << "-"<< timeinfo.tm_mon + 1 << "-"<< timeinfo.tm_mday;ss >> dateStr;ss.clear();ss << timeinfo.tm_hour << ":"<< timeinfo.tm_min << ":"<< timeinfo.tm_sec;std::string timeStr;ss >> timeStr;std::string dateTimeStr;dateTimeStr += dateStr;dateTimeStr.push_back(' ');dateTimeStr += timeStr;
#endif // 0//std::cout << "TimeStamp" << timeprefix << " " << s << std::endl;std::cout << "TimeStamp   " << timeinfo.tm_year + 1900 << "-" << timeinfo.tm_mon + 1 << "-" << timeinfo.tm_mday << " " << timeinfo.tm_hour << ":" << timeinfo.tm_min << ":" << timeinfo.tm_sec << std::endl;}std::cout << message.topic_name() << " offset" << message.offset() << "  partion " << message.partition() << " message: " << reinterpret_cast<char*>(message.payload()) << std::endl;break;case RdKafka::ERR__PARTITION_EOF:/* Last message */std::cout << "EOF reached for" << std::endl;break;case RdKafka::ERR__UNKNOWN_TOPIC:case RdKafka::ERR__UNKNOWN_PARTITION:std::cout << "Consume failed: " << message.errstr();break;default:/* Errors */std::cout << "Consume failed: " << message.errstr();break;}
}
int main()
{std::string brokers = "172.18.4.96:9092";std::string errstr;std::vector<std::string> topics{ "koala-stqf-03","klai-seim-alert-koala-test-03"};std::string group_id = "whl-consumer-group";RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);if (conf->set("group.id", group_id, errstr)) {std::cout << errstr << std::endl;return -1;}conf->set("bootstrap.servers", brokers, errstr);conf->set("max.partition.fetch.bytes", "1024000", errstr);//conf->set("enable-auto-commit", "true", errstr);RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);tconf->set("auto.offset.reset", "latest", errstr);conf->set("default_topic_conf", tconf, errstr);RdKafka::KafkaConsumer *m_consumer = RdKafka::KafkaConsumer::create(conf, errstr);if (!m_consumer) {std::cout << "failed to create consumer " << errstr << std::endl;return -1;}#if 0 //从上一次消费结束的位置开始消费RdKafka::ErrorCode err = m_consumer->subscribe(topics);if (err != RdKafka::ERR_NO_ERROR) {std::cout << RdKafka::err2str(err) << std::endl;return -1;}
#else //指定每个topic的每个分区开始消费的位置//基本思路为先获取server端的状态信息,将与订阅相关的topic找出来,根据分区,创建TopicPartion;最后使用assign消费RdKafka::Metadata *metadataMap{ nullptr };RdKafka::ErrorCode err = m_consumer->metadata(true, nullptr, &metadataMap, 2000);if (err != RdKafka::ERR_NO_ERROR) {std::cout << RdKafka::err2str(err) << std::endl;}const RdKafka::Metadata::TopicMetadataVector *topicList = metadataMap->topics();std::cout << "broker topic size: " << topicList->size() << std::endl;RdKafka::Metadata::TopicMetadataVector subTopicMetaVec;std::copy_if(topicList->begin(), topicList->end(), std::back_inserter(subTopicMetaVec), [&topics](const RdKafka::TopicMetadata* data) {return std::find_if(topics.begin(), topics.end(), [data](const std::string &tname) {return data->topic() == tname; }) != topics.end();});std::vector<RdKafka::TopicPartition*> topicpartions;std::for_each(subTopicMetaVec.begin(), subTopicMetaVec.end(), [&topicpartions](const RdKafka::TopicMetadata* data) {auto parVec = data->partitions();std::for_each(parVec->begin(), parVec->end(), [&](const RdKafka::PartitionMetadata *value) {std::cout << data->topic() << " has partion: " << value->id() << " Leader is : " << value->leader() << std::endl;topicpartions.push_back(RdKafka::TopicPartition::create(data->topic(), value->id(), RdKafka::Topic::OFFSET_END));});});m_consumer->assign(topicpartions);
#endif // 0RdKafka::ErrorCode errccc = m_consumer->subscribe(topics);if (errccc != RdKafka::ERR_NO_ERROR) {std::cout << RdKafka::err2str(errccc) << std::endl;return -1;}while (true){RdKafka::Message *msg = m_consumer->consume(6000);consume_cb(*msg, nullptr); //消息一条消息delete msg;}return 0;
}

6、发布 订阅展示:

相关文章:

Windows10配置C++版本的Kafka,并进行发布和订阅测试

配置的环境为&#xff1a;Release x64下的环境 完整项目&#xff1a;https://gitee.com/jiajingong/kafka-publisher 1、首先下载相应的库文件&#xff08;.lib&#xff0c;.dll&#xff09; 参考链接&#xff1a; GitHub - eStreamSoftware/delphi-kafka GitHub - cloade…...

vue3 下载文件 responseType-blob 或者 a标签

在 Vue 3 中&#xff0c;你可以使用 axios 或 fetch 来下载文件&#xff0c;并将 responseType 设置为 blob 以处理二进制数据。以下是一个使用 axios 的示例&#xff1a; 使用 axios 下载文件 首先&#xff0c;确保你已经安装了 axios&#xff1a; npm install axios然后在你…...

【Gin-Web】Bluebell社区项目梳理6:限流策略-漏桶与令牌桶

本文目录 一、限流二、漏桶三、令牌桶算法四、Gin框架中实现令牌桶限流 一、限流 限流又称为流量控制&#xff0c;也就是流控&#xff0c;通常是指限制到达系统的并发请求数。 限流虽然会影响部分用户的使用体验&#xff0c;但是能一定程度上保证系统的稳定性&#xff0c;不至…...

51单片机-AT24CXX存储器工作原理

1、AT24CXX存储器工作原理 1.1、特点&#xff1a; 与400KHz&#xff0c;I2C总线兼容1.8到6.0伏工作电压范围低功耗CMOS技术写保护功能当WP为高电平时进入写保护状态页写缓冲器自定时擦写周期100万次编程/擦除周期可保存数据100年8脚DIP SOIC或TSSOP封装温度范围商业级和工业级…...

突破性能极限:DeepSeek开源FlashMLA解码内核技术解析

引言&#xff1a;大模型时代的推理加速革命 在生成式AI大行其道的今天&#xff0c;如何提升大语言模型的推理效率已成为行业焦点。DeepSeek团队最新开源的FlashMLA项目凭借其惊人的性能表现引发关注——在H800 GPU上实现580 TFLOPS计算性能&#xff0c;这正是大模型推理优化的…...

点击修改按钮图片显示有问题

问题可能出在表单数据的初始化上。在 ave-form.vue 中&#xff0c;我们需要处理一下从后端返回的图片数据&#xff0c;因为它们可能是 JSON 字符串格式。 vue:src/views/tools/fake-strategy/components/ave-form.vue// ... existing code ...Watch(value)watchValue(v: any) …...

[AI]从零开始的树莓派运行DeepSeek模型教程

一、前言 在前面的教程中&#xff0c;教了大家如何在windows中使用llama.cpp来运行DeepSeek模型。根据前面的教程中&#xff0c;我们也了解到了&#xff0c;我们只需要编译好llama.cpp就可以运行DeepSeek以及类似的LLM模型。那么本次教程就来教大家如何使用树莓派来运行大模型。…...

2024-2025 学年广东省职业院校技能大赛 “信息安全管理与评估”赛项 技能测试试卷(二)

2024-2025 学年广东省职业院校技能大赛 “信息安全管理与评估”赛项 技能测试试卷&#xff08;二&#xff09; 第一部分&#xff1a;网络平台搭建与设备安全防护任务书第二部分&#xff1a;网络安全事件响应、数字取证调查、应用程序安全任务书任务 1&#xff1a;应急响应&…...

Open WebUI本地部署教程

文章目录 1、系统环境配置2、源码下载2.1 github源码地址下载 3、环境启动3.1 后端环境3.2 前端环境 4、问题4.1 浏览器跨域问题4.2 all-MiniLM-L6-v2模型文件下载失败问题4.3 单独部署backend启动报错问题 1、系统环境配置 操作系统&#xff1a;windows/linux/macos Python版…...

Missing required prop: “maxlength“

背景&#xff1a; 封装一个使用功能相同使用频率较高的input公共组件作为子组件&#xff0c;大多数长度要求为200&#xff0c;且实时显示统计子数&#xff0c;部分input有输入提示。 代码实现如下&#xff1a; <template><el-input v-model"inputValue" t…...

dify本地部署

安装docker。 在官网安装docker。 如果遇到wsl报错&#xff0c;就使用 wsl --updata 进行更新。如果问题解决&#xff0c;进入docker应该是如下界面&#xff1a; 克隆 在自己创建的文件内使用 git clone gitgithub.com:langgenius/dify.git 或 git clone https://github.com…...

python学习一

学习网络安全为什么要学python? 1、在实际的渗透测试过程中,面对复杂多变的网络环境,当常用工 具不能满足实际需求的时候,往往需要对现有工具进行扩展,或者 编写符合我们要求的工具、自动化脚本,这个时候就需要具备一定 的编程能力。 2、python是一门编程语言经常用它…...

git branch

文章目录 1.简介2.格式3.选项4.示例参考文献 1.简介 git branch 用于管理分支&#xff0c;包括查看、创建、删除、重命名和关联。 git branch 是 Git 版本控制系统中用于管理分支的命令。分支是 Git 的核心功能之一&#xff0c;允许开发者在同一个代码库中并行开发不同的功能…...

算法-图-数据结构(邻接矩阵)-BFS广度优先遍历

邻接矩阵广度优先遍历&#xff08;BFS&#xff09;是一种用于遍历或搜索图的算法&#xff0c;以下是具体介绍&#xff1a; 1. 基本概念 图是一种非线性的数据结构&#xff0c;由顶点和边组成&#xff0c;可分为无向图、有向图、加权图、无权图等。邻接矩阵是表示图的一种数…...

数学建模之数学模型—2:非线性规划

文章目录 非线性规划基本概念与结论凸集与凸函数极值条件无约束条件的极值判断条件有约束条件的极值判断条件 无约束非线性规划一维搜索算法步骤示例特点代码模板 最速下降法算法详细步骤 代码实现示例最优步长的求解 黄金分割法斐波那契法牛顿法阻尼牛顿法模式搜索法Powell方法…...

unity学习51:所有UI的父物体:canvas画布

目录 1 下载资源 1.1 在window / Asset store下下载一套免费的UI资源 1.2 下载&#xff0c;导入import 1.3 导入后在 project / Asset下面可以看到 2 画布canvas&#xff0c;UI的父物体 2.1 创建canvas 2.1.1 画布的下面是 event system是UI相关的事件系统 2.2 canvas…...

ctfshow做题笔记—栈溢出—pwn57~pwn60

目录 前言 一、pwn57&#xff08;先了解一下简单的64位shellcode吧&#xff09; 二、pwn58 三、pwn59&#xff08;64位 无限制&#xff09; 四、pwn60&#xff08;入门难度shellcode&#xff09; 前言 往前写了几道题&#xff0c;与shellcode有关&#xff0c;关于shellc…...

数据结构 1-2 线性表的链式存储-链表

1 原理 顺序表的缺点&#xff1a; 插入和删除移动大量元素数组的大小不好控制占用一大段连续的存储空间&#xff0c;造成很多碎片 链表规避了上述顺序表缺点 逻辑上相邻的两个元素在物理位置上不相邻 头结点 L&#xff1a;头指针 头指针&#xff1a;链表中第一个结点的存储…...

ArcGIS Pro进行坡度与坡向分析

在地理信息系统中&#xff0c;坡度分析是一项至关重要的空间分析方法&#xff0c;旨在精确计算地表或地形的坡度&#xff0c;为地形特征识别、土地资源规划、环境保护、灾害预警等领域提供科学依据。本文将详细介绍如何利用ArcGIS Pro这一强大的地理信息系统软件&#xff0c;进…...

My first Android application

界面元素组成&#xff1a; 功能代码&#xff1a; /*实现功能&#xff1a;当输入内容后&#xff0c;欢迎文本发生相应改变&#xff0c;并清除掉文本域内容当未输入任何内容时&#xff0c;弹出提示文本以警告用户*/val greetingText findViewById<TextView>(R.id.printer)…...

3分钟搞定B站视频转文字:智能高效免费工具bili2text全解析

3分钟搞定B站视频转文字&#xff1a;智能高效免费工具bili2text全解析 【免费下载链接】bili2text Bilibili视频转文字&#xff0c;一步到位&#xff0c;输入链接即可使用 项目地址: https://gitcode.com/gh_mirrors/bi/bili2text 你是否曾为整理B站学习视频内容而反复暂…...

TranslucentTB启动失败?3步修复Microsoft.UI.Xaml依赖问题

TranslucentTB启动失败&#xff1f;3步修复Microsoft.UI.Xaml依赖问题 【免费下载链接】TranslucentTB A lightweight utility that makes the Windows taskbar translucent/transparent. 项目地址: https://gitcode.com/gh_mirrors/tr/TranslucentTB TranslucentTB是一…...

高性能模块化哔哩哔哩下载器BBDown架构设计深度解析

高性能模块化哔哩哔哩下载器BBDown架构设计深度解析 【免费下载链接】BBDown Bilibili Downloader. 一个命令行式哔哩哔哩下载器. 项目地址: https://gitcode.com/gh_mirrors/bb/BBDown 在当今数字内容消费时代&#xff0c;高效获取和管理在线视频资源成为技术爱好者和开…...

虚拟机基础:JVM、V8 运行机制极简科普

文章目录 前言一、先搞懂&#xff1a;到底什么是“虚拟机”&#xff1f;二、JVM&#xff1a;Java世界的“铁饭碗管家”2.1 JVM的整体工作流程2.2 JVM的核心结构&#xff1a;五大区域三大子系统2.2.1 运行时数据区&#xff08;JVM的“房间布局”&#xff09;2.2.2 三大核心子系统…...

长沙有没有可以定制包装盒的厂家?—— 供应链选型与技术方案全解析

文章摘要&#xff1a;面向产品、采购、电商与供应链从业者&#xff0c;本文以长沙区域包装供应链为样本&#xff0c;从定制能力、设备工艺、交付周期、品控合规、成本结构等维度&#xff0c;系统分析本地包装盒定制厂商的选型标准、技术门槛与风险点&#xff0c;提供可直接落地…...

当Copilot写出恶意反序列化代码时——智能代码生成安全风险评估的“黄金45分钟”响应协议(含SAST+DAST+LLM-Sandbox三重验证机制)

第一章&#xff1a;当Copilot写出恶意反序列化代码时——智能代码生成安全风险评估的“黄金45分钟”响应协议&#xff08;含SASTDASTLLM-Sandbox三重验证机制&#xff09; 2026奇点智能技术大会(https://ml-summit.org) 当开发者在IDE中键入// Deserialize untrusted JSON pa…...

Spring with AI (): 定制对话——Prompt模板引入

从 UI 工程师到 AI 应用架构者 13 年前&#xff0c;我的工作是让按钮在 IE6 上对齐&#xff1b; 13 年后&#xff0c;我用 fetch-event-source 订阅大模型的“思维流”&#xff0c;用 OCR 解锁图片中的文字——前端&#xff0c;正在成为 AI 产品的第一道体验防线。 最近&#x…...

迅雷链接在线解密解析工具系统源码_本地化API_开源

内容目录一、详细介绍二、效果展示1.部分代码2.效果图展示一、详细介绍 迅雷链接在线解密解析工具系统源码/本地化API/开源 本地化API后无需担心API失效的烦恼&#xff0c;还可以改成加密链接等&#xff0c;自行探索 二、效果展示 1.部分代码 代码如下&#xff08;示例&am…...

SMAPI错误拦截与自动修复:如何确保星露谷物语游戏稳定性的完整指南

SMAPI错误拦截与自动修复&#xff1a;如何确保星露谷物语游戏稳定性的完整指南 【免费下载链接】SMAPI The modding API for Stardew Valley. 项目地址: https://gitcode.com/gh_mirrors/smap/SMAPI SMAPI&#xff08;Stardew Valley Modding API&#xff09;作为星露谷…...

保姆级避坑指南:Ubuntu 20.04 LTS源码编译Qt 5.15.2全流程

1. 为什么选择源码编译Qt 5.15.2&#xff1f; 在Ubuntu 20.04 LTS上安装Qt通常有两种方式&#xff1a;通过apt安装预编译版本&#xff0c;或者从源码编译安装。源码编译虽然步骤繁琐&#xff0c;但能带来三个关键优势&#xff1a;版本可控&#xff08;官方仓库的Qt版本往往较旧…...