AR JSON Format
AR 为变更日志提供了统一的格式, 这是一个 JSON 格式的表捕获的更新操作的简单示例:
{
"magic": "atMSG",
"type": "DT",
"headers": null,
"messageSchemaId": null,
"messageSchema": null,
"message": {
"data": {
"USER_CODE": 1301,
"USER_ORLE": 3
},
"beforeData": {
"USER_CODE": 1301,
"USER_ORLE": 1
},
"headers": {
"operation": "UPDATE",
"changeSequence": "20231020053318510000000000000614701",
"timestamp": "2023-10-20T05:33:18.510",
"streamPosition": "00076e59:000238c5:0004",
"transactionId": "00000000000000000000000067B402DA",
"changeMask": "068160",
"columnMask": "7FFFFF"
}
}
}
表 有 2 列 (USER_CODE
, USER_ORLE
, ). 上面的 JSON 消息是上的一条更新事件,其中
USER_ORLE
值从 1
更改为 2
. 假设此消息已同步到 Kafka 的 Topic user_behavior
, 则可以使用以下 DDL 使用更新事件。
CREATE TABLE user_behavior (
USER_CODE BIGINT,
USER_ORLE BIGINT
) WITH (
'connector' = 'kafka-x',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'ar-json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
)
结果表输出选择Mysql,创建主键表user_product
CREATE TABLE IF NOT EXISTS user_product
(
USER_CODE int not null primary key,
USER_ORLE bigint
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
DDL更新事件同步下游使用主键表,向导模式结果表配置更新模式选择Upsert模式。
CREATE TABLE mysqlResultTable(
USER_CODE int,
USER_ORLE bigint,
PRIMARY KEY(USER_CODE) NOT ENFORCED
)WITH(
'password' = '******',
'connector'='mysql-x',
'sink.buffer-flush.interval'='1000',
'sink.all-replace'='false',
'sink.buffer-flush.max-rows'='100',
'table-name'='user_product',
'sink.parallelism'='1',
'url'='jdbc:mysql://localhost:3306/defalut',
'username'='drpeco'
);
DDL 更新事件截图
更新前
更新后
Format 参数
参数 | 是否必须 | 默认值 | 类型 | 描述 |
---|---|---|---|---|
format | 必选 | (none) | String | 声明使用的格式,这里应为'ar-json' 。 |