当前位置: 首页 > article >正文

kafka数据拉取和发送

文章目录

  • 一、原生 KafkaConsumer
    • 1、pom文件引入kafka
    • 2、拉取数据
    • 3、发送数据
  • 二、在spring boot中使用@KafkaListener
    • 1、添加依赖
    • 2、application.yml
    • 3、消息拉取:consumer
    • 4、自定义ListenerContainerFactory
    • 5、消息发送:producer
    • 6、kafka通过clientId鉴权时的鉴权失败问题

一、原生 KafkaConsumer

1、pom文件引入kafka

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.12</artifactId>
</dependency>

2、拉取数据

简单说只要以下几个步骤:
1、获取kafka地址,并设置Properties
2、获取consumer:KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
3、订阅topic:consumer.subscribe(topic);
4、拉取数据:consumer.poll()
5、遍历数据
6、示例:

package com.yogi.test.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.InitializingBean;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;@Component
public class TestMsgConsumer implements InitializingBean {@Value("${test.kafka.address:127.0.0.1:9092}")private String kafkaAddress;@Value("${test.kafka.msg.topic:topic_test_1,topic_test_2}")private String msgTopic;@Value("${test.consumer.name:yogima}")private String consumerGroupId;/*** 消费开关: true-消费,false-暂停消费* 在服务正常停止时用于停止继续消费数据,将缓存中的数据发送完即可*/private Boolean consumeSwitch = true;public void consumerMessage(List<String> topic, String groupId) {LOGGER.info("consumer topic list1:{}",topic.toString());Properties props = new Properties();/*** 指定一组host:port对,用于创建与Kafka broker服务器的Socket连接,可以指定多组,使用逗号分隔,对于多broker集群,只需配置* 部分broker地址即可,consumer启动后可以通过这些机器找到完整的broker列表*/LOGGER.info("test.kafka.address:{}",kafkaAddress);props.put("bootstrap.servers", kafkaAddress);/*** 指定group名字,能唯一标识一个consumer group,如果不显示指定group.id会抛出InvalidGroupIdException异常,通常为group.id* 设置一个有业务意义的名字即可*/props.put("group.id", groupId);/*** 自动提交位移*/props.put("enable.auto.commit", Boolean.TRUE);/*** 位移提交超时时间*/props.put("auto.commit.interval.ms", "1000");/*** 从最早的消息开始消费* 1,earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费* 2,latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据*/props.put("auto.offset.reset", "latest");/*** 指定消费解序列化操作。consumer从broker端获取的任何消息都是字节数组的格式,因此需要指定解序列化操作才能还原为原本对象,* Kafka对绝大部分初始类型提供了解序列化器,consumer支持自定义解序列化器org.apache.kafka.common.serialization.Deserializer* org.apache.kafka.common.serialization.ByteArrayDeserializer* StringDeserializer*/props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");/*** 对消息体进行解序列化,与key解序列化类似*/props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//一次从kafka中poll出来的数据条数,max.poll.records条数据需要在在session.timeout.ms这个时间内处理完props.put("max.poll.records", "500");//fetch.message.max.bytes (默认 1MB) – 消费者能读取的最大消息。这个值应该大于或等于message.max.bytes。props.put("fetch.message.max.bytes", "300000000");KafkaConsumer<String, String> consumer;try{/*** 通过Properties实例对象构建KafkaConsumer对象,可同时指定key、value序列化器*/LOGGER.info("start set consumer,props:{}",props.toString());consumer = new KafkaConsumer<>(props);LOGGER.info("set consumer finished");/*** 订阅consumer group需要消费的topic列表*/LOGGER.info("consumer topic list:{}",topic.toString());consumer.subscribe(topic);}catch (Exception e){LOGGER.info("consumer subscribe failed,msg:{},cause:{},e:{}",e.getMessage(),e.getCause(),e);return;}/*** 并行从订阅topic获取多个分区消息,为此新版本consumer的poll方法使用类似Linux的 selec I/O机制,* 所有相关的事件都发生在一个事件循环中,这样consuner端只使用一个线程就能完成所有类型I/o操作*/try {while (true) {if (!consumeSwitch) {try {Thread.sleep(30000);} catch (InterruptedException e) {LOGGER.error("err msg:" + e.getMessage());}}/*** 指定超时时间,通常情况下consumer拿到了足够多的可用数据,会立即从该方法返回,但若当前没有足够多数据* consumer会处于阻塞状态,但当到达设定的超时时间,则无论数据是否足够都为立即返回*/ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1L));/*** poll调用返回ConsumerRecord类分装的Kafka消息,之后会根据自己业务实现信息处理,对于consumer而言poll方法* 返回即认为consumer成功消费了消息*/for (ConsumerRecord<String, String> record : records) {LOGGER.debug("offset = {}, key = {}, value = {}"

相关文章:

kafka数据拉取和发送

文章目录 一、原生 KafkaConsumer1、pom文件引入kafka2、拉取数据3、发送数据二、在spring boot中使用@KafkaListener1、添加依赖2、application.yml3、消息拉取:consumer4、自定义ListenerContainerFactory5、消息发送:producer6、kafka通过clientId鉴权时的鉴权失败问题一、…...

多智能体框架

多个不同的角色的Agent&#xff0c;共同完成一份复杂的工作。由一个统筹管理的智能体&#xff0c;自主规划多个智能体分别做什么&#xff0c;以及执行的顺序。 agent 应该包含的属性 执行特定任务 根据其角色和目标做出决策 能够使用工具来实现目标 与其他代理沟通和协作 保留…...

C++ 正则表达式分组捕获入门指南

在 C 中&#xff0c;正则表达式&#xff08;regex&#xff09;是一种用于匹配字符串模式的强大工具。正则表达式不仅能帮助你查找符合特定模式的字符&#xff0c;还能捕获匹配的子字符串&#xff08;即分组捕获&#xff09;。这篇文章将介绍 C 正则表达式中的分组捕获机制&…...

C#中级教程(1)——解锁 C# 编程的调试与错误处理秘籍

一、认识错误&#xff1a;编程路上的 “绊脚石” 在 C# 编程中&#xff0c;错误大致可分为两类&#xff1a;语法错误和语义错误&#xff08;逻辑错误&#xff09;。语法错误就像是写作文时的错别字和病句&#xff0c;编译器一眼就能识别出来&#xff0c;比如变量名拼写错误、符…...

Jmeter接口并发测试

Apache JMeter 是一款开源的性能测试工具&#xff0c;广泛用于接口并发测试、负载测试和压力测试。以下是使用 JMeter 进行接口并发测试的详细步骤&#xff1a; 一、准备工作 安装 JMeter 下载地址&#xff1a;Apache JMeter 官网 确保已安装 Java 环境&#xff08;JMeter 依…...

MySQL-增删改查

一、Create(创建) &#x1f4d6; 语法&#xff1a; INSERT INTO table_name(value_list); 当我们使用表的时候&#xff0c;就可以使用这个语法来向表中插入元素~ 我们这边创建一个用于示范的表(Student)~ create table student( id int, name varchar(20), chinese int, math…...

开源堡垒机 JumpServer 社区版实战教程:发布机的配置与Website资产配置使用

文章目录 开源堡垒机 JumpServer 社区版实战教程&#xff1a;发布机的配置与Website资产配置使用一、功能简述二、应用发布机2.1 版本要求2.2 创建应用发布机2.2.1 通过WinRM的协议进行应用发布机的创建2.2.2 通过OpenSSH的协议进行应用发布机的创建2.2.2.1 下载OpenSSH2.2.2.2…...

【STM32】使用电打火器测试火焰传感器,去掉传感器LED依然亮

项目需求&#xff1a;火焰传感器识别到火焰后&#xff0c;LED灯闪烁&#xff0c;然后熄灭。 现象描述&#xff1a;不需要火焰传感器&#xff0c;当使用电打火器时电路板LED灯也会闪烁。&#xff08;详情看底部视频&#xff09; fire.h #ifndef __FIRE_H #define __FIRE_H …...

代码随想录算法训练day64---图论系列8《拓扑排序dijkstra(朴素版)》

代码随想录算法训练 —day64 文章目录 代码随想录算法训练前言一、53. 117. 软件构建—拓扑排序二、47. 参加科学大会---dijkstra&#xff08;朴素版&#xff09;总结 前言 今天是算法营的第64天&#xff0c;希望自己能够坚持下来&#xff01; 今天继续图论part&#xff01;今…...

机器学习数学基础:32.斯皮尔曼等级相关

斯皮尔曼等级相关教程 一、定义与原理 斯皮尔曼等级相关系数&#xff08;Spearman’s rank - correlation coefficient&#xff09;&#xff0c;常用 ρ \rho ρ表示&#xff0c;是一种非参数统计量&#xff0c;用于衡量两个变量的等级之间的关联程度。它基于变量的秩次&…...

《论区块链技术及应用》审题技巧 - 系统架构设计师

区块链技术及应用论题写作框架 一、考点概述 本论题“区块链技术及应用”主要考察软件测试工程师对区块链技术的理解及其在软件项目中的实际应用能力。论题涵盖了多个关键方面&#xff0c;首先要求考生对区块链技术有全面的认识&#xff0c;包括但不限于其作为分布式记账技术…...

2024-2025 学年广东省职业院校技能大赛 “信息安全管理与评估”赛项 技能测试试卷(四)

2024-2025 学年广东省职业院校技能大赛 “信息安全管理与评估”赛项 技能测试试卷&#xff08;四&#xff09; 第一部分&#xff1a;网络平台搭建与设备安全防护任务书第二部分&#xff1a;网络安全事件响应、数字取证调查、应用程序安全任务书任务 1&#xff1a;应急响应&…...

单片机的串口(USART)

Tx - 数据的发送引脚&#xff0c;Rx - 数据的接受引脚。 串口的数据帧格式 空闲状态高电平&#xff0c;起始位低电平&#xff0c;数据位有8位校验位&#xff0c;9位校验位&#xff0c;停止位是高电平保持一位或者半位&#xff0c;又或者两位的状态。 8位无校验位传输一个字节…...

Modelfile配置说明

参数说明翻译 参数描述值类型示例用法mirostat启用Mirostat采样以控制困惑度。&#xff08;默认&#xff1a;0&#xff0c;0禁用&#xff0c;1Mirostat&#xff0c;2Mirostat 2.0&#xff09;intmirostat 0mirostat_eta影响算法对生成文本反馈的响应速度。较低的学习率将导致调…...

pnpm的基本用法

以下是 pnpm 的核心命令和使用指南&#xff0c;涵盖从安装依赖到项目管理的常见操作&#xff1a; 1. 基础命令 (1) 安装依赖 pnpm install # 安装 package.json 中的所有依赖 pnpm install <包名> # 安装指定包&#xff08;自动添加到 dependencies&#xf…...

动态规划(背包问题)--是否逆序使用的问题--二进制拆分的问题

动态规划&#xff08;背包问题&#xff09; 题目链接01背包代码 完全背包问题代码 多重背包问题 I代码 什么时候适用逆序多重背包问题 II&#xff08;超百万级的复杂度&#xff09;代码 关于二进制拆分 题目链接 01背包 代码 #include <iostream> #include <vector&…...

Vue 中动态实现进度条

在 Vue 中动态实现进度条&#xff0c;基本上有两种常见的方法&#xff1a;直接通过 Vue 数据绑定控制样式&#xff0c;或者利用外部库来实现更复杂的功能。我们会深入探讨这两种方式&#xff0c;并且详细说明每种方法的实现步骤、优缺点以及使用场景。 1. 使用 Vue 数据绑定来…...

如何基于PyTorch做二次开发

基于PyTorch进行二次开发以实现可视化工程&#xff0c;可以从以下几个方面入手&#xff1a;模型结构可视化、训练过程监控、特征可视化等。以下是一些推荐的GitHub项目&#xff0c;这些项目可以帮助你快速搭建一个可视化的工程环境&#xff1a; ### 1. **PyTorch CNN Visualiz…...

Mac 版 本地部署deepseek ➕ RAGflow 知识库搭建流程分享(附问题解决方法)

安装&#xff1a; 1、首先按照此视频的流程一步一步进行安装&#xff1a;(macos版&#xff09;ragflowdeepseek 私域知识库搭建流程分享_哔哩哔哩_bilibili 2、RAGflow 官网文档指南&#xff1a;https://ragflow.io 3、RAGflow 下载地址&#xff1a;https://github.com/infi…...

算法——后缀平衡树

先回想一下之前讨论的内容。之前我们详细讨论了后缀树&#xff0c;包括它的构建、应用以及相关算法。用户可能是在了解后缀树之后&#xff0c;想要进一步探索相关的数据结构&#xff0c;或者是想比较后缀树和后缀平衡树的异同。 后缀平衡树并不是一个常见的数据结构名称&#…...

姿态矩阵/旋转矩阵/反对称阵

物理意义&#xff0c;端点矢量角速率叉乘本身向量&#xff1b; 负号是动系b看固定系i是相反的&#xff1b; 一个固定 在惯性导航解算中&#xff0c;旋转矢量的叉乘用于描述姿态矩阵的微分方程。你提到的公式中&#xff0c; ω i b b \boldsymbol{\omega}_{ib}^b \times ωibb…...

【大语言模型】【整合版】DeepSeek 模型提示词学习笔记(散装的可以看我之前的学习笔记,这里只是归纳与总结了一下思路,内容和之前发的差不多)

以下是个人笔记的正文内容: 原文在FlowUs知识库上&#xff0c;如下截图。里面内容和这里一样&#xff0c;知识排版好看一点 一、什么是 DeepSeek 1. DeepSeek 简介 DeepSeek 是一家专注于通用人工智能&#xff08;AGI&#xff09;的中国科技公司&#xff0c;主攻大模型研发与…...

ollama无法通过IP:11434访问

目录 1.介绍 2.直接在ollama的当前命令窗口中修改&#xff08;法1&#xff09; 3.更改ollama配置文件&#xff08;法2&#xff09; 3.1更新配置 3.2重启服务 1.介绍 ollama下载后默认情况下都是直接在本地的11434端口中运行&#xff0c;绑定到127.0.0.1(localhost)&#x…...

⭐算法OJ⭐位操作用法总结+实战指南(C++实现)

位操作在OJ 题目中是一种非常高效的工具&#xff0c;常用于优化时间复杂度和空间复杂度。本文是位操作在 OJ 题目中的主要用法总结&#xff0c;并以 C 实现为例。 相关题目&#xff1a;《C⭐算法OJ⭐Single Number 系列&#xff08;位操作&#xff09;》 文章目录 1. 基本位操…...

2.1 用大模型构建新人答疑机器人-大模型ACP模拟题-真题

真题 真题&#xff1a;如何初始化OpenAI客户端 client OpenAI( api_keyos.getenv("DASHSCOPE_API_KEY"), base_url"https://dashscope.aliyuncs.com/compatible-mode/v1", ) AI生成模拟题 一、单选题 &#xff08;每题5分&#xff0c;共6题&#xff…...

单片机裸机编程-时机管理

对于 RTOS 实时操作系统&#xff0c;我们是通过 TASK&#xff08;任务&#xff09;进行底层操作的&#xff0c;这与裸机编程中的函数&#xff08;fun&#xff09;类似。不同的任务或函数实现不同的功能&#xff0c;在RTOS中&#xff0c;单片机有信号量、队列等不同任务之间的通…...

Bugku CTF CRYPTO

Bugku CTF CRYPTO 文章目录 Bugku CTF CRYPTO聪明的小羊ok[-<>]散乱的密文.!? 聪明的小羊 描 述: 一只小羊翻过了2个栅栏 fa{fe13f590lg6d46d0d0} 分 析&#xff1a;栅栏密码&#xff0c;分2栏&#xff0c;一个栏里有11个 ①手动解密 f a { f e 1 3 f 5 9 0 l g 6 d 4 …...

【洛谷】【ARC100E】Or Plus Max(高维前缀和)

传送门&#xff1a;Or Plus Max 高维前缀和 题目描述 長さ 2N の整数列 A0​, A1​, ..., A2N−1​ があります。&#xff08;添字が 0 から始まることに注意&#xff09; 1 ≤ K ≤ 2N−1 を満たすすべての整数 K について、次の問題を解いてください。 i,j を整数と…...

宿主机的 root 是否等于 Docker 容器的 root?

在 Docker 容器化技术中&#xff0c;宿主机的 root 和 容器的 root 并不完全相同&#xff0c;尽管它们都称作 “root 用户”。这里需要明确的是&#xff0c;Docker 容器与宿主机之间存在隔离机制&#xff0c;容器内的 root 用户和宿主机的 root 用户有一些关键的区别。 1. 宿主…...

SmolLM2:多阶段训练策略优化和高质量数据集,小型语言模型同样可以实现卓越的性能表现

SmolLM2 采用创新的四阶段训练策略&#xff0c;在仅使用 1.7B 参数的情况下&#xff0c;成功挑战了大型语言模型的性能边界&#xff1a; 在 MMLU-Pro 等测试中超越 Qwen2.5-1.5B 近 6 个百分点数学推理能力&#xff08;GSM8K、MATH&#xff09;优于 Llama3.2-1B在代码生成和文…...