Spring Boot Batch + MyBatis + MySQL
演示内容:
数据源1的表数据>>>批量插入到>>>数据源2中
1. IDEA快速构建SpringBootBatch简单工程
2. 添加 MyBatis、MySQL 的 pom 依赖
<!-- Spring Boot JDBC starter. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<!-- MyBatis Spring Boot starter, pinned to 1.3.2.
     Presumably the source of the SqlSessionFactoryBean / SqlSessionTemplate /
     MyBatisPagingItemReader classes used in the Java snippets (confirm). -->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.3.2</version>
</dependency>
<!-- Test-scoped Spring Boot test support. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Test-scoped Spring Batch test support. -->
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<scope>test</scope>
</dependency>
<!-- MySQL JDBC driver, runtime scope, pinned to 5.1.46.
     The yml in step 3 uses the driver class com.mysql.jdbc.Driver. -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
<version>5.1.46</version>
</dependency>
<!-- Lombok, pinned to 1.16.10. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.10</version>
</dependency>
3. 配置yml,配置两个数据源(1.batch 2.market)
# Server configuration
server:
  port: 8081

# Spring configuration: two data sources, bound by prefix in the
# configuration classes of step 4
#   spring.datasource.batch  -> bean "batchDataSource"  (marked @Primary)
#   spring.datasource.market -> bean "marketDataSource"
spring:
  datasource:
    # Source database: the reader pulls rows from here.
    batch:
      # NOTE(review): "jdbcUrl" (not "url") is presumably required because the
      # DataSource is created through DataSourceBuilder - confirm for the
      # Spring Boot version in use.
      jdbcUrl: jdbc:mysql://127.0.0.1:3306/batch
      username: root
      password: root
      driver-class-name: com.mysql.jdbc.Driver
    # Target database: the writer inserts rows here.
    market:
      jdbcUrl: jdbc:mysql://127.0.0.1:3306/market
      username: root
      password: root
      driver-class-name: com.mysql.jdbc.Driver

# MyBatis configuration
# NOTE(review): step 4 defines its own SqlSessionFactory beans and sets the
# mapper locations there; these properties may therefore not be applied by the
# starter's auto-configuration - confirm.
mybatis:
  mapper-locations: classpath:mapping/*.xml
  type-aliases-package: cn.xsxlq.batch.springbatch_simple.pojo
4. 根据yml配置的数据源,添加配置类(主数据源和副数据源)
1. 主数据源PrimaryDataSourceConfig.java
/**
 * Primary data source configuration.
 *
 * <p>Declares the {@code batchDataSource}, {@code batchSqlSessionFactory} and
 * {@code batchSqlSessionTemplate} beans, all marked {@code @Primary}. The data
 * source is bound to the {@code spring.datasource.batch} properties.
 */
@Configuration
public class PrimaryDataSourceConfig {

    /** Location pattern of the MyBatis mapper XML files. */
    private static final String MAPPER_LOCATIONS = "classpath*:mapping/*.xml";

    /**
     * Builds the primary data source from the {@code spring.datasource.batch} properties.
     *
     * @return the data source registered as {@code batchDataSource}
     */
    @Primary
    @Bean(name = "batchDataSource")
    @ConfigurationProperties(prefix = "spring.datasource.batch")
    public DataSource getDateSource1() {
        return DataSourceBuilder.create().build();
    }

    /**
     * Creates the MyBatis session factory that runs against {@code batchDataSource}.
     *
     * @param dataSource the {@code batchDataSource} bean
     * @return the session factory registered as {@code batchSqlSessionFactory}
     * @throws Exception if the mapper resources cannot be resolved or the factory cannot be built
     */
    @Primary
    @Bean(name = "batchSqlSessionFactory")
    public SqlSessionFactory batchSqlSessionFactory(
            @Qualifier("batchDataSource") DataSource dataSource) throws Exception {
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        factoryBean.setDataSource(dataSource);

        PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        factoryBean.setMapperLocations(resolver.getResources(MAPPER_LOCATIONS));

        return factoryBean.getObject();
    }

    /**
     * Wraps {@code batchSqlSessionFactory} in a {@link SqlSessionTemplate}.
     *
     * @param sqlSessionFactory the {@code batchSqlSessionFactory} bean
     * @return the template registered as {@code batchSqlSessionTemplate}
     */
    @Primary
    @Bean(name = "batchSqlSessionTemplate")
    public SqlSessionTemplate batchSqlsessiontemplate(
            @Qualifier("batchSqlSessionFactory") SqlSessionFactory sqlSessionFactory) {
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}
2. 第二数据源SecondDataSourceConfig.java
/**
 * Second data source configuration.
 *
 * <p>Declares the {@code marketDataSource}, {@code marketSqlSessionFactory} and
 * {@code marketSqlSessionTemplate} beans. The data source is bound to the
 * {@code spring.datasource.market} properties. None of these beans is primary,
 * so they have to be requested by name.
 *
 * <p>NOTE(review): two factory methods still carry a "batch" prefix in their Java
 * method names although they produce the "market" beans; the bean names are set
 * explicitly on {@code @Bean}, and the method names are left untouched here.
 */
@Configuration
public class SecondDataSourceConfig {

    /** Location pattern of the MyBatis mapper XML files. */
    private static final String MAPPER_LOCATIONS = "classpath*:mapping/*.xml";

    /**
     * Builds the second data source from the {@code spring.datasource.market} properties.
     *
     * @return the data source registered as {@code marketDataSource}
     */
    @Bean(name = "marketDataSource")
    @ConfigurationProperties(prefix = "spring.datasource.market")
    public DataSource getDateSource2() {
        return DataSourceBuilder.create().build();
    }

    /**
     * Creates the MyBatis session factory that runs against {@code marketDataSource}.
     *
     * @param dataSource the {@code marketDataSource} bean
     * @return the session factory registered as {@code marketSqlSessionFactory}
     * @throws Exception if the mapper resources cannot be resolved or the factory cannot be built
     */
    @Bean(name = "marketSqlSessionFactory")
    public SqlSessionFactory batchSqlSessionFactory(
            @Qualifier("marketDataSource") DataSource dataSource) throws Exception {
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        factoryBean.setDataSource(dataSource);

        PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        factoryBean.setMapperLocations(resolver.getResources(MAPPER_LOCATIONS));

        return factoryBean.getObject();
    }

    /**
     * Wraps {@code marketSqlSessionFactory} in a {@link SqlSessionTemplate}.
     *
     * @param sqlSessionFactory the {@code marketSqlSessionFactory} bean
     * @return the template registered as {@code marketSqlSessionTemplate}
     */
    @Bean(name = "marketSqlSessionTemplate")
    public SqlSessionTemplate batchSqlsessiontemplate(
            @Qualifier("marketSqlSessionFactory") SqlSessionFactory sqlSessionFactory) {
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}
6. 工程结构
1. BatchConfig.java,sqlSessionTemplate取的是batch,读操作时使用该数据源
/**
 * Spring Batch job definition: one job ({@code batchWorldJob}) made of one
 * chunk-oriented step ({@code batchWorldStep}) wired as reader, processor, writer.
 *
 * <p>The reader is built on the {@code batchSqlSessionTemplate} bean, so rows are
 * read through the "batch" data source.
 *
 * @author wangjs6
 * @version 1.0
 * @date: 2019/10/1 14:05
 */
@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /** Session template of the "batch" data source; handed to the reader. */
    @Resource(name = "batchSqlSessionTemplate")
    private SqlSessionTemplate sqlSessionTemplate;

    @Autowired
    private BatchProcess batchWorldProcess;

    @Autowired
    private BatchWriter batchWorldWriter;

    /**
     * Step-scoped paging reader backed by the "batch" session template.
     *
     * @return a new {@link BatchReader}
     */
    @Bean
    @StepScope
    public BatchReader batchWorldReader() {
        return new BatchReader(sqlSessionTemplate);
    }

    /**
     * The single step of the job. The chunk size comes from {@code Common.chunkCount}.
     *
     * @return the step named {@code batchWorldStep}
     */
    @Bean
    public Step batchWorldStep() {
        return stepBuilderFactory
                .get("batchWorldStep")
                .chunk(Common.chunkCount)
                .reader(batchWorldReader())
                .processor(batchWorldProcess)
                .writer(batchWorldWriter)
                .build();
    }

    /**
     * The job itself, which starts with {@link #batchWorldStep()}.
     *
     * @return the job named {@code batchWorldJob}
     */
    @Bean
    public Job batchWorldJob() {
        return jobBuilderFactory
                .get("batchWorldJob")
                .start(batchWorldStep())
                .build();
    }
}
2. BatchReader.java,继承MyBatisPagingItemReader简化分页操作
/**
 * Paging item reader for the batch step. It extends {@code MyBatisPagingItemReader}
 * and fixes the query id and page size in its constructor.
 *
 * @author wangjs6
 * @version 1.0
 * @date: 2019/10/1 14:48
 */
public class BatchReader extends MyBatisPagingItemReader {

    /** Id of the mapper statement executed for each page. */
    private static final String QUERY_ID = "cn.xsxlq.batch.mapping.ShopGoodsTypeMapper.selectList";

    /** Number of rows requested per page. */
    private static final int PAGE_SIZE = 10;

    /**
     * Configures the reader to use the session factory behind the given template.
     *
     * @param sqlSessionTemplate template whose {@code SqlSessionFactory} the reader will use
     */
    public BatchReader(SqlSessionTemplate sqlSessionTemplate) {
        setSqlSessionFactory(sqlSessionTemplate.getSqlSessionFactory());
        setQueryId(QUERY_ID);
        setPageSize(PAGE_SIZE);
    }
}
<!-- Maps the goodsTypeId / goodsTypeName / parentId columns onto the
     same-named properties of ShopGoodsType; goodsTypeId is the id column. -->
<resultMap id="BaseResultMap" type="cn.xsxlq.batch.springbatch_simple.pojo.ShopGoodsType" >
<id column="goodsTypeId" property="goodsTypeId" jdbcType="INTEGER" />
<result column="goodsTypeName" property="goodsTypeName" jdbcType="VARCHAR" />
<result column="parentId" property="parentId" jdbcType="INTEGER" />
</resultMap>
<!-- Reusable column list; included by selectList. -->
<sql id="Base_Column_List" >
goodsTypeId, goodsTypeName, parentId
</sql>
<!-- Returns the number of rows in shopgoodstype.
     Not referenced by the Java snippets shown in this document. -->
<select id="selectCount" resultType="int">
select
count(1)
from shopgoodstype
</select>
<!-- Paged read of shopgoodstype, ordered by goodsTypeId. This is the statement
     BatchReader sets as its query id.
     NOTE(review): _pagesize and _skiprows are presumably the paging parameters
     supplied by MyBatisPagingItemReader; confirm against its documentation. -->
<select id="selectList" resultMap="BaseResultMap" parameterType="Map" >
select
<include refid="Base_Column_List" />
from shopgoodstype
order by goodsTypeId
limit #{_pagesize} OFFSET #{_skiprows}
</select>
3. BatchProcess.java(可按需求对每条数据进行处理,这里直接返回)
/**
 * Item processor of the batch step. It is a pass-through: every item is handed
 * on to the writer unchanged. Per-item transformation would go here.
 *
 * @author wangjs6
 * @version 1.0
 * @date: 2019/10/1 14:08
 */
@Component
public class BatchProcess implements ItemProcessor {

    /**
     * Returns the item as-is.
     *
     * @param item the item delivered by the reader
     * @return the very same object
     * @throws Exception never thrown by this implementation; declared by the interface
     */
    @Override
    public Object process(final Object item) throws Exception {
        return item;
    }
}
4. BatchWriter.java(sqlSessionTemplate取数据源market,将batch库的数据写入market库)
/**
 * Item writer of the batch step. Each chunk is inserted with a single multi-row
 * {@code insertSelective} statement through the {@code marketSqlSessionTemplate}
 * bean, i.e. into the "market" data source.
 *
 * <p>The raw {@code ItemWriter} / {@code List} types are kept on purpose: the step
 * in {@code BatchConfig} is built with an untyped {@code chunk(...)} call, and a
 * typed writer would not be accepted there.
 *
 * <p>NOTE(review): the write goes to a different data source than the primary one;
 * whether it takes part in the chunk transaction depends on the transaction
 * manager in use - confirm.
 *
 * @author wangjs6
 * @version 1.0
 * @date: 2019/10/1 14:07
 */
@Component
public class BatchWriter implements ItemWriter {

    /** Id of the mapper statement performing the multi-row insert. */
    private static final String INSERT_STATEMENT =
            "cn.xsxlq.batch.mapping.ShopGoodsTypeMapper.insertSelective";

    @Resource(name = "marketSqlSessionTemplate")
    private SqlSessionTemplate sqlSessionTemplate;

    /**
     * Inserts all items of the chunk in one statement.
     *
     * @param items the processed items of the current chunk; may be empty
     * @throws Exception if the insert fails
     */
    @Override
    public void write(List items) throws Exception {
        // The mapper builds "insert ... values (..),(..)" with a <foreach> over the list.
        // With no elements the statement would end after "values" and be rejected by
        // the database, so an empty chunk is a no-op.
        if (items == null || items.isEmpty()) {
            return;
        }

        // Items originate from the selectList result map, which maps rows to ShopGoodsType.
        @SuppressWarnings("unchecked")
        List<ShopGoodsType> rows = (List<ShopGoodsType>) items;
        sqlSessionTemplate.insert(INSERT_STATEMENT, rows);
    }
}
<!-- Multi-row insert into shopgoodstype: one value tuple per element of the list
     parameter, tuples separated by commas. This is the statement BatchWriter invokes.
     The list must not be empty, otherwise no tuple is generated after "values". -->
<insert id="insertSelective" parameterType="List" >
insert into shopgoodstype(goodsTypeId, goodsTypeName, parentId)
values
<foreach collection ="list" item="shopGoodsType" index= "index" separator =",">
(
#{shopGoodsType.goodsTypeId}, #{shopGoodsType.goodsTypeName}, #{shopGoodsType.parentId}
)
</foreach >
</insert>
7. SpringBatch初始化脚本(必须)与执行结果:
-- Autogenerated: do not edit this file
-- Job instances. The pair (JOB_NAME, JOB_KEY) is unique, so the same name/key
-- combination can exist only once.
CREATE TABLE BATCH_JOB_INSTANCE (
JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_NAME VARCHAR(100) NOT NULL,
JOB_KEY VARCHAR(32) NOT NULL,
constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ENGINE=InnoDB;
-- Job executions. Each row references its job instance through JOB_INSTANCE_ID
-- and carries timestamps, status and exit code/message columns.
-- Must be created after BATCH_JOB_INSTANCE because of the foreign key.
CREATE TABLE BATCH_JOB_EXECUTION (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_INSTANCE_ID BIGINT NOT NULL,
CREATE_TIME DATETIME NOT NULL,
START_TIME DATETIME DEFAULT NULL ,
END_TIME DATETIME DEFAULT NULL ,
STATUS VARCHAR(10) ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED DATETIME,
JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,
constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ENGINE=InnoDB;
-- Parameters of a job execution: one row per parameter, keyed by KEY_NAME, with
-- separate value columns for string, date, long and double values.
-- No primary key is declared; rows are tied to BATCH_JOB_EXECUTION by foreign key.
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS (
JOB_EXECUTION_ID BIGINT NOT NULL ,
TYPE_CD VARCHAR(6) NOT NULL ,
KEY_NAME VARCHAR(100) NOT NULL ,
STRING_VAL VARCHAR(250) ,
DATE_VAL DATETIME DEFAULT NULL ,
LONG_VAL BIGINT ,
DOUBLE_VAL DOUBLE PRECISION ,
IDENTIFYING CHAR(1) NOT NULL ,
constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;
-- Step executions. Each row references its job execution through JOB_EXECUTION_ID
-- and carries timestamps, status, exit code/message and the per-step counters
-- (commit, read, filter, write, skip and rollback counts).
CREATE TABLE BATCH_STEP_EXECUTION (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT NOT NULL,
STEP_NAME VARCHAR(100) NOT NULL,
JOB_EXECUTION_ID BIGINT NOT NULL,
START_TIME DATETIME NOT NULL ,
END_TIME DATETIME DEFAULT NULL ,
STATUS VARCHAR(10) ,
COMMIT_COUNT BIGINT ,
READ_COUNT BIGINT ,
FILTER_COUNT BIGINT ,
WRITE_COUNT BIGINT ,
READ_SKIP_COUNT BIGINT ,
WRITE_SKIP_COUNT BIGINT ,
PROCESS_SKIP_COUNT BIGINT ,
ROLLBACK_COUNT BIGINT ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED DATETIME,
constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;
-- Context of a step execution: at most one row per step execution
-- (STEP_EXECUTION_ID is both primary key and foreign key).
CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ENGINE=InnoDB;
-- Context of a job execution: at most one row per job execution
-- (JOB_EXECUTION_ID is both primary key and foreign key).
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;
-- Counter table for step execution ids. UNIQUE_KEY is unique and the only row
-- inserted here uses '0'.
-- NOTE(review): presumably emulates a database sequence for Spring Batch; confirm.
CREATE TABLE BATCH_STEP_EXECUTION_SEQ (
ID BIGINT NOT NULL,
UNIQUE_KEY CHAR(1) NOT NULL,
constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
-- Seed the counter with ID 0, but only when the table is still empty.
INSERT INTO BATCH_STEP_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_STEP_EXECUTION_SEQ);
-- Counter table for job execution ids. UNIQUE_KEY is unique and the only row
-- inserted here uses '0'.
-- NOTE(review): presumably emulates a database sequence for Spring Batch; confirm.
CREATE TABLE BATCH_JOB_EXECUTION_SEQ (
ID BIGINT NOT NULL,
UNIQUE_KEY CHAR(1) NOT NULL,
constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
-- Seed the counter with ID 0, but only when the table is still empty.
INSERT INTO BATCH_JOB_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_EXECUTION_SEQ);
-- Counter table for job instance ids. UNIQUE_KEY is unique and the only row
-- inserted here uses '0'.
-- NOTE(review): presumably emulates a database sequence for Spring Batch; confirm.
CREATE TABLE BATCH_JOB_SEQ (
ID BIGINT NOT NULL,
UNIQUE_KEY CHAR(1) NOT NULL,
constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
-- Seed the counter with ID 0, but only when the table is still empty.
INSERT INTO BATCH_JOB_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_SEQ);