Redis的使用(五)常见使用场景-分布式锁实现原理
1.绪论
为了解决并发问题,我们可以通过加锁的方式来保证数据的一致性,比如java中的synchronize关键字或者ReentrantLock,但是他们只能在同一jvm进程上加锁。现在的项目基本上都是分布式系统,如何对多个java实例进行加锁,这就需要用到分布式锁。分布式锁可以由多种实现方式,本文将要介绍的就是采用redis实现的方式。
2.分布式锁简单实现
2.1 基本原理
redis的分布式锁实现其实是由redis中的setnx命令来实现的,当线程1需要获取锁时,只需要调用setnx lockkey val,如果设置成功,便表示加锁成功。线程2到达时,同样调用setnx lockkey val方法,但是发现redis中已经存在lockkey时,便加锁失败。线程1执行完业务逻辑过后,便可以调用del key删除对应的key,这样便完成解锁。
setnx key val 表示当redis中不存在这个key便设置成功,否者设置失败。
3.如果set key成功,但是del key失败怎么办
3.1 问题
如果线程1在setnx key val调用成功过后,并且执行完业务逻辑,宕机了,此时相当于key便一直存在redis中,这样其他线程便一直不能获取到锁,导致死锁。
3.2 解决方案
我们可以在给key设置一个过期时间,这样利用redis过期策略进行兜底,就算线程1set key成功后宕机了,因为有过期时间,也能保证在一段时间后,这个key会被删除掉。
setnx key val
expire key timeout
可以看出可以使用上面两个命令,来对key设置过期时间,但是还是有问题,就是set key和expire key之间不是一个原子操作,这样会导致在加锁的时候会有并发问题。所以,我们相当redis中的lua脚本,来保证set key和expire key之间是一个原子操作。对于string字符串,redis提供了一个符合命令,来实现上面两个命令的功能:
set key val NX PX timeout
4 如何解决锁误删问题并实现可重入锁功能
4.1 问题描述
4.1.1 锁误删问题
1.线程1获取到锁,过后,由于设置了可以的超时时间,锁过期。
2.线程2来获取锁便能获取成功。
3.线程1执行完成,释放锁,调用del key,删除key成功。
4.线程3加锁,set key成功,但是此时线程2还在执行。
其实上面本质上就是线程释放了不是自己加的锁。
4.1.2 可重入锁
在java中的reentrantLock和syncronized都有可重入功能,即线程在获取到锁过后,能够再次获取当前锁,并且可冲入次数加1,如果释放锁时,可重入次数减1。
4.2 解决方案
既然上面线程释放了不是自己加的锁导致锁误删问题,我们可以在加锁是将线程id,记录到key中,这样,每次释放锁的时候,判断一下是否是本线程程加的锁,如果不是便直接返回,如果是便释放锁,就可以了。
而对于可重入问题,我们在记录线程id的时候,我们可以记录一下重入次数,每次重入的时候,重入加1,释放锁的时候,重入次数减1,减为0便删除key就可以了。
针对上面两点,我们可以采用hash结构来存储key,其中field为线程id,value为重入次数。
所以加锁变为:
//获取重入次数
local time = hget key threadId
//如果未加锁,设置重入次数为1
if(nil == time) thenhset key threadId 1
//如果已经加锁,设置重入次数自增
else hincr key threadId
end
expire key 过期时间
释放锁为
//获取可重入次数
local time = hget key threadId
//如果没有threadId这个feild,表示该key已经过期,不用删除
if(nil = time) then return 0
end
//表示当前只加锁了一次,删除锁
if (time < 2) thendel key
else
//否者重入次数减1hincr key threadId -1
end
4.3 代码实现
4.3.1 java代码
public class SimpleRedisLock implements ILock {private String name;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}private static final String KEY_PREFIX = "lock:";private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;private static final DefaultRedisScript<Long> LOCK_SCRIPT;static {LOCK_SCRIPT = new DefaultRedisScript<>();LOCK_SCRIPT.setLocation(new ClassPathResource("lock.lua"));LOCK_SCRIPT.setResultType(Long.class);UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);}@Overridepublic boolean tryLock(long timeoutSec) {// 获取线程标示String threadId = ID_PREFIX + Thread.currentThread().getId();// 调用lua脚本Long result = stringRedisTemplate.execute(LOCK_SCRIPT,Collections.singletonList(KEY_PREFIX + name),ID_PREFIX + Thread.currentThread().getId(),timeoutSec);if (result == null) {return false;}return 1 == result;}@Overridepublic void unlock() {// 调用lua脚本stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX + name),ID_PREFIX + Thread.currentThread().getId());}
}
注意:java和lua的交互execute方法 /*** @param script lua脚本* @param keys 需要操作的key,在lua脚本中用KEYS数组来接收,从1开始* @param args 其他参数,在lua脚本中用ARGVS数组来接收,从1开始* @param <T>* @return*/ public <T> T execute(RedisScript<T> script, List<K> keys, Object... args) {return this.scriptExecutor.execute(script, keys, args); }
4.3.2 加锁lua脚本
--1.获取参数列表
--加锁key在keys数组中
local lockKey = KEYS[1]
--线程id
local threadId = ARGV[1]
--超时时间
local timeout = ARGV[2]--2.获取重入次数
local time = redis.call('hget', lockKey, threadId)--3.如果未加锁,设置重入次数为1
if (nil == time) thenredis.call('hset', lockKey, threadId, 1)--如果已经加锁,设置重入次数自增
elseredis.call('hincr', lockKey, threadId)
end
--4.设置过期时间
redis.call('expire', lockKey, timeout)
return 1
4.3.3 释放锁lua脚本
--1.获取参数列表
--加锁key在keys数组中
local lockKey = KEYS[1]
--线程id
local threadId = ARGV[1]
--2.获取重入次数
local time = redis.call('hget', lockKey, threadId)
--3.如果没有threadId这个feild,表示该key已经过期,不用删除
if(nil == time) thenreturn 1
end
--4.如果重入次数为1,表示当前只加锁了一次,删除锁
if (time < 2) thenredis.call('del', lockKey)
else
--5.否者重入次数减1redis.call('hincr', lockKey,threadId)
end
return 1
lua脚本调用redis命令
redis.call('命令名称', 'key', '其它参数', ...)
5.未获取锁时进行重试实现
5.1 问题描述
前面实现的分布式锁时,当线程1获取锁时,线程2尝试获取锁失败,便会直接返回失败。我们如果要线程2在获取锁失败后,在一段时间内尝试获取锁,如果超所该时间,才返回失败,应该如何实现呢?
5.2 问题解决
在线程获取锁失败时,我们可以进行自旋直到获取锁成功为止,但是这样会消耗资源。所以我们可以通过redis的发布订阅机制,当线程获取锁失败过后,订阅加锁的key,然后阻塞。当其他线程释放锁的时候,会给我们发送一个通知,唤醒当前线程。
//订阅某个频道
SUBSCRIBE channel [channel ...]
5.3 redisson源码分析
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {//获取线程等待时间long time = unit.toMillis(waitTime);long current = System.currentTimeMillis();long threadId = Thread.currentThread().getId();//尝试获取锁,如果获取失败,返回当前key还有多久过期Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}//等待时间扣减从当前方法进入到执行到这里花费时间time -= System.currentTimeMillis() - current;//如果小于0表示已经过期if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}current = System.currentTimeMillis();//定于当前keyRFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {if (!subscribeFuture.cancel(false)) {subscribeFuture.onComplete((res, e) -> {if (e == null) {unsubscribe(subscribeFuture, threadId);}});}acquireFailed(waitTime, unit, threadId);return false;}try {time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}//真正的等待逻辑while (true) {long currentTime = System.currentTimeMillis();//再次获取锁,如果失败得到锁还有多久过期ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}// waiting for messagecurrentTime = System.currentTimeMillis();if (ttl >= 0 && ttl < time) {//如果key被删除,会pulish消息,唤醒当前线程subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}}} finally {unsubscribe(subscribeFuture, threadId);}
// return get(tryLockAsync(waitTime, leaseTime, unit));}
6.如何解决锁续期问题
6.1 问题描述
我们在实现分布式锁的时候,会设置过期时间,超过这个过期时间,这个key便会被自动删除。假设超过这个过期时间,当前业务逻辑还未执行完成,这样其他线程能拿到锁,会导致并发问题。
6.2 解决方案
我们可以设置一个线程,专门用来监听当前业务逻辑是否完成,如果未完成,便对key的时间进行续期,在redission实现的分布式锁中,这个线程被称作watchDog。我们来看看redisson中是如何实现的。
6.3 redisson源码分析
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
//如果设置锁自动释放时间不等于-1,走正常逻辑if (leaseTime != -1) {return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}
//如果锁自动释放时间等于-1,开启watchDog,并且设置锁自动释放时间为30sRFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e != null) {return;}// lock acquiredif (ttlRemaining == null) {//启动watchDog线程,为当前线程进行锁续期scheduleExpirationRenewal(threadId);}});return ttlRemainingFuture;}
加锁的流程:可以看出redission实现的分布式锁的lua逻辑其实和我们上面是差不多的。
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {internalLockLeaseTime = unit.toMillis(leaseTime);return evalWriteAsync(getName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));}
锁续期流程:
private void scheduleExpirationRenewal(long threadId) {ExpirationEntry entry = new ExpirationEntry();//其实就是将当前的线程id封装成一个entry,并且加入到一个Map中,然后调用renewExpiration方法ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);if (oldEntry != null) {oldEntry.addThreadId(threadId);} else {entry.addThreadId(threadId);renewExpiration();}}
private void renewExpiration() {//从当前map中获取监听的entryExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return;}//启动一个定时任务Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {//从map中取出需要续约的线程idExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}Long threadId = ent.getFirstThreadId();if (threadId == null) {return;}//将线程id的过期时间重置为30sRFuture<Boolean> future = renewExpirationAsync(threadId);future.onComplete((res, e) -> {if (e != null) {log.error("Can't update lock " + getName() + " expiration", e);return;}if (res) {// reschedule itself//递归调用,一直监听存活的线程renewExpiration();}});}//时间为10会触发一次定时任务}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task);}
简单而言,其实就是启动一个定时线程,一直扫描存活的key,如果未过期,便重置其过期时间为30s。可以看出,当线程调用了unlock方法,便会停止锁续期。
6.4watchDog的几个问题
6.4.1 客户端宕机,watchDog是否会一直续期
答案是不会的,如果客户端宕机,证明当前jvm实例已经挂掉,所以执行watchDog的线程自然也挂掉了。
6.4.2 unLock失败,watchDog是否会一直续期
答案是不会的,在redisson的解锁方法中,会用到一个CompletionStage,它能保证无论删除key是否抛出异常,都能将当前线程id的锁续期任务从EXPIRATION_RENEWAL_MAP中移除。
7.redis主从一致性
7.1 问题描述
如果redis采用单机部署的话,redis宕机,导致整个服务都不可用。如果redis采用集群部署,但是会有主动不同步的问题,比如线程1将key加入到主节点,但是主节点还未将数据同步到从节点宕机,选举从节点为新的主节点,这个主节点并没有线程1设置的key,导致线程安全问题。\
7.2 联锁
7.2.1 实现原理
其实就是部署多个redis单击实例,当加锁的时候,向每个redis实例都发送setnx key val请求,当所有的redis实例都返回成功,才认为成功。
7.3 红锁
联锁是要求所有的锁加锁成功,才表示加锁成功,而红锁是只需要满足大于n/2+1个节点加锁成功,便成功。
相关文章:

Redis的使用(五)常见使用场景-分布式锁实现原理
1.绪论 为了解决并发问题,我们可以通过加锁的方式来保证数据的一致性,比如java中的synchronize关键字或者ReentrantLock,但是他们只能在同一jvm进程上加锁。现在的项目基本上都是分布式系统,如何对多个java实例进行加锁ÿ…...
AppML 案例:Products
AppML 案例:Products AppML(Application Markup Language)是一种创新的、基于XML的标记语言,旨在简化Web应用程序的开发。它允许开发者通过声明性的方式定义应用程序的界面和数据绑定,从而提高开发效率和减少代码量。…...

数据库端口LookUp功能:从数据库中获取并添加数据到XML
本文将为大家介绍如何使用知行之桥EDI系统数据库端口的Lookup功能,从数据库中获取数据,并添加进输入的XML中。 使用场景:期待以输入xml中的值为判断条件从数据库中获取数据,并添加进输入xml中。 例如:接收到包含采购…...

视频联网共享平台LntonCVS视频监控汇聚平台视频云解决方案
LntonCVS流媒体平台是一款遵循国家GB28181标准协议的先进视频监控与云服务平台。该平台设计独特,能够同时接入并处理多路设备的视频流,支持包括RTSP、RTMP、FLV、HLS、WebRTC在内的多种视频流格式的分发。其功能丰富多样,涵盖了视频直播监控、…...
深入探索Python中的`__slots__`类属性:优化内存与限制灵活性
深入探索Python中的__slots__类属性:优化内存与限制灵活性 在Python编程的广阔领域中,性能优化总是开发者们关注的焦点之一。特别是在处理大量对象或资源受限的环境中,减少内存占用和提高访问速度显得尤为重要。Python的__slots__类属性正是…...

llama 2 改进之 RMSNorm
RMSNorm 论文:https://openreview.net/pdf?idSygkZ3MTJE Github:https://github.com/bzhangGo/rmsnorm?tabreadme-ov-file 论文假设LayerNorm中的重新居中不变性是可有可无的,并提出了均方根层归一化(RMSNorm)。RMSNorm根据均方根(RMS)将…...
Matlab【光伏预测】基于雪融优化算法SAO优化高斯过程回归GPR实现光伏多输入单输出预测附代码
% 光伏预测 - 基于SAO优化的GPR % 数据准备 % 假设有多个输入特征 X1, X2, …, Xn 和一个目标变量 Y % 假设数据已经存储在 X 和 Y 中,每个变量为矩阵,每行表示一个样本,每列表示一个特征 % 参数设置 numFeatures size(X, 2); % 输入特征的…...
ES6 模块
ES6 模块学习记录 ES6(ECMAScript 2015)模块是JavaScript官方的标准模块系统。它允许开发者以模块化的方式编写代码,模块可以在不同的文件之间进行组织和重用。 基本特征 默认导出(Default Exports):每个…...

谷粒商城-全文检索-ElasticSearch
1.简介 一个分布式的开源搜索和分析引擎,可以 秒 级的从海量数据中检索 主要功能:做数据的检索和分析(MySQL专攻于数据的持久化存储与管理CRUD达到百万以上的数据MSQL就会很慢,海量数据的检索和分析还是要用ElasticSearch) 用途:我们电商项目里的所有的检索功能都是由Elasti…...

Java的LinkedHashMap 源码解析
LinkedHashMap 是 Java 中的一种有序 Map,它扩展了 HashMap,提供了有序的元素存储方式。在 LinkedHashMap 中,元素的有序性可以按照插入顺序或访问顺序来维护,而这个有序性是通过维护一个双向链表来实现的,这也是实现 …...

Linux系统及常用指令
目录 1、什么是Linux系统 2、为什么要用Linux系统 3、Linux系统的种类 4、如何安装Linux系统 5、常见的适配器种类 6、学习第一个Linux指令 7、安装ssh客户端软件 8、Linux系统的目录结构 9、Linux的常用命令 9.1 目录切换命令 9.2 查看目录下的内容 9.3 查看当前…...

Mac Electron 应用如何进行签名(signature)和公证(notarization)?
最近很多客户反映,从官网下载的Mac Electron应用打不开,直接报病毒,类似于这种: 这是因为在MacOS 10.14.5之后,如果应用没有在苹果官方平台进行公证notarization(我们可以理解为安装包需要审核,来判断是否存…...

【C++ | 抽象类】纯虚函数 和 抽象基类,为什么需要抽象基类
😁博客主页😁:🚀https://blog.csdn.net/wkd_007🚀 🤑博客内容🤑:🍭嵌入式开发、Linux、C语言、C、数据结构、音视频🍭 🤣本文内容🤣&a…...
DP(7) | 打家劫舍① | Java | LeetCode 198, 213, 337 做题总结(未完)
打家劫舍问题 来源于代码随想录:https://programmercarl.com/0198.%E6%89%93%E5%AE%B6%E5%8A%AB%E8%88%8D.html#%E6%80%9D%E8%B7%AF ① 确定dp数组(dp table)以及下标的含义 dp[i]:考虑下标i(包括i)以内的房…...

人工智能算法工程师(中级)课程17-模型的量化与部署之剪枝技巧与代码详解
大家好,我是微学AI,今天给大家介绍一下人工智能算法工程师(中级)课程17-模型的量化与部署之剪枝技巧与代码详解。模型剪枝是深度学习领域中一项关键的技术,旨在减少神经网络中的冗余权重,从而降低计算成本和内存占用,同…...
JavaScript 实例:掌握编程技巧
JavaScript 实例:掌握编程技巧 JavaScript 是一种广泛使用的编程语言,它为网页添加交互性,是现代网络开发的重要组成部分。本文将通过一系列实例,帮助您更好地理解和掌握 JavaScript 的核心概念和编程技巧。 基础实例:变量和数据类型 首先,让我们从最基础的开始。Java…...

自己做小项目时,配置的Maven需要用阿里云私服加速Jar包的下载
在我的IDEA中,maven配置在了这个地址,然后我需要去这个地址下找到settings.xml的maven配置文件来配置以下的阿里云私服地址来加速jar包的下载!【不然就是下N年很慢!】...
Linux笔记之time命令测量命令的执行时间
Linux笔记之time命令测量命令的执行时间 在Linux中,time命令用于测量命令的执行时间。这对于分析和优化脚本或程序的性能非常有用。time命令会显示三个主要时间指标: real: 从命令开始到结束的实际时间(也称为挂钟时间)。user: …...

《基于 CDC、Spark Streaming、Kafka 实现患者指标采集》
📢 大家好,我是 【战神刘玉栋】,有10多年的研发经验,致力于前后端技术栈的知识沉淀和传播。 💗 🌻 CSDN入驻不久,希望大家多多支持,后续会继续提升文章质量,绝不滥竽充数…...
重要的单元测试
👽System.out.println(“👋🏼嗨,大家好,我是代码不会敲的小符,目前工作于上海某电商服务公司…”); 📚System.out.println(“🎈如果文章中有错误的地方,恳请大家指正&…...

深入剖析AI大模型:大模型时代的 Prompt 工程全解析
今天聊的内容,我认为是AI开发里面非常重要的内容。它在AI开发里无处不在,当你对 AI 助手说 "用李白的风格写一首关于人工智能的诗",或者让翻译模型 "将这段合同翻译成商务日语" 时,输入的这句话就是 Prompt。…...

Mac下Android Studio扫描根目录卡死问题记录
环境信息 操作系统: macOS 15.5 (Apple M2芯片)Android Studio版本: Meerkat Feature Drop | 2024.3.2 Patch 1 (Build #AI-243.26053.27.2432.13536105, 2025年5月22日构建) 问题现象 在项目开发过程中,提示一个依赖外部头文件的cpp源文件需要同步,点…...
Java 二维码
Java 二维码 **技术:**谷歌 ZXing 实现 首先添加依赖 <!-- 二维码依赖 --><dependency><groupId>com.google.zxing</groupId><artifactId>core</artifactId><version>3.5.1</version></dependency><de…...
动态 Web 开发技术入门篇
一、HTTP 协议核心 1.1 HTTP 基础 协议全称 :HyperText Transfer Protocol(超文本传输协议) 默认端口 :HTTP 使用 80 端口,HTTPS 使用 443 端口。 请求方法 : GET :用于获取资源,…...

【JVM面试篇】高频八股汇总——类加载和类加载器
目录 1. 讲一下类加载过程? 2. Java创建对象的过程? 3. 对象的生命周期? 4. 类加载器有哪些? 5. 双亲委派模型的作用(好处)? 6. 讲一下类的加载和双亲委派原则? 7. 双亲委派模…...

MeshGPT 笔记
[2311.15475] MeshGPT: Generating Triangle Meshes with Decoder-Only Transformers https://library.scholarcy.com/try 真正意义上的AI生成三维模型MESHGPT来袭!_哔哩哔哩_bilibili GitHub - lucidrains/meshgpt-pytorch: Implementation of MeshGPT, SOTA Me…...
精益数据分析(98/126):电商转化率优化与网站性能的底层逻辑
精益数据分析(98/126):电商转化率优化与网站性能的底层逻辑 在电子商务领域,转化率与网站性能是决定商业成败的核心指标。今天,我们将深入解析不同类型电商平台的转化率基准,探讨页面加载速度对用户行为的…...
AT模式下的全局锁冲突如何解决?
一、全局锁冲突解决方案 1. 业务层重试机制(推荐方案) Service public class OrderService {GlobalTransactionalRetryable(maxAttempts 3, backoff Backoff(delay 100))public void createOrder(OrderDTO order) {// 库存扣减(自动加全…...
LTR-381RGB-01RGB+环境光检测应用场景及客户类型主要有哪些?
RGB环境光检测 功能,在应用场景及客户类型: 1. 可应用的儿童玩具类型 (1) 智能互动玩具 功能:通过检测环境光或物体颜色触发互动(如颜色识别积木、光感音乐盒)。 客户参考: LEGO(乐高&#x…...

spring中的@KafkaListener 注解详解
KafkaListener 是 Spring Kafka 提供的一个核心注解,用于标记一个方法作为 Kafka 消息的消费者。下面是对该注解的详细解析: 基本用法 KafkaListener(topics "myTopic", groupId "myGroup") public void listen(String message)…...