Confluent Avro
Example of a table using the upsert-kafka connector, with the Kafka value registered as an Avro record in the Schema Registry:
CREATE TABLE user_created (
  
  -- one column mapped to the raw UTF-8 Kafka key
  kafka_key_id STRING,
  
  -- a few columns mapped to the Avro fields of the Kafka value
  id STRING, 
  name STRING, 
  email STRING, 
  
  -- the upsert-kafka connector requires a primary key to define the upsert behavior
  PRIMARY KEY (kafka_key_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'user_events_example3',
  'properties.bootstrap.servers' = 'localhost:9092',
  -- UTF-8 strings as Kafka keys
  -- 'key.fields' is not specified here because it is determined by the table's primary key
  'key.format' = 'raw',
  
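  -- In this example, we want the Avro types of both the Kafka key and value to contain the field 'id'
  -- => add a prefix to the table column associated with the Kafka key field to avoid a name clash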
  'key.fields-prefix' = 'kafka_key_',
  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'http://localhost:8082',
  'value.fields-include' = 'EXCEPT_KEY'
)
{{< /tab >}} {{< /tabs >}}
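As a rough illustration of how the key prefix and `'value.fields-include' = 'EXCEPT_KEY'` split the columns of the `user_created` table above, the statements below write one row and read it back. This is a sketch only: the inserted values are made up, and it assumes the Kafka broker and Schema Registry from the example are reachable.

```sql
-- kafka_key_id is written as the raw UTF-8 Kafka key;
-- id, name and email are written as fields of the Avro value record.
INSERT INTO user_created
VALUES ('user-1', 'user-1', 'Alice', 'alice@example.com');  -- illustrative values

-- kafka_key_id is read from the Kafka key, the remaining columns from the Avro value.
SELECT kafka_key_id, id, name, email
FROM user_created;
```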
Format Options
| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| format | required | (none) | String | Specifies which format to use. Here it must be 'avro-confluent'. |
| avro-confluent.basic-auth.credentials-source | optional | (none) | String | Basic auth credentials source for Schema Registry | 
| avro-confluent.basic-auth.user-info | optional | (none) | String | Basic auth user info for Schema Registry |
| avro-confluent.bearer-auth.credentials-source | optional | (none) | String | Bearer auth credentials source for Schema Registry | 
| avro-confluent.bearer-auth.token | optional | (none) | String | Bearer auth token for Schema Registry | 
| avro-confluent.properties | optional | (none) | Map | Properties map that is forwarded to the underlying Schema Registry. This is useful for options that are not officially exposed via Flink config options. However, note that Flink options have higher precedence. | 
| avro-confluent.ssl.keystore.location | optional | (none) | String | Location / File of SSL keystore | 
| avro-confluent.ssl.keystore.password | optional | (none) | String | Password for SSL keystore | 
| avro-confluent.ssl.truststore.location | optional | (none) | String | Location / File of SSL truststore | 
| avro-confluent.ssl.truststore.password | optional | (none) | String | Password for SSL truststore | 
| avro-confluent.subject | optional | (none) | String | The Confluent Schema Registry subject under which to register the schema used by this format during serialization. By default, 'kafka' and 'upsert-kafka' connectors use '<topic_name>-value' or '<topic_name>-key' as the default subject name if this format is used as the value or key format. But for other connectors (e.g. 'filesystem'), the subject option is required when used as sink. | 
| avro-confluent.url | required | (none) | String | The URL of the Confluent Schema Registry to fetch/register schemas. | 
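
The options in this table can be combined as in the sketch below. It is an illustration under assumptions, not a verified setup: the table names, topic, paths, and credentials are hypothetical placeholders, and `'USER_INFO'` is assumed to be the credentials source you want (it is the Schema Registry client setting for inline `user:password` credentials). With the Kafka connectors, format options carry the `value.` (or `key.`) prefix, as in the example earlier on this page. The second table follows the `avro-confluent.subject` description: a non-Kafka sink has no default subject, so one is set explicitly.

```sql
-- Sketch 1: Kafka table whose Schema Registry requires basic auth and TLS.
CREATE TABLE user_created_secured (
  id STRING,
  name STRING,
  email STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_events_secured',                       -- hypothetical topic
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'example-group',               -- hypothetical group id
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'https://localhost:8082',
  -- placeholder credentials, replace with your own
  'value.avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
  'value.avro-confluent.basic-auth.user-info' = 'registry_user:registry_password',
  -- placeholder truststore location and password
  'value.avro-confluent.ssl.truststore.location' = '/path/to/truststore.jks',
  'value.avro-confluent.ssl.truststore.password' = 'changeit'
);

-- Sketch 2: non-Kafka sink. No '<topic_name>-value' default exists here,
-- so the subject is given explicitly (the subject name is a placeholder).
CREATE TABLE user_created_files (
  id STRING,
  name STRING,
  email STRING
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///tmp/user_created',                   -- hypothetical path
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'http://localhost:8082',
  'avro-confluent.subject' = 'user_created_files-value'
);
```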
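Data Type Mapping
Currently, Apache Flink always derives the Avro reader schema used during deserialization and the Avro writer schema used during serialization from the table schema. Explicitly defining an Avro schema is not supported yet. The mapping between Flink data types and Avro types is described in Apache Avro Format.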
In addition to the types listed there, Flink supports reading and writing nullable types. Flink maps a nullable type to the Avro union(something, null), where something is the Avro type converted from the Flink type.
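
The nullable mapping can be pictured with a small table definition. This is a sketch under assumptions: the table name and topic are hypothetical, and the Avro types in the comments are what the rule above leads one to expect, not output captured from Flink.

```sql
CREATE TABLE user_contact (
  id STRING NOT NULL,  -- NOT NULL column: expected Avro type is plain string
  email STRING,        -- nullable (the SQL default): expected Avro union of string and null
  age INT              -- nullable: expected Avro union of int and null
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_contact_example',                      -- hypothetical topic
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'http://localhost:8082'
);
```

Because the reader and writer schemas are derived from the table schema, declaring a column `NOT NULL` or leaving it nullable changes the Avro schema that Flink uses, so the declaration should match the schema of the records in the topic.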
You can refer to the Avro Specification for more information about Avro types.