Skip to main content

DB2 Lookup

一、介绍

db2维表,支持全量和异步方式
全量缓存:将维表数据全部加载到内存中,建议数据量不大使用。
异步缓存:使用异步方式查询数据,并将查询到的数据使用lru缓存到内存中,建议数据量大使用。

二、支持版本

三、插件名称

SQLdb2-x

四、参数说明

  • connector

    • 描述:db2-x
    • 必选:是
    • 参数类型:String
    • 默认值:无
  • url

    • 描述:jdbc:db2://localhost:50002/DT_TEST
    • 必选:是
    • 参数类型:String
    • 默认值:无
  • table-name

    • 描述:表名
    • 必选:是
    • 参数类型:String
    • 默认值:无:
  • username

    • 描述:username
    • 必选:是
    • 参数类型:String
    • 默认值:无
  • password

    • 描述:password
    • 必选:是
    • 参数类型:String
    • 默认值:无
  • lookup.cache-type

    • 描述:维表缓存类型(NONE、LRU、ALL),默认LRU
    • 必选:否
    • 参数类型:string
    • 默认值:LRU
  • lookup.cache-period

    • 描述:ALL维表每隔多久加载一次数据,默认3600000毫秒(一个小时)
    • 必选:否
    • 参数类型:string
    • 默认值:3600000
  • lookup.cache.max-rows

    • 描述:lru维表缓存数据的条数,默认10000条
    • 必选:否
    • 参数类型:string
    • 默认值:10000
  • lookup.cache.ttl

    • 描述:lru维表缓存数据的时间,默认60000毫秒(一分钟)
    • 必选:否
    • 参数类型:string
    • 默认值:60000
  • lookup.fetch-size

    • 描述:ALL维表每次从数据库加载的条数,默认1000条
    • 必选:否
    • 参数类型:string
    • 默认值:1000
  • lookup.parallelism

    • 描述:维表并行度
    • 必选:否
    • 参数类型:string
    • 默认值:无

五、数据类型

支持INT,BIGINT,SMALLINT,DOUBLE,DECFLOAT,DECIMAL,VARCHAR,CHAR,CLOB,DECIMAL,TIMESTAMP,DATETIME,DATE,TIME,BYTES
暂不支持

六、脚本示例

-- {"id":100,"name":"lb james阿道夫","money":293.899778,"dateone":"2020-07-30 10:08:22","age":"33","datethree":"2020-07-30 10:08:22.123","datesix":"2020-07-30 10:08:22.123456","datenigth":"2020-07-30 10:08:22.123456789","dtdate":"2020-07-30","dttime":"10:08:22"}
CREATE TABLE source
(
id INT,
name STRING,
money decimal,
dateone timestamp,
age bigint,
datethree timestamp,
datesix timestamp(6),
datenigth timestamp(9),
dtdate date,
dttime time,
PROCTIME AS PROCTIME()
) WITH (
'connector' = 'kafka-x'
,'topic' = 'da'
,'properties.bootstrap.servers' = 'kudu1:9092'
,'properties.group.id' = 'luna_g'
,'scan.startup.mode' = 'earliest-offset'
-- ,'scan.startup.mode' = 'latest-offset'
,'format' = 'json'
,'json.timestamp-format.standard' = 'SQL'
);


CREATE TABLE side
(
ID int,
NAME varchar,
MONEY decimal,
DATEONE timestamp,
AGE bigint,
DATETHREE timestamp,
DATESIX timestamp,
PHONE bigint,
WECHAT varchar,
INCOME decimal,
BIRTHDAY timestamp,
DTDATE date,
DTTIME time,
TODAY date,
TIMECURRENT time,
ABOOLEAN smallint ,
ADOUBLE double,
AFLOAT decimal ,
ACHAR char,
ABINARY BYTES,
ATINYINT smallint ,
PRIMARY KEY (ID) NOT ENFORCED
) WITH (
'connector' = 'db2-x',
'url' = 'jdbc:db2://localtest:50002/DT_TEST',
'table-name' = 'FLINK_DIM',
'username' = 'db2inst1',
'password' = 'dtstack1',
'lookup.cache-type' = 'all',
'lookup.parallelism' = '2'
);

CREATE TABLE sink
(
ID int,
NAME varchar,
MONEY decimal,
DATEONE timestamp,
AGE bigint,
DATETHREE timestamp,
DATESIX timestamp,
PHONE bigint,
WECHAT varchar,
INCOME decimal,
BIRTHDAY timestamp,
DTDATE date,
DTTIME time,
TODAY date,
TIMECURRENT time,
ABOOLEAN smallint,
ADOUBLE double,
AFLOAT decimal ,
ACHAR char,
ABINARY BYTES,
ATINYINT smallint
) WITH (
'connector' = 'stream-x',
'sink.parallelism' = '1'
);

create
TEMPORARY view view_out
as
select u.id
, u.name
, u.money
, u.dateone
, u.age
, u.datethree
, u.datesix
, s.PHONE
, s.WECHAT
, s.INCOME
, s.BIRTHDAY
, u.dtdate
, u.dttime
, s.TODAY
, s.TIMECURRENT
, s.ABOOLEAN
, s.ADOUBLE
, s.AFLOAT
, s.ACHAR
, s.ABINARY
, s.ATINYINT
from source u
left join side FOR SYSTEM_TIME AS OF u.PROCTIME AS s
on u.id = s.ID;

insert into sink
select *
from view_out;