ActiveMQ源码解析(三)Failover机制

在看完会话的源码后,最近项目提到了Failover机制的疑问,所以需要先读一下failover机制的源码,看看这个故障重连的机制是怎样的。
在建立连接的那篇文章中(见http://www.jianshu.com/p/d41f32ca22a5),讲到了FactoryFinder会根据schema找到对应的TransportFactory类,这个配置其实是放在transport目录中的。

private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");

这种配置方法很值得学习。只需要写一个工厂类继承自TransportFactory,然后在"META-INF/services/org/apache/activemq/transport/"目录下放个配置文件,指向该工厂类就OK了。使用配置的方式新增,而不是在TansportFactory代码中去添加,更好的遵守了OCP原则。

Anyway,我们现在找到了FailoverTransportFactory,下一步是调用doConnect建立连接

    public Transport doConnect(URI location) throws IOException {
        try {
            //主要看这个
            Transport transport = createTransport(URISupport.parseComposite(location));
            //与Tcptransport类似,加上辅助功能
            transport = new MutexTransport(transport);
            transport = new ResponseCorrelator(transport);
            return transport;
        } catch (URISyntaxException e) {
            throw new IOException("Invalid location: " + location);
        }
    }

    public Transport createTransport(CompositeData compositData) throws IOException {
        Map<String, String> options = compositData.getParameters();
        FailoverTransport transport = createTransport(options);
        if (!options.isEmpty()) {
            throw new IllegalArgumentException("Invalid connect parameters: " + options);
        }
        transport.add(false,compositData.getComponents());
        return transport;
    }

    public FailoverTransport createTransport(Map<String, String> parameters) throws IOException {
        FailoverTransport transport = new FailoverTransport();
        Map<String, Object> nestedExtraQueryOptions = IntrospectionSupport.extractProperties(parameters, "nested.");
        IntrospectionSupport.setProperties(transport, parameters);
        try {
            transport.setNestedExtraQueryOptions(URISupport.createQueryString(nestedExtraQueryOptions));
        } catch (URISyntaxException e) {
        }
        return transport;
    }

调用了FailoverTransport的构造函数,建立了一个FailoverTransport,可以看到在FailoverTransport的方法里建立了一个名为reconnectTask的TaskRunner,跑的Task里的iterate()方法是重复执行的。具体机制见源码解析(二)。

public FailoverTransport() {
        brokerSslContext = SslContext.getCurrentSslContext();
        stateTracker.setTrackTransactions(true);
        // Setup a task that is used to reconnect the a connection async.
        reconnectTaskFactory = new TaskRunnerFactory();
        reconnectTaskFactory.init();
        reconnectTask = reconnectTaskFactory.createTaskRunner(new Task() {
            @Override
            public boolean iterate() {
                boolean result = false;
                //FailoverTransport被启动才执行该线程
                if (!started) {
                    return result;
                }
                boolean buildBackup = true;
                synchronized (backupMutex) {
                    //若连接未建立||需要重置集群负载||优先连接的服务器可用且该transport未被stop,则执行
                    if ((connectedTransport.get() == null || doRebalance || priorityBackupAvailable) && !disposed) {
                        result = doReconnect();
                        buildBackup = false;
                    }
                }
                //是否启用备用连接
                if (buildBackup) {
                    buildBackups();
                    if (priorityBackup && !connectedToPriority) {
                        try {
                            doDelay();
                            if (reconnectTask == null) {
                                return true;
                            }
                            reconnectTask.wakeup();
                        } catch (InterruptedException e) {
                            LOG.debug("Reconnect task has been interrupted.", e);
                        }
                    }
                } else {
                    // build backups on the next iteration
                    buildBackup = true;
                    try {
                        if (reconnectTask == null) {
                            return true;
                        }
                        reconnectTask.wakeup();
                    } catch (InterruptedException e) {
                        LOG.debug("Reconnect task has been interrupted.", e);
                    }
                }
                return result;
            }

        }, "ActiveMQ Failover Worker: " + System.identityHashCode(this));
    }

这个reconnectTask的线程会在FailoverTransport调用start()方法的时候开始启用,连接建立后变成wait,且在连接被异常断开时被唤醒。

可用看到上面的代码中最重要的是调用doReconnect()方法。这个方法主要有以下几个部分:
1. 第一部分是用来处理配置形式uris的,如果有配置,优先读取配置的uri,并添加到重连uris中本身不做重连。
2. 第二部分是做负载重连的,根据重连uris如果第一个已经对应的Transport在工作了则无需重连直接返回,否则去掉当前工作的Transport达到负载的目的。
3. 第三部分是关于Transport的备份机制的,如果设置了备份机制,且有Transport已经备份则取出该备份Transport返回。这里也没有设置备份。
可以看到前三部分都没有进行实际的重连工作,第四部分才是在上述三部分都不存在的情况下,进行实际的重连工作。

4. 第四部分才是重连工作,根据uri找到对应的Transport,对该Transport设置独有的TransportListener并启动。然后设置到FailoverTransport的connectedTransport作为当前连接的Transport。

下面把方法中主要的第四部分摘录出来

         Iterator<URI> iter = connectList.iterator();
         while ((transport != null || iter.hasNext()) && (connectedTransport.get() == null && !disposed)) {
             try {
                  // SSL先忽略
                  SslContext.setCurrentSslContext(brokerSslContext);

                  // We could be starting with a backup and if so we wait to grab a
                  // URI from the pool until next time around.
                  // 没建立过连接就先建立一个
                  if (transport == null) {
                      uri = addExtraQueryOptions(iter.next());
                      transport = TransportFactory.compositeConnect(uri);
                  }
                  LOG.debug("Attempting {}th connect to: {}", connectFailures, uri);

                  // 给transport增加一个transportListener,listener可以判断连接是否异常,异常时也唤醒reconnectTask
                  transport.setTransportListener(createTransportListener(transport));
                  // 如果transport的start方法调用没问题,则表示连接建立成功
                  transport.start();

                  // 如果不是第一次建立连接,则需要通知服务器是一个重连的客户端
                  if (started && !firstConnection) {
                      restoreTransport(transport);
                  }

                  LOG.debug("Connection established");
                  // 为下一次的重连进行配置
                  reconnectDelay = initialReconnectDelay;
                  connectedTransportURI = uri;
                  connectedTransport.set(transport);
                  connectedToPriority = isPriority(connectedTransportURI);
                  reconnectMutex.notifyAll(
                  connectFailures = 0;

                  // Make sure on initial startup, that the transportListener
                  // has been initialized for this instance.
                  // 确保为这个连接加上listener
                  synchronized (listenerMutex) {
                      if (transportListener == null) {
                            try {
                                 // if it isn't set after 2secs - it probably never will be
                                 listenerMutex.wait(2000);
                             } catch (InterruptedException ex) {
                             }
                       }
                  }
                  if (firstConnection) {
                     firstConnection = false;
                     LOG.info("Successfully connected to {}", uri);
                  } else {
                     LOG.info("Successfully reconnected to {}", uri);
                  }
                  return false;
           } catch (Exception e) {
               failure = e;
               LOG.debug("Connect fail to: {}, reason: {}", uri, e);
               if (transport != null) {
                  try {
                      transport.stop();
                      transport = null;
                  } catch (Exception ee) {
                       LOG.debug("Stop of failed transport: {} failed with reason: {}", transport, ee);
                  }
              }
         }

可以看到transport是通过这个语句建立的

transport = TransportFactory.compositeConnect(uri);

需要注意的是这个时候传入的uri是不带failover的,因此如果设置的是形如failover:(tcp://localhost:61616)这样的连接配置,此时uri应该是tcp://localhost:61616。所以返回的transport是tcpTransport。

看着transportListener很重要的样子,那这个transportListener到底起什么作用呢。

    private TransportListener createTransportListener(final Transport owner) {
        return new TransportListener() {
        ……
            @Override
            // 最主要的overide,当异常时调用handleTransportFailure方法
            public void onException(IOException error) {
                try {
                    handleTransportFailure(owner, error);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    if (transportListener != null) {
                        transportListener.onException(new InterruptedIOException());
                    }
                }
            }
        ……
        };
    }

可以看到transportListener就是一个监控的线程,当发现transport异常时,会调用handleTransportFailure进行故障处理:

    public final void handleTransportFailure(Transport failed, IOException e) throws InterruptedException {
        // 如果异常是由于连接正在关闭引起的,就不用管它
        if (shuttingDown) {
            // shutdown info sent and remote socket closed and we see that before a local close
            // let the close do the work
            return;
        }

        if (LOG.isTraceEnabled()) {
            LOG.trace(this + " handleTransportFailure: " + e, e);
        }
        // 重置transport为空
        // could be blocked in write with the reconnectMutex held, but still needs to be whacked
        Transport transport = null;

        if (connectedTransport.compareAndSet(failed, null)) {
            transport = failed;
            if (transport != null) {
                disposeTransport(transport);
            }
        }

        synchronized (reconnectMutex) {
            if (transport != null && connectedTransport.get() == null) {
                boolean reconnectOk = false;

                // 如果transport是started状态且重连次数未达上限,则表示可以开始尝试重连
                if (canReconnect()) {
                    reconnectOk = true;
                }

                LOG.warn("Transport ({}) failed {} attempting to automatically reconnect: {}",
                         connectedTransportURI, (reconnectOk ? "," : ", not"), e);

                failedConnectTransportURI = connectedTransportURI;
                connectedTransportURI = null;
                connectedToPriority = false;

                if (reconnectOk) {
                    // notify before any reconnect attempt so ack state can be whacked
                    if (transportListener != null) {
                        transportListener.transportInterupted();
                    }

                    updated.remove(failedConnectTransportURI);
                    // 唤醒reconnectTask
                    reconnectTask.wakeup();
                } else if (!isDisposed()) {
                    propagateFailureToExceptionListener(e);
                }
            }
        }
    }

嗯,其实主要就是唤醒reconnectTask开始重连。。。

总结:FailoverTransport其实就是封装了一个reconnectTask,这个Task会在建立连接时启动,在连接异常时也会被唤醒,其主要就是对尝试建立连接的封装。

接下来还有个问题:
Transport什么时候会抛异常?因为环境中出现了部分异常客户端的连接处于CLOSE_WAIT的状态。且客户端没有再进行重连的尝试,其机制是怎样的呢?

带着这个疑问,后续再研究。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,053评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,527评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,779评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,685评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,699评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,609评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,989评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,654评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,890评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,634评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,716评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,394评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,976评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,950评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,191评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,849评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,458评论 2 342

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,594评论 18 139
  • 个人自行阅读时候,翻译的文档。因为比较渣,如果有更合理或者错误的地方烦劳告知,我会做修改。Oracle Data ...
    窝窝的小黑屋阅读 1,201评论 0 3
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 171,418评论 25 707
  • 老妈今天生日。我刚好今天一大早赶早班机去丽江出差。等到达酒店一切安顿好之后,已经差不多是中午十二点。拿起手机给老妈...
    酒墨阅读 294评论 5 2