Skip to main content

介绍

dm-x支持从达梦数据库读取数据或者写入数据

如何创建一个dm表

CREATE TABLE source
(
id decimal(38,0) ,
t_binary_double double
) WITH (
'connector' = 'dm-x',
'url' = 'jdbc:dm://127.0.0.1:5236',
'schema-name' = 'test'
'table-name' = 'test_source',
'username' = 'SYSDBA',
'password' = 'SYSDBA'
);

连接器参数

连接器支持的参数主要是由基础参数加上对应表角色类型参数,如源表则为基础参数加上源表参数,结果表参数为基础参数加上结果表参数

基础参数

名称是否必须类型描述
connector(none)String插件名称,dm插件需要配置为 dm-x
url(none)String数据源jdbcUrl
table-name(none)String读取的表名
username(none)String数据源用户名
password(none)String数据源密码
schema-name(none)String读取的表所在schema

源表参数

名称是否必须类型描述
scan.parallelism1INT读取并行度
scan.partition.column(none)String多并行度时,根据此字段进行划分每个子任务读取范围,相当于splitPk
scan.partition.strategy(none)String多并行度时,划分子任务的策略,相当于splitStrategy
scan.increment.column(none)String增量字段名称,相当于increColumn
scan.increment.column-type(none)String增量字段类型,相当于increColumnType
scan.start-location(none)String增量任务的起点,相当于startLocation
scan.polling-interval0INT间隔轮询的间隔查询时间,单位毫秒大于0时则认为开启间隔轮询,相当于pollingInterval
scan.query-timeout0INT查询的超时时间
scan.fetch-size1024INTjdbc的FetchSize参数
scan.restore.columnname(none)String断点续传字段名称
scan.restore.columntype(none)String断点续传字段类型

维表参数

名称是否必须类型描述
lookup.cache-typeLRU(默认)ALLString缓存数据策略,LRU数据会在每次触发join时从数据库读取数据,读取的数据通过LRU机制进行数据缓存ALL初始化维表算子的时候会从数据源中查询所有数据到内存里缓存起来
lookup.cache-period3600000INT缓存类型为all时,根据配置的缓存周期进行数据重新加载,单位毫秒
lookup.cache.max-rows1000INT维表缓存数据条数,超过1000条按照LRU进行淘汰
lookup.cache.ttl60000INT缓存数据的过期时间
lookup.max-retries3INT维表从数据库查询数据失败进行重试的次数
lookup.error-limit0LONG查询数据失败,脏数据数量
lookup.fetch-size1000INTjdbc的fetchSize参数
lookup.async-timeout10000INT维表异步查询超时时间 单位毫秒
lookup.parallelism(none)INT维表并行度

结果表参数

名称是否必须类型描述
sink.buffer-flush.max-rows1024INT批量写数据大小
sink.buffer-flush.interval10000INT批量写间隔时间
sink.parallelism(none)INTsink算子并行度
sink.all-replacefalseBoolean写入数据时,如果是根据主键更新,则是否NULL替换原有字段值
sink.semanticat-least-once(默认) exactly-onceString任务一致性级别at-least-once至少一次,数据有可能重复写入exactly-once 精确一次,数据写入不多不少,不会重复也不会丢失

脚本示例

CREATE TABLE source
(
CHAR1 CHAR,
CHARACTER1 CHARACTER,
VARCHAR1 VARCHAR,
VARCHAR21 VARCHAR,
NUMERIC1 NUMERIC,
DECIMAL1 DECIMAL,
BIT1 BOOLEAN,
INTEGER1 INTEGER,
INT1 INT,
BIGINT1 BIGINT,
TINYINT1 TINYINT,
BYTE1 TINYINT,
SMALLINT1 SMALLINT,
DOUBLE1 DOUBLE,
DATE1 DATE,
TIME1 TIME,
TIMESTAMP1 TIMESTAMP,
DATETIME1 TIMESTAMP,
DEC1 DECIMAL,
FLOAT1 DOUBLE,
REAL1 FLOAT,
TEXT1 VARCHAR,
PROCTIME AS PROCTIME()
) WITH (
'connector' = 'dm-x',
'url' = 'jdbc:dm://127.0.0.1:5236',
'schema-name' = 'Test',
'table-name' = 'TABLE_1',
'username' = 'SYSDBA',
'password' = 'SYSDBA',
'scan.fetch-size' = '2',
'scan.query-timeout' = '10'
);

CREATE TABLE side
(
INT2 INT,
PRIMARY KEY (INT2) NOT ENFORCED
) WITH (
'connector' = 'dm-x',
'url' = 'jdbc:dm://127.0.0.1:5236',
'schema-name' = 'Test',
'table-name' = 'TABLE_2',
'username' = 'SYSDBA',
'password' = 'SYSDBA',
'scan.fetch-size' = '2',
'scan.query-timeout' = '10',
'lookup.cache-type' = 'lru'
);


CREATE TABLE sink
(
CHAR1 CHAR,
CHARACTER1 CHARACTER,
VARCHAR1 VARCHAR,
VARCHAR21 VARCHAR,
NUMERIC1 NUMERIC,
DECIMAL1 DECIMAL,
BIT1 BOOLEAN,
INTEGER1 INTEGER,
INT1 INT,
BIGINT1 BIGINT,
TINYINT1 TINYINT,
BYTE1 TINYINT,
SMALLINT1 SMALLINT,
DOUBLE1 DOUBLE,
DATE1 DATE,
TIME1 TIME,
TIMESTAMP1 TIMESTAMP,
DATETIME1 TIMESTAMP,
DEC1 DECIMAL,
FLOAT1 DOUBLE,
REAL1 FLOAT,
TEXT1 VARCHAR,
INT2 INT,
PRIMARY KEY (INT2) NOT ENFORCED
) WITH (
'connector' = 'dm-x',
'url' = 'jdbc:dm://127.0.0.1:5236',
'schema-name' = 'Test',
'table-name' = 'TABLE_3',
'username' = 'SYSDBA',
'password' = 'SYSDBA',
'sink.buffer-flush.max-rows' = '1',
'sink.all-replace' = 'true'
);

create
TEMPORARY view view_out
as
select u.CHAR1
, u.CHARACTER1
, u.VARCHAR1
, u.VARCHAR21
, u.NUMERIC1
, u.DECIMAL1
, u.BIT1
, u.INTEGER1
, u.INT1
, u.BIGINT1
, u.TINYINT1
, u.BYTE1
, u.SMALLINT1
, u.DOUBLE1
, u.DATE1
, u.TIME1
, u.TIMESTAMP1
, u.DATETIME1
, u.DEC1
, u.FLOAT1
, u.REAL1
, u.TEXT1
, s.INT2
from source u
left join side FOR SYSTEM_TIME AS OF u.PROCTIME AS s
on u.INT1 = s.INT2;

insert into sink
select *
from view_out;