spring-boot2.x整合Kafka步骤
1.pom依赖添加
<properties><java.version>1.8</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target></properties><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.18</version><relativePath /> <!-- lookup parent from repository --></parent><dependencies><!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><exclusions><exclusion><groupId>ch.qos.logback</groupId><artifactId>logback-core</artifactId></exclusion><exclusion><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId></exclusion></exclusions></dependency><!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot</artifactId></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-context-support</artifactId></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></dependency><!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-autoconfigure --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-autoconfigure</artifactId></dependency><!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies>
2. 配置文件添加配置:
server:port: 8080spring:application:name: application-kafkakafka:bootstrap-servers: 192.168.190.100:9092,192.168.190.101:9092 #这个是kafka的地址,对应你server.properties中配置的producer:batch-size: 16384 #批量大小acks: 1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)retries: 10 # 消息发送重试次数#transaction-id-prefix: transactionbuffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerproperties:linger:ms: 2000 #提交延迟#partitioner: #指定分区器#class: com.test.config.CustomerPartitionHandlerconsumer:group-id: testGroup,testg2 #默认的消费组IDenable-auto-commit: true #是否自动提交offsetauto-commit-interval: 2000 #提交offset延时# 当kafka中没有初始offset或offset超出范围时将自动重置offset# earliest:重置为分区中最小的offset;# latest:重置为分区中最新的offset(消费分区中新产生的数据);# none:只要有一个分区不存在已提交的offset,就抛出异常;auto-offset-reset: latestmax-poll-records: 500 #单次拉取消息的最大条数key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:session:timeout:ms: 120000 # 消费会话超时时间(超过这个时间 consumer 没有发送心跳,就会触发 rebalance 操作)request:timeout:ms: 18000 # 消费请求的超时时间listener:missing-topics-fatal: false # consumer listener topics 不存在时,启动项目就会报错
# type: batchlogging:config: classpath:log4j2.xml
3. 日志配置
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO"><!--全局参数--><Properties><Property name="pattern">%d{yyyy-MM-dd HH:mm:ss,SSS} %5p %c{1}:%L - %m%n</Property>
<!-- <Property name="logDir">/data/logs/logViewer</Property> --><Property name="logDir">logs</Property></Properties><Appenders><!-- 定义输出到控制台 --><Console name="console" target="SYSTEM_OUT" follow="true"><!--控制台只输出level及以上级别的信息-->
<!-- <ThresholdFilter level="debug" onMatch="ACCEPT" onMismatch="DENY"/> --><PatternLayout><Pattern>${pattern}</Pattern></PatternLayout></Console><!-- 同一来源的Appender可以定义多个RollingFile,定义按天存储日志 --><RollingFile name="rolling_file"fileName="${logDir}/logViewer.log"filePattern="${logDir}/logViewer_%d{yyyy-MM-dd}.log"><ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/><PatternLayout><Pattern>${pattern}</Pattern></PatternLayout><Policies><TimeBasedTriggeringPolicy interval="1"/></Policies><!-- 日志保留策略,配置只保留七天 --><DefaultRolloverStrategy><Delete basePath="${logDir}/" maxDepth="1"><IfFileName glob="logViewer_*.log" /><IfLastModified age="7d" /></Delete></DefaultRolloverStrategy></RollingFile></Appenders><Loggers><Root level="INFO"><AppenderRef ref="console"/><AppenderRef ref="rolling_file"/></Root></Loggers>
</Configuration>
4. controller入口类,其它应用通过该接口直接将数据写入kafka
@RequestMapping(value="/kafka")
@Controller
public class ProducerController {@Autowiredprivate KafkaTemplate kafkaTemplate;// 模拟发送消息@RequestMapping(value = "/send",method = RequestMethod.GET)public String sendMessage(@PathParam(value = "msg") String msg) {System.out.println("收到get请求。。。");kafkaTemplate.send("test",msg);return "成功";}
5. kafka回调方法(需要回调通知时使用该方式):
@GetMapping("/kafka/callbackTwo/{message}")public void sendCallbackTwoMessage(@PathVariable("message") String message) {kafkaTemplate.send("test", message).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable throwable) {System.out.println("发送消息失败2:"+throwable.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> result) {System.out.println("发送消息成功2:" + result.getRecordMetadata().topic() + "-"+ result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());}});}
6.kafka消费者注册
@Component
public class KafkaMessageListener {private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class);/*** containerFactory* 消息过滤器消息过滤器可以在消息抵达consumer之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由KafkaListener处理,不需要的消息则过滤掉。配置消息过滤只需要为 监听器工厂 配置一个RecordFilterStrategy(消息过滤策略),返回true的时候消息将会被抛弃,返回false时,消息能正常抵达监听容器。* @param record*/@KafkaListener(topics = {"test","test2"},groupId = "testGroup")public void listenTestStatus(ConsumerRecord<?, ?> record) {LOGGER.info("接收到消息:开始业务处理。。。。。");if (null == record || null == record.value()) {LOGGER.info("接收到空数据,跳过");}else {LOGGER.info("test-topics -->kafka监听到的值为: {}", record.value().toString());}}@KafkaListener(topics = {"test"},groupId = "testg2")public void listenTest2(ConsumerRecord<?, ?> record) {LOGGER.info("###listenTest2接收到消息:开始业务处理。。。。。");if (null == record || null == record.value()) {LOGGER.info("接收到空数据,跳过");}else {LOGGER.info("test-topics -->kafka监听到的值为: {}", record.value().toString());}}/*** id:消费者IDgroupId:消费组IDtopics:监听的topic,可监听多个topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听,手动分区* @param records*///批量消费@KafkaListener(id = "consumer2", topics = {"test"}, groupId = "testGroup",errorHandler = "consumerAwareErrorHandler")public void onBatchMessage(List<ConsumerRecord<String, Object>> records) {System.out.println(">>> 批量消费一次,recoreds.size()=" + records.size());for (ConsumerRecord<String, Object> record : records) {System.out.println(record.value());}}
7.非spring-boot环境下使用java原生API手写kafka生产消息:
public static void main(String[] args) throws ExecutionException, InterruptedException {//PART1:设置发送者相关属性Properties props = new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
// props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor");// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");Producer<String,String> producer = new KafkaProducer<>(props);for(int i = 0; i < 5; i++) {//Part2:构建消息ProducerRecord<String, String> record = new ProducerRecord<>("test", Integer.toString(i), "MyProducer" + i);//Part3:发送消息//单向发送:不关心服务端的应答。producer.send(record);System.out.println("message "+i+" sended");}//消息处理完才停止发送者。producer.close();}
8.非spring-boot环境下使用java原生API手写java手写kafka消费者:
public static void main(String[] args) {//PART1:设置发送者相关属性Properties props = new Properties();//kafka地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, MyProducerTest.BOOTSTRAP_SERVERS);//每个消费者要指定一个groupprops.put(ConsumerConfig.GROUP_ID_CONFIG, "test");//key序列化类props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//value序列化类props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");Consumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("test"));while (true) {//PART2:拉取消息// 100毫秒超时时间ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));//PART3:处理消息for (ConsumerRecord<String, String> record : records) {System.out.println("partition = "+record.partition()+"offset = " + record.offset() + ";key = " + record.key() + "; value= " + record.value());}//提交offset,消息就不会重复推送。consumer.commitSync(); //同步提交,表示必须等到offset提交完毕,再去消费下一批数据。
// consumer.commitAsync(); //异步提交,表示发送完提交offset请求后,就开始消费下一批数据了。不用等到Broker的确认。}
9.非spring-boot环境下使用java原生API手写异步发送kafka:
public static void main(String[] args) throws ExecutionException, InterruptedException {//PART1:设置发送者相关属性Properties props = new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, MyProducerTest.BOOTSTRAP_SERVERS);// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");Producer<String,String> producer = new KafkaProducer<>(props);CountDownLatch latch = new CountDownLatch(5);for(int i = 0; i < 5; i++) {//Part2:构建消息ProducerRecord<String, String> record = new ProducerRecord<>("test", Integer.toString(i), "MyProducer" + i);//Part3:发送消息//异步发送:消息发送后不阻塞,服务端有应答后会触发回调函数producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(null != e){System.out.println("消息发送失败,"+e.getMessage());e.printStackTrace();}else{String topic = recordMetadata.topic();long offset = recordMetadata.offset();String message = recordMetadata.toString();System.out.println("message:["+ message+"] sended with topic:"+topic+";offset:"+offset);}latch.countDown();}});}//消息处理完才停止发送者。latch.await();//消息处理完才停止发送者。producer.close();}
相关文章:
spring-boot2.x整合Kafka步骤
1.pom依赖添加 <properties><java.version>1.8</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</ma…...
信创学习笔记(四),信创之数据库DB思维导图
创作不易 只因热爱!! 热衷分享,一起成长! “你的鼓励就是我努力付出的动力” 一. 信创学习回顾 1.信创内容 信创内容思维导图 2.信创之CPU芯片架构 信创之CPU芯片架构思维导图 3.信创之操作系统OS 信创之操作系统OS思维导图 二. 信创之国产数据库DB思维导图 …...
SCP 使用教程
SCP(Secure Copy Protocol)是一种通过加密的方式在本地主机和远程主机之间安全地传输文件的协议。它是基于SSH协议的扩展,允许用户在不同主机之间进行文件复制和传输,是Linux和Unix系统中常用的工具之一。本教程将详细介绍SCP的基…...
python自动化之用flask校验接口token(把token作为参数)
用到的库:flask 实现效果: 写一个接口,需要token正确才能登录 代码: # 导包 from flask import Flask,request,jsonify,json # 创建一个服务 appFlask(__name__) # post请求,路径:/query app.route(/query, met…...
旗晟巡检机器人的应用场景有哪些?
巡检机器人作为现代科技的杰出成果,已广泛应用于各个关键场景。从危险的工业现场到至关重要的基础设施,它们的身影无处不在。它们以精准、高效、不知疲倦的特性,担当起保障生产、守护安全的重任,为行业发展注入新的活力。那么&…...
vue2迁移到vue3注意点
vue2迁移到vue3注意点 1、插槽的修改 使用 #default , 以及加上template 模板 2、 类型的定义,以及路由,vue相关资源(ref, reactive,watch)的引入等 3、类装饰器 1)vue-class-component是vue官方库,作…...
使用windows批量解压和布局ImageNet ISLVRC2012数据集
使用的系统是windows,找到的解压命令很多都linux系统中的,为了能在windows系统下使用,因此下载Git这个软件,在其中的Git Bash中使用以下命令,因为Git Bash集成了很多linux的命令,方便我们的使用。 ImageNe…...
css实现每个小盒子占32%,超出就换行
代码 <div class"visitors"><visitor class"item" v-for"(user,index) in userArr" :key"user.id" :user"user" :index"index"></visitor></div><style lang"scss" scoped&…...
C++的链接指示extern “C“
目录 链接指示extern "C"A.What(概念)B.Why(extern "C"的作用)C.How (如何使用链接指示extern "C") 链接指示extern “C” A.What(概念) extern&quo…...
私域运营 组织架构
**揭秘私域社群运营的神秘面纱:角色与职能一网打尽!** 在私域社群运营的大舞台上,每个角色都扮演着不可或缺的重要角色。今天,就让我们一起揭开这个神秘世界的面纱,看看这些角色们是如何协同作战,共同创造…...
Netty HTTP
Netty 是一个高性能的异步事件驱动的网络应用程序框架,支持快速开发可维护的高性能协议服务器和客户端。它广泛应用于开发网络应用程序,如服务器和客户端协议的实现。Netty 提供了对多种传输类型的抽象,如 TCP/IP 和 UDP/IP 等,使…...
什么是边缘计算技术和边缘计算平台?
随着物联网、5G技术和人工智能的不断发展,数据的规模和种类也在快速增加。在这种背景下,传统的云计算模式面临着一些问题,例如延迟高、网络拥塞等,这些问题限制了数据的处理速度和效率,降低了用户的使用体验。为了解决…...
自然语言处理(NLP)——法国工程师IMT联盟 期末考试题
1. 问题1 (法语)En langue arabe lcrasante majorit des mots sont forms par des combinaisons de racines et de schmes. Dans ce mcanisme... (英语)In Arabic language the vast majority(十之八九) of…...
Linux内核编译安装 - Deepin,Debian系
为什么要自己编译内核 优点 定制化:你可以根据自己的硬件和需求配置内核,去掉不必要的模块,优化性能。性能优化:移除不需要的驱动程序和特性,减小内核体积,提高系统性能。最新特性和修复:获取…...
安全防御,防火墙配置NAT转换智能选举综合实验
目录: 一、实验拓扑图 二、实验需求 三、实验大致思路 四、实验步骤 1、防火墙的相关配置 2、ISP的配置 2.1 接口ip地址配置: 3、新增设备地址配置 4、多对多的NAT策略配置,但是要保存一个公网ip不能用来转换,使得办公区的…...
追溯源码观察HashMap底层原理
引言(Map的重要性) 从事Java的小伙伴,在面试的时候几乎都会被问到Map,Map都被盘包浆了。Map是键值集合,使用的场景有很多比如缓存、数据索引、数据去重等场景,在算法中也经常出现,因为在Map中获…...
为什么渲染农场渲染的是帧,而不是视频?
在3D动画产业的壮阔画卷中,渲染农场作为幕后英雄,以其庞大的计算能力支撑起无数视觉奇观的诞生。这些由高性能计算机集群构成的系统,通过独特的逐帧渲染策略,解锁了单机难以企及的创作自由与效率。本文将深入剖析这一策略背后的逻…...
windows镜像下载网站
一个专注于提供精校、完整、极致的Windows系统下载服务的仓储站,网站托管于Github。 网站:https://hellowindows.cn/ 下载方式:ED2k,BT,百度网盘 MSDN - 山己几子木,提供Windows 11、Windows 10等不同版本…...
ollama + fastgpt 搭建免费本地知识库
目录 1、ollama ollama的一些操作命令: 使用的方式: 2、fastgpt 快速部署: 修改配置: config.json: docker-compose.yml: 运行fastgpt: 访问OneApi: 添加令牌和渠道: 登陆fastgpt,创建知识库和应用 3、总结: 附录: 1. 11434是ollama的端口: 2. m3e 测…...
pytorch中一些最基本函数和类
1.Tensor操作 Tensor是PyTorch中最基本的数据结构,类似于NumPy的数组,但可以在GPU上运行加速计算。 示例:创建和操作Tensor import torch# 创建一个零填充的Tensor x torch.zeros(3, 3) print(x)# 加法操作 y torch.ones(3, 3) z x y pr…...
Python爬虫实战:研究MechanicalSoup库相关技术
一、MechanicalSoup 库概述 1.1 库简介 MechanicalSoup 是一个 Python 库,专为自动化交互网站而设计。它结合了 requests 的 HTTP 请求能力和 BeautifulSoup 的 HTML 解析能力,提供了直观的 API,让我们可以像人类用户一样浏览网页、填写表单和提交请求。 1.2 主要功能特点…...
深入浅出Asp.Net Core MVC应用开发系列-AspNetCore中的日志记录
ASP.NET Core 是一个跨平台的开源框架,用于在 Windows、macOS 或 Linux 上生成基于云的新式 Web 应用。 ASP.NET Core 中的日志记录 .NET 通过 ILogger API 支持高性能结构化日志记录,以帮助监视应用程序行为和诊断问题。 可以通过配置不同的记录提供程…...
关于iview组件中使用 table , 绑定序号分页后序号从1开始的解决方案
问题描述:iview使用table 中type: "index",分页之后 ,索引还是从1开始,试过绑定后台返回数据的id, 这种方法可行,就是后台返回数据的每个页面id都不完全是按照从1开始的升序,因此百度了下,找到了…...
dedecms 织梦自定义表单留言增加ajax验证码功能
增加ajax功能模块,用户不点击提交按钮,只要输入框失去焦点,就会提前提示验证码是否正确。 一,模板上增加验证码 <input name"vdcode"id"vdcode" placeholder"请输入验证码" type"text&quo…...
Mac软件卸载指南,简单易懂!
刚和Adobe分手,它却总在Library里给你写"回忆录"?卸载的Final Cut Pro像电子幽灵般阴魂不散?总是会有残留文件,别慌!这份Mac软件卸载指南,将用最硬核的方式教你"数字分手术"࿰…...
【决胜公务员考试】求职OMG——见面课测验1
2025最新版!!!6.8截至答题,大家注意呀! 博主码字不易点个关注吧,祝期末顺利~~ 1.单选题(2分) 下列说法错误的是:( B ) A.选调生属于公务员系统 B.公务员属于事业编 C.选调生有基层锻炼的要求 D…...
Spring Boot+Neo4j知识图谱实战:3步搭建智能关系网络!
一、引言 在数据驱动的背景下,知识图谱凭借其高效的信息组织能力,正逐步成为各行业应用的关键技术。本文聚焦 Spring Boot与Neo4j图数据库的技术结合,探讨知识图谱开发的实现细节,帮助读者掌握该技术栈在实际项目中的落地方法。 …...
【python异步多线程】异步多线程爬虫代码示例
claude生成的python多线程、异步代码示例,模拟20个网页的爬取,每个网页假设要0.5-2秒完成。 代码 Python多线程爬虫教程 核心概念 多线程:允许程序同时执行多个任务,提高IO密集型任务(如网络请求)的效率…...
Unity | AmplifyShaderEditor插件基础(第七集:平面波动shader)
目录 一、👋🏻前言 二、😈sinx波动的基本原理 三、😈波动起来 1.sinx节点介绍 2.vertexPosition 3.集成Vector3 a.节点Append b.连起来 4.波动起来 a.波动的原理 b.时间节点 c.sinx的处理 四、🌊波动优化…...
安卓基础(aar)
重新设置java21的环境,临时设置 $env:JAVA_HOME "D:\Android Studio\jbr" 查看当前环境变量 JAVA_HOME 的值 echo $env:JAVA_HOME 构建ARR文件 ./gradlew :private-lib:assembleRelease 目录是这样的: MyApp/ ├── app/ …...
