Skip to main content

StarRocks Lookup

一、介绍

StarRocks维表,支持全量和异步方式。
全量缓存:将维表数据全部加载到内存中,建议数据量不大使用。
异步缓存:使用异步方式查询数据,并将查询到的数据使用lru缓存到内存中,建议数据量大使用。

二、支持版本

StarRocks 2.x

三、插件名称

SQLstarrocks-x

四、参数说明

  • url
    • 描述:StarRocks JdbcUrl
    • 必选:是
    • 字段类型:String
    • 默认值:无

  • table-name
    • 描述:表名
    • 必选:是
    • 字段类型:String
    • 默认值:无

  • schema-name
    • 描述:schema
    • 必选:是
    • 字段类型:String
    • 默认值:无

  • username
    • 描述:用户名
    • 必选:是
    • 字段类型:String
    • 默认值:无

  • password
    • 描述:密码
    • 必选:是
    • 字段类型:String
    • 默认值:无

  • maxRetries
    • 描述:be获取连接、读取数据的重试次数
    • 必选:否
    • 字段类型:int
    • 默认值:3

  • scan.be.client.timeout
    • 描述:与be通信允许的超时毫秒值
    • 必选:否
    • 参数类型:int
    • 默认值:3000

  • scan.be.client.keep-live-min
    • 描述:be连接存活时间
    • 必选:否
    • 参数类型:int

  • scan.be.query.timeout-s
    • 描述:be查询超时时长
    • 必选:否
    • 参数类型:int
    • 默认值:600

  • scan.be.fetch-rows
    • 描述:be批量读取的数据条数
    • 必选:否
    • 参数类型:int
    • 默认值:1024

  • scan.be.fetch-bytes-limit

    • 描述:be批量读取的数据最大byte值
    • 必选:否
    • 参数类型:long
    • 默认值:1073741824
  • scan.be.param.properties

    • 描述:连接be的其他可配置参数
    • 必选:否
    • 参数类型:map
    • 默认值:无
  • scan.be.host-mapping

    • 描述:be节点host映射
    • 必选:否
    • 参数类型:list
    • 默认值:无
  • lookup.cache-period

    • 描述:ALL维表每隔多久加载一次数据,默认3600000毫秒(一个小时)
    • 必选:否
    • 参数类型:string
    • 默认值:3600000

  • lookup.cache.max-rows
    • 描述:lru维表缓存数据的条数,默认10000条
    • 必选:否
    • 参数类型:string
    • 默认值:10000

  • lookup.cache.ttl
    • 描述:lru维表缓存数据的时间,默认60000毫秒(一分钟)
    • 必选:否
    • 参数类型:string
    • 默认值:60000

  • lookup.max-retries

    • 描述:LRU维表查找数据库失败时的最大重试次数
    • 必选:否
    • 参数类型:int
    • 默认值:3
  • lookup.error-limit

    • 描述:LRU维表发生超时、ALL Cache维表发送数据失败次数的容忍值
    • 必选:否
    • 参数类型:int
    • 默认值:0
  • lookup.fetch-size

    • 描述:ALL维表每次从数据库加载的条数,默认1000条
    • 必选:否
    • 参数类型:string
    • 默认值:1000

  • lookup.parallelism
    • 描述:维表并行度
    • 必选:否
    • 参数类型:string
    • 默认值:无

五、数据类型

Flink typeStarRocks typeFlinkx Column
BOOLEANBOOLEANBooleanColumn
TINYINTTINYINTByteColumn
SMALLINTSMALLINTBigDecimalColumn
INTEGERINTEGERBigDecimalColumn
BIGINTBIGINTBigDecimalColumn
FLOATFLOATBigDecimalColumn
DOUBLEDOUBLEBigDecimalColumn
DECIMALDECIMALBigDecimalColumn
BINARYINTEGERBigDecimalColumn
CHARSTRINGStringColumn
VARCHARSTRINGStringColumn
STRINGSTRINGStringColumn
STRINGLARGEINTStringColumn
DATEDATESqlDateColumn
TIMESTAMP_WITHOUT_TIME_ZONE(N)DATETIMETimestampColumn
TIMESTAMP_WITH_LOCAL_TIME_ZONE(N)DATETIMETimestampColumn
ARRAY< T >ARRAY< T >暂不支持
MAP< KT,VT >JSON STRING暂不支持
ROW< arg T... >JSON STRING暂不支持

六、脚本示例

CREATE TABLE table1
(
id int,
name string,
date_data date,
datetime_data timestamp(0),
proctime AS PROCTIME()
) with (
'connector' = 'mysql-x',
'url' = 'jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=true',
'schema-name' = 'test',
'table-name' = 'source',
'username' = 'root',
'password' = 'root'
);

CREATE TABLE side
(
id int,
date_data date,
datetime_data timestamp(0)
) with (
'connector' = 'starrocks-x',
'url' = 'jdbc:mysql://172.16.82.221:9030',
'feNodes' = '172.16.82.221:8030;172.16.82.68:8030;172.16.82.151:8030',
'schema-name' = 'liuliu_test',
'table-name' = 'type_test',
'lookup.cache-type' = 'ALL',
'username' = 'root',
'password' = ''
);

CREATE TABLE sink
(
id int PRIMARY KEY,
date_data1 date ,
date_data2 date
) WITH (
'connector' = 'stream-x'
);

insert into sink
select u.id
, u.date_data
, s.date_data
from table1 u
left join side FOR SYSTEM_TIME AS OF u.proctime AS s
on u.id = s.id and u.datetime_data=s.datetime_data and u.date_data = s.date_data;