聊聊rsocket load balancer的Ewma

本文主要研究一下rsocket load balancer的Ewma

Moving Average

SMA

SMA(Simple Moving Average),即简单移动平均,其公式如下:

SMAt = (Pt + Pt-1 + Pt-2 + Pt-3 + ... + Pt-n+1) / n

这里的Pt到为Pt-n+1为最近的n个数据

WMA

WMA(Weighted Moving Average),即加权移动平均,其公式如下:

WMAt = (Pt * Wt) + (Pt-1 * Wt-1) + ... + (Pt-n+1 * Wt-n+1)

WMA会给最近的n个数据加上权重,其中这些权重加起来和为1,一般是较近的数据权重比较大

EMA或EWMA

EMA(Exponentially Moving Average)指数移动平均或EWMA(Exponentially Weighted Moving Average)指数加权移动平均,其公式如下:

EMAt = (Pt * S) + (1- S) * EMAt-1

它有一个S参数为平滑指数,一般是取2/(N+1)

Ewma

rsocket-load-balancer-0.12.1-sources.jar!/io/rsocket/stat/Ewma.java

public class Ewma {
  private final long tau;
  private volatile long stamp;
  private volatile double ewma;

  public Ewma(long halfLife, TimeUnit unit, double initialValue) {
    this.tau = Clock.unit().convert((long) (halfLife / Math.log(2)), unit);
    stamp = 0L;
    ewma = initialValue;
  }

  public synchronized void insert(double x) {
    long now = Clock.now();
    double elapsed = Math.max(0, now - stamp);
    stamp = now;

    double w = Math.exp(-elapsed / tau);
    ewma = w * ewma + (1.0 - w) * x;
  }

  public synchronized void reset(double value) {
    stamp = 0L;
    ewma = value;
  }

  public double value() {
    return ewma;
  }

  @Override
  public String toString() {
    return "Ewma(value=" + ewma + ", age=" + (Clock.now() - stamp) + ")";
  }
}
  • Ewma的构造器需要指定halfLife、timeunit、initialValue(ewma初始值)参数;ewma = w * ewma + (1.0 - w) * x,其中x为当前值,w为权重
  • 权重w = Math.exp(-elapsed / tau),即e的-elapsed / tau次方;elapsed为距离上次计算的时间长度;tau(希腊字母)为EWMA的时间常量
  • 这里的tau = halfLife / Math.log(2)根据timeunit转换后的值;其中halfLife参数,代表speed of convergence,即收敛的速度

RSocketSupplier

rsocket-load-balancer-0.12.1-sources.jar!/io/rsocket/client/filter/RSocketSupplier.java

public class RSocketSupplier implements Availability, Supplier<Mono<RSocket>>, Closeable {

  private static final double EPSILON = 1e-4;

  private Supplier<Mono<RSocket>> rSocketSupplier;

  private final MonoProcessor<Void> onClose;

  private final long tau;
  private long stamp;
  private final Ewma errorPercentage;

  public RSocketSupplier(Supplier<Mono<RSocket>> rSocketSupplier, long halfLife, TimeUnit unit) {
    this.rSocketSupplier = rSocketSupplier;
    this.tau = Clock.unit().convert((long) (halfLife / Math.log(2)), unit);
    this.stamp = Clock.now();
    this.errorPercentage = new Ewma(halfLife, unit, 1.0);
    this.onClose = MonoProcessor.create();
  }

  public RSocketSupplier(Supplier<Mono<RSocket>> rSocketSupplier) {
    this(rSocketSupplier, 5, TimeUnit.SECONDS);
  }

  @Override
  public double availability() {
    double e = errorPercentage.value();
    if (Clock.now() - stamp > tau) {
      // If the window is expired artificially increase the availability
      double a = Math.min(1.0, e + 0.5);
      errorPercentage.reset(a);
    }
    if (e < EPSILON) {
      e = 0.0;
    } else if (1.0 - EPSILON < e) {
      e = 1.0;
    }

    return e;
  }

  private synchronized void updateErrorPercentage(double value) {
    errorPercentage.insert(value);
    stamp = Clock.now();
  }

  @Override
  public Mono<RSocket> get() {
    return rSocketSupplier
        .get()
        .doOnNext(o -> updateErrorPercentage(1.0))
        .doOnError(t -> updateErrorPercentage(0.0))
        .map(AvailabilityAwareRSocketProxy::new);
  }

  @Override
  public void dispose() {
    onClose.onComplete();
  }

  @Override
  public boolean isDisposed() {
    return onClose.isDisposed();
  }

  @Override
  public Mono<Void> onClose() {
    return onClose;
  }

  private class AvailabilityAwareRSocketProxy extends RSocketProxy {
    public AvailabilityAwareRSocketProxy(RSocket source) {
      super(source);

      onClose.doFinally(signalType -> source.dispose()).subscribe();
    }

    @Override
    public Mono<Void> fireAndForget(Payload payload) {
      return source
          .fireAndForget(payload)
          .doOnError(t -> errorPercentage.insert(0.0))
          .doOnSuccess(v -> updateErrorPercentage(1.0));
    }

    @Override
    public Mono<Payload> requestResponse(Payload payload) {
      return source
          .requestResponse(payload)
          .doOnError(t -> errorPercentage.insert(0.0))
          .doOnSuccess(p -> updateErrorPercentage(1.0));
    }

    @Override
    public Flux<Payload> requestStream(Payload payload) {
      return source
          .requestStream(payload)
          .doOnError(th -> errorPercentage.insert(0.0))
          .doOnComplete(() -> updateErrorPercentage(1.0));
    }

    @Override
    public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
      return source
          .requestChannel(payloads)
          .doOnError(th -> errorPercentage.insert(0.0))
          .doOnComplete(() -> updateErrorPercentage(1.0));
    }

    @Override
    public Mono<Void> metadataPush(Payload payload) {
      return source
          .metadataPush(payload)
          .doOnError(t -> errorPercentage.insert(0.0))
          .doOnSuccess(v -> updateErrorPercentage(1.0));
    }

    @Override
    public double availability() {
      // If the window is expired set success and failure to zero and return
      // the child availability
      if (Clock.now() - stamp > tau) {
        updateErrorPercentage(1.0);
      }
      return source.availability() * errorPercentage.value();
    }
  }
}
  • RSocketSupplier实现了Availability、Supplier、Closeable接口,其中它定义了errorPercentage变量,其类型为Ewma;如果没有指定halfLife值,则RSocketSupplier默认halfLife为5秒,ewma的初始值为1.0
  • RSocketSupplier定义了一个常量EPSILON = 1e-4,其availability方法会先计算availability,然后在距离上次计算时间stamp超过tau值时会重置errorPercentage;之后当availability小于EPSILON时返回0,当availability + EPSILON大于1时返回1.0
  • updateErrorPercentage方法用于往ewma插入新值,同时更新stamp;get方法的doOnNext方法updateErrorPercentage值为1.0,doOnError方法updateErrorPercentage值为0.0;map会将RSocket转换为AvailabilityAwareRSocketProxy;AvailabilityAwareRSocketProxy对目标RSocket进行代理,对相关方法的doOnError及doOnSuccess都织入errorPercentage的统计

小结

  • Moving Average有好几种算法,包括SMA(Simple Moving Average)、WMA(Weighted Moving Average)、EMA(Exponentially Moving Average)或EWMA(Exponentially Weighted Moving Average)
  • Ewma的构造器需要指定halfLife、timeunit、initialValue(ewma初始值)参数;ewma = w * ewma + (1.0 - w) * x,其中x为当前值,w为权重;权重w = Math.exp(-elapsed / tau),即e的-elapsed / tau次方;elapsed为距离上次计算的时间长度;tau(希腊字母)为EWMA的时间常量;这里的tau = halfLife / Math.log(2)根据timeunit转换后的值;其中halfLife参数,代表speed of convergence,即收敛的速度
  • rsocket load balancer使用了Ewma了统计服务的availability;其中RSocketSupplier实现了Availability、Supplier、Closeable接口,其中它定义了errorPercentage变量,其类型为Ewma;如果没有指定halfLife值,则RSocketSupplier默认halfLife为5秒,ewma的初始值为1.0;RSocketSupplier的get方法会将RSocket转换为AvailabilityAwareRSocketProxy,而AvailabilityAwareRSocketProxy则会对目标RSocket进行代理,对相关方法的doOnError及doOnSuccess都织入errorPercentage的统计

doc

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343

推荐阅读更多精彩内容