CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512)); ALTER TABLE products AUTO_INCREMENT = 10;
INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"), (default,"car battery","12V car battery"), (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"), (default,"hammer","12oz carpenter's hammer"), (default,"hammer","14oz carpenter's hammer"), (default,"hammer","16oz carpenter's hammer"), (default,"rocks","box of assorted rocks"), (default,"jacket","water resistent black wind breaker"), (default,"cloud","test for databend"), (default,"spare tire","24 inch spare tire");
Databend 中建表
1
CREATE TABLE bend_products (id INT NOT NULL, name VARCHAR(255) NOT NULL, description VARCHAR(512) );
INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"), (default,"car battery","12V car battery"), (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"), (default,"hammer","12oz carpenter's hammer"), (default,"hammer","14oz carpenter's hammer"), (default,"hammer","16oz carpenter's hammer"), (default,"rocks","box of assorted rocks"), (default,"jacket","water resistent black wind breaker"), (default,"cloud","test for databend"), (default,"spare tire","24 inch spare tire");
这些数据会立即同步到 Databend 当中。
假如此时 MySQL 中更新了一条数据:
那么 id=10 的数据在 databend 中也会被立即更新:
环境清理
操作结束后,在 docker-compose.yml 文件所在的目录下执行如下命令停止所有容器:
1
docker-compose down
在 Flink 所在目录 flink-1.16.0 下执行如下命令停止 Flink 集群:
1
./bin/stop-cluster.sh
结论
以上就是基于 Flink CDC 构建 MySQL 到 Databend 的 实时数据同步的全部过程。
mysql flink connector is over 2.4.1
flin version is 1.17.1
-------------The End-------------
subscribe to my blog by scanning my public wechat account