Skip to main content

实时采集Kafka数据还原Mysql案例

场景描述

在日常数据开发过程中,日志数据的实时处理和分析对于系统监控和故障排查具有重要意义。通过将日志数据从 Kafka 同步到 MySQL,可以方便地进行日志查询和分析。本次针对实时采集1.16版本输出演示Demo。

实时采集-数据还原 SourceMysql -> SinkMysql

前置环境准备

  • 数栈6.2版本
  • FLinkSQL1.16版本
  • Mysql5.7数据源、Kafka2.x数据源

开发数据还原任务

  • 实时采集任务来源表Kafka、目标表Mysql 通过数据还原的方式

    FlinkX_kafka_ogg_mysql-5.png

  • 配置采集来源表Kafka,本次Demo通过Format 为OGG格式进行数据还原,将Kafka Topic中 Insert、Update、Delete操作实时的还原到目标表。

    FlinkX_kafka_ogg_mysql-1.png

  • 配置采集目标表Mysql,选择需要还原写入的目标表(支持多选)。Kafka Format用于指定来源端的JSON格式,目前仅支持OGG格式。

    FlinkX_kafka_ogg_mysql-2.png

  • 配置表、字段的映射关系

    FlinkX_kafka_ogg_mysql-3.png

    • 字段映射关系配置

      • 来源Kafka Topic 中Json数据结构,可以通过输入Json样例数据,自动解析获取。也可以手动输入Json中的表名、数据结构。

        // Json样例
        {
        "table":"automation.fanshu_001",
        "op_type":"D",
        "op_ts":"2025-01-05 18:45:36.000000",
        "current_ts":"2025-01-05 18:45:36.000000",
        "pos":"00000000000000005272",
        "before":{
        "id":"1",
        "name":"dtstream_update",
        "age":"13124"
        }
        }
      • 目标端数据结构,平台自动查询元数据获取。

      • 数据映射支持点击同行映射或同名映射匹配数据的映射关系。

      • 字段添加支持采集元数据字段,例如:ogg Format 中的 table、current_ts、op_type、op_ts、pos这类元数据字段。

      FlinkX_kafka_ogg_mysql-4.png

  • 脚本预览

    {
    "job":{
    "content":[
    {
    "nameMapping":{
    "columnMappings":{
    "automation.fanshu_001":{
    "name":"name",
    "id":"id",
    "age":"age"
    }
    },
    "identifierMappings":{
    "automation.fanshu_001":"automation.fanshu_001"
    }
    },
    "reader":{
    "parameter":{
    "mode":"latest-offset",
    "codec":"json",
    "nullReplaceNotExistsField":false,
    "groupId":"default",
    "deserialization":"ogg",
    "topic":"dtstream_one",
    "componentVersion":"1.16",
    "consumerSettings":{
    "zookeeper.connect":"",
    "bootstrap.servers":"localhost:9092",
    "auto.commit.interval.ms":"1000",
    "auto.offset.reset":"latest"
    },
    "addMessage":false,
    "tableSchema":{
    "automation.fanshu_001":[
    {
    "name":"id",
    "type":"string"
    },
    {
    "name":"name",
    "type":"string"
    },
    {
    "name":"age",
    "type":"string"
    }
    ]
    }
    },
    "name":"kafkareader"
    },
    "writer":{
    "parameter":{
    "password":"******",
    "isFullColumnUpdateInReduction":false,
    "allReplace":true,
    "connection":[
    {
    "jdbcUrl":"jdbc:mysql://localhost:3306/automation",
    "column":[
    "*"
    ],
    "table":[
    "*"
    ]
    }
    ],
    "writeMode":"insert",
    "username":"drpeco"
    },
    "name":"mysqlwriter"
    }
    }
    ],
    "setting":{
    "restore":{
    "isRestore":true,
    "isStream":true
    },
    "errorLimit":{},
    "speed":{
    "readerChannel":1,
    "writerChannel":1,
    "bytes":-1048576,
    "channel":1
    }
    }
    }
    }
  • 任务提交任务运维中心,任务运行中采集输出至目标表Mysql。

  • 来源表 Kafka Topic 插入 Insert语句,目标表 Mysql 插入操作完成。

    • Kafka Topic Insert语句

      {
      "table":"automation.fanshu_001",
      "op_type":"I",
      "op_ts":"2025-01-05 18:45:36.000000",
      "current_ts":"2025-01-05 18:45:36.000000",
      "pos":"00000000000000002928",
      "after":{
      "id":"1",
      "name":"dtstream",
      "age":"1314"
      }
      }
    • 目标表Mysql查看表数据插入

      FlinkX_kafka_ogg_mysql-6.png

  • 来源表 Kafka Topic 插入 Update语句,目标表 Mysql 更新操作完成。

    • Kafka Topic Update语句

      {
      "table":"automation.fanshu_001",
      "op_type":"U",
      "op_ts":"2025-01-05 18:45:36.000000",
      "current_ts":"2025-01-05 18:45:36.000000",
      "pos":"00000000000000004300",
      "before":{
      "id":"1",
      "name":"dtstream",
      "age":"1314"
      },
      "after":{
      "id":"1",
      "name":"dtstream_update",
      "age":"13124"
      }
      }
    • 目标表Mysql查看表数据更新

      FlinkX_kafka_ogg_mysql-7.png

  • 来源表 Kafka Topic 插入 Delete语句,目标表 Mysql 删除操作完成。

    • Kafka Topic Delete语句

      {
      "table":"automation.fanshu_001",
      "op_type":"D",
      "op_ts":"2025-01-05 18:45:36.000000",
      "current_ts":"2025-01-05 18:45:36.000000",
      "pos":"00000000000000005272",
      "before":{
      "id":"1",
      "name":"dtstream_update",
      "age":"13124"
      }
      }
    • 目标表Mysql查看表数据删除

      FlinkX_kafka_ogg_mysql-8.png