秒杀优化(异步秒杀,基于redis-stream实现消息队列)
目录
- 秒杀优化
- 一:异步秒杀
- 1:思路
- 2:实现
- 二:redis实现消息队列
- 1:什么是消息队列
- 2:基于list结构实现消息队列
- 3:基于pubsub实现消息队列
- 4:基于stream实现消息队列
- 5:stream的消费者组模式
- 三:基于redis的stream结构实现消息队列
秒杀优化
一:异步秒杀
1:思路
原本我们每一个请求都是串行执行,从头到尾执行完了才算一个请求处理成功,这样过于耗时,我们看到执行的操作中查询优惠券,查询订单,减库存,创建订单都是数据库操作,而数据库的性能又不是很好,我们可以将服务拆分成两部分,将判断优惠券信息和校验一人一单的操作提取出来,先执行判断优惠券和校验操作,然后直接返回订单id,我们在陆续操作数据库减库存和创建订单,这样前端响应的会非常快,并且我们可以将优惠券和一人一单的操作放在redis中去执行,这样又能提高性能,然后我们将优惠券信息,用户信息,订单信息,先保存在队列里,先返回给前端数据,在慢慢的根据队列的信息去存入数据
我们之前说将查询和校验功能放在redis中实现,那么用什么结构呢,查询订单很简单,只要查询相应的优惠券的库存是否大于0就行,我们就可以是否字符串结构,key存优惠券信息,value存库存;那么校验呢,因为是一人一单,所以我们可以使用set,这样就能保证用户的唯一性;
我们执行的具体步骤是:先判断库存是否充足,不充足直接返回,充足判断是否有资格购买,没有返回,有就可以减库存,然后将用户加入集合中,在返回,因为我们执行这些操作时要保证命令的原子性,所以这些操作我们都使用lua脚本来编写;
具体的执行流程就是,先执行lua脚本,如果结果不是0那么直接返回,如果不是0,那么就将信息存入阻塞队列然后返回订单id;
2:实现
1:新增时添加到redis
stringRedisTemplate.opsForValue().set(RedisConstants.SECKILL_STOCK_KEY+voucher.getId(),voucher.getStock().toString());
2:lua脚本编写:
local stock =tonumber(redis.call('get', 'seckill:stock:' .. ARGV[1]))
if (stock<=0) thenreturn 1
end
local userId=ARGV[2]
local isok=tonumber(redis.call('sadd','seckill:order:'..ARGV[1],userId))
if isok==0 thenreturn 2
end
redis.call('incrby','seckill:stock:'..ARGV[1],-1)
return 0
然后就能改变之前的代码,在redis中实现异步下单:
@Override
public Result seckilOrder(Long voucherId) throws InterruptedException {Long id = UserHolder.getUser().getId();Long res = (Long) stringRedisTemplate.execute(SECKIL_ORDER_LUA, Collections.emptyList(), voucherId.toString(), id.toString());if (res!=0){return Result.fail(res==1?"库存不足":"一人只能购买一单");}long orderID = redisIDWork.nextId("order");return Result.ok(orderID);}
初始化lua脚本文件
@Resource
private RedissonClient redissonClient2;
public static final DefaultRedisScript SECKIL_ORDER_LUA;
static {//初始化SECKIL_ORDER_LUA=new DefaultRedisScript<>();//定位到lua脚本的位置SECKIL_ORDER_LUA.setLocation(new ClassPathResource("seckill.lua"));//设置lua脚本的返回值SECKIL_ORDER_LUA.setResultType(Long.class);
}
还剩一个阻塞队列没有实现:
阻塞队列的功能就是异步的将订单信息存入数据库;
阻塞队列可以使用blockdeque
BlockingQueue<VoucherOrder> blockingQueue = new ArrayBlockingQueue<VoucherOrder>(1024*1024);
在类上直接初始化
然后使用的时候就是,将订单添加到阻塞队列,让另一个线程去执行,往数据库中添加阻塞队列中的订单信息:
blockingQueue.add(voucherOrder);
然后就要开出一个线程,然后执行往数据库添加元素的任务了:
//创建一个线程private ExecutorService SECKILL_ORDER_EXECUTOR=Executors.newSingleThreadExecutor();//注解PostConstruct,添加这个注解的方法就是在类初始化完成之后就会执行;@PostConstructprivate void init(){//提交任务SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandle());}//定义一个任务内部类,实现Runnable,然后需要实现run方法,run方法中就是我们的任务private class VoucherOrderHandle implements Runnable {@Overridepublic void run() {try {//从阻塞队列中取出订单VoucherOrder voucherOrder = blockingQueue.take();//执行方法handleVoucherOrder(voucherOrder);} catch (InterruptedException e) {log.info("下单业务异常",e);}}}
当类加载是就会一直提交任务,只要阻塞队列里有订单,就会将订单取出然后调用方法将订单存入数据库
调用的方法是尝试获取锁的方法,而获取锁其实并不需要,因为我们自己开出来的线程只有一个是单线程,而且在lua脚本中已经对一人一单还有超卖问题进行处理,这里只是为了更加保险
@Transactionalpublic void handleVoucherOrder(VoucherOrder voucherOrder) throws InterruptedException {
// SimpleRedisLock simpleRedisLock = new SimpleRedisLock("order" + id, stringRedisTemplate);Long userId = voucherOrder.getUserId();RLock simpleRedisLock = redissonClient2.getLock("lock:order" + userId);boolean trylock = simpleRedisLock.tryLock(1L, TimeUnit.SECONDS);if (!trylock){log.info("获取锁失败");}try {orderService.createVoucherOrder(voucherOrder);} catch (IllegalStateException e) {throw new RuntimeException(e);}finally {simpleRedisLock.unlock();}}
然后获取锁成功后就会调用方法执行数据库操作,但是这个方法是带有事务的,我们单独开出来的子线程无法使事务生效,只能在方法的外部声明一个代理对象,然后通过代理对象去调用方法使事务生效;
@Transactionalpublic void createVoucherOrder(VoucherOrder voucherOrder) {Integer count = query().eq("user_id", voucherOrder.getUserId()).eq("voucher_id", voucherOrder.getVoucherId()).count();if (count > 0) {log.info("一个用户只能下一单");}//进行更新,库存减一boolean success = seckillVoucherService.update().setSql("stock = stock - 1") // set stock = stock - 1.eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0).update();// where id = ? and stock > 0//扣减失败,返回错误信息;if (!success) {log.info("扣减失败");}save(voucherOrder);}
因为我们是开出来的子线程调用的方法,所以不能从线程中获取值,只能从我们传入的订单对象获取,然后就是减库存和存入订单的操作了;
总结:
我们使用异步操作,将下单和存入订单分开来执行,大大提高了执行的销量,在redis中完成超卖和一人一单的问题;
然后使用阻塞队列,开出一个子线程异步存入数据库下单;
问题:
我们的阻塞队列是在jvm中的,jvm中内存是有上线的,超过上限就会有异常,还有就是我们的数据都是存放在内存中,要是出现了一些事故会导致数据丢失
二:redis实现消息队列
1:什么是消息队列
消息队列由三个角色构成:
1:生产者:发送消息到消息队列
2:消息队列:存储和管理消息队列,也被称为消息代理
3:消费者:从消息队列中获取消息并处理
好的消息队列有这几个特点:
1:有独立的服务,独立的内存;
2:可以做到数据的持久化
3:能够发送消息给消费者并且确保消息处理完成
2:基于list结构实现消息队列
使用brpop可以实现阻塞获取
3:基于pubsub实现消息队列
4:基于stream实现消息队列
stream发送消息的方式xadd key * msg
key是指消息队列的名称,* 是发送消息的名称由redis来生成,后面的msg就是键值对,我们要发宋的消息
xread是读取消息的命令:count指定读取消息的数量,block指定阻塞时间,不指定就是不阻塞,指定0就是无限等待,sreams 是消息队列的名称,可以是多个,id是消息的id,0是从0开始读,$是从最新的开始读
但是有个问题就是,指定$是获取最新的消息,但是只是获取使用这个命令之后最新的消息,而如果一次性发多条,只会获取最后一个,就会出现漏消息;
5:stream的消费者组模式
消费者组就是将消费者划分到一个组中监听一个消息队列:
有这些好处:
1:消息分流:消息发送到消费者组中,消费者会处于竞争关系,会争夺消息来处理,这个发送多个消息就会实现分流,就会由不同的消费者来处理,加快了处理速度;
2:消息标识:在读取消息后会记录最后一个被处理的消息,这样就不会出现消息漏读的情况;
3:消息确认:消息发出去会,消息会处于pending状态,会等待消息处理完毕,这个时候会将消息存入pendinglist中,当处理完后才会从pending中移除;确保了消息的安全性,保证消息不会丢失,就算再消息发出去后,服务宕机了,也能知道该消息没有被处理,这个功能的作用就是确保消息至少被消费一次;
三:基于redis的stream结构实现消息队列
首先再redis客户端中输入命令创建一个队列和接受这个队列消息的组
然后修改秒杀下单的lua脚本,直接在redis中通过消息队列将消息发送给消费者:
local orderId=ARGV[3]
local stock =tonumber(redis.call('get', 'seckill:stock:' .. ARGV[1]))
if (stock<=0) thenreturn 1
end
local userId=ARGV[2]
local isok=tonumber(redis.call('sadd','seckill:order:'..ARGV[1],userId))
if isok==0 thenreturn 2
end
redis.call('incrby','seckill:stock:'..ARGV[1],-1)
--将消息发送给stream.orders队列
redis.call('xadd','stream.orders','*','userId',userId,'id',orderId,'voucherId',ARGV[1])
return 0
这里发送的是优惠券id,用户id还有订单id,正是我们存入数据库中所需要的参数
然后就可以去修改前面秒杀下单的逻辑,不用去将消息放到阻塞队列,我们直接从redis的队列中取出就行;
@Override
public Result seckilOrder(Long voucherId) throws InterruptedException {long orderId = redisIDWork.nextId("order");Long userId = UserHolder.getUser().getId();Long res = (Long) stringRedisTemplate.execute(SECKIL_ORDER_LUA,Collections.emptyList(), voucherId.toString(),userId.toString(),String.valueOf(orderId));if (res != 0) {return Result.fail(res == 1 ? "库存不足" : "一人只能购买一单");}orderService = (IVoucherOrderService) AopContext.currentProxy();return Result.ok(orderId);
}
这里我们需要将订单id作为lua脚本的参数传入进去,然后将订单信息存入阻塞队列的操作可以省略,因为我们已经将订单信息存入了redis中的消息队列;
然后这里我们需要单独开出一个线程去将队列中的消息存入数据库:
private class VoucherOrderHandle implements Runnable {String ququeName="stream.orders";@Overridepublic void run() {try {//从消息队列中取出订单while (true){//xreadgroup GROUP group consumer count(1) block(2000) streams key >List<MapRecord<String, Object, Object>> msg = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), StreamOffset.create(ququeName, ReadOffset.lastConsumed()));//如果消息为空就继续等待接收if (msg==null||msg.isEmpty()){continue;}//因为每次读取一个消息,所以我们获取第一个消息MapRecord<String, Object, Object> entries = msg.get(0);//获取消息的值,是一些我们传入的键值对Map<Object, Object> value = entries.getValue();//将map转成voucherorder对象VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value,new VoucherOrder(),false);//执行方法handleVoucherOrder(voucherOrder);//确认消息已经处理stringRedisTemplate.opsForStream().acknowledge(ququeName,"g1",entries.getId());}} catch (InterruptedException e) {log.info("下单业务异常",e);handleVoucherOrderError();}}
我们要做的就是接受消息,然后再将消息存入数据库:
我们调用stream的方法,作为消费者从队列中读取消息,阻塞时间是2秒,每次读取一个消息,从下一个未消费的消息读取,如果读取的消息为空那么就继续循环读取消息,如果有消息就将消息取出,然后将其转成对象map,再将其转成对象,然后再去做确认消息的处理,如果不确认消息,消息就会存在待处理的队列中;如果出现的异常,那么我们取出的消息可能没有进行确认,没有确认的会存入待处理队列,我们就要从队列里取出然后进行处理;
出错只会执行的方法:
private void handleVoucherOrderError() {try {//从消息队列中取出订单while (true){//xreadgroup GROUP group consumer count(1) streams key 0,表示从第一个未处理的消息开始读取List<MapRecord<String, Object, Object>> msg = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1), StreamOffset.create(ququeName, ReadOffset.from("0")));//如果为空就说明没有待处理的消息结束就行if (msg==null||msg.isEmpty()){break;}//因为每次读取一个消息,所以我们获取第一个消息MapRecord<String, Object, Object> entries = msg.get(0);//获取消息的值,是一些我们传入的键值对Map<Object, Object> value = entries.getValue();//将map转成voucherorder对象VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value,new VoucherOrder(),false);//执行方法handleVoucherOrder(voucherOrder);}} catch (InterruptedException e) {log.info("下单业务异常",e);}}
}
这里因为是再待处理中直接取出,所以不用阻塞处理,然后从待消费队列中第一个消息开始读,如果为空,那么就说明没有待处理的消息,我们直接返回就行,如果不为空我们再处理
这样使用redis中的消息队列就实现了:1:独立的服务,足够的内存;2:有确认机制,避免消息漏读;3:消息持久化
BeanUtil.fillBeanWithMap(value,new VoucherOrder(),false);
//执行方法
handleVoucherOrder(voucherOrder);
}
} catch (InterruptedException e) {
log.info(“下单业务异常”,e);
}
}
}
> 这里因为是再待处理中直接取出,所以不用阻塞处理,然后从待消费队列中第一个消息开始读,如果为空,那么就说明没有待处理的消息,我们直接返回就行,如果不为空我们再处理这样使用redis中的消息队列就实现了:1:独立的服务,足够的内存;2:有确认机制,避免消息漏读;3:消息持久化
相关文章:

秒杀优化(异步秒杀,基于redis-stream实现消息队列)
目录 秒杀优化一:异步秒杀1:思路2:实现 二:redis实现消息队列1:什么是消息队列2:基于list结构实现消息队列3:基于pubsub实现消息队列4:基于stream实现消息队列5:stream的…...

Node.js——fs模块-文件读取
1、文件读取:通过程序从文件中去除其中的数据 2、方法 方法 说明 readFile 异步读取 readFileSync 同步读取 createReadStrean 流式读取 3、readFile 异步读取 语法: 本文的分享到此结束,欢迎大家评论区一同讨论学习,下一…...
深入理解 ZooKeeper:分布式协调服务的核心与应用
一、引言 随着互联网技术的飞速发展,分布式系统的规模和复杂性不断增加。在分布式环境中,各个节点之间需要进行高效的协调和通信,以确保系统的正常运行。ZooKeeper 正是为了解决分布式系统中的协调问题而诞生的一款开源软件。它提供了一种简单…...
你竟然还不了解 LDAP?
目录 什么是 LDAP LDAP 的工作原理 LDAP 的数据模型 LDAP 操作 LDAP 的使用场景 常见的 LDAP 服务器 小结 什么是 LDAP LDAP(Lightweight Directory Access Protocol,轻量级目录访问协议)是用于访问和管理目录服务的一种开放协议&…...

宝塔使用clickhouse踩坑
前言 最近有个物联网项目,需要存储物联网终端发送过来的信息(类似log日志,但又要存储在数据库里,方便后期聚合统计),本来想写文件的奈何客户要求聚合统计,所以只能用数据库才能达到更高的计算效…...

Linux命令学习记录
ls 查看文件资源,ls [选项] [路径] ls ls [单个选项] ls [组合选项],选项的组合与顺序无关 ls --help 查看更多命令参数 clear 清屏 cd 更换工作目录,cd [路径] cd [特殊路径符] . 表示当前目录 .. …...

一般无人机和FPV无人机的区别
文章目录 一般无人机的工作原理关键组件:一般无人机的应用领域一般无人机的操控体验 FPV无人机的工作原理关键组件:FPV无人机的应用领域FPV无人机的操控体验性能特点FPV无人机的性能特点 未来无人机发展方向和通信方式拓展 一般无人机的工作原理 一般无…...

数据结构初阶排序全解
目录 1>>排序前言 2>>插入排序 2.1>>直接插入排序 2.2>>希尔排序 3>>选择排序 3.1>>直接选择排序 3.2>>堆排序 4>>交换排序 4.1冒泡排序 4.2快速排序 5>>归并排序 6>>测试 test.c sort.h sort.c 7…...

MySQL的SQL语句之触发器的创建和应用
触发器 Trigger 一.触发器 作用:当检测到某种数据表发生数据变化时,自动执行操作,保证数据的完整性,保证数据的一致性。 1.创建一个触发器 如上图所示,查看这个create的帮助信息的时候,这个create trig…...

myWebserver 介绍
项目总结 项目准备过程中,主要阅读了《Linux 高性能服务器编程》游双 一书。源码参考的是:TinyWebServer,我在此源码的基础上做了一定的优化和修改。 我的代码:Github: myWebserver: 我的C服务器项目 我的 Webserver 项目总结&…...

钉钉平台开发小程序
一、下载小程序开发者工具 官网地址:小程序开发工具 - 钉钉开放平台 客户端类型 下载链接 MacOS x64 https://ur.alipay.com/volans-demo_MiniProgramStudio-x64.dmg MacOS arm64 https://ur.alipay.com/volans-demo_MiniProgramStudio-arm64.dmg Windows ht…...

九识智能与徐工汽车达成战略合作,共绘商用车未来新蓝图
近日,九识智能与徐工汽车签署战略合作协议,标志着双方在智能驾驶技术与新能源商用车融合应用、联合生产及市场推广等方面迈入深度合作的新篇章,将共同引领智能驾驶技术商业化浪潮。 近年来,在国家智能化发展战略的引领下ÿ…...

Serverless + AI 让应用开发更简单
本文整理自 2024 云栖大会,阿里云智能高级技术专家,史明伟演讲议题《Serverless AI 让应用开发更简单》 随着云计算和人工智能(AI)技术的飞速发展,企业对于高效、灵活且成本效益高的解决方案的需求日益增长。本文旨在…...

外包功能测试就干了4周,技术退步太明显了。。。。。
先说一下自己的情况,大专生,21年通过校招进入武汉某软件公司,干了差不多3个星期的功能测试,那年国庆,感觉自己不能够在这样下去了,长时间呆在一个舒适的环境会让一个人堕落!而我才在一个外包企业干了4周的功…...
外观模式及运用场景
外观模式(Facade Pattern)是一种结构性设计模式,它为复杂子系统提供一个统一的接口,从而简化与这些子系统的交互。通过外观模式,客户端可以更轻松地使用复杂系统,而不必了解其内部实现。接下来,…...

PyQt5实战——多脚本集合包,UI以及工程布局(二)
个人博客:苏三有春的博客 系列往期: PyQt5实战——多脚本集合包,前言与环境配置(一) 布局 2.1 UI页面布局 整体框架分为分为三个部分,垂直分布。 第一个部分为功能选择按钮(如UTF-8转换&#…...

Python 数据结构对比:列表与数组的选择指南
文章目录 💯前言💯Python中的列表(list)和数组(array)的详细对比1. 数据类型的灵活性2. 性能与效率3. 功能与操作4. 使用场景5. 数据结构选择的考量6. 实际应用案例7. 结论 💯小结 💯…...

gem5运行简单RISC-V全系统模拟
简单记录gem5中运行最简单的RISC-V Full System Simulation的过程 首先是编译RISC-V和m5term,这部分不多写了,官网均有对应教程。 之后直接使用官方在configs/example/gem5_library目录下的riscv-fs.py 运行如下命令 ./build/RISCV/gem5.opt configs/…...
洛谷 P1195 口袋的天空
自用。 题目传送门:口袋的天空 - 洛谷 题解:Inori_333 参考题解:题解 P1195 【口袋的天空】 - 洛谷专栏 /*P1195 口袋的天空https://www.luogu.com.cn/problem/P11952024/11/03 submit:inori333 */#include <iostream> #include &…...

ffmpeg视频滤镜:膨胀操作-dilation
滤镜介绍 dilation 官网链接 > FFmpeg Filters Documentation 膨胀滤镜会使图片变的更亮,会让细节别的更明显。膨胀也是形态学中的一种操作,在opencv中也有响应的算子。此外膨胀结合此前腐蚀操作,可以构成开闭操作。 开操作是先腐蚀…...
Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以?
Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以? 在 Golang 的面试中,map 类型的使用是一个常见的考点,其中对 key 类型的合法性 是一道常被提及的基础却很容易被忽视的问题。本文将带你深入理解 Golang 中…...
【决胜公务员考试】求职OMG——见面课测验1
2025最新版!!!6.8截至答题,大家注意呀! 博主码字不易点个关注吧,祝期末顺利~~ 1.单选题(2分) 下列说法错误的是:( B ) A.选调生属于公务员系统 B.公务员属于事业编 C.选调生有基层锻炼的要求 D…...

Spring数据访问模块设计
前面我们已经完成了IoC和web模块的设计,聪明的码友立马就知道了,该到数据访问模块了,要不就这俩玩个6啊,查库势在必行,至此,它来了。 一、核心设计理念 1、痛点在哪 应用离不开数据(数据库、No…...
2023赣州旅游投资集团
单选题 1.“不登高山,不知天之高也;不临深溪,不知地之厚也。”这句话说明_____。 A、人的意识具有创造性 B、人的认识是独立于实践之外的 C、实践在认识过程中具有决定作用 D、人的一切知识都是从直接经验中获得的 参考答案: C 本题解…...
服务器--宝塔命令
一、宝塔面板安装命令 ⚠️ 必须使用 root 用户 或 sudo 权限执行! sudo su - 1. CentOS 系统: yum install -y wget && wget -O install.sh http://download.bt.cn/install/install_6.0.sh && sh install.sh2. Ubuntu / Debian 系统…...
使用Matplotlib创建炫酷的3D散点图:数据可视化的新维度
文章目录 基础实现代码代码解析进阶技巧1. 自定义点的大小和颜色2. 添加图例和样式美化3. 真实数据应用示例实用技巧与注意事项完整示例(带样式)应用场景在数据科学和可视化领域,三维图形能为我们提供更丰富的数据洞察。本文将手把手教你如何使用Python的Matplotlib库创建引…...
在QWebEngineView上实现鼠标、触摸等事件捕获的解决方案
这个问题我看其他博主也写了,要么要会员、要么写的乱七八糟。这里我整理一下,把问题说清楚并且给出代码,拿去用就行,照着葫芦画瓢。 问题 在继承QWebEngineView后,重写mousePressEvent或event函数无法捕获鼠标按下事…...

算法:模拟
1.替换所有的问号 1576. 替换所有的问号 - 力扣(LeetCode) 遍历字符串:通过外层循环逐一检查每个字符。遇到 ? 时处理: 内层循环遍历小写字母(a 到 z)。对每个字母检查是否满足: 与…...
Java编程之桥接模式
定义 桥接模式(Bridge Pattern)属于结构型设计模式,它的核心意图是将抽象部分与实现部分分离,使它们可以独立地变化。这种模式通过组合关系来替代继承关系,从而降低了抽象和实现这两个可变维度之间的耦合度。 用例子…...
scikit-learn机器学习
# 同时添加如下代码, 这样每次环境(kernel)启动的时候只要运行下方代码即可: # Also add the following code, # so that every time the environment (kernel) starts, # just run the following code: import sys sys.path.append(/home/aistudio/external-libraries)机…...