一、扣减库存问题分析 
在提交订单的时候,要扣减库存,对于sql,是这么写的:
1 update  t_stcok set  stock = stock-2  where  sku_id = 1 
首先这条sql存在超卖问题,很有可能会减成负数。可能会改成如下:
1 update  t_stcok set  stock = stock-2  where  sku_id = 1  and  stock > 2 
这样好像解决了超卖问题。但是引入了新的问题。由于库存牵涉进货、补货等系统,所以是个独立的服务。
并且,比如我是通过MQ去通知库存进行扣减库存,但是由于网络抖动,请求扣减库存没有结果,这个时候可能需要进行重试。重试之后,可能成功了,这个时候,有可能这两次都成功了。那么,一个用户买一样东西,但是库存扣了两遍。这就是幂等。如果不做幂等处理,重试会出现上述这种致命问题。
那么如何做到幂等呢?
实际上就是追求数据一致性。那么就可以考虑锁来保证,比如我这里用乐观锁来实现:
1 2 3 select  stock,version  from  t_stock;if(stock > 用户购买数量)     update  t_stcok set  stock = stock-2  where  sku_id = 1  and  version  = last_version 
但是,一旦出现并发,那么可能这个用户是执行update失败的,所以还需要去重试(guava retry或者spring retry都可以优雅地实现重试),直到成功或者库存已经不足。
那么,在少量并发的情况下,可以考虑乐观锁,要不然会大量失败,此时需要用悲观锁:
1 2 select  * from  t_stock for  update ;下面执行update 操作。。。 
在一个事务内,第一句为select for update,那么这一行数据就会被本线程锁住,整个事务执行完才能允许其他线程进来。
存在的问题:一个线程锁住这行数据,那么其他线程都要等待,效率很低。
那么,如何保证数据一致性,还可以提高效率呢?
对于扣减库存,往往是先在redis中进行扣减库存。redis是单线程,是高速串行执行,不存在并发问题。
如果是单机redis,可以在同一个事务中保证一次性执行:
1 2 3 4 5 watch stock multi if stock > count     stock = stock - count; exec 
但是不能在集群中用(分布在不同节点上时),所以用watch不通用。
redis都是原子操作,比如自增:incrby,用这个就可以判断库存是否够。就是所谓的redis预减库存。
但是在实际中,库存表里有两个字段:库存和锁定库存。
锁定库存是表示多少用户真正下单了,但是还没有支付。锁定库存+库存=总库存,等用户真正支付之后,就可以将锁定库存减掉。那么,此时,redis中需要存库存和锁定库存这两个值,上面单一的原子操作就不行了。
解决方案:redis+lua
为什么要用lua呢?可以用lua将一系列操作封装起来执行,输入自己的参数即可。lua脚本在redis中执行是串行的、原子性的。
OK,下面就实战一波:根据skuId查询缓存中的库存值。
二、查询库存(设置库存) 
首先,我们要明确一点,redis中的库存初始值是由后台的系统人工提前配置好的,在进行商品销售时(用户下单时),直接从redis中先进行库存的扣减。
这里呢,我们没有进行初始化,而是在程序中进行判断:如果redis已经有了这个库存值,就将他查询出来返回;否则,就去数据库查询,然后对redis进行初始化。
这里的一个问题是:如果存在并发问题,但是我们初始化两个值(库存值和库存锁定值),这里采用lua脚本,在lua脚本中完成初始化,并且对于两个用户同时进行初始化库存的问题,可以在lua中进行判断,因为redis是单线程,lua也是单线程,不用担心会同时初始化两次。
下面首先写一个接口,根据skuid查询库存(库存和锁定库存):
1 2 3 4 5 6 7 8 9 10 11 12 @RequestMapping ("/query/{skuId}" )public  ApiResult<Stock> queryStock (@PathVariable long  skuId)     ApiResult<Stock>  result = new  ApiResult(Constants.RESP_STATUS_OK,"库存查询成功" );     Stock stock = new  Stock();     stock.setSkuId(skuId);     int  stockCount = stockService.queryStock(skuId);     stock.setStock(stockCount);     result.setData(stock);     return   result; } 
service层:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Override public  int  queryStock (long  skuId)           Stock stock ;     String stockKey = Constants.CACHE_PRODUCT_STOCK+":" +skuId;     String stockLockKey = Constants.CACHE_PRODUCT_STOCK_LOCK+":" +skuId;               Object stockObj = redisTemplate.opsForValue().get(stockKey);     Integer stockInRedis = null  ;     if (stockObj!=null ){         stockInRedis = Integer.valueOf(stockObj.toString());     }               if (stockInRedis==null ){                  stock = stockMapper.selectBySkuId(skuId);                                             redisUtils.skuStockInit(stockKey,stockLockKey,stock.getStock().toString(),stock.getLockStock().toString());     }else {         return  stockInRedis;     }               return  stock.getStock(); } 
那么这个工具类为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public  static  final  String STOCK_CACHE_LUA =        "local stock = KEYS[1] "  +                 "local stock_lock = KEYS[2] "  +                 "local stock_val = tonumber(ARGV[1]) "  +                 "local stock_lock_val = tonumber(ARGV[2]) "  +                 "local is_exists = redis.call(\"EXISTS\", stock) "  +                 "if is_exists == 1  then "  +                 "   return 0 "  +                 "else  "  +                 "   redis.call(\"SET\", stock, stock_val) "  +                 "   redis.call(\"SET\", stock_lock, stock_lock_val) "  +                 "   return 1 "  +                 "end" ;                                           public  boolean  skuStockInit (String stockKey,String stockLockKey,String stock,String stockLock)          Object result  = redisTemplate.execute((RedisCallback<Object>) redisConnection -> {         Jedis jedis = (Jedis)redisConnection.getNativeConnection();         return  jedis.eval(STOCK_CACHE_LUA, Collections.unmodifiableList(Arrays.asList(stockKey,stockLockKey))                 ,Collections.unmodifiableList(Arrays.asList(stock, stockLock)));     });     if  (EXCUTE_SUCCESS.equals(result)) {         return  true ;     }     return  false ; } 
对于lua脚本进行稍微的解释一下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 //第一组数据是key数组;第二组数据是args数组,是与key数组对应的值,就是库存 //我们这里第一组为[stockKey,stockLockKey],就是存在redis中的名字,这里是在service层中定义好了 //第二组为[50 ,0 ],这个值就是可以从数据库表t_stock中查询出来的 //因为执行这段lua脚本的话,说明redis中没有缓存的数据,所以需要先查询数据库,然后将缓存设置好 //lua中定义变量用local  local  stock = KEYS[1 ]local  stock_lock = KEYS[2 ]local  stock_val = tonumber (ARGV[1 ])local  stock_lock_val = tonumber (ARGV[2 ])//再查询一遍缓存是否存在,防止两个线程同时进来设置缓存 //存在就不用设置缓存了,否则就设置缓存 local  is_exists = redis.call("EXISTS" , stock)if  is_exists == 1   then     return  0  else     redis.call("SET" , stock, stock_val)     redis.call("SET" , stock_lock, stock_lock_val)     return  1  end 
那么,启动工程mama-buy-stock:假如我去查询skuId=1的商品:
第一次库存不存在,那么就会去查询数据库:
我们再来看看redis中的数据:
三、扣减库存 
下面来看看扣减库存是如何实现的。因为提交订单后,往往是不止一件商品的,往往购物车内有很多件商品,同时过来,假设有五件商品,但是其中只有一件暂时没有库存了,那么我还是希望其他的四件商品能够卖出去,只是没有库存的商品就不算钱了。所以扣减库存用一个map来装,即Map<skuId,count>
controller层:
1 2 3 4 5 6 7 @RequestMapping ("/reduce" )public  ApiResult<Map<Long,Integer>> reduceStock(@RequestBody  List<StockReduce> stockReduceList){    ApiResult result = new  ApiResult(Constants.RESP_STATUS_OK,"库存扣减成功" );     Map<Long,Integer> resultMap =  stockService.reduceStock(stockReduceList);     result.setData(resultMap);     return  result; } 
service层:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 @Override @Transactional public  Map<Long, Integer> reduceStock (List<StockReduce> stockReduceList)      Map<Long, Integer> resultMap = new  HashMap<>();          stockReduceList.stream().forEach(param -> {         String stockKey = Constants.CACHE_PRODUCT_STOCK+":" +param.getSkuId();         String stockLockKey = Constants.CACHE_PRODUCT_STOCK_LOCK+":" +param.getSkuId();         Object result = redisUtils.reduceStock(stockKey,                                                stockLockKey,                                                param.getReduceCount().toString(),                                                String.valueOf(Math.abs(param.getReduceCount())));         if (result instanceof  Long){                          resultMap.put(param.getSkuId(),-1 );         }else  if  (result instanceof  List){                          List resultList =  ((List) result);             int  stockAftChange =  ((Long)resultList.get(0 )).intValue();             int  stockLockAftChange = ((Long) resultList.get(1 )).intValue();             StockFlow stockFlow = new  StockFlow();             stockFlow.setOrderNo(param.getOrderNo());             stockFlow.setSkuId(param.getSkuId());             stockFlow.setLockStockAfter(stockLockAftChange);             stockFlow.setLockStockBefore(stockLockAftChange+param.getReduceCount());             stockFlow.setLockStockChange(Math.abs(param.getReduceCount()));             stockFlow.setStockAfter(stockAftChange);             stockFlow.setStockBefore(stockAftChange+Math.abs(param.getReduceCount()));             stockFlow.setStockChange(param.getReduceCount());             stockFlowMapper.insertSelective(stockFlow);             resultMap.put(param.getSkuId(),1 );         }     });     return  resultMap; } 
对于redis的操作,基本与上一致:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public  static  final  String STOCK_REDUCE_LUA=        "local stock = KEYS[1]\n"  +                 "local stock_lock = KEYS[2]\n"  +                 "local stock_change = tonumber(ARGV[1])\n"  +                 "local stock_lock_change = tonumber(ARGV[2])\n"  +                 "local is_exists = redis.call(\"EXISTS\", stock)\n"  +                 "if is_exists == 1 then\n"  +                 "    local stockAftChange = redis.call(\"INCRBY\", stock,stock_change)\n"  +                 "    if(stockAftChange<0) then\n"  +                 "        redis.call(\"DECRBY\", stock,stock_change)\n"  +                 "        return -1\n"  +                 "    else \n"  +                 "        local stockLockAftChange = redis.call(\"INCRBY\", stock_lock,stock_lock_change)\n"  +                 "        return {stockAftChange,stockLockAftChange}\n"  +                 "    end "  +                 "else \n"  +                 "    return 0\n"  +                 "end" ;                                                                public  Object reduceStock (String stockKey,String stockLockKey,String stockChange,String stockLockChange)     Object result  = redisTemplate.execute((RedisCallback<Object>) redisConnection -> {         Jedis jedis = (Jedis)redisConnection.getNativeConnection();         return  jedis.eval(STOCK_REDUCE_LUA, Collections.unmodifiableList(Arrays.asList(stockKey,stockLockKey))                 ,Collections.unmodifiableList(Arrays.asList(stockChange, stockLockChange)));     });     return  result; } 
此时,一旦数据库发生异常,那么就会回滚,但是redis中是无法回滚的。这个问题不用担心,因为数据库发生异常是及其严重的问题,是很少会发生的,一旦发生,只需要去这个流水的表中去查看情况,然后去执行脚本去初始化这个redis即可。所以是可以补救的。
但是接口的幂等性还没有做。重复尝试调用这个接口(通常是发生在MQ的失败重传机制,客户端的连续点击一般是可以避免的),可能会重复减redis库存并且重复地去插入流水记录。这个问题该如何解决呢?
四、redis分布式锁来实现幂等性 
主流的方案,比如有用一张表来控制,比如以这个orderID为唯一主键,一旦插入成功,就可以根据这个唯一主键的存在与否判断是否为重复请求(也就是说,这里的扣减库存和插入去重表放在一个事务里,去重表中有一个字段为orderId,全局唯一不重复,用唯一索引进行约束,那么插入的时候判断这个去重表是否可以插入成功,如果不成功,那么数据库操作全部回滚)。
可以用redis分布式锁给这个订单上锁。以订单id为锁,不会影响其他线程来扣减库存,所以不影响性能。
针对这个订单,第一次肯定是可以去扣减库存的,但是第二次再接收到这个请求,那么就要返回已经成功了,不要再重复扣减。
对于reduceStock()这个方法最前面增加锁:
1 2 3 4 5 6 7 8 9 Long orderNo = stockReduceList.get(0 ).getOrderNo(); boolean  lockResult = redisUtils.distributeLock(Constants.ORDER_RETRY_LOCK+orderNo.toString(),orderNo.toString(),300000 );if (!lockResult){         return   Collections.EMPTY_MAP; } ... 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 private  static  final  String LOCK_SUCCESS = "OK" ;private  static  final  String SET_IF_NOT_EXIST = "NX" ;private  static  final  String SET_WITH_EXPIRE_TIME = "PX" ;private  static  final  Long EXCUTE_SUCCESS = 1L ;private  static  final  String UNLOCK_LUA=        "if redis.call('get', KEYS[1]) == ARGV[1] then "  +                 "   return redis.call('del', KEYS[1]) "  +                 "else "  +                 "   return 0 "  +                 "end" ; public   boolean  distributeLock (String lockKey, String requestId, int  expireTime)     String result = redisTemplate.execute((RedisCallback<String>) redisConnection -> {         JedisCommands commands = (JedisCommands)redisConnection.getNativeConnection();         return  commands.set(lockKey,requestId,SET_IF_NOT_EXIST,SET_WITH_EXPIRE_TIME,expireTime);     });     if  (LOCK_SUCCESS.equals(result)) {         return  true ;     }     return  false ; } public  boolean  releaseDistributelock (String lockKey, String requestId)     Object result  = redisTemplate.execute((RedisCallback<Object>) redisConnection -> {         Jedis jedis = (Jedis)redisConnection.getNativeConnection();         return  jedis.eval(UNLOCK_LUA, Collections.singletonList(lockKey), Collections.singletonList(requestId));     });     if  (EXCUTE_SUCCESS.equals(result)) {         return  true ;     }     return  false ; } 
注意,这里不需要我们主动去释放分布式锁,只要设置一个大于重试时间的过期时间即可。让它自己删除。
注意redis在集群下做分布式锁,最好要用Redission。这里如果用于集群,如何lua脚本在一个事务里同时操作多个key的时候,如果要保证这个事务生效,就需要保证这几个key都要在同一个节点上。但是,比如我们这里的两个key:
1 2 public  static  final  String CACHE_PRODUCT_STOCK = "product:stock" ;public  static  final  String CACHE_PRODUCT_STOCK_LOCK = "product:stock:lock" ;
因为我们这里要同时对库存和锁定库存这两个key进行操作,需要放在一个事务内执行,不处理的话,一旦他们不在一个节点,那么事务就不会生效,解决方案:
1 2 public  static  final  String CACHE_PRODUCT_STOCK = "{product:stock}" ;public  static  final  String CACHE_PRODUCT_STOCK_LOCK = "{product:stock}:lock" ;
如果加上花括号,那么在进行计算hash值的时候,他们两就会是一样的,会被投放到同一个slot中,自然就保证了在同一个节点上。
五、测试一下