数据同步工具考古
2024-2-7 15:21:3 Author: cloudsjhan.github.io(查看原文) 阅读量:56 收藏

前言

在过去的一年里,我除了日常的 Databend Cloud 开发工作之外,还围绕着 Databend 做了大量的数据生态工作。这包括在 Databend Native 协议的基础上提供了多语言的 SDK,并在 SDK 基础上增加了对诸如 TableauMetabaseSuperSetRedash等BI工具的支持。此外,我还完善了关于数据传输和实时同步领域的生态,涉及到 Debezium Engine、Flink CDC、Kafka Connect、Airbyte、Datax Plugin、TapData 等工具。

在这一系列的工作中,数据同步是一个非常关键的环节,它构建了用户从外部数据、数据库到 Databend 的桥梁。起初我对于一些流行的数据同步工具只是略有耳闻,有些甚至从未听说过,更不了解它们的内部原理。通过一段时间的摸索和开发,成功实现了几个工具的整合之后,我对这些工具和平台有了更深入的了解。

所以在本文中,我将对几个目前广泛使用、比较流行的数据同步工具进行考古。这包括它们的发展历史、技术特点以及在实际应用中的使用情况。通过对这些工具的分析介绍,在给自己做笔记的同时希望能够为其他读者提供对数据同步领域的全面认识。

数据同步工具

Debezium

1
Debezium is an open source distributed platform for change data capture. Start it up, point it at your databases, and your apps can start responding to all of the inserts, updates, and deletes that other apps commit to your databases. Debezium is durable and fast, so your apps can respond quickly and never miss an event, even when things go wrong.

Debezium 是一种 CDC(Change Data Capture)工具,工作原理类似大家所熟知的 Canal, DataBus, Maxwell 等,是通过抽取数据库日志来获取变更事件。Debezium 最初设计成一个 Kafka Connect 的 Source Plugin,目前开发者虽致力于将其与 Kafka Connect 解耦。下图引自 Debeizum 官方文档,可以看到一个 Debezium 在一个完整 CDC 系统中的位置。

img

Kafka Connect 为 Source Plugin 提供了一系列的编程接口,最主要的就是要实现 SourceTask 的 poll 方法,其返回 List 将会被以最少一次语义(At Least Once)的方式投递至 Kafka。

Debezium 抽取原理

下图是 Debezium 从 PG 中抽取数据到 kafka topic 的流程,对于 PG 连接器来说是从逻辑复制流读取到变更日志,经由 kafka 发送到下游的数据处理服务以及相应的 writer plugin。

img

再来看下 Debezium MySQL Reader 的代码,Reader体系构成了 MySQL 模块中代码的主线。

Reader 的继承关系如下图所示:

img

SnapshotReader 和 BinlogReader 分别实现了对 MySQL 数据的全量读取和增量读取,他们继承于 AbstractReader,里面封装了一些共用的逻辑代码。AbstractReader 的流程如下图所示:

image-20240207164331400

从图中可以看到 AbstractReader 在实现时,并没有直接将 enqueue 进来的 event record 直接写到 Kafka,而是通过一个内存阻塞队列BlockingQueue进行了解耦,这样写的好处是:

  1. 职责解耦

Event Record 在进入 BlockingQueue之前,要根据条件判断是否接受该 record;在向 Kafka 投递 record 之前,判断 task 的 running 状态。

  1. 线程隔离

BlockingQueue 是一个线程安全的阻塞队列,通过 BlockingQueue 实现的生产者消费者模型,是可以跑在不同的线程里的,这样避免局部的阻塞带来的整体的干扰。如上图所示,系统会定期判断 running 标志位,若 running 被stop信号置为了false,可以立刻停止整个task,而不会因 MySQL IO 阻塞延迟响应。

  1. 单条与Batch的互相转化

Enqueue record 是单条的投递 record,drain_to 是批量的消费 records。这个用法也可以反过来,实现 batch 到 single 的转化。

而 MySQL Connector 每日次获取 snapshot 的时候基本上是沿用了 MySQL 官方从库搭建的方案,详细步骤是:

  1. 获取一个全局读锁,从而阻塞住其他数据库客户端的写操作。
  2. 开启一个可重复读语义的事务,来保证后续的在同一个事务内读操作都是在一个一致性快照中完成的。
  3. 读取binlog的当前位置。
  4. 读取连接器中配置的数据库和表的模式(schema)信息。
  5. 释放全局读锁,允许其他的数据库客户端对数据库进行写操作。
  6. (可选)把DDL改变事件写入模式改变 topic(schema change topic),包括所有的必要的DROP和CREATEDDL语句。
  7. 扫描所有数据库的表,并且为每一个表产生一个和特定表相关的kafka topic创建事件(即为每一个表创建一个kafka topic)。
  8. 提交事务。
  9. 记录连接器成功完成快照任务时的连接器偏移量。

部署方式

基于 Kafka Connect

最常见的架构是通过 Apache Kafka Connect 部署 Debezium。Kafka Connect 为在 Kafka 和外部存储系统之间系统数据提供了一种可靠且可伸缩性的方式。它为 Connector 插件提供了一组 API 和一个运行时:Connect 负责运行这些插件,它们则负责移动数据。通过 Kafka Connect 可以快速实现 Source Connector 和 Sink Connector 进行交互构造一个低延迟的数据 Pipeline:

  • Source Connector(例如,Debezium):将记录发送到 Kafka
  • Sink Connector:将 Kafka Topic 中的记录发送到其他系统

img

如上图所示,部署了 MySQL 和 PostgresSQL 的 Debezium Connector 以捕获这两种类型数据库的变更。每个 Debezium Connector 都会与其源数据库建立连接:

  • MySQL Connector 使用客户端库来访问 binlog。
  • PostgreSQL Connector 从逻辑副本流中读取数据。

除了 Kafka Broker 之外,Kafka Connect 也作为一个单独的服务运行。默认情况下,数据库表的变更会写入名称与表名称对应的 Kafka Topic 中。如果需要,您可以通过配置 Debezium 的 Topic 路由转换来调整目标 Topic 名称。例如,您可以:

  • 将记录路由到名称与表名不同的 Topic 中
  • 将多个表的变更事件记录流式传输到一个 Topic 中

变更事件记录在 Apache Kafka 中后,Kafka Connect 生态系统中的不同 Sink Connector 可以将记录流式传输到其他系统、数据库,例如 Elasticsearch、Databend、分析系统或者缓存。

基于Debezium Server

第二种部署 Debezium 的方法是使用 Debezium Server。Debezium Server 是一个可配置的、随时可用的应用程序,可以将变更事件从源数据库流式传输到各种消息中间件上。

下图展示了基于 Debezium Server 的变更数据捕获 Pipeline 架构:

img

Debezium Server 配置使用 Debezium Source Connector 来捕获源数据库中的变更。变更事件可以序列化为不同的格式,例如 JSON 或 Apache Avro,然后发送到各种消息中间件,例如 Amazon Kinesis、Apache Pulsar。

基于 Debezium Engine

使用 Debezium Connector 的另一种方法是基于 Debezium Engine。在这种情况下,Debezium 不会通过 Kafka Connect 运行,而是作为嵌入到用户自定义 Java 应用程序中的库运行。这对于在应用程序本身获取变更事件非常有帮助,无需部署完整的 Kafka 和 Kafka Connect 集群,也不用将变更流式传输到消息中间件上。这篇文章展示了如何使用 Debezium Databend Server 实现从 MySQL 的数据全增量同步。

特性总结

Debezium 是一组用于 Apache Kafka Connect 的 Source Connector。每个 Connector 都通过使用该数据库的变更数据捕获 (CDC) 功能从不同的数据库中获取变更。与其他方法(例如轮询或双重写入)不同,Debezium 的实现基于日志的 CDC:

  • 确保捕获所有的数据变更。
  • 以极低的延迟生成变更事件,同时避免因为频繁轮询导致 CPU 使用率增加。例如,对于 MySQL 或 PostgreSQL,延迟在毫秒范围内。
  • 不需要更改数据模型,例如增加 ‘Last Updated’ 列。
  • 可以捕获删除操作。
  • 可以捕获旧记录状态以及其他元数据,例如,事务 ID,具体取决于数据库的功能和配置。

Flink CDC 是基于数据库日志 CDC(Change Data Capture)技术的实时数据集成框架,支持了全增量一体化、无锁读取、并行读取、表结构变更自动同步、分布式架构等高级特性。Flink CDC 发展了三年多的时间。2020 年,从作为个人的 Side project 出发,后来被越来越多人认识到。

image-20240207172125425

1.0

Flink CDC 1.0 比简单,直接就是底层封装了 Debezium, 而 Debezium 同步一张表分为两个阶段:

  • 全量阶段:查询当前表中所有记录;
  • 增量阶段:从 binlog 消费变更数据。

大部分用户使用的场景都是全量 + 增量同步,加锁是发生在全量阶段,目的是为了确定全量阶段的初始位点,保证增量 + 全量实现一条不多,一条不少,从而保证数据一致性。从下图中我们可以分析全局锁和表锁的一些加锁流程,左边红色线条是锁的生命周期,右边是 MySQL 开启可重复读事务的生命周期。

image-20240207172616108

以全局锁为例,首先是获取一个锁,然后再去开启可重复读的事务。这里锁住操作是读取 binlog 的起始位置和当前表的 schema。这样做的目的是保证 binlog 的起始位置和读取到的当前 schema 是可以对应上的,因为表的 schema 是会改变的,比如如删除列或者增加列。在读取这两个信息后,SnapshotReader 会在可重复读事务里读取全量数据,在全量数据读取完成后,会启动 BinlogReader 从读取的 binlog 起始位置开始增量读取,从而保证全量数据 + 增量数据的无缝衔接。

表锁是全局锁的退化版,因为全局锁的权限会比较高,因此在某些场景,用户只有表锁。表锁锁的时间会更长,因为表锁有个特征:锁提前释放了可重复读的事务默认会提交,所以锁需要等到全量数据读完后才能释放。而 FLUSH TABLE WITH READ LOCK 的操作会存在以下问题:

  1. 该命令等待所有正在进行的 update 完成,同时阻止所有新来的 update;
  2. 执行成功之前必须等待所有正在运行的 select 完成,等待执行的 update 就会等待的更久。
  3. 会阻止其他的事务 commit。

所以对于加锁来说在时间上是不确定的,严重的可能会 hang 住数据库。

当然,Flink CDC 1.x 版本也可以不加锁,但是会丢失一定的数据准确性。总的来说 1.x 存在的问题是:

  • 全量 + 增量读取的过程需要保证所有数据的一致性,因此需要通过加锁保证,但是加锁在数据库层面上是一个十分高危的操作。底层 Debezium 在保证数据一致性时,需要对读取的库或表加锁,全局锁可能导致数据库锁住,表级锁会锁住表的读,DBA 一般不给锁权限。
  • 不支持水平扩展,因为 Flink CDC 底层是基于 Debezium,起架构是单节点,所以Flink CDC 只支持单并发。在全量阶段读取阶段,如果表非常大 (亿级别),读取时间在小时甚至天级别,用户不能通过增加资源去提升作业速度。
  • 全量读取阶段不支持 checkpoint:CDC 读取分为两个阶段,全量读取和增量读取,目前全量读取阶段是不支持 checkpoint 的,因此会存在一个问题:当我们同步全量数据时,假设需要 5 个小时,当我们同步了 4 小时的时候作业失败,这时候就需要重新开始,再读取 5 个小时。
2.0

了解了 1.0 的痛点之后,2.0 主要解决的问题有三个:支持无锁、水平扩展、checkpoint。

image-20240207174105516

Flink CDC 2.0 在借鉴了 Netflix 这篇论文 之后可以做到全程无锁和并发读取。

3.0

图片

Flink CDC 3.0 拥有了更多数据同步过程中的实用特性,成为了一个端到端的数据集成框架:

  • 上游 schema 变更自动同步到下游,已有作业支持动态加表
  • 空闲资源自动回收,一个 sink 实例支持写入多表
  • API 设计直接面向数据集成场景,帮助用户轻松构建同步作业

Databend 也提供了 Flink-Databend-Connector,这篇文章展示了如何使用 databend connector 实现从 MySQL 的数据同步。

Canal

canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括:

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的canal支持源端MySQL版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。

canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为MySQL slave,向MySQL master发送dump协议
  • MySQL master收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

Binlog获取详解

Binlog发送接收流程,流程如下图所示:

img

首先,我们需要伪造一个slave,向master注册,这样master才会发送binlog event。注册很简单,就是向master发送COM_REGISTER_SLAVE命令,带上slave相关信息。这里需要注意,因为在MySQL的replication topology中,都需要使用一个唯一的server id来区别标示不同的server实例,所以这里我们伪造的slave也需要一个唯一的server id。

接着实现binlog的dump。MySQL只支持一种 binlog dump方式,也就是指定binlog filename + position,向master发送COM_BINLOG_DUMP命令。在发送dump命令的时候,我们可以指定flag为BINLOG_DUMP_NON_BLOCK,这样master在没有可发送的binlog event之后,就会返回一个EOF package。不过通常对于slave来说,一直把连接挂着可能更好,这样能更及时收到新产生的binlog event。

Canal 架构

img

  • server代表一个canal运行实例,对应于一个jvm,也可以理解为一个进程
  • instance对应于一个数据队列 (1个server对应1..n个instance),每一个数据队列可以理解为一个数据库实例。

instance代表了一个实际运行的数据队列,包括了EventPaser,EventSink,EventStore等组件。

抽象了CanalInstanceGenerator,主要是考虑配置的管理方式:

manager方式:和你自己的内部web console/manager系统进行对接。(目前主要是公司内部使用,Otter采用这种方式) spring方式:基于spring xml + properties进行定义,构建spring配置.

下面是 canalServer 和 instance 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
canalServer.setCanalInstanceGenerator(new CanalInstanceGenerator() {

public CanalInstance generate(String destination) {
Canal canal = canalConfigClient.findCanal(destination);
// 此处省略部分代码 大致逻辑是设置canal一些属性

CanalInstanceWithManager instance = new CanalInstanceWithManager(canal, filter) {

protected CanalHAController initHaController() {
HAMode haMode = parameters.getHaMode();
if (haMode.isMedia()) {
return new MediaHAController(parameters.getMediaGroup(),
parameters.getDbUsername(),
parameters.getDbPassword(),
parameters.getDefaultDatabaseName());
} else {
return super.initHaController();
}
}

protected void startEventParserInternal(CanalEventParser parser, boolean isGroup) {
//大致逻辑是 设置支持的类型
//初始化设置MysqlEventParser的主库信息,这处抽象不好,目前只支持mysql
}

};
return instance;
}
});
canalServer.start(); //启动canalServer

canalServer.start(destination);//启动对应instance
this.clientIdentity = new ClientIdentity(destination, pipeline.getParameters().getMainstemClientId(), filter);
canalServer.subscribe(clientIdentity);// 发起一次订阅,当监听到instance配置时,调用generate方法注入新的instance

instance模块:

  • eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
  • eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
  • eventStore (数据存储)
  • metaManager (增量订阅&消费信息管理器)

基于 Canal 的实现原理来看,Canal 并不支持全量+增量的同步模式。

Airbyte

Airbyte 是一个比较新的数据集成平台,提供了非常多的 connector,号称支持上千种数据源。Databend 也提供了Airbyte Destination Connecto,用户可以从 Airbyte 支持的上百个 source connector 比如 mysql, es, pg 等同步数据到 Databend。

Airbyte 的优势是支持的数据源非常多,甚至像 Google Docs, Facebook 都支持。 Airbyte 是 ELT 模式,先抽取数据到目标表后,再进行清洗。但是缺点也非常明显,Airbyte 是一个打包的平台,所有的数据源都要被集成到里面,所以使用起来非常地重。并且Airbyte 同步过来的数据是一张大宽表,依赖 dbt Normalization 或者一些 ELT 工作才能够表展开。所以适合的是数据源多,并且需要统一管理的场景。

DataX

DataX 是阿里开源的一个异构数据源离线同步工具,能够实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive等各种异构数据源之间稳定高效的数据同步功能。DataX 本身作为数据同步框架,将不同数据源的同步, 抽象为从 source 端读取数据的 Reader 插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源的数据同步工作。

Databend 提供了 Databend Writer 的 Datax Plugin , 可以支持从任意具有 Datax Reader 插件的数据库同步数据到 Databend,并且支持全量insert 和 upsert 两种同步模式。Datax 最适合的场景是 T+1 的离线数据同步。

总结

CDC 的技术方案非常多,目前业界主流的实现机制可以分为两种:

基于查询的 CDC:

  • 离线调度查询作业,批处理。把一张表同步到其他系统,每次通过查询去获取表中最新的数据;
  • 无法保障数据一致性,查的过程中有可能数据已经发生了多次变更;
  • 不保障实时性,基于离线调度存在天然的延迟。

基于日志的 CDC:

  • 实时消费日志,流处理,例如 MySQL 的 binlog 日志完整记录了数据库中的变更,可以把 binlog 文件当作流的数据源;
  • 保障数据一致性,因为 binlog 文件包含了所有历史变更明细;
  • 保障实时性,因为类似 binlog 的日志文件是可以流式消费的,提供的是实时数据。

对比常见的开源 CDC 方案,我们可以发现,对比增量同步能力:

  • 基于日志的方式,可以很好的做到增量同步;
  • 而基于查询的方式是很难做到增量同步的。
  • 对比全量同步能力,基于查询或者日志的 CDC 方案基本都支持,除了 Canal。
  • 而对比全量 + 增量同步的能力,只有 Flink CDC、Debezium 支持较好。
  • 从架构角度去看,该表将架构分为单机和分布式,这里的分布式架构不单纯体现在数据读取能力的水平扩展上,更重要的是在大数据场景下分布式系统接入能力。例如 Flink CDC 的数据入湖或者入仓的时候,下游通常是分布式的系统,如 Hive、HDFS、Iceberg、Hudi 等,那么从对接入分布式系统能力上看,Flink CDC 的架构能够很好地接入此类系统。

  • 在数据转换 / 数据清洗能力上,当数据进入到 CDC 工具的时候是否能较方便的对数据做一些过滤或者清洗

  • 在 Flink CDC 上操作很简单,可以通过 Flink SQL 去操作;
  • 但是像 DataX、Debezium 等则需要通过脚本或者模板去做,所以用户的使用门槛会比较高。

另外,在生态方面,这里指的是下游的一些数据库或者数据源的支持。Flink CDC 下游有丰富的 Connector,例如写入到 Databend、MySQL、Pg、ClickHouse 等常见的一些系统,也支持各种自定义 connector。



文章来源: https://cloudsjhan.github.io/2024/02/07/%E6%95%B0%E6%8D%AE%E5%90%8C%E6%AD%A5%E5%B7%A5%E5%85%B7%E8%80%83%E5%8F%A4/
如有侵权请联系:admin#unsafe.sh