Skip to main content

维表

一、介绍

MongoDB Lookup查询数据。

二、支持版本

MongoDB 3.4及以上

三、插件名称

SQLmongodb-x

四、参数说明

  • connector
    • 描述:mongodb-x
    • 必选:是
    • 默认值:无
  • url
    • 描述:mongodb://xxxx
    • 必选:是
    • 默认值:无
  • collection
    • 描述:集合名
    • 必选:是
    • 默认值:无
  • username
    • 描述:用户名
    • 必选:是
    • 默认值:无
  • password
    • 描述:密码
    • 必选:否
    • 默认值:无
  • lookup.cache-type
    • 描述:维表缓存类型(NONE、LRU、ALL),默认LRU
    • 必选:否
    • 默认值:LRU
  • lookup.cache-period
    • 描述:ALL维表每隔多久加载一次数据,默认3600000毫秒(一个小时)
    • 必选:否
    • 默认值:3600000
  • lookup.cache.max-rows
    • 描述:lru维表缓存数据的条数
    • 必选:否
    • 默认值:10000
  • lookup.cache.ttl
    • 描述:lru维表缓存数据的时间
    • 必选:否
    • 默认值:60000
  • lookup.fetch-size
    • 描述:ALL维表每次从数据库加载的条数
    • 必选:否
    • 默认值:1000
  • lookup.parallelism
    • 描述:维表并行度
    • 必选:否
    • 默认值:无

五、脚本示例

-- {"id":100,"name":"lb james阿道夫","money":293.899778,"dateone":"2020-07-30 10:08:22","age":"33","datethree":"2020-07-30 10:08:22.123","datesix":"2020-07-30 10:08:22.123456","datenigth":"2020-07-30 10:08:22.123456789","dtdate":"2020-07-30","dttime":"10:08:22"}
CREATE TABLE ods_k
(
id INT,
name STRING,
PROCTIME AS PROCTIME()
) WITH (
'connector' = 'kafka-x',
'topic' = 'luna',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'luna_g',
'format' = 'json',
'scan.startup.mode' = 'latest-offset',
'json.timestamp-format.standard' = 'SQL'
);

CREATE TABLE lookup_mongo
(
val_int INT,
val_long BIGINT,
val_double DOUBLE,
val_decimal DECIMAL,
`_id` STRING,
val_str STRING,
val_bindata VARBINARY,
val_date DATE,
val_timestamp TIMESTAMP,
val_bool BOOLEAN,
PRIMARY KEY (val_int) NOT ENFORCED
) WITH (
'connector' = 'mongodb-x',
'url' = 'mongodb://localhost:27017',
'database' = 'flink_dev',
'collection' = 'dim_m',
'lookup.cache-type' = 'lru'
);

CREATE TABLE sink_print
(
val_int INT,
val_long BIGINT,
val_double DOUBLE,
val_decimal DECIMAL,
`_id` STRING,
val_str STRING,
val_bindata VARBINARY,
val_date DATE,
val_timestamp TIMESTAMP,
val_bool BOOLEAN
) WITH (
'connector' = 'print'
);


INSERT INTO sink_print
SELECT val_int,
val_long,
val_double,
val_decimal,
`_id`,
val_str,
val_bindata,
val_date,
val_timestamp,
val_bool
FROM ods_k k
LEFT JOIN
lookup_mongo FOR SYSTEM_TIME AS OF k.PROCTIME AS l
ON k.id = l.val_int;