当前位置: 首页 > news >正文

Windows10配置C++版本的Kafka,并进行发布和订阅测试

配置的环境为:Release x64下的环境

完整项目:https://gitee.com/jiajingong/kafka-publisher

1、首先下载相应的库文件(.lib,.dll)

参考链接:

GitHub - eStreamSoftware/delphi-kafka

GitHub - cloader/KafkaCPP-win32-dll: KafkaCpp-win32-dll

2、新建一个新的命令行C++工程

建完工程后,选择Release x64,并在生成中执行重新生成解决方案,这样会在项目目录下生成x64/Release文件夹

3、通过VS2017配置附加库目录和附加依赖项

所有的.lib、.dll等库文件均在下图x64/Release目录下

附加依赖项加入:librdkafka.lib;librdkafkacpp.lib,如下图:

4、发布端:

将主函数的CPP文件改为:

#include <iostream>
#include <thread>
#include "rdkafkacpp.h"int main()
{std::string brokers = "172.18.4.96:9092";std::string errorStr;RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);if (!conf) {std::cout << "Create RdKafka Conf failed" << std::endl;return -1;}conf->set("message.max.bytes", "10240000", errorStr); //最大字节数conf->set("replica.fetch.max.bytes", "20485760", errorStr);conf->set("bootstrap.servers", brokers, errorStr);RdKafka::Producer *producer = RdKafka::Producer::create(conf, errorStr);if (!producer) {std::cout << "Create Producer failed" << std::endl;return -1;}//创建TopicRdKafka::Topic *topic = RdKafka::Topic::create(producer, "koala-stqf-03", tconf, errorStr);if (!topic) {std::cout << "Create Topic failed" << std::endl;}int count = 0;while (true){   //发送消息RdKafka::ErrorCode resCode = producer->produce(topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY, (char *)"123456789", 10, nullptr, nullptr);std::cout << "Count:" << count << ",has publish:" << (char *)"123456789" << std::endl;if (resCode != RdKafka::ERR_NO_ERROR) {std::cerr << "Produce failed: " << RdKafka::err2str(resCode) << std::endl;}count += 1;std::this_thread::sleep_for(std::chrono::seconds(1));}delete conf;delete tconf;delete topic;delete producer;RdKafka::wait_destroyed(5000);return 0;
}

5、订阅端

新建一个同样的订阅端工程,同样将主函数的代码改为:

#include "rdkafkacpp.h"
#include <chrono>
#include <time.h>
#include <sstream>
#include <iomanip>
#include <iostream>
#include <algorithm>
#include <iterator>void consume_cb(RdKafka::Message &message, void *opaque)
{switch (message.err()) {case RdKafka::ERR__TIMED_OUT:std::cout << "RdKafka::ERR__TIMED_OUT" << std::endl;break;case RdKafka::ERR_NO_ERROR:/* Real message */RdKafka::MessageTimestamp ts;ts = message.timestamp();if (ts.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_NOT_AVAILABLE) {std::string timeprefix;if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_CREATE_TIME) {timeprefix = "created time";}else if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_LOG_APPEND_TIME) {timeprefix = "log append time";}unsigned long long milli = ts.timestamp + (unsigned long long)8 * 60 * 60 * 1000;//此处转化为东八区北京时间,如果是其它时区需要按需求修改auto mTime = std::chrono::milliseconds(milli);auto tp = std::chrono::time_point<std::chrono::system_clock, std::chrono::milliseconds>(mTime);auto tt = std::chrono::system_clock::to_time_t(tp);tm timeinfo;::gmtime_s(&timeinfo, &tt);//char s[60]{ 0 };//::sprintf(s, "%04d-%02d-%02d %02d:%02d:%02d", timeinfo.tm_year + 1900, timeinfo.tm_mon + 1, timeinfo.tm_mday, timeinfo.tm_hour, timeinfo.tm_min, timeinfo.tm_sec);// std::cout << timeinfo.tm_year + 1900 << "-" << timeinfo.tm_mon + 1 << "-" << timeinfo.tm_mday << " " << timeinfo.tm_hour << ":" << timeinfo.tm_min << ":" << timeinfo.tm_sec << std::endl;
#if 0std::stringstream ss;std::string dateStr;ss << timeinfo.tm_year + 1900 << "-"<< timeinfo.tm_mon + 1 << "-"<< timeinfo.tm_mday;ss >> dateStr;ss.clear();ss << timeinfo.tm_hour << ":"<< timeinfo.tm_min << ":"<< timeinfo.tm_sec;std::string timeStr;ss >> timeStr;std::string dateTimeStr;dateTimeStr += dateStr;dateTimeStr.push_back(' ');dateTimeStr += timeStr;
#endif // 0//std::cout << "TimeStamp" << timeprefix << " " << s << std::endl;std::cout << "TimeStamp   " << timeinfo.tm_year + 1900 << "-" << timeinfo.tm_mon + 1 << "-" << timeinfo.tm_mday << " " << timeinfo.tm_hour << ":" << timeinfo.tm_min << ":" << timeinfo.tm_sec << std::endl;}std::cout << message.topic_name() << " offset" << message.offset() << "  partion " << message.partition() << " message: " << reinterpret_cast<char*>(message.payload()) << std::endl;break;case RdKafka::ERR__PARTITION_EOF:/* Last message */std::cout << "EOF reached for" << std::endl;break;case RdKafka::ERR__UNKNOWN_TOPIC:case RdKafka::ERR__UNKNOWN_PARTITION:std::cout << "Consume failed: " << message.errstr();break;default:/* Errors */std::cout << "Consume failed: " << message.errstr();break;}
}
int main()
{std::string brokers = "172.18.4.96:9092";std::string errstr;std::vector<std::string> topics{ "koala-stqf-03","klai-seim-alert-koala-test-03"};std::string group_id = "whl-consumer-group";RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);if (conf->set("group.id", group_id, errstr)) {std::cout << errstr << std::endl;return -1;}conf->set("bootstrap.servers", brokers, errstr);conf->set("max.partition.fetch.bytes", "1024000", errstr);//conf->set("enable-auto-commit", "true", errstr);RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);tconf->set("auto.offset.reset", "latest", errstr);conf->set("default_topic_conf", tconf, errstr);RdKafka::KafkaConsumer *m_consumer = RdKafka::KafkaConsumer::create(conf, errstr);if (!m_consumer) {std::cout << "failed to create consumer " << errstr << std::endl;return -1;}#if 0 //从上一次消费结束的位置开始消费RdKafka::ErrorCode err = m_consumer->subscribe(topics);if (err != RdKafka::ERR_NO_ERROR) {std::cout << RdKafka::err2str(err) << std::endl;return -1;}
#else //指定每个topic的每个分区开始消费的位置//基本思路为先获取server端的状态信息,将与订阅相关的topic找出来,根据分区,创建TopicPartion;最后使用assign消费RdKafka::Metadata *metadataMap{ nullptr };RdKafka::ErrorCode err = m_consumer->metadata(true, nullptr, &metadataMap, 2000);if (err != RdKafka::ERR_NO_ERROR) {std::cout << RdKafka::err2str(err) << std::endl;}const RdKafka::Metadata::TopicMetadataVector *topicList = metadataMap->topics();std::cout << "broker topic size: " << topicList->size() << std::endl;RdKafka::Metadata::TopicMetadataVector subTopicMetaVec;std::copy_if(topicList->begin(), topicList->end(), std::back_inserter(subTopicMetaVec), [&topics](const RdKafka::TopicMetadata* data) {return std::find_if(topics.begin(), topics.end(), [data](const std::string &tname) {return data->topic() == tname; }) != topics.end();});std::vector<RdKafka::TopicPartition*> topicpartions;std::for_each(subTopicMetaVec.begin(), subTopicMetaVec.end(), [&topicpartions](const RdKafka::TopicMetadata* data) {auto parVec = data->partitions();std::for_each(parVec->begin(), parVec->end(), [&](const RdKafka::PartitionMetadata *value) {std::cout << data->topic() << " has partion: " << value->id() << " Leader is : " << value->leader() << std::endl;topicpartions.push_back(RdKafka::TopicPartition::create(data->topic(), value->id(), RdKafka::Topic::OFFSET_END));});});m_consumer->assign(topicpartions);
#endif // 0RdKafka::ErrorCode errccc = m_consumer->subscribe(topics);if (errccc != RdKafka::ERR_NO_ERROR) {std::cout << RdKafka::err2str(errccc) << std::endl;return -1;}while (true){RdKafka::Message *msg = m_consumer->consume(6000);consume_cb(*msg, nullptr); //消息一条消息delete msg;}return 0;
}

6、发布 订阅展示:

相关文章:

Windows10配置C++版本的Kafka,并进行发布和订阅测试

配置的环境为&#xff1a;Release x64下的环境 完整项目&#xff1a;https://gitee.com/jiajingong/kafka-publisher 1、首先下载相应的库文件&#xff08;.lib&#xff0c;.dll&#xff09; 参考链接&#xff1a; GitHub - eStreamSoftware/delphi-kafka GitHub - cloade…...

vue3 下载文件 responseType-blob 或者 a标签

在 Vue 3 中&#xff0c;你可以使用 axios 或 fetch 来下载文件&#xff0c;并将 responseType 设置为 blob 以处理二进制数据。以下是一个使用 axios 的示例&#xff1a; 使用 axios 下载文件 首先&#xff0c;确保你已经安装了 axios&#xff1a; npm install axios然后在你…...

【Gin-Web】Bluebell社区项目梳理6:限流策略-漏桶与令牌桶

本文目录 一、限流二、漏桶三、令牌桶算法四、Gin框架中实现令牌桶限流 一、限流 限流又称为流量控制&#xff0c;也就是流控&#xff0c;通常是指限制到达系统的并发请求数。 限流虽然会影响部分用户的使用体验&#xff0c;但是能一定程度上保证系统的稳定性&#xff0c;不至…...

51单片机-AT24CXX存储器工作原理

1、AT24CXX存储器工作原理 1.1、特点&#xff1a; 与400KHz&#xff0c;I2C总线兼容1.8到6.0伏工作电压范围低功耗CMOS技术写保护功能当WP为高电平时进入写保护状态页写缓冲器自定时擦写周期100万次编程/擦除周期可保存数据100年8脚DIP SOIC或TSSOP封装温度范围商业级和工业级…...

突破性能极限:DeepSeek开源FlashMLA解码内核技术解析

引言&#xff1a;大模型时代的推理加速革命 在生成式AI大行其道的今天&#xff0c;如何提升大语言模型的推理效率已成为行业焦点。DeepSeek团队最新开源的FlashMLA项目凭借其惊人的性能表现引发关注——在H800 GPU上实现580 TFLOPS计算性能&#xff0c;这正是大模型推理优化的…...

点击修改按钮图片显示有问题

问题可能出在表单数据的初始化上。在 ave-form.vue 中&#xff0c;我们需要处理一下从后端返回的图片数据&#xff0c;因为它们可能是 JSON 字符串格式。 vue:src/views/tools/fake-strategy/components/ave-form.vue// ... existing code ...Watch(value)watchValue(v: any) …...

[AI]从零开始的树莓派运行DeepSeek模型教程

一、前言 在前面的教程中&#xff0c;教了大家如何在windows中使用llama.cpp来运行DeepSeek模型。根据前面的教程中&#xff0c;我们也了解到了&#xff0c;我们只需要编译好llama.cpp就可以运行DeepSeek以及类似的LLM模型。那么本次教程就来教大家如何使用树莓派来运行大模型。…...

2024-2025 学年广东省职业院校技能大赛 “信息安全管理与评估”赛项 技能测试试卷(二)

2024-2025 学年广东省职业院校技能大赛 “信息安全管理与评估”赛项 技能测试试卷&#xff08;二&#xff09; 第一部分&#xff1a;网络平台搭建与设备安全防护任务书第二部分&#xff1a;网络安全事件响应、数字取证调查、应用程序安全任务书任务 1&#xff1a;应急响应&…...

Open WebUI本地部署教程

文章目录 1、系统环境配置2、源码下载2.1 github源码地址下载 3、环境启动3.1 后端环境3.2 前端环境 4、问题4.1 浏览器跨域问题4.2 all-MiniLM-L6-v2模型文件下载失败问题4.3 单独部署backend启动报错问题 1、系统环境配置 操作系统&#xff1a;windows/linux/macos Python版…...

Missing required prop: “maxlength“

背景&#xff1a; 封装一个使用功能相同使用频率较高的input公共组件作为子组件&#xff0c;大多数长度要求为200&#xff0c;且实时显示统计子数&#xff0c;部分input有输入提示。 代码实现如下&#xff1a; <template><el-input v-model"inputValue" t…...

dify本地部署

安装docker。 在官网安装docker。 如果遇到wsl报错&#xff0c;就使用 wsl --updata 进行更新。如果问题解决&#xff0c;进入docker应该是如下界面&#xff1a; 克隆 在自己创建的文件内使用 git clone gitgithub.com:langgenius/dify.git 或 git clone https://github.com…...

python学习一

学习网络安全为什么要学python? 1、在实际的渗透测试过程中,面对复杂多变的网络环境,当常用工 具不能满足实际需求的时候,往往需要对现有工具进行扩展,或者 编写符合我们要求的工具、自动化脚本,这个时候就需要具备一定 的编程能力。 2、python是一门编程语言经常用它…...

git branch

文章目录 1.简介2.格式3.选项4.示例参考文献 1.简介 git branch 用于管理分支&#xff0c;包括查看、创建、删除、重命名和关联。 git branch 是 Git 版本控制系统中用于管理分支的命令。分支是 Git 的核心功能之一&#xff0c;允许开发者在同一个代码库中并行开发不同的功能…...

算法-图-数据结构(邻接矩阵)-BFS广度优先遍历

邻接矩阵广度优先遍历&#xff08;BFS&#xff09;是一种用于遍历或搜索图的算法&#xff0c;以下是具体介绍&#xff1a; 1. 基本概念 图是一种非线性的数据结构&#xff0c;由顶点和边组成&#xff0c;可分为无向图、有向图、加权图、无权图等。邻接矩阵是表示图的一种数…...

数学建模之数学模型—2:非线性规划

文章目录 非线性规划基本概念与结论凸集与凸函数极值条件无约束条件的极值判断条件有约束条件的极值判断条件 无约束非线性规划一维搜索算法步骤示例特点代码模板 最速下降法算法详细步骤 代码实现示例最优步长的求解 黄金分割法斐波那契法牛顿法阻尼牛顿法模式搜索法Powell方法…...

unity学习51:所有UI的父物体:canvas画布

目录 1 下载资源 1.1 在window / Asset store下下载一套免费的UI资源 1.2 下载&#xff0c;导入import 1.3 导入后在 project / Asset下面可以看到 2 画布canvas&#xff0c;UI的父物体 2.1 创建canvas 2.1.1 画布的下面是 event system是UI相关的事件系统 2.2 canvas…...

ctfshow做题笔记—栈溢出—pwn57~pwn60

目录 前言 一、pwn57&#xff08;先了解一下简单的64位shellcode吧&#xff09; 二、pwn58 三、pwn59&#xff08;64位 无限制&#xff09; 四、pwn60&#xff08;入门难度shellcode&#xff09; 前言 往前写了几道题&#xff0c;与shellcode有关&#xff0c;关于shellc…...

数据结构 1-2 线性表的链式存储-链表

1 原理 顺序表的缺点&#xff1a; 插入和删除移动大量元素数组的大小不好控制占用一大段连续的存储空间&#xff0c;造成很多碎片 链表规避了上述顺序表缺点 逻辑上相邻的两个元素在物理位置上不相邻 头结点 L&#xff1a;头指针 头指针&#xff1a;链表中第一个结点的存储…...

ArcGIS Pro进行坡度与坡向分析

在地理信息系统中&#xff0c;坡度分析是一项至关重要的空间分析方法&#xff0c;旨在精确计算地表或地形的坡度&#xff0c;为地形特征识别、土地资源规划、环境保护、灾害预警等领域提供科学依据。本文将详细介绍如何利用ArcGIS Pro这一强大的地理信息系统软件&#xff0c;进…...

My first Android application

界面元素组成&#xff1a; 功能代码&#xff1a; /*实现功能&#xff1a;当输入内容后&#xff0c;欢迎文本发生相应改变&#xff0c;并清除掉文本域内容当未输入任何内容时&#xff0c;弹出提示文本以警告用户*/val greetingText findViewById<TextView>(R.id.printer)…...

Python|GIF 解析与构建(5):手搓截屏和帧率控制

目录 Python&#xff5c;GIF 解析与构建&#xff08;5&#xff09;&#xff1a;手搓截屏和帧率控制 一、引言 二、技术实现&#xff1a;手搓截屏模块 2.1 核心原理 2.2 代码解析&#xff1a;ScreenshotData类 2.2.1 截图函数&#xff1a;capture_screen 三、技术实现&…...

React 第五十五节 Router 中 useAsyncError的使用详解

前言 useAsyncError 是 React Router v6.4 引入的一个钩子&#xff0c;用于处理异步操作&#xff08;如数据加载&#xff09;中的错误。下面我将详细解释其用途并提供代码示例。 一、useAsyncError 用途 处理异步错误&#xff1a;捕获在 loader 或 action 中发生的异步错误替…...

Appium+python自动化(十六)- ADB命令

简介 Android 调试桥(adb)是多种用途的工具&#xff0c;该工具可以帮助你你管理设备或模拟器 的状态。 adb ( Android Debug Bridge)是一个通用命令行工具&#xff0c;其允许您与模拟器实例或连接的 Android 设备进行通信。它可为各种设备操作提供便利&#xff0c;如安装和调试…...

8k长序列建模,蛋白质语言模型Prot42仅利用目标蛋白序列即可生成高亲和力结合剂

蛋白质结合剂&#xff08;如抗体、抑制肽&#xff09;在疾病诊断、成像分析及靶向药物递送等关键场景中发挥着不可替代的作用。传统上&#xff0c;高特异性蛋白质结合剂的开发高度依赖噬菌体展示、定向进化等实验技术&#xff0c;但这类方法普遍面临资源消耗巨大、研发周期冗长…...

线程同步:确保多线程程序的安全与高效!

全文目录&#xff1a; 开篇语前序前言第一部分&#xff1a;线程同步的概念与问题1.1 线程同步的概念1.2 线程同步的问题1.3 线程同步的解决方案 第二部分&#xff1a;synchronized关键字的使用2.1 使用 synchronized修饰方法2.2 使用 synchronized修饰代码块 第三部分&#xff…...

【磁盘】每天掌握一个Linux命令 - iostat

目录 【磁盘】每天掌握一个Linux命令 - iostat工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景 注意事项 【磁盘】每天掌握一个Linux命令 - iostat 工具概述 iostat&#xff08;I/O Statistics&#xff09;是Linux系统下用于监视系统输入输出设备和CPU使…...

2021-03-15 iview一些问题

1.iview 在使用tree组件时&#xff0c;发现没有set类的方法&#xff0c;只有get&#xff0c;那么要改变tree值&#xff0c;只能遍历treeData&#xff0c;递归修改treeData的checked&#xff0c;发现无法更改&#xff0c;原因在于check模式下&#xff0c;子元素的勾选状态跟父节…...

大语言模型(LLM)中的KV缓存压缩与动态稀疏注意力机制设计

随着大语言模型&#xff08;LLM&#xff09;参数规模的增长&#xff0c;推理阶段的内存占用和计算复杂度成为核心挑战。传统注意力机制的计算复杂度随序列长度呈二次方增长&#xff0c;而KV缓存的内存消耗可能高达数十GB&#xff08;例如Llama2-7B处理100K token时需50GB内存&a…...

C++使用 new 来创建动态数组

问题&#xff1a; 不能使用变量定义数组大小 原因&#xff1a; 这是因为数组在内存中是连续存储的&#xff0c;编译器需要在编译阶段就确定数组的大小&#xff0c;以便正确地分配内存空间。如果允许使用变量来定义数组的大小&#xff0c;那么编译器就无法在编译时确定数组的大…...

Python Einops库:深度学习中的张量操作革命

Einops&#xff08;爱因斯坦操作库&#xff09;就像给张量操作戴上了一副"语义眼镜"——让你用人类能理解的方式告诉计算机如何操作多维数组。这个基于爱因斯坦求和约定的库&#xff0c;用类似自然语言的表达式替代了晦涩的API调用&#xff0c;彻底改变了深度学习工程…...