当前位置: 首页 > news >正文

spring-boot2.x整合Kafka步骤

1.pom依赖添加

<properties><java.version>1.8</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target></properties><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.18</version><relativePath /> <!-- lookup parent from repository --></parent><dependencies><!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><exclusions><exclusion><groupId>ch.qos.logback</groupId><artifactId>logback-core</artifactId></exclusion><exclusion><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId></exclusion></exclusions></dependency><!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot</artifactId></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-context-support</artifactId></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></dependency><!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-autoconfigure --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-autoconfigure</artifactId></dependency><!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies>

2. 配置文件添加配置:

server:port: 8080spring:application:name: application-kafkakafka:bootstrap-servers: 192.168.190.100:9092,192.168.190.101:9092 #这个是kafka的地址,对应你server.properties中配置的producer:batch-size: 16384 #批量大小acks: 1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)retries: 10 # 消息发送重试次数#transaction-id-prefix: transactionbuffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerproperties:linger:ms: 2000 #提交延迟#partitioner: #指定分区器#class: com.test.config.CustomerPartitionHandlerconsumer:group-id: testGroup,testg2 #默认的消费组IDenable-auto-commit: true #是否自动提交offsetauto-commit-interval: 2000 #提交offset延时# 当kafka中没有初始offset或offset超出范围时将自动重置offset# earliest:重置为分区中最小的offset;# latest:重置为分区中最新的offset(消费分区中新产生的数据);# none:只要有一个分区不存在已提交的offset,就抛出异常;auto-offset-reset: latestmax-poll-records: 500 #单次拉取消息的最大条数key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:session:timeout:ms: 120000 # 消费会话超时时间(超过这个时间 consumer 没有发送心跳,就会触发 rebalance 操作)request:timeout:ms: 18000 # 消费请求的超时时间listener:missing-topics-fatal: false # consumer listener topics 不存在时,启动项目就会报错
#      type: batchlogging:config: classpath:log4j2.xml

3. 日志配置

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO"><!--全局参数--><Properties><Property name="pattern">%d{yyyy-MM-dd HH:mm:ss,SSS} %5p %c{1}:%L - %m%n</Property>
<!--         <Property name="logDir">/data/logs/logViewer</Property> --><Property name="logDir">logs</Property></Properties><Appenders><!-- 定义输出到控制台 --><Console name="console" target="SYSTEM_OUT" follow="true"><!--控制台只输出level及以上级别的信息-->
<!--             <ThresholdFilter level="debug" onMatch="ACCEPT" onMismatch="DENY"/> --><PatternLayout><Pattern>${pattern}</Pattern></PatternLayout></Console><!-- 同一来源的Appender可以定义多个RollingFile,定义按天存储日志 --><RollingFile name="rolling_file"fileName="${logDir}/logViewer.log"filePattern="${logDir}/logViewer_%d{yyyy-MM-dd}.log"><ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/><PatternLayout><Pattern>${pattern}</Pattern></PatternLayout><Policies><TimeBasedTriggeringPolicy interval="1"/></Policies><!-- 日志保留策略,配置只保留七天 --><DefaultRolloverStrategy><Delete basePath="${logDir}/" maxDepth="1"><IfFileName glob="logViewer_*.log" /><IfLastModified age="7d" /></Delete></DefaultRolloverStrategy></RollingFile></Appenders><Loggers><Root level="INFO"><AppenderRef ref="console"/><AppenderRef ref="rolling_file"/></Root></Loggers>
</Configuration>

4. controller入口类,其它应用通过该接口直接将数据写入kafka

@RequestMapping(value="/kafka")
@Controller
public class ProducerController {@Autowiredprivate KafkaTemplate kafkaTemplate;//    模拟发送消息@RequestMapping(value = "/send",method = RequestMethod.GET)public String sendMessage(@PathParam(value = "msg") String msg) {System.out.println("收到get请求。。。");kafkaTemplate.send("test",msg);return "成功";}

5. kafka回调方法(需要回调通知时使用该方式):

    @GetMapping("/kafka/callbackTwo/{message}")public void sendCallbackTwoMessage(@PathVariable("message") String message) {kafkaTemplate.send("test", message).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable throwable) {System.out.println("发送消息失败2:"+throwable.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> result) {System.out.println("发送消息成功2:" + result.getRecordMetadata().topic() + "-"+ result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());}});}

6.kafka消费者注册

@Component
public class KafkaMessageListener {private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class);/*** containerFactory* 消息过滤器消息过滤器可以在消息抵达consumer之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由KafkaListener处理,不需要的消息则过滤掉。配置消息过滤只需要为 监听器工厂 配置一个RecordFilterStrategy(消息过滤策略),返回true的时候消息将会被抛弃,返回false时,消息能正常抵达监听容器。* @param record*/@KafkaListener(topics = {"test","test2"},groupId = "testGroup")public void listenTestStatus(ConsumerRecord<?, ?> record) {LOGGER.info("接收到消息:开始业务处理。。。。。");if (null == record || null == record.value()) {LOGGER.info("接收到空数据,跳过");}else {LOGGER.info("test-topics -->kafka监听到的值为: {}", record.value().toString());}}@KafkaListener(topics = {"test"},groupId = "testg2")public void listenTest2(ConsumerRecord<?, ?> record) {LOGGER.info("###listenTest2接收到消息:开始业务处理。。。。。");if (null == record || null == record.value()) {LOGGER.info("接收到空数据,跳过");}else {LOGGER.info("test-topics -->kafka监听到的值为: {}", record.value().toString());}}/*** id:消费者IDgroupId:消费组IDtopics:监听的topic,可监听多个topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听,手动分区* @param records*///批量消费@KafkaListener(id = "consumer2", topics = {"test"}, groupId = "testGroup",errorHandler = "consumerAwareErrorHandler")public void onBatchMessage(List<ConsumerRecord<String, Object>> records) {System.out.println(">>> 批量消费一次,recoreds.size()=" + records.size());for (ConsumerRecord<String, Object> record : records) {System.out.println(record.value());}}

7.非spring-boot环境下使用java原生API手写kafka生产消息:

 public static void main(String[] args) throws ExecutionException, InterruptedException {//PART1:设置发送者相关属性Properties props = new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
//        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor");// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");Producer<String,String> producer = new KafkaProducer<>(props);for(int i = 0; i < 5; i++) {//Part2:构建消息ProducerRecord<String, String> record = new ProducerRecord<>("test", Integer.toString(i), "MyProducer" + i);//Part3:发送消息//单向发送:不关心服务端的应答。producer.send(record);System.out.println("message "+i+" sended");}//消息处理完才停止发送者。producer.close();}

8.非spring-boot环境下使用java原生API手写java手写kafka消费者:

 public static void main(String[] args) {//PART1:设置发送者相关属性Properties props = new Properties();//kafka地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, MyProducerTest.BOOTSTRAP_SERVERS);//每个消费者要指定一个groupprops.put(ConsumerConfig.GROUP_ID_CONFIG, "test");//key序列化类props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//value序列化类props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");Consumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("test"));while (true) {//PART2:拉取消息// 100毫秒超时时间ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));//PART3:处理消息for (ConsumerRecord<String, String> record : records) {System.out.println("partition = "+record.partition()+"offset = " + record.offset() + ";key = " + record.key() + "; value= " + record.value());}//提交offset,消息就不会重复推送。consumer.commitSync(); //同步提交,表示必须等到offset提交完毕,再去消费下一批数据。
//            consumer.commitAsync(); //异步提交,表示发送完提交offset请求后,就开始消费下一批数据了。不用等到Broker的确认。}

9.非spring-boot环境下使用java原生API手写异步发送kafka:

 public static void main(String[] args) throws ExecutionException, InterruptedException {//PART1:设置发送者相关属性Properties props = new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, MyProducerTest.BOOTSTRAP_SERVERS);// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");Producer<String,String> producer = new KafkaProducer<>(props);CountDownLatch latch = new CountDownLatch(5);for(int i = 0; i < 5; i++) {//Part2:构建消息ProducerRecord<String, String> record = new ProducerRecord<>("test", Integer.toString(i), "MyProducer" + i);//Part3:发送消息//异步发送:消息发送后不阻塞,服务端有应答后会触发回调函数producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(null != e){System.out.println("消息发送失败,"+e.getMessage());e.printStackTrace();}else{String topic = recordMetadata.topic();long offset = recordMetadata.offset();String message = recordMetadata.toString();System.out.println("message:["+ message+"] sended with topic:"+topic+";offset:"+offset);}latch.countDown();}});}//消息处理完才停止发送者。latch.await();//消息处理完才停止发送者。producer.close();}

相关文章:

spring-boot2.x整合Kafka步骤

1.pom依赖添加 <properties><java.version>1.8</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</ma…...

信创学习笔记(四),信创之数据库DB思维导图

创作不易 只因热爱!! 热衷分享&#xff0c;一起成长! “你的鼓励就是我努力付出的动力” 一. 信创学习回顾 1.信创内容 信创内容思维导图 2.信创之CPU芯片架构 信创之CPU芯片架构思维导图 3.信创之操作系统OS 信创之操作系统OS思维导图 二. 信创之国产数据库DB思维导图 …...

SCP 使用教程

SCP&#xff08;Secure Copy Protocol&#xff09;是一种通过加密的方式在本地主机和远程主机之间安全地传输文件的协议。它是基于SSH协议的扩展&#xff0c;允许用户在不同主机之间进行文件复制和传输&#xff0c;是Linux和Unix系统中常用的工具之一。本教程将详细介绍SCP的基…...

python自动化之用flask校验接口token(把token作为参数)

用到的库&#xff1a;flask 实现效果: 写一个接口&#xff0c;需要token正确才能登录 代码&#xff1a; # 导包 from flask import Flask,request,jsonify,json # 创建一个服务 appFlask(__name__) # post请求&#xff0c;路径&#xff1a;/query app.route(/query, met…...

旗晟巡检机器人的应用场景有哪些?

巡检机器人作为现代科技的杰出成果&#xff0c;已广泛应用于各个关键场景。从危险的工业现场到至关重要的基础设施&#xff0c;它们的身影无处不在。它们以精准、高效、不知疲倦的特性&#xff0c;担当起保障生产、守护安全的重任&#xff0c;为行业发展注入新的活力。那么&…...

vue2迁移到vue3注意点

vue2迁移到vue3注意点 1、插槽的修改 使用 #default &#xff0c; 以及加上template 模板 2、 类型的定义&#xff0c;以及路由&#xff0c;vue相关资源&#xff08;ref, reactive,watch&#xff09;的引入等 3、类装饰器 1&#xff09;vue-class-component是vue官方库,作…...

使用windows批量解压和布局ImageNet ISLVRC2012数据集

使用的系统是windows&#xff0c;找到的解压命令很多都linux系统中的&#xff0c;为了能在windows系统下使用&#xff0c;因此下载Git这个软件&#xff0c;在其中的Git Bash中使用以下命令&#xff0c;因为Git Bash集成了很多linux的命令&#xff0c;方便我们的使用。 ImageNe…...

css实现每个小盒子占32%,超出就换行

代码 <div class"visitors"><visitor class"item" v-for"(user,index) in userArr" :key"user.id" :user"user" :index"index"></visitor></div><style lang"scss" scoped&…...

C++的链接指示extern “C“

目录 链接指示extern "C"A.What&#xff08;概念&#xff09;B.Why&#xff08;extern "C"的作用&#xff09;C.How &#xff08;如何使用链接指示extern "C"&#xff09; 链接指示extern “C” A.What&#xff08;概念&#xff09; extern&quo…...

私域运营 组织架构

**揭秘私域社群运营的神秘面纱&#xff1a;角色与职能一网打尽&#xff01;** 在私域社群运营的大舞台上&#xff0c;每个角色都扮演着不可或缺的重要角色。今天&#xff0c;就让我们一起揭开这个神秘世界的面纱&#xff0c;看看这些角色们是如何协同作战&#xff0c;共同创造…...

Netty HTTP

Netty 是一个高性能的异步事件驱动的网络应用程序框架&#xff0c;支持快速开发可维护的高性能协议服务器和客户端。它广泛应用于开发网络应用程序&#xff0c;如服务器和客户端协议的实现。Netty 提供了对多种传输类型的抽象&#xff0c;如 TCP/IP 和 UDP/IP 等&#xff0c;使…...

什么是边缘计算技术和边缘计算平台?

随着物联网、5G技术和人工智能的不断发展&#xff0c;数据的规模和种类也在快速增加。在这种背景下&#xff0c;传统的云计算模式面临着一些问题&#xff0c;例如延迟高、网络拥塞等&#xff0c;这些问题限制了数据的处理速度和效率&#xff0c;降低了用户的使用体验。为了解决…...

自然语言处理(NLP)——法国工程师IMT联盟 期末考试题

1. 问题1 &#xff08;法语&#xff09;En langue arabe lcrasante majorit des mots sont forms par des combinaisons de racines et de schmes. Dans ce mcanisme... &#xff08;英语&#xff09;In Arabic language the vast majority&#xff08;十之八九&#xff09; of…...

Linux内核编译安装 - Deepin,Debian系

为什么要自己编译内核 优点 定制化&#xff1a;你可以根据自己的硬件和需求配置内核&#xff0c;去掉不必要的模块&#xff0c;优化性能。性能优化&#xff1a;移除不需要的驱动程序和特性&#xff0c;减小内核体积&#xff0c;提高系统性能。最新特性和修复&#xff1a;获取…...

安全防御,防火墙配置NAT转换智能选举综合实验

目录&#xff1a; 一、实验拓扑图 二、实验需求 三、实验大致思路 四、实验步骤 1、防火墙的相关配置 2、ISP的配置 2.1 接口ip地址配置&#xff1a; 3、新增设备地址配置 4、多对多的NAT策略配置&#xff0c;但是要保存一个公网ip不能用来转换&#xff0c;使得办公区的…...

追溯源码观察HashMap底层原理

引言&#xff08;Map的重要性&#xff09; 从事Java的小伙伴&#xff0c;在面试的时候几乎都会被问到Map&#xff0c;Map都被盘包浆了。Map是键值集合&#xff0c;使用的场景有很多比如缓存、数据索引、数据去重等场景&#xff0c;在算法中也经常出现&#xff0c;因为在Map中获…...

为什么渲染农场渲染的是帧,而不是视频?

在3D动画产业的壮阔画卷中&#xff0c;渲染农场作为幕后英雄&#xff0c;以其庞大的计算能力支撑起无数视觉奇观的诞生。这些由高性能计算机集群构成的系统&#xff0c;通过独特的逐帧渲染策略&#xff0c;解锁了单机难以企及的创作自由与效率。本文将深入剖析这一策略背后的逻…...

windows镜像下载网站

一个专注于提供精校、完整、极致的Windows系统下载服务的仓储站&#xff0c;网站托管于Github。 网站&#xff1a;https://hellowindows.cn/ 下载方式&#xff1a;ED2k&#xff0c;BT&#xff0c;百度网盘 MSDN - 山己几子木&#xff0c;提供Windows 11、Windows 10等不同版本…...

ollama + fastgpt 搭建免费本地知识库

目录 1、ollama ollama的一些操作命令: 使用的方式: 2、fastgpt 快速部署: 修改配置: config.json: docker-compose.yml: 运行fastgpt: 访问OneApi: 添加令牌和渠道: 登陆fastgpt,创建知识库和应用 3、总结: 附录: 1. 11434是ollama的端口: 2. m3e 测…...

pytorch中一些最基本函数和类

1.Tensor操作 Tensor是PyTorch中最基本的数据结构&#xff0c;类似于NumPy的数组&#xff0c;但可以在GPU上运行加速计算。 示例&#xff1a;创建和操作Tensor import torch# 创建一个零填充的Tensor x torch.zeros(3, 3) print(x)# 加法操作 y torch.ones(3, 3) z x y pr…...

Flask RESTful 示例

目录 1. 环境准备2. 安装依赖3. 修改main.py4. 运行应用5. API使用示例获取所有任务获取单个任务创建新任务更新任务删除任务 中文乱码问题&#xff1a; 下面创建一个简单的Flask RESTful API示例。首先&#xff0c;我们需要创建环境&#xff0c;安装必要的依赖&#xff0c;然后…...

模型参数、模型存储精度、参数与显存

模型参数量衡量单位 M&#xff1a;百万&#xff08;Million&#xff09; B&#xff1a;十亿&#xff08;Billion&#xff09; 1 B 1000 M 1B 1000M 1B1000M 参数存储精度 模型参数是固定的&#xff0c;但是一个参数所表示多少字节不一定&#xff0c;需要看这个参数以什么…...

关于nvm与node.js

1 安装nvm 安装过程中手动修改 nvm的安装路径&#xff0c; 以及修改 通过nvm安装node后正在使用的node的存放目录【这句话可能难以理解&#xff0c;但接着往下看你就了然了】 2 修改nvm中settings.txt文件配置 nvm安装成功后&#xff0c;通常在该文件中会出现以下配置&…...

STM32标准库-DMA直接存储器存取

文章目录 一、DMA1.1简介1.2存储器映像1.3DMA框图1.4DMA基本结构1.5DMA请求1.6数据宽度与对齐1.7数据转运DMA1.8ADC扫描模式DMA 二、数据转运DMA2.1接线图2.2代码2.3相关API 一、DMA 1.1简介 DMA&#xff08;Direct Memory Access&#xff09;直接存储器存取 DMA可以提供外设…...

macOS多出来了:Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用

文章目录 问题现象问题原因解决办法 问题现象 macOS启动台&#xff08;Launchpad&#xff09;多出来了&#xff1a;Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用。 问题原因 很明显&#xff0c;都是Google家的办公全家桶。这些应用并不是通过独立安装的…...

跨链模式:多链互操作架构与性能扩展方案

跨链模式&#xff1a;多链互操作架构与性能扩展方案 ——构建下一代区块链互联网的技术基石 一、跨链架构的核心范式演进 1. 分层协议栈&#xff1a;模块化解耦设计 现代跨链系统采用分层协议栈实现灵活扩展&#xff08;H2Cross架构&#xff09;&#xff1a; 适配层&#xf…...

【git】把本地更改提交远程新分支feature_g

创建并切换新分支 git checkout -b feature_g 添加并提交更改 git add . git commit -m “实现图片上传功能” 推送到远程 git push -u origin feature_g...

Angular微前端架构:Module Federation + ngx-build-plus (Webpack)

以下是一个完整的 Angular 微前端示例&#xff0c;其中使用的是 Module Federation 和 npx-build-plus 实现了主应用&#xff08;Shell&#xff09;与子应用&#xff08;Remote&#xff09;的集成。 &#x1f6e0;️ 项目结构 angular-mf/ ├── shell-app/ # 主应用&…...

Pinocchio 库详解及其在足式机器人上的应用

Pinocchio 库详解及其在足式机器人上的应用 Pinocchio (Pinocchio is not only a nose) 是一个开源的 C 库&#xff0c;专门用于快速计算机器人模型的正向运动学、逆向运动学、雅可比矩阵、动力学和动力学导数。它主要关注效率和准确性&#xff0c;并提供了一个通用的框架&…...

视频行为标注工具BehaviLabel(源码+使用介绍+Windows.Exe版本)

前言&#xff1a; 最近在做行为检测相关的模型&#xff0c;用的是时空图卷积网络&#xff08;STGCN&#xff09;&#xff0c;但原有kinetic-400数据集数据质量较低&#xff0c;需要进行细粒度的标注&#xff0c;同时粗略搜了下已有开源工具基本都集中于图像分割这块&#xff0c…...