我们知道StreamingPro 是一个完全SQL/Script化的,基于Spark平台的一套生产力工具。但是不可避免的,我们可能希望直接操作SqlContext或者使用原生的DataFrame API。 这里我们通过script 让大家支持这个功能:
{
"name": "batch.script.df",
"params": [
{
"script": "context.sql(\"select a as t from test\").registerTempTable(\"finalOutputTable\")",
"source": "-"
}
]
}
在这个模块里,你可以访问任何一张已经注册的表。并且经过处理后注册一张新的表。给了大家无线的灵活性。
如果source 设置为file,script 填写的是文件路径的话,那么就不用在json文件里写脚本了。
这里给大家一个完整的例子:
{
"batch-console": {
"desc": "测试",
"strategy": "spark",
"algorithm": [],
"ref": [],
"compositor": [
{
"name": "batch.sources",
"params": [
{
"path": "file:///tmp/hdfsfile/abc.txt",
"format": "json",
"outputTable": "test"
}
]
},
{
"name": "batch.script.df",
"params": [
{
"script": "context.sql(\"select a as t from test\").registerTempTable(\"finalOutputTable\")",
"source": "-"
}
]
},
{
"name": "batch.outputs",
"params": [
{
"name":"jack",
"format": "console",
"path": "-",
"inputTableName": "finalOutputTable",
"mode":"Overwrite"
}
]
}
],
"configParams": {
}
}
}
另外,对于输出,我们也可以控制文件数目:
{
"name": "batch.outputs",
"params": [
{
"name": "jack",
"format": "json",
"path": "file:///tmp/batch-console",
"outputFileNum": "3",
"inputTableName": "finalOutputTable",
"mode": "Overwrite"
}
]
}
其中 outputFileNum 就是你最后的输出文件数。你也可以通过batch.script.df 模块控制输出的文件数。无非就是df.repartion(3).registerTable("finalOutputTable") 即可达成。
下载地址:StreamingPro