[Canal] 1 deployer 模块

canal有两种使用方式:1、独立部署 2、内嵌到应用中。 deployer模块主要用于独立部署canal server。

0、deployer模块源码目录结构如下所示:

在独立部署canal时,需要首先对canal的源码进行打包

mvn clean install -Dmaven.test.skip -Denv=release

打包后会在target目录生成一个以下两个文件:

deployer模块主要完成以下功能:

1、读取canal,properties配置文件

2、启动canal server,监听canal client的请求

3、启动canal instance,连接mysql数据库,伪装成slave,解析binlog

4、在canal的运行过程中,监听配置文件的变化

1、启动停止

启动:sh bin/start.sh

启动脚本会调用CanalLauncher来进行启动,同时在bin目录下生成canal.pid文件存储进程id,执行停止脚本时kill掉这个进程id,并删除文件。

停止:sh bin/stop.sh

2、CannalLauncher

启动入口:

1、读取canal.properties文件中的配置。

2、利用读取的配置构造一个CanalController实例,将所有的启动操作都委派给CanalController进行处理。

3、最后注册一个钩子函数,在JVM停止时同时也停止canal server。

**
 * canal独立版本启动的入口类
 * 
 * @author jianghang 2012-11-6 下午05:20:49
 * @version 1.0.0
 */
public class CanalLauncher {

    private static final String CLASSPATH_URL_PREFIX = "classpath:";
    private static final Logger logger               = LoggerFactory.getLogger(CanalLauncher.class);

    public static void main(String[] args) throws Throwable {
        try {
            //1、读取canal.properties文件中配置,默认读取classpath下的canal.properties
            String conf = System.getProperty("canal.conf", "classpath:canal.properties");
            Properties properties = new Properties();
            if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
                conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
                properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
            } else {
                properties.load(new FileInputStream(conf));
            }
            //2、启动canal,首先将properties对象传递给CanalController,然后调用其start方法启动
            logger.info("## start the canal server.");
            final CanalController controller = new CanalController(properties);
            controller.start();
            logger.info("## the canal server is running now ......");
            //3、关闭canal,通过添加JVM的钩子,JVM停止前会回调run方法,其内部调用controller.stop()方法进行停止
            Runtime.getRuntime().addShutdownHook(new Thread() {

                public void run() {
                    try {
                        logger.info("## stop the canal server");
                        controller.stop();
                    } catch (Throwable e) {
                        logger.warn("##something goes wrong when stopping canal Server:\n{}",
                            ExceptionUtils.getFullStackTrace(e));
                    } finally {
                        logger.info("## canal server is down.");
                    }
                }

            });
        } catch (Throwable e) {
            logger.error("## Something goes wrong when starting up the canal Server:\n{}",
                ExceptionUtils.getFullStackTrace(e));
            System.exit(0);
        }
    }
}

3、CanalController

在CanalController的构造方法中,会对配置文件内容解析,初始化相关成员变量,做好canal server的启动前的准备工作,之后在CanalLauncher中调用CanalController.start方法来启动。

/**
 * canal调度控制器
 * 
 * @author jianghang 2012-11-8 下午12:03:11
 * @version 1.0.0
 */
public class CanalController {

    private static final Logger                      logger   = LoggerFactory.getLogger(CanalController.class);
    private Long                                     cid;
    private String                                   ip;
    private int                                      port;
    /**
     *  默认使用spring的方式载入
     */
    private Map<String, InstanceConfig>              instanceConfigs;
    private InstanceConfig                           globalInstanceConfig;
    private Map<String, CanalConfigClient>           managerClients;
    /**
     * 监听instance config的变化
     */
    private boolean                                  autoScan = true;
    private InstanceAction                           defaultAction;
    private Map<InstanceMode, InstanceConfigMonitor> instanceConfigMonitors;
    private CanalServerWithEmbedded                  embededCanalServer;
    private CanalServerWithNetty                     canalServer;

    private CanalInstanceGenerator                   instanceGenerator;
    private ZkClientx                                zkclientx;

    public CanalController(){
        this(System.getProperties());
    }

    public CanalController(final Properties properties){
        managerClients = MigrateMap.makeComputingMap(new Function<String, CanalConfigClient>() {

            public CanalConfigClient apply(String managerAddress) {
                return getManagerClient(managerAddress);
            }
        });

        //1、配置解析
        // 初始化全局参数设置
        globalInstanceConfig = initGlobalConfig(properties);
        instanceConfigs = new MapMaker().makeMap();
        // 初始化instance config
        initInstanceConfig(properties);

        // 2、准备canal server
        cid = Long.valueOf(getProperty(properties, CanalConstants.CANAL_ID));
        ip = getProperty(properties, CanalConstants.CANAL_IP);
        port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));
        embededCanalServer = CanalServerWithEmbedded.instance();
        // 设置自定义的instanceGenerator
        embededCanalServer.setCanalInstanceGenerator(instanceGenerator);
        canalServer = CanalServerWithNetty.instance();
        canalServer.setIp(ip);
        canalServer.setPort(port);

        //3、初始化zk相关代码
        // 处理下ip为空,默认使用hostIp暴露到zk中
        if (StringUtils.isEmpty(ip)) {
            ip = AddressUtils.getHostIp();
        }
        final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS);
        if (StringUtils.isNotEmpty(zkServers)) {
            zkclientx = ZkClientx.getZkClient(zkServers);
            // 初始化系统目录
            zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE, true);
            zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true);
        }

        //4 CanalInstance运行状态监控
        final ServerRunningData serverData = new ServerRunningData(cid, ip + ":" + port);
        ServerRunningMonitors.setServerData(serverData);
        ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap(new Function<String, ServerRunningMonitor>() { 省略 }));

        //5、autoScan机制相关代码
        autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN));
        if (autoScan) {
            defaultAction = new InstanceAction() { 省略 });
        }
    }

3.1 配置解析相关代码

        //1、配置解析
        // 初始化全局参数设置
        globalInstanceConfig = initGlobalConfig(properties);
        instanceConfigs = new MapMaker().makeMap();
        // 初始化instance config
        initInstanceConfig(properties);

3.1.1 globalInstanceConfig字段

表示canal instance的全局配置,类型为InstanceConfig,通过initGlobalConfig方法进行初始化。主要用于解析canal.properties以下几个配置项:

  • canal.instance.global.mode:确定canal instance配置加载方式,取值有manager、spring两种方式

  • canal.instance.global.lazy:确定canal instance是否延迟初始化

  • canal.instance.global.manager.address:配置中心地址。如果canal.instance.global.mode=manager,需要提供此配置项

  • canal.instance.global.spring.xml:spring配置文件路径。如果canal.instance.global.mode=spring,需要提供此配置项

private InstanceConfig initGlobalConfig(Properties properties) {
    InstanceConfig globalConfig = new InstanceConfig();
    //读取canal.instance.global.mode
    String modeStr = getProperty(properties, CanalConstants.getInstanceModeKey(CanalConstants.GLOBAL_NAME));
    if (StringUtils.isNotEmpty(modeStr)) {
        //将modelStr转成枚举InstanceMode,这是一个枚举类,只有2个取值,SPRING\MANAGER,对应两种配置方式
        globalConfig.setMode(InstanceMode.valueOf(StringUtils.upperCase(modeStr)));
    }
    //读取canal.instance.global.lazy
    String lazyStr = getProperty(properties, CanalConstants.getInstancLazyKey(CanalConstants.GLOBAL_NAME));
    if (StringUtils.isNotEmpty(lazyStr)) {
        globalConfig.setLazy(Boolean.valueOf(lazyStr));
    }
   //读取canal.instance.global.manager.address
    String managerAddress = getProperty(properties,
        CanalConstants.getInstanceManagerAddressKey(CanalConstants.GLOBAL_NAME));
    if (StringUtils.isNotEmpty(managerAddress)) {
        globalConfig.setManagerAddress(managerAddress);
    }
    //读取canal.instance.global.spring.xml
    String springXml = getProperty(properties, CanalConstants.getInstancSpringXmlKey(CanalConstants.GLOBAL_NAME));
    if (StringUtils.isNotEmpty(springXml)) {
        globalConfig.setSpringXml(springXml);
    }
 
    instanceGenerator = //...初始化instanceGenerator 
 
    return globalConfig;
}

其中canal.instance.global.mode用于确定canal instance的全局配置加载方式,其取值范围有2个:spring、manager。canal server中可以启动多个canal instance,每个instance都有各自的配置。instance的配置也可以放在本地,也可以放在远程配置中心里。我们可以自定义每个canal instance配置文件存储的位置,如果所有canal instance的配置都在本地或者远程,此时我们就可以通过canal.instance.global.mode这个配置项,来统一的指定配置文件的位置,避免为每个canal instance单独指定。

spring方式:

表示所有的canal instance的配置文件位于本地。此时,我们必须提供配置项canal.instance.global.spring.xml指定spring配置文件的路径。canal提供了多个spring配置文件:file-instance.xml、default-instance.xml、memory-instance.xml、local-instance.xml、group-instance.xml。这些配置文件主要是为了支持canal instance不同的工作方式。

<bean class="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer" lazy-init="false">
        <property name="ignoreResourceNotFound" value="true" />
        <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!-- 允许system覆盖 -->
        <property name="locationNames">
            <list>
                <value>classpath:canal.properties</value>
                <value>classpath:${canal.instance.destination:}/instance.properties</value>
            </list>
        </property>
    </bean>

通过spring方式加载canal instance配置,无非就是通过spring提供的PropertyPlaceholderConfigurer来加载canal instance的配置文件instance.properties。

这里instance.properties的文件完整路径是${canal.instance.destination:}/instance.properties,其中${canal.instance.destination}是一个变量。这是因为我们可以在一个canal server中配置多个canal instance,每个canal instance配置文件的名称都是instance.properties,因此我们需要通过目录进行区分。例如我们通过配置项canal.destinations指定多个canal instance的名字

canal.destinations= example1,example2
此时我们就要conf目录下,新建两个子目录example1和example2,每个目录下各自放置一个instance.properties。

canal在初始化时就会分别使用example1和example2来替换${canal.instance.destination:},从而分别根据example1/instance.properties和example2/instance.properties创建2个canal instance。

manager方式:

表示所有的canal instance的配置文件位于远程配置中心,此时我们必须提供配置项 canal.instance.global.manager.address来指定远程配置中心的地址。目前alibaba内部配置使用这种方式。开发者可以自己实现CanalConfigClient,连接各自的管理系统,完成接入。

3.1.2 instanceGenerator字段

类型为CanalInstanceGenerator。在initGlobalConfig方法中,除了创建了globalInstanceConfig实例,同时还为instanceGenerator进行了赋值。

instanceGenerator主要用于创建CanalInstance实例。这是instance模块中的类,其作用就是为canal.properties文件中canal.destinations配置项列出的每个destination,创建一个CanalInstance实例。CanalInstanceGenerator是一个接口,定义如下所示:

public interface CanalInstanceGenerator {
 
    /**
     * 通过 destination 产生特定的 {@link CanalInstance}
     */
    CanalInstance generate(String destination);
}

针对spring和manager两种instance配置的加载方式,CanalInstanceGenerator提供了两个对应的实现类,如下所示:



instanceGenerator字段通过一个匿名内部类进行初始化。其内部会判断配置的各个destination的配置加载方式,spring 或者manager。

instanceGenerator = new CanalInstanceGenerator() {
 
        public CanalInstance generate(String destination) {
           //1、根据destination从instanceConfigs获取对应的InstanceConfig对象
            InstanceConfig config = instanceConfigs.get(destination);
            if (config == null) {
                throw new CanalServerException("can't find destination:{}");
            }
          //2、如果destination对应的InstanceConfig的mode是manager方式,使用ManagerCanalInstanceGenerator
            if (config.getMode().isManager()) {
                ManagerCanalInstanceGenerator instanceGenerator = new ManagerCanalInstanceGenerator();
                instanceGenerator.setCanalConfigClient(managerClients.get(config.getManagerAddress()));
                return instanceGenerator.generate(destination);
            } else if (config.getMode().isSpring()) {
          //3、如果destination对应的InstanceConfig的mode是spring方式,使用SpringCanalInstanceGenerator
                SpringCanalInstanceGenerator instanceGenerator = new SpringCanalInstanceGenerator();
                synchronized (this) {
                    try {
                        // 设置当前正在加载的通道,加载spring查找文件时会用到该变量                        
                        System.setProperty(CanalConstants.CANAL_DESTINATION_PROPERTY, destination);
                        instanceGenerator.setBeanFactory(getBeanFactory(config.getSpringXml()));
                        return instanceGenerator.generate(destination);
                    } catch (Throwable e) {
                        logger.error("generator instance failed.", e);
                        throw new CanalException(e);
                    } finally {
                        System.setProperty(CanalConstants.CANAL_DESTINATION_PROPERTY, "");
                    }
                }
            } else {
                throw new UnsupportedOperationException("unknow mode :" + config.getMode());
            }
 
        }
 
    };

CanalInstantGenerator在调用controller.start()时进行初始化。此时将引用赋值给instanceGenerator。
generate主要确定使用spring模式还是manager模式进行CanalInstance创建,spring使用SpringCanalInstanceGenerator,manager使用ManagerCanalInstanceGenerator。

spring创建CanalInstance
1、首先创建一个SpringCanalInstanceGenerator实例
2、对SpringCanalInstanceGenerator设置beanFactory
3、使用ClassPathXmlApplicationContext加载config中设置的spring配置路径。
4、调用实例的generate方法创建CanalInstance

public class SpringCanalInstanceGenerator implements CanalInstanceGenerator, BeanFactoryAware {

    private String      defaultName = "instance";
    private BeanFactory beanFactory;

    public CanalInstance generate(String destination) {
        String beanName = destination;
        //首先判断beanFactory是否包含以destination为id的bean
        if (!beanFactory.containsBean(beanName)) {
            //如果没有,设置要获取的bean的id为instance
            beanName = defaultName;
        }
        //以默认的bean的id值"instance"来获取CanalInstance实例
        return (CanalInstance) beanFactory.getBean(beanName);
    }

    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
    }

}

在canal提供的各个spring配置文件xxx-instance.xml中,都有类似以下配置:

<bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
        <property name="destination" value="${canal.instance.destination}" />
        <property name="eventParser">
            <ref local="eventParser" />
        </property>
        <property name="eventSink">
            <ref local="eventSink" />
        </property>
        <property name="eventStore">
            <ref local="eventStore" />
        </property>
        <property name="metaManager">
            <ref local="metaManager" />
        </property>
        <property name="alarmHandler">
            <ref local="alarmHandler" />
        </property>
    </bean>
CanalInstanceWithSpring

CanalInstanceWithManager

此时CanalInstanceGenerator就已经创建完成了,在CanalController的start方法被调用时,CanalInstance才会被真正的创建。

3.1.3 instanceConfigs字段

类型为Map<String, InstanceConfig>。前面提到初始化instanceGenerator后,当其generate方法被调用时,会尝试从instanceConfigs根据一个destination获取对应的InstanceConfig,现在分析instanceConfigs的相关初始化代码。

globalInstanceConfig定义全局的配置加载方式。如果需要把部分CanalInstance配置放于本地,另外一部分CanalIntance配置放于远程配置中心,则只通过全局方式配置,无法达到这个要求。虽然这种情况很少见,但是为了提供最大的灵活性,canal支持每个CanalIntance自己来定义自己的加载方式,来覆盖默认的全局配置加载方式。而每个destination对应的InstanceConfig配置就存放于instanceConfigs字段中。

例:

//当前server上部署的instance列表
canal.destinations=example1,example2 
 
//instance配置全局加载方式
canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
 
//example1覆盖全局加载方式
canal.instance.example1.mode = manager
canal.instance.example1.manager.address = 127.0.0.1:1099
canal.instance.example1.lazy = true
    public static final String INSTANCE_MODE_TEMPLATE            = ROOT + "." + "instance.{0}.mode";
    public static final String INSTANCE_LAZY_TEMPLATE            = ROOT + "." + "instance.{0}.lazy";
    public static final String INSTANCE_MANAGER_ADDRESS_TEMPLATE = ROOT + "." + "instance.{0}.manager.address";
    public static final String INSTANCE_SPRING_XML_TEMPLATE      = ROOT + "." + "instance.{0}.spring.xml";

这段配置中,设置了instance的全局加载方式为spring,example1覆盖了全局配置,使用manager方式加载配置。而example2没有覆盖配置,因此默认使用spring加载方式。

instanceConfigs字段通过initInstanceConfig方法进行初始化

//这里利用Google Guava框架的MapMaker创建Map实例并赋值给instanceConfigs
instanceConfigs = new MapMaker().makeMap();
// 初始化instance config
initInstanceConfig(properties);

nitInstanceConfig方法源码如下:

private void initInstanceConfig(Properties properties) {
        // 读取配置项canal.destinations
        String destinationStr = getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
        // 以","分割canal.destinations,得到一个数组形式的destination
        String[] destinations = StringUtils.split(destinationStr, CanalConstants.CANAL_DESTINATION_SPLIT);
        for (String destination : destinations) {
            // 为每一个destination生成一个InstanceConfig实例
            InstanceConfig config = parseInstanceConfig(properties, destination);
            // 将destination对应的InstanceConfig放入instanceConfigs中
            InstanceConfig oldConfig = instanceConfigs.put(destination, config);

            if (oldConfig != null) {
                logger.warn("destination:{} old config:{} has replace by new config:{}", new Object[]{destination,
                        oldConfig, config});
            }
        }
    }

InstanceConfig中维护了全局配置和instance自定义的配置,加载顺序为:如果没有自身没有这个配置,则返回全局配置,如果有,则返回自身的配置。通过这种方式实现对全局配置的覆盖。

3.2 准备canal server

        cid = Long.valueOf(getProperty(properties, CanalConstants.CANAL_ID));
        ip = getProperty(properties, CanalConstants.CANAL_IP);
        port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));
        embededCanalServer = CanalServerWithEmbedded.instance();
        // 设置自定义的instanceGenerator
        embededCanalServer.setCanalInstanceGenerator(instanceGenerator);
        canalServer = CanalServerWithNetty.instance();
        canalServer.setIp(ip);
        canalServer.setPort(port);

cid:Long,对应canal.properties文件中的canal.id,运行状态监控使用
ip:String,对应canal.properties文件中的canal.ip,canal server监听的ip。
port:int,对应canal.properties文件中的canal.port,canal server监听的端口

之后分别为以下两个字段赋值:
embededCanalServer:类型为CanalServerWithEmbedded
canalServer:类型为CanalServerWithNetty
CanalServerWithEmbedded 和 CanalServerWithNetty都实现了CanalServer接口,且都实现了单例模式,通过静态方法instance获取实例。

关于这两种类型的实现,canal官方文档有以下描述:


在应用直接使用CanalServerWithEmbedded直连mysql数据库。如果觉得自己的技术hold不住相关代码,就独立部署一个canal server,使用canal提供的客户端,连接canal server获取binlog解析后数据。而CanalServerWithNetty是在CanalServerWithEmbedded的基础上做的一层封装,用于与客户端通信。

在独立部署canal server时,Canal客户端发送的所有请求都交给CanalServerWithNetty处理解析,解析完成之后委派给了交给CanalServerWithEmbedded进行处理。因此CanalServerWithNetty就是一个马甲而已。CanalServerWithEmbedded才是核心。

因此,在上述代码中,我们看到,用于生成CanalInstance实例的instanceGenerator被设置到了CanalServerWithEmbedded中,而ip和port被设置到CanalServerWithNetty中。

关于CanalServerWithNetty如何将客户端的请求委派给CanalServerWithEmbedded进行处理,我们将在server模块源码分析中进行分析。

3.3 初始化zk相关代码

        // 读取canal.properties中的配置项canal.zkServers,如果没有这个配置,则表示项目不使用zk
        final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS);
        if (StringUtils.isNotEmpty(zkServers)) {
            // 创建zk实例
            zkclientx = ZkClientx.getZkClient(zkServers);
            // 初始化系统目录
            // destination列表,路径为/otter/canal/destinations
            zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE, true);
            // 整个canal server的集群列表,路径为/otter/canal/cluster
            zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true);
        }

canal支持利用了zk来完成HA机制、以及将当前消费到的mysql的binlog位置记录到zk中。ZkClientx是canal对ZkClient进行了一层简单的封装。

显然,当我们没有配置canal.zkServers,那么zkclientx不会被初始化。

关于Canal如何利用ZK做HA,我们将在稍后的代码中进行分。而利用zk记录binlog的消费进度,将在之后的章节进行分析。

3.4 CanalInstance运行状态监控相关代码

final ServerRunningData serverData = new ServerRunningData(cid, ip + ":" + port);
        ServerRunningMonitors.setServerData(serverData);
        ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap(new Function<String, ServerRunningMonitor>() {

            public ServerRunningMonitor apply(final String destination) {
                ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
                runningMonitor.setDestination(destination);
                runningMonitor.setListener(new ServerRunningListener() {
                    //
                    public void processActiveEnter() {...}

                    public void processActiveExit() {... }

                    public void processStart() {...}

                    public void processStop() {...}
                // 触发创建一下cid节点
                runningMonitor.init();
                return runningMonitor;
            }
        }));

ServerRunningMonitors是ServerRunningMonitor对象的容器,而ServerRunningMonitor用于监控CanalInstance。

canal会为每一个destination创建一个CanalInstance,每个CanalInstance都会由一个ServerRunningMonitor来进行监控。而ServerRunningMonitor统一由ServerRunningMonitors进行管理。

除了CanalInstance需要监控,CanalServer本身也需要监控。因此我们在代码一开始,就看到往ServerRunningMonitors设置了一个ServerRunningData对象,封装了canal server监听的ip和端口等信息。

public class ServerRunningMonitors {

    private static ServerRunningData serverData;
    private static Map               runningMonitors; // <String,
                                                      // ServerRunningMonitor>

    public static ServerRunningData getServerData() {
        return serverData;
    }

    public static Map<String, ServerRunningMonitor> getRunningMonitors() {
        return runningMonitors;
    }

    public static ServerRunningMonitor getRunningMonitor(String destination) {
        return (ServerRunningMonitor) runningMonitors.get(destination);
    }

    public static void setServerData(ServerRunningData serverData) {
        ServerRunningMonitors.serverData = serverData;
    }

    public static void setRunningMonitors(Map runningMonitors) {
        ServerRunningMonitors.runningMonitors = runningMonitors;
    }

}

ServerRunningMonitors的setRunningMonitors方法接收的参数是一个Map,其中Map的key是destination,value是ServerRunningMonitor,也就是说针对每一个destination都有一个ServerRunningMonitor来监控。

上述代码中,在往ServerRunningMonitors设置Map时,是通过MigrateMap.makeComputingMap方法来创建的,其接收一个Function类型的参数,这是guava中定义的接口,其声明了apply抽象方法。
其工作原理可以通过下面代码片段进行介绍:

Map<String, User> map = MigrateMap.makeComputingMap(new Function<String, User>() {
            @Override
            public User apply(String name) {
                return new User(name);
            }
        });
User user = map.get("tianshouzhi");//第一次获取时会创建
assert user != null;
assert user == map.get("tianshouzhi");//之后获取,总是返回之前已经创建的对象

这段代码中,我们利用MigrateMap.makeComputingMap创建了一个Map,其中key为String类型,value为User类型。当我们调用map.get("tianshouzhi")方法,最开始这个Map中并没有任何key/value的,于是其就会回调Function的apply方法,利用参数"tianshouzhi"创建一个User对象并返回。之后当我们再以"tianshouzhi"为key从Map中获取User对象时,会直接将前面创建的对象返回。不会回调apply方法,也就是说,只有在第一次尝试获取时,才会回调apply方法。

而在上述代码中,实际上就利用了这个特性,只不过是根据destination获取ServerRunningMonitor对象,如果不存在就创建。

在创建ServerRunningMonitor对象时,首先根据ServerRunningData创建ServerRunningMonitor实例,之后设置了destination和ServerRunningListener对象,接着,判断如果zkClientx字段如果不为空,也设置到ServerRunningMonitor中,最后调用init方法进行初始化。

                    /**
                     * 内部调用了embededCanalServer的start(destination)方法。
                     * 说明每个destination对应的CanalInstance是通过embededCanalServer的start方法启动的,
                     * 对应之前的instanceGenerator设置到embededCanalServer上。
                     * embededCanalServer负责调用instanceGenerator生成CanalInstance实例,并负责其启动。
                     */
                    public void processActiveEnter() {
                        try {
                            MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
                            embededCanalServer.start(destination);
                        } finally {
                            MDC.remove(CanalConstants.MDC_DESTINATION);
                        }
                    }

                    /**
                     * 内部调用embededCanalServer的stop(destination)方法。与上start方法类似,只不过是停止CanalInstance。
                     */
                    public void processActiveExit() {
                        try {
                            MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
                            embededCanalServer.stop(destination);
                        } finally {
                            MDC.remove(CanalConstants.MDC_DESTINATION);
                        }
                    }

                    /**
                     * 处理存在zk的情况下,在Canalinstance启动之前,在zk中创建节点。
                     * 路径为:/otter/canal/destinations/{0}/cluster/{1},其0会被destination替换,1会被ip:port替换。
                     * 此方法会在processActiveEnter()之前被调用
                     */
                    public void processStart() {
                        try {
                            if (zkclientx != null) {
                                final String path = ZookeeperPathUtils.getDestinationClusterNode(destination, ip + ":"
                                        + port);
                                initCid(path);
                                zkclientx.subscribeStateChanges(new IZkStateListener() {

                                    public void handleStateChanged(KeeperState state) throws Exception {

                                    }

                                    public void handleNewSession() throws Exception {
                                        initCid(path);
                                    }
                                });
                            }
                        } finally {
                            MDC.remove(CanalConstants.MDC_DESTINATION);
                        }
                    }

                    /**
                     * 处理存在zk的情况下,在Canalinstance停止前,释放zk节点,路径为/otter/canal/destinations/{0}/cluster/{1},
                     * 其0会被destination替换,1会被ip:port替换。此方法会在processActiveExit()之前被调用
                     */
                    public void processStop() {
                        try {
                            MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
                            if (zkclientx != null) {
                                final String path = ZookeeperPathUtils.getDestinationClusterNode(destination, ip + ":"
                                        + port);
                                releaseCid(path);
                            }
                        } finally {
                            MDC.remove(CanalConstants.MDC_DESTINATION);
                        }
                    }

1、当ServerRunningMonitor的start方法被调用时,其首先会直接调用processStart方法,这个方法内部直接调了ServerRunningListener的processStart()方法。
2、之后会判断是否存在zkClient,如果不存在,则以本地方式启动,如果存在,则以HA方式启动。canal server可以部署成两种方式:集群方式或者独立部署。其中集群方式是利用zk来做HA,独立部署则可以直接进行启动。

public void start() {
        super.start();
        // 其内部会调用ServerRunningListener的processStart()方法
        processStart();
        // 存在zk,以HA方式启动
        if (zkClient != null) {
            // 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start
            String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
            zkClient.subscribeDataChanges(path, dataListener);

            initRunning();
        } else {
            // 没有zk,直接启动
            processActiveEnter();
        }
    }
  • 直接启动:
    不存在zk的情况下,会进入else代码块,调用processActiveEnter方法,其内部调用了listener的processActiveEnter,启动相应destination对应的CanalInstance。
private void processActiveEnter() {
    if (listener != null) {
        try {
            listener.processActiveEnter();
        } catch (Exception e) {
            logger.error("processActiveEnter failed", e);
        }
    }
}
  • HA方式启动:
    存在zk,说明canal server使用了集群,因为canal就是利用zk来做HA的。首先根据destination构造一个zk的节点路径,然后进行监听。
            // 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start

            // 构建临时节点的路径:/otter/canal/destinations/{0}/running,其中占位符{0}会被destination替换。
            // 在集群模式下,可能会有多个canal server共同处理同一个destination,
            // 在某一时刻,只能由一个canal server进行处理,处理这个destination的canal server进入running状态,其他canal server进入standby状态。
            String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
            // 对destination对应的running节点进行监听,一旦发生了变化,则说明可能其他处理相同destination的canal server可能出现了异常,
            // 此时需要尝试自己进入running状态。
            zkClient.subscribeDataChanges(path, dataListener);
private void initRunning() {
        if (!isStart()) {
            return;
        }

        // 构建临时节点的路径:/otter/canal/destinations/{0}/running,其中占位符{0}会被destination替换
        String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
        // 序列化
        // 构建临时节点的数据,标记当前destination由哪一个canal server处理
        byte[] bytes = JsonUtils.marshalToByte(serverData);
        try {
            mutex.set(false);
            // 尝试创建临时节点。如果节点已经存在,说明是其他的canal server已经启动了这个canal instance。
            // 此时会抛出ZkNodeExistsException,进入catch代码块。
            zkClient.create(path, bytes, CreateMode.EPHEMERAL);
            activeData = serverData;
            // 如果创建成功,触发一下事件,内部调用ServerRunningListener的processActiveEnter方法
            processActiveEnter();
            mutex.set(true);
        } catch (ZkNodeExistsException e) {
            // 创建节点失败,则根据path从zk中获取当前是哪一个canal server创建了当前canal instance的相关信息。
            // 第二个参数true,表示的是,如果这个path不存在,则返回null。
            bytes = zkClient.readData(path, true);
            // 如果不存在节点,立即重试一次
            if (bytes == null) {
                initRunning();
            } else {
                // 如果的确存在,则将创建该canal instance实例信息存入activeData中。
                activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class);
            }
        } catch (ZkNoNodeException e) {
            // 如果/otter/canal/destinations/{0}/节点不存在,进行创建其中占位符{0}会被destination替换
            // 尝试创建父节点
            zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true);
            initRunning();
        }
    }

可以看到,initRunning方法内部只有在尝试在zk中创建节点成功后,才会去调用listener的processActiveEnter方法来真正启动destination对应的canal instance,这是canal HA方式启动的核心。canal官方文档中介绍了CanalServer HA机制启动的流程,如下:



官方说明的前两步,都是在initRunning方法中实现的。从代码中可以看出,在HA机启动的情况下,initRunning方法不一定能走到processActiveEnter()方法,因为创建临时节点可能会出错。
如果出错,那么当前canal instance则进入standBy状态。也就是另外一个canal instance出现异常时,当前canal instance顶上去,对应的代码为

String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
zkClient.subscribeDataChanges(path, dataListener);

其中dataListener类型是IZkDataListener,当zk节点中的数据发生变更时,会自动回调这两个方法,很明显,一个是用于处理节点数据发生变化,一个是用于处理节点数据被删除。

而dataListener是在ServerRunningMonitor的构造方法中初始化的,如下:

public ServerRunningMonitor(){
    // 创建父节点
    dataListener = new IZkDataListener() {
        //!!!目前看来,好像并没有存在修改running节点数据的代码,为什么这个方法不是空实现?
        public void handleDataChange(String dataPath, Object data) throws Exception {
            MDC.put("destination", destination);
            ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class);
            if (!isMine(runningData.getAddress())) {
                mutex.set(false);
            }
 
            if (!runningData.isActive() && isMine(runningData.getAddress())) { // 说明出现了主动释放的操作,并且本机之前是active                                   
              release = true;
                releaseRunning();// 彻底释放mainstem            }
 
            activeData = (ServerRunningData) runningData;
        }
        //当其他canal instance出现异常,临时节点数据被删除时,会自动回调这个方法,此时当前canal instance要顶上去
        public void handleDataDeleted(String dataPath) throws Exception {
            MDC.put("destination", destination);
            mutex.set(false);
            if (!release && activeData != null && isMine(activeData.getAddress())) {
                // 如果上一次active的状态就是本机,则即时触发一下active抢占                
                initRunning();
            } else {
                // 否则就是等待delayTime,避免因网络瞬端或者zk异常,导致出现频繁的切换操作                
                delayExector.schedule(new Runnable() {
 
                    public void run() {
                        initRunning();//尝试自己进入running状态
                    }
                }, delayTime, TimeUnit.SECONDS);
            }
        }
    }; 
}

3.5 autoScan机制相关代码

autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN));
        if (autoScan) {
            defaultAction = new InstanceAction() {

                public void start(String destination) {...}

                public void stop(String destination) {...}

                public void reload(String destination) {...};

            instanceConfigMonitors

可以看到,autoScan是否需要自动扫描的开关,只有当autoScan为true时,才会初始化defaultAction字段和instanceConfigMonitors字段。其中:
1、defaultAction:其作用是如果配置发生了变更,默认应该采取什么样的操作。其实现了InstanceAction接口定义的三个抽象方法:start、stop和reload。当新增一个destination配置时,需要调用start方法来启动;当移除一个destination配置时,需要调用stop方法来停止;当某个destination配置发生变更时,需要调用reload方法来进行重启。

2、instanceConfigMonitors:类型为Map<InstanceMode, InstanceConfigMonitor>。defaultAction字段是定义了配置发生变化默认应该采取的操作,那么总该有一个类来监听配置是否发生了变化,这就是InstanceConfigMonitor的作用。官方文档中,只提到了对canal.conf.dir配置项指定的目录的监听,这指的是通过spring方式加载配置。通过manager方式加载配置,配置中心的内容也是可能发生变化的,也需要进行监听。instanceConfigMonitors的类型是一个Map,key为InstanceMode,就是为了对这两种方式的配置加载方式都进行监听。

3.5.1 instanceConfigMonitors

instanceConfigMonitors也是根据mode属性,来采取不同的监控实现类SpringInstanceConfigMonitor 或者ManagerInstanceConfigMonitor,二者都实现了InstanceConfigMonitor接口。

public interface InstanceConfigMonitor extends CanalLifeCycle {
    void register(String destination, InstanceAction action);
    void unregister(String destination);
}

当需要对一个destination进行监听时,调用register方法
当取消对一个destination监听时,调用unregister方法。

unregister方法在canal 内部并没有有任何地方被调用,即某个destination如果开启了autoScan=true,那么你是无法在运行时停止对其进行监控的。如果要停止,你可以选择将对应的目录删除。

InstanceConfigMonitor本身并不知道哪些canal instance需要进行监控,因为不同的canal instance,有的可能设置autoScan为true,另外一些可能设置为false。

在CanalConroller的start方法中,对于autoScan为true的destination,会调用InstanceConfigMonitor的register方法进行注册,此时InstanceConfigMonitor才会真正的对这个destination配置进行扫描监听。对于那些autoScan为false的destination,则不会进行监听。

目前SpringInstanceConfigMonitor对这两个方法都进行了实现,而ManagerInstanceConfigMonitor目前对这两个方法实现的都是空,需要自己来实现。

在实现ManagerInstanceConfigMonitor时,可以参考SpringInstanceConfigMonitor。

此处不打算再继续进行分析SpringInstanceConfigMonitor的源码,因为逻辑很简单,可以自行查看SpringInstanceConfigMonitor 的scan方法,内部在什么情况下会回调defualtAction的start、stop、reload方法 。

3.6 start方法

ServerRunningMonitor的start方法,是在CanalController中的start方法中被调用的,CanalController中的start方法是在CanalLauncher中被调用的。

public void start() throws Throwable {
        logger.info("## start the canal server[{}:{}]", ip, port);
        // 创建整个canal的工作节点 :/otter/canal/cluster/{0}
        final String path = ZookeeperPathUtils.getCanalClusterNode(ip + ":" + port);
        initCid(path);
        if (zkclientx != null) {
            this.zkclientx.subscribeStateChanges(new IZkStateListener() {

                public void handleStateChanged(KeeperState state) throws Exception {

                }

                public void handleNewSession() throws Exception {
                    initCid(path);
                }
            });
        }
        // 优先启动embeded服务
        embededCanalServer.start();
        // 尝试启动一下非lazy状态的通道
        for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) {
            final String destination = entry.getKey();
            InstanceConfig config = entry.getValue();
            // 创建destination的工作节点
            // 如果destination对应的CanalInstance没有启动,则进行启动
            if (!embededCanalServer.isStart(destination)) {
                // 如果不是lazy,lazy模式需要等到第一次有客户端请求才会启动
                ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
                if (!config.getLazy() && !runningMonitor.isStart()) {
                    runningMonitor.start();
                }
            }

            if (autoScan) {
                instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction);
            }
        }

        // 启动配置文件自动检测机制
        if (autoScan) {
            instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();
            for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
                if (!monitor.isStart()) {
                    monitor.start();
                }
            }
        }

        // 启动网络接口,监听客户端请求
        canalServer.start();
    }

4 deployer总结

deployer模块的主要作用:

1、读取canal.properties,确定canal instance的配置加载方式(配置解析),初始化配置(准备canal server)
2、确定canal instance的启动方式:独立启动或者集群方式启动(初始化zk相关代码)
3、监听canal instance的配置的变化,动态停止、启动或新增(CanalInstance运行状态监控)
4、启动canal server,监听客户端请求 (start)

参考:

https://github.com/alibaba/canal

http://www.tianshouzhi.com/api/tutorials/canal/380

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343