Kafka java 配置
前言:
大家好,大家在springboot项目中,经常采用 @KafkaListener 做为消费者。这个是spring为我们封装的。 但是某些情况 注解的方式并不能满足需求。这个时候就需要手动版本了。
介绍:
我们已经集成spring-Kafka 就不需要再额外引入kafka-clients的依赖了。直接亮代码。
给大家解释配置含义。
1.Kafka配置代码
public KafkaConsumer<String, String> getCustomer() {// 1. 配置属性参数Properties properties = new Properties();// 设置Kafka集群的地址和端口,消费者将连接到这个地址和端口properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");// 设置键(Key)的反序列化器为StringDeserializer,用于将字节数据转换为String类型properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 设置值(Value)的反序列化器为StringDeserializer,用于将字节数据转换为String类型properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 设置消费者所属的消费者组,消费者组内的消费者将共同消费同一个Topic的消息properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");// 设置消费者与Kafka集群之间的会话超时时间(单位:毫秒)properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);// 设置消费者是否自动提交offset,true表示自动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);// 设置自动提交offset的时间间隔(单位:毫秒)properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);// 设置每次poll操作返回的最大记录数properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,1);// 根据配置属性创建Kafka消费者实例return new KafkaConsumer<>(properties);
}
2.Kafka消费者代码
@Test
void KafkaConsumerTest() {// 创建Kafka消费者实例,通过getCustomer()方法获取KafkaConsumer<String, String> consumer = kafkaCustomer.getCustomer();// 订阅要消费的主题,这里是 "test-topic"consumer.subscribe(Collections.singletonList("test-topic"));// 从Kafka服务器拉取消息,poll等待的最长时间设置为10秒(10000000毫秒)ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000000));for (ConsumerRecord<String, String> record : records) {// 处理消息的逻辑// 打印消息的offset、key和valueSystem.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());//以下代码是我的场景,本人需要在某些情况跳转,而编写单元测试做试验的。boolean flag = true;if (flag){// 如果flag为true,则不自动提交offset,可以在这里添加业务逻辑处理消息// 如果需要手动提交offset,可以取消注释下面的代码// consumer.commitAsync();// 由于flag为true,这里会跳出循环,不再处理后续的消息break;}}// 关闭消费者,释放资源consumer.close();// 打印结束消费的日志System.out.println("结束消费");
}
相关文章:
Kafka java 配置
前言: 大家好,大家在springboot项目中,经常采用 KafkaListener 做为消费者。这个是spring为我们封装的。 但是某些情况 注解的方式并不能满足需求。这个时候就需要手动版本了。 介绍: 我们已经集成spring-Kafka 就不需要再…...
网络安全现状:复杂的威胁形势导致压力水平飙升
《2024 年网络安全状况》报告深入分析了当前网络安全挑战和趋势。 该报告重点介绍了几个关键的关注领域,包括人员短缺、技能差距、不断演变的威胁和预算限制,同时还指出了取得进展的领域,例如对威胁响应能力的信心增强以及对网络风险评估的认…...
【机器学习】强化学习(1)——强化学习原理浅析(区分强化学习、监督学习和启发式算法)
文章目录 强化学习介绍强化学习和监督学习比较监督学习强化学习 强化学习的数学和过程表达动作空间序列决策策略(policy)价值函数(value function)模型(model) 强化学习和启发式算法比较强化学习步骤代码走…...
【SoC设计指南 基于Arm Cortex-M】学习笔记1——AMBA
AMBA简介 先进微控制器总线架构(Advanced Microcontroller Bus Architecture,AMBA)是用在arm处理器上的片上总线协议规范集。 AMBA总线协议规范集包含AHB、APB、AXI等。 AHB:先进高性能总线(Advanced High-performance Bus) APB&…...
flutter鸿蒙模拟器 Win环境调试报错问题记录(暂未解决)
前情提要: 1、flutter项目已经正确生成了ohos项目 2、flutter和鸿蒙的环境变量配置正确 3、ohos项目执行flutter build hap成功 4、没有真机,使用win环境创建的x86模拟器 问题状态 使用模拟器运行ohos,控制台提示“安装HAP 报 code:9568347错…...
详解Rust标准库:HashSet
## 查看本地官方文档安装rust后运行 rustup doc查看The Standard Library即可获取标准库内容 std::collections::hash_set::HashSet定义 HashSet是一种集合数据结构,它只存储唯一的元素。它主要用于检查元素是否存在于集合中,或者对元素进行去重操作&…...
记录学习react的一些内容
由于是在公司实际项目中学习,所以不是很完整 需要一点一点的学 1.React.useState 类似于vue中的ref 可以修改状态 但是是异步的 感觉不好用 const [wishData, setWishData] React.useState<any>(null); 只能使用setxxx来修改 2.useEffect(()>{},[]) 类…...
json绘制热力图
首先需要一段热力信息的json,我放在头部了。 然后就是需要de-geo库了。 实现代码如下: import * as d3geo from d3-geoimport trafficJSON from ../assets/json/traffic.jsonlet geoFun;// 地理投影函数// let info {max: Number.MIN_SAFE_INTEGER,mi…...
linux 下查看程序启动的目录
以azkaban为例 第一步、ps -ef | grep azkaban 查询出进程号 第二步、cd /proc/ 第三步 、cd 进程号 第四部 ll 查看详情 查看jar 位置 查看jar 启动命令...
书生浦语第四期基础岛L1G2000-玩转书生「多模态对话」与「AI搜索」产品
文章目录 一、MindSearch二、书生浦语三、书生万象四、进阶任务 一、MindSearch MindSearch 是一个开源的 AI 搜索引擎。它会对你提出的问题进行分析并拆解为数个子问题,在互联网上搜索、总结得到各个子问题的答案,最后通过模型总结得到最终答案。书生浦…...
保护Kubernetes免受威胁:容器安全的有效实践
安全并非“放之四海而皆准”的解决方案,相反地,它更多的是一个范围,受其应用的特定上下文的影响。安全领域的专业人士很少宣称什么产品是完全安全的,但总有方法可以实现更强的安全性。在本文中,我们将介绍各种方法来支…...
【客观理性深入讨论国产中间件及数据库-科创基础软件】
随着国产化的进程,越来越多的国企央企开始要求软件产品匹配过程化的要求, 最近有一家银行保险的科技公司对行为验证码产品就要求匹配国产中间件, 于是开始了解国产中间件都有哪些厂家 一:国产中间件主要产品及厂商 1 东方通&…...
MFC中Excel的导入以及使用步骤
参考地址 在需要对EXCEL表进行操作的类中添加以下头文件:若出现大量错误将其放入stdafx.h中 #include "resource.h" // 主符号 #include "CWorkbook.h" //单个工作簿 #include "CRange.h" //区域类,对Excel大…...
AWS S3在客户端应用不能使用aws-sdk场景下的文件上传与下载
简介 通常情况下,应用程序上传文件到AWS S3,会使用aws-sdk,但是有些情况下,客户端应用会有安装限制,比如不能安装aws-sdk,此时我们就需要通过其他方式实现文件上传与下载。 这里我们提供一个服务端&#…...
深入解析 Transformers 框架(四):Qwen2.5/GPT 分词流程与 BPE 分词算法技术细节详解
前面我们已经通过三篇文章,详细介绍了 Qwen2.5 大语言模型在 Transformers 框架中的技术细节,包括包和对象加载、模型初始化和分词器技术细节: 深入解析 Transformers 框架(一):包和对象加载中的设计巧思与…...
【Python-AI篇】K近邻算法(KNN)
0. 前置----机器学习流程 获取数据集数据基本处理特征工程机器学习模型评估在线服务 1. KNN算法概念 如果一个样本在特征空间中的K个最相似(即特征空间中最邻近)的样本中大多数属于某一个类别,则该样本也属于这一个类别 1.1 KNN算法流程总…...
aws xray如何实现应用log和trace的关联关系
参考资料 https://community.aws/tutorials/solving-problems-you-cant-see-using-aws-x-ray-and-cloudwatch-for-user-level-observability-in-your-serverless-microservices-applicationshttps://stackoverflow.com/questions/76000811/search-cloudwatch-logs-for-aws-xra…...
centos服务器登录失败次数设定
实现的效果 一台centos服务,如果被别人暴力或者登录次数超过多少次,就拒绝或者在规定时间内拒绝ip登录。这里使用的是fail2ban 安装fail2ban sudo yum install epel-release -y # 先安装 EPEL 源 sudo yum install fail2ban -y配置fail2ban # 复制默…...
实时高效,全面测评快递100API的物流查询功能
一、引言 你是否曾经在网购后焦急地等待包裹,频繁地手动刷新订单页面以获取最新的物流信息?或者作为一名开发者,正在为如何在自己的应用程序中高效地实现物流查询功能而发愁?其实,有一个非常好用的解决方案——快递10…...
第14张 GROUP BY 分组
一、分组功能介绍 使用group by关键字通过某个字段进行分组,对分完组的数据分别 “SELECT 聚合函数”查询结果。 1.1 语法 SELECT column, group_function(column) FROM table [WHERE condition] [GROUP BY group_by_expression] [ORDER BY column]; 明确&#…...
云计算——弹性云计算器(ECS)
弹性云服务器:ECS 概述 云计算重构了ICT系统,云计算平台厂商推出使得厂家能够主要关注应用管理而非平台管理的云平台,包含如下主要概念。 ECS(Elastic Cloud Server):即弹性云服务器,是云计算…...
以下是对华为 HarmonyOS NETX 5属性动画(ArkTS)文档的结构化整理,通过层级标题、表格和代码块提升可读性:
一、属性动画概述NETX 作用:实现组件通用属性的渐变过渡效果,提升用户体验。支持属性:width、height、backgroundColor、opacity、scale、rotate、translate等。注意事项: 布局类属性(如宽高)变化时&#…...
Element Plus 表单(el-form)中关于正整数输入的校验规则
目录 1 单个正整数输入1.1 模板1.2 校验规则 2 两个正整数输入(联动)2.1 模板2.2 校验规则2.3 CSS 1 单个正整数输入 1.1 模板 <el-formref"formRef":model"formData":rules"formRules"label-width"150px"…...
使用Matplotlib创建炫酷的3D散点图:数据可视化的新维度
文章目录 基础实现代码代码解析进阶技巧1. 自定义点的大小和颜色2. 添加图例和样式美化3. 真实数据应用示例实用技巧与注意事项完整示例(带样式)应用场景在数据科学和可视化领域,三维图形能为我们提供更丰富的数据洞察。本文将手把手教你如何使用Python的Matplotlib库创建引…...
安卓基础(Java 和 Gradle 版本)
1. 设置项目的 JDK 版本 方法1:通过 Project Structure File → Project Structure... (或按 CtrlAltShiftS) 左侧选择 SDK Location 在 Gradle Settings 部分,设置 Gradle JDK 方法2:通过 Settings File → Settings... (或 CtrlAltS)…...
深度学习之模型压缩三驾马车:模型剪枝、模型量化、知识蒸馏
一、引言 在深度学习中,我们训练出的神经网络往往非常庞大(比如像 ResNet、YOLOv8、Vision Transformer),虽然精度很高,但“太重”了,运行起来很慢,占用内存大,不适合部署到手机、摄…...
智能职业发展系统:AI驱动的职业规划平台技术解析
智能职业发展系统:AI驱动的职业规划平台技术解析 引言:数字时代的职业革命 在当今瞬息万变的就业市场中,传统的职业规划方法已无法满足个人和企业的需求。据统计,全球每年有超过2亿人面临职业转型困境,而企业也因此遭…...
spring中的@KafkaListener 注解详解
KafkaListener 是 Spring Kafka 提供的一个核心注解,用于标记一个方法作为 Kafka 消息的消费者。下面是对该注解的详细解析: 基本用法 KafkaListener(topics "myTopic", groupId "myGroup") public void listen(String message)…...
CKA考试知识点分享(2)---ingress
CKA 版本:1.32 第二题是涉及ingress相关。本文不是题目,只是为了学习相关知识点做的实验。 1. 环境准备 需要准备一套K8S集群。 1.1 安装ingress-nginx 下载deploy文件: wget -O controller-v1.12.2.yaml https://raw.githubusercontent…...
AGV|无人叉车工业语音播报器|预警提示器LBE-LEX系列性能与接线说明
LBE-LEX系列AGV|无人叉车工业语音播报器|预警提示器,涵盖LBE-LEI-M-00、LBE-LESM-00、LBE-LES-M-01、LBE-LEC-M-00、LBE-KEI-M-00、LBE-KES-M-00、LBE-KES-M-01、LBE-KEC-M-00等型号,适用于各种需要语音提示的场景,主要有AGV、AMR机器人、无人…...
