同步任务配置
新建任务
同步任务的创建入口在「数据开发->新建任务」,选择数据同步类型,之后还需选择「配置模式」和「同步模式」
配置模式 离线开发支持以向导模式或脚本模式进行同步任务的配置,向导模式的特点是便捷、简单,可视化字段映射,快速完成同步任务配置,但对部分数据源(例如Redis、ElasticSearch)不支持,而脚本模式的特点是全能、高效,可深度调优,支持全部数据源。
若选择了向导模式,可修改为脚本模式,如果选择了脚本模式,无法转换为向导模式
在脚本模式下,可以在编辑区左上角点击「导入模板」,并选择数据源类型、数据库等信息,确定后即可导入脚本模板,后续可在模板的基础上继续编辑即可。
导入模板后,之前填写的信息会被清空并覆盖
脚本模式的编辑中可能需要填写平台自身的表所在的Hadoop路径信息(Location),您可以通过在SQL脚本中输入 DESC FORMATTED tableName
命令来获取其路径,详情请参考 [获取表在Hadoop中的路径]
同步模式 标识本任务是否采用增量模式,无增量标识:可通过简单的过滤语句实现增量同步;有增量标识:系统根据源表中的增量标识字段值,记录每次同步的点位,执行时可从上次点位继续同步,关于增量同步可参考:[增量同步]
任务配置
数据同步任务的配置共分为5个步骤:
- 选择数据来源:选择已配置的数据源,系统会读取其中的数据;
- 选择数据目标:选择已配置的数据源,系统会向其写入数据;
- 字段映射:配置数据来源与数据目标之间的字段映射关系,不同的数据类型在这步有不同的配置方法;
- 通道控制:控制数据同步的执行速度、错误数据的处理方式等;
- 预览保存:再次确认已配置的规则并保存。
当选择当前项目或其他项目meta数据源作为数据来源/数据目标时,schema和表的选择范围将会存在以下限制
1、表选择时,脏数据表将不展示
2、当前项目的meta数据源进行schema选择时,仅展示当前项目的meta schema;其他项目的meta数据源引入到当前项目进行schema选择时,仅展示其他项目的meta schema。
因meta schema对应的用户是控制台配置的用户,集群下所有项目的meta数据源对应的用户都是同一个,而数据同步的用户权限判断是以数据源连接用户作为判断对象,因此针对meta schema开放schema的选项会导致任一项目可访问别的项目的meta schema
数据源/目标配置
关系型与MPP型数据库
关系型数据库与MPP数据库涵盖以下数据源:
MySQL、Oracle、SQLServer、PostgreSQL、DB2、达梦(DMDB)、PolarDB for MySQL、Greenplum、GaussDB、GBase 8a、TiDB等
Reader
关系型数据库或MPP型数据库作为数据源,需配置以下信息:
- 数据源
- 表
- 数据过滤:针对源头数据筛选条件,根据指定的column、table、where条件拼接SQL进行数据抽取,暂时不支持limit关键字过滤。利用where条件可进行增量同步,具体说明如下:
增量导入在实际业务场景中,往往会选择当天的数据进行同步,通常需要编写where条件语句,需先确认表中描述增量字段(时间戳)为哪一个。如
tableA
增量的字段为create_time
,则填写create_time>需要的日期
,如果需要日期动态变化,可以填写${bdp.system.bizdate}
、${bdp.system.cyctime}
等调度参数,关于调度参数的详细信息请参考 [参数配置]。 - 切分键:离线开发在进行数据抽取时,如果指定切分键,系统可以使用切分键的字段进行数据分片,数据同步因此会启动并发任务进行数据同步,这样可以大大提供数据同步的效能。
- 推荐将表的主键作为切分键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容易出现数据热点。
- 目前离线开发目前支持MySQL、Oracle、SQLServer、PostgreSQL、DB2、GBase 8a、达梦、PolarDB for MySQL8、Phoenix、Greenplum,仅支持数值型字段作为切分键,不支持字符串、浮点、日期等其他类型。如果指定了不支持的类型,则忽略切分键功能,使用单通道进行同步
- 如果在第四步的「作业并发数」大于1但是没有配置切分键,任务将置为失败
- 如果不填写切分键,数据同步视作使用单通道同步该表数据
- 切分键的配置需要与第四步的
通道控制
联合使用,下面是典型的应用场景: 假设MySQL的tableA
表中,有一个INT类型的字段名为id
的主键,那么可以在切分键输入框中输入id
字段,并在第四步的"作业并发数
"中配置为5,则系统会产生5个并发线程同时读取MySQL表,每个任务会根据id
字段的值进行数据切分,保证5个并发线程读取的数据是不同的,通过以上机制即可以加速数据读取的速度。
- 高级配置:特定场景下,无法满足需求时,支持以JSON的形式传入其他参数,例如fetchSize等。
关系型数据库设置作业速率上限和切分键才能根据作业速率上限将表进行切分,只支持数值型作为切分键
Oracle通过用户名来标识Schema,如果需要同步其他Schema下的数据,则不能在下拉列表中选择表,而是直接输入schemaName.tableName,可读取/写入其他Schema的数据
Writer 关系型数据库作为数据目标,需配置以下信息:
- 数据同步目标库;
- 目标表;
- 导入前、导入后准备语句:执行数据同步任务之前率先执行的SQL语句。目前向导模式只允许执行一条SQL语句,例如:
truncate table
。 - 主键冲突的处理:
- insert into:当主键/唯一性索引冲突时会写不进去冲突的行,以脏数据的形式体现,脏数据的配置与管理请参考 [脏数据管理];
- replace into:没有遇到主键/唯一性索引冲突时,与insert into行为一致,冲突时,先delete再insert,未映射的字段会被映射为NULL;(仅MySQL支持选到replace into)
- on duplicate key update:当主键/约束冲突,update数据,未映射的字段值不变;
字段映射 请参考[字段映射的通用功能]
MySQL分库分表
目前分库分表模式仅支持MySQL数据源,其他数据源暂不支持 mysql分库分表实际是把表结构完全相同的多张表同步到一张目标表里。所以在数据预览的时候只默认显示第一张表的数据样例。所有数据同步任务都实际只支持单张表的同步(MySQL分库分表是特殊),不存在多张表同步到多张表的同步功能。
同一个同步任务,可以同时并发(或串行)读取多个库、多个表,适用于业务库采用了分库分表的情况下,配置方式比较简单,仅需在同一个同步任务中添加多个库、多个表即可。 在页面上勾选「批量选择」,还可以支持按表名搜索、批量选中等操作,便于对大量表配置数据同步。 分库分表的一些限制条件:
- 选中的库、表,表结构需要保持一致(字段名、字段类型),系统不会进行检查,表结构的一致性由用户保证;
- 若MySQL采用分库模式,则不同的数据库需配置不同的数据源,需要在「数据源」模块单独配置;
- 若配置了并发度,则每个并发会分别读取一张表的数据;
仅支持MySQL分库分表数据的读取,不支持写入
Batcworks仅支持关系型数据库的普通数据类型,暂时不支持blob、clob、地理空间等特殊类型的数据读/写
MySQL Sharding-Proxy
上文中提到,在MySQL中可以通过添加多个数据源,选择多个结构相同的真实表,来实现分库分表的功能。
在离线开发中还支持直接选择逻辑表(总表),来实现分库分表的功能。
在数据源中心配置shading-proxy版本的数据源,在离线开发中引入后,可以在数据同步任务中选择shading-proxy数据源,直接选中逻辑表进行数据同步。
sharding-proxy版本数据源在离线开发中也是展示为MySQL数据源
大数据存储型
Hive1.x / Hive 2.x
- Hive1.x与Hive2.x的配置是基本相同的,详情请参考 [Hive1.x与Hive2.x的区别]
- Hive1.x与Hive2.x仅支持Textfile、ORC、Parquet 这3种数据格式,同步的原理请参考 [FlinkX读写Hive表的原理]
Reader
- 分区:当Hive表作为数据源时,可读取某个分区下的数据:
- 分区填写栏支持填写参数,支持的参数请参考 [参数配置]
- 非分区表,不用填写;
- 一级分区表,不填写分区,则会读取所有分区数据;填写了分区名(例如
pt = '20200503'
),则只读取指定分区的数据; - 多级分区表,可以填写多级分区形式(例如
ptd='20200503' /pdh='120000'
),多级分区表,如果只填写一级分区信息(例如ptd='20200503'
),则会将下属的所有二级分区数据全部读取; - 分区支持识别逻辑运算符“>” “=” “<” “and”,比如比如“pt>= ${xxx} and pt <= ${yyy}”,即代表读取范围在${xxx} ~ ${yyy}的所有分区
Writer
- 分区:当Hive表作为写入目标时,可将数据写入某个分区:
- 分区填写栏支持填写参数,支持的参数请参考 [参数配置]
- 动态分区机制:当写入的分区名不存在时,系统按用户指定的分区名自动创建
- 对于非分区表,无需填写;
- 一级分区表,必须填写(例如
pt = '20200503'
) - 多级分区表,必须填写至最末一级分区(例如
ptd='20200503' /pdh='120000'
)
- 写入模式:
- insert overwrite:写入前将清理已存在的数据文件,之后执行写入(默认值);
- insert into:写入前已有的数据不会删除,系统会写入的新的文件;
不支持路由写入,对于分区表请务必保证写入数据到最末一级分区(HDFS上一个确定的目录)
字段映射 请参考[字段映射的通用功能]
Maxcompute
MaxCompute Reader不支持数据过滤功能。如果您在数据同步过程中,需要过滤符合条件的数据,请创建新表并写入过滤数据后,同步新表中的数据。 MaxCompute Reader不支持同步外部表。
Reader 分区:当Maxcompute表作为数据源时,可读取某个分区下的数据:
- 分区填写栏支持填写参数,支持的参数请参考 [参数配置]
- 非分区表,不用填写
- 一级分区表,不填写分区,则会读取所有分区数据;填写了分区名(例如 pt = '20200503' ),则只读取指定分区的数据;
- 多级分区表,可以填写多级分区形式(例如 ptd='20200503' /pdh='120000'),多级分区表,如果只填写一级分区信息(例如 ptd='20200503'),则会将下属的所有二级分区数据全部读取;
Writer
写入模式:
- insert overwrite:写入前将清理已存在的数据文件,之后执行写入(默认值);
- insert into:写入前已有的数据不会删除,系统会写入的新的文件;
半结构化存储
HDFS
离线开发支持读取/写入HDFS数据源内的数据,但仅支持半结构化数据,其中必须存储CSV或文本格式,且其中的内容必须按照二维表的结构存储,不建议使用此功能进行图像、视频等非结构化数据同步
Reader
- 路径:待读取的HDFS文件路径,例如
/user/hive/myTable/
如果最后以一个路径,而不是文件名结尾,则会读取这个路径下的所有文件。如果以文件名结尾(例如/user/hive/myTable/a.csv
),则只读取指定的文件。若要读取的文件路径,多个路径可以用逗号隔开。 - 分隔符:默认为
\001
,支持填写可见字符,例如,
,同时支持通过\
做转义配置不可见字符,例如,
填写可见字符时,只支持1个字符。转义字符需根据8进制ASCII编码填写对应的值,例如\044
表示$
- 文件类型:
- Textfile类型
- 列分隔符:当文件类型为textfile时,需要填写列分隔符,只能填写1个字符(默认值\001)。
- 编码:仅支持UTF-8或GBK,暂不支持其他字符。
- ORC类型、Parquet类型,无需配置分隔符、编码等信息。
- Textfile类型
Writer
- 路径:待写入的HDFS文件路径,例如
/user/hive/myTable
,Writer会根据并发配置在目录下写入多个文件。 - 叶子路径:指定文件存放子路径,存在着写入该子路径,不存在则新建该子路径并写入。
- 文件名:指定文件名称
- 文件类型:
- Textfile类型
- 列分隔符:当文件类型为textfile时,需要填写列分隔符,默认为
\001
,支持填写可见字符,例如,
填写可见字符时,只支持1个字符。同时支持通过\
做转义配置不可见字符,例如\001
。转义字符需根据8进制ASCII编码填写对应的值,例如\044
表示$
。 - 编码:仅支持UTF-8或GBK,暂不支持其他字符。
- 列分隔符:当文件类型为textfile时,需要填写列分隔符,默认为
- ORC类型、Parquet类型,分隔符、编码信息是无效的。
- Textfile类型
高级配置中存在参数“strictMode”,当“strictMode”为true时,开启严格模式。当“strictMode”为false时,开启宽松模式。不填写时,默认为宽松模式。
宽松模式下,指定叶子路径下的文件名,允许存在多个文件名(填写的文件名作为前缀,例如文件名填写test,则实际同步写入时,可能会拆分成test1、test2,当存在多并行度、数据量较大时,都会拆分成多个文件)。宽松模式下,支持多并行度 、断点续传。
严格模式下,指定叶子路径下的文件名,仅允许存在一个文件名,多并行度功能将不生效,断点续传功能将不生效。在通道控制页面隐藏,
写入时的字段分隔符,需要用户保证与创建的Hive表的字段分隔符一致,否则无法在Hive表中查到数据
字段映射 HDFS字段映射的功能,已[字段映射的通用功能]为基础,除此之外,当HDFS作为源或者目标时,还支持其他配置方式:
- 添加单个字段:点击后在弹窗中补充索引值和字段类型。
- 索引值:表示读取/写入文件中的字段序号,从0开始,不可重复
- 字段类型:建议填写string类型,字段类型不区分大小写。由于HDFS是无类型的存储系统,建议采用string类型,同步过程中不易出错
- 文本模式:添加单个字段较为繁琐,也可以采用文本模式,批量编辑字段信息,点击「文本模式」,在弹窗中批量编辑。批量编辑的语法格式为:
index:type,
每行一个字段,每一行填写索引值、英文冒号、类型,且以英文逗号结尾,注意最后一行也要以英文逗号结尾 - 添加或删除字段后,可点击每个字段后面的「删除」icon,删除此字段。新添加的字段都会排在最后,若误删除中间的字段,可用文本模式重新添加在中间位置
- 拷贝源字段/目标字段:当数据源、目标为FTP、HDFS等无类型的数据源,且另一半为有类型的数据源时,可点击「拷贝源字段」或「拷贝目标字段」来快速复制有类型的一方,降低批量配置的工作量
FTP
支持FTP、SFTP协议
Reader
路径:
单个文件:
读取FTP路径下的单个文件时,需要在「路径」框中填写路径+指定的文件名(含扩展名),例如:/data/shier/muyun/singleFile.csv
系统将读取指定的文件,注意,路径、文件名区分大小写。
多路径下多个文件:
离线支持读取FTP某个路径下的多个文件,需要在「路径」框中不指定具体文件,例如:
/data/multiFile ``.../largeFile_20200613_part1.csv ``.../largeFile_20200613_part2.csv
在这种场景下,可在「路径」框中填写为:/data/multiFile/
,(注意,在路径最后写上/
,在正则匹配预览中可以明确预览待读取的文件)即可读取此路径下的2个文件
单个路径下读取多个文件,需要保证这些文件结构的一致性,若不同文件结构不一致,则会报错
路径的通配符(仅支持FlinkX1.10):
支持按照正则的方式批量匹配读取的文件
/largeFile_20200613_part1.csv
/largeFile_20200613_part2.csv
/smallFile_20200613_wieieiweiekdkkd29e.csv
/smallFile_20200613_kskdoskdsnkf2023e.csv
例如:只需要读取以 smallFile_20200613_
开头,任意字符结尾的文件,「路径」框填写: /smallFile_20200613_.*
,其中 .*
表示匹配任意字符
正则匹配只对文件名起效,对路径不起效,当指定的正则表达式中既包含路径,又包含文件时,只能匹配到文件,例如:
/data/abc.csv
/data/abc
.../fileName1.csv
.../fileName2.csv
在「路径」框中填写 /data/ab.*
,只能匹配到 /data/abc.csv
文件,不能匹配到 /data/abc/
下属的文件及目录
正则匹配的几种常见写法: 匹配任意字符:`. * 以…开头:
^ * 以…结尾:
$ * 正则转义字符:
/`
路径中的任务运行参数:
「路径」栏支持填写任务运行参数,可支持的任务运行参数需参考: 参数配置
「路径」中填写为:/data/${bdp.system.currmonth}/${bdp.system.bizdate}/fileName......
假设当前日期为2022年6月14日,FTP中对应今天的路径为: /data/202206/20220613/fileName......
多个路径:
多路径同步与单个路径是相同的,同样支持正则表达式、系统变量,不再赘述。
解析方式:
离线支持解析CSV、EXCEL、Txt、自定义解析方式多种文件类型,同时支持zip格式(要求包内文件类型为csv/txt)。但一次数据同步任务中只能同步一种数据类型的文件。下文中会详细介绍如何使用自定义解析方式
列分隔符:
默认为英文逗号 ,
注意,填写可见字符时支持多个字符。支持配置不可见字符,需通过 \
做转义字符,例如 \001
。转义字符需根据8进制ASCII编码填写对应的值,例如 \044
表示 $
txt解析支持单分隔符、多分隔符、不可见分隔符;csv解析支持单分隔符,单不可见分隔符
编码:
对于csv和txt两种格式的文件,支持UTF-8和GBK 2种编码格式
是否包含表头:
可以选择是或否,如下图所示。选择‘是’时,“姓名”“学号”“班级”等表头默认不进行读取。
是否替换Null值:
选择否,NULL值不进行替换
选择是,可以将NULL值替换为具体字符。
选择是且输入框为空,则替换为空值
同步后文件处理:
支持选择“删除”“重命名”“移动到指定路径”三种方式。
删除:数据同步完成后,对FTP读取对文件做删除处理
重命名:数据同步完成后,对FTP读取对文件做重命名操作,可以给文件命名添加前缀
移动到指定路径:数据同步完成后,对FTP读取对文件做移动到指定路径处理,可以填写指定路径
Writer
路径:
写入FTP的目录地址,注意必须指定到目录,以 /
结束,此处和Reader中的路径填写方式一致支持正则匹配和任务运行参数。
文件名称:
指定写入文件的文件名称。写入FTP数据源时,会根据用户在此填写的文件名称生成新的文件,需要注意填写文件格式后缀,支持任意后缀名。
编码:
对于csv和txt两种格式的文件,支持UTF-8或GBK,默认为UTF-8
列分隔符:
默认为英文逗号,与Reader中的列分隔符相同
写入模式:
append:按文件名称覆盖写入; overwrite:先清空目录下的文件然后写入; nonconflict:按文件名称查找,存在同名文件则报错,不存在同名文件则可正常写入; insert:文件追加写入,存在同名时通过添加后缀的方式修改新文件的文件名称;
字段映射
FTP字段映射功能与HDFS十分类似,请参考[HDFS]
FTP支持文件名称落库
字段映射source端下新增按钮「添加文件名称字段」,点击后会在source端新增‘dtstack_internal_file_name’,类型为‘STRING’。
FTP自定义解析方式(包含自定义并发读取)
概述
当FTP作为数据源进行同步时,支持选择csv、txt等文件格式类型,从而用对应的方式去解析,当FTP中存在一些自定义的文件格式类型时,需要支持自定义解析方式对文件进行解析。
创建并使用自定义解析方式如下:
一、根据demo编写jar包
(1)maven工程构建
在IDEA中构建出maven工程,随后在工程的pom.xml文件中引入以下sdk依赖:
<dependency>
<groupId>com.dtstack.flinkx</groupId>
<artifactId>flinkx-extend</artifactId>
<version>1.12-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
注意点:
flink原生依赖引入后,依赖的scope级别一定要为provided,不要是compile级别。否则会出现类冲突的问题。
目前flinkx ftp工程自依赖的jar如下,如果需要用到以下依赖那么scope也为provided即可:
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
<version>0.1.51</version>
</dependency>
<dependency>
<groupId>net.sourceforge.javacsv</groupId>
<artifactId>javacsv</artifactId>
<version>2.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>4.1.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>easyexcel</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
<version>6.1.26</version>
<scope>provided</scope>
</dependency>
- 其他非flink依赖的scope为compile级别即可。
(2)接口实现
正常情况下,如果要实现自定义文件的解析,引入以上依赖后,只需要实现com.dtstack.flinkx.ftp.format.IFileReadFormat
接口即可。在这里对该接口中的方法简单说明下:
void open(File file, InputStream inputStream, IFormatConfig config) throws IOException
用来做读取文件前的初始化准备。比如初始化连接、文件流的开启等等。另外一些常量的初始化工作也可放在该方法中进行。
入参说明:
File file : 将要读取的文件。该参数一般在要记录文件的元信息时才会使用到。 InputStream inputStream: 将要读取的文件流。在open方法中可通过将流转换为其他流来提高读取效率。 IFormatConfig config:自定义解析常用配置。文件的编码、分隔符,字段数组等等。这些参数定义了自定义解析的基本规则。
/**
* Opens a file read format to read the inputStream.
*
* @param inputStream the inputStream that need to read.
* @throws IOException Thrown, if the inputStream could not be opened due to an I/O problem.
*/
void open(File file, InputStream inputStream, IFormatConfig config) throws IOException;
|boolean hasNext() throws IOException;
用来判断当前文件是否读取完毕。
|String[] nextRecord() throws IOException;
返回下一行的数据。返回的类型是数组,代表的是一行的各个字段的真实数据。
|void close() throws IOException;
关闭当前资源。
(3)Demo示例
下面是自定义解析XML文件接口实现示例:
/** @author shitou @date 2022/6/23 * */
public class XmlFileReadFormat implements IFileReadFormat {
private static final Logger LOG = LoggerFactory.getLogger(XmlFileReadFormat.class);
// 用来描述每一行的具体列
private String[] fields;
// 当前数据读取节点位置
private int index;
// 根据节点名称消费的xml文件中的节点列表
private NodeList nodeList;
private InputStream intStream;
// 指定要解析的节点名称
private String tagName = "student";
@Override
public void open(File file, InputStream inputStream, IFormatConfig config) throws IOException {
LOG.info("open file : {}", file.getFileName());
this.fields = config.getFields();
this.intStream = inputStream;
this.index = 0;
// 初始化DocumentBuilderFactory
DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
try {
DocumentBuilder documentBuilder = dbFactory.newDocumentBuilder();
// 解析inputstream,将流转换为Document用来进行doc读取
Document document = documentBuilder.parse(inputStream);
document.getDocumentElement().normalize();
this.nodeList = document.getElementsByTagName(tagName);
} catch (SAXException | ParserConfigurationException e) {
throw new RuntimeException("can not open this file, filename: " + file.getFileName());
}
}
@Override
public boolean hasNext() throws IOException {
// 判断当前节点列表是否消费完毕
return index < nodeList.getLength();
}
@Override
public String[] nextRecord() throws IOException {
// 返回下一行的数据
Node item = nodeList.item(index);
final String[] data = new String[fields.length];
if (item.getNodeType() == Node.ELEMENT_NODE) {
Element element = (Element) item;
IntStream.range(0, data.length)
.forEach(
i -> {
data[i] =
element.getElementsByTagName(fields[i])
.item(0)
.getTextContent();
});
}
index++;
return data;
}
@Override
public void close() throws IOException {
IOUtils.closeQuietly(intStream);
}
}
下面是自定义并发读取接口实现示例:
不切割文件,如excel:
public class DefaultFileSplit implements ConcurrentFileSplit {
@Override
public List<FtpFileSplit> buildFtpFileSplit(
IFtpHandler handler, IFormatConfig config, List<String> files) {
List<FtpFileSplit> ftpFileSplits = new ArrayList<>();
for (String filePath : files) {
try {
String fileName = FileUtil.getFilename(filePath);
//获取文件大小
long currentFileSize = handler.getFileSize(filePath);
FtpFileSplit fileSplit = new FtpFileSplit(0, currentFileSize, filePath, fileName);
ftpFileSplits.add(fileSplit);
} catch (Exception e) {
throw new FlinkxRuntimeException(e);
}
}
ftpFileSplits.sort(compare());
return ftpFileSplits;
}
/**
* 先根据文件路径排序 再根据文件里面开始的偏移量排序, 从小到大
*
* @return
*/
@Override
public Comparator<FtpFileSplit> compare() {
return Comparator.comparing(FtpFileSplit::getFileAbsolutePath)
.thenComparing(
FtpFileSplit::getStartPosition,
(a, b) -> {
return a.compareTo(b);
});
}
}
切割文件:如csv,txt
public class ConcurrentCsvSplit extends DefaultFileSplit {
private static final Logger LOG = LoggerFactory.getLogger(ConcurrentCsvSplit.class);
@Override
public List<FtpFileSplit> buildFtpFileSplit(
IFtpHandler handler, IFormatConfig config, List<String> files) {
List<FtpFileSplit> fileSplits = new ArrayList<>();
for (String filePath : files) {
fileSplits.addAll(analyseSingleFile(handler, config, filePath));
}
return fileSplits;
}
/**
* 对单个文件进行处理, 如果当前文件大小大于配置的maxFetchSize, 表示这个文件需要进行并发读取
* 将文件大小 / 并发数,获取每次大约要处理的数据量
* 根据每次大约要处理的数据量, 单个字节读取, 获取下一个'\n'的位置, 构建FtpFileSplit
* @param ftpHandler
* @param config
* @param filePath
* @return
*/
public List<FtpFileSplit> analyseSingleFile(
IFtpHandler ftpHandler, IFormatConfig config, String filePath) {
List<FtpFileSplit> ftpFileSplits = new ArrayList<>();
String columnDelimiter = config.getColumnDelimiter();
try {
long currentFileSize = ftpHandler.getFileSize(filePath);
String filename = FileUtil.getFilename(filePath);
if (currentFileSize > config.getFetchMaxSize()) {
int perSplit =
Math.min(
(int) currentFileSize / config.getParallelism(), Integer.MAX_VALUE);
long startPosition = 0;
long endPosition = startPosition + perSplit;
while (endPosition <= currentFileSize) {
if (endPosition == currentFileSize) {
FtpFileSplit ftpFileSplit =
new FtpFileSplit(startPosition, endPosition, filePath, filename);
ftpFileSplits.add(ftpFileSplit);
break;
}
InputStream input = ftpHandler.getInputStreamByPosition(filePath, endPosition);
boolean notMatch = true;
while (notMatch) {
notMatch = false;
for (char c: columnDelimiter.toCharArray()) {
char ch = (char) input.read();
endPosition += 1;
if (ch != c) {
notMatch = true;
break;
}
}
}
FtpFileSplit ftpFileSplit =
new FtpFileSplit(startPosition, endPosition, filePath, filename);
ftpFileSplits.add(ftpFileSplit);
LOG.info(
String.format(
"build file split, filename: %s, startPosition: %d, endPosition: %d",
filePath, startPosition, endPosition));
startPosition = endPosition;
endPosition = startPosition + perSplit;
}
if (endPosition != currentFileSize) {
FtpFileSplit ftpFileSplit =
new FtpFileSplit(startPosition, currentFileSize, filePath, filename);
ftpFileSplits.add(ftpFileSplit);
LOG.info(
String.format(
"build file split, filename: %s, startPosition: %d, endPosition: %d",
filePath, startPosition, currentFileSize));
}
} else {
ftpFileSplits.add(new FtpFileSplit(0, currentFileSize, filePath, filename));
}
ftpFileSplits.sort(compare());
return ftpFileSplits;
} catch (Exception e) {
throw new FlinkxRuntimeException(e);
}
}
}
(4)编译打包
将对应的maven工程进行编译打包
二、上传资源
jar包封装完成后,在「数据开发->资源管理」中进行上传。
租户资源是同一租户下所有项目共享的资源目录
三、引用jar包
创建数据同步任务后,数据来源选择「FTP」数据源,解析方式选择「自定义」,可以在弹窗中选择上传的自定义jar包
填写jar包中对应方法的类名,这样就完成了FTP自定义解析方式的配置。
FTP数据同步支持打印同步信息
文件总数,文件内容,文件数据总行数,总耗时
分析型数据库
Impala
当Impala表为Textfile、ORC或者Parquet格式时,使用方式与普通Hive表类似,可参考[Hive表配置]。
Reader
若为Impala+Kudu的形式,仅需选择数据源和表即可,无需配置读取的分区信息。
Writer
若为Impala+Kudu的形式,不能指定具体分区,需补充写入模式。
- 写入模式:
- Insert:写入
- update:更新
- upsert:若主键在表中已经存在,则执行update语义,反之,执行insert。
- flushMode:kudu session刷新模式,
- auto_flush_sync(默认值):同步刷新模式,客户端会等数据刷新到服务器后再返回,这种情况就不能批量插入数据
- auto_flush_background:异步刷新模式,客户端会立即返回,但是写入将在后台发送,可能与来自同一会话的其他写入一起进行批处理。
- manual_flush:手动刷新模式。
flushMode参数不支持页面修改,需在脚本模式中修改
字段映射
请参考[字段映射的通用功能]
Kudu
参考[Impala]一节的描述
Kudu使用数据过滤语句时,不支持写带函数的过滤语句,是官方插件的限制。仅可填写 where column >,=,< 某个值。
Clickhouse
Reader
- 数据过滤:您将要同步数据的筛选条件,暂时不支持limit关键字过滤。SQL语法与选择的数据源一致。 where条件可以有效地进行业务增量同步。如果不填写where语句,数据同步均视作同步全量数据。
- 切分键:可以将源数据表中某一列作为切分键,建议使用主键或有索引的列作为切分键,仅支持类型为整型的字段。 读取数据时,根据配置的字段进行数据分片,实现并发读取,可以提升数据同步效率。
Writer
- 导入前准备语句:写入数据至目标表前,会先执行此处的标准语句。
- 导入后准备语句:写入数据至目标表后,会执行此处的标准语句。
- 写入模式:写入前已有的数据不会删除,系统会写入的新的文件。
NOSQL
HBase
Reader
- startRowkey:指定开始rowkey;
- endRowkey:指定结束rowkey;
- isBinaryRowkey:指定配置的startRowkey和endRowkey转换为byte[]时的方式,默认值为false,若为true,则调用Bytes.toBytesBinary(rowkey方法进行转换;若为false:则调用Bytes.toBytes(rowkey
Writer
- 读取为null值时的处理方式:
- skip:表示不向hbase写这列;
- empty:写入HConstants.EMPTY_BYTE_ARRAY,即new byte [0]
字段映射 HBase的字段映射在 [字段映射的通用功能]的基础上,多了构造rowkey的参数的配置,离线开发支持字符串拼接或取md5函数的方式构造rowkey。
- rowkey:
- 目前仅支持从HBase结果表中进行rowkey的拼接,暂不支持从源表中取值进行拼接,因此在写入HBase时,组成rowkey元素的字段信息必须来自结果表字段;
- 编写格式,
$(colFamily:colName
分别表示列族:列名,中间为英文冒号隔开 - 字符串拼接写法,例如:
$(cf_user:name_$(cf_mrkt:dist
,取HBase结果表的cf_user:name
和cf_mrkt:dist
拼接后作为rowkey值;下划线_
也可以替换为其他一个或多个字符,系统仅识别$(的参数 - md5(函数:使用时,必须以在文本框中以md5开头,支持多字段拼接,例如:
md5($(cf_user:name_$(cf_mrkt:dist
接口
RESTful
在「数据开发-新建任务-数据同步」,可将restful作为数据来源和选择目标。
Reader
请求参数:
json格式填写请求参数,支持填写运行参数
Header:
json格式填写header,支持填写运行参数
Body:
json格式填写body,支持填写运行参数
以上参数可以再配置数据源的时候进行配置,若同一restful数据源要在不同的任务里有不同的参数,可以把参数填在任务里
返回类型:
支持选择JSON、CSV、XML格式。选择CSV格式时,需要填写分隔符
数据主体:
返回类型选择JSON、XML格式时,因实际需要读取的数据很多场景下不在返回内容的第一层,需要用户指定数据主体,即数据从哪一层读取。数据主体的指定方式为JSONPath(写法例如xxx),若没有指定,平台将从返回结果的第一层读取。
数据主体的填写内容如果与运行参数内容一致,平台将优先识别为运行参数
如下所示,需要读取的数据可能不在JSON第一层,例如```
{
"people": [
{ "firstName": "Brett", "lastName":"McLaughlin", "email": "aaaa" },
{ "firstName": "Jason", "lastName":"Hunter", "email": "bbbb"},
{ "firstName": "Elliotte", "lastName":"Harold", "email": "cccc" }
]
}
如果指定数据主体"$.people”或“${people}",则会输出三条数据对应为
//第一条数据
{ "firstName": "Brett", "lastName":"McLaughlin", "email": "aaaa" },
//第二条数据
{ "firstName": "Jason", "lastName":"Hunter", "email": "bbbb"},
//第三条数据
{ "firstName": "Elliotte", "lastName":"Harold", "email": "cccc" }
在做字段映射的时候,reader端的字段名只需要配置字段名称即可(嵌套key不支持,只能支持第一层字段名称选择)
当数据主体为多层级时,可在 .开头后面加上字段名称,多层级后面继续用 英文符号 . 加上字段名称进行追加表示层级
$开头,以英文字符点 . 开头后面加上字段名称,多层级后面继续用英文符号 . 加上字段名称进行追加表示层级
$开头,后面用大括号 {} 包围字段名称,里面字段名称用 英文符号 . 对每一层级字段名称进行连接
{
"data":{
"peopleList":[
{
"id":1,
"name":"xx1"
}
]
}
}
如上JSON所示,如果指定数据主体$.data.peopleList或${data.peopleList},则会输出数据对应为
{ "id":1,"name":"xx1"}
数据主体对应的数据结构必须是数组格式,否则任务异常结束(只有数据格式数据才能划分为多条数据输出)
指定数据主体之后,column里的每个字段名称只能指定为数据主体里每个数据的第一层key名称
请求次数的适用场景:
当返回类型为JSON或XML格式时,可选择单次请求和多次请求。若选择多次请求,需要填写“多次请求参数”、“起始值”、“步长”、“终止值”。每次请求的参数值都是动态变化的:多次请求参数=多次请求参数+步长。当多次请求参数>终止值时,请求终止。
以JSONPath格式指定,可以选择一段离散的值发起多次请求获取数据。例如用“$.msg.req”作为多次请求参数,起始值为1,步长为1,终止值为5,则会请求页面为1-5页的所有数据。不支持指定数组里的某个值的写法,例如“$.msg.req[0]”。
配置多次请求参数$.msg.req , 起始值 1 , 步长 1 ,终止值5。 代表读取1-5页数据
第一次请求 pageNo参数值为1, 终止条件 1 + 1 < 5 ,任务继续
第二次请求 pageNo参数值为2,终止条件 2+ 1 < 5 ,任务继续
第三次请求 pageNo参数值为3, 终止条件 3+ 1 < 5 ,任务继续
第四次请求 pageNo参数值为4,终止条件 4+ 1 = 5 ,任务继续
第五次请求 pageNo参数值为5, 终止条件 5 + 1 > 5 ,任务结束
也就是说配置的终止条件是5,但是第五次请求仍然是发送的,在构建第六次请求的时候,发现pageNo为6 大于5此时条件不成立,任务结束。
Writer
Header:
json格式填写header,支持填写运行参数
数据主体:
非必填
因需要写的数据可能不在JSON的第一层,例如:
{
"people": [
{ "firstName": "Brett", "lastName":"McLaughlin", "email": "aaaa" },
{ "firstName": "Jason", "lastName":"Hunter", "email": "bbbb"},
{ "firstName": "Elliotte", "lastName":"Harold", "email": "cccc" }
]
}
此时需要按JSONPath指定数据主体为“$.people”或“${people}”,从people下一层开始读取。不填写时直接从第一层写数据。
写restful时如果单次数据写入有最大值限制,则可通过调batchsize分批写入;
例如
[ "Brett", "McLaughlin", "aaaa" ]
[ "Jason","Hunter", "bbbb"]
[ "Elliotte", "Harold", "cccc" ]
数据主体按照jsonPath的格式选择,如${data},则发送的数据格式为
{
"data":
[
{ "firstName": "Brett", "lastName":"McLaughlin", "email": "aaaa" },
{ "firstName": "Jason", "lastName":"Hunter", "email": "bbbb"},
{ "firstName": "Elliotte", "lastName":"Harold", "email": "cccc" }
]
}
如果多层级,数据主体指定为 ${data.list}
{
"data":{
"list":
[
{ "firstName": "Brett", "lastName":"McLaughlin", "email": "aaaa" },
{ "firstName": "Jason", "lastName":"Hunter", "email": "bbbb"},
{ "firstName": "Elliotte", "lastName":"Harold", "email": "cccc" }
]
}
}
没有指定数据主体,发送一个数组
[
{ "firstName": "Brett", "lastName":"McLaughlin", "email": "aaaa" },
{ "firstName": "Jason", "lastName":"Hunter", "email": "bbbb"},
{ "firstName": "Elliotte", "lastName":"Harold", "email": "cccc" }
]
restful不支持多并行度读取与写入,也不支持断点续传
get请求的restful数据源暂不支持指定为数据同步目标端
一键生成目标表
在数据同步任务中,若需要将数据同步至一张新表,可点击表名右侧的「一键生成目标表」按钮
图1.1
“一键生成目标表”功能仅在源表和目标表为表1.1中的类型时存在。 表1.1
源表类型 | 目标表类型 |
---|---|
关系型数据库: MySql、Oracle、SqlServer、PostgreSql、DMDB、DB2、PolarDB、Kingbase 大数据存储: Hive、Phoenix、MaxCompute、SparkThrift 分析型数据库: AnalyticDB、CarbonData、ClickHouse、Impala、TiDBMPP、SAP HANA 数据库: GBase、GaussDB、GreenPlum | 关系型数据库: MySql、PostgreSQL 、Oracle 大数据存储: Hive、SparkThrift 分析型数据库: TiDB、AnalyticDBMPP、SAP HANA、Doris 数据库: LibrA |
“一键生成目标表”中来源表字段类型与生成的目标表字段映射关系如表1.2; 数据来源表类型未做限制,仅对目标表数据源类型做限制,不同数据源类型对应不同字段映射关系如表1.2。
表1.2
源表字段类型 | 目标表字段类型 | ||
---|---|---|---|
Hive及其他 | GaussDB、Greenplum、PostgreSQL | Tidb、MySQL | |
DECIMAL | DECIMAL 若无精度,默认为(15,0 | ||
INT(数字长度>11位 | BIGINT | ||
类型中包含 unsigned | BIGINT | ||
INT UNSIGNED | INT | ||
UINT8、UINT16、INT8、INT16、INT32 | INT | ||
UINT32、UINT64、INT64 | BIGINT | ||
FLOAT32 | FLOAT | ||
FLOAT64 | DOUBLE | ||
DOUBLE | - - | DOUBLE PRECISION | - - |
String | - - | - - | varchar(255 |
number(15,5 | 若小数位精度为0,则转为BIGINT,否则转为DOUBLE | 若小数位精度为0,则转为BIGINT,否则转为DOUBLE PRECISION | 若小数位精度为0,则转为BIGINT,否则转为DOUBLE |
特殊:
源表字段类型 | 目标表字段类型 |
---|---|
本单元格需要与上方单元格合并(本行字可删除) | 目标表类型:AnalyticDB PostgreSQL |
任何类型 | varchar |
点击一键生成目标表出现如图,弹窗中的建表语句可直接修改。
请注意检查建表语句中的字段类型:来源表和目标表的字段类型若不匹配可能导致产生脏数据或同步失败。
点击确定后,若建表成功,则会自动将新建的表的表名填充至“表名”文本框中。
通道控制
高级配置
通道控制处的高级配置,主要支持以下几个参数的配置
speed
speed 用于配置任务并发数及速率限制。具体配置如下所示:
"speed" : {
"channel": 1,
"readerChannel": -1,
"writerChannel": -1,
"bytes": 0,
"rebalance" : true
}
名称 | 说明 | 是否必填 | 默认值 | 数据类型 |
---|---|---|---|---|
channel | 任务并发数 | 否 | 1 | Integer |
readerChannel | source 并行度,-1 代表采用全局并行度 | 否 | -1 | Integer |
writerChannel | sink 并行度,-1 代表采用全局并行度 | 否 | -1 | Integer |
bytes | bytes >0 则表示开启任务限速 | 否 | 0 | Long |
rebalance | 是否强制进行 rebalance,开启会消耗性能 | 否 | false | Boolean |
ErrorLimit
errorLimit 用于配置任务运行时数据读取写入的出错控制。具体配置如下所示:
"errorLimit" : {
"record": 100,
"percentage": 10.0
}
名称 | 说明 | 是否必填 | 默认值 | 参数类型 |
---|---|---|---|---|
record | 错误阈值,当错误记录数超过此阈值时任务失败 | 否 | 0 | Integer |
percentage | 错误比例阈值,当错误记录比例超过此阈值时任务失败 | 否 | 0.0 | Double |
除了参数“rebalance”外,其他常用的参数在向导模式已经作为功能支持。因此在高级配置中通常配置“rebalance”参数,值为“true”时,代表开启数据倾斜调优功能。默认为“false”,不开启。
同步任务环境参数
在同步任务的「环境参数」中,运行方式(flinkTaskRunMode
)参数较为重要,任务运行方式有2种:
- per_job:单独为任务创建flink yarn session,任务运行的资源有保障,提高任务运行稳定性
- new:多个任务共用一个flink yarn session,默认new,适合小数据量同步,节约集群资源
设置方式,在任务的「环境参数」中,修改/添加此参数
## flinkTaskRunMode=new
,其中 ##标识为注释状态,用户需要取消注释才能生效
同步任务元数据校验
在数据来源和数据目标高级配置中将参数metadataCheck设置为true,那么该数据同步任务在进行临时运行、手动运行、周期运行时将会进行元数据校验,并在告警信息中打印失败原因。
主要校验有以下几种情况:字段增加、字段减少、字段类型变更。目前数据来源支持:ADB、Clickhouse、DB2、DMDB、GBase、Greenplum、Hive、Impala(hive表)、Inceptor、KingBase、Kudu、MySQL、Oracle、Phoenix、PolarDB for MySQL8、PostgreSQL、HANA、SparkThrift、SQLServer、TiDB、SyBase、Doris(jdbc);数据目标支持:ADB、Clickhouse、DB2、DMDB、GBase、Greenplum、Hive、Impala(hive表)、Inceptor、KingBase、Kudu、MySQL、Oracle、Phoenix、PolarDB for MySQL8、PostgreSQL、HANA、SparkThrift、SQLServer、TiDB、Doris(jdbc)
同步任务数据来源为空校验
在数据来源的高级配置中将参数checkTableEmpty设置为true,那么该数据同步任务在进行临时运行、手动运行、周期运行和补数据运行时将会进行数据来源的表是否为空的校验,如果源表为空,则同步任务置为提交失败,源端数据不会写入目标端。