KAFKA第二课之生产者(面试重点)
生产者学习
1.1 生产者消息发送流程
在消息发送的过程中,涉及到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。
生产者如何发送的?
现在Main线程中将数据进行处理,处理成IO型数据,然后调用sender进行发送
Main:
1.读取生产者配置
2.产生数据
3.过滤数据(校验什么的)
4.序列化
5.放入缓冲区 RecordAccumulator
6.发送Sender
细节: 考虑的问题 1.生产者配置的读取和修改 2.数据的过滤与分区, 3.缓冲区是如何设置的,大小
4.发送(发送失败怎么样,请求区的大小)
这里注意一下,可以在缓冲区对数据进行压缩,这样就提高缓冲区的容量和发送的数据量,提高吞吐量
1.2 同步发送与异步发送
1.什么是同步和异步
同步就是,串行,一条龙 异步 一起运行
举例: 餐馆点餐
同步: 需要等服务员过来,让服务员记录,
异步: 点餐APP直接点餐,交给队列,让他自己运行
2.发送的同步异步
同步:需要得到返回值
异步:发送过去不管了
3. 分区好处
啥是分区?
将一个数据块分成多个数据块
将数据分布式处理了
存储: 可以分在多个机器上, 也可以整多个副本。便于存储,同时提高健壮性
IO:多个数据块可以同时进行发送接收消费。生产者可以以分区为单位发送数据,消费者可以以分区为单位进行消费
4. 默认分区器
前提条件: 1.分区 2.key值
规则:
- 1存在,按1分区
- 1不存在,按2.key值对分区数取余得到的值分区
- 1.2都不存在 随机选个分区,等这个批次发送完了,再换
3 就是粘性分区
那么粘性分区的缺点是什么?
因为缓冲区溢出的条件是,大小和时间双重判断,如果大小不够,但是时间够了,还是会发走,这样,最后导致,分区上产生数据倾斜
如何解决的?
3.3.1 Kafka去掉粘性分区的时间控制,批次只由大小判断
1.3.自定义分区器
1.思路
- 1.实现接口Parititoner,重写相关方法
- 2.修改配置 将partitioner设置为默认配置
2.1 自定义分区器代码
public class MyPartitioner implements Partitioner {// 自定义分区器 实现partitioner接口// 1.分区方法@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 获取消息String data = value.toString();// 创建partition 作为最后的分区标识int partitions;// 分区逻辑// 根据含有的字符串进行判断 判断进入哪个分区if (data.contains("atguigu")){partitions = 0;} else if (data.contains("shangguigu")){partitions = 1;} else {partitions = 2;}return partitions;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}
2.2 主类
package com.atguigu.producer;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;public class ProducerClientAsync {public static void main(String[] args) {// 0 配置对象Properties properties = new Properties();// --指定kafka的Broker地址properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");// -- 1.指定序列化器 序列化器的全限定类名properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//.setProperty(ProducerConfig.LINGER_MS_CONFIG,"0");// -- 2.设置分区器properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitioner.class.getName());// -- 3.获取客户端连接对象KafkaProducer<String,String> kafkaProducer= new KafkaProducer<String,String>(properties);// key是主题 v是发送内容 这里注意一下// -- 4.发送数据String[] str= {"atguigu","111","atguigu","shangguigu","222"};for (int i =0; i < str.length; i++) {System.out.println(str[i]);try {kafkaProducer.send(new ProducerRecord<>("first", str[i]), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null){System.out.println("主题:" + metadata.topic() + "->" + "分区:" + metadata.partition());}else {// 出现异常打印exception.printStackTrace();}}}).get();} catch (InterruptedException e) {throw new RuntimeException(e);} catch (ExecutionException e) {throw new RuntimeException(e);}}kafkaProducer.close();}
}

3.面试细节
1.如何提高生产者的吞吐量
- 批次大小调到16
- 将等待时间改成50-100ms 默认是0
- 压缩数据量,这样每次发送的数据就多了
- 加大缓冲区大小,进来的数据变多,发送也能提上去
2.生产者如何保证数据可靠性的
主要通过ack机制
1.什么是ACK机制?
根据ack值来决定Kafka集群服务端的存储应答
- ack=0 最低 生产者只管发送,不用接收
- ack=1 中等 生产者发送完需要等待Leader保存后回应,
- ack=-1 最高 生产者发送完需要等待所有副本保存后回应
2.分析ACK机制
性能与安全是成反比的
所以,-1虽然最安全,但是效率最低
3.如果将ACK调到-1会出现什么问题?
有可能出现数据重复发送与接收
比如,在同步的瞬间,Leader死掉,但是其他副本已经落盘,这时候,就是问题了。
因为Leader死掉了,所以会直接更换Leader,选出一个副本作为Leader,注意,这时显示没有收到内容,所以,send重新发送,这时候,每个副本上,收到的就是2份该数据了。
4.应用场景
acks=0 几乎不用
acks=1 传输普通日志,允许丢失
acks=-1 传输高可靠性数据,一般与钱有关
5.ACK=-1一定可靠么?
不一定
如果分区副本数设置为1 ,或者ISR里应答的最小副本数设置为1(默认也是1),这时候,ack=1效果相同了。
也就是说,应答一个,就能走,就没意义了
所以需要完全可靠就需要配置一下
ACK=-1 & 分区副本大于等于2 & ISR应答最小副本数量大于等于2
3. 数据去重
1.概念
至少一次:一次或者多次 完全可靠

最多一次:直接不管回复只管发送 ack=0
至少:保证数据不丢失,但是无法保证数据不重复
最多: 无法保证数据不丢失
1.如何解决数据的重复发送与接收的问题,同时保证数据的不丢失
注意,这里解决的是sender和服务端的重复发送与接收,而不是生产者本身发送多个重复消息的问题,这个要搞清楚。
一般重复问题,都是通过标识来判别,从而去重的
Kafka 0.11 引入 幂等性和事务
精确一次: 幂等性 +至少一次(ack=-1 & 分区副本>=2 & ISR最小副本>=2)
4.幂等性
1.概念
啥是幂等性,标识一个消息的唯一标识
<pid,partition,Seqnumber>
Pid 是会话ID,每次重新生成会话,就会重新生成PID
partition是分区 标识 消息是哪个分区的
Seqnumber是单调递增的标识,注意,这是每个分区独享的
这三个在一起,才是唯一标识。
2.如何使用幂等性
开启参数enable.idempotence 默认为true,false关闭。
开启开关就行
相关文章:
KAFKA第二课之生产者(面试重点)
生产者学习 1.1 生产者消息发送流程 在消息发送的过程中,涉及到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到K…...
Mybatis 源码 ∞ :杂七杂八
文章目录 一、前言二、TypeHandler三、KeyGenerator四、Plugin1 Interceptor2 org.apache.ibatis.plugin.Plugin3. 调用场景 五、Mybatis 嵌套映射 BUG1. 示例2. 原因3. 解决方案 六、discriminator 标签七、其他1. RowBounds2. ResultHandler3. MapKey 一、前言 Mybatis 官网…...
堆的实现以及应用
💓博主个人主页:不是笨小孩👀 ⏩专栏分类:数据结构与算法👀 刷题专栏👀 C语言👀 🚚代码仓库:笨小孩的代码库👀 ⏩社区:不是笨小孩👀 🌹欢迎大家三连关注&…...
MySql011——检索数据:过滤数据(使用正则表达式)
前提:使用《MySql006——检索数据:基础select语句》中创建的products表 一、正则表达式介绍 关于正则表达式的介绍大家可以看我的这一篇博客《Java038——正则表达式》,这里就不再累赘。 二、使用MySQL正则表达式 2.1、基本字符匹配 检索…...
数据结构与算法-栈(LIFO)(经典面试题)
一:面试经典 1. 如何设计一个括号匹配的功能?比如给你一串括号让你判断是否符合我们的括号原则, 栈 力扣 2. 如何设计一个浏览器的前进和后退功能? 思想:两个栈,一个栈存放前进栈&…...
NSI45030AT1G LED驱动器方案为汽车外部及内部照明恒流稳流器(CCR)方案
关于线性恒流调节器(CCR):是一种用于控制电流的稳定输出。它通常由一个功率晶体管和一个参考电流源组成。CCR的工作原理是通过不断调节功率晶体管的导通时间来维持输出电流的恒定。当输出电流超过设定值时,CCR会减少功率晶体管的导…...
uni-app中使用pinia
目录 Pinia 是什么? uni-app 使用Pinia main.js 中引用pinia 创建和注册模块 定义pinia方式 选项options方式 定义pinia 页面中使用 pinia选项options方式 函数方式 定义pinia 页面中使用 函数方式 定义的pinia Pinia 是什么? Pinia࿰…...
Spring之事务管理
文章目录 前言一、事务及其参数含义1.事务的四个特性2.事务的传播行为(propagation)3.事务隔离性4.事务的隔离级别(ioslation)5.timeout(超时)6.readOnly(是否只读)7.rollbackFor&am…...
linux常见的mysql问题
当涉及到MySQL在Linux系统上的常见问题时,以下是10个经常遇到的问题及其解答: 无法连接到MySQL服务器。 确保MySQL服务器正在运行:可以使用systemctl status mysql或service mysql status命令检查MySQL服务状态。确保MySQL服务器网络设置正确…...
常见分辨率时序信息
分辨率列表 分辨率一:640x480(逐行) 分辨率二:800x600(逐行) 分辨率三:1024x768(逐行) 分辨率四:大名鼎鼎720P(逐行) 注:选择720P@30帧的,需拉长HOR TOTAL TIME 分辨率五:1280x800(逐行) 分辨率六:1280x960(逐行...
机器人CPP编程基础-05完结The End
非常不可思议……之前四篇博文竟然有超过100的阅读量…… 此文此部分终结,没有继续写下去的必要了。 插入一个分享: 编程基础不重要了,只要明确需求,借助AI工具就能完成一个项目。 当然也不是一次成功,工具使用也需要…...
数据库应用系统DBAS功能设计与实施(三级数据库)
目录 一、了解软件体系结构及设计过程 1、软件体系结构与设计过程 2、软件设计过程 二、了解DBAS总体设计 1、DBAS体系结构设计 2、软件体系结构设计 3、软硬件选型与配置设计 4、业务规则初步设计 三、了解DBAS功能概要设计 1、表示层概要设计 2、业务逻辑层概要设计…...
快速幂典型
题目描述 求 a 乘 b 对 p 取模的值,其中 1≤a,b,p≤1018。 输入描述: 第一行a,第二行b,第三行p。 输出描述: 一个整数,表示abmodp的值。 示例1 输入 2 3 9 输出 6 #include<bits/stdc.h> using namespace std; t…...
计算机竞赛 python+opencv+机器学习车牌识别
0 前言 🔥 优质竞赛项目系列,今天要分享的是 🚩 基于机器学习的车牌识别系统 🥇学长这里给一个题目综合评分(每项满分5分) 难度系数:4分工作量:4分创新点:3分 该项目较为新颖,适…...
解决电脑声音正常但就是某些游戏没声音问题
电脑声音正常,玩普遍游戏也正常,就有游戏不出声音 详细介绍经过,不喜欢的请直接跳 第三部分。 一、先说下起因现象。 1 大富翁11 没声音。 前段时间无聊怀旧就买了个大富翁11玩玩,近二十年前的老台式机正常无问题。后来想在性能…...
【UniApp开发小程序】小程序首页(展示商品、商品搜索、商品分类搜索)【后端基于若依管理系统开发】
文章目录 界面效果界面实现工具js页面首页让文字只显示两行路由跳转传递对象将商品分为两列显示使用中划线划掉原价 后端商品controllerservicemappersql 界面效果 【说明】 界面中商品的图片来源于闲鱼,若侵权请联系删除关于商品分类页面的实现,请在我…...
Redis 持久化及集群架构
Redis 持久化及集群架构 本篇技术博文将深入探讨 Redis 持久化机制的原理、配置和使用方式。我们将介绍两种常用的持久化方式:RDB 持久化和 AOF 持久化。您将了解到它们的工作原理、优缺点以及如何根据需求选择合适的持久化方式。 通过深入学习 Redis 持久化及集群…...
FPGA + WS2812采灯控制
文章目录 一、WS2812C-2020-V11、产品概述2、引出端排列及功能3、数据传输时间4、数据传输方法 二、使用WS2812C显示图片1、静态显示2、动态显示 一、WS2812C-2020-V1 1、产品概述 WS2812C-2020-V1是一个集控制电路与发光电路于一体的智能外控LED光源;其外型采用最…...
【视频】使用OBS将MP4推流至腾讯云直播
1、下载OBS OBS官网:https://obsproject.com/ OBS支持Win、Mac、Linux,如果下载速度很慢,建议使用迅雷下载 2、OBS推流设置 2.1 添加场景 默认会有一个“场景”,如果想继续添加可以点击“+”按钮 2.2 添加媒体源 1)点击“来源”窗口中“+”按钮 2)支持的媒体源如…...
Vue基本知识
一、vue入门 Vue为前端的框架,免除了原生js的DOM操作。简化书写。 基于MVVM的思想,实现数据的双向绑定,使编程的重点放在数据上。 1、引入vue.js文件 2、定义vue核心对象,定义数据模型 3、编写视图 //1、引入vue.js <scr…...
变量 varablie 声明- Rust 变量 let mut 声明与 C/C++ 变量声明对比分析
一、变量声明设计:let 与 mut 的哲学解析 Rust 采用 let 声明变量并通过 mut 显式标记可变性,这种设计体现了语言的核心哲学。以下是深度解析: 1.1 设计理念剖析 安全优先原则:默认不可变强制开发者明确声明意图 let x 5; …...
TDengine 快速体验(Docker 镜像方式)
简介 TDengine 可以通过安装包、Docker 镜像 及云服务快速体验 TDengine 的功能,本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker,请使用 安装包的方式快…...
stm32G473的flash模式是单bank还是双bank?
今天突然有人stm32G473的flash模式是单bank还是双bank?由于时间太久,我真忘记了。搜搜发现,还真有人和我一样。见下面的链接:https://shequ.stmicroelectronics.cn/forum.php?modviewthread&tid644563 根据STM32G4系列参考手…...
Mac下Android Studio扫描根目录卡死问题记录
环境信息 操作系统: macOS 15.5 (Apple M2芯片)Android Studio版本: Meerkat Feature Drop | 2024.3.2 Patch 1 (Build #AI-243.26053.27.2432.13536105, 2025年5月22日构建) 问题现象 在项目开发过程中,提示一个依赖外部头文件的cpp源文件需要同步,点…...
AI,如何重构理解、匹配与决策?
AI 时代,我们如何理解消费? 作者|王彬 封面|Unplash 人们通过信息理解世界。 曾几何时,PC 与移动互联网重塑了人们的购物路径:信息变得唾手可得,商品决策变得高度依赖内容。 但 AI 时代的来…...
iOS性能调优实战:借助克魔(KeyMob)与常用工具深度洞察App瓶颈
在日常iOS开发过程中,性能问题往往是最令人头疼的一类Bug。尤其是在App上线前的压测阶段或是处理用户反馈的高发期,开发者往往需要面对卡顿、崩溃、能耗异常、日志混乱等一系列问题。这些问题表面上看似偶发,但背后往往隐藏着系统资源调度不当…...
搭建DNS域名解析服务器(正向解析资源文件)
正向解析资源文件 1)准备工作 服务端及客户端都关闭安全软件 [rootlocalhost ~]# systemctl stop firewalld [rootlocalhost ~]# setenforce 0 2)服务端安装软件:bind 1.配置yum源 [rootlocalhost ~]# cat /etc/yum.repos.d/base.repo [Base…...
【LeetCode】3309. 连接二进制表示可形成的最大数值(递归|回溯|位运算)
LeetCode 3309. 连接二进制表示可形成的最大数值(中等) 题目描述解题思路Java代码 题目描述 题目链接:LeetCode 3309. 连接二进制表示可形成的最大数值(中等) 给你一个长度为 3 的整数数组 nums。 现以某种顺序 连接…...
django blank 与 null的区别
1.blank blank控制表单验证时是否允许字段为空 2.null null控制数据库层面是否为空 但是,要注意以下几点: Django的表单验证与null无关:null参数控制的是数据库层面字段是否可以为NULL,而blank参数控制的是Django表单验证时字…...
论文阅读:LLM4Drive: A Survey of Large Language Models for Autonomous Driving
地址:LLM4Drive: A Survey of Large Language Models for Autonomous Driving 摘要翻译 自动驾驶技术作为推动交通和城市出行变革的催化剂,正从基于规则的系统向数据驱动策略转变。传统的模块化系统受限于级联模块间的累积误差和缺乏灵活性的预设规则。…...
