Skip to main content

SqlServer Sink

一、介绍

SqlServer Sink插件支持向SqlServer数据库写入数据

二、支持版本

Microsoft SQL Server 2012及以上

三、插件名称

SQLsqlserver-x

四、插件参数

1、SQL

  • connector
    • 描述:connector type
    • 必选:是
    • 字段类型:String
    • 值:sqlserver-x

  • url
    • 描述:使用使用开源的jtds驱动连接 而非Microsoft的官方驱动
    • 必选:是
    • 字段类型:String
    • 默认值:无


  • table-name
    • 描述:表名
    • 必选:是
    • 字段类型:String
    • 默认值:无


  • schema-name
    • 描述:schema
    • 必选:是
    • 字段类型:String
    • 默认值:无

  • username
    • 描述:用户名
    • 必选:是
    • 字段类型:String
    • 默认值:无


  • password
    • 描述:密码
    • 必选:是
    • 字段类型:String
    • 默认值:无

  • sink.buffer-flush.max-rows
    • 描述:批量写数据条数,单位:条
    • 必选:否
    • 参数类型:String
    • 默认值:1024

  • sink.buffer-flush.interval
    • 描述:批量写时间间隔,单位:毫秒
    • 必选:否
    • 参数类型:String
    • 默认值:10000

  • sink.all-replace
    • 描述:是否全部替换数据库中的数据(如果数据库中原值不为null,新值为null,如果为true则会替换为null)
    • 必选:否
    • 参数类型:String
    • 默认值:false

  • sink.semantic

    • 描述:sink端是否支持二阶段提交
    • 注意:
      • 如果此参数为空,默认不开启二阶段提交,即sink端不支持exactly_once语义;
      • 当前只支持exactly-once 和at-least-once
    • 必选:否
    • 参数类型:String
      • 示例:"semantic": "exactly-once"
    • 默认值:at-least-once
  • sink.parallelism

    • 描述:写入结果的并行度
    • 必选:否
    • 参数类型:String
    • 默认值:无

五、数据类型

支持BIT、INT、SMALLINT、TINYINT、BIGINT、INT IDENTITY、REAL、FLOAT、DECIMAL、NUMERIC、CHAR、VARCHAR、VARCHAR(MAX)、TEXT、XML、NCHAR、NVARCHAR、NVARCHAR(MAX)、NTEXT、TIME、DATE、DATETIME、DATETIME2、SMALLDATETIME、DATETIMEOFFSET、TIMESTAMP、BINARY、VARBINARY、IMAGE、MONEY、SMALLMONEY、UNIQUEIDENTIFIER
暂不支持CURSOR、ROWVERSION、HIERARCHYID、SQL_VARIANT、SPATIAL GEOMETRY TYPE、SPATIAL GEOGRAPHY TYPE、TABLE

六、脚本示例

CREATE TABLE source
(
id bigint,
col_bit BOOLEAN,
col_tinyint tinyint,
col_smallint smallint,
col_int int,
col_real float,
col_float double,
col_decimal decimal(10, 3),
col_numric decimal(10, 3),
col_char char(10),
col_varchar varchar(255),
col_varcharmax string,
col_date date,
col_time string,
col_varbinary varbinary
)with(
'connector'='sqlserver-x',
'username'='username',
'password'='password',
'url' = 'jdbc:jtds:sqlserver://127.0.0.1:1433;databaseName=db_test;useLOBs=false',
'schema-name'='schema',
'table-name'='table'
);

CREATE TABLE sink
(
id bigint,
col_bit BOOLEAN,
col_tinyint tinyint,
col_smallint smallint,
col_int int,
col_real float,
col_float double,
col_decimal decimal(10, 3),
col_numric decimal(10, 3),
col_char char(10),
col_varchar varchar(255),
col_varcharmax string,
col_date date,
col_time string,
col_varbinary varbinary

)with(
'connector'='sqlserver-x',
'username'='username',
'password'='password',
'url' = 'jdbc:jtds:sqlserver://127.0.0.1:1433;databaseName=db_test;useLOBs=false',
'schema-name'='schema',
'table-name'='table'
);
insert into sink
select *
from source;