Leaf分布式Id生成系统segment方式的源码全解析

写在所有之前

在日常业务中可能遇到生成业务类全局ID的情况,这类ID的关键点在于全局不重复,对于单例来说,这个不难实现,但是对于分布式场景下,如何保证每个独立部署的服务都能生成互补重复的唯一键呢,它的核心思路就是找到一个唯一的“权威”来做这个事情,也就是说独立于分布式的应用来找一位发布者。但是这又引出另一个问题,如何保证这个唯一“权威”不成为系统瓶颈呢?
实际上目前不少支持分布式部署的中间件都有自己的CAP逻辑,美团推出的开源项目Leaf也就是利用了数据库和ZK以及他们的分布式方案最终实现了分布式Id生成系统。
具体的实现逻辑可见美团技术团队的官方文档

  • 框架介绍
  • 源码地址
    本文的目的是从源码的角度介绍Leaf是如何工作的。如前文所说,在Leaf的设计中有两种实现方式,一是基于数据库的segment算法,二是基于ZK的雪花算法。我们先整体看一下Leaf源码的结构:
    Leaf项目框架.png

    Leaf的源码很简单,分为两部分,leaf-core作为具体的Id生成实现自成一体,另外的leaf-server是官方以web服务的形式对外提供查询api和监控系统。
    监控web.png

    实际上,我们在阅读源码时只用关注到leaf-core,leaf-server仅仅是展示层,在真实项目中可用,也可自行替换。以下从segment(基于database)和snowflake(基于雪花算法)两个方面分别进行源码解读。

segment

原理解释

实际上,segemnt利用数据库作为一个Id生成规则的存放地址,从表结构可以看到:

CREATE TABLE `leaf_alloc` (
  `biz_tag` varchar(128) NOT NULL DEFAULT '',
  `max_id` bigint NOT NULL DEFAULT '1',
  `step` int NOT NULL,
  `description` varchar(256) DEFAULT NULL,
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`biz_tag`)
)

biz_tag:业务标签,它意味着该表中每一条数据都是一个业务的id生成规则,比如在一个电商系统中,除了订单id需要生成外,可能还有商品条形码id,如此就会出现两条表记录。
max_id:当前规则会分配的最大id值。注意这里是bigint类型,因此意味着segment方式生成的id就是普通数字。
step:步长,不难理解,当前规则的号段就是从max_id - step ~ max_id。
我们注意到这一系列字段中leaf_alloc表中并不会存储当前的id,那客户端在申请一个新id时,它是怎么知道下一个id是多少呢?实际上,当前id是记录在该服务运行时内存中。
这就涉及到segment形式中的核心概念——segment,它的定义如下:

public class Segment {
    private AtomicLong value = new AtomicLong(0);
    private volatile long max;
    private volatile int step;
    private SegmentBuffer buffer;
}

没错第一个成员变量就是当前id值。可以想见,当leaf服务启动时,程序会读取db中的规则配置,将其读入内存并以Segment形式存储(见其他三个fields),并初始化当前id=0.
这就是segment的大致逻辑,总结起来,它利用了db存储规则,然后在每个leaf-core启动时加载规则,并初始化当前id。读到这里有个小问题:“分布式系统中,自然leaf服务也会分布式部署,当有多个leaf服务时,如何保证生成的id不重复呢”。这里先说结论:
每个leaf-core在启动时都会去读取db中的规则(如果没有则需提前手动insert)。每次读取前都会根据步长(step)取修改max_id:

max_id = max_id + step

理解了吧,也就是说每个leaf服务都有一个独立的号段(max_id-step, max_id),因此它们在分配id时并不会重复。
关于其原理言尽于此,我们直接看看源码:
segemnt核心类如下,
一个实现类SegmentIDGenImpl实现接口IDGen,结束!是不是很简单


segment核心类.png

在IDGen interface中两方法get,init

public interface IDGen {
    Result get(String key);
    boolean init();
}

自然从init实现方法开始,里面涉及到两个具体方法updateCacheFromDb和updateCacheFromDbAtEveryMinute。从字面理解,将规则从db中加载到segment对象中,并且从开启每分钟同步一次的定时任务。关注点可以放在它的加载过程。

    private void updateCacheFromDb() {
        logger.info("update cache from db");
        StopWatch sw = new Slf4JStopWatch();
        try {
            // 1.获取所有biz_tag
            List<String> dbTags = dao.getAllTags();
            if (dbTags == null || dbTags.isEmpty()) {
                return;
            }
            // 2.获取当前缓存中的biz_tag
            List<String> cacheTags = new ArrayList<String>(cache.keySet());
            Set<String> insertTagsSet = new HashSet<>(dbTags);
            Set<String> removeTagsSet = new HashSet<>(cacheTags);
            //db中新加的tags灌进cache
            for(int i = 0; i < cacheTags.size(); i++){
                String tmp = cacheTags.get(i);
                if(insertTagsSet.contains(tmp)){
                    insertTagsSet.remove(tmp);
                }
            }
            // 3.至此获得目前需要新增到cache中的tags,存放到insertTagsSet中
            for (String tag : insertTagsSet) {
                SegmentBuffer buffer = new SegmentBuffer();
                buffer.setKey(tag);
                Segment segment = buffer.getCurrent();
                segment.setValue(new AtomicLong(0));
                segment.setMax(0);
                segment.setStep(0);
                cache.put(tag, buffer);
                logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer);
            }
            //4. cache中已失效的tags从cache删除
            for(int i = 0; i < dbTags.size(); i++){
                String tmp = dbTags.get(i);
                if(removeTagsSet.contains(tmp)){
                    removeTagsSet.remove(tmp);
                }
            }
            for (String tag : removeTagsSet) {
                cache.remove(tag);
                logger.info("Remove tag {} from IdCache", tag);
            }
        } catch (Exception e) {
            logger.warn("update cache from db exception", e);
        } finally {
            sw.stop("updateCacheFromDb");
        }
    }

代码很简单,值得一提的是,第三步中segment对象被放到了SegmentBuffer中,实际上塞到本地缓存中的是SegmentBuffer,它是个什么玩意儿?这涉及到美团对leaf的一层优化。当value==max_id后,会更新db中的规则,并reload规则到segmnt中,这里涉及到数据库的I/O,在高qps情况下这个过程会频繁发生。


单缓存.png

因此leaf在当前value达到某个阈值后,会提前开启下一个max_id的生成,并把该规则读入新segment中,一旦segment1中id达到max_id,则直接内存级别切换segment即可。


双缓存.png

因此引入了SegmentBuffer来管理这两个segemnts:
public class SegmentBuffer {
    private String key;
    private Segment[] segments; //双buffer
    private volatile int currentPos; //当前的使用的segment的index
    private volatile boolean nextReady; //下一个segment是否处于可切换状态
    private volatile boolean initOk; //是否初始化完成
    private final AtomicBoolean threadRunning; //线程是否在运行中
    private final ReadWriteLock lock;

    private volatile int step;
    private volatile int minStep;
    private volatile long updateTimestamp;
}

到这里我们看到了leaf-segemnt形式中缓存的最上层是业务tag-SegmentBuffer,SegmentBuffer中对应了两个Segment,而当前的value就存在segment当中。


存储结构.png

因此我们看到上面提到的updateCacheFromDb方法,实际上是在更新第一层关系,也就是扫描db中所有的bizTag,跟缓存中做比对,新增的bizTag在缓存中创建map,删除缓存中不存在的bizTag,保留与db一致的bizTag。需要注意的是,这时候对于新增的bizTag仅仅是在cache中创建空的SegmentBuffer。与updateCacheFromDb对应,updateSegmentFromDb就在更新第二层。

    public void updateSegmentFromDb(String key, Segment segment) {
        StopWatch sw = new Slf4JStopWatch();
        SegmentBuffer buffer = segment.getBuffer();
        LeafAlloc leafAlloc;
        if (!buffer.isInitOk()) {
            leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
            buffer.setStep(leafAlloc.getStep());
            buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step为DB中的step
        } else if (buffer.getUpdateTimestamp() == 0) {
            leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
            buffer.setUpdateTimestamp(System.currentTimeMillis());
            buffer.setStep(leafAlloc.getStep());
            buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step为DB中的step
        } else {
            long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp();
            int nextStep = buffer.getStep();
            //表示15min内更新过,代表更新频繁,这样的化,通过增加步长,
            // 缓存更多的id,来减少对db的操作
            if (duration < SEGMENT_DURATION) {
                if (nextStep * 2 > MAX_STEP) {
                    //do nothing
                } else {
                    nextStep = nextStep * 2;
                }
            } else if (duration < SEGMENT_DURATION * 2) {
                //do nothing with nextStep
            } else {
                //相当于duration >= 30min,表示该key的访问并不频繁,因此可以降低步长。
                nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;
            }
            logger.info("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f",((double)duration / (1000 * 60))), nextStep);
            LeafAlloc temp = new LeafAlloc();
            temp.setKey(key);
            temp.setStep(nextStep);
            leafAlloc = dao.updateMaxIdByCustomStepAndGetLeafAlloc(temp);
            buffer.setUpdateTimestamp(System.currentTimeMillis());
            buffer.setStep(nextStep);
            buffer.setMinStep(leafAlloc.getStep());//leafAlloc的step为DB中的step
        }
        // must set value before set max
        long value = leafAlloc.getMaxId() - buffer.getStep();
        segment.getValue().set(value);
        segment.setMax(leafAlloc.getMaxId());
        segment.setStep(buffer.getStep());
        sw.stop("updateSegmentFromDb", key + " " + segment);
    }

这样一来,当客户端请求获取一个id时,会传入当前服务所属bizTag,如果cache中存在则直接找到segment并获取其中的value;如果cache中不存在bizTag则直接返回找不到的异常。这里有个疑问:缓存中找不到bizTag的时候,为什么不去db中主动查询一遍呢?我猜想,开发者的设计理念就是通过缓存去应对高并发场景降低db的I/O,直接去避免缓存穿透的问题。
写到这里,笔者意识到一个问题,当部署了多个leaf服务,此时流量可能随机的访问这些leaf,可能存在一个问题,那就是leaf生成的id的确是全局唯一,但不见得是全局按时间递增

client依次请求leaf id.png

以上是leaf服务初始化或从db中load规则,准备数据的源码内容。接下来是最关键的客户端获取id的过程:

    public Result get(final String key) {
        if (!initOK) {
            return new Result(EXCEPTION_ID_IDCACHE_INIT_FALSE, Status.EXCEPTION);
        }
        //如果key(bizTag)存在,表示对应的规则已加载到缓存
        if (cache.containsKey(key)) {
            SegmentBuffer buffer = cache.get(key);
            //如果SegmentBuffer未初始化(仅实例化)则先初始化
            if (!buffer.isInitOk()) {
                synchronized (buffer) {
                    // 双重验证:第一次验证是快速判断SegmentBuffer是否需要初始化,如果需要则进入同步模块,
                    // 在进入同步模块后,为了避免其他线程在当前线程等待锁的过程中进行了初始化,因此会二次初始化
                    if (!buffer.isInitOk()) {
                        try {
                            updateSegmentFromDb(key, buffer.getCurrent());
                            logger.info("Init buffer. Update leafkey {} {} from db", key, buffer.getCurrent());
                            buffer.setInitOk(true);
                        } catch (Exception e) {
                            logger.warn("Init buffer {} exception", buffer.getCurrent(), e);
                        }
                    }
                }
            }
            //id都是从本地缓存中获取,即便当前缓存中不存在数据,也是先由db同步到cache中,再获取
            return getIdFromSegmentBuffer(cache.get(key));
        }
        return new Result(EXCEPTION_ID_KEY_NOT_EXISTS, Status.EXCEPTION);
    }

最终获取id的过程在getIdFromSegmentBuffer中,代码执行过程参见下图:


get id的过程.png

可以看到,在getIdFromSegmentBuffer过程中主要发生了两件事:

  • 当前id+1,得到应该返回的id,但是这个id在返回前需要判断是否超过了规则定义的max_id,如果超过了(严格来说等于也不行)。则将当前的buffer指向另一个备用的segment,他在之前已经预加载过。
  • 第二件事,实际上就是有个异步的判断,在刚进入getIdFromSegmentBuffer就会判断,是否当前的value值超过了10% * max_id,如果超过了,就预加载另一个segment。
    根据上图,和这两点看这段逻辑就会很清晰了。leaf segemnt形式介绍完成,总体来说,它是一种利用数据库持久化id生成规则,而非具体的id数据。而把具体的id生成任务丢给内存来进行,这样做的好处,在于即便在高并发的场景下,内存可以最快的反应请求,并且每次生成的规则都代表了一批数据,因此真实的db
    I/O也不会很高。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 196,264评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,549评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,389评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,616评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,461评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,351评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,776评论 3 387
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,414评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,722评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,760评论 2 314
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,537评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,381评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,787评论 3 300
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,030评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,304评论 1 252
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,734评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,943评论 2 336

推荐阅读更多精彩内容