说道Calcite你可能有些陌生, 但提及Hive、Kylin、Apache Drill、Flink等一定不会陌生,这些都是在我们日常工作中经常用到的,如上这些都是基于Calcite实现查询引擎,还有Druid和Storm也是使用它来实现SQL功能。按照官方的说法,Calcite是动态数据管理框架,这个解释理解起来有点抽象,通俗一点讲,要使用Calcite实现数据库,只需要关注存储引擎以及元数据管理,其他都交给Ca
lcite。可能这个说法有些不严谨,文档中还提到了Calcite不提供处理数据的算法,但Calcite-core和Calite-linq都提供了一些算子的实现对于一个简单的数据库足够了。
对于Calcite的详细介绍推荐大家看一遍文章,本文主要介绍Calcite如何使用,例如已经有一种数据格式的文件存储,如何利用Calcite快速实现SQL查询。我看过kylin、druid的Calcite应用,也是各不相同,这大概也是Calcite的魅力吧。
Calcite文档有一个指南,介绍使用CSV File作为数据存储格式实现SQL查询,掌握了以后我们可以照猫画虎造出一个其他数据格式的数据库,或者对学习kylin、druid的源码有帮忙,概括地讲,在这个例子使用以下技巧:
- 自定义Schema
- 自定义Table
- 决定Table的字段类型
- 使用ScannableTable实现简单的全表扫描
- 更高级一点的技巧,使用Filterable Table实现谓词下推
- 更酷一点的技巧,基于TranslateTable使用Rule实现逻辑表达式的转换
前四点是构建一个简单的,采用全表扫描的方式实现查询。5和6属于进阶内容,在案例中,使用Rule转换的方式实现了Project下推,和5实现的谓词下推是常用的SQL优化方式。下面由浅到深介绍这几项技巧。先来看前四项,完成一个简单的只能全表扫描的数据库。
首先在GitHub上下载Calcite的源码,看calcite-example-csv工程,在src/test/CSVTest中有各种场景的测试用例,例如
- testFilterableWhere是测试谓词下推
- testPushDownProject是Project下推
- testSelect是最简单的全表扫描
可以先跑一下测试用例感受一下Calcite的魅力,Calcite实现一个数据库,只需要关注存储引擎以及元数据管理。存储格式采用csv,一个CSV文件会映射成一个Table,需要注意的是CSV文件的第一行是Table的元数据信息,采用“FieldName1:FieldType,FieldNameN:FieldType”这样的格式存储,类似excel中的表头信息。以下是sales/SALES.csv的示例。
DEPTNO:int,NAME:string
10,"Sales"
20,"Marketing"
30,"Accounts"
至此介绍了存储格式以及元数据,接下来介绍如何使用。
第一步,创建一个json格式的mode文件,描述了如何创建Schema,可以参照test/resource目录下的model.json,
{
"version": "1.0",
"defaultSchema": "SALES",
"schemas": [
{
"name": "SALES",
"type": "custom",
"factory": "org.apache.calcite.adapter.csv.CsvSchemaFactory",
"operand": {
"directory": "sales"
}
}
]
}
在分析model文件之前,我们先了解几个重要的概念:
- Schema,是table和function的名称空间,它是一个可嵌套的结构,Schema还可以有subSchema,理论上可以无限嵌套,但一般不会这么做。Schema可以理解成Database,Database下面有table,这样就和传统数据库的概念联系起来了,在Calcite中,顶层的Schema是root,自定义的Schema是root的subSchema,同时还可以设置defaultSchema,类似我们使用数据库时,使用use database命令以后就不用再输入database名字前缀。
- Table,就很好理解了,就是数据库中的表。在table描述了字段名以及相应的类型、表的统计信息,例如表有多少条记录等等,这里先不展开讲。另外重要的是数据文件的存储以及如何扫描读取数据文件。
那么再去看这份model文件,就比较清晰明了。它描述了在数据库中有多少个Schema、每个Schema如何创建以及默认的Schema,这里的Schema可以理解成database。defaultSchema属性设置默认Schema,schemas是数组类型,每一项代表一个Schema描述信息,在描述信息中有一个关键的属性factory,它是创建Schema的工厂类,在这个例子中factory是org.apache.calcite.adapter.csv.CsvSchemaFactory,它实现了SchemaFactory接口。
要自实现只有全表扫描功能的简单数据库需要做如下几步:
- 自定义SchemaFactory
- 自定义Schema
- 自定义Table
- 自定义Enumerator
先看看SchemaFactory接口,它只有一个方法:
Schema create(
SchemaPlus parentSchema,
String name,
Map<String, Object> operand);
create用于创建Schema,其参数说明如下:
- parentSchema,他的父节点,一般为root
- name schema的名字,它在model中定义的
- operand,也是在mode中定义的,是Map类型,用于传入自定义参数。
在这个Model中,CSVSchemaFactory创建一个叫“SALES”的CSVSchema,它会把src/test/resources/sales下所有CSV文件构建成table。所以operand只许设定了一个参数directory,即读取CSV文件的根目录。CSVSchemaFactory的实现比较简单所以就不在展开分析,需要注意是的源码中flavor参数的处理,这个参数涉及优化进阶相关,这里先不用管,默认为SCANNABLE。
自定义Schema需要实现Schema接口,前面提过Schema是table和function的名称空间,其主要方法如下:
- Table getTable(String name),根据表名获取Table
- Set<String> getTableNames(),获取Schema下的所有表名集合
- Collection<Function> getFunctions(String name),根据函数名获取函数列表,和table不同,这里返回的是集合类型。
- Set<String> getFunctionNames(),或者所有的函数名集合。
CsvSchema->AbstractSchema->Schema,AbstractSchema重新设计了一个getTableMap方法,使用tableName->Table的Map结构存储所有table。这样设计的好处是getTable()能够快速查找。CSVSchema的实现也比较简单,遍历读取根目录下的每个
文件创建成表,因为上面的model.json中flavor没有设置,采用默认值SCANNABLE,创建成CsvScannableTable。
自定义Table是本文中最复杂的,先看下图:
如图可知,CSVScannableTable主要实现了两个接口ScannableTable和Table。右边部分,CSVTable实现了Table接口,它的作用是定义Table的字段以及字段类型,左侧的ScannableTable是实现如何遍历读取CSV文件的数据。Table接口有如下三个方法:
- RelDataType getRowType(RelDataTypeFactory typeFactory); 这个方法就是定义Table行记录的字段以及字段类型。
- Statistic getStatistic(); 获取统计信息
- Schema.TableType getJdbcTableType(); table的类型,table的类型有很多种,例如table和view。
AbstractTable默认实现了getStatistic和getJdbcTableType,所以我们只需要实现getRowType方法。首先需要定义type,规范我们这个数据库支持的数据类型。例如字符串是采用String还是VarChar,具体实现在CsvFieldType枚举类,它内部维护了一个Map结构用来存储type的
STRING(String.class, "string"),
BOOLEAN(Primitive.BOOLEAN),
BYTE(Primitive.BYTE),
CHAR(Primitive.CHAR),
//只列举部分类型
由如上代码可知,type并不都是标准的SQL Type,例如String。Calcite中设计了RelDataTypeFactory,不仅支持标准的SQL TYPE,也支持java类型以及Array、Map等集合类型。该实例中,RowType是一个StructType,是集合类型,类似c语言中的struct,非常适合存储行记录中字段名以及类型,这和Hive的方式是一样的。例如SALES文件中的
DEPTNO:int,NAME:string
则Type为
struct<DEPTNO:int,NAME:string>
在这个例子中通过读取csv文件的第一行来获取fieldName以及fieldType的,具体实现在CsvEnumerator的deduceRowType()方法。
在calcite中一般有两种执行模型,解释和编译,这一点类似Java。编译模式更好理解一些,会把逻辑执行计划通过字节码技术生成java code然后编译执行。解释模式则省掉生成代码编译的过程。关于解释执行。我看过一些基于Calcite的应用,大部分还是采用编译模式的,所以你看完这篇文章以后再去看其他使用calicite的项目,可能找不到熟悉的身影,如果table实现了如下三个接口之一,Calcite则会使用解释模式执行
- ScannableTable
- FilterableTable
- ProjectableFilterableTable
ScannableTable用于简单的全表扫描,FilterableTable用于谓词下推,ProjectableFilterableTable更酷一些既能支持谓词下推又能支持project下推。他们都有一个scan,但是参数不同
- ScannableTable
Enumerable<Object[]> scan(DataContext root);
- FilterableTable
Enumerable<Object[]> scan(DataContext root, List<RexNode> filters);
因为要做谓词下推,比ScannableTable多了filters。filters是where语句中的filter。
- ProjectableFilterableTable
Enumerable<Object[]> scan(DataContext root, List<RexNode> filters,
int[] projects);
又增加了projects,投影字段顺序的数组。
Enumerable支持linq和java的迭代器
//返回java的迭代器
Iterator<T> it = enumerable.iterator();
//LINQ风格的迭代器
Enumerator<T> enumerator =enumerable.enumerator();
要使用这两种迭代器之前,必须要实现它!AbstractEnumerable借助Linq4j实现了enumerator和iterator的转换
public Iterator<T> iterator() {
return Linq4j.enumeratorIterator(enumerator());
}
所以我们仅需实现enumerator方法。
Enumerator是Linq风格的迭代器,它有4个方法:
- current()
- moveNext()
- reset()
- close()
current返回游标所指的当前记录,需要注意的是current并不会改变游标的位置,这一点和iterator是不同的,在iterator相对应的是next方法,每一次调用都会将游标移动到下一条记录,current则不会,Enumerator是在调用moveNext方法时才会移动游标。moveNext方法将游标指向下一条记录,并获取当前记录供current方法调用,如果没有下一条记录则返回false。
CsvEnumerator是读取csv文件的迭代器,它还得需要一个RowConverter,因为csv中都是String类型,使用RowConverter转化成相应的类型。在moreNext方法中,有Stream和谓词下推filter部分的实现,在本文只关注如下几行代码:
final String[] strings = reader.readNext();
if (strings == null) {
current = null;
return false;
}
current = rowConverter.convertRow(strings);
return true;
至此,我们完成了使用csv文件存储的数据库全部工作,你可以在CsvTest中使用所有的名为“model”的模型进行测试,
checkSql("model", "select * from EMPS");
//smart模型的会在后续的文中介绍
checkSql("smart", "select name from EMPS");
总结一下:
- 创建模型,model.json
- 自定义SchemaFactory,CsvSchemaFactory
- 自定义Schema,CsvSchema
- 自定义Table,CsvTable、CsvScannableTable
- 自定义Enumerator,CsvEnumerator
分享的过程也是学习的过程,在写本文过程,也了解了不少以前自以为懂了的细节,但也有可能还存在不正确的认识,欢迎指正交流。微信号:zhl5919
参照资料: