ClickHouse Sink

I. Introduction

The ClickHouse sink writes data to ClickHouse tables.

II. Supported Versions

ClickHouse 19.x and later

III. Plugin Name

SQL: clickhouse-x

IV. Parameters

1. SQL

  • connector
    • Description: connector identifier; set to clickhouse-x
    • Required: yes
    • Field type: String
    • Default: none


  • url
    • Description: ClickHouse JDBC URL, for example jdbc:clickhouse://localhost:8123/default
    • Required: yes
    • Field type: String
    • Default: none


  • table-name
    • Description: name of the target table
    • Required: yes
    • Field type: String
    • Default: none


  • username
    • Description: user name
    • Required: yes
    • Field type: String
    • Default: none


  • password
    • Description: password
    • Required: yes
    • Field type: String
    • Default: none


  • sink.buffer-flush.max-rows
    • Description: number of rows written per batch (unit: rows)
    • Required: no
    • Field type: String
    • Default: 1024


  • sink.buffer-flush.interval
    • Description: interval between batch writes (unit: milliseconds)
    • Required: no
    • Field type: String
    • Default: 10000


  • sink.all-replace
    • Description: whether to overwrite all values in the database. When true, an incoming null value replaces an existing non-null value.
    • Required: no
    • Field type: String
    • Default: false


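  • sink.semantic
    • Description: whether the sink uses two-phase commit
    • Note:
      • If this parameter is not set, two-phase commit is disabled, so the sink does not provide exactly-once semantics;
      • Only exactly-once and at-least-once are currently supported
    • Required: no
    • Field type: String
      • Example: 'sink.semantic' = 'exactly-once'
    • Default: at-least-once


  • sink.parallelism
    • Description: parallelism of the sink (write) operator
    • Required: no
    • Field type: String
    • Default: none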
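
As a rough sketch of how the optional parameters above combine, the following sink definition sets each of them explicitly. The Flink table name sink_batched, the two-column schema, the password placeholder, and every chosen value are illustrative assumptions rather than recommended settings.

```sql
-- Hypothetical sink that sets every optional parameter described above.
CREATE TABLE sink_batched (
    id INT,
    name VARCHAR
) WITH (
    'connector' = 'clickhouse-x',
    'url' = 'jdbc:clickhouse://localhost:8123/default',
    'table-name' = 'sql_sink_table',
    'username' = 'default',
    'password' = '<password>',                 -- placeholder, replace with a real credential
    'sink.buffer-flush.max-rows' = '5000',     -- batch size in rows (default 1024)
    'sink.buffer-flush.interval' = '2000',     -- batch interval in milliseconds (default 10000)
    'sink.all-replace' = 'false',              -- default; incoming nulls do not overwrite non-null values
    'sink.semantic' = 'exactly-once',          -- request two-phase commit; omit for at-least-once
    'sink.parallelism' = '2'                   -- assumed value; no default is documented
);
```

Two-phase commit sinks in Flink generally depend on checkpointing being enabled for the job, so exactly-once is unlikely to take effect without it. Whether additional ClickHouse-side settings are needed is not covered by this page.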

V. Data Types

Supported: BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, STRING, VARCHAR, CHAR, TIMESTAMP, DATE, BINARY, NULL
Not yet supported: ARRAY, MAP, STRUCT, UNION

VI. Script Example


-- Kafka source: JSON records with an id and a name
CREATE TABLE source (
    id INT,
    name STRING
) WITH (
    'connector' = 'kafka-x',
    'topic' = 'test',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'dodge',
    'format' = 'json'
);

-- ClickHouse side (dimension) table joined against the source, with an LRU cache
CREATE TABLE side (
    id INT,
    name VARCHAR,
    create_time TIMESTAMP,
    test1 SMALLINT,
    test2 BIGINT,
    afloat FLOAT,
    afloat2 DOUBLE,
    is_delete TINYINT,
    create_date DATE
) WITH (
    'connector' = 'clickhouse-x',
    'url' = 'jdbc:clickhouse://localhost:8123/default',
    'table-name' = 'sql_side_table',
    'username' = 'default',
    'password' = 'b6rCe7ZV',
    'lookup.cache-type' = 'lru'
);

-- ClickHouse sink: a batch size of 1 flushes after every row
CREATE TABLE sink (
    id INT,
    name VARCHAR,
    create_time TIMESTAMP,
    test1 SMALLINT,
    test2 BIGINT,
    afloat FLOAT,
    afloat2 DOUBLE,
    is_delete TINYINT,
    create_date DATE
) WITH (
    'connector' = 'clickhouse-x',
    'url' = 'jdbc:clickhouse://localhost:8123/default',
    'table-name' = 'sql_sink_table',
    'username' = 'default',
    'password' = 'b6rCe7ZV',
    'sink.buffer-flush.max-rows' = '1',
    'sink.all-replace' = 'true'
);

-- Enrich each source row with the matching side-table columns and write it to the sink
INSERT INTO sink
SELECT
    s1.id AS id,
    s1.name AS name,
    s2.create_time AS create_time,
    s2.test1 AS test1,
    s2.test2 AS test2,
    s2.afloat AS afloat,
    s2.afloat2 AS afloat2,
    s2.is_delete AS is_delete,
    s2.create_date AS create_date
FROM source s1
JOIN side s2
    ON s1.id = s2.id;
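
The script above assumes that sql_sink_table already exists in ClickHouse. A minimal sketch of a matching ClickHouse-side table is shown here. The MergeTree engine, the ORDER BY key, and the Flink-to-ClickHouse type mapping in the comments are assumptions for illustration; this page does not specify any of them.

```sql
-- Hypothetical ClickHouse DDL for the target of the Flink sink table.
CREATE TABLE IF NOT EXISTS default.sql_sink_table
(
    id          Int32,     -- Flink INT
    name        String,    -- Flink VARCHAR
    create_time DateTime,  -- Flink TIMESTAMP (DateTime stores second precision)
    test1       Int16,     -- Flink SMALLINT
    test2       Int64,     -- Flink BIGINT
    afloat      Float32,   -- Flink FLOAT
    afloat2     Float64,   -- Flink DOUBLE
    is_delete   Int8,      -- Flink TINYINT
    create_date Date       -- Flink DATE
)
ENGINE = MergeTree
ORDER BY id;
```

If the columns can receive nulls (for example with sink.all-replace set to true), wrapping the affected types in Nullable(...) on the ClickHouse side is one option to consider.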