当前位置: 首页 > news >正文

kafka学习-生产者

目录

1、消息生产流程

2、生产者常见参数配置

3、序列化器

基本概念

自定义序列化器

4、分区器

默认分区规则

自定义分区器

5、生产者拦截器

作用

自定义拦截器

6、生产者原理解析


1、消息生产流程

2、生产者常见参数配置

3、序列化器

基本概念

  • 在Kafka中保存的数据都是字节数组。
  • 消息发送前,需要将消息序列化为字节数组进行发送。
  • 生产者通过key.serializer和value.serializer指定key和value的序列化器。
  • Kafka使用org.apache.kafka.common.serialization.Serializer接口定义序列化器。
  • Kafka已实现的序列化器有:ByteArraySerializer、ByteBufferSerializer、BytesSerializer、DoubleSerializer、FloatSerializer、IntegerSerializer、StringSerializer、LongSerializer、ShortSerializer。

自定义序列化器

实现org.apache.kafka.common.serialization.Serializer<T>接口,并实现其中的serializer方法。

@Data
public class User {private Integer userId;private String username;
}public class UserSerializer implements Serializer<User> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {// do nothing}@Overridepublic byte[] serialize(String topic, User data) {try {// 如果数据是null,则返回nullif (data == null) return null;Integer userId = data.getUserId();String username = data.getUsername();int length = 0;byte[] bytes = null;if (null != username) {bytes = username.getBytes("utf-8");length = bytes.length;}// 第一个4字节存储userId的值// 第二个4字节存储username字节数组的长度int值// 第三个length长度,存储username序列化之后的字节数组ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);buffer.putInt(userId);buffer.putInt(length);buffer.put(bytes);return buffer.array();} catch (UnsupportedEncodingException e) {throw new SerializationException("序列化数据异常");}}@Overridepublic void close() {// do nothing}
}

4、分区器

默认分区规则

KafkaProducer.partition();DefaultPartitioner.partition();

  1. 如果record提供了分区号,则使⽤record提供的分区号
  2. 如果record没有提供分区号,则使⽤key的序列化后的值的hash值对分区数量取模
  3. 如果record没有提供分区号,也没有提供key,则使⽤轮询的⽅式分配分区号。

自定义分区器

实现org.apache.kafka.clients.producer.Partitioner接口,并实现其中的partition方法。

在生产者参数中通过配置partitioner.class指定自定义分区器。

/*** 自定义分区器*/
public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 此处可以计算分区的数字。// 我们直接指定为2return 2;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

5、生产者拦截器

作用

        在发送消息前,或者在执行回调逻辑前,对消息做一些定制化的处理,比如修改消息,打印消息日志等。此外,Producer允许设置多个拦截器从而形成一条拦截器链,Producer将按照指定顺序调用它们。

自定义拦截器

        自定义拦截器实现org.apache.kafka.clients.producer.ProducerInterceptor接口,并实现其中的onSend()、onAcknowledgement()、close()接口。其中:

  • onSend(ProducerRecord):Producer 确保在消息被序列化前调⽤该⽅法。⽤户可以在该⽅法中对消息做任何操作,但最好保证不要修 改消息所属的topic和分区,否则会影响⽬标分区的计算。
  • onAcknowledgement(RecordMetadata, Exception):该⽅法会在消息被应答之前或消息发送失败时调⽤, 并且通常都是在Producer回调逻辑触发之前。
  • close:关闭Interceptor,主要⽤于执⾏⼀些资源清理⼯作。

        在生产者参数中通过配置ProducerConfig.INTERCEPTOR_CLASSES_CONFIG指定自定义拦截器。

public class Interceptor<KEY, VALUE> implements ProducerInterceptor<KEY, VALUE> {private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorTwo.class);@Overridepublic ProducerRecord<KEY, VALUE> onSend(ProducerRecord<KEY, VALUE> record) {System.out.println("拦截器---go");// 此处根据业务需要对相关的数据作修改String topic = record.topic();Integer partition = record.partition();Long timestamp = record.timestamp();KEY key = record.key();VALUE value = record.value();Headers headers = record.headers();// 添加消息头headers.add("interceptor", "interceptor".getBytes());ProducerRecord<KEY, VALUE> newRecord = new ProducerRecord<KEY, VALUE>(topic, partition, timestamp, key, value, headers);return newRecord;}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {System.out.println("拦截器---back");if (exception != null) {// 如果发生异常,记录在日志中LOGGER.error(exception.getMessage());}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

6、生产者原理解析

以上内容为个人学习理解,如有问题,欢迎在评论区指出。

部分内容截取自网络,如有侵权,联系作者删除。

相关文章:

kafka学习-生产者

目录 1、消息生产流程 2、生产者常见参数配置 3、序列化器 基本概念 自定义序列化器 4、分区器 默认分区规则 自定义分区器 5、生产者拦截器 作用 自定义拦截器 6、生产者原理解析 1、消息生产流程 2、生产者常见参数配置 3、序列化器 基本概念 在Kafka中保存的数…...

【Python】设计模式

设计模式分为三种类型&#xff0c;共23类。 创建型模式&#xff1a;单例模式、抽象工厂模式、建造者模式、工厂模式、原型模式。结构型模式&#xff1a;适配器模式、桥接模式、装饰模式、组合模式、外观模式、享元模式、代理模式。行为型模式&#xff1a;模版方法模式、命令模…...

C++ 数字

C 数字 通常&#xff0c;当我们需要用到数字时&#xff0c;我们会使用原始的数据类型&#xff0c;如 int、short、long、float 和 double 等等。这些用于数字的数据类型&#xff0c;其可能的值和数值范围&#xff0c;我们已经在 C 数据类型一章中讨论过。 C 定义数字 我们已…...

code阶段——gitgitlab安装

在code阶段&#xff0c;我们需要将不同版本的代码存储到一个仓库中&#xff0c;常见的版本控制工具就是SVN或者Git&#xff0c;这里我们采用Git作为版本控制工具&#xff0c;GitLab作为远程仓库。 Git安装 https://git-scm.com/&#xff08;傻瓜式安装&#xff09; GitLab安…...

C 风格文件输入/输出---无格式输入/输出

C 标准库的 C I/O 子集实现 C 风格流输入/输出操作。 <cstdio> 头文件提供通用文件支持并提供有窄和多字节字符输入/输出能力的函数&#xff0c;而 <cwchar>头文件提供有宽字符输入/输出能力的函数。 无格式输入/输出 从文件流获取字符 std::fgetc, std::getc …...

Spring-MVC的文件上传下载,及插件的使用(让项目开发更节省时间)

目录 一、概述 ( 1 ) 介绍 ( 2 ) 讲述 二、上传 三、下载 四、jrebel的使用 五、多文件上传 给我们带来什么收获 一、概述 ( 1 ) 介绍 Spring MVC的文件上传下载是指在Spring MVC框架中实现文件的上传和下载功能。文件上传是指将本地计算机上的文件上传到服务器端…...

算法 数据结构 递归冒泡算法 java冒泡算法 优化递归冒泡 数据结构(九)

使用递归算法实现冒泡&#xff1a; package com.nami.algorithm.study.day06;import java.util.Arrays;/*** beyond u self and trust u self.** Author: lbc* Date: 2023-09-05 15:36* email: 594599620qq.com* Description: keep coding*/ public class BubbleSort2 {// p…...

【计算机视觉 | 目标检测】目标检测常用数据集及其介绍(十五)

文章目录 一、STN PLAD (STN Power Line Assets Dataset)二、Satlas三、Street Dataset四、UAVVaste五、UDA-CH (Unsupervised Domain Adaptation on Cultural Heritage)六、USB (Universal-Scale Object Detection Benchmark)七、VEDAI (Vehicle Detection in Aerial Imagery)…...

洛谷P8814:解密 ← CSP-J 2022 复赛第2题

【题目来源】https://www.luogu.com.cn/problem/P8814https://www.acwing.com/problem/content/4732/【题目描述】 给定一个正整数 k&#xff0c;有 k 次询问&#xff0c;每次给定三个正整数 ni&#xff0c;ei&#xff0c;di&#xff0c;求两个正整数 pi&#xff0c;qi&#xf…...

Flutter实现CombineExecutor进行多个异步分组监听,监听第一个异步执行的开始和最后一个异步执行结束时机。

1.场景 我们在调用接口时&#xff0c;很多时候会同时调用多个接口&#xff0c;接口都是异步执行&#xff0c;我们很难知道调用的多个接口哪个会最后执行完成&#xff0c;我们有时候需要对最后一个接口执行完成的时机监听&#xff0c;所以基于该需求&#xff0c;设计了CombineE…...

2023 年最新Java 毕业设计选题题目参考,500道 Java 毕业设计题目,值得收藏

大家好&#xff0c;我是程序员徐师兄&#xff0c;最近有很多同学咨询&#xff0c;说毕业设计了&#xff0c;不知道选怎么题目好&#xff0c;有哪些是想需要注意的。 确实毕设选题实际上对很多同学来说一个大坑&#xff0c; 每年挖坑给自己跳的人太多太多&#xff0c;选题选得好…...

Mac电脑其他文件占用超过一大半的内存如何清理?

mac的存储空间时不时会提示内存已满&#xff0c;查看内存占用比例最大的居然是「其他文件」&#xff0c;「其他文件」是Mac无法识别的格式文件或应用插件扩展等等...如果你想要给Mac做一次彻底的磁盘空间清理&#xff0c;首当其冲可先对「其他文件」下手&#xff0c;那么我们该…...

geopandas 笔记: datasets 数据集

geopandas 自带的几个数据集 1 世界各个国家 import geopandas as gpd import pandas as pdpd.set_option(display.max_rows,None) gpd.read_file(gpd.datasets.get_path(naturalearth_lowres)) pop_est人口数量continent国家所在的大陆name国家的名称iso_a3国家的三个字母的…...

长胜证券:三大拐点共振 看好智能驾驶新一轮行情

摘要 【长胜证券&#xff1a;三大拐点共振 看好智能驾驭新一轮行情】长胜证券研报指出&#xff0c;全球共振&#xff0c;国内智驾商场正迎来三大拐点&#xff1a;1&#xff09;技能上&#xff0c;“BEV Transformer数据闭环”新架构2023年开端上车&#xff0c;使得不依靠高精地…...

AIGC专栏5——EasyPhoto AI写真照片生成器 sd-webui插件介绍、安装与使用

AIGC专栏5——EasyPhoto AI写真照片生成器 插件安装与使用 学习前言源码下载地址技术原理储备&#xff08;SD/Control/Lora&#xff09;StableDiffusionControlNetLora EasyPhoto插件简介EasyPhoto插件安装安装方式一&#xff1a;Webui界面安装 &#xff08;需要良好的网络&…...

【Python程序设计】 工厂模式【07/8】

一、说明 我们探索数据工程中使用的设计模式 - 软件设计中常见问题的可重用解决方案。 以下文章是有关 Python 数据工程系列文章的一部分&#xff0c;旨在帮助数据工程师、数据科学家、数据分析师、机器学习工程师或其他刚接触 Python 的人掌握基础知识。 迄今为止&#xff0c;…...

PHP8的多维数组-PHP8知识详解

今天分享的是php8的数组中的多维数组&#xff0c;主要内容有&#xff1a;多维数组的概念、创建和输出二维数组、创建和输出三维数组。 1、多维数组的概念 多维数组是包含一个或多个数组的数组。在多维数组中&#xff0c;主数组中的每一个元素也可以是一个数组&#xff0c;子数…...

【【STM32--28--IO引脚的复用功能】】

STM32–28–IO引脚的复用功能 STM32的IO复用功能 何为复用? 我们先了解一下何为通用 IO端口的输入或输出是由GPIO外设控制&#xff0c;我们称之为通用 复用&#xff1a; IO端口的输入或者是输出是由其他非GPIO外设控制就像经常说的USART 由 DR寄存器进行输出 STM32的IO复用功…...

CodeJock Active-X / COM v22.1.0 Crack

CodeJock Active-X / COM v22.1.0--这个支持 Unicode 啦&#xff0c; Unicode Unicode 创建专业应用程序&#xff0c;其中包含一整套高度可定制的用户界面组件&#xff0c;包括 Visual Studio 风格的对接窗格和 Office 风格的功能区、工具栏和菜单&#xff0c;为您的应用程序…...

mac通过docker搭建elasticsearch:8.9.2以及kibana:8.9.2

1.elasticsearch.yml配置修改&#xff1a; cluster.name: "docker-cluster" network.host: 0.0.0.0 http.port: 9200 #discovery.seed_hosts: ["172.17.0.2"]#----------------------- BEGIN SECURITY AUTO CONFIGURATION ----------------------- # # T…...

[特殊字符] 智能合约中的数据是如何在区块链中保持一致的?

&#x1f9e0; 智能合约中的数据是如何在区块链中保持一致的&#xff1f; 为什么所有区块链节点都能得出相同结果&#xff1f;合约调用这么复杂&#xff0c;状态真能保持一致吗&#xff1f;本篇带你从底层视角理解“状态一致性”的真相。 一、智能合约的数据存储在哪里&#xf…...

web vue 项目 Docker化部署

Web 项目 Docker 化部署详细教程 目录 Web 项目 Docker 化部署概述Dockerfile 详解 构建阶段生产阶段 构建和运行 Docker 镜像 1. Web 项目 Docker 化部署概述 Docker 化部署的主要步骤分为以下几个阶段&#xff1a; 构建阶段&#xff08;Build Stage&#xff09;&#xff1a…...

微信小程序之bind和catch

这两个呢&#xff0c;都是绑定事件用的&#xff0c;具体使用有些小区别。 官方文档&#xff1a; 事件冒泡处理不同 bind&#xff1a;绑定的事件会向上冒泡&#xff0c;即触发当前组件的事件后&#xff0c;还会继续触发父组件的相同事件。例如&#xff0c;有一个子视图绑定了b…...

8k长序列建模,蛋白质语言模型Prot42仅利用目标蛋白序列即可生成高亲和力结合剂

蛋白质结合剂&#xff08;如抗体、抑制肽&#xff09;在疾病诊断、成像分析及靶向药物递送等关键场景中发挥着不可替代的作用。传统上&#xff0c;高特异性蛋白质结合剂的开发高度依赖噬菌体展示、定向进化等实验技术&#xff0c;但这类方法普遍面临资源消耗巨大、研发周期冗长…...

【Go】3、Go语言进阶与依赖管理

前言 本系列文章参考自稀土掘金上的 【字节内部课】公开课&#xff0c;做自我学习总结整理。 Go语言并发编程 Go语言原生支持并发编程&#xff0c;它的核心机制是 Goroutine 协程、Channel 通道&#xff0c;并基于CSP&#xff08;Communicating Sequential Processes&#xff0…...

uniapp微信小程序视频实时流+pc端预览方案

方案类型技术实现是否免费优点缺点适用场景延迟范围开发复杂度​WebSocket图片帧​定时拍照Base64传输✅ 完全免费无需服务器 纯前端实现高延迟高流量 帧率极低个人demo测试 超低频监控500ms-2s⭐⭐​RTMP推流​TRTC/即构SDK推流❌ 付费方案 &#xff08;部分有免费额度&#x…...

视觉slam十四讲实践部分记录——ch2、ch3

ch2 一、使用g++编译.cpp为可执行文件并运行(P30) g++ helloSLAM.cpp ./a.out运行 二、使用cmake编译 mkdir build cd build cmake .. makeCMakeCache.txt 文件仍然指向旧的目录。这表明在源代码目录中可能还存在旧的 CMakeCache.txt 文件,或者在构建过程中仍然引用了旧的路…...

音视频——I2S 协议详解

I2S 协议详解 I2S (Inter-IC Sound) 协议是一种串行总线协议&#xff0c;专门用于在数字音频设备之间传输数字音频数据。它由飞利浦&#xff08;Philips&#xff09;公司开发&#xff0c;以其简单、高效和广泛的兼容性而闻名。 1. 信号线 I2S 协议通常使用三根或四根信号线&a…...

Python+ZeroMQ实战:智能车辆状态监控与模拟模式自动切换

目录 关键点 技术实现1 技术实现2 摘要&#xff1a; 本文将介绍如何利用Python和ZeroMQ消息队列构建一个智能车辆状态监控系统。系统能够根据时间策略自动切换驾驶模式&#xff08;自动驾驶、人工驾驶、远程驾驶、主动安全&#xff09;&#xff0c;并通过实时消息推送更新车…...

淘宝扭蛋机小程序系统开发:打造互动性强的购物平台

淘宝扭蛋机小程序系统的开发&#xff0c;旨在打造一个互动性强的购物平台&#xff0c;让用户在购物的同时&#xff0c;能够享受到更多的乐趣和惊喜。 淘宝扭蛋机小程序系统拥有丰富的互动功能。用户可以通过虚拟摇杆操作扭蛋机&#xff0c;实现旋转、抽拉等动作&#xff0c;增…...