【第18篇】Netty对于异步操作与观察者模式

1、Future与ChannelFuture

  • Future未来(期望),Netty事件都是异步
  • Future的get方法是阻塞,Netty对JDK的Future进行异步的封装,如:判断状态isSuccess方法(成功或失败状态)改良点
  • ChannelFuture是Future的特化,维护了一个Channel对象
  • ChannelPromise(复合管道)是ChannelFuture的子类
  • V get() 读取数据
ChannelPromise复合管道
public interface Future<V> extends java.util.concurrent.Future<V> {
    
    boolean isSuccess();  //当且仅当i/o操作成功完成时,返回true
    boolean isCancellable(); //当且仅当可以通过cancel方法取消时,返回true
    Throwable cause();//如果i/o操作失败,返回其失败原因。如果成功完成或者还未完成,返回null
    //将指定的监听器添加到此Future,future完成时,会通知此监听器,如果添加监听器时future已经完成,则立即通知此监听器
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    //移除监听器,如果future已经完成,则不会触发
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    //等待此future done
   Future<V> sync() throws InterruptedException;
    //等待,不可中断
    Future<V> syncUninterruptibly();
    //等待Future完成
  Future<V> await() throws InterruptedException;
    //等待Future完成,不可中断
  Future<V> awaitUninterruptibly();
  //立即返回,如果此时future未完成,返回null
  V getNow();

    @Override
    boolean cancel(boolean mayInterruptIfRunning);
}

2、DefaultProgressivePromise

  • DefaultProgressivePromise类的awaitUninterruptiblysetSuccesssetFailure方法
   @Override//等待唤醒呼叫
    public ProgressivePromise<V> awaitUninterruptibly() {
        super.awaitUninterruptibly();
        return this;
    }

 @Override//设置监听成功状态
    public ProgressivePromise<V> setSuccess(V result) {
        super.setSuccess(result);
        return this;
    }
@Override @Override//设置监听失败状态
    public ProgressivePromise<V> setFailure(Throwable cause) {
        super.setFailure(cause);
        return this;
    }

3、DefaultPromise

  • DefaultPromise的awaitUninterruptibly方法
 @Override
    public Promise<V> awaitUninterruptibly() {
        if (isDone()) {
            return this;
        }
        //检测死锁问题
        checkDeadLock();
        boolean interrupted = false;
        synchronized (this) {
            while (!isDone()) {
                incWaiters();//调用等待,自身++
                try {
                    wait();//等待
                } catch (InterruptedException e) {
                    // 打断等待
                    interrupted = true;
                } finally {
                    //减轻,自身 --
                    decWaiters();
                }
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return this;
    }
  • DefaultPromise的setSuccess方法
 public Promise<V> setSuccess(V result) {
        if (setSuccess0(result)) {
            //成功后通知监听器
            notifyListeners();
            return this;
        }
        throw new IllegalStateException("complete already: " + this);
    }
  • DefaultPromise的setFailure方法
   public Promise<V> setFailure(Throwable cause) {
        if (setFailure0(cause)) {
            notifyListeners();//通知监听器
            return this;
        }
        throw new IllegalStateException("complete already: " + this, cause);
    }
  • DefaultPromise的notifyListeners方法,其实用到了观察者模式

GOF设计模式中有一种叫做观察者模式(Observer),属于行为型模式。又叫发布-订阅(Publish/Subscribe)模式、模型-视图 (Model/View)模式、源-监听器(Source/Listener)模式或从属者(Dependents)模式。观察者模式定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。这个 主题对象在状态上发生变化时,会通知所有观察者对象,

private void notifyListeners() {
       //事件执行器
        EventExecutor executor = executor();
      //判断是否在事件循环里面
        if (executor.inEventLoop()) {
            final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();//从InternalThreadLocalMap 线程的值
        //异步监听堆栈的值
            final int stackDepth = threadLocals.futureListenerStackDepth();
            if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
                threadLocals.setFutureListenerStackDepth(stackDepth + 1);
                try {
                    //立即监听通知
                    notifyListenersNow();
                } finally {
                    threadLocals.setFutureListenerStackDepth(stackDepth);
                }
                return;
            }
        }
        //安全执行监听通知
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                notifyListenersNow();
            }
        });
    }
  • DefaultPromise的notifyListeners0方法
  private void notifyListeners0(DefaultFutureListeners listeners) {
        GenericFutureListener<?>[] a = listeners.listeners();
        int size = listeners.size();
        for (int i = 0; i < size; i ++) {
            notifyListener0(this, a[i]);
        }
    }

    @SuppressWarnings({ "unchecked", "rawtypes" })
    private static void notifyListener0(Future future, GenericFutureListener l) {
        try {
            l.operationComplete(future);
        } catch (Throwable t) {
            logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
        }
    }

3、JDK的Future

  • JDK所提供的Future,只能通过手工方式检查执行结果,而这个操作是阻塞的,Netty对ChannelFuture进行的增强,通过ChannelFutureListerner以回调的方式获取执行结果,去除了手工检查阻塞的操作。
  • 值得注意的是ChannelFutureListerner的operatonComplete方法是由I/O线程执行,因此要注意的是 不要在这里执行耗时操作,否则需要通过另外的线程或线程池进行执行

4、JDK的Void

  • jdk的Void类实例化对象的关键字的处理
public interface ChannelFuture extends Future<Void> 

5、sync 方法

  • 在server的绑定端口的sync是同步处理


    sync

6、回调函数

  • 回调函数当方法、函数执行完成之后进行回调获取

7、Future关系图

  • Future传统处理


    Future传统处理
  • Netty的Future处理
    Netty的Future对JDK的Future进行改良,加了Listeners监听器进行处理

//添加监听器
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> var1);
//添加监听器
Future<V> addListeners(GenericFutureListener... var1);
//移除监听器
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> var1);
//移除监听器
Future<V> removeListeners(GenericFutureListener... var1);

Netty的Future处理
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,132评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,802评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,566评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,858评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,867评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,695评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,064评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,705评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,915评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,677评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,796评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,432评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,041评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,992评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,223评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,185评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,535评论 2 343

推荐阅读更多精彩内容