当前位置: 首页 > news >正文

11、Kafka ------ Kafka 核心API 及 生产者API 讲解

目录

  • Kafka核心API 及 生产者API讲解
    • ★ Kafka的核心API
      • Kafka包含如下5类核心API:
    • ★ 生产者API
      • Kafka 的API 文档
    • ★ 使用生产者API发送消息

Kafka核心API 及 生产者API讲解

官方文档

★ Kafka的核心API

Kafka包含如下5类核心API:

在这里插入图片描述

Producer API(生产者API):
应用程序通过该API向主题发布消息。

Consumer API(消费者API):
应用程序通过该API订阅一个或多个主题,并从所订阅的主题中拉取消息(记录)

Streams API(流API):
应用程序可通过该API实现流处理器,可以将一个主题的消息“导流”到另一个主题,并能地对消息进行任意自定义的转换。

类似于 RabbitMQ 的 Exchange

Connector API(连接器API):
应用程序可通过这套API来实现连接器,这些连接器不断地从源系统或应用程序导入数据到Kafka,反过来也可将Kafka消息不断地导入某个接收系统或应用程序。

通过这个API,可以让应用程序和Kafka这个消息系统进行一个实时的交互,我们的系统可以不断的接收来自Kafka的消息,也可以让我们的程序不断的把数据导入到Kafka的消息系统中,就像是一个通道,所以叫连接API。

应用场景:我们的应用程序要和Kafka之间保持实时的数据流的时候,就可以用这个连接API。

AdminAPI(管理API):
应用程序可通过该API管理和检查主题、Broker和其他Kafka实体。

在这里插入图片描述



这5套API中,只有流API使用的是专门的JAR包。

其他都用的是org.apache.kafka:kafka-clients依赖库。

而流API用的是org.apache.kafka:kafka-streams依赖库。



★ 生产者API


在这里插入图片描述

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.1</version>
</dependency>

生产者API 的核心类是 KafkaProducer,它提供了一个 send()方法 来发送消息,该方法需要传入一个 ProducerRecord<K,V>对象。

ProducerRecord 代表了一条消息,Kafka 的消息是包含了key、value、timestamp。

ProducerRecord定义了如下6个构造器:

- ProducerRecord(String topic, Integer partition, K key, V value):创建一条发送到指定主题和指定分区的消息。- ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers):创建一条发送到指定主题和指定分区的消息,且包含多个消息头。- ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value):创建一条发送到指定主题和指定分区的消息,且使用给定的时间戳。- ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers):创建一条发送到指定主题和指定分区的消息、使用给定的时间戳,且包含多个消息头。- ProducerRecord(String topic, K key, V value):创建一条发送到指定主题的消息。- ProducerRecord(String topic, V value):创建一条发送到指定主题的、只带value,不带key的消息。

通过查 API 文档可看这个 ProducerRecord 消息对象 的6个构造器:

在这里插入图片描述

Kafka 的API 文档

Kafka 的API 文档

在这里插入图片描述

★ 使用生产者API发送消息

使用生产者API发送消息很简单,基本只要两步:

1、创建KafkaProducer对象,创建该对象时要传入Properties对象,用于对该生产者进行配置。

2、调用KafkaProducer对象的send()方法发送消息,调用ProducerRecord的构造器即可创建不同的消息。

3、发送完成后,关闭KafkaProducer对象。



为何Kafka的KafkaProducer需要一个Properties来来创建KafkaProducer?

因为Kafka的Producer API提供了海量的配置选项——如果你将这些配置选项每个都定义成方法,那将是一件让人无比痛苦的事情。

所以Kafka在设计该API时,就直接用了一个Properties来封装所有的配置属性。

相关文章:

11、Kafka ------ Kafka 核心API 及 生产者API 讲解

目录 Kafka核心API 及 生产者API讲解★ Kafka的核心APIKafka包含如下5类核心API&#xff1a; ★ 生产者APIKafka 的API 文档 ★ 使用生产者API发送消息 Kafka核心API 及 生产者API讲解 官方文档 ★ Kafka的核心API Kafka包含如下5类核心API&#xff1a; Producer API&#x…...

MySQL 8.3 发布, 它带来哪些新变化?

1月16号 MySQL 官方发布 8.3 创新版 和 8.0.36 长期支持版本 (该版本 没有新增功能&#xff0c;更多是修复bug )&#xff0c;本文基于 官方文档 说一下 8.3 版本带来的变化。 一 增加的特性 1.1 GTID_NEXT 支持增加 TAG 选项。 之前的版本中 GTID_NEXTUUID:number &#xff…...

【数据结构】详谈队列的顺序存储及C语言实现

循环队列及其基本操作的C语言实现 前言一、队列的顺序存储1.1 队尾指针与队头指针1.2 基本操作实现的底层逻辑1.2.1 队列的创建与销毁1.2.2 队列的增加与删除1.2.3 队列的判空与判满1.2.4 逻辑的局限性 二、循环队列2.1 循环队列的实现逻辑一2.2 循环队列的实现逻辑二2.3 循环队…...

为什么 HTTPS 协议能保障数据传输的安全性?

HTTP 协议 在谈论 HTTPS 协议之前&#xff0c;先来回顾一下 HTTP 协议的概念。 HTTP 协议介绍 HTTP 协议是一种基于文本的传输协议&#xff0c;它位于 OSI 网络模型中的应用层。 HTTP 协议是通过客户端和服务器的请求应答来进行通讯&#xff0c;目前协议由之前的 RFC 2616 拆…...

使用 Node 创建 Web 服务器

Node.js 提供了 http 模块&#xff0c;http 模块主要用于搭建 HTTP 服务端和客户端&#xff0c;使用 HTTP 服务器或客户端功能必须调用 http 模块&#xff0c;代码如下&#xff1a; var http require(http); 以下是演示一个最基本的 HTTP 服务器架构(使用 8080 端口)&#x…...

leetcode 151反转字符串如何原地去除多余空格

题目&#xff1a;https://leetcode.cn/problems/reverse-words-in-a-string/description/ 完整题解:https://leetcode.cn/problems/reverse-words-in-a-string/solutions/2611893/chu-li-kong-ge-ku-han-shu-reversefan-zhu-bioo 思路来自代码随想录&#xff0c;对其中的除去多…...

面试问题记录【深圳,共三面,A 轮公司】

问题记录 一面&#xff1a; 自我介绍项目介绍项目中用到的本地缓存是否涉及数据不一致问题&#xff0c;如何解决&#xff1f;项目中用到了 RTree 和普通的 B 树和 B树的数据结构的区别是什么&#xff1f;mysql 数据库中几种日志的用法和区别&#xff1f;redis 中缓存三兄弟存…...

Mysql数据库cpu飙升怎么解决

排查过程 &#xff08;1&#xff09;使用top命令观察&#xff0c;确定是mysql导致还是其他原因。 &#xff08;2&#xff09;如果是mysql导致的&#xff0c;show processlist&#xff0c;查看session情况&#xff0c;确定是不是有消耗资源的sql在运行。 &#xff08;3&#xf…...

PHP反序列化漏洞-POP链构造

POP链构造 POP链(Property-Oriented Programming)是一种常用于构造特定调用链的方法,用于从现有运行环境中寻找一系列代码或指令调用。它的目的是构成一组连续的调用链,最终达到攻击者恶意利用的目的。POP链实质上是通过控制对象的可控属性来控制程序的执行流程,从而利用…...

CentOS 7安装Java并配置环境

一、安装Java环境 1、检查系统是否安装Java [rootlocalhost ~]# java -version 2、更新系统软件包 [rootlocalhost ~]# yum update #遇到[y/n],选择y并回车&#xff0c;耐心等待下载完毕&#xff0c;之后系统会自动检验更新的软件包遇到 /var/run/yum.pid 已被锁定 /var/…...

Vagrant创建Oracle RAC环境示例

利用Vagrant安装Oracle RAC&#xff08;默认为non-CDB模式&#xff09;&#xff0c;生成2台虚机&#xff0c;耗时约1小时。 node1: -----------------------------------------------------------------node1: INFO: 2024-01-11 18:25:54: Make create database commandnode1: …...

鸿蒙 HarmonyOS ArkTS ArkUI 动画 中心缩放、顶部缩放、纵向缩放

EntryComponentstruct Index {State widthA: number 200State heightA: number 200onPageShow():void{animateTo ( {duration: 2000,iterations: -1,curve:Curve.Linear}, () > {this.widthA 0this.heightA 0} )}build() {Column() {// 中心缩放Column(){}.width(this.wi…...

基于python socket实现TCP/UDP通信

两个应用程序如果需要进行通讯最基本的一个前提就是能够唯一的标示一个进程&#xff0c;我们知道IP层的ip地址可以唯一标示主机&#xff0c;而TCP层协议和端口号可以唯一标示主机的一个进程&#xff0c;这样我们可以利用ip地址&#xff0b;协议&#xff0b;端口号唯一标示网络中…...

指针的运算

指针的运算 1.指针-整数 #define N_VALUES 5 float values[N_VALUES]; float* vp; //指针-整数:指针的关系运算 int main() { for (vp &values[0]; vp < &values[N_VALUES];) { *vp 0;//指针每自增一次,就是指向下一个元素的地址 } return …...

记录一次QT乱码问题

问题描述 在敲陆文周的书《QT5开发及实例》的示例代码时&#xff0c;出现乱码&#xff0c;如下图所示 具体代码如下 Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this);ui->treeWidget->clear();int groupSize 2;int ite…...

怎么提升搜狗网站排名

在当今数字化时代&#xff0c;网站排名对于品牌、企业以及个人都至关重要。而对于许多网站来说&#xff0c;搜狗搜索引擎是一个重要的流量来源。为了在搜狗上取得更好的排名&#xff0c;不仅需要优化网站内容&#xff0c;还需要巧妙运用一些工具和技巧。在本文中&#xff0c;我…...

搜索经典题——填充 9*9矩阵

题目&#xff1a;给定一个九行九列矩阵&#xff0c;填充矩阵元素&#xff0c;要求&#xff1a; 1、每一行每一列&#xff0c;每个小九宫格&#xff08;图片画粗的地方就是&#xff09;不能包含相同元素 2、每一行&#xff0c;每一列&#xff0c;每个小九宫格均会完整出现1-9的数…...

Vue待办事项(组件,模块化)

//html页面代码 <!DOCTYPE html> <html> <head> <meta charset"utf-8"> <title></title> <style> * { padding: 0; margin: 0; }…...

Vue中的组件

在应用程序的开发中&#xff0c;组件是不可缺少的。在Vue的使用中&#xff0c;同样也会用到组件。   vue组件的一般知识点&#xff1a;   1、组件的名字唯一&#xff1b;   2、组件以Html形式书写&#xff1b;   3、组件可以复用&#xff1b;   4、组件可以嵌套&…...

svg矢量图标在wpf中的使用

在wpf应用程序开发中&#xff0c;为支持图标的矢量缩放&#xff0c;及在不同分辨率下界面中图标元素的矢量无损缩放&#xff0c;所以常常用到svg图标&#xff0c;那么如果完 美的将svg图标运用到wpf日常的项目开发中呢&#xff0c;这里分享一下我的个人使用经验和详细步骤。 步…...

如何在云端加速缓存构建

缓存是指将某类数据存储起来以便以后重复使用的过程&#xff0c;它的运用在开发场景中非常普遍。类似于你习惯把最常用的调料放在厨房台面上&#xff0c;而不是橱柜里&#xff0c;这样你在准备大餐时就可以轻松取用。 但对于一个更为技术性、更精确的用例&#xff0c;比如像谷…...

JavaWeb-Cookie与Session

一、概念 是否还记得我们在HTTP概念中提到&#xff1a;HTTP的一大特点是无状态&#xff0c;这意味着多次HTTP请求之间是无法共享数据的。而在请求之间共享一些数据又是我们期望达到的效果。&#xff08;例如登录的记住我功能&#xff09;于是便有了会话跟踪技术&#xff0c;而…...

ZABBIX根据IP列表,主机描述,或IP子网批量创建主机的维护任务

有时候被ZABBIX监控的主机可能需要关机重启等维护操作,为了在此期间不触发告警,需要创建主机的维护任务,以免出现误告警 ZABBIX本身有这个API可供调用(不同版本细节略有不同,本次用的ZABBIX6.*),实现批量化建立主机的维护任务 无论哪种方式(IP列表,主机描述,或IP子网)创建维护…...

PMIS_ENT_STD

...

32 登录页组件

效果演示 实现了一个登录页面的样式&#xff0c;包括一个容器、左侧和右侧部分。左侧部分是一个背景图片&#xff0c;右侧部分是一个表单&#xff0c;包括输入框、复选框、按钮和忘记密码链接。整个页面的背景色为白色&#xff0c;容器为一个圆角矩形&#xff0c;表单为一个半透…...

Docker(一)简介和基本概念:什么是 Docker?用它会带来什么样的好处?

作者主页&#xff1a; 正函数的个人主页 文章收录专栏&#xff1a; Docker 欢迎大家点赞 &#x1f44d; 收藏 ⭐ 加关注哦&#xff01; 一、简介 本章将带领你进入 Docker 的世界。 什么是 Docker&#xff1f; 用它会带来什么样的好处&#xff1f; 好吧&#xff0c;让我们带…...

【Linux】进程的概念 进程状态 进程优先级

Content 一、什么是进程1. 进程的概念2. 进程的描述 - 进程控制块&#xff08;PCB&#xff09;3. Linux下的进程 二、进程状态1. 教科书中的进程状态运行状态阻塞状态挂起状态 2. Linux下的进程状态R&#xff08;running&#xff09;- 运行状态S&#xff08;sleeping) - 睡眠状…...

Go语言热重载和优雅地关闭程序

Go语言热重载和优雅地关闭程序 我们有时会因不同的目的去关闭服务&#xff0c;一种关闭服务是终止操作系统&#xff0c;一种关闭服务是用来更新配置。 我们希望优雅地关闭服务和通过热重载重新加载配置&#xff0c;而这两种方式可以通过信号包来完成。 1、代码实现 package…...

Python实现两个列表相加的方法汇总

1. 使用 “” 运算符 通过 “” 运算符将两个列表相加&#xff0c;得到一个新的列表。例如&#xff1a; list1 [1, 2, 3] list2 [4, 5, 6] result list1 list2 print(result) # [1, 2, 3, 4, 5, 6]2. 使用 extend 方法 使用 extend 方法将一个列表中的元素逐个添加到另…...

debian12.4配置

文章目录 debian12.4配置概述笔记将非root用户添加到sudo组更换国内源配置ssh的客户端访问END debian12.4配置 概述 在虚拟机中装了一个debian12.4, 想配置ssh客户端连接, 出了问题. 配置乱了, 还好长了个心眼, 做了快照. 发现2个问题: debian12.4默认安装完, 有ssh, 先检查…...