kafkaFormat设置
kafka format 设置
功能背景
Protocol Buffers(简称:ProtoBuf)是一种开源跨平台的序列化数据结构的协议。其对于存储资料或在网络上进行通信的程序是很有用的。这个方法包含一个接口描述语言,描述一些数据结构,并提供程序工具根据这些描述产生代码,这些代码将用来生成或解析代表这些数据结构的字节流。 Google最初开发了Protocol Buffers用于内部使用。Protocol Buffers的设计目标是简单和性能。与XML相比更小且更快。
使用限制:上传后只能在FLinkSQL脚本模式中使用,不支持平台的向导模式
支持Flink版本:1.16
proto -> java.class
将.proto文件编译成java类需要用到protoc工具,参考链接:https://protobuf.dev/programming-guides/proto3/#generating,可以通过如下命令生成java类。 protoc --java_out=./ /path/user.proto 生成的java类整体结构如下所示,作为一个嵌套类,类名整体为我们指定的java_outer_classname参数,即UserProtoBuf。
依赖项
如何创建 Protobuf 格式的表
下面是使用 Kafka 连接器和 Protobuf 格式创建表的示例。
下面是 proto 定义文件。
syntax = "proto3";
package com.dtstack.flinkx.format.protobuf;
message RepeatedBool {repeated bool Values = 1;}
message RepeatedMyBool {repeated bool Values = 1;}
message RepeatedInt32 {repeated int32 Values = 1;}
message RepeatedUint32 {repeated uint32 Values = 1;}
message RepeatedInt64 {repeated int64 Values = 1;}
message RepeatedUint64 {repeated uint64 Values = 1;}
message RepeatedFloat {repeated float Values = 1;}
message RepeatedDouble {repeated double Values = 1;}
message RepeatedString {repeated string Values = 1;}
message Variant{
oneof Value{
bool ValueBool = 1;
RepeatedBool ArrayBool = 2;
int32 ValueInt32 = 3;
RepeatedInt32 ArrayInt32 = 4;
uint32 ValueUint32 = 5;
RepeatedUint32 ArrayUint32 = 6;
int64 ValueInt64 = 7;
RepeatedInt64 ArrayInt64 = 8;
uint64 ValueUint64 = 9;
RepeatedUint64 ArrayUint64 = 10;
float ValueFloat = 11;
RepeatedFloat ArrayFloat = 12;
double ValueDouble = 13;
RepeatedDouble ArrayDouble = 14;
string ValueString = 15;
RepeatedString ArrayString = 16;
bytes ValueBytes = 17;
int64 ValueTimestamp = 18;
};
bool boolx = 19;
oneof Value2{
bool ValueBool2 = 20;
RepeatedBool ArrayBool2 = 21;
}
bool booly=22;
}
message MessageItem{
string TagName = 1; //默认optional
Variant TagValue = 2;
int32 UaDataType = 3;
bool Quality = 4;
int64 Timestamp = 5;
map<string, string>TagInfos = 6;
map<string, string>ExValues = 7;
}
message MessageGroup{
map<string, string> GroupInfo = 1;
repeated MessageItem Messages = 2;
}
- 使用protoc命令将.proto文件编译为 Java 类
- 然后编译并打包类(不需要将proto-java打包到jar中)
- 最后,你应该在你的类路径中提供,例如在sql-client中jar传递它-j
- 数栈实时计算中使用需要在项目首页-通用设置-kafka Format设置新增格式 protobuf-x
add jar with ProtoMessage.jar;
CREATE TABLE reader (
GroupInfo MAP<STRING,STRING>
,Messages ARRAY<
ROW<
TAGNAME VARCHAR ,
TagValue ROW<
`Value` ROW<
ValueCase INTEGER,
ValueBool BOOlEAN,
ArrayBool ROW<`Values` ARRAY<BOOLEAN>>,
ValueInt32 INTEGER,
ArrayInt32 ROW<`Values` ARRAY<INTEGER>>,
ValueUint32 INTEGER,
ArrayUint32 ROW<`Values` ARRAY<INTEGER>>,
ValueInt64 BIGINT,
ArrayInt64 ROW<`Values` ARRAY<BIGINT>>,
ValueUint64 BIGINT,
ArrayUint64 ROW<`Values` ARRAY<BIGINT>>,
ValueFloat FLOAT,
ArrayFloat ROW<`Values` ARRAY<FLOAT>>,
ValueDouble DOUBLE,
ArrayDouble ROW<`Values` ARRAY<DOUBLE>>,
ValueString STRING,
ArrayString ROW<`Values` ARRAY<STRING>>,
ValueBytes BYTES,
ValueTimestamp BIGINT
>,
boolx BOOLEAN,
Value2 ROW<
ValueCase INTEGER ,
Value2Value BOOLEAN,
ArrayBool2 ROW<`Values` ARRAY<BOOLEAN>>>,
booly BOOLEAN
>,
UaDataType INTEGER,
Quality BOOLEAN,
`Timestamp` BIGINT,
TagInfos MAP<STRING,STRING>,
ExValues MAP<STRING,STRING>
>
>
) WITH (
'connector' = 'kafka-x'
,'topic' = 'liuliu_proto_source'
,'properties.bootstrap.servers' = 'flink01:9092'
,'properties.group.id' = 'luna_g'
,'scan.startup.mode' = 'earliest-offset'
,'format' = 'protobuf-x'
,'protobuf.message-class-name'='ZPMC.Message.MessageGroupOuterClass$MessageGroup'
,'scan.parallelism' = '1'
);
格式选项
选项 | 必需的 | 已转发 | 默认 | 类型 | 描述 |
---|---|---|---|---|---|
format | required | no | (none) | String | 指定使用什么格式,这里应该是 protobuf-x' |
protobuf-x.message-class-name | required | no | (none) | String | Protobuf 生成的类的全名。该名称必须与 proto 定义文件中的消息名称匹配。$支持内部类名,例如“com.exmample.OuterClass$MessageClass” |
protobuf-x.ignore-parse-errors | optional | no | false | Boolean | 可选标志,用于跳过解析错误而不是失败的行。 |
protobuf-x.read-default-values | optional | yes | false | Boolean | 此选项仅在生成的类的版本为 proto2 时有效。如果将此值设置为 true,格式将读取 proto 文件中定义的空值作为默认值。如果将此值设置为 false,则如果二进制 protobuf 消息中不存在数据元素,格式将生成空值。如果 proto 语法为 proto3,则此值将强制设置为 true,因为 proto3 的标准是使用默认值。 |
protobuf-x.write-null-string-literal | optional | no | "" | String | 当序列化为 protobuf 数据时,这是可选配置,用于在值为空的情况下指定 Protobuf 的数组/映射中的字符串文字。 |
Data Type Mapping
The following table lists the type mapping from Flink type to Protobuf type.
Flink SQL type | Protobuf type | Description |
---|---|---|
CHAR / VARCHAR / STRING | string | |
BOOLEAN | bool | |
BINARY / VARBINARY | bytes | |
INT | int32 | |
BIGINT | int64 | |
FLOAT | float | |
DOUBLE | double | |
ARRAY | repeated | 元素不能为空,字符串默认值可以通过以下方式指定write-null-string-literal |
MAP | map | 键或值不能为空,字符串默认值可以通过以下方式指定write-null-string-literal |
ROW | message | |
VARCHAR / CHAR / TINYINT / SMALLINT / INTEGER / BIGINT | enum | protobuf的枚举值可以相应地映射到字符串或flink行数。 |
空值
由于 protobuf 不允许在映射和数组中出现空值,因此我们需要在从 Flink Rows 转换为 Protobuf 时自动生成默认值。
Protobuf Data Type | Default Value |
---|---|
int32 / int64 / float / double | 0 |
string | "" |
bool | false |
enum | first element of enum |
binary | ByteString.EMPTY |
message | MESSAGE.getDefaultInstance() |
OneOf字段
在序列化过程中,无法保证同一个 one-of 组中的 Flink 字段最多只包含一个有效值。序列化时,每个字段都按照 Flink schema 的顺序设置,因此同一个 one-of 组中位置较高的字段将覆盖位置较低的字段。
您可以参考语言指南(proto2)或语言指南(proto3)以获取有关 Protobuf 类型的更多信息。