Skip to main content

湖仓快速入门

设计背景

1、解决Flink映射表的元数据持久化存储和管理的问题。

通过实时湖仓中的映射表管理,可以对Kafka、MySQL等数据源中的topic/表进行持久化映射。当某个映射表可以在多个任务中重复使用时,不需要在每个任务开发时都做一遍重复映射,直接通过Catalog.db.table的方式引用即可。

2、解决湖表(Paimon/Hudi/Iceberg)的创建/管理需求,为实时任务读写湖表提供基础。

湖表管理:在【实时湖仓】模块,创建维护湖表;

湖表读写:在【数据开发】模块,在IDE中通过Catalog.db.table的方式引用湖表,完成读写。

快速入门

通过DEMO串联介绍【实时湖仓】和【数据开发】的配合使用。

Hudi Demo需要实现:流式读取Hudi表,实时Join ES维表,流式写入另一张Hudi表。

一、创建两个Catalog

HMS Catalog用来存储管理Hudi表、DT Catalog用来管理ES映射表。

二、创建湖表和映射表

  • 创建Hudi Source表

image-20230911174917253

  • 创建Hudi Sink表

image-20230911175013745

  • 创建ES维表

image-20230905143358021

三、开发实时任务

通过左侧「表查询」,查询到【实时湖仓】中创建的表,并在IDE中通过catalog.db.table的方式引用,完成一个实时任务的代码编辑。

对于湖表的读写参数,需要通过外挂Hint的方式来实现,比如配置Hudi源表的读取起点。

image-20230905143454433

Paimon Demo实现:流式读取Kafka,写入Paimon表数据入湖

一、创建Catalog

HMS Catalog 存储管理Catalog、Database信息

二、创建湖表

  • 创建Paimon Sink表

image-20230905144442039

三、开发实时任务

通过左侧「表查询」,查询到【实时湖仓】中创建的表,并在IDE中通过catalog.db.table的方式引用,完成一个实时FLinkSQL任务的代码编辑。

CREATE TABLE sourceTable(
order_id BIGINT,
real_price BIGINT,
channel_id BIGINT,
create_time STRING,
update_time STRING
)WITH(
'properties.bootstrap.servers'='localhost:9092',
'connector'='kafka-x',
'scan.parallelism'='1',
'format'='json',
'topic'='test',
'scan.startup.mode'='latest-offset'
);

INSERT
INTO
catalog_test.database_test.paimon_sink_table
SELECT
*
FROM
sourceTable;

四、运行任务检查插入数据

运行FlinkSQL任务消费Kafka数据从当前latest位点,写入Paimon表后可通过平台SQL Query流批读取数据。

image-20230905144121237