征文投稿 | Linkis引擎物料管理功能剖析

导语:本文以引擎相关的物料管理流程为切入点,同时结合底层数据模型和源码,为大家详细剖析引擎物料管理功能的实现细节,期望能够帮助大家更好地理解BML(物料库)服务的架构。

1.BML物料库服务

BML物料库是Linkis中PublicEnhancementService(PS)——公共增强服务架构下的功能模块。

在Linkis的架构体系里,物料的概念是指被统一存储托管起来的各种文件数据,包括脚本代码、资源文件、第三方jar、引擎启动时所需的相关类库和配置文件以及用于安全认证的keytab文件等。

总之,任何以文件态存在的数据,都可以被集中托管在物料库之中,然后在各自所需的场景中被下载使用。

物料服务是无状态的,可进行多实例部署,做到服务高可用,每个实例对外提供独立的服务,互不干扰,所有物料元数据及版本信息等在数据库中共享,底层物料数据可被存储到HDFS或本地(共享)文件系统之中,以及支持实现文件存储相关的接口,扩展其他文件存储系统等。

物料服务提供精确的权限控制,对于引擎资源类型的物料,可被所有用户共享访问;对于一些含有敏感信息的物料数据,也可做到仅有限用户可读。

物料文件采用追加的方式,可将多个版本的资源文件合并成一个大文件,避免产生过多的HDFS小文件,HDFS小文件过多会导致HDFS整体性能的下降。

物料服务提供了文件上传、更新、下载等操作任务的生命周期管理。同时,使用物料服务的方式有rest接口和SDK两种形式,用户可以根据自己的需要进行选择。

BML架构图如下:

上述关于BML架构的概述,有参考官网文档:https://linkis.apache.org/zh-CN/docs/latest/architecture/public_enhancement_services/bml

2. BML物料库服务底层表模型

在深入理解BML物料管理的流程细节之前,有必要先梳理下BML物料管理服务底层依赖的数据库表模型。

可结合Linkis的linkis_ddl.sql文件以及下文内容阐述的引擎物料上传和更新流程来理解bml resources相关表的字段含义以及表与表之间的字段关系。

3. BML物料库服务的使用场景

目前在Linkis中,BML物料库服务的使用场景包括:

  • 引擎物料文件,包括引擎启动时所需的conf和lib中的文件

  • 存储脚本,比如工作流任务节点链接的Scripts中的脚本是存储在BML物料库中的

  • DSS中工作流内容版本管理

  • 任务运行时所需资源文件管理

  • 4. 引擎物料管理流程剖析

    引擎物料是Linkis物料概念中的一个子集,其作用是为引擎启动时提供最新版本的jar包资源和配置文件等。本小节主要从引擎物料管理功能为切入点,剖析引擎物料数据在BML中的流转细节。

    4.1 引擎物料说明

    对Linkis的安装包正常部署之后,在LINKIS_INSTALL_HOME/lib/linkis-engineconn-plugins目录之下可以看到所有的引擎物料目录,以jdbc引擎为例,引擎物料目录的结构如下:

    jdbc
    ├── dist
    │   └── v4
    │   ├── conf
    │   ├── conf.zip
    │   ├── lib
    │   └── lib.zip
    └── plugin
    └── 4 └── linkis-engineplugin-jdbc-1.1.2.jar

    物料目录构成:

    jdbc/dist/版本号/conf.zip
    jdbc/dist/版本号/lib.zip
    jdbc/plugin/版本号(去v留数字)/linkis-engineplugin-引擎名称-1.1.x.jar

    conf.zip和lib.zip会作为引擎物料被托管在物料管理服务中,本地每次对物料conf或lib进行修改之后,对应物料会产生一个新的版本号,物料文件数据会被重新上传。引擎启动时,会获取最新版本号的物料数据,加载lib和conf并启动引擎的java进程。

    4.2 引擎物料上传和更新流程

    在Linkis完成部署并首次启动时,会触发引擎物料(lib.zip和conf.zip)首次上传至物料库;当引擎lib下jar包或conf中引擎配置文件有修改时,则需要触发引擎物料的刷新机制来保证引擎启动时能够加载到最新的物料文件。

    以现在Linkis1.1.x版本为例,触发引擎物料刷新的两种方式有两种:

    通过命令sh sbin/linkis-daemon.sh restart cg-engineplugin重启engineplugin服务

    通过请求引擎物料刷新的接口

    # 刷新所有引擎物料
    curl --cookie "linkis_user_session_ticket_id_v1=kN4HCk555Aw04udC1Npi4ttKa3duaCOv2HLiVea4FcQ=" http://127.0.0.1:9001/api/rest_j/v1/engineplugin/refreshAll
    # 指定引擎类型和版本刷新物料
    curl --cookie "linkis_user_session_ticket_id_v1=kN4HCk555Aw04udC1Npi4ttKa3duaCOv2HLiVea4FcQ=" http://127.0.0.1:9001/api/rest_j/v1/engineplugin/refresh?ecType=jdbc&version=4

    这两种引擎物料的刷新方式,其底层的实现机制是一样的,都是调用了EngineConnResourceService类中的refreshAll()或refresh()方法。

    在抽象类EngineConnResourceService的默认实现类DefaultEngineConnResourceService中的init()方法内部,通过参数wds.linkis.engineconn.dist.load.enable(默认为true)来控制是否在每次启动engineplugin服务时都执行refreshAll(false)来检查所有引擎物料是否有更新(其中faslse代表异步获取执行结果)。

    init()方法被注解@PostConstruct修饰,在DefaultEngineConnResourceService加载后,对象使用前执行,且只执行一次。

    手动调用engineplugin/refresh的接口,即手动执行了EngineConnResourceService类中的refreshAll或refresh方法。

    所以引擎物料检测更新的逻辑在DefaultEngineConnResourceService中的refreshAll和refresh方法内。

    其中refreshAll()的核心逻辑是:

    1)通过参数wds.linkis.engineconn.home获取引擎的安装目录,默认是:

    getEngineConnsHome = Configuration.getLinkisHome() + "/lib/linkis-engineconn-plugins";

    2)遍历引擎目录

    getEngineConnTypeListFromDisk: Array[String] = new File(getEngineConnsHome).listFiles().map(_.getName)

    3)EngineConnBmlResourceGenerator接口提供对各个引擎(版本)底层文件或目录的合法性检测。对应实现存在于抽象类AbstractEngineConnBmlResourceGenerator中。

    4)DefaultEngineConnBmlResourceGenerator类主要是为了生成EngineConnLocalizeResource。EngineConnLocalizeResource是对物料资源文件元数据和InputStream的封装,在后续的逻辑中EngineConnLocalizeResource会被作为物料参数来参与物料的上传过程。

    EngineConnBmlResourceGenerator、AbstractEngineConnBmlResourceGenerator、DefaultEngineConnBmlResourceGenerator这三个文件的代码细节暂不细说,可通过以下UML类图,大致了解其继承机制,并结合方法内的具体实现来理解这一部分的功能。

    再重新回到DefaultEngineConnResourceService类中的refreshAll方法内,继续看refreshTask线程的核心流程:

    engineConnBmlResourceGenerator.getEngineConnTypeListFromDisk foreach { engineConnType => 
    Utils.tryCatch {
    engineConnBmlResourceGenerator.generate(engineConnType).foreach {
    case (version, localize) =>
    logger.info(s" Try to initialize ${engineConnType}EngineConn-$version.")
    refresh(localize, engineConnType, version)
    }
    }
    ......}

    扫描引擎的安装目录,可获得每个引擎物料目录的列表,对于每个引擎物料目录结构的合法性校验通过之后,可得到对应的EngineConnLocalizeResource,然后通过调用refresh(localize: Array[EngineConnLocalizeResource], engineConnType: String, version: String)来完成后续物料的上传工作。

    而在refresh()方法的内部,主要经过的流程有:

    从表linkis_cg_engine_conn_plugin_bml_resources中获取对应engineConnType和version的物料列表数据,赋值给变量engineConnBmlResources。

    val engineConnBmlResources = asScalaBuffer(engineConnBmlResourceDao.getAllEngineConnBmlResource(engineConnType, version))

    4.2.1 引擎物料上传流程

    引擎物料上传流程时序图

    如果表linkis_cg_engine_conn_plugin_bml_resources中没有匹配到数据,则需要拿EngineConnLocalizeResource中的数据来构造EngineConnBmlResource对象,并保存至linkis_cg_engine_conn_plugin_bml_resources表中,此数据保存之前,需要先完成物料文件的上传操作,即执行uploadToBml(localizeResource)方法。

    在uploadToBml(localizeResource)方法内部,通过构造bmlClient来请求物料上传的接口。即:

    private val bmlClient = BmlClientFactory.createBmlClient()
    bmlClient.uploadResource(Utils.getJvmUser, localizeResource.fileName, localizeResource.getFileInputStream)

    在BML Server中,物料上传的接口位置在BmlRestfulApi类中的uploadResource接口方法内。主要经历的过程是:

    ResourceTask resourceTask = taskService.createUploadTask(files, user, properties);

    每一次物料上传,都会构造一个ResourceTask来完成文件上传的流程,并记录此次文件上传Task的执行记录。在createUploadTask方法内部,主要完成的操作如下:

    1)为此次上传的资源文件产生一个全局唯一标识的resource_id,String resourceId = UUID.randomUUID().toString();

    2)构建ResourceTask记录,并存储在表linkis_ps_bml_resources_task中,以及后续一系列的Task状态修改。

    ResourceTask resourceTask = ResourceTask.createUploadTask(resourceId, user, properties);
    taskDao.insert(resourceTask);

    taskDao.updateState(resourceTask.getId(), TaskState.RUNNING.getValue(), new Date());

    3)物料文件真正写入物料库的操作是由ResourceServiceImpl类中的upload方法完成的,在upload方法内部,会把一组List<MultipartFile> files对应的字节流持久化至物料库文件存储系统中;把物料文件的properties数据,存储到资源记录表(linkis_ps_bml_resources)和资源版本记录表(linkis_ps_bml_resources_version)中。

    MultipartFile p = files[0]
    String resourceId = (String) properties.get("resourceId");
    String fileName =new String(p.getOriginalFilename().getBytes(Constant.ISO_ENCODE),
    Constant.UTF8_ENCODE);
    fileName = resourceId;
    String path = resourceHelper.generatePath(user, fileName, properties);
    // generatePath目前支持Local和HDFS路径,路径的构成规则由LocalResourceHelper或HdfsResourceHelper
    // 中的generatePath方法实现
    StringBuilder sb = new StringBuilder();
    long size = resourceHelper.upload(path, user, inputStream, sb, true);
    // 文件size计算以及文件字节流写入文件由LocalResourceHelper或HdfsResourceHelper中的upload方法实现
    Resource resource = Resource.createNewResource(resourceId, user, fileName, properties);
    // 插入一条记录到resource表linkis_ps_bml_resources中
    long id = resourceDao.uploadResource(resource);
    // 新增一条记录到resource version表linkis_ps_bml_resources_version中,此时的版本号是onstant.FIRST_VERSION
    // 除了记录这个版本的元数据信息外,最重要的是记录了该版本的文件的存储位置,包括文件路径,起始位置,结束位置。
    String clientIp = (String) properties.get("clientIp");
    ResourceVersion resourceVersion = ResourceVersion.createNewResourceVersion(
    resourceId, path, md5String, clientIp, size, Constant.FIRST_VERSION, 1);versionDao.insertNewVersion(resourceVersion);

    上述流程执行成功之后,物料数据才算是真正完成,然后把UploadResult返回给客户端,并标记此次ResourceTask的状态为完成,如果有遇到上传文件报错,则标记此次ResourceTask的状态为失败,记录异常信息。

    4.2.2 引擎物料更新流程

    引擎物料更新流程时序图

    如果表linkis_cg_engine_conn_plugin_bml_resources中匹配到本地物料数据,则需要拿EngineConnLocalizeResource中的数据来构造EngineConnBmlResource对象,并更新linkis_cg_engine_conn_plugin_bml_resources表中原有物料文件的版本号、文件大小、修改时间等元数据信息,此数据更新前,需要先完成物料文件的更新上传操作,即执行uploadToBml(localizeResource, engineConnBmlResource.getBmlResourceId)方法。

    在uploadToBml(localizeResource, resourceId)方法内部,通过构造bmlClient来请求物料资源更新的接口。即:

    private val bmlClient = BmlClientFactory.createBmlClient()
    bmlClient.updateResource(Utils.getJvmUser, resourceId, localizeResource.fileName, localizeResource.getFileInputStream)

    在BML Server中,实现物料更新的接口位置在BmlRestfulApi类中的updateVersion接口方法内,主要经历的过程是:

    完成resourceId的有效性检测,即检测传入的resourceId是否在linkis_ps_bml_resources表中存在,如果此resourceId不存在,给客户端抛出异常,在接口层面此次物料更新操作失败。

    所以在表linkis_cg_engine_conn_plugin_bml_resourceslinkis_ps_bml_resources中的资源数据的对应关系需要保证完整,否则会出现物料文件无法更新的报错。

    resourceService.checkResourceId(resourceId)

    resourceId如果存在于linkis_ps_bml_resources表中,会继续执行:

    StringUtils.isEmpty(versionService.getNewestVersion(resourceId))

    getNewestVersion方法是为了在表linkis_ps_bml_resources_version中获取该resourceId的最大版本号,如果resourceId对应的最大version为空,那么物料同样会更新失败,所以此处数据的对应关系完整性也需要严格保证。

    上述两处检查都通过之后,会创建ResourceUpdateTask来完成最终的文件写入和记录更新保存等工作。

    ResourceTask resourceTask = null;
    synchronized (resourceId.intern()) {
    resourceTask = taskService.createUpdateTask(resourceId, user, file, properties);}

    而在createUpdateTask方法内部,主要实现的功能是:

    // 为物料Resource生成新的version
    String lastVersion = getResourceLastVersion(resourceId);
    String newVersion = generateNewVersion(lastVersion);
    // 然后是对ResourceTask的构建,和状态维护
    ResourceTask resourceTask = ResourceTask.createUpdateTask(resourceId, newVersion, user, system, properties);
    // 物料更新上传的逻辑由versionService.updateVersion方法完成versionService.updateVersion(resourceTask.getResourceId(), user, file, properties);

    在versionService.updateVersion方法内部,主要实现的功能是:

    ResourceHelper resourceHelper = ResourceHelperFactory.getResourceHelper();
    InputStream inputStream = file.getInputStream();
    // 获取资源的path
    String newVersion = params.get("newVersion").toString();
    String path = versionDao.getResourcePath(resourceId) + "_" + newVersion;
    // getResourcePath的获取逻辑是从原有路径中limit一条,然后以_拼接newVersion
    // select resource from linkis_ps_bml_resources_version WHERE resource_id = #{resourceId} limit 1
    // 资源上传到hdfs或local
    StringBuilder stringBuilder = new StringBuilder();
    long size = resourceHelper.upload(path, user, inputStream, stringBuilder, OVER_WRITE);
    // 最后在linkis_ps_bml_resources_version表中插入一条新的资源版本记录
    ResourceVersion resourceVersion = ResourceVersion.createNewResourceVersion(resourceId, path, md5String, clientIp, size, newVersion, 1);versionDao.insertNewVersion(resourceVersion);

    5. 文章小结

    本文从Linkis引擎物料管理功能作为切入点,概述了BML物料服务的架构,并结合底层源码,详细地剖析了在引擎物料管理功能中,引擎物料的概念,以及引擎物料的上传、更新、版本管理等操作流程。

    6. 参考文章

    https://linkis.apache.org/zh-CN/docs/latest/architecture/public_enhancement_services/bml 


    更多征文内容请访问: https://github.com/apache/incubator-linkis/discussions/categories/solicit-articles-%E5%BE%81%E6%96%87 

    — END —

    ● 往期精选  ●

  • Apache Linkis有奖征文,诚邀您的参与

  • 版本动态 | Apache Linkis(incubating) 1.1.0 版本发布

  • 版本动态 | 数据质量管理平台 Qualitis 0.9.0 版本发布

  • 版本动态 | DataSphereStudio 1.0.1版本发布

  • WeDataSphere 入门指南

  • 如何成为社区贡献者

    ► 官方文档贡献。发现文档的不足、优化文档,持续更新文档等方式参与社区贡献。通过文档贡献,让开发者熟悉如何提交PR和真正参与到社区的建设。参考攻略:保姆级教程:如何成为Apache Linkis文档贡献者

    ► 代码贡献。我们梳理了社区中简单并且容易入门的的任务,非常适合新人做代码贡献。请查阅新手任务列表:https://github.com/apache/incubator-linkis/issues/1161

    ► 内容贡献:发布WeDataSphere开源组件相关的内容,包括但不限于安装部署教程、使用经验、案例实践等,形式不限,请投稿给小助手。例如:

    ► 社区答疑:积极在社区中进行答疑、分享技术、帮助开发者解决问题等;

    ► 其他:积极参与社区活动、成为社区志愿者、帮助社区宣传、为社区发展提供有效建议等;

    本文使用 文章同步助手 同步

    ©著作权归作者所有,转载或内容合作请联系作者
    • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
      沈念sama阅读 203,547评论 6 477
    • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
      沈念sama阅读 85,399评论 2 381
    • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
      开封第一讲书人阅读 150,428评论 0 337
    • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
      开封第一讲书人阅读 54,599评论 1 274
    • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
      茶点故事阅读 63,612评论 5 365
    • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
      开封第一讲书人阅读 48,577评论 1 281
    • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
      沈念sama阅读 37,941评论 3 395
    • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
      开封第一讲书人阅读 36,603评论 0 258
    • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
      沈念sama阅读 40,852评论 1 297
    • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
      茶点故事阅读 35,605评论 2 321
    • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
      茶点故事阅读 37,693评论 1 329
    • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
      沈念sama阅读 33,375评论 4 318
    • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
      茶点故事阅读 38,955评论 3 307
    • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
      开封第一讲书人阅读 29,936评论 0 19
    • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
      开封第一讲书人阅读 31,172评论 1 259
    • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
      沈念sama阅读 43,970评论 2 349
    • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
      茶点故事阅读 42,414评论 2 342

    推荐阅读更多精彩内容