Skip to main content

FlinkSQL结果表支持开源Hive案例

场景描述

在 Apache Flink 中,FlinkSQL 是一种声明式 SQL 语言,它允许用户以 SQL 的方式来编写流处理和批处理作业。Flink 官方提供了一系列的 Connector 来连接各种外部系统,例如 Kafka、HDFS、Elasticsearch 等。但是,由于业务的多样性,官方提供的 Connector 可能无法满足所有需求。这时,用户可以自定义 Connector 来扩展 FlinkSQL 的功能。 当前案例模拟写入结果表Hive原生Connector的场景。

操作步骤

前置环境准备

  • 数栈6.2版本
  • FLinkSQL1.16版本
  • Hive2数据源、Kafka2.x数据源

注册Connector

上传资源

mapreducejar.png

Hive Create Table

CREATE TABLE `user_table`(
`id` int,
`name` varchar(127),
`money` decimal(10,0),
`age` int,
`birthday` date,
`create_time` timestamp)
PARTITIONED BY (
`pt_day` string,
`pt_hour` string)
  • 在数栈离线平台通过SparkSQL建表
CREATE TABLE user_table(
id INT,
name VARCHAR(127),
money DECIMAL,
age INT,
birthday Date,
create_time TIMESTAMP)
partitioned by (pt_day string, pt_hour string)
TBLPROPERTIES (
'streaming-source.enable' = 'true',
'streaming-source.partition.include' = 'latest',
'streaming-source.monitor-interval' = '12 h',
'streaming-source.partition-order' = 'partition-time',
'partition.time-extractor.kind' = 'default',
'sink.partition.time-extractor.timestamp-pattern' = '$pt_year-$pt_month-$pt_day $pt_hour-00:00',
'sink.partition-commit.policy.kind' = 'metastore',
'sink.parallelism'='2'
);

编写FlinkSQL

ADD JAR WITH /data/sftp/57_hadoop-mapreduce-client-core-2_hadoop-mapreduce-client-core-2.7.5.jar;  -- 连接MR需要Jar手动输入文件sftp地址

CREATE TABLE sourceTable(
id int,
`name` STRING,
money DECIMAL(10,0) ,
age int,
birthday DATE,
create_time TIMESTAMP,
pt_day STRING,
pt_hour STRING
)WITH(
'properties.bootstrap.servers' = 'localhost:9092',
'connector' = 'kafka-x',
'scan.parallelism' = '1',
'format' = 'json',
'topic' = 'stream',
'scan.startup.mode' = 'latest-offset'
);

CREATE CATALOG myhive WITH(
'type'='hive', -- Connector名称
'hive-conf-dir'='/data/sftp/ci_hadoop2', --sftp存储地址,配置Hive连接所需Core-site\Hive-site\Yarn-site\Hdfs-site\Mapred-site文件地址
'hadoop-conf-dir'='/data/sftp/ci_hadoop2',--sftp存储地址,配置Hive连接所需Core-site\Hive-site\Yarn-site\Hdfs-site\Mapred-site文件地址
'default-database'='default' --Hive Database名称
);

INSERT
INTO
`myhive`.`default`.`user_table` -- 使用Catalog.Database.Table 的方式确定写入表
SELECT
id,
name,
money,
age,
birthday,
create_time,
pt_day,
pt_hour
from
sourceTable;

运行FLinkSQL

  • FlinkSQL语法检查通过并提交调度运行任务

hiveconnector-3.png

源表写入数据

  • kafka Topic 生成数据并写入
{"id":1,"money":1.1,"age":0,"birthday":"2022-07-16","create_time":"2022-07-16 09:09:56","pt_day":"20240827","pt_hour":"04"}

结果表查询数据

  • 数据写入成功查看数据库

hiveconnnector-1.png