Maxwell
Maxwell Format
Maxwell 是一个 CDC(变更日志数据捕获)工具,可以将更改从 MySQL 实时流式传输到 Kafka、Kinesis 和其他流连接器。Maxwell 为变更日志提供了统一的格式架构,并支持使用 JSON 序列化消息
Flink 支持将 Maxwell JSON 消息解释为 Flink SQL 系统中的 INSERT/UPDATE/DELETE 消息。在许多情况下,这对于利用此功能很有用,例如
- 将增量数据从数据库同步到其他系统
- 审计日志
- 数据库的实时物化视图
- 临时连接改变数据库表的历史记录等等
Flink 还支持将 Flink SQL 中的 INSERT/UPDATE/DELETE 消息编码为 Maxwell JSON 消息,并发送到 Kafka 等外部系统。但是,目前 Flink 无法将 UPDATE_BEFORE 和 UPDATE_AFTER 组合成单个 UPDATE 消息。因此,Flink 将 UPDATE_BEFORE 和 UDPATE_AFTER 编码为 DELETE 和 INSERT Maxwell 消息。
依赖项
注意:有关如何使用 Maxwell JSON 将变更日志同步到 Kafka 主题,请参阅Maxwell文档
如何使用 Maxwell 格式
Maxwell 为变更日志提供了统一的格式,下面是一个从 MySQL products表中以 JSON 格式捕获更新操作的简单示例:
{
"database":"test",
"table":"e",
"type":"insert",
"ts":1477053217,
"xid":23396,
"commit":true,
"position":"master.000006:800911",
"server_id":23042,
"thread_id":108,
"primary_key": [1, "2016-10-21 05:33:37.523000"],
"primary_key_columns": ["id", "c"],
"data":{
"id":111,
"name":"scooter",
"description":"Big 2-wheel scooter",
"weight":5.15
},
"old":{
"weight":5.18,
}
}
注:各字段含义请参考Maxwell文档
MySQL products表有 4 列(id、name和description)weight。上面的 JSON 消息是表上的更新更改事件products,其中weight的行的值id = 111从 更改5.18为5.15。假设该消息已同步到 Kafka topic products_binlog,那么我们可以使用以下 DDL 来消费该 topic 并解释更改事件。
CREATE TABLE topic_products (
-- schema is totally the same to the MySQL "products" table
id BIGINT,
name STRING,
description STRING,
weight DECIMAL(10, 2)
) WITH (
'connector' = 'kafka',
'topic' = 'products_binlog',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'maxwell-json'
)
将主题注册为 Flink 表后,您可以使用 Maxwell 消息作为变更日志源。
-- a real-time materialized view on the MySQL "products"
-- which calculate the latest average of weight for the same products
SELECT name, AVG(weight) FROM topic_products GROUP BY name;
-- synchronize all the data and incremental changes of MySQL "products" table to
-- Elasticsearch "products" index for future searching
INSERT INTO elasticsearch_products
SELECT * FROM topic_products;
可用元数据
以下元数据格式可以在表定义中公开为只读(VIRTUAL
)列。
只有当相应的连接器转发格式元数据。目前,只有Kafka连接器能够暴露元数据字段的值格式。
参数 | 数据类型 | 描述 |
---|---|---|
database | STRING NULL | 原始数据库。在Maxwell记录中对应于 database 字段(如果可用) |
table | STRING NULL | 原始数据库表。在Maxwell记录中对应于 table 字段(如果可用)。 |
primary-key-columns | ARRAY<STRING> NULL | 主键名称数组。 在Maxwell 记录中的对应于primary_key_columns 字段(如果可用) |
ingestion-timestamp | TIMESTAMP_LTZ(3) NULL | 连接器处理事件的时间戳. 在Maxwell 记录中对应于ts 字段 |
以下示例展示了如何访问 Kafka 中的 Maxwell 元数据字段:
CREATE TABLE KafkaTable (
origin_database STRING METADATA FROM 'value.database' VIRTUAL,
origin_table STRING METADATA FROM 'value.table' VIRTUAL,
origin_primary_key_columns ARRAY<STRING> METADATA FROM 'value.primary-key-columns' VIRTUAL,
origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
user_id BIGINT,
item_id BIGINT,
behavior STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'maxwell-json'
);
Format Options
参数 | 是否必须 | 默认值 | 类型 | 描述 |
---|---|---|---|---|
format | 必选 | (none) | String | 指定使用 'maxwell-json' 格式 |
maxwell-json.ignore-parse-errors | 可选 | false | Boolean | 跳过有解析错误的字段和行而不是失败。如果出现错误,字段将设置为空 |
maxwell-json.timestamp-format.standard | 可选 | 'SQL' | String | 指定输入和输出时间戳格式。当前支持的值为 'SQL' 和 'ISO-8601' :
|
maxwell-json.map-null-key.mode | 可选 | 'FAIL' | String | 指定序列化地图数据的空键时的处理模式。当前支持的值为 'FAIL' , 'DROP' 和 'LITERAL' :
|
maxwell-json.map-null-key.literal | 可选 | 'null' | String | 为LITERAL时,指定字符串文字来替换空键 'maxwell-json.map-null-key.mode' |
maxwell-json.encode.decimal-as-plain-number | 可选 | false | Boolean | 将所有小数编码为普通数字而不是可能的科学记数法。默认情况下,小数可以使用科学记数法书写。例如,默认情况 0.000000027 下会进行编码 2.7E-8 并且会按照将此选项设置为 true 的方式将0.000000027 进行写入 |
注意事项
重复的更改事件
Maxwell 应用程序允许仅传递每个更改事件一次。在这种情况下,Flink 在使用 Maxwell 生成的事件时工作得很好。如果Maxwell应用程序工作在至少一次交付,它可能会向Kafka交付重复的更改事件,并且Flink将获取重复的事件。这可能会导致 Flink 查询得到错误的结果或意外的异常。因此,在这种情况下,建议将作业配置设置table.exec.source.cdc-events-duplicate为true并在源上定义 PRIMARY KEY。框架将生成一个额外的有状态运算符,并使用主键来删除重复的更改事件并生成规范化的更改日志流。
数据类型映射
目前,Maxwell 格式使用 JSON 进行序列化和反序列化。有关数据类型映射的更多详细信息,请参阅JSON 格式文档