秒杀优化(异步秒杀,基于redis-stream实现消息队列)
目录
- 秒杀优化
- 一:异步秒杀
- 1:思路
- 2:实现
- 二:redis实现消息队列
- 1:什么是消息队列
- 2:基于list结构实现消息队列
- 3:基于pubsub实现消息队列
- 4:基于stream实现消息队列
- 5:stream的消费者组模式
- 三:基于redis的stream结构实现消息队列
秒杀优化
一:异步秒杀
1:思路
原本我们每一个请求都是串行执行,从头到尾执行完了才算一个请求处理成功,这样过于耗时,我们看到执行的操作中查询优惠券,查询订单,减库存,创建订单都是数据库操作,而数据库的性能又不是很好,我们可以将服务拆分成两部分,将判断优惠券信息和校验一人一单的操作提取出来,先执行判断优惠券和校验操作,然后直接返回订单id,我们在陆续操作数据库减库存和创建订单,这样前端响应的会非常快,并且我们可以将优惠券和一人一单的操作放在redis中去执行,这样又能提高性能,然后我们将优惠券信息,用户信息,订单信息,先保存在队列里,先返回给前端数据,在慢慢的根据队列的信息去存入数据
我们之前说将查询和校验功能放在redis中实现,那么用什么结构呢,查询订单很简单,只要查询相应的优惠券的库存是否大于0就行,我们就可以是否字符串结构,key存优惠券信息,value存库存;那么校验呢,因为是一人一单,所以我们可以使用set,这样就能保证用户的唯一性;
我们执行的具体步骤是:先判断库存是否充足,不充足直接返回,充足判断是否有资格购买,没有返回,有就可以减库存,然后将用户加入集合中,在返回,因为我们执行这些操作时要保证命令的原子性,所以这些操作我们都使用lua脚本来编写;
具体的执行流程就是,先执行lua脚本,如果结果不是0那么直接返回,如果不是0,那么就将信息存入阻塞队列然后返回订单id;
2:实现
1:新增时添加到redis
stringRedisTemplate.opsForValue().set(RedisConstants.SECKILL_STOCK_KEY+voucher.getId(),voucher.getStock().toString());
2:lua脚本编写:
local stock =tonumber(redis.call('get', 'seckill:stock:' .. ARGV[1]))
if (stock<=0) thenreturn 1
end
local userId=ARGV[2]
local isok=tonumber(redis.call('sadd','seckill:order:'..ARGV[1],userId))
if isok==0 thenreturn 2
end
redis.call('incrby','seckill:stock:'..ARGV[1],-1)
return 0
然后就能改变之前的代码,在redis中实现异步下单:
@Override
public Result seckilOrder(Long voucherId) throws InterruptedException {Long id = UserHolder.getUser().getId();Long res = (Long) stringRedisTemplate.execute(SECKIL_ORDER_LUA, Collections.emptyList(), voucherId.toString(), id.toString());if (res!=0){return Result.fail(res==1?"库存不足":"一人只能购买一单");}long orderID = redisIDWork.nextId("order");return Result.ok(orderID);}
初始化lua脚本文件
@Resource
private RedissonClient redissonClient2;
public static final DefaultRedisScript SECKIL_ORDER_LUA;
static {//初始化SECKIL_ORDER_LUA=new DefaultRedisScript<>();//定位到lua脚本的位置SECKIL_ORDER_LUA.setLocation(new ClassPathResource("seckill.lua"));//设置lua脚本的返回值SECKIL_ORDER_LUA.setResultType(Long.class);
}
还剩一个阻塞队列没有实现:
阻塞队列的功能就是异步的将订单信息存入数据库;
阻塞队列可以使用blockdeque
BlockingQueue<VoucherOrder> blockingQueue = new ArrayBlockingQueue<VoucherOrder>(1024*1024);
在类上直接初始化
然后使用的时候就是,将订单添加到阻塞队列,让另一个线程去执行,往数据库中添加阻塞队列中的订单信息:
blockingQueue.add(voucherOrder);
然后就要开出一个线程,然后执行往数据库添加元素的任务了:
//创建一个线程private ExecutorService SECKILL_ORDER_EXECUTOR=Executors.newSingleThreadExecutor();//注解PostConstruct,添加这个注解的方法就是在类初始化完成之后就会执行;@PostConstructprivate void init(){//提交任务SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandle());}//定义一个任务内部类,实现Runnable,然后需要实现run方法,run方法中就是我们的任务private class VoucherOrderHandle implements Runnable {@Overridepublic void run() {try {//从阻塞队列中取出订单VoucherOrder voucherOrder = blockingQueue.take();//执行方法handleVoucherOrder(voucherOrder);} catch (InterruptedException e) {log.info("下单业务异常",e);}}}
当类加载是就会一直提交任务,只要阻塞队列里有订单,就会将订单取出然后调用方法将订单存入数据库
调用的方法是尝试获取锁的方法,而获取锁其实并不需要,因为我们自己开出来的线程只有一个是单线程,而且在lua脚本中已经对一人一单还有超卖问题进行处理,这里只是为了更加保险
@Transactionalpublic void handleVoucherOrder(VoucherOrder voucherOrder) throws InterruptedException {
// SimpleRedisLock simpleRedisLock = new SimpleRedisLock("order" + id, stringRedisTemplate);Long userId = voucherOrder.getUserId();RLock simpleRedisLock = redissonClient2.getLock("lock:order" + userId);boolean trylock = simpleRedisLock.tryLock(1L, TimeUnit.SECONDS);if (!trylock){log.info("获取锁失败");}try {orderService.createVoucherOrder(voucherOrder);} catch (IllegalStateException e) {throw new RuntimeException(e);}finally {simpleRedisLock.unlock();}}
然后获取锁成功后就会调用方法执行数据库操作,但是这个方法是带有事务的,我们单独开出来的子线程无法使事务生效,只能在方法的外部声明一个代理对象,然后通过代理对象去调用方法使事务生效;
@Transactionalpublic void createVoucherOrder(VoucherOrder voucherOrder) {Integer count = query().eq("user_id", voucherOrder.getUserId()).eq("voucher_id", voucherOrder.getVoucherId()).count();if (count > 0) {log.info("一个用户只能下一单");}//进行更新,库存减一boolean success = seckillVoucherService.update().setSql("stock = stock - 1") // set stock = stock - 1.eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0).update();// where id = ? and stock > 0//扣减失败,返回错误信息;if (!success) {log.info("扣减失败");}save(voucherOrder);}
因为我们是开出来的子线程调用的方法,所以不能从线程中获取值,只能从我们传入的订单对象获取,然后就是减库存和存入订单的操作了;
总结:
我们使用异步操作,将下单和存入订单分开来执行,大大提高了执行的销量,在redis中完成超卖和一人一单的问题;
然后使用阻塞队列,开出一个子线程异步存入数据库下单;
问题:
我们的阻塞队列是在jvm中的,jvm中内存是有上线的,超过上限就会有异常,还有就是我们的数据都是存放在内存中,要是出现了一些事故会导致数据丢失
二:redis实现消息队列
1:什么是消息队列
消息队列由三个角色构成:
1:生产者:发送消息到消息队列
2:消息队列:存储和管理消息队列,也被称为消息代理
3:消费者:从消息队列中获取消息并处理
好的消息队列有这几个特点:
1:有独立的服务,独立的内存;
2:可以做到数据的持久化
3:能够发送消息给消费者并且确保消息处理完成
2:基于list结构实现消息队列
使用brpop可以实现阻塞获取
3:基于pubsub实现消息队列
4:基于stream实现消息队列
stream发送消息的方式xadd key * msg
key是指消息队列的名称,* 是发送消息的名称由redis来生成,后面的msg就是键值对,我们要发宋的消息
xread是读取消息的命令:count指定读取消息的数量,block指定阻塞时间,不指定就是不阻塞,指定0就是无限等待,sreams 是消息队列的名称,可以是多个,id是消息的id,0是从0开始读,$是从最新的开始读
但是有个问题就是,指定$是获取最新的消息,但是只是获取使用这个命令之后最新的消息,而如果一次性发多条,只会获取最后一个,就会出现漏消息;
5:stream的消费者组模式
消费者组就是将消费者划分到一个组中监听一个消息队列:
有这些好处:
1:消息分流:消息发送到消费者组中,消费者会处于竞争关系,会争夺消息来处理,这个发送多个消息就会实现分流,就会由不同的消费者来处理,加快了处理速度;
2:消息标识:在读取消息后会记录最后一个被处理的消息,这样就不会出现消息漏读的情况;
3:消息确认:消息发出去会,消息会处于pending状态,会等待消息处理完毕,这个时候会将消息存入pendinglist中,当处理完后才会从pending中移除;确保了消息的安全性,保证消息不会丢失,就算再消息发出去后,服务宕机了,也能知道该消息没有被处理,这个功能的作用就是确保消息至少被消费一次;
三:基于redis的stream结构实现消息队列
首先再redis客户端中输入命令创建一个队列和接受这个队列消息的组
然后修改秒杀下单的lua脚本,直接在redis中通过消息队列将消息发送给消费者:
local orderId=ARGV[3]
local stock =tonumber(redis.call('get', 'seckill:stock:' .. ARGV[1]))
if (stock<=0) thenreturn 1
end
local userId=ARGV[2]
local isok=tonumber(redis.call('sadd','seckill:order:'..ARGV[1],userId))
if isok==0 thenreturn 2
end
redis.call('incrby','seckill:stock:'..ARGV[1],-1)
--将消息发送给stream.orders队列
redis.call('xadd','stream.orders','*','userId',userId,'id',orderId,'voucherId',ARGV[1])
return 0
这里发送的是优惠券id,用户id还有订单id,正是我们存入数据库中所需要的参数
然后就可以去修改前面秒杀下单的逻辑,不用去将消息放到阻塞队列,我们直接从redis的队列中取出就行;
@Override
public Result seckilOrder(Long voucherId) throws InterruptedException {long orderId = redisIDWork.nextId("order");Long userId = UserHolder.getUser().getId();Long res = (Long) stringRedisTemplate.execute(SECKIL_ORDER_LUA,Collections.emptyList(), voucherId.toString(),userId.toString(),String.valueOf(orderId));if (res != 0) {return Result.fail(res == 1 ? "库存不足" : "一人只能购买一单");}orderService = (IVoucherOrderService) AopContext.currentProxy();return Result.ok(orderId);
}
这里我们需要将订单id作为lua脚本的参数传入进去,然后将订单信息存入阻塞队列的操作可以省略,因为我们已经将订单信息存入了redis中的消息队列;
然后这里我们需要单独开出一个线程去将队列中的消息存入数据库:
private class VoucherOrderHandle implements Runnable {String ququeName="stream.orders";@Overridepublic void run() {try {//从消息队列中取出订单while (true){//xreadgroup GROUP group consumer count(1) block(2000) streams key >List<MapRecord<String, Object, Object>> msg = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), StreamOffset.create(ququeName, ReadOffset.lastConsumed()));//如果消息为空就继续等待接收if (msg==null||msg.isEmpty()){continue;}//因为每次读取一个消息,所以我们获取第一个消息MapRecord<String, Object, Object> entries = msg.get(0);//获取消息的值,是一些我们传入的键值对Map<Object, Object> value = entries.getValue();//将map转成voucherorder对象VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value,new VoucherOrder(),false);//执行方法handleVoucherOrder(voucherOrder);//确认消息已经处理stringRedisTemplate.opsForStream().acknowledge(ququeName,"g1",entries.getId());}} catch (InterruptedException e) {log.info("下单业务异常",e);handleVoucherOrderError();}}
我们要做的就是接受消息,然后再将消息存入数据库:
我们调用stream的方法,作为消费者从队列中读取消息,阻塞时间是2秒,每次读取一个消息,从下一个未消费的消息读取,如果读取的消息为空那么就继续循环读取消息,如果有消息就将消息取出,然后将其转成对象map,再将其转成对象,然后再去做确认消息的处理,如果不确认消息,消息就会存在待处理的队列中;如果出现的异常,那么我们取出的消息可能没有进行确认,没有确认的会存入待处理队列,我们就要从队列里取出然后进行处理;
出错只会执行的方法:
private void handleVoucherOrderError() {try {//从消息队列中取出订单while (true){//xreadgroup GROUP group consumer count(1) streams key 0,表示从第一个未处理的消息开始读取List<MapRecord<String, Object, Object>> msg = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1), StreamOffset.create(ququeName, ReadOffset.from("0")));//如果为空就说明没有待处理的消息结束就行if (msg==null||msg.isEmpty()){break;}//因为每次读取一个消息,所以我们获取第一个消息MapRecord<String, Object, Object> entries = msg.get(0);//获取消息的值,是一些我们传入的键值对Map<Object, Object> value = entries.getValue();//将map转成voucherorder对象VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value,new VoucherOrder(),false);//执行方法handleVoucherOrder(voucherOrder);}} catch (InterruptedException e) {log.info("下单业务异常",e);}}
}
这里因为是再待处理中直接取出,所以不用阻塞处理,然后从待消费队列中第一个消息开始读,如果为空,那么就说明没有待处理的消息,我们直接返回就行,如果不为空我们再处理
这样使用redis中的消息队列就实现了:1:独立的服务,足够的内存;2:有确认机制,避免消息漏读;3:消息持久化
BeanUtil.fillBeanWithMap(value,new VoucherOrder(),false);
//执行方法
handleVoucherOrder(voucherOrder);
}
} catch (InterruptedException e) {
log.info(“下单业务异常”,e);
}
}
}
> 这里因为是再待处理中直接取出,所以不用阻塞处理,然后从待消费队列中第一个消息开始读,如果为空,那么就说明没有待处理的消息,我们直接返回就行,如果不为空我们再处理这样使用redis中的消息队列就实现了:1:独立的服务,足够的内存;2:有确认机制,避免消息漏读;3:消息持久化
相关文章:

秒杀优化(异步秒杀,基于redis-stream实现消息队列)
目录 秒杀优化一:异步秒杀1:思路2:实现 二:redis实现消息队列1:什么是消息队列2:基于list结构实现消息队列3:基于pubsub实现消息队列4:基于stream实现消息队列5:stream的…...

Node.js——fs模块-文件读取
1、文件读取:通过程序从文件中去除其中的数据 2、方法 方法 说明 readFile 异步读取 readFileSync 同步读取 createReadStrean 流式读取 3、readFile 异步读取 语法: 本文的分享到此结束,欢迎大家评论区一同讨论学习,下一…...
深入理解 ZooKeeper:分布式协调服务的核心与应用
一、引言 随着互联网技术的飞速发展,分布式系统的规模和复杂性不断增加。在分布式环境中,各个节点之间需要进行高效的协调和通信,以确保系统的正常运行。ZooKeeper 正是为了解决分布式系统中的协调问题而诞生的一款开源软件。它提供了一种简单…...
你竟然还不了解 LDAP?
目录 什么是 LDAP LDAP 的工作原理 LDAP 的数据模型 LDAP 操作 LDAP 的使用场景 常见的 LDAP 服务器 小结 什么是 LDAP LDAP(Lightweight Directory Access Protocol,轻量级目录访问协议)是用于访问和管理目录服务的一种开放协议&…...

宝塔使用clickhouse踩坑
前言 最近有个物联网项目,需要存储物联网终端发送过来的信息(类似log日志,但又要存储在数据库里,方便后期聚合统计),本来想写文件的奈何客户要求聚合统计,所以只能用数据库才能达到更高的计算效…...

Linux命令学习记录
ls 查看文件资源,ls [选项] [路径] ls ls [单个选项] ls [组合选项],选项的组合与顺序无关 ls --help 查看更多命令参数 clear 清屏 cd 更换工作目录,cd [路径] cd [特殊路径符] . 表示当前目录 .. …...

一般无人机和FPV无人机的区别
文章目录 一般无人机的工作原理关键组件:一般无人机的应用领域一般无人机的操控体验 FPV无人机的工作原理关键组件:FPV无人机的应用领域FPV无人机的操控体验性能特点FPV无人机的性能特点 未来无人机发展方向和通信方式拓展 一般无人机的工作原理 一般无…...

数据结构初阶排序全解
目录 1>>排序前言 2>>插入排序 2.1>>直接插入排序 2.2>>希尔排序 3>>选择排序 3.1>>直接选择排序 3.2>>堆排序 4>>交换排序 4.1冒泡排序 4.2快速排序 5>>归并排序 6>>测试 test.c sort.h sort.c 7…...

MySQL的SQL语句之触发器的创建和应用
触发器 Trigger 一.触发器 作用:当检测到某种数据表发生数据变化时,自动执行操作,保证数据的完整性,保证数据的一致性。 1.创建一个触发器 如上图所示,查看这个create的帮助信息的时候,这个create trig…...

myWebserver 介绍
项目总结 项目准备过程中,主要阅读了《Linux 高性能服务器编程》游双 一书。源码参考的是:TinyWebServer,我在此源码的基础上做了一定的优化和修改。 我的代码:Github: myWebserver: 我的C服务器项目 我的 Webserver 项目总结&…...

钉钉平台开发小程序
一、下载小程序开发者工具 官网地址:小程序开发工具 - 钉钉开放平台 客户端类型 下载链接 MacOS x64 https://ur.alipay.com/volans-demo_MiniProgramStudio-x64.dmg MacOS arm64 https://ur.alipay.com/volans-demo_MiniProgramStudio-arm64.dmg Windows ht…...

九识智能与徐工汽车达成战略合作,共绘商用车未来新蓝图
近日,九识智能与徐工汽车签署战略合作协议,标志着双方在智能驾驶技术与新能源商用车融合应用、联合生产及市场推广等方面迈入深度合作的新篇章,将共同引领智能驾驶技术商业化浪潮。 近年来,在国家智能化发展战略的引领下ÿ…...

Serverless + AI 让应用开发更简单
本文整理自 2024 云栖大会,阿里云智能高级技术专家,史明伟演讲议题《Serverless AI 让应用开发更简单》 随着云计算和人工智能(AI)技术的飞速发展,企业对于高效、灵活且成本效益高的解决方案的需求日益增长。本文旨在…...

外包功能测试就干了4周,技术退步太明显了。。。。。
先说一下自己的情况,大专生,21年通过校招进入武汉某软件公司,干了差不多3个星期的功能测试,那年国庆,感觉自己不能够在这样下去了,长时间呆在一个舒适的环境会让一个人堕落!而我才在一个外包企业干了4周的功…...
外观模式及运用场景
外观模式(Facade Pattern)是一种结构性设计模式,它为复杂子系统提供一个统一的接口,从而简化与这些子系统的交互。通过外观模式,客户端可以更轻松地使用复杂系统,而不必了解其内部实现。接下来,…...

PyQt5实战——多脚本集合包,UI以及工程布局(二)
个人博客:苏三有春的博客 系列往期: PyQt5实战——多脚本集合包,前言与环境配置(一) 布局 2.1 UI页面布局 整体框架分为分为三个部分,垂直分布。 第一个部分为功能选择按钮(如UTF-8转换&#…...

Python 数据结构对比:列表与数组的选择指南
文章目录 💯前言💯Python中的列表(list)和数组(array)的详细对比1. 数据类型的灵活性2. 性能与效率3. 功能与操作4. 使用场景5. 数据结构选择的考量6. 实际应用案例7. 结论 💯小结 💯…...

gem5运行简单RISC-V全系统模拟
简单记录gem5中运行最简单的RISC-V Full System Simulation的过程 首先是编译RISC-V和m5term,这部分不多写了,官网均有对应教程。 之后直接使用官方在configs/example/gem5_library目录下的riscv-fs.py 运行如下命令 ./build/RISCV/gem5.opt configs/…...
洛谷 P1195 口袋的天空
自用。 题目传送门:口袋的天空 - 洛谷 题解:Inori_333 参考题解:题解 P1195 【口袋的天空】 - 洛谷专栏 /*P1195 口袋的天空https://www.luogu.com.cn/problem/P11952024/11/03 submit:inori333 */#include <iostream> #include &…...

ffmpeg视频滤镜:膨胀操作-dilation
滤镜介绍 dilation 官网链接 > FFmpeg Filters Documentation 膨胀滤镜会使图片变的更亮,会让细节别的更明显。膨胀也是形态学中的一种操作,在opencv中也有响应的算子。此外膨胀结合此前腐蚀操作,可以构成开闭操作。 开操作是先腐蚀…...
3.3 windows,ReactOS系统中页面的换出----2,结构体PHYSICAL_PAGE
系列文章目录 提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加 例如:第一章 Python 机器学习入门之pandas的使用 提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目…...

lvgl
lvgl 目录 lvgl Lvgl移植到STM32 -- 1、下载LVGL源码 -- 2、将必要文件复制到工程目录 -- 3、修改配置文件 将lvgl与底层屏幕结合到一块 -- lvgl也需要有定时器,专门给自己做了一个函数,告诉lvgl经过了多长时间(ms(毫秒)级别) 编写代码 lvgl的中文教程手册网站…...
【django】RESTful API 设计指南
目录 一、协议 二、域名 三、版本(Versioning) 四、路径(Endpoint) 五、HTTP动词 5.1 CRUD操作: 5.2 其他动词: 六、过滤信息(Filtering) 七、状态码(Status Co…...

提升大数据量分页查询性能:深分页优化全解
前言 在处理数据量逐渐增大的数据库表时,优化查询性能是一个常见的挑战。朋友们可能会建议说,创建索引不就能解决问题了吗?然而,当数据量达到相当规模时,简单的索引可能不足以应对所有情况。这时,可能会有…...
WPF 实现冒泡排序可视化
WPF 实现冒泡排序可视化 实现冒泡排序代码就不过多讲解,主要是实现动画效果思路,本demo使用MVVM模式编写,读者可自行参考部分核心代码,即可实现如视频所示效果。 对于新手了解算法相关知识应该有些许帮助,至于其它类型…...

Claude 3.5 新功能 支持对 100 页的PDF 图像、图表和图形进行可视化分析
Claude 3.5 Sonnet发布PDF图像预览新功能,允许用户分析长度不超过100页的PDF中的视觉内容。 此功能使用户能够轻松上传文档并提取信息,特别适用于包含图表、图形和其他视觉元素的研究论文和技术文档。 视觉PDF分析:用户现在可以从包含各种视觉…...

正式开源:从 Greenplum 到 Cloudberry 迁移工具 cbcopy 发布
Cloudberry Database 作为 Greenplum 衍生版本和首选开源替代,由 Greenplum 原始团队成员创建,与 Greenplum 保持原生兼容,并能实现无缝迁移,且具备更新的 PostgreSQL 内核和更丰富的功能。GitHub: https://github.com/cloudberry…...
Python如何读写文件?
1. 文件读取 (1)使用open()函数打开文件 基本语法是file_object open(file_name, mode),其中file_name是要打开的文件的名称(包括路径,如果文件不在当前目录下),mode是打开文件的模式。例如&a…...
100种算法【Python版】第38篇——Boyer-Moore算法
本文目录 1 算法说明2 算法示例3 python代码1 算法说明 Boyer-Moore算法由Robert S. Boyer和J. Strother Moore于1977年提出,旨在提高字符串匹配的效率。该算法在寻找固定模式的过程中,利用模式本身的信息,优化搜索过程,特别适合长文本中的模式查找。 算法原理 Boyer-Moo…...

贪心算法---java---黑马
贪心算法 1)Greedy algorithm 称之为贪心算法或者贪婪算法,核心思想是 将寻找最优解的问题分为若干个步骤每一步骤都采用贪心原则,选取当前最优解因为未考虑所有可能,局部最优的堆叠不一定得到最终解最优 贪心算法例子 Dijkstra while …...