Skip to main content

介绍

doris 在 0.15.x 开始改变了部分 api,并且新版本接口不兼容旧接口,因此需要根据针对于写入 doris 而言,有两套不同的参数。

如何创建一个doris表

CREATE TABLE example_db.table_hash
(
k1 TINYINT,
k2 DECIMAL(10, 2) DEFAULT "10.5",
k3 CHAR(10) COMMENT "string column",
k4 INT NOT NULL DEFAULT "1" COMMENT "int column"
)
COMMENT "my first table"
DISTRIBUTED BY HASH(k1) BUCKETS 32

连接器参数

连接器支持的参数主要是由基础参数加上对应表角色类型参数,如源表则为基础参数加上源表参数,结果表参数为基础参数加上结果表参数

基础参数

名称是否必须类型描述
connector(none)String插件名称, doris 插件需要配置为 doris-x
username(none)String数据源用户名
password(none)String数据源密码

源表参数

名称是否必须类型描述
url(none)String数据源 jdbcUrl
table.identifier(none)String读取的表的完全限定名,形如 database.table
table-name(none)String读取的表名,如果填写了 table.identifier,该值不生效
schema-name(none)String读取的表所在 database,如果填写了 table.identifier,该值不生效
scan.parallelism1INT读取并行度
scan.partition.column(none)String多并行度时,根据此字段进行划分每个子任务读取范围,相当于splitPk
scan.partition.strategy(none)String多并行度时,划分子任务的策略,相当于splitStrategy
scan.increment.column(none)String增量字段名称,相当于increColumn
scan.increment.column-type(none)String增量字段类型,相当于increColumnType
scan.start-location(none)String增量任务的起点,相当于startLocation
scan.polling-interval0INT间隔轮询的间隔查询时间,单位毫秒大于0时则认为开启间隔轮询,相当于pollingInterval
scan.query-timeout0INT查询的超时时间
scan.fetch-size1024INTjdbc的FetchSize参数
scan.restore.columnname(none)String断点续传字段名称
scan.restore.columntype(none)String断点续传字段类型

结果表参数(doris for jdbc)

名称是否必须类型描述
url(none)String数据源 jdbcUrl
table.identifier(none)String读取的表的完全限定名,形如 database.table
table-name(none)String读取的表名,如果填写了 table.identifier,该值不生效
schema-name(none)String读取的表所在 database,如果填写了 table.identifier,该值不生效
sink.buffer-flush.max-rows1024INT批量写数据大小
sink.buffer-flush.interval10000INT批量写间隔时间
sink.parallelism(none)INTsink算子并行度
sink.all-replacefalseBoolean写入数据时,如果是根据主键更新,则是否NULL替换原有字段值
sink.semanticat-least-once(默认) exactly-onceString任务一致性级别at-least-once至少一次,数据有可能重复写入exactly-once 精确一次,数据写入不多不少,不会重复也不会丢失

结果表参数(doris for stream load under 0.15.x)

名称是否必须类型描述
fenodes(none)Stringdoris fe 节点信息
table.identifier(none)String读取的表的完全限定名,形如 database.table
sink.buffer-flush.max-rows1024INT批量写数据大小
sink.buffer-flush.interval10000INT批量写间隔时间
sink.parallelism(none)INTsink算子并行度
sink.all-replacefalseBoolean写入数据时,如果是根据主键更新,则是否NULL替换原有字段值
sink.semanticat-least-once(默认) exactly-onceString任务一致性级别at-least-once至少一次,数据有可能重复写入exactly-once 精确一次,数据写入不多不少,不会重复也不会丢失
api.versionV1StringV1 代表 doris 版本小于 0.15.x,默认使用 V1

结果表参数(doris for stream load above 0.15.x)

名称是否必须类型描述
fenodes(none)Stringdoris fe 节点信息
sink.buffer-flush.max-rows1024INT批量写数据大小
sink.buffer-flush.interval10000INT批量写间隔时间
sink.parallelism(none)INTsink算子并行度
sink.all-replacefalseBoolean写入数据时,如果是根据主键更新,则是否NULL替换原有字段值
sink.semanticat-least-once(默认) exactly-onceString任务一致性级别at-least-once至少一次,数据有可能重复写入exactly-once 精确一次,数据写入不多不少,不会重复也不会丢失
api.versionV2StringV1 代表 doris 版本小于 0.15.x,默认使用 V1,当 doris 版本大于 0.15.x 时需要显式声明 api.version 为 V2
sink.properties.columnsV2String特殊可选参数,当写入 bitmap 类型的数据时需要使用到该参数

脚本示例

CREATE TABLE source (
id INT,
`value` STRING,
proctime as PROCTIME()
) WITH (
'connector' = 'kafka-x'
,'topic' = 'jier-doris'
,'properties.bootstrap.servers' = '172.16.83.77:9092'
,'properties.group.id' = 'jier'
,'scan.startup.mode' = 'latest-offset'
,'format' = 'json'
,'json.timestamp-format.standard' = 'SQL'
,'json.ignore-parse-errors' = 'true'
);

CREATE TABLE sink
(
id INT,
`value` STRING
) WITH (
'connector' = 'doris-x',
'jdbc-url' = 'jdbc:mysql://172.16.85.176:9030',
'table.identifier' = 'jier.test001',
'username' = 'root',
'password' = '',
'api.version' = 'V2'
);

insert into sink
select id, `value`
from source;