借助Canal实现MySQL数据库间链接(canal链接mysql)

借助Canal实现MySQL数据库间链接

在现代化的应用开发中,不同的系统或应用之间可能需要共享同一个数据库,这时候就需要实现数据库间的链接。本文将介绍如何借助Canal实现MySQL数据库间链接。Canal是阿里巴巴开源的一款数据库增量订阅和消费组件,支持MySQL、Oracle等数据库,它可以将数据库更新的数据通过可靠的方式同步到其他数据存储、NoSQL等系统中。

一、Canal介绍

Canal是阿里巴巴开源的一款数据库增量订阅和消费组件,是基于MySQL数据库增量日志构建的,从而实现了与数据源(如MySQL)解耦,达到了异构神异的目的。Canal主要包括三个模块: Canal.Admin、Canal.Server和Canal.Client。

Canal.Admin: Canal控制台管理界面,用于管理Canal的启停和监控

Canal.Server: Canal的工作服务端,负责从数据源(如MySQL)订阅增量日志,并把日志传输给客户端

Canal.Client: Canal的客户端,用于订阅和消费Canal.Server传输的数据

二、Canal的使用场景

Canal主要应用于以下场景:

1、数据实时同步

提供不同数据存储的数据实时同步,如 MySQL 到 Elasticsearch 的同步,实时更新数据,保持数据一致

2、数据订阅

对于需要全量数据同步的场景,结合 snapshot 快照机制,可以实现数据全量订阅

3、实时数据分析

对数据实时抓取,进行数据分析计算

4、缓存更新

将数据更新到Cache(如Redis)中,提升系统性能

三、Canal的具体实现

将A库的数据同步到B库中,具体实现如下:

1、安装Canal

Canal的安装需要先下载源码,然后进行编译打包,具体步骤可以参考Canal官网: https://github.com/alibaba/canal

2、配置Canal

Canal的配置文件位于config文件夹下,通过修改canal.properties实现配置。

(1)配置MySQL的主从关系

# mysql主从地址信息

canal.instance.master.address=127.0.0.1:3306

canal.instance.dbUsername=canal_test

canal.instance.dbPassword=canal_test

# 配置binlog信息,也可以从当前解析到的binlog中获取,

# 优先从binlog position 获取,找不到才到 GTID_GET中获取, gtid模式推荐打开,

# 当前的timestamp可以通过show master status或 show binary logs获取

canal.instance.connectionCharset = UTF-8

canal.instance.gtidon=on

canal.instance.position =

(2)配置Canal连接内容

# 配置instance连接信息

canal.instance.filter.regex=canal_test.tb_goods

canal.instance.rds.accesskey=

canal.instance.rds.secretkey=

(3)配置数据输出方式

# 配置数据输出方式

canal.mq.topic=test

# 指定数据传输格式

canal.mq.flatMessage = false

(4)配置kafka通常参数和账号信息

# canal.mq.producerGroup 客户端group组名,同一个topic下的不同group组互不影响

# canal.mq.servers 指定mq服务器的地址

# canal.mq.topics 指定MQ topic主题名称

# authAccount ,配置到应用已指定的账号

canal.mq.properties.bootstrap.servers=192.168.11.131:9092

canal.mq.producer.bootstrap-servers=192.168.11.131:9092

canal.mq.producer.topic=myTest can

(5)启动Canal

执行bin目录下的startup脚本,即可启动Canal。

3、配置Canal客户端

在B库中新建表,同步A库的数据到该表中。

(1)在B库中创建表

mysql> create database canal_test2;

mysql> use canal_test2;

mysql> create table tb_goods(

-> id int(11) not null auto_increment primary key,

-> name varchar(60) not null,

-> price int(10) not null

-> )engine=innodb default charset=utf8;

(2)在Canal服务端中新增instance,即配置同步关系

mysql> create database canal_client;

mysql> use canal_client;

mysql> create table canal_client.tb_goods(

-> id int(11) not null,

-> name varchar(60) not null,

-> price int(10) not null

-> )engine=innodb default charset=utf8;

mysql> GRANT ALL PRIVILEGES ON canal_client.* TO canal@’%’ IDENTIFIED BY ‘canal_test’ WITH GRANT OPTION;

(3)在Canal客户端中启动Canal

通过Canal客户端,将A库的数据同步到B库中的表中,具体代码实现如下:

public class SimpleCanalClientExample {

public static void mn(String[] args) {

// 从控制台读取参数

String host = args[0];

int port = Integer.valueOf(args[1]);

String destination = args[2];

String username = args[3];

String password = args[4];

CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(host,

port), destination, username, password);

int batchSize = 1000;

try {

connector.connect();

connector.subscribe(“canal_test.tb_goods”);

connector.rollback();

while (true) {

Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据

long batchId = message.getId();

int size = message.getEntries().size();

if (batchId == -1 || size == 0) {

Thread.sleep(1000);

} else {

System.out.printf(“batchId: %s, size: %s \n”, batchId, size);

printEntry(message.getEntries());

}

connector.ack(batchId);

}

} catch (Exception e) {

e.printStackTrace();

} finally {

connector.disconnect();

}

}

private static void printEntry(List entries) {

for (CanalEntry.Entry entry : entries) {

if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN

|| entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND

|| entry.getEntryType() == CanalEntry.EntryType.HEARTBEAT) {

continue;

}

RowChange rowChange = null;

try {

rowChange = RowChange.parseFrom(entry.getStoreValue());

} catch (Exception e) {

continue;

}

for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {

if (rowChange.getEventType() == CanalEntry.EventType.DELETE) {

printColumn(rowData.getBeforeColumnsList());

} else if (rowChange.getEventType() == CanalEntry.EventType.INSERT) {

printColumn(rowData.getAfterColumnsList());

} else {

printColumn(rowData.getBeforeColumnsList());

System.out.println(“=======”);

printColumn(rowData.getAfterColumnsList());

}

}

}

}

private static void printColumn(List columns) {

for (CanalEntry.Column column : columns) {

System.out.println(column.getName() + “\t” + column.getValue() + “\t” + column.getUpdated());

}

}

}

通过以上代码,我们就可以将A库的tb_goods表中的数据实时同步到B库中的canal_client库的tb_goods表中。

四、总结

Canal是一款非常优秀的数据库增量订阅和消费组件,它可以很好地解决数据库间链接的问题,实现不同数据存储之间的数据同步。我们可以通过Canal的控制台管理界面,或者通过Canal的客户端代码实现数据库之间的数据同步。当然,Canal也有其缺点,就是在高并发场景下,可能会受到性能的限制,这需要我们在具体的应用场景中进行实际评估。


数据运维技术 » 借助Canal实现MySQL数据库间链接(canal链接mysql)