首先说一下我对canal中位点的理解。什么是位点?位点是 binlog事件在binlog文件中的位置。但是对于canal而言,canal server发送dump请求前需要确定mysql的同步位点
,主要包括canal server启动,mysql主备切换,canal server主备切换,dump异常后重启等情况。 同时,在canal client不断从canal server读取数据的过程中, canal client
需要告知 canal server自己消费成功的位点
,这样当发生canal client崩溃或者canal server崩溃重启后,都会考虑是否按照原来消费成功的位点之后继续消费或dump。下面我将通过canal server dump前找mysql同步位点
的过程分析我对canal中位点的理解。
对于HA模式的canal server,我们先看下有哪些位点管理器。
FailbackLogPositionManager
:实现基于failover查找的机制完成meta的操作,应用场景:比如针对内存buffer,出现HA切换,先尝试从内存buffer区中找到lastest position,如果不存在才尝试找一下meta里消费的信息。初始化过程为 :
//eventParser中的成员变量logPositionManager
<bean class="com.alibaba.otter.canal.parse.index.FailbackLogPositionManager">
<constructor-arg>
<bean class="com.alibaba.otter.canal.parse.index.MemoryLogPositionManager" />
</constructor-arg>
<constructor-arg>
<bean class="com.alibaba.otter.canal.parse.index.MetaLogPositionManager">
<constructor-arg ref="metaManager"/>
</bean>
</constructor-arg>
</bean>
//上面logPositionManager中的成员变量secondary
<bean id="metaManager" class="com.alibaba.otter.canal.meta.PeriodMixedMetaManager">
<property name="zooKeeperMetaManager">
<bean class="com.alibaba.otter.canal.meta.ZooKeeperMetaManager">
<property name="zkClientx" ref="zkClientx" />
</bean>
</property>
<property name="period" value="${canal.zookeeper.flush.period:1000}" />
</bean>
主要包括两个成员变量:
//内存
private final CanalLogPositionManager primary;
private final CanalLogPositionManager secondary;
其中primary
是在内存中记录位点变化的管理器MemoryLogPositionManager
,对应canal server中dump下来的binlog event最新位点。
secondary
是位点管理器MetaLogPositionManager
,它有一个成员变量metaManager
是PeriodMixedMetaManager
。
PeriodMixedMetaManager
:canal client消费信息管理器,如上是定时刷新canal client消费位点信息到zk上的位点管理器。主要成员变量:
//内存中记录的canal client ack position
protected Map<ClientIdentity, Position> cursors;
//定时刷新线程池
private ScheduledExecutorService executor;
//与zk交互的位点管理器,更新ack position等
private ZooKeeperMetaManager zooKeeperMetaManager;
//定时刷新到zk ack position的时间间隔
private long period = 1000; // 单位ms
private Set<ClientIdentity> updateCursorTasks;
canal server dump前找mysql同步位点
在特定instance上激活的canal server变开始启动自己的CanalInstanceWithSpring实例,其中会启动自己的eventParser。如官方文档所说,parser的启动和解析过程为:
1.Connection获取上一次解析成功的位置 (如果第一次启动,则获取初始指定的位置或者是当前数据库的binlog位点)
2.Connection建立链接,发送BINLOG_DUMP指令
3.Mysql开始推送Binaly Log
4.接收到的Binaly Log的通过Binlog parser进行协议解析,补充一些特定信息
// 补充字段名字,字段类型,主键信息,unsigned类型处理
传递给EventSink模块进行数据存储,是一个阻塞操作,直到存储成功
5.存储成功后,定时记录Binaly Log位置
在此,我们聚焦在步骤一,即canal server dump前找mysql同步位点。代码如下:
//MysqlEventParser.findStartPosition
protected EntryPosition findStartPosition(ErosaConnection connection) throws IOException {
if (isGTIDMode()) {
//步骤一
// GTID模式下,CanalLogPositionManager里取最后的gtid,没有则取instanc配置中的
LogPosition logPosition = getLogPositionManager().getLatestIndexBy(destination);
if (logPosition != null) {
//1.3
// 如果以前是非GTID模式,后来调整为了GTID模式,那么为了保持兼容,需要判断gtid是否为空
if (StringUtils.isNotEmpty(logPosition.getPostion().getGtid())) {
return logPosition.getPostion();
}
}else {
//1.4
if (masterPosition != null && StringUtils.isNotEmpty(masterPosition.getGtid())) {
return masterPosition;
}
}
}
//步骤二
EntryPosition startPosition = findStartPositionInternal(connection);
if (needTransactionPosition.get()) {
//步骤三
logger.warn("prepare to find last position : {}", startPosition.toString());
Long preTransactionStartPosition = findTransactionBeginPosition(connection, startPosition);
if (!preTransactionStartPosition.equals(startPosition.getPosition())) {
logger.warn("find new start Transaction Position , old : {} , new : {}",
startPosition.getPosition(),
preTransactionStartPosition);
startPosition.setPosition(preTransactionStartPosition);
}
needTransactionPosition.compareAndSet(true, false);
}
return startPosition;
}
代码逻辑为:
步骤一
.如果是GTID Mode
,找到位点就直接返回。
步骤二
.否则调用findStartPositionInternal(connection)
继续寻找位点。
步骤三
.如果needTransactionPosition=true
,则必须要求找到事务开启使的binlog位点作为起始位点返回。
在具体分析上述步骤前,我们先看一下位点类的结构:
public class LogPosition extends Position {
//伪装成slave的canal server信息
private LogIdentity identity;
//位点的包装类
private EntryPosition postion;
}
public class EntryPosition extends TimePosition {
//
private boolean included = false;
//binlog文件名称
private String journalName;
//所在binlog文件中的位点位置
private Long position;
// 记录一下位点对应的mysql serverId
private Long serverId = null;
//伪装成slave的canal server的gtid集合
private String gtid = null;
}
//基于时间的位置,position数据不唯一
public class TimePosition extends Position {
private static final long serialVersionUID = 6185261261064226380L;
protected Long timestamp;
可以看出位点也是考虑了两种dump方式,一种是 binlog filename + position
,一种是GTID
。下面我们先看步骤一中的GTID模式找起始位点。
//FailbackLogPositionManager. getLatestIndexBy
public LogPosition getLatestIndexBy(String destination) {
//1.1
LogPosition logPosition = primary.getLatestIndexBy(destination);
if (logPosition != null) {
return logPosition;
}
//1.2
return secondary.getLatestIndexBy(destination);
}
1.1先从内存位点管理器primary
中寻找instance对应的dump位点,能找到就返回。
//MemoryLogPositionManager.getLatestIndexBy
public LogPosition getLatestIndexBy(String destination) {
return positions.get(destination);
}
1.2如果内存位点管理器中找不到,则到位点管理器secondary
中寻找位点并返回。
//MetaLogPositionManager.getLatestIndexBy
public LogPosition getLatestIndexBy(String destination) {
List<ClientIdentity> clientIdentities = metaManager.listAllSubscribeInfo(destination);
LogPosition result = null;
if (!CollectionUtils.isEmpty(clientIdentities)) {
// 尝试找到一个最小的logPosition
for (ClientIdentity clientIdentity : clientIdentities) {
//1.2.1
LogPosition position = (LogPosition) metaManager.getCursor(clientIdentity);
if (position == null) {
continue;
}
if (result == null) {
result = position;
} else {
result = CanalEventUtils.min(result, position);
}
}
}
return result;
}
1.2.1.主要逻辑是到metaManager
,也就是PeriodMixedMetaManager
中寻找内存中记录的canal client消费位点信息,找不到则到zk上cursor节点找。
//PeriodMixedMetaManager.getCursor
public Position getCursor(ClientIdentity clientIdentity) throws CanalMetaManagerException {
//从内存中获取该client对应的消费位点
Position position = super.getCursor(clientIdentity);
if (position == nullCursor) {
return null;
} else {
return position;
}
}
// super.getCursor
public Position getCursor(ClientIdentity clientIdentity) throws CanalMetaManagerException {
return cursors.get(clientIdentity);
}
cursors中找不到数据时将会调用apply方法初始化client对应的数据。
cursors = MigrateMap.makeComputingMap(new Function<ClientIdentity, Position>() {
public Position apply(ClientIdentity clientIdentity) {
Position position = zooKeeperMetaManager.getCursor(clientIdentity);
if (position == null) {
return nullCursor; // 返回一个空对象标识,避免出现异常
} else {
return position;
}
}
});
1.3.如果上述找到的位点不为空 ,且位点中的gtid也不为空,说明以前就是gtid模式,可以使用这个gtid set作为gtid dump的位点,也就是找到了gtid模式的位点。
1.4.如果上述找到的位点为空,则判断masterPosition
是否不为空且gtid也不为空,如果满足则使用masterPosition
作为gtid模式的位点。其中 masterPosition是在该instance配置文件instance.properties
中配置的mysql dump位点。
# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=binlog.000008
canal.instance.master.position=2993
canal.instance.master.timestamp=99999999999999
canal.instance.master.gtid=c6704a02-8354-11e9-a0c3-f054d220296f:1-16
总结一下
:步骤一找GTID位点的逻辑为:先到primary中寻找canal server中dump下来的binlog event最新位点(内存中 ),找不到就到secondary中寻找canal client成功消费位点。如果找到了就判断位点的gtid是否为空,如果为空则说明以前不是gtid模式,则不支持gtid模式,继续步骤二。如果上述找不到位点,则判断masterPosition
是否不为空,且gtid也不为空,如果满足则使用masterPosition
作为gtid模式的位点。否则继续步骤二。
接下来看步骤二
是如何寻找非GTID模式下的起始位点的。
//MysqlEventParser.findStartPositionInternal
protected EntryPosition findStartPositionInternal(ErosaConnection connection) {
MysqlConnection mysqlConnection = (MysqlConnection) connection;
//2.1
LogPosition logPosition = logPositionManager.getLatestIndexBy(destination);
if (logPosition == null) {// 找不到历史成功记录
EntryPosition entryPosition = null;
if (masterInfo != null && mysqlConnection.getConnector().getAddress().equals(masterInfo.getAddress())) {
//2.2.1
entryPosition = masterPosition;
} else if (standbyInfo != null
&& mysqlConnection.getConnector().getAddress().equals(standbyInfo.getAddress())) {
//2.2.2
entryPosition = standbyPosition;
}
if (entryPosition == null) {
//2.2.3
entryPosition = findEndPositionWithMasterIdAndTimestamp(mysqlConnection); // 默认从当前最后一个位置进行消费
}
// 判断一下是否需要按时间订阅
if (StringUtils.isEmpty(entryPosition.getJournalName())) {
//2.2.4
// 如果没有指定binlogName,尝试按照timestamp进行查找
if (entryPosition.getTimestamp() != null && entryPosition.getTimestamp() > 0L) {
logger.warn("prepare to find start position {}:{}:{}",
new Object[] { "", "", entryPosition.getTimestamp() });
return findByStartTimeStamp(mysqlConnection, entryPosition.getTimestamp());
} else {
logger.warn("prepare to find start position just show master status");
return findEndPositionWithMasterIdAndTimestamp(mysqlConnection); // 默认从当前最后一个位置进行消费
}
} else {
//2.2.5
if (entryPosition.getPosition() != null && entryPosition.getPosition() > 0L) {
// 如果指定binlogName + offest,直接返回
entryPosition = findPositionWithMasterIdAndTimestamp(mysqlConnection, entryPosition);
logger.warn("prepare to find start position {}:{}:{}",
new Object[] { entryPosition.getJournalName(), entryPosition.getPosition(),
entryPosition.getTimestamp() });
return entryPosition;
} else {
//2.2.6
EntryPosition specificLogFilePosition = null;
if (entryPosition.getTimestamp() != null && entryPosition.getTimestamp() > 0L) {
// 如果指定binlogName +
// timestamp,但没有指定对应的offest,尝试根据时间找一下offest
EntryPosition endPosition = findEndPosition(mysqlConnection);
if (endPosition != null) {
logger.warn("prepare to find start position {}:{}:{}",
new Object[] { entryPosition.getJournalName(), "", entryPosition.getTimestamp() });
specificLogFilePosition = findAsPerTimestampInSpecificLogFile(mysqlConnection,
entryPosition.getTimestamp(),
endPosition,
entryPosition.getJournalName(),
true);
}
}
//2.2.7
if (specificLogFilePosition == null) {
// position不存在,从文件头开始
entryPosition.setPosition(BINLOG_START_OFFEST);
return entryPosition;
} else {
return specificLogFilePosition;
}
}
}
} else {
//2.1.1
if (logPosition.getIdentity().getSourceAddress().equals(mysqlConnection.getConnector().getAddress())) {
if (dumpErrorCountThreshold >= 0 && dumpErrorCount > dumpErrorCountThreshold) {
//2.1.1.1
// binlog定位位点失败,可能有两个原因:
// 1. binlog位点被删除
// 2.vip模式的mysql,发生了主备切换,判断一下serverId是否变化,针对这种模式可以发起一次基于时间戳查找合适的binlog位点
boolean case2 = (standbyInfo == null || standbyInfo.getAddress() == null)
&& logPosition.getPostion().getServerId() != null
&& !logPosition.getPostion().getServerId().equals(findServerId(mysqlConnection));
if (case2) {
long timestamp = logPosition.getPostion().getTimestamp();
long newStartTimestamp = timestamp - fallbackIntervalInSeconds * 1000;
logger.warn("prepare to find start position by last position {}:{}:{}", new Object[] { "", "",
logPosition.getPostion().getTimestamp() });
EntryPosition findPosition = findByStartTimeStamp(mysqlConnection, newStartTimestamp);
// 重新置为一下
dumpErrorCount = 0;
return findPosition;
}
Long timestamp = logPosition.getPostion().getTimestamp();
if (isRdsOssMode() && (timestamp != null && timestamp > 0)) {
// 如果binlog位点不存在,并且属于timestamp不为空,可以返回null走到oss binlog处理
return null;
}
}
// 其余情况
//2.1.1.2
logger.warn("prepare to find start position just last position\n {}",
JsonUtils.marshalToString(logPosition));
return logPosition.getPostion();
} else {
//2.1.2
// 针对切换的情况,考虑回退时间
long newStartTimestamp = logPosition.getPostion().getTimestamp() - fallbackIntervalInSeconds * 1000;
logger.warn("prepare to find start position by switch {}:{}:{}", new Object[] { "", "",
logPosition.getPostion().getTimestamp() });
return findByStartTimeStamp(mysqlConnection, newStartTimestamp);
}
}
}
这是一段蛮长的代码。。。其实我们需要思考的问题是:如何根据能够查到的LogPosition的信息(数据库信息,binlog信息)中提炼出最精确的dump位点。废话不多说,看代码。
2.1.从logPositionManager中寻找内存中最精确的dump位点(先寻找canal server中dump下来的binlog event最新位点,找不到就寻找canal client成功消费位),前面分析过。分为找到位点和找不到位点两种情况。我们先看找不到的情况 。
2.2.如果从logPositionManager中找不到位点,说明canal server是第一次从该instance dump。那么就要从instance.properties文件中找可能位点。该文件是可以配置一主一从mysql的,这样可以实现主备切换的mysql dump。上面介绍了masterPosition
的配置选项,standbyPosition
也是同理。
# mysql多节点解析配置
#canal.instance.standby.address = 10.20.144.29:3306
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
2.2.1.如果mysql主库信息masterInfo不为null并且当前mysqlConnection连接的是主库,则entryPosition = masterPosition。
2.2.2.否则如果mysql从库信息masterInfo不为null并且当前mysqlConnection连接是从库,则entryPosition = standbyPosition。
2.2.3.如果此时entryPosition为空,表明canal server没有指定任何位点,则将当前mysqlConnection连接的数据库的binlog最后一个位置作为dump位点并返回。
如果entryPosition能够从instance.properties中获得位点信息,则说明entryPosition中journalName
,position
,timestamp
不为空,可通过这些值进一步确定精确的dump位点。
2.2.4.如果entryPosition中journalName
为空但是timestamp
不为空,则尝试按照timestamp进行查找小于timestamp最接近的binlog事件。findByStartTimeStamp
的逻辑就是返回从最后一个binlog文件往前依次找满足条件的事务开始的位点。如果能找到,就返回该位点,找不到则将当前mysqlConnection连接的数据库的binlog最后一个位置作为dump位点并返回。
2.2.5.如果entryPosition中journalName
不为空并且position
不为空,则直接返回该位点。
2.2.6.如果entryPosition中journalName
不为空但是position
不为空,则只会在该binlog文件中根据timestamp
查找小于timestamp最接近的binlog事件。找到就返回该位点。
2.2.7.上述找不到则将当前mysqlConnection连接的数据库的binlog最后一个位置作为dump位点并返回。
至此,根据instance.properties文件中配置的位点信息确定最终dump的位点分析完毕,主旨就是如果binlog filename+position
存在,则直接作为dump位点,否则根据timestamp
确定。如果最终根据binlog filename,position,timestamp
确定不了位点,则使用当前mysqlConnection连接的数据库的binlog最后一个位置
作为dump位点
回到2.1,如果从logPositionManager中从内存中找到位点logPosition
,则说明之前这个instance是从mysql中 dump过数据的,需要结合连接状态确定当前canal server的状态是怎么样的,然后才能进一步确定dump位点。
2.1.1.logPosition
中mysql数据库信息和当前mysqlConnection连接的数据库信息
一致,说明logPosition
就是当前mysqlConnection连接的数据库的位点信息。那么为什么又会再次确定dump位点呢?可能原因是eventParser解析过程中发生dump异常,导致关闭整个parser过程,再重新启动parser解析过程。发生异常的原因可能有:binlog定位位点失败,mysql的连接临时断开又重连上,canal server HA切换等
。
2.1.1.1. dumpErrorCount默认超过2次,则说明是binlog定位位点失败导致的。可能原因:
binlog位点被删除
:此时canal server会不停打印“errno = 1236.....”的错误日志,代表该位点的binlog在mysql中已经找不到了。
vip模式的mysql,发生了主备切换
:判断一下serverId是否变化,针对这种模式可以发起一次基于时间戳查找合适的binlog位点。(应该是和阿里云mysql rds vip的模式相关的,没用过表示不懂这种场景。。)
2.1.1.2.可能是mysql的连接临时断开又重连上,canal server HA切换等
导致的重新parser过程,确定了dump位点后接着用就行。
2.1.2.如果logPosition
中mysql数据库信息和当前mysqlConnection连接的数据库信息
不一致,说明发生了mysql dump主备切换。此时需要根据timestamp
确定在新的mysql上的dump位点,而不能依赖原来连接的数据库的binlog filename 和position。
步骤三:
在步骤一和步骤二中解析出来的位点不一定是事务起始事件处的位点,此时在dump过程中可能找不到binlog事件所在table信息,会抛出TableIdNotFoundException
异常,同时将needTransactionPosition设置为true。到了步骤三会根据步骤一和步骤二中解析出来的位点确定小于它的最近的事务起始事件处的位点,作为最终的dump位点。
至此,关于canal server dump前找mysql同步位点的代码分析完了,总结一下:
步骤一:
如果是GTID Mode,则根据 logPositionManager的 primary或者secondary位点管理器
的从内存中找位点,找不到从配置文件instance.properties中找。
步骤二:
如果步骤一找不到,说明是根据binlog filename + position确定dump位点。同理先根据 logPositionManager的 primary或者secondary位点管理器
的从内存中找位点,找不到从配置文件instance.properties中找。当然会考虑canal server HA切换,dump异常,mysql连接切换对确定位点作出一些调整。
步骤三:
如果将needTransactionPosition
设置为true
,会根据步骤一和步骤二中解析出来的位点确定小于它的最近的事务起始事件处的位点,作为最终的dump位点。
如果你看懂了上面的碎碎念,不知道是否会疑惑 logPositionManager的 primary或者secondary位点管理器
管理的内存位点是如何第一次创建的?之后是如何更新的?下面我们再捋一捋这些过程。
primary的位点管理
上文我们提到primary的初始化过程,primary是在内存中记录位点变化的管理器MemoryLogPositionManager
,对应canal server中dump下来的binlog event最新位点。所以每当dump到binlog数据时便会更新primary位点信息。如何定义canal server dump到binlog数据呢?在文章开头的parser过程步骤5
中,binlog数据经过Binlog parser—>EventSink传递—>存储到EventStore后,会记录Binaly Log位点。代码中会回调TransactionFlushCallback.flush
方法刷新位点。
public AbstractEventParser(){
// 初始化一下
transactionBuffer = new EventTransactionBuffer(new TransactionFlushCallback() {
public void flush(List<CanalEntry.Entry> transaction) throws InterruptedException {
boolean successed = consumeTheEventAndProfilingIfNecessary(transaction);
if (!running) {
return;
}
if (!successed) {
throw new CanalParseException("consume failed!");
}
//返回的是最大的TRANSACTIONEND类型的事件位点,尽量记录一个事务做为position
LogPosition position = buildLastTransactionPosition(transaction);
if (position != null) { // 可能position为空
logPositionManager.persistLogPosition(AbstractEventParser.this.destination, position);
}
}
});
}
EventParser初始化时会初始化一个TransactionFlushCallback匿名类对象,其中flush方法中logPositionManager.persistLogPosition
便会刷新primary中位点。
//FailbackLogPositionManager.persistLogPosition
public void persistLogPosition(String destination, LogPosition logPosition) throws CanalParseException {
try {
primary.persistLogPosition(destination, logPosition);
} catch (CanalParseException e) {
logger.warn("persistLogPosition use primary log position manager exception. destination: {}, logPosition: {}",
destination,
logPosition,
e);
secondary.persistLogPosition(destination, logPosition);
}
}
secondary位点管理
上文我们提到secondary的初始化过程,secondary是位点管理器MetaLogPositionManager
,它有一个成员变量metaManager是PeriodMixedMetaManager
,是canal client消费信息管理器,如上是定时刷新canal client消费位点信息到zk上的位点管理器。所以每当canal client ack时会更新该位点。
//CanalServerWithEmbedded.ack
//客户端ack时canal server的处理
public void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerException {
checkStart(clientIdentity.getDestination());
checkSubscribe(clientIdentity);
CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
PositionRange<LogPosition> positionRanges = null;
//注意,客户端是按批次去binlog 数据,一个batchId可能对应多条binlog数据,由服务端根据batchId确定最终ack的位点是什么
positionRanges = canalInstance.getMetaManager().removeBatch(clientIdentity, batchId); // 更新位置
if (positionRanges == null) { // 说明是重复的ack/rollback
throw new CanalServerException(String.format("ack error , clientId:%s batchId:%d is not exist , please check",
clientIdentity.getClientId(),
batchId));
}
// 更新cursor
if (positionRanges.getAck() != null) {
canalInstance.getMetaManager().updateCursor(clientIdentity, positionRanges.getAck());
if (logger.isInfoEnabled()) {
logger.info("ack successfully, clientId:{} batchId:{} position:{}",
clientIdentity.getClientId(),
batchId,
positionRanges);
}
}
// 可定时清理数据
canalInstance.getEventStore().ack(positionRanges.getEnd(), positionRanges.getEndSeq());
}
更新cursor时会更新secondary的位点信息
PeriodMixedMetaManager. updateCursor
public void updateCursor(ClientIdentity clientIdentity, Position position) throws CanalMetaManagerException {
//更新成员变量cursors中的位点
super.updateCursor(clientIdentity, position);
// 添加到任务队列中,定时触发zk上该instance的位点更新,/otter/canal/destinations/{destination}/{clientId}/cursor节点
updateCursorTasks.add(clientIdentity);
}
此外,我们知道/otter/canal/destinations/{destination}/{clientId}/cursor节点是zk上的持久节点,不知道的参考canal源码解析-HA模式的实现。那么当发生canal server HA切换时,canal server应该从zk上初始化原来的client ack位点信息,避免无用的dump。上面canal server dump前解析位点时步骤1.2.1
便会从zk上拉取原来的client ack位点信息。