当前位置: 首页 > news >正文

kafka生产者

1.原理

2.普通异步发送

引入pom:

    <dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version></dependency></dependencies>
package com.atguigu.kafka.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class CustomProducer {public static void main(String[] args) {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = newKafkaProducer<String, String>(properties);// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(newProducerRecord<>("first","atguigu " + i));}// 5. 关闭资源kafkaProducer.close();}
}

测试效果:

3.带回调的异步发送

回调的信息实际是从队列返回的

 

4.同步发送 

只需在异步发送的基础上,再调用一下 get()方法即可。

5.分区

6.分区策略 

指定key的值:对key的hashcode做分配

希望将订单表的数据全部发到kafka的一个分区上,怎么处理?

将该表的名称作为key值然后发送即可 
如:

7.自定义分区 (脏数据的处理)

如果研发人员可以根据企业需求,自己重新实现分区器。 1)需求 例如我们实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区, 不包含 atguigu,就发往 1 号分区。 

自定义分区器:

package com.atguigu.kafka.producer;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;import java.util.Map;public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {String s = value.toString();int partion;if(s.contains("atguigu")) {partion = 1;}else {partion=0;}return partion;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}

 拷贝全类名,产生关联

测试结论:

8.如何让提高生产者的吞吐量

 

properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);properties.put(ProducerConfig.LINGER_MS_CONFIG,1);// compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstdproperties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");

相关文章:

kafka生产者

1.原理 2.普通异步发送 引入pom&#xff1a; <dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency><dependency><g…...

前后端分离Vue+node.js在线学习考试系统gqw7o

与其它应用程序相比&#xff0c;在线学习平台的设计主要面向于学校&#xff0c;旨在为管理员和学生、教师、院系提供一个在线学习平台。学生、教师、院系可以通过系统及时查看公告信息等。 在线学习平台是在Windows操作系统下的应用平台。为防止出现兼容性及稳定性问题&#xf…...

关于el-select值的回显问题 : 框内显示label值还是value值

<el-form-item label"状态" prop""><el-selectv-model"roleForm.state"class"m-2"size"large"style"width: 240px"placeholder"请选择状态"value-key"value"//value-key 与下面的ke…...

MCU多核异构通信原理

摘要&#xff1a; 本文结合瑞萨RZ/G2L 多核处理器&#xff0c;给大家讲述一下多核异构设计及通信的原理。 随着电子技术的不断发展&#xff0c;以及市场需求的日益增长&#xff0c;嵌入式系统不仅要求执行复杂的控制任务&#xff0c;还需要实时地采集和处理数据。 为了满足这…...

在autodl搭建stable-diffusion-webui+sadTalker

本文介绍在autodl.com搭建gpu服务器&#xff0c;实现stable-diffusion-webuisadTalker功能&#xff0c;图片音频 可生成视频。 autodl租GPU 自己本地部署SD环境会遇到各种问题&#xff0c;网络问题&#xff08;比如huggingface是无法访问&#xff09;&#xff0c;所以最好的方…...

【办公类-16-10-01】“2023下学期 中4班 自主游戏观察记录(python 排班表系列)

背景需求 上学期的周安排里&#xff0c;每班每周的自主游戏会轮到多个不同的内容 因此在每周的自主游戏观察有2次记录&#xff0c;观察的项目可以写不一样的&#xff0c; 如一位老师写沙水游戏&#xff0c;另一位写表演游戏 本学期&#xff0c;中班的自主游戏全部是户外的&am…...

机器学习:SVM算法(Python)

一、核函数 kernel_func.py import numpy as npdef linear():"""线性核函数:return:"""def _linear(x_i, x_j):return np.dot(x_i, x_j)return _lineardef poly(degree3, coef01.0):"""多项式核函数:param degree: 阶次:param …...

基于yolov5的人脸口罩检测,可进行图像目标检测,也可进行视屏和摄像检测(pytorch框架)【python源码+UI界面+功能源码详解】

功能演示&#xff1a; 基于yolov5的人脸口罩检测系统&#xff0c;支持图像检测&#xff0c;视频检测和实时摄像检测功能&#xff08;pytorch框架&#xff09;_哔哩哔哩_bilibili &#xff08;一&#xff09;简介 基于yolov5的人脸口罩检测系统是在pytorch框架下实现的&#…...

2024如何恢复旧版的Chrome的主题样式

起因 chrome 更新版本之后的主题样式变成了浅紫色的页签卡样式&#xff0c;感觉很不习惯&#xff0c;也很不喜欢 如何换回旧版主题 通过主题商店&#xff0c;安装旧版本的主题 主题商店搜索下面&#xff0c;或着直接访问下面的地址 Chrome Original White Theme https://…...

【文生视频】Diffusion Transformer:OpenAI Sora 原理、Stable Diffusion 3 同源技术

文生视频 Diffusion Transformer&#xff1a;Sora 核心架构、Stable Diffusion 3 同源技术 提出背景输入输出生成流程变换器的引入Diffusion Transformer (DiT)架构Diffusion Transformer (DiT)总结 OpenAI Sora 设计思路阶段1: 数据准备和预处理阶段2: 架构设计阶段3: 输入数据…...

Redis 服务集群、哨兵、缓存及持久化的实现原理和应用场景

Redis 是一种高性能的键值存储系统&#xff0c;已经成为了许多企业和互联网公司的核心技术之一。本文将介绍 Redis 的服务集群、哨兵以及缓存实现原理和应用场景&#xff0c;以帮助读者更好地理解和使用 Redis。 引言&#xff1a; 随着互联网应用规模不断扩大&#xff0c;Redi…...

通过Redis增减库存避坑

问题&#xff1a; 先执行get获取值&#xff0c;判断符合条件再执行incr、decr操作。在临界缓存失效的情况下&#xff0c;会默认赋值当前key为永不过期的0&#xff0c;再执行加减法&#xff0c;导致程序异常。 推荐解决方案&#xff1a; 1、限制接口频率&#xff1a;先incr&…...

Windows系统搭建Elasticsearch引擎结合内网穿透实现远程连接查询数据

文章目录 系统环境1. Windows 安装Elasticsearch2. 本地访问Elasticsearch3. Windows 安装 Cpolar4. 创建Elasticsearch公网访问地址5. 远程访问Elasticsearch6. 设置固定二级子域名 Elasticsearch是一个基于Lucene库的分布式搜索和分析引擎&#xff0c;它提供了一个分布式、多…...

Java爬虫使用JSoup获取静态资源图片

import org.jsoup.Connection; import org.jsoup.Jsoup; import org.jsoup.nodes.Document; import org.jsoup.nodes.Element; import org.jsoup.select.Elements; import java.io.FileOutputStream;/*** 获取静态图片*/public class ImageDownloader {public static void main…...

LeetCode 2433.找出前缀异或的原始数组

给你一个长度为 n 的 整数 数组 pref 。找出并返回满足下述条件且长度为 n 的数组 arr &#xff1a; pref[i] arr[0] ^ arr[1] ^ … ^ arr[i]. 注意 ^ 表示 按位异或&#xff08;bitwise-xor&#xff09;运算。 可以证明答案是 唯一 的。 示例 1&#xff1a; 输入&#xf…...

C++面试:系统网络性能评估与优化

系统网络性能评估与优化是指对计算机系统中的网络部分进行评估分析&#xff0c;并采取一系列措施来提升网络性能的能力。在面试中&#xff0c;涉及这一主题的问题可能会围绕以下几个方面展开。 网络性能评估 基于网络延迟、带宽、吞吐量等指标对网络性能进行评估。使用工具&a…...

Java适配器模式 - 灵活应对不匹配的接口

Java适配器模式 - 灵活应对不匹配的接口 引言&#xff1a; 在软件开发中&#xff0c;我们经常遇到不同系统、库或框架之间的接口不兼容问题。为了解决这些问题&#xff0c;我们可以使用适配器模式。适配器模式是一种结构型设计模式&#xff0c;它允许不兼容的接口之间进行协作…...

[ai笔记12] chatGPT技术体系梳理+本质探寻

欢迎来到文思源想的ai空间&#xff0c;这是技术老兵重学ai以及成长思考的第12篇分享&#xff01; 这周时间看了两本书&#xff0c;一本是大神斯蒂芬沃尔弗拉姆学的《这就是ChatGPT》,另外一本则是腾讯云生态解决方案高级架构师宋立恒所写的《AI制胜机器学习极简入门》&#xf…...

Elasticsearch:使用 ELSER v2 进行语义搜索

在我之前的文章 “Elasticsearch&#xff1a;使用 ELSER 进行语义搜索”&#xff0c;我们展示了如何使用 ELESR v1 来进行语义搜索。在使用 ELSER 之前&#xff0c;我们必须注意的是&#xff1a; 重要&#xff1a;虽然 ELSER V2 已正式发布&#xff0c;但 ELSER V1 仍处于 [预览…...

智慧农业之智能物流

智慧物流属于农业生产环节中的重要节点,上游为农业生产环节,下游为销售与商贸环节,因此,通过联通生产与销售环节,通过合理调配物流过程,可以实现对于农产品的快速运输与销售,减少中间环节中的无效损耗,从而实现增收节支,实实在在地解决了农产品利润偏低的问题。 生产…...

接口测试中缓存处理策略

在接口测试中&#xff0c;缓存处理策略是一个关键环节&#xff0c;直接影响测试结果的准确性和可靠性。合理的缓存处理策略能够确保测试环境的一致性&#xff0c;避免因缓存数据导致的测试偏差。以下是接口测试中常见的缓存处理策略及其详细说明&#xff1a; 一、缓存处理的核…...

eNSP-Cloud(实现本地电脑与eNSP内设备之间通信)

说明&#xff1a; 想象一下&#xff0c;你正在用eNSP搭建一个虚拟的网络世界&#xff0c;里面有虚拟的路由器、交换机、电脑&#xff08;PC&#xff09;等等。这些设备都在你的电脑里面“运行”&#xff0c;它们之间可以互相通信&#xff0c;就像一个封闭的小王国。 但是&#…...

手游刚开服就被攻击怎么办?如何防御DDoS?

开服初期是手游最脆弱的阶段&#xff0c;极易成为DDoS攻击的目标。一旦遭遇攻击&#xff0c;可能导致服务器瘫痪、玩家流失&#xff0c;甚至造成巨大经济损失。本文为开发者提供一套简洁有效的应急与防御方案&#xff0c;帮助快速应对并构建长期防护体系。 一、遭遇攻击的紧急应…...

多模态2025:技术路线“神仙打架”,视频生成冲上云霄

文&#xff5c;魏琳华 编&#xff5c;王一粟 一场大会&#xff0c;聚集了中国多模态大模型的“半壁江山”。 智源大会2025为期两天的论坛中&#xff0c;汇集了学界、创业公司和大厂等三方的热门选手&#xff0c;关于多模态的集中讨论达到了前所未有的热度。其中&#xff0c;…...

树莓派超全系列教程文档--(62)使用rpicam-app通过网络流式传输视频

使用rpicam-app通过网络流式传输视频 使用 rpicam-app 通过网络流式传输视频UDPTCPRTSPlibavGStreamerRTPlibcamerasrc GStreamer 元素 文章来源&#xff1a; http://raspberry.dns8844.cn/documentation 原文网址 使用 rpicam-app 通过网络流式传输视频 本节介绍来自 rpica…...

模型参数、模型存储精度、参数与显存

模型参数量衡量单位 M&#xff1a;百万&#xff08;Million&#xff09; B&#xff1a;十亿&#xff08;Billion&#xff09; 1 B 1000 M 1B 1000M 1B1000M 参数存储精度 模型参数是固定的&#xff0c;但是一个参数所表示多少字节不一定&#xff0c;需要看这个参数以什么…...

JavaScript 中的 ES|QL:利用 Apache Arrow 工具

作者&#xff1a;来自 Elastic Jeffrey Rengifo 学习如何将 ES|QL 与 JavaScript 的 Apache Arrow 客户端工具一起使用。 想获得 Elastic 认证吗&#xff1f;了解下一期 Elasticsearch Engineer 培训的时间吧&#xff01; Elasticsearch 拥有众多新功能&#xff0c;助你为自己…...

使用van-uploader 的UI组件,结合vue2如何实现图片上传组件的封装

以下是基于 vant-ui&#xff08;适配 Vue2 版本 &#xff09;实现截图中照片上传预览、删除功能&#xff0c;并封装成可复用组件的完整代码&#xff0c;包含样式和逻辑实现&#xff0c;可直接在 Vue2 项目中使用&#xff1a; 1. 封装的图片上传组件 ImageUploader.vue <te…...

Axios请求超时重发机制

Axios 超时重新请求实现方案 在 Axios 中实现超时重新请求可以通过以下几种方式&#xff1a; 1. 使用拦截器实现自动重试 import axios from axios;// 创建axios实例 const instance axios.create();// 设置超时时间 instance.defaults.timeout 5000;// 最大重试次数 cons…...

NFT模式:数字资产确权与链游经济系统构建

NFT模式&#xff1a;数字资产确权与链游经济系统构建 ——从技术架构到可持续生态的范式革命 一、确权技术革新&#xff1a;构建可信数字资产基石 1. 区块链底层架构的进化 跨链互操作协议&#xff1a;基于LayerZero协议实现以太坊、Solana等公链资产互通&#xff0c;通过零知…...