Skip to main content

Ogg

Ogg Format

Oracle GoldenGate (a.k.a ogg) 是一个实现异构 IT 环境间数据实时数据集成和复制的综合软件包。 该产品集支持高可用性解决方案、实时数据集成、事务更改数据捕获、运营和分析企业系统之间的数据复制、转换和验证。Ogg 为变更日志提供了统一的格式结构,并支持使用 JSON 序列化消息。

Flink 支持将 Ogg JSON 消息解析为 INSERT/UPDATE/DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常有用,例如

  • 将增量数据从数据库同步到其他系统
  • 日志审计
  • 数据库的实时物化视图
  • 关联维度数据库的变更历史,等等

Flink 还支持将 Flink SQL 中的 INSERT/UPDATE/DELETE 消息编码为 Ogg JSON 格式的消息, 输出到 Kafka 等存储中。 但需要注意, 目前 Flink 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息. 因此, Flink 将 UPDATE_BEFORE 和 UPDATE_AFTER 分别编码为 DELETE 和 INSERT 类型的 Ogg 消息。

依赖项

Ogg Json

注意: 请参考 Ogg Kafka Handler documentation, 了解如何设置 Ogg Kafka handler 来将变更日志同步到 Kafka 的 Topic。

如何使用 Ogg format

Ogg 为变更日志提供了统一的格式, 这是一个 JSON 格式的从 Oracle PRODUCTS 表捕获的更新操作的简单示例:

{
"before": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 5.18
},
"after": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 5.15
},
"op_type": "U",
"op_ts": "2020-05-13 15:40:06.000000",
"current_ts": "2020-05-13 15:40:07.000000",
"primary_keys": [
"id"
],
"pos": "00000000000000000000143",
"table": "PRODUCTS"
}

Oracle PRODUCTS 表 有 4 列 (id, name, description and weight). 上面的 JSON 消息是 PRODUCTS 表上的一条更新事件,其中 id = 111 的行的 weight 值从 5.18 更改为 5.15. 假设此消息已同步到 Kafka 的 Topic products_ogg, 则可以使用以下 DDL 来使用该 Topic 并解析更新事件。

CREATE TABLE topic_products (
-- schema is totally the same to the Oracle "products" table
id BIGINT,
name STRING,
description STRING,
weight DECIMAL(10, 2)
) WITH (
'connector' = 'kafka-x',
'topic' = 'products_ogg',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'ogg-json'
)

再将 Kafka Topic 注册为 Flink 表之后, 可以将 OGG 消息变为变更日志源。

-- a real-time materialized view on the Oracle "PRODUCTS"
-- which calculate the latest average of weight for the same products
SELECT name, AVG(weight)
FROM topic_products
GROUP BY name;

-- synchronize all the data and incremental changes of Oracle "PRODUCTS" table to
-- Elasticsearch "products" index for future searching
INSERT INTO elasticsearch_products
SELECT *
FROM topic_products;

可用元数据

以下格式元数据可以在表定义中公开为只读 (VIRTUAL) 列.

::: 注意仅当相应的连接器转发格式元数据时,格式元数据字段才可用。目前,只有 Kafka 连接器能够公开其值格式的元数据字段。 :::

参数数据类型描述
tableSTRING包含完全限定的表名。完全限定表名的格式为: SCHEMA NAME.TABLE NAME
op_typeSTRING源系统创建事件的操作类型。对应Ogg记录中的op_type字段
op_tsSTRING源系统创建事件的时间戳。对应Ogg记录中的op_ts字段
current_tsSTRING连接器处理事件的时间戳。对应Ogg记录中的current_ts字段
posSTRING位点信息,对应Ogg记录中的pos字段
primary_keysSTRING保存源表主键的列名的数组变量。仅当 includePrimaryKeys 配置属性设置为 true 时,主键字段才会包含在 JSON 输出中
tokenSTRINGtoken信息,对应Ogg记录中的token字段

以下示例显示了如何访问Kafka中的Ogg元数据字段:

CREATE TABLE KafkaTable (
op_ts STRING METADATA FROM 'value.op_ts' ,
current_ts STRING METADATA FROM 'value.current_ts' ,
origin_table STRING METADATA FROM 'value.table' ,
primary_keys STRING METADATA FROM 'value.primary_keys',
user_id BIGINT,
item_id BIGINT,
behavior STRING
) WITH (
'connector' = 'kafka-x',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'ogg-json'
);

Format Options

选项要求默认类型描述
format
必填(none)String指定要使用的格式,此处应为 'ogg-json'.
ogg-json.ignore-parse-errors
选填falseBoolean当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。
debezium-json.timestamp-format.standard
可选'SQL'String声明输入和输出的时间戳格式。当前支持的格式为'SQL' 以及 'ISO-8601'
  • 可选参数 'SQL' 将会以 "yyyy-MM-dd HH:mm:ss.s{precision}" 的格式解析时间戳, 例如 '2020-12-30 12:13:14.123',且会以相同的格式输出。
  • 可选参数 'ISO-8601' 将会以 "yyyy-MM-ddTHH:mm:ss.s{precision}" 的格式解析输入时间戳, 例如 '2020-12-30T12:13:14.123' ,且会以相同的格式输出。
ogg-json.map-null-key.mode
选填'FAIL'String指定处理 Map 中 key 值为空的方法. 当前支持的值有 'FAIL', 'DROP''LITERAL':
  • Option 'FAIL' 将抛出异常。
  • Option 'DROP' 将丢弃 Map 中 key 值为空的数据项。
  • Option 'LITERAL' 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 ogg-json.map-null-key.literal 定义。
ogg-json.map-null-key.literal
选填'null'String'ogg-json.map-null-key.mode' 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。

数据类型映射

目前, Ogg format 使用 JSON format 进行序列化和反序列化。有关数据类型映射的更多详细信息,请参考 JSON Format 文档。