Java调用Kettle6的transaction和job

在进行复杂数据传输时,特别是异构数据库的多表数据传输,我们经常会用到ETL工具来完成。Kettle是一个典型的ETL工具且使用广泛。由于Kettle功能强大且复杂,对于java开发人员来说无疑增加了项目运维的难度和复杂度。因此将Kettle的集成到Java项目中可以大大降低项目的开发难度和开发效率,同时也降低了运维复杂度。网上大多关于Kettle集成的中文资料都是基于Kettle4.0之前的。以下是根据6.0官方文档及网上相关资料开发的几个Demo如有错误还望及时指出!

1、jar包引用

文件安装目录的data-integration\lib文件夹下有很多jar包,可以根据实际需要进行添加。经过个人测试以下几个是必要的:

avalon-framework-4.1.5.jar;

commons-codec-1.9.jar;

commons-collections-3.2.1.jar;

commons-io-2.1.jar;

commons-lang-2.5.jar;

commons-logging-1.1.3.jar;

commons-vfs2-2.1-20150824.jar;

guava-17.0.jar;

jug-lgpl-2.0.0.jar;

kettle-core-6.0.1.0-386.jar;

kettle-dbdialog-6.0.1.0-386.jar;

kettle-engine-6.0.1.0-386.jar;

kettle-ui-swt-6.0.1.0-386.jar;

metastore-6.0.1.0-386.jar;

ognl-2.6.9.jar;

scannotation-1.0.2.jar

maven:

    <repository>
        <id>pentaho-releases</id>
        <url>http://repository.pentaho.org/artifactory/repo/</url>
     </repository>

    <dependency>
            <groupId>pentaho-kettle</groupId>
            <artifactId>kettle-core</artifactId>
            <version>6.1.0.4-225</version>
        </dependency>
         <dependency>
             <groupId>com.verhas </groupId>
             <artifactId>license3j</artifactId>
             <version>1.0.7</version>
         </dependency>
        <dependency>
            <groupId>pentaho-kettle</groupId>
            <artifactId>kettle-dbdialog</artifactId>
            <version>6.1.0.4-225</version>
        </dependency>
        <dependency>
            <groupId>pentaho-kettle</groupId>
            <artifactId>kettle-engine</artifactId>
            <version>6.1.0.4-225</version>
        </dependency>
        <dependency>
            <groupId>pentaho</groupId>
            <artifactId>metastore</artifactId>
            <version>6.1.0.4-225</version>
        </dependency>
        <!--使用Block组件,因为如果行超过所设的Memory Size就会缓存到磁盘上需要用到下面的序列化列-->
        <dependency>
            <groupId>org.safehaus.jug</groupId>
            <artifactId>jug</artifactId>
            <version>2.0.0</version>
            <!-- the classifer is important!! -->
            <classifier>lgpl</classifier>
        </dependency>

2、Java创建transaction


/**

* Creates a new Transformation using input parameters such as the tablename to read from.

* @param transformationName transformation的名称

* @param sourceDatabaseName 输入的 database 名称

* @param sourceTableName 要读取的表名

* @param sourceFields 要读取的列名

* @param targetDatabaseName 目标database名

* @param targetTableName要写入的表名

* @param targetFields要写入的列名(要跟读取的列长度相同)

* @return A new transformation metadata object

* @throws KettleException In the rare case something goes wrong

*/

public static final TransMeta buildCopyTable(String transformationName,

String sourceDatabaseName, String sourceTableName,

String[] sourceFields, String targetDatabaseName,

String targetTableName, String[] targetFields,

DatabaseMeta[] databases)

throws KettleException {


EnvUtil.environmentInit();


try

{

// Create a new transformation...

TransMeta transMeta = new TransMeta();

transMeta.setName(transformationName);


// 添加数据库连接

for (int i = 0; i < databases.length; i++) {

DatabaseMeta databaseMeta = databases[i];

transMeta.addDatabase(databaseMeta);

}


DatabaseMeta sourceDBInfo = transMeta.findDatabase(sourceDatabaseName);

DatabaseMeta targetDBInfo  = transMeta.findDatabase(targetDatabaseName);


//添加注释

String note = "Reads information from table [" + sourceTableName+ "] on database [" + sourceDBInfo + "]" + Const.CR;

note += "After that, it writes the information to table ["+ targetTableName + "] on database [" + targetDBInfo + "]";

NotePadMeta ni = new NotePadMeta(note, 150, 10, -1, -1);

transMeta.addNote(ni);


// 创建读取数据源的 step...

String fromstepname = "read from [" + sourceTableName + "]";

TableInputMeta tii = new TableInputMeta();

tii.setDatabaseMeta(sourceDBInfo);

String selectSQL = "SELECT " + Const.CR;


for (int i = 0; i < sourceFields.length; i++) {

if (i > 0) selectSQL += ", "; else selectSQL += " ";

selectSQL += sourceFields[i] + Const.CR;

}

selectSQL += "FROM " + sourceTableName;

tii.setSQL(selectSQL);


PluginRegistry registry = PluginRegistry.getInstance();


String fromstepid = registry.getPluginId(tii);

StepMeta fromstep = new StepMeta(fromstepid, fromstepname,(StepMetaInterface) tii);

fromstep.setLocation(150, 100);

fromstep.setDraw(true);

fromstep.setDescription("Reads information from table [" + sourceTableName + "] on database [" + sourceDBInfo + "]");

transMeta.addStep(fromstep);


// 添加 重命名 fields的逻辑

// Use metadata logic in SelectValues, use SelectValueInfo...

SelectValuesMeta svi = new SelectValuesMeta();

svi.allocate(0, 0, sourceFields.length);


for (int i = 0; i < sourceFields.length; i++) {

svi.getSelectName()[i] = sourceFields[i];

svi.getSelectRename()[i] = targetFields[i];

}


String selstepname = "Rename field names";

String selstepid = registry.getPluginId(svi);

StepMeta selstep = new StepMeta(selstepid, selstepname, (StepMetaInterface) svi);

selstep.setLocation(350, 100);

selstep.setDraw(true);

selstep.setDescription("Rename field names");

transMeta.addStep(selstep);


TransHopMeta shi = new TransHopMeta(fromstep, selstep);

transMeta.addTransHop(shi);

fromstep = selstep;


// 创建 写数据的 step...


// 添加 输出表 step...

String tostepname = "write to [" + targetTableName + "]";

TableOutputMeta toi = new TableOutputMeta();

toi.setDatabaseMeta(targetDBInfo);

toi.setTablename(targetTableName);

toi.setCommitSize(200);

toi.setTruncateTable(true);


String tostepid = registry.getPluginId(toi);

StepMeta tostep = new StepMeta(tostepid, tostepname, (StepMetaInterface) toi);

tostep.setLocation(550, 100);


tostep.setDraw(true);

tostep.setDescription("Write information to table [" + targetTableName + "] on database [" + targetDBInfo + "]");

transMeta.addStep(tostep);


// 添加连线...

TransHopMeta hi = new TransHopMeta(fromstep, tostep);

transMeta.addTransHop(hi);


// The transformation is complete, return it...

return transMeta;

} catch (Exception e) {

throw new KettleException("An unexpected error occurred creating the new transformation", e);

}

}

3、Java运行Kettle的transaction:


/**

* 运行转换文件方法

* @param params 多个参数变量值

* @param ktrPath 转换文件的路径,后缀ktr

*/

public static void runTransfer(String[] params, String ktrPath) {

Trans trans = null;

try {

// 初始化

// 转换元对象

KettleEnvironment.init();

EnvUtil.environmentInit();

TransMeta transMeta = new TransMeta(ktrPath);

// 转换

trans = new Trans(transMeta);


// 执行转换

trans.execute(params);

// 等待转换执行结束

trans.waitUntilFinished();

// 抛出异常

if (trans.getErrors() > 0) {

throw new Exception(

"There are errors during transformation exception!(传输过程中发生异常)");

}

} catch (Exception e) {

e.printStackTrace();

}

}

4、Java运行Kettle的Job:

/**

* java 调用 kettle 的job

*

* @paramjobPath

*

*/

public static void runJob(String[] params, String jobPath) {

try {

KettleEnvironment.init();

//jobPath是Job脚本的路径及名称

JobMeta jobMeta = new JobMeta(jobPath, null);

Job job = new Job(null, jobMeta);

// 向Job 脚本传递参数,脚本中获取参数值:${参数名}

// job.setVariable(paraname, paravalue);

job.setVariable("id", params[0]);

job.setVariable("dt", params[1]);

job.start();

job.waitUntilFinished();

if (job.getErrors() > 0) {

throw new Exception(

"There are errors during job exception!(执行job发生异常)");

}

} catch (Exception e) {

e.printStackTrace();

}

}

注:

1、在Kettle连接SqlServer数据库时建议使用开源的jtds数据库jar包,微软官方jar包不受支持。

2、个人建议使用项目中的调度框架(如quartz、Spring的schedule等)调用transaction来实现定时执行,可以更灵活的控制我们的Job。

3、Kettle有强大的图形化设计器,transaction的创建建议在Kettle中进行。

顺便附上实现后的系统界面样例

clipboard.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,772评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,458评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,610评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,640评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,657评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,590评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,962评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,631评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,870评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,611评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,704评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,386评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,969评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,944评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,179评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,742评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,440评论 2 342

推荐阅读更多精彩内容