Skip to main content

StarRocks Sink

一、介绍

StarRocks Sink插件使用stream-load以json格式向数据库写入数据

二、支持版本

StarRocks 2.x

三、插件名称

SQLstarrocks-x

四、插件参数

1.SQL

  • url
    • 描述:StarRocks JdbcUrl
    • 必选:是
    • 字段类型:String
    • 默认值:无

  • table-name
    • 描述:表名
    • 必选:是
    • 字段类型:String
    • 默认值:无

  • schema-name
    • 描述:schema
    • 必选:是
    • 字段类型:String
    • 默认值:无

  • username
    • 描述:用户名
    • 必选:是
    • 字段类型:String
    • 默认值:无

  • password
    • 描述:密码
    • 必选:是
    • 字段类型:String
    • 默认值:无

  • feNodes
    • 描述:StarRocks FrontendEngine地址
    • 必选:是
    • 参数类型:String
    • 默认值:无

  • nameMapped
    • 描述:配置此选项为true后,schema-name和table-name配置失效。将从上游来的数据中获取这两项值,可利用此配置实现多表写入
    • 必选:否
    • 参数类型:boolean
    • 默认值:false

  • maxRetries
    • 描述:stream-load写数据失败次数
    • 必选:否
    • 参数类型:int
    • 默认值:3

  • batchSize
    • 描述:写入内部缓存的数据批大小,不代表一次写入StarRocks的数据量
    • 必选:否
    • 参数类型:
    • 默认值:1024

  • sink.batch.max-rows
    • 描述:以schema+table为单位的批量写入StarRocks的最大条数
    • 必选:否
    • 参数类型:long
    • 默认值:200000L

  • sink.batch.max-bytes

    • 描述:以schema+table为单位的批量写入StarRocks的最大byte
    • 必选:否
    • 参数类型:long
    • 默认值:2147483648L

  • http.check.timeout

    • 描述:检查FE节点连通性时,允许的超时时长毫秒值
    • 必选:否
    • 参数类型:int
    • 默认值:10000

  • queue.offer.timeout
    • 描述:数据写入内部缓冲队列允许的超时时长毫秒值
    • 必选:否
    • 参数类型:int
    • 默认值:60000

  • queue.poll.timeout
    • 描述:从内部缓冲队列读取数据允许的超时时长毫秒值
    • 必选:否
    • 参数类型:int
    • 默认值:60000

  • stream-load.head.properties
    • 描述:自选的stream-load的http请求头配置
    • 必选:否
    • 参数类型:map
    • 默认值:无

  • sink.parallelism
    • 描述:写入结果的并行度
    • 必选:否
    • 参数类型:String
    • 默认值:无

五、数据类型

Flink typeStarRocks typeFlinkx Column
BOOLEANBOOLEANBooleanColumn
TINYINTTINYINTByteColumn
SMALLINTSMALLINTBigDecimalColumn
INTEGERINTEGERBigDecimalColumn
BIGINTBIGINTBigDecimalColumn
FLOATFLOATBigDecimalColumn
DOUBLEDOUBLEBigDecimalColumn
DECIMALDECIMALBigDecimalColumn
BINARYINTEGERBigDecimalColumn
CHARSTRINGStringColumn
VARCHARSTRINGStringColumn
STRINGSTRINGStringColumn
STRINGLARGEINTStringColumn
DATEDATESqlDateColumn
TIMESTAMP_WITHOUT_TIME_ZONE(N)DATETIMETimestampColumn
TIMESTAMP_WITH_LOCAL_TIME_ZONE(N)DATETIMETimestampColumn
ARRAY< T >ARRAY< T >暂不支持
MAP< KT,VT >JSON STRING暂不支持
ROW< arg T... >JSON STRING暂不支持

六、脚本示例

CREATE TABLE source
(
id int,
boolean_data boolean,
tinyint_data tinyint,
smallint_data smallint,
integer_data integer,
bigint_data bigint,
float_data float,
double_data double,
decimal_data decimal,
string_data string,
date_data date,
datetime_data timestamp(0)
) with (
'connector' = 'starrocks-x',
'url' = 'jdbc:mysql://172.16.82.221:9030',
'feNodes' = '172.16.82.221:8030;172.16.82.68:8030;172.16.82.151:8030',
'schema-name' = 'liuliu_test',
'table-name' = 'type_test',
'username' = 'root',
'password' = ''
);


CREATE TABLE sink
(
id int,
boolean_data boolean,
tinyint_data tinyint,
smallint_data smallint,
integer_data integer,
bigint_data bigint,
float_data float,
double_data double,
decimal_data decimal,
string_data string,
date_data date,
datetime_data timestamp(0)
) with (
'connector' = 'starrocks-x',
'url' = 'jdbc:mysql://172.16.82.221:9030',
'feNodes' = '172.16.82.221:8030;172.16.82.68:8030;172.16.82.151:8030',
'schema-name' = 'liuliu_test',
'table-name' = 'type_test_2',
'username' = 'root',
'password' = ''
);

insert into sink
select *
from source;