当前位置: 首页 > news >正文

Spring-Kafka笔记整理

  1. 引入依赖
    <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
    </dependency>
    
  2. 配置application.properties
    spring.kafka.bootstrap-servers=192.168.99.51:9092
    
  3. 编写kafka的配置类
    @Configuration
    public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> configs = new HashMap<>();configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return new DefaultKafkaProducerFactory<>(configs);}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}@Beanpublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return new DefaultKafkaConsumerFactory<>(props);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();// 并发数就是一个消费者实例起几个线程factory.setConcurrency(3);factory.setConsumerFactory(consumerFactory());return factory;}
    }
    
  4. Kafka消息监听
    @Component
    public class KafkaConsumer {@Autowiredprivate ObjectMapper mapper;@KafkaListener(topics = {"hello-kafka-topic"},groupId = "hello-kafka-group",containerFactory = "kafkaListenerContainerFactory")public void listener01(ConsumerRecord<String, String> record) throws Exception {String key = record.key();String value = record.value();HelloMessage kafkaMessage = mapper.readValue(value, HelloMessage.class);log.info("in listener consume kafka message: [{}], [{}]", key, mapper.writeValueAsString(kafkaMessage));}
    }
    
  5. Kafka消息发送
    @Component
    public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String key, String value, String topic) {if (StringUtils.isBlank(value) || StringUtils.isBlank(topic)) {throw new IllegalArgumentException("value or topic is null or empty");}ListenableFuture<SendResult<String, String>> future = StringUtils.isBlank(key) ?kafkaTemplate.send(topic, value) : kafkaTemplate.send(topic, key, value);// 异步回调的方式获取通知future.addCallback(success -> {assert null != success && null != success.getRecordMetadata();// 发送到 kafka 的 topicString _topic = success.getRecordMetadata().topic();// 消息发送到的分区int partition = success.getRecordMetadata().partition();// 消息在分区内的 offsetlong offset = success.getRecordMetadata().offset();log.info("send kafka message success: [{}], [{}], [{}]", _topic, partition, offset);}, failure -> {log.error("send kafka message failure: [{}], [{}], [{}]", key, value, topic);});}
    }
    

相关文章:

Spring-Kafka笔记整理

引入依赖<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId> </dependency>配置application.propertiesspring.kafka.bootstrap-servers192.168.99.51:9092编写kafka的配置类Configuration …...

已解决org.apache.hadoop.hdfs.protocol.QuotaExceededException异常的正确解决方法,亲测有效!!!

已解决org.apache.hadoop.hdfs.protocol.QuotaExceededException异常的正确解决方法&#xff0c;亲测有效&#xff01;&#xff01;&#xff01; 目录 问题分析 报错原因 解决思路 解决方法 总结 博主v&#xff1a;XiaoMing_Java 问题分析 在使用Hadoop分布式文件系统&a…...

GitHub打不开的解决方案(超简单)

在国内&#xff0c;github官网经常面临打不开或访问极慢的问题&#xff0c;不挂VPN&#xff08;梯子&#xff0c;飞机&#xff0c;魔法&#xff09;使用体验极差&#xff0c;那有什么好办法解决github官网访问不了的问题&#xff1f;今天小布教你几招轻松访问github官网。 git…...

Unity开发一个FPS游戏之二

在之前的文章中,我介绍了如何开发一个FPS游戏,添加一个第一人称的主角,并设置武器。现在我将继续完善这个游戏,打算添加敌人,实现其智能寻找玩家并进行对抗。完成的效果如下: fps_enemy_demo 下载资源 首先是设计敌人,我们可以在网上找到一些好的免费素材,例如在Unity…...

STM32F103 CubeMX 使用USB生成鼠标设备

STM32F103 CubeMX 使用USB生成鼠标设备 1 配置cubeMX1.1配置外部晶振&#xff0c;配置debug口1.2 配置USB1.3 配置芯片的时钟1.4 生成工程 2. 编写代码2.1 添加申明2.2 main函数代码 1 配置cubeMX 1.1配置外部晶振&#xff0c;配置debug口 1.2 配置USB 1.3 配置芯片的时钟 需…...

HJXH-E1/U静态信号继电器 面板安装 辅助电源220VDC 启动电压220VDC JOSEF约瑟

HJXH系列静态信号继电器 HJXH-61/U静态信号继电器&#xff1b; HJXH-61/I静态信号继电器&#xff1b; HJXH-62/U静态信号继电器&#xff1b; HJXH-62/I静态信号继电器&#xff1b; HJXH-E1/U静态信号继电器&#xff1b; HJXH-E1/I静态信号继电器&#xff1b; HJXH-E2/U静态信号…...

SpringBoot3下Kafka分组均衡消费实现

首先添加maven依赖&#xff1a; <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.11</version><exclusions><!--此处一定要排除kafka-clients&#xff0c;然…...

鸿蒙Harmony应用开发—ArkTS声明式开发(容器组件:GridItem)

网格容器中单项内容容器。 说明&#xff1a; 该组件从API Version 7开始支持。后续版本如有新增内容&#xff0c;则采用上角标单独标记该内容的起始版本。仅支持作为Grid组件的子组件使用。 子组件 可以包含单个子组件。 接口 GridItem GridItem(value?: GridItemOptions)…...

Qt 使用RAW INPUT获取HID触摸屏,笔设备,鼠标的原始数据,最低受支持的客户端:Windows XP [仅限桌面应用]

在开发绘图应用程序时&#xff0c;经常会需要读取笔设备的数据&#xff0c;通过对笔数据的解析&#xff0c;来判断笔的坐标&#xff0c;粗细。如果仅仅只是读取鼠标的坐标&#xff0c;就需要人为在应用程序端去修改笔的粗细&#xff0c;并且使用体验不好&#xff0c;如果可以实…...

easyexcel导出excel文件到s3服务器

导出excel文件是开发中常见的需求 常见的做法一般是直接通过请求接口响应对象HttpServletResponse把文件输出 我们可以使用原生的poi工具类操作.也可以使用easypoi.easyexcel等基于poi二次封装的工具处理 下面是代码 /*** 导出列表** param request* param response*/Overri…...

xss.haozi.me靶场“0x0B-0x12”通关教程

君衍. 一、0x0B 实体编码绕过二、0x0C script绕过三、0x0D 注释绕过四、0X0E ſ符号绕过五、0x0F 编码解码六、0x10 直接执行七、0x11 闭合绕过八、0x12 闭合绕过 XSS-Labs靶场“1-5”关通关教程 XSS-Labs靶场“6-10”关通关教程 Appcms存储型XSS漏洞复现 XSS-Labs靶场“11-13、…...

linux--redhat系统Yum源配置

1&#xff09;说明 redhat yum命令使用报错解决-重新配置yum源 解决--更改yum源 2&#xff09;更改yum源 &#xff08;1&#xff09;进入源目录 cd /etc/yum.repos.d/ &#xff08;2&#xff09;备份 redhat 默认源 mv redhat.repo redhat.repo-bak &#xff08;3&#xff09;…...

el-Switch 开关二次确认

前言 最近在做毕设&#xff0c;有个需求是点击按钮控制用户的状态是否禁用&#xff0c;就看到element有个switch组件可以改造一下&#xff0c;就上网看了一下&#xff0c;结果为了这个效果忙活了很久。。。所以说记录一下&#xff0c;让大家少踩坑。 前置条件 先看完我的需求再…...

(二)丶RabbitMQ的六大核心

一丶什么是MQ Message Queue(消息队列&#xff09;简称MQ&#xff0c;是一种应用程序对应用程序的消息通信机制。在MQ中&#xff0c;消息以队列形式存储&#xff0c;以便于异步传输&#xff0c;在MQ中&#xff0c;发布者&#xff08;生产者&#xff09;将消息放入队列&#xff…...

微信小程序实现上下手势滑动切换

效果图 思路 实现一个微信小程序的复合滚动页面&#xff0c;主要通过Swiper组件实现垂直方向的轮播功能&#xff0c;每个轮播项内部使用Scroll-View组件来展示可垂直滚动的长内容&#xff0c;如图片和文本。 代码 <!-- wxml --> <view class"swiper-container…...

详解命令docker run -d --name container_name -e TZ=Asia/Shanghai your_image

docker run 是Docker的主要命令&#xff0c;用于从镜像启动一个新的容器。下面详细解释并举例说明 -d, --name, -e TZ 参数的用法&#xff1a; -d 或 --detach&#xff1a; 这个标志告诉Docker以守护进程&#xff08;后台&#xff09;模式运行容器。这意味着当你执行 docker ru…...

javaEE7

1. <% page pageEncoding"UTF-8"%><% page import"java.io.*"%> <% page import"java.util.*"%> <% page import"java.math.*"%> <html> <head><title>网站计数器</title></head&…...

int与integer的区别

int和integer都是用来表示整数的数据类型&#xff0c;但有一些细微的区别。 int是Java中的基本数据类型&#xff0c;它可以存储整数值。int类型在内存中占4个字节&#xff0c;范围为-2,147,483,648到2,147,483,647。int类型使用最频繁&#xff0c;因为它的存储空间较小&#x…...

Golang实现Redis分布式锁(Lua脚本+可重入+自动续期)

Golang实现Redis分布式锁&#xff08;Lua脚本可重入自动续期&#xff09; 1 概念 应用场景 Golang自带的Lock锁单机版OK&#xff08;存储在程序的内存中&#xff09;&#xff0c;分布式不行 分布式锁&#xff1a; 简单版&#xff1a;redis setnx》加锁设置过期时间需要保证原…...

音乐播放器-C#实现

音乐播放器-C#实现 目录 一、 代码介绍 二、 音乐播放器-C#实现 三、 音乐播放器-C#实现 四、 音乐播放器-C#实现 五、 音乐播放器-C#实现 代码介绍 代码中使用了.NET框架中的System.Media命名空间来处理音频文件的播放和控制。这段代码创建了一个简单的音乐播放器界…...

【OSG学习笔记】Day 18: 碰撞检测与物理交互

物理引擎&#xff08;Physics Engine&#xff09; 物理引擎 是一种通过计算机模拟物理规律&#xff08;如力学、碰撞、重力、流体动力学等&#xff09;的软件工具或库。 它的核心目标是在虚拟环境中逼真地模拟物体的运动和交互&#xff0c;广泛应用于 游戏开发、动画制作、虚…...

【入坑系列】TiDB 强制索引在不同库下不生效问题

文章目录 背景SQL 优化情况线上SQL运行情况分析怀疑1:执行计划绑定问题?尝试:SHOW WARNINGS 查看警告探索 TiDB 的 USE_INDEX 写法Hint 不生效问题排查解决参考背景 项目中使用 TiDB 数据库,并对 SQL 进行优化了,添加了强制索引。 UAT 环境已经生效,但 PROD 环境强制索…...

Leetcode 3577. Count the Number of Computer Unlocking Permutations

Leetcode 3577. Count the Number of Computer Unlocking Permutations 1. 解题思路2. 代码实现 题目链接&#xff1a;3577. Count the Number of Computer Unlocking Permutations 1. 解题思路 这一题其实就是一个脑筋急转弯&#xff0c;要想要能够将所有的电脑解锁&#x…...

oracle与MySQL数据库之间数据同步的技术要点

Oracle与MySQL数据库之间的数据同步是一个涉及多个技术要点的复杂任务。由于Oracle和MySQL的架构差异&#xff0c;它们的数据同步要求既要保持数据的准确性和一致性&#xff0c;又要处理好性能问题。以下是一些主要的技术要点&#xff1a; 数据结构差异 数据类型差异&#xff…...

Java-41 深入浅出 Spring - 声明式事务的支持 事务配置 XML模式 XML+注解模式

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; &#x1f680; AI篇持续更新中&#xff01;&#xff08;长期更新&#xff09; 目前2025年06月05日更新到&#xff1a; AI炼丹日志-28 - Aud…...

在Ubuntu中设置开机自动运行(sudo)指令的指南

在Ubuntu系统中&#xff0c;有时需要在系统启动时自动执行某些命令&#xff0c;特别是需要 sudo权限的指令。为了实现这一功能&#xff0c;可以使用多种方法&#xff0c;包括编写Systemd服务、配置 rc.local文件或使用 cron任务计划。本文将详细介绍这些方法&#xff0c;并提供…...

C++八股 —— 单例模式

文章目录 1. 基本概念2. 设计要点3. 实现方式4. 详解懒汉模式 1. 基本概念 线程安全&#xff08;Thread Safety&#xff09; 线程安全是指在多线程环境下&#xff0c;某个函数、类或代码片段能够被多个线程同时调用时&#xff0c;仍能保证数据的一致性和逻辑的正确性&#xf…...

MySQL 部分重点知识篇

一、数据库对象 1. 主键 定义 &#xff1a;主键是用于唯一标识表中每一行记录的字段或字段组合。它具有唯一性和非空性特点。 作用 &#xff1a;确保数据的完整性&#xff0c;便于数据的查询和管理。 示例 &#xff1a;在学生信息表中&#xff0c;学号可以作为主键&#xff…...

C语言中提供的第三方库之哈希表实现

一. 简介 前面一篇文章简单学习了C语言中第三方库&#xff08;uthash库&#xff09;提供对哈希表的操作&#xff0c;文章如下&#xff1a; C语言中提供的第三方库uthash常用接口-CSDN博客 本文简单学习一下第三方库 uthash库对哈希表的操作。 二. uthash库哈希表操作示例 u…...

论文阅读笔记——Muffin: Testing Deep Learning Libraries via Neural Architecture Fuzzing

Muffin 论文 现有方法 CRADLE 和 LEMON&#xff0c;依赖模型推理阶段输出进行差分测试&#xff0c;但在训练阶段是不可行的&#xff0c;因为训练阶段直到最后才有固定输出&#xff0c;中间过程是不断变化的。API 库覆盖低&#xff0c;因为各个 API 都是在各种具体场景下使用。…...