Ubuntu下C语言操作kafka示例
目录
安装kafka:
安装librdkafka
consumer
Producer
测试运行
安装kafka:
Ubuntu下Kafka安装及使用_ubuntu安装kafka-CSDN博客
安装librdkafka
github地址:GitHub - confluentinc/librdkafka: The Apache Kafka C/C++ library
$ apt install librdkafka-dev
安装路径如下:
consumer
/*** Simple high-level balanced Apache Kafka consumer* using the Kafka driver from librdkafka* (https://github.com/edenhill/librdkafka)*/#include <stdio.h>
#include <signal.h>
#include <string.h>
#include <ctype.h>/* Typical include path would be <librdkafka/rdkafka.h>, but this program* is builtin from within the librdkafka source tree and thus differs. */
//#include <librdkafka/rdkafka.h>
#include "rdkafka.h"static volatile sig_atomic_t run = 1;/*** @brief Signal termination of program*/
static void stop (int sig) {run = 0;
}/*** @returns 1 if all bytes are printable, else 0.*/
static int is_printable (const char *buf, size_t size) {size_t i;for (i = 0 ; i < size ; i++)if (!isprint((int)buf[i]))return 0;return 1;
}int main (int argc, char **argv) {rd_kafka_t *rk; /* Consumer instance handle */rd_kafka_conf_t *conf; /* Temporary configuration object */rd_kafka_resp_err_t err; /* librdkafka API error code */char errstr[512]; /* librdkafka API error reporting buffer */const char *brokers; /* Argument: broker list */const char *groupid; /* Argument: Consumer group id */char **topics; /* Argument: list of topics to subscribe to */int topic_cnt; /* Number of topics to subscribe to */rd_kafka_topic_partition_list_t *subscription; /* Subscribed topics */int i;/** Argument validation*/if (argc < 4) {fprintf(stderr,"%% Usage: ""%s <broker> <group.id> <topic1> <topic2>..\n",argv[0]);return 1;}brokers = argv[1];groupid = argv[2];topics = &argv[3];topic_cnt = argc - 3;/** Create Kafka client configuration place-holder*/conf = rd_kafka_conf_new(); // 创建配置文件/* Set bootstrap broker(s) as a comma-separated list of* host or host:port (default port 9092).* librdkafka will use the bootstrap brokers to acquire the full* set of brokers from the cluster. */if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {fprintf(stderr, "%s\n", errstr);rd_kafka_conf_destroy(conf);return 1;}/* Set the consumer group id.* All consumers sharing the same group id will join the same* group, and the subscribed topic' partitions will be assigned* according to the partition.assignment.strategy* (consumer config property) to the consumers in the group. */if (rd_kafka_conf_set(conf, "group.id", groupid,errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {fprintf(stderr, "%s\n", errstr);rd_kafka_conf_destroy(conf);return 1;}/* If there is no previously committed offset for a partition* the auto.offset.reset strategy will be used to decide where* in the partition to start fetching messages.* By setting this to earliest the consumer will read all messages* in the partition if there was no previously committed offset. */if (rd_kafka_conf_set(conf, "auto.offset.reset", "earliest",errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {fprintf(stderr, "%s\n", errstr);rd_kafka_conf_destroy(conf);return 1;}/** Create consumer instance.** NOTE: rd_kafka_new() takes ownership of the conf object* and the application must not reference it again after* this call.*/// 创建一个kafka消费者rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));if (!rk) {fprintf(stderr,"%% Failed to create new consumer: %s\n", errstr);return 1;}conf = NULL; /* Configuration object is now owned, and freed,* by the rd_kafka_t instance. *//* Redirect all messages from per-partition queues to* the main queue so that messages can be consumed with one* call from all assigned partitions.** The alternative is to poll the main queue (for events)* and each partition queue separately, which requires setting* up a rebalance callback and keeping track of the assignment:* but that is more complex and typically not recommended. */rd_kafka_poll_set_consumer(rk);// poll机制,设置消费者实例到poll中/* Convert the list of topics to a format suitable for librdkafka */// 创建主题分区列表subscription = rd_kafka_topic_partition_list_new(topic_cnt);for (i = 0 ; i < topic_cnt ; i++)rd_kafka_topic_partition_list_add(subscription,topics[i],/* the partition is ignored* by subscribe() */RD_KAFKA_PARTITION_UA);/* Subscribe to the list of topics */err = rd_kafka_subscribe(rk, subscription);if (err) {fprintf(stderr,"%% Failed to subscribe to %d topics: %s\n",subscription->cnt, rd_kafka_err2str(err));rd_kafka_topic_partition_list_destroy(subscription);rd_kafka_destroy(rk);return 1;}fprintf(stderr,"%% Subscribed to %d topic(s), ""waiting for rebalance and messages...\n",subscription->cnt);rd_kafka_topic_partition_list_destroy(subscription);/* Signal handler for clean shutdown */signal(SIGINT, stop);/* Subscribing to topics will trigger a group rebalance* which may take some time to finish, but there is no need* for the application to handle this idle period in a special way* since a rebalance may happen at any time.* Start polling for messages. */while (run) {rd_kafka_message_t *rkm;rkm = rd_kafka_consumer_poll(rk, 100);if (!rkm)continue; /* Timeout: no message within 100ms,* try again. This short timeout allows* checking for `run` at frequent intervals.*//* consumer_poll() will return either a proper message* or a consumer error (rkm->err is set). */if (rkm->err) {/* Consumer errors are generally to be considered* informational as the consumer will automatically* try to recover from all types of errors. */fprintf(stderr,"%% Consumer error: %s\n",rd_kafka_message_errstr(rkm));rd_kafka_message_destroy(rkm);continue;}/* Proper message. */printf("Message on %s [%"PRId32"] at offset %"PRId64":\n",rd_kafka_topic_name(rkm->rkt), rkm->partition,rkm->offset);/* Print the message key. */if (rkm->key && is_printable(rkm->key, rkm->key_len))printf(" Key: %.*s\n",(int)rkm->key_len, (const char *)rkm->key);else if (rkm->key)printf(" Key: (%d bytes)\n", (int)rkm->key_len);/* Print the message value/payload. */if (rkm->payload && is_printable(rkm->payload, rkm->len))printf(" Value: %.*s\n",(int)rkm->len, (const char *)rkm->payload);else if (rkm->payload)printf(" Value: (%d bytes)\n", (int)rkm->len);rd_kafka_message_destroy(rkm);}/* Close the consumer: commit final offsets and leave the group. */fprintf(stderr, "%% Closing consumer\n");rd_kafka_consumer_close(rk);/* Destroy the consumer */rd_kafka_destroy(rk);return 0;
}
编译:
gcc consumer.c -o consumer -I/usr/include/librdkafka -L/usr/lib/x86_64-linux-gnu -lrdkafka++ -lrdkafka
Producer
/*** Simple Apache Kafka producer* using the Kafka driver from librdkafka* (https://github.com/edenhill/librdkafka)*/#include <stdio.h>
#include <signal.h>
#include <string.h>/* Typical include path would be <librdkafka/rdkafka.h>, but this program* is builtin from within the librdkafka source tree and thus differs. */
#include "rdkafka.h"static volatile sig_atomic_t run = 1;/*** @brief Signal termination of program*/
static void stop (int sig) {run = 0;fclose(stdin); /* abort fgets() */
}/*** @brief Message delivery report callback.** This callback is called exactly once per message, indicating if* the message was succesfully delivered* (rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR) or permanently* failed delivery (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR).** The callback is triggered from rd_kafka_poll() and executes on* the application's thread.*/
static void dr_msg_cb (rd_kafka_t *rk,const rd_kafka_message_t *rkmessage, void *opaque) {if (rkmessage->err)fprintf(stderr, "%% Message delivery failed: %s\n",rd_kafka_err2str(rkmessage->err));elsefprintf(stderr,"%% Message delivered (%zd bytes, ""partition %"PRId32")\n",rkmessage->len, rkmessage->partition);/* The rkmessage is destroyed automatically by librdkafka */
}int main (int argc, char **argv) {rd_kafka_t *rk; /* Producer instance handle */rd_kafka_conf_t *conf; /* Temporary configuration object */char errstr[512]; /* librdkafka API error reporting buffer */char buf[512]; /* Message value temporary buffer */const char *brokers; /* Argument: broker list */const char *topic; /* Argument: topic to produce to *//** Argument validation*/if (argc != 3) {fprintf(stderr, "%% Usage: %s <broker> <topic>\n", argv[0]);return 1;}brokers = argv[1];topic = argv[2];/** Create Kafka client configuration place-holder*/conf = rd_kafka_conf_new();/* Set bootstrap broker(s) as a comma-separated list of* host or host:port (default port 9092).* librdkafka will use the bootstrap brokers to acquire the full* set of brokers from the cluster. */if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {fprintf(stderr, "%s\n", errstr);return 1;}/* Set the delivery report callback.* This callback will be called once per message to inform* the application if delivery succeeded or failed.* See dr_msg_cb() above.* The callback is only triggered from rd_kafka_poll() and* rd_kafka_flush(). */rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);/** Create producer instance.** NOTE: rd_kafka_new() takes ownership of the conf object* and the application must not reference it again after* this call.*/rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));if (!rk) {fprintf(stderr,"%% Failed to create new producer: %s\n", errstr);return 1;}/* Signal handler for clean shutdown */signal(SIGINT, stop);fprintf(stderr,"%% Type some text and hit enter to produce message\n""%% Or just hit enter to only serve delivery reports\n""%% Press Ctrl-C or Ctrl-D to exit\n");while (run && fgets(buf, sizeof(buf), stdin)) {size_t len = strlen(buf);rd_kafka_resp_err_t err;if (buf[len-1] == '\n') /* Remove newline */buf[--len] = '\0';if (len == 0) {/* Empty line: only serve delivery reports */rd_kafka_poll(rk, 0/*non-blocking */);continue;}/** Send/Produce message.* This is an asynchronous call, on success it will only* enqueue the message on the internal producer queue.* The actual delivery attempts to the broker are handled* by background threads.* The previously registered delivery report callback* (dr_msg_cb) is used to signal back to the application* when the message has been delivered (or failed).*/retry:err = rd_kafka_producev(/* Producer handle */rk,/* Topic name */RD_KAFKA_V_TOPIC(topic),/* Make a copy of the payload. */RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),/* Message value and length */RD_KAFKA_V_VALUE(buf, len),/* Per-Message opaque, provided in* delivery report callback as* msg_opaque. */RD_KAFKA_V_OPAQUE(NULL),/* End sentinel */RD_KAFKA_V_END);if (err) {/** Failed to *enqueue* message for producing.*/fprintf(stderr,"%% Failed to produce to topic %s: %s\n",topic, rd_kafka_err2str(err));if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {/* If the internal queue is full, wait for* messages to be delivered and then retry.* The internal queue represents both* messages to be sent and messages that have* been sent or failed, awaiting their* delivery report callback to be called.** The internal queue is limited by the* configuration property* queue.buffering.max.messages */rd_kafka_poll(rk, 1000/*block for max 1000ms*/);goto retry;}} else {fprintf(stderr, "%% Enqueued message (%zd bytes) ""for topic %s\n",len, topic);}/* A producer application should continually serve* the delivery report queue by calling rd_kafka_poll()* at frequent intervals.* Either put the poll call in your main loop, or in a* dedicated thread, or call it after every* rd_kafka_produce() call.* Just make sure that rd_kafka_poll() is still called* during periods where you are not producing any messages* to make sure previously produced messages have their* delivery report callback served (and any other callbacks* you register). */rd_kafka_poll(rk, 0/*non-blocking*/);}/* Wait for final messages to be delivered or fail.* rd_kafka_flush() is an abstraction over rd_kafka_poll() which* waits for all messages to be delivered. */fprintf(stderr, "%% Flushing final messages..\n");rd_kafka_flush(rk, 10*1000 /* wait for max 10 seconds */);/* If the output queue is still not empty there is an issue* with producing messages to the clusters. */if (rd_kafka_outq_len(rk) > 0)fprintf(stderr, "%% %d message(s) were not delivered\n",rd_kafka_outq_len(rk));/* Destroy the producer instance */rd_kafka_destroy(rk);return 0;
}
编译:
gcc producer.c -o producer -I/usr/include/librdkafka -L/usr/lib/x86_64-linux-gnu -lrdkafka++ -lrdkafka
测试运行
启动kafka:
bin/kafka-server-start.sh config/server.properties&
创建主题:demo1是主机名,mydemo1是主题。
./bin/kafka-topics.sh --create --bootstrap-server demo1:9092 --replication-factor 1 --partitions 1 --topic mydemo1
启动producer:
./producer demo1:9092 mydemo1
启动consumer:
./consumer demo1:9092 0 mydemo1
在producer终端输入测试消息,在consumer终端能够看到测试消息。
相关文章:

Ubuntu下C语言操作kafka示例
目录 安装kafka: 安装librdkafka consumer Producer 测试运行 安装kafka: Ubuntu下Kafka安装及使用_ubuntu安装kafka-CSDN博客 安装librdkafka github地址:GitHub - confluentinc/librdkafka: The Apache Kafka C/C library $ apt in…...

怎么将pdf中的某一个提取出来?介绍几种提取PDF中页面的方法
怎么将pdf中的某一个提取出来?传统上,我们可能通过手动截取屏幕或使用PDF阅读器的复制功能来提取信息,但这种方法往往不够精确,且无法保留原文档的排版和格式。此外,很多时候我们需要提取的内容可能涉及多个页面、多个…...

HTTP接口报错详解与解决 200,500,403,408,404
前言: 仅做学习记录,侵删 背景 当后端编写接口时,经常需要对接口使用ApiFox或者PostMan进行测试,此时就会出现各种各样的报错,一般都会包括报错编码:200,400,401等。这个状态码一般是服务器所返回的包含…...

监控IP频繁登录服务器脚本
该脚本的作用是监控IP登录失败次数,如果某个IP的登录失败次数超过设定的最大次数,则阻止该IP的进一步登录尝试。通过iptables防火墙阻止连接,当一个IP尝试登录次数超过5次时,iptables会阻止来自该IP的所有连接 #!/bin/bashfuncti…...

分布式链路追踪-03-Jaeger、Zipkin、skywalking 中的 span 是如何设计的?
开源项目 auto-log 自动日志输出 Jaeger、Zipkin 中的 spanId 是如何生成的? 在 Jaeger 和 Zipkin 这两个分布式跟踪系统中,Span ID 是通过不同的方法生成的。 下面分别介绍它们的生成方式: Jaeger 中的 Span ID 生成: 在 Ja…...

【达梦数据库】获取对象DDL
目录 背景获取表的DDL其他 背景 在排查问题时总会遇到获取对象DDL的问题,因此做以下总结。 获取表的DDL 设置disql工具中显示LONG类型数据的最大长度,避免截断: SET LONG 9999获取DDL SELECT DBMS_METADATA.GET_DDL(TABLE,表名,模式名) …...

InnoDB和MyISAM引擎优缺点和区别
nnoDB和MyISAM是MySQL数据库中常用的两种存储引擎。它们各自具有不同的特性和优势,适用于不同的应用场景。 一、InnoDB引擎: 1、它有如下特性: 1)、支持事务(ACID) 2)、支持外键约束(FOREIGN KEY const…...

文件上传知识点汇总
归纳总结一下文件上传(其实是懒得写wp) 基于Dream ZHO师傅的CTF show 文件上传篇(web151-170,看这一篇就够啦)-CSDN博客 和dota_st 师傅的ctfshow-Web1000题系列修炼(一) | dota_st 做一篇自己的总结 目录 一、什么…...

计算机网络技术基础:5.数据通信系统
一、数据通信的基本概念 1.信息 信息是对客观事物的运动状态和存在形式的反映,可以是客观事实的形态、大小、结构、性能等描述,也可以是客观事物与外部之间的联系。信息的载体可以是数字、文字、语音、图形和图像等。计算机及其外围设备产生和交换的信息…...

光谱相机在农业的应用
一、作物生长监测1、营养状况评估 原理:不同的营养元素在植物体内的含量变化会导致植物叶片或其他组织的光谱反射率特性发生改变。例如,氮元素是植物叶绿素的重要组成部分,植物缺氮时,叶绿素含量下降,其在可见光波段&a…...

高考志愿填报:如何制定合理的志愿梯度?
高考志愿填报中常见的避雷行为,深入分析了专业选择、招生政策了解、学校选择、备选方案准备以及防诈骗等方面的关键问题,并提出了针对性的建议与策略。旨在为考生和家长提供实用的指导,助力考生科学合理地填报高考志愿,避免陷入各…...
Android基于Path的addRoundRect,Canvas剪切clipPath简洁的圆角矩形实现,Kotlin(1)
Android基于Path的addRoundRect,Canvas剪切clipPath简洁的圆角矩形实现,Kotlin(1) <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas.android.com/apk/res…...

webGL硬核知识:图形渲染管渲染流程,各个阶段对应的API调用方式
一、图形渲染管线基础流程概述 WebGL 的图形渲染管线大致可分为以下几个主要阶段,每个阶段都有其特定的任务,协同工作将 3D 场景中的物体最终转换为屏幕上呈现的 2D 图像: 顶点处理(Vertex Processing)阶段࿱…...

区块链详解
1. 概述 1.1 什么是区块链? 区块链是一种分布式数据库技术,它以链式数据结构的形式存储数据,每个数据块与前一个数据块相关联,形成了一个不断增长的数据链。每个数据块中包含了一定数量的交易信息或其他数据,这些数据…...

【EXCEL 逻辑函数】AND、OR、XOR、NOT、IF、IFS、IFERROR、IFNA、SWITCH
目录 AND:当所有条件都为真时返回 TRUE,否则返回 FALSE OR:当任一条件为真时返回 TRUE,否则返回 FALSE XOR:当奇数个条件为真时返回 TRUE,否则返回 FALSE NOT :反转逻辑值 IF:根…...

ubuntu下gdb调试ROS
参考: 使用VsCode进行ROS程序调试_ros vscode 调试-CSDN博客 https://blog.csdn.net/weixin_45031801/article/details/134399664?spm1001.2014.3001.5506 一、调试准备 1.1 CMakeLists改动 注释文件中的 set(CMAKE_BUILD_TYPE "Release") #构建类…...

Docke_常用命令详解
这篇文章分享一下笔者常用的Docker命令供各位读者参考。 为什么要用Docker? 简单来说:Docker通过提供轻量级、隔离且可移植的容器化环境,使得应用在不同平台上保持一致性、易于部署和管理,具体如下 环境一致性: Docker容器使得…...

使用vue2.0或vue3.0创建自定义组件
Vue2.0创建自定义组件 在 Vue 2.0 中创建自定义组件是一个相对简单的过程。以下是一个详细的步骤指南,帮助你创建一个自定义组件。 步骤 1: 创建 Vue 组件文件 首先,你需要创建一个新的 Vue 文件(.vue 文件)。假设我们要创建一…...

Elasticsearch-DSL高级查询操作
一、禁用元数据和过滤数据 1、禁用元数据_source GET product/_search {"_source": false, "query": {"match_all": {}} }查询结果不显示元数据 禁用之前: {"took" : 0,"timed_out" : false,"_shards" : {&quo…...

【Linux】重启系统后开不开机(内核模块丢失问题)
问题 重启后开不开机报错如下: FAILED failed to start load kernel moduiles 可以看到提示module dm_mod not found 缺少了dm_mod 在内核module目录中 reboot重启可以看到这个现象: 可以看到重启启动磁盘,加载不到root 原因 dm_mod模块…...

对golang的io型进程进行off-cpu分析
背景: 对于不能占满所有cpu核数的进程,进行on-cpu的分析是没有意义的,因为可能程序大部分时间都处在阻塞状态。 实验例子程序: 以centos8和golang1.23.3为例,测试下面的程序: pprof_netio.go package m…...

Springboot中使用Retrofit
Retrofit官网 https://square.github.io/retrofit/ 配置gradle implementation("com.squareup.okhttp3:okhttp:4.12.0")implementation ("com.squareup.retrofit2:retrofit:2.11.0")implementation ("com.squareup.retrofit2:converter-gson:2.11.0…...

Ubuntu中配置内网固定IP
文章目录 背景一、配置步骤(一)首先确认网卡名称(二)确认网关(三)备份配置文件(四)编辑配置文件(五)应用配置(六)验证配置 二、注意事…...

ExcelVBA编程输出ColorIndex与对应颜色色谱
标题 ExcelVBA编程输出ColorIndex与对应颜色色谱 正文 解决问题编程输出ColorIndex与对应色谱共56,打算分4纵列输出,标题是ColorIndex,Color,Name 1. 解释VBA中的ColorIndex属性 在VBA(Visual Basic for Applications)中ÿ…...

MySQL中in和exists的使用场景
在MySQL中,IN 和 EXISTS 是用于子查询的两种常见方法,它们在不同的场景下有不同的表现和适用性。下面我将详细介绍这两种方法的使用场景、优劣,并通过实验来说明问题。 IN 子查询 使用场景: 当子查询返回的结果集较小且不包含 …...

【多线程2】start 和 run 区别,终止线程,等待线程
Thread 类使用 start 方法,启动一个线程,对于同一个 Thread 对象来说,start 只能调用一次!!! 不怕名字起的长,就怕含义不清楚! 想要启动更多线程,就是得创建新的对象&am…...

富途证券C++面试题及参考答案
C++ 中堆和栈的区别 在 C++ 中,堆和栈是两种不同的内存区域,它们有许多区别。 从内存分配方式来看,栈是由编译器自动分配和释放的内存区域。当一个函数被调用时,函数内的局部变量、函数参数等会被压入栈中,这些变量的内存空间在函数执行结束后会自动被释放。例如,在下面的…...

Go使用sqlx操作MySQL完整指南
# Go使用sqlx操作MySQL完整指南## 1. 安装依赖bash go get github.com/go-sql-driver/mysql go get github.com/jmoiron/sqlx2. 数据库基础操作 package mainimport ("fmt"_ "github.com/go-sql-driver/mysql""github.com/jmoiron/sqlx" )// 定…...

Python 爬取网页文字并保存为 txt 文件教程
引言 在网络数据获取的过程中,我们常常需要从网页中提取有用的文字信息。Python 提供了强大的库来帮助我们实现这一目标。本教程将以https://theory.gmw.cn/2023 - 08/31/content_36801268.htm为例,介绍如何使用requests库和BeautifulSoup库爬取网页文字…...

时间序列预测论文阅读和相关代码库
时间序列预测论文阅读和相关代码库列表 MLP-based的时间序列预测资料DLinearUnetTSFPDMLPLightTS 代码库以及论文库:Time-Series-LibraryUnetTSFLightTS MLP-based的时间序列预测资料 我会定期把我的所有时间序列预测论文有关的资料链接全部同步到这个文章中&#…...