Skip to main content

自定义Connector

自定义Connector

功能背景

在开发FlinkSQL任务时,需要使用SQL Connectors连接源表、结果表和维表。目前我们平台仅支持已经完成过开发适配的Connectors,但实际情况是随着客户越来越多,平台适配Connectors的速度可能赶不上项目的诉求,而项目上客户也有开发能力,因此需要支持用户上传自行根据社区要求开发的Connector Jar,然后在开发中使用。

  • 使用限制:这类自定义Connector,上传后只能在FlinkSQL脚本模式中使用,不支持平台的向导模式

  • 支持Flink版本:1.12、1.16

  • 上传自定义connector:

    • 上传JDBC连接Connector con-1.png
    • connector 选择 jdbc con-2.png
    • connector上传完成定义connector名称为jdbc,在FlinkSQL中使用脚本模式使用时,connector保持一致 con-2.png
    • 上传FlinkSQL依赖Jar文件:FlinkSQL目标表为Mysql需要先上传Mysql数据源连接Jar包,可以在实时平台资源管理中上传并获取资源存放路径。
  • 创建FlinkSQL 任务 con-2.png

  • FLinkSQL脚本

    -- ADD JAR WITH 连接Mysql需要,可使用上传资源获取资源存放路径
    ADD JAR WITH /data/sftp/stream/resource/0_custom_connector_mysql-connector-java-5.1.46.jar;
    CREATE TABLE sourceTable(
    id int,
    int_data int,
    varchar_data varchar
    )WITH(
    'properties.bootstrap.servers'='localhost:9092',
    'connector'='kafka-x',
    'scan.parallelism'='1',
    'format'='json',
    'topic'='test',
    'scan.startup.mode'='latest-offset'
    );
    CREATE TABLE SinkTable(
    id INT,
    int_data INT,
    varchar_data VARCHAR
    )WITH(
    'connector'='jdbc', --自定义connector
    'url'='jdbc:mysql://localhost:3306/automation',
    'username'='drpeco',
    'password' = '******',
    'table-name'='flink_catalog_one'
    );
    insert
    into
    SinkTable
    select
    *
    from
    sourceTable;
  • Kafka输入:

{"id":1,"int_data":1,"varchar_data":"varchar_data","age":"1"}
{"id":2,"int_data":2,"varchar_data":"varchar_data","age":"2"}
  • 结果输出: Mysql查询flink_catalog_one表新增两条数据

    con-2.png