当前位置: 首页 > news >正文

kafka-消费者服务搭建配置简单消费(SpringBoot整合Kafka)

文章目录

  • 1、使用efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本
  • 2、创建生产者发送消息
  • 3、application.yml配置
  • 4、创建消费者监听器
  • 5、创建SpringBoot启动类
  • 6、屏蔽 kafka debug 日志 logback.xml
  • 7、引入spring-kafka依赖

1、使用efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述在这里插入图片描述

2、创建生产者发送消息

[root@localhost ~]# kafka-console-producer.sh --bootstrap-server 192.168.74.148:9095,192.168.748:9096,192.168.74.148:9097 --topic my_topic1
>1
>2
>3
>

在这里插入图片描述

[[{"partition": 1,"offset": 0,"msg": "1","timespan": 1717592203289,"date": "2024-06-05 12:56:43"},{"partition": 1,"offset": 1,"msg": "2","timespan": 1717592204046,"date": "2024-06-05 12:56:44"},{"partition": 1,"offset": 2,"msg": "3","timespan": 1717592204473,"date": "2024-06-05 12:56:44"}]
]

3、application.yml配置

server:port: 8120# v1
spring:Kafka:bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097consumer:# read-committed读事务已提交的消息 解决脏读问题isolation-level: read-committed # 消费者的事务隔离级别:read-uncommitted会导致脏读,可以读取生产者事务还未提交的消息# 消费者是否自动ack :true自动ack 消费者获取到消息后kafka提交消费者偏移量enable-auto-commit: true # ??????offset# 消费者提交ack时多长时间批量提交一次auto-commit-interval: 1000# 消费者第一次消费主题消息时从哪个位置开始auto-offset-reset: earliest  #指定Offset消费:earliest | latest | nonekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

4、创建消费者监听器

package com.atguigu.spring.kafka.consumer.listener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class MyKafkaListener {@KafkaListener(topics ={"my_topic1"},groupId = "my_group1")public void onMessage(ConsumerRecord<String, String> record) {System.out.println("消费者获取到消息:topic = "+ record.topic()+",partition:"+record.partition()+",offset = "+record.offset()+",key = "+record.key()+",value = "+record.value());}}

5、创建SpringBoot启动类

package com.atguigu.spring.kafka.consumer;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;// Generated by https://start.springboot.io
// 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn
@SpringBootApplication
public class SpringKafkaConsumerApplication {public static void main(String[] args) {SpringApplication.run(SpringKafkaConsumerApplication.class, args);}}

6、屏蔽 kafka debug 日志 logback.xml

<configuration>      <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug --><logger name="org.apache.kafka.clients" level="debug" />
</configuration>

7、引入spring-kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.atguigu</groupId><artifactId>spring-kafka-consumer</artifactId><version>0.0.1-SNAPSHOT</version><name>spring-kafka-consumer</name><description>spring-kafka-consumer</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

此时启动SpringKafkaConsumerApplication,控制台会打印数据

  .   ____          _            __ _ _/\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \\\/  ___)| |_)| | | | | || (_| |  ) ) ) )'  |____| .__|_| |_|_| |_\__, | / / / /=========|_|==============|___/=/_/_/_/:: Spring Boot ::                (v3.0.5)消费者获取到消息:topic = my_topic1,partition:1,offset = 0,key = null,value = 1
消费者获取到消息:topic = my_topic1,partition:1,offset = 1,key = null,value = 2
消费者获取到消息:topic = my_topic1,partition:1,offset = 2,key = null,value = 3

如果此时重新启动SpringKafkaConsumerApplication,控制台将不会打印数据,因为已经消费过数据

  .   ____          _            __ _ _/\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \\\/  ___)| |_)| | | | | || (_| |  ) ) ) )'  |____| .__|_| |_|_| |_\__, | / / / /=========|_|==============|___/=/_/_/_/:: Spring Boot ::                (v3.0.5)

相关文章:

kafka-消费者服务搭建配置简单消费(SpringBoot整合Kafka)

文章目录 1、使用efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本2、创建生产者发送消息3、application.yml配置4、创建消费者监听器5、创建SpringBoot启动类6、屏蔽 kafka debug 日志 logback.xml7、引入spring-kafka依赖 1、使用efak 创建 主题 my_topic1 并…...

C++STL---list常见用法

C STL中的list list是C标准模板库&#xff08;STL&#xff09;中的一个序列容器&#xff0c;它实现了一个双向链表。与vector和deque相比&#xff0c;list支持快速的任意位置插入和删除操作&#xff0c;但不支持快速随机访问。 基本操作 创建和初始化 #include <list> …...

MQTT.FX的使用

背景 在如今物联网的时代下&#xff0c;诞生了许多的物联网产品&#xff0c;这些产品通过BLE、WIFI、4G等各种各样的通信方式讲数据传输到各种各样的平台。 除了各个公司私有的云平台外&#xff0c;更多的初学者会接触到腾讯云、阿里云之类的平台。设备接入方式也有着多种多样…...

SRS、ZLMediakit音视频流媒体服务器

SRS、ZLMediakit都是做为webrtc的SFU&#xff08;selective forward unit&#xff09; WebRTC 开发实践&#xff1a;为什么你需要 SFU 服务器 https://mp.weixin.qq.com/s?__bizMzAxNTc1MjM0Mw&mid2652213442&idx1&sn33f0393a2dbc2b6a39c613bb238ec145&chksm…...

大模型Prompt-Tuning技术进阶

LLM的Prompt-Tuning主流方法 面向超大规模模型的Prompt-Tuning 近两年来&#xff0c;随之Prompt-Tuning技术的发展&#xff0c;有诸多工作发现&#xff0c;对于超过10亿参数量的模型来说&#xff0c;Prompt-Tuning所带来的增益远远高于标准的Fine-tuning&#xff0c;小样本甚至…...

统一响应,自定义校验器,自定义异常,统一异常处理器

文章目录 1.基本准备&#xff08;构建一个SpringBoot模块&#xff09;1.在A_universal_solution模块下创建新的子模块unified-processing2.pom.xml引入基本依赖3.编写springboot启动类4.启动测试 2.统一响应处理1.首先定义一个响应枚举类 RespBeanEnum.java 每个枚举对象都有co…...

完整状态码面试背

{"100": "继续","101": "切换协议","102": "处理中","103": "早期提示","200": "成功","201": "已创建","202": "已接受",&qu…...

QT+FFmpeg+Windows开发环境搭建(加薪点)

01、Windows 环境搭建 FFMPEG官网:http://ffmpeg.org/ 02、下载4.2.1版本源码 源码:https://ffmpeg.org/releases/ffmpeg-4.2.1.tar.bz2 03、下载4.2.1编译好的文件 下载已经编译好的FFMPEG)(迅雷下载很快) 网址:https://ffmpeg.zeranoe.com/builds/ 32位下载地址:(迅雷…...

Linux 主机一键安全整改策略

为防止linux主机被恶意攻击&#xff0c;和受到攻击后能更快定位到源头&#xff0c;需要对linux主机做一些参数配置。 比如禁用root的远程登录、用户多次密码验证失败后被锁、禁止系统账号交互式登录等等。 下面是linux主机安全整改的一些简单介绍&#xff0c;最后会通过脚本一…...

Hot100——二叉树

树的定义&#xff1a; public static class TreeNode{int val;TreeNode left;TreeNode right;TreeNode(){};TreeNode(int val){ this.val val; };TreeNode(int val, TreeNode left, TreeNode right){this.val val;this.left left;this.right right;}} 深度优先遍历&#x…...

C++ static_cast、dynamic_cast、const_cast 和 reinterpret_cast 用处和区别

在 C 中&#xff0c;static_cast、dynamic_cast、const_cast 和 reinterpret_cast 是四种类型转换运算符&#xff0c;它们各自有不同的用途和行为&#xff1a; static_cast 用于编译时已知类型的转换&#xff0c;如基本数据类型转换、派生类到基类的转换、指针和引用的转换等…...

三十七、openlayers官网示例Earthquakes Heatmap解析——在地图上加载热力图

官网demo地址&#xff1a; Earthquakes Heatmap 这篇主要介绍了热力图HeatmapLayer HeatmapLayer 是一个用于在地图上显示热力图的图层类型&#xff0c;通常用于表示地理数据中的密度或强度。例如&#xff0c;它可以用来显示地震、人口密度或其他空间数据的热点区域。在这个示…...

curl 92 HTTP/2 stream 5 was not closed cleanly: CANCEL

source ~/.bash_profile flutter clean Command exited with code 128: git fetch --tags Standard error: 错误&#xff1a;RPC 失败。curl 92 HTTP/2 stream 5 was not closed cleanly: CANCEL (err 8) 错误&#xff1a;预期仍然需要 2737 个字节的正文 fetch-pack: unexpec…...

Spring Security 注册过滤器关键点与最佳实践

在 Spring Security 框架中&#xff0c;注册过滤器是实现身份验证和授权的关键组件。正确配置和使用注册过滤器对于确保应用程序的安全性至关重要。以下是一些关于 Spring Security 注册过滤器的注意事项和最佳实践。 过滤器链顺序&#xff1a; 注册过滤器通常位于过滤器链的末…...

力扣2024.考试的最大困扰度

力扣2024.考试的最大困扰度 注意同时>k才处理 class Solution {public:int maxConsecutiveAnswers(string answerKey, int k) {int n answerKey.size(),res0;unordered_map<int,int> cnt;for(int i0,j0;i<n;i){cnt[answerKey[i] - a] ;while(cnt[T - a] > k …...

java配置文件解析yml/xml/properties文件

XML 以mybatis.xml:获取所有Environment中的数据库并连接session为例 import org.w3c.dom.Document; import org.w3c.dom.Element; import org.w3c.dom.Node; import org.w3c.dom.NodeList; import org.xml.sax.SAXException;import javax.xml.parsers.DocumentBuilder; impo…...

grpc接口调用

grpc接口调用 准备依赖包clientserver 参考博客&#xff1a; Grpc项目集成到java方式调用实践 gRpc入门和springboot整合 java 中使用grpc java调用grpc服务 准备 因为需要生成代码&#xff0c;所以必备插件 安装后重启 依赖包 <?xml version"1.0" encoding&…...

通信技术振幅键控(ASK)调制与解调硬件实验

一、实验目的 1. 掌握用键控法产生ASK信号的方法&#xff1b; 2. 掌握ASK非相干解调的原理。 二、实验内容 1. 观察ASK已调信号的波形&#xff1b; 2. 观察ASK解调信号的波形。 三、实验器材 1. 双踪示波器&#xff1b; 2. 通信原理实验箱信号源模块、③、④、⑦号模块。…...

自动化办公02 用openpyxl库操作excel.xlsx文件(新版本)

目录 一、文件读操作 二、文件写操作 三、修改单元格样式 openpyxl 是一个处理Excel表格的第三方库。openpyxl 库可以处理Excel2010以后的电子表格格式&#xff0c;包括&#xff1a;xlsx/xlsm/xltx/xltm。 openpyxl教程 一、文件读操作 工作簿(workbook): excel文件 工作表…...

用户反馈解决方案 —— 兔小巢构建反馈功能

目录 01: 前言 02: 用户反馈整体实现方案分析 03: 兔小巢全解析 04: 基于兔小巢实现用户反馈 05: 总结 01: 前言 在前台系统中&#xff0c;用户反馈 功能也是一个非常常见的需求。 通过反馈功能&#xff0c;我们可以知道当前的应用存在的一些不足和用户相应的一些诉求。…...

Wan2.2-TI2V-5B:消费级GPU上的720P视频生成革命

Wan2.2-TI2V-5B&#xff1a;消费级GPU上的720P视频生成革命 【免费下载链接】Wan2.2-TI2V-5B Wan2.2-TI2V-5B是一款开源的先进视频生成模型&#xff0c;基于创新的混合专家架构&#xff08;MoE&#xff09;设计&#xff0c;显著提升了视频生成的质量与效率。该模型支持文本生成…...

ModTheSpire技术全解析:从模组加载到高级开发指南

ModTheSpire技术全解析&#xff1a;从模组加载到高级开发指南 【免费下载链接】ModTheSpire External mod loader for Slay The Spire 项目地址: https://gitcode.com/gh_mirrors/mo/ModTheSpire 引言&#xff1a;为何需要模组加载器&#xff1f; 当你在《Slay The Spi…...

NCM格式解密终极指南:三分钟解锁网易云音乐加密文件

NCM格式解密终极指南&#xff1a;三分钟解锁网易云音乐加密文件 【免费下载链接】ncmdump 项目地址: https://gitcode.com/gh_mirrors/ncmd/ncmdump 还在为网易云音乐下载的NCM格式文件无法在其他播放器使用而烦恼吗&#xff1f;ncmdump工具为你提供完整解决方案&#…...

霜儿-汉服-造相Z-Turbo效果可视化:同一提示词不同采样步数质量对比

霜儿-汉服-造相Z-Turbo效果可视化&#xff1a;同一提示词不同采样步数质量对比 1. 引言&#xff1a;为什么关注采样步数&#xff1f; 当你使用AI生成汉服人像时&#xff0c;是否遇到过这样的困惑&#xff1a;同样的提示词&#xff0c;为什么有时候生成的效果惊艳&#xff0c;…...

无需代码:用星图AI云+Clawdbot搭建私有化Qwen3-VL:30B飞书助手

无需代码&#xff1a;用星图AI云Clawdbot搭建私有化Qwen3-VL:30B飞书助手 1. 项目概述与价值 1.1 为什么选择这个方案 在当今企业办公场景中&#xff0c;智能助手已经成为提升效率的关键工具。但大多数方案存在两个痛点&#xff1a;要么功能单一&#xff08;仅支持文本&…...

效果实测:EagleEye(DAMO-YOLO)在多种场景下的目标检测表现

效果实测&#xff1a;EagleEye(DAMO-YOLO)在多种场景下的目标检测表现 想了解一个号称“毫秒级”响应的目标检测模型&#xff0c;在实际使用中到底有多快、多准吗&#xff1f;今天&#xff0c;我们不谈复杂的部署步骤&#xff0c;也不讲深奥的技术原理&#xff0c;就单纯来看看…...

别再只盯着report_timing了!DC综合后,用report_constraint -all_violation全面排查时序与DRC违规(附实战解读)

别再只盯着report_timing了&#xff01;DC综合后全面排查时序与DRC违规的实战指南 在数字IC设计流程中&#xff0c;Design Compiler&#xff08;DC&#xff09;综合后的时序分析环节往往让工程师们又爱又恨。面对密密麻麻的违规报告&#xff0c;新手工程师常陷入两个极端&#…...

【 Postman 使用教程】

一、接口测试介绍 1. 接口分类&#xff1a; 内部接口&#xff1a;系统内部各功能模块之间的接口&#xff08;测试比较详细&#xff09;外部接口&#xff1a;系统与外部系统之间的接口&#xff08;测试基本功能&#xff09; 2. 接口测试的重点&#xff1a; 测试接口数据交换是否…...

OpenClaw排错大全:Phi-3-mini-128k-instruct接口连接失败7种解决方案

OpenClaw排错大全&#xff1a;Phi-3-mini-128k-instruct接口连接失败7种解决方案 1. 问题背景与排查思路 上周我在本地部署Phi-3-mini-128k-instruct模型时&#xff0c;遇到了OpenClaw连接失败的棘手问题。控制台不断报错"Model connection timeout"&#xff0c;但…...

掌握Blender 3MF插件:5大核心场景的全流程解决方案

掌握Blender 3MF插件&#xff1a;5大核心场景的全流程解决方案 【免费下载链接】Blender3mfFormat Blender add-on to import/export 3MF files 项目地址: https://gitcode.com/gh_mirrors/bl/Blender3mfFormat Blender 3MF插件作为连接3D建模与3D打印的关键桥梁&#x…...