StarRocks Sink
一、介绍
StarRocks Sink插件使用stream-load以json格式向数据库写入数据
二、支持版本
StarRocks 2.x
三、插件名称
SQL | starrocks-x |
---|
四、插件参数
1.SQL
- url
- 描述:StarRocks JdbcUrl
- 必选:是
- 字段类型:String
- 默认值:无
- table-name
- 描述:表名
- 必选:是
- 字段类型:String
- 默认值:无
- schema-name
- 描述:schema
- 必选:是
- 字段类型:String
- 默认值:无
- username
- 描述:用户名
- 必选:是
- 字段类型:String
- 默认值:无
- password
- 描述:密码
- 必选:是
- 字段类型:String
- 默认值:无
- feNodes
- 描述:StarRocks FrontendEngine地址
- 必选:是
- 参数类型:String
- 默认值:无
- nameMapped
- 描述:配置此选项为true后,schema-name和table-name配置失效。将从上游来的数据中获取这两项值,可利用此配置实现多表写入
- 必选:否
- 参数类型:boolean
- 默认值:false
- maxRetries
- 描述:stream-load写数据失败次数
- 必选:否
- 参数类型:int
- 默认值:3
- batchSize
- 描述:写入内部缓存的数据批大小,不代表一次写入StarRocks的数据量
- 必选:否
- 参数类型:
- 默认值:1024
- sink.batch.max-rows
- 描述:以schema+table为单位的批量写入StarRocks的最大条数
- 必选:否
- 参数类型:long
- 默认值:200000L
sink.batch.max-bytes
- 描述:以schema+table为单位的批量写入StarRocks的最大byte
- 必选:否
- 参数类型:long
- 默认值:2147483648L
http.check.timeout
- 描述:检查FE节点连通性时,允许的超时时长毫秒值
- 必选:否
- 参数类型:int
- 默认值:10000
- queue.offer.timeout
- 描述:数据写入内部缓冲队列允许的超时时长毫秒值
- 必选:否
- 参数类型:int
- 默认值:60000
- queue.poll.timeout
- 描述:从内部缓冲队列读取数据允许的超时时长毫秒值
- 必选:否
- 参数类型:int
- 默认值:60000
- stream-load.head.properties
- 描述:自选的stream-load的http请求头配置
- 必选:否
- 参数类型:map
- 默认值:无
- sink.parallelism
- 描述:写入结果的并行度
- 必选:否
- 参数类型:String
- 默认值:无
五、数据类型
Flink type | StarRocks type | Flinkx Column |
---|---|---|
BOOLEAN | BOOLEAN | BooleanColumn |
TINYINT | TINYINT | ByteColumn |
SMALLINT | SMALLINT | BigDecimalColumn |
INTEGER | INTEGER | BigDecimalColumn |
BIGINT | BIGINT | BigDecimalColumn |
FLOAT | FLOAT | BigDecimalColumn |
DOUBLE | DOUBLE | BigDecimalColumn |
DECIMAL | DECIMAL | BigDecimalColumn |
BINARY | INTEGER | BigDecimalColumn |
CHAR | STRING | StringColumn |
VARCHAR | STRING | StringColumn |
STRING | STRING | StringColumn |
STRING | LARGEINT | StringColumn |
DATE | DATE | SqlDateColumn |
TIMESTAMP_WITHOUT_TIME_ZONE(N) | DATETIME | TimestampColumn |
TIMESTAMP_WITH_LOCAL_TIME_ZONE(N) | DATETIME | TimestampColumn |
ARRAY< T > | ARRAY< T > | 暂不支持 |
MAP< KT,VT > | JSON STRING | 暂不支持 |
ROW< arg T... > | JSON STRING | 暂不支持 |
六、脚本示例
CREATE TABLE source
(
id int,
boolean_data boolean,
tinyint_data tinyint,
smallint_data smallint,
integer_data integer,
bigint_data bigint,
float_data float,
double_data double,
decimal_data decimal,
string_data string,
date_data date,
datetime_data timestamp(0)
) with (
'connector' = 'starrocks-x',
'url' = 'jdbc:mysql://172.16.82.221:9030',
'feNodes' = '172.16.82.221:8030;172.16.82.68:8030;172.16.82.151:8030',
'schema-name' = 'liuliu_test',
'table-name' = 'type_test',
'username' = 'root',
'password' = ''
);
CREATE TABLE sink
(
id int,
boolean_data boolean,
tinyint_data tinyint,
smallint_data smallint,
integer_data integer,
bigint_data bigint,
float_data float,
double_data double,
decimal_data decimal,
string_data string,
date_data date,
datetime_data timestamp(0)
) with (
'connector' = 'starrocks-x',
'url' = 'jdbc:mysql://172.16.82.221:9030',
'feNodes' = '172.16.82.221:8030;172.16.82.68:8030;172.16.82.151:8030',
'schema-name' = 'liuliu_test',
'table-name' = 'type_test_2',
'username' = 'root',
'password' = ''
);
insert into sink
select *
from source;