前端优化完了,后端也需要优化,为了提高系统的健壮性和吞吐量,往往需要结合MQ来进行优化。本文结合rabbitMQ进行操作。但是只是解决了部分问题,其实里面暗含很多其他问题。,比如MQ消费失败问题,事务问题等。
1. 安装RabbitMQ
linux下的安装没什么可说的,因为本机懒得重装虚拟机了,所以就下载了windows版本进行安装。
erlang下载地址:http://www.erlang.org/download.html
rabbitMQ下载:http://www.rabbitmq.com/download.html
直接下载安装即可。
因为想用可视化界面监控消息,所以先激活这个功能。
1 2
| //到rabbitMQ安装目录的sbin目录下启动cmd黑窗口 E:\software\RabbitMQServer\rabbitmq_server-3.6.5\sbin>rabbitmq-plugins.bat enable rabbitmq_management
|
然后重启rabbitMQ
服务。输入网址:http://localhost:15672/
。 使用默认用户guest/guest
进入网页端控制台。
2. rabbitMQ基本原理和使用
rabbitMQ原理
- Broker:简单来说就是消息队列服务器实体。
- Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
- Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
- Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
- Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
- vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
- producer:消息生产者,就是投递消息的程序。
- consumer:消息消费者,就是接受消息的程序。
- channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
消息队列的使用过程大概如下
- 客户端连接到消息队列服务器,打开一个
channel
。
- 客户端声明一个
exchange
,并设置相关属性。
- 客户端声明一个
queue
,并设置相关属性。
- 客户端使用
routing key
,在exchange
和queue
之间建立好绑定关系。
- 客户端投递消息到
exchange
。
总结:exchange
接收到消息后,就根据消息的key
和已经设置的binding
,进行消息路由,将消息投递到一个或多个队列里。
Direct交换机
完全根据key
进行投递的叫做Direct交换机,例如,绑定时设置了routing key
为”abc”,那么客户端提交的消息,只有设置了key
为”abc”的才会投递到队列。
所有发送到Direct Exchange
的消息被转发到RouteKey
中指定的Queue
。
Direct
模式,可以使用rabbitMQ
自带的Exchange:default Exchange
。所以不需要将Exchange
进行任何绑定(binding
)操作 。消息传递时,RouteKey
必须完全匹配,才会被队列接收,否则该消息会被抛弃。
Topic交换机
对key
进行模式匹配后进行投递的叫做Topic交换机。*(星号)可以代替一个任意标识符 ;#(井号)可以代替零个或多个标识符。
在上图例子中,我们发送描述动物的消息。消息会转发给包含3个单词(2个小数点)的路由键绑定的队列中。绑定键中的第一个单词描述的是速度,第二个是颜色,第三个是物种:“速度.颜色.物种”。
我们创建3个绑定:Q1绑定键是“.orange.”,Q2绑定键是“..rabbit”,Q3绑定键是“lazy.#”。这些绑定可以概括为:Q1只对橙色的动物感兴趣。Q2则是关注兔子和所有懒的动物。
所有发送到Topic Exchange
的消息被转发到所有关心RouteKey
中指定Topic
的Queue
上,
所有发送到Topic Exchange
的消息被转发到所有关心RouteKey
中指定Topic
的Queue
上,
Exchange
将RouteKey
和某Topic
进行模糊匹配。此时队列需要绑定一个Topic
。可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“”匹配不多不少一个词。因此“log.#”能够匹配到“log.info.oa”,但是“log.” 只会匹配到“log.error”。
Fanout交换机
还有一种不需要key
的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。
所有发送到Fanout Exchange
的消息都会被转发到与该Exchange
绑定(Binding
)的所有Queue
上。
Fanout Exchange 不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。
所以,Fanout Exchange 转发消息是最快的。
Headers交换机
首部交换机是忽略routing_key
的一种路由方式。路由器和交换机路由的规则是通过Headers
信息来交换的,这个有点像HTTP
的Headers
。将一个交换机声明成首部交换机,绑定一个队列的时候,定义一个Hash
的数据结构,消息发送的时候,会携带一组hash数据结构的信息,当Hash的内容匹配上的时候,消息就会被写入队列。
绑定交换机和队列的时候,Hash结构中要求携带一个键“x-match”,这个键的Value
可以是any
或者all
,这代表消息携带的Hash是需要全部匹配(all
),还是仅匹配一个键(any
)就可以了。相比直连交换机,首部交换机的优势是匹配的规则不被限定为字符串(string)。
持久化
RabbitMQ
支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。消息队列持久化包括3个部分:
exchange
持久化,在声明时指定durable => 1
queue
持久化,在声明时指定durable => 1
- 消息持久化,在投递时指定
delivery_mode => 2
(1是非持久化)
如果exchange
和queue
都是持久化的,那么它们之间的binding
也是持久化的。如果exchange
和queue两
者之间有一个持久化,一个非持久化,就不允许建立绑定。
3. rabbitMQ-Direct交换机
这种模式是最简单的模式,就发送一串字符串,这个字符串为key
,接收的时候也完全以这个字符串本来来确定,不需要绑定任何exchange
,使用默认的就行。我们以这个模式开始在原来的项目上继续集成。
首先是引入依赖:
1 2 3 4 5
| <!--rabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
appilication.yml:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: / listener: simple: concurrency: 10 max-concurrency: 10 prefetch: 1 auto-startup: true default-requeue-rejected: true template: retry: enabled: true initial-interval: 1000 max-attempts: 3 max-interval: 10000 multiplier: 1.0
|
rabbitMQ
配置类MQConfig
:
1 2 3 4 5 6 7 8 9
| @Configuration public class MQConfig { public static final String DIRECT_QUEUE_NAME = "queue"; @Bean public Queue queue(){ return new Queue(QUEUE_NAME,true); } }
|
发送者MQSender
:
1 2 3 4 5 6 7 8 9 10 11 12
| @Service @Slf4j public class MQSender {
@Autowired private AmqpTemplate amqpTemplate;
public void send(Object message){ amqpTemplate.convertAndSend(MQConfig.DIRECT_QUEUE_NAME,message); log.info("send:{}",message); } }
|
接收者MQReceiver
:
1 2 3 4 5 6 7 8 9
| @Service @Slf4j public class MQReceiver {
@RabbitListener(queues = MQConfig.DIRECT_QUEUE_NAME) public void receive(String message){ log.info("receive:{}",message); } }
|
这样,就完成了最简单的一个字符串的发送-接受。可以在controller
中随便测试一下:
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Controller @RequestMapping("/test") public class TestController { @Autowired private MQSender mqSender;
@RequestMapping("/mq") @ResponseBody public String mq(){ mqSender.send("hello world"); return "success"; } }
|
4. rabbitMQ-Topic交换机
这个模式正如上面所言,是可以匹配通配符的,显然更加灵活,这里用程序测试一下这个模式效果。
MQConfig:
先来几个常量:
1 2 3 4 5 6 7 8 9 10
| public static final String TOPIC_QUEUE_NAME1 = "topic.queue1";
public static final String TOPIC_QUEUE_NAME2 = "topic.queue2";
public static final String TOPIC_EXCHANGE_NAME = "topicExchange";
private static final String TOPIC_KEY_ROUTE1 = "topic.key1";
private static final String TOPIC_KEY_ROUTE2 = "topic.#";
|
下面配置几个bean
:
注:带有 @Configuration
的注解类表示这个类可以使用 Spring IoC
容器作为 bean
定义的来源。@Bean
注解告诉 Spring
,一个带有 @Bea
n 的注解方法将返回一个对象,该对象应该被注册为在 Spring
应用程序上下文中的 bean
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Bean public Queue topicQueue1(){ return new Queue(TOPIC_QUEUE_NAME1,true); } @Bean public Queue topicQueue2(){ return new Queue(TOPIC_QUEUE_NAME2,true); }
@Bean public TopicExchange topicExchange(){ return new TopicExchange(TOPIC_EXCHANGE_NAME); }
@Bean public Binding topicBinding1(){ return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with(TOPIC_KEY_ROUTE1); }
@Bean public Binding topicBinding2(){ return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with(TOPIC_KEY_ROUTE2); }
|
MQSender:
1 2 3 4 5 6 7 8
|
public void sendTopic(Object message){ log.info("send topic msg:{}",message); amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE_NAME,"topic.key1",message+"--1"); amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE_NAME,"topic.key2",message+"--2");
}
|
MQReceiver:
1 2 3 4 5 6 7 8 9
| @RabbitListener(queues = MQConfig.TOPIC_QUEUE_NAME1) public void receiveTopic1(String message){ log.info("topic queue1 receive:{}",message); }
@RabbitListener(queues = MQConfig.TOPIC_QUEUE_NAME2) public void receiveTopic2(String message){ log.info("topic queue2 receive:{}",message); }
|
最后测试一把:
1 2 3 4 5 6
| @RequestMapping("/mq/topic") @ResponseBody public String mq_topic(){ mqSender.sendTopic("hello world"); return "success"; }
|
运行结果:
1 2 3 4
| 2018-05-26 18:59:40.281 INFO 9920 --- [nio-8080-exec-1] com.swg.miaosha.mq.MQSender : send topic msg:hello world 2018-05-26 18:59:40.303 INFO 9920 --- [cTaskExecutor-1] com.swg.miaosha.mq.MQReceiver : topic queue2 receive:hello world--1 2018-05-26 18:59:40.303 INFO 9920 --- [TaskExecutor-10] com.swg.miaosha.mq.MQReceiver : topic queue2 receive:hello world--2 2018-05-26 18:59:40.303 INFO 9920 --- [cTaskExecutor-1] com.swg.miaosha.mq.MQReceiver : topic queue1 receive:hello world--1
|
运行结果与初期的分析结果一致。
5. rabbitMQ-Fanout交换机
这种就是广播模式,即所有的绑定到指定的exchange
上的queue
都可以接收消息。
MQConfig:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public static final String FANOUT_EXCHANGE_NAME = "fanoutExchage"; public static final String FANOUT_QUEUE_NAME1 = "fanout.queue1"; public static final String FANOUT_QUEUE_NAME2 = "fanout.queue2";
@Bean public Queue fanoutQueue1(){ return new Queue(FANOUT_QUEUE_NAME1,true); } @Bean public Queue fanoutQueue2(){ return new Queue(FANOUT_QUEUE_NAME2,true); } @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange(FANOUT_EXCHANGE_NAME); } @Bean public Binding fanoutBinding1(){ return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange()); } @Bean public Binding fanoutBinding2(){ return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange()); }
|
MQSender:
1 2 3 4
| public void sendFanout(Object message){ log.info("send fanout msg:{}",message); amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE_NAME,"",message); }
|
MQReceiver:
1 2 3 4 5 6 7 8 9
| @RabbitListener(queues = MQConfig.FANOUT_QUEUE_NAME1) public void receiveFanout1(String message){ log.info("fanout queue1 receive:{}",message); }
@RabbitListener(queues = MQConfig.FANOUT_QUEUE_NAME2) public void receiveFanout2(String message){ log.info("fanout queue2 receive:{}",message); }
|
运行结果:
1 2 3
| 2018-05-26 20:03:29.592 INFO 16680 --- [nio-8080-exec-1] com.swg.miaosha.mq.MQSender : send fanout msg:hello world 2018-05-26 20:03:29.619 INFO 16680 --- [cTaskExecutor-1] com.swg.miaosha.mq.MQReceiver : fanout queue1 receive:hello world 2018-05-26 20:03:29.619 INFO 16680 --- [cTaskExecutor-1] com.swg.miaosha.mq.MQReceiver : fanout queue2 receive:hello world
|
queue1和queue2都接受到了消息。
6. rabbitMQ-Headers交换机
MQConfig:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public static final String HEADERS_EXCHANGE_NAME = "headersExchage"; public static final String HEADERS_QUEUE_NAME = "headers.queue";
@Bean public HeadersExchange headersExchange(){ return new HeadersExchange(HEADERS_EXCHANGE_NAME); } @Bean public Queue headersQueue(){ return new Queue(HEADERS_QUEUE_NAME,true); }
@Bean public Binding headersBinding(){ Map<String,Object> map = new HashMap<>(); map.put("header1","value1"); map.put("header2","value2"); return BindingBuilder.bind(headersQueue()).to(headersExchange()).whereAll(map).match(); }
|
MQSender:
1 2 3 4 5 6 7 8 9 10
| public void sendHeaders(Object message){ String msg = RedisService.beanToString(message); log.info("send fanout msg:{}",message); MessageProperties properties = new MessageProperties(); properties.setHeader("header1","value1"); properties.setHeader("header2","value2"); Message obj = new Message(msg.getBytes(),properties); amqpTemplate.convertAndSend(MQConfig.HEADERS_EXCHANGE_NAME,"",obj); }
|
MQReceiver:
1 2 3 4
| @RabbitListener(queues = MQConfig.HEADERS_QUEUE_NAME) public void receiveHeaders(byte[] message){ log.info("fanout queue2 receive:{}",new String(message)); }
|
7. 秒杀优化
思路:减少数据库访问
- 系统初始化,把商品库存数量加载到
redis
- 收到请求,
redis
预减库存,库存不够,直接返回,否则进入3
- 请求入队,立即返回排队中
- 请求出队,生成订单,减少库存
- 客户端轮询,是否秒杀成功
对于之前的秒杀接口do_miaosha
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @RequestMapping(value = "/do_miaosha",method = RequestMethod.POST) @ResponseBody public Result<Integer> do_miaosha(Model model, MiaoshaUser user, @RequestParam("goodsId") long goodsId){ if(user == null) return Result.error(CodeMsg.SESSION_ERROR); GoodsVo goodsVo = goodsService.getGoodsVoByGoodsId(goodsId); if(goodsVo.getStockCount() <= 0){ return Result.error(CodeMsg.MIAO_SHA_OVER); }
MiaoshaOrder miaoshaOrder = orderService.getMiaoshaOrderByUserIdGoodsId(user.getId(),goodsId); if(miaoshaOrder != null){ return Result.error(CodeMsg.REPEATE_MIAOSHA); }
OrderInfo orderInfo = miaoshaService.miaosha(user,goodsVo);
return Result.success(orderInfo); }
|
这里判断库存是直接从数据库查,因为并发量比较大,存在性能问题。后面秒杀到之后,也不是直接减库存, 而是将其放到消息队列中慢慢交给数据库去调整。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| @RequestMapping(value = "/do_miaosha",method = RequestMethod.POST) @ResponseBody public Result<Integer> do_miaosha(Model model, MiaoshaUser user, @RequestParam("goodsId") long goodsId){ if(user == null) return Result.error(CodeMsg.SESSION_ERROR);
long stock = redisService.decr(GoodsKey.getMiaoshaGoodsStock,""+goodsId); if(stock < 0){ return Result.error(CodeMsg.MIAO_SHA_OVER); } MiaoshaOrder miaoshaOrder = orderService.getMiaoshaOrderByUserIdGoodsId(user.getId(),goodsId); if(miaoshaOrder != null){ return Result.error(CodeMsg.REPEATE_MIAOSHA); } MiaoshaMessage message = new MiaoshaMessage(); message.setUser(user); message.setGoodsId(goodsId); sender.sendMiaoshaMessage(message); return Result.success(0); }
|
在消息队列中对消息进行消化:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @RabbitListener(queues = MQConfig.MIAOSHA_QUEUE) public void receive(String message){ log.info("receive message:{}",message); MiaoshaMessage msg = RedisService.stringToBean(message,MiaoshaMessage.class); MiaoshaUser user = msg.getUser(); long goodsId = msg.getGoodsId(); GoodsVo goodsVo = goodsService.getGoodsVoByGoodsId(goodsId); if(goodsVo.getStockCount() <= 0){ return; } MiaoshaOrder miaoshaOrder = orderService.getMiaoshaOrderByUserIdGoodsId(user.getId(),goodsId); if(miaoshaOrder != null){ return; }
OrderInfo orderInfo = miaoshaService.miaosha(user,goodsVo);
}
|
对于controller
中的优化1:redis
预减库存。那么需要在系统启动的时候将秒杀商品的库存先添加到redis
中:
1
| public class MiaoshaController implements InitializingBean
|
重写afterPropertiesSet()
方法:
1 2 3 4 5 6 7 8 9 10 11
| @Override public void afterPropertiesSet() throws Exception { List<GoodsVo> goodsVoList = goodsService.getGoodsVoList(); if(goodsVoList == null){ return; } for(GoodsVo goods:goodsVoList){ redisService.set(GoodsKey.getMiaoshaGoodsStock,""+goods.getId(),goods.getStockCount()); } }
|
对于前端,这时也要进行修改了,因为点击秒杀商品按键后,这里考虑三种情况:排队等待、失败、成功。那么这里规定-1为失败,0为排队,1为秒杀成功已经写入数据库。
原来的detail.htm
中秒杀事件函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| function doMiaosha(){ $.ajax({ url:"/miaosha/do_miaosha", type:"POST", data:{ goodsId:$("#goodsId").val(), }, success:function(data){ if(data.code == 0){ window.location.href="/order_detail.htm?orderId="+data.data.id; }else{ layer.msg(data.msg); } }, error:function(){ layer.msg("客户端请求有误"); } }); }
|
秒杀到商品就直接返回,现在后端改为消息队列,所以需要增加函数进行判断,必要时需要轮询:
1 2 3 4 5
| if(data.code == 0){ window.location.href="/order_detail.htm?orderId="+data.data.id; }else{ layer.msg(data.msg); }
|
所以将其改为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| ... if(data.code == 0){ getMiaoshaResult($("#goodsId").val()); }else{ layer.msg(data.msg); } ...
function getMiaoshaResult(goodsId) { g_showLoading(); $.ajax({ url:"/miaosha/result", type:"GET", data:{ goodsId:$("#goodsId").val(), }, success:function(data){ if(data.code == 0){ var result = data.data; if(result <= 0){ layer.msg("对不起,秒杀失败!"); } else if(result == 0){ setTimeout(function () { getMiaoshaResult(goodsId); },50); } else { layer.msg("恭喜你,秒杀成功,查看订单?",{btn:["确定","取消"]}, function () { window.location.href="/order_detail.htm?orderId="+result; }, function () { layer.closeAll(); } ); } }else{ layer.msg(data.msg); } }, error:function(){ layer.msg("客户端请求有误"); } }); }
|
那么相应地,后台也要增加一个方法:result
1 2 3 4 5 6 7 8 9
| @RequestMapping(value = "/result",method = RequestMethod.GET) @ResponseBody public Result<Long> result(Model model, MiaoshaUser user, @RequestParam("goodsId") long goodsId){ if(user == null) return Result.error(CodeMsg.SESSION_ERROR);
long result = miaoshaService.getMiaoshaResult(user.getId(),goodsId); return Result.success(result); }
|
那么如何标记状态呢?这就是getMiaoshaResult
方法所做的事情。
对于成功的状态判断,很简单,从数据库查,能查到就说明已经秒杀成功,否则就是两种情况:失败或者正在等待生成订单。
对于这两种状态,我们需要用redis来实现,思路是:在系统初始化的时候,redis中设置秒杀商品是否卖完的状态为false—即未卖完;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public long getMiaoshaResult(Long userId, long goodsId) { MiaoshaOrder orderInfo = orderService.getMiaoshaOrderByUserIdGoodsId(userId,goodsId); if(orderInfo != null){ return orderInfo.getId(); }else{ boolean isOver = getGoodsOver(goodsId); if(isOver){ return -1; }else{ return 0; } } }
|
在MiaoshaService
中的Miaosha
方法:数据库减库存失败的话,说明数据库的库存已经小于0了,那么这个时候,立即将redis初始设置的秒杀商品是否卖完的状态为true,表示商品已经全部卖完,返回秒杀失败。否则就是要前端等待等待。
1 2 3 4 5 6 7 8 9 10 11
| @Transactional public OrderInfo miaosha(MiaoshaUser user, GoodsVo goods) { boolean success =goodsService.reduceStock(goods); if(success){ return orderService.createOrder(user,goods); }else{ setGoodsOver(goods.getId()); return null; } }
|
对于两个小方法getGoodsOver
和setGoodsOver
:
1 2 3 4 5 6 7
| private void setGoodsOver(long goodId){ redisService.set(MiaoshaKey.isGoodsOver,""+goodId,true); }
private boolean getGoodsOver(long goodsId) { return redisService.exists(MiaoshaKey.isGoodsOver,""+goodsId); }
|
那么redis
预减库存,然后消息队列来进行创建订单就实现了。
当然,对于redis
预减库存这一点,还有要优化的地方,就是现在的do_miaosha
接口是这样的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| @RequestMapping(value = "/do_miaosha",method = RequestMethod.POST) @ResponseBody public Result<Integer> do_miaosha(Model model, MiaoshaUser user, @RequestParam("goodsId") long goodsId){ if(user == null) return Result.error(CodeMsg.SESSION_ERROR);
long stock = redisService.decr(GoodsKey.getMiaoshaGoodsStock,""+goodsId); if(stock < 0){ return Result.error(CodeMsg.MIAO_SHA_OVER); } MiaoshaOrder miaoshaOrder = orderService.getMiaoshaOrderByUserIdGoodsId(user.getId(),goodsId); if(miaoshaOrder != null){ return Result.error(CodeMsg.REPEATE_MIAOSHA); } MiaoshaMessage message = new MiaoshaMessage(); message.setUser(user); message.setGoodsId(goodsId); sender.sendMiaoshaMessage(message); return Result.success(0); }
|
但是,当秒杀商品已经没了的时候,就没有必要再去redis
中进行判断了,毕竟查询redis
也是需要网络开销的,解决思路是:在内存中进行判断,如果redisService.decr
得到的stock
少于零的时候,直接将内存中的一个标志改变一下,那么下次再进入do_miaosha
接口,先判断内存这个标记,如果库存已经小于0了,就不再访问redis
,而是直接返回秒杀商品已经卖完。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| @RequestMapping(value = "/do_miaosha",method = RequestMethod.POST) @ResponseBody public Result<Integer> do_miaosha(Model model, MiaoshaUser user, @RequestParam("goodsId") long goodsId){ if(user == null) return Result.error(CodeMsg.SESSION_ERROR);
long stock = redisService.decr(GoodsKey.getMiaoshaGoodsStock,""+goodsId); if(stock < 0){ return Result.error(CodeMsg.MIAO_SHA_OVER); } MiaoshaOrder miaoshaOrder = orderService.getMiaoshaOrderByUserIdGoodsId(user.getId(),goodsId); if(miaoshaOrder != null){ localOverMap.put(goodsId,true); return Result.error(CodeMsg.REPEATE_MIAOSHA); } MiaoshaMessage message = new MiaoshaMessage(); message.setUser(user); message.setGoodsId(goodsId); sender.sendMiaoshaMessage(message);
return Result.success(0); }
|
声明一个map
:
1
| private Map<Long,Boolean> localOverMap = new HashMap<>();
|
那么在afterPropertiesSet
这个系统加载的初始化方法中对这个map
进行初始化,goodsId--stock
:
1
| localOverMap.put(goods.getId(),false);
|
在原来的redis
预减库存初,发现库存小于0 ,就改为true:
1 2 3 4
| if(stock < 0){ localOverMap.put(goodsId,true); return Result.error(CodeMsg.MIAO_SHA_OVER); }
|
最后do_miaosha
接口变为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| @RequestMapping(value = "/do_miaosha",method = RequestMethod.POST) @ResponseBody public Result<Integer> do_miaosha(Model model, MiaoshaUser user, @RequestParam("goodsId") long goodsId){ if(user == null) return Result.error(CodeMsg.SESSION_ERROR);
boolean over = localOverMap.get(goodsId); if(over){ return Result.error(CodeMsg.MIAO_SHA_OVER); }
long stock = redisService.decr(GoodsKey.getMiaoshaGoodsStock,""+goodsId); if(stock < 0){ localOverMap.put(goodsId,true); return Result.error(CodeMsg.MIAO_SHA_OVER); } MiaoshaOrder miaoshaOrder = orderService.getMiaoshaOrderByUserIdGoodsId(user.getId(),goodsId); if(miaoshaOrder != null){ return Result.error(CodeMsg.REPEATE_MIAOSHA); } MiaoshaMessage message = new MiaoshaMessage(); message.setUser(user); message.setGoodsId(goodsId); sender.sendMiaoshaMessage(message);
return Result.success(0); }
|
ok,整个关于redis
预减库存和rabbitMQ
创建订单这个优化已经基本完成了。