canal 基于zookeeper的HA安装

我的zookeeper 集群已经安装
我的mysql已经安装
两台机器
10.86.43.154
10.93.0.192

cd /home/q/
sudo mkdir canal
cd canal/
sudo wget https://github.com/alibaba/canal/releases/download/canal-1.0.22/canal.deployer-1.0.22.tar.gz
sudo mkdir canal-deployer
sudo tar zxvf canal.deployer-1.0.22.tar.gz -C ./canal-deployer

需要 mysql支持复制模式

sudo vim /etc/my.cnf
[mysqld]
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
symbolic-links=0
log-bin=mysql-bin #添加这一行就ok  
binlog-format=ROW #选择row模式
server_id=1#配置mysql replaction需要定义,不能和canal的slaveId重复   

[mysqld_safe]
log-error=/var/log/mysqld.log
pid-file=/var/run/mysqld/mysqld.pid

地球人都知道,更新mysql配置my.cnf需要重启mysql才能生效,但是有些时候mysql在线上,不一定允许你重启,这时候应该怎么办呢?(我的已经修改过)

mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (0.00 sec)
mysql> show variables like 'binlog_format';
+---------------+-----------+
| Variable_name | Value     |
+---------------+-----------+
| binlog_format | STATEMENT |
+---------------+-----------+
1 row in set (0.00 sec)

mysql> set binlog_format='ROW';
Query OK, 0 rows affected (0.00 sec)

mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (0.00 sec)

但是 执行

mysql> set log_bin='mysql_bin';
ERROR 1238 (HY000): Variable 'log_bin' is a read only variable

开启mysql 日志是必须重启mysql了,gdb修改也不管用

mysql有以下几种日志:
错误日志: -log-err
查询日志: -log
慢查询日志: -log-slow-queries
更新日志: -log-update
二进制日志: -log-bin
mysql> show variables like  '%log%';
+-----------------------------------------+---------------------------------+
| Variable_name                           | Value                           |
+-----------------------------------------+---------------------------------+
| back_log                                | 50                              |
| binlog_cache_size                       | 32768                           |
| binlog_direct_non_transactional_updates | OFF                             |
| binlog_format                           | ROW                             |
| expire_logs_days                        | 0                               |
| general_log                             | OFF                             |
| general_log_file                        | /var/run/mysqld/mysqld.log      |
| innodb_flush_log_at_trx_commit          | 1                               |
| innodb_locks_unsafe_for_binlog          | OFF                             |
| innodb_log_buffer_size                  | 1048576                         |
| innodb_log_file_size                    | 5242880                         |
| innodb_log_files_in_group               | 2                               |
| innodb_log_group_home_dir               | ./                              |
| innodb_mirrored_log_groups              | 1                               |
| log                                     | OFF                             |
| log_bin                                 | ON                              |
| log_bin_trust_function_creators         | OFF                             |
| log_bin_trust_routine_creators          | OFF                             |
| log_error                               | /var/log/mysqld.log             |
| log_output                              | FILE                            |
| log_queries_not_using_indexes           | OFF                             |
| log_slave_updates                       | OFF                             |
| log_slow_queries                        | OFF                             |
| log_warnings                            | 1                               |
| max_binlog_cache_size                   | 18446744073709547520            |
| max_binlog_size                         | 1073741824                      |
| max_relay_log_size                      | 0                               |
| relay_log                               |                                 |
| relay_log_index                         |                                 |
| relay_log_info_file                     | relay-log.info                  |
| relay_log_purge                         | ON                              |
| relay_log_space_limit                   | 0                               |
| slow_query_log                          | OFF                             |
| slow_query_log_file                     | /var/run/mysqld/mysqld-slow.log |
| sql_log_bin                             | ON                              |
| sql_log_off                             | OFF                             |
| sql_log_update                          | ON                              |
| sync_binlog                             | 0                               |
+-----------------------------------------+---------------------------------+
38 rows in set (0.00 sec)

mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.00 sec)

canal配置

sudo vim canal.properties

#################################################
#########               common argument         ############# 
#################################################
canal.id= 1
canal.ip=
canal.port= 11111
canal.zkServers=10.86.43.164:4180,10.86.43.154:4180,10.93.0.192:4180
# flush data to zk
canal.zookeeper.flush.period = 1000
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE

## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

#################################################
#########               destinations            ############# 
#################################################
canal.destinations= example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5

canal.instance.global.mode = spring 
canal.instance.global.lazy = false
#canal.instance.global.manager.address = 127.0.0.1:1099
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

修改 instance.properties

cd example
sudo vim  instance.properties

#################################################
## mysql serverId
canal.instance.mysql.slaveId = 1235 #两台不一样

# position info
canal.instance.master.address = 10.93.0.192:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =

#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 

# username/password
canal.instance.dbUsername = 用户名
canal.instance.dbPassword = 密码
canal.instance.defaultDatabaseName = canal_test
canal.instance.connectionCharset = UTF-8

# table regex
canal.instance.filter.regex = .*\\..*
# table black regex
canal.instance.filter.black.regex =

#################################################

两台做同样的操作
启动及 查看日志

cd ../../bin
sudo ./startup.sh
cd  /home/q/canal/canal-deployer/logs

可以查看 canal的日志 和对应的instance日志
canal是看canal是否启动正常
instance是查看对应的实例是否和mysql的master连接正常

客户端代码
新建maven项目
pom加入依赖

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.sample</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>canal.sample</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>  
        <groupId>com.alibaba.otter</groupId>  
        <artifactId>canal.client</artifactId>  
        <version>1.0.12</version>  
    </dependency>
    <dependency>
        <groupId>org.jetbrains</groupId>
        <artifactId>annotations</artifactId>
        <version>13.0</version>
    </dependency>
  </dependencies>
</project>

创建类

package com.alibaba.otter.canal.sample;


import java.net.InetSocketAddress;  
import java.util.List;  
  
import com.alibaba.otter.canal.client.CanalConnector;  
import com.alibaba.otter.canal.common.utils.AddressUtils;  
import com.alibaba.otter.canal.protocol.Message;  
import com.alibaba.otter.canal.protocol.CanalEntry.Column;  
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;  
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;  
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;  
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;  
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;  
import com.alibaba.otter.canal.client.*;  
import org.jetbrains.annotations.NotNull;
public class App 
{
     public static void main(String args[]) {  
            // 创建链接  
            CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("10.93.0.192",  
                    11111), "example", "", "");  
            int batchSize = 1000;  
            int emptyCount = 0;  
            try {  
                connector.connect();  
                connector.subscribe(".*\\..*");  
                connector.rollback();  
                int totalEmtryCount = 1200;  
                while (emptyCount < totalEmtryCount) {  
                    Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据  
                    System.out.println(message);
                    long batchId = message.getId();  
                    int size = message.getEntries().size();  
                    if (batchId == -1 || size == 0) {  
                        emptyCount++;  
                        System.out.println("empty count : " + emptyCount);  
                        try {  
                            Thread.sleep(1000);  
                        } catch (InterruptedException e) {  
                            e.printStackTrace();  
                        }  
                    } else {  
                        emptyCount = 0;  
                        // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);  
                        printEntry(message.getEntries());  
                    }  
      
                    connector.ack(batchId); // 提交确认  
                    // connector.rollback(batchId); // 处理失败, 回滚数据  
                }  
      
                System.out.println("empty too many times, exit");  
            } finally {  
                connector.disconnect();  
            }  
        }  
      
        private static void printEntry(@NotNull List<Entry> entrys) {  
            for (Entry entry : entrys) {  
                if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {  
                    continue;  
                }  
      
                RowChange rowChage = null;  
                try {  
                    rowChage = RowChange.parseFrom(entry.getStoreValue());  
                } catch (Exception e) {  
                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),  
                            e);  
                }  
      
                EventType eventType = rowChage.getEventType();  
                System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",  
                        entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),  
                        entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),  
                        eventType));  
      
                for (RowData rowData : rowChage.getRowDatasList()) {  
                    if (eventType == EventType.DELETE) {  
                        printColumn(rowData.getBeforeColumnsList());  
                    } else if (eventType == EventType.INSERT) {  
                        printColumn(rowData.getAfterColumnsList());  
                    } else {  
                        System.out.println("-------> before");  
                        printColumn(rowData.getBeforeColumnsList());  
                        System.out.println("-------> after");  
                        printColumn(rowData.getAfterColumnsList());  
                    }  
                }  
            }  
        }  
      
        private static void printColumn(@NotNull List<Column> columns) {  
            for (Column column : columns) {  
                System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());  
            }  
        }  
}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,390评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,821评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,632评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,170评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,033评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,098评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,511评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,204评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,479评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,572评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,341评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,893评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,171评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,486评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,676评论 2 335

推荐阅读更多精彩内容