Flink CDC 源码学习(四)

CDC全量阶段chunk划分实现

前面分析到其主要划分chunk入口在MySqlSourceEnumerator类的start方法中, 最终实现在MySqlSnapshotSplitAssigner类的open方法


image.png
 public void open() {
        chunkSplitter.open();
        discoveryCaptureTables();
        captureNewlyAddedTables();
        startAsynchronouslySplit();
    }

核心查看startAsynchronouslySplit方法

private void startAsynchronouslySplit() {
        if (chunkSplitter.hasNextChunk() || !remainingTables.isEmpty()) {
            if (executor == null) {
                ThreadFactory threadFactory =
                        new ThreadFactoryBuilder().setNameFormat("snapshot-splitting").build();
                this.executor = Executors.newSingleThreadExecutor(threadFactory);
            }
            // 创建单线程池, 异步执行splitChunksForRemainingTables进行chunk划分
            executor.submit(this::splitChunksForRemainingTables);
        }
    }

调用splitChunksForRemainingTables

private void splitChunksForRemainingTables() {
        try {
            // restore from a checkpoint and start to split the table from the previous
            // checkpoint
            // CDC任务重启从CK恢复处理
            if (chunkSplitter.hasNextChunk()) {
                LOG.info(
                        "Start splitting remaining chunks for table {}",
                        chunkSplitter.getCurrentSplittingTableId());
                splitTable(chunkSplitter.getCurrentSplittingTableId());
            }

            // split the remaining tables
            for (TableId nextTable : remainingTables) {
                // 进行chunk划分
                splitTable(nextTable);
            }
        } catch (Throwable e) {
            synchronized (lock) {
                if (uncaughtSplitterException == null) {
                    uncaughtSplitterException = e;
                } else {
                    uncaughtSplitterException.addSuppressed(e);
                }
                // Release the potential waiting getNext() call
                lock.notify();
            }
        }
    }

再调用splitTable 方法

 private void splitTable(TableId nextTable) {
       // 省略
        do {
            synchronized (lock) {
                List<MySqlSnapshotSplit> splits;
                try {
                    splits = chunkSplitter.splitChunks(partition, nextTable);
                } catch (Exception e) {
                    throw new IllegalStateException(
                            "Error when splitting chunks for " + nextTable, e);
                }

               
        } while (chunkSplitter.hasNextChunk());
         // 省略
    }

继续调用MySqlChunkSplitter类的splitChunks方法进行划分

@Override
    public List<MySqlSnapshotSplit> splitChunks(MySqlPartition partition, TableId tableId)
            throws Exception {
        if (!hasNextChunk()) {
            analyzeTable(partition, tableId);
            Optional<List<MySqlSnapshotSplit>> evenlySplitChunks =
                    trySplitAllEvenlySizedChunks(partition, tableId);
            if (evenlySplitChunks.isPresent()) {
                return evenlySplitChunks.get();
            } else {
                synchronized (lock) {
                    this.currentSplittingTableId = tableId;
                    this.nextChunkStart = ChunkSplitterState.ChunkBound.START_BOUND;
                    this.nextChunkId = 0;
                    return Collections.singletonList(
                            splitOneUnevenlySizedChunk(partition, tableId));
                }
            }
        } else {
            Preconditions.checkState(
                    currentSplittingTableId.equals(tableId),
                    "Can not split a new table before the previous table splitting finish.");
            if (currentSplittingTable == null) {
                /**
                 *  1.如果表没有主键,则必须设置chunkKeyColumn。
                 *  2.如果表有主键,则chunkKeyColumn必须是其中的一列,否则为空。
                 *  3.当参数chunkKeyColumn未设置且表具有主键时,返回主键的第一列(联合主键情况)。
                 *  4.根据拆分列, 查询其最大值, 最小值
                 *  5.执行SHOW TABLE STATUS LIKE 'TablaName'获取表数据量
                 */
                analyzeTable(partition, currentSplittingTableId);
            }
            synchronized (lock) {
                return Collections.singletonList(splitOneUnevenlySizedChunk(partition, tableId));
            }
        }
    }

最终调用splitOneUnevenlySizedChunk方法, 划分后的MySqlSnapshotSplit列表保存到remainingSplits集合中

  • 均匀分布

主键列自增且类型为整数类型(int,bigint,decimal)。查询出主键列的最小值,最大值,按 chunkSize 大小将数据均匀划分,因为主键为整数类型,根据当前chunk 起始位置、chunkSize大小,直接计算chunk 的结束位置。

// 计算主键列数据区间
select min(order_id), max(order_id) from demo_orders;
// 将数据划分为 chunkSize 大小的切片
chunk-0: [min,start + chunkSize)
chunk-1: [start + chunkSize, start + 2chunkSize)
.......
chunk-last: [max,null)

  • 非均匀分布
    主键列非自增或者类型为非整数类型。主键为非数值类型,每次划分需要对未划分的数据按主键进行升序排列,取出前 chunkSize 的最大值为当前 chunk 的结束位置

// 未拆分的数据排序后,取 chunkSize 条数据取最大值,作为切片的终止位置。
chunkend = SELECT MAX(order_id) FROM (
SELECT order_id FROM demo_orders
WHERE order_id >= [前一个切片的起始位置]
ORDER BY order_id ASC
LIMIT [chunkSize]
) AS T

 private MySqlSnapshotSplit splitOneUnevenlySizedChunk(MySqlPartition partition, TableId tableId)
            throws SQLException {
        final int chunkSize = sourceConfig.getSplitSize();
        final Object chunkStartVal = nextChunkStart.getValue();
        LOG.info(
                "Use unevenly-sized chunks for table {}, the chunk size is {} from {}",
                tableId,
                chunkSize,
                nextChunkStart == ChunkSplitterState.ChunkBound.START_BOUND
                        ? "null"
                        : chunkStartVal.toString());
        // we start from [null, min + chunk_size) and avoid [null, min)
        Object chunkEnd =
                nextChunkEnd(
                        jdbcConnection,
                        nextChunkStart == ChunkSplitterState.ChunkBound.START_BOUND
                                ? minMaxOfSplitColumn[0]
                                : chunkStartVal,
                        tableId,
                        splitColumn.name(),
                        minMaxOfSplitColumn[1],
                        chunkSize);
        // may sleep a while to avoid DDOS on MySQL server
        maySleep(nextChunkId, tableId);
        if (chunkEnd != null && ObjectUtils.compare(chunkEnd, minMaxOfSplitColumn[1]) <= 0) {
            nextChunkStart = ChunkSplitterState.ChunkBound.middleOf(chunkEnd);
            return createSnapshotSplit(
                    jdbcConnection,
                    partition,
                    tableId,
                    nextChunkId++,
                    splitType,
                    chunkStartVal,
                    chunkEnd);
        } else {
            currentSplittingTableId = null;
            nextChunkStart = ChunkSplitterState.ChunkBound.END_BOUND;
            return createSnapshotSplit(
                    jdbcConnection,
                    partition,
                    tableId,
                    nextChunkId++,
                    splitType,
                    chunkStartVal,
                    null);
        }
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,242评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,769评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,484评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,133评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,007评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,080评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,496评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,190评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,464评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,549评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,330评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,205评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,567评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,889评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,160评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,475评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,650评论 2 335

推荐阅读更多精彩内容