秒杀优化(异步秒杀,基于redis-stream实现消息队列)
目录
- 秒杀优化
- 一:异步秒杀
- 1:思路
- 2:实现
- 二:redis实现消息队列
- 1:什么是消息队列
- 2:基于list结构实现消息队列
- 3:基于pubsub实现消息队列
- 4:基于stream实现消息队列
- 5:stream的消费者组模式
- 三:基于redis的stream结构实现消息队列
秒杀优化
一:异步秒杀
1:思路

原本我们每一个请求都是串行执行,从头到尾执行完了才算一个请求处理成功,这样过于耗时,我们看到执行的操作中查询优惠券,查询订单,减库存,创建订单都是数据库操作,而数据库的性能又不是很好,我们可以将服务拆分成两部分,将判断优惠券信息和校验一人一单的操作提取出来,先执行判断优惠券和校验操作,然后直接返回订单id,我们在陆续操作数据库减库存和创建订单,这样前端响应的会非常快,并且我们可以将优惠券和一人一单的操作放在redis中去执行,这样又能提高性能,然后我们将优惠券信息,用户信息,订单信息,先保存在队列里,先返回给前端数据,在慢慢的根据队列的信息去存入数据

我们之前说将查询和校验功能放在redis中实现,那么用什么结构呢,查询订单很简单,只要查询相应的优惠券的库存是否大于0就行,我们就可以是否字符串结构,key存优惠券信息,value存库存;那么校验呢,因为是一人一单,所以我们可以使用set,这样就能保证用户的唯一性;
我们执行的具体步骤是:先判断库存是否充足,不充足直接返回,充足判断是否有资格购买,没有返回,有就可以减库存,然后将用户加入集合中,在返回,因为我们执行这些操作时要保证命令的原子性,所以这些操作我们都使用lua脚本来编写;
具体的执行流程就是,先执行lua脚本,如果结果不是0那么直接返回,如果不是0,那么就将信息存入阻塞队列然后返回订单id;
2:实现

1:新增时添加到redis
stringRedisTemplate.opsForValue().set(RedisConstants.SECKILL_STOCK_KEY+voucher.getId(),voucher.getStock().toString());
2:lua脚本编写:
local stock =tonumber(redis.call('get', 'seckill:stock:' .. ARGV[1]))
if (stock<=0) thenreturn 1
end
local userId=ARGV[2]
local isok=tonumber(redis.call('sadd','seckill:order:'..ARGV[1],userId))
if isok==0 thenreturn 2
end
redis.call('incrby','seckill:stock:'..ARGV[1],-1)
return 0
然后就能改变之前的代码,在redis中实现异步下单:
@Override
public Result seckilOrder(Long voucherId) throws InterruptedException {Long id = UserHolder.getUser().getId();Long res = (Long) stringRedisTemplate.execute(SECKIL_ORDER_LUA, Collections.emptyList(), voucherId.toString(), id.toString());if (res!=0){return Result.fail(res==1?"库存不足":"一人只能购买一单");}long orderID = redisIDWork.nextId("order");return Result.ok(orderID);}
初始化lua脚本文件
@Resource
private RedissonClient redissonClient2;
public static final DefaultRedisScript SECKIL_ORDER_LUA;
static {//初始化SECKIL_ORDER_LUA=new DefaultRedisScript<>();//定位到lua脚本的位置SECKIL_ORDER_LUA.setLocation(new ClassPathResource("seckill.lua"));//设置lua脚本的返回值SECKIL_ORDER_LUA.setResultType(Long.class);
}
还剩一个阻塞队列没有实现:
阻塞队列的功能就是异步的将订单信息存入数据库;
阻塞队列可以使用blockdeque
BlockingQueue<VoucherOrder> blockingQueue = new ArrayBlockingQueue<VoucherOrder>(1024*1024);
在类上直接初始化
然后使用的时候就是,将订单添加到阻塞队列,让另一个线程去执行,往数据库中添加阻塞队列中的订单信息:
blockingQueue.add(voucherOrder);
然后就要开出一个线程,然后执行往数据库添加元素的任务了:
//创建一个线程private ExecutorService SECKILL_ORDER_EXECUTOR=Executors.newSingleThreadExecutor();//注解PostConstruct,添加这个注解的方法就是在类初始化完成之后就会执行;@PostConstructprivate void init(){//提交任务SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandle());}//定义一个任务内部类,实现Runnable,然后需要实现run方法,run方法中就是我们的任务private class VoucherOrderHandle implements Runnable {@Overridepublic void run() {try {//从阻塞队列中取出订单VoucherOrder voucherOrder = blockingQueue.take();//执行方法handleVoucherOrder(voucherOrder);} catch (InterruptedException e) {log.info("下单业务异常",e);}}}
当类加载是就会一直提交任务,只要阻塞队列里有订单,就会将订单取出然后调用方法将订单存入数据库
调用的方法是尝试获取锁的方法,而获取锁其实并不需要,因为我们自己开出来的线程只有一个是单线程,而且在lua脚本中已经对一人一单还有超卖问题进行处理,这里只是为了更加保险
@Transactionalpublic void handleVoucherOrder(VoucherOrder voucherOrder) throws InterruptedException {
// SimpleRedisLock simpleRedisLock = new SimpleRedisLock("order" + id, stringRedisTemplate);Long userId = voucherOrder.getUserId();RLock simpleRedisLock = redissonClient2.getLock("lock:order" + userId);boolean trylock = simpleRedisLock.tryLock(1L, TimeUnit.SECONDS);if (!trylock){log.info("获取锁失败");}try {orderService.createVoucherOrder(voucherOrder);} catch (IllegalStateException e) {throw new RuntimeException(e);}finally {simpleRedisLock.unlock();}}
然后获取锁成功后就会调用方法执行数据库操作,但是这个方法是带有事务的,我们单独开出来的子线程无法使事务生效,只能在方法的外部声明一个代理对象,然后通过代理对象去调用方法使事务生效;
@Transactionalpublic void createVoucherOrder(VoucherOrder voucherOrder) {Integer count = query().eq("user_id", voucherOrder.getUserId()).eq("voucher_id", voucherOrder.getVoucherId()).count();if (count > 0) {log.info("一个用户只能下一单");}//进行更新,库存减一boolean success = seckillVoucherService.update().setSql("stock = stock - 1") // set stock = stock - 1.eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0).update();// where id = ? and stock > 0//扣减失败,返回错误信息;if (!success) {log.info("扣减失败");}save(voucherOrder);}
因为我们是开出来的子线程调用的方法,所以不能从线程中获取值,只能从我们传入的订单对象获取,然后就是减库存和存入订单的操作了;

总结:
我们使用异步操作,将下单和存入订单分开来执行,大大提高了执行的销量,在redis中完成超卖和一人一单的问题;
然后使用阻塞队列,开出一个子线程异步存入数据库下单;
问题:
我们的阻塞队列是在jvm中的,jvm中内存是有上线的,超过上限就会有异常,还有就是我们的数据都是存放在内存中,要是出现了一些事故会导致数据丢失
二:redis实现消息队列
1:什么是消息队列

消息队列由三个角色构成:
1:生产者:发送消息到消息队列
2:消息队列:存储和管理消息队列,也被称为消息代理
3:消费者:从消息队列中获取消息并处理
好的消息队列有这几个特点:
1:有独立的服务,独立的内存;
2:可以做到数据的持久化
3:能够发送消息给消费者并且确保消息处理完成

2:基于list结构实现消息队列


使用brpop可以实现阻塞获取
3:基于pubsub实现消息队列


4:基于stream实现消息队列

stream发送消息的方式xadd key * msg
key是指消息队列的名称,* 是发送消息的名称由redis来生成,后面的msg就是键值对,我们要发宋的消息

xread是读取消息的命令:count指定读取消息的数量,block指定阻塞时间,不指定就是不阻塞,指定0就是无限等待,sreams 是消息队列的名称,可以是多个,id是消息的id,0是从0开始读,$是从最新的开始读


但是有个问题就是,指定$是获取最新的消息,但是只是获取使用这个命令之后最新的消息,而如果一次性发多条,只会获取最后一个,就会出现漏消息;

5:stream的消费者组模式

消费者组就是将消费者划分到一个组中监听一个消息队列:
有这些好处:
1:消息分流:消息发送到消费者组中,消费者会处于竞争关系,会争夺消息来处理,这个发送多个消息就会实现分流,就会由不同的消费者来处理,加快了处理速度;
2:消息标识:在读取消息后会记录最后一个被处理的消息,这样就不会出现消息漏读的情况;
3:消息确认:消息发出去会,消息会处于pending状态,会等待消息处理完毕,这个时候会将消息存入pendinglist中,当处理完后才会从pending中移除;确保了消息的安全性,保证消息不会丢失,就算再消息发出去后,服务宕机了,也能知道该消息没有被处理,这个功能的作用就是确保消息至少被消费一次;




三:基于redis的stream结构实现消息队列

首先再redis客户端中输入命令创建一个队列和接受这个队列消息的组
然后修改秒杀下单的lua脚本,直接在redis中通过消息队列将消息发送给消费者:
local orderId=ARGV[3]
local stock =tonumber(redis.call('get', 'seckill:stock:' .. ARGV[1]))
if (stock<=0) thenreturn 1
end
local userId=ARGV[2]
local isok=tonumber(redis.call('sadd','seckill:order:'..ARGV[1],userId))
if isok==0 thenreturn 2
end
redis.call('incrby','seckill:stock:'..ARGV[1],-1)
--将消息发送给stream.orders队列
redis.call('xadd','stream.orders','*','userId',userId,'id',orderId,'voucherId',ARGV[1])
return 0
这里发送的是优惠券id,用户id还有订单id,正是我们存入数据库中所需要的参数
然后就可以去修改前面秒杀下单的逻辑,不用去将消息放到阻塞队列,我们直接从redis的队列中取出就行;
@Override
public Result seckilOrder(Long voucherId) throws InterruptedException {long orderId = redisIDWork.nextId("order");Long userId = UserHolder.getUser().getId();Long res = (Long) stringRedisTemplate.execute(SECKIL_ORDER_LUA,Collections.emptyList(), voucherId.toString(),userId.toString(),String.valueOf(orderId));if (res != 0) {return Result.fail(res == 1 ? "库存不足" : "一人只能购买一单");}orderService = (IVoucherOrderService) AopContext.currentProxy();return Result.ok(orderId);
}
这里我们需要将订单id作为lua脚本的参数传入进去,然后将订单信息存入阻塞队列的操作可以省略,因为我们已经将订单信息存入了redis中的消息队列;
然后这里我们需要单独开出一个线程去将队列中的消息存入数据库:
private class VoucherOrderHandle implements Runnable {String ququeName="stream.orders";@Overridepublic void run() {try {//从消息队列中取出订单while (true){//xreadgroup GROUP group consumer count(1) block(2000) streams key >List<MapRecord<String, Object, Object>> msg = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), StreamOffset.create(ququeName, ReadOffset.lastConsumed()));//如果消息为空就继续等待接收if (msg==null||msg.isEmpty()){continue;}//因为每次读取一个消息,所以我们获取第一个消息MapRecord<String, Object, Object> entries = msg.get(0);//获取消息的值,是一些我们传入的键值对Map<Object, Object> value = entries.getValue();//将map转成voucherorder对象VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value,new VoucherOrder(),false);//执行方法handleVoucherOrder(voucherOrder);//确认消息已经处理stringRedisTemplate.opsForStream().acknowledge(ququeName,"g1",entries.getId());}} catch (InterruptedException e) {log.info("下单业务异常",e);handleVoucherOrderError();}}
我们要做的就是接受消息,然后再将消息存入数据库:
我们调用stream的方法,作为消费者从队列中读取消息,阻塞时间是2秒,每次读取一个消息,从下一个未消费的消息读取,如果读取的消息为空那么就继续循环读取消息,如果有消息就将消息取出,然后将其转成对象map,再将其转成对象,然后再去做确认消息的处理,如果不确认消息,消息就会存在待处理的队列中;如果出现的异常,那么我们取出的消息可能没有进行确认,没有确认的会存入待处理队列,我们就要从队列里取出然后进行处理;
出错只会执行的方法:
private void handleVoucherOrderError() {try {//从消息队列中取出订单while (true){//xreadgroup GROUP group consumer count(1) streams key 0,表示从第一个未处理的消息开始读取List<MapRecord<String, Object, Object>> msg = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1), StreamOffset.create(ququeName, ReadOffset.from("0")));//如果为空就说明没有待处理的消息结束就行if (msg==null||msg.isEmpty()){break;}//因为每次读取一个消息,所以我们获取第一个消息MapRecord<String, Object, Object> entries = msg.get(0);//获取消息的值,是一些我们传入的键值对Map<Object, Object> value = entries.getValue();//将map转成voucherorder对象VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value,new VoucherOrder(),false);//执行方法handleVoucherOrder(voucherOrder);}} catch (InterruptedException e) {log.info("下单业务异常",e);}}
}
这里因为是再待处理中直接取出,所以不用阻塞处理,然后从待消费队列中第一个消息开始读,如果为空,那么就说明没有待处理的消息,我们直接返回就行,如果不为空我们再处理
这样使用redis中的消息队列就实现了:1:独立的服务,足够的内存;2:有确认机制,避免消息漏读;3:消息持久化
BeanUtil.fillBeanWithMap(value,new VoucherOrder(),false);
//执行方法
handleVoucherOrder(voucherOrder);
}
} catch (InterruptedException e) {
log.info(“下单业务异常”,e);
}
}
}
> 这里因为是再待处理中直接取出,所以不用阻塞处理,然后从待消费队列中第一个消息开始读,如果为空,那么就说明没有待处理的消息,我们直接返回就行,如果不为空我们再处理这样使用redis中的消息队列就实现了:1:独立的服务,足够的内存;2:有确认机制,避免消息漏读;3:消息持久化相关文章:
秒杀优化(异步秒杀,基于redis-stream实现消息队列)
目录 秒杀优化一:异步秒杀1:思路2:实现 二:redis实现消息队列1:什么是消息队列2:基于list结构实现消息队列3:基于pubsub实现消息队列4:基于stream实现消息队列5:stream的…...
Node.js——fs模块-文件读取
1、文件读取:通过程序从文件中去除其中的数据 2、方法 方法 说明 readFile 异步读取 readFileSync 同步读取 createReadStrean 流式读取 3、readFile 异步读取 语法: 本文的分享到此结束,欢迎大家评论区一同讨论学习,下一…...
深入理解 ZooKeeper:分布式协调服务的核心与应用
一、引言 随着互联网技术的飞速发展,分布式系统的规模和复杂性不断增加。在分布式环境中,各个节点之间需要进行高效的协调和通信,以确保系统的正常运行。ZooKeeper 正是为了解决分布式系统中的协调问题而诞生的一款开源软件。它提供了一种简单…...
你竟然还不了解 LDAP?
目录 什么是 LDAP LDAP 的工作原理 LDAP 的数据模型 LDAP 操作 LDAP 的使用场景 常见的 LDAP 服务器 小结 什么是 LDAP LDAP(Lightweight Directory Access Protocol,轻量级目录访问协议)是用于访问和管理目录服务的一种开放协议&…...
宝塔使用clickhouse踩坑
前言 最近有个物联网项目,需要存储物联网终端发送过来的信息(类似log日志,但又要存储在数据库里,方便后期聚合统计),本来想写文件的奈何客户要求聚合统计,所以只能用数据库才能达到更高的计算效…...
Linux命令学习记录
ls 查看文件资源,ls [选项] [路径] ls ls [单个选项] ls [组合选项],选项的组合与顺序无关 ls --help 查看更多命令参数 clear 清屏 cd 更换工作目录,cd [路径] cd [特殊路径符] . 表示当前目录 .. …...
一般无人机和FPV无人机的区别
文章目录 一般无人机的工作原理关键组件:一般无人机的应用领域一般无人机的操控体验 FPV无人机的工作原理关键组件:FPV无人机的应用领域FPV无人机的操控体验性能特点FPV无人机的性能特点 未来无人机发展方向和通信方式拓展 一般无人机的工作原理 一般无…...
数据结构初阶排序全解
目录 1>>排序前言 2>>插入排序 2.1>>直接插入排序 2.2>>希尔排序 3>>选择排序 3.1>>直接选择排序 3.2>>堆排序 4>>交换排序 4.1冒泡排序 4.2快速排序 5>>归并排序 6>>测试 test.c sort.h sort.c 7…...
MySQL的SQL语句之触发器的创建和应用
触发器 Trigger 一.触发器 作用:当检测到某种数据表发生数据变化时,自动执行操作,保证数据的完整性,保证数据的一致性。 1.创建一个触发器 如上图所示,查看这个create的帮助信息的时候,这个create trig…...
myWebserver 介绍
项目总结 项目准备过程中,主要阅读了《Linux 高性能服务器编程》游双 一书。源码参考的是:TinyWebServer,我在此源码的基础上做了一定的优化和修改。 我的代码:Github: myWebserver: 我的C服务器项目 我的 Webserver 项目总结&…...
钉钉平台开发小程序
一、下载小程序开发者工具 官网地址:小程序开发工具 - 钉钉开放平台 客户端类型 下载链接 MacOS x64 https://ur.alipay.com/volans-demo_MiniProgramStudio-x64.dmg MacOS arm64 https://ur.alipay.com/volans-demo_MiniProgramStudio-arm64.dmg Windows ht…...
九识智能与徐工汽车达成战略合作,共绘商用车未来新蓝图
近日,九识智能与徐工汽车签署战略合作协议,标志着双方在智能驾驶技术与新能源商用车融合应用、联合生产及市场推广等方面迈入深度合作的新篇章,将共同引领智能驾驶技术商业化浪潮。 近年来,在国家智能化发展战略的引领下ÿ…...
Serverless + AI 让应用开发更简单
本文整理自 2024 云栖大会,阿里云智能高级技术专家,史明伟演讲议题《Serverless AI 让应用开发更简单》 随着云计算和人工智能(AI)技术的飞速发展,企业对于高效、灵活且成本效益高的解决方案的需求日益增长。本文旨在…...
外包功能测试就干了4周,技术退步太明显了。。。。。
先说一下自己的情况,大专生,21年通过校招进入武汉某软件公司,干了差不多3个星期的功能测试,那年国庆,感觉自己不能够在这样下去了,长时间呆在一个舒适的环境会让一个人堕落!而我才在一个外包企业干了4周的功…...
外观模式及运用场景
外观模式(Facade Pattern)是一种结构性设计模式,它为复杂子系统提供一个统一的接口,从而简化与这些子系统的交互。通过外观模式,客户端可以更轻松地使用复杂系统,而不必了解其内部实现。接下来,…...
PyQt5实战——多脚本集合包,UI以及工程布局(二)
个人博客:苏三有春的博客 系列往期: PyQt5实战——多脚本集合包,前言与环境配置(一) 布局 2.1 UI页面布局 整体框架分为分为三个部分,垂直分布。 第一个部分为功能选择按钮(如UTF-8转换&#…...
Python 数据结构对比:列表与数组的选择指南
文章目录 💯前言💯Python中的列表(list)和数组(array)的详细对比1. 数据类型的灵活性2. 性能与效率3. 功能与操作4. 使用场景5. 数据结构选择的考量6. 实际应用案例7. 结论 💯小结 💯…...
gem5运行简单RISC-V全系统模拟
简单记录gem5中运行最简单的RISC-V Full System Simulation的过程 首先是编译RISC-V和m5term,这部分不多写了,官网均有对应教程。 之后直接使用官方在configs/example/gem5_library目录下的riscv-fs.py 运行如下命令 ./build/RISCV/gem5.opt configs/…...
洛谷 P1195 口袋的天空
自用。 题目传送门:口袋的天空 - 洛谷 题解:Inori_333 参考题解:题解 P1195 【口袋的天空】 - 洛谷专栏 /*P1195 口袋的天空https://www.luogu.com.cn/problem/P11952024/11/03 submit:inori333 */#include <iostream> #include &…...
ffmpeg视频滤镜:膨胀操作-dilation
滤镜介绍 dilation 官网链接 > FFmpeg Filters Documentation 膨胀滤镜会使图片变的更亮,会让细节别的更明显。膨胀也是形态学中的一种操作,在opencv中也有响应的算子。此外膨胀结合此前腐蚀操作,可以构成开闭操作。 开操作是先腐蚀…...
Android Wi-Fi 连接失败日志分析
1. Android wifi 关键日志总结 (1) Wi-Fi 断开 (CTRL-EVENT-DISCONNECTED reason3) 日志相关部分: 06-05 10:48:40.987 943 943 I wpa_supplicant: wlan0: CTRL-EVENT-DISCONNECTED bssid44:9b:c1:57:a8:90 reason3 locally_generated1解析: CTR…...
【Python】 -- 趣味代码 - 小恐龙游戏
文章目录 文章目录 00 小恐龙游戏程序设计框架代码结构和功能游戏流程总结01 小恐龙游戏程序设计02 百度网盘地址00 小恐龙游戏程序设计框架 这段代码是一个基于 Pygame 的简易跑酷游戏的完整实现,玩家控制一个角色(龙)躲避障碍物(仙人掌和乌鸦)。以下是代码的详细介绍:…...
解锁数据库简洁之道:FastAPI与SQLModel实战指南
在构建现代Web应用程序时,与数据库的交互无疑是核心环节。虽然传统的数据库操作方式(如直接编写SQL语句与psycopg2交互)赋予了我们精细的控制权,但在面对日益复杂的业务逻辑和快速迭代的需求时,这种方式的开发效率和可…...
转转集团旗下首家二手多品类循环仓店“超级转转”开业
6月9日,国内领先的循环经济企业转转集团旗下首家二手多品类循环仓店“超级转转”正式开业。 转转集团创始人兼CEO黄炜、转转循环时尚发起人朱珠、转转集团COO兼红布林CEO胡伟琨、王府井集团副总裁祝捷等出席了开业剪彩仪式。 据「TMT星球」了解,“超级…...
剑指offer20_链表中环的入口节点
链表中环的入口节点 给定一个链表,若其中包含环,则输出环的入口节点。 若其中不包含环,则输出null。 数据范围 节点 val 值取值范围 [ 1 , 1000 ] [1,1000] [1,1000]。 节点 val 值各不相同。 链表长度 [ 0 , 500 ] [0,500] [0,500]。 …...
Java多线程实现之Callable接口深度解析
Java多线程实现之Callable接口深度解析 一、Callable接口概述1.1 接口定义1.2 与Runnable接口的对比1.3 Future接口与FutureTask类 二、Callable接口的基本使用方法2.1 传统方式实现Callable接口2.2 使用Lambda表达式简化Callable实现2.3 使用FutureTask类执行Callable任务 三、…...
【SQL学习笔记1】增删改查+多表连接全解析(内附SQL免费在线练习工具)
可以使用Sqliteviz这个网站免费编写sql语句,它能够让用户直接在浏览器内练习SQL的语法,不需要安装任何软件。 链接如下: sqliteviz 注意: 在转写SQL语法时,关键字之间有一个特定的顺序,这个顺序会影响到…...
反射获取方法和属性
Java反射获取方法 在Java中,反射(Reflection)是一种强大的机制,允许程序在运行时访问和操作类的内部属性和方法。通过反射,可以动态地创建对象、调用方法、改变属性值,这在很多Java框架中如Spring和Hiberna…...
HarmonyOS运动开发:如何用mpchart绘制运动配速图表
##鸿蒙核心技术##运动开发##Sensor Service Kit(传感器服务)# 前言 在运动类应用中,运动数据的可视化是提升用户体验的重要环节。通过直观的图表展示运动过程中的关键数据,如配速、距离、卡路里消耗等,用户可以更清晰…...
A2A JS SDK 完整教程:快速入门指南
目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库ÿ…...
