前端优化完了,后端也需要优化,为了提高系统的健壮性和吞吐量,往往需要结合MQ来进行优化。本文结合rabbitMQ进行操作。但是只是解决了部分问题,其实里面暗含很多其他问题。,比如MQ消费失败问题,事务问题等。

1. 安装RabbitMQ

linux下的安装没什么可说的,因为本机懒得重装虚拟机了,所以就下载了windows版本进行安装。

erlang下载地址:http://www.erlang.org/download.html

rabbitMQ下载:http://www.rabbitmq.com/download.html

直接下载安装即可。

因为想用可视化界面监控消息,所以先激活这个功能。

1
2
//到rabbitMQ安装目录的sbin目录下启动cmd黑窗口
E:\software\RabbitMQServer\rabbitmq_server-3.6.5\sbin>rabbitmq-plugins.bat enable rabbitmq_management

然后重启rabbitMQ服务。输入网址:http://localhost:15672/。 使用默认用户guest/guest进入网页端控制台。

2. rabbitMQ基本原理和使用

rabbitMQ原理

image

  • Broker:简单来说就是消息队列服务器实体。
  • Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
  • Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
  • Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
  • Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
  • vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
  • producer:消息生产者,就是投递消息的程序。
  • consumer:消息消费者,就是接受消息的程序。
  • channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

消息队列的使用过程大概如下

  • 客户端连接到消息队列服务器,打开一个channel
  • 客户端声明一个exchange,并设置相关属性。
  • 客户端声明一个queue,并设置相关属性。
  • 客户端使用routing key,在exchangequeue之间建立好绑定关系。
  • 客户端投递消息到exchange

总结:exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。

Direct交换机

完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。

image

所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue

Direct模式,可以使用rabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。

Topic交换机

key进行模式匹配后进行投递的叫做Topic交换机。*(星号)可以代替一个任意标识符 ;#(井号)可以代替零个或多个标识符。

image

在上图例子中,我们发送描述动物的消息。消息会转发给包含3个单词(2个小数点)的路由键绑定的队列中。绑定键中的第一个单词描述的是速度,第二个是颜色,第三个是物种:“速度.颜色.物种”。
我们创建3个绑定:Q1绑定键是“.orange.”,Q2绑定键是“..rabbit”,Q3绑定键是“lazy.#”。这些绑定可以概括为:Q1只对橙色的动物感兴趣。Q2则是关注兔子和所有懒的动物。

所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定TopicQueue上,

所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定TopicQueue上,

ExchangeRouteKey 和某Topic 进行模糊匹配。此时队列需要绑定一个Topic。可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“”匹配不多不少一个词。因此“log.#”能够匹配到“log.info.oa”,但是“log.只会匹配到“log.error”。

Fanout交换机

还有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。

image

所有发送到Fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上。

Fanout Exchange 不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。

所以,Fanout Exchange 转发消息是最快的。

Headers交换机

首部交换机是忽略routing_key的一种路由方式。路由器和交换机路由的规则是通过Headers信息来交换的,这个有点像HTTPHeaders。将一个交换机声明成首部交换机,绑定一个队列的时候,定义一个Hash的数据结构,消息发送的时候,会携带一组hash数据结构的信息,当Hash的内容匹配上的时候,消息就会被写入队列

绑定交换机和队列的时候,Hash结构中要求携带一个键“x-match”,这个键的Value可以是any或者all这代表消息携带的Hash是需要全部匹配(all),还是仅匹配一个键(any)就可以了。相比直连交换机,首部交换机的优势是匹配的规则不被限定为字符串(string)。

持久化

RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。消息队列持久化包括3个部分:

  • exchange持久化,在声明时指定durable => 1
  • queue持久化,在声明时指定durable => 1
  • 消息持久化,在投递时指定delivery_mode => 2(1是非持久化)

如果exchangequeue都是持久化的,那么它们之间的binding也是持久化的。如果exchangequeue两者之间有一个持久化,一个非持久化,就不允许建立绑定。

3. rabbitMQ-Direct交换机

这种模式是最简单的模式,就发送一串字符串,这个字符串为key,接收的时候也完全以这个字符串本来来确定,不需要绑定任何exchange,使用默认的就行。我们以这个模式开始在原来的项目上继续集成。

首先是引入依赖:

1
2
3
4
5
<!--rabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

appilication.yml:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
listener:
simple:
concurrency: 10
max-concurrency: 10
prefetch: 1
auto-startup: true
default-requeue-rejected: true
template:
retry:
enabled: true
initial-interval: 1000
max-attempts: 3
max-interval: 10000
multiplier: 1.0

rabbitMQ配置类MQConfig

1
2
3
4
5
6
7
8
9
@Configuration
public class MQConfig {
//MQ name
public static final String DIRECT_QUEUE_NAME = "queue";
@Bean
public Queue queue(){
return new Queue(QUEUE_NAME,true);
}
}

发送者MQSender

1
2
3
4
5
6
7
8
9
10
11
12
@Service
@Slf4j
public class MQSender {

@Autowired
private AmqpTemplate amqpTemplate;

public void send(Object message){
amqpTemplate.convertAndSend(MQConfig.DIRECT_QUEUE_NAME,message);
log.info("send:{}",message);
}
}

接收者MQReceiver

1
2
3
4
5
6
7
8
9
@Service
@Slf4j
public class MQReceiver {

@RabbitListener(queues = MQConfig.DIRECT_QUEUE_NAME)
public void receive(String message){
log.info("receive:{}",message);
}
}

这样,就完成了最简单的一个字符串的发送-接受。可以在controller中随便测试一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Controller
@RequestMapping("/test")
public class TestController {
@Autowired
private MQSender mqSender;

@RequestMapping("/mq")
@ResponseBody
public String mq(){
mqSender.send("hello world");
return "success";
}
}

4. rabbitMQ-Topic交换机

这个模式正如上面所言,是可以匹配通配符的,显然更加灵活,这里用程序测试一下这个模式效果。

MQConfig:

先来几个常量:

1
2
3
4
5
6
7
8
9
10
//queue1名字
public static final String TOPIC_QUEUE_NAME1 = "topic.queue1";
//queue2名字
public static final String TOPIC_QUEUE_NAME2 = "topic.queue2";
//交换机名字
public static final String TOPIC_EXCHANGE_NAME = "topicExchange";
//key等于topic.key1的,后面将配置为只被queue1接收
private static final String TOPIC_KEY_ROUTE1 = "topic.key1";
//key匹配topic.#的都被接收进queue2
private static final String TOPIC_KEY_ROUTE2 = "topic.#";

下面配置几个bean

注:带有 @Configuration 的注解类表示这个类可以使用 Spring IoC 容器作为 bean 定义的来源。@Bean 注解告诉 Spring,一个带有 @Bean 的注解方法将返回一个对象,该对象应该被注册为在 Spring应用程序上下文中的 bean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//创建两个QUEUE对象queue1,queue2的bean被spring管理
@Bean
public Queue topicQueue1(){
return new Queue(TOPIC_QUEUE_NAME1,true);
}
@Bean
public Queue topicQueue2(){
return new Queue(TOPIC_QUEUE_NAME2,true);
}
//交换机
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(TOPIC_EXCHANGE_NAME);
}
//queue1--交换机--匹配规则1
@Bean
public Binding topicBinding1(){
return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with(TOPIC_KEY_ROUTE1);
}
//queue2--交换机--匹配规则2
@Bean
public Binding topicBinding2(){
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with(TOPIC_KEY_ROUTE2);
}

MQSender:

1
2
3
4
5
6
7
8
//消息1与topic.key1和topic.#都匹配;
//消息2与topic.key1不匹配,只与topic.#匹配,那么只能被queue2接收
public void sendTopic(Object message){
log.info("send topic msg:{}",message);
amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE_NAME,"topic.key1",message+"--1");
amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE_NAME,"topic.key2",message+"--2");

}

MQReceiver:

1
2
3
4
5
6
7
8
9
@RabbitListener(queues = MQConfig.TOPIC_QUEUE_NAME1)
public void receiveTopic1(String message){
log.info("topic queue1 receive:{}",message);
}

@RabbitListener(queues = MQConfig.TOPIC_QUEUE_NAME2)
public void receiveTopic2(String message){
log.info("topic queue2 receive:{}",message);
}

最后测试一把:

1
2
3
4
5
6
@RequestMapping("/mq/topic")
@ResponseBody
public String mq_topic(){
mqSender.sendTopic("hello world");
return "success";
}

运行结果:

1
2
3
4
2018-05-26 18:59:40.281  INFO 9920 --- [nio-8080-exec-1] com.swg.miaosha.mq.MQSender : send topic msg:hello world
2018-05-26 18:59:40.303 INFO 9920 --- [cTaskExecutor-1] com.swg.miaosha.mq.MQReceiver : topic queue2 receive:hello world--1
2018-05-26 18:59:40.303 INFO 9920 --- [TaskExecutor-10] com.swg.miaosha.mq.MQReceiver : topic queue2 receive:hello world--2
2018-05-26 18:59:40.303 INFO 9920 --- [cTaskExecutor-1] com.swg.miaosha.mq.MQReceiver : topic queue1 receive:hello world--1

运行结果与初期的分析结果一致。

5. rabbitMQ-Fanout交换机

这种就是广播模式,即所有的绑定到指定的exchange上的queue都可以接收消息。

MQConfig:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static final String FANOUT_EXCHANGE_NAME = "fanoutExchage";
public static final String FANOUT_QUEUE_NAME1 = "fanout.queue1";
public static final String FANOUT_QUEUE_NAME2 = "fanout.queue2";


@Bean
public Queue fanoutQueue1(){
return new Queue(FANOUT_QUEUE_NAME1,true);
}
@Bean
public Queue fanoutQueue2(){
return new Queue(FANOUT_QUEUE_NAME2,true);
}
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange(FANOUT_EXCHANGE_NAME);
}
@Bean
public Binding fanoutBinding1(){
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
@Bean
public Binding fanoutBinding2(){
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}

MQSender:

1
2
3
4
public void sendFanout(Object message){
log.info("send fanout msg:{}",message);
amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE_NAME,"",message);
}

MQReceiver:

1
2
3
4
5
6
7
8
9
@RabbitListener(queues = MQConfig.FANOUT_QUEUE_NAME1)
public void receiveFanout1(String message){
log.info("fanout queue1 receive:{}",message);
}

@RabbitListener(queues = MQConfig.FANOUT_QUEUE_NAME2)
public void receiveFanout2(String message){
log.info("fanout queue2 receive:{}",message);
}

运行结果:

1
2
3
2018-05-26 20:03:29.592  INFO 16680 --- [nio-8080-exec-1] com.swg.miaosha.mq.MQSender              : send fanout msg:hello world
2018-05-26 20:03:29.619 INFO 16680 --- [cTaskExecutor-1] com.swg.miaosha.mq.MQReceiver : fanout queue1 receive:hello world
2018-05-26 20:03:29.619 INFO 16680 --- [cTaskExecutor-1] com.swg.miaosha.mq.MQReceiver : fanout queue2 receive:hello world

queue1和queue2都接受到了消息。

6. rabbitMQ-Headers交换机

MQConfig:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static final String HEADERS_EXCHANGE_NAME = "headersExchage";
public static final String HEADERS_QUEUE_NAME = "headers.queue";

@Bean
public HeadersExchange headersExchange(){
return new HeadersExchange(HEADERS_EXCHANGE_NAME);
}
@Bean
public Queue headersQueue(){
return new Queue(HEADERS_QUEUE_NAME,true);
}
//就是说要完全匹配这个Map才能进入queue中发送出去
@Bean
public Binding headersBinding(){
Map<String,Object> map = new HashMap<>();
map.put("header1","value1");
map.put("header2","value2");
return BindingBuilder.bind(headersQueue()).to(headersExchange()).whereAll(map).match();
}

MQSender:

1
2
3
4
5
6
7
8
9
10
//map要一样
public void sendHeaders(Object message){
String msg = RedisService.beanToString(message);
log.info("send fanout msg:{}",message);
MessageProperties properties = new MessageProperties();
properties.setHeader("header1","value1");
properties.setHeader("header2","value2");
Message obj = new Message(msg.getBytes(),properties);
amqpTemplate.convertAndSend(MQConfig.HEADERS_EXCHANGE_NAME,"",obj);
}

MQReceiver:

1
2
3
4
@RabbitListener(queues = MQConfig.HEADERS_QUEUE_NAME)
public void receiveHeaders(byte[] message){
log.info("fanout queue2 receive:{}",new String(message));
}

7. 秒杀优化

思路:减少数据库访问

  • 系统初始化,把商品库存数量加载到redis
  • 收到请求,redis预减库存,库存不够,直接返回,否则进入3
  • 请求入队,立即返回排队中
  • 请求出队,生成订单,减少库存
  • 客户端轮询,是否秒杀成功

对于之前的秒杀接口do_miaosha

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@RequestMapping(value = "/do_miaosha",method = RequestMethod.POST)
@ResponseBody
public Result<Integer> do_miaosha(Model model, MiaoshaUser user, @RequestParam("goodsId") long goodsId){
if(user == null)
return Result.error(CodeMsg.SESSION_ERROR);
//判断库存
GoodsVo goodsVo = goodsService.getGoodsVoByGoodsId(goodsId);
if(goodsVo.getStockCount() <= 0){
return Result.error(CodeMsg.MIAO_SHA_OVER);
}


//判断是否已经秒杀到了
MiaoshaOrder miaoshaOrder = orderService.getMiaoshaOrderByUserIdGoodsId(user.getId(),goodsId);
if(miaoshaOrder != null){
return Result.error(CodeMsg.REPEATE_MIAOSHA);
}


//减库存、下订单、写入秒杀订单,需要在一个事务中执行
OrderInfo orderInfo = miaoshaService.miaosha(user,goodsVo);

return Result.success(orderInfo);
}

这里判断库存是直接从数据库查,因为并发量比较大,存在性能问题。后面秒杀到之后,也不是直接减库存, 而是将其放到消息队列中慢慢交给数据库去调整。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@RequestMapping(value = "/do_miaosha",method = RequestMethod.POST)
@ResponseBody
public Result<Integer> do_miaosha(Model model, MiaoshaUser user, @RequestParam("goodsId") long goodsId){
if(user == null)
return Result.error(CodeMsg.SESSION_ERROR);

//1.预减库存进行优化
/*********************************优化1开始*************************************/
long stock = redisService.decr(GoodsKey.getMiaoshaGoodsStock,""+goodsId);
if(stock < 0){
return Result.error(CodeMsg.MIAO_SHA_OVER);
}
/*********************************优化1结束*************************************/

//2.判断是否已经秒杀到了
MiaoshaOrder miaoshaOrder = orderService.getMiaoshaOrderByUserIdGoodsId(user.getId(),goodsId);
if(miaoshaOrder != null){
return Result.error(CodeMsg.REPEATE_MIAOSHA);
}
/*********************************优化2开始*************************************/
//3.进入消息队列
MiaoshaMessage message = new MiaoshaMessage();
message.setUser(user);
message.setGoodsId(goodsId);
sender.sendMiaoshaMessage(message);
/*********************************优化2结束*************************************/
return Result.success(0);//排队中
}

在消息队列中对消息进行消化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@RabbitListener(queues = MQConfig.MIAOSHA_QUEUE)
public void receive(String message){
log.info("receive message:{}",message);
MiaoshaMessage msg = RedisService.stringToBean(message,MiaoshaMessage.class);
MiaoshaUser user = msg.getUser();
long goodsId = msg.getGoodsId();
//判断数据库库存是否真的足够
GoodsVo goodsVo = goodsService.getGoodsVoByGoodsId(goodsId);
if(goodsVo.getStockCount() <= 0){
return;
}
//判断是否已经秒杀到了
MiaoshaOrder miaoshaOrder = orderService.getMiaoshaOrderByUserIdGoodsId(user.getId(),goodsId);
if(miaoshaOrder != null){
return;
}


//减库存、下订单、写入秒杀订单,需要在一个事务中执行
OrderInfo orderInfo = miaoshaService.miaosha(user,goodsVo);

}

对于controller中的优化1:redis预减库存。那么需要在系统启动的时候将秒杀商品的库存先添加到redis中:

1
public class MiaoshaController implements InitializingBean

重写afterPropertiesSet()方法:

1
2
3
4
5
6
7
8
9
10
11
@Override
public void afterPropertiesSet() throws Exception {
//将秒杀商品的库存全部先存储到redis中
List<GoodsVo> goodsVoList = goodsService.getGoodsVoList();
if(goodsVoList == null){
return;
}
for(GoodsVo goods:goodsVoList){
redisService.set(GoodsKey.getMiaoshaGoodsStock,""+goods.getId(),goods.getStockCount());
}
}

对于前端,这时也要进行修改了,因为点击秒杀商品按键后,这里考虑三种情况:排队等待、失败、成功。那么这里规定-1为失败,0为排队,1为秒杀成功已经写入数据库。

原来的detail.htm中秒杀事件函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
function doMiaosha(){
$.ajax({
url:"/miaosha/do_miaosha",
type:"POST",
data:{
goodsId:$("#goodsId").val(),
},
success:function(data){
if(data.code == 0){
window.location.href="/order_detail.htm?orderId="+data.data.id;
}else{
layer.msg(data.msg);
}
},
error:function(){
layer.msg("客户端请求有误");
}
});

}

秒杀到商品就直接返回,现在后端改为消息队列,所以需要增加函数进行判断,必要时需要轮询:

1
2
3
4
5
if(data.code == 0){
window.location.href="/order_detail.htm?orderId="+data.data.id;
}else{
layer.msg(data.msg);
}

所以将其改为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
//其他的部分省略
...
if(data.code == 0){
//window.location.href="/order_detail.htm?orderId="+data.data.id;
//秒杀到商品的时候,这个时候不是直接返回成功,后端是进入消息队列,所以前端是轮询结果,显示排队中
getMiaoshaResult($("#goodsId").val());
}else{
layer.msg(data.msg);
}
...



function getMiaoshaResult(goodsId) {
g_showLoading();
$.ajax({
url:"/miaosha/result",
type:"GET",
data:{
goodsId:$("#goodsId").val(),
},
success:function(data){
if(data.code == 0){
var result = data.data;
//失败--- -1
if(result <= 0){
layer.msg("对不起,秒杀失败!");
}
//排队等待,轮询--- 0
else if(result == 0){//继续轮询
setTimeout(function () {
getMiaoshaResult(goodsId);
},50);
}
//成功---- 1
else {
layer.msg("恭喜你,秒杀成功,查看订单?",{btn:["确定","取消"]},
function () {
window.location.href="/order_detail.htm?orderId="+result;
},
function () {
layer.closeAll();
}
);
}
}else{
layer.msg(data.msg);
}
},
error:function(){
layer.msg("客户端请求有误");
}
});
}

那么相应地,后台也要增加一个方法:result

1
2
3
4
5
6
7
8
9
@RequestMapping(value = "/result",method = RequestMethod.GET)
@ResponseBody
public Result<Long> result(Model model, MiaoshaUser user, @RequestParam("goodsId") long goodsId){
if(user == null)
return Result.error(CodeMsg.SESSION_ERROR);

long result = miaoshaService.getMiaoshaResult(user.getId(),goodsId);
return Result.success(result);
}

那么如何标记状态呢?这就是getMiaoshaResult方法所做的事情。

对于成功的状态判断,很简单,从数据库查,能查到就说明已经秒杀成功,否则就是两种情况:失败或者正在等待生成订单。

对于这两种状态,我们需要用redis来实现,思路是:在系统初始化的时候,redis中设置秒杀商品是否卖完的状态为false—即未卖完;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public long getMiaoshaResult(Long userId, long goodsId) {
MiaoshaOrder orderInfo = orderService.getMiaoshaOrderByUserIdGoodsId(userId,goodsId);
if(orderInfo != null){
return orderInfo.getId();
}else{
boolean isOver = getGoodsOver(goodsId);
if(isOver){
//库存已经没了
return -1;
}else{
//表示还没入库,继续等待结果
return 0;
}
}
}

MiaoshaService中的Miaosha方法:数据库减库存失败的话,说明数据库的库存已经小于0了,那么这个时候,立即将redis初始设置的秒杀商品是否卖完的状态为true,表示商品已经全部卖完,返回秒杀失败。否则就是要前端等待等待。

1
2
3
4
5
6
7
8
9
10
11
@Transactional
public OrderInfo miaosha(MiaoshaUser user, GoodsVo goods) {
//减库存、下订单、写入秒杀订单
boolean success =goodsService.reduceStock(goods);
if(success){
return orderService.createOrder(user,goods);
}else{
setGoodsOver(goods.getId());
return null;
}
}

对于两个小方法getGoodsOversetGoodsOver

1
2
3
4
5
6
7
private void setGoodsOver(long goodId){
redisService.set(MiaoshaKey.isGoodsOver,""+goodId,true);
}

private boolean getGoodsOver(long goodsId) {
return redisService.exists(MiaoshaKey.isGoodsOver,""+goodsId);
}

那么redis预减库存,然后消息队列来进行创建订单就实现了。

当然,对于redis预减库存这一点,还有要优化的地方,就是现在的do_miaosha接口是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@RequestMapping(value = "/do_miaosha",method = RequestMethod.POST)
@ResponseBody
public Result<Integer> do_miaosha(Model model, MiaoshaUser user, @RequestParam("goodsId") long goodsId){
if(user == null)
return Result.error(CodeMsg.SESSION_ERROR);

//1.预减库存进行优化
/*********************************优化1开始*************************************/
long stock = redisService.decr(GoodsKey.getMiaoshaGoodsStock,""+goodsId);
if(stock < 0){
return Result.error(CodeMsg.MIAO_SHA_OVER);
}
/*********************************优化1结束*************************************/

//2.判断是否已经秒杀到了
MiaoshaOrder miaoshaOrder = orderService.getMiaoshaOrderByUserIdGoodsId(user.getId(),goodsId);
if(miaoshaOrder != null){
return Result.error(CodeMsg.REPEATE_MIAOSHA);
}
/*********************************优化2开始*************************************/
//3.进入消息队列
MiaoshaMessage message = new MiaoshaMessage();
message.setUser(user);
message.setGoodsId(goodsId);
sender.sendMiaoshaMessage(message);
/*********************************优化2结束*************************************/
return Result.success(0);//排队中
}

但是,当秒杀商品已经没了的时候,就没有必要再去redis中进行判断了,毕竟查询redis也是需要网络开销的,解决思路是:在内存中进行判断,如果redisService.decr得到的stock少于零的时候,直接将内存中的一个标志改变一下,那么下次再进入do_miaosha接口,先判断内存这个标记,如果库存已经小于0了,就不再访问redis,而是直接返回秒杀商品已经卖完。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@RequestMapping(value = "/do_miaosha",method = RequestMethod.POST)
@ResponseBody
public Result<Integer> do_miaosha(Model model, MiaoshaUser user, @RequestParam("goodsId") long goodsId){
if(user == null)
return Result.error(CodeMsg.SESSION_ERROR);

/***************************对redis预减库存再优化***************************
//内存标记,减少不必要的redis的访问
boolean over = localOverMap.get(goodsId);
if(over){
return Result.error(CodeMsg.MIAO_SHA_OVER);
}
*******************************************************************/

//预减库存进行优化
long stock = redisService.decr(GoodsKey.getMiaoshaGoodsStock,""+goodsId);
if(stock < 0){
return Result.error(CodeMsg.MIAO_SHA_OVER);
}
//判断是否已经秒杀到了
MiaoshaOrder miaoshaOrder = orderService.getMiaoshaOrderByUserIdGoodsId(user.getId(),goodsId);
if(miaoshaOrder != null){
localOverMap.put(goodsId,true);
return Result.error(CodeMsg.REPEATE_MIAOSHA);
}
//进入消息队列
MiaoshaMessage message = new MiaoshaMessage();
message.setUser(user);
message.setGoodsId(goodsId);
sender.sendMiaoshaMessage(message);

return Result.success(0);//排队中
}

声明一个map

1
private Map<Long,Boolean> localOverMap = new HashMap<>();

那么在afterPropertiesSet这个系统加载的初始化方法中对这个map进行初始化,goodsId--stock

1
localOverMap.put(goods.getId(),false);

在原来的redis预减库存初,发现库存小于0 ,就改为true:

1
2
3
4
if(stock < 0){
localOverMap.put(goodsId,true);
return Result.error(CodeMsg.MIAO_SHA_OVER);
}

最后do_miaosha接口变为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@RequestMapping(value = "/do_miaosha",method = RequestMethod.POST)
@ResponseBody
public Result<Integer> do_miaosha(Model model, MiaoshaUser user, @RequestParam("goodsId") long goodsId){
if(user == null)
return Result.error(CodeMsg.SESSION_ERROR);

//内存标记,减少不必要的redis的访问
boolean over = localOverMap.get(goodsId);
if(over){
return Result.error(CodeMsg.MIAO_SHA_OVER);
}

//预减库存进行优化
long stock = redisService.decr(GoodsKey.getMiaoshaGoodsStock,""+goodsId);
if(stock < 0){
localOverMap.put(goodsId,true);
return Result.error(CodeMsg.MIAO_SHA_OVER);
}
//判断是否已经秒杀到了
MiaoshaOrder miaoshaOrder = orderService.getMiaoshaOrderByUserIdGoodsId(user.getId(),goodsId);
if(miaoshaOrder != null){
return Result.error(CodeMsg.REPEATE_MIAOSHA);
}
//进入消息队列
MiaoshaMessage message = new MiaoshaMessage();
message.setUser(user);
message.setGoodsId(goodsId);
sender.sendMiaoshaMessage(message);

return Result.success(0);//排队中
}

ok,整个关于redis预减库存和rabbitMQ创建订单这个优化已经基本完成了。