CREATE TABLE source_ods_fact_user_ippv
(
id INT,
name STRING,
money decimal,
dateone timestamp,
age bigint,
datethree timestamp,
datesix timestamp(6),
datenigth timestamp(9),
dtdate date,
dttime time,
ts TIMESTAMP(3) METADATA FROM 'timestamp',
partition_id BIGINT METADATA FROM 'partition' VIRTUAL,
WATERMARK FOR datethree AS datethree - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka-x'
,'topic' = 'da'
,'properties.bootstrap.servers' = 'localhost:9092'
,'properties.group.id' = 'luna_g'
,'scan.startup.mode' = 'earliest-offset'
,'format' = 'json'
,'json.timestamp-format.standard' = 'SQL'
);
CREATE TABLE result_total_pvuv_min
(
id INT,
name STRING,
money decimal,
dateone timestamp,
age bigint,
datethree timestamp,
datesix timestamp(6),
datenigth timestamp(9),
dtdate date,
dttime time,
ts TIMESTAMP(3),
partition_id BIGINT,
PRIMARY KEY (id, name, money, dateone, age, datethree, datesix, datenigth, dtdate, dttime, ts,
partition_id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka-x'
,'topic' = 'test'
,'properties.bootstrap.servers' = 'localhost:9092'
,'key.format' = 'json'
,'value.format' = 'json'
,'value.fields-include' = 'ALL'
);
INSERT INTO result_total_pvuv_min
SELECT id
, name
, money
, dateone
, age
, datethree
, datesix
, datenigth
, dtdate
, dttime
, ts
, partition_id
from source_ods_fact_user_ippv
group by id
, name
, money
, dateone
, age
, datethree
, datesix
, datenigth
, dtdate
, dttime
, ts
, partition_id
;