Skip to main content

Hive Sink

一、介绍

FlinkX只有Hive sink插件,没有Hive source插件,如需要读取Hive表中的数据,请使用HDFS source插件。

Hive sink插件支持实时地往Hive表中写数据,支持自动建表并根据当前系统时间自动创建分区,支持动态解析表名及分组映射,根据映射规则将不同的数据写入不同的Hive表。

Hive sink插件一般配合实时采集(CDC)插件、kafka source插件等实时类的插件一起使用。

Hive sink插件底层依赖HDFS sink,其基本原理也是向指定的HDFS路径中写入数据文件,可以看做是在HDFS sink上做了一些自动建表建分区及分组映射等拓展功能。

Hive sink插件使用时需要开启checkpoint,在checkpoint后数据才能在Hive表中被查出。在开启checkpoint时会使用二阶段提交,预提交时将.data目录中生成的数据文件复制到正式目录中并标记复制的数据文件,提交阶段删除.data目录中标记的数据文件,回滚时删除正式目录中标记的数据文件。

二、支持版本

Hive 1.x、Hive 2.x

四、参数说明

1、SQL

  • url

    • 描述:连接Hive JDBC的字符串
    • 必选:是
    • 字段类型:string
    • 默认值:无
  • username

    • 描述:Hive认证用户名
    • 必选:否
    • 字段类型:string
    • 默认值:无
  • password

    • 描述:Hive认证密码
    • 必选:否
    • 字段类型:string
    • 默认值:无
  • partition

    • 描述:分区字段名称
    • 必选:否
    • 字段类型:string
    • 默认值:pt
  • partition-type

    • 描述:分区类型,包括 DAY、HOUR、MINUTE三种。若分区不存在则会自动创建,自动创建的分区时间以当前任务运行的服务器时间为准
      • DAY:天分区,分区示例:pt=20200101
      • HOUR:小时分区,分区示例:pt=2020010110
      • MINUTE:分钟分区,分区示例:pt=202001011027
    • 必选:否
    • 字段类型:string
    • 默认值:DAY
  • write-mode

    • 描述:HDFS Sink写入前数据清理处理模式:
      • append:追加
      • overwrite:覆盖
    • 注意:overwrite模式时会删除hdfs当前目录下的所有文件
    • 必选:否
    • 字段类型:string
    • 默认值:append
  • file-type

    • 描述:文件的类型,目前只支持用户配置为textorcparquet
      • text:textfile文件格式
      • orc:orcfile文件格式
      • parquet:parquet文件格式
    • 必选:是
    • 参数类型:string
    • 默认值:无
  • default-fs

    • 描述:Hadoop hdfs文件系统namenode节点地址。格式:hdfs://ip:端口;例如:hdfs://127.0.0.1:9000
    • 必选:是
    • 参数类型:string
    • 默认值:无
  • hadoopConfig

    • 描述:集群HA模式时需要填写的core-site.xml及hdfs-site.xml中的配置,开启kerberos时包含kerberos相关配置
    • 必选:否
    • 配置方式:'properties.key' = 'value',key为hadoopConfig中的key,value为hadoopConfig中的value,如下所示:
'properties.hadoop.user.name' = 'root',
'properties.dfs.ha.namenodes.ns' = 'nn1,nn2',
'properties.fs.defaultFS' = 'hdfs://ns',
'properties.dfs.namenode.rpc-address.ns.nn2' = 'ip:9000',
'properties.dfs.client.failover.proxy.provider.ns' = 'org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider',
'properties.dfs.namenode.rpc-address.ns.nn1' = 'ip:9000',
'properties.dfs.nameservices' = 'ns',
'properties.fs.hdfs.impl.disable.cache' = 'true',
'properties.fs.hdfs.impl' = 'org.apache.hadoop.hdfs.DistributedFileSystem'
  • field-delimiter

    • 描述:fileTypetext时字段的分隔符
    • 必选:否
    • 参数类型:string
    • 默认值:\001
  • compress

    • 描述:hdfs文件压缩类型
      • text:支持GZIPBZIP2格式
      • orc:支持SNAPPYGZIPBZIPLZ4格式
      • parquet:支持SNAPPYGZIPLZO格式
    • 注意:SNAPPY格式需要用户安装SnappyCodec
    • 必选:否
    • 字段类型:string
    • 默认值:
      • text 默认不进行压缩
      • orc 默认为ZLIB格式
      • parquet 默认为SNAPPY格式
  • max-file-size

    • 描述:写入hdfs单个文件最大大小,单位字节
    • 必选:否
    • 字段类型:long
    • 默认值:1073741824(1G)
  • next-check-rows

    • 描述:下一次检查文件大小的间隔条数,每达到该条数时会查询当前写入文件的文件大小
    • 必选:否
    • 字段类型:long
    • 默认值:5000
  • enable-dictionary

    • 描述:fileTypeparquet时,是否启动字典编码
    • 必须:否
    • 字段类型:boolean
    • 默认值:true
  • encoding

    • 描述:fileTypetext时字段的字符编码
    • 必选:否
    • 字段类型:string
    • 默认值:UTF-8
  • table-name

    • 描述:Hive表名
    • 必选:是
    • 字段类型:string
    • 默认值:无

五、数据类型

支持BOOLEAN、TINYINT、SMALLINT、INT、BIGINT、FLOAT、DOUBLE、DECIMAL、STRING、VARCHAR、CHAR、TIMESTAMP、DATE、BINARY
暂不支持ARRAY、MAP、STRUCT、UNION

六、脚本示例

-- {"id":1,"col_bit":true,"col_tinyint":127,"col_smallint":32767,"col_int":2147483647,"col_bigint":1,"col_float":1.1,"col_double":1.1,"col_decimal":1,"col_string":"string","col_varchar":"varchar","col_char":"char","col_timestamp":"2020-07-30 10:08:22","col_date":"2020-07-30"}

CREATE TABLE source
(
id bigint,
col_bit boolean,
col_tinyint tinyint,
col_smallint smallint,
col_int int,
col_bigint bigint,
col_float float,
col_double double,
col_decimal decimal(10, 0),
col_string char(10),
col_varchar varchar(10),
col_char char(10),
-- col_binary binary,
col_timestamp timestamp,
col_date date
) WITH (
'connector' = 'kafka-x'
,'topic' = 'tudou'
,'properties.bootstrap.servers' = 'ip:9092'
,'scan.startup.mode' = 'latest-offset'
,'format' = 'json'
,'json.timestamp-format.standard' = 'SQL'
);

CREATE TABLE sink
(
id bigint,
col_boolean boolean,
col_tinyint tinyint,
col_smallint smallint,
col_int int,
col_bigint bigint,
col_float float,
col_double double,
col_decimal decimal(10, 0),
col_string char(10),
col_varchar varchar(10),
col_char char(10),
-- col_binary binary,
col_timestamp timestamp,
col_date date
) WITH (
'connector' = 'hive-x'
,'properties.hadoop.user.name' = 'root'
,'properties.dfs.ha.namenodes.ns' = 'nn1,nn2'
,'properties.fs.defaultFS' = 'hdfs://ns'
,'properties.dfs.namenode.rpc-address.ns.nn2' = 'ip:9000'
,'properties.dfs.client.failover.proxy.provider.ns' = 'org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
,'properties.dfs.namenode.rpc-address.ns.nn1' = 'ip:9000'
,'properties.dfs.nameservices' = 'ns'
,'properties.fs.hdfs.impl.disable.cache' = 'true'
,'properties.fs.hdfs.impl' = 'org.apache.hadoop.hdfs.DistributedFileSystem'
,'default-fs' = 'hdfs://ns'
,'field-delimiter' = ','
,'encoding' = 'utf-8'
,'max-file-size' = '10485760'
,'next-check-rows' = '20000'
,'write-mode' = 'overwrite'
,'file-type' = 'parquet'

,'url' = 'jdbc:hive2://ip:10000/tudou'
,'username' = ''
,'password' = ''
,'partition' = 'pt'
,'partition-type' = 'DAY'
,'table-name' = 'kudu'
);

insert into sink
select *
from source u;