实时采集Kafka数据还原Mysql案例
场景描述
在日常数据开发过程中,日志数据的实时处理和分析对于系统监控和故障排查具有重要意义。通过将日志数据从 Kafka 同步到 MySQL,可以方便地进行日志查询和分析。本次针对实时采集1.16版本输出演示Demo。
实时采集-数据还原 SourceMysql -> SinkMysql
前置环境准备
- 数栈6.2版本
- FLinkSQL1.16版本
- Mysql5.7数据源、Kafka2.x数据源
开发数据还原任务
实时采集任务来源表Kafka、目标表Mysql 通过数据还原的方式
配置采集来源表Kafka,本次Demo通过Format 为OGG格式进行数据还原,将Kafka Topic中 Insert、Update、Delete操作实时的还原到目标表。
配置采集目标表Mysql,选择需要还原写入的目标表(支持多选)。Kafka Format用于指定来源端的JSON格式,目前仅支持OGG格式。
配置表、字段的映射关系
字段映射关系配置
来源Kafka Topic 中Json数据结构,可以通过输入Json样例数据,自动解析获取。也可以手动输入Json中的表名、数据结构。
// Json样例
{
"table":"automation.fanshu_001",
"op_type":"D",
"op_ts":"2025-01-05 18:45:36.000000",
"current_ts":"2025-01-05 18:45:36.000000",
"pos":"00000000000000005272",
"before":{
"id":"1",
"name":"dtstream_update",
"age":"13124"
}
}目标端数据结构,平台自动查询元数据获取。
数据映射支持点击同行映射或同名映射匹配数据的映射关系。
字段添加支持采集元数据字段,例如:ogg Format 中的 table、current_ts、op_type、op_ts、pos这类元数据字段。
脚本预览
{
"job":{
"content":[
{
"nameMapping":{
"columnMappings":{
"automation.fanshu_001":{
"name":"name",
"id":"id",
"age":"age"
}
},
"identifierMappings":{
"automation.fanshu_001":"automation.fanshu_001"
}
},
"reader":{
"parameter":{
"mode":"latest-offset",
"codec":"json",
"nullReplaceNotExistsField":false,
"groupId":"default",
"deserialization":"ogg",
"topic":"dtstream_one",
"componentVersion":"1.16",
"consumerSettings":{
"zookeeper.connect":"",
"bootstrap.servers":"localhost:9092",
"auto.commit.interval.ms":"1000",
"auto.offset.reset":"latest"
},
"addMessage":false,
"tableSchema":{
"automation.fanshu_001":[
{
"name":"id",
"type":"string"
},
{
"name":"name",
"type":"string"
},
{
"name":"age",
"type":"string"
}
]
}
},
"name":"kafkareader"
},
"writer":{
"parameter":{
"password":"******",
"isFullColumnUpdateInReduction":false,
"allReplace":true,
"connection":[
{
"jdbcUrl":"jdbc:mysql://localhost:3306/automation",
"column":[
"*"
],
"table":[
"*"
]
}
],
"writeMode":"insert",
"username":"drpeco"
},
"name":"mysqlwriter"
}
}
],
"setting":{
"restore":{
"isRestore":true,
"isStream":true
},
"errorLimit":{},
"speed":{
"readerChannel":1,
"writerChannel":1,
"bytes":-1048576,
"channel":1
}
}
}
}任务提交任务运维中心,任务运行中采集输出至目标表Mysql。
来源表 Kafka Topic 插入 Insert语句,目标表 Mysql 插入操作完成。
Kafka Topic Insert语句
{
"table":"automation.fanshu_001",
"op_type":"I",
"op_ts":"2025-01-05 18:45:36.000000",
"current_ts":"2025-01-05 18:45:36.000000",
"pos":"00000000000000002928",
"after":{
"id":"1",
"name":"dtstream",
"age":"1314"
}
}目标表Mysql查看表数据插入
来源表 Kafka Topic 插入 Update语句,目标表 Mysql 更新操作完成。
Kafka Topic Update语句
{
"table":"automation.fanshu_001",
"op_type":"U",
"op_ts":"2025-01-05 18:45:36.000000",
"current_ts":"2025-01-05 18:45:36.000000",
"pos":"00000000000000004300",
"before":{
"id":"1",
"name":"dtstream",
"age":"1314"
},
"after":{
"id":"1",
"name":"dtstream_update",
"age":"13124"
}
}目标表Mysql查看表数据更新
来源表 Kafka Topic 插入 Delete语句,目标表 Mysql 删除操作完成。
Kafka Topic Delete语句
{
"table":"automation.fanshu_001",
"op_type":"D",
"op_ts":"2025-01-05 18:45:36.000000",
"current_ts":"2025-01-05 18:45:36.000000",
"pos":"00000000000000005272",
"before":{
"id":"1",
"name":"dtstream_update",
"age":"13124"
}
}目标表Mysql查看表数据删除