Skip to main content

实时采集

以实时电商案例数据源表存储为例,为您介绍如何准备数据源表存储并将其实时采集到Kafka内等待消费。

数据源表创建

在本地MySQL数据库通过建表语句创建数据源表

-- 创建数据源表 singal_stream_simple_source
create table singal_stream_simple_source
(
event_id int unsigned auto_increment, -- 数据库自增id,主键
id varchar(255), -- 商品id
buyer_id varchar(255), -- 消费者id
goods_type varchar(255), -- 商品类型
pay_time timestamp, -- 支付时间
buy_amount double, -- 购买金额
buy_way varchar(255), -- 购买渠道
primary key (event_id)
)
SQLCopied!

数据源接入

1.登陆 实时计算模块
2.选择目标项目,点击 数据开发,进入项目。
3.点击 数据源-引用数据源 ,弹出 引用数据源 窗口如下:

数据源引入

列表中仅展示分配给实时开发的数据源,若无目标数据源需前往 数据源中心 分配至实时开发产品。 4.根据数据源类型进行筛选,选择目标数据源接入至实时开发中。

创建实时采集任务

静态关系数据库中的数据通过实时采集任务可转化为Kafka内的数据流,从而进入Flinkx被消费。
1.在 数据开发 页面,点击创建任务 ,创建 实时采集 任务如下:

实时采集-1.png

2.数据源类型选择 MySQLBinLog ,选择实时采集的数据源表,并配置相关参数

实时采集-2.png

3.指向目标Kafka的某一Topic(后续FlinkSQL任务中会调用这一Topic)

实时采集-4.png

4.根据需求配置速率,默认不修改

实时采集-5.png

5.再次浏览所有配置内容,确认无误后点击 保存 即完成实时采集任务配置。

实时采集-6.png

此时,实时采集任务已创建于开发环境,后续调用需提交至生产环境。