Amoro简介,部署配置和使用

Amoro简介

Amoro 是一个与数据湖仓一体管理和优化相关的技术平台,致力于提升数据湖和数据仓库的性能与管理效率。Amoro 是构建在 Apache Iceberg、Pimon 等开放数据湖表格式之上的湖仓管理系统。具备自动优化机制,可以持续监控数据表上的文件状况。适用于需要进行大量数据分析和查询的场景,如企业数据仓库、大数据分析项目等。
本篇基于本人的使用情况,为大家分享Amoro的部署安装,简单使用和注意事项。

环境信息

  • Amoro:0.6.1
  • Flink:1.17.1
  • Spark:3.3.0
  • Hadoop:3.1.1
  • MySQL:5.7.x

安装

下载并解压Amoro到服务器任意目录。
下载链接:Download (apache.org)
Flink和Spark访问Amoro的运行时依赖也在此链接下载。

Amoro需要使用RMDBS作为元数据存储。这里我们以MySQL为例。下载mysql-connector-java到Amoro的lib目录。

wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar

使用MySQL命令行创建amoro数据库,并创建一个用户,赋予amoro数据库的所有权限。(下面为了演示方便,直接使用root账户,生产环境不要这么使用)。

修改Amoro配置文件(conf/config.yaml),增加MySQL相关配置。例如:

ams:
  database:
    type: mysql
    jdbc-driver-class: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/amoro?useUnicode=true&characterEncoding=UTF8&autoReconnect=true&useAffectedRows=true&useSSL=false
    username: root
    password: root

最后执行bin/ams.sh start启动Amoro服务。

此外Amoro还支持HA(高可用)模式。Amoro的高可用为一主多从,需要依赖外部Zookeeper集群进行主节点选举。HA的相关配置如下:

ams:
  ha:
    enabled: true  # 开启HA模式
    cluster-name: default # 指定集群名字。Amoro允许同一个Zookeeper集群负责多个Amoro集群的选主,这些Amoro集群需要使用cluster-name来区分
    zookeeper-address: 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 # Zookeeper服务的地址和端口号

表格式

Amoro支持的格式和特性记录如下所示。

  • Iceberg。不允许指定主键。
  • Mixed Iceberg。除了iceberg 之外还包含logstore。Logstore为append only,可加速流式数据消费过程。允许指定主键。在minor optimize运行的时候去重。Minor Optimize运行之前可查询到upsert之前的数据,存在重复。Minor Optimize之后历史数据消失。
  • Mixed Hive。使用Hive作为base store,iceberg作为change store。

使用

Catalog配置

创建Optimizer Group

和Hudi类似,Amoro的iceberg表在写入的时候会产生一系列的碎片文件。会存在大量的冗余和过期的数据。如果不定期优化表的话,表服务性能会严重降低,甚至不可用。
Optimizer是Amoro专门为表优化设计的服务。可以类比为Hudi的compaction和clean等服务。相比Hudi的优点是Amoro支持这些服务的可视化管理。这些服务的资源隔离,资源消耗,运行状态等可以通过Amoro管理界面查看和操作。
在Amoro中,一个Optimizer Group代表了一部分资源。Catalog和Optimizer Group存在1对多的关系。通过这种关系可以实现Optimizer资源的共享和隔离。
在创建Catalog之前需要先创建optimizer group。打开左侧菜单的Optimizing,点击右侧的Optimizer Groups标签。然后点击Add Group按钮。输入名字,选择一个之前配置的container。主要注意的是属性必须配置memory,value的单位是MB。它代表了一个并行度占用的内存大小。
创建完毕之后点击Operation中的Scale-Out,指定并行度之后点击确定。最后在Optimizers标签中可以看到正在运行的Optimizer。

创建Catalogs

打开左侧菜单的Catalogs,点击Catalog List中的+按钮。主要指定的配置项为:

  • Name: Catalog的名字。
  • Type:使用内部Catalog还是外部Catalog。如果使用外部Hive metastore,需要上传hive-site.xml
  • Table Format:Catalog中表的格式,对于内部Catalog目前支持的是Mixed Iceberg和Iceberg。
  • Optimizer Group:运行表优化服务的Optimizer Group(可以理解为资源池),选择上一步创建好的optimizer。
  • Storage的Type:存储类型。目前支持Hadoop和S3。对于Hadoop类型,需要上传Hadoop的core-site.xmlhdfs-site.xml两个文件。
  • Authentication的Type:认证类型。SIMPLE类型需要提供Hadoop的用户名。KERBEROS类型需要提供Principal,Keytab文件和krb5.conf文件。
  • Properties:属性设置。必须的属性配置为warehouse。含义为本Catalog下表数据在分布式存储中的路径。如果使用的是External Catalog,无需配置warehouse,例如配置Hive Metastore,表文件所在的位置为hdfs://ip:port/warehouse/tablespace/managed/hive。相当于Hive managed table。
  • Table Properties:如果该Catalog下所有表具有相同的属性配置,可以将这些配置写在此处。

Terminal操作Amoro

为方便用户轻量级使用,Amoro管理页面提供了一个可视化终端。该终端后台使用的是Spark。

创建数据库:

create database default;

创建表:

create table test1 (id int, data string, ts timestamp);

注意
使用界面Terminal的时候,建表语句不要使用use arctic

其余操作和使用Spark SQL完全相同。

Flink操作Amoro

首先需要引入依赖。添加如下jar包到$FLINK_HOME/lib目录中:

  • amoro-flink-runtime-1.17-0.6.1.jar
  • flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
  • httpclient5-5.3.1.jar
  • httpcore5-5.1.5.jar
  • httpcore5-h2-5.2.4.jar

需要注意的是,如果缺少httpclient5和httpcore5相关依赖,执行创建catalog语句的时候会出现类似如下错误:

java.lang.NoClassDefFoundError: org/apache/hc/core5/http/ParseException

java.lang.NoClassDefFoundError: org/apache/hc/client5/http/classic/methods/HttpUriRequest

配置flink-conf.yaml,增加checkpoint配置。只有触发checkpoint的时候,数据才写入到文件。否则写入的数据不可见。

execution.checkpointing.interval: 10s

接下来进入Flink SQL。执行如下命令创建Catalog:

create catalog hdfs_iceberg with (
    'type' = 'arctic', 
    'metastore.url' = 'thrift://amoro_ip:1260/hdfs_iceberg', 
    'default-database' = 'default'
);

命令执行成功。接下来我们可以按照Flink SQL的使用方式创建Amoro表和插入查询表数据。只要操作的表位于上面例子中的hdfs_iceberg catalog 范围内,会自动被Amoro纳管。

如果配置了Amoro HA,metastore.url需要配置为:zookeeper://{zookeeper-server}/{cluster-name}/{catalog-name}

注意
如果Flink连接的catalog使用Hive metastore。需要将hive-exec-xxx.jar复制到$Flink_HOME/lib目录中。这种情况Flink只能使用JDK 8运行,否则会报错。

创建表:

CREATE TABLE `hdfs_iceberg`.`default`.`flink_test` (
    id BIGINT,
    name STRING,
    op_time TIMESTAMP
);

在不创建catalog的情况下直接创建表:

CREATE TABLE `test_table` (
    id BIGINT,
    name STRING,
    op_time TIMESTAMP
) WITH (
    'connector' = 'arctic',
    'metastore.url' = 'thrift://amoro_ip:1260/',
    'arctic.catalog' = 'hdfs_iceberg',
    'arctic.database' = 'default',
    'arctic.table' = 'test_table'
);

注意
在Flink sql-client试用的时候,发现如果长时间没有操作,再次操作时会出现异常,无法查询出数据。为了解决这个问题,需要增加Flink配置classloader.check-leaked-classloader: false

Spark操作Amoro

和Flink类似,首先需要引入依赖。

添加如下jar到$SPARK_HOME/jars目录:

  • amoro-spark-3.3-runtime-0.6.1.jar
  • httpclient5-5.3.1.jar
  • httpcore5-5.1.5.jar
  • httpcore5-h2-5.2.4.jar

使用下面的命令进入spark-sql。命令中需要指定Amoro catalog的thrift连接URL。示例如下:

./spark-sql \
    --conf spark.sql.extensions=com.netease.arctic.spark.ArcticSparkExtensions \
    --conf spark.sql.catalog.hdfs_iceberg=com.netease.arctic.spark.ArcticSparkCatalog \
    --conf spark.sql.catalog.hdfs_iceberg.url=thrift://amoro_ip:1260/hdfs_iceberg

其中参数spark.sql.catalog.hdfs_iceberg中的hdfs_iceberg为Spark SQL中指定的catalog名称。格式为spark.sql.catalog.<catalog名称>
进入spark-sql之后第一次执行show catalogs;,尽管返回结果没有hdfs_iceberg这个catalog,但是我们仍可以通过use hdfs_iceberg切换到这个catalog。

创建表:

CREATE TABLE hdfs_iceberg.default.sample (
    id bigint  COMMENT "unique id",
    data string
) USING arctic;

查询不带时区timetamp字段类型的时候会出现如下错误:

java.lang.IllegalArgumentException: Cannot handle timestamp without timezone fields in Spark. Spark does not natively support this type but if you would like to handle all timestamps as timestamp with timezone set 'spark.sql.iceberg.handle-timestamp-without-timezone' to true. This will not change the underlying values stored but will change their displayed values in Spark. For more information please see https://docs.databricks.com/spark/latest/dataframes-datasets/dates-timestamps.html#ansi-sql-and-spark-sql-timestamps

解决方法:

SET `spark.sql.iceberg.handle-timestamp-without-timezone`=`true`;

Hive操作Amoro

Amoro mixed-hive格式同时支持使用Hive和Flink/Spark格式查询。
Hive查询mixed-hive格式表无需增加任何配置,无需引入任何依赖。
在Amoro创建表之后,Hive不需要事先创建表,会自动从Amoro同步(Hive只有同步过来的数据库和表,没有catalog)。
需要注意的是,Hive查询到iceberg格式的增量数据是有延迟的。必须等到full optimize运行完毕之后才能查询到全量结果。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,088评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,715评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,361评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,099评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,987评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,063评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,486评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,175评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,440评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,518评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,305评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,190评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,550评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,152评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,451评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,637评论 2 335