HyperLogLog: Cardinality Estimation(基数估计算法源码解析)

背景

我们经常会统计某个字段的distinct value(DV),比如mysql 的distinct 关键字,但在大数据场景下,考虑空间复杂度,往往需要更少的内存来进行排重逻辑。
HyperLogLog算法就是解决此类问题的算法之一(下文简称DV),如Redis的HyperLogLog结构,ES的distinct语句.
HyperLogLog算法来源于论文
HyperLogLog the analysis of a near-optimal cardinality estimation algorithm
可以使用固定大小的字节计算任意大小的DV。

算法原理

HLL 算法的原理会涉及到比较多的数学知识,这边对这些数学原理和证明不会展开。举一个生活中的例子来帮助大家理解HLL算法的原理:比如你在进行一个实验,内容是不停地抛硬币,记录你连续抛到正面的次数(这是数学中的伯努利过程);如果你最多的连抛正面记录是3次,那可以想象你并没有做这个实验太多次,如果你最长的连抛正面记录是 20 次,那你可能进行了这个实验上千次。

一种理论上存在的情况是,你非常幸运,第一次进行这个实验就连抛了 20 次正面,我们也会认为你进行了很多次这个实验才得到了这个记录,这就会导致错误的预估;改进的方式是请 10 位同学进行这项实验,这样就可以观察到更多的样本数据,降低出现上述情况的概率。这就是 HLL 算法的核心思想。

感性认识

HASH CODE 发生概率 第一个1出现的位置 预计实验次数
1xxxxxxx 1/2 1 2
01xxxxxx 1/4 1 4
001xxxxx 1/8 3 8
0001xxxx 1/16 4 16

也就是说前导0越多,需要实验的次数越多。

分桶

为解决以上错误的预估,改进方法是m个人进行这项实验(数据分成m个均等的部分),分别估计其值,并求其平均数后再乘以m,称之为分桶。这样就能一定程度上避免单一突发事件造成的误差。

公式:DV_{LL} = constant*m*2^\overline{R}

loglog 算法的基数计算公式

其中

  • m代表分桶数
  • 桶中数据为最长前导零的个数+1
    代表桶的结果的均值
  • constant 常数,后续源代码介绍。

调和平均数

前面的LogLog算法中我们是使用的是平均数来将每个桶的结果汇总起来,但是平均数有一个广为人知的缺点,就是容易受到大的数值的影响,一个常见的例子是人均GDP。
用调和平均数就可以解决这一问题,调和平均数的结果会倾向于集合中比较小的数(倒数),x1到xn的调和平均数的公式如下:


调和平均数

HyperLogLog算法相比LogLog算法一个重要的改进就是使用调和平均数而不是平均数来聚合每个桶中的结果,HyperLogLog算法的公式如下:


HLL 算法基数计算公式

基中
每个桶的估计值

以下为HLL算法的基本原理,关于细节的公式推导,暂不涉及,有兴的小伙伴可以参考上文的论文原文。

RSD

相对标准误差(relative standard deviation,简称RSD)


rsd

其中m为桶数,由此可知rsd与m成正比

老外的在线demo 演示

step 1
step 2

HLL源码分析

依赖引用

<dependency>
            <groupId>net.agkn</groupId>
            <artifactId>hll</artifactId>
            <version>1.6.0</version>
</dependency>

github
https://github.com/aggregateknowledge/java-hll

依赖类图

依赖类图
package net.agkn.hll;
...
public class HLL implements Cloneable {

翻译

一种概率性的集合,集合的元素为 long 型的hash值,这种数据结构(算法适用于使用相对很小的存储来近似地计算数据流的基数。
根据 HLL论文实现的改近版本,HLL的数据结构和算法都被采用了,并结合概率性和非概率性的技术来改进预测的精度和原始算法的存储需求(空间复杂度低)
更具体地说

  1. HLL初始化时,会分配一个EMPTY的空集合
  2. 接着加入几个值之后会升级为EXPLICIT的HashSet
  3. 为了减少内存的占用,会牺牲精度:EXPLICIT类型会升级为基于映射的SPARSE(稀疏)
  4. 当更多的桶(register)被占用时,基于映射的HLL会转换成基于位封装的结构(FULL)
/**
   ...
    public HLL(final int log2m, final int regwidth, final int expthresh, final boolean sparseon, final HLLType type) 
翻译
  1. @param log2m 桶数的log-base-2 比如桶数为64, 则log2m=6,该参数为4到30之间
  2. @param regwidth 每个桶的位数,该参数在1到8之间,即最大为一个字节
  3. @param expthresh 当EXPLICIT提升为SPARSE的阈值,取值范围为1-18
expthresh 含义
-1 Promote at whatever cutoff makes sense for optimal memory usage.
0 Skip EXPLICIT representation in hierarchy
1-18 Promote at 2expthresh - 1 cardinality

表格中的参数含义不直接翻译,直接看代码

 if(expthresh == -1) {
//Promote at whatever cutoff makes sense for optimal memory usage. 
            this.explicitAuto = true;
            this.explicitOff = false;

            // NOTE:  This math matches the size calculation in the PostgreSQL impl.
 所有桶点的字节数
            final long fullRepresentationSize = (this.regwidth * (long)this.m + 7/*round up to next whole byte*/)/Byte.SIZE;
//所有桶占用的long 类型数组length
            final int numLongs = (int)(fullRepresentationSize / 8/*integer division to round down*/);
        
            if(numLongs > MAXIMUM_EXPLICIT_THRESHOLD) {
                this.explicitThreshold = MAXIMUM_EXPLICIT_THRESHOLD;
            } else {
                this.explicitThreshold = numLongs;
            }
        } else if(expthresh == 0) {
// Skip `EXPLICIT` representation in hierarchy
            this.explicitAuto = false;
            this.explicitOff = true;
            this.explicitThreshold = 0;
        } else if((expthresh > 0) && (expthresh <= MAXIMUM_EXPTHRESH_PARAM)){
            this.explicitAuto = false;
            this.explicitOff = false;
//Promote at pow(2,expthresh - 1) cardinality
            this.explicitThreshold = (1 << (expthresh - 1));
        } else {
            throw new IllegalArgumentException("'expthresh' must be at least " + MINIMUM_EXPTHRESH_PARAM + " and at most " + MAXIMUM_EXPTHRESH_PARAM + " (was: " + expthresh + ")");
        }
  1. @param sparseon 标识SPARSE是否被使用,false则skip SPARSE
this.sparseOff = !sparseon;
        if(this.sparseOff) {
            this.sparseThreshold = 0;
        }
  1. @param type 在提升层次关系中的起始类型

不同存储介质

private void initializeStorage(final HLLType type) {
        this.type = type;
        switch(type) {
            case EMPTY:
                // nothing to be done
                break;
            case EXPLICIT:
//精确存储,基于long 的hashset
                this.explicitStorage = new LongOpenHashSet();
                break;
            case SPARSE:
//稀疏类型,因为数据量不是很大时,大部分的桶为空,为节省空间,则只保存有值的桶
                this.sparseProbabilisticStorage = new Int2ByteOpenHashMap();
                break;
            case FULL:
//当大部分的桶有值时,用位向量来保存所有的桶的内容,以节省空间
                this.probabilisticStorage = new BitVector(regwidth, m);
                break;
            default:
                throw new RuntimeException("Unsupported HLL type " + type);
        }
    }

ADD

根据上文类的注释不难理解ADD会提升存储类型

**
     * Adds <code>rawValue</code> directly to the HLL.
     * rawValue 必须被hash,官方建议用google guave的Murmur3_128HashFunction 
     * @param  rawValue the value to be added. It is very important that this
     *         value <em>already be hashed</em> with a strong (but not
     *         necessarily cryptographic) hash function. For instance, the
     *         Murmur3 implementation in
     *         <a href="http://guava-libraries.googlecode.com/git/guava/src/com/google/common/hash/Murmur3_128HashFunction.java">
     *         Google's Guava</a> library is an excellent hash function for this
     *         purpose and, for seeds greater than zero, matches the output
     *         of the hash provided in the PostgreSQL implementation.
     */
    public void addRaw(final long rawValue) {
        switch(type) {
            case EMPTY: {
                // NOTE:  EMPTY type is always promoted on #addRaw()
//explicitThreshold==0 时则skip EXPLICT
                if(explicitThreshold > 0) {
                    initializeStorage(HLLType.EXPLICIT);
                    explicitStorage.add(rawValue);
//sparseOff==true 时,则skip SPARSE
                } else if(!sparseOff) {
                    initializeStorage(HLLType.SPARSE);
                    addRawSparseProbabilistic(rawValue);
                } else {
//都skip则初始化为FULL
                    initializeStorage(HLLType.FULL);
                    addRawProbabilistic(rawValue);
                }
                return;
            }
            case EXPLICIT: {
                explicitStorage.add(rawValue);

                // promotion, if necessary
                if(explicitStorage.size() > explicitThreshold) {
                    if(!sparseOff) {
                        initializeStorage(HLLType.SPARSE);
                        for(final long value : explicitStorage) {
                            addRawSparseProbabilistic(value);
                        }
                    } else {
                        initializeStorage(HLLType.FULL);
                        for(final long value : explicitStorage) {
                            addRawProbabilistic(value);
                        }
                    }
                    explicitStorage = null;
                }
                return;
            }
            case SPARSE: {
                addRawSparseProbabilistic(rawValue);

                // promotion, if necessary
                if(sparseProbabilisticStorage.size() > sparseThreshold) {
                    initializeStorage(HLLType.FULL);
                    for(final int registerIndex : sparseProbabilisticStorage.keySet()) {
                        final byte registerValue = sparseProbabilisticStorage.get(registerIndex);
                        probabilisticStorage.setMaxRegister(registerIndex, registerValue);
                    }
                    sparseProbabilisticStorage = null;
                }
                return;
            }
            case FULL:
                addRawProbabilistic(rawValue);
                return;
            default:
                throw new RuntimeException("Unsupported HLL type " + type);
        }
    }

EXPLICIT LongOpenHashSet

该类型为long 类型的hash set
public boolean add(long k) {
        int pos;
//根据 hash值找到位置
        for(pos = (int)HashCommon.murmurHash3(k ^ (long)this.mask) & this.mask; this.used[pos]; pos = pos + 1 & this.mask) {
//如果已经存在则退出
            if (this.key[pos] == k) {
                return false;
            }
        }

        this.used[pos] = true;
        this.key[pos] = k;
//如果超过最大值 Math.min((int)Math.ceil((double)((float)n * f)), n - 1);,则重hash
        if (this.size++ >= this.maxFill) {
            this.rehash(HashCommon.arraySize(this.size + 1, this.f));
        }

        return true;
    }

SPARSE 稀疏概率预测存储

/**
     * Adds the raw value to the {@link #sparseProbabilisticStorage}.
     * {@link #type} must be {@link HLLType#SPARSE}.
     *
     * @param rawValue the raw value to add to the sparse storage.
     */
    private void addRawSparseProbabilistic(final long rawValue) {
        // p(w): position of the least significant set bit (one-indexed)
        // By contract: p(w) <= 2^(registerValueInBits) - 1 (the max register value)
        //
        // By construction of pwMaxMask (see #Constructor()),
        //      lsb(pwMaxMask) = 2^(registerValueInBits) - 2,
        // thus lsb(any_long | pwMaxMask) <= 2^(registerValueInBits) - 2,
        // thus 1 + lsb(any_long | pwMaxMask) <= 2^(registerValueInBits) -1.
//hash code中移掉桶的标志位,剩下的value标志位
        final long substreamValue = (rawValue >>> log2m);
        final byte p_w;

        if(substreamValue == 0L) {
            // The paper does not cover p(0x0), so the special value 0 is used.
            // 0 is the original initialization value of the registers, so by
            // doing this the multiset simply ignores it. This is acceptable
            // because the probability is 1/(2^(2^registerSizeInBits)).
            p_w = 0;
        } else {
// 取value 标志位的最低小有效位
            p_w = (byte)(1 + BitUtil.leastSignificantBit(substreamValue| pwMaxMask));
        }

        // Short-circuit if the register is being set to zero, since algorithmically
        // this corresponds to an "unset" register, and "unset" registers aren't
        // stored to save memory. (The very reason this sparse implementation
        // exists.) If a register is set to zero it will break the #algorithmCardinality
        // code.
        if(p_w == 0) {
            return;
        }

        // NOTE:  no +1 as in paper since 0-based indexing
//取桶的索引
        final int j = (int)(rawValue & mBitsMask);
//当前桶中的值
        final byte currentValue = sparseProbabilisticStorage.get(j);
//当前值比桶中的值大,则替换
        if(p_w > currentValue) {
            sparseProbabilisticStorage.put(j, p_w);
        }
    }

FULL BitVector

插入逻辑同上

/**
     * Adds the raw value to the {@link #probabilisticStorage}.
     * {@link #type} must be {@link HLLType#FULL}.
     *
     * @param rawValue the raw value to add to the full probabilistic storage.
     */
    private void addRawProbabilistic(final long rawValue) {
        // p(w): position of the least significant set bit (one-indexed)
        // By contract: p(w) <= 2^(registerValueInBits) - 1 (the max register value)
        //
        // By construction of pwMaxMask (see #Constructor()),
        //      lsb(pwMaxMask) = 2^(registerValueInBits) - 2,
        // thus lsb(any_long | pwMaxMask) <= 2^(registerValueInBits) - 2,
        // thus 1 + lsb(any_long | pwMaxMask) <= 2^(registerValueInBits) -1.
        final long substreamValue = (rawValue >>> log2m);
        final byte p_w;

        if (substreamValue == 0L) {
            // The paper does not cover p(0x0), so the special value 0 is used.
            // 0 is the original initialization value of the registers, so by
            // doing this the multiset simply ignores it. This is acceptable
            // because the probability is 1/(2^(2^registerSizeInBits)).
            p_w = 0;
        } else {
            p_w = (byte)(1 + BitUtil.leastSignificantBit(substreamValue| pwMaxMask));
        }

        // Short-circuit if the register is being set to zero, since algorithmically
        // this corresponds to an "unset" register, and "unset" registers aren't
        // stored to save memory. (The very reason this sparse implementation
        // exists.) If a register is set to zero it will break the #algorithmCardinality
        // code.
        if(p_w == 0) {
            return;
        }

        // NOTE:  no +1 as in paper since 0-based indexing
        final int j = (int)(rawValue & mBitsMask);

        probabilisticStorage.setMaxRegister(j, p_w);
    }

基数计算

// Cardinality
    /**
     * Computes the cardinality of the HLL.
     *
     * @return the cardinality of HLL. This will never be negative.
     */
    public long cardinality() {
        switch(type) {
            case EMPTY:
                return 0/*by definition*/;
            case EXPLICIT:
                return explicitStorage.size();
            case SPARSE:
                return (long)Math.ceil(sparseProbabilisticAlgorithmCardinality());
            case FULL:
                return (long)Math.ceil(fullProbabilisticAlgorithmCardinality());
            default:
                throw new RuntimeException("Unsupported HLL type " + type);
        }
    }

Sparse Probabilistic Algorithm Cardinality

稀疏和FULL两种情况计算公式一致

    /**
     * Computes the exact cardinality value returned by the HLL algorithm when
     * represented as a {@link HLLType#SPARSE} HLL. Kept
     * separate from {@link #cardinality()} for testing purposes. {@link #type}
     * must be {@link HLLType#SPARSE}.
     *
     * @return the exact, unrounded cardinality given by the HLL algorithm
     */
    /*package, for testing*/ double sparseProbabilisticAlgorithmCardinality() {
        final int m = this.m/*for performance*/;

        // compute the "indicator function" -- sum(2^(-M[j])) where M[j] is the
        // 'j'th register value
计算sum(2^(-M[j])),基中M[j]为第j个桶的值
        double sum = 0;
        int numberOfZeroes = 0/*"V" in the paper*/;
        for(int j=0; j<m; j++) {
            final long register = sparseProbabilisticStorage.get(j);

            sum += 1.0 / (1L << register);
            if(register == 0L) numberOfZeroes++;
        }

        // apply the estimate and correction to the indicator function
//alphaMSquared 为alphaMSquared方法中计算的值,有兴趣的小伙伴可以对照公上公司,这里计算公式与上边公式描述一致。
        final double estimator = alphaMSquared / sum;
        if((numberOfZeroes != 0) && (estimator < smallEstimatorCutoff)) {
            return HLLUtil.smallEstimator(m, numberOfZeroes);
        } else if(estimator <= largeEstimatorCutoff) {
            return estimator;
        } else {
            return HLLUtil.largeEstimator(log2m, regwidth, estimator);
        }
    }

alpha-m-square 为常量,即上文公式中提到的常量

/**
     * Computes the 'alpha-m-squared' constant used by the HyperLogLog algorithm.
     *
     * @param  m this must be a power of two, cannot be less than
     *         16 (2<sup>4</sup>), and cannot be greater than 65536 (2<sup>16</sup>).
     * @return gamma times <code>registerCount</code> squared where gamma is
     *         based on the value of <code>registerCount</code>.
     * @throws IllegalArgumentException if <code>registerCount</code> is less
     *         than 16.
     */
    public static double alphaMSquared(final int m) {
        switch(m) {
            case 1/*2^0*/:
            case 2/*2^1*/:
            case 4/*2^2*/:
            case 8/*2^3*/:
                throw new IllegalArgumentException("'m' cannot be less than 16 (" + m + " < 16).");

            case 16/*2^4*/:
                return 0.673 * m * m;

            case 32/*2^5*/:
                return 0.697 * m * m;

            case 64/*2^6*/:
                return 0.709 * m * m;

            default/*>2^6*/:
                return (0.7213 / (1.0 + 1.079 / m)) * m * m;
        }
    }

参考资料

https://www.jianshu.com/p/55defda6dcd2
https://github.com/aggregateknowledge/java-hll

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343

推荐阅读更多精彩内容