最近研究离线批处理任务的加工,发现Spring Batch是一个非常不错的选择。其应用场景介于数据库存储过程与Hadoop大数据加工之间。可以处理日常的批处理任务,监控任务的执行情况。从现在开始,我们就慢慢深入研究下Spring Batch如何使用。
Spring Batch简单介绍
- Spring Batch的任务包含多个步骤Step,每个Step包含三步:Reader、Processor、Writer,其实我们主要关注的就是这三步。具体图如下:
- 名称解释
JobRepository : 管理所有Job操作
Job:某一个任务
JobInstance:某一个任务对应的实例(类似类的实例)
Step:步骤
环境说明
- 主要版本说明
Spring Batch 3.0.7.RELEASE (当前最新版本)
Spring Framework 4.0.5.RELEASE(当前Spring batch 所能支持的最高版本,再高就会报错哦)
官方的例子都是使用spring boot做DEMO,由于对Spring Boot不太了解,这里使用的都是直接Spring framework。
- 开发环境
STS
MAVEN
第一个Spring Batch例子
定义 Reader、Processor、Writer,他们分别实现ItemReader
、ItemProcessor
、ItemWriter
这三个接口。具体看代码如下:
- Reader 返回一个简单的字符串
package com.me.springbatch.hello;
import org.springframework.batch.item.ItemReader;
/**
* {@link ItemReader} with hard-coded input data.
*/
public class HelloItemReader implements ItemReader<String> {
private int index = 0;
/**
* 模拟读操作
*/
public String read() throws Exception {
if (index < 1000) {
index++;
return "Hello world!" + index;
} else {
return null;
}
}
}
- Processor 为字符串做一个简单修改
package com.me.springbatch.hello;
import org.springframework.batch.item.ItemProcessor;
public class HelloItemProcessor implements ItemProcessor<String, String> {
/**
* 模拟处理过程
*/
public String process(String item) throws Exception {
// TODO Auto-generated method stub
return item + " processed";
}
}
- Writer 通过Log输出处理后的字符串
package com.me.springbatch.hello;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.item.ItemWriter;
/**
* Dummy {@link ItemWriter} which only logs data it receives.
*/
public class HelloItemWriter implements ItemWriter<Object> {
private static final Log log = LogFactory.getLog(HelloItemWriter.class);
/**
* 模拟写操作
*
* @see ItemWriter#write(java.util.List)
*/
public void write(List<? extends Object> data) throws Exception {
log.info(data);
}
}
三个接口实现都非常简单,只是简单模拟了下。这三个类如何组织成一个Job,通过XML简单配置即可。
<description>Hello Job</description>
<batch:job id="helloJob">
<batch:step id="helloStep">
<batch:tasklet>
<!-- commit-interval:事务批量提交数 -->
<batch:chunk reader="helloItemReader" processor="helloItemProcessor" writer="helloItemWriter"
commit-interval="1" />
</batch:tasklet>
</batch:step>
</batch:job>
<bean id="helloItemReader" class="com.me.springbatch.hello.HelloItemReader"></bean>
<bean id="helloItemWriter" class="com.me.springbatch.hello.HelloItemWriter"></bean>
<bean id="helloItemProcessor" class="com.me.springbatch.hello.HelloItemProcessor"></bean>
这样,一个简单JOB就完成了。
执行Job如下(jobId即xml中为job起的id名称):
public static void run (String jobId) {
//通过应用程序上下文获得bean
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"spring-context.xml");
JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
//任务
Job job = (Job) context.getBean(jobId);
log.debug("任务【"+jobId+"】开始执行");
long start = System.currentTimeMillis() ;
try {
//执行任务
JobExecution execution = jobLauncher.run(job, new JobParameters());
log.debug("任务【"+jobId+"】执行结果:"+execution.getStatus());
} catch (Exception e) {
e.printStackTrace();
}finally{
if(context != null){
context.close();
}
}
long end = System.currentTimeMillis() ;
log.debug("任务【"+jobId+"】执行完毕!共花费:"+(end - start) + "毫秒");
}