Kafka java 配置
前言:
大家好,大家在springboot项目中,经常采用 @KafkaListener 做为消费者。这个是spring为我们封装的。 但是某些情况 注解的方式并不能满足需求。这个时候就需要手动版本了。
介绍:
我们已经集成spring-Kafka 就不需要再额外引入kafka-clients的依赖了。直接亮代码。
给大家解释配置含义。
1.Kafka配置代码
public KafkaConsumer<String, String> getCustomer() {// 1. 配置属性参数Properties properties = new Properties();// 设置Kafka集群的地址和端口,消费者将连接到这个地址和端口properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");// 设置键(Key)的反序列化器为StringDeserializer,用于将字节数据转换为String类型properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 设置值(Value)的反序列化器为StringDeserializer,用于将字节数据转换为String类型properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 设置消费者所属的消费者组,消费者组内的消费者将共同消费同一个Topic的消息properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");// 设置消费者与Kafka集群之间的会话超时时间(单位:毫秒)properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);// 设置消费者是否自动提交offset,true表示自动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);// 设置自动提交offset的时间间隔(单位:毫秒)properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);// 设置每次poll操作返回的最大记录数properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,1);// 根据配置属性创建Kafka消费者实例return new KafkaConsumer<>(properties);
}
2.Kafka消费者代码
@Test
void KafkaConsumerTest() {// 创建Kafka消费者实例,通过getCustomer()方法获取KafkaConsumer<String, String> consumer = kafkaCustomer.getCustomer();// 订阅要消费的主题,这里是 "test-topic"consumer.subscribe(Collections.singletonList("test-topic"));// 从Kafka服务器拉取消息,poll等待的最长时间设置为10秒(10000000毫秒)ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000000));for (ConsumerRecord<String, String> record : records) {// 处理消息的逻辑// 打印消息的offset、key和valueSystem.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());//以下代码是我的场景,本人需要在某些情况跳转,而编写单元测试做试验的。boolean flag = true;if (flag){// 如果flag为true,则不自动提交offset,可以在这里添加业务逻辑处理消息// 如果需要手动提交offset,可以取消注释下面的代码// consumer.commitAsync();// 由于flag为true,这里会跳出循环,不再处理后续的消息break;}}// 关闭消费者,释放资源consumer.close();// 打印结束消费的日志System.out.println("结束消费");
}
相关文章:
Kafka java 配置
前言: 大家好,大家在springboot项目中,经常采用 KafkaListener 做为消费者。这个是spring为我们封装的。 但是某些情况 注解的方式并不能满足需求。这个时候就需要手动版本了。 介绍: 我们已经集成spring-Kafka 就不需要再…...
网络安全现状:复杂的威胁形势导致压力水平飙升
《2024 年网络安全状况》报告深入分析了当前网络安全挑战和趋势。 该报告重点介绍了几个关键的关注领域,包括人员短缺、技能差距、不断演变的威胁和预算限制,同时还指出了取得进展的领域,例如对威胁响应能力的信心增强以及对网络风险评估的认…...
【机器学习】强化学习(1)——强化学习原理浅析(区分强化学习、监督学习和启发式算法)
文章目录 强化学习介绍强化学习和监督学习比较监督学习强化学习 强化学习的数学和过程表达动作空间序列决策策略(policy)价值函数(value function)模型(model) 强化学习和启发式算法比较强化学习步骤代码走…...
【SoC设计指南 基于Arm Cortex-M】学习笔记1——AMBA
AMBA简介 先进微控制器总线架构(Advanced Microcontroller Bus Architecture,AMBA)是用在arm处理器上的片上总线协议规范集。 AMBA总线协议规范集包含AHB、APB、AXI等。 AHB:先进高性能总线(Advanced High-performance Bus) APB&…...
flutter鸿蒙模拟器 Win环境调试报错问题记录(暂未解决)
前情提要: 1、flutter项目已经正确生成了ohos项目 2、flutter和鸿蒙的环境变量配置正确 3、ohos项目执行flutter build hap成功 4、没有真机,使用win环境创建的x86模拟器 问题状态 使用模拟器运行ohos,控制台提示“安装HAP 报 code:9568347错…...
详解Rust标准库:HashSet
## 查看本地官方文档安装rust后运行 rustup doc查看The Standard Library即可获取标准库内容 std::collections::hash_set::HashSet定义 HashSet是一种集合数据结构,它只存储唯一的元素。它主要用于检查元素是否存在于集合中,或者对元素进行去重操作&…...
记录学习react的一些内容
由于是在公司实际项目中学习,所以不是很完整 需要一点一点的学 1.React.useState 类似于vue中的ref 可以修改状态 但是是异步的 感觉不好用 const [wishData, setWishData] React.useState<any>(null); 只能使用setxxx来修改 2.useEffect(()>{},[]) 类…...
json绘制热力图
首先需要一段热力信息的json,我放在头部了。 然后就是需要de-geo库了。 实现代码如下: import * as d3geo from d3-geoimport trafficJSON from ../assets/json/traffic.jsonlet geoFun;// 地理投影函数// let info {max: Number.MIN_SAFE_INTEGER,mi…...
linux 下查看程序启动的目录
以azkaban为例 第一步、ps -ef | grep azkaban 查询出进程号 第二步、cd /proc/ 第三步 、cd 进程号 第四部 ll 查看详情 查看jar 位置 查看jar 启动命令...
书生浦语第四期基础岛L1G2000-玩转书生「多模态对话」与「AI搜索」产品
文章目录 一、MindSearch二、书生浦语三、书生万象四、进阶任务 一、MindSearch MindSearch 是一个开源的 AI 搜索引擎。它会对你提出的问题进行分析并拆解为数个子问题,在互联网上搜索、总结得到各个子问题的答案,最后通过模型总结得到最终答案。书生浦…...
保护Kubernetes免受威胁:容器安全的有效实践
安全并非“放之四海而皆准”的解决方案,相反地,它更多的是一个范围,受其应用的特定上下文的影响。安全领域的专业人士很少宣称什么产品是完全安全的,但总有方法可以实现更强的安全性。在本文中,我们将介绍各种方法来支…...
【客观理性深入讨论国产中间件及数据库-科创基础软件】
随着国产化的进程,越来越多的国企央企开始要求软件产品匹配过程化的要求, 最近有一家银行保险的科技公司对行为验证码产品就要求匹配国产中间件, 于是开始了解国产中间件都有哪些厂家 一:国产中间件主要产品及厂商 1 东方通&…...
MFC中Excel的导入以及使用步骤
参考地址 在需要对EXCEL表进行操作的类中添加以下头文件:若出现大量错误将其放入stdafx.h中 #include "resource.h" // 主符号 #include "CWorkbook.h" //单个工作簿 #include "CRange.h" //区域类,对Excel大…...
AWS S3在客户端应用不能使用aws-sdk场景下的文件上传与下载
简介 通常情况下,应用程序上传文件到AWS S3,会使用aws-sdk,但是有些情况下,客户端应用会有安装限制,比如不能安装aws-sdk,此时我们就需要通过其他方式实现文件上传与下载。 这里我们提供一个服务端&#…...
深入解析 Transformers 框架(四):Qwen2.5/GPT 分词流程与 BPE 分词算法技术细节详解
前面我们已经通过三篇文章,详细介绍了 Qwen2.5 大语言模型在 Transformers 框架中的技术细节,包括包和对象加载、模型初始化和分词器技术细节: 深入解析 Transformers 框架(一):包和对象加载中的设计巧思与…...
【Python-AI篇】K近邻算法(KNN)
0. 前置----机器学习流程 获取数据集数据基本处理特征工程机器学习模型评估在线服务 1. KNN算法概念 如果一个样本在特征空间中的K个最相似(即特征空间中最邻近)的样本中大多数属于某一个类别,则该样本也属于这一个类别 1.1 KNN算法流程总…...
aws xray如何实现应用log和trace的关联关系
参考资料 https://community.aws/tutorials/solving-problems-you-cant-see-using-aws-x-ray-and-cloudwatch-for-user-level-observability-in-your-serverless-microservices-applicationshttps://stackoverflow.com/questions/76000811/search-cloudwatch-logs-for-aws-xra…...
centos服务器登录失败次数设定
实现的效果 一台centos服务,如果被别人暴力或者登录次数超过多少次,就拒绝或者在规定时间内拒绝ip登录。这里使用的是fail2ban 安装fail2ban sudo yum install epel-release -y # 先安装 EPEL 源 sudo yum install fail2ban -y配置fail2ban # 复制默…...
实时高效,全面测评快递100API的物流查询功能
一、引言 你是否曾经在网购后焦急地等待包裹,频繁地手动刷新订单页面以获取最新的物流信息?或者作为一名开发者,正在为如何在自己的应用程序中高效地实现物流查询功能而发愁?其实,有一个非常好用的解决方案——快递10…...
第14张 GROUP BY 分组
一、分组功能介绍 使用group by关键字通过某个字段进行分组,对分完组的数据分别 “SELECT 聚合函数”查询结果。 1.1 语法 SELECT column, group_function(column) FROM table [WHERE condition] [GROUP BY group_by_expression] [ORDER BY column]; 明确&#…...
XML Group端口详解
在XML数据映射过程中,经常需要对数据进行分组聚合操作。例如,当处理包含多个物料明细的XML文件时,可能需要将相同物料号的明细归为一组,或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码,增加了开…...
uniapp 对接腾讯云IM群组成员管理(增删改查)
UniApp 实战:腾讯云IM群组成员管理(增删改查) 一、前言 在社交类App开发中,群组成员管理是核心功能之一。本文将基于UniApp框架,结合腾讯云IM SDK,详细讲解如何实现群组成员的增删改查全流程。 权限校验…...
mongodb源码分析session执行handleRequest命令find过程
mongo/transport/service_state_machine.cpp已经分析startSession创建ASIOSession过程,并且验证connection是否超过限制ASIOSession和connection是循环接受客户端命令,把数据流转换成Message,状态转变流程是:State::Created 》 St…...
Leetcode 3577. Count the Number of Computer Unlocking Permutations
Leetcode 3577. Count the Number of Computer Unlocking Permutations 1. 解题思路2. 代码实现 题目链接:3577. Count the Number of Computer Unlocking Permutations 1. 解题思路 这一题其实就是一个脑筋急转弯,要想要能够将所有的电脑解锁&#x…...
【SQL学习笔记1】增删改查+多表连接全解析(内附SQL免费在线练习工具)
可以使用Sqliteviz这个网站免费编写sql语句,它能够让用户直接在浏览器内练习SQL的语法,不需要安装任何软件。 链接如下: sqliteviz 注意: 在转写SQL语法时,关键字之间有一个特定的顺序,这个顺序会影响到…...
数据库分批入库
今天在工作中,遇到一个问题,就是分批查询的时候,由于批次过大导致出现了一些问题,一下是问题描述和解决方案: 示例: // 假设已有数据列表 dataList 和 PreparedStatement pstmt int batchSize 1000; // …...
在鸿蒙HarmonyOS 5中使用DevEco Studio实现录音机应用
1. 项目配置与权限设置 1.1 配置module.json5 {"module": {"requestPermissions": [{"name": "ohos.permission.MICROPHONE","reason": "录音需要麦克风权限"},{"name": "ohos.permission.WRITE…...
DeepSeek 技术赋能无人农场协同作业:用 AI 重构农田管理 “神经网”
目录 一、引言二、DeepSeek 技术大揭秘2.1 核心架构解析2.2 关键技术剖析 三、智能农业无人农场协同作业现状3.1 发展现状概述3.2 协同作业模式介绍 四、DeepSeek 的 “农场奇妙游”4.1 数据处理与分析4.2 作物生长监测与预测4.3 病虫害防治4.4 农机协同作业调度 五、实际案例大…...
蓝桥杯 冶炼金属
原题目链接 🔧 冶炼金属转换率推测题解 📜 原题描述 小蓝有一个神奇的炉子用于将普通金属 O O O 冶炼成为一种特殊金属 X X X。这个炉子有一个属性叫转换率 V V V,是一个正整数,表示每 V V V 个普通金属 O O O 可以冶炼出 …...
QT3D学习笔记——圆台、圆锥
类名作用Qt3DWindow3D渲染窗口容器QEntity场景中的实体(对象或容器)QCamera控制观察视角QPointLight点光源QConeMesh圆锥几何网格QTransform控制实体的位置/旋转/缩放QPhongMaterialPhong光照材质(定义颜色、反光等)QFirstPersonC…...
