使用Flink开发的实时ETL,数据从MySQL到Greenplum。使用canal解析MySQL的binlog,投放进kafka,使用Flink消费kafka并把数据组装进Greenplum,后续将会添加更多的数据源和目标源。
- 利用canal解析MySQL的binary log,并将解析的log投入kafka中。
- 使用mriya消费kafka中的消息,还原MySQL 的增删改。
- 将MySQL的增删改转义成目标源的增删改语句
- 基于binlog的近实时同步ETL
- 支持自动化表创建,自动化DDL变更同步
- 使用nacos注册中心同步,变更配置不需要重启
- 后续添加支持多目标源
-
支持近实时级别的数据增删改
-
支持自动创建表
CREATE TABLE [IF NOT EXISTS] tbl_name create_definition: {...}
- 支持MySql表结构的变更
ALTER TABLE tbl_name
| ADD [COLUMN] col_name column_definition
| ADD [COLUMN] (col_name column_definition,...)
| DROP [COLUMN] col_name
| MODIFY [COLUMN] col_name column_definition
-
支持主键的修改
-
删除表
-
修改表名
- 从kafka中获取canal解析完成的MySQLBinary log。
- 使用Flink的keyBy对targetTable进行分组,并使用时间窗口。
- 自定义一个trigger,触发事件为解析到DDL语句。
- 步骤2和步骤3组成,时间窗口+自定义trigger组合使用,如果没有DDL语句则根据时间进行滚动,如果存在DDL语句数据立即滚动。
- 定义aggregate,将同一张表的数据进行合并去重
- 自定义Sink,定义GreenplumSink或者其他目标数据源。
git clone https://github.com/JeasonPeople/mriya.git
cd mriya/docker-compose/
docker-compose up
- 访问http://docker-ip:8848/nacos修改配置(默认账号nacos/nacos) 在public下新增Properties文件, Data ID=MRIYA, group=MRIYA_GROUP
mriya.source.kafka.bootstrap.servers=kafka:9092
mriya.source.kafka.zookeeper.connect=zk:2181
mriya.source.kafka.group.id=dw-etl-prod-gp6
mriya.source.kafka.auto.offset.reset=earliest
mriya.source.kafka.topic=mriya
mriya.target.datasource.type=greenplum
mriya.target.datasource.url=jdbc:postgresql://greenplum:5432/mriya?serverTimezone=GMT+8
mriya.target.datasource.schema=dw_ods
mriya.target.datasource.username=gpadmin
mriya.target.datasource.password=pivotal
# 支持freemarker语法,${table}为必写项
mriya.table.name.template=${topic}_${database}_${table}
# psql -d template1 -c "alter user gpadmin password 'pivotal'"
# mriya.message.filer=${topic}-${database}-${table}
# mriya.message.filer=mes-accounting_bak-*
- 使用gpadmin账号连接greenplum创建database以及schema(默认账号root/pivotal gpadmin/pivotal)
CREATE DATABASE "mriya";
CREATE SCHEMA "dw_ods";
-
使用连接工具连接MySql(默认账号root/Mriya@Mriya)运行sql
CREATE DATABASE `mriya`;
CREATE TABLE `mriya`.`table_1` (
`k1` int(10) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
`c1` varchar(255) NULL,
`c2` varchar(255) NULL,
`c3` varchar(255) NULL,
`c4` datetime(2) NULL,
PRIMARY KEY (`k1`)
);
- 安装MySql
- 安装canal
- 安装kafka
- 安装zookeeper
1-4 安装教程(https://github.com/alibaba/canal/wiki)
- 安装配置中心nacos
nacos 安装教程(https://nacos.io/zh-cn/docs/deployment.html)
- 安装Flink
- 安装Greenplum
docker安装Greenplum
docker pull datagrip/greenplum
docker run -it -p 5432:5432 datagrip/greenplum
用户名: gpadmin 密码: pivotal 用户名: root 密码: pivotal
- 使用源码编译
git clone https://github.com/JeasonPeople/mriya.git
cd mriya
mvn install -Dmaven.test.skip=true
cd mriya-flink/target
将打包好的jar包通过Flink Web上传并执行