当前位置: 首页 > article >正文

Kafka入门-生产者

生产者

生产者发送流程:
在这里插入图片描述

延迟时间为0ms时,也就意味着每当有数据就会直接发送

异步发送API

异步发送和同步发送的不同在于:异步发送不需要等待结果,同步发送必须等待结果才能进行下一步发送。

普通异步发送

首先导入所需的kafka依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version>
</dependency>
public class CustomProducer {public static void main(String[] args) {//配置Properties properties = new Properties();//连接集群properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.27.101:9092,192.168.27.102:9092");//指定对应的key和value的序列化类型properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//创建Kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//异步发送数据kafkaProducer.send(new ProducerRecord<>("first","XXX learn Kafka"));for (int i = 0; i < 8; i++) {kafkaProducer.send(new ProducerRecord<>("first","learning Kafka-"+i));}//关闭资源kafkaProducer.close();}
}
带回调函数的异步发送

回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数:元数据信息和异常信息,如果异常信息为null,说明消息发送成功,如果异常现象不为null,说明消息发送失败。

修改发送方法,采用回调

//异步发送数据,并有回调函数
kafkaProducer.send(new ProducerRecord<>("first","XXX learn Kafka"));
for (int i = 0; i < 8; i++) {kafkaProducer.send(new ProducerRecord<>("first", "learning Kafka-" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e == null){System.out.println("主题: "+ recordMetadata.topic() +" 分区:"+recordMetadata.partition());}}});}

运行方法就能看到返回的主题、分区

主题: first 分区:2

同步发送

同步发送只需更改发送方式

//同步发送数据kafkaProducer.send(new ProducerRecord<>("first","XXX learn Kafka"));for (int i = 0; i < 8; i++) {kafkaProducer.send(new ProducerRecord<>("first","learning Kafka-"+i)).get();}

为什么要分区

  1. 便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上,合理控制分区的任务,可以实现负载均衡的效果。
  2. 提高并行度,生产者可以以分区为单位发送数据,而消费者则可以以分区为单位进行消费

分区策略:

  1. 默认分区策略:

    • 如果在记录中指定了分区,那么直接使用指定的分区

      例如在send方法指定分区2,key为""

          kafkaProducer.send(new ProducerRecord<>("first",2,"", "learning Kafka-" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e == null){System.out.println("主题: "+ recordMetadata.topic() +" 分区:"+recordMetadata.partition());}}});
      
    • 如果未指定分区但存在键key,则根据key的哈希值与topic的partition数目进行取余选择分区

      例如在send方法中不指定分区,设置key

      kafkaProducer.send(new ProducerRecord<>("first","haha", "learning Kafka-" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e == null){System.out.println("主题: "+ recordMetadata.topic() +" 分区:"+recordMetadata.partition());}}});
      
    • 如果不存在分区也没有键key,那么使用黏性分区,会随机选择一个分区并且尽可能一直使用该分区,如果该分区batch已满或者已完成,kafka会再随机一个分区进行使用(和上一个分区不同)。

自定义分区器

首先自定义一个分区器

public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {//获取数据String msgValues = value.toString();int partition;//如果发送的数据包含aha字段则发送到0号分区,不包含则发往1号分区if(msgValues.contains("aha")){partition = 0;}else {partition = 1;}return partition;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

在创建Kafka对象之前设置配置,选择自定义的分区器

//关联自定义分区
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.hzp.kafka.producer.MyPartitioner");

注意:如果使用自定义分区的同时,还在send方法内指定分区,那么以指定分区为准。

生产者提高吞吐量

生产者发消息就相当于用货车从本地仓库(缓冲区)送货到kafka,相关的参数有两个,一个是batch size批次大小,一个是linger.ms等待时间。batch size默认为16k,相当于货车的容量大小,如果货车装满了就发往kafka。但是通常情况下等待时间为0ms,也就是每当仓库来了一箱货就直接送到kafka,不管货车是否装满。

因此提高吞吐量主要有以下方法:

  1. 修改linger.ms,增长等待时间或者增加批次大小,让货车尽量装多一点货甚至装满再发送。(等待时间会造成一定的延迟,通常控制在5-100ms)
  2. 发送数据时,采取压缩的方式
  3. 增大缓冲区大小,缓冲区大小通常为32m。相当于增加仓库大小,让仓库能够存储更多的货物。
        //缓冲区大小(单位为kb,默认32M)1024*1024*32properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);//批次大小(单位为kb,默认16kb)1024*16properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);//linger.ms (单位为ms)properties.put(ProducerConfig.LINGER_MS_CONFIG,10);//压缩 设置压缩类型为snappy,可配置的值有gzip、snappy、lz4、zstdproperties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");

数据可靠性

数据可靠性与ACK应答级别有关

acks:

  • 0:生产者发送过来的数据,不需要等数据落盘就应答。

    如果不等数据落盘就应答,容易造成数据丢失,生产者发送数据就不管了,可靠性差,效率高。

  • 1:生产者发送过来的数据,Leader收到数据后应答

    如果Leader接收到数据,并且应答之后,突然挂掉了,但是此时Leader还没有同步数据给其他节点,此时就造成数据丢失。生产者发送数据Leader应答,可靠性中等,效率中等。

  • -1:生产者发送的数据,Leader和ISR队列中的所有节点收齐数据后应答

    生产者发送数据需要Leader和ISR队列里面所有的Follower应答,可靠性高,效率低。

    如果Leader收到数据并且和Follower同步数据时,有一个Follower因为故障,长时间不能与Leader同步,这应该如何解决?

    解决方案:Leader维护了一个动态的in-sync replica set(ISR)也就是与Leader保持同步的Follower+Leader的集合(Leader:0,ISR:0,1,2)。如果Follower长时间未向Leader发送通信请求或者同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。这样就不用长时间等待以故障的节点。

如果分区副本为1,那么ACK应答-1和1没有区别,挂了数据就直接丢失,如果ISR里面也只有一个(Leader:0,ISR:0),那么说明没有Follower跟Leader同步,那么仍然会数据丢失。因此可以得到:数据完全可靠的条件:ACK级别设置为-1、分区副本大于等于2、ISR应答里的最小副本数量大于等于2。

通常情况下,acks=0很少使用,acks=1主要用于传输普通日志(大量但并不重要的数据),允许个别数据丢失,acks=-1一般用于传输重要的数据比如金钱这类对可靠性要求比较高的场景。

acks=-1仍然存在问题,比如现在Leader:0,ISR:0,1,2。生产者发送数据data,Leader:0接收到data后与1、2同步数据。同步数据完成之后,即将应答之前,Leader突然挂掉了,那么此时就会从1,2中选择一个成为新的Leader。假设1成为新的Leader,此时生产者没有收到应答,再次发送数据data,那么此时Leader:1就接收到了两份data数据,造成数据重复。

java设置acks,以及重试次数

//acks 设置为1
properties.put(ProducerConfig.ACKS_CONFIG,"1");
//重试次数 默认为int的最大值
properties.put(ProducerConfig.RETRIES_CONFIG,3);

数据去重

在刚刚的数据可靠性中,我们知道怎么让数据能够完全可靠,就是让ACK级别设置为-1、分区副本大于等于2、ISR应答里的最小副本数量大于等于2。从数据传递来看,这种设置就是数据传递至少一次(At Least One);而当ACK级别设置为0,那么数据传递最多一次(At Most One)。

At Least One可以保证数据不丢失,但是不能保证数据不重复,At Most One可以保证数据不重复,但是不能保证数据不丢失。那么如果既想数据不丢失,又想数据不重复,此时就要依靠幂等性和事务。

幂等性

幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条数据,保证了不重复。

重复数据的判断标准就是<PID、Partition、SqlNumber>相同的消息,Broker只会持久化一条数据。Pid标识指的是ProducerId,生产者编号,Kafka每重启一次就分配一个新的;Partiton标识分区号;SqlNumber是单调自增的,因此幂等性能够保证在单分区、单会话内不重复。

幂等性的使用只需设置enable.idempotence即可,默认为true,关闭只需设置为false。

事务

事务开启之前,必须先开启幂等性。事务底层依赖幂等性。

数据有序

Kafka单分区内有序,但是多分区时,分区与分区之间无序。

数据乱序

kafka保证数据单分区有序的条件是:

  1. 如果没有开启幂等性,那么需要设置max.in.flight.request.per.connection的值为1
  2. 如果开启幂等性,那么需要设置max.in.flight.request.per.connection的值小于等于5.

在kafka1.x版本之后当kafka启用幂等,那么kafka服务端会缓存producer发来的最近5个request的元数据,而幂等性的实现依赖单调递增的序号SqlNumber。如果发送时出现乱序,那么会根据单调递增的序号进行重排序。也就是说当开启了幂等性并且缓存的请求个数小于5,那么会在服务端进行一次重新排序,让数据有序。

相关文章:

Kafka入门-生产者

生产者 生产者发送流程&#xff1a; 延迟时间为0ms时&#xff0c;也就意味着每当有数据就会直接发送 异步发送API 异步发送和同步发送的不同在于&#xff1a;异步发送不需要等待结果&#xff0c;同步发送必须等待结果才能进行下一步发送。 普通异步发送 首先导入所需的k…...

深度学习水论文:mamba+图像增强

&#x1f9c0;当前视觉领域对高效长序列建模需求激增&#xff0c;对Mamba图像增强这方向的研究自然也逐渐火热。原因在于其高效长程建模&#xff0c;以及动态计算优势&#xff0c;在图像质量提升和细节恢复方面有难以替代的作用。 &#x1f9c0;因此短时间内&#xff0c;就有不…...

AirSim/Cosys-AirSim 游戏开发(四)外部固定位置监控相机

这个博客介绍了如何通过 settings.json 文件添加一个无人机外的 固定位置监控相机&#xff0c;因为在使用过程中发现 Airsim 对外部监控相机的描述模糊&#xff0c;而 Cosys-Airsim 在官方文档中没有提供外部监控相机设置&#xff0c;最后在源码示例中找到了&#xff0c;所以感…...

现有的 Redis 分布式锁库(如 Redisson)提供了哪些便利?

现有的 Redis 分布式锁库&#xff08;如 Redisson&#xff09;相比于开发者自己基于 Redis 命令&#xff08;如 SETNX, EXPIRE, DEL&#xff09;手动实现分布式锁&#xff0c;提供了巨大的便利性和健壮性。主要体现在以下几个方面&#xff1a; 原子性保证 (Atomicity)&#xff…...

【Redis】笔记|第8节|大厂高并发缓存架构实战与优化

缓存架构 代码结构 代码详情 功能点&#xff1a; 多级缓存&#xff0c;先查本地缓存&#xff0c;再查Redis&#xff0c;最后才查数据库热点数据重建逻辑使用分布式锁&#xff0c;二次查询更新缓存采用读写锁提升性能采用Redis的发布订阅机制通知所有实例更新本地缓存适用读多…...

C/C++ 中附加包含目录、附加库目录与附加依赖项详解

在 C/C 编程的编译和链接过程中&#xff0c;附加包含目录、附加库目录和附加依赖项是三个至关重要的设置&#xff0c;它们相互配合&#xff0c;确保程序能够正确引用外部资源并顺利构建。虽然在学习过程中&#xff0c;这些概念容易让人混淆&#xff0c;但深入理解它们的作用和联…...

【从零学习JVM|第三篇】类的生命周期(高频面试题)

前言&#xff1a; 在Java编程中&#xff0c;类的生命周期是指类从被加载到内存中开始&#xff0c;到被卸载出内存为止的整个过程。了解类的生命周期对于理解Java程序的运行机制以及性能优化非常重要。本文会深入探寻类的生命周期&#xff0c;让读者对此有深刻印象。 目录 ​…...

Mysql8 忘记密码重置,以及问题解决

1.使用免密登录 找到配置MySQL文件&#xff0c;我的文件路径是/etc/mysql/my.cnf&#xff0c;有的人的是/etc/mysql/mysql.cnf 在里最后加入 skip-grant-tables重启MySQL服务 service mysql restartShutting down MySQL… SUCCESS! Starting MySQL… SUCCESS! 重启成功 2.登…...

C++.OpenGL (20/64)混合(Blending)

混合(Blending) 透明效果核心原理 #mermaid-svg-SWG0UzVfJms7Sm3e {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-SWG0UzVfJms7Sm3e .error-icon{fill:#552222;}#mermaid-svg-SWG0UzVfJms7Sm3e .error-text{fill…...

Linux 中如何提取压缩文件 ?

Linux 是一种流行的开源操作系统&#xff0c;它提供了许多工具来管理、压缩和解压缩文件。压缩文件有助于节省存储空间&#xff0c;使数据传输更快。本指南将向您展示如何在 Linux 中提取不同类型的压缩文件。 1. Unpacking ZIP Files ZIP 文件是非常常见的&#xff0c;要在 …...

纯 Java 项目(非 SpringBoot)集成 Mybatis-Plus 和 Mybatis-Plus-Join

纯 Java 项目&#xff08;非 SpringBoot&#xff09;集成 Mybatis-Plus 和 Mybatis-Plus-Join 1、依赖1.1、依赖版本1.2、pom.xml 2、代码2.1、SqlSession 构造器2.2、MybatisPlus代码生成器2.3、获取 config.yml 配置2.3.1、config.yml2.3.2、项目配置类 2.4、ftl 模板2.4.1、…...

Java求职者面试指南:计算机基础与源码原理深度解析

Java求职者面试指南&#xff1a;计算机基础与源码原理深度解析 第一轮提问&#xff1a;基础概念问题 1. 请解释什么是进程和线程的区别&#xff1f; 面试官&#xff1a;进程是程序的一次执行过程&#xff0c;是系统进行资源分配和调度的基本单位&#xff1b;而线程是进程中的…...

Java数值运算常见陷阱与规避方法

整数除法中的舍入问题 问题现象 当开发者预期进行浮点除法却误用整数除法时,会出现小数部分被截断的情况。典型错误模式如下: void process(int value) {double half = value / 2; // 整数除法导致截断// 使用half变量 }此时...

GitFlow 工作模式(详解)

今天再学项目的过程中遇到使用gitflow模式管理代码&#xff0c;因此进行学习并且发布关于gitflow的一些思考 Git与GitFlow模式 我们在写代码的时候通常会进行网上保存&#xff0c;无论是github还是gittee&#xff0c;都是一种基于git去保存代码的形式&#xff0c;这样保存代码…...

AI+无人机如何守护濒危物种?YOLOv8实现95%精准识别

【导读】 野生动物监测在理解和保护生态系统中发挥着至关重要的作用。然而&#xff0c;传统的野生动物观察方法往往耗时耗力、成本高昂且范围有限。无人机的出现为野生动物监测提供了有前景的替代方案&#xff0c;能够实现大范围覆盖并远程采集数据。尽管具备这些优势&#xf…...

MFC 抛体运动模拟:常见问题解决与界面美化

在 MFC 中开发抛体运动模拟程序时,我们常遇到 轨迹残留、无效刷新、视觉单调、物理逻辑瑕疵 等问题。本文将针对这些痛点,详细解析原因并提供解决方案,同时兼顾界面美化,让模拟效果更专业、更高效。 问题一:历史轨迹与小球残影残留 现象 小球运动后,历史位置的 “残影”…...

云原生安全实战:API网关Kong的鉴权与限流详解

&#x1f525;「炎码工坊」技术弹药已装填&#xff01; 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 一、基础概念 1. API网关&#xff08;API Gateway&#xff09; API网关是微服务架构中的核心组件&#xff0c;负责统一管理所有API的流量入口。它像一座…...

【C++特殊工具与技术】优化内存分配(一):C++中的内存分配

目录 一、C 内存的基本概念​ 1.1 内存的物理与逻辑结构​ 1.2 C 程序的内存区域划分​ 二、栈内存分配​ 2.1 栈内存的特点​ 2.2 栈内存分配示例​ 三、堆内存分配​ 3.1 new和delete操作符​ 4.2 内存泄漏与悬空指针问题​ 4.3 new和delete的重载​ 四、智能指针…...

vulnyx Blogger writeup

信息收集 arp-scan nmap 获取userFlag 上web看看 一个默认的页面&#xff0c;gobuster扫一下目录 可以看到扫出的目录中得到了一个有价值的目录/wordpress&#xff0c;说明目标所使用的cms是wordpress&#xff0c;访问http://192.168.43.213/wordpress/然后查看源码能看到 这…...

基于IDIG-GAN的小样本电机轴承故障诊断

目录 🔍 核心问题 一、IDIG-GAN模型原理 1. 整体架构 2. 核心创新点 (1) ​梯度归一化(Gradient Normalization)​​ (2) ​判别器梯度间隙正则化(Discriminator Gradient Gap Regularization)​​ (3) ​自注意力机制(Self-Attention)​​ 3. 完整损失函数 二…...

【JVM面试篇】高频八股汇总——类加载和类加载器

目录 1. 讲一下类加载过程&#xff1f; 2. Java创建对象的过程&#xff1f; 3. 对象的生命周期&#xff1f; 4. 类加载器有哪些&#xff1f; 5. 双亲委派模型的作用&#xff08;好处&#xff09;&#xff1f; 6. 讲一下类的加载和双亲委派原则&#xff1f; 7. 双亲委派模…...

push [特殊字符] present

push &#x1f19a; present 前言present和dismiss特点代码演示 push和pop特点代码演示 前言 在 iOS 开发中&#xff0c;push 和 present 是两种不同的视图控制器切换方式&#xff0c;它们有着显著的区别。 present和dismiss 特点 在当前控制器上方新建视图层级需要手动调用…...

虚拟电厂发展三大趋势:市场化、技术主导、车网互联

市场化&#xff1a;从政策驱动到多元盈利 政策全面赋能 2025年4月&#xff0c;国家发改委、能源局发布《关于加快推进虚拟电厂发展的指导意见》&#xff0c;首次明确虚拟电厂为“独立市场主体”&#xff0c;提出硬性目标&#xff1a;2027年全国调节能力≥2000万千瓦&#xff0…...

LLMs 系列实操科普(1)

写在前面&#xff1a; 本期内容我们继续 Andrej Karpathy 的《How I use LLMs》讲座内容&#xff0c;原视频时长 ~130 分钟&#xff0c;以实操演示主流的一些 LLMs 的使用&#xff0c;由于涉及到实操&#xff0c;实际上并不适合以文字整理&#xff0c;但还是决定尽量整理一份笔…...

RabbitMQ入门4.1.0版本(基于java、SpringBoot操作)

RabbitMQ 一、RabbitMQ概述 RabbitMQ RabbitMQ最初由LShift和CohesiveFT于2007年开发&#xff0c;后来由Pivotal Software Inc.&#xff08;现为VMware子公司&#xff09;接管。RabbitMQ 是一个开源的消息代理和队列服务器&#xff0c;用 Erlang 语言编写。广泛应用于各种分布…...

DingDing机器人群消息推送

文章目录 1 新建机器人2 API文档说明3 代码编写 1 新建机器人 点击群设置 下滑到群管理的机器人&#xff0c;点击进入 添加机器人 选择自定义Webhook服务 点击添加 设置安全设置&#xff0c;详见说明文档 成功后&#xff0c;记录Webhook 2 API文档说明 点击设置说明 查看自…...

[免费]微信小程序问卷调查系统(SpringBoot后端+Vue管理端)【论文+源码+SQL脚本】

大家好&#xff0c;我是java1234_小锋老师&#xff0c;看到一个不错的微信小程序问卷调查系统(SpringBoot后端Vue管理端)【论文源码SQL脚本】&#xff0c;分享下哈。 项目视频演示 【免费】微信小程序问卷调查系统(SpringBoot后端Vue管理端) Java毕业设计_哔哩哔哩_bilibili 项…...

Selenium常用函数介绍

目录 一&#xff0c;元素定位 1.1 cssSeector 1.2 xpath 二&#xff0c;操作测试对象 三&#xff0c;窗口 3.1 案例 3.2 窗口切换 3.3 窗口大小 3.4 屏幕截图 3.5 关闭窗口 四&#xff0c;弹窗 五&#xff0c;等待 六&#xff0c;导航 七&#xff0c;文件上传 …...

在Mathematica中实现Newton-Raphson迭代的收敛时间算法(一般三次多项式)

考察一般的三次多项式&#xff0c;以r为参数&#xff1a; p[z_, r_] : z^3 (r - 1) z - r; roots[r_] : z /. Solve[p[z, r] 0, z]&#xff1b; 此多项式的根为&#xff1a; 尽管看起来这个多项式是特殊的&#xff0c;其实一般的三次多项式都是可以通过线性变换化为这个形式…...

华为OD机考-机房布局

import java.util.*;public class DemoTest5 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseSystem.out.println(solve(in.nextLine()));}}priv…...