Skip to main content

kafkaFormat设置

kafka format 设置

功能背景

Protocol Buffers(简称:ProtoBuf)是一种开源跨平台的序列化数据结构的协议。其对于存储资料或在网络上进行通信的程序是很有用的。这个方法包含一个接口描述语言,描述一些数据结构,并提供程序工具根据这些描述产生代码,这些代码将用来生成或解析代表这些数据结构的字节流。 Google最初开发了Protocol Buffers用于内部使用。Protocol Buffers的设计目标是简单和性能。与XML相比更小且更快。

  • 使用限制:上传后只能在FLinkSQL脚本模式中使用,不支持平台的向导模式

  • 支持Flink版本:1.16

  • 官网文档

proto -> java.class

将.proto文件编译成java类需要用到protoc工具,参考链接:https://protobuf.dev/programming-guides/proto3/#generating,可以通过如下命令生成java类。 protoc --java_out=./ /path/user.proto 生成的java类整体结构如下所示,作为一个嵌套类,类名整体为我们指定的java_outer_classname参数,即UserProtoBuf。

依赖项

如何创建 Protobuf 格式的表

下面是使用 Kafka 连接器和 Protobuf 格式创建表的示例。

下面是 proto 定义文件。

syntax = "proto3";

package com.dtstack.flinkx.format.protobuf;

message RepeatedBool {repeated bool Values = 1;}

message RepeatedMyBool {repeated bool Values = 1;}

message RepeatedInt32 {repeated int32 Values = 1;}

message RepeatedUint32 {repeated uint32 Values = 1;}

message RepeatedInt64 {repeated int64 Values = 1;}

message RepeatedUint64 {repeated uint64 Values = 1;}

message RepeatedFloat {repeated float Values = 1;}

message RepeatedDouble {repeated double Values = 1;}

message RepeatedString {repeated string Values = 1;}

message Variant{
oneof Value{
bool ValueBool = 1;
RepeatedBool ArrayBool = 2;
int32 ValueInt32 = 3;
RepeatedInt32 ArrayInt32 = 4;
uint32 ValueUint32 = 5;
RepeatedUint32 ArrayUint32 = 6;
int64 ValueInt64 = 7;
RepeatedInt64 ArrayInt64 = 8;
uint64 ValueUint64 = 9;
RepeatedUint64 ArrayUint64 = 10;
float ValueFloat = 11;
RepeatedFloat ArrayFloat = 12;
double ValueDouble = 13;
RepeatedDouble ArrayDouble = 14;
string ValueString = 15;
RepeatedString ArrayString = 16;
bytes ValueBytes = 17;
int64 ValueTimestamp = 18;
};
bool boolx = 19;
oneof Value2{
bool ValueBool2 = 20;
RepeatedBool ArrayBool2 = 21;
}
bool booly=22;
}

message MessageItem{
string TagName = 1; //默认optional
Variant TagValue = 2;
int32 UaDataType = 3;
bool Quality = 4;
int64 Timestamp = 5;
map<string, string>TagInfos = 6;
map<string, string>ExValues = 7;
}

message MessageGroup{
map<string, string> GroupInfo = 1;
repeated MessageItem Messages = 2;
}
  1. 使用protoc命令将.proto文件编译为 Java 类
  2. 然后编译并打包类(不需要将proto-java打包到jar中)
  3. 最后,你应该在你的类路径中提供,例如在sql-client中jar传递它-j
  4. 数栈实时计算中使用需要在项目首页-通用设置-kafka Format设置新增格式 protobuf-x

add jar with ProtoMessage.jar;

CREATE TABLE reader (
GroupInfo MAP<STRING,STRING>
,Messages ARRAY<
ROW<
TAGNAME VARCHAR ,
TagValue ROW<
`Value` ROW<
ValueCase INTEGER,
ValueBool BOOlEAN,
ArrayBool ROW<`Values` ARRAY<BOOLEAN>>,
ValueInt32 INTEGER,
ArrayInt32 ROW<`Values` ARRAY<INTEGER>>,
ValueUint32 INTEGER,
ArrayUint32 ROW<`Values` ARRAY<INTEGER>>,
ValueInt64 BIGINT,
ArrayInt64 ROW<`Values` ARRAY<BIGINT>>,
ValueUint64 BIGINT,
ArrayUint64 ROW<`Values` ARRAY<BIGINT>>,
ValueFloat FLOAT,
ArrayFloat ROW<`Values` ARRAY<FLOAT>>,
ValueDouble DOUBLE,
ArrayDouble ROW<`Values` ARRAY<DOUBLE>>,
ValueString STRING,
ArrayString ROW<`Values` ARRAY<STRING>>,
ValueBytes BYTES,
ValueTimestamp BIGINT
>,
boolx BOOLEAN,
Value2 ROW<
ValueCase INTEGER ,
Value2Value BOOLEAN,
ArrayBool2 ROW<`Values` ARRAY<BOOLEAN>>>,
booly BOOLEAN
>,
UaDataType INTEGER,
Quality BOOLEAN,
`Timestamp` BIGINT,
TagInfos MAP<STRING,STRING>,
ExValues MAP<STRING,STRING>
>
>

) WITH (
'connector' = 'kafka-x'
,'topic' = 'liuliu_proto_source'
,'properties.bootstrap.servers' = 'flink01:9092'
,'properties.group.id' = 'luna_g'
,'scan.startup.mode' = 'earliest-offset'
,'format' = 'protobuf-x'
,'protobuf.message-class-name'='ZPMC.Message.MessageGroupOuterClass$MessageGroup'
,'scan.parallelism' = '1'
);

格式选项

选项必需的已转发默认类型描述
format
requiredno(none)String 指定使用什么格式,这里应该是 protobuf-x'
protobuf-x.message-class-name
requiredno(none)String Protobuf 生成的类的全名。该名称必须与 proto 定义文件中的消息名称匹配。$支持内部类名,例如“com.exmample.OuterClass$MessageClass”
protobuf-x.ignore-parse-errors
optionalnofalseBoolean可选标志,用于跳过解析错误而不是失败的行。
protobuf-x.read-default-values
optionalyesfalseBoolean此选项仅在生成的类的版本为 proto2 时有效。如果将此值设置为 true,格式将读取 proto 文件中定义的空值作为默认值。如果将此值设置为 false,则如果二进制 protobuf 消息中不存在数据元素,格式将生成空值。如果 proto 语法为 proto3,则此值将强制设置为 true,因为 proto3 的标准是使用默认值。
protobuf-x.write-null-string-literal
optionalno""String当序列化为 protobuf 数据时,这是可选配置,用于在值为空的情况下指定 Protobuf 的数组/映射中的字符串文字。

Data Type Mapping

The following table lists the type mapping from Flink type to Protobuf type.

Flink SQL typeProtobuf typeDescription
CHAR / VARCHAR / STRINGstring
BOOLEANbool
BINARY / VARBINARYbytes
INTint32
BIGINTint64
FLOATfloat
DOUBLEdouble
ARRAYrepeated元素不能为空,字符串默认值可以通过以下方式指定write-null-string-literal
MAPmap键或值不能为空,字符串默认值可以通过以下方式指定write-null-string-literal
ROWmessage
VARCHAR / CHAR / TINYINT / SMALLINT / INTEGER / BIGINTenumprotobuf的枚举值可以相应地映射到字符串或flink行数。

空值

由于 protobuf 不允许在映射和数组中出现空值,因此我们需要在从 Flink Rows 转换为 Protobuf 时自动生成默认值。

Protobuf Data TypeDefault Value
int32 / int64 / float / double0
string""
boolfalse
enumfirst element of enum
binaryByteString.EMPTY
messageMESSAGE.getDefaultInstance()

OneOf字段

在序列化过程中,无法保证同一个 one-of 组中的 Flink 字段最多只包含一个有效值。序列化时,每个字段都按照 Flink schema 的顺序设置,因此同一个 one-of 组中位置较高的字段将覆盖位置较低的字段。

您可以参考语言指南(proto2)或语言指南(proto3)以获取有关 Protobuf 类型的更多信息。