本文共 7139 字,大约阅读时间需要 23 分钟。
搭建两个数据库,我本地用的是docker
master:172.22.0.2:3306
slave:172.22.0.3:3306
docker-compose.yml配置如下
master映射到主机13306端口,slave映射到主机23306端口
version: '2' # 这个version是指dockerfile解析时用的版本,不是给我们自己定义版本号用的.services: mysql13306: image: mysql:5.7.23 # ./master文件下需要有Dockerfile文件,并且build属性和image属性不能一起使用 ports: - 0.0.0.0:13306:3306 container_name: mysql13306 # 容器名 environment: TZ: Asia/Shanghai MYSQL_ROOT_PASSWORD: 123456 volumes: # 挂载 下边每行前边的`-`代表这个东西是数组的一个元素.就是说volumes属性的值是一个数组 - /home/sy/docker/data/mysql/13306:/var/lib/mysql - /home/sy/docker/conf/mysql/13306:/etc/mysql/conf.d networks: # 网络 extnetwork: # 见跟services平级的networks,在最下边 ipv4_address: 172.22.0.2 # 设置静态ipv4的地址 restart: always # 容器重启策略 mysql23306: image: mysql:5.7.23 ports: - 0.0.0.0:23306:3306 container_name: mysql23306 environment: TZ: Asia/Shanghai MYSQL_ROOT_PASSWORD: 123456 volumes: - /home/sy/docker/data/mysql/23306:/var/lib/mysql - /home/sy/docker/conf/mysql/23306:/etc/mysql/conf.d networks: # 网络 extnetwork: # 见跟services平级的networks,在最下边 ipv4_address: 172.22.0.3 # 设置静态ipv4的地址 restart: always # 容器重启策略 networks: # docker网络设置 extnetwork: # 自定义网络名称 driver: bridge # 桥接 ipam: # 要使用静态ip必须使用ipam插件 driver: default config: - subnet: 172.22.0.0/24 gateway: 172.22.0.1
主服务配置文件目录/home/sy/docker/conf/mysql/13306 下增加 my.cnf(具体配置如下)
[mysqld]log-bin=mysql-binserver-id=13306character_set_server=utf8 init_connect='SET NAMES utf8'
从服务配置文件目录/home/sy/docker/conf/mysql/23306 下增加 my.cnf(具体配置如下)
[mysqld]log-bin=mysql-binserver-id=23306character_set_server=utf8 init_connect='SET NAMES utf8'
创建用户,且该账户只能进行主从同步
create user 'repl'@'172.22.0.3' identified by 'repl';grant replication slave on *.* to 'repl'@'172.22.0.3';
使用下面命令查看配置情况
show grants for 'repl'@'172.22.0.3';
change master to master_host='172.22.0.2', master_port=3306, master_user='repl', master_password='repl', master_log_file='mysql-bin.000003', master_log_pos=605;
master_log_file和master_log_pos可以在主库执行下面命令查看
show master status ;
使用下面命令启动同步线程
start slave;
查看同步线程状态
show processlist;
从库同步线程启动以后在主库新建数据库就可以看到同步效果了
CREATE DATABASE `test` CHARACTER SET utf8 COLLATE utf8_general_ci;CREATE TABLE `d` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(64) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
canal阿里巴巴mysql数据库binlog的增量订阅&消费组件 。阿里云DRDS( )、阿里巴巴TDDL 二级索引、小表复制powerd by canal. Aliyun Data Lake Analytics powered by canal
github地址:
下载,并解压,修改conf/example/instance.properties配置文件中的主库地址
# position infocanal.instance.master.address=127.0.0.1:13306
修改主库配置文件
[mysqld]log-bin=mysql-bin #添加这一行就okbinlog-format=ROW #选择row模式server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复
创建同步用户
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;FLUSH PRIVILEGES;
启动服务
sh bin/startup.sh
工程增加pom依赖
com.alibaba.otter canal.client 1.1.0
创建客户端测试类
package com.alibaba.otter.canal.sample;import java.net.InetSocketAddress;import java.util.List;import com.alibaba.otter.canal.client.CanalConnectors;import com.alibaba.otter.canal.client.CanalConnector;import com.alibaba.otter.canal.common.utils.AddressUtils;import com.alibaba.otter.canal.protocol.Message;import com.alibaba.otter.canal.protocol.CanalEntry.Column;import com.alibaba.otter.canal.protocol.CanalEntry.Entry;import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;import com.alibaba.otter.canal.protocol.CanalEntry.EventType;import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;import com.alibaba.otter.canal.protocol.CanalEntry.RowData;public class SimpleCanalClientExample {public static void main(String args[]) { // 创建链接 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", ""); int batchSize = 1000; int emptyCount = 0; try { connector.connect(); connector.subscribe(".*\\..*"); connector.rollback(); int totalEmptyCount = 120; while (emptyCount < totalEmptyCount) { Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { emptyCount++; System.out.println("empty count : " + emptyCount); try { Thread.sleep(1000); } catch (InterruptedException e) { } } else { emptyCount = 0; // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size); printEntry(message.getEntries()); } connector.ack(batchId); // 提交确认 // connector.rollback(batchId); // 处理失败, 回滚数据 } System.out.println("empty too many times, exit"); } finally { connector.disconnect(); }}private static void printEntry(Listentrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } RowChange rowChage = null; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } EventType eventType = rowChage.getEventType(); System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (RowData rowData : rowChage.getRowDatasList()) { if (eventType == EventType.DELETE) { printColumn(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { printColumn(rowData.getAfterColumnsList()); } else { System.out.println("-------> before"); printColumn(rowData.getBeforeColumnsList()); System.out.println("-------> after"); printColumn(rowData.getAfterColumnsList()); } } }}private static void printColumn(List columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); }}}
配置完成,启动客户端程序,在主库执行,新增,修改操作是,对应控制台打印出日志信息
更多canal配置和使用可以参考github的使用文档
转载地址:http://whmdi.baihongyu.com/