Skip to main content

结果表

一、介绍

ElasticSearch Sink插件支持将数据写入到指定的index中。 ​

二、支持版本

Elasticsearch 7.x ​

三、插件名称

类型名称
SQLelasticsearch7-x

四、参数说明

1、SQL

  • hosts
    • 描述:Elasticsearch集群的连接地址。eg: "localhost:9200", 多地址用分号分隔符隔开。
    • 必选:是
    • 参数类型:List
    • 默认值:无
  • index
    • 描述:指定访问Elasticsearch集群的index名称
    • 必选:是
    • 参数类型:String
    • 默认值:无
  • username
    • 描述:开启basic认证之后的用户名
    • 必须:否
    • 参数类型:String
    • 默认值:无
  • password
    • 描述:开启basic认证之后的密码
    • 必须:否
    • 参数类型:String
    • 默认值:无
  • sink.bulk-flush.max-actions
    • 描述:批量写入es数据的条数
    • 必须:否
    • 参数类型:Integer
    • 默认值:1
  • document-id.key-delimiter
    • 描述:文档id之间的分隔符号,eg:“${col1}_${col2}”
    • 必须:否
    • 参数类型:String
    • 默认值:"_"
  • client.connect-timeout
    • 描述:ES Client最大连接超时时间。
    • 必须:否
    • 参数类型:Integer
    • 默认值:5000
  • client.socket-timeout
    • 描述:ES Client最大socket超时时间。
    • 必须:否
    • 参数类型:Integer
    • 默认值:1800000
  • client.keep-alive-time
    • 描述:ES Client会话最大保持时间。
    • 必须:否
    • 参数类型:Integer
    • 默认值:5000
  • client.request-timeout
    • 描述:ES Client最大请求超时时间。
    • 必须:否
    • 参数类型:Integer
    • 默认值:2000
  • client.max-connection-per-route
    • 描述:每一个路由值的最大连接数量
    • 必须:否
    • 参数类型:Integer
    • 默认值:10

五、数据类型

是否支持类型名称
支持INTEGER,SMALLINT,DECIMAL,TIMESTAM DOUBLE,FLOAT,DATE,VARCHAR,VARCHAR,TIMESTAMP,TIME,BYTE
不支持IP,binary, nested, object

六、脚本示例

-- curl -XPOST 'localhost:9200/teachers/1' -H "Content-Type: application/json" -d '{
-- "id": "100",
-- "phone": "2345678765",
-- "qq": "7576457",
-- "wechat": "这是wechat",
-- "income": "1324.13",
-- "birthday": "2020-07-30 10:08:22",
-- "today": "2020-07-30",
-- "timecurrent": "12:08:22"
-- }'
ADD FILE WITH /data/sftp/ssl_DsCenter_17559/ca.crt;
CREATE TABLE es7_source
(
id int,
name varchar,
age varchar,
text varchar
)
WITH (
'connector' = 'elasticsearch7-x',
'hosts' ='127.0.0.1:9200',
'username' = 'elastic',
'password' = 'elastic',
'index' = 'today_001',
'client.connect-timeout' = '10000',
'security.ssl-keystore-file'='ca.crt',
'security.ssl-keystore-password'='',
'security.ssl-type'='ca'
);


CREATE TABLE es7_sink
(
id int,
name varchar,
age varchar,
text varchar
)
WITH (
'connector' = 'elasticsearch7-x',
'hosts' = '127.0.0.1:9200',
'username' = 'elastic',
'password' = 'elastic',
'index' = 'students_4',
'client.connect-timeout' = '10000',
'security.ssl-keystore-file'='ca.crt',
'security.ssl-keystore-password'='',
'security.ssl-type'='ca'
);


INSERT INTO es7_sink
SELECT *
FROM es7_source;