介绍
doris 在 0.15.x 开始改变了部分 api,并且新版本接口不兼容旧接口,因此需要根据针对于写入 doris 而言,有两套不同的参数。
如何创建一个doris表
CREATE TABLE example_db.table_hash
(
k1 TINYINT,
k2 DECIMAL(10, 2) DEFAULT "10.5",
k3 CHAR(10) COMMENT "string column",
k4 INT NOT NULL DEFAULT "1" COMMENT "int column"
)
COMMENT "my first table"
DISTRIBUTED BY HASH(k1) BUCKETS 32
连接器参数
连接器支持的参数主要是由基础参数加上对应表角色类型参数,如源表则为基础参数加上源表参数,结果表参数为基础参数加上结果表参数
基础参数
名称 | 是否必须 | 值 | 类型 | 描述 |
---|---|---|---|---|
connector | 是 | (none) | String | 插件名称, doris 插件需要配置为 doris-x |
username | 否 | (none) | String | 数据源用户名 |
password | 否 | (none) | String | 数据源密码 |
源表参数
名称 | 是否必须 | 值 | 类型 | 描述 |
---|---|---|---|---|
url | 是 | (none) | String | 数据源 jdbcUrl |
table.identifier | 否 | (none) | String | 读取的表的完全限定名,形如 database.table |
table-name | 否 | (none) | String | 读取的表名,如果填写了 table.identifier,该值不生效 |
schema-name | 否 | (none) | String | 读取的表所在 database,如果填写了 table.identifier,该值不生效 |
scan.parallelism | 否 | 1 | INT | 读取并行度 |
scan.partition.column | 否 | (none) | String | 多并行度时,根据此字段进行划分每个子任务读取范围,相当于splitPk |
scan.partition.strategy | 否 | (none) | String | 多并行度时,划分子任务的策略,相当于splitStrategy |
scan.increment.column | 否 | (none) | String | 增量字段名称,相当于increColumn |
scan.increment.column-type | 否 | (none) | String | 增量字段类型,相当于increColumnType |
scan.start-location | 否 | (none) | String | 增量任务的起点,相当于startLocation |
scan.polling-interval | 否 | 0 | INT | 间隔轮询的间隔查询时间,单位毫秒大于0时则认为开启间隔轮询,相当于pollingInterval |
scan.query-timeout | 否 | 0 | INT | 查询的超时时间 |
scan.fetch-size | 否 | 1024 | INT | jdbc的FetchSize参数 |
scan.restore.columnname | 否 | (none) | String | 断点续传字段名称 |
scan.restore.columntype | 否 | (none) | String | 断点续传字段类型 |
结果表参数(doris for jdbc)
名称 | 是否必须 | 值 | 类型 | 描述 |
---|---|---|---|---|
url | 是 | (none) | String | 数据源 jdbcUrl |
table.identifier | 否 | (none) | String | 读取的表的完全限定名,形如 database.table |
table-name | 否 | (none) | String | 读取的表名,如果填写了 table.identifier,该值不生效 |
schema-name | 否 | (none) | String | 读取的表所在 database,如果填写了 table.identifier,该值不生效 |
sink.buffer-flush.max-rows | 否 | 1024 | INT | 批量写数据大小 |
sink.buffer-flush.interval | 否 | 10000 | INT | 批量写间隔时间 |
sink.parallelism | 否 | (none) | INT | sink算子并行度 |
sink.all-replace | 否 | false | Boolean | 写入数据时,如果是根据主键更新,则是否NULL替换原有字段值 |
sink.semantic | 否 | at-least-once(默认) exactly-once | String | 任务一致性级别at-least-once 至少一次,数据有可能重复写入exactly-once 精确一次,数据写入不多不少,不会重复也不会丢失 |
结果表参数(doris for stream load under 0.15.x)
名称 | 是否必须 | 值 | 类型 | 描述 |
---|---|---|---|---|
fenodes | 是 | (none) | String | doris fe 节点信息 |
table.identifier | 否 | (none) | String | 读取的表的完全限定名,形如 database.table |
sink.buffer-flush.max-rows | 否 | 1024 | INT | 批量写数据大小 |
sink.buffer-flush.interval | 否 | 10000 | INT | 批量写间隔时间 |
sink.parallelism | 否 | (none) | INT | sink算子并行度 |
sink.all-replace | 否 | false | Boolean | 写入数据时,如果是根据主键更新,则是否NULL替换原有字段值 |
sink.semantic | 否 | at-least-once(默认) exactly-once | String | 任务一致性级别at-least-once 至少一次,数据有可能重复写入exactly-once 精确一次,数据写入不多不少,不会重复也不会丢失 |
api.version | 否 | V1 | String | V1 代表 doris 版本小于 0.15.x,默认使用 V1 |
结果表参数(doris for stream load above 0.15.x)
名称 | 是否必须 | 值 | 类型 | 描述 |
---|---|---|---|---|
fenodes | 是 | (none) | String | doris fe 节点信息 |
sink.buffer-flush.max-rows | 否 | 1024 | INT | 批量写数据大小 |
sink.buffer-flush.interval | 否 | 10000 | INT | 批量写间隔时间 |
sink.parallelism | 否 | (none) | INT | sink算子并行度 |
sink.all-replace | 否 | false | Boolean | 写入数据时,如果是根据主键更新,则是否NULL替换原有字段值 |
sink.semantic | 否 | at-least-once(默认) exactly-once | String | 任务一致性级别at-least-once 至少一次,数据有可能重复写入exactly-once 精确一次,数据写入不多不少,不会重复也不会丢失 |
api.version | 是 | V2 | String | V1 代表 doris 版本小于 0.15.x,默认使用 V1,当 doris 版本大于 0.15.x 时需要显式声明 api.version 为 V2 |
sink.properties.columns | 否 | V2 | String | 特殊可选参数,当写入 bitmap 类型的数据时需要使用到该参数 |
脚本示例
CREATE TABLE source (
id INT,
`value` STRING,
proctime as PROCTIME()
) WITH (
'connector' = 'kafka-x'
,'topic' = 'jier-doris'
,'properties.bootstrap.servers' = '172.16.83.77:9092'
,'properties.group.id' = 'jier'
,'scan.startup.mode' = 'latest-offset'
,'format' = 'json'
,'json.timestamp-format.standard' = 'SQL'
,'json.ignore-parse-errors' = 'true'
);
CREATE TABLE sink
(
id INT,
`value` STRING
) WITH (
'connector' = 'doris-x',
'jdbc-url' = 'jdbc:mysql://172.16.85.176:9030',
'table.identifier' = 'jier.test001',
'username' = 'root',
'password' = '',
'api.version' = 'V2'
);
insert into sink
select id, `value`
from source;