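Using Nested JSON in FlinkSQL: A Worked Example
Scenario
Day-to-day SQL development often involves complex nested JSON data. This demo targets FlinkSQL 1.16 and covers reading and writing data with complex MAP, ROW, and ARRAY types.
Topic JSON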
{"id": 1, "name": "dtstack", "date_data": "2024-05-14","obj_data": {"time1": "12:12:43", "str": "dtstack_class", "lg": 9099},"arr_data": [{"f1": "f1str1", "f2": 1314}, {"f1": "f1str2", "f2": 9092}],"time_data": "12:12:43", "timestamp_data": "2024-05-14 12:12:43","map_data": {"flink": 1688}, "mapinmap": {"inner_map": {"key": 2022}}}
FlinkSQL statements
- Source table
CREATE TABLE sourceTable(
id BIGINT,
name STRING,
date_data DATE,
obj_data ROW<time1 TIME,str STRING,lg BIGINT>,
arr_data ARRAY<ROW<f1 STRING,f2 INT>>,
time_data TIME,
timestamp_data TIMESTAMP(3) ,
map_data MAP<STRING,BIGINT>,
mapinmap MAP<STRING,MAP<STRING,INT>>
)WITH(
'properties.bootstrap.servers' = '{kafka_host}:9092',
'connector' = 'kafka-x',
'scan.parallelism' = '1',
'format' = 'json',
'topic' = '{topic_name}',
'scan.startup.mode' = 'latest-offset'
);
- Result table
CREATE TABLE kafkaResultTable(
id BIGINT,
name VARCHAR,
date_data DATE,
obj_str VARCHAR,
arr1_f1 VARCHAR,
map_flink BIGINT,
mapinmap_key INT
)WITH(
'properties.bootstrap.servers' = '{kafka_host}:9092',
'connector' = 'kafka-x',
'format' = 'json',
'topic' = '{topic_name}',
'sink.parallelism' = '1'
);
- SQL
INSERT
INTO
kafkaResultTable
select
id,
name,
date_data,
obj_data.str as obj_str,
arr_data[1].f1 as arr1_f1, -- array indexes are 1-based, so [1] is the first element
map_data['flink'] as map_flink,
mapinmap['inner_map']['key'] as mapinmap_key
from
sourceTable;
- Output
{"id":1,"name":"dtstack","date_data":"2024-05-14","obj_str":"dtstack_class","arr1_f1":"f1str1","map_flink":1688,"mapinmap_key":2022}
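The query above reads only the first element of `arr_data`. To emit one row per array element instead, a minimal sketch using Flink SQL's `CROSS JOIN UNNEST` could look like the following. The alias `t` and the idea of running it as a standalone query are illustrative assumptions, not part of the original demo.

```sql
-- Sketch: expand every element of arr_data into its own row.
-- UNNEST on ARRAY<ROW<f1 STRING, f2 INT>> exposes the row fields as columns;
-- the alias t (f1, f2) is a name chosen for this example.
SELECT
  id,
  name,
  t.f1 AS arr_f1,
  t.f2 AS arr_f2
FROM sourceTable
CROSS JOIN UNNEST(arr_data) AS t (f1, f2);
```

For the sample message, `arr_data` has two elements, so this query would be expected to produce two rows that share the same `id` and `name`.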
### Data type mapping
The table below lists how Flink data types map to JSON data types.
FlinkSQL type | JSON type |
---|---|
CHAR / VARCHAR / STRING | string |
BOOLEAN | boolean |
BINARY / VARBINARY | string with encoding: base64 |
DECIMAL | number |
TINYINT | number |
SMALLINT | number |
INT | number |
BIGINT | number |
FLOAT | number |
DOUBLE | number |
DATE | string with format: date |
TIME | string with format: time |
TIMESTAMP | string with format: date-time |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | string with format: date-time (with UTC time zone) |
INTERVAL | number |
ARRAY | array |
MAP / MULTISET | object |
ROW | object |
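The sample message writes `timestamp_data` as `2024-05-14 12:12:43`, which is the SQL timestamp style. If the upstream producer sends ISO-8601 timestamps instead (for example `2024-05-14T12:12:43`), Flink's JSON format lets you switch the expected style with a format option. The sketch below assumes that the `kafka-x` connector passes `json.*` options through to Flink's JSON format unchanged. The table name `sourceTableIso` is hypothetical.

```sql
-- Sketch: source table that expects ISO-8601 timestamps in the JSON payload.
-- Assumption: the kafka-x connector forwards json.* options to Flink's JSON format.
CREATE TABLE sourceTableIso(
id BIGINT,
timestamp_data TIMESTAMP(3)
)WITH(
'properties.bootstrap.servers' = '{kafka_host}:9092',
'connector' = 'kafka-x',
'format' = 'json',
'topic' = '{topic_name}',
'scan.startup.mode' = 'latest-offset',
-- accepted values are 'SQL' (the default) and 'ISO-8601'
'json.timestamp-format.standard' = 'ISO-8601',
-- skip records that fail to parse instead of failing the job
'json.ignore-parse-errors' = 'true'
);
```

Note that `json.ignore-parse-errors` silently drops malformed records, so it is worth enabling only when losing bad records is acceptable.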