解析MySQL的变化数据流Canal(mysql中canal)

解析MySQL的变化数据流——Canal

Canal是淘宝金融开源的一款基于MySQL数据库的增量数据同步工具,它实现了MySQL的binlog协议,将增量数据解析成数据操作语句或者数据变更事件,发送到MQ,Kafka或者其他异步处理系统中,从而实现异地数据共享、多级缓存等场景。下面将介绍Canal的基本原理和使用方法。

一、Canal的原理

Canal的基本原理是模拟MySQL从Slave读取Binlog,并解析其内容获取增量数据的过程,因此需要在MySQL中开启Binlog功能,并授权Canal的账号访问MySQL中的Binlog,并通过Canal的配置文件进行配置。Canal监听了MySQL的Binlog文件流,将其解析成增量数据,并使用MQ进行异步传输。

Canal的工作流程如下图所示:

![image.png](https://cdn.nlark.com/yuque/0/2021/png/97322/1635351589653-441d43f2-dc83-48f3-84b2-84b8d793f44c.png#clientId=u8455a5d5-5e5c-4&from=paste&height=274&id=u7466c167&margin=%5Bobject%20Object%5D&name=image.png&originHeight=548&originWidth=758&originalType=binary&ratio=1.562907268170426&size=120593&status=done&style=none&taskId=uc0daba54-6c31-44f7-be6c-6381b57ee6b&width=379)

Canal由三个主要组件组成:

1. Canal Server:用于从MySQL的binlog接收增量数据,并以自定义格式发送数据到不同的消费者。Canal Server可以纯粹地运行在内存中,只需要很少的资源和CPU。

2. Canal Client:作为Canal Server的消费者,接收从Canal Server传送过来的增量数据,并进行解析和处理。Canal Client可以通过自定义的处理逻辑将增量数据转换为状态消息、数据变更事件或数据操作语句。

3. Canal Admin:提供Web界面和API,用于管理Canal Server和Canal Client的配置、启动和关闭。

二、Canal的使用

1. 下载Canal Server并解压:http://github.com/alibaba/canal/releases

2. 修改Canal Server的配置文件:conf/canal.properties,其中需要配置MySQL的主机名、端口、用户名、密码等信息。也可以在该配置文件中启用其他功能,如过滤Binlog的表、字段、DML操作类型、DDL操作类型等。

3. 启动Canal Server:bin/startup.sh

4. 下载Canal Client的依赖包:http://maven.aliyun.com/nexus/content/groups/public/com/alibaba/otter/canal/。如果使用的是Maven,则可以在pom.xml文件中添加以下依赖:


com.alibaba.otter
canal.client
${canal.version}

5. 编写Canal Client程序:Canal提供了基于Java、Python、C++等语言的客户端SDK。以Java为例,可以按照以下步骤进行:

5.1. 定义Canal Client的配置文件:

canal.instance.master.address=127.0.0.1:11111
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
canal.instance.connectionCharset=UTF-8
canal.instance.tsdb.enable=true
canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb?useSSL=false
canal.instance.gtidon=false
canal.instance.filter.regex=.*\\..*

5.2. 定义Canal Client的监听器:

public class CanalClientTest {

public static void mn(String[] args) {
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111),
"example", "root", "123456");
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
int totalEmptyCount = 1200;
while (emptyCount
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
emptyCount = 0;
// 解析并处理更新数据
printEntry(message.getEntries());
}

connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
} finally {
connector.disconnect();
}
}

private static void printEntry(List entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}

RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}

EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));

for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}

private static void printColumn(List columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}

}

5.3. 启动Canal Client程序即可监听、解析和处理MySQL的增量数据。

Canal是一个非常强大的基于MySQL的增量数据同步工具,它可以在跨机房、跨数据中心的异构系统之间实现高效的数据同步。在实际应用中,我们只需要编写Canal Client程序即可实现对MySQL的增量数据进行灵活、高效的处理。


数据运维技术 » 解析MySQL的变化数据流Canal(mysql中canal)