我们上节进行了库存的扣减,解决了并发扣减库存问题和接口幂等性,操作在库存流水表中进行了记录,下面需要一个定时任务来定时将流水表中的数据同步到库存表中。
关于定时调度,我们接触过quartz
,在单机中应用是完全没有问题的。虽然quartz
也给我们提供了集群版本,但是是一种依赖数据库的解决方案。
本节首先尝试整合Elastic-Job
,目标是让其每5秒钟打印一句话,所以也算是Elastic-Job
入门demo。
根据官网整合spring-boot的demo的pom文件中,我们就可以大概知道要导入什么包。
一、相关依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-core</artifactId> </dependency> <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-spring</artifactId> </dependency>
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-test</artifactId> </dependency> ...省略其他的spring-boot等依赖
|
二、配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| spring: cloud: stream: kafka: binder: brokers: ${kafka.brokers.host} zk-nodes: ${zookeeper.host} auto-create-topics: true bindings: input: destination: dis-transation group: point-group <!--以上是关于spring config的内容,可以删除--> <!--自定义数据源--> datasource: stock: url: jdbc:mysql://127.0.0.1:3306/mama-buy-stock username: root password: root
# 因为依赖于ZK,所以需要确定zk位置;下面是确定定时时间、分片数 regCenter: serverList: ${zookeeper.host} namespace: elastic-job-lite-springboot stockJob: cron: 0/5 * * * * ? shardingTotalCount: 1 #只分一片 shardingItemParameters: 0=nanjing
|
首先,按照以前的步骤创建mama-buy-schedule-service
这个工程。
三、代码层面
1,按照官方的demo,首先关闭自动注入数据源:
1 2 3 4 5 6 7 8 9 10
| @SpringBootApplication(exclude = { DataSourceAutoConfiguration.class }) public class MamaBuyScheduleServiceApplication {
public static void main(String[] args) { SpringApplication.run(MamaBuyScheduleServiceApplication.class, args); }
}
|
2,自定义数据源:
1 2 3 4 5 6 7 8 9 10
| @Configuration public class DataSourceConfig {
@Bean("stockDataSource") @ConfigurationProperties(prefix = "spring.datasource.stock") public DataSource stockDataSource(){ return DataSourceBuilder.create().build(); }
}
|
3,创建sqlSessionFactory
和SqlSessionTemplate
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Configuration @MapperScan(basePackages = {"com.njupt.swg.mamabuyscheduleservice.stock.dao"},sqlSessionFactoryRef = "stockSqlSessionFactory") public class SqlSessionConfig { @Autowired @Qualifier("stockDataSource") private DataSource stockDataSource;
@Bean(name = "stockSqlSessionFactory") public SqlSessionFactory stockSqlSessionFactory() throws Exception { SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean(); factoryBean.setDataSource(stockDataSource); return factoryBean.getObject(); } @Bean("stockSqlSessionTemplate") public SqlSessionTemplate stockSqlSessionTemplate () throws Exception { SqlSessionTemplate template = new SqlSessionTemplate(stockSqlSessionFactory()); return template; } }
|
4,注册到ZK上:
1 2 3 4 5 6 7 8 9 10
| @Configuration @ConditionalOnExpression("'${regCenter.serverList}'.length() > 0") public class JobRegistryCenterConfig {
@Bean(initMethod = "init") public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList, @Value("${regCenter.namespace}") final String namespace) { return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace)); }
}
|
5,定义任务:
1 2 3 4 5 6 7 8 9 10 11 12 13
| public class StockSimpleJob implements SimpleJob { @Override public void execute(ShardingContext shardingContext) { System.out.println(String.format("------Thread ID: %s, 任务总片数: %s, 当前分片项: %s", Thread.currentThread().getId(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem()));
} }
|
6,任务配置类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| @Configuration public class StockJobConfig {
@Autowired private ZookeeperRegistryCenter regCenter;
@Bean public SimpleJob stockJob(){ return new StockSimpleJob(); }
<!--直接copy--> @Bean(initMethod = "init") public JobScheduler simpleJobScheduler(final SimpleJob simpleJob, @Value("${stockJob.cron}") final String cron, @Value("${stockJob.shardingTotalCount}") final int shardingTotalCount, @Value("${stockJob.shardingItemParameters}") final String shardingItemParameters) { return new SpringJobScheduler(simpleJob, regCenter, getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters)); }
private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters){ return LiteJobConfiguration .newBuilder( new SimpleJobConfiguration( JobCoreConfiguration.newBuilder( jobClass.getName(),cron,shardingTotalCount) .shardingItemParameters(shardingItemParameters) .build() ,jobClass.getCanonicalName() ) ) .overwrite(true) .build();
} }
|