当前位置: 首页 > news >正文

Kafka-多线程消费及分区设置

目录

  • 一、Kafka是什么?
    • 消息系统:Publish/subscribe(发布/订阅者)模式
    • 相关术语
  • 二、初步使用
    • 1.yml文件配置
    • 2.生产者类
    • 3.消费者类
    • 4.发送消息
  • 三、减少分区数量
    • 1.停止业务服务进程
    • 2.停止kafka服务进程
    • 3.重新启动kafka服务
    • 4.重新启动业务服务
  • 参考文章

一、Kafka是什么?

Kafka是一种高吞吐量、分布式、基于发布/订阅的消息系统。可满足每秒百万级的消息生产和消费;有一套完善的消息存储机制,确保数据高效安全且持久化;Kafka作为一个集群运行在一个或多个服务器上,可以跨多个机房,当某台故障时,生产者和消费者转而使用其他的Kafka。

消息系统:Publish/subscribe(发布/订阅者)模式

1.消息发布者发布消息到主题中,有多个订阅者消费该消息。
2.当发布者发布消息时,不管是否有订阅者都不会报错。
3.一定要先有消息发布者,后有消息订阅者。

相关术语

1.Broker:Kafka服务器,负责创建topic、消息存储和转发。
2.Topic:消息类别(主题),用于区分消息。
3.Partition:分区,真正的存储数据单元。每个Topic包含一个或多个分区,用于保存消息和维护偏移量。(一般为kafka节点数CPU的总核心数量)
4.offset:分区消息此时被消费的位置。分区中消息的唯一id。
5.Producer:消息生产者。
6.Consumer:消息消费者。
7.Consumer Group:消费者组。由消费不同的分区的多个消费者实例组成,共用同一个Group-id。
8.Message:消息,由offset(分区上的消息id)、MessageSize(消息内容data大小)、data(消息具体内容)组成。

二、初步使用

1.yml文件配置

spring:kafka:bootstrap-servers: http://127.0.0.1:9002properties:security:protocol: SASL_PLAINTEXTsasl:mechanism: PLAINjaas:config: org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="123456";producer:# 发生错误后,消息重发的次数。retries: 0#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。batch-size: 16384# 设置生产者内存缓冲区的大小。buffer-memory: 33554432# 键的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer# 值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializer# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。acks: 1consumer:# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5Dauto-commit-interval: 1S# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录auto-offset-reset: earliest# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量enable-auto-commit: false# 键的反序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 值的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 消费者超时时间 6properties:max:poll:interval:ms: 6000listener:# 在侦听器容器中运行的线程数。消费者组中的实例数量。 【本次重点】concurrency: 5#listner负责ack,每调用一次,就立即commitack-mode: manual_immediatemissing-topics-fatal: false

2.生产者类

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;@Component
@Slf4j
public class KafkaProducer {// 消费者组public static final String TOPIC_GROUP2 = "topic.group2";@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;public void send(String topic,Object obj) {String obj2String = JSONObject.toJSONString(obj);log.info("准备发送消息为:{}", obj2String);//发送消息ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, obj);future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable throwable) {//发送失败的处理log.info(topic + " - 生产者 发送消息失败:" + throwable.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> stringObjectSendResult) {//成功的处理log.info(topic + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());}});}
}

3.消费者类

使用注解的方式来创建主题和分区。

package com.lezhi.szxy.oa.core.kafka;import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.ServiceException;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.poi.ss.formula.functions.T;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerRecordRecoverer;
import org.springframework.kafka.listener.RetryingBatchErrorHandler;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.util.backoff.FixedBackOff;import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;@Component
@Slf4j
public class KafkaConsumer {@Resourceprivate addService addService;@Resourceprivate RedisLockUtil redisLockUtil;@ResourceRedissonClient redissonClient;@ResourceRedisTemplate<String,String> redisTemplate;private static final String ADD_LOCK_PREFIX = "ADD_LOCK_PREFIX";ObjectMapper objectMapper = new ObjectMapper();/*** 初始化主题分区* @return*/@Beanpublic NewTopic batchTopic() {log.info("初始化主题分区batchTopic : add_topic,分区:5,副本数:1 >>>>>>>>>>>>>>>>>>>>>>>>>>>>> ");return new NewTopic("add_topic", 5, (short) 1);}/*** 添加消息* @param ack*/@KafkaListener(topics = "add_topic"C,groupId = KafkaProducer.TOPIC_GROUP2)public void handleAddMessage(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {log.info("add_topic-队列消费端 topic:{}, 收到消息>>>>>>>>>>>>>>>>>", topic);Optional message = Optional.ofNullable(record.value());if (message.isPresent()) {Object msg = message.get();try {ParamImport param =  objectMapper.readValue(String.valueOf(msg) , ParamImport .class);String fullKey = redisLockUtil.getFullKey(ADD_LOCK_PREFIX , String.valueOf(msg));if(redisLockUtil.getLock(fullKey , 10000)){// 业务代码...log.info("add_topic 消费了: Topic:" + topic + ",Message:" + String.valueOf(msg));}else {log.info("add_topic 已经被消费: Topic:" + topic + ",Message:" + String.valueOf(msg));}ack.acknowledge();} catch (Exception e) {e.printStackTrace();log.error("解析 <"+OaConstant.SALARY_SEND_MESSAGE_KAFKA_TOPIC+"> 数据异常");}}}
}

配置消费端主题分区启动后,查看kafka,add_topic主题生成五个分区实例
kafka配置
注意:一个消费线程,可以对应若干分区。但是为了保证数据的一致性,同一个分区同时只能备一个消费者实例消费,所以超过分区数量的消费者实例个数是多余的,会被闲置。

将消费者实例(消费线程)比为一个人,分区消息相当于一个办公位。办公位数>人数时,哪个办公位有消息待消费,人就到哪一个工位处理消息。当办公位数<人数时,后面的人数需要排队等待前面的人离开,才可以进入办公位消费。
当人再多时,只有一个办公位,人也得排队办公,属于同步消费;当办公位有多个时,才能实现多人同时操作。

单机kafka分区最好不超过5。默认使用轮询策略。

4.发送消息

public void addTopicMsg(ParamImport param) throws ServiceException {String json;try {json = objectMapper.writeValueAsString(param);} catch (JsonProcessingException e) {log.error("addTopicMsg-发送消息,kafka消息转换失败:{}", e);throw new ServiceException("发送失败");}log.info("addTopicMsg-发送消息,发送kafka请求>>>>>>>>>>>>>>>>>>>>>>>");kafkaTemplate.send("add_topic", json);}

三、减少分区数量

上文中,我们使用了new NewTopic()的方式创建分区,分区数量只能动态增加不能减少。所以我们需要根据以下步骤来重新生成分区,达成减少分区的目的。

1.停止业务服务进程

停止业务服务进程,使得不会重复生成分区。修改代码内配置的new NewTopic()配置分区数。

2.停止kafka服务进程

停止kafka服务进程,清空分区、主题等数据。

3.重新启动kafka服务

4.重新启动业务服务

此时就会根据修改后的分区设置重新生成分区。

参考文章

【SpringBoot】在Springboot中怎么设置Kafka自动创建Topic
SpringBoot+Kafka之如何优雅的创建topic
想弄明白Kafka到底是什么吗?看完这篇你就知道了!(概念、数据存储、生产者、消费者)
图解Kafka,看本篇就足够啦!

相关文章:

Kafka-多线程消费及分区设置

目录 一、Kafka是什么&#xff1f;消息系统&#xff1a;Publish/subscribe&#xff08;发布/订阅者&#xff09;模式相关术语 二、初步使用1.yml文件配置2.生产者类3.消费者类4.发送消息 三、减少分区数量1.停止业务服务进程2.停止kafka服务进程3.重新启动kafka服务4.重新启动业…...

计算机导论06-人机交互

文章目录 人机交互基础人机交互概述人机交互及其发展人机交互方式人机界面 新型人机交互技术显示屏技术跟踪与识别&#xff08;技术&#xff09;脑-机接口 多媒体技术多媒体技术基础多媒体的概念多媒体技术及其特性多媒体技术的应用多媒体技术发展趋势 多媒体应用技术文字&…...

hot100:07接雨水

题目链接&#xff1a; 力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 算法思想&#xff1a; 这里采取的是暴力解法和双指针的解法&#xff0c;但是这个题目还有其他的两种解法&#xff08;单调栈和动态规划&#xff0c;同学可以自行了解&#xff…...

Docker安装MySQL教程分享(附MySQL基础入门教程)

docker安装MySQL Docker可以通过以下命令来安装MySQL容器&#xff1a; 首先确保已经在计算机上安装了Docker。如果没有安装&#xff0c;请根据操作系统的不同进行相应的安装。 打开终端或命令提示符&#xff0c;并运行以下命令拉取最新版本的MySQL镜像&#xff1a; docker pu…...

麒麟V10挂载iso,配置yum源

本文介绍yum 如何挂载本地镜像源 1) 拷贝镜像到本地 2) 执行以下命令&#xff1a; # mount -o loop 镜像路径及镜像名字 /mnt&#xff08;或 media&#xff09; 挂载前 挂载后 3) 进入/etc/yum.repos.d&#xff08;yum.repos.d 是一个目录&#xff0c;该目录是分析 RPM 软件…...

《Linux C编程实战》笔记:信号的捕捉和处理

Linux系统中对信号的处理主要由signal和sigaction函数来完成&#xff0c;另外还会介绍一个函数pause&#xff0c;它可以用来响应任何信号&#xff0c;不过不做任何处理 signal函数 #include <signal.h> void (*signal(int signum, void (*handler)(int)))(int);可以分解…...

python算法与数据结构---单调栈与实践

单调栈 单调栈是一个栈&#xff0c;里面的元素的大小按照它们所在栈的位置&#xff0c;满足一定的单调性&#xff1b; 性质&#xff1a; 单调递减栈能找到左边第一个比当前元素大的元素&#xff1b;单调递增栈能找到左边第一个比当前元素小的元素&#xff1b; 应用场景 一般用…...

文心一言使用分享

ChatGPT 和文心一言哪个更好用&#xff1f; 一个直接可以用&#xff0c;一个还需要借助一些工具&#xff0c;还有可能账号会消失…… 没有可比性。 通用大模型用于特定功能的时候需要一些引导技巧。 import math import time def calculate_coordinate(c, d, e, f, g, h,…...

【C++干货铺】C++11新特性——lambda表达式 | 包装器

个人主页点击直达&#xff1a;小白不是程序媛 C系列专栏&#xff1a;C干货铺 代码仓库&#xff1a;Gitee 目录 C98中的排序 lambda表达式 lambda表达式语法 表达式中的各部分说明 lambda表达式的使用 基本的使用 [var]值传递捕捉变量var ​编辑 [&var]引用传递捕…...

在 EggJS 中实现 Redis 上锁

配置环境 下载 Redis Windows 访问 https://github.com/microsoftarchive/redis/releases 选择版本进行下载 - 勾选 [配置到环境变量] - 无脑下一步并安装 命令行执行&#xff1a;redis-cli -v 查看已安装的 Redis 版本&#xff0c;能成功查看就表示安装成功啦~ Mac brew i…...

Unity-场景

创建场景 创建新的场景后&#xff1a; 文件 -> 生成设置 -> Build中的场景 -> 将项目中需要使用的场景拖进去 SceneTest public class SceneTest : MonoBehaviour {// Start is called before the first frame updatevoid Start(){// 两个类&#xff1a; 场景类、场…...

MATLAB R2023b for Mac 中文

MATLAB R2023b 是 MathWorks 发布的最新版本的 MATLAB&#xff0c;适用于进行算法开发、数据可视化、数据分析以及数值计算等任务的工程师和科学家。它包含了一系列新增功能和改进&#xff0c;如改进了数据导入工具&#xff0c;增加了对数据帧和表格对象的支持&#xff0c;增强…...

01 MyBatisPlus快速入门

1. MyBatis-Plus快速入门 版本 3.5.31并非另起炉灶 , 而是MyBatis的增强 , 使用之前依然要导入MyBatis的依赖 , 且之前MyBatis的所有功能依然可以使用.局限性是仅限于单表操作, 对于多表仍需要手写 项目结构&#xff1a; 先导入依赖&#xff0c;比之前多了一个mybatis-plus…...

HarmonyOS 应用开发入门

HarmonyOS 应用开发入门 前言 DevEco Studio Release版本为&#xff1a;DevEco Studio 3.1.1。 Compile SDK Release版本为&#xff1a;3.1.0&#xff08;API 9&#xff09;。 构建方式为 HVigor&#xff0c;而非 Gradle。 最新版本已不再支持 &#xff08;”Java、JavaScrip…...

【机器学习300问】9、梯度下降是用来干嘛的?

当你和我一样对自己问出这个问题后&#xff0c;分析一下&#xff01;其实我首先得知道梯度下降是什么&#xff0c;也就它的定义。其次我得了解它具体用在什么地方&#xff0c;也就是使用场景。最后才是这个问题&#xff0c;梯度下降有什么用&#xff1f;怎么用&#xff1f; 所以…...

第13章 1 进程和线程

文章目录 程序和进程的概念 p173函数式创建子进程Process类常用的属性和方法1 p175Process类中常用的属性和方法2 p176继承式创建子进程 p177进程池的使用 p178并发和并行 p179进程之间数据是否共享 p180队列的基本使用 p180使用队列实现进程之间的通信 p182函数式创建线程 p18…...

什么是中间件?

文章目录 为什么需要中间件&#xff1f;中间件生态漫谈数据库中间件读写分离分库分表引进数据库中间件MyCat 服务端代理模式ShardingJDBC 客户端代理模式 总结 IT 系统从单体应用逐渐向分布式架构演变&#xff0c;高并发、高可用、高性能、分布式等话题变得异常火热&#xff0c…...

汽车售后服务客户满意度调查报告

本文由群狼调研&#xff08;长沙旅行社满意度调查&#xff09;出品&#xff0c;欢迎转载&#xff0c;请注明出处。汽车售后服务客户满意度调查报告通常包括以下内容&#xff1a; 1.调研概况&#xff1a;介绍调研的目的、背景和范围&#xff0c;包括调研的时间、地点和样本规模等…...

初始RabbitMQ(入门篇)

消息队列(MQ) 本质上就是一个队列,一个先进先出的队列,队列中存放的内容是message(消息),是一种跨进程的通信机制,用于上下游传递消息, 为什么使用MQ: 削峰填谷: MQ可以很好的做一个缓冲机制,例如在一个系统中有A和B两个应用,A是接收用户的请求的,然后A调用B进行处理. 这时…...

JVM:Java类加载机制

Java类加载机制的全过程&#xff1a; 加载、验证、准备、初始化和卸载这五个阶段的顺序是确定的&#xff0c;类型的加载过程必须按照这种顺序按部就班地开始&#xff0c;而解析阶段则不一定&#xff1a;它在某些情况下可以在初始化阶段之后再开始&#xff0c; 这是为了支持Java…...

ESP32读取DHT11温湿度数据

芯片&#xff1a;ESP32 环境&#xff1a;Arduino 一、安装DHT11传感器库 红框的库&#xff0c;别安装错了 二、代码 注意&#xff0c;DATA口要连接在D15上 #include "DHT.h" // 包含DHT库#define DHTPIN 15 // 定义DHT11数据引脚连接到ESP32的GPIO15 #define D…...

解决本地部署 SmolVLM2 大语言模型运行 flash-attn 报错

出现的问题 安装 flash-attn 会一直卡在 build 那一步或者运行报错 解决办法 是因为你安装的 flash-attn 版本没有对应上&#xff0c;所以报错&#xff0c;到 https://github.com/Dao-AILab/flash-attention/releases 下载对应版本&#xff0c;cu、torch、cp 的版本一定要对…...

Spring Boot+Neo4j知识图谱实战:3步搭建智能关系网络!

一、引言 在数据驱动的背景下&#xff0c;知识图谱凭借其高效的信息组织能力&#xff0c;正逐步成为各行业应用的关键技术。本文聚焦 Spring Boot与Neo4j图数据库的技术结合&#xff0c;探讨知识图谱开发的实现细节&#xff0c;帮助读者掌握该技术栈在实际项目中的落地方法。 …...

【C语言练习】080. 使用C语言实现简单的数据库操作

080. 使用C语言实现简单的数据库操作 080. 使用C语言实现简单的数据库操作使用原生APIODBC接口第三方库ORM框架文件模拟1. 安装SQLite2. 示例代码:使用SQLite创建数据库、表和插入数据3. 编译和运行4. 示例运行输出:5. 注意事项6. 总结080. 使用C语言实现简单的数据库操作 在…...

NFT模式:数字资产确权与链游经济系统构建

NFT模式&#xff1a;数字资产确权与链游经济系统构建 ——从技术架构到可持续生态的范式革命 一、确权技术革新&#xff1a;构建可信数字资产基石 1. 区块链底层架构的进化 跨链互操作协议&#xff1a;基于LayerZero协议实现以太坊、Solana等公链资产互通&#xff0c;通过零知…...

JVM暂停(Stop-The-World,STW)的原因分类及对应排查方案

JVM暂停(Stop-The-World,STW)的完整原因分类及对应排查方案,结合JVM运行机制和常见故障场景整理而成: 一、GC相关暂停​​ 1. ​​安全点(Safepoint)阻塞​​ ​​现象​​:JVM暂停但无GC日志,日志显示No GCs detected。​​原因​​:JVM等待所有线程进入安全点(如…...

大语言模型(LLM)中的KV缓存压缩与动态稀疏注意力机制设计

随着大语言模型&#xff08;LLM&#xff09;参数规模的增长&#xff0c;推理阶段的内存占用和计算复杂度成为核心挑战。传统注意力机制的计算复杂度随序列长度呈二次方增长&#xff0c;而KV缓存的内存消耗可能高达数十GB&#xff08;例如Llama2-7B处理100K token时需50GB内存&a…...

Mobile ALOHA全身模仿学习

一、题目 Mobile ALOHA&#xff1a;通过低成本全身远程操作学习双手移动操作 传统模仿学习&#xff08;Imitation Learning&#xff09;缺点&#xff1a;聚焦与桌面操作&#xff0c;缺乏通用任务所需的移动性和灵活性 本论文优点&#xff1a;&#xff08;1&#xff09;在ALOHA…...

k8s从入门到放弃之HPA控制器

k8s从入门到放弃之HPA控制器 Kubernetes中的Horizontal Pod Autoscaler (HPA)控制器是一种用于自动扩展部署、副本集或复制控制器中Pod数量的机制。它可以根据观察到的CPU利用率&#xff08;或其他自定义指标&#xff09;来调整这些对象的规模&#xff0c;从而帮助应用程序在负…...

Neko虚拟浏览器远程协作方案:Docker+内网穿透技术部署实践

前言&#xff1a;本文将向开发者介绍一款创新性协作工具——Neko虚拟浏览器。在数字化协作场景中&#xff0c;跨地域的团队常需面对实时共享屏幕、协同编辑文档等需求。通过本指南&#xff0c;你将掌握在Ubuntu系统中使用容器化技术部署该工具的具体方案&#xff0c;并结合内网…...