当前位置: 首页 > article >正文

Kafka自定义分区机制

文章目录

  • 1.如何自定义分区机制
  • 2.示例


1.如何自定义分区机制

若需要使用自定义分区机制,需要完成两件事:
1)在 producer 程序中创建一个类,实现 org.apache.kafka.clients.producer.Partitioner 接口主要分区逻辑在 Partitioner.partition中实现。
2)在用于构造KafkaProducer的Properties对象中设置 partitioner.class 参数。

2.示例

假设我们的消息中有一些消息是用于审计功能的,这类消息的 key 会被固定地分配一个字符串“audit”。我们想要让这类消息发送到 topic 的最后一个分区上,便于后续统一处理,而对于相同 topic 下的其他消息则采用随机发送的策略发送到其他分区上。那么现在就可以这样来实现自定义的分区策略,如下列代码所示:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;
import java.util.Random;
public class AuditPartitioner implements Partitioner {private Random random;@Overridepublic void configure(Map<String, ?> map) {//该方法实现必要资源的初始化工作random= new Random();}@Overridepublic int partition(String topic, Object keyObj, byte[] keyBytes, Object valueObj, byte[] valueBytes, Cluster cluster) {String key=(String)keyObj;//从集群元数据中把属于该topic的所有分区信息都读取出供分区策略使用List<PartitionInfo> partitionInfoList = cluster.availablePartitionsForTopic(topic);int partitionCount =partitionInfoList.size();int auditPartition=partitionCount-1;return key == null|| key.isEmpty()|| !key.contains ("audit")?random.nextInt(partitionCount-1):auditPartition;}@Overridepublic void close() {//该方法实现必要资源的清理工作}
}

创建好自定义分区策略类后,在构建KafkaProducer 之前为Properties增加该属性;代码如下:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class ProducerTest {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");//必须指定props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");//必须指定props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//必须指定props.put("acks", "-1");props.put("retries", 3);props.put("batch.size", 323840);props.put("linger.ms", 10);props.put("buffer.memory", 33554432);props.put("max.block.ms", 3000);props.put("partitioner.class","com.exm.collectcodenew.kafka.producer.custompartitioner.AuditPartitioner");Producer<String, String> producer = new KafkaProducer<>(props);ProducerRecord nonKeyRecord = new ProducerRecord("topic-test","non-key record");ProducerRecord auditRecord = new ProducerRecord("topic-test", "audit","audit record");ProducerRecord nonAuditRecord =new ProducerRecord("topic-test","other","non-sudit record");producer.send(nonKeyRecord).get();producer.send(nonAuditRecord).get();producer.send(auditRecord).get();producer.send(nonKeyRecord).get();producer.send(nonAuditRecord).get();producer.close();}
}

相关文章:

Kafka自定义分区机制

文章目录 1.如何自定义分区机制2.示例 1.如何自定义分区机制 若需要使用自定义分区机制&#xff0c;需要完成两件事&#xff1a; 1)在 producer 程序中创建一个类&#xff0c;实现 org.apache.kafka.clients.producer.Partitioner 接口主要分区逻辑在 Partitioner.partition中…...

【HarmonyOS NEXT】关键资产存储开发案例

在 iOS 开发中 Keychain 是一个非常安全的存储系统&#xff0c;用于保存敏感信息&#xff0c;如密码、证书、密钥等。与文件系统不同&#xff0c;Keychain 提供了更高的安全性&#xff0c;因为它对数据进行了加密&#xff0c;并且只有经过授权的应用程序才能访问存储的数据。那…...

强化学习(赵世钰版)-学习笔记(9.策略梯度法)

本章是课程的导数第二章&#xff0c;旨在讲解策略的函数化形式。 之前的方法&#xff0c;描述一个策略都是用表格的形式&#xff0c;每一行代表一个状态&#xff0c;每一列代表一个行为&#xff0c;表格中的元素对应相关状态下执行相关行为的概率。 函数化的策略表征形式是指&a…...

ModuleNotFoundError: No module named ‘flask‘ 错误

要解决 ModuleNotFoundError: No module named ‘flask’ 错误&#xff0c;需确保已正确安装 Flask 库。以下是详细步骤&#xff1a; ‌1. 安装 Flask‌ 在终端或命令行中执行以下命令&#xff08;注意权限问题&#xff09;&#xff1a; 使用 pip 安装 pip install flask 若…...

【c++】【STL】unordered_set 底层实现(简略版)

【c】【STL】unordered_set 底层实现&#xff08;简略版&#xff09; ps:这个是我自己看的不保证正确&#xff0c;觉得太长的后面会总结整个调用逻辑 unordered_set 内部实现 template <class _Kty, class _Hasher hash<_Kty>, class _Keyeq equal_to<_Kty>…...

【Zephyr】【一】学习笔记

Zephyr RTOS 示例代码集 1. 基础示例 1.0 基础配置 每个示例都需要一个 prj.conf 文件来配置项目。以下是各个示例所需的配置&#xff1a; 基础示例 prj.conf # 控制台输出 CONFIG_PRINTKy CONFIG_SERIALy CONFIG_UART_CONSOLEy# 日志系统 CONFIG_LOGy CONFIG_LOG_DEFAULT…...

网络安全设备配置与管理-实验4-防火墙AAA服务配置

实验4-p118防火墙AAA服务配置 从这个实验开始&#xff0c;每一个实验都是长篇大论&#x1f613; 不过有好兄弟会替我出手 注意&#xff1a;1. gns3.exe必须以管理员身份打开&#xff0c;否则ping不通虚拟机。 win10虚拟机无法做本次实验&#xff0c;必须用学校给的虚拟机。首…...

后端框架模块化

后端框架的模块化设计旨在简化开发流程、提高可维护性&#xff0c;并通过分层解耦降低复杂性。以下是常见的后端模块及其在不同语言&#xff08;Node.js、Java、Python&#xff09;中的实现方式&#xff1a; 目录 1. 路由&#xff08;Routing&#xff09;2. 中间件&#xff08;…...

【论文阅读】Contrastive Clustering Learning for Multi-Behavior Recommendation

论文地址&#xff1a;Contrastive Clustering Learning for Multi-Behavior Recommendation | ACM Transactions on Information Systems 摘要 近年来&#xff0c;多行为推荐模型取得了显著成功。然而&#xff0c;许多模型未充分考虑不同行为之间的共性与差异性&#xff0c;以…...

视频转音频, 音频转文字

Ubuntu 24 环境准备 # 系统级依赖 sudo apt update && sudo apt install -y ffmpeg python3-venv git build-essential python3-dev# Python虚拟环境 python3 -m venv ~/ai_summary source ~/ai_summary/bin/activate核心工具链 工具用途安装命令Whisper语音识别pip …...

基于协同过滤推荐算法的景点票务数据系统(python-计算机毕设)

摘 要 I ABSTRACT II 第 1 章 引言 1 研究背景及意义 1 研究背景 1研究意义 1 国内外研究现状 2 智慧旅游 3旅游大数据 3 研究内容 4本章小结 4 第 2 章 相关技术概述 5 基于内容的推荐算法 5 基于内容的推荐算法原理 5基于内容的推荐算法实现 5 协同过滤推荐算法 6 协同过…...

QT学习笔记1

** Qt Creator开发环境配置** 安装流程&#xff08;Windows平台&#xff09; 下载与安装 &#xff1a; 访问Qt官网&#xff0c;下载在线安装工具Qt Online Installer。登录或注册Qt账号&#xff0c;选择开源版本&#xff08;需勾选“接受协议”&#xff09;。勾选组件&#xff…...

Ubuntu 24 常用命令方法

文章目录 环境说明1、账号管理1.1、启用 root 2、包管理工具 apt & dpkg2.1、apt 简介 & 阿里源配置2.2、dpkg 简介2.3、apt 和 dpkg 两者之间的关系2.4、常用命令 3、启用 ssh 服务4、防火墙5、开启远程登录6、关闭交换分区7、build-essential&#xff08;编译和开发软…...

Flask多参数模版使用

需要建立目录templates&#xff1b; 把建好的html文件放到templates目录里面&#xff1b; 约定好参数名字&#xff0c;单个名字可以直接使用&#xff1b;多参数使用字典传递&#xff1b; 样例&#xff1a; from flask import render_template # 模板 (Templates) #Flask 使用…...

torcharrow gflags版本问题

问题描述 其实仍然是很简单的编译问题&#xff0c;但是又弄了一整个下午加几乎整个晚上&#xff0c;进度缓慢&#xff0c;又吸取了教训&#xff0c;因而还是来记录一下。 在试图使用torcharrow进行推荐系统模拟的时候&#xff0c;撰写的python程序报错&#xff1a;ERROR: flag…...

自然语言处理|深入解析 PEGASUS:从原理到实践

一、引言 在信息爆炸的时代&#xff0c;互联网上的文本数据以极快的速度增长。无论是新闻资讯、学术论文、社交媒体动态&#xff0c;还是各类报告文档&#xff0c;我们每天接触到的文字信息量巨大。如何快速、准确地提取关键内容成为一项重要任务。文本摘要技术通过将长篇文本…...

Spring AI Alibaba快速使用

AI 时代&#xff0c;Java 程序员也需要与时俱进&#xff0c;这两个框架必须掌握。 一个是 Spring AI一个是 Spring Alibaba AI。 Spring AI 是一个AI工程领域的应用程序框架&#xff0c;它的目标是将 Spring生态系统的设计原则应用于人工智能领域。 但是&#xff0c; Spring…...

socks 协议介绍

SOCKS协议详解 一、基本定义与核心功能 SOCKS&#xff08;Socket Secure&#xff09;是一种网络传输协议&#xff0c;主要用于通过代理服务器转发客户端与目标服务器之间的通信请求。其核心功能包括隐藏用户真实IP地址、穿透防火墙限制以及支持多种网络协议&#xff08;如TCP…...

Linux --centos安装显卡驱动

显卡下载页面 https://www.nvidia.com/en-us/drivers/unix/ 随便下载一个即可 安装过程 查看当前设备的显卡信息 lspci | grep -i vga安装gcc相关依赖 yum update -y yum update gcc yum install build-essential yum install gcc-multilibdkms yum groupinstall "Dev…...

【软件工程】简答题

真题 2024-10 26.需求验证应验证需求规格说明书中每一单一需求是否满足5个性质,这5个性质是什么? 27.简述RUP和UML的关系。 28.简述五种常见的模块间耦合类型。 29.螺旋模型在笛卡尔坐标的4个象限上,分别表达了哪4个方面的活动? 30.为了表达概念模型和软件模型,UML提供了13…...

统信UOS中使用Vscode编程

写在前面&#xff1a;统信UOS其实就是套壳的Linux系统&#xff0c;所以有问题如果搜不到解决方法&#xff0c;可以参考Linux下的解决方法。 1.环境配置 Vscode : 1.85.0 Vscode就直接下载安装就行&#xff0c;然后安装插件&#xff1a;Volar、中文汉化包 node&#xff1a;18…...

K8s认证(CKA/CKAD/CKS)哪家强?主流证书对比

2024年Kubernetes认证全解析&#xff1a;选对证书&#xff0c;薪资翻倍&#xff01; &#xff08;附CKA/CKAD/CKS最新考试攻略&#xff09; 一、K8s认证哪家强&#xff1f;主流证书对比 认证名称颁发机构考试特点适合人群考试费用CKA&#xff08;认证K8s管理员&#xff09;CN…...

HTTP1.0、HTTP1.1、HTTP2.0对比

HTTP 1.0、HTTP 1.1 和 HTTP 2.0 是 HTTP 协议演进中的三个重要版本&#xff0c;每个版本都针对前一代的不足进行了优化和改进。以下是它们的核心区别和关键特性对比&#xff1a; 1. HTTP 1.0&#xff08;1996年&#xff09; 非持久连接 每个请求/响应对都需要单独建立和关闭 T…...

docker安装向量数据库Milvus及可视化工具 Attu

前置条件 1.安装了docker 2.服务器网络正常&#xff0c;可以连接到容器下载地址 3.服务器磁盘空间正常&#xff0c;docker磁盘占用过大&#xff0c;请参考docker容量占用过大解决办法 一、下载yml文件 可在文章资源下载或者自行下载&#xff1a;下载yml 下载这个单机版本的…...

A l密码学(Deepseek)

我&#xff1a;qwertyuiopasdfghjklzxcvbnm deepseek:深度思考中&#xff0e; Okay, lets see. The user input is "qwertyuiopasdfghjklzxcvbnm". At first glance, it looks like a jumbled sequence of letters with some spaces or maybe other characters in …...

DeepSeek + Kimi 自动生成 PPT

可以先用deepseek生成ppt大纲&#xff0c;再把这个大纲复制到Kimi的ppt助手里&#xff1a; https://kimi.moonshot.cn/kimiplus/conpg18t7lagbbsfqksg 选择ppt模板&#xff1a; 点击生成ppt就制作好了。...

Apache Paimon 在抖音集团多场景中的优化实践

资料来源&#xff1a;火山引擎-开发者社区 本文将基于抖音集团内部两大业务的典型实时数仓场景&#xff0c;介绍Paimon在抖音集团内部的生产实践。 作者&#xff1a;李明、苏兴、文杰 抖音集团大数据工程师 目前抖音集团内部主要使用 Lambda 架构进行实时数仓建设&#xff0c;其…...

Uni-App 双栏联动滚动组件开发详解 (电梯导航)

本文基于提供的代码实现一个左右联动的滚动组件&#xff0c;以下是详细的代码解析与实现原理说明&#xff1a; <!--双栏联动滚动组件 - 技术解析功能特性&#xff1a;1. 左侧导航栏与右侧内容区双向联动2. 自适应容器高度3. 平滑滚动定位4. 动态内容位置计算 --> <te…...

当下主流 AI 模型对比:ChatGPT、DeepSeek、Grok 及其他前沿技术

&#x1f4dd;个人主页&#x1f339;&#xff1a;一ge科研小菜鸡-CSDN博客 &#x1f339;&#x1f339;期待您的关注 &#x1f339;&#x1f339; 1. 引言 人工智能&#xff08;AI&#xff09;领域近年来取得了巨大的突破&#xff0c;特别是在大语言模型&#xff08;LLM&#…...

【自用】NLP算法面经(5)

一、L1、L2正则化 正则化是机器学习中用于防止过拟合并提高模型泛化能力的技术。当模型过拟合时&#xff0c;它已经很好地学习了训练数据&#xff0c;甚至是训练数据中的噪声&#xff0c;所以可能无法在新的、未见过的数据上表现良好。 比如&#xff1a; 其中&#xff0c;x1和…...