当前位置: 首页 > news >正文

Spring-Kafka笔记整理

  1. 引入依赖
    <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
    </dependency>
    
  2. 配置application.properties
    spring.kafka.bootstrap-servers=192.168.99.51:9092
    
  3. 编写kafka的配置类
    @Configuration
    public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> configs = new HashMap<>();configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return new DefaultKafkaProducerFactory<>(configs);}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}@Beanpublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return new DefaultKafkaConsumerFactory<>(props);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();// 并发数就是一个消费者实例起几个线程factory.setConcurrency(3);factory.setConsumerFactory(consumerFactory());return factory;}
    }
    
  4. Kafka消息监听
    @Component
    public class KafkaConsumer {@Autowiredprivate ObjectMapper mapper;@KafkaListener(topics = {"hello-kafka-topic"},groupId = "hello-kafka-group",containerFactory = "kafkaListenerContainerFactory")public void listener01(ConsumerRecord<String, String> record) throws Exception {String key = record.key();String value = record.value();HelloMessage kafkaMessage = mapper.readValue(value, HelloMessage.class);log.info("in listener consume kafka message: [{}], [{}]", key, mapper.writeValueAsString(kafkaMessage));}
    }
    
  5. Kafka消息发送
    @Component
    public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String key, String value, String topic) {if (StringUtils.isBlank(value) || StringUtils.isBlank(topic)) {throw new IllegalArgumentException("value or topic is null or empty");}ListenableFuture<SendResult<String, String>> future = StringUtils.isBlank(key) ?kafkaTemplate.send(topic, value) : kafkaTemplate.send(topic, key, value);// 异步回调的方式获取通知future.addCallback(success -> {assert null != success && null != success.getRecordMetadata();// 发送到 kafka 的 topicString _topic = success.getRecordMetadata().topic();// 消息发送到的分区int partition = success.getRecordMetadata().partition();// 消息在分区内的 offsetlong offset = success.getRecordMetadata().offset();log.info("send kafka message success: [{}], [{}], [{}]", _topic, partition, offset);}, failure -> {log.error("send kafka message failure: [{}], [{}], [{}]", key, value, topic);});}
    }
    

相关文章:

Spring-Kafka笔记整理

引入依赖<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId> </dependency>配置application.propertiesspring.kafka.bootstrap-servers192.168.99.51:9092编写kafka的配置类Configuration …...

已解决org.apache.hadoop.hdfs.protocol.QuotaExceededException异常的正确解决方法,亲测有效!!!

已解决org.apache.hadoop.hdfs.protocol.QuotaExceededException异常的正确解决方法&#xff0c;亲测有效&#xff01;&#xff01;&#xff01; 目录 问题分析 报错原因 解决思路 解决方法 总结 博主v&#xff1a;XiaoMing_Java 问题分析 在使用Hadoop分布式文件系统&a…...

GitHub打不开的解决方案(超简单)

在国内&#xff0c;github官网经常面临打不开或访问极慢的问题&#xff0c;不挂VPN&#xff08;梯子&#xff0c;飞机&#xff0c;魔法&#xff09;使用体验极差&#xff0c;那有什么好办法解决github官网访问不了的问题&#xff1f;今天小布教你几招轻松访问github官网。 git…...

Unity开发一个FPS游戏之二

在之前的文章中,我介绍了如何开发一个FPS游戏,添加一个第一人称的主角,并设置武器。现在我将继续完善这个游戏,打算添加敌人,实现其智能寻找玩家并进行对抗。完成的效果如下: fps_enemy_demo 下载资源 首先是设计敌人,我们可以在网上找到一些好的免费素材,例如在Unity…...

STM32F103 CubeMX 使用USB生成鼠标设备

STM32F103 CubeMX 使用USB生成鼠标设备 1 配置cubeMX1.1配置外部晶振&#xff0c;配置debug口1.2 配置USB1.3 配置芯片的时钟1.4 生成工程 2. 编写代码2.1 添加申明2.2 main函数代码 1 配置cubeMX 1.1配置外部晶振&#xff0c;配置debug口 1.2 配置USB 1.3 配置芯片的时钟 需…...

HJXH-E1/U静态信号继电器 面板安装 辅助电源220VDC 启动电压220VDC JOSEF约瑟

HJXH系列静态信号继电器 HJXH-61/U静态信号继电器&#xff1b; HJXH-61/I静态信号继电器&#xff1b; HJXH-62/U静态信号继电器&#xff1b; HJXH-62/I静态信号继电器&#xff1b; HJXH-E1/U静态信号继电器&#xff1b; HJXH-E1/I静态信号继电器&#xff1b; HJXH-E2/U静态信号…...

SpringBoot3下Kafka分组均衡消费实现

首先添加maven依赖&#xff1a; <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.11</version><exclusions><!--此处一定要排除kafka-clients&#xff0c;然…...

鸿蒙Harmony应用开发—ArkTS声明式开发(容器组件:GridItem)

网格容器中单项内容容器。 说明&#xff1a; 该组件从API Version 7开始支持。后续版本如有新增内容&#xff0c;则采用上角标单独标记该内容的起始版本。仅支持作为Grid组件的子组件使用。 子组件 可以包含单个子组件。 接口 GridItem GridItem(value?: GridItemOptions)…...

Qt 使用RAW INPUT获取HID触摸屏,笔设备,鼠标的原始数据,最低受支持的客户端:Windows XP [仅限桌面应用]

在开发绘图应用程序时&#xff0c;经常会需要读取笔设备的数据&#xff0c;通过对笔数据的解析&#xff0c;来判断笔的坐标&#xff0c;粗细。如果仅仅只是读取鼠标的坐标&#xff0c;就需要人为在应用程序端去修改笔的粗细&#xff0c;并且使用体验不好&#xff0c;如果可以实…...

easyexcel导出excel文件到s3服务器

导出excel文件是开发中常见的需求 常见的做法一般是直接通过请求接口响应对象HttpServletResponse把文件输出 我们可以使用原生的poi工具类操作.也可以使用easypoi.easyexcel等基于poi二次封装的工具处理 下面是代码 /*** 导出列表** param request* param response*/Overri…...

xss.haozi.me靶场“0x0B-0x12”通关教程

君衍. 一、0x0B 实体编码绕过二、0x0C script绕过三、0x0D 注释绕过四、0X0E ſ符号绕过五、0x0F 编码解码六、0x10 直接执行七、0x11 闭合绕过八、0x12 闭合绕过 XSS-Labs靶场“1-5”关通关教程 XSS-Labs靶场“6-10”关通关教程 Appcms存储型XSS漏洞复现 XSS-Labs靶场“11-13、…...

linux--redhat系统Yum源配置

1&#xff09;说明 redhat yum命令使用报错解决-重新配置yum源 解决--更改yum源 2&#xff09;更改yum源 &#xff08;1&#xff09;进入源目录 cd /etc/yum.repos.d/ &#xff08;2&#xff09;备份 redhat 默认源 mv redhat.repo redhat.repo-bak &#xff08;3&#xff09;…...

el-Switch 开关二次确认

前言 最近在做毕设&#xff0c;有个需求是点击按钮控制用户的状态是否禁用&#xff0c;就看到element有个switch组件可以改造一下&#xff0c;就上网看了一下&#xff0c;结果为了这个效果忙活了很久。。。所以说记录一下&#xff0c;让大家少踩坑。 前置条件 先看完我的需求再…...

(二)丶RabbitMQ的六大核心

一丶什么是MQ Message Queue(消息队列&#xff09;简称MQ&#xff0c;是一种应用程序对应用程序的消息通信机制。在MQ中&#xff0c;消息以队列形式存储&#xff0c;以便于异步传输&#xff0c;在MQ中&#xff0c;发布者&#xff08;生产者&#xff09;将消息放入队列&#xff…...

微信小程序实现上下手势滑动切换

效果图 思路 实现一个微信小程序的复合滚动页面&#xff0c;主要通过Swiper组件实现垂直方向的轮播功能&#xff0c;每个轮播项内部使用Scroll-View组件来展示可垂直滚动的长内容&#xff0c;如图片和文本。 代码 <!-- wxml --> <view class"swiper-container…...

详解命令docker run -d --name container_name -e TZ=Asia/Shanghai your_image

docker run 是Docker的主要命令&#xff0c;用于从镜像启动一个新的容器。下面详细解释并举例说明 -d, --name, -e TZ 参数的用法&#xff1a; -d 或 --detach&#xff1a; 这个标志告诉Docker以守护进程&#xff08;后台&#xff09;模式运行容器。这意味着当你执行 docker ru…...

javaEE7

1. <% page pageEncoding"UTF-8"%><% page import"java.io.*"%> <% page import"java.util.*"%> <% page import"java.math.*"%> <html> <head><title>网站计数器</title></head&…...

int与integer的区别

int和integer都是用来表示整数的数据类型&#xff0c;但有一些细微的区别。 int是Java中的基本数据类型&#xff0c;它可以存储整数值。int类型在内存中占4个字节&#xff0c;范围为-2,147,483,648到2,147,483,647。int类型使用最频繁&#xff0c;因为它的存储空间较小&#x…...

Golang实现Redis分布式锁(Lua脚本+可重入+自动续期)

Golang实现Redis分布式锁&#xff08;Lua脚本可重入自动续期&#xff09; 1 概念 应用场景 Golang自带的Lock锁单机版OK&#xff08;存储在程序的内存中&#xff09;&#xff0c;分布式不行 分布式锁&#xff1a; 简单版&#xff1a;redis setnx》加锁设置过期时间需要保证原…...

音乐播放器-C#实现

音乐播放器-C#实现 目录 一、 代码介绍 二、 音乐播放器-C#实现 三、 音乐播放器-C#实现 四、 音乐播放器-C#实现 五、 音乐播放器-C#实现 代码介绍 代码中使用了.NET框架中的System.Media命名空间来处理音频文件的播放和控制。这段代码创建了一个简单的音乐播放器界…...

网络六边形受到攻击

大家读完觉得有帮助记得关注和点赞&#xff01;&#xff01;&#xff01; 抽象 现代智能交通系统 &#xff08;ITS&#xff09; 的一个关键要求是能够以安全、可靠和匿名的方式从互联车辆和移动设备收集地理参考数据。Nexagon 协议建立在 IETF 定位器/ID 分离协议 &#xff08;…...

【论文笔记】若干矿井粉尘检测算法概述

总的来说&#xff0c;传统机器学习、传统机器学习与深度学习的结合、LSTM等算法所需要的数据集来源于矿井传感器测量的粉尘浓度&#xff0c;通过建立回归模型来预测未来矿井的粉尘浓度。传统机器学习算法性能易受数据中极端值的影响。YOLO等计算机视觉算法所需要的数据集来源于…...

SpringBoot+uniapp 的 Champion 俱乐部微信小程序设计与实现,论文初版实现

摘要 本论文旨在设计并实现基于 SpringBoot 和 uniapp 的 Champion 俱乐部微信小程序&#xff0c;以满足俱乐部线上活动推广、会员管理、社交互动等需求。通过 SpringBoot 搭建后端服务&#xff0c;提供稳定高效的数据处理与业务逻辑支持&#xff1b;利用 uniapp 实现跨平台前…...

C++中string流知识详解和示例

一、概览与类体系 C 提供三种基于内存字符串的流&#xff0c;定义在 <sstream> 中&#xff1a; std::istringstream&#xff1a;输入流&#xff0c;从已有字符串中读取并解析。std::ostringstream&#xff1a;输出流&#xff0c;向内部缓冲区写入内容&#xff0c;最终取…...

LLM基础1_语言模型如何处理文本

基于GitHub项目&#xff1a;https://github.com/datawhalechina/llms-from-scratch-cn 工具介绍 tiktoken&#xff1a;OpenAI开发的专业"分词器" torch&#xff1a;Facebook开发的强力计算引擎&#xff0c;相当于超级计算器 理解词嵌入&#xff1a;给词语画"…...

Spring数据访问模块设计

前面我们已经完成了IoC和web模块的设计&#xff0c;聪明的码友立马就知道了&#xff0c;该到数据访问模块了&#xff0c;要不就这俩玩个6啊&#xff0c;查库势在必行&#xff0c;至此&#xff0c;它来了。 一、核心设计理念 1、痛点在哪 应用离不开数据&#xff08;数据库、No…...

RNN避坑指南:从数学推导到LSTM/GRU工业级部署实战流程

本文较长&#xff0c;建议点赞收藏&#xff0c;以免遗失。更多AI大模型应用开发学习视频及资料&#xff0c;尽在聚客AI学院。 本文全面剖析RNN核心原理&#xff0c;深入讲解梯度消失/爆炸问题&#xff0c;并通过LSTM/GRU结构实现解决方案&#xff0c;提供时间序列预测和文本生成…...

3-11单元格区域边界定位(End属性)学习笔记

返回一个Range 对象&#xff0c;只读。该对象代表包含源区域的区域上端下端左端右端的最后一个单元格。等同于按键 End 向上键(End(xlUp))、End向下键(End(xlDown))、End向左键(End(xlToLeft)End向右键(End(xlToRight)) 注意&#xff1a;它移动的位置必须是相连的有内容的单元格…...

基于Java Swing的电子通讯录设计与实现:附系统托盘功能代码详解

JAVASQL电子通讯录带系统托盘 一、系统概述 本电子通讯录系统采用Java Swing开发桌面应用&#xff0c;结合SQLite数据库实现联系人管理功能&#xff0c;并集成系统托盘功能提升用户体验。系统支持联系人的增删改查、分组管理、搜索过滤等功能&#xff0c;同时可以最小化到系统…...

在Ubuntu24上采用Wine打开SourceInsight

1. 安装wine sudo apt install wine 2. 安装32位库支持,SourceInsight是32位程序 sudo dpkg --add-architecture i386 sudo apt update sudo apt install wine32:i386 3. 验证安装 wine --version 4. 安装必要的字体和库(解决显示问题) sudo apt install fonts-wqy…...