Verticle

背景

  • verticle相当于1个执行模块,是vertx的部署单元。
  • vertx可以部署多个verticle,且verticle之间可以互相通信。
  • 因为vertx主要是个网络框架,所以在verticle中通常会启动server,如http或tcp。

Verticle部署源码分析

@Override
public void deployVerticle(String name, DeploymentOptions options, Handler<AsyncResult<String>> completionHandler) {
    if (options.isHa() && haManager() != null && haManager().isEnabled()) {
        haManager().deployVerticle(name, options, completionHandler);
    } else {
        deploymentManager.deployVerticle(name, options, completionHandler);
    }
}

deploymentManager.deployVerticle代码如下:

public void deployVerticle(String identifier,
                           DeploymentOptions options,
                           Handler<AsyncResult<String>> completionHandler) {
    ContextImpl callingContext = vertx.getOrCreateContext();
    ClassLoader cl = getClassLoader(options, callingContext);
    /**
     * identifier为verticle类的全限定名
     * callingContext为eventloop context
     */
    doDeployVerticle(identifier, generateDeploymentID(), options, callingContext, callingContext, cl, completionHandler);
}

首先是创建context,ContextImpl callingContext = vertx.getOrCreateContext(),这块代码如下:

public ContextImpl getOrCreateContext() {
    ContextImpl ctx = getContext();
    if (ctx == null) {
        // We are running embedded - Create a context
        ctx = createEventLoopContext(null, null, new JsonObject(), Thread.currentThread().getContextClassLoader());
    }
    return ctx;
}

public ContextImpl getContext() {
    ContextImpl context = (ContextImpl) context();
    if (context != null && context.owner == this) {
        return context;
    }
    return null;
}

public static Context context() {
    Thread current = Thread.currentThread();
    if (current instanceof VertxThread) {
        return ((VertxThread) current).getContext();
    }
    return null;
}

public EventLoopContext createEventLoopContext(String deploymentID, WorkerPool workerPool, JsonObject config, ClassLoader tccl) {
    return new EventLoopContext(this, internalBlockingPool, workerPool != null ? workerPool : this.workerPool, deploymentID, config, tccl);
}

最终会调到创建EventLoopContext的方法,详细见context部分分析,这里得到context后,接下来分析doDeployVerticle方法,如下:

private void doDeployVerticle(Iterator<VerticleFactory> iter,
                              Throwable prevErr,
                              String identifier,
                              String deploymentID,
                              DeploymentOptions options,
                              ContextImpl parentContext,
                              ContextImpl callingContext,
                              ClassLoader cl,
                              Handler<AsyncResult<String>> completionHandler) {
    if (iter.hasNext()) {
        VerticleFactory verticleFactory = iter.next();
        Future<String> fut = Future.future();
        if (verticleFactory.requiresResolve()) {
            try {
                verticleFactory.resolve(identifier, options, cl, fut);
            } catch (Exception e) {
                try {
                    fut.fail(e);
                } catch (Exception ignore) {
                    // Too late
                }
            }
        } else {
            fut.complete(identifier);
        }
        fut.setHandler(ar -> {
            Throwable err;
            if (ar.succeeded()) {
                /**
                 * resolvedName为verticle类全限定名
                 */
                String resolvedName = ar.result();
                if (!resolvedName.equals(identifier)) {
                    deployVerticle(resolvedName, options, completionHandler);
                    return;
                } else {
                    if (verticleFactory.blockingCreate()) {
                        vertx.<Verticle[]>executeBlocking(createFut -> {
                            try {
                                Verticle[] verticles = createVerticles(verticleFactory, identifier, options.getInstances(), cl);
                                createFut.complete(verticles);
                            } catch (Exception e) {
                                createFut.fail(e);
                            }
                        }, res -> {
                            if (res.succeeded()) {
                                doDeploy(identifier, deploymentID, options, parentContext, callingContext, completionHandler, cl, res.result());
                            } else {
                                // Try the next one
                                doDeployVerticle(iter, res.cause(), identifier, deploymentID, options, parentContext, callingContext, cl, completionHandler);
                            }
                        });
                        return;
                    } else {
                        try {
                            Verticle[] verticles = createVerticles(verticleFactory, identifier, options.getInstances(), cl);
                            doDeploy(identifier, deploymentID, options, parentContext, callingContext, completionHandler, cl, verticles);
                            return;
                        } catch (Exception e) {
                            err = e;
                        }
                    }
                }
            } else {
                err = ar.cause();
            }
            // Try the next one
            doDeployVerticle(iter, err, identifier, deploymentID, options, parentContext, callingContext, cl, completionHandler);
        });
    } else {
        if (prevErr != null) {
            // Report failure if there are no more factories to try otherwise try the next one
            reportFailure(prevErr, callingContext, completionHandler);
        } else {
            // not handled or impossible ?
        }
    }
}

核心代码如下:

    Verticle[] verticles = createVerticles(verticleFactory, identifier, options.getInstances(), cl);
    doDeploy(identifier, deploymentID, options, parentContext, callingContext, completionHandler, cl, verticles);

createVerticles方法如下,创建verticle就是根据其类的全限定名创建对象:

private Verticle[] createVerticles(VerticleFactory verticleFactory, String identifier, int instances, ClassLoader cl) throws Exception {
    Verticle[] verticles = new Verticle[instances];
    for (int i = 0; i < instances; i++) {
        verticles[i] = verticleFactory.createVerticle(identifier, cl);
        if (verticles[i] == null) {
            throw new NullPointerException("VerticleFactory::createVerticle returned null");
        }
    }
    return verticles;
}

doDeploy方法如下:

    private void doDeploy(String identifier, String deploymentID, DeploymentOptions options,
                      ContextImpl parentContext,
                      ContextImpl callingContext,
                      Handler<AsyncResult<String>> completionHandler,
                      ClassLoader tccl, Verticle... verticles) {
    if (options.isMultiThreaded() && !options.isWorker()) {
        throw new IllegalArgumentException("If multi-threaded then must be worker too");
    }
    JsonObject conf = options.getConfig() == null ? new JsonObject() : options.getConfig().copy(); // Copy it
    String poolName = options.getWorkerPoolName();

    Deployment parent = parentContext.getDeployment();
    DeploymentImpl deployment = new DeploymentImpl(parent, deploymentID, identifier, options);

    AtomicInteger deployCount = new AtomicInteger();
    AtomicBoolean failureReported = new AtomicBoolean();
    for (Verticle verticle : verticles) {
        WorkerExecutorImpl workerExec = poolName != null ? vertx.createSharedWorkerExecutor(poolName, options.getWorkerPoolSize()) : null;
        WorkerPool pool = workerExec != null ? workerExec.getPool() : null;
        /**
         * parentContext为部署verticle的context,这里又创建了新的context,用于启动verticle。
         * 根据verticle类型,这里分为3种context,EventLoopContext、WorkerContext和MultiThreadedWorkerContext。
         */
        ContextImpl context = options.isWorker() ? vertx.createWorkerContext(options.isMultiThreaded(), deploymentID, pool, conf, tccl) :
                vertx.createEventLoopContext(deploymentID, pool, conf, tccl);
        if (workerExec != null) {
            context.addCloseHook(workerExec);
        }
        context.setDeployment(deployment);
        deployment.addVerticle(new VerticleHolder(verticle, context));
        /**
         * 对于EventLoopContext,在context的eventloop中执行
         * 对于WorkerContext,在worker线程顺序执行器中执行
         * 对于MultiThreadedWorkerContext,在worker线程中执行
         */
        context.runOnContext(v -> {
            try {
                verticle.init(vertx, context);
                Future<Void> startFuture = Future.future();
                verticle.start(startFuture);
                startFuture.setHandler(ar -> {
                    if (ar.succeeded()) {
                        if (parent != null) {
                            parent.addChild(deployment);
                            deployment.child = true;
                        }
                        vertx.metricsSPI().verticleDeployed(verticle);
                        deployments.put(deploymentID, deployment);
                        if (deployCount.incrementAndGet() == verticles.length) {
                            reportSuccess(deploymentID, callingContext, completionHandler);
                        }
                    } else if (!failureReported.get()) {
                        reportFailure(ar.cause(), callingContext, completionHandler);
                    }
                });
            } catch (Throwable t) {
                reportFailure(t, callingContext, completionHandler);
            }
        });
    }
}

doDeploy核心代码主如下:
首先创建上下文:

ContextImpl context = options.isWorker() ? vertx.createWorkerContext(options.isMultiThreaded(), deploymentID, pool, conf, tccl) :
                vertx.createEventLoopContext(deploymentID, pool, conf, tccl);

parentContext为部署verticle的context,这里又创建了新的context,用于启动verticle。
根据verticle类型,这里分为3种context,EventLoopContext、WorkerContext和MultiThreadedWorkerContext。

然后启动verticle,调用其init和start方法,但执行线程会因为verticle的不同模式而有所不同:

  • 对于EventLoopContext,在context的eventloop中执行
  • 对于WorkerContext,在worker线程顺序执行器中执行
  • 对于MultiThreadedWorkerContext,在worker线程中执行

下面看下context.runOnContext方法,这里传入的回调是执行verticle的启动和初始化,除了这些,看还做了什么,以standard模式verticle为例,该context为EventLoopContext,runOnContext方法如下:

  //EventLoopContext
  public void executeAsync(Handler<Void> task) {  
    nettyEventLoop().execute(wrapTask(null, task, true, null));
  }

nettyEventLoop()得到的是该context绑定的event loop,task回调参数即为verticle启动和初始化的回调,看看wrapTask都做了啥:

  protected Runnable wrapTask(ContextTask cTask, Handler<Void> hTask, boolean checkThread, PoolMetrics metrics) {
        Object metric = metrics != null ? metrics.submitted() : null;
        return () -> {
            Thread th = Thread.currentThread();
            if (!(th instanceof VertxThread)) {
                throw new IllegalStateException("Uh oh! Event loop context executing with wrong thread! Expected " + contextThread + " got " + th);
            }
            VertxThread current = (VertxThread) th;
            if (THREAD_CHECKS && checkThread) {
                if (contextThread == null) {
                    contextThread = current;
                } else if (contextThread != current && !contextThread.isWorker()) {
                    throw new IllegalStateException("Uh oh! Event loop context executing with wrong thread! Expected " + contextThread + " got " + current);
                }
            }
            if (metrics != null) {
                metrics.begin(metric);
            }
            if (!DISABLE_TIMINGS) { 
                //记录开始时间
                current.executeStart();
            }
            try {
                //为当前线程current设置为当前上下文ContextImpl.this
                setContext(current, ContextImpl.this);
                if (cTask != null) {
                    cTask.run();
                } else {
                    //执行启动和初始化verticle的回调
                    hTask.handle(null);
                }
                if (metrics != null) {
                    metrics.end(metric, true);
                }
            } catch (Throwable t) {
                log.error("Unhandled exception", t);
                Handler<Throwable> handler = this.exceptionHandler;
                if (handler == null) {
                    handler = owner.exceptionHandler();
                }
                if (handler != null) {
                    handler.handle(t);
                }
                if (metrics != null) {
                    metrics.end(metric, false);
                }
            } finally {
                // We don't unset the context after execution - this is done later when the context is closed via
                // VertxThreadFactory
                if (!DISABLE_TIMINGS) {
                    //清楚开始时间
                    current.executeEnd();
                }
            }
        };
    }

这段代码主要有4处核心调用:

  • current.executeStart():记录线程执行开始时间,为线程阻塞检查器提供线程开始时间信息。
//VertxThread
public final void executeStart() {  
  execStart = System.nanoTime();
}
  • setContext(current, ContextImpl.this):设置线程上下文
//ContextImpl
private static void setContext(VertxThread thread, ContextImpl context) {  
      thread.setContext(context);  
      if (!DISABLE_TCCL) {    
            if (context != null) {     
                   context.setTCCL();   
            } else {      
                   Thread.currentThread().setContextClassLoader(null);   
            }  
      }
}
//VertxThread
void setContext(ContextImpl context) {  
    this.context = context;
}
  • hTask.handle(null):执行verticle启动和初始化回调
  • current.executeEnd():清楚开始时间
//VertxThread
public final void executeEnd() {  
  execStart = 0;
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,319评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,801评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,567评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,156评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,019评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,090评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,500评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,192评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,474评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,566评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,338评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,212评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,572评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,890评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,169评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,478评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,661评论 2 335

推荐阅读更多精彩内容