当前位置: 首页 > news >正文

kafka数据拉取和发送

文章目录

  • 一、原生 KafkaConsumer
    • 1、pom文件引入kafka
    • 2、拉取数据
    • 3、发送数据
  • 二、在spring boot中使用@KafkaListener
    • 1、添加依赖
    • 2、application.yml
    • 3、消息拉取:consumer
    • 4、自定义ListenerContainerFactory
    • 5、消息发送:producer
    • 6、kafka通过clientId鉴权时的鉴权失败问题

一、原生 KafkaConsumer

1、pom文件引入kafka

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.12</artifactId>
</dependency>

2、拉取数据

简单说只要以下几个步骤:
1、获取kafka地址,并设置Properties
2、获取consumer:KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
3、订阅topic:consumer.subscribe(topic);
4、拉取数据:consumer.poll()
5、遍历数据
6、示例:

package com.yogi.test.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.InitializingBean;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;@Component
public class TestMsgConsumer implements InitializingBean {@Value("${test.kafka.address:127.0.0.1:9092}")private String kafkaAddress;@Value("${test.kafka.msg.topic:topic_test_1,topic_test_2}")private String msgTopic;@Value("${test.consumer.name:yogima}")private String consumerGroupId;/*** 消费开关: true-消费,false-暂停消费* 在服务正常停止时用于停止继续消费数据,将缓存中的数据发送完即可*/private Boolean consumeSwitch = true;public void consumerMessage(List<String> topic, String groupId) {LOGGER.info("consumer topic list1:{}",topic.toString());Properties props = new Properties();/*** 指定一组host:port对,用于创建与Kafka broker服务器的Socket连接,可以指定多组,使用逗号分隔,对于多broker集群,只需配置* 部分broker地址即可,consumer启动后可以通过这些机器找到完整的broker列表*/LOGGER.info("test.kafka.address:{}",kafkaAddress);props.put("bootstrap.servers", kafkaAddress);/*** 指定group名字,能唯一标识一个consumer group,如果不显示指定group.id会抛出InvalidGroupIdException异常,通常为group.id* 设置一个有业务意义的名字即可*/props.put("group.id", groupId);/*** 自动提交位移*/props.put("enable.auto.commit", Boolean.TRUE);/*** 位移提交超时时间*/props.put("auto.commit.interval.ms", "1000");/*** 从最早的消息开始消费* 1,earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费* 2,latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据*/props.put("auto.offset.reset", "latest");/*** 指定消费解序列化操作。consumer从broker端获取的任何消息都是字节数组的格式,因此需要指定解序列化操作才能还原为原本对象,* Kafka对绝大部分初始类型提供了解序列化器,consumer支持自定义解序列化器org.apache.kafka.common.serialization.Deserializer* org.apache.kafka.common.serialization.ByteArrayDeserializer* StringDeserializer*/props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");/*** 对消息体进行解序列化,与key解序列化类似*/props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//一次从kafka中poll出来的数据条数,max.poll.records条数据需要在在session.timeout.ms这个时间内处理完props.put("max.poll.records", "500");//fetch.message.max.bytes (默认 1MB) – 消费者能读取的最大消息。这个值应该大于或等于message.max.bytes。props.put("fetch.message.max.bytes", "300000000");KafkaConsumer<String, String> consumer;try{/*** 通过Properties实例对象构建KafkaConsumer对象,可同时指定key、value序列化器*/LOGGER.info("start set consumer,props:{}",props.toString());consumer = new KafkaConsumer<>(props);LOGGER.info("set consumer finished");/*** 订阅consumer group需要消费的topic列表*/LOGGER.info("consumer topic list:{}",topic.toString());consumer.subscribe(topic);}catch (Exception e){LOGGER.info("consumer subscribe failed,msg:{},cause:{},e:{}",e.getMessage(),e.getCause(),e);return;}/*** 并行从订阅topic获取多个分区消息,为此新版本consumer的poll方法使用类似Linux的 selec I/O机制,* 所有相关的事件都发生在一个事件循环中,这样consuner端只使用一个线程就能完成所有类型I/o操作*/try {while (true) {if (!consumeSwitch) {try {Thread.sleep(30000);} catch (InterruptedException e) {LOGGER.error("err msg:" + e.getMessage());}}/*** 指定超时时间,通常情况下consumer拿到了足够多的可用数据,会立即从该方法返回,但若当前没有足够多数据* consumer会处于阻塞状态,但当到达设定的超时时间,则无论数据是否足够都为立即返回*/ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1L));/*** poll调用返回ConsumerRecord类分装的Kafka消息,之后会根据自己业务实现信息处理,对于consumer而言poll方法* 返回即认为consumer成功消费了消息*/for (ConsumerRecord<String, String> record : records) {LOGGER.debug("offset = {}, key = {}, value = {}"

相关文章:

kafka数据拉取和发送

文章目录 一、原生 KafkaConsumer1、pom文件引入kafka2、拉取数据3、发送数据二、在spring boot中使用@KafkaListener1、添加依赖2、application.yml3、消息拉取:consumer4、自定义ListenerContainerFactory5、消息发送:producer6、kafka通过clientId鉴权时的鉴权失败问题一、…...

LLM全栈框架完整分类清单(预训练+微调+工具链)

一、预训练框架 1. 大规模分布式训练框架 框架名称核心能力GitHub地址Megatron-LM3D并行训练、FlashAttention支持、Transformer架构优化&#xff08;NVIDIA生态&#xff09;NVIDIA/Megatron-LMDeepSpeedZeRO优化系列、3D并行、RLHF全流程支持&#xff08;微软生态&#xff09…...

蓝桥杯备考:贪心算法之矩阵消除游戏

这道题是牛客上的一道题&#xff0c;它呢和我们之前的排座位游戏非常之相似&#xff0c;但是&#xff0c;排座位问题选择行和列是不会改变元素的值的&#xff0c;这道题呢每每选一行都会把这行或者这列清零&#xff0c;所以我们的策略就是先用二进制把选择所有行的情况全部枚举…...

【Matlab仿真】Matlab Function中如何使用静态变量?

背景 根据Simulink的运行机制&#xff0c;每个采样点会调用一次MATLAB Function的函数&#xff0c;两次调用之间&#xff0c;同一个变量的前次计算的终值如何传递到当前计算周期来&#xff1f;其实可以使用persistent变量实现函数退出和进入时内部变量值的保持。 persistent变…...

DeepSeek 提示词:高效的提示词设计

&#x1f9d1; 博主简介&#xff1a;CSDN博客专家&#xff0c;历代文学网&#xff08;PC端可以访问&#xff1a;https://literature.sinhy.com/#/?__c1000&#xff0c;移动端可微信小程序搜索“历代文学”&#xff09;总架构师&#xff0c;15年工作经验&#xff0c;精通Java编…...

深入学习Java中的Lambda表达式

深入学习Java中的Lambda表达式 自Java 8引入以来&#xff0c;Lambda表达式彻底改变了Java的编程风格&#xff0c;让代码变得更加简洁、易读&#xff0c;尤其是在函数式编程的场景中。接下来&#xff0c;我们将深入探讨Lambda表达式的语法、原理以及实际应用&#xff0c;帮助你…...

1.2 AI 量化炒股的起源与发展

**定性价值**&#xff1a;AI量化炒股通过算法模型实现投资决策自动化&#xff0c;显著提升交易效率与风险控制能力&#xff0c;打破传统人工交易的主观性与延迟性&#xff0c;推动金融科技向智能化、数据驱动方向迭代&#xff0c;具有颠覆传统投资模式的战略意义。 **定量价值…...

计算机单位之详解——存储单位Byte 网络传输单位bps 视频码率单位bps

前言&#xff1a; 计算机里面单位有点复杂&#xff0c;容易混淆&#xff0c;很多时候混起来就容易概念不理解&#xff0c;包括一些小问题&#xff0c;比如说&#xff1a;为什么我买了1T硬盘&#xff0c;实际存在虚标。为什么所谓的千兆宽带&#xff0c;下载起来没有1G每秒&…...

IDEA关闭SpringBoot程序后仍然占用端口的排查与解决

IDEA关闭SpringBoot程序后仍然占用端口的排查与解决 问题描述 在使用 IntelliJ IDEA 开发 Spring Boot 应用时&#xff0c;有时即使关闭了应用&#xff0c;程序仍然占用端口&#xff08;例如&#xff1a;4001 端口&#xff09;。这会导致重新启动应用时出现端口被占用的错误&a…...

deepseek清华大学第二版 如何获取 DeepSeek如何赋能职场应用 PDF文档 电子档(附下载)

deepseek清华大学第二版 DeepSeek如何赋能职场 pdf文件完整版下载 https://pan.baidu.com/s/1aQcNS8UleMldcoH0Jc6C6A?pwd1234 提取码: 1234 或 https://pan.quark.cn/s/3ee62050a2ac...

【python随手记】——读取文本文件内容转换为json格式

文章目录 前言一、TXT文件转换为JSON数组1.txt文件内容2.python代码3.输出结果 二、TXT文件转换为JSON对象1.txt文件2.python代码3.输出结果 前言 场景&#xff1a;用于读取包含空格分隔数据的TXT文件&#xff0c;并将其转换为结构化JSON文件 一、TXT文件转换为JSON数组 1.tx…...

k8s集群3主5从高可用架构(kubeadm方式安装k8s)

关键步骤说明 环境准备阶段 系统更新&#xff1a;所有节点执行yum/apt update确保软件包最新时间同步&#xff1a;通过ntpdate time.windows.com或部署NTP服务器网络规划&#xff1a;明确划分Service网段&#xff08;默认10.96.0.0/12&#xff09;和Pod网段&#xff08;如Flann…...

基于 sklearn 的均值偏移聚类算法的应用

基于 sklearn 的均值偏移聚类算法的应用 在机器学习和数据挖掘中&#xff0c;聚类算法是一类非常重要的无监督学习方法。它的目的是将数据集中的数据点划分为若干个类&#xff0c;使得同一类的样本点彼此相似&#xff0c;而不同类的样本点相互之间差异较大。均值偏移聚类&…...

三、大模型微调的多种方法与应用场景

详解大模型微调的多种方法与应用场景 随着大模型的不断发展&#xff0c;如何有效地微调这些庞大的预训练模型以适应特定任务成为了研究和应用中的一个重要问题。大模型微调不仅能够提高任务性能&#xff0c;还能在不同的业务需求中提升模型的适应性。在本文中&#xff0c;我们…...

第2课 树莓派镜像的烧录

树莓派的系统通常是安装在SD卡上的‌。SD卡作为启动设备,负责启动树莓派并加载操作系统。这种设计使得树莓派具有便携性和灵活性,用户可以通过更换SD卡来更换操作系统或恢复出厂设置。 烧录树莓派的镜像即是将树莓派镜像烧录到SD卡上,在此期间会格式化SD卡,如果SD卡…...

SQL之order by盲注

目录 一.order by盲注的原理 二.注入方式 a.布尔盲注 b.时间盲注 三.防御 一.order by盲注的原理 order by子句是用于按指定列排序查询结果&#xff0c;列名或列序号皆可。 order by 后面接的字段或者数字不一样&#xff0c;那么这个数据表的排序就会不同。 order by 盲…...

AI大模型(四)基于Deepseek本地部署实现模型定制与调教

AI大模型&#xff08;四&#xff09;基于Deepseek本地部署实现模型定制与调教 DeepSeek开源大模型在榜单上以黑马之姿横扫多项评测&#xff0c;其社区热度指数暴涨、一跃成为近期内影响力最高的话题&#xff0c;这个来自中国团队的模型向世界证明&#xff1a;让每个普通人都能…...

java后端开发day19--学生管理系统升级

&#xff08;以下内容全部来自上述课程&#xff09; 1.要求及思路 1.总体框架 2.注册 3.登录 4.忘记密码 2.代码 1.javabean public class User1 {private String username;private String password;private String personID;private String phoneNumber;public User1() {…...

MFC文件和注册表的操作

MFC文件和注册表的操作 日志、操作配置文件、ini、注册表、音视频的文件存储 Linux下一切皆文件 C/C操作文件 const char* 与 char* const const char* 常量指针&#xff0c;表示指向的内容为常量。指针可以指向其他变量&#xff0c;但是内容不能再变了 char szName[6]&qu…...

vscode如何使用鼠标滚轮调整字体大小

1.打开设置 2.搜索Font Ligatures 3.编辑配置文件 4.修改代码并保存 修改前 修改后 在最后一行添加&#xff1a;“editor.mouseWheelZoom”: true 记得在上一行最后&#xff0c;加上英文版的“,”逗号 5.配置成功&#xff0c;再次按Ctrl鼠标滚轮便可以缩放了。...

【Linux】C语言执行shell指令

在C语言中执行Shell指令 在C语言中&#xff0c;有几种方法可以执行Shell指令&#xff1a; 1. 使用system()函数 这是最简单的方法&#xff0c;包含在stdlib.h头文件中&#xff1a; #include <stdlib.h>int main() {system("ls -l"); // 执行ls -l命令retu…...

IoT/HCIP实验-3/LiteOS操作系统内核实验(任务、内存、信号量、CMSIS..)

文章目录 概述HelloWorld 工程C/C配置编译器主配置Makefile脚本烧录器主配置运行结果程序调用栈 任务管理实验实验结果osal 系统适配层osal_task_create 其他实验实验源码内存管理实验互斥锁实验信号量实验 CMISIS接口实验还是得JlINKCMSIS 简介LiteOS->CMSIS任务间消息交互…...

爬虫基础学习day2

# 爬虫设计领域 工商&#xff1a;企查查、天眼查短视频&#xff1a;抖音、快手、西瓜 ---> 飞瓜电商&#xff1a;京东、淘宝、聚美优品、亚马逊 ---> 分析店铺经营决策标题、排名航空&#xff1a;抓取所有航空公司价格 ---> 去哪儿自媒体&#xff1a;采集自媒体数据进…...

【数据分析】R版IntelliGenes用于生物标志物发现的可解释机器学习

禁止商业或二改转载&#xff0c;仅供自学使用&#xff0c;侵权必究&#xff0c;如需截取部分内容请后台联系作者! 文章目录 介绍流程步骤1. 输入数据2. 特征选择3. 模型训练4. I-Genes 评分计算5. 输出结果 IntelliGenesR 安装包1. 特征选择2. 模型训练和评估3. I-Genes 评分计…...

Unsafe Fileupload篇补充-木马的详细教程与木马分享(中国蚁剑方式)

在之前的皮卡丘靶场第九期Unsafe Fileupload篇中我们学习了木马的原理并且学了一个简单的木马文件 本期内容是为了更好的为大家解释木马&#xff08;服务器方面的&#xff09;的原理&#xff0c;连接&#xff0c;以及各种木马及连接工具的分享 文件木马&#xff1a;https://w…...

JVM 内存结构 详解

内存结构 运行时数据区&#xff1a; Java虚拟机在运行Java程序过程中管理的内存区域。 程序计数器&#xff1a; ​ 线程私有&#xff0c;程序控制流的指示器&#xff0c;分支、循环、跳转、异常处理、线程恢复等基础功能都依赖这个计数器完成。 ​ 每个线程都有一个程序计数…...

【LeetCode】3309. 连接二进制表示可形成的最大数值(递归|回溯|位运算)

LeetCode 3309. 连接二进制表示可形成的最大数值&#xff08;中等&#xff09; 题目描述解题思路Java代码 题目描述 题目链接&#xff1a;LeetCode 3309. 连接二进制表示可形成的最大数值&#xff08;中等&#xff09; 给你一个长度为 3 的整数数组 nums。 现以某种顺序 连接…...

STM32---外部32.768K晶振(LSE)无法起振问题

晶振是否起振主要就检查两个1、晶振与MCU是否兼容&#xff1b;2、晶振的负载电容是否匹配 目录 一、判断晶振与MCU是否兼容 二、判断负载电容是否匹配 1. 晶振负载电容&#xff08;CL&#xff09;与匹配电容&#xff08;CL1、CL2&#xff09;的关系 2. 如何选择 CL1 和 CL…...

MySQL 索引底层结构揭秘:B-Tree 与 B+Tree 的区别与应用

文章目录 一、背景知识&#xff1a;什么是 B-Tree 和 BTree&#xff1f; B-Tree&#xff08;平衡多路查找树&#xff09; BTree&#xff08;B-Tree 的变种&#xff09; 二、结构对比&#xff1a;一张图看懂 三、为什么 MySQL InnoDB 选择 BTree&#xff1f; 1. 范围查询更快 2…...

探索Selenium:自动化测试的神奇钥匙

目录 一、Selenium 是什么1.1 定义与概念1.2 发展历程1.3 功能概述 二、Selenium 工作原理剖析2.1 架构组成2.2 工作流程2.3 通信机制 三、Selenium 的优势3.1 跨浏览器与平台支持3.2 丰富的语言支持3.3 强大的社区支持 四、Selenium 的应用场景4.1 Web 应用自动化测试4.2 数据…...