实时采集Mysql多表过滤字段案例
场景描述
某银行作为一家区域性商业银行,正在进行数字化转型,核心业务系统(如信用卡交易、客户管理、账户管理等)采用MySQL数据库存储数据。为满足实时风控、客户画像分析、监管报送等需求,银行需要将MySQL的binlog变更数据实时采集到大数据平台(如Flink + Kafka)。由于监管对数据安全与合规的要求,在采集涉及的多张业务表时,需要过滤其中的敏感字段。
- 现有CDC方案缺乏灵活过滤能力
当前使用Debezium或Canal采集binlog,但无法在采集阶段过滤敏感字段。
- Chunjun实现原理方案
使用Mysql-X采集binlog日志增加过滤条件,支持配置filterColumns过滤参数,同时支持多表以及正则过滤条件。
实时采集Mysql多表过滤
前置环境准备
- 数栈6.2版本
- FlinkSQL 1.16版本
- Mysql5.7数据源、Kafka2.x数据源
- Mysql来源表构建
开发实时采集任务
实时采集任务的来源表为Mysql、目标表为Kafka,通过binlog日志采集的方式进行数据采集。
配置采集来源表Mysql,选择Source采集表,支持多表过滤字段。
配置采集Mysql源表高级配置,增加filterColumns过滤参数。
{"filterColumns":"employee_info.bank_account"} -- 单表过滤字段样例
{"filterColumns":"(customer_info.customer_.*)"} -- 正则表达式过滤样例
配置目标表kafka
目标端配置Kafka Topic,选择单个Topic,将采集到的多个来源表数据写入同一个Topic中。
脚本预览
{
"job" : {
"content" : [ {
"reader" : {
"parameter" : {
"password" : "******",
"port" : 3306,
"cat" : "insert,update,delete",
"host" : "localhost",
"jdbcUrl" : "jdbc:mysql://localhost:3306/automation",
"start" : { },
"filterColumns" : "customer_info.mobile_phone|customer_info.id_card|order_info.payment_card_no|employee_info.mobile_phone|employee_info.id_card|employee_info.bank_account",
"readPosition" : "current",
"pavingData" : true,
"table" : [ "customer_info", "order_info", "employee_info" ],
"username" : "drpeco"
},
"name" : "binlogreader"
},
"writer" : {
"parameter" : {
"producerSettings" : {
"zookeeper.connect" : "",
"bootstrap.servers" : "localhost:9092"
},
"dataCompelOrder" : false,
"topic" : "test"
},
"name" : "kafkawriter",
"type" : 37
}
} ],
"setting" : {
"restore" : {
"isRestore" : true,
"isStream" : true
},
"errorLimit" : { },
"speed" : {
"readerChannel" : 1,
"writerChannel" : 1,
"bytes" : -1048576,
"channel" : 1
}
}
}
}
任务提交至任务运维中心,任务运行中将采集到的数据输出至目标表Kafka。
在来源表(Mysql表)中执行 Insert 语句,完成数据插入操作。
- Mysql表 插入 Insert语句
-- Insert one employee row into the source table; the capture job reports it
-- as an INSERT change event for employee_info.
INSERT INTO `employee_info`
    (`employee_no`, `employee_name`, `department`, `position`,
     `mobile_phone`, `id_card`, `bank_account`, `entry_date`)
VALUES
    ('EMP1022', '赵七一', '财务部', '会计',
     '13500135003', '120106198712124560', '6222021234567890125', '2018-03-15');
- Mysql表 插入 Insert语句
目标表 Kafka Topic 查询FlinkX采集并过滤后的数据
在 「Topic管理」 中通过Topic名称 查询采集过滤后的数据
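employee_info表 采集过滤:mobile_phone、id_card、bank_account 字段
(注意:下方样例输出中仍包含 after_mobile_phone、after_id_card,仅 bank_account 字段被过滤,与脚本预览中的 filterColumns 配置不一致;该样例疑似是在仅配置 employee_info.bank_account 时采集的,请以实际运行结果为准,并核对过滤配置是否生效。)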
{"schema":"automation","after_employee_id":43,"lsn":"mysql_bin.000030/000000000391863999","after_employee_no":"EMP1022",
"type":"INSERT","after_entry_date":"2018-03-15 00:00:00","after_position":"会计","after_create_time":"2025-03-27 15:52:37",
"database":null,"after_employee_name":"赵七一","opTime":"1743061957000","after_id_card":"120106198712124560",
"after_mobile_phone":"13500135003","table":"employee_info","ts":7310931560398721024,"after_department":"财务部",
"after_update_time":"2025-03-27 15:52:37"
}
customer_info表 采集过滤:mobile_phone、id_card 字段
{"schema":"automation","lsn":"mysql_bin.000030/000000000392619317","after_customer_name":"李强","type":"INSERT",
"after_create_time":"2025-03-27 16:18:37","after_email":"liqiang@example.com","database":null,"after_customer_id":5,
"after_birth_date":"1991-11-08 00:00:00","opTime":"1743063517000","after_gender":"M","after_address":"广东省深圳市南山区科技园",
"table":"customer_info","ts":7310938103953362944,"after_update_time":"2025-03-27 16:18:37"
}
Mysql前置表SQL
- 建表语句
-- Customer master table (source table for the capture job).
-- mobile_phone and id_card are marked "(敏感)" (sensitive) in their column
-- comments; both appear in the job's filterColumns setting for this table.
CREATE TABLE `customer_info` (
    `customer_id`   BIGINT       NOT NULL AUTO_INCREMENT COMMENT '客户ID',
    `customer_name` VARCHAR(50)  NOT NULL COMMENT '客户姓名',
    `gender`        CHAR(1)      DEFAULT NULL COMMENT '性别(M-男,F-女)',
    `birth_date`    DATE         DEFAULT NULL COMMENT '出生日期',
    `mobile_phone`  VARCHAR(20)  DEFAULT NULL COMMENT '手机号码(敏感)',
    `id_card`       VARCHAR(18)  DEFAULT NULL COMMENT '身份证号(敏感)',
    `email`         VARCHAR(100) DEFAULT NULL COMMENT '电子邮箱',
    `address`       VARCHAR(200) DEFAULT NULL COMMENT '联系地址',
    -- Audit timestamps maintained by MySQL itself.
    `create_time`   DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    `update_time`   DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    PRIMARY KEY (`customer_id`),
    KEY `idx_mobile` (`mobile_phone`),
    KEY `idx_id_card` (`id_card`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COMMENT = '客户基本信息表';
-- Order table (source table for the capture job).
-- payment_card_no is marked "(敏感)" (sensitive) in its column comment and
-- appears in the job's filterColumns setting for this table.
CREATE TABLE `order_info` (
    `order_id`          BIGINT         NOT NULL AUTO_INCREMENT COMMENT '订单ID',
    `order_no`          VARCHAR(30)    NOT NULL COMMENT '订单编号',
    `customer_id`       BIGINT         NOT NULL COMMENT '客户ID',
    `total_amount`      DECIMAL(12,2)  NOT NULL COMMENT '订单总金额',
    `payment_card_no`   VARCHAR(20)    DEFAULT NULL COMMENT '支付卡号(敏感)',
    `payment_card_name` VARCHAR(50)    DEFAULT NULL COMMENT '持卡人姓名',
    `payment_time`      DATETIME       DEFAULT NULL COMMENT '支付时间',
    `order_status`      TINYINT        NOT NULL DEFAULT '0' COMMENT '订单状态',
    -- Audit timestamps maintained by MySQL itself.
    `create_time`       DATETIME       NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    `update_time`       DATETIME       NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    PRIMARY KEY (`order_id`),
    UNIQUE KEY `uk_order_no` (`order_no`),
    KEY `idx_customer_id` (`customer_id`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COMMENT = '订单信息表';
-- Employee table (source table for the capture job).
-- mobile_phone, id_card and bank_account are marked "(敏感)" (sensitive) in
-- their column comments; all three appear in the job's filterColumns setting.
CREATE TABLE `employee_info` (
    `employee_id`   BIGINT      NOT NULL AUTO_INCREMENT COMMENT '员工ID',
    `employee_no`   VARCHAR(20) NOT NULL COMMENT '员工编号',
    `employee_name` VARCHAR(50) NOT NULL COMMENT '员工姓名',
    `department`    VARCHAR(50) DEFAULT NULL COMMENT '所属部门',
    `position`      VARCHAR(50) DEFAULT NULL COMMENT '职位',
    `mobile_phone`  VARCHAR(20) DEFAULT NULL COMMENT '手机号码(敏感)',
    `id_card`       VARCHAR(18) DEFAULT NULL COMMENT '身份证号(敏感)',
    `bank_account`  VARCHAR(20) DEFAULT NULL COMMENT '银行账号(敏感)',
    `entry_date`    DATE        DEFAULT NULL COMMENT '入职日期',
    -- Audit timestamps maintained by MySQL itself.
    `create_time`   DATETIME    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    `update_time`   DATETIME    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    PRIMARY KEY (`employee_id`),
    -- employee_no must be unique: inserting the same number twice is rejected.
    UNIQUE KEY `uk_employee_no` (`employee_no`),
    KEY `idx_mobile` (`mobile_phone`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COMMENT = '员工信息表';
- 插入语句
-- Insert one customer row; customer_id, create_time and update_time are left
-- to their column defaults (AUTO_INCREMENT / CURRENT_TIMESTAMP).
INSERT INTO `customer_info`
    (`customer_name`, `gender`, `birth_date`, `mobile_phone`,
     `id_card`, `email`, `address`)
VALUES
    ('李强', 'M', '1991-11-08', '13700137003',
     '440305199111083456', 'liqiang@example.com', '广东省深圳市南山区科技园');
-- Insert one employee row; employee_id, create_time and update_time are left
-- to their column defaults.
-- NOTE(review): this is the same statement (employee_no 'EMP1022') shown in the
-- walkthrough above. employee_no carries UNIQUE KEY uk_employee_no, so running
-- both copies against the same table fails with a duplicate-key error.
INSERT INTO `employee_info`
    (`employee_no`, `employee_name`, `department`, `position`,
     `mobile_phone`, `id_card`, `bank_account`, `entry_date`)
VALUES
    ('EMP1022', '赵七一', '财务部', '会计',
     '13500135003', '120106198712124560', '6222021234567890125', '2018-03-15');