当前位置: 首页 > news >正文

kafka数据拉取和发送

文章目录

  • 一、原生 KafkaConsumer
    • 1、pom文件引入kafka
    • 2、拉取数据
    • 3、发送数据
  • 二、在spring boot中使用@KafkaListener
    • 1、添加依赖
    • 2、application.yml
    • 3、消息拉取:consumer
    • 4、自定义ListenerContainerFactory
    • 5、消息发送:producer
    • 6、kafka通过clientId鉴权时的鉴权失败问题

一、原生 KafkaConsumer

1、pom文件引入kafka

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.12</artifactId>
</dependency>

2、拉取数据

简单说只要以下几个步骤:
1、获取kafka地址,并设置Properties
2、获取consumer:KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
3、订阅topic:consumer.subscribe(topic);
4、拉取数据:consumer.poll()
5、遍历数据
6、示例:

package com.yogi.test.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.InitializingBean;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;@Component
public class TestMsgConsumer implements InitializingBean {@Value("${test.kafka.address:127.0.0.1:9092}")private String kafkaAddress;@Value("${test.kafka.msg.topic:topic_test_1,topic_test_2}")private String msgTopic;@Value("${test.consumer.name:yogima}")private String consumerGroupId;/*** 消费开关: true-消费,false-暂停消费* 在服务正常停止时用于停止继续消费数据,将缓存中的数据发送完即可*/private Boolean consumeSwitch = true;public void consumerMessage(List<String> topic, String groupId) {LOGGER.info("consumer topic list1:{}",topic.toString());Properties props = new Properties();/*** 指定一组host:port对,用于创建与Kafka broker服务器的Socket连接,可以指定多组,使用逗号分隔,对于多broker集群,只需配置* 部分broker地址即可,consumer启动后可以通过这些机器找到完整的broker列表*/LOGGER.info("test.kafka.address:{}",kafkaAddress);props.put("bootstrap.servers", kafkaAddress);/*** 指定group名字,能唯一标识一个consumer group,如果不显示指定group.id会抛出InvalidGroupIdException异常,通常为group.id* 设置一个有业务意义的名字即可*/props.put("group.id", groupId);/*** 自动提交位移*/props.put("enable.auto.commit", Boolean.TRUE);/*** 位移提交超时时间*/props.put("auto.commit.interval.ms", "1000");/*** 从最早的消息开始消费* 1,earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费* 2,latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据*/props.put("auto.offset.reset", "latest");/*** 指定消费解序列化操作。consumer从broker端获取的任何消息都是字节数组的格式,因此需要指定解序列化操作才能还原为原本对象,* Kafka对绝大部分初始类型提供了解序列化器,consumer支持自定义解序列化器org.apache.kafka.common.serialization.Deserializer* org.apache.kafka.common.serialization.ByteArrayDeserializer* StringDeserializer*/props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");/*** 对消息体进行解序列化,与key解序列化类似*/props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//一次从kafka中poll出来的数据条数,max.poll.records条数据需要在在session.timeout.ms这个时间内处理完props.put("max.poll.records", "500");//fetch.message.max.bytes (默认 1MB) – 消费者能读取的最大消息。这个值应该大于或等于message.max.bytes。props.put("fetch.message.max.bytes", "300000000");KafkaConsumer<String, String> consumer;try{/*** 通过Properties实例对象构建KafkaConsumer对象,可同时指定key、value序列化器*/LOGGER.info("start set consumer,props:{}",props.toString());consumer = new KafkaConsumer<>(props);LOGGER.info("set consumer finished");/*** 订阅consumer group需要消费的topic列表*/LOGGER.info("consumer topic list:{}",topic.toString());consumer.subscribe(topic);}catch (Exception e){LOGGER.info("consumer subscribe failed,msg:{},cause:{},e:{}",e.getMessage(),e.getCause(),e);return;}/*** 并行从订阅topic获取多个分区消息,为此新版本consumer的poll方法使用类似Linux的 selec I/O机制,* 所有相关的事件都发生在一个事件循环中,这样consuner端只使用一个线程就能完成所有类型I/o操作*/try {while (true) {if (!consumeSwitch) {try {Thread.sleep(30000);} catch (InterruptedException e) {LOGGER.error("err msg:" + e.getMessage());}}/*** 指定超时时间,通常情况下consumer拿到了足够多的可用数据,会立即从该方法返回,但若当前没有足够多数据* consumer会处于阻塞状态,但当到达设定的超时时间,则无论数据是否足够都为立即返回*/ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1L));/*** poll调用返回ConsumerRecord类分装的Kafka消息,之后会根据自己业务实现信息处理,对于consumer而言poll方法* 返回即认为consumer成功消费了消息*/for (ConsumerRecord<String, String> record : records) {LOGGER.debug("offset = {}, key = {}, value = {}"

相关文章:

kafka数据拉取和发送

文章目录 一、原生 KafkaConsumer1、pom文件引入kafka2、拉取数据3、发送数据二、在spring boot中使用@KafkaListener1、添加依赖2、application.yml3、消息拉取:consumer4、自定义ListenerContainerFactory5、消息发送:producer6、kafka通过clientId鉴权时的鉴权失败问题一、…...

LLM全栈框架完整分类清单(预训练+微调+工具链)

一、预训练框架 1. 大规模分布式训练框架 框架名称核心能力GitHub地址Megatron-LM3D并行训练、FlashAttention支持、Transformer架构优化&#xff08;NVIDIA生态&#xff09;NVIDIA/Megatron-LMDeepSpeedZeRO优化系列、3D并行、RLHF全流程支持&#xff08;微软生态&#xff09…...

蓝桥杯备考:贪心算法之矩阵消除游戏

这道题是牛客上的一道题&#xff0c;它呢和我们之前的排座位游戏非常之相似&#xff0c;但是&#xff0c;排座位问题选择行和列是不会改变元素的值的&#xff0c;这道题呢每每选一行都会把这行或者这列清零&#xff0c;所以我们的策略就是先用二进制把选择所有行的情况全部枚举…...

【Matlab仿真】Matlab Function中如何使用静态变量?

背景 根据Simulink的运行机制&#xff0c;每个采样点会调用一次MATLAB Function的函数&#xff0c;两次调用之间&#xff0c;同一个变量的前次计算的终值如何传递到当前计算周期来&#xff1f;其实可以使用persistent变量实现函数退出和进入时内部变量值的保持。 persistent变…...

DeepSeek 提示词:高效的提示词设计

&#x1f9d1; 博主简介&#xff1a;CSDN博客专家&#xff0c;历代文学网&#xff08;PC端可以访问&#xff1a;https://literature.sinhy.com/#/?__c1000&#xff0c;移动端可微信小程序搜索“历代文学”&#xff09;总架构师&#xff0c;15年工作经验&#xff0c;精通Java编…...

深入学习Java中的Lambda表达式

深入学习Java中的Lambda表达式 自Java 8引入以来&#xff0c;Lambda表达式彻底改变了Java的编程风格&#xff0c;让代码变得更加简洁、易读&#xff0c;尤其是在函数式编程的场景中。接下来&#xff0c;我们将深入探讨Lambda表达式的语法、原理以及实际应用&#xff0c;帮助你…...

1.2 AI 量化炒股的起源与发展

**定性价值**&#xff1a;AI量化炒股通过算法模型实现投资决策自动化&#xff0c;显著提升交易效率与风险控制能力&#xff0c;打破传统人工交易的主观性与延迟性&#xff0c;推动金融科技向智能化、数据驱动方向迭代&#xff0c;具有颠覆传统投资模式的战略意义。 **定量价值…...

计算机单位之详解——存储单位Byte 网络传输单位bps 视频码率单位bps

前言&#xff1a; 计算机里面单位有点复杂&#xff0c;容易混淆&#xff0c;很多时候混起来就容易概念不理解&#xff0c;包括一些小问题&#xff0c;比如说&#xff1a;为什么我买了1T硬盘&#xff0c;实际存在虚标。为什么所谓的千兆宽带&#xff0c;下载起来没有1G每秒&…...

IDEA关闭SpringBoot程序后仍然占用端口的排查与解决

IDEA关闭SpringBoot程序后仍然占用端口的排查与解决 问题描述 在使用 IntelliJ IDEA 开发 Spring Boot 应用时&#xff0c;有时即使关闭了应用&#xff0c;程序仍然占用端口&#xff08;例如&#xff1a;4001 端口&#xff09;。这会导致重新启动应用时出现端口被占用的错误&a…...

deepseek清华大学第二版 如何获取 DeepSeek如何赋能职场应用 PDF文档 电子档(附下载)

deepseek清华大学第二版 DeepSeek如何赋能职场 pdf文件完整版下载 https://pan.baidu.com/s/1aQcNS8UleMldcoH0Jc6C6A?pwd1234 提取码: 1234 或 https://pan.quark.cn/s/3ee62050a2ac...

【python随手记】——读取文本文件内容转换为json格式

文章目录 前言一、TXT文件转换为JSON数组1.txt文件内容2.python代码3.输出结果 二、TXT文件转换为JSON对象1.txt文件2.python代码3.输出结果 前言 场景&#xff1a;用于读取包含空格分隔数据的TXT文件&#xff0c;并将其转换为结构化JSON文件 一、TXT文件转换为JSON数组 1.tx…...

k8s集群3主5从高可用架构(kubeadm方式安装k8s)

关键步骤说明 环境准备阶段 系统更新&#xff1a;所有节点执行yum/apt update确保软件包最新时间同步&#xff1a;通过ntpdate time.windows.com或部署NTP服务器网络规划&#xff1a;明确划分Service网段&#xff08;默认10.96.0.0/12&#xff09;和Pod网段&#xff08;如Flann…...

基于 sklearn 的均值偏移聚类算法的应用

基于 sklearn 的均值偏移聚类算法的应用 在机器学习和数据挖掘中&#xff0c;聚类算法是一类非常重要的无监督学习方法。它的目的是将数据集中的数据点划分为若干个类&#xff0c;使得同一类的样本点彼此相似&#xff0c;而不同类的样本点相互之间差异较大。均值偏移聚类&…...

三、大模型微调的多种方法与应用场景

详解大模型微调的多种方法与应用场景 随着大模型的不断发展&#xff0c;如何有效地微调这些庞大的预训练模型以适应特定任务成为了研究和应用中的一个重要问题。大模型微调不仅能够提高任务性能&#xff0c;还能在不同的业务需求中提升模型的适应性。在本文中&#xff0c;我们…...

第2课 树莓派镜像的烧录

树莓派的系统通常是安装在SD卡上的‌。SD卡作为启动设备,负责启动树莓派并加载操作系统。这种设计使得树莓派具有便携性和灵活性,用户可以通过更换SD卡来更换操作系统或恢复出厂设置。 烧录树莓派的镜像即是将树莓派镜像烧录到SD卡上,在此期间会格式化SD卡,如果SD卡…...

SQL之order by盲注

目录 一.order by盲注的原理 二.注入方式 a.布尔盲注 b.时间盲注 三.防御 一.order by盲注的原理 order by子句是用于按指定列排序查询结果&#xff0c;列名或列序号皆可。 order by 后面接的字段或者数字不一样&#xff0c;那么这个数据表的排序就会不同。 order by 盲…...

AI大模型(四)基于Deepseek本地部署实现模型定制与调教

AI大模型&#xff08;四&#xff09;基于Deepseek本地部署实现模型定制与调教 DeepSeek开源大模型在榜单上以黑马之姿横扫多项评测&#xff0c;其社区热度指数暴涨、一跃成为近期内影响力最高的话题&#xff0c;这个来自中国团队的模型向世界证明&#xff1a;让每个普通人都能…...

java后端开发day19--学生管理系统升级

&#xff08;以下内容全部来自上述课程&#xff09; 1.要求及思路 1.总体框架 2.注册 3.登录 4.忘记密码 2.代码 1.javabean public class User1 {private String username;private String password;private String personID;private String phoneNumber;public User1() {…...

MFC文件和注册表的操作

MFC文件和注册表的操作 日志、操作配置文件、ini、注册表、音视频的文件存储 Linux下一切皆文件 C/C操作文件 const char* 与 char* const const char* 常量指针&#xff0c;表示指向的内容为常量。指针可以指向其他变量&#xff0c;但是内容不能再变了 char szName[6]&qu…...

vscode如何使用鼠标滚轮调整字体大小

1.打开设置 2.搜索Font Ligatures 3.编辑配置文件 4.修改代码并保存 修改前 修改后 在最后一行添加&#xff1a;“editor.mouseWheelZoom”: true 记得在上一行最后&#xff0c;加上英文版的“,”逗号 5.配置成功&#xff0c;再次按Ctrl鼠标滚轮便可以缩放了。...

QMC5883L的驱动

简介 本篇文章的代码已经上传到了github上面&#xff0c;开源代码 作为一个电子罗盘模块&#xff0c;我们可以通过I2C从中获取偏航角yaw&#xff0c;相对于六轴陀螺仪的yaw&#xff0c;qmc5883l几乎不会零飘并且成本较低。 参考资料 QMC5883L磁场传感器驱动 QMC5883L磁力计…...

图表类系列各种样式PPT模版分享

图标图表系列PPT模版&#xff0c;柱状图PPT模版&#xff0c;线状图PPT模版&#xff0c;折线图PPT模版&#xff0c;饼状图PPT模版&#xff0c;雷达图PPT模版&#xff0c;树状图PPT模版 图表类系列各种样式PPT模版分享&#xff1a;图表系列PPT模板https://pan.quark.cn/s/20d40aa…...

有限自动机到正规文法转换器v1.0

1 项目简介 这是一个功能强大的有限自动机&#xff08;Finite Automaton, FA&#xff09;到正规文法&#xff08;Regular Grammar&#xff09;转换器&#xff0c;它配备了一个直观且完整的图形用户界面&#xff0c;使用户能够轻松地进行操作和观察。该程序基于编译原理中的经典…...

【C++进阶篇】智能指针

C内存管理终极指南&#xff1a;智能指针从入门到源码剖析 一. 智能指针1.1 auto_ptr1.2 unique_ptr1.3 shared_ptr1.4 make_shared 二. 原理三. shared_ptr循环引用问题三. 线程安全问题四. 内存泄漏4.1 什么是内存泄漏4.2 危害4.3 避免内存泄漏 五. 最后 一. 智能指针 智能指…...

人工智能--安全大模型训练计划:基于Fine-tuning + LLM Agent

安全大模型训练计划&#xff1a;基于Fine-tuning LLM Agent 1. 构建高质量安全数据集 目标&#xff1a;为安全大模型创建高质量、去偏、符合伦理的训练数据集&#xff0c;涵盖安全相关任务&#xff08;如有害内容检测、隐私保护、道德推理等&#xff09;。 1.1 数据收集 描…...

Qt 事件处理中 return 的深入解析

Qt 事件处理中 return 的深入解析 在 Qt 事件处理中&#xff0c;return 语句的使用是另一个关键概念&#xff0c;它与 event->accept()/event->ignore() 密切相关但作用不同。让我们详细分析一下它们之间的关系和工作原理。 核心区别&#xff1a;不同层级的事件处理 方…...

tauri项目,如何在rust端读取电脑环境变量

如果想在前端通过调用来获取环境变量的值&#xff0c;可以通过标准的依赖&#xff1a; std::env::var(name).ok() 想在前端通过调用来获取&#xff0c;可以写一个command函数&#xff1a; #[tauri::command] pub fn get_env_var(name: String) -> Result<String, Stri…...

消息队列系统设计与实践全解析

文章目录 &#x1f680; 消息队列系统设计与实践全解析&#x1f50d; 一、消息队列选型1.1 业务场景匹配矩阵1.2 吞吐量/延迟/可靠性权衡&#x1f4a1; 权衡决策框架 1.3 运维复杂度评估&#x1f527; 运维成本降低策略 &#x1f3d7;️ 二、典型架构设计2.1 分布式事务最终一致…...

PH热榜 | 2025-06-08

1. Thiings 标语&#xff1a;一套超过1900个免费AI生成的3D图标集合 介绍&#xff1a;Thiings是一个不断扩展的免费AI生成3D图标库&#xff0c;目前已有超过1900个图标。你可以按照主题浏览&#xff0c;生成自己的图标&#xff0c;或者下载整个图标集。所有图标都可以在个人或…...

2025.6.9总结(利与弊)

凡事都有两面性。在大厂上班也不例外。今天找开发定位问题&#xff0c;从一个接口人不断溯源到另一个 接口人。有时候&#xff0c;不知道是谁的责任填。将工作内容分的很细&#xff0c;每个人负责其中的一小块。我清楚的意识到&#xff0c;自己就是个可以随时替换的螺丝钉&…...