Skip to main content

结果表

一、介绍

向MongoDB中写入数据

二、支持版本

MongoDB 3.4及以上

三、插件名称

SQLmongodb-x

四、参数说明

1、SQL

SQL计算暂时只支持INSERT模式,后续可加入如果配置主键则使用UPSERT模式。

  • url
    • 描述:MongoDB数据库连接的URL字符串,详细请参考MongoDB官方文档
    • 必选:是
    • 默认值:无
  • database
    • 描述:数据库名称
    • 必选:是
    • 默认值:无
  • collection
    • 描述:集合名称
    • 必选:是
    • 默认值:无
  • username
    • 描述:数据源的用户名
    • 必选:否
    • 默认值:无
  • password
    • 描述:数据源指定用户名的密码
    • 必选:否
    • 默认值:无
  • sink.parallelism
    • 描述:sink并行度
    • 必选:是
    • 默认值:无
  • sink.buffer-flush.max-rows
    • 描述:批量写入条数
    • 必选:否
    • 默认值:无
  • sink.buffer-flush.interval
    • 描述:批量写入时间间隔:单位毫秒。
    • 必选:否
    • 默认值:无

五、数据类型

支持int
long
double
decimal
objectId
string
bindata
date
timestamp
bool
暂不支持array

六、脚本示例

-- {"id":100,"name":"lb james阿道夫","money":293.899778,"dateone":"2020-07-30 10:08:22","age":"33","datethree":"2020-07-30 10:08:22.123","datesix":"2020-07-30 10:08:22.123456","datenigth":"2020-07-30 10:08:22.123456789","dtdate":"2020-07-30","dttime":"10:08:22"}
CREATE TABLE ods_k
(
val_int INT,
val_long BIGINT,
val_double DOUBLE,
val_decimal DECIMAL,
`_id` STRING,
val_str STRING,
val_bindata STRING,
val_date DATE,
val_timestamp TIMESTAMP,
val_bool BOOLEAN
-- PROCTIME AS PROCTIME()
) WITH (
'connector' = 'kafka-x',
'topic' = 'luna',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'luna_g',
'format' = 'json',
'scan.startup.mode' = 'latest-offset',
'json.timestamp-format.standard' = 'SQL'
);

CREATE TABLE sink_mongo
(
val_int INT,
val_long BIGINT,
val_double DOUBLE,
val_decimal DECIMAL,
`_id` STRING,
val_str STRING,
val_bindata VARBINARY,
val_date DATE,
val_timestamp TIMESTAMP,
val_bool BOOLEAN
) WITH (
'connector' = 'mongodb-x',
'url' = 'mongodb://localhost:27017',
'database' = 'flink_dev',
'collection' = 'dim_m',
'sink.parallelism' = '1'
);

INSERT INTO sink_mongo
SELECT
val_int ,
val_long ,
val_double ,
val_decimal ,
`_id` ,
val_str ,
CAST(val_bindata AS VARBINARY) ,
val_date ,
val_timestamp ,
val_bool
FROM ods_k;