CREATE TABLE source (
    -- Stream-side input table. The WITH options below bind it to the kafka-x
    -- connector, topic da, with JSON-encoded records.
    id          INT,
    name        STRING,
    -- NOTE(review): DECIMAL is declared without precision or scale - confirm
    -- the engine default is wide enough for the values carried here.
    money       DECIMAL,
    dateone     TIMESTAMP,
    age         BIGINT,
    datethree   TIMESTAMP,
    datesix     TIMESTAMP(6),
    -- NOTE(review): datenigth looks like a misspelling, but the name presumably
    -- has to match the JSON field - verify with the producer before renaming.
    datenigth   TIMESTAMP(9),
    dtdate      DATE,
    dttime      TIME,
    -- Computed processing-time attribute. The lookup join in view_out uses it
    -- in its FOR SYSTEM_TIME AS OF clause.
    PROCTIME AS PROCTIME()
)
WITH (
    'connector' = 'kafka-x',
    'topic' = 'da',
    'properties.bootstrap.servers' = 'kudu1:9092',
    'properties.group.id' = 'luna_g',
    -- Start reading from the earliest available offset.
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    -- Timestamps in the JSON payload are parsed using the SQL standard format.
    'json.timestamp-format.standard' = 'SQL'
);
-- Dimension table used as the lookup side of the join in view_out.
-- Backed by the mysql-x connector, table flink_out in schema test.
-- NOTE(review): VARCHAR and CHAR are declared without a length, and DECIMAL
-- without precision or scale - confirm the engine defaults are acceptable.
CREATE TABLE side (
    id          INT,
    name        VARCHAR,
    money       DECIMAL,
    dateone     TIMESTAMP,
    age         BIGINT,
    datethree   TIMESTAMP,
    datesix     TIMESTAMP,
    phone       BIGINT,
    wechat      VARCHAR,
    income      DECIMAL,
    birthday    TIMESTAMP,
    dtdate      DATE,
    dttime      TIME,
    today       DATE,
    timecurrent TIME,
    aboolean    BOOLEAN,
    adouble     DOUBLE,
    afloat      FLOAT,
    achar       CHAR,
    abinary     BYTES,
    atinyint    TINYINT,
    -- Declared key only, NOT ENFORCED means the engine does not validate it.
    PRIMARY KEY (id) NOT ENFORCED
)
WITH (
    'connector' = 'mysql-x',
    'url' = 'jdbc:mysql://localhost:3306/test',
    'schema-name' = 'test',
    'table-name' = 'flink_out',
    'username' = 'root',
    'password' = 'root',
    -- Lookup cache tuning.
    -- NOTE(review): units of cache-period, ttl and async-timeout are not stated
    -- here - presumably milliseconds, confirm against the connector docs.
    'lookup.cache-type' = 'all',
    'lookup.cache-period' = '4600000',
    'lookup.cache.max-rows' = '20000',
    'lookup.cache.ttl' = '700000',
    'lookup.fetch-size' = '2000',
    'lookup.async-timeout' = '30000'
);
-- Output table for the enriched rows. Backed by the mysql-x connector,
-- table flink_type in the database named by the JDBC url.
-- Column order matters here because the final INSERT in this script maps
-- the query columns onto this table by position.
CREATE TABLE sink (
    id          INT,
    name        VARCHAR,
    money       DECIMAL,
    dateone     TIMESTAMP,
    age         BIGINT,
    datethree   TIMESTAMP,
    datesix     TIMESTAMP,
    phone       BIGINT,
    wechat      VARCHAR,
    income      DECIMAL,
    birthday    TIMESTAMP,
    dtdate      DATE,
    dttime      TIME,
    today       DATE,
    timecurrent TIME,
    aboolean    BOOLEAN,
    adouble     DOUBLE,
    afloat      FLOAT,
    achar       CHAR,
    abinary     BYTES,
    atinyint    TINYINT,
    -- Declared key only, NOT ENFORCED means the engine does not validate it.
    PRIMARY KEY (id) NOT ENFORCED
)
WITH (
    'connector' = 'mysql-x',
    'url' = 'jdbc:mysql://localhost:3306/test',
    'table-name' = 'flink_type',
    'username' = 'root',
    'password' = 'root',
    -- Write buffering: flush thresholds by row count and by interval.
    -- NOTE(review): the interval is presumably in milliseconds - confirm
    -- against the connector docs.
    'sink.buffer-flush.max-rows' = '1024',
    'sink.buffer-flush.interval' = '10000',
    'sink.all-replace' = 'true',
    'sink.parallelism' = '1'
);
-- Enriches each row of source with columns from side, matched on id.
-- This is a processing-time lookup join: side is probed as of the PROCTIME
-- attribute of the source row. Being a LEFT JOIN, source rows with no match
-- in side are kept and their side-derived columns are NULL.
CREATE TEMPORARY VIEW view_out AS
SELECT
    -- Columns taken from the stream side.
    src.id,
    src.name,
    src.money,
    src.dateone,
    src.age,
    src.datethree,
    src.datesix,
    -- Columns taken from the lookup side.
    dim.phone,
    dim.wechat,
    dim.income,
    dim.birthday,
    -- Stream side again.
    src.dtdate,
    src.dttime,
    -- Remaining lookup-side columns.
    dim.today,
    dim.timecurrent,
    dim.aboolean,
    dim.adouble,
    dim.afloat,
    dim.achar,
    dim.abinary,
    dim.atinyint
FROM source AS src
LEFT JOIN side FOR SYSTEM_TIME AS OF src.PROCTIME AS dim
    ON src.id = dim.id;
-- Writes every enriched row from view_out into sink.
-- The projection is spelled out, in the order sink declares its columns,
-- instead of using a star select. The INSERT maps query columns onto sink by
-- position, so an explicit list keeps a later change to view_out from
-- silently shifting values into the wrong sink column.
INSERT INTO sink
SELECT
    v.id,
    v.name,
    v.money,
    v.dateone,
    v.age,
    v.datethree,
    v.datesix,
    v.phone,
    v.wechat,
    v.income,
    v.birthday,
    v.dtdate,
    v.dttime,
    v.today,
    v.timecurrent,
    v.aboolean,
    v.adouble,
    v.afloat,
    v.achar,
    v.abinary,
    v.atinyint
FROM view_out AS v;