Skip to main content

AR JSON Format

AR 为变更日志提供了统一的格式, 这是一个 JSON 格式的表捕获的更新操作的简单示例:

{
"magic": "atMSG",
"type": "DT",
"headers": null,
"messageSchemaId": null,
"messageSchema": null,
"message": {
"data": {
"USER_CODE": 1301,
"USER_ORLE": 3
},
"beforeData": {
"USER_CODE": 1301,
"USER_ORLE": 1
},
"headers": {
"operation": "UPDATE",
"changeSequence": "20231020053318510000000000000614701",
"timestamp": "2023-10-20T05:33:18.510",
"streamPosition": "00076e59:000238c5:0004",
"transactionId": "00000000000000000000000067B402DA",
"changeMask": "068160",
"columnMask": "7FFFFF"
}
}
}

表 有 2 列 (USER_CODE, USER_ORLE, ). 上面的 JSON 消息是上的一条更新事件,其中 USER_ORLE 值从 1 更改为 2. 假设此消息已同步到 Kafka 的 Topic user_behavior, 则可以使用以下 DDL 使用更新事件。

CREATE TABLE user_behavior (
USER_CODE BIGINT,
USER_ORLE BIGINT
) WITH (
'connector' = 'kafka-x',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'ar-json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
)

结果表输出选择Mysql,创建主键表user_product

CREATE TABLE IF NOT EXISTS user_product
(
USER_CODE int not null primary key,
USER_ORLE bigint
)ENGINE=InnoDB DEFAULT CHARSET=utf8;

DDL更新事件同步下游使用主键表,向导模式结果表配置更新模式选择Upsert模式。

CREATE TABLE mysqlResultTable(
USER_CODE int,
USER_ORLE bigint,
PRIMARY KEY(USER_CODE) NOT ENFORCED
)WITH(
'password' = '******',
'connector'='mysql-x',
'sink.buffer-flush.interval'='1000',
'sink.all-replace'='false',
'sink.buffer-flush.max-rows'='100',
'table-name'='user_product',
'sink.parallelism'='1',
'url'='jdbc:mysql://localhost:3306/defalut',
'username'='drpeco'
);

DDL 更新事件截图

更新前

image-20220701103615884

更新后

image-20220701103615884

Format 参数

参数是否必须默认值类型描述
format
必选(none)String声明使用的格式,这里应为'ar-json'