Skip to main content

Maxwell

Maxwell Format

Maxwell 是一个 CDC(变更日志数据捕获)工具,可以将更改从 MySQL 实时流式传输到 Kafka、Kinesis 和其他流连接器。Maxwell 为变更日志提供了统一的格式架构,并支持使用 JSON 序列化消息

Flink 支持将 Maxwell JSON 消息解释为 Flink SQL 系统中的 INSERT/UPDATE/DELETE 消息。在许多情况下,这对于利用此功能很有用,例如

  • 将增量数据从数据库同步到其他系统
  • 审计日志
  • 数据库的实时物化视图
  • 临时连接改变数据库表的历史记录等等

Flink 还支持将 Flink SQL 中的 INSERT/UPDATE/DELETE 消息编码为 Maxwell JSON 消息,并发送到 Kafka 等外部系统。但是,目前 Flink 无法将 UPDATE_BEFORE 和 UPDATE_AFTER 组合成单个 UPDATE 消息。因此,Flink 将 UPDATE_BEFORE 和 UDPATE_AFTER 编码为 DELETE 和 INSERT Maxwell 消息。

依赖项

注意:有关如何使用 Maxwell JSON 将变更日志同步到 Kafka 主题,请参阅Maxwell文档

如何使用 Maxwell 格式

Maxwell 为变更日志提供了统一的格式,下面是一个从 MySQL products表中以 JSON 格式捕获更新操作的简单示例:

{
"database":"test",
"table":"e",
"type":"insert",
"ts":1477053217,
"xid":23396,
"commit":true,
"position":"master.000006:800911",
"server_id":23042,
"thread_id":108,
"primary_key": [1, "2016-10-21 05:33:37.523000"],
"primary_key_columns": ["id", "c"],
"data":{
"id":111,
"name":"scooter",
"description":"Big 2-wheel scooter",
"weight":5.15
},
"old":{
"weight":5.18,
}
}

注:各字段含义请参考Maxwell文档

MySQL products表有 4 列(id、name和description)weight。上面的 JSON 消息是表上的更新更改事件products,其中weight的行的值id = 111从 更改5.18为5.15。假设该消息已同步到 Kafka topic products_binlog,那么我们可以使用以下 DDL 来消费该 topic 并解释更改事件。

CREATE TABLE topic_products (
-- schema is totally the same to the MySQL "products" table
id BIGINT,
name STRING,
description STRING,
weight DECIMAL(10, 2)
) WITH (
'connector' = 'kafka',
'topic' = 'products_binlog',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'maxwell-json'
)

将主题注册为 Flink 表后,您可以使用 Maxwell 消息作为变更日志源。

-- a real-time materialized view on the MySQL "products"
-- which calculate the latest average of weight for the same products
SELECT name, AVG(weight) FROM topic_products GROUP BY name;

-- synchronize all the data and incremental changes of MySQL "products" table to
-- Elasticsearch "products" index for future searching
INSERT INTO elasticsearch_products
SELECT * FROM topic_products;

可用元数据

以下元数据格式可以在表定义中公开为只读(VIRTUAL)列。

info

只有当相应的连接器转发格式元数据。目前,只有Kafka连接器能够暴露元数据字段的值格式。

参数数据类型描述
databaseSTRING NULL原始数据库。在Maxwell记录中对应于 database 字段(如果可用)
tableSTRING NULL原始数据库表。在Maxwell记录中对应于 table 字段(如果可用)。
primary-key-columnsARRAY<STRING> NULL主键名称数组。 在Maxwell 记录中的对应于primary_key_columns字段(如果可用)
ingestion-timestampTIMESTAMP_LTZ(3) NULL连接器处理事件的时间戳. 在Maxwell 记录中对应于ts字段

以下示例展示了如何访问 Kafka 中的 Maxwell 元数据字段:

CREATE TABLE KafkaTable (
origin_database STRING METADATA FROM 'value.database' VIRTUAL,
origin_table STRING METADATA FROM 'value.table' VIRTUAL,
origin_primary_key_columns ARRAY<STRING> METADATA FROM 'value.primary-key-columns' VIRTUAL,
origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
user_id BIGINT,
item_id BIGINT,
behavior STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'maxwell-json'
);

Format Options

参数是否必须默认值类型描述
format
必选(none)String指定使用 'maxwell-json'格式
maxwell-json.ignore-parse-errors
可选falseBoolean跳过有解析错误的字段和行而不是失败。如果出现错误,字段将设置为空
maxwell-json.timestamp-format.standard
可选'SQL'String指定输入和输出时间戳格式。当前支持的值为 'SQL''ISO-8601':
  • 选项 'SQL'将以 "yyyy-MM-dd HH:mm:ss.s{precision}" 格式解析输入时间戳,例如'2020-12-30 12:13:14.123' 并以相同格式输出时间戳
  • 选项 'ISO-8601'将以"yyyy-MM-ddTHH:mm:ss.s{precision}" 格式解析输入时间戳,例如 '2020-12-30T12:13:14.123' 并以相同格式输出时间戳
maxwell-json.map-null-key.mode
可选'FAIL'String指定序列化地图数据的空键时的处理模式。当前支持的值为 'FAIL', 'DROP''LITERAL':
  • 当遇到带有 null key 的映射时,选项 'FAIL' 报错
  • 选项 'DROP'将删除地图数据的空键条目
  • 选项'LITERAL'将用字符串文字替换空键。字符串文字由 maxwell-json.map-null-key.literal 选项定义
maxwell-json.map-null-key.literal
可选'null'String为LITERAL时,指定字符串文字来替换空键 'maxwell-json.map-null-key.mode'
maxwell-json.encode.decimal-as-plain-number
可选falseBoolean将所有小数编码为普通数字而不是可能的科学记数法。默认情况下,小数可以使用科学记数法书写。例如,默认情况 0.000000027下会进行编码 2.7E-8 并且会按照将此选项设置为 true 的方式将0.000000027进行写入

注意事项

重复的更改事件

Maxwell 应用程序允许仅传递每个更改事件一次。在这种情况下,Flink 在使用 Maxwell 生成的事件时工作得很好。如果Maxwell应用程序工作在至少一次交付,它可能会向Kafka交付重复的更改事件,并且Flink将获取重复的事件。这可能会导致 Flink 查询得到错误的结果或意外的异常。因此,在这种情况下,建议将作业配置设置table.exec.source.cdc-events-duplicate为true并在源上定义 PRIMARY KEY。框架将生成一个额外的有状态运算符,并使用主键来删除重复的更改事件并生成规范化的更改日志流。

数据类型映射

目前,Maxwell 格式使用 JSON 进行序列化和反序列化。有关数据类型映射的更多详细信息,请参阅JSON 格式文档