Clickhouse Lookup
一、介绍
clickhouse维表,支持全量和异步方式全量缓存:将维表数据全部加载到内存中,建议数据量不大使用。
异步缓存:使用异步方式查询数据,并将查询到的数据使用lru缓存到内存中,建议数据量大使用。
二、支持版本
ClickHouse 19.x及以上
三、插件名称
sql | clickhouse-x |
---|
四、参数说明
connector
- 描述:clickhouse-x
- 必选:是
- 字段类型:String
- 默认值:无
url
- 描述:clickhouse jdbc url
- 必选:是
- 字段类型:String
- 默认值:无
- table-name
- 描述:表名
- 必选:是
- 字段类型:String
- 默认值:无
- username
- 描述:用户名
- 必选:是
- 字段类型:String
- 默认值:无
- password
- 描述:密码
- 必选:是
- 字段类型:String
- 默认值:无
- password
- 描述:密码
- 必选:是
- 字段类型:String
- 默认值:无
- lookup.cache-type
- 描述:维表缓存类型(NONE、LRU、ALL),默认LRU
- 必选:否
- 字段类型:String
- 默认值:LRU
- lookup.cache-period
- 描述:ALL维表每隔多久加载一次数据,默认3600000毫秒(一个小时)
- 必选:否
- 字段类型:String
- 默认值:3600000
- lookup.cache.max-rows
- 描述:lru维表缓存数据的条数
- 必选:否
- 字段类型:String
- 默认值:10000
- lookup.cache.ttl
- 描述:lru维表缓存数据的时间
- 必选:否
- 字段类型:String
- 默认值:60000
- lookup.fetch-size
- 描述:ALL维表每次从数据库加载的条数
- 必选:否
- 字段类型:String
- 默认值:1000
- lookup.parallelism
- 描述:维表并行度
- 必选:否
- 字段类型:String
- 默认值:无
五、数据类型
支持 | BOOLEAN |
---|---|
TINYINT | |
SMALLINT | |
INT | |
BIGINT | |
FLOAT | |
DOUBLE | |
DECIMAL | |
STRING | |
VARCHAR | |
CHAR | |
TIMESTAMP | |
DATE | |
BINARY | |
NULL | |
暂不支持 | ARRAY |
MAP | |
STRUCT | |
UNION |
六、脚本示例
CREATE TABLE source (
id INT,
name STRING
) WITH (
'connector' = 'kafka-x',
'topic' = 'test',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'dodge',
'format' = 'json'
);
CREATE TABLE side (
id INT,
name VARCHAR,
create_time TIMESTAMP,
test1 SMALLINT,
test2 BIGINT,
afloat FLOAT,
afloat2 DOUBLE,
is_delete TINYINT,
create_date DATE
) WITH (
'connector' = 'clickhouse-x',
'url' = 'jdbc:clickhouse://localhost:8123/default',
'table-name' = 'sql_side_table',
'username' = 'default',
'password' = 'b6rCe7ZV',
'lookup.cache-type' = 'lru'
);
CREATE TABLE sink (
id INT,
name VARCHAR,
create_time TIMESTAMP,
test1 SMALLINT,
test2 BIGINT,
afloat FLOAT,
afloat2 DOUBLE,
is_delete TINYINT,
create_date DATE
) WITH (
'connector' = 'clickhouse-x',
'url' = 'jdbc:clickhouse://localhost:8123/default',
'table-name' = 'sql_sink_table',
'username' = 'default',
'password' = 'b6rCe7ZV',
'sink.buffer-flush.max-rows' = '1',
'sink.all-replace' = 'true'
);
INSERT INTO sink
SELECT
s1.id AS id,
s1.name AS name,
s2.create_time AS create_time,
s2.test1 AS test1,
s2.test2 AS test2,
s2.afloat AS afloat,
s2.afloat2 AS afloat2,
s2.is_delete AS is_delete,
s2.create_date AS create_date
FROM source s1
JOIN side s2
ON s1.id = s2.id