Skip to main content

介绍

kafka插件支持从kafka的topic里读取以及写入数据

如何创建一个kafka表

CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)

##元数据 以下连接器元数据可以作为表定义中的元数据列进行访问。 该R/W列定义元数据字段是否可读 ( R) 和/或可写 ( W)。只读列必须声明VIRTUAL为在操作期间排除它们INSERT INTO。

名称类型描述读/写
topicSTRING NOT NULLKafka 记录的主题名称R
partitionINT NOT NULLKafka 记录的分区 IDR
headersMAP NOT NULLKafka 记录的标头作为原始字节的映射R/W
leader-epochINT NULLKafka 记录的领导纪元(如果有)R
offsetBIGINT NOT NULLKafka 记录在分区中的偏移量)R
timestampTIMESTAMP_LTZ(3) NOT NULLKafka 记录的时间戳R/W
timestamp-typeSTRING NOT NULL(Kafka记录的时间戳类型。“NoTimestampType”、“CreateTime”(也在写入元数据时设置)或“LogAppendTime”。R

连接器参数

连接器支持的参数主要是由基础参数加上对应表角色类型参数,如源表则为基础参数加上源表参数,结果表参数为基础参数加上结果表参数

###基础参数

名称是否必须类型描述
connector(none)String插件名称,kakfa插件需要配置为 kafka-x
topic(none)String写入的kafka topic名称
topic-pattern(none)String写入的topic正则
properties.group.id(none)String设置kafka group id
properties.bootstrap.servers(none)Stringkafka的broker地址
key.format(none)String用于反序列化和序列化kafka消息的关键部分的格式。有关详细信息和更多格式选项,请参阅格式页。注意: 如果定义了键格式,那么还需要“ key.fields”选项。否则,kafka的记录将有一个空键。
key.fields(none)String定义表架构中的物理列的显式列表,这些物理列为键格式配置数据类型。默认情况下,此列表为空,因此键是未定义的。列表应该类似于“ field1; field2”。
key.fields-prefix(none)String为键格式的所有字段定义自定义前缀,以避免与值格式的字段发生名称冲突。默认情况下,前缀为空。如果定义了自定义前缀,那么表模式和‘ key.fields’都将使用前缀名称。在构造密钥格式的数据类型时,将删除前缀,并在密钥格式中使用无前缀的名称。请注意,此选项要求必须将‘ value.fields-include’设置为‘ EXCEPT _ KEY’
value.format(none)Stringtopic值的编码格式
value.fields-include(none)String定义一个策略如何处理值格式数据类型中的键列。默认情况下,'ALL'表模式的物理列将包含在值格式中,这意味着键列出现在键和值格式的数据类型中

###源表参数

名称是否必须类型描述
scan.parallelism1INT读取并行度
scan.startup.mode(none)String多并行度时,根据此字段进行划分每个子任务读取范围,相当于splitPk
scan.startup.specific-offsets(none)String多并行度时,划分子任务的策略,相当于splitStrategy
scan.startup.timestamp-millis"(none)String多并行度时,划分子任务的策略,相当于splitStrategy
scan.topic-partition-discovery.interval(none)String增量字段名称,相当于increColumn

结果表参数

名称是否必须类型描述
sink.partitionerdefault(默认) fixed round-robin customClassNameString输入数据到kafka分区策略default使用kafka默认分区器对记录进行分区fixed每个Flink分区最多在一个Kafka分区中结束round-robin一个Flink分区被分配给Kafka分区customClassName自定义FlinkKafkaPartitioner子类名称
sink.semanticat-least-once(默认) exactly-onceString任务一致性级别at-least-once至少一次,数据有可能重复写入exactly-once 精确一次,数据写入不多不少,不会重复也不会丢失
sink.parallelism(none)INTsink算子并行度

安全

按照下列示例所示配置相关参数

CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka-x',
......
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";'
)

示例

-- {"id":1238123899121,"name":"asdlkjasjkdla998y1122","date":"1990-10-14","obj":{"time1":"12:12:43","str":"sfasfafs","lg":2324342345},"arr":[{"f1":"f1str11","f2":134},{"f1":"f1str22","f2":555}],"arr2":["a","b"],"time":"12:12:43Z","timestamp":"1990-10-14T12:12:43Z","map":{"flink":123},"mapinmap":{"inner_map":{"key":234}}}
CREATE TABLE source_ods_fact_user_ippv (
id BIGINT
, name STRING
, `date` DATE
, obj ROW<time1 TIME,str STRING,lg BIGINT>
, str as obj.str
, arr ARRAY<ROW<f1 STRING,f2 INT>>
, arr2 ARRAY<STRING>
, `time` TIME
, `timestamp` TIMESTAMP(3)
, `map` MAP<STRING,BIGINT>
, mapinmap MAP<STRING,MAP<STRING,INT>>
, proctime as PROCTIME()
) WITH (
'connector' = 'kafka'
,'topic' = 'test_multi_line'
,'properties.bootstrap.servers' = '172.16.100.109:9092'
,'properties.group.id' = 'dt_test'
,'scan.startup.mode' = 'earliest-offset'
,'format' = 'json'
,'json.timestamp-format.standard' = 'SQL'
-- ,'json.fail-on-missing-field' = 'true'
,'json.ignore-parse-errors' = 'true'
);


CREATE TABLE result_total_pvuv_min
(
id BIGINT
, name STRING
, `date` DATE
, str STRING
, f1 STRING
, tag STRING
, map1 BIGINT
, map2 INT
) WITH (
'connector' = 'stream-x'
);

INSERT INTO result_total_pvuv_min
select
id
, name
, `date`
, str as str
, arr[1].f1 as f1
, tag
, `map`['flink'] as map1
, mapinmap['inner_map']['key'] as map2
from source_ods_fact_user_ippv CROSS JOIN UNNEST(arr2) AS t (tag)