Oracle Lookup
一、介绍
Oracle维表,支持全量和异步方式
全量缓存:将维表数据全部加载到内存中,建议数据量不大使用。
异步缓存:使用异步方式查询数据,并将查询到的数据使用lru缓存到内存中,建议数据量大使用。
二、支持版本
Oracle 9 及以上
三、插件名称
SQL | oracle-x |
---|
四、参数说明
connector
- 描述:oracle-x
- 必选:是
- 参数类型:String
- 默认值:无
url
- 描述:jdbc:oracle:thin:@0.0.0.1:1521:orcl
- 必选:是
- 参数类型:String
- 默认值:无
schema-name
- 描述:schema名称
- 必选:是
- 参数类型:String
- 默认值:无
table-name
- 描述:表名
- 必选:是
- 参数类型:String
- 默认值:无:
username
- 描述:username
- 必选:是
- 参数类型:String
- 默认值:无
password
- 描述:password
- 必选:是
- 参数类型:String
- 默认值:无
lookup.cache-type
- 描述:维表缓存类型(NONE、LRU、ALL),默认LRU
- 必选:否
- 参数类型:string
- 默认值:LRU
lookup.cache-period
- 描述:ALL维表每隔多久加载一次数据,默认3600000毫秒(一个小时)
- 必选:否
- 参数类型:long
- 默认值:3600000
lookup.cache.max-rows
- 描述:lru维表缓存数据的条数,默认10000条
- 必选:否
- 参数类型:int
- 默认值:10000
lookup.cache.ttl
- 描述:lru维表缓存数据的时间,默认60000毫秒(一分钟)
- 必选:否
- 参数类型:int
- 默认值:60000
lookup.fetch-size
- 描述:ALL维表每次从数据库加载的条数,默认1000条
- 必选:否
- 参数类型:int
- 默认值:1000
lookup.parallelism
- 描述:维表并行度
- 必选:否
- 参数类型:int
- 默认值:无
vertx.worker-pool-size
- 描述:线程池大小
- 必选:否
- 参数类型:int
- 默认值:5
lookup.max-retries
- 描述:查询失败最大重试次数
- 必选:否
- 参数类型:int
- 默认值:3
lookup.error-limit
- 描述:查询失败超时的数据条数最大限制(仅针对于lru)
- 必选:否
- 参数类型:long
- 默认值:0
- lookup.async-timeout
- 描述:异步查询超时时间 单位毫秒
- 必选:否
- 参数类型:int
- 默认值:10000
五、数据类型
支持 | SMALLINT、BINARY_DOUBLE、CHAR、VARCHAR、VARCHAR2、NCHAR、NVARCHAR2、INT、INTEGER、NUMBER、DECIMAL、FLOAT、DATE、RAW、LONG RAW、BINARY_FLOAT、TIMESTAMP、TIMESTAMP WITH LOCAL TIME ZONE、TIMESTAMP WITH TIME ZON、INTERVAL YEAR、INTERVAL DAY |
---|---|
暂不支持 | BFILE、XMLTYPE、Collections、BLOB、CLOB、NCLOB |
注意:由于 flink DecimalType 的 PRECISION(1~38) 与 SCALE(0~PRECISION) 限制,oracle 的数值类型的数据在转换时可能会丢失精度
六、脚本示例
CREATE TABLE source
(
id decimal(38,0) ,
t_binary_double double ,
t_binary_float float ,
t_char string ,
t_char_varying string ,
t_character string ,
t_character_varying string ,
t_date date ,
t_decimal decimal(38,0),
t_double_precision decimal(38,0) ,
t_float decimal(38,0) ,
t_int decimal(38,0) ,
t_integer decimal(38,0) ,
t_long string ,
t_national_char string ,
t_national_char_varying string ,
t_national_character string ,
t_national_character_varying string ,
t_nchar string ,
t_nchar_varying string ,
t_number_1 decimal(38,0) ,
t_number_2 decimal(38,0) ,
t_number_3 decimal(38,0) ,
t_numeric decimal(38,0) ,
t_nvarchar2 string,
t_raw bytes ,
t_real decimal(38,0) ,
t_timestamp timestamp ,
t_varchar string ,
t_varchar2 string,
PROCTIME AS PROCTIME()
) WITH (
'connector' = 'oracle-x',
'url' = 'jdbc:oracle:thin:@localhost:1521:orcl',
'table-name' = 'oracle_all_type_lookup',
'username' = 'oracle',
'password' = 'oracle',
'scan.fetch-size' = '2',
'scan.query-timeout' = '10',
'scan.start-location' = '1000',
'scan.increment.column' = 'id',
'scan.increment.column-type' = 'decimal'
);
CREATE TABLE side
(
id decimal(38,0) ,
t_binary_double double ,
t_binary_float float ,
t_char string ,
t_char_varying string ,
t_character string ,
t_character_varying string ,
t_date date ,
t_decimal decimal(38,0),
t_double_precision decimal(38,0) ,
t_float decimal(38,0) ,
t_int decimal(38,0) ,
t_integer decimal(38,0) ,
t_long string ,
t_national_char string ,
t_national_char_varying string ,
t_national_character string ,
t_national_character_varying string ,
t_nchar string ,
t_nchar_varying string ,
t_number_1 decimal(38,0) ,
t_number_2 decimal(38,0) ,
t_number_3 decimal(38,0) ,
t_numeric decimal(38,0) ,
t_nvarchar2 string,
t_raw bytes ,
t_real decimal(38,0) ,
t_timestamp timestamp ,
t_varchar string ,
t_varchar2 string,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'oracle-x',
'url' = 'jdbc:oracle:thin:@localhost:1521:orcl',
'table-name' = 'oracle_all_type_lookup',
'username' = 'oracle',
'password' = 'oracle',
'lookup.cache-type' = 'lru'
-- 'lookup.cache-type' = 'all'
);
CREATE TABLE sink
(
id decimal(38,0) ,
t_binary_double double ,
t_binary_float float ,
t_char string ,
t_char_varying string ,
t_character string ,
t_character_varying string ,
t_date date ,
t_decimal decimal(38,0),
t_double_precision decimal(38,0) ,
t_float decimal(38,0) ,
t_int decimal(38,0) ,
t_integer decimal(38,0) ,
t_long string ,
t_national_char string ,
t_national_char_varying string ,
t_national_character string ,
t_national_character_varying string ,
t_nchar string ,
t_nchar_varying string ,
t_number_1 decimal(38,0) ,
t_number_2 decimal(38,0) ,
t_number_3 decimal(38,0) ,
t_numeric decimal(38,0) ,
t_nvarchar2 string,
t_raw bytes ,
t_real decimal(38,0) ,
t_timestamp timestamp ,
t_varchar string ,
t_varchar2 string
) WITH (
'connector' = 'oracle-x',
'url' = 'jdbc:oracle:thin:@localhost:1521:orcl',
'table-name' = 'oracle_all_type_sink',
'username' = 'oracle',
'password' = 'oracle',
'sink.buffer-flush.max-rows' = '2000',
'sink.all-replace' = 'true',
'sink.buffer-flush.interval' = '0'
);
create
TEMPORARY view view_out
as
select s.id AS id,
u.t_binary_double as t_binary_double,
u.t_binary_float as t_binary_float,
u.t_char as t_char,
u.t_char_varying as t_char_varying,
u.t_character as t_character,
u.t_character_varying as t_character_varying,
u.t_date as t_date,
u.t_decimal as t_decimal,
u.t_double_precision as t_double_precision,
u.t_float as t_float,
u.t_int as t_int,
u.t_integer as t_integer,
u.t_long as t_long,
u.t_national_char as t_national_char,
u.t_national_char_varying as t_national_char_varying,
u.t_national_character as t_national_character,
u.t_national_character_varying as t_national_character_varying,
u.t_nchar as t_nchar,
u.t_nchar_varying as t_nchar_varying,
u.t_number_1 as t_number_1,
u.t_number_2 as t_number_2,
u.t_number_3 as t_number_3,
u.t_numeric as t_numeric,
u.t_nvarchar2 as t_nvarchar2,
u.t_raw as t_raw,
u.t_real as t_real,
u.t_timestamp as t_timestamp,
u.t_varchar as t_varchar,
u.t_varchar2 as t_varchar2
from source u
inner join side FOR SYSTEM_TIME AS OF u.PROCTIME AS s
on u.id = s.id;
insert into sink
select *
from view_out;