8.Bus消息总线
为了实现依次redresh自动刷新所有服务的配置文件,所以需要引入消息总线进行消息的通知,本文主要采用kafka作为消息队列来实现,当然了,rabbitMQ也是比较简单的。
一、前言
注:本篇文章是基于spring boot 2.x
,主要参考 SpringCloud实战8-Bus消息总线 这篇文章而写。但是他的版本比较低,有一两个坑需要注意。
上一篇我们讲到,我们如果要去更新所有微服务的配置,在不重启的情况下去更新配置,只能依靠spring cloud config
了,但是,是我们要一个服务一个服务的发送post请求,我们能受的了吗?
虽然这比之前的没配置中心好多了,那但是我们如何继续避免挨个挨个的向服务发送Post请求来告知服务你的配置信息改变了,需要及时修改内存中的配置信息呢?
这时候我们就不要忘记消息队列的发布订阅模型。让所有为服务来订阅这个事件,当这个事件发生改变了,就可以通知所有微服务去更新它们的内存中的配置信息。这时Bus消息总线就能解决,你只需要在springcloud Config Server
端发出refresh
,就可以触发所有微服务更新了。
如下架构图所示:
根据此图我们可以看出利用Spring Cloud Bus做配置更新的步骤:
- 1、提交代码触发post给客户端A发送bus/refresh
- 2、客户端A接收到请求从Server端更新配置并且发送给Spring Cloud Bus
- 3、Spring Cloud bus接到消息并通知给其它客户端
- 4、其它客户端接收到通知,请求Server端获取最新配置
- 5、全部客户端均获取到最新的配置
Spring Cloud Bus
除了支持RabbitMQ
的自动化配置之外,还支持现在被广泛应用的Kafka
。在本文中,我们将搭建一个Kafka的本地环境,并通过它来尝试使用Spring Cloud Bus
对Kafka
的支持,实现消息总线的功能。
二、Kafka
Kafka
使用Scala
实现,被用作LinkedIn的活动流和运营数据处理的管道,现在也被诸多互联网企业广泛地用作为数据流管道和消息系统。
Kafak架构图如下:
Kafka
是基于消息发布/订阅模式实现的消息系统,其主要设计目标如下:
- 消息持久化:以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能。
- 高吞吐:在廉价的商用机器上也能支持单机每秒100K条以上的吞吐量
- 分布式:支持消息分区以及分布式消费,并保证分区内的消息顺序
- 跨平台:支持不同技术平台的客户端(如:Java、PHP、Python等)
- 实时性:支持实时数据处理和离线数据处理
- 伸缩性:支持水平扩展
Kafka
中涉及的一些基本概念:
- Broker:Kafka集群包含一个或多个服务器,这些服务器被称为Broker。
- Topic:逻辑上同Rabbit的Queue队列相似,每条发布到Kafka集群的消息都必须有一个Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个Broker上,但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
- Partition:Partition是物理概念上的分区,为了提供系统吞吐率,在物理上每个Topic会分成一个或多个Partition,每个Partition对应一个文件夹(存储对应分区的消息内容和索引文件)。
- Producer:消息生产者,负责生产消息并发送到Kafka Broker。
- Consumer:消息消费者,向Kafka Broker读取消息并处理的客户端。
- Consumer Group:每个Consumer属于一个特定的组(可为每个Consumer指定属于一个组,若不指定则属于默认组),组可以用来实现一条消息被组内多个成员消费等功能。
可以从kafka
的架构图看到Kafka
是需要Zookeeper
支持的,你需要在你的Kafka
配置里面指定Zookeeper
在哪里,它是通过Zookeeper
做一些可靠性的保证,做broker
的主从,我们还要知道Kafka
的消息是以topic
形式作为组织的,Producers
发送topic
形式的消息,
Consumer
是按照组来分的,所以,一组Consumers
都会接收同样的topic
形式的消息。在服务端,它还做了一些分片,那么一个Topic
可能分布在不同的分片上面,方便我们拓展部署多个机器,Kafka
是天生分布式的。
首先是要下载对应的kafka:https://www.apache.org/dyn/closer.cgi?path=/kafka/2.1.0/kafka_2.11-2.1.0.tgz
解压之后进去/bin/windows/目录下:
首先启动zookeeper
:
1 | .\zookeeper-server-start.bat D:\kafka_2.11-2.1.0\config\zookeeper.properties |
如果出现错误:
1 | 命令语法不正确。 |
方法是:
首先我们进到下载好的Kafka目录中kafka_2.11-1.1.0\bin\windows 下编辑kafka-run-class.bat如下:
找到这条配置 如下:
set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp %CLASSPATH% %KAFKA_OPTS% %*
可以看到%CLASSPATH%没有双引号,
因此用双引号括起来,不然启动不起来的,报你JDK没安装好,修改后如下:
set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp “%CLASSPATH%” %KAFKA_OPTS% %*
后面启动kafka
:
1 | .\kafka-server-start.bat D:\kafka_2.11-2.1.0\config\server.properties |
这两者配置文件直接默认即可。启动成功之后,就放那吧!
消息总线
在上一章的spring-cloud-config-server
以及client
继续集成。spring boot
版本是2.0.3.RELEASE
.
第一步:spring cloud config
服务端和客户端(搞两个客户端)都要引入kafka
依赖,以config server
端为例:
1 | <dependencies> |
第二步:配置文件(服务端)
1 | server: |
注意,在1.x版本中一般配置
1 | #是否需要权限拉取,默认是true,如果不false就不允许你去拉取配置中心Server更新的内容 |
而在2.x版本中已经把这个改掉了。所以要注意。不配置的话会报错405.可以尝试去掉,用postman测试一把。
第三步:添加注解
在服务端和客户端的启动函数上都增加一条注解@RefreshScope
ok,至此,集成完毕。消息总线的功能就有了。
启动全部工程。修改git上的内容。然后发现客户端都没更新。下面启动postman来对config server
发送一条post请求:
1 | localhost:8085/actuator/bus-refresh |
再刷新浏览器,就会发现所有的客户端都自动更新了。我们也可以指定要刷新的客户端具体实例或者通配符符合的客户端。