
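In the previous section we implemented stock deduction, solving the concurrent-deduction problem and making the interface idempotent. Every operation was recorded in the stock flow table, so we now need a scheduled task that periodically syncs the data in the flow table to the stock table.
For scheduling, we have already used Quartz, which works without any problems on a single machine. Quartz also provides a clustered version, but that solution depends on a database.
This section first tries integrating Elastic-Job. The goal is to have it print a line every 5 seconds, so it also serves as an Elastic-Job getting-started demo.
The pom file of the official spring-boot integration demo tells us roughly which packages to import.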
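For comparison, this is what "print a line every 5 seconds" looks like on a single JVM using only the JDK's `ScheduledExecutorService`. It is a sketch for contrast, not Elastic-Job code; the class name `SingleNodeTickDemo` and the method `runTicks` are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical single-node demo: no ZooKeeper, no sharding, no failover.
class SingleNodeTickDemo {

    // Fires a task at a fixed rate until it has run `ticks` times, then returns the count.
    static int runTicks(long periodMillis, int ticks) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger fired = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(ticks);
        try {
            scheduler.scheduleAtFixedRate(() -> {
                // Ignore any run that sneaks in after the last expected tick.
                if (done.getCount() > 0) {
                    System.out.println("tick " + fired.incrementAndGet());
                    done.countDown();
                }
            }, 0, periodMillis, TimeUnit.MILLISECONDS);
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return fired.get();
    }

    public static void main(String[] args) {
        // Every 5 seconds, matching the goal of this section.
        runTicks(5000, 3);
    }
}
```

If this program were started on several machines, every one of them would print. Deciding which instance runs which part of the job is the piece that Elastic-Job, backed by ZooKeeper, adds.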
I. Dependencies
```xml
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
</dependency>
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-spring</artifactId>
</dependency>

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-test</artifactId>
</dependency>
<!-- ... other dependencies such as spring-boot are omitted -->
```
II. Configuration file
```yaml
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: ${kafka.brokers.host}
          zk-nodes: ${zookeeper.host}
          auto-create-topics: true
      bindings:
        input:
          destination: dis-transation
          group: point-group
  # Everything above is Spring config content and can be removed

  # Custom data source
  datasource:
    stock:
      url: jdbc:mysql://127.0.0.1:3306/mama-buy-stock
      username: root
      password: root

# Elastic-Job depends on ZK, so the ZK address must be specified;
# below that are the schedule (cron) and the number of shards
regCenter:
  serverList: ${zookeeper.host}
  namespace: elastic-job-lite-springboot
stockJob:
  cron: 0/5 * * * * ?
  shardingTotalCount: 1 # a single shard only
  shardingItemParameters: 0=nanjing
```
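The `shardingItemParameters` value is a comma-separated list of `item=parameter` pairs, and `0=nanjing` labels shard item 0 with the parameter `nanjing`. The sketch below only illustrates that format with plain JDK code. `ShardingItemParametersDemo` and `parse` are hypothetical names, and this is not how Elastic-Job itself parses the value.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that illustrates the "item=parameter,item=parameter" format.
class ShardingItemParametersDemo {

    // Turns "0=nanjing,1=beijing" into {0=nanjing, 1=beijing}, keeping the declared order.
    static Map<Integer, String> parse(String shardingItemParameters) {
        Map<Integer, String> result = new LinkedHashMap<>();
        if (shardingItemParameters == null || shardingItemParameters.trim().isEmpty()) {
            return result;
        }
        for (String pair : shardingItemParameters.split(",")) {
            String[] kv = pair.trim().split("=", 2);
            if (kv.length != 2) {
                throw new IllegalArgumentException("Expected item=parameter but got: " + pair);
            }
            result.put(Integer.parseInt(kv[0].trim()), kv[1].trim());
        }
        return result;
    }

    public static void main(String[] args) {
        // The value used in this article: one shard, item 0 labelled "nanjing".
        System.out.println(parse("0=nanjing"));
        // A made-up two-shard variant; shardingTotalCount would then have to be 2.
        System.out.println(parse("0=nanjing,1=beijing"));
    }
}
```

Inside a job, the parameter that belongs to the current shard is available through `shardingContext.getShardingParameter()`.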
First, create the mama-buy-schedule-service project following the same steps as before.

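III. Code
1. Following the official demo, first disable data source auto-configuration:
```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;

// Exclude the default data source auto-configuration; a custom one is defined below.
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class MamaBuyScheduleServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(MamaBuyScheduleServiceApplication.class, args);
    }
}
```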
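2. Define a custom data source:
```java
import javax.sql.DataSource;

import org.springframework.boot.context.properties.ConfigurationProperties;
// Spring Boot 2.x package; in 1.x the class is org.springframework.boot.autoconfigure.jdbc.DataSourceBuilder
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DataSourceConfig {

    // Built from the spring.datasource.stock.* properties in the configuration file.
    @Bean("stockDataSource")
    @ConfigurationProperties(prefix = "spring.datasource.stock")
    public DataSource stockDataSource() {
        return DataSourceBuilder.create().build();
    }
}
```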
3. Create the SqlSessionFactory and SqlSessionTemplate:
```java
import javax.sql.DataSource;

import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@MapperScan(basePackages = {"com.njupt.swg.mamabuyscheduleservice.stock.dao"},
        sqlSessionFactoryRef = "stockSqlSessionFactory")
public class SqlSessionConfig {

    @Autowired
    @Qualifier("stockDataSource")
    private DataSource stockDataSource;

    @Bean(name = "stockSqlSessionFactory")
    public SqlSessionFactory stockSqlSessionFactory() throws Exception {
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        factoryBean.setDataSource(stockDataSource);
        return factoryBean.getObject();
    }

    @Bean("stockSqlSessionTemplate")
    public SqlSessionTemplate stockSqlSessionTemplate() throws Exception {
        return new SqlSessionTemplate(stockSqlSessionFactory());
    }
}
```
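4. Register with ZK:
```java
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Only active when regCenter.serverList is non-empty.
@Configuration
@ConditionalOnExpression("'${regCenter.serverList}'.length() > 0")
public class JobRegistryCenterConfig {

    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList,
                                             @Value("${regCenter.namespace}") final String namespace) {
        return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
    }
}
```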
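5. Define the job:
```java
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;

public class StockSimpleJob implements SimpleJob {

    // For now the job only prints the thread id and sharding information.
    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println(String.format("------Thread ID: %s, total shards: %s, current sharding item: %s",
                Thread.currentThread().getId(),
                shardingContext.getShardingTotalCount(),
                shardingContext.getShardingItem()));
    }
}
```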
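The two values printed by the job, the total shard count and the current sharding item, are what a job would typically use to decide which rows belong to it. A common convention is to take the record id modulo the shard count. The sketch below assumes that convention and uses plain JDK collections; `ShardFilterDemo` and `idsForShard` are hypothetical names and not part of Elastic-Job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of modulo-based sharding.
class ShardFilterDemo {

    // Returns the ids a given shard item would handle when ids are split by id % shardingTotalCount.
    static List<Long> idsForShard(List<Long> ids, int shardingTotalCount, int shardingItem) {
        List<Long> mine = new ArrayList<>();
        for (long id : ids) {
            if (Math.floorMod(id, (long) shardingTotalCount) == shardingItem) {
                mine.add(id);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        List<Long> ids = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L);
        // With shardingTotalCount = 1, as configured in this article, item 0 gets every id.
        System.out.println(idsForShard(ids, 1, 0));
        // With a made-up count of 2, the ids are split between items 0 and 1.
        System.out.println(idsForShard(ids, 2, 0));
        System.out.println(idsForShard(ids, 2, 1));
    }
}
```

In a real job the two integers would come from `shardingContext.getShardingTotalCount()` and `shardingContext.getShardingItem()`.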
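6. Job configuration class:
```java
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class StockJobConfig {

    @Autowired
    private ZookeeperRegistryCenter regCenter;

    @Bean
    public SimpleJob stockJob() {
        return new StockSimpleJob();
    }

    // Copied as-is
    @Bean(initMethod = "init")
    public JobScheduler simpleJobScheduler(final SimpleJob simpleJob,
                                           @Value("${stockJob.cron}") final String cron,
                                           @Value("${stockJob.shardingTotalCount}") final int shardingTotalCount,
                                           @Value("${stockJob.shardingItemParameters}") final String shardingItemParameters) {
        return new SpringJobScheduler(simpleJob, regCenter,
                getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters));
    }

    // Builds the job configuration: job name, cron, shard count and shard parameters.
    private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass,
                                                         final String cron,
                                                         final int shardingTotalCount,
                                                         final String shardingItemParameters) {
        return LiteJobConfiguration
                .newBuilder(new SimpleJobConfiguration(
                        JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount)
                                .shardingItemParameters(shardingItemParameters)
                                .build(),
                        jobClass.getCanonicalName()))
                .overwrite(true) // let this local configuration overwrite the one stored in ZK
                .build();
    }
}
```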
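With the scheduler wired up, the body of `execute` is where the flow-table-to-stock-table synchronization mentioned at the start would eventually go. The sketch below is only a guess at the shape of that logic, done in memory with plain JDK collections. The `StockFlow` record, its fields, and the `apply` method are hypothetical, and a real job would read and write through the MyBatis mappers and also mark the processed flow rows as synced.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory sketch of applying flow records to stock totals.
class StockSyncSketch {

    // Assumed shape of one flow row: which SKU and how many units were deducted.
    static final class StockFlow {
        final long skuId;
        final int quantity;

        StockFlow(long skuId, int quantity) {
            this.skuId = skuId;
            this.quantity = quantity;
        }
    }

    // Returns a copy of `stock` with every flow's quantity subtracted from its SKU.
    // A SKU that appears only in the flows ends up with a negative total.
    static Map<Long, Integer> apply(Map<Long, Integer> stock, List<StockFlow> flows) {
        Map<Long, Integer> result = new HashMap<>(stock);
        for (StockFlow flow : flows) {
            result.merge(flow.skuId, -flow.quantity, Integer::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Long, Integer> stock = new HashMap<>();
        stock.put(1L, 100);
        stock.put(2L, 50);
        List<StockFlow> flows = Arrays.asList(
                new StockFlow(1L, 3), new StockFlow(1L, 2), new StockFlow(2L, 5));
        System.out.println(apply(stock, flows));
    }
}
```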
