介绍
kafka插件支持从kafka的topic里读取以及写入数据
如何创建一个kafka表
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)
##元数据 以下连接器元数据可以作为表定义中的元数据列进行访问。 该R/W列定义元数据字段是否可读 ( R) 和/或可写 ( W)。只读列必须声明VIRTUAL为在操作期间排除它们INSERT INTO。
名称 | 类型 | 描述 | 读/写 |
---|---|---|---|
topic | STRING NOT NULL | Kafka 记录的主题名称 | R |
partition | INT NOT NULL | Kafka 记录的分区 ID | R |
headers | MAP NOT NULL | Kafka 记录的标头作为原始字节的映射 | R/W |
leader-epoch | INT NULL | Kafka 记录的领导纪元(如果有) | R |
offset | BIGINT NOT NULL | Kafka 记录在分区中的偏移量) | R |
timestamp | TIMESTAMP_LTZ(3) NOT NULL | Kafka 记录的时间戳 | R/W |
timestamp-type | STRING NOT NULL | (Kafka记录的时间戳类型。“NoTimestampType”、“CreateTime”(也在写入元数据时设置)或“LogAppendTime”。 | R |
连接器参数
连接器支持的参数主要是由基础参数加上对应表角色类型参数,如源表则为基础参数加上源表参数,结果表参数为基础参数加上结果表参数
###基础参数
名称 | 是否必须 | 值 | 类型 | 描述 |
---|---|---|---|---|
connector | 是 | (none) | String | 插件名称,kakfa插件需要配置为 kafka-x |
topic | 否 | (none) | String | 写入的kafka topic名称 |
topic-pattern | 否 | (none) | String | 写入的topic正则 |
properties.group.id | 否 | (none) | String | 设置kafka group id |
properties.bootstrap.servers | 是 | (none) | String | kafka的broker地址 |
key.format | 否 | (none) | String | 用于反序列化和序列化kafka消息的关键部分的格式。有关详细信息和更多格式选项,请参阅格式页。注意: 如果定义了键格式,那么还需要“ key.fields”选项。否则,kafka的记录将有一个空键。 |
key.fields | 否 | (none) | String | 定义表架构中的物理列的显式列表,这些物理列为键格式配置数据类型。默认情况下,此列表为空,因此键是未定义的。列表应该类似于“ field1; field2”。 |
key.fields-prefix | 否 | (none) | String | 为键格式的所有字段定义自定义前缀,以避免与值格式的字段发生名称冲突。默认情况下,前缀为空。如果定义了自定义前缀,那么表模式和‘ key.fields’都将使用前缀名称。在构造密钥格式的数据类型时,将删除前缀,并在密钥格式中使用无前缀的名称。请注意,此选项要求必须将‘ value.fields-include’设置为‘ EXCEPT _ KEY’ |
value.format | 否 | (none) | String | topic值的编码格式 |
value.fields-include | 否 | (none) | String | 定义一个策略如何处理值格式数据类型中的键列。默认情况下,'ALL'表模式的物理列将包含在值格式中,这意味着键列出现在键和值格式的数据类型中 |
###源表参数
名称 | 是否必须 | 值 | 类型 | 描述 |
---|---|---|---|---|
scan.parallelism | 否 | 1 | INT | 读取并行度 |
scan.startup.mode | 否 | (none) | String | 多并行度时,根据此字段进行划分每个子任务读取范围,相当于splitPk |
scan.startup.specific-offsets | 否 | (none) | String | 多并行度时,划分子任务的策略,相当于splitStrategy |
scan.startup.timestamp-millis" | 否 | (none) | String | 多并行度时,划分子任务的策略,相当于splitStrategy |
scan.topic-partition-discovery.interval | 否 | (none) | String | 增量字段名称,相当于increColumn |
结果表参数
名称 | 是否必须 | 值 | 类型 | 描述 |
---|---|---|---|---|
sink.partitioner | 否 | default(默认) fixed round-robin customClassName | String | 输入数据到kafka分区策略default 使用kafka默认分区器对记录进行分区fixed 每个Flink分区最多在一个Kafka分区中结束round-robin 一个Flink分区被分配给Kafka分区customClassName 自定义FlinkKafkaPartitioner子类名称 |
sink.semantic | 否 | at-least-once(默认) exactly-once | String | 任务一致性级别at-least-once 至少一次,数据有可能重复写入exactly-once 精确一次,数据写入不多不少,不会重复也不会丢失 |
sink.parallelism | 否 | (none) | INT | sink算子并行度 |
安全
按照下列示例所示配置相关参数
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka-x',
......
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";'
)
示例
-- {"id":1238123899121,"name":"asdlkjasjkdla998y1122","date":"1990-10-14","obj":{"time1":"12:12:43","str":"sfasfafs","lg":2324342345},"arr":[{"f1":"f1str11","f2":134},{"f1":"f1str22","f2":555}],"arr2":["a","b"],"time":"12:12:43Z","timestamp":"1990-10-14T12:12:43Z","map":{"flink":123},"mapinmap":{"inner_map":{"key":234}}}
CREATE TABLE source_ods_fact_user_ippv (
id BIGINT
, name STRING
, `date` DATE
, obj ROW<time1 TIME,str STRING,lg BIGINT>
, str as obj.str
, arr ARRAY<ROW<f1 STRING,f2 INT>>
, arr2 ARRAY<STRING>
, `time` TIME
, `timestamp` TIMESTAMP(3)
, `map` MAP<STRING,BIGINT>
, mapinmap MAP<STRING,MAP<STRING,INT>>
, proctime as PROCTIME()
) WITH (
'connector' = 'kafka'
,'topic' = 'test_multi_line'
,'properties.bootstrap.servers' = '172.16.100.109:9092'
,'properties.group.id' = 'dt_test'
,'scan.startup.mode' = 'earliest-offset'
,'format' = 'json'
,'json.timestamp-format.standard' = 'SQL'
-- ,'json.fail-on-missing-field' = 'true'
,'json.ignore-parse-errors' = 'true'
);
CREATE TABLE result_total_pvuv_min
(
id BIGINT
, name STRING
, `date` DATE
, str STRING
, f1 STRING
, tag STRING
, map1 BIGINT
, map2 INT
) WITH (
'connector' = 'stream-x'
);
INSERT INTO result_total_pvuv_min
select
id
, name
, `date`
, str as str
, arr[1].f1 as f1
, tag
, `map`['flink'] as map1
, mapinmap['inner_map']['key'] as map2
from source_ods_fact_user_ippv CROSS JOIN UNNEST(arr2) AS t (tag)