【大数据学习 | kafka】producer的参数与结构
1. producer的结构

producer:生产者
它由三个部分组成
interceptor:拦截器,能拦截到数据,处理完毕以后发送给下游,它和过滤器不同并不是丢弃数据,而是将数据处理完毕再次发送出去,这个默认是不存在的
serialiazer:序列化器,kafka中存储的数据是二进制的,所以数据必须经过序列化器进行处理,这个是必须要有的,将用户的数据转换为byte[]的工具类,其中k和v要分别指定
partitioner: 分区器,主要是控制发送的数据到topic的哪个分区中,这个默认也是存在的
record accumulator
本地缓冲累加器 默认32M
producer的数据不能直接发送到kafka集群中,因为producer和kafka集群并不在一起,远程发送的数据不是一次发送一条这样太影响发送的速度和性能,所以我们发送都是攒一批数据发一次,record accumulator就是一个本地缓冲区,producer将发送的数据放入到缓冲区中,另外一个线程会去拉取其中的数据,远程发送给kafka集群,这个异步线程会根据linger.ms和batch-size进行拉取数据。如果本地累加器中的数据达到batch-size或者是linger.ms的大小阈值就会拉取数据到kafka集群中,这个本地缓冲区不仅仅可以适配两端的效率,还可以批次形式执行任务,增加效率
batch-size 默认16KB
linger.ms 默认为0
生产者部分的整体流程
首先producer将发送的数据准备好
经过interceptor的拦截器进行处理,如果有的话
然后经过序列化器进行转换为相应的byte[]
经过partitioner分区器分类在本地的record accumulator中缓冲
sender线程会自动根据linger.ms和batch-size双指标进行管控,复制数据到kafka
2. producer的简单代码
2.1 准备:
引入maven依赖:
<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.3.2</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.30</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency>
</dependencies>
在resources文件中创建log4j.properties
log4j.rootLogger=info,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c %M(): %m%n
2.2 生产者中的设定参数
| 参数 | 含义 |
|---|---|
| bootstrap.servers | kafka集群的地址 |
| key.serializer | key的序列化器,这个序列化器必须和key的类型匹配 |
| value.serializer | value的序列化器,这个序列化器必须和value的类型匹配 |
| batch.size | 批次拉取大小默认是16KB |
| linger.ms | 拉取的间隔时间默认为0,没有延迟 |
| partitioner | 分区器存在默认值 |
| interceptor | 拦截器选的 |
2.3 全部代码
public class producer_test {public static void main(String[] args) {Properties pro = new Properties();pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");//设定集群地址pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//设定两个序列化器,其中StringSerializer是系统自带的序列化器,要和数据的类型完全一致pro.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024);//batch-size默认是16KB,参数的单位是bytepro.put(ProducerConfig.LINGER_MS_CONFIG, 0);//默认等待批次时长是0KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);ProducerRecord<String, String> record = new ProducerRecord<>("topic_a", "this is hainiu");//发送数据的时候有kv两个部分,但是一般k我们什么都不放,只放value的值producer.send(record);producer.close();}
}
在x-shell中观察消费的数据

相关文章:
【大数据学习 | kafka】producer的参数与结构
1. producer的结构 producer:生产者 它由三个部分组成 interceptor:拦截器,能拦截到数据,处理完毕以后发送给下游,它和过滤器不同并不是丢弃数据,而是将数据处理完毕再次发送出去,这个默认是不…...
2. 从服务器的主接口入手
Webserver 的主函数 main.cpp,完成了哪些功能? #include "config.h"int main(int argc, char *argv[]) {string user "";string passwd "";string databasename "";Config config;config.parse_arg(argc, a…...
nginx上传文件超过限制大小、响应超时、反向代理请求超时等问题解决
1、文件大小超过限制 相关配置: client_max_body_size: Syntax:client_max_body_size size;Default:client_max_body_size 1m;Context:http, server, location 2、连接超时: proxy_read_timeout: Syntax:proxy_read_timeout time;Default…...
第16课 核心函数(方法)
掌握常用的内置函数及其用法。 数学类函数:abs、divmod、max、min、pow、round、sum。 类型转换函数:bool、int、float、str、ord、chr、bin、hex、tuple、list、dict、set、enumerate、range、object。 序列操作函数:all、any、filter、m…...
【工具变量】中国制造2025试点城市数据集(2000-2023年)
数据简介:《中国制造2025》是中国ZF于2015年5月8日印发的一项战略规划,旨在加快制造业的转型升级,提升制造业的质量和效益,实现从制造大国向制造强国的转变。该规划是中国实施制造强国战略的第一个十年行动纲领,明确提…...
vscode makfile编译
MinGW-w64下载安装 为了在 Windows 上安装 GCC,您需要安装 MinGW-w64。 MinGW-w64 是一个开源项目,它为 Windows 系统提供了一个完整的 GCC 工具链,支持编译生成 32 位和 64 位的 Windows 应用程序。 访问 MinGW-w64 的主页 mingw-w64.org…...
(四)PostgreSQL数据库操作示例
删除有外键约束的表 最近做数据库练习遇到一个问题,数据库里面有一个表,存在外键约束,我想要删除,所以必须先删除这些外键约束。 查询外键约束 查找外键约束:当你需要知道某个表的外键约束及其引用关系时࿰…...
Docker-微服务项目部署
环境准备 1.微服务项目 参考:通过网盘分享的文件:wolf2w_cloud.zip 链接: https://pan.baidu.com/s/1Lr4k6LPIJ59gVNA_DgKM_Q?pwdkjxt 提取码: kjxt 前端项目:trip-mgrsite-ui,trip-website-ui,trip-wenda-ui 服务项…...
测试Bug提交报告模板
撰写测试Bug提交说明时,清晰、详细和准确是至关重要的。这有助于开发团队快速理解问题、重现Bug并修复它。以下是一个测试Bug提交说明的模板,可以根据实际情况进行调整: 测试Bug提交说明 1. Bug基本信息 Bug编号:[系统自动生成…...
MybatisPlus - 核心功能
文章目录 1.MybatisPlus实现基本的CRUD快速开始常见注解常见配置 2.使用条件构建造器构建查询和更新语句条件构造器自定义SQLService接口 官网 MybatisPlus无侵入和方便快捷. MybatisPlus不仅仅可以简化单表操作,而且还对Mybatis的功能有很多的增强。可以让我们的开…...
小柴冲刺软考中级嵌入式系统设计师系列二、嵌入式系统硬件基础知识(6)嵌入式系统总线及通信接口
目录 越努力,越幸运! flechazo 小柴冲刺软考中级嵌入式系统设计师系列总目录 一、PCI、PCI-E 等接口基本原理与结构 1、PCI (1)高速性。 (2)即插即用性。 (3)可靠性。 (4)复杂性。 (5)自动配置。 (6)共享中断。 (7)扩展性好。 (8)多路复用。…...
利用字典对归一化后的数据0误差还原
假设我对精度要求很高,高到无法容忍有任何误差,那么我先将x按照大小排序,然后归一化,用字典将归一化前后的x存储下来,在深度学习时使用归一化后的x进行处理,但是最后画图等处理时,我用字典取出归…...
HarmonyOS:UIAbility组件概述
一、概述 UIAbility组件是一种包含UI的应用组件,主要用于和用户交互。 UIAbility的设计理念: 原生支持应用组件级的跨端迁移和多端协同。支持多设备和多窗口形态。 UIAbility划分原则与建议: UIAbility组件是系统调度的基本单元,…...
12寸半导体厂说的华夫区是什么意思
1\什么是华夫板 在半导体行业中,“华夫区”通常指的是“华夫板”(Waffle Slab),这是一种特殊设计的楼板,其表面具有许多均匀分布的孔洞,这些孔洞形成了回风通道,用于电子芯片厂房等对空气洁净度有极高要求的环境。华夫板的设计和施工对于保证洁净室的功能发挥至关重要。…...
数据结构之链式结构二叉树的实现(进阶版)
本篇文章主要讲解链式二叉树的层序遍历以及判断是否为一棵完全二叉树 二者将会用到之前学过的队列知识,是将队列和二叉树的整合 一、如何将之前已经写好的文件加入当前的编译界面 如图所示,打开我们需要加入文件所在的文件夹,找到我们要加…...
【高等数学】3-2多元函数积分学
1. 二重积分 可以想象你有一块不规则的平面薄板,它在一个平面区域上。二重积分就是用来求这个薄板的质量(假设薄板的面密度函数是)。 把区域划分成许多非常小的小方块(类似于把一块地划分成很多小格子),在每个小方块上,密度近似看成是一个常数,然后把每个小方块的质量加…...
【传知代码】智慧医疗:纹理特征VS卷积特征
🍑个人主页:Jupiter. 🚀 所属专栏:传知代码 欢迎大家点赞收藏评论😊 目录 论文概述纹理特征和深度卷积特征算法流程数据预处理方法纹理特征提取深度卷积特征提取分类网络搭建代码复现BLS_Model.py文件——分类器搭建py…...
Python-创建并调用自定义文件中的模块/函数
背景:在Python编程中,我们常常需要创建自己的专属文件,以便帮助我们更高效,快捷地完成任务。那么在Python中我们怎么创建并调用自己文件中的模块/函数呢? 在Python中调用自定义文件,通常是指调用自己编写的Python模块…...
Kali Linux
起源与背景 Kali Linux是一个基于Debian的开源Linux发行版,专门为信息安全工作者和渗透测试员设计。它是由Offensive Security Ltd.开发和维护的,作为BackTrack的继承者而诞生。BackTrack是一个流行的安全测试发行版,但为了提供更好的支持和…...
DiffusionDet: Diffusion Model for Object Detection—用于对象检测的扩散模型论文解析
DiffusionDet: Diffusion Model for Object Detection—用于对象检测的扩散模型论文解析 这是一篇发表在CVPR 2023的一篇论文,因为自己本身的研究方向是目标跟踪,之前看了一点使用扩散模型进行多跟踪的论文,里面提到了DiffusionDet因此学习一…...
使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式
一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明:假设每台服务器已…...
国防科技大学计算机基础课程笔记02信息编码
1.机内码和国标码 国标码就是我们非常熟悉的这个GB2312,但是因为都是16进制,因此这个了16进制的数据既可以翻译成为这个机器码,也可以翻译成为这个国标码,所以这个时候很容易会出现这个歧义的情况; 因此,我们的这个国…...
<6>-MySQL表的增删查改
目录 一,create(创建表) 二,retrieve(查询表) 1,select列 2,where条件 三,update(更新表) 四,delete(删除表…...
MongoDB学习和应用(高效的非关系型数据库)
一丶 MongoDB简介 对于社交类软件的功能,我们需要对它的功能特点进行分析: 数据量会随着用户数增大而增大读多写少价值较低非好友看不到其动态信息地理位置的查询… 针对以上特点进行分析各大存储工具: mysql:关系型数据库&am…...
Vue2 第一节_Vue2上手_插值表达式{{}}_访问数据和修改数据_Vue开发者工具
文章目录 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染2. 插值表达式{{}}3. 访问数据和修改数据4. vue响应式5. Vue开发者工具--方便调试 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染 准备容器引包创建Vue实例 new Vue()指定配置项 ->渲染数据 准备一个容器,例如: …...
土地利用/土地覆盖遥感解译与基于CLUE模型未来变化情景预测;从基础到高级,涵盖ArcGIS数据处理、ENVI遥感解译与CLUE模型情景模拟等
🔍 土地利用/土地覆盖数据是生态、环境和气象等诸多领域模型的关键输入参数。通过遥感影像解译技术,可以精准获取历史或当前任何一个区域的土地利用/土地覆盖情况。这些数据不仅能够用于评估区域生态环境的变化趋势,还能有效评价重大生态工程…...
大模型多显卡多服务器并行计算方法与实践指南
一、分布式训练概述 大规模语言模型的训练通常需要分布式计算技术,以解决单机资源不足的问题。分布式训练主要分为两种模式: 数据并行:将数据分片到不同设备,每个设备拥有完整的模型副本 模型并行:将模型分割到不同设备,每个设备处理部分模型计算 现代大模型训练通常结合…...
ardupilot 开发环境eclipse 中import 缺少C++
目录 文章目录 目录摘要1.修复过程摘要 本节主要解决ardupilot 开发环境eclipse 中import 缺少C++,无法导入ardupilot代码,会引起查看不方便的问题。如下图所示 1.修复过程 0.安装ubuntu 软件中自带的eclipse 1.打开eclipse—Help—install new software 2.在 Work with中…...
Element Plus 表单(el-form)中关于正整数输入的校验规则
目录 1 单个正整数输入1.1 模板1.2 校验规则 2 两个正整数输入(联动)2.1 模板2.2 校验规则2.3 CSS 1 单个正整数输入 1.1 模板 <el-formref"formRef":model"formData":rules"formRules"label-width"150px"…...
Fabric V2.5 通用溯源系统——增加图片上传与下载功能
fabric-trace项目在发布一年后,部署量已突破1000次,为支持更多场景,现新增支持图片信息上链,本文对图片上传、下载功能代码进行梳理,包含智能合约、后端、前端部分。 一、智能合约修改 为了增加图片信息上链溯源,需要对底层数据结构进行修改,在此对智能合约中的农产品数…...
