前篇传送门:https://www.jianshu.com/p/5ffd8730aad8
目录
- Prologue(见前篇)
- Kudu的初衷(见前篇)
- 集群架构与共识保证(见前篇)
- 表与分区的设计(见前篇)
- 底层存储设计细节(见前篇)
- 事务与数据一致性
- 与Impala、Spark集成
- Benchmarking
- 当前的主要不足
- 简单调优方法
事务与数据一致性
Kudu支持单行事务,但不支持多行事务(Kudu中对多行操作不满足ACID原则中的原子性),也不支持事务回滚,这点与HBase是相同的。
前面已经提到过,Kudu采用与关系数据库类似的多版本并发控制(MVCC)机制来实现事务隔离,通过为数据添加时间戳的方式实现。该时间戳不能在写入时由用户添加,但可以在执行读取(Scan)操作时指定,这样就可以读取到历史数据(UndoFile中的数据)。Kudu提供两种读模式:read-latest和read-at-snapshot,分别对应读取当前的快照以及按时间戳读取历史快照。
对于写操作而言,Kudu也提供了两种一致性模型:快照一致性(snapshot consistency)和外部一致性(external consistency)。下面来分析一下它们。
快照一致性比较简单,只保证当前执行写操作的客户端能看到自己提交的最新数据,而不保障跨客户端的可见性。它是Kudu默认的一致性模型,一般情况下都够用。但是特殊情况也同样存在:考虑用Kudu作为点击流数仓的情景,客户端A在某时刻写入了点击事件x,客户端B紧随其后写入事件y,并且这两个事件之间具有关联性。要想让所有客户端都能达到外部一致性(及时取到最新数据),必须手动将写操作完成后产生的时间戳传播(propagate)到其他客户端上,这种方式在Kudu中叫client-propagated。
很显然,client-propagated方案需要频繁地交换时间戳,其overhead比较高,所以Kudu也借鉴了Google Spanner的思路,实现了commit-wait一致性。
我们已经可以发现,保证外部一致性的重点在于事务的版本号(时间戳)必须足够准,并且每台服务器的时间都要保持精确的同步。Google Spanner提出的时间同步方案叫做TrueTime,需要原子钟等硬件的支持,可以将对时间的认知误差控制在±4ms之内。但Kudu集群都是建立在普通商用服务器上的,所以只能靠NTP和算法近似实现,该算法名为HybridTime,不详细展开了,看官可以参考论文《Technical Report: HybridTime - Accessible Global Consistency with High Clock Uncertainty》。
下图粗浅地示出commit-wait机制的原理。
当一个事务获取到锁并开始执行时,它会先生成自己的时间戳,再开始事务操作。当事务执行完之后,还必须要保证后发生的事务时间戳不能比自己的时间戳小,因此最终要等待2倍的误差时间,才能结束本次事务并释放锁。
与Impala、Spark集成
Kudu本身并没有SQL外壳,仅仅提供了Java和C++ API。但是Kudu和查询引擎Impala可以近乎无缝地结合在一起,为Kudu提供SQL能力。下面的简图示出用Impala SQL对Kudu表执行简单查询的流程。
可见,在Impala端会解析SQL语句并生成查询计划,然后作为客户端去连接Kudu集群,执行增删改查操作。关于Kudu与Impala的集成和查询方法,官方文档已经写得非常详细,不再赘述。
相对而言,我们更多地是编写Spark程序来执行一些对Kudu表数据的复杂分析任务。Maven上已经有Kudu与Spark的connector包,其坐标如下。
<!-- scala.bin.version: 2.11, kudu.version: 1.5.0 -->
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-spark2_${scala.bin.version}</artifactId>
<version>${kudu.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-spark2-tools_${scala.bin.version}</artifactId>
<version>${kudu.version}</version>
</dependency>
引入依赖之后,就可以用Spark SQL以及KuduContext来操作Kudu表了,一个简单的示例代码如下。
import org.apache.kudu.client._
import collection.JavaConverters._
// Read a table from Kudu
val df = spark.read
.options(Map("kudu.master" -> "kudu.master:7051", "kudu.table" -> "kudu_table"))
.format("kudu").load
// Query using the Spark API...
df.select("id").filter("id >= 5").show()
// ...or register a temporary table and use SQL
df.createOrReplaceTempView("kudu_table")
val filteredDF = spark.sql("select id from kudu_table where id >= 5").show()
// Use KuduContext to create, delete, or write to Kudu tables
val kuduContext = new KuduContext("kudu.master:7051", spark.sparkContext)
// Create a new Kudu table from a DataFrame schema
// NB: No rows from the DataFrame are inserted into the table
kuduContext.createTable(
"test_table", df.schema, Seq("key"),
new CreateTableOptions()
.setNumReplicas(1)
.addHashPartitions(List("key").asJava, 3))
// Insert data
kuduContext.insertRows(df, "test_table")
// Delete data
kuduContext.deleteRows(filteredDF, "test_table")
// Upsert data
kuduContext.upsertRows(df, "test_table")
// Update data
val alteredDF = df.select("id", $"count" + 1)
kuduContext.updateRows(filteredRows, "test_table")
// Data can also be inserted into the Kudu table using the data source, though the methods on
// KuduContext are preferred
// NB: The default is to upsert rows; to perform standard inserts instead, set operation = insert
// in the options map
// NB: Only mode Append is supported
df.write
.options(Map("kudu.master"-> "kudu.master:7051", "kudu.table"-> "test_table"))
.mode("append")
.format("kudu").save
// Check for the existence of a Kudu table
kuduContext.tableExists("another_table")
// Delete a Kudu table
kuduContext.deleteTable("unwanted_table")
需要注意的是,Spark on Kudu不支持有大写字母和非ASCII字符的表名、列名,必须预先处理。另外,不等于(<>)和or谓词不会下推给Kudu,而是由Spark任务来处理。like谓词同理,当有通配符时,只有以通配符结尾的语句(如like 'some%'
)才会下推给Kudu。
Benchmarking
在TPC-H数据集上进行测试,Impala on Kudu的查询时间比Impala on HDFS (Parquet) 平均缩短了三成。
使用TPC-H中的lineitem表(原始数据大小约62GB)进行Impala on Kudu与Phoenix on HBase的对比测试,包括数据的载入与4种查询。Phoenix on HBase的表划分为100个哈希分区,Kudu表划分为100个Tablet。
测试结果如下。
可见,Phoenix on HBase的方案只有在基于RowKey的查询时有性能优势,并且领先幅度不大。而Impala on Kudu在执行基于列的查询和全表扫描时,效率远远高于HBase。当然,这与HBase偏OLTP的设计思想有关,并不能说明Kudu可以完全取代HBase。
另外,论文中还用了雅虎的YCSB数据集测试随机读写能力。
结果如下,整体上看,Kudu的随机读写与HBase相比都或多或少地落后,其中zipfian数据集(符合Zipf's Law,即长尾分布)上的差距比较大,而uniform数据集(符合均匀分布)上的差距比较小。这也是自然的,要想兼顾OLAP的效率,必然要在OLTP方面做出一些牺牲。Kudu也在持续优化随机读写,不过那是新版的事情了。
当前的主要不足
Kudu现在可以基本满足我们对于OLTP+OLAP混合型分析的需求,但是它毕竟还年轻,采用的设计方案也较新,因此不可避免地还存在一些短板,在实际使用时需要提前避开一些坑。以我们生产环境中部署的1.5版本举例如下:
- 一行的主键组的值不能修改。如果想修改主键,就必须把该行删掉并新插入一行,但这样就无法保证原子性。
- 数据类型相对稀少,不支持所有复杂结构(map、struct等)。数据类型、是否允许为空、压缩编码等属性在列创建后都不能更改。
- 无法像HBase一样手动触发Compaction过程,无法在TServer间做数据均衡,表中已有的数据无法重新分区。
- 不能随意添加或者删除Kudu数据的存储目录,想要更改的话必须格式化所有目录,再进行迁移。
- 不支持像ElasticSearch一样的滚动重启。如果要从单个Master的部署切换到多个Master,必须手动操作,步骤非常复杂,容易出错。
- TServer的总数据量和Tablet的数量都不能过大,官方给出的单节点最大承受值是8TB、2000个Tablet。但在我们的实践中,数据量只达到上述的一半,整个集群重启就几乎起不来了。
简单调优方法
我们的Kudu服务与Hadoop基础服务和Impala一起部署在10个节点上(每个节点双路E5 12C/24T,256G RAM,6TB SAS HDD),3个Master,10个TServer。以下是我们根据集群实际情况对一些主要参数进行的调优:
memory_limit_hard_bytes
该参数是单个TServer能够使用的最大内存量。如果写入量很大而内存太小,会造成写入性能下降。如果集群资源充裕,可以将它设得比较大,比如单台服务器内存总量的一半。我们设定为32GB。
官方也提供了一个近似估计的方法,即:每1TB实际存储的数据约占用1.5GB内存,每个副本的MemRowSet和DeltaMemStore约占用128MB内存,(对多读少写的表而言)每列每CPU核心约占用256KB内存,另外再加上块缓存,最后在这些基础上留出约25%的余量。block_cache_capacity_mb
Kudu中也设计了BlockCache,不管名称还是作用都与HBase中的对应角色相同。默认值512MB,经验值是设置1~4GB之间,我们设了4GB。memory.soft_limit_in_bytes/memory.limit_in_bytes
这是Kudu进程组(即Linux cgroup)的内存软限制和硬限制。当系统内存不足时,会优先回收超过软限制的进程占用的内存,使之尽量低于阈值。当进程占用的内存超过了硬限制,会直接触发OOM导致Kudu进程被杀掉。我们设为-1,即不限制。maintenance_manager_num_threads
单个TServer用于在后台执行Flush、Compaction等后台操作的线程数,默认是1。如果是采用普通硬盘作为存储的话,该值应与所采用的硬盘数相同。max_create_tablets_per_ts
创建表时能够指定的最大分区数目(hash partition * range partition),默认为60。如果不能满足需求,可以调大。follower_unavailable_considered_failed_sec
当Follower与Leader失去联系后,Leader将Follower判定为失败的窗口时间,默认值300s。max_clock_sync_error_usec
NTP时间同步的最大允许误差,单位为微秒,默认值10s。如果Kudu频繁报时间不同步的错误,可以适当调大,比如15s。
The End
谢谢食用~