自定义Connector
自定义Connector
功能背景
在开发FlinkSQL任务时,需要使用SQL Connectors连接源表、结果表和维表。目前我们平台仅支持已经完成过开发适配的Connectors,但实际情况是随着客户越来越多,平台适配Connectors的速度可能赶不上项目的诉求,而项目上客户也有开发能力,因此需要支持用户上传自行根据社区要求开发的Connector Jar,然后在开发中使用。
使用限制:这类自定义Connector,上传后只能在FlinkSQL脚本模式中使用,不支持平台的向导模式
支持Flink版本:1.12、1.16
上传自定义connector:
- 上传JDBC连接Connector
- connector 选择 jdbc
- connector上传完成定义connector名称为jdbc,在FlinkSQL中使用脚本模式使用时,connector保持一致
- 上传FlinkSQL依赖Jar文件:FlinkSQL目标表为Mysql需要先上传Mysql数据源连接Jar包,可以在实时平台资源管理中上传并获取资源存放路径。
- 上传JDBC连接Connector
创建FlinkSQL 任务
FLinkSQL脚本
-- ADD JAR WITH 连接Mysql需要,可使用上传资源获取资源存放路径
ADD JAR WITH /data/sftp/stream/resource/0_custom_connector_mysql-connector-java-5.1.46.jar;
CREATE TABLE sourceTable(
id int,
int_data int,
varchar_data varchar
)WITH(
'properties.bootstrap.servers'='localhost:9092',
'connector'='kafka-x',
'scan.parallelism'='1',
'format'='json',
'topic'='test',
'scan.startup.mode'='latest-offset'
);
CREATE TABLE SinkTable(
id INT,
int_data INT,
varchar_data VARCHAR
)WITH(
'connector'='jdbc', --自定义connector
'url'='jdbc:mysql://localhost:3306/automation',
'username'='drpeco',
'password' = '******',
'table-name'='flink_catalog_one'
);
insert
into
SinkTable
select
*
from
sourceTable;Kafka输入:
{"id":1,"int_data":1,"varchar_data":"varchar_data","age":"1"}
{"id":2,"int_data":2,"varchar_data":"varchar_data","age":"2"}
结果输出: Mysql查询flink_catalog_one表新增两条数据