FlinkSQL结果表支持开源Hive案例
场景描述
在 Apache Flink 中,FlinkSQL 是一种声明式 SQL 语言,它允许用户以 SQL 的方式来编写流处理和批处理作业。Flink 官方提供了一系列的 Connector 来连接各种外部系统,例如 Kafka、HDFS、Elasticsearch 等。但是,由于业务的多样性,官方提供的 Connector 可能无法满足所有需求。这时,用户可以自定义 Connector 来扩展 FlinkSQL 的功能。 当前案例模拟写入结果表Hive原生Connector的场景。
操作步骤
前置环境准备
- 数栈6.2版本
- FLinkSQL1.16版本
- Hive2数据源、Kafka2.x数据源
注册Connector
实时计算平台自定义Connector设置上传Connector
自定义Connector
上传Hive Connector flink-sql-connector-hive-3.1.2_2.12-1.16.2.jar
上传资源
- 实时计算平台资源管理上传所需的Jar hadoop-mapreduce-client-core-2_hadoop-mapreduce-client-core-2.7.5.jar
Hive Create Table
CREATE TABLE `user_table`(
`id` int,
`name` varchar(127),
`money` decimal(10,0),
`age` int,
`birthday` date,
`create_time` timestamp)
PARTITIONED BY (
`pt_day` string,
`pt_hour` string)
- 在数栈离线平台通过SparkSQL建表
CREATE TABLE user_table(
id INT,
name VARCHAR(127),
money DECIMAL,
age INT,
birthday Date,
create_time TIMESTAMP)
partitioned by (pt_day string, pt_hour string)
TBLPROPERTIES (
'streaming-source.enable' = 'true',
'streaming-source.partition.include' = 'latest',
'streaming-source.monitor-interval' = '12 h',
'streaming-source.partition-order' = 'partition-time',
'partition.time-extractor.kind' = 'default',
'sink.partition.time-extractor.timestamp-pattern' = '$pt_year-$pt_month-$pt_day $pt_hour-00:00',
'sink.partition-commit.policy.kind' = 'metastore',
'sink.parallelism'='2'
);
编写FlinkSQL
ADD JAR WITH /data/sftp/57_hadoop-mapreduce-client-core-2_hadoop-mapreduce-client-core-2.7.5.jar; -- 连接MR需要Jar手动输入文件sftp地址
CREATE TABLE sourceTable(
id int,
`name` STRING,
money DECIMAL(10,0) ,
age int,
birthday DATE,
create_time TIMESTAMP,
pt_day STRING,
pt_hour STRING
)WITH(
'properties.bootstrap.servers' = 'localhost:9092',
'connector' = 'kafka-x',
'scan.parallelism' = '1',
'format' = 'json',
'topic' = 'stream',
'scan.startup.mode' = 'latest-offset'
);
CREATE CATALOG myhive WITH(
'type'='hive', -- Connector名称
'hive-conf-dir'='/data/sftp/ci_hadoop2', --sftp存储地址,配置Hive连接所需Core-site\Hive-site\Yarn-site\Hdfs-site\Mapred-site文件地址
'hadoop-conf-dir'='/data/sftp/ci_hadoop2',--sftp存储地址,配置Hive连接所需Core-site\Hive-site\Yarn-site\Hdfs-site\Mapred-site文件地址
'default-database'='default' --Hive Database名称
);
INSERT
INTO
`myhive`.`default`.`user_table` -- 使用Catalog.Database.Table 的方式确定写入表
SELECT
id,
name,
money,
age,
birthday,
create_time,
pt_day,
pt_hour
from
sourceTable;
运行FLinkSQL
- FlinkSQL语法检查通过并提交调度运行任务
源表写入数据
- kafka Topic 生成数据并写入
{"id":1,"money":1.1,"age":0,"birthday":"2022-07-16","create_time":"2022-07-16 09:09:56","pt_day":"20240827","pt_hour":"04"}
结果表查询数据
- 数据写入成功查看数据库