拓展 sequelize-typescript 库支持 Postgresql 分区表

一、背景

sequelize是一个基于promise的关系型数据库Node.js ORM框架,提供了支持建立model结构,连接并封装数据库操作函数等功能,而sequelize-typescript是为了支持typescript语法而基于sequelize封装的框架。

由于项目前期阶段方便db维护,应用服务启动时会调用sequelize-typescript提供的sync函数根据model结构自动同步到数据库,使用这种方式同步普通表是没有很大问题的,但是目前sync函数不支持同步分区表,导致出现分区表只能另外手动创建和维护的情况。

所以为了解决上述这个问题,只能基于sequelize-typescript来实现分区表自动同步的功能了,下面开始讨论拓展sequelize-typescript使其支持分区表的方案和实现。

首先通过源码了解sequelize是怎样将model结构同步到数据库的,才能去对其进行改造,先看sequelize.sync函数的实现:

// 文件路径为<project_path>/node_modules/_sequelize@4.44.2@sequelize/lib/sequelize.js
// 路径中的"4.44.2"是sequelize-typescript库引用的sequelize版本号,可能sequelize-typescript版本不同使得引用的sequelize版本有所差异
sync(options) {
  options = _.clone(options) || {};
  options.hooks = options.hooks === undefined ? true : !!options.hooks;
  options = _.defaults(options, this.options.sync, this.options);
  if (options.match) {
    if (!options.match.test(this.config.database)) {
      return Promise.reject(new Error(`Database "${this.config.database}" does not match sync match parameter "${options.match}"`));
    }
  }
  return Promise.try(() => {
    if (options.hooks) {
      return this.runHooks('beforeBulkSync', options);
    }
  }).then(() => {
    if (options.force) {
      return this.drop(options);
    }
  }).then(() => {
    const models = [];

    // Topologically sort by foreign key constraints to give us an appropriate
    // creation order
    this.modelManager.forEachModel(model => {
      if (model) {
        models.push(model);
      } else {
        // DB should throw an SQL error if referencing inexistant table
      }
    });
    return Promise.each(models, model => model.sync(options));
  }).then(() => {
    if (options.hooks) {
      return this.runHooks('afterBulkSync', options);
    }
  }).return(this);
}

从以上代码可知,sequelize.sync函数中除了做一些前后置的hook操作外就是调用每个model的sync函数同步表结构,再贴下model.sync函数代码:

// 文件路径为<project_path>/node_modules/_sequelize@4.44.2@sequelize/lib/model.js
// 路径中的"4.44.2"是sequelize-typescript库引用的sequelize版本号,可能sequelize-typescript版本不同使得引用的sequelize版本有所差异
static sync(options) {
  options = _.extend({}, this.options, options);
  options.hooks = options.hooks === undefined ? true : !!options.hooks;

  const attributes = this.tableAttributes;

  return Promise.try(() => {
    if (options.hooks) {
      return this.runHooks('beforeSync', options);
    }
  }).then(() => {
    if (options.force) {
      return this.drop(options);
    }
  })
    .then(() => this.QueryInterface.createTable(this.getTableName(options), attributes, options, this))
    .then(() => {
    if (options.alter) {
      return Promise.all([
        this.QueryInterface.describeTable(this.getTableName(options)),
        this.QueryInterface.getForeignKeyReferencesForTable(this.getTableName(options))
      ])
        .then(tableInfos => {
        const columns = tableInfos[0];
        // Use for alter foreign keys
        const foreignKeyReferences = tableInfos[1];

        const changes = []; // array of promises to run
        const removedConstraints = {};

        _.each(attributes, (columnDesc, columnName) => {
          if (!columns[columnName]) {
            changes.push(() => this.QueryInterface.addColumn(this.getTableName(options), columnName, attributes[columnName]));
          }
        });
        _.each(columns, (columnDesc, columnName) => {
          const currentAttributes = attributes[columnName];
          if (!currentAttributes) {
            changes.push(() => this.QueryInterface.removeColumn(this.getTableName(options), columnName, options));
          } else if (!currentAttributes.primaryKey) {
            // Check foreign keys. If it's a foreign key, it should remove constraint first.
            const references = currentAttributes.references;
            if (currentAttributes.references) {
              const database = this.sequelize.config.database;
              const schema = this.sequelize.config.schema;
              // Find existed foreign keys
              _.each(foreignKeyReferences, foreignKeyReference => {
                const constraintName = foreignKeyReference.constraintName;
                if (!!constraintName
                    && foreignKeyReference.tableCatalog === database
                    && (schema ? foreignKeyReference.tableSchema === schema : true)
                    && foreignKeyReference.referencedTableName === references.model
                    && foreignKeyReference.referencedColumnName === references.key
                    && (schema ? foreignKeyReference.referencedTableSchema === schema : true)
                    && !removedConstraints[constraintName]) {
                  // Remove constraint on foreign keys.
                  changes.push(() => this.QueryInterface.removeConstraint(this.getTableName(options), constraintName, options));
                  removedConstraints[constraintName] = true;
                }
              });
            }
            changes.push(() => this.QueryInterface.changeColumn(this.getTableName(options), columnName, attributes[columnName]));
          }
        });
        return changes.reduce((p, fn) => p.then(fn), Promise.resolve());
      });
    }
  })
    .then(() => this.QueryInterface.showIndex(this.getTableName(options), options))
    .then(indexes => {
    // Assign an auto-generated name to indexes which are not named by the user
    this.options.indexes = this.QueryInterface.nameIndexes(this.options.indexes, this.tableName);

    indexes = _.filter(this.options.indexes, item1 =>
                       !_.some(indexes, item2 => item1.name === item2.name)
                      ).sort((index1, index2) => {
      if (this.sequelize.options.dialect === 'postgres') {
        // move concurrent indexes to the bottom to avoid weird deadlocks
        if (index1.concurrently === true) return 1;
        if (index2.concurrently === true) return -1;
      }

      return 0;
    });

    return Promise.map(indexes, index => this.QueryInterface.addIndex(
      this.getTableName(options),
      _.assign({
        logging: options.logging,
        benchmark: options.benchmark,
        transaction: options.transaction
      }, index),
      this.tableName
    ));
  }).then(() => {
    if (options.hooks) {
      return this.runHooks('afterSync', options);
    }
  }).return(this);
}

model.sync代码稍微有点长,其中大致执行流程如下:

model.sync执行流程.png

二、方案

理解model结构同步到数据库的实现过程后,先设计一种支持定义分区表的方式,我的方案是在定义表的配置项中新增几个新属性:partitionpartitionKeypartitionRule分别表示分区类型、分区字段、分区规则。

其中partition支持RANGE/LIST/HASH三种类型;partitionRule是一个object, key是分区子表名后缀,且key数量即是分区子表数量,value可以分别是RANGE区间/LIST值列表/HASH映射值;生成的分区子表名就是由主表名+partitionRule中每个key生成。

然后需要重写分区表的sync函数,其中实现主要流程如下:

model.sync改进后的执行流程.png

因为拓展 sequelize 时使用的是 Postgresql 10.x 版本,我们需要先了解 Postgresql 10.x 和 Postgresql 11.x 版本所支持的分区表有几个区别:

  • Postgresql 10.x 只支持 RANGE 和 LIST 分区方式,而 Postgresql 11.x 除此外还支持HASH分区方式;
  • Postgresql 10.x 主表不支持创建主键/索引,必须单独操作每个子表的索引,而 Postgresql 11.x 可以直接操作主表的索引并会自动同步到每个子表;
  • Postgresql 10.x 没有默认分区,当插入数据找不到所对应的分区时会插入失败,而Postgresql 11.x 支持默认分区;
  • Postgresql 10.x 只支持一级分区,即不能对子表再次做分区,而Postgresql 11.x 支持多级分区;

三、实现

接下来会详细说明几个关键过程(在流程图中用深色表示)的具体实现,值得一提的是这里将在支持Postgresql 10.x版本前提下使用typescript语法重写model.sync函数。

以下为代码会分段贴出来讲解,完整代码在 github 上可以自取。

1. 新建分区主表

这一步需要新建分区主表而不是普通表,所以需要在原来创建表sql的基础上进行修改,具体实现是调用createTableQuery生成创建普通表sql后,在该sql末尾加上PARTITION BY <partition> (<partitionKey>)即可通过直接执行生成分区主表。

// 生成创建分区主表的SQL
const attrs = this.QueryInterface.QueryGenerator.attributesToSQL(attributes, {
  context: 'createTable',
});
let createTableSql = this.QueryInterface.QueryGenerator.createTableQuery(this.getTableName(options), attrs, options);
createTableSql = createTableSql.substring(0, createTableSql.length - 1) + ` PARTITION BY ${options.partition} ("${options.partitionKey}");`;
return this.sequelize.query(createTableSql, options);

但是这个环节需要注意的是,由于Postgresql 10.x版本的分区主表不支持创建主键和索引,所以生成创建分区主表sql之前需要先移除主键,并在分区子表中使用唯一索引替代主键。

// postgresql 10.x版本中分区主表不能有主键和索引
const primaryKey: string[] = [];
for (const name in attributes) {
  const attr = attributes[name];
  if (attr.primaryKey) {
    primaryKey.push(name);
    attr.allowNull = false;
    delete attr.primaryKey;
  }
  if (name === this.options.partitionKey) {
    prititionColumn = attributes[name];
  }
}
// 分区子表使用unique索引代替主键
if (!this.options.indexes) {
  this.options.indexes = [];
}
this.options.indexes.push({
  fields: primaryKey,
  unique: true,
});
2. 新建以 partitionRule 的每个键为后缀的分区子表

这一步只需根据分区方式和分区主表名拼接出创建子表的sql,因为子表会自动与主表的字段结构保持一致,所以生成sql也比较简单。因为每次都会遍历partitionRule,所以当新增键值和分区规则后也能自动创建新的子表。

// 同步分区子表
const pmList: any = [];
for (const suffix in options.partitionRule) {
  let rule = options.partitionRule[suffix];
  if (prititionColumn.type instanceof DataType.STRING || prititionColumn.type instanceof DataType.TEXT || prititionColumn.type instanceof DataType.CHAR) {
    rule = rule.map(val => `'${val}'`);
  }
  let sql = `CREATE TABLE IF NOT EXISTS "${this.tableName + suffix}" PARTITION OF "${this.tableName}" FOR VALUES`;
  if (options.partition.toUpperCase() === 'LIST') {
    sql += ` IN (${rule.join(',')});`;
  } else if (options.partition.toUpperCase() === 'RANGE') {
    sql += ` FROM (${rule[0]}) TO (${rule[1]});`;
  }
  pmList.push(this.sequelize.query(sql, options));
}
return Promise.all(pmList);
3. 对比并增删改分区主表字段

由于增删改分区主表字段会自动将改动同步到每个分区子表,这一步只需要直接操作分区主表就可以,又因为对分区主表和普通表的逻辑操作基本没有区别,所以这一步和原生sync函数实现基本一致。

但是Postgresql分区表在创建之后就不允许修改分区字段,所以在调用changeColumn时需要将分区字段排除掉。

if (columnName !== options.partitionKey) {
  changes.push(() => this.QueryInterface.changeColumn(this.getTableName(options), columnName, attributes[columnName]));
}
4. 新增分区子表索引

Postgresql分区主表是不能创建索引的,故这里需要遍历每个分区子表,从db查询每个子表已有的索引,并创建每个子表新添加的索引,不过需要注意的是索引名须用子表名而不是主表名去生成,否则会发生创建索引冲突。

// 同步每个分区子表的索引结构
const tableNameList: string[] = [];
let indexOptions: any = this.options.indexes;
for (const suffix in options.partitionRule) {
  tableNameList.push(`${this.getTableName(options) + suffix}`);
}
return Promise.map(tableNameList, tableName => this.QueryInterface.showIndex(tableName, options))
  .then(indexesList => {
  const createIdxPrmList: any = [];
  for (let i = 0 ; i < indexesList.length; i++) {
    let indexes = indexesList[i];
    const tableName = tableNameList[i];
    for (const index of indexOptions) {
      delete index.name;
    }
    indexOptions = this.QueryInterface.nameIndexes(indexOptions, tableName);
    indexes = _.filter(indexOptions, item1 =>
                       !_.some(indexes, item2 => item1.name === item2.name),
                      ).sort((index1, index2) => {
      if (this.sequelize.options.dialect === 'postgres') {
        // move concurrent indexes to the bottom to avoid weird deadlocks
        if (index1.concurrently === true) return 1;
        if (index2.concurrently === true) return -1;
      }
      return 0;
    });
    for (const index of indexes) {
      createIdxPrmList.push(this.QueryInterface.addIndex(
        tableName,
        _.assign({
          logging: options.logging,
          benchmark: options.benchmark,
          transaction: options.transaction,
        }, index),
        tableName,
      ));
    }
  }
  return Promise.all(createIdxPrmList);
});

四、结论

文章基本上是讲如何拓展sequelize支持分区表结构同步到数据库的方案和实现,却没有说怎么对已有分区表的一些读写操作,这是因为分区表是物理存储上做分区而操作逻辑统一,对外的读写和普通表几乎没有区别,所以sequelize封装的大部分函数都可以直接适用于分区表的操作。

然后为什么sequelize不提供分区表结构同步到数据库的功能呢?我觉得有以下几个可能的原因:

  • 使用sequelize.sync来同步表结构本身不是sequelize自动维护表结构的初衷,项目逐渐发展后会使用sequelize-cli库来单独维护数据库表结构的改动;
    -sequelize.sync同步表结构执行时间会较长,若同步分区数量很大的表则可能直接影响整个数据库其他IO操作,而且在Postgresql 11.x版本当分区规则发生改动的时候还需要自动迁移现有数据,这会带来更大的性能影响;
  • 分区表被应用的程度不是很广,主要是大部分场景下随着数据量的增加往往会性能会下降得很快,因为分区表存储上做了分区但提供对外的操作逻辑和普通表没有差别,即数据库本身要做更多的工作如定位分区、数据整合等,甚至某些情况下需要对每个子表进行扫描,由于这些场景的存在都使分区表没有使用物理分表那样灵活可控;

既然分区表性能上表现不理想,那么为什么还要使用分区表,或者哪些场景下可以考虑使用分区表呢?

我觉得是在一些项目初期还未有成熟的物理分表库可用,线上数据量会在短期内变得很大,而且对这些数据需要做多种比较复杂查询的情况下,可以先考虑使用分区表。

因为项目前期可能有根据查询offset+limit查询大表数据这种看似简单的需求,这种一般要使用物理分表结合相应的数据冗余表来实现,但是前期出于减少开发成本和考虑项目进度,先使用分区表完成类似的需求,但总之分区表是不适用较成熟的应用。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343