实时采集Mysql全量数据还原案例
场景描述
在数据处理流程中,数据还原是一个关键环节。它基于日志采集,将采集到的日志数据转换为下游数据库的 DDL(Data Definition Language,数据定义语言)和 DML(Data Manipulation Language,数据操纵语言)执行语句。这种转换使得可以直接对下游数据库的结构和数据进行变更。本次针对实时采集1.16版本输出演示Demo。
实时采集-数据还原 SourceMysql -> SinkMysql
前置环境准备
- 数栈6.2版本
- FLinkSQL1.16版本
- Mysql5.7数据源
开发数据还原任务
实时采集任务来源表Mysql、目标表Mysql 通过数据还原的方式
配置采集来源表Mysql,支持通过增量、全量+增量的方式进行数据还原,本次选择全量+增量的方式,会将Mysql来源表存量的数据先写入目标数据源,存量数据写入完成后再开始增量数据的实时写入动作。
配置采集目标表Mysql,写入表选择自定义表名支持用户自定义新建表或使用相同表名写入目标表,表名拼装规则中${tableName}为来源表名称,大小写转换不变,DDL自动执行不需要「任务运维-运行信息-待处理的DDL操作」中手动触发执行按钮。
实时采集脚本预览
{
"job" : {
"content" : [ {
"nameMapping" : {
"casing" : "UNCHANGE",
"identifierMappings" : {
// 转换规则
"automation.*" : "automation.${tableName}_0115"
}
},
// 来源表相关配置
"reader" : {
"parameter" : {
"fetchFilterSql" : "id > 10",
"ddlSkip" : false,
"enableFetchAll" : true,
"start" : { },
"readPosition" : "current",
"pavingData" : false,
"password" : "******",
"split" : true,
"port" : 3306,
"cat" : "insert,update,delete",
"host" : "localhost",
"jdbcUrl" : "jdbc:mysql://localhost:3306/automation",
"table" : [ "fanshu_001" ],
"initialTableStructure" : true,
"username" : "drpeco"
},
"name" : "binlogreader"
},
// 目标表相关配置
"writer" : {
"parameter" : {
"password" : "******",
"executeDdlAble" : true,
"connection" : [ {
"jdbcUrl" : "jdbc:mysql://localhost:3306/automation",
"table" : [ "*" ]
} ],
"writeMode" : "insert",
"username" : "drpeco"
},
"name" : "mysqlwriter"
},
// 存储DDL语句之后的所有操作
"restoration" : {
"cache" : {
"cacheTimeout" : 60000,
"cacheSize" : 100000,
"maxBytes" : 104857600,
"type" : "mysql",
"properties" : {
"database" : "automation",
"password" : "******",
"table" : "transaction_data_FlinkX_Mysql_Mysql",
"url" : "jdbc:mysql://localhost:3306/automation",
"username" : "drpeco"
}
},
// 存储日志对应操作位点操作记录
"workerMax" : 3,
"workerSize" : 3,
"stateRecover" : {
"type" : "mysql",
"properties" : {
"database" : "automation",
"password" : "******",
"table" : "FlinkX_Mysql_Mysql_state_recover",
"url" : "jdbc:mysql://localhost:3306/automation",
"username" : "drpeco"
}
},
// 存储最新待执行的DDL操作
"workerNum" : 2,
"ddl" : {
"fetchInterval" : 3000,
"type" : "mysql",
"properties" : {
"database" : "automation",
"password" : "******",
"table" : "ddl_change_FlinkX_Mysql_Mysql",
"url" : "jdbc:mysql://localhost:3306/automation",
"username" : "drpeco"
}
}
}
} ],
"setting" : {
"restore" : {
"isRestore" : true,
"isStream" : true
},
"errorLimit" : { },
"speed" : {
"readerChannel" : 1,
"writerChannel" : 1,
"bytes" : -1048576,
"channel" : 1
}
}
}
}
任务提交任务运维中心,任务运行中采集输出至目标表Mysql,在运行前程序会预先判断表是否存在,当前Demo目标表不存在会自动创建一张与来源表同样的Mysql表名为自定义表名 ${tableName}_0115
数据写入查看Metric写入情况,写入条数比插入的数据多一条是创建目标表的统计数据。最后通过Count查询目标表数量与来源表过滤后的数据一致,输入写入成功。随后增量写入的数据会实时的写入目标表。