当前位置: 首页 > news >正文

kafka数据拉取和发送

文章目录

  • 一、原生 KafkaConsumer
    • 1、pom文件引入kafka
    • 2、拉取数据
    • 3、发送数据
  • 二、在spring boot中使用@KafkaListener
    • 1、添加依赖
    • 2、application.yml
    • 3、消息拉取:consumer
    • 4、自定义ListenerContainerFactory
    • 5、消息发送:producer
    • 6、kafka通过clientId鉴权时的鉴权失败问题

一、原生 KafkaConsumer

1、pom文件引入kafka

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.12</artifactId>
</dependency>

2、拉取数据

简单说只要以下几个步骤:
1、获取kafka地址,并设置Properties
2、获取consumer:KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
3、订阅topic:consumer.subscribe(topic);
4、拉取数据:consumer.poll()
5、遍历数据
6、示例:

package com.yogi.test.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.InitializingBean;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;@Component
public class TestMsgConsumer implements InitializingBean {@Value("${test.kafka.address:127.0.0.1:9092}")private String kafkaAddress;@Value("${test.kafka.msg.topic:topic_test_1,topic_test_2}")private String msgTopic;@Value("${test.consumer.name:yogima}")private String consumerGroupId;/*** 消费开关: true-消费,false-暂停消费* 在服务正常停止时用于停止继续消费数据,将缓存中的数据发送完即可*/private Boolean consumeSwitch = true;public void consumerMessage(List<String> topic, String groupId) {LOGGER.info("consumer topic list1:{}",topic.toString());Properties props = new Properties();/*** 指定一组host:port对,用于创建与Kafka broker服务器的Socket连接,可以指定多组,使用逗号分隔,对于多broker集群,只需配置* 部分broker地址即可,consumer启动后可以通过这些机器找到完整的broker列表*/LOGGER.info("test.kafka.address:{}",kafkaAddress);props.put("bootstrap.servers", kafkaAddress);/*** 指定group名字,能唯一标识一个consumer group,如果不显示指定group.id会抛出InvalidGroupIdException异常,通常为group.id* 设置一个有业务意义的名字即可*/props.put("group.id", groupId);/*** 自动提交位移*/props.put("enable.auto.commit", Boolean.TRUE);/*** 位移提交超时时间*/props.put("auto.commit.interval.ms", "1000");/*** 从最早的消息开始消费* 1,earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费* 2,latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据*/props.put("auto.offset.reset", "latest");/*** 指定消费解序列化操作。consumer从broker端获取的任何消息都是字节数组的格式,因此需要指定解序列化操作才能还原为原本对象,* Kafka对绝大部分初始类型提供了解序列化器,consumer支持自定义解序列化器org.apache.kafka.common.serialization.Deserializer* org.apache.kafka.common.serialization.ByteArrayDeserializer* StringDeserializer*/props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");/*** 对消息体进行解序列化,与key解序列化类似*/props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//一次从kafka中poll出来的数据条数,max.poll.records条数据需要在在session.timeout.ms这个时间内处理完props.put("max.poll.records", "500");//fetch.message.max.bytes (默认 1MB) – 消费者能读取的最大消息。这个值应该大于或等于message.max.bytes。props.put("fetch.message.max.bytes", "300000000");KafkaConsumer<String, String> consumer;try{/*** 通过Properties实例对象构建KafkaConsumer对象,可同时指定key、value序列化器*/LOGGER.info("start set consumer,props:{}",props.toString());consumer = new KafkaConsumer<>(props);LOGGER.info("set consumer finished");/*** 订阅consumer group需要消费的topic列表*/LOGGER.info("consumer topic list:{}",topic.toString());consumer.subscribe(topic);}catch (Exception e){LOGGER.info("consumer subscribe failed,msg:{},cause:{},e:{}",e.getMessage(),e.getCause(),e);return;}/*** 并行从订阅topic获取多个分区消息,为此新版本consumer的poll方法使用类似Linux的 selec I/O机制,* 所有相关的事件都发生在一个事件循环中,这样consuner端只使用一个线程就能完成所有类型I/o操作*/try {while (true) {if (!consumeSwitch) {try {Thread.sleep(30000);} catch (InterruptedException e) {LOGGER.error("err msg:" + e.getMessage());}}/*** 指定超时时间,通常情况下consumer拿到了足够多的可用数据,会立即从该方法返回,但若当前没有足够多数据* consumer会处于阻塞状态,但当到达设定的超时时间,则无论数据是否足够都为立即返回*/ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1L));/*** poll调用返回ConsumerRecord类分装的Kafka消息,之后会根据自己业务实现信息处理,对于consumer而言poll方法* 返回即认为consumer成功消费了消息*/for (ConsumerRecord<String, String> record : records) {LOGGER.debug("offset = {}, key = {}, value = {}"

相关文章:

kafka数据拉取和发送

文章目录 一、原生 KafkaConsumer1、pom文件引入kafka2、拉取数据3、发送数据二、在spring boot中使用@KafkaListener1、添加依赖2、application.yml3、消息拉取:consumer4、自定义ListenerContainerFactory5、消息发送:producer6、kafka通过clientId鉴权时的鉴权失败问题一、…...

LLM全栈框架完整分类清单(预训练+微调+工具链)

一、预训练框架 1. 大规模分布式训练框架 框架名称核心能力GitHub地址Megatron-LM3D并行训练、FlashAttention支持、Transformer架构优化&#xff08;NVIDIA生态&#xff09;NVIDIA/Megatron-LMDeepSpeedZeRO优化系列、3D并行、RLHF全流程支持&#xff08;微软生态&#xff09…...

蓝桥杯备考:贪心算法之矩阵消除游戏

这道题是牛客上的一道题&#xff0c;它呢和我们之前的排座位游戏非常之相似&#xff0c;但是&#xff0c;排座位问题选择行和列是不会改变元素的值的&#xff0c;这道题呢每每选一行都会把这行或者这列清零&#xff0c;所以我们的策略就是先用二进制把选择所有行的情况全部枚举…...

【Matlab仿真】Matlab Function中如何使用静态变量?

背景 根据Simulink的运行机制&#xff0c;每个采样点会调用一次MATLAB Function的函数&#xff0c;两次调用之间&#xff0c;同一个变量的前次计算的终值如何传递到当前计算周期来&#xff1f;其实可以使用persistent变量实现函数退出和进入时内部变量值的保持。 persistent变…...

DeepSeek 提示词:高效的提示词设计

&#x1f9d1; 博主简介&#xff1a;CSDN博客专家&#xff0c;历代文学网&#xff08;PC端可以访问&#xff1a;https://literature.sinhy.com/#/?__c1000&#xff0c;移动端可微信小程序搜索“历代文学”&#xff09;总架构师&#xff0c;15年工作经验&#xff0c;精通Java编…...

深入学习Java中的Lambda表达式

深入学习Java中的Lambda表达式 自Java 8引入以来&#xff0c;Lambda表达式彻底改变了Java的编程风格&#xff0c;让代码变得更加简洁、易读&#xff0c;尤其是在函数式编程的场景中。接下来&#xff0c;我们将深入探讨Lambda表达式的语法、原理以及实际应用&#xff0c;帮助你…...

1.2 AI 量化炒股的起源与发展

**定性价值**&#xff1a;AI量化炒股通过算法模型实现投资决策自动化&#xff0c;显著提升交易效率与风险控制能力&#xff0c;打破传统人工交易的主观性与延迟性&#xff0c;推动金融科技向智能化、数据驱动方向迭代&#xff0c;具有颠覆传统投资模式的战略意义。 **定量价值…...

计算机单位之详解——存储单位Byte 网络传输单位bps 视频码率单位bps

前言&#xff1a; 计算机里面单位有点复杂&#xff0c;容易混淆&#xff0c;很多时候混起来就容易概念不理解&#xff0c;包括一些小问题&#xff0c;比如说&#xff1a;为什么我买了1T硬盘&#xff0c;实际存在虚标。为什么所谓的千兆宽带&#xff0c;下载起来没有1G每秒&…...

IDEA关闭SpringBoot程序后仍然占用端口的排查与解决

IDEA关闭SpringBoot程序后仍然占用端口的排查与解决 问题描述 在使用 IntelliJ IDEA 开发 Spring Boot 应用时&#xff0c;有时即使关闭了应用&#xff0c;程序仍然占用端口&#xff08;例如&#xff1a;4001 端口&#xff09;。这会导致重新启动应用时出现端口被占用的错误&a…...

deepseek清华大学第二版 如何获取 DeepSeek如何赋能职场应用 PDF文档 电子档(附下载)

deepseek清华大学第二版 DeepSeek如何赋能职场 pdf文件完整版下载 https://pan.baidu.com/s/1aQcNS8UleMldcoH0Jc6C6A?pwd1234 提取码: 1234 或 https://pan.quark.cn/s/3ee62050a2ac...

【python随手记】——读取文本文件内容转换为json格式

文章目录 前言一、TXT文件转换为JSON数组1.txt文件内容2.python代码3.输出结果 二、TXT文件转换为JSON对象1.txt文件2.python代码3.输出结果 前言 场景&#xff1a;用于读取包含空格分隔数据的TXT文件&#xff0c;并将其转换为结构化JSON文件 一、TXT文件转换为JSON数组 1.tx…...

k8s集群3主5从高可用架构(kubeadm方式安装k8s)

关键步骤说明 环境准备阶段 系统更新&#xff1a;所有节点执行yum/apt update确保软件包最新时间同步&#xff1a;通过ntpdate time.windows.com或部署NTP服务器网络规划&#xff1a;明确划分Service网段&#xff08;默认10.96.0.0/12&#xff09;和Pod网段&#xff08;如Flann…...

基于 sklearn 的均值偏移聚类算法的应用

基于 sklearn 的均值偏移聚类算法的应用 在机器学习和数据挖掘中&#xff0c;聚类算法是一类非常重要的无监督学习方法。它的目的是将数据集中的数据点划分为若干个类&#xff0c;使得同一类的样本点彼此相似&#xff0c;而不同类的样本点相互之间差异较大。均值偏移聚类&…...

三、大模型微调的多种方法与应用场景

详解大模型微调的多种方法与应用场景 随着大模型的不断发展&#xff0c;如何有效地微调这些庞大的预训练模型以适应特定任务成为了研究和应用中的一个重要问题。大模型微调不仅能够提高任务性能&#xff0c;还能在不同的业务需求中提升模型的适应性。在本文中&#xff0c;我们…...

第2课 树莓派镜像的烧录

树莓派的系统通常是安装在SD卡上的‌。SD卡作为启动设备,负责启动树莓派并加载操作系统。这种设计使得树莓派具有便携性和灵活性,用户可以通过更换SD卡来更换操作系统或恢复出厂设置。 烧录树莓派的镜像即是将树莓派镜像烧录到SD卡上,在此期间会格式化SD卡,如果SD卡…...

SQL之order by盲注

目录 一.order by盲注的原理 二.注入方式 a.布尔盲注 b.时间盲注 三.防御 一.order by盲注的原理 order by子句是用于按指定列排序查询结果&#xff0c;列名或列序号皆可。 order by 后面接的字段或者数字不一样&#xff0c;那么这个数据表的排序就会不同。 order by 盲…...

AI大模型(四)基于Deepseek本地部署实现模型定制与调教

AI大模型&#xff08;四&#xff09;基于Deepseek本地部署实现模型定制与调教 DeepSeek开源大模型在榜单上以黑马之姿横扫多项评测&#xff0c;其社区热度指数暴涨、一跃成为近期内影响力最高的话题&#xff0c;这个来自中国团队的模型向世界证明&#xff1a;让每个普通人都能…...

java后端开发day19--学生管理系统升级

&#xff08;以下内容全部来自上述课程&#xff09; 1.要求及思路 1.总体框架 2.注册 3.登录 4.忘记密码 2.代码 1.javabean public class User1 {private String username;private String password;private String personID;private String phoneNumber;public User1() {…...

MFC文件和注册表的操作

MFC文件和注册表的操作 日志、操作配置文件、ini、注册表、音视频的文件存储 Linux下一切皆文件 C/C操作文件 const char* 与 char* const const char* 常量指针&#xff0c;表示指向的内容为常量。指针可以指向其他变量&#xff0c;但是内容不能再变了 char szName[6]&qu…...

vscode如何使用鼠标滚轮调整字体大小

1.打开设置 2.搜索Font Ligatures 3.编辑配置文件 4.修改代码并保存 修改前 修改后 在最后一行添加&#xff1a;“editor.mouseWheelZoom”: true 记得在上一行最后&#xff0c;加上英文版的“,”逗号 5.配置成功&#xff0c;再次按Ctrl鼠标滚轮便可以缩放了。...

Python爬虫实战:研究MechanicalSoup库相关技术

一、MechanicalSoup 库概述 1.1 库简介 MechanicalSoup 是一个 Python 库,专为自动化交互网站而设计。它结合了 requests 的 HTTP 请求能力和 BeautifulSoup 的 HTML 解析能力,提供了直观的 API,让我们可以像人类用户一样浏览网页、填写表单和提交请求。 1.2 主要功能特点…...

(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)

题目&#xff1a;3442. 奇偶频次间的最大差值 I 思路 &#xff1a;哈希&#xff0c;时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况&#xff0c;哈希表这里用数组即可实现。 C版本&#xff1a; class Solution { public:int maxDifference(string s) {int a[26]…...

【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型

摘要 拍照搜题系统采用“三层管道&#xff08;多模态 OCR → 语义检索 → 答案渲染&#xff09;、两级检索&#xff08;倒排 BM25 向量 HNSW&#xff09;并以大语言模型兜底”的整体框架&#xff1a; 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后&#xff0c;分别用…...

论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(二)

HoST框架核心实现方法详解 - 论文深度解读(第二部分) 《Learning Humanoid Standing-up Control across Diverse Postures》 系列文章: 论文深度解读 + 算法与代码分析(二) 作者机构: 上海AI Lab, 上海交通大学, 香港大学, 浙江大学, 香港中文大学 论文主题: 人形机器人…...

从WWDC看苹果产品发展的规律

WWDC 是苹果公司一年一度面向全球开发者的盛会&#xff0c;其主题演讲展现了苹果在产品设计、技术路线、用户体验和生态系统构建上的核心理念与演进脉络。我们借助 ChatGPT Deep Research 工具&#xff0c;对过去十年 WWDC 主题演讲内容进行了系统化分析&#xff0c;形成了这份…...

智慧工地云平台源码,基于微服务架构+Java+Spring Cloud +UniApp +MySql

智慧工地管理云平台系统&#xff0c;智慧工地全套源码&#xff0c;java版智慧工地源码&#xff0c;支持PC端、大屏端、移动端。 智慧工地聚焦建筑行业的市场需求&#xff0c;提供“平台网络终端”的整体解决方案&#xff0c;提供劳务管理、视频管理、智能监测、绿色施工、安全管…...

PPT|230页| 制造集团企业供应链端到端的数字化解决方案:从需求到结算的全链路业务闭环构建

制造业采购供应链管理是企业运营的核心环节&#xff0c;供应链协同管理在供应链上下游企业之间建立紧密的合作关系&#xff0c;通过信息共享、资源整合、业务协同等方式&#xff0c;实现供应链的全面管理和优化&#xff0c;提高供应链的效率和透明度&#xff0c;降低供应链的成…...

Docker 运行 Kafka 带 SASL 认证教程

Docker 运行 Kafka 带 SASL 认证教程 Docker 运行 Kafka 带 SASL 认证教程一、说明二、环境准备三、编写 Docker Compose 和 jaas文件docker-compose.yml代码说明&#xff1a;server_jaas.conf 四、启动服务五、验证服务六、连接kafka服务七、总结 Docker 运行 Kafka 带 SASL 认…...

连锁超市冷库节能解决方案:如何实现超市降本增效

在连锁超市冷库运营中&#xff0c;高能耗、设备损耗快、人工管理低效等问题长期困扰企业。御控冷库节能解决方案通过智能控制化霜、按需化霜、实时监控、故障诊断、自动预警、远程控制开关六大核心技术&#xff0c;实现年省电费15%-60%&#xff0c;且不改动原有装备、安装快捷、…...

条件运算符

C中的三目运算符&#xff08;也称条件运算符&#xff0c;英文&#xff1a;ternary operator&#xff09;是一种简洁的条件选择语句&#xff0c;语法如下&#xff1a; 条件表达式 ? 表达式1 : 表达式2• 如果“条件表达式”为true&#xff0c;则整个表达式的结果为“表达式1”…...