当前位置: 首页 > article >正文

C语言操作Kafka

Kafka服务

Kafka的快速入门 文档很详细,基本上几步就可以搭建一个Kafka测试环境。

下载Kafka的二进制包,然后解压。

wget https://www.apache.org/dyn/closer.cgi?path=/kafka/4.0.0/kafka_2.13-4.0.0.tgz
tar -xzf kafka_2.13-4.0.0.tgz
cd kafka_2.13-4.0.0

生成集群ID,使用集群ID格式化存储

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties

启动Kafka Server。

$ bin/kafka-server-start.sh config/server.properties

启动之后,默认的Kafka Server会连续输出日志到控制台。后面的测试命令,需要在另外的终端执行。

创建主题

bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

测试生产

bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
>This is my first event
>This is my second event

测试消费

bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event

C语言的Kafka库:librdkafka

在C语言中操作Kafka,有不少库可以选择,其中就有librdkafka。

根据librdkafka在gitthub上的信息,它主要是由C语言开发,但是也提供了C++的支持。

当我们在Linux平台上安装librdkafka之后,通过pkg-config命令加库名rdkafka,可以获取编译相关的信息。

如在Fedora 42上:

pkg-config --cflags --libs rdkafka  
-DWITH_GZFILEOP -lrdkafka

需要注意的是,rdkafka.pc中记录的头文件路径是/usr/include,但是它的头文件都位于/usr/include/librdkafka。在C语言的源代码中需要写成:

#include <librdkafka/rdkafka.h>

在C++中则需要写成:

#include <librdkafka/rdkafkacpp.h>

但是C++的API与C语言的类似,后续不再提及C++的相关API,仅以C语言举例。

rdkafka API

rd_kafka_t

API的核心是结构rd_kafka_t

它的创建和销毁是一对函数:

rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,  rd_kafka_conf_t *conf,  char *errstr,  size_t errstr_size);  void rd_kafka_destroy(rd_kafka_t *rk);

其中,rd_kafka_type_t是一个enum,分别为RD_KAFKA_PRODUCERRD_KAFKA_CONSUMER,即生产者与消费者。

rd_kafka_conf_t

rd_kafka_conf_t是另一个结构,创建与销毁函数为:

rd_kafka_conf_t *rd_kafka_conf_new(void);void rd_kafka_conf_destroy(rd_kafka_conf_t *conf);

rd_kafka_conf_t创建之后,使用如下函数设置参数:

rd_kafka_conf_res_t rd_kafka_conf_set(rd_kafka_conf_t *conf,  const char *name,  const char *value,  char *errstr,  size_t errstr_size);

如:bootstrap.serverssasl.usernamesasl.password等。

rd_kafka_conf_t设置相关的函数返回值都是rd_kafka_conf_res_t。如果成功,值为RD_KAFKA_CONF_OK,即0。

如果要进行更详细的控制,还可以使用其它一些函数。

如:

void rd_kafka_conf_set_dr_msg_cb(  rd_kafka_conf_t *conf,  void (*dr_msg_cb)(rd_kafka_t *rk,  const rd_kafka_message_t *rkmessage,  void *opaque));

可以设置消息被处理之后的回调函数。其中,rkmessage是被处理的消息,opaque是使用rd_kafka_conf_set_opaque()设置的用户层指针。

证书设置

rd_kafka_conf_set_ssl_cert用于设置证书。

rd_kafka_conf_res_t  
rd_kafka_conf_set_ssl_cert(rd_kafka_conf_t *conf,  rd_kafka_cert_type_t cert_type,  rd_kafka_cert_enc_t cert_enc,  const void *buffer,  size_t size,  char *errstr,  size_t errstr_size);

其中,cert_type的值分别为:RD_KAFKA_CERT_PUBLIC_KEYRD_KAFKA_CERT_PRIVATE_KEYRD_KAFKA_CERT_CA,即公钥、私钥与CA。

cert_enc的值分别为:RD_KAFKA_CERT_ENC_PKCS12RD_KAFKA_CERT_ENC_DERRD_KAFKA_CERT_ENC_PEM,即分别是三种证书格式。

另外还可以设置证书验证的回调函数:

rd_kafka_conf_res_t rd_kafka_conf_set_ssl_cert_verify_cb(  rd_kafka_conf_t *conf,  int (*ssl_cert_verify_cb)(rd_kafka_t *rk,  const char *broker_name,  int32_t broker_id,  int *x509_error,  int depth,  const char *buf,  size_t size,  char *errstr,  size_t errstr_size,  void *opaque));

如果成功,回调函数必须返回1。否则需要设置errstr为错误原因,并且返回0。

rd_kafka_message_t

处理Kafka另外一个重要的结构是rd_kafka_message_t,即消息。它的定义为:

typedef struct rd_kafka_message_s {  rd_kafka_resp_err_t err;rd_kafka_topic_t *rkt; int32_t partition;void *payload;size_t len;                        void *key;size_t key_len;      int64_t offset;void *_private; 
} rd_kafka_message_t;

最开头的err非常重要。每次使用rd_kafka_consume*族函数取得一条消息,以及在生产一条消息的回调函数中时,都要检查这个值,确定是否成功。

生产者

使用rd_kafka_t生产一条消息的函数为:

int rd_kafka_produce(rd_kafka_topic_t *rkt,  int32_t partition,  int msgflags,  void *payload,  size_t len,  const void *key,  size_t keylen,  void *msg_opaque);

成功返回0。

生成一批消息的函数为:

int rd_kafka_produce_batch(rd_kafka_topic_t *rkt,  int32_t partition,  int msgflags,  rd_kafka_message_t *rkmessages,  int message_cnt);

返回值为成功的消息数目。

等待所有消息处理完成的函数为:

rd_kafka_resp_err_t rd_kafka_flush(rd_kafka_t *rk, int timeout_ms);

或者放弃没有发送的消息:

rd_kafka_resp_err_t rd_kafka_purge(rd_kafka_t *rk, int purge_flags);

消费者

控制Kafka消费的函数有一对:

int rd_kafka_consume_start(rd_kafka_topic_t *rkt,  int32_t partition,  int64_t offset);int rd_kafka_consume_stop(rd_kafka_topic_t *rkt, int32_t partition);

注意这两个函数的第一个参数是rd_kafka_topic_t

调用rd_kafka_consume_start()之后,kafka将开始把成批的消息放入本地的队列中,应用需要使用rd_kafka_consume()函数来消费。

rd_kafka_message_t *  
rd_kafka_consume(rd_kafka_topic_t *rkt, int32_t partition, int timeout_ms);

这个函数在timeout_ms毫秒以内,返回一条消息,或者NULL。返回的消息,必须使用rd_kafka_message_destroy()释放。

类似生产者,消费者也有批量处理的函数:

ssize_t rd_kafka_consume_batch(rd_kafka_topic_t *rkt,  int32_t partition,  int timeout_ms,  rd_kafka_message_t **rkmessages,  size_t rkmessages_size);

相关文章:

C语言操作Kafka

Kafka服务 Kafka的快速入门 文档很详细&#xff0c;基本上几步就可以搭建一个Kafka测试环境。 下载Kafka的二进制包&#xff0c;然后解压。 wget https://www.apache.org/dyn/closer.cgi?path/kafka/4.0.0/kafka_2.13-4.0.0.tgz tar -xzf kafka_2.13-4.0.0.tgz cd kafka_2.…...

STM32架构解析

在嵌入式开发领域,STM32作为广泛应用的Cortex-M系列微控制器,常常被问及一个基础而深刻的问题:STM32是哈佛结构,还是冯诺依曼结构?这个问题看似简单,却涉及到计算机架构发展的历史、理论与现实的融合。 一、计算体系结构基础:冯诺依曼 vs 哈佛 1.1 冯诺依曼结构的特性…...

在线政治采购系统架构构建指南

一、系统架构设计原则 合规性优先 系统需严格遵循《中华人民共和国政府采购法》及最新修订要求&#xff0c;例如采购流程需满足公开招标不少于 20 日的法定时限&#xff0c;合同需在中标通知书发出后 30 日内签订并备案。同时&#xff0c;需预留接口以适应未来法律修订带来的流…...

UHF RFID无源标签的芯片供电原理

作为无源物联网技术中最基础的一环,UHF RFID无源标签已经被广泛用于商超零售、物流仓储、图书档案、防伪溯源等量非常大的应用领域,仅2021年度,全球出货量就超过200亿。在实际应用中UHF RFID无源标签的芯片是究竟依靠什么来供电的呢? UHF RFID无源标签供电特点 1.借助无线…...

【NLP入门系列一】NLP概述和独热编码

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 博主简介&#xff1a;努力学习的22级本科生一枚 &#x1f31f;​&#xff1b;探索AI算法&#xff0c;C&#xff0c;go语言的世界&#xff1b;在迷茫中寻找光芒…...

洛谷习题V^V

1.帮贡排序 解题思路&#xff1a;按照题意&#xff0c;排序模拟即可 #include <iostream> #include <vector> #include <algorithm> #include <string> using namespace std;struct Member {string name;string position;int contribution;int level;…...

Wireshark 在 macOS 上使用及问题解决

wireshark概述 Wireshark 是被广泛使用的免费开源网络协议分析软件&#xff08;network protocol analyzer&#xff09;或网络数据包分析工具&#xff0c;它可以让你在微观层面上查看网络上发生的事情。它的主要功能是截取网络数据包&#xff0c;并尽可能详细地展示网络数据包…...

不同电脑同一个网络ip地址一样吗?如何更改

想象一下&#xff0c;你住在同一栋公寓楼里&#xff0c;所有住户对外共享一个统一的小区地址&#xff08;类似公网IP&#xff09;&#xff0c;但每家每户又有独立的门牌号&#xff08;类似内网IP&#xff09;。网络世界中的IP地址也遵循这一逻辑&#xff1a;同一局域网内的设备…...

Qt使用智能指针

第一步&#xff1a;导入头文件 #include <QScopedPointer> 第二步:创建对象 .h文件 QSharedPointer<Student> m_pClass; .cpp文件 m_pClass.reset(new Student(param1,param2,...,param_n)); 第三步:绑定信号槽 connect(m_pClass.data(), &Class::sign…...

微软 Azure AI Foundry(国际版)十大重要更新

2025 年被广泛视为 “AI 智能体元年”。在过去半年&#xff0c;微软密集发布众多创新技术&#xff0c;构建起从基础设施层、开发工具层到场景应用层的完整技术矩阵&#xff0c;加速推动诸多具备自主决策能力的 “超级助理” 智能体落地&#xff0c;形成完整的 AI 赋能生态&…...

Realsense D435i 使用说明

D435i 驱动安装 及 ROS使用 Ubuntu16.04适配https://blog.csdn.net/lemonxiaoxiao/article/details/107834936 过程中遇到fatal error ; 需要添加标签。 使用下面网址的博客解决了。https://blog.csdn.net/xuzhengzhe/article/details/135407342 最终如下&#xff1a; target…...

PostgreSQL如何更新和删除表数据

这节说下怎样更新和删除表数据&#xff0c;当然认识命令了&#xff0c;可以问AI帮忙写。 接上节先看下天气表weather的数据&#xff0c;增加了杭州和西安的数据&#xff1a; 一.UPDATE更新命令 用UPDATE命令更新现有的行。 假设所有 杭州 5月12日的温度低了两度&#xff0c;用…...

【leetcode】704. 二分查找

二分查找 题目代码 题目 704. 二分查找 给定一个 n 个元素有序的&#xff08;升序&#xff09;整型数组 nums 和一个目标值 target &#xff0c;写一个函数搜索 nums 中的 target&#xff0c;如果目标值存在返回下标&#xff0c;否则返回 -1。 示例 1: 输入: nums [-1,0,3,…...

Golang | 运用分布式搜索引擎实现视频搜索业务

把前面所设计好的搜索引擎引用进来开发一个简单的具体的视频搜索业务。代码结构&#xff1a; handler目录&#xff1a;后端接口&#xff0c;负责接收请求并返回结果&#xff0c;不存在具体的搜索逻辑。video_search目录&#xff1a;具体的搜索逻辑存放在这&#xff0c;包括reca…...

针对Helsinki-NLP/opus-mt-zh-en模型进行双向互翻的微调

引言  题目听起来有点怪怪的&#xff0c;但是实际上就是对Helsinki-NLP/opus-mt-en-es模型进行微调。但是这个模型是单向的&#xff0c;只支持中到英的翻译&#xff0c;反之则不行。这样的话&#xff0c;如果要做中英双向互翻就需要两个模型&#xff0c;那模型体积直接大了两倍…...

【笔记】Trae+Andrioid Studio+Kotlin开发安卓WebView应用

文章目录 简介依赖步骤AS(Andriod Studio)创建项目AS创建虚拟机TRAE CN 修改项目新增按键捕获功能 新增WebViewWebView加载本地资源在按键回调中向WebView注入JS代码 最终关键代码吐槽 简介 使用Trae配合Andriod Studio开发一个内嵌WebView的安卓应用, 在WebView中加载本地资源…...

Github 2025-05-30Java开源项目日报Top10

根据Github Trendings的统计,今日(2025-05-30统计)共有10个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量Java项目10C++项目1TypeScript项目1Keycloak: 现代应用程序和服务的开源身份和访问管理解决方案 创建周期:3846 天开发语言:Java协议类型:Ap…...

Github上一些使用技巧(缩写、Issue的Highlight)自用

1. GIthub中的一些缩写 LGTM ! 最近经常看到一些迷之缩写&#xff0c;感觉挺有意思的&#xff0c;但是有时候看到一些没见过的缩写还是有点懵逼&#xff0c;不过缩写确实也是很方便去review&#xff0c;这里就记录汇总一下&#xff1b;顺便加了一些git的基操单词&#xff08;加…...

TextIn OCR Frontend前端开源组件库发布!

为什么开源 TextIn OCR Frontend 前端组件库&#xff1f; 在 TextIn 社群中&#xff0c;我们时常接到用户反馈&#xff0c;调取 API 进行票据等文件批量识别后&#xff0c;需要另行完成前端工程&#xff0c;实现比对环节。为助力用户节省工程成本&#xff0c;TextIn 团队正式开…...

GitLens 教学(学习更新中)

GitLens 是什么&#xff1f; GitLens 是安装在 Visual Studio Code (VS Code) 中的一个功能极其强大的扩展程序&#xff0c;它直接内嵌在您的代码编辑器中&#xff0c;极大地增强了 VS Code 内置的 Git 功能。它的核心目标是&#xff1a; 深刻理解代码历史&#xff1a; 让您轻…...

C#中数据绑定的简单例子

数据绑定允许将控件的属性和数据链接起来——控件属性值发生改变&#xff0c;会导致数据跟着自动改变。 数据绑定还可以是双向的——控件属性值发生改变&#xff0c;会导致数据跟着自动改变&#xff1b;数据发生改变&#xff0c;也会导致控件属性值跟着自动改变。 1、数据绑定…...

VR 技术在农业领域或许是一抹新曙光​

在科技日新月异的今天&#xff0c;VR(虚拟现实)技术已不再局限于游戏、影视等娱乐范畴&#xff0c;正逐步渗透到各个传统行业&#xff0c;为其带来全新的发展契机&#xff0c;农业领域便是其中之一。VR 技术利用计算机生成三维虚拟世界&#xff0c;给予用户视觉、听觉、触觉等多…...

【JVM】Java程序运行时数据区

运行时数据区 运行时数据区是Java程序执行过程中管理的内存区域 Java 运行时数据区组成&#xff08;JVM 内存结构&#xff09; Java 虚拟机&#xff08;JVM&#xff09;的运行时数据区由以下核心部分组成&#xff1a; 线程私有&#xff1a;程序计数器、Java虚拟机栈、本地方…...

NVIDIA英伟达describe-anything软件本地电脑安装部署完整教程

describe-anything是英伟达联合其他大学开发的一款图片视频内容分析总结软件&#xff0c;可通过AI描述任意图片视频选中区域内容&#xff0c;非常强大&#xff0c;下面是describe-anything本地电脑安装部署教程。 首先电脑上安装git https://github.com/git-for-windows/git/…...

计算机视觉入门:OpenCV与YOLO目标检测

计算机视觉入门&#xff1a;OpenCV与YOLO目标检测 系统化学习人工智能网站&#xff08;收藏&#xff09;&#xff1a;https://www.captainbed.cn/flu 文章目录 计算机视觉入门&#xff1a;OpenCV与YOLO目标检测摘要引言技术原理对比1. OpenCV&#xff1a;传统图像处理与机器学…...

Java 中的 ThreadLocal 详解:从基础到源码

Java 中的 ThreadLocal 详解&#xff1a;从基础到源码 引言 在 Java 多线程编程中&#xff0c;ThreadLocal是一个经常被提及的概念。它提供了一种线程局部变量的机制&#xff0c;使得每个线程都可以独立地存储和访问自己的变量副本&#xff0c;而不会与其他线程产生冲突。本文…...

(二)开启深度学习动手之旅:先筑牢预备知识根基

1 数据操作 数据操作是深度学习的基础&#xff0c;包括数据的创建、索引、切片、运算等操作。这些操作是后续复杂模型构建和训练的前提。 入门 &#xff1a;理解如何使用NumPy创建数组&#xff0c;这是深度学习中数据存储的基本形式。掌握数组的属性&#xff08;如数据类型dt…...

Spring Boot3.4.1 集成redis

Spring Boot3.4.1 集成redis 第一步 引入依赖 <!-- redis 缓存操作 --> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!-- pool 对象池 …...

【Prometheus+Grafana实战:搭建监控系统(含告警配置)】

什么是Prometheus和Grafana&#xff1f; Prometheus&#xff1a;一款开源的监控告警工具&#xff0c;擅长时序数据存储和多维度查询&#xff08;通过PromQL&#xff09;&#xff0c;采用Pull模型主动抓取目标指标。Grafana&#xff1a;数据可视化平台&#xff0c;支持多种数据…...

操作系统原理第9章 磁盘存储器管理 重点内容

目录 &#xff08;一&#xff09;外存的组织方式种类 &#xff08;二&#xff09;FAT 系统&#xff08;计算&#xff09; &#xff08;三&#xff09;文件存储空间的管理方式 &#xff08;一&#xff09;外存的组织方式种类 连续组织方式 原理&#xff1a;在磁盘等外存上&…...