【大数据学习 | kafka】producer的参数与结构
1. producer的结构
producer:生产者
它由三个部分组成
interceptor:拦截器,能拦截到数据,处理完毕以后发送给下游,它和过滤器不同并不是丢弃数据,而是将数据处理完毕再次发送出去,这个默认是不存在的
serialiazer:序列化器,kafka中存储的数据是二进制的,所以数据必须经过序列化器进行处理,这个是必须要有的,将用户的数据转换为byte[]的工具类,其中k和v要分别指定
partitioner: 分区器,主要是控制发送的数据到topic的哪个分区中,这个默认也是存在的
record accumulator
本地缓冲累加器 默认32M
producer的数据不能直接发送到kafka集群中,因为producer和kafka集群并不在一起,远程发送的数据不是一次发送一条这样太影响发送的速度和性能,所以我们发送都是攒一批数据发一次,record accumulator就是一个本地缓冲区,producer将发送的数据放入到缓冲区中,另外一个线程会去拉取其中的数据,远程发送给kafka集群,这个异步线程会根据linger.ms和batch-size进行拉取数据。如果本地累加器中的数据达到batch-size或者是linger.ms的大小阈值就会拉取数据到kafka集群中,这个本地缓冲区不仅仅可以适配两端的效率,还可以批次形式执行任务,增加效率
batch-size 默认16KB
linger.ms 默认为0
生产者部分的整体流程
首先producer将发送的数据准备好
经过interceptor的拦截器进行处理,如果有的话
然后经过序列化器进行转换为相应的byte[]
经过partitioner分区器分类在本地的record accumulator中缓冲
sender线程会自动根据linger.ms和batch-size双指标进行管控,复制数据到kafka
2. producer的简单代码
2.1 准备:
引入maven依赖:
<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.3.2</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.30</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency>
</dependencies>
在resources文件中创建log4j.properties
log4j.rootLogger=info,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c %M(): %m%n
2.2 生产者中的设定参数
参数 | 含义 |
---|---|
bootstrap.servers | kafka集群的地址 |
key.serializer | key的序列化器,这个序列化器必须和key的类型匹配 |
value.serializer | value的序列化器,这个序列化器必须和value的类型匹配 |
batch.size | 批次拉取大小默认是16KB |
linger.ms | 拉取的间隔时间默认为0,没有延迟 |
partitioner | 分区器存在默认值 |
interceptor | 拦截器选的 |
2.3 全部代码
public class producer_test {public static void main(String[] args) {Properties pro = new Properties();pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");//设定集群地址pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//设定两个序列化器,其中StringSerializer是系统自带的序列化器,要和数据的类型完全一致pro.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024);//batch-size默认是16KB,参数的单位是bytepro.put(ProducerConfig.LINGER_MS_CONFIG, 0);//默认等待批次时长是0KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);ProducerRecord<String, String> record = new ProducerRecord<>("topic_a", "this is hainiu");//发送数据的时候有kv两个部分,但是一般k我们什么都不放,只放value的值producer.send(record);producer.close();}
}
在x-shell中观察消费的数据
相关文章:

【大数据学习 | kafka】producer的参数与结构
1. producer的结构 producer:生产者 它由三个部分组成 interceptor:拦截器,能拦截到数据,处理完毕以后发送给下游,它和过滤器不同并不是丢弃数据,而是将数据处理完毕再次发送出去,这个默认是不…...

2. 从服务器的主接口入手
Webserver 的主函数 main.cpp,完成了哪些功能? #include "config.h"int main(int argc, char *argv[]) {string user "";string passwd "";string databasename "";Config config;config.parse_arg(argc, a…...

nginx上传文件超过限制大小、响应超时、反向代理请求超时等问题解决
1、文件大小超过限制 相关配置: client_max_body_size: Syntax:client_max_body_size size;Default:client_max_body_size 1m;Context:http, server, location 2、连接超时: proxy_read_timeout: Syntax:proxy_read_timeout time;Default…...

第16课 核心函数(方法)
掌握常用的内置函数及其用法。 数学类函数:abs、divmod、max、min、pow、round、sum。 类型转换函数:bool、int、float、str、ord、chr、bin、hex、tuple、list、dict、set、enumerate、range、object。 序列操作函数:all、any、filter、m…...

【工具变量】中国制造2025试点城市数据集(2000-2023年)
数据简介:《中国制造2025》是中国ZF于2015年5月8日印发的一项战略规划,旨在加快制造业的转型升级,提升制造业的质量和效益,实现从制造大国向制造强国的转变。该规划是中国实施制造强国战略的第一个十年行动纲领,明确提…...

vscode makfile编译
MinGW-w64下载安装 为了在 Windows 上安装 GCC,您需要安装 MinGW-w64。 MinGW-w64 是一个开源项目,它为 Windows 系统提供了一个完整的 GCC 工具链,支持编译生成 32 位和 64 位的 Windows 应用程序。 访问 MinGW-w64 的主页 mingw-w64.org…...
(四)PostgreSQL数据库操作示例
删除有外键约束的表 最近做数据库练习遇到一个问题,数据库里面有一个表,存在外键约束,我想要删除,所以必须先删除这些外键约束。 查询外键约束 查找外键约束:当你需要知道某个表的外键约束及其引用关系时࿰…...

Docker-微服务项目部署
环境准备 1.微服务项目 参考:通过网盘分享的文件:wolf2w_cloud.zip 链接: https://pan.baidu.com/s/1Lr4k6LPIJ59gVNA_DgKM_Q?pwdkjxt 提取码: kjxt 前端项目:trip-mgrsite-ui,trip-website-ui,trip-wenda-ui 服务项…...
测试Bug提交报告模板
撰写测试Bug提交说明时,清晰、详细和准确是至关重要的。这有助于开发团队快速理解问题、重现Bug并修复它。以下是一个测试Bug提交说明的模板,可以根据实际情况进行调整: 测试Bug提交说明 1. Bug基本信息 Bug编号:[系统自动生成…...

MybatisPlus - 核心功能
文章目录 1.MybatisPlus实现基本的CRUD快速开始常见注解常见配置 2.使用条件构建造器构建查询和更新语句条件构造器自定义SQLService接口 官网 MybatisPlus无侵入和方便快捷. MybatisPlus不仅仅可以简化单表操作,而且还对Mybatis的功能有很多的增强。可以让我们的开…...

小柴冲刺软考中级嵌入式系统设计师系列二、嵌入式系统硬件基础知识(6)嵌入式系统总线及通信接口
目录 越努力,越幸运! flechazo 小柴冲刺软考中级嵌入式系统设计师系列总目录 一、PCI、PCI-E 等接口基本原理与结构 1、PCI (1)高速性。 (2)即插即用性。 (3)可靠性。 (4)复杂性。 (5)自动配置。 (6)共享中断。 (7)扩展性好。 (8)多路复用。…...
利用字典对归一化后的数据0误差还原
假设我对精度要求很高,高到无法容忍有任何误差,那么我先将x按照大小排序,然后归一化,用字典将归一化前后的x存储下来,在深度学习时使用归一化后的x进行处理,但是最后画图等处理时,我用字典取出归…...

HarmonyOS:UIAbility组件概述
一、概述 UIAbility组件是一种包含UI的应用组件,主要用于和用户交互。 UIAbility的设计理念: 原生支持应用组件级的跨端迁移和多端协同。支持多设备和多窗口形态。 UIAbility划分原则与建议: UIAbility组件是系统调度的基本单元,…...

12寸半导体厂说的华夫区是什么意思
1\什么是华夫板 在半导体行业中,“华夫区”通常指的是“华夫板”(Waffle Slab),这是一种特殊设计的楼板,其表面具有许多均匀分布的孔洞,这些孔洞形成了回风通道,用于电子芯片厂房等对空气洁净度有极高要求的环境。华夫板的设计和施工对于保证洁净室的功能发挥至关重要。…...

数据结构之链式结构二叉树的实现(进阶版)
本篇文章主要讲解链式二叉树的层序遍历以及判断是否为一棵完全二叉树 二者将会用到之前学过的队列知识,是将队列和二叉树的整合 一、如何将之前已经写好的文件加入当前的编译界面 如图所示,打开我们需要加入文件所在的文件夹,找到我们要加…...
【高等数学】3-2多元函数积分学
1. 二重积分 可以想象你有一块不规则的平面薄板,它在一个平面区域上。二重积分就是用来求这个薄板的质量(假设薄板的面密度函数是)。 把区域划分成许多非常小的小方块(类似于把一块地划分成很多小格子),在每个小方块上,密度近似看成是一个常数,然后把每个小方块的质量加…...

【传知代码】智慧医疗:纹理特征VS卷积特征
🍑个人主页:Jupiter. 🚀 所属专栏:传知代码 欢迎大家点赞收藏评论😊 目录 论文概述纹理特征和深度卷积特征算法流程数据预处理方法纹理特征提取深度卷积特征提取分类网络搭建代码复现BLS_Model.py文件——分类器搭建py…...

Python-创建并调用自定义文件中的模块/函数
背景:在Python编程中,我们常常需要创建自己的专属文件,以便帮助我们更高效,快捷地完成任务。那么在Python中我们怎么创建并调用自己文件中的模块/函数呢? 在Python中调用自定义文件,通常是指调用自己编写的Python模块…...

Kali Linux
起源与背景 Kali Linux是一个基于Debian的开源Linux发行版,专门为信息安全工作者和渗透测试员设计。它是由Offensive Security Ltd.开发和维护的,作为BackTrack的继承者而诞生。BackTrack是一个流行的安全测试发行版,但为了提供更好的支持和…...

DiffusionDet: Diffusion Model for Object Detection—用于对象检测的扩散模型论文解析
DiffusionDet: Diffusion Model for Object Detection—用于对象检测的扩散模型论文解析 这是一篇发表在CVPR 2023的一篇论文,因为自己本身的研究方向是目标跟踪,之前看了一点使用扩散模型进行多跟踪的论文,里面提到了DiffusionDet因此学习一…...
连锁超市冷库节能解决方案:如何实现超市降本增效
在连锁超市冷库运营中,高能耗、设备损耗快、人工管理低效等问题长期困扰企业。御控冷库节能解决方案通过智能控制化霜、按需化霜、实时监控、故障诊断、自动预警、远程控制开关六大核心技术,实现年省电费15%-60%,且不改动原有装备、安装快捷、…...
基于Uniapp开发HarmonyOS 5.0旅游应用技术实践
一、技术选型背景 1.跨平台优势 Uniapp采用Vue.js框架,支持"一次开发,多端部署",可同步生成HarmonyOS、iOS、Android等多平台应用。 2.鸿蒙特性融合 HarmonyOS 5.0的分布式能力与原子化服务,为旅游应用带来…...

STM32标准库-DMA直接存储器存取
文章目录 一、DMA1.1简介1.2存储器映像1.3DMA框图1.4DMA基本结构1.5DMA请求1.6数据宽度与对齐1.7数据转运DMA1.8ADC扫描模式DMA 二、数据转运DMA2.1接线图2.2代码2.3相关API 一、DMA 1.1简介 DMA(Direct Memory Access)直接存储器存取 DMA可以提供外设…...
Java多线程实现之Callable接口深度解析
Java多线程实现之Callable接口深度解析 一、Callable接口概述1.1 接口定义1.2 与Runnable接口的对比1.3 Future接口与FutureTask类 二、Callable接口的基本使用方法2.1 传统方式实现Callable接口2.2 使用Lambda表达式简化Callable实现2.3 使用FutureTask类执行Callable任务 三、…...

高危文件识别的常用算法:原理、应用与企业场景
高危文件识别的常用算法:原理、应用与企业场景 高危文件识别旨在检测可能导致安全威胁的文件,如包含恶意代码、敏感数据或欺诈内容的文档,在企业协同办公环境中(如Teams、Google Workspace)尤为重要。结合大模型技术&…...
TRS收益互换:跨境资本流动的金融创新工具与系统化解决方案
一、TRS收益互换的本质与业务逻辑 (一)概念解析 TRS(Total Return Swap)收益互换是一种金融衍生工具,指交易双方约定在未来一定期限内,基于特定资产或指数的表现进行现金流交换的协议。其核心特征包括&am…...

Linux-07 ubuntu 的 chrome 启动不了
文章目录 问题原因解决步骤一、卸载旧版chrome二、重新安装chorme三、启动不了,报错如下四、启动不了,解决如下 总结 问题原因 在应用中可以看到chrome,但是打不开(说明:原来的ubuntu系统出问题了,这个是备用的硬盘&a…...

BCS 2025|百度副总裁陈洋:智能体在安全领域的应用实践
6月5日,2025全球数字经济大会数字安全主论坛暨北京网络安全大会在国家会议中心隆重开幕。百度副总裁陈洋受邀出席,并作《智能体在安全领域的应用实践》主题演讲,分享了在智能体在安全领域的突破性实践。他指出,百度通过将安全能力…...
06 Deep learning神经网络编程基础 激活函数 --吴恩达
深度学习激活函数详解 一、核心作用 引入非线性:使神经网络可学习复杂模式控制输出范围:如Sigmoid将输出限制在(0,1)梯度传递:影响反向传播的稳定性二、常见类型及数学表达 Sigmoid σ ( x ) = 1 1 +...

ios苹果系统,js 滑动屏幕、锚定无效
现象:window.addEventListener监听touch无效,划不动屏幕,但是代码逻辑都有执行到。 scrollIntoView也无效。 原因:这是因为 iOS 的触摸事件处理机制和 touch-action: none 的设置有关。ios有太多得交互动作,从而会影响…...