Redis源码阅读笔记(6)-跳跃表

跳跃表在平常的数据结构和算法中比较少接触,它是一种有序的数据结构,通过建立类似索引的来实现快速的查找,其支持O(LogN)的平均时间复杂度,最坏O(N)的时间复杂度;在大部分情况下,其可以与平衡树相媲美,而且由于实现较平衡树简单,因此常常作为平衡树的替代方案。

设计的主要代码:

  1. redis.h
  2. t_zset.h

跳跃表的定义

我们知道,即使对于一个有序链表,其查询一个元素的时间复杂度为O(N),即使是排序的,也不能通过二分查找来缩短查找的时间。如下图,查找节点15需要比较8个节点。


链表

那么有没有其他优化的算法呢?我们可以利用索引的原理,在链表上提取部分节点建立索引,如下图,只需要比较6个节点。


一级索引

根据这个思路,可以在一级索引之上,再提取部分节点作为二级索引,如下图,只需要比较4个节点


二级索引

在此基础上,再往上建立一层,如下图,只需要比较3个节点。

三级索引

以上就是跳跃表的思想,通过增加多层索引,以空间换时间,加快节点的查询。图中元素不多,查找速度的加快没有体现出,如果节点一多,那跳跃表的优势就体现出来了。

跳跃表具有如下性质:

  1. 以空间换时间,由多层组成;
  2. 每一层都是有序链表;
  3. 最后一层包含所有元素;
  4. 如果节点node出现在layer n出现,那么该节点在layer n之下的链表层一定会出现;
  5. 每一个节点包含两个指针,一个指向同一层链表的下一个节点,另一个指向下一层的节点。

跳跃表的实现

//跳跃表节点定义
typedef struct zskiplistNode {
    robj *obj;      //成员对象
    double score;   //分值  主要按分值来排序,其次才按对象
    struct zskiplistNode *backward; //回退指针
    struct zskiplistLevel {             //层级,按1~32层随机指定
        struct zskiplistNode *forward;  //层级的前向指着
        unsigned int span;              //跨度
    } level[];
} zskiplistNode;

//管理跳跃表
typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;              //最大节点层数
} zskiplist;

Redis的跳跃表由zskiplistNodezskiplist组成,其图像结构如下:

一个简单跳跃表

其中zskiplist用来管理跳跃表,其包含了跳跃表的相关信息:

  1. header:指向跳跃表的表头;
  2. tail:指向跳跃表的表尾;
  3. level:记录该跳跃表目前的最大层级(表头节点不计算在内);
  4. length:记录了该跳跃表的长度。

zskiplistNode是实际跳跃表的节点,其记录了节点信息:

  1. obj:节点保存的对象;
  2. score:节点排序的分值,节点的排序按照该值从小到大排序,如果该值相同,则按照obj的大小来排序;
  3. backward:最底层是一个双向链表,用来从表尾向表头遍历节点用,因为每个节点只有一个后退指针,因此每次只能后退一个节点;
  4. level.forward:前向指针,用来保存同层下一个节点的地址;
  5. level.span记录了同层节点之间的距离span,即在最底层两节点相距多少个节点,用来快速计算节点的rank(即节点所在的index)时使用。指向null的span为0

相关实现细节

创建和销毁
#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
//跳跃表节点创建函数
zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
    zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));       //申请内存
    zn->score = score;
    zn->obj = obj;
    return zn;
}

//创建跳跃表
zskiplist *zslCreate(void) {
    int j;
    zskiplist *zsl;

    zsl = zmalloc(sizeof(*zsl));
    zsl->level = 1;
    zsl->length = 0;
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);     //创建第一列节点,固定32层级,保存用来执行每层的头结点
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {  //初始化
        zsl->header->level[j].forward = NULL;
        zsl->header->level[j].span = 0;
    }
    zsl->header->backward = NULL;
    zsl->tail = NULL;
    return zsl;
}

//释放节点
void zslFreeNode(zskiplistNode *node) {
    decrRefCount(node->obj);        //减少对象被引用引用数
    zfree(node);
}
//释放跳跃表
void zslFree(zskiplist *zsl) {
    zskiplistNode *node = zsl->header->level[0].forward, *next;

    zfree(zsl->header);         //释放第一列32层级的表头结点
    while(node) {
        next = node->level[0].forward;
        zslFreeNode(node);
        node = next;
    }
    zfree(zsl);
}

Redis在创建跳跃表的时候,会创建一个32层的表头节点,该表头节点用来指向各层的开始节点,初始化时,该节点的每一层的forward都指向NULLspan为0。

插入节点
//生成1~32的随机数
int zslRandomLevel(void) {
    int level = 1;
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

//插入节点,从高往低计算,从低层往高层插入,从span可以计算出每个节点的rank
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    redisAssert(!isnan(score));
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {                               //从最高层往下一层一层的插入
        /* store rank that is crossed to reach the insert position */
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
            rank[i] += x->level[i].span;            //记录在要插入的前节点的排名
            x = x->level[i].forward;
        }
        update[i] = x;          //记录每一层插入的前一节点
    }
    level = zslRandomLevel();           //设置该节点level
    if (level > zsl->level) {           //level比当前的level还高的
        for (i = zsl->level; i < level; i++) {         //设置新添层的rank和update
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    x = zslCreateNode(level,score,obj);         //创建节点
    for (i = 0; i < level; i++) {               //从低往高处理
        x->level[i].forward = update[i]->level[i].forward;              //设置每层该节点的forward指针
        update[i]->level[i].forward = x;                                //改变前一节点forward

        /* update span covered by update[i] as x is inserted here */
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);      //设置该层本节点span,因为有要插入的位置之前的节点rank信息
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;                     //设置前一节点的span
    }

    /* increment span for untouched levels */
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }

    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}

Redis在添加节点的时候,该节点所拥有的层数通过zslRandomLevel计算得到的(作者注释该随机数满足powerlaw-alike 分布,还没研究,有空研究研究)。
每次插入节点,要更改每一层同层前置节点的forward和span。

删除节点
//删除节点
/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
    int i;
    //从底层往高层删
    for (i = 0; i < zsl->level; i++) {
        if (update[i]->level[i].forward == x) {
            update[i]->level[i].span += x->level[i].span - 1;           //本节点被删除,所以要减一
            update[i]->level[i].forward = x->level[i].forward;          //设置forward
        } else {                                                        //当前层级不包含x节点,只需减一
            update[i]->level[i].span -= 1;
        }
    }
    if (x->level[0].forward) {
        x->level[0].forward->backward = x->backward;
    } else {
        zsl->tail = x->backward;
    }
    while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
        zsl->level--;               //看最高层级是否需要减1
    zsl->length--;
}

//指定要删除的节点
/* Delete an element with matching score/object from the skiplist. */
int zslDelete(zskiplist *zsl, double score, robj *obj) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {           //寻找该节点可以插入的前一节点
        while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                compareStringObjects(x->level[i].forward->obj,obj) < 0)))
            x = x->level[i].forward;
        update[i] = x;
    }
    /* We may have multiple elements with the same score, what we need
     * is to find the element with both the right score and object. */
    x = x->level[0].forward;            //还要再次确认是否存在该节点
    if (x && score == x->score && equalStringObjects(x->obj,obj)) {
        zslDeleteNode(zsl, x, update);
        zslFreeNode(x);
        return 1;
    }
    return 0; /* not found */
}

删除节点比较简单,首先要寻找每一层(该节点存在的每一层)该节点的前置节点,然后更改每一个同层节点前置节点的forwardspan

获取对应节点的rank以及返回获取rank为指定值的节点
//获取值对应的rank
unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
    zskiplistNode *x;
    unsigned long rank = 0;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
            rank += x->level[i].span;
            x = x->level[i].forward;
        }

        /* x might be equal to zsl->header, so test if obj is non-NULL */
        if (x->obj && equalStringObjects(x->obj,o)) {
            return rank;
        }
    }
    return 0;
}

//获取值对应的rank
unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
    zskiplistNode *x;
    unsigned long rank = 0;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
            rank += x->level[i].span;
            x = x->level[i].forward;
        }

        /* x might be equal to zsl->header, so test if obj is non-NULL */
        if (x->obj && equalStringObjects(x->obj,o)) {
            return rank;
        }
    }
    return 0;
}

利用每一层记录的span,可以快速的计算出节点的rank。

删除指定score范围的节点

static int zslValueGteMin(double value, zrangespec *spec) {
    return spec->minex ? (value > spec->min) : (value >= spec->min);
}

static int zslValueLteMax(double value, zrangespec *spec) {
    return spec->maxex ? (value < spec->max) : (value <= spec->max);
}

//删除指定score范围的节点,返回被删除的节点数量
unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec *range, dict *dict) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned long removed = 0;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward && (range->minex ?
            x->level[i].forward->score <= range->min :
            x->level[i].forward->score < range->min))
                x = x->level[i].forward;
        update[i] = x;
    }

    /* Current node is the last with score < or <= min. */
    //找到第一个大于range->min的值
    x = x->level[0].forward;

    /* Delete nodes while in range. */
    //删除直到score>max
    while (x &&
           (range->maxex ? x->score < range->max : x->score <= range->max))
    {
        zskiplistNode *next = x->level[0].forward;
        zslDeleteNode(zsl,x,update);
        dictDelete(dict,x->obj);
        zslFreeNode(x);
        removed++;
        x = next;
    }
    return removed;
}

首先要找到第一个符合条件的节点。

总结

跳跃表的名字听起来会觉得这个算法很复杂,但实际懂了以后,会发现跳跃表其实很简单。

Redis的跳跃表由以下几个特点:

  1. 跳跃表节点的层数是1~32的随机数;
  2. 同一个跳跃表中,节点可以包含相同的值,但每个节点的成员对象必须是唯一的;
  3. 跳跃表是先按score进行排序,然后在根据成员对象的大小进行排序;
  4. 跳跃表查找的平均时间复杂度为O(LogN),最坏时间复杂度是O(N)。

跳跃表API

function description time complexity
zslCreate 创建一个跳跃表 O(1)
zslFree 释放一个跳跃表 O(N)
zslInsert 插入一个节点 平均O(LogN),最坏O(LogN)
zslDelete 删除一个节点 平均O(LogN),最坏O(LogN)
zslFirstInRange 获取满足分值范围的第一个节点 平均O(LogN),最坏O(LogN)
zslLastInRange 获取满足分值范围的最后一个节点 平均O(LogN),最坏O(LogN)
zzlGetScore 获取节点score 平均O(LogN),最坏O(LogN)
zslGetRank 获取节点的index 平均O(LogN),最坏O(LogN)
zslGetElementByRank 获取指定index的节点 平均O(LogN),最坏O(LogN)
zslIsInRange 跳跃表是否在指定score范围之内 通过表头表尾,O(1)
zslDeleteRangeByScore 删除指定score范围之内的节点 平均O(N),N为被删节点数量
zslDeleteRangeByRank 删除指定rank范围之内的节点 平均O(N),N为被删节点数量

Reference

  1. 漫画算法:什么是跳跃表?
  2. 跳跃表原理
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,902评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,037评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,978评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,867评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,763评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,104评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,565评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,236评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,379评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,313评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,363评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,034评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,637评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,719评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,952评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,371评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,948评论 2 341

推荐阅读更多精彩内容