序
本文主要研究一下BinaryLogClient的connect
connect(timeout)
mysql-binlog-connector-java-0.20.1/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java
public class BinaryLogClient implements BinaryLogClientMXBean {
//......
/**
 * Runs the no-arg {@code connect()} on a new thread and blocks the caller until the client
 * reports {@code onConnect}, the background attempt fails with an {@link IOException}, or
 * {@code timeout} milliseconds elapse.
 *
 * <p>If the calling thread is interrupted while waiting, the wait ends early, the attempt is
 * handled as a timeout, and the caller's interrupt status is restored before this method exits.
 *
 * @param timeout maximum time to wait, in milliseconds; also applied via {@code setConnectTimeout}
 * @throws IOException if the background {@code connect()} call failed with an IOException
 * @throws TimeoutException if {@code onConnect} was not signalled within {@code timeout}
 */
public void connect(final long timeout) throws IOException, TimeoutException {
    final CountDownLatch countDownLatch = new CountDownLatch(1);
    AbstractLifecycleListener connectListener = new AbstractLifecycleListener() {
        @Override
        public void onConnect(BinaryLogClient client) {
            countDownLatch.countDown();
        }
    };
    registerLifecycleListener(connectListener);
    final AtomicReference<IOException> exceptionReference = new AtomicReference<IOException>();
    Runnable runnable = new Runnable() {
        @Override
        public void run() {
            try {
                setConnectTimeout(timeout);
                connect();
            } catch (IOException e) {
                exceptionReference.set(e);
                countDownLatch.countDown(); // making sure we don't end up waiting whole "timeout"
            }
        }
    };
    newNamedThread(runnable, "blc-" + hostname + ":" + port).start();
    boolean started = false;
    boolean interrupted = false;
    try {
        started = countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        // Remember the interrupt; it is re-asserted only after cleanup so that
        // terminateConnect() is not disturbed by a pending interrupt flag.
        interrupted = true;
        if (logger.isLoggable(Level.WARNING)) {
            logger.log(Level.WARNING, e.getMessage());
        }
    }
    try {
        unregisterLifecycleListener(connectListener);
        if (exceptionReference.get() != null) {
            throw exceptionReference.get();
        }
        if (!started) {
            try {
                terminateConnect();
            } finally {
                // Deliberately thrown from finally: the timeout is reported even if
                // terminateConnect() itself fails.
                throw new TimeoutException("BinaryLogClient was unable to connect in " + timeout + "ms");
            }
        }
    } finally {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
//......
}
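- 带timeout参数的connect方法先注册一个connectListener,然后在新线程中使用setConnectTimeout方法设置超时时间,再执行无参的connect方法;在捕获IOException的时候更新exceptionReference并触发countDownLatch.countDown();调用线程通过countDownLatch.await(timeout)等待,等待结束后注销connectListener,若exceptionReference不为null则抛出exceptionReference.get(),若等待超时则执行terminateConnect并抛出TimeoutException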
connect
mysql-binlog-connector-java-0.20.1/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java
public class BinaryLogClient implements BinaryLogClientMXBean {
//......
/**
 * Opens the channel, performs the greeting/authentication exchange, requests the binary log
 * stream and then runs listenForEventPackets() on the calling thread until it returns.
 *
 * @throws IOException if opening the channel or any step before the stream request fails
 * @throws IllegalStateException if connectLock cannot be acquired
 */
public void connect() throws IOException {
// connectLock is held until the outer finally block; a failed tryLock is reported as "already connected".
if (!connectLock.tryLock()) {
throw new IllegalStateException("BinaryLogClient is already connected");
}
// Becomes true only after the connection is marked established; gates the onDisconnect callbacks below.
boolean notifyWhenDisconnected = false;
try {
Callable cancelDisconnect = null;
try {
try {
long start = System.currentTimeMillis();
channel = openChannel();
// Schedule a forced disconnect for what is left of connectTimeout after openChannel();
// skipped when no timeout is configured or the keep-alive thread is running.
if (connectTimeout > 0 && !isKeepAliveThreadRunning()) {
cancelDisconnect = scheduleDisconnectIn(connectTimeout -
(System.currentTimeMillis() - start));
}
// peek() == -1 is reported as EOF, i.e. nothing could be read from the freshly opened channel.
if (channel.getInputStream().peek() == -1) {
throw new EOFException();
}
} catch (IOException e) {
// Wrap with host/port context; the original exception is kept as the cause.
throw new IOException("Failed to connect to MySQL on " + hostname + ":" + port +
". Please make sure it's running.", e);
}
GreetingPacket greetingPacket = receiveGreeting();
authenticate(greetingPacket);
connectionId = greetingPacket.getThreadId();
// Only when binlogFilename is the empty string: an empty gtidSet is replaced by the result of
// fetchGtidPurged(), provided gtidSetFallbackToPurged is set.
if ("".equals(binlogFilename)) {
synchronized (gtidSetAccessLock) {
if (gtidSet != null && "".equals(gtidSet.toString()) && gtidSetFallbackToPurged) {
gtidSet = new GtidSet(fetchGtidPurged());
}
}
}
// No filename configured: fetch the filename and position via fetchBinlogFilenameAndPosition().
if (binlogFilename == null) {
fetchBinlogFilenameAndPosition();
}
// Positions below 4 are raised to 4.
// NOTE(review): presumably 4 is the size of the binlog file header - confirm.
if (binlogPosition < 4) {
if (logger.isLoggable(Level.WARNING)) {
logger.warning("Binary log position adjusted from " + binlogPosition + " to " + 4);
}
binlogPosition = 4;
}
ChecksumType checksumType = fetchBinlogChecksum();
if (checksumType != ChecksumType.NONE) {
confirmSupportOfChecksum(checksumType);
}
if (heartbeatInterval > 0) {
enableHeartbeat();
}
// Reset per-connection GTID/transaction tracking before requesting the stream.
gtid = null;
tx = false;
requestBinaryLogStream();
} catch (IOException e) {
// Any I/O failure up to this point closes the channel before the exception is rethrown.
disconnectChannel();
throw e;
} finally {
// Cancel the scheduled forced disconnect whether or not the steps above succeeded.
if (cancelDisconnect != null) {
try {
cancelDisconnect.call();
} catch (Exception e) {
if (logger.isLoggable(Level.WARNING)) {
logger.warning("\"" + e.getMessage() +
"\" was thrown while canceling scheduled disconnect call");
}
}
}
}
connected = true;
notifyWhenDisconnected = true;
if (logger.isLoggable(Level.INFO)) {
String position;
synchronized (gtidSetAccessLock) {
position = gtidSet != null ? gtidSet.toString() : binlogFilename + "/" + binlogPosition;
}
logger.info("Connected to " + hostname + ":" + port + " at " + position +
" (" + (blocking ? "sid:" + serverId + ", " : "") + "cid:" + connectionId + ")");
}
for (LifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onConnect(this);
}
if (keepAlive && !isKeepAliveThreadRunning()) {
spawnKeepAliveThread();
}
// ROTATE is always ensured; GTID and QUERY deserializers are ensured only when a gtidSet is in use.
ensureEventDataDeserializer(EventType.ROTATE, RotateEventDataDeserializer.class);
synchronized (gtidSetAccessLock) {
if (gtidSet != null) {
ensureEventDataDeserializer(EventType.GTID, GtidEventDataDeserializer.class);
ensureEventDataDeserializer(EventType.QUERY, QueryEventDataDeserializer.class);
}
}
// Runs the packet-reading loop on this thread; control returns here once that method returns.
listenForEventPackets();
} finally {
// The lock is released first, then listeners are told about the disconnect.
connectLock.unlock();
if (notifyWhenDisconnected) {
for (LifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onDisconnect(this);
}
}
}
}
//......
}
- connect方法先执行openChannel方法,之后在connectTimeout大于0且keepAliveThreadRunning为false时通过scheduleDisconnectIn调度执行超时逻辑;之后执行receiveGreeting获取greetingPacket,然后执行authenticate(greetingPacket),之后获取connectionId及设置gtidSet;在binlogFilename为null时执行fetchBinlogFilenameAndPosition;之后执行fetchBinlogChecksum,对于heartbeatInterval大于0的执行enableHeartbeat,最后执行requestBinaryLogStream;若出现IOException则执行disconnectChannel并抛出异常;连接成功之后回调lifecycleListeners的onConnect,对于需要keepAlive的执行spawnKeepAliveThread;最后执行listenForEventPackets
scheduleDisconnectIn
mysql-binlog-connector-java-0.20.1/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java
public class BinaryLogClient implements BinaryLogClientMXBean {
//......
/**
 * Starts a watchdog thread that closes the channel unless it is cancelled within
 * {@code timeout} milliseconds.
 *
 * @param timeout how long the watchdog waits before forcing a disconnect, in milliseconds
 * @return a callable that cancels the watchdog and waits for its thread to finish
 */
private Callable scheduleDisconnectIn(final long timeout) {
    final CountDownLatch cancelSignal = new CountDownLatch(1);
    final Runnable watchdogTask = new Runnable() {
        @Override
        public void run() {
            try {
                cancelSignal.await(timeout, TimeUnit.MILLISECONDS);
            } catch (InterruptedException interruption) {
                if (logger.isLoggable(Level.WARNING)) {
                    logger.log(Level.WARNING, interruption.getMessage());
                }
            }
            // A zero count means the returned callable was invoked: nothing to do.
            if (cancelSignal.getCount() == 0) {
                return;
            }
            if (logger.isLoggable(Level.WARNING)) {
                logger.warning("Failed to establish connection in " + timeout + "ms. " +
                    "Forcing disconnect.");
            }
            try {
                disconnectChannel();
            } catch (IOException closeFailure) {
                if (logger.isLoggable(Level.WARNING)) {
                    logger.log(Level.WARNING, closeFailure.getMessage());
                }
            }
        }
    };
    final Thread watchdog = newNamedThread(watchdogTask, "blc-disconnect-" + hostname + ":" + port);
    watchdog.start();
    return new Callable() {
        @Override
        public Object call() throws Exception {
            cancelSignal.countDown();
            watchdog.join();
            return null;
        }
    };
}
/**
 * Marks the client as not connected and closes the channel if one exists and is still open.
 *
 * @throws IOException if closing the channel fails
 */
private void disconnectChannel() throws IOException {
    connected = false;
    if (channel == null || !channel.isOpen()) {
        return;
    }
    channel.close();
}
//......
}
- scheduleDisconnectIn方法主要是通过CountDownLatch来执行超时等待,超时的话执行disconnectChannel方法,它会关闭channel
receiveGreeting
mysql-binlog-connector-java-0.20.1/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java
public class BinaryLogClient implements BinaryLogClientMXBean {
//......
/**
 * Reads the first packet from the channel and parses it as the server greeting.
 *
 * @return the parsed greeting packet
 * @throws ServerException if the packet starts with the 0xFF error marker
 * @throws IOException if reading from the channel fails
 */
private GreetingPacket receiveGreeting() throws IOException {
    final byte[] handshake = channel.read();
    final boolean serverReportedError = handshake[0] == (byte) 0xFF;
    if (!serverReportedError) {
        return new GreetingPacket(handshake);
    }
    // Everything after the marker byte is the error packet payload.
    final byte[] errorPayload = Arrays.copyOfRange(handshake, 1, handshake.length);
    final ErrorPacket error = new ErrorPacket(errorPayload);
    throw new ServerException(error.getErrorMessage(), error.getErrorCode(),
        error.getSqlState());
}
/**
 * Queries "show master status" and stores the first row's filename and position in
 * {@code binlogFilename} and {@code binlogPosition}.
 *
 * @throws IOException if the query returns no rows or the channel fails
 */
private void fetchBinlogFilenameAndPosition() throws IOException {
    channel.write(new QueryCommand("show master status"));
    final ResultSetRowPacket[] rows = readResultSet();
    if (rows.length == 0) {
        throw new IOException("Failed to determine binlog filename/position");
    }
    final ResultSetRowPacket masterStatus = rows[0];
    // Column 0 holds the filename, column 1 the position.
    binlogFilename = masterStatus.getValue(0);
    binlogPosition = Long.parseLong(masterStatus.getValue(1));
}
/**
 * Queries the server's global {@code binlog_checksum} variable.
 *
 * @return the checksum type named by the second column of the first row, or
 *         {@code ChecksumType.NONE} if the query returns no rows
 * @throws IOException if the channel fails
 * @throws IllegalArgumentException if the reported value is not a {@code ChecksumType} constant
 */
private ChecksumType fetchBinlogChecksum() throws IOException {
    channel.write(new QueryCommand("show global variables like 'binlog_checksum'"));
    ResultSetRowPacket[] resultSet = readResultSet();
    if (resultSet.length == 0) {
        return ChecksumType.NONE;
    }
    // Locale.ROOT keeps the enum lookup independent of the JVM's default locale
    // (default-locale upper-casing can map letters differently, e.g. under tr-TR).
    return ChecksumType.valueOf(resultSet[0].getValue(1).toUpperCase(java.util.Locale.ROOT));
}
/**
 * Sends "set @master_heartbeat_period" with {@code heartbeatInterval * 1000000} and checks
 * the server's reply.
 *
 * @throws ServerException if the reply starts with the 0xFF error marker
 * @throws IOException if the channel fails
 */
private void enableHeartbeat() throws IOException {
    final QueryCommand setPeriod =
        new QueryCommand("set @master_heartbeat_period=" + heartbeatInterval * 1000000);
    channel.write(setPeriod);
    final byte[] reply = channel.read();
    if (reply[0] != (byte) 0xFF) {
        return;
    }
    // Everything after the marker byte is the error packet payload.
    final ErrorPacket error = new ErrorPacket(Arrays.copyOfRange(reply, 1, reply.length));
    throw new ServerException(error.getErrorMessage(), error.getErrorCode(),
        error.getSqlState());
}
/**
 * Builds and sends the dump command that starts the binary log stream: the GTID variant when
 * a {@code gtidSet} is present, the filename/position variant otherwise.
 *
 * @throws IOException if writing to the channel fails
 */
private void requestBinaryLogStream() throws IOException {
    // Server id 0 is sent when not blocking; see http://bugs.mysql.com/bug.php?id=71178
    final long effectiveServerId = blocking ? this.serverId : 0;
    final Command dumpCommand;
    synchronized (gtidSetAccessLock) {
        if (gtidSet == null) {
            dumpCommand = new DumpBinaryLogCommand(effectiveServerId, binlogFilename, binlogPosition);
        } else {
            dumpCommand = new DumpBinaryLogGtidCommand(effectiveServerId,
                useBinlogFilenamePositionInGtidMode ? binlogFilename : "",
                useBinlogFilenamePositionInGtidMode ? binlogPosition : 4,
                gtidSet);
        }
    }
    channel.write(dumpCommand);
}
/**
 * Starts a single-threaded executor whose task wakes up every {@code keepAliveInterval}
 * milliseconds, decides whether the connection was lost and, if so, tries to reconnect.
 *
 * <p>When {@code heartbeatInterval > 0} the connection counts as lost if no event was seen for
 * longer than {@code keepAliveInterval}; otherwise a {@code PingCommand} is written and an
 * {@link IOException} from that write counts as a lost connection.
 */
private void spawnKeepAliveThread() {
    final ExecutorService threadExecutor =
        Executors.newSingleThreadExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable runnable) {
                return newNamedThread(runnable, "blc-keepalive-" + hostname + ":" + port);
            }
        });
    // Acquire the lock before entering try: if lock() itself failed inside the try block,
    // the finally clause would call unlock() on a lock this thread does not hold.
    keepAliveThreadExecutorLock.lock();
    try {
        threadExecutor.submit(new Runnable() {
            @Override
            public void run() {
                while (!threadExecutor.isShutdown()) {
                    try {
                        Thread.sleep(keepAliveInterval);
                    } catch (InterruptedException e) {
                        // expected in case of disconnect
                    }
                    if (threadExecutor.isShutdown()) {
                        return;
                    }
                    boolean connectionLost = false;
                    if (heartbeatInterval > 0) {
                        connectionLost = System.currentTimeMillis() - eventLastSeen > keepAliveInterval;
                    } else {
                        try {
                            channel.write(new PingCommand());
                        } catch (IOException e) {
                            connectionLost = true;
                        }
                    }
                    if (connectionLost) {
                        if (logger.isLoggable(Level.INFO)) {
                            logger.info("Trying to restore lost connection to " + hostname + ":" + port);
                        }
                        try {
                            terminateConnect();
                            connect(connectTimeout);
                        } catch (Exception ce) {
                            // Best effort: the loop retries after the next sleep.
                            if (logger.isLoggable(Level.WARNING)) {
                                logger.warning("Failed to restore connection to " + hostname + ":" + port +
                                    ". Next attempt in " + keepAliveInterval + "ms");
                            }
                        }
                    }
                }
            }
        });
        keepAliveThreadExecutor = threadExecutor;
    } finally {
        keepAliveThreadExecutorLock.unlock();
    }
}
//......
}
- receiveGreeting方法接收initialHandshakePacket,若首字节为0xFF则抛出ServerException;fetchBinlogFilenameAndPosition方法发送show master status命令,然后解析binlogFilename及binlogPosition;fetchBinlogChecksum方法发送show global variables like 'binlog_checksum'命令;enableHeartbeat方法发送set @master_heartbeat_period命令;requestBinaryLogStream方法在gtidSet不为null时发送DumpBinaryLogGtidCommand命令,否则发送DumpBinaryLogCommand命令;spawnKeepAliveThread方法会调度执行keepAliveThread,它主体是while循环,通过Thread.sleep(keepAliveInterval)来实现间隔,在heartbeatInterval大于0时根据eventLastSeen判断连接是否丢失,否则通过发送PingCommand命令来探测,对于connectionLost的执行terminateConnect及connect方法
listenForEventPackets
mysql-binlog-connector-java-0.20.1/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java
public class BinaryLogClient implements BinaryLogClientMXBean {
//......
/**
 * Reads packets from the channel's input stream in a loop, deserializes each one into an
 * event and hands it to the event listeners, until peek() reports end of stream, an
 * exception ends the loop, or a 0xFE marker arrives while not in blocking mode.
 *
 * Exceptions raised inside the loop are reported to lifecycle listeners rather than
 * propagated to the caller.
 *
 * @throws IOException declared by the signature; the visible path that can raise it is the
 *         disconnect()/disconnectChannel() call in the finally block
 */
private void listenForEventPackets() throws IOException {
ByteArrayInputStream inputStream = channel.getInputStream();
// Set when the loop ends on a 0xFE marker in non-blocking mode; selects disconnect() in the finally block.
boolean completeShutdown = false;
try {
while (inputStream.peek() != -1) {
// Packet header: 3-byte length, 1 sequence byte (skipped), then a 1-byte marker.
int packetLength = inputStream.readInteger(3);
inputStream.skip(1); // 1 byte for sequence
int marker = inputStream.read();
// 0xFF marks an error packet; the remaining packetLength - 1 bytes are its payload.
if (marker == 0xFF) {
ErrorPacket errorPacket = new ErrorPacket(inputStream.read(packetLength - 1));
throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(),
errorPacket.getSqlState());
}
// 0xFE while not blocking: stop reading and request a complete shutdown.
if (marker == 0xFE && !blocking) {
completeShutdown = true;
break;
}
Event event;
try {
// A packet of exactly MAX_PACKET_LENGTH is first read through readPacketSplitInChunks;
// any other packet is deserialized straight from the channel stream.
event = eventDeserializer.nextEvent(packetLength == MAX_PACKET_LENGTH ?
new ByteArrayInputStream(readPacketSplitInChunks(inputStream, packetLength - 1)) :
inputStream);
if (event == null) {
throw new EOFException();
}
} catch (Exception e) {
// EOF/socket failures (unwrapped from EventDataDeserializationException) end the loop via the
// outer catch; any other failure is reported to listeners and the packet is skipped.
Throwable cause = e instanceof EventDataDeserializationException ? e.getCause() : e;
if (cause instanceof EOFException || cause instanceof SocketException) {
throw e;
}
if (isConnected()) {
for (LifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onEventDeserializationFailure(this, e);
}
}
continue;
}
// Events are processed only while the client is still marked connected.
if (isConnected()) {
eventLastSeen = System.currentTimeMillis();
updateGtidSet(event);
notifyEventListeners(event);
updateClientBinlogFilenameAndPosition(event);
}
}
} catch (Exception e) {
// Not rethrown: the failure is delivered to listeners, and only while still connected.
if (isConnected()) {
for (LifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onCommunicationFailure(this, e);
}
}
} finally {
// Nothing is closed here if the client is already marked as disconnected.
if (isConnected()) {
if (completeShutdown) {
disconnect(); // initiate complete shutdown sequence (which includes keep alive thread)
} else {
disconnectChannel();
}
}
}
}
//......
}
- listenForEventPackets方法读取channel.getInputStream(),并通过eventDeserializer.nextEvent解析为event,然后通过notifyEventListeners方法将event通知eventListener
小结
connect方法先执行openChannel方法,之后在connectTimeout大于0且keepAliveThreadRunning为false时通过scheduleDisconnectIn调度执行超时逻辑;之后执行receiveGreeting获取greetingPacket,然后执行authenticate(greetingPacket),之后获取connectionId及设置gtidSet;在binlogFilename为null时执行fetchBinlogFilenameAndPosition;之后执行fetchBinlogChecksum,对于heartbeatInterval大于0的执行enableHeartbeat,最后执行requestBinaryLogStream;若出现IOException则执行disconnectChannel并抛出异常;连接成功之后回调lifecycleListeners的onConnect,对于需要keepAlive的执行spawnKeepAliveThread;最后执行listenForEventPackets