当前位置: 首页 > news >正文

Kafka java 配置

前言:
        大家好,大家在springboot项目中,经常采用 @KafkaListener 做为消费者。这个是spring为我们封装的。 但是某些情况 注解的方式并不能满足需求。这个时候就需要手动版本了。

介绍:

        我们已经集成spring-Kafka 就不需要再额外引入kafka-clients的依赖了。直接亮代码。

给大家解释配置含义。

1.Kafka配置代码

public KafkaConsumer<String, String> getCustomer() {// 1. 配置属性参数Properties properties = new Properties();// 设置Kafka集群的地址和端口,消费者将连接到这个地址和端口properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");// 设置键(Key)的反序列化器为StringDeserializer,用于将字节数据转换为String类型properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 设置值(Value)的反序列化器为StringDeserializer,用于将字节数据转换为String类型properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 设置消费者所属的消费者组,消费者组内的消费者将共同消费同一个Topic的消息properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");// 设置消费者与Kafka集群之间的会话超时时间(单位:毫秒)properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);// 设置消费者是否自动提交offset,true表示自动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);// 设置自动提交offset的时间间隔(单位:毫秒)properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);// 设置每次poll操作返回的最大记录数properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,1);// 根据配置属性创建Kafka消费者实例return new KafkaConsumer<>(properties);
}

2.Kafka消费者代码

@Test
void KafkaConsumerTest() {// 创建Kafka消费者实例,通过getCustomer()方法获取KafkaConsumer<String, String> consumer = kafkaCustomer.getCustomer();// 订阅要消费的主题,这里是 "test-topic"consumer.subscribe(Collections.singletonList("test-topic"));// 从Kafka服务器拉取消息,poll等待的最长时间设置为10秒(10000000毫秒)ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000000));for (ConsumerRecord<String, String> record : records) {// 处理消息的逻辑// 打印消息的offset、key和valueSystem.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());//以下代码是我的场景,本人需要在某些情况跳转,而编写单元测试做试验的。boolean flag = true;if (flag){// 如果flag为true,则不自动提交offset,可以在这里添加业务逻辑处理消息// 如果需要手动提交offset,可以取消注释下面的代码// consumer.commitAsync();// 由于flag为true,这里会跳出循环,不再处理后续的消息break;}}// 关闭消费者,释放资源consumer.close();// 打印结束消费的日志System.out.println("结束消费");
}

相关文章:

Kafka java 配置

前言&#xff1a; 大家好&#xff0c;大家在springboot项目中&#xff0c;经常采用 KafkaListener 做为消费者。这个是spring为我们封装的。 但是某些情况 注解的方式并不能满足需求。这个时候就需要手动版本了。 介绍&#xff1a; 我们已经集成spring-Kafka 就不需要再…...

网络安全现状:复杂的威胁形势导致压力水平飙升

《2024 年网络安全状况》报告深入分析了当前网络安全挑战和趋势。 该报告重点介绍了几个关键的关注领域&#xff0c;包括人员短缺、技能差距、不断演变的威胁和预算限制&#xff0c;同时还指出了取得进展的领域&#xff0c;例如对威胁响应能力的信心增强以及对网络风险评估的认…...

【机器学习】强化学习(1)——强化学习原理浅析(区分强化学习、监督学习和启发式算法)

文章目录 强化学习介绍强化学习和监督学习比较监督学习强化学习 强化学习的数学和过程表达动作空间序列决策策略&#xff08;policy&#xff09;价值函数&#xff08;value function&#xff09;模型&#xff08;model&#xff09; 强化学习和启发式算法比较强化学习步骤代码走…...

【SoC设计指南 基于Arm Cortex-M】学习笔记1——AMBA

AMBA简介 先进微控制器总线架构&#xff08;Advanced Microcontroller Bus Architecture&#xff0c;AMBA&#xff09;是用在arm处理器上的片上总线协议规范集。 AMBA总线协议规范集包含AHB、APB、AXI等。 AHB&#xff1a;先进高性能总线(Advanced High-performance Bus) APB&…...

flutter鸿蒙模拟器 Win环境调试报错问题记录(暂未解决)

前情提要&#xff1a; 1、flutter项目已经正确生成了ohos项目 2、flutter和鸿蒙的环境变量配置正确 3、ohos项目执行flutter build hap成功 4、没有真机&#xff0c;使用win环境创建的x86模拟器 问题状态 使用模拟器运行ohos&#xff0c;控制台提示“安装HAP 报 code:9568347错…...

详解Rust标准库:HashSet

## 查看本地官方文档安装rust后运行 rustup doc查看The Standard Library即可获取标准库内容 std::collections::hash_set::HashSet定义 HashSet是一种集合数据结构&#xff0c;它只存储唯一的元素。它主要用于检查元素是否存在于集合中&#xff0c;或者对元素进行去重操作&…...

记录学习react的一些内容

由于是在公司实际项目中学习&#xff0c;所以不是很完整 需要一点一点的学 1.React.useState 类似于vue中的ref 可以修改状态 但是是异步的 感觉不好用 const [wishData, setWishData] React.useState<any>(null); 只能使用setxxx来修改 2.useEffect(()>{},[]) 类…...

json绘制热力图

首先需要一段热力信息的json&#xff0c;我放在头部了。 然后就是需要de-geo库了。 实现代码如下&#xff1a; import * as d3geo from d3-geoimport trafficJSON from ../assets/json/traffic.jsonlet geoFun;// 地理投影函数// let info {max: Number.MIN_SAFE_INTEGER,mi…...

linux 下查看程序启动的目录

以azkaban为例 第一步、ps -ef | grep azkaban 查询出进程号 第二步、cd /proc/ 第三步 、cd 进程号 第四部 ll 查看详情 查看jar 位置 查看jar 启动命令...

书生浦语第四期基础岛L1G2000-玩转书生「多模态对话」与「AI搜索」产品

文章目录 一、MindSearch二、书生浦语三、书生万象四、进阶任务 一、MindSearch MindSearch 是一个开源的 AI 搜索引擎。它会对你提出的问题进行分析并拆解为数个子问题&#xff0c;在互联网上搜索、总结得到各个子问题的答案&#xff0c;最后通过模型总结得到最终答案。书生浦…...

保护Kubernetes免受威胁:容器安全的有效实践

安全并非“放之四海而皆准”的解决方案&#xff0c;相反地&#xff0c;它更多的是一个范围&#xff0c;受其应用的特定上下文的影响。安全领域的专业人士很少宣称什么产品是完全安全的&#xff0c;但总有方法可以实现更强的安全性。在本文中&#xff0c;我们将介绍各种方法来支…...

【客观理性深入讨论国产中间件及数据库-科创基础软件】

随着国产化的进程&#xff0c;越来越多的国企央企开始要求软件产品匹配过程化的要求&#xff0c; 最近有一家银行保险的科技公司对行为验证码产品就要求匹配国产中间件&#xff0c; 于是开始了解国产中间件都有哪些厂家 一&#xff1a;国产中间件主要产品及厂商 1 东方通&…...

MFC中Excel的导入以及使用步骤

参考地址 在需要对EXCEL表进行操作的类中添加以下头文件&#xff1a;若出现大量错误将其放入stdafx.h中 #include "resource.h" // 主符号 #include "CWorkbook.h" //单个工作簿 #include "CRange.h" //区域类&#xff0c;对Excel大…...

AWS S3在客户端应用不能使用aws-sdk场景下的文件上传与下载

简介 通常情况下&#xff0c;应用程序上传文件到AWS S3&#xff0c;会使用aws-sdk&#xff0c;但是有些情况下&#xff0c;客户端应用会有安装限制&#xff0c;比如不能安装aws-sdk&#xff0c;此时我们就需要通过其他方式实现文件上传与下载。 这里我们提供一个服务端&#…...

深入解析 Transformers 框架(四):Qwen2.5/GPT 分词流程与 BPE 分词算法技术细节详解

前面我们已经通过三篇文章&#xff0c;详细介绍了 Qwen2.5 大语言模型在 Transformers 框架中的技术细节&#xff0c;包括包和对象加载、模型初始化和分词器技术细节&#xff1a; 深入解析 Transformers 框架&#xff08;一&#xff09;&#xff1a;包和对象加载中的设计巧思与…...

【Python-AI篇】K近邻算法(KNN)

0. 前置----机器学习流程 获取数据集数据基本处理特征工程机器学习模型评估在线服务 1. KNN算法概念 如果一个样本在特征空间中的K个最相似&#xff08;即特征空间中最邻近&#xff09;的样本中大多数属于某一个类别&#xff0c;则该样本也属于这一个类别 1.1 KNN算法流程总…...

aws xray如何实现应用log和trace的关联关系

参考资料 https://community.aws/tutorials/solving-problems-you-cant-see-using-aws-x-ray-and-cloudwatch-for-user-level-observability-in-your-serverless-microservices-applicationshttps://stackoverflow.com/questions/76000811/search-cloudwatch-logs-for-aws-xra…...

centos服务器登录失败次数设定

实现的效果 一台centos服务&#xff0c;如果被别人暴力或者登录次数超过多少次&#xff0c;就拒绝或者在规定时间内拒绝ip登录。这里使用的是fail2ban 安装fail2ban sudo yum install epel-release -y # 先安装 EPEL 源 sudo yum install fail2ban -y配置fail2ban # 复制默…...

实时高效,全面测评快递100API的物流查询功能

一、引言 你是否曾经在网购后焦急地等待包裹&#xff0c;频繁地手动刷新订单页面以获取最新的物流信息&#xff1f;或者作为一名开发者&#xff0c;正在为如何在自己的应用程序中高效地实现物流查询功能而发愁&#xff1f;其实&#xff0c;有一个非常好用的解决方案——快递10…...

第14张 GROUP BY 分组

一、分组功能介绍 使用group by关键字通过某个字段进行分组&#xff0c;对分完组的数据分别 “SELECT 聚合函数”查询结果。 1.1 语法 SELECT column, group_function(column) FROM table [WHERE condition] [GROUP BY group_by_expression] [ORDER BY column]; 明确&#…...

地震勘探——干扰波识别、井中地震时距曲线特点

目录 干扰波识别反射波地震勘探的干扰波 井中地震时距曲线特点 干扰波识别 有效波&#xff1a;可以用来解决所提出的地质任务的波&#xff1b;干扰波&#xff1a;所有妨碍辨认、追踪有效波的其他波。 地震勘探中&#xff0c;有效波和干扰波是相对的。例如&#xff0c;在反射波…...

Python:操作 Excel 折叠

💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 Python 操作 Excel 系列 读取单元格数据按行写入设置行高和列宽自动调整行高和列宽水平…...

【HarmonyOS 5.0】DevEco Testing:鸿蒙应用质量保障的终极武器

——全方位测试解决方案与代码实战 一、工具定位与核心能力 DevEco Testing是HarmonyOS官方推出的​​一体化测试平台​​&#xff0c;覆盖应用全生命周期测试需求&#xff0c;主要提供五大核心能力&#xff1a; ​​测试类型​​​​检测目标​​​​关键指标​​功能体验基…...

Java 加密常用的各种算法及其选择

在数字化时代&#xff0c;数据安全至关重要&#xff0c;Java 作为广泛应用的编程语言&#xff0c;提供了丰富的加密算法来保障数据的保密性、完整性和真实性。了解这些常用加密算法及其适用场景&#xff0c;有助于开发者在不同的业务需求中做出正确的选择。​ 一、对称加密算法…...

PAN/FPN

import torch import torch.nn as nn import torch.nn.functional as F import mathclass LowResQueryHighResKVAttention(nn.Module):"""方案 1: 低分辨率特征 (Query) 查询高分辨率特征 (Key, Value).输出分辨率与低分辨率输入相同。"""def __…...

深入浅出深度学习基础:从感知机到全连接神经网络的核心原理与应用

文章目录 前言一、感知机 (Perceptron)1.1 基础介绍1.1.1 感知机是什么&#xff1f;1.1.2 感知机的工作原理 1.2 感知机的简单应用&#xff1a;基本逻辑门1.2.1 逻辑与 (Logic AND)1.2.2 逻辑或 (Logic OR)1.2.3 逻辑与非 (Logic NAND) 1.3 感知机的实现1.3.1 简单实现 (基于阈…...

Linux系统部署KES

1、安装准备 1.版本说明V008R006C009B0014 V008&#xff1a;是version产品的大版本。 R006&#xff1a;是release产品特性版本。 C009&#xff1a;是通用版 B0014&#xff1a;是build开发过程中的构建版本2.硬件要求 #安全版和企业版 内存&#xff1a;1GB 以上 硬盘&#xf…...

如何应对敏捷转型中的团队阻力

应对敏捷转型中的团队阻力需要明确沟通敏捷转型目的、提升团队参与感、提供充分的培训与支持、逐步推进敏捷实践、建立清晰的奖励和反馈机制。其中&#xff0c;明确沟通敏捷转型目的尤为关键&#xff0c;团队成员只有清晰理解转型背后的原因和利益&#xff0c;才能降低对变化的…...

uniapp 小程序 学习(一)

利用Hbuilder 创建项目 运行到内置浏览器看效果 下载微信小程序 安装到Hbuilder 下载地址 &#xff1a;开发者工具默认安装 设置服务端口号 在Hbuilder中设置微信小程序 配置 找到运行设置&#xff0c;将微信开发者工具放入到Hbuilder中&#xff0c; 打开后出现 如下 bug 解…...

LCTF液晶可调谐滤波器在多光谱相机捕捉无人机目标检测中的作用

中达瑞和自2005年成立以来&#xff0c;一直在光谱成像领域深度钻研和发展&#xff0c;始终致力于研发高性能、高可靠性的光谱成像相机&#xff0c;为科研院校提供更优的产品和服务。在《低空背景下无人机目标的光谱特征研究及目标检测应用》这篇论文中提到中达瑞和 LCTF 作为多…...