Cassandra Sink
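I. Introduction
The Cassandra sink plugin writes data to Cassandra tables.
II. Supported versions
Mainstream Cassandra versions
III. Plugin name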
Type | Plugin name
---|---
SQL | cassandra-x
IV. Parameters
1. SQL
host
- Description: IP addresses of the Cassandra nodes; separate multiple addresses with commas
- Required: yes
- Type: string
- Default: none
port
- Description: Cassandra connection port
- Required: no
- Type: int
- Default: 9042
table-name
- Description: Name of the Cassandra table to write to
- Required: yes
- Type: string
- Default: none
keyspaces
- Description: Cassandra keyspace that contains the table
- Required: no
- Type: string
- Default: none
clusterName
- Description: Cassandra cluster name
- Required: no
- Type: string
- Default: flinkx-cluster
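consistency
- Description: Cassandra consistency level
- Note: The consistency level determines how many replica nodes must respond to the coordinator node before a non-lightweight-transaction request is considered successful.
- Required: no
- Type: string
- Default: LOCAL_QUORUM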
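To make the default concrete: following the Apache Cassandra documentation, the number of replicas that a quorum-based level waits for is derived from the keyspace replication factor (RF). The RF is set on the keyspace itself, not by this plugin.

$$
\mathrm{QUORUM} = \left\lfloor \frac{\sum_{d} RF_d}{2} \right\rfloor + 1,
\qquad
\mathrm{LOCAL\_QUORUM} = \left\lfloor \frac{RF_{\mathrm{local}}}{2} \right\rfloor + 1
$$

For example, with RF = 3 in the local data center, LOCAL_QUORUM waits for $\lfloor 3/2 \rfloor + 1 = 2$ replicas; with two data centers at RF = 3 each, QUORUM waits for $\lfloor 6/2 \rfloor + 1 = 4$ replicas.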
coreConnectionsPerHost
- Description: Core (baseline) number of connections per Cassandra host
- Required: no
- Type: int
- Default: 8
maxConnectionsPerHost
- Description: Maximum number of connections per Cassandra host
- Required: no
- Type: int
- Default: 32768
maxRequestsPerConnection
- Description: Maximum number of requests per connection
- Required: no
- Type: int
- Default: 1
maxQueueSize
- Description: Maximum size of the Cassandra request queue
- Required: no
- Type: int
- Default: 10000
readTimeoutMillis
- Description: Cassandra read timeout, in milliseconds
- Required: no
- Type: int
- Default: 60000 (60 * 1000 ms, i.e. 60 s)
poolTimeoutMillis
- Description: Cassandra connection-pool timeout, in milliseconds
- Required: no
- Type: int
- Default: 60000 (60 * 1000 ms, i.e. 60 s)
connectTimeoutMillis
- Description: Cassandra connection timeout, in milliseconds
- Required: no
- Type: int
- Default: 60000 (60 * 1000 ms, i.e. 60 s)
sink.parallelism
- Description: Parallelism of the sink operator
- Required: no
- Type: string
- Default: none
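The DDL below is a sketch of a sink table that sets the optional tuning parameters explicitly. It assumes the option keys are spelled exactly as in the list above; the table name `sink_tuned` and all values are illustrative placeholders, not tuning recommendations.

```sql
-- Hypothetical sink table setting the optional parameters explicitly.
-- Assumption: option keys match the parameter names documented above.
CREATE TABLE sink_tuned
(
    id   int,
    name varchar
) WITH (
    'connector' = 'cassandra-x',
    'host' = 'ip1,ip2,ip3',
    'port' = '9042',
    'keyspaces' = 'tiezhu',
    'table-name' = 'two',
    'clusterName' = 'flinkx-cluster',
    -- override the LOCAL_QUORUM default
    'consistency' = 'QUORUM',
    -- connection-pool settings (documented defaults shown)
    'coreConnectionsPerHost' = '8',
    'maxConnectionsPerHost' = '32768',
    'maxRequestsPerConnection' = '1',
    'maxQueueSize' = '10000',
    -- timeouts are in milliseconds: 30000 ms = 30 s
    'readTimeoutMillis' = '30000',
    'poolTimeoutMillis' = '30000',
    'connectTimeoutMillis' = '30000',
    'sink.parallelism' = '2'
);
```

Option values in a Flink SQL `WITH` clause are always quoted strings, even for the int-typed parameters.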
V. Data types
Support status | Types
---|---
Supported | BYTE, INT, FLOAT, DOUBLE, BOOLEAN, TEXT, VARCHAR, DECIMAL, TIMESTAMP
Not yet supported | ARRAY, MAP, STRUCT, UNION
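As a sketch, a sink table can restrict its columns to the supported types. The table and column names here are hypothetical, and the mapping of BYTE to Flink's `tinyint` and TEXT to `varchar` is an assumption; the target Cassandra table must already have matching columns of compatible CQL types.

```sql
-- Hypothetical sink table using only types listed as supported.
-- Assumption: BYTE -> tinyint, TEXT/VARCHAR -> varchar in Flink SQL.
-- Columns of ARRAY, MAP, STRUCT or UNION types are deliberately absent.
CREATE TABLE sink_types
(
    id      int,
    flag    tinyint,
    ratio   float,
    score   double,
    active  boolean,
    name    varchar,
    amount  decimal(10, 2),
    updated timestamp
) WITH (
    'connector' = 'cassandra-x',
    'host' = 'ip1,ip2,ip3',
    'keyspaces' = 'tiezhu',
    'table-name' = 'typed_table'
);
```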
VI. Script example
-- Source table: reads rows from Cassandra table 'one'
CREATE TABLE source_one
(
id int,
name varchar,
birth timestamp,
todayTime time,
todayDate date,
PROCTIME AS PROCTIME()
) WITH (
'connector' = 'cassandra-x',
'host' = 'ip1,ip2,ip3',
'port' = '9042',
'hostDistance' = 'local',
'user-name' = 'cassandra',
'password' = 'xxxxxxxx',
'table-name' = 'one',
'keyspaces' = 'tiezhu'
);
-- Lookup (dimension) table on Cassandra table 'one', joined by primary key id
CREATE TABLE side_one
(
id int,
name varchar,
birth timestamp,
todayTime time,
todayDate date,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'cassandra-x',
'host' = 'ip1,ip2,ip3',
'port' = '9042',
'hostDistance' = 'local',
'user-name' = 'cassandra',
'password' = 'xxxxxxxx',
'table-name' = 'one',
'keyspaces' = 'tiezhu',
'lookup.cache-type' = 'all'
);
-- Sink table: writes the joined rows to Cassandra table 'two'
CREATE TABLE sink_one
(
id int,
name varchar,
birth timestamp,
todayTime time,
todayDate date
) WITH (
'connector' = 'cassandra-x',
'host' = 'ip1,ip2,ip3',
'port' = '9042',
'hostDistance' = 'local',
'user-name' = 'cassandra',
'password' = 'xxxxxxxx',
'table-name' = 'two',
'keyspaces' = 'tiezhu'
);
-- Processing-time lookup join: enrich each source row with the matching side_one row
CREATE VIEW view_out AS
SELECT u.id AS id,
u.name AS name,
s.birth AS birth,
u.todayTime AS todayTime,
s.todayDate AS todayDate
FROM source_one s
JOIN side_one FOR SYSTEM_TIME AS OF s.PROCTIME AS u
ON s.id = u.id;
-- Write the joined result to the sink table
INSERT INTO sink_one
SELECT *
FROM view_out;