实时采集
以实时电商案例数据源表存储为例,为您介绍如何准备数据源表存储并将其实时采集到Kafka内等待消费。
数据源表创建
在本地MySQL数据库通过建表语句创建数据源表
-- 创建数据源表 singal_stream_simple_source
create table singal_stream_simple_source
(
event_id int unsigned auto_increment, -- 数据库自增id,主键
id varchar(255), -- 商品id
buyer_id varchar(255), -- 消费者id
goods_type varchar(255), -- 商品类型
pay_time timestamp, -- 支付时间
buy_amount double, -- 购买金额
buy_way varchar(255), -- 购买渠道
primary key (event_id)
)
SQLCopied!
数据源接入
1.登陆 实时计算模块
2.选择目标项目,点击 数据开发,进入项目。
3.点击 数据源-引用数据源 ,弹出 引用数据源 窗口如下:
列表中仅展示分配给实时开发的数据源,若无目标数据源需前往 数据源中心 分配至实时开发产品。 4.根据数据源类型进行筛选,选择目标数据源接入至实时开发中。
创建实时采集任务
静态关系数据库中的数据通过实时采集任务可转化为Kafka内的数据流,从而进入Flinkx被消费。
1.在 数据开发 页面,点击创建任务 ,创建 实时采集 任务如下:
2.数据源类型选择 MySQLBinLog ,选择实时采集的数据源表,并配置相关参数
3.指向目标Kafka的某一Topic(后续FlinkSQL任务中会调用这一Topic)
4.根据需求配置速率,默认不修改
5.再次浏览所有配置内容,确认无误后点击 保存 即完成实时采集任务配置。
此时,实时采集任务已创建于开发环境,后续调用需提交至生产环境。