
Case Study: Real-Time Multi-Table MySQL Capture with Column Filtering

Scenario

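A regional commercial bank is undergoing digital transformation. Its core business systems (credit card transactions, customer management, account management, and so on) store their data in MySQL. To support real-time risk control, customer profiling, and regulatory reporting, the bank needs to capture MySQL binlog changes in real time and deliver them to its big data platform (for example, Flink + Kafka). Because of regulatory data security and compliance requirements, sensitive columns must be filtered out when multiple business tables are captured.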

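  • Existing CDC solutions lack flexible filtering

The bank currently captures the binlog with Debezium or Canal, but cannot filter out sensitive columns at the capture stage.

  • How the ChunJun solution works

Mysql-X captures the binlog with an added filter condition: the filterColumns parameter names the columns to exclude, and it supports multiple tables as well as regular expressions.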
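
The matching behaviour can be pictured with a small sketch. This is not ChunJun's implementation: it assumes that filterColumns is treated as a single regular expression that must match the whole `table.column` name, with `|` separating alternatives. The names `is_filtered` and `surviving_columns` and the sample column lists are illustrative only.

```python
import re


def is_filtered(filter_columns: str, table: str, column: str) -> bool:
    """Return True if `table.column` fully matches the filterColumns pattern.

    Assumption: the pattern is one regular expression matched against the
    whole qualified name. This is a guess at the semantics, not ChunJun code.
    """
    return re.fullmatch(filter_columns, f"{table}.{column}") is not None


def surviving_columns(filter_columns: str, table: str, columns: list) -> list:
    """Return the columns of `table` that the pattern does not remove."""
    return [c for c in columns if not is_filtered(filter_columns, table, c)]


single = "employee_info.bank_account"      # single-column filter
regex = "(customer_info.customer_.*)"      # regular-expression filter

print(surviving_columns(single, "employee_info",
                        ["employee_no", "mobile_phone", "bank_account"]))
# ['employee_no', 'mobile_phone']
print(surviving_columns(regex, "customer_info",
                        ["customer_id", "customer_name", "gender", "email"]))
# ['gender', 'email']
```

Under this reading, the regular-expression form removes every customer_info column whose name starts with `customer_`, so it is worth checking a pattern against the full column list before relying on it.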

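Real-Time Multi-Table MySQL Capture with Filtering

Environment prerequisites

  • DTStack (数栈) 6.2
  • FlinkSQL 1.16
  • MySQL 5.7 data source, Kafka 2.x data source
  • MySQL source tables created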

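Develop the real-time collection task

  • The real-time collection task reads from the MySQL source tables and writes to the Kafka target, capturing data through log-based collection.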

    flinkx_1.png

  • Configure the MySQL source: select the source tables to capture. Column filtering across multiple tables is supported.

    mysql-source-2.png

  • In the advanced configuration of the MySQL source, add the filterColumns parameter.

    {"filterColumns":"employee_info.bank_account"} -- single-table column filter example

    {"filterColumns":"(customer_info.customer_.*)"} -- regular expression filter example

    mysql-source-3.png

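  • Configure the Kafka target

    • Set the Kafka topic for the target. Selecting a single topic writes the data captured from all source tables into that one topic.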

      kafka.png

  • Script preview

    {
      "job" : {
        "content" : [ {
          "reader" : {
            "parameter" : {
              "password" : "******",
              "port" : 3306,
              "cat" : "insert,update,delete",
              "host" : "localhost",
              "jdbcUrl" : "jdbc:mysql://localhost:3306/automation",
              "start" : { },
              "filterColumns" : "customer_info.mobile_phone|customer_info.id_card|order_info.payment_card_no|employee_info.mobile_phone|employee_info.id_card|employee_info.bank_account",
              "readPosition" : "current",
              "pavingData" : true,
              "table" : [ "customer_info", "order_info", "employee_info" ],
              "username" : "drpeco"
            },
            "name" : "binlogreader"
          },
          "writer" : {
            "parameter" : {
              "producerSettings" : {
                "zookeeper.connect" : "",
                "bootstrap.servers" : "localhost:9092"
              },
              "dataCompelOrder" : false,
              "topic" : "test"
            },
            "name" : "kafkawriter",
            "type" : 37
          }
        } ],
        "setting" : {
          "restore" : {
            "isRestore" : true,
            "isStream" : true
          },
          "errorLimit" : { },
          "speed" : {
            "readerChannel" : 1,
            "writerChannel" : 1,
            "bytes" : -1048576,
            "channel" : 1
          }
        }
      }
    }
  • Submit the task to the Task Operations Center. While the task runs, the captured data is written to the target Kafka topic.

  • Run an INSERT statement against the MySQL source table.

    • INSERT statement for the MySQL table
      INSERT INTO `employee_info` (`employee_no`, `employee_name`, `department`, `position`, `mobile_phone`, `id_card`, `bank_account`, `entry_date`) VALUES
      ('EMP1022', '赵七一', '财务部', '会计', '13500135003', '120106198712124560', '6222021234567890125', '2018-03-15');
  • In the target Kafka topic, query the data that FlinkX captured and filtered

    • In "Topic Management", look up the captured and filtered data by topic name

      topic管理采样.png

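    • employee_info table: in the sample below, bank_account has been filtered out. The record still carries after_mobile_phone and after_id_card, which is consistent with the single-table filter employee_info.bank_account rather than the full filterColumns value in the script preview, under which mobile_phone and id_card would also be removed.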

      {"schema":"automation","after_employee_id":43,"lsn":"mysql_bin.000030/000000000391863999","after_employee_no":"EMP1022",
      "type":"INSERT","after_entry_date":"2018-03-15 00:00:00","after_position":"会计","after_create_time":"2025-03-27 15:52:37",
      "database":null,"after_employee_name":"赵七一","opTime":"1743061957000","after_id_card":"120106198712124560",
      "after_mobile_phone":"13500135003","table":"employee_info","ts":7310931560398721024,"after_department":"财务部",
      "after_update_time":"2025-03-27 15:52:37"
      }
    • customer_info table: mobile_phone and id_card are filtered out

      {"schema":"automation","lsn":"mysql_bin.000030/000000000392619317","after_customer_name":"李强","type":"INSERT",
      "after_create_time":"2025-03-27 16:18:37","after_email":"liqiang@example.com","database":null,"after_customer_id":5,
      "after_birth_date":"1991-11-08 00:00:00","opTime":"1743063517000","after_gender":"M","after_address":"广东省深圳市南山区科技园",
      "table":"customer_info","ts":7310938103953362944,"after_update_time":"2025-03-27 16:18:37"
      }
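
A quick way to check samples like these is to scan each record for columns that should have been removed. The sketch below assumes flattened records whose column keys carry a `before_` or `after_` prefix (only `after_` appears in the output above; `before_` is an assumption for update and delete events). `SENSITIVE` and `leaked_columns` are illustrative names, and the record is a shortened copy of the customer_info sample.

```python
import json

# Columns expected to be absent per table, taken from the filterColumns
# value in the script preview.
SENSITIVE = {
    "customer_info": {"mobile_phone", "id_card"},
    "order_info": {"payment_card_no"},
    "employee_info": {"mobile_phone", "id_card", "bank_account"},
}


def leaked_columns(record: dict) -> set:
    """Return the sensitive columns that still appear in a flattened record."""
    expected_absent = SENSITIVE.get(record.get("table"), set())
    found = set()
    for key in record:
        for prefix in ("before_", "after_"):
            # Strip the prefix and compare the bare column name.
            if key.startswith(prefix) and key[len(prefix):] in expected_absent:
                found.add(key[len(prefix):])
    return found


sample = json.loads(
    '{"schema":"automation","type":"INSERT","table":"customer_info",'
    '"after_customer_id":5,"after_customer_name":"李强",'
    '"after_email":"liqiang@example.com","after_gender":"M"}'
)
print(leaked_columns(sample))  # prints set()
```

Because all source tables share one topic, the check keys on the record's `table` field to decide which columns must be absent.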

MySQL prerequisite table SQL

  • CREATE TABLE statements
CREATE TABLE `customer_info` (
`customer_id` bigint NOT NULL AUTO_INCREMENT COMMENT 'Customer ID',
`customer_name` varchar(50) NOT NULL COMMENT 'Customer name',
`gender` char(1) DEFAULT NULL COMMENT 'Gender (M: male, F: female)',
`birth_date` date DEFAULT NULL COMMENT 'Date of birth',
`mobile_phone` varchar(20) DEFAULT NULL COMMENT 'Mobile phone number (sensitive)',
`id_card` varchar(18) DEFAULT NULL COMMENT 'ID card number (sensitive)',
`email` varchar(100) DEFAULT NULL COMMENT 'Email address',
`address` varchar(200) DEFAULT NULL COMMENT 'Contact address',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Update time',
PRIMARY KEY (`customer_id`),
KEY `idx_mobile` (`mobile_phone`),
KEY `idx_id_card` (`id_card`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Customer basic information table';

CREATE TABLE `order_info` (
`order_id` bigint NOT NULL AUTO_INCREMENT COMMENT 'Order ID',
`order_no` varchar(30) NOT NULL COMMENT 'Order number',
`customer_id` bigint NOT NULL COMMENT 'Customer ID',
`total_amount` decimal(12,2) NOT NULL COMMENT 'Total order amount',
`payment_card_no` varchar(20) DEFAULT NULL COMMENT 'Payment card number (sensitive)',
`payment_card_name` varchar(50) DEFAULT NULL COMMENT 'Cardholder name',
`payment_time` datetime DEFAULT NULL COMMENT 'Payment time',
`order_status` tinyint NOT NULL DEFAULT '0' COMMENT 'Order status',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Update time',
PRIMARY KEY (`order_id`),
UNIQUE KEY `uk_order_no` (`order_no`),
KEY `idx_customer_id` (`customer_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Order information table';

CREATE TABLE `employee_info` (
`employee_id` bigint NOT NULL AUTO_INCREMENT COMMENT 'Employee ID',
`employee_no` varchar(20) NOT NULL COMMENT 'Employee number',
`employee_name` varchar(50) NOT NULL COMMENT 'Employee name',
`department` varchar(50) DEFAULT NULL COMMENT 'Department',
`position` varchar(50) DEFAULT NULL COMMENT 'Position',
`mobile_phone` varchar(20) DEFAULT NULL COMMENT 'Mobile phone number (sensitive)',
`id_card` varchar(18) DEFAULT NULL COMMENT 'ID card number (sensitive)',
`bank_account` varchar(20) DEFAULT NULL COMMENT 'Bank account number (sensitive)',
`entry_date` date DEFAULT NULL COMMENT 'Hire date',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Update time',
PRIMARY KEY (`employee_id`),
UNIQUE KEY `uk_employee_no` (`employee_no`),
KEY `idx_mobile` (`mobile_phone`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Employee information table';
  • INSERT statements

INSERT INTO `customer_info` (`customer_name`, `gender`, `birth_date`, `mobile_phone`, `id_card`, `email`, `address`) VALUES
('李强', 'M', '1991-11-08', '13700137003', '440305199111083456', 'liqiang@example.com', '广东省深圳市南山区科技园');

INSERT INTO `employee_info` (`employee_no`, `employee_name`, `department`, `position`, `mobile_phone`, `id_card`, `bank_account`, `entry_date`) VALUES
('EMP1022', '赵七一', '财务部', '会计', '13500135003', '120106198712124560', '6222021234567890125', '2018-03-15');
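
The column comments in the DDL already flag the sensitive fields, so a filterColumns value could be derived from them instead of being typed by hand. The sketch below is one way to do that with simple regular expressions. It is not part of ChunJun or the platform: `build_filter_columns` is a hypothetical helper, the embedded DDL is an abbreviated excerpt, and the parsing only handles statements laid out like the ones above (one backquoted column per line, with a COMMENT containing a marker such as `敏感` or `sensitive`).

```python
import re

# Abbreviated excerpt of the prerequisite DDL, for illustration only.
DDL = """
CREATE TABLE `customer_info` (
`customer_id` bigint NOT NULL AUTO_INCREMENT COMMENT '客户ID',
`mobile_phone` varchar(20) DEFAULT NULL COMMENT '手机号码(敏感)',
`id_card` varchar(18) DEFAULT NULL COMMENT '身份证号(敏感)',
PRIMARY KEY (`customer_id`)
) ENGINE=InnoDB;
CREATE TABLE `order_info` (
`order_id` bigint NOT NULL AUTO_INCREMENT COMMENT '订单ID',
`payment_card_no` varchar(20) DEFAULT NULL COMMENT '支付卡号(敏感)',
PRIMARY KEY (`order_id`)
) ENGINE=InnoDB;
"""

TABLE_RE = re.compile(r"CREATE TABLE `(\w+)`")
COLUMN_RE = re.compile(r"^`(\w+)`.*COMMENT '([^']*)'")


def build_filter_columns(ddl: str, markers=("敏感", "sensitive")) -> str:
    """Join every column whose COMMENT contains a marker as table.column|..."""
    table = None
    parts = []
    for raw in ddl.splitlines():
        line = raw.strip()
        table_match = TABLE_RE.match(line)
        if table_match:
            table = table_match.group(1)  # remember the current table
            continue
        col_match = COLUMN_RE.match(line)
        if col_match and table:
            comment = col_match.group(2).lower()
            if any(marker in comment for marker in markers):
                parts.append(f"{table}.{col_match.group(1)}")
    return "|".join(parts)


print(build_filter_columns(DDL))
```

For the excerpt, the result lists customer_info.mobile_phone, customer_info.id_card, and order_info.payment_card_no in the same `|`-separated form used by the script preview.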