当前位置: 首页 > news >正文

Redis应用—4.在库存里的应用

大纲

1.库存模块设计

2.库存缓存分片和渐进式同步方案

3.基于缓存分片的下单库存扣减方案

4.商品库存设置流程与异步落库的实现

6.库存入库时"缓存分片写入 + 渐进式写入 + 写入失败进行MQ补偿"的实现

7.库存扣减时"基于库存分片依次扣减 + 合并扣减 + 扣不了返还 + 异步落库"的实现

1.库存模块设计

(1)社区电商系统库存模块的设计要求

(2)社区电商系统库存模块功能分析

(3)商品系统处理库存出⼊库时影响库存数据的设计

(4)订单系统扣减和返还库存时影响库存数据的设计

(5)查看商品SKU库存的设计

(1)社区电商系统库存模块的设计要求

由于该库存模块可以支持高性能的并发读写,因此需要支持对商品库存进行多分片写入和读取处理(分片一般等于节点),需要提供单个分片库存不足以扣减时的合并库存功能,以及需要提供操作商品入库时的库存渐进性写入缓存的实现。

也就是对于热点库存能够实现缓存分片。

进行库存分片后,如果遇到单个分片库存不足可以进行合并扣减库存。库存落库之后,库存数据以渐进式的方式写入到缓存里。

(2)社区电商系统库存模块功能分析

主要会有两个系统会操作库存的数据,即商品系统 + 订单系统。首先是商品系统会对商品库存进行入库和出库,然后是订单系统会对商品库存进行购买时的扣减和退款时的返还,所以商品系统和订单系统会影响库存数据变更。

一般而言,库存的数据都是要放到Redis里去的。因为这可以方便后面进行高并发活动如大促和秒杀,而大促和秒杀活动往往会对库存进行高并发读和写,所以库存数据是典型的读多写多数据。

(3)商品系统处理库存出⼊库时影响库存数据的设计

商品中⼼调⽤库存中⼼,添加商品库存信息时,一般会涉及到3个表的数据。第⼀个表是库存表,需要更新相关库存信息(第⼀次要新增)。第二个表是库存变更记录表,需要记录当次的库存变更记录。第三个表是库存变更明细表,需要记录当次的库存变更明细。

库存初始化到库存分⽚中的时候,采⽤渐进性同步的⽅式来进行同步。否则如果采用⼀次性同步的方式,假如过程中失败了就会造成库存不均匀。

例如每个库存分片(节点)需要写100个库存:

说明一:如果一次性同步,那么就是遍历一次节点,每个节点写100个库存。当遍历到某个节点却写入失败时,写入失败的库存数要重新遍历节点写入,这时候就会造成节点库存分配不均匀了。

说明二:如果渐进性同步,那么就是分多次遍历节点,已做好某次遍历节点写入库存就存在节点写入失败情况的准备了。比如每个节点写100个库存,那么就遍历节点10次,每次写10个库存,这样就可以尽量避免节点库存不均匀了。

说明三:当同步过程中出现异常导致同步中断,此时就发送⼀条消息给MQ做补偿。MQ补偿时,会扣减掉已同步缓存的数量,只同步剩余数量。补偿消息要避免重复消费,默认收到就只处理⼀次,异常则再次发送新的消息补偿缓存。

(4)订单系统扣减和返还库存时影响库存数据的设计

说明一:进⾏下单、缺货、取消、⻛控等业务场景时,会涉及对库存的操作变更。

说明二:每个商品SKU都会维护⼀个key,每次操作一个SKU库存时,这个key都会自增+1。通过这个key值对分⽚机器数取模,就可以选择其中⼀台机器进⾏库存扣减。

说明三:当被访问的分⽚库存不能完成此次扣减,则前往下⼀个分⽚继续尝试,直到所有分⽚都不⾜以扣减此次库存以后,则开始尝试合并库存扣减。

说明四:合并扣减⾸先会从每个分⽚尝试扣减,默认扣减分⽚的最⼤剩余库存。当分⽚内的库存可购买数量⼩于用户需要购买数量时,就从lua脚本中返还本次分⽚的实际扣除数量,并记录起来。当全部扣除后还是失败或中途扣除过程发⽣异常时,可以进⾏回滚。

注意:Redis能执行lua脚本,一段lua脚本可以作为一个整体,这样将多条Redis命令写入lua,就可以实现事务的原子性。

(5)查看商品SKU库存的设计

每次查看商品SKU库存时,会去各个分⽚获取分⽚库存,然后合并才返回。

2.库存缓存分片和渐进式同步方案

(1)库存缓存分片方案避免瞬时流量倾斜

(2)渐进性同步方案避免节点库存不均

(1)库存缓存分片方案避免瞬时流量倾斜

库存数据写入单节点缓存后:如果遇到大促活动如秒杀,需要瞬时高并发去操作一个商品SKU的库存时,就会导致对缓存集群里某个Redis节点造成过大压力,造成瞬时流量倾斜。

所以为了解决瞬时流量倾斜问题,往往采用缓存分片。比如商品SKU库存有100个,这时可以把这100个库存拆分为10个分片。假如Redis集群有5个节点,此时分10个分片,那么每个节点就有2个分片。不过库存分片的数量一般设置成与Redis节点数量一样(分片一般等于节点)。这样出现库存的瞬时高并发操作时,就可以将库存扣减请求分到多个节点上。这样高并发流量就能均匀负载到各个节点上去,避免对单个节点写压力过高。

(2)渐进性同步方案避免节点库存不均

在分配库存到分片缓存时,采用渐进性分配库存的方式。例如每个库存分片(分片一般等于节点)需要写100个库存。

如果一次性同步,那么就是遍历一次节点,每个节点写100个库存。当遍历到某个节点却写入失败时,写入失败的库存数要重新遍历节点写入,这时候就会造成节点库存分配不均匀了。

如果渐进性同步,那么就是分多次遍历节点,已做好某次遍历节点写入库存就存在节点写入失败情况的准备了。比如每个节点写100个库存,那么就遍历节点10次,每次写10个库存,这样就可以尽量避免节点库存不均匀了。

但是无论是一次性同步(刚性同步)或者是渐进性同步(柔性同步),都需要考虑将数据从数据库同步到缓存的过程中是有可能出现失败的。失败时就需要基于MQ来做补偿,把没同步成功的库存补偿回去。

图片

3.基于缓存分片的下单库存扣减方案

(1)缓存分片下如何选择节点

(2)如何通过轮询选择Redis节点

(3)如何处理库存分片的库存不足问题

假设一个商品SKU有10000个库存,拆分为10个库存分片,每个分片1000,这10个库存分片会分散在多个Redis节点里。那么用户下单需要扣减商品库存时,到底去哪个Redis节点进行库存扣减。

(1)缓存分片下如何选择节点

此时有两种选择Redis节点的方案:可以通过随机的方式选出一个Redis节点来进行库存扣减,也可以通过轮询的方式选出一个Redis节点来进行库存扣减,这里会通过轮询的方式来选择Redis节点去进行库存扣减。

(2)如何通过轮询选择Redis节点

首先商品SKU需要维护一个访问key,然后每次扣减库存时都对这个访问key进行自增。接着根据这个自增值对库存分片数量进行取模,通过取模确定一个库存分片。然后再根据这个库存分片,确定该分片是在哪个Redis节点里的。这样就可以将库存扣减请求发送到那个Redis节点里进行处理了。

(3)如何处理库存分片的库存不足问题

如果轮询出的某个库存分片没库存或者库存不够了,比如当前库存分片还有1个库存,但这次用户请求需要扣减3个库存。明显当前库存分片不足以扣减,此时就可以尝试下一个库存分片来进行扣减。如果下一个库存分片也不足以扣减,那么继续下一个库存分片来进行扣减。如果最后发现每个库存分片都无法单独进行扣减,那就合并库存再进行扣减。合并库存进行扣减时,会对多个库存分片里的库存逐一扣减。

图片

4.商品库存设置流程与异步落库的实现

商品中心操作库存会分为3步:

第一步:对库存设置进行异步落库

第二步:落库的库存数据会被同步到缓存分片里,并且是渐进式写入的

第三步:如果同步到缓存分片过程出现问题,需要基于MQ进行补偿

比如采购系统发起商品采购,然后供应商把商品发到仓库里。接着仓库操作员对商品入库,商品进行入库时会发送商品入库事件消息。库存系统可以监听并消费该事件,然后异步触发商品库存的设置和初始化。

如果商品系统创建商品时就设置了商品库存,这时就可以同步调用库存系统的接口,去执行商品库存初始化设置操作。商品库存初始化时会更新库存,这时对DB的操作也是通过MQ异步进行。

也就是商品库存初始化、商品库存入库、购物车库存扣减,都是异步写库,但是写缓存是同步的。

@Service
public class InventoryServiceImpl implements InventoryService {...//商品库存入库@Overridepublic void putStorage(InventoryRequest request) {//1.异步更新数据到DBsendAsyncStockUpdateMessage(request);//2.同步执行库存均匀分发到缓存executeStockLua(request);}//发送库存变更的消息private void sendAsyncStockUpdateMessage(InventoryRequest request) {Long startTime = System.currentTimeMillis();//发送消息到MQdefaultProducer.sendMessage(RocketMqConstant.INVENTORY_PRODUCT_STOCK_TOPIC, JsonUtil.object2Json(request), "COOKBOOK库存变更异步落库消息");log.info("商品编号:" + request.getSkuId() + "发送mq,总计耗时" + (System.currentTimeMillis() - startTime) + "毫秒");}
}@Configuration
public class ConsumerBeanConfig {//配置内容对象@Autowiredprivate RocketMQProperties rocketMQProperties;//商品库存扣减变更的topic@Bean("inventoryStockUpdateTopic")public DefaultMQPushConsumer inventoryStockUpdateConsumer(InventoryStockUpdateListener inventoryStockUpdateListener) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConstant.PUSH_DEFAULT_PRODUCER_GROUP);consumer.setNamesrvAddr(rocketMQProperties.getNameServer());consumer.subscribe(RocketMqConstant.INVENTORY_PRODUCT_STOCK_TOPIC, "*");consumer.registerMessageListener(inventoryStockUpdateListener);consumer.start();return consumer;}...
}@Component
public class InventoryStockUpdateListener implements MessageListenerConcurrently {@Autowiredprivate InventoryService inventoryService;@Autowiredprivate RedisLock redisLock;//消费库存变更消息@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {String skuInfoLockKey = "";try {for (MessageExt messageExt : msgList) {String msg = new String(messageExt.getBody());InventoryRequest request = JSON.parseObject(msg, InventoryRequest.class);skuInfoLockKey = RedisKeyConstants.INVENTORY_LOCK_PREFIX + request.getSkuId();//每条库存的日志变更明细,异步场景下需要按顺序进行扣减,避免库存明细数据不准确boolean lock = redisLock.tryLock(skuInfoLockKey, 3000L);if (lock) {//存储库存变化记录inventoryService.updateInventory(request);} else {log.error("consume failure, 消息待下次重试");return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}} catch (Exception e) {//本次消费失败,下次重新消费log.error("consume error, 库存变更消息消费失败", e);return ConsumeConcurrentlyStatus.RECONSUME_LATER;} finally {redisLock.unlock(skuInfoLockKey);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
}@Service
public class InventoryServiceImpl implements InventoryService {...//库存变更数据库操作@Override@Transactional(rollbackFor = Exception.class)public void updateInventory(InventoryRequest request){//1.操作库存变更InventoryDO inventoryDO = saveStorageStock(request);//2.记录变更明细saveStorageDetailLog(inventoryDO,request);}//保存商品的入库信息private InventoryDO saveStorageStock(InventoryRequest request) {LambdaQueryWrapper<InventoryDO> queryWrapper = Wrappers.lambdaQuery();queryWrapper.eq(InventoryDO::getSkuId, request.getSkuId());InventoryDO inventoryDO = inventoryDAO.getOne(queryWrapper);//还没有这个商品的入库信息if (Objects.isNull(inventoryDO)) {inventoryDO = inventoryConverter.converterRequest(request);inventoryDO.setCreateTime(new Date());inventoryDO.setUpdateTime(new Date());inventoryDAO.save(inventoryDO);} else {inventoryDO.setInventoryNum(inventoryDO.getInventoryNum() + request.getInventoryNum());inventoryDO.setUpdateTime(new Date());inventoryDAO.updateById(inventoryDO);}return inventoryDO;}//记录本次出入库的明细记录private void saveStorageDetailLog(InventoryDO inventoryDO, InventoryRequest request) {//记录本次入库记录StorageInfoDO storageInfoDO = inventoryConverter.converterStorageRequest(request);storageInfoDO.setStorageTime(new DateTime());storageInfoDAO.save(storageInfoDO);//记录本次记录的明细StorageDetailLogDO storageDetailLogDO = inventoryConverter.converterStorageLogRequest(inventoryDO);storageDetailLogDO.setStorageBeforeNum(inventoryDO.getInventoryNum() - request.getInventoryNum());storageDetailLogDO.setStorageCode(request.getWarehouseCode());storageDetailLogDO.setStorageNum(request.getInventoryNum());storageDetailLogDO.setStorageTime(new DateTime());storageDetailLogDAO.save(storageDetailLogDO);}
}//库存表
@Data
@TableName("inventory_info")
public class InventoryDO implements Serializable {private Long id;//主键IDprivate Long skuId;//商品SKUprivate String warehouse;//仓库编码private Integer inventoryNum;//库存数量private Date createTime;//创建时间private Date updateTime;//更新时间private Integer operator;//操作人
}

6.库存入库时"缓存分片写入 + 渐进式写入 + 写入失败进行MQ补偿"的实现

(1)基于Redis多节点的库存缓存分片的实现细节

(2)对库存缓存分片进行渐进式写入的分析

(1)基于Redis多节点的库存缓存分片的实现细节

首先通过Jedis连接池的大小来获取Redis节点数量,然后获取要分配的商品SKU库存数量,接着通过要分配的库存数量除以Redis节点数量计算单节点要分配的总库存。

渐进式写入库存时,每次遍历节点都要让写入的库存数量满足:如果单节点分配的总库存数比较大,那么每次就写入十分之一的总库存数。如果单节点分配的总库存数比较小,那么每次就默认写入3个库存。

(2)对库存缓存分片进行渐进式写入的分析

假设有3个节点并且入库数量是900,这样每个节点会分配300个库存。然后开始遍历这3节点循环写入,每遍历到一个节点就直接写入300个库存。遍历完前两个节点都各写入300库存,但是遍历到第三个节点却写入失败,这时就会导致第三个节点完全没有任何库存。那么进行库存扣减时,所有压力都会集中到第一个和第二个节点上。

在如下代码实现中,当往某个节点写入库存时,不会关注是否会写入失败。不针对单节点进行写入重试,而是循环写所有节点,只关注写入的库存数量,所以才采用了渐进式写入的方法。如果每个节点要写入300个库存,那么就遍历节点10轮。执行每轮遍历时,遍历到某个节点就对该节点写入30个库存。这样的好处就是即便有节点写入失败了,也可以尽量保证节点库存数量均匀。只要各个节点的库存相差不大,就可以避免出现对某些节点长期压力。

@Service
public class InventoryServiceImpl implements InventoryService {@Autowiredprivate CacheSupport cacheSupport;...//执行库存分配,使用lua脚本执行库存的变更@Overridepublic void executeStockLua(InventoryRequest request) {String productStockKey = RedisKeyConstants.PRODUCT_STOCK_PREFIX + request.getSkuId();Integer sumNum = 0;Long startTime = System.currentTimeMillis();try {//获取默认设定分桶,也就是获取Redis节点数量int redisCount = cacheSupport.getRedisCount();//获取要分配的库存数量Integer inventoryNum = request.getInventoryNum();//计算单个Redis节点预计分配的总库存Integer countNum = inventoryNum / redisCount;//获取渐进式写入库存时、每次遍历节点时对各节点写入的库存数量:十分之一的总库存数、或默认3个库存;countNum = getAverageStockNum(countNum, redisCount);int i = 0;while (true) {for (long count = 0; count < redisCount; count++) {//最后一次分配的库存小于预计分配库存的时候,则以剩余的库存为准if (inventoryNum - sumNum < countNum) {countNum = inventoryNum - sumNum;}//count代表了Redis节点编号Object eval = cacheSupport.eval(count, RedisLua.ADD_INVENTORY, CollUtil.toList(productStockKey), CollUtil.toList(String.valueOf(countNum)));if (!Objects.isNull(eval) && Long.valueOf(eval + "") > 0) {//执行lua脚本分配成功的才累计(可能出现不均匀的情况)sumNum = sumNum + countNum;i++;}if (sumNum.equals(inventoryNum)) {break;}}//分配完成跳出循环if (sumNum.equals(inventoryNum)) {break;}}log.info("商品编号:" + request.getSkuId() + ",同步分配库存共分配" + (i) + "次" + ",分配库存:" + sumNum + ",总计耗时" + (System.currentTimeMillis() - startTime) + "毫秒");} catch (Exception e) {e.printStackTrace();//同步过程中发生异常,减去已被同步的缓存库存,发送消息再行补偿,这里出现异常不抛出,避免异常request.setInventoryNum(request.getInventoryNum() - sumNum);sendAsyncStockCompensationMessage(request);log.error("分配库存到缓存过程中失败", e.getMessage(), e);}}//获取渐进式写入库存时,每次遍历要对节点写入的库存数量//所以主要会分为两种情况://如果单节点要写入的总库存数比较大,那么每次就写入十分之一的总库存数//如果单节点要写入的总库存数比较小,那么每次就写入默认3个库存//@param countNum 单个Redis节点要分配的总库存数private Integer getAverageStockNum(Integer countNum, Integer redisCount) {Integer num = 0;//假设redisCount = 5,StockBucket.STOCK_MAX_WRITE_COUNT = 10//如果countNum > 5,那么就对单个Redis节点写入10次,每次写countNum/10个库存//如果5 < countNum <= 50,那么就对单个Redis节点写入countNum/3次,每次写3个库存//如果countNum < 5,那么就只能对单个Redis节点写入1次了,每次写countNum个库存if (countNum > (redisCount * StockBucket.STOCK_MAX_WRITE_COUNT)) {num = countNum / StockBucket.STOCK_MAX_WRITE_COUNT;} else if (countNum > 3) {num = 3;} else {num = countNum;}return num;}
}@Component
public class RedisCacheSupport implements CacheSupport {private final JedisManager jedisManager;@Overridepublic int getRedisCount() {return jedisManager.getRedisCount();}@Overridepublic Object eval(Long hashKey, String script, List<String> keys, List<String> args) {try (Jedis jedis = jedisManager.getJedisByHashKey(hashKey)) {return jedis.eval(script, keys, args);}}...
}@Component
public class JedisManager implements DisposableBean {...private final List<JedisPool> jedisPools = new ArrayList<>();public JedisManager(JedisConfig jedisConfig) {JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();jedisPoolConfig.setMaxTotal(jedisConfig.getMaxTotal());jedisPoolConfig.setMaxIdle(jedisConfig.getMaxIdle());jedisPoolConfig.setMinIdle(jedisConfig.getMinIdle());jedisPoolConfig.setTestOnBorrow(true);jedisPoolConfig.setTestOnReturn(false);//获取配置中的一系列Redis地址,放到Redis的连接池里for (String addr : jedisConfig.getRedisAddrs()) {String[] ipAndPort = addr.split(":");String redisIp = ipAndPort[0];int redisPort = Integer.parseInt(ipAndPort[1]);JedisPool jedisPool = new JedisPool(jedisPoolConfig, redisIp, redisPort, 3000, jedisConfig.getPassword());LOGGER.info("创建JedisPool, jedisPool={}", jedisPool);jedisPools.add(jedisPool);}}public Jedis getJedisByHashKey(long hashKey) {hashKey = Math.abs(hashKey);int index = (int) (hashKey % getRedisCount());return getJedisByIndex(index);}public int getRedisCount() {//通过Jedis连接池的大小获取Redis节点数量return jedisPools.size();}...
}public class RedisLua {...//初始化新增库存//如果key存在,那么先拿出当前key对应的库存数量,如果库存数量是大于等于0,那么就获取传入的数量进行累加//如果key不存在,那么就就获取传入的数量去设置创建这个keypublic static final String ADD_INVENTORY = ""+ "if (redis.call('exists', KEYS[1]) == 1) then"+ "    local occStock = tonumber(redis.call('get', KEYS[1]));"+ "    if (occStock >= 0) then"+ "        return redis.call('incrBy', KEYS[1], ARGV[1]);"+ "    end;"+ "end;"+ "redis.call('SET', KEYS[1], ARGV[1]);"+ "return tonumber(redis.call('get', KEYS[1]));";...
}

(3)库存缓存分片写入失败MQ补偿方案

@Service
public class InventoryServiceImpl implements InventoryService {...@Overridepublic void executeStockLua(InventoryRequest request) {String productStockKey = RedisKeyConstants.PRODUCT_STOCK_PREFIX  + request.getSkuId();Integer sumNum = 0;Long startTime = System.currentTimeMillis();try {...} catch (Exception e) {e.printStackTrace();//同步过程中发生异常,减去已被同步的缓存库存,发送消息再行补偿,这里出现异常不抛出,避免异常request.setInventoryNum(request.getInventoryNum() - sumNum);sendAsyncStockCompensationMessage(request);log.error("分配库存到缓存过程中失败", e.getMessage(), e);}}
}@Component
public class CompensationStockListener implements MessageListenerConcurrently {@Autowiredprivate InventoryService inventoryService;@Autowiredprivate MqIdempotentDAO mqIdempotentDAO;//消费库存缓存补偿消息@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext consumeConcurrentlyContext) {try {for (MessageExt messageExt : msgList) {log.info("库存同步缓存消费,消息内容:{}", messageExt.getBody());String msg = new String(messageExt.getBody());  //保存消息的记录,用来处理幂等,一个消息只能被消费一次,当然这种实现还不够好SaveIdempotentMq(messageExt.getMsgId());InventoryRequest request = JSON.parseObject(msg, InventoryRequest.class);inventoryService.executeStockLua(request);}} catch (Exception e) {//默认每次消息都为成功,重发消息在同步缓存里面进行处理log.error("consume error, 库存同步缓存消息消费失败", e);}//默认只消费一次return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}//记录消息,保证一个消息只能被消费一次private void SaveIdempotentMq(String msgId) {MqIdempotentLogDO mqIdempotentLogDO = new MqIdempotentLogDO();mqIdempotentLogDO.setMsgId(msgId);mqIdempotentDAO.save(mqIdempotentLogDO);}
}

7.库存扣减时"基于库存分片依次扣减 + 合并扣减 + 扣不了返还 + 异步落库"的实现

(1)库存扣减分三步

(2)基于库存缓存分片的依次扣减逻辑

(3)基于库存缓存分片的合并扣减逻辑和合并扣减失败的库存返还逻辑

(4)查询库存的实现

(1)库存扣减分三步

步骤一:维护一个商品SKU的消费购买次数Key,每次自增 + 1,用于分片取模。

步骤二:检测分片内的库存是否足够购买。如果不够,则依次选择新的分片进行扣减。如果所有分片都不够扣减,则进行合并后扣减,合并扣减不了那么再进行库存返还。

步骤三:库存的变化需要进行异步落库到DB,使用MQ来保证数据最终一致性。

对库存分片进行扣减库存时,首先会对多个分片依次扣减。如果一个分片扣减不成功就去下一个分片继续尝试扣减。如果所有分片都扣减不成功,那么就进行合并扣减。如果合并扣减也不成功,则进行库存返还。如果扣减成功,则还需要将扣减库存数转发到MQ异步落库到DB。

当然如果某个库存分片的库存已经为0,可以对该分片进行标记,避免下次路由到该分片尝试扣减库存。

(2)基于库存缓存分片的依次扣减逻辑

@Service
public class InventoryServiceImpl implements InventoryService {...//扣减,返还商品库存//基于缓存分片依次扣减、多分片合并扣减、扣不了时库存返还、异步落库@Overridepublic void deductProductStock(InventoryRequest request) {//1.维护一个商品的消费购买次数Key,每次自增+1,并返回本次的请求数Integer incrementCount = increment(request);//2.检测分片内的库存是否足够购买,如果不够,则选择到新的分片进行扣减,甚至进行合并后扣减deductStockLua(incrementCount, request.getSkuId(), request.getInventoryNum());//3.对库存的变化,需要进行异步落库到DB,使用MQ来保证数据最终一致性//注意这里是扣除库存,和入库的是反向的,所以这里要正数转负数request.setInventoryNum(0 - request.getInventoryNum());sendAsyncStockUpdateMessage(request);}//每次请求,商品扣减库存自增1,用于分片取模private Integer increment(InventoryRequest request) {Long startTime = System.currentTimeMillis();String incrementKey = RedisKeyConstants.PRODUCT_STOCK_COUNT_PREFIX + request.getSkuId();Long incrementCount = cacheSupport.incr(incrementKey);log.info("商品编号:" + request.getSkuId() + "获取访问次数" + incrementCount + ",总计耗时" + (System.currentTimeMillis() - startTime) + "毫秒");return incrementCount.intValue();}//对指定的分桶进行数据扣减或者返回库存的lua脚本执行//@param incrementCount 分片标识//@param skuId          商品SKU ID//@param stockNum       扣减或者返回库存private void deductStockLua(Integer incrementCount, Long skuId, Integer stockNum) {String productStockKey = RedisKeyConstants.PRODUCT_STOCK_PREFIX + skuId;int redisCount = cacheSupport.getRedisCount();long maxSequence = incrementCount + redisCount - 1;Object result;Boolean deduct = false;Long startTime = System.currentTimeMillis();//当一个分片不足以扣除,循环至下一个分片进行扣除,直到全部不够后,才进行合并处理for (long i = incrementCount; i <= maxSequence; i++) {result = cacheSupport.eval(i, RedisLua.SCRIPT, CollUtil.toList(productStockKey), CollUtil.toList(String.valueOf(stockNum)));if (Objects.isNull(result)) {continue;}if (Integer.valueOf(result + "") > 0) {int index = (int) (i % redisCount);log.info("redis实例[{}] 商品[{}] 本次扣减缓存库存:[{}], 剩余缓存库存:[{}],耗时:[{}]", index, skuId, stockNum, result, System.currentTimeMillis() - startTime);deduct = true;break;}}//单个分片已经无法扣减库存了,进行合并扣除if (!deduct) {//获取一下当前的商品总库存,如果总库存也已不足以扣减则直接失败BigDecimal sumNum = queryProductStock(skuId);if (sumNum.compareTo(new BigDecimal(stockNum)) >= 0) {mergeDeductStock(productStockKey, stockNum);}throw new InventoryBizException("库存不足");}}
}@Component
public class RedisCacheSupport implements CacheSupport {...@Overridepublic Object eval(Long hashKey, String script, List<String> keys, List<String> args) {try (Jedis jedis = jedisManager.getJedisByHashKey(hashKey)) {return jedis.eval(script, keys, args);}}...
}@Component
public class JedisManager implements DisposableBean {...public Jedis getJedisByHashKey(int hashKey) {hashKey = Math.abs(hashKey);int index = hashKey % getRedisCount();return getJedisByIndex(index);}
}public class RedisLua {//扣减库存//如果key存在,那么就先将库存取出来,尝试进行扣减//如果尝试扣减完发现超卖,就返回-1表示扣减库存失败//如果尝试扣减完发现不超卖,就对库存缓存进行负数的累加操作并返回调用incrBy的结果//如果key不存在,就返回-3public static final String SCRIPT ="if (redis.call('exists', KEYS[1]) == 1) then"+ "    local stock = tonumber(redis.call('get', KEYS[1]));"+ "    local num = tonumber(ARGV[1]);"+ "    local results_num = stock - num"+ "    if (results_num <= 0) then"+ "        return -1;"+ "    end;"+ "    if (stock >= num) then"+ "        return redis.call('incrBy', KEYS[1], 0 - num);"+ "    end;"+ "    return -2;"+ "end;"+ "return -3;";...
}

(3)基于库存缓存分片的合并扣减逻辑和合并扣减失败的库存返还逻辑

合并扣减的时候,每当对一个节点扣减完该节点可以扣的库存后,也就是扣减该节点的缓存库存值和传入待扣减库存值的最小值之后,就返回还剩需要继续扣减的库存值。然后需要记录每个节点扣减成功的库存值,以便合并扣减失败时可进行返还。lua脚本传入的参数如果是负数就代表进行返还,也就是负负得正进行累加。

比如本来要扣10个库存,合并扣减时,第一个节点已经扣完它可以扣的2个库存后,就返回8给第二个节点去扣减。第二个节点已经扣完它可以扣的3个库存后,就返回5给第三个节点去扣减。第三个节点它的库存刚好经历入库有6个库存,那么扣减完5个库存就返回0。

//对每个分片扣减库存发现不足时,进行合并扣减
private void mergeDeductStock(String productStockKey, Integer stockNum) {//TODO 执行多个分片的扣除扣减,对该商品的库存操作上锁,保证原子性Map<Long, Integer> fallbackMap = new HashMap<>();int redisCount = cacheSupport.getRedisCount();try {//开始循环扣减库存for (long i = 0; i < redisCount; i++) {if (stockNum > 0) {Object diffNum = cacheSupport.eval(i, RedisLua.MERGE_SCRIPT, CollUtil.toList(productStockKey), CollUtil.toList(stockNum + ""));if (Objects.isNull(diffNum)) {continue;}//当扣减后返回得值大于0的时候,说明还有库存未能被扣减,对下一个分片进行扣减if (Integer.valueOf(diffNum + "") >= 0) {//存储每一次扣减的记录,防止最终扣减还是失败进行回滚fallbackMap.put(i, (stockNum - Integer.valueOf(diffNum + "")));//重置抵扣后的库存stockNum = Integer.valueOf(diffNum + "");}}}//完全扣除所有的分片库存后,还是未清零,则回退库存返回各自分区if (stockNum > 0) {fallbackMap.forEach((k, v) -> {Object result = cacheSupport.eval(k, RedisLua.SCRIPT, CollUtil.toList(productStockKey), CollUtil.toList((0 - v) + ""));log.info("redis实例[{}] 商品[{}] 本次库存不足,扣减失败,返还缓存库存:[{}], 剩余缓存库存:[{}]", k, productStockKey, v, result);});throw new InventoryBizException("库存不足");}} catch (Exception e) {e.printStackTrace();//开始循环返还库存fallbackMap.forEach((k, v) -> {cacheSupport.eval(k, RedisLua.SCRIPT, CollUtil.toList(productStockKey), CollUtil.toList((0 - v) + ""));});throw new InventoryBizException("合并扣除库存过程中发送异常");}
}public class RedisLua {...//合并库存扣减//如果key存在,那么就先将库存取出来,尝试进行扣减//如果库存已经小于等于0,就返回-1表示没法扣减库存//否则,就对库存缓存扣减传入要扣减的数值和当前库存的最小值,然后把还需扣减多少值进行返回//如果key不存在,就返回-3public static final String MERGE_SCRIPT ="if (redis.call('exists', KEYS[1]) == 1) then"+ "    local stock = tonumber(redis.call('get', KEYS[1]));"+ "    local num = tonumber(ARGV[1]);"+ "    local diff_num = stock - num;"+ "    if (stock <= 0) then"+ "        return -1;"+ "    end;"+ "    if (num > stock) then"+ "        num = stock;"+ "    end;"+ "    redis.call('incrBy', KEYS[1], 0 - num);"+ "    if (diff_num < 0) then"+ "        return 0-diff_num;"+ "    end;"+ "    return 0;"+ "end;"+ "return -3;";//扣减库存//如果key存在,那么就先将库存取出来,尝试进行扣减//如果尝试扣减完发现超卖,就返回-1表示扣减库存失败//如果尝试扣减完发现不超卖,就对库存缓存进行负数的累加操作并返回调用incrBy的结果//如果key不存在,就返回-3public static final String SCRIPT ="if (redis.call('exists', KEYS[1]) == 1) then"+ "    local stock = tonumber(redis.call('get', KEYS[1]));"+ "    local num = tonumber(ARGV[1]);"+ "    local results_num = stock - num"+ "    if (results_num <= 0) then"+ "        return -1;"+ "    end;"+ "    if (stock >= num) then"+ "        return redis.call('incrBy', KEYS[1], 0 - num);"+ "    end;"+ "    return -2;"+ "end;"+ "return -3;";
}

(4)查询库存的实现

遍历每个缓存分片获取库存,然后累加进行返回。

@Service
public class InventoryServiceImpl implements InventoryService {...//查询当前商品SKU的剩余库存@Overridepublic BigDecimal queryProductStock(Long skuId) {//遍历RedisBigDecimal productNum = BigDecimal.ZERO;String productStockKey = RedisKeyConstants.PRODUCT_STOCK_PREFIX + skuId;int redisCount = cacheSupport.getRedisCount();for (long i = 0; i < redisCount; i++) {Object eval = cacheSupport.eval(i, RedisLua.QUERY_STOCK, CollUtil.toList(productStockKey), CollUtil.toList(productStockKey));if (!Objects.isNull(eval)) {productNum = productNum.add(BigDecimal.valueOf(Long.valueOf(eval + "")));}}return productNum;}
}public class RedisLua {...//查询库存public static final String QUERY_STOCK ="local occStock = tonumber(redis.call('get', KEYS[1]));"+ "if (occStock == nil) then"+ "    return 0;"+ "end;"+ "return occStock;";
}

相关文章:

Redis应用—4.在库存里的应用

大纲 1.库存模块设计 2.库存缓存分片和渐进式同步方案 3.基于缓存分片的下单库存扣减方案 4.商品库存设置流程与异步落库的实现 6.库存入库时"缓存分片写入 渐进式写入 写入失败进行MQ补偿"的实现 7.库存扣减时"基于库存分片依次扣减 合并扣减 扣不了…...

selenium获取请求头

【原创】Selenium获取请求头、响应头-腾讯云开发者社区-腾讯云 selenium 4.0.0 selenium-wire 5.1.0 python 3.10 from seleniumwire import webdriver import time from selenium.webdriver.common.by import By import re def get_request_headers(driver):"""…...

Rust中自定义Debug调试输出

在 Rust 中&#xff0c;通过为类型实现 fmt::Debug&#xff0c;可以自定义该类型的调试输出。fmt::Debug 是标准库中的一个格式化 trait&#xff0c;用于实现 {:?} 格式的打印。这个 trait 通常通过自动派生&#xff08;#[derive(Debug)]&#xff09;来实现&#xff0c;但你也…...

docker离线安装、linux 安装docker

之前写过一篇docker的离线安装&#xff0c;现在从头再看繁琐了&#xff0c;服务器换了&#xff0c;既然要重搭一遍就要改进一下了。下面步入正题&#xff1a; 1.下载离线软件包 https://download.docker.com/linux/static/stable/x86_64/docker-20.10.6.tgz 2.下载安装工具包…...

卓易通:鸿蒙Next系统的蜜糖还是毒药?

哈喽&#xff0c;我是老刘 最近很多人都在问鸿蒙next系统新上线的卓易通和出境易两款应用。 老刘分析了一下这个软件的一些细节&#xff0c;觉得还是蛮有意思的&#xff0c;我觉得可以从使用体验、底层原理和对鸿蒙生态的影响这三个角度来分析一下。 使用体验 性能 看到了一些测…...

AI大模型学习笔记|神经网络与注意力机制(逐行解读)

来源分享链接&#xff1a;通过网盘分享的文件&#xff1a;详解神经网络是如何训练的 链接: https://pan.baidu.com/s/12EF7y0vJfH5x6X-0QEVezg 提取码: k924 内容摘要&#xff1a;本文深入探讨了神经网络与注意力机制的基础&#xff0c;以及神经网络参数训练的过程。以鸢尾花数…...

Linux 操作系统中的管道与共享内存

目录 一、匿名管道 &#xff08;一&#xff09;基本概念 &#xff08;二&#xff09;关键现象 &#xff08;三&#xff09;管道特性 二、命名管道 &#xff08;一&#xff09;基本概念 &#xff08;二&#xff09;关键特性 三、共享内存 &#xff08;一&#xff09;基…...

恢复删除的文件:6个免费Windows电脑数据恢复软件

数据恢复软件可帮助您从众多存储设备中恢复损坏或删除的数据。您可以使用这些文件恢复软件来检索文件、文档、视频、图片等。这些应用程序支持多种标准文件格式&#xff0c;如 PNG、RTF、PDF、HTML、JPG、MP3 等。 经过超过 75 小时的研究&#xff0c;我分析了 25 最佳免费数据…...

linux网络编程 | c | select实现多路IO转接服务器

select实现多路IO转接服务器 基于该视频完成 15-select实现多路IO转接设计思路_哔哩哔哩_bilibili 通过响应式–多路IO转接实现 文章目录 select实现多路IO转接服务器1.思路&功能2.代码实现warp.hwarp.cmulti_select_sever.c运行图 3.代码解释&#xff08;细节&#xf…...

基于前后端分离的食堂采购系统源码:从设计到开发的全流程详解

本篇文章&#xff0c;笔者将从系统设计到开发的全过程进行详解&#xff0c;帮助开发者和企业了解如何高效构建一套完善的食堂采购系统。 一、系统需求分析 在开发一套基于前后端分离的食堂采购系统前&#xff0c;必须对业务需求和功能模块进行详细分析&#xff0c;确保系统设…...

小程序自定义tab-bar,踩坑记录

从官方下载代码 https://developers.weixin.qq.com/miniprogram/dev/framework/ability/custom-tabbar.html 1、把custom-tab-bar 文件放置 pages同级 修改下 custom-tab-bar 下的 JS文件 Component({data: {selected: 0,color: "#7A7E83",selectedColor: "#3…...

游戏引擎学习第52天

仓库 : https://gitee.com/mrxiao_com/2d_game 这节的内容相当多 回顾 在游戏中&#xff0c;实体被分为不同的类别&#xff1a;接近玩家的“高频实体”、距离较远并正在模拟的“低频实体”和不进行更新的“休眠实体”。这些实体会根据它们与玩家的距离进行处理&#xff0c;接…...

【热力学与工程流体力学】流体静力学实验,雷诺实验,沿程阻力实验,丘里流量计流量系数测定,局部阻力系数的测定,稳态平板法测定材料的导热系数λ

关注作者了解更多 我的其他CSDN专栏 过程控制系统 工程测试技术 虚拟仪器技术 可编程控制器 工业现场总线 数字图像处理 智能控制 传感器技术 嵌入式系统 复变函数与积分变换 单片机原理 线性代数 大学物理 热工与工程流体力学 数字信号处理 光电融合集成电路…...

【HTML】根据不同域名设置不同的网站图标(替换 link 中 href 地址)

文章目录 代码实现 <!DOCTYPE html> <html><head><meta charset"utf-8" /><meta http-equiv"x-ua-compatible" content"ieedge,chrome1" /><meta name"viewport" content"widthdevice-width&q…...

使用Navicat从SQL Server导入表数据到MySQL

在表上右键选择导入向导 选择ODBC 1.内输入ip即可&#xff0c;不需要端口号 一定要勾选允许保存密码 选择需要的表&#xff0c;下一步 根据需求&#xff0c;可修改表名、是否新建表 根据需求修改不同表的字段类型和长度 按需选择导入方式...

私有云dbPaaS为何被Gartner技术成熟度曲线标记为“废弃”?

当云计算席卷而来&#xff0c;基于云基础设施的数据库部署也改变了数据库。在传统的私有化部署&#xff08;On-premises&#xff09;和公有云部署&#xff08;Public Cloud&#xff09;之间&#xff0c;不断融合的混合IT&#xff08;Mixed IT&#xff09;形式成为最常见的企业级…...

牛客网 SQL1查询所有列

SQL1查询所有列 select id,device_id,gender,age,university,province from user_profile 每日问题 C 中面向对象编程如何实现数据隐藏&#xff1f; 在C中&#xff0c;面向对象编程&#xff08;OOP&#xff09;通过封装&#xff08;Encapsulation&#xff09;实现数据隐藏。…...

【经验分享】OpenHarmony5.0.0-release编译RK3568不过问题(已解决)

问题描述 根据操作手册正常拉取代码&#xff0c;然后编译OpenHarmony5.0.0版本rk3568项目 编译命令 ./build.sh --product-name rk3568 --ccache出现如下报错 然后真正开始出错的位置是下面这句log FAILED: ../kernel/src_tmp/linux-5.10/boot_linux ../kernel/checkpoint/c…...

如何使用ERC404协议

ERC404 ERC404协议的性质 ERC404不是一个开发代码工具包,而是一种智能合约标准规范。它就像是一份蓝图或者规则手册,规定了在以太坊区块链上开发特定智能合约应该遵循的接口、函数和事件等规则。如何使用ERC404协议 定义合约接口 首先,在开发智能合约时,要根据ERC404标准定…...

240004基于Jamva+ssm+maven+mysql的房屋租赁系统的设计与实现

基于ssmmavenmysql的房屋租赁系统的设计与实现 1.项目描述2.运行环境3.项目截图4.源码获取 1.项目描述 该项目在原有的基础上进行了优化&#xff0c;包括新增了注册功能&#xff0c;房屋模糊查询功能&#xff0c;管理员和用户信息管理等功能&#xff0c;以及对网站界面进行了优…...

centos 7 部署awstats 网站访问检测

一、基础环境准备&#xff08;两种安装方式都要做&#xff09; bash # 安装必要依赖 yum install -y httpd perl mod_perl perl-Time-HiRes perl-DateTime systemctl enable httpd # 设置 Apache 开机自启 systemctl start httpd # 启动 Apache二、安装 AWStats&#xff0…...

《从零掌握MIPI CSI-2: 协议精解与FPGA摄像头开发实战》-- CSI-2 协议详细解析 (一)

CSI-2 协议详细解析 (一&#xff09; 1. CSI-2层定义&#xff08;CSI-2 Layer Definitions&#xff09; 分层结构 &#xff1a;CSI-2协议分为6层&#xff1a; 物理层&#xff08;PHY Layer&#xff09; &#xff1a; 定义电气特性、时钟机制和传输介质&#xff08;导线&#…...

Leetcode 3577. Count the Number of Computer Unlocking Permutations

Leetcode 3577. Count the Number of Computer Unlocking Permutations 1. 解题思路2. 代码实现 题目链接&#xff1a;3577. Count the Number of Computer Unlocking Permutations 1. 解题思路 这一题其实就是一个脑筋急转弯&#xff0c;要想要能够将所有的电脑解锁&#x…...

CentOS下的分布式内存计算Spark环境部署

一、Spark 核心架构与应用场景 1.1 分布式计算引擎的核心优势 Spark 是基于内存的分布式计算框架&#xff0c;相比 MapReduce 具有以下核心优势&#xff1a; 内存计算&#xff1a;数据可常驻内存&#xff0c;迭代计算性能提升 10-100 倍&#xff08;文档段落&#xff1a;3-79…...

【OSG学习笔记】Day 16: 骨骼动画与蒙皮(osgAnimation)

骨骼动画基础 骨骼动画是 3D 计算机图形中常用的技术&#xff0c;它通过以下两个主要组件实现角色动画。 骨骼系统 (Skeleton)&#xff1a;由层级结构的骨头组成&#xff0c;类似于人体骨骼蒙皮 (Mesh Skinning)&#xff1a;将模型网格顶点绑定到骨骼上&#xff0c;使骨骼移动…...

python执行测试用例,allure报乱码且未成功生成报告

allure执行测试用例时显示乱码&#xff1a;‘allure’ &#xfffd;&#xfffd;&#xfffd;&#xfffd;&#xfffd;ڲ&#xfffd;&#xfffd;&#xfffd;&#xfffd;ⲿ&#xfffd;&#xfffd;&#xfffd;Ҳ&#xfffd;&#xfffd;&#xfffd;ǿ&#xfffd;&am…...

R语言速释制剂QBD解决方案之三

本文是《Quality by Design for ANDAs: An Example for Immediate-Release Dosage Forms》第一个处方的R语言解决方案。 第一个处方研究评估原料药粒径分布、MCC/Lactose比例、崩解剂用量对制剂CQAs的影响。 第二处方研究用于理解颗粒外加硬脂酸镁和滑石粉对片剂质量和可生产…...

A2A JS SDK 完整教程:快速入门指南

目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库&#xff…...

热门Chrome扩展程序存在明文传输风险,用户隐私安全受威胁

赛门铁克威胁猎手团队最新报告披露&#xff0c;数款拥有数百万活跃用户的Chrome扩展程序正在通过未加密的HTTP连接静默泄露用户敏感数据&#xff0c;严重威胁用户隐私安全。 知名扩展程序存在明文传输风险 尽管宣称提供安全浏览、数据分析或便捷界面等功能&#xff0c;但SEMR…...

2.2.2 ASPICE的需求分析

ASPICE的需求分析是汽车软件开发过程中至关重要的一环&#xff0c;它涉及到对需求进行详细分析、验证和确认&#xff0c;以确保软件产品能够满足客户和用户的需求。在ASPICE中&#xff0c;需求分析的关键步骤包括&#xff1a; 需求细化&#xff1a;将从需求收集阶段获得的高层需…...