Skip to main content

FlinkSQL

FlinkSQL任务可以让开发人员在IDE页面通过SQL开发的方式,完成实时数据的分析计算。

开发逻辑

举一个常见的实时计算场景来介绍开发逻辑: 某任务需要分析实时日志数据按APP维度的统计指标,供下游大屏API查询。该场景中的日志数据会实时打入Kafka Topic中,维度数据存储在PostgreSQL中,最后计算加工好的数据需要写入HBase中。

接下去的基础开发逻辑主要分为两部分:

  • 表定义

我们发现这个实时任务中涉及了三种结构完全不一致的数据源类型,要能在FlinkSQL中把它们串起来,就需要将这些表按照Flink的定义把它们映射成统一的、FlinkSQL能够看得懂的表。(借助Flink官方提供的Table API

  • SQL加工

表定义完成后,通过SQL开发完成加工计算。(SQL加工逻辑和离线的SparkSQL类似,insert into A as select * from B join C,只是在部分语法上略有区别,详见Flink1.16 SQL说明

开发模式

在IDE页面开发SQL时,代码逻辑可通过三种模式完成开发:

  • 向导模式(默认)

    该方式就是按步骤还原了上面的开发逻辑。需要用户先在IDE面板右侧定义源表、维表、结果表,然后在SQL编辑器内引用右侧定义的表,进行SQL逻辑的加工。

  • 脚本模式

    该方式是基于官方定义的最基础的开发方法。将表定义和SQL加工在SQL编辑器内同时实现。具有更高的参数自由度,同时对开发能力的也会有更高的要求。

  • Catalog模式(作为向导或脚本模式开发时的补充能力)

    上面两种方式均存在一个缺点,就是用户每次开发任务的时候都需要去定义表。假如我有5个实时任务都用到了同一张PG的维表,我需要在在5个实时任务中重复定义5次。

    Catalog方式可以帮我们把表定义的元数据持久化存储下来,不管哪个实时任务需要用到,直接输入catalog.database.table即可使用,避免重复定义的工作,提升开发效率。

向导模式的介绍

以上述案例介绍源表为Kafka、维表为PG、结果表为HBase时,创建一个向导模式的FlinkSQL任务。

表定义:

各个配置参数的含义。更多参数以及解释,请参考Flink官方 Connector介绍

  • Kafka源表

    • 类型、数据源、Topic:选择过滤出需要定义成Flink Table的Topic

    • 编码类型:指定kafka数据的编码类型,目前支持utf-8、gbk2312

    • 读取类型:将kafka获取到的消息字符串按指定格式进行解析,Kafka formats目前支持json、ogg-json、chunjun-json、ar-json、avro、csv、raw 。

    • 数据预览:查看该topic中最早或最近的数据

    • 映射表:定义成Flink Table之后的表名,用于SQL开发中使用

    • 字段:

      • 自动映射:用户不需要关心JSON格式,不需要手动维护字段映射。实现一键自动映射需要按固定的模版格式去解析,目前仅支持JSON、OGG-JSON、Chunjun-JSON、AR-JSON

        1. 点击「自动映射」,平台会自动采集所选Topic中的最近一条消息进行解析。

          note

          因为Topic数据不支持过滤查询,因此采集最近一条数据可能并不包含完整的JSON KEY,或者并不是你想要的Table JSON。这个问题无法避免,您可以多采集几次进行覆盖。

        2. 如果多次采集解析的结果,均未达到预期。可以点击「自动映射」后的下拉ICON,手动输入样例JSON,平台会跳过JSON采集,直接解析样例JSON。

        3. 自动映射输出字段类型默认String类型,字段类型不满足时手动修改。

        image-20230220151418206

      • 手动映射:将Kafka数据映射成行列结构的字段。根据topic中数据格式的不同,在输入框中的配置方式也略有区别。详见下表:

      Topic数据样例字段配置写法
      {"id":"1","name":"张三","opt_time":"2022-06-28 19:25:00"}id int
      name string
      opt_time datetime
      {"user_info":{"id":"1","name":"张三"},"opt_time":"2022-06-28 19:25:00"}user_info ROW(id int, name string )
      opt_time datetime
      {"jsonArray":[{"id":"1","name":"张三"}],"opt_time":"2022-06-28 19:25:00"}jsonArray ARRAY<ROW(id int, name string)
      opt_time datetime

      更多复杂类型在flink sql中的映射逻辑,详见Flink JSON Format

    note

:::

  • Offset:支持如下四种方式

    • Latest:从Kafka Topic内最新的数据开始消费
    • Earliest:从Kafka Topic内最早的数据开始消费
    • Time:从指定的时间点开始消费
    • 自定义参数:从指定分区的指定偏移量开始消费
  • 时间特征:Flink分为ProcTime和 EventTime两种时间特征,这两个时间特征的选择会对后续基于时间的各类计算结果产生影响。

    • Processing Time:表示数据进入Flink后,执行对应Operation的设备的系统时间。(选择该特征,后续的计算处理更简单)

    • Event Time(推荐):表示数据在它的生产设备上发生的时间。(选择该特征后,需要补充额外的两个配置信息。其数据计算结果最接近真实的业务情况)

      • 时间列:必须是映射表中已声明的一列(当前仅支持为Timestamp类型)。Flink会基于该列生成Watermark,并且标识该列为Event Time列,可以在后续Query中用来定义窗口。

      • 最大延迟时间:该配置项和Flink的Watermark(水印)机制有关。已下图为例:假设我们定义了一个10秒的计算窗口,并且允许延迟7秒。在Flink里会发生如下操作:

        image-20220630170616006

        image-20220630170815041

  • 并行度:算子的并发数,指的是Flink集群的Task Slot的数量。(每个任务的并行度配置不能超过整个Flink集群的Task Slot的数量,每个任务的并行度累加起来也不能超出,否则会造成任务的报错)

  • 时区:时区设置目前只针对时间特征为EventTime的任务生效,默认为Asia/shanghai

  • 自定义参数:自定义参数项和参数名

  • PostgreSQL维表

    • 类型、数据源、Schema、表:选择过滤出需要定义成Flink Table的表

    • 映射表:定义成Flink Table之后的表名,用于SQL开发中使用

    • 字段:定义Flink Table的表结构

      • 导入全部字段:会将PG表中所有字段和数据类型完整的导入作为Flink Table的字段
      • 添加输入:手动选择PG表中需要映射的字段
      • 别名:如需要调整原字段名在Flink Table中的显示,可以维护别名
    • 主键:选择维表主键,系统进行缓存刷新时会根据主键判断数据的超时时间

    • 并行度:算子的并发数,指的是Flink集群的Task Slot的数量。(每个任务的并行度配置不能超过整个Flink集群的Task Slot的数量,每个任务的并行度累加起来也不能超出,否则会造成任务的报错)

    • 缓存策略:因为维表数据变更频率较低,因此在查询时可以将数据进行缓存,避免每次查询都连接数据库,提高查询效率。

      • LRU(部分缓存):Least Recently Used,记录维表每条数据的访问热度,仅缓存热度较高的数据
      • None(不缓存):表示不需要缓存
      • ALL(全缓存):缓存维表的全部数据
    • 缓存大小(行):即缓存空间需要保存多少条数据

    • 缓存超时时间:超出缓存时间时会在缓存空间中删除数据

    • 允许错误数据:可以配置当出现N条错误数据时,任务置为失败。默认无限制。(可能出现目前不兼容的数据格式导致查询错误)

    • 异步线程池:维表缓存采用异步方式查询数据,支持配置线程池数量

  • HBase结果表:

    • 类型、数据源、表:选择过滤出需要定义成Flink Table的表

    • Rowkey:指定表中的rowkey及其类型,提高查询效率避免数据热点问题

    • 映射表:定义成Flink Table之后的表名,用于SQL开发中使用

    • 字段:定义Flink Table的表结构。按$列族$ row<$字段名$ 类型,$字段名$ 类型>的格式进行映射,如:user_info row<id int,name varchar>

    • 更新模式:支持append、upsert两种模式

      • append:表示结果数据会已一条新数据的形式,插入结果表。

      • upsert:表示结果数据会根据主键唯一性,更新至结果表。因此选择upsert方式时,必须选择主键。

SQL加工:

当源表、维表、结果表都在右侧定义好之后,只需要在IDE页面输入符合业务逻辑的SQL代码,即可完成一个实时任务的开发。

image-20220630192918850

脚本模式的介绍

脚本模式的底层逻辑和向导模式完全相同,并且平台也支持将向导模式的任务一键切换为脚本模式。(注意脚本模式无法切换为向导模式)

最主要的区别是,表定义这件事情不通过右侧面板进行配置,而是直接在SQL IDE中维护,如下案例:

-- **该案例通过一个最基础的查询源表、关联维表、输出至结果表的开发demo,介绍FlinkSQL的基础开发逻辑**


-- 第一步:定义源表。
-- Flink的表不是内部表,不在内部维护,而是始终对外部数据库进行操作。
-- 表定义分为两部分:表结构和连接器配置。表结构定义了表中的列名及其类型,是查询操作的对象。连接器配置包含在WITH子句中,定义支持该表的外部数据库。
-- 该案例的连接器使用datagen作为测试数据集,实际场景需要根据实际数据源类型进行参数配置
CREATE TABLE orders (
order_uid BIGINT,
product_id BIGINT,
price DECIMAL(32,2) ,
order_time TIMESTAMP(3)
)WITH(
'connector'='datagen'
);

-- 第二步:定义维表。
CREATE TABLE products (
product_id BIGINT,
product_name STRING,
product_type STRING
)WITH(
'connector'='datagen'
);

-- 第三步:定义结果表。
CREATE TABLE product_order_analysis(
order_uid BIGINT,
product_id BIGINT,
product_name STRING,
product_type STRING,
price DECIMAL(32,2) ,
order_time TIMESTAMP(3)
)WITH(
'connector'='stream-x'
);

-- 第四步:根据业务需求编辑代码逻辑。本案例就是将源表和维表进行关联后写入结果表。
INSERT
INTO
product_order_analysis
SELECT
orders. order_uid as order_uid,
orders. product_id as product_id,
products. product_name as product_name,
products. product_type as product_type,
orders. price as price,
orders. order_time as order_time
FROM
orders
LEFT JOIN
products
on orders.product_id=products.product_id;

Catalog模式的介绍

Catalog作为对向导模式和脚本模式的补充,如果用户在SQL IDE中编辑的时候,使用catalog.database.table的方式,则表示使用在【表管理】模块中已经定义好的Flink Table,无须重复定义。

关于Catalog、Database、Table的创建维护介绍,详见实时湖仓实时湖仓】模块。

FlinkSQL任务管理的介绍

在SQL IDE面板右侧,可以看到【任务详情、源表、维表、结果表、环境参数、任务结构、任务设置、批模式】菜单。其中源表、维表、结果表已经在上文做了详细介绍。下文介绍其它菜单内的功能:

  • 任务详情:任务详情页主要展示任务的元数据信息和版本信息。

    • 引擎版本:支持切换任务执行的引擎版本,Flink1.16和Flink1.12。但是两个版本的SQL语法存在区别,切换后需要调整代码,对于FlinkSQL任务不建议切换。

    • 任务版本:每次保存、自动保存(任务退出等异常情况)、提交任务时会自动生成一个代码版本,支持任意两个版本之间的代码比对,以及回滚至指定版本。

      • 任务历史版本三种操作:保存、自动保存、提交 image-20220701103615884
      • 异常退出重新打开任务时提示 image-20220701103615884
      • 版本对比 image-20220701103615884
      • 版本回滚 image-20220701103615884
    • 任务锁逻辑:同一个任务同时被编辑时,先完成编辑保存的形成一次版本历史,后完成编辑保存时提示给出三个保存方案:放弃当前更改、取消、覆盖并保存。

      • 放弃当前更改:放弃本次保存,本次内容被上一个历史版本内容覆盖
      • 取消:放弃本次保存,返回数据开发IDE编辑页面,显示本次更改内容
      • 覆盖并保存:保存本次内容,覆盖上一个历史版本内容,本次更新内容为最新版本

      image-20220701103615884

    • 预览:向导模式下支持脚本预览功能,支持脚本转自定义模版 image-20220701103615884 image-20220701103615884

  • 环境参数:实时任务运行过程中所使用的参数,系统均会配置默认值。您也可以根据环境、任务的实际情况进行调整,如调整任务并发数、数据一致性,CheckPoint生成间隔等参数,让任务更好地运行。下文介绍常用的参数内容:

    • 资源相关:支持调整任务并行度、TaskSlots数量、JM/TM的内存大小
    • 时间相关:支持调整任务的时间特征,ProcessingTime、EventTime、IngestionTime
    • Checkpoint相关:控制任务取消后的CP自动生成、任务运行中的CP生成间隔、CP语义选择(支持EXACTLY_ONCE、AT_LEAST_ONCE)
    • FlinkSQL状态参数:支持设置状态过期时间,默认为1天
    • 日志等级:设置需要打印的日志等级,默认为info
    • 表动态参数:当你需要在任务中动态调整表参数的时候,开启该配置项。请参考Dynamic Table Options解读
    • Kerberos相关:在FlinkSQL任务中使用开启了Kerberos认证的Kafka数据源时,需要手动开启认证参数。参数配置分如下三种情况:
      • ZK开启Kerberos,Kafka没开:security.kerberos.login.contexts=Client
      • ZK、Kafka都开启了Kerberos:security.kerberos.login.contexts=Client,KafkaClient
      • Zookeeper、Kafka都不开启Kerberos:注销参数(默认)
    • 窗口提前触发时间:当您希望在窗口结束之前就输出数据结果,可以开启改配置。比如需要统计一天的pv时,窗口设置为1day,开启窗口提前触发,时间设为5min,则每5分钟会计算一次输出upsert流。
    • table.exec.source.idle-timeout:开启后,某些source partition没数据,等待N秒,忽略它,让窗口计算继续往下
    • 是否开启minibatch:MiniBatch 聚合的核心思想是将一组输入的数据缓存在聚合算子内部的缓冲区中。当输入的数据被触发处理时,每个 key 只需一个操作即可访问状态。这样可以大大减少状态开销并获得更好的吞吐量。但是,这可能会增加一些延迟,因为它会缓冲一些记录而不是立即处理它们。这是吞吐量和延迟之间的权衡。
    • 是否开启Local-Global聚合:开启后,在遇到SUM、COUNT等聚合计算时,会现在本地预聚合,然后下游进行全局聚合,优化数据倾斜情况;
    • Flink算子链开关:默认为开启。排查性能问题时可暂时设置成关闭,关闭后会降低实时计算性能。
  • 任务设置:

    • 出错重试:开启后,当任务运行失败时会自动重试。您可以设置重试方式(重跑/续跑)、重试次数、重试间隔、等待超时(超过指定时间后,任务自动取消。建议等待超时时长 ≥ 重试次数 * 重试间隔时长)
    • 启停策略:选择启停策略,详见【项目管理-启停策略管理】
    • 脏数据管理:开启后,系统会自动存储sink到结果表失败的数据,具体原因需要具体分析。如数据格式转化后结果表不支持等原因。
      • 内置脏数据记录表建表语句:
          CREATE TABLE dirty_data (
        job_id varchar(32) NOT NULL COMMENT 'Flink Job Id',
        job_name varchar(255) NOT NULL COMMENT 'Flink Job Name',
        operator_name varchar(255) NOT NULL COMMENT '出现异常数据的算子名,包含表名',
        dirty_data text NOT NULL COMMENT '脏数据的异常数据',
        error_message text COMMENT '脏数据中异常原因',
        field_name varchar(255) DEFAULT NULL COMMENT '脏数据中异常字段名',
        error_type varchar(255) DEFAULT NULL COMMENT '脏数据异常类型',
        create_time timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6) COMMENT '脏数据出现的时间点',
        KEY idx_job_id (job_id),
        KEY idx_operator_name (operator_name)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='存储脏数据';
      • 脏数据最大值:当脏数据达到指定条数时,任务置为失败
      • 失败条数:当脏数据处理失败(如写入脏数据表失败)达到指定次数时,任务置为失败
      • 脏数据保存:
        • 不保存脏数据内容,仅输出日志。需要设置脏数据日志打印频率(该模式不支持在任务运维处对脏数据进行分析)
        • 保存脏数据内容至指定MySQL库表。
    • 资源组配置:指定该任务运行的资源组。资源组在【控制台】模块配置。
    • 日志推送:
      • 开启日志推送后,通过编辑logback模版,平台会将任务的日志信息推送至指定Kafka。后续运维人员可以再根据实际需求开发对应的实时任务,达到根据日志内容自定义监控的目的
      • 日志模版中的kafka数据源信息,需要根据实际情况修改成真实可用的连接信息。如果开启kerberos认证,需要上传认证文件。
      • 模版克隆:支持从该项目其他已经开启了日志推送的任务处克隆模版,减少logback编辑工作量。
      note
      1. 日志推送仅支持Flink1.16及以上版本,低于则置灰提示不支持。
  • 批模式:

    • 当Source为Iceberg表的时候,FlinkSQL任务支持开启批模式。
    • 开启批模式后,同一个任务在执行流式计算的同时,又可按批模式周期执行,不需要开发人员维护两套脚本。所以一般的应用应用场景有两种:
      • 在刚创建任务的时候,通过批模式计算历史全量数据,流计算无缝衔接。
      • 在流计算过程中,可能会出现数据丢失、数据重复等问题,通过定期的批计算对历史计算结果进行修正。
    • 调度设置:目前支持配置任务生效日期和任务执行时间,跑批时执行的均是历史全量数据。(数据范围的限定会在后续迭代中支持)
    • 批模式执行的前提是流任务在正常运行。