博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
mysql主从同步(复制)canal跨机房同步
阅读量:4043 次
发布时间:2019-05-24

本文共 7139 字,大约阅读时间需要 23 分钟。

一、基于mysql,binlog二级制日志的主从同步

搭建两个数据库,我本地用的是docker

master:172.22.0.2:3306

slave:172.22.0.3:3306

docker-compose.yml配置如下

master映射到主机13306端口,slave映射到主机23306端口

version: '2' # 这个version是指dockerfile解析时用的版本,不是给我们自己定义版本号用的.services:                                                        mysql13306:    image: mysql:5.7.23 # ./master文件下需要有Dockerfile文件,并且build属性和image属性不能一起使用    ports:      - 0.0.0.0:13306:3306    container_name: mysql13306 # 容器名    environment:      TZ: Asia/Shanghai      MYSQL_ROOT_PASSWORD: 123456    volumes: # 挂载 下边每行前边的`-`代表这个东西是数组的一个元素.就是说volumes属性的值是一个数组      - /home/sy/docker/data/mysql/13306:/var/lib/mysql      - /home/sy/docker/conf/mysql/13306:/etc/mysql/conf.d    networks: # 网络      extnetwork: # 见跟services平级的networks,在最下边        ipv4_address: 172.22.0.2 # 设置静态ipv4的地址    restart: always # 容器重启策略    mysql23306:    image: mysql:5.7.23    ports:      - 0.0.0.0:23306:3306    container_name: mysql23306    environment:      TZ: Asia/Shanghai      MYSQL_ROOT_PASSWORD: 123456    volumes:      - /home/sy/docker/data/mysql/23306:/var/lib/mysql      - /home/sy/docker/conf/mysql/23306:/etc/mysql/conf.d     networks: # 网络      extnetwork: # 见跟services平级的networks,在最下边        ipv4_address: 172.22.0.3 # 设置静态ipv4的地址    restart: always # 容器重启策略  networks: # docker网络设置  extnetwork: # 自定义网络名称    driver: bridge # 桥接    ipam: # 要使用静态ip必须使用ipam插件      driver: default      config:      - subnet: 172.22.0.0/24        gateway: 172.22.0.1
主服务配置文件目录/home/sy/docker/conf/mysql/13306 下增加 my.cnf(具体配置如下)
[mysqld]log-bin=mysql-binserver-id=13306character_set_server=utf8  init_connect='SET NAMES utf8'

从服务配置文件目录/home/sy/docker/conf/mysql/23306 下增加 my.cnf(具体配置如下)

[mysqld]log-bin=mysql-binserver-id=23306character_set_server=utf8  init_connect='SET NAMES utf8'

主服务配置

创建用户,且该账户只能进行主从同步

create user 'repl'@'172.22.0.3' identified by 'repl';grant replication slave on *.* to 'repl'@'172.22.0.3';

使用下面命令查看配置情况

show grants for 'repl'@'172.22.0.3';

 

从服务配置

change master to     master_host='172.22.0.2',     master_port=3306,     master_user='repl',     master_password='repl',     master_log_file='mysql-bin.000003',     master_log_pos=605;

master_log_file和master_log_pos可以在主库执行下面命令查看

show master status ;

使用下面命令启动同步线程

start slave;

查看同步线程状态

show processlist;

 

从库同步线程启动以后在主库新建数据库就可以看到同步效果了

CREATE DATABASE `test` CHARACTER SET utf8 COLLATE utf8_general_ci;CREATE TABLE `d` (  `id` int(11) NOT NULL AUTO_INCREMENT,  `name` varchar(64) DEFAULT NULL,  PRIMARY KEY (`id`)) ENGINE=InnoDB   DEFAULT CHARSET=utf8;

二使用canal实现基于ysql数据库binlog的增量订阅&消费组件的主从同步

canal阿里巴巴mysql数据库binlog的增量订阅&消费组件 。阿里云DRDS(  )、阿里巴巴TDDL 二级索引、小表复制powerd by canal. Aliyun Data Lake Analytics  powered by canal

github地址:

下载,并解压,修改conf/example/instance.properties配置文件中的主库地址

# position infocanal.instance.master.address=127.0.0.1:13306

 修改主库配置文件

[mysqld]log-bin=mysql-bin #添加这一行就okbinlog-format=ROW #选择row模式server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复

创建同步用户

CREATE USER canal IDENTIFIED BY 'canal';  GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;FLUSH PRIVILEGES;
启动服务
sh bin/startup.sh

客户端配置

工程增加pom依赖

com.alibaba.otter
canal.client
1.1.0

创建客户端测试类

package com.alibaba.otter.canal.sample;import java.net.InetSocketAddress;import java.util.List;import com.alibaba.otter.canal.client.CanalConnectors;import com.alibaba.otter.canal.client.CanalConnector;import com.alibaba.otter.canal.common.utils.AddressUtils;import com.alibaba.otter.canal.protocol.Message;import com.alibaba.otter.canal.protocol.CanalEntry.Column;import com.alibaba.otter.canal.protocol.CanalEntry.Entry;import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;import com.alibaba.otter.canal.protocol.CanalEntry.EventType;import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;import com.alibaba.otter.canal.protocol.CanalEntry.RowData;public class SimpleCanalClientExample {public static void main(String args[]) {    // 创建链接    CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),                                                                                        11111), "example", "", "");    int batchSize = 1000;    int emptyCount = 0;    try {        connector.connect();        connector.subscribe(".*\\..*");        connector.rollback();        int totalEmptyCount = 120;        while (emptyCount < totalEmptyCount) {            Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据            long batchId = message.getId();            int size = message.getEntries().size();            if (batchId == -1 || size == 0) {                emptyCount++;                System.out.println("empty count : " + emptyCount);                try {                    Thread.sleep(1000);                } catch (InterruptedException e) {                }            } else {                emptyCount = 0;                // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);                printEntry(message.getEntries());            }            connector.ack(batchId); // 提交确认            // connector.rollback(batchId); // 处理失败, 回滚数据        }        System.out.println("empty too many times, exit");    } finally {        connector.disconnect();    }}private static void printEntry(List
entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } RowChange rowChage = null; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } EventType eventType = rowChage.getEventType(); System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (RowData rowData : rowChage.getRowDatasList()) { if (eventType == EventType.DELETE) { printColumn(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { printColumn(rowData.getAfterColumnsList()); } else { System.out.println("-------> before"); printColumn(rowData.getBeforeColumnsList()); System.out.println("-------> after"); printColumn(rowData.getAfterColumnsList()); } } }}private static void printColumn(List
columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); }}}

配置完成,启动客户端程序,在主库执行,新增,修改操作是,对应控制台打印出日志信息

更多canal配置和使用可以参考github的使用文档

转载地址:http://whmdi.baihongyu.com/

你可能感兴趣的文章
qt5 everywhere 编译summary
查看>>
qt5 everywhere编译完成后,找不到qmake
查看>>
qt 创建异形窗体
查看>>
可重入函数与不可重入函数
查看>>
简单Linux C线程池
查看>>
内存池
查看>>
输入设备节点自动生成
查看>>
GNU hello代码分析
查看>>
Qt继电器控制板代码
查看>>
wpa_supplicant控制脚本
查看>>
gstreamer相关工具集合
查看>>
RS232 四入四出模块控制代码
查看>>
gstreamer插件之 videotestsrc
查看>>
linux 驱动开发 头文件
查看>>
/etc/resolv.conf
查看>>
container_of()传入结构体中的成员,返回该结构体的首地址
查看>>
linux sfdisk partition
查看>>
ipconfig,ifconfig,iwconfig
查看>>
opensuse12.2 PL2303 minicom
查看>>
网络视频服务器移植
查看>>