Github: https://github.com/alibaba/DataX
clone: https://github.com/alibaba/DataX.git
本地下载后编译打包
在 ./datax/target/datax/datax/bin 目录下执行
命令行方式
python datax.py [job文件目录]/streamtest.json
控制台乱码处理（Windows 命令行下先将代码页切换为 UTF-8）
chcp 65001
demo json
streamtest
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"sliceRecordCount": 10,
"column": [
{
"type": "long",
"value": "10"
},
{
"type": "string",
"value": "hello,你好,世界-DataX"
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 5
}
}
}
}
mysqltest
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"where": "id=5861",
"username": "root",
"password": "root",
"column": [
"id",
"name"
],
"splitPk": "id",
"connection": [
{
"table": [
"table"
],
"jdbcUrl": [
"jdbc:mysql://192.168.1.1:3306/db"
]
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true
}
}
}
]
}
}
Java 调用方式 demo
/**
 * Runs a DataX job by invoking {@code datax.py} through the {@code py} command,
 * echoes everything the child process prints, and finally prints its exit code.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    String dataxPath = "C:\\project\\datax\\target\\datax\\datax\\bin\\";
    String jobPath = "C:\\project\\datax\\target\\datax\\datax\\job\\";
    // Use "mysqltest.json" here instead to run the MySQL reader demo.
    // NOTE(review): the notes in this file call the stream job "streamtest.json";
    // confirm which file name actually exists in the job directory.
    String job = jobPath + "stream2stream.json";

    ProcessBuilder builder = new ProcessBuilder("py", dataxPath + "datax.py", job);
    // Merge stderr into stdout: if stderr is never drained, the child can block
    // once its pipe buffer fills up and waitFor() would never return.
    builder.redirectErrorStream(true);

    try {
        Process process = builder.start();
        // try-with-resources closes the reader even when readLine() fails.
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line);
            }
        }
        int exitCode = process.waitFor();
        System.out.println(exitCode);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers up the stack can observe it.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } catch (java.io.IOException e) {
        e.printStackTrace();
    }
}