Oracle Sink
一、介绍
oracle sink
二、支持版本
Oracle 9 及以上
三、插件名称
SQL | oracle-x |
---|---|
四、参数说明
1、SQL
connector
- 描述:connector 类型,固定填写 oracle-x
- 必选:是
- 参数类型:String
- 默认值:无
url
- 描述:Oracle 的 JDBC 连接地址,例如:jdbc:oracle:thin:@0.0.0.1:1521:orcl
- 必选:是
- 参数类型:String
- 默认值:无
schema-name
- 描述:schema名称
- 必选:是
- 参数类型:String
- 默认值:无
table-name
- 描述:表名
- 必选:是
- 参数类型:String
- 默认值:无
username
- 描述:数据库用户名
- 必选:是
- 参数类型:String
- 默认值:无
password
- 描述:数据库密码
- 必选:是
- 参数类型:String
- 默认值:无
sink.buffer-flush.max-rows
- 描述:批量写数据条数,单位:条
- 必选:否
- 参数类型:String
- 默认值:1024
sink.buffer-flush.interval
- 描述:批量写时间间隔,单位:毫秒
- 必选:否
- 参数类型:String
- 默认值:10000
sink.all-replace
- 描述:是否全部替换数据库中的数据(例如:数据库中原值不为 null 而新值为 null 时,若该参数为 true,则原值会被替换为 null)
- 必选:否
- 参数类型:boolean
- 默认值:false
sink.parallelism
- 描述:写入结果的并行度
- 必选:否
- 参数类型:String
- 默认值:无
sink.semantic
- 描述:sink端是否支持二阶段提交
- 注意:
- 如果此参数为空,默认不开启二阶段提交,即 sink 端不支持 exactly-once 语义;
- 当前只支持 exactly-once 和 at-least-once
- 必选:否
- 参数类型:String
- 示例:'sink.semantic' = 'exactly-once'
- 默认值:at-least-once
五、数据类型
支持 | SMALLINT、BINARY_DOUBLE、CHAR、VARCHAR、VARCHAR2、NCHAR、NVARCHAR2、INT、INTEGER、NUMBER、DECIMAL、FLOAT、DATE、RAW、LONG RAW、BINARY_FLOAT、TIMESTAMP、TIMESTAMP WITH LOCAL TIME ZONE、TIMESTAMP WITH TIME ZONE、INTERVAL YEAR、INTERVAL DAY |
---|---|
暂不支持 | BFILE、XMLTYPE、Collections |
仅在 Sync 中支持 | BLOB、CLOB、NCLOB |
注意:由于 flink DecimalType 的 PRECISION(1~38) 与 SCALE(0~PRECISION) 限制,oracle 的数值类型的数据在转换时可能会丢失精度
六、脚本示例
-- Source table: rows are read from Oracle through the oracle-x connector.
CREATE TABLE source (
    id                            DECIMAL(38, 0),
    t_binary_double               DOUBLE,
    t_binary_float                FLOAT,
    t_char                        STRING,
    t_char_varying                STRING,
    t_character                   STRING,
    t_character_varying           STRING,
    t_date                        DATE,
    t_decimal                     DECIMAL(38, 0),
    t_double_precision            DECIMAL(38, 0),
    t_float                       DECIMAL(38, 0),
    t_int                         DECIMAL(38, 0),
    t_integer                     DECIMAL(38, 0),
    t_long                        STRING,
    t_national_char               STRING,
    t_national_char_varying       STRING,
    t_national_character          STRING,
    t_national_character_varying  STRING,
    t_nchar                       STRING,
    t_nchar_varying               STRING,
    t_number_1                    DECIMAL(38, 0),
    t_number_2                    DECIMAL(38, 0),
    t_number_3                    DECIMAL(38, 0),
    t_numeric                     DECIMAL(38, 0),
    t_nvarchar2                   STRING,
    t_raw                         BYTES,
    t_real                        DECIMAL(38, 0),
    t_timestamp                   TIMESTAMP,
    t_varchar                     STRING,
    t_varchar2                    STRING,
    -- Processing-time computed column; the lookup join in view_out refers to it.
    PROCTIME AS PROCTIME()
) WITH (
    'connector'                  = 'oracle-x',
    'url'                        = 'jdbc:oracle:thin:@localhost:1521:orcl',
    'table-name'                 = 'oracle_all_type_lookup',
    'username'                   = 'oracle',
    'password'                   = 'oracle',
    -- Read-side (scan.*) options of the connector.
    'scan.fetch-size'            = '2',
    'scan.query-timeout'         = '10',
    'scan.start-location'        = '1000',
    'scan.increment.column'      = 'id',
    'scan.increment.column-type' = 'decimal'
);
-- Lookup (dimension) table: joined against the source in view_out
-- via FOR SYSTEM_TIME AS OF, keyed on id.
CREATE TABLE side (
    id                            DECIMAL(38, 0),
    t_binary_double               DOUBLE,
    t_binary_float                FLOAT,
    t_char                        STRING,
    t_char_varying                STRING,
    t_character                   STRING,
    t_character_varying           STRING,
    t_date                        DATE,
    t_decimal                     DECIMAL(38, 0),
    t_double_precision            DECIMAL(38, 0),
    t_float                       DECIMAL(38, 0),
    t_int                         DECIMAL(38, 0),
    t_integer                     DECIMAL(38, 0),
    t_long                        STRING,
    t_national_char               STRING,
    t_national_char_varying       STRING,
    t_national_character          STRING,
    t_national_character_varying  STRING,
    t_nchar                       STRING,
    t_nchar_varying               STRING,
    t_number_1                    DECIMAL(38, 0),
    t_number_2                    DECIMAL(38, 0),
    t_number_3                    DECIMAL(38, 0),
    t_numeric                     DECIMAL(38, 0),
    t_nvarchar2                   STRING,
    t_raw                         BYTES,
    t_real                        DECIMAL(38, 0),
    t_timestamp                   TIMESTAMP,
    t_varchar                     STRING,
    t_varchar2                    STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector'         = 'oracle-x',
    'url'               = 'jdbc:oracle:thin:@localhost:1521:orcl',
    'table-name'        = 'oracle_all_type_lookup',
    'username'          = 'oracle',
    'password'          = 'oracle',
    'lookup.cache-type' = 'lru'
    -- Alternative cache setting: 'lookup.cache-type' = 'all'
);
-- Sink table: the result rows are written to Oracle through the oracle-x connector.
CREATE TABLE sink (
    id                            DECIMAL(38, 0),
    t_binary_double               DOUBLE,
    t_binary_float                FLOAT,
    t_char                        STRING,
    t_char_varying                STRING,
    t_character                   STRING,
    t_character_varying           STRING,
    t_date                        DATE,
    t_decimal                     DECIMAL(38, 0),
    t_double_precision            DECIMAL(38, 0),
    t_float                       DECIMAL(38, 0),
    t_int                         DECIMAL(38, 0),
    t_integer                     DECIMAL(38, 0),
    t_long                        STRING,
    t_national_char               STRING,
    t_national_char_varying       STRING,
    t_national_character          STRING,
    t_national_character_varying  STRING,
    t_nchar                       STRING,
    t_nchar_varying               STRING,
    t_number_1                    DECIMAL(38, 0),
    t_number_2                    DECIMAL(38, 0),
    t_number_3                    DECIMAL(38, 0),
    t_numeric                     DECIMAL(38, 0),
    t_nvarchar2                   STRING,
    t_raw                         BYTES,
    t_real                        DECIMAL(38, 0),
    t_timestamp                   TIMESTAMP,
    t_varchar                     STRING,
    t_varchar2                    STRING
) WITH (
    'connector'                  = 'oracle-x',
    'url'                        = 'jdbc:oracle:thin:@localhost:1521:orcl',
    'table-name'                 = 'oracle_all_type_sink',
    'username'                   = 'oracle',
    'password'                   = 'oracle',
    -- Batch size (rows) and flush interval (milliseconds) for buffered writes.
    'sink.buffer-flush.max-rows' = '2000',
    -- With all-replace enabled, null incoming values also overwrite stored values.
    'sink.all-replace'           = 'true',
    'sink.buffer-flush.interval' = '0'
);
-- Lookup-joins every source row against the side table on id, at the
-- source row's processing time. The id comes from the side table; all
-- other columns are taken from the source row.
CREATE TEMPORARY VIEW view_out AS
SELECT
    s.id                            AS id,
    u.t_binary_double               AS t_binary_double,
    u.t_binary_float                AS t_binary_float,
    u.t_char                        AS t_char,
    u.t_char_varying                AS t_char_varying,
    u.t_character                   AS t_character,
    u.t_character_varying           AS t_character_varying,
    u.t_date                        AS t_date,
    u.t_decimal                     AS t_decimal,
    u.t_double_precision            AS t_double_precision,
    u.t_float                       AS t_float,
    u.t_int                         AS t_int,
    u.t_integer                     AS t_integer,
    u.t_long                        AS t_long,
    u.t_national_char               AS t_national_char,
    u.t_national_char_varying       AS t_national_char_varying,
    u.t_national_character          AS t_national_character,
    u.t_national_character_varying  AS t_national_character_varying,
    u.t_nchar                       AS t_nchar,
    u.t_nchar_varying               AS t_nchar_varying,
    u.t_number_1                    AS t_number_1,
    u.t_number_2                    AS t_number_2,
    u.t_number_3                    AS t_number_3,
    u.t_numeric                     AS t_numeric,
    u.t_nvarchar2                   AS t_nvarchar2,
    u.t_raw                         AS t_raw,
    u.t_real                        AS t_real,
    u.t_timestamp                   AS t_timestamp,
    u.t_varchar                     AS t_varchar,
    u.t_varchar2                    AS t_varchar2
FROM source AS u
INNER JOIN side FOR SYSTEM_TIME AS OF u.PROCTIME AS s
    ON u.id = s.id;
-- Write the joined rows from view_out into the sink table.
-- The columns are listed explicitly, in the order the sink table declares
-- them, because INSERT maps values by position: with SELECT * a change to
-- the view's column list would silently shift values into the wrong columns.
INSERT INTO sink
SELECT
    id,
    t_binary_double,
    t_binary_float,
    t_char,
    t_char_varying,
    t_character,
    t_character_varying,
    t_date,
    t_decimal,
    t_double_precision,
    t_float,
    t_int,
    t_integer,
    t_long,
    t_national_char,
    t_national_char_varying,
    t_national_character,
    t_national_character_varying,
    t_nchar,
    t_nchar_varying,
    t_number_1,
    t_number_2,
    t_number_3,
    t_numeric,
    t_nvarchar2,
    t_raw,
    t_real,
    t_timestamp,
    t_varchar,
    t_varchar2
FROM view_out;