Skip to main content

Cassandra Sink

一、介绍

Cassandra sink

二、支持版本

主流版本

三、插件名称

SQLCassandra-x

四、参数说明

1、SQL

  • host

    • 描述:Cassandra的IP地址,多个地址之间用逗号隔开
    • 必选:是
    • 参数类型:string
    • 默认值:无
  • port

    • 描述:Cassandra连接端口
    • 必选:否
    • 参数类型:int
    • 默认值:9042
  • table-name

    • 描述:要读取Cassandra表名
    • 必选:是
    • 参数类型:string
    • 默认值:无
  • keyspaces

    • 描述:Cassandra keyspaces
    • 必选:否
    • 参数类型:string
    • 默认值:无
  • clusterName

    • 描述:Cassandra cluster name
    • 必选:否
    • 参数类型:string
    • 默认值:flinkx-cluster
  • consistency

    • 描述:Cassandra consistence(一致性)
    • 说明:一致性级别决定了副本中必须有多少节点响应协调器节点才能成功处理非轻量级事务。
    • 必选:否
    • 参数类型:string
    • 默认值:LOCAL_QUORUM
  • coreConnectionsPerHost

    • 描述:Cassandra 每个地址可供最多可用连接数
    • 必选:否
    • 参数类型:int
    • 默认值:8
  • maxConnectionsPerHost

    • 描述:Cassandra 每个地址可供最多可连接数
    • 必选:否
    • 参数类型:int
    • 默认值:32768
  • maxRequestsPerConnection

    • 描述:Cassandra 每个连接的最多请求数
    • 必选:否
    • 参数类型:int
    • 默认值:1
  • maxQueueSize

    • 描述:Cassandra 队列最大数
    • 必选:否
    • 参数类型:int
    • 默认值:10000
  • readTimeoutMillis

    • 描述:Cassandra read 超时时长
    • 必选:否
    • 参数类型:int
    • 默认值:60 * 1000
  • poolTimeoutMillis

    • 描述:Cassandra pool 超时时长
    • 必选:否
    • 参数类型:int
    • 默认值:60 * 1000
  • connectTimeoutMillis

    • 描述:Cassandra connect 超时时长
    • 必选:否
    • 参数类型:int
    • 默认值:60 * 1000
  • sink.parallelism

    • 描述:sink并行度
    • 必选:否
    • 参数类型:string
    • 默认值:无

五、数据类型

支持BYTE、INT、FLOAT、DOUBLE、BOOLEAN、TEXT、VARCHAR、DECIMAL、TIMESTAMP
暂不支持ARRAY、MAP、STRUCT、UNION

六、脚本示例

CREATE TABLE source_one
(
id int,
name varchar,
birth timestamp,
todayTime time,
todayDate date,
PROCTIME AS PROCTIME()
) WITH (
'connector' = 'cassandra-x',
'host' = 'ip1,ip2,ip3',
'port' = '9042',
'hostDistance' = 'local',
'user-name' = 'cassandra',
'password' = 'xxxxxxxx',
'table-name' = 'one',
'keyspaces' = 'tiezhu',
);


CREATE TABLE side_one
(
id int,
name varchar,
birth timestamp,
todayTime time,
todayDate date,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'cassandra-x',
'host' = 'ip1,ip2,ip3',
'port' = '9042',
'hostDistance' = 'local',
'user-name' = 'cassandra',
'password' = 'xxxxxxxx',
'table-name' = 'one',
'keyspaces' = 'tiezhu',
'lookup.cache-type' = 'all'
);

CREATE TABLE sink_one
(
id int,
name varchar,
birth timestamp,
todayTime time,
todayDate date
) WITH (
'connector' = 'cassandra-x',
'host' = 'ip1,ip2,ip3',
'port' = '9042',
'hostDistance' = 'local',
'user-name' = 'cassandra',
'password' = 'xxxxxxxx',
'table-name' = 'two',
'keyspaces' = 'tiezhu',
);

CREATE VIEW view_out AS
SELECT u.id AS id,
u.name AS name,
s.birth AS birth,
u.todayTime AS todayTime,
s.todayDate AS todayDate
FROM source_one s
JOIN side_one FOR SYSTEM_TIME AS OF s.PROCTIME AS u
ON s.id = u.id;


INSERT INTO sink_one
SELECT *
FROM view_out;