当前位置: 首页 > news >正文

librdkafka的简单使用

文章目录

    • 摘要
    • kafka是什么
    • 安装环境
    • librdkafka的简单使用
      • 生产者
      • 消费者

摘要

本文是Getting Started with Apache Kafka and C/C++的中文版, kafka的hello world程序。

本文完整代码见仓库,这里只列出producer/consumer的代码


kafka是什么

本节来源:Kafka - 维基百科,自由的百科全书、Kafka入门简介 - 知乎

首先我们得知道什么是Kafka。

Kafka是一个分布式流处理系统,流处理系统使它可以像消息队列一样publish或者subscribe消息,分布式提供了容错性,并发处理消息的机制。

在这里插入图片描述

kafka有以下一些基本概念:

  • Producer - 消息生产者,就是向kafka broker发消息的客户端。
  • Consumer - 消息消费者,是消息的使用方,负责消费Kafka服务器上的消息。
  • Topic - 主题,由用户定义并配置在Kafka服务器,用于建立Producer和Consumer之间的订阅关系。生产者发送消息到指定的Topic下,消息者从这个Topic下消费消息。
  • Partition - 消息分区,一个topic可以分为多个 partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。
  • Broker - 一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
  • Consumer Group - 消费者分组,用于归组同类消费者。每个consumer属于一个特定的consumer group,多个消费者可以共同消费一个Topic下的消息,每个消费者消费其中的部分消息,这些消费者就组成了一个分组,拥有同一个分组名称,通常也被称为消费者集群。
  • Offset - 消息在partition中的偏移量。每一条消息在partition都有唯一的偏移量,消费者可以指定偏移量来指定要消费的消息。

安装环境

上一节,kafka的概念看着比较简单,发布-订阅/生产-消费的模型。

为了可以调用Kafka的C/C++ API, 需要先安装环境。

# almlinux8
# dnf search kafka
# dnf install librdkafka-devel# dnf search glib
# dnf install glib2-devel# ubuntu22
# 开发库apt install librdkafka-dev  libglib2.0-dev# 安装docker环境apt install docker.io docker-compose# 本地安装 Kafka
## ref: https://docs.confluent.io/confluent-cli/current/install.html#cpwget -qO - https://packages.confluent.io/confluent-cli/deb/archive.key | sudo apt-key add
➜ add-apt-repository "deb https://packages.confluent.io/confluent-cli/deb stable main"apt install confluent-cli## 启动kafka
## usage: https://docs.confluent.io/confluent-cli/current/command-reference/local/kafka/confluent_local_kafka_start.html
## error: https://stackoverflow.com/questions/63776518/error-2-matches-found-based-on-name-network-nameofservice-default-is-ambiguo
## error:https://stackoverflow.com/questions/77985757/kafka-in-docker-using-confluent-cli-doesnt-workwhereis confluent
confluent: /usr/bin/confluent➜ export CONFLUENT_HOME=/usr/bin/confluent# 我执行下面命令后,没有看到Plaintext Ports信息
➜ confluent local kafka start# 停止,然后重新启动,管用了
➜ confluent local kafka stop
➜ confluent local kafka startThe local commands are intended for a single-node development environment only, NOT for production usage. See more: https://docs.confluent.io/current/cli/index.htmlPulling from confluentinc/confluent-local
Digest: sha256:ad62269bf4766820c298f7581cf872a49f46a11dbaebcccb4fd2e71049288c5b
Status: Image is up to date for confluentinc/confluent-local:7.6.0
+-----------------+-------+
| Kafka REST Port | 8082  |
| Plaintext Ports | 43465 |
+-----------------+-------+
Started Confluent Local containers "8d72d911a4".
To continue your Confluent Local experience, run `confluent local kafka topic create <topic>` and `confluent local kafka topic produce <topic>`.# Create a new topic, purchases, which you will use to produce and consume events.
➜ confluent local kafka topic create purchases
Created topic "purchases".

librdkafka的简单使用

confluenceinc/librdkafka是Apache Kafka协议的 C 库实现 ,提供生产者、消费者和管理客户端。

下面运行的程序来自:Apache Kafka and C/C++ - Getting Started Tutorial

代码中kafka的API可以查询:librdkafka: librdkafka documentation

代码中使用了glib库,日常开发我不会使用这个库,因为感觉比较冷,它的API可查询:GLib – 2.0: Automatic Cleanup


生产者

总体逻辑:

  • 从配置文件中加载配置
  • 创建生产者
  • 生产者发送消息
#include <glib.h>
#include <librdkafka/rdkafka.h>#include "common.c"#define ARR_SIZE(arr) ( sizeof((arr)) / sizeof((arr[0])) )/* Optional per-message delivery callback (triggered by poll() or flush())* when a message has been successfully delivered or permanently* failed delivery (after retries).*/
static void dr_msg_cb (rd_kafka_t *kafka_handle,const rd_kafka_message_t *rkmessage,void *opaque) {if (rkmessage->err) {g_error("Message delivery failed: %s", rd_kafka_err2str(rkmessage->err));}
}int main (int argc, char **argv) {rd_kafka_t *producer;rd_kafka_conf_t *conf;char errstr[512];// Parse the command line.if (argc != 2) {g_error("Usage: %s <config.ini>", argv[0]);return 1;}// Parse the configuration.// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.mdconst char *config_file = argv[1];g_autoptr(GError) error = NULL;g_autoptr(GKeyFile) key_file = g_key_file_new();if (!g_key_file_load_from_file (key_file, config_file, G_KEY_FILE_NONE, &error)) {g_error ("Error loading config file: %s", error->message);return 1;}// Load the relevant configuration sections.conf = rd_kafka_conf_new();load_config_group(conf, key_file, "default");// Install a delivery-error callback.rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);// Create the Producer instance.producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));if (!producer) {g_error("Failed to create new producer: %s", errstr);return 1;}// Configuration object is now owned, and freed, by the rd_kafka_t instance.conf = NULL;// Produce data by selecting random values from these lists.int message_count = 10;const char *topic = "purchases";const char *user_ids[6] = {"eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther"};const char *products[5] = {"book", "alarm clock", "t-shirts", "gift card", "batteries"};for (int i = 0; i < message_count; i++) {const char *key =  user_ids[random() % ARR_SIZE(user_ids)];const char *value =  products[random() % ARR_SIZE(products)];size_t key_len = strlen(key);size_t value_len = strlen(value);rd_kafka_resp_err_t err;err = rd_kafka_producev(producer,RD_KAFKA_V_TOPIC(topic),RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),RD_KAFKA_V_KEY((void*)key, key_len),RD_KAFKA_V_VALUE((void*)value, value_len),RD_KAFKA_V_OPAQUE(NULL),RD_KAFKA_V_END);if (err) {g_error("Failed to produce to topic %s: %s", topic, rd_kafka_err2str(err));return 1;} else {g_message("Produced event to topic %s: key = %12s value = %12s", topic, key, value);}rd_kafka_poll(producer, 0);}// Block until the messages are all sent.g_message("Flushing final messages..");rd_kafka_flush(producer, 10 * 1000);if (rd_kafka_outq_len(producer) > 0) {g_error("%d message(s) were not delivered", rd_kafka_outq_len(producer));}g_message("%d events were produced to topic %s.", message_count, topic);rd_kafka_destroy(producer);return 0;
}

消费者

总体逻辑:

  • 从配置文件中加载配置
  • 创建消费者
  • 订阅topic
  • 轮询消费者的消息
#include <glib.h>
#include <librdkafka/rdkafka.h>#include "common.c"static volatile sig_atomic_t run = 1;/*** @brief Signal termination of program*/
static void stop(int sig) { run = 0; }int main(int argc, char **argv) {rd_kafka_t *consumer;rd_kafka_conf_t *conf;rd_kafka_resp_err_t err;char errstr[512];// Parse the command line.if (argc != 2) {g_error("Usage: %s <config.ini>", argv[0]);return 1;}// Parse the configuration.// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.mdconst char *config_file = argv[1];g_autoptr(GError) error = NULL;g_autoptr(GKeyFile) key_file = g_key_file_new();if (!g_key_file_load_from_file(key_file, config_file, G_KEY_FILE_NONE,&error)) {g_error("Error loading config file: %s", error->message);return 1;}// Load the relevant configuration sections.conf = rd_kafka_conf_new();load_config_group(conf, key_file, "default");load_config_group(conf, key_file, "consumer");// Create the Consumer instance.consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));if (!consumer) {g_error("Failed to create new consumer: %s", errstr);return 1;}rd_kafka_poll_set_consumer(consumer);// Configuration object is now owned, and freed, by the rd_kafka_t instance.conf = NULL;// Convert the list of topics to a format suitable for librdkafka.const char *topic = "purchases";rd_kafka_topic_partition_list_t *subscription =rd_kafka_topic_partition_list_new(1);rd_kafka_topic_partition_list_add(subscription, topic, RD_KAFKA_PARTITION_UA);// Subscribe to the list of topics.err = rd_kafka_subscribe(consumer, subscription);if (err) {g_error("Failed to subscribe to %d topics: %s", subscription->cnt,rd_kafka_err2str(err));rd_kafka_topic_partition_list_destroy(subscription);rd_kafka_destroy(consumer);return 1;}rd_kafka_topic_partition_list_destroy(subscription);// Install a signal handler for clean shutdown.signal(SIGINT, stop);// Start polling for messages.while (run) {rd_kafka_message_t *consumer_message;consumer_message = rd_kafka_consumer_poll(consumer, 500);if (!consumer_message) {g_message("Waiting...");continue;}if (consumer_message->err) {if (consumer_message->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {/* We can ignore this error - it just means we've read* everything and are waiting for more data.*/} else {g_message("Consumer error: %s",rd_kafka_message_errstr(consumer_message));return 1;}} else {g_message("Consumed event from topic %s: key = %.*s value = %s",rd_kafka_topic_name(consumer_message->rkt),(int)consumer_message->key_len, (char *)consumer_message->key,(char *)consumer_message->payload);}// Free the message when we're done.rd_kafka_message_destroy(consumer_message);}// Close the consumer: commit final offsets and leave the group.g_message("Closing consumer");rd_kafka_consumer_close(consumer);// Destroy the consumer.rd_kafka_destroy(consumer);return 0;
}

相关文章:

librdkafka的简单使用

文章目录 摘要kafka是什么安装环境librdkafka的简单使用生产者消费者 摘要 本文是Getting Started with Apache Kafka and C/C的中文版&#xff0c; kafka的hello world程序。 本文完整代码见仓库&#xff0c;这里只列出producer/consumer的代码 kafka是什么 本节来源&#…...

【iOS ARKit】播放3D音频

3D音频 在前面系列中&#xff0c;我们了解如何定位追踪用户&#xff08;实际是定位用户的移动设备&#xff09;的位置与方向&#xff0c;然后通过摄像机的投影矩阵将虚拟物体投影到用户移动设备屏幕。如果用户移动了&#xff0c;则通过VIO 和 IMU更新用户的位置与方向信息&…...

ES学习日记(四)-------插件head安装和一些配套插件下载

前言 接上节,第三方插件选择了时间久,功能丰富,长得丑的head,head 插件在ES 5版本以前开箱即用非常简单&#xff0c;ES 5版本以后需要运行在node环境下&#xff0c;所以我们要先准备一下环境 一.安装Git 不装了,明儿再说,看会儿手机准备下班!!!!!!!!!...

flask+uwsgi+云服务器 部署服务端

参考&#xff1a;使用uwsgi部署flask 报错 “找不到Python应用程序&#xff0c;请检查启动日志以查找错误” 或者&#xff1a; no python application found, check your startup logs for errors debug 过程&#xff1a;查到Python uWSGI 安装配置 里面说&#xff0c;先写测…...

linux学习之路 -- 普通用户添加进sudoer列表

在Linux系统里&#xff0c;很多的操作普通用户是不能执行的&#xff0c;所以我们需要对普通用户进行提权操作&#xff0c;可我们会发现&#xff0c;一开始没有配置的话&#xff0c;是无法的提权操作的&#xff0c;下面我将介绍普通用户该如何配置sudoer列表。 首先以root 的身…...

【分类评估指标,精确率,召回率,】from sklearn.metrics import classification_report

from&#xff1a; https://zhuanlan.zhihu.com/p/368196647 多分类 from sklearn.metrics import classification_report y_true [0, 1, 2, 2, 2] y_pred [0, 0, 2, 2, 1] target_names [class 0, class 1, class 2] # print(classification_report(y_true, y_pred, targe…...

element-ui autocomplete 组件源码分享

紧接着 input 组件的源码&#xff0c;分享带输入建议的 autocomplete 组件&#xff0c;在 element-ui 官方文档上&#xff0c;没有这个组件的 api 目录&#xff0c;它的 api 是和 input 组件的 api 在一起的&#xff0c;看完源码之后发现&#xff0c;源码当中 autocomplete 组件…...

视觉SLAM理论与实践的学习链接汇总

仅供学习&#xff0c;在此感谢所有乐于分享知识的大佬们~ 一、 ORB_SLAM理论 视觉SLAM 前端 后端 回环 建图 1、 前端视觉里程计 1.1 特征点法 一文带你搞懂相机内参外参(Intrinsics & Extrinsics)-知乎 VSLAM 笔记——我们如何通过图像来计算位姿的变化&#xff…...

极光笔记|极光消息推送服务的云原生实践

摘要 极光始终秉承“以开发者为中心”的战略导向&#xff0c;极光推送&#xff08;JPush&#xff09;是国内领先的消息推送服务。极光推送&#xff08;JPush&#xff09;本质上是一种软件付费应用程序&#xff0c;结合当前主流云厂商基础施设&#xff0c;逐渐演进成了云上SaaS…...

高效八股文背诵方法

往往到了找工作高峰期&#xff0c;经常会出现八股文很多 难以背诵 的苦恼&#xff0c;下面在下结合情况&#xff0c;列举了几点自认为可以的背诵方法&#xff1a; 1. **大声朗读**&#xff1a; - 对于Java核心概念和重要理论&#xff0c;先大声朗读&#xff0c;这不仅可以帮…...

Codeforces Round 841 (Div. 2) C. Even Subarrays

题目 思路&#xff1a; #include <bits/stdc.h> using namespace std; #define int long long #define pb push_back #define fi first #define se second #define lson p << 1 #define rson p << 1 | 1 const int maxn 1e6 5, inf 1e9, maxm 4e4 5; co…...

用 SpringBoot+Redis 解决海量重复提交问题

1前言 在实际的开发项目中,一个对外暴露的接口往往会面临很多次请求&#xff0c;我们来解释一下幂等的概念&#xff1a;任意多次执行所产生的影响均与一次执行的影响相同。按照这个含义&#xff0c;最终的含义就是 对数据库的影响只能是一次性的&#xff0c;不能重复处理。如何…...

前端基础知识html

一.基础标签 1.<h1>-<h6>:定义标题&#xff0c;h最大&#xff0c;h最小 2.<font>&#xff1a;定义文本的字体&#xff0c;尺寸&#xff0c;颜色 3.<b>&#xff1a;定义粗体文本 4.<i>&#xff1a;定义斜体文本 5.<u>&#xff1a;定义文本下…...

网络原理-传输层-UDP报文结构

本文介绍UDP报文 有很多友友搞不清楚UDP报文的详细结构还有TCP的详细结构,所以专门分开来讲 以免弄混. 首先我们先看一下整个UDP结构,让大家有一个全方面的认识 下面我们来详细解释UDP报 16位源端口号(本机):就是2字节大小,16个二进制位. 16位目的端口号(目的机):也是2字节…...

TCP/IP参考模型(四层及其解析)

文章目录 1、什么是TCP/IP2、四层协议2.1 应用层&#xff08;应用程序协议&#xff09;2.2 传输层&#xff08;源端口↔️目的端口&#xff09;2.3 网络层&#xff08;主机↔️主机&#xff09;2.4 网络接口层&#xff08;主机↔️网络层&#xff09; 总结 1、什么是TCP/IP TC…...

2024第六届环境科学与可再生能源国际会议能源 (ESRE 2024) 即将召开!

2024第六届环境科学与可再生能源国际会议 能源 &#xff08;ESRE 2024&#xff09; 即将举行 2024 年 6 月 28 日至 30 日在德国法兰克福举行。ESRE 2024 年 旨在为研究人员、从业人员和专业人士提供一个论坛 从工业界、学术界和政府到研究和 发展&#xff0c;环境科学领域的专…...

CentOS配置docker外部访问

CoreOS 官方文档提供的方法 官方文档&#xff1a;​​​​​​https://coreos.com/os/docs/latest/customizing-docker.html 新建 /etc/systemd/system/docker-tcp.socket 文件 [Unit] DescriptionDocker Socket for the API[Socket] # ListenStream127.0.0.1:2375 ListenStre…...

面试前端八股文十问十答第二期

面试前端八股文十问十答第二期 作者&#xff1a;程序员小白条&#xff0c;个人博客 相信看了本文后&#xff0c;对你的面试是有一定帮助的&#xff01;关注专栏后就能收到持续更新&#xff01; ⭐点赞⭐收藏⭐不迷路&#xff01;⭐ 1&#xff09;从输入URL到页面加载的全过程…...

【漏洞复现】大华综合安防监控管理平台 Digital Surveillance System系统存在RCE漏洞

免责声明&#xff1a;文章来源互联网收集整理&#xff0c;请勿利用文章内的相关技术从事非法测试&#xff0c;由于传播、利用此文所提供的信息或者工具而造成的任何直接或者间接的后果及损失&#xff0c;均由使用者本人负责&#xff0c;所产生的一切不良后果与文章作者无关。该…...

ssm网上订餐管理系统开发mysql数据库web结构java编程计算机网页源码eclipse项目采用线性算法

一、源码特点 ssm 网上订餐管理系统是一套完善的信息系统&#xff0c;结合springMVC框架完成本系统&#xff0c;对理解JSP java编程开发语言有帮助系统采用SSM框架&#xff08;MVC模式开发&#xff09;&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模…...

装饰模式(Decorator Pattern)重构java邮件发奖系统实战

前言 现在我们有个如下的需求&#xff0c;设计一个邮件发奖的小系统&#xff0c; 需求 1.数据验证 → 2. 敏感信息加密 → 3. 日志记录 → 4. 实际发送邮件 装饰器模式&#xff08;Decorator Pattern&#xff09;允许向一个现有的对象添加新的功能&#xff0c;同时又不改变其…...

Flask RESTful 示例

目录 1. 环境准备2. 安装依赖3. 修改main.py4. 运行应用5. API使用示例获取所有任务获取单个任务创建新任务更新任务删除任务 中文乱码问题&#xff1a; 下面创建一个简单的Flask RESTful API示例。首先&#xff0c;我们需要创建环境&#xff0c;安装必要的依赖&#xff0c;然后…...

AI Agent与Agentic AI:原理、应用、挑战与未来展望

文章目录 一、引言二、AI Agent与Agentic AI的兴起2.1 技术契机与生态成熟2.2 Agent的定义与特征2.3 Agent的发展历程 三、AI Agent的核心技术栈解密3.1 感知模块代码示例&#xff1a;使用Python和OpenCV进行图像识别 3.2 认知与决策模块代码示例&#xff1a;使用OpenAI GPT-3进…...

循环冗余码校验CRC码 算法步骤+详细实例计算

通信过程&#xff1a;&#xff08;白话解释&#xff09; 我们将原始待发送的消息称为 M M M&#xff0c;依据发送接收消息双方约定的生成多项式 G ( x ) G(x) G(x)&#xff08;意思就是 G &#xff08; x ) G&#xff08;x) G&#xff08;x) 是已知的&#xff09;&#xff0…...

页面渲染流程与性能优化

页面渲染流程与性能优化详解&#xff08;完整版&#xff09; 一、现代浏览器渲染流程&#xff08;详细说明&#xff09; 1. 构建DOM树 浏览器接收到HTML文档后&#xff0c;会逐步解析并构建DOM&#xff08;Document Object Model&#xff09;树。具体过程如下&#xff1a; (…...

Axios请求超时重发机制

Axios 超时重新请求实现方案 在 Axios 中实现超时重新请求可以通过以下几种方式&#xff1a; 1. 使用拦截器实现自动重试 import axios from axios;// 创建axios实例 const instance axios.create();// 设置超时时间 instance.defaults.timeout 5000;// 最大重试次数 cons…...

关于 WASM:1. WASM 基础原理

一、WASM 简介 1.1 WebAssembly 是什么&#xff1f; WebAssembly&#xff08;WASM&#xff09; 是一种能在现代浏览器中高效运行的二进制指令格式&#xff0c;它不是传统的编程语言&#xff0c;而是一种 低级字节码格式&#xff0c;可由高级语言&#xff08;如 C、C、Rust&am…...

(转)什么是DockerCompose?它有什么作用?

一、什么是DockerCompose? DockerCompose可以基于Compose文件帮我们快速的部署分布式应用&#xff0c;而无需手动一个个创建和运行容器。 Compose文件是一个文本文件&#xff0c;通过指令定义集群中的每个容器如何运行。 DockerCompose就是把DockerFile转换成指令去运行。 …...

站群服务器的应用场景都有哪些?

站群服务器主要是为了多个网站的托管和管理所设计的&#xff0c;可以通过集中管理和高效资源的分配&#xff0c;来支持多个独立的网站同时运行&#xff0c;让每一个网站都可以分配到独立的IP地址&#xff0c;避免出现IP关联的风险&#xff0c;用户还可以通过控制面板进行管理功…...

从物理机到云原生:全面解析计算虚拟化技术的演进与应用

前言&#xff1a;我的虚拟化技术探索之旅 我最早接触"虚拟机"的概念是从Java开始的——JVM&#xff08;Java Virtual Machine&#xff09;让"一次编写&#xff0c;到处运行"成为可能。这个软件层面的虚拟化让我着迷&#xff0c;但直到后来接触VMware和Doc…...