在看完会话的源码后,最近项目提到了Failover机制的疑问,所以需要先读一下failover机制的源码,看看这个故障重连的机制是怎样的。
在建立连接的那篇文章中(见http://www.jianshu.com/p/d41f32ca22a5),讲到了FactoryFinder会根据schema找到对应的TransportFactory类,这个配置其实是放在transport目录中的。
private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");
这种配置方法很值得学习。只需要写一个工厂类继承自TransportFactory,然后在"META-INF/services/org/apache/activemq/transport/"目录下放个配置文件,指向该工厂类就OK了。使用配置的方式新增,而不是在TansportFactory代码中去添加,更好的遵守了OCP原则。
Anyway,我们现在找到了FailoverTransportFactory,下一步是调用doConnect建立连接
public Transport doConnect(URI location) throws IOException {
try {
//主要看这个
Transport transport = createTransport(URISupport.parseComposite(location));
//与Tcptransport类似,加上辅助功能
transport = new MutexTransport(transport);
transport = new ResponseCorrelator(transport);
return transport;
} catch (URISyntaxException e) {
throw new IOException("Invalid location: " + location);
}
}
public Transport createTransport(CompositeData compositData) throws IOException {
Map<String, String> options = compositData.getParameters();
FailoverTransport transport = createTransport(options);
if (!options.isEmpty()) {
throw new IllegalArgumentException("Invalid connect parameters: " + options);
}
transport.add(false,compositData.getComponents());
return transport;
}
public FailoverTransport createTransport(Map<String, String> parameters) throws IOException {
FailoverTransport transport = new FailoverTransport();
Map<String, Object> nestedExtraQueryOptions = IntrospectionSupport.extractProperties(parameters, "nested.");
IntrospectionSupport.setProperties(transport, parameters);
try {
transport.setNestedExtraQueryOptions(URISupport.createQueryString(nestedExtraQueryOptions));
} catch (URISyntaxException e) {
}
return transport;
}
调用了FailoverTransport的构造函数,建立了一个FailoverTransport,可以看到在FailoverTransport的方法里建立了一个名为reconnectTask的TaskRunner,跑的Task里的iterate()方法是重复执行的。具体机制见源码解析(二)。
public FailoverTransport() {
brokerSslContext = SslContext.getCurrentSslContext();
stateTracker.setTrackTransactions(true);
// Setup a task that is used to reconnect the a connection async.
reconnectTaskFactory = new TaskRunnerFactory();
reconnectTaskFactory.init();
reconnectTask = reconnectTaskFactory.createTaskRunner(new Task() {
@Override
public boolean iterate() {
boolean result = false;
//FailoverTransport被启动才执行该线程
if (!started) {
return result;
}
boolean buildBackup = true;
synchronized (backupMutex) {
//若连接未建立||需要重置集群负载||优先连接的服务器可用且该transport未被stop,则执行
if ((connectedTransport.get() == null || doRebalance || priorityBackupAvailable) && !disposed) {
result = doReconnect();
buildBackup = false;
}
}
//是否启用备用连接
if (buildBackup) {
buildBackups();
if (priorityBackup && !connectedToPriority) {
try {
doDelay();
if (reconnectTask == null) {
return true;
}
reconnectTask.wakeup();
} catch (InterruptedException e) {
LOG.debug("Reconnect task has been interrupted.", e);
}
}
} else {
// build backups on the next iteration
buildBackup = true;
try {
if (reconnectTask == null) {
return true;
}
reconnectTask.wakeup();
} catch (InterruptedException e) {
LOG.debug("Reconnect task has been interrupted.", e);
}
}
return result;
}
}, "ActiveMQ Failover Worker: " + System.identityHashCode(this));
}
这个reconnectTask的线程会在FailoverTransport调用start()方法的时候开始启用,连接建立后变成wait,且在连接被异常断开时被唤醒。
可用看到上面的代码中最重要的是调用doReconnect()方法。这个方法主要有以下几个部分:
1. 第一部分是用来处理配置形式uris的,如果有配置,优先读取配置的uri,并添加到重连uris中本身不做重连。
2. 第二部分是做负载重连的,根据重连uris如果第一个已经对应的Transport在工作了则无需重连直接返回,否则去掉当前工作的Transport达到负载的目的。
3. 第三部分是关于Transport的备份机制的,如果设置了备份机制,且有Transport已经备份则取出该备份Transport返回。这里也没有设置备份。
可以看到前三部分都没有进行实际的重连工作,第四部分才是在上述三部分都不存在的情况下,进行实际的重连工作。
4. 第四部分才是重连工作,根据uri找到对应的Transport,对该Transport设置独有的TransportListener并启动。然后设置到FailoverTransport的connectedTransport作为当前连接的Transport。
下面把方法中主要的第四部分摘录出来
Iterator<URI> iter = connectList.iterator();
while ((transport != null || iter.hasNext()) && (connectedTransport.get() == null && !disposed)) {
try {
// SSL先忽略
SslContext.setCurrentSslContext(brokerSslContext);
// We could be starting with a backup and if so we wait to grab a
// URI from the pool until next time around.
// 没建立过连接就先建立一个
if (transport == null) {
uri = addExtraQueryOptions(iter.next());
transport = TransportFactory.compositeConnect(uri);
}
LOG.debug("Attempting {}th connect to: {}", connectFailures, uri);
// 给transport增加一个transportListener,listener可以判断连接是否异常,异常时也唤醒reconnectTask
transport.setTransportListener(createTransportListener(transport));
// 如果transport的start方法调用没问题,则表示连接建立成功
transport.start();
// 如果不是第一次建立连接,则需要通知服务器是一个重连的客户端
if (started && !firstConnection) {
restoreTransport(transport);
}
LOG.debug("Connection established");
// 为下一次的重连进行配置
reconnectDelay = initialReconnectDelay;
connectedTransportURI = uri;
connectedTransport.set(transport);
connectedToPriority = isPriority(connectedTransportURI);
reconnectMutex.notifyAll(
connectFailures = 0;
// Make sure on initial startup, that the transportListener
// has been initialized for this instance.
// 确保为这个连接加上listener
synchronized (listenerMutex) {
if (transportListener == null) {
try {
// if it isn't set after 2secs - it probably never will be
listenerMutex.wait(2000);
} catch (InterruptedException ex) {
}
}
}
if (firstConnection) {
firstConnection = false;
LOG.info("Successfully connected to {}", uri);
} else {
LOG.info("Successfully reconnected to {}", uri);
}
return false;
} catch (Exception e) {
failure = e;
LOG.debug("Connect fail to: {}, reason: {}", uri, e);
if (transport != null) {
try {
transport.stop();
transport = null;
} catch (Exception ee) {
LOG.debug("Stop of failed transport: {} failed with reason: {}", transport, ee);
}
}
}
可以看到transport是通过这个语句建立的
transport = TransportFactory.compositeConnect(uri);
需要注意的是这个时候传入的uri是不带failover的,因此如果设置的是形如failover:(tcp://localhost:61616)这样的连接配置,此时uri应该是tcp://localhost:61616。所以返回的transport是tcpTransport。
看着transportListener很重要的样子,那这个transportListener到底起什么作用呢。
private TransportListener createTransportListener(final Transport owner) {
return new TransportListener() {
……
@Override
// 最主要的overide,当异常时调用handleTransportFailure方法
public void onException(IOException error) {
try {
handleTransportFailure(owner, error);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if (transportListener != null) {
transportListener.onException(new InterruptedIOException());
}
}
}
……
};
}
可以看到transportListener就是一个监控的线程,当发现transport异常时,会调用handleTransportFailure进行故障处理:
public final void handleTransportFailure(Transport failed, IOException e) throws InterruptedException {
// 如果异常是由于连接正在关闭引起的,就不用管它
if (shuttingDown) {
// shutdown info sent and remote socket closed and we see that before a local close
// let the close do the work
return;
}
if (LOG.isTraceEnabled()) {
LOG.trace(this + " handleTransportFailure: " + e, e);
}
// 重置transport为空
// could be blocked in write with the reconnectMutex held, but still needs to be whacked
Transport transport = null;
if (connectedTransport.compareAndSet(failed, null)) {
transport = failed;
if (transport != null) {
disposeTransport(transport);
}
}
synchronized (reconnectMutex) {
if (transport != null && connectedTransport.get() == null) {
boolean reconnectOk = false;
// 如果transport是started状态且重连次数未达上限,则表示可以开始尝试重连
if (canReconnect()) {
reconnectOk = true;
}
LOG.warn("Transport ({}) failed {} attempting to automatically reconnect: {}",
connectedTransportURI, (reconnectOk ? "," : ", not"), e);
failedConnectTransportURI = connectedTransportURI;
connectedTransportURI = null;
connectedToPriority = false;
if (reconnectOk) {
// notify before any reconnect attempt so ack state can be whacked
if (transportListener != null) {
transportListener.transportInterupted();
}
updated.remove(failedConnectTransportURI);
// 唤醒reconnectTask
reconnectTask.wakeup();
} else if (!isDisposed()) {
propagateFailureToExceptionListener(e);
}
}
}
}
嗯,其实主要就是唤醒reconnectTask开始重连。。。
总结:FailoverTransport其实就是封装了一个reconnectTask,这个Task会在建立连接时启动,在连接异常时也会被唤醒,其主要就是对尝试建立连接的封装。
接下来还有个问题:
Transport什么时候会抛异常?因为环境中出现了部分异常客户端的连接处于CLOSE_WAIT的状态。且客户端没有再进行重连的尝试,其机制是怎样的呢?
带着这个疑问,后续再研究。