证券股票交易实践案例
场景描述
本文以金融行业为例,证券企业使用实时开发模块的实时采集、FlinkSQL功能将股票下单的数据通过实时采集任务采集到Kafka中,在通过FLinkSQL任务对数据进行清洗、去重、格式化等操作后输出,最后通过BI工具进行数据展示。 以下是该场景的详细描述:
数据来源: 股票交易数据来自多个交易所和金融信息提供商。数据以流的形式实时生成,包括交易ID、股票代号、交易时间、交易价格、交易数量和交易类型等信息。
数据处理流程:
ODS层(原始数据存储层): 接收原始的股票交易数据流。 存储原始数据到 Kafka 消息队列,以便后续处理。 数据未经处理,保留了最初的交易详情。
DWD层(数据仓库细节层): 从 Kafka 读取原始数据。 对数据进行清洗,例如去除无效或错误的记录。 去重和格式化,确保数据的准确性和一致性。 清洗后的数据继续存储,以便进行更深入的分析。
DWS层(数据仓库服务层): 对清洗后的数据进行轻度聚合。 计算每个股票代号在特定时间窗口内的平均交易价格和总交易量。 统计不同交易类型的数量。 聚合结果存储在新的数据流中,供更高层次的分析使用。
ADS层(应用数据服务层): 实时监控股票价格和交易量的变动。 计算价格变化和交易数量变化,为最终用户提供实时的市场洞察。 结果可用于实时数据仪表板、报警系统或其他业务应用。
数据输出: 将实时监控结果输出到 MySQL 数据库或其他存储系统。 数据可用于构建实时股票监控系统,提供给前端应用进行展示。
前置环境准备
- 数栈6.0版本
- FlinkSQL1.16版本
- Mysql5.7数据源、Kafka2.x数据源
登陆数栈平台至实时开发页面
步骤:
- 访问数栈开发平台
- 选择进入租户管理
- 新增租户
- 选择访问的租户
- 进入数栈平台选择控制台
- 控制台进入多集群管理
- 集群绑定新建的租户
- 返回首页选择实时开发产品
创建实时开发项目
创建项目
步骤:
- 进入实时开发首页
- 点击【创建项目】按钮
- 进入【创建项目】编辑页面输入必填项:项目标识、项目名称
- 点击「确定」按钮进行创建项目操作
- 创建成功返回实时开发首页能查看到新创建的项目
TODO: 创建项目需要管理员权限
进入实时开发项目
步骤:
- 点击新建的实时开发项目
- 进入数据开发页面
配置数据源
进入基础管理-数据源中心
步骤:
进入数据源中心的两种方式:
第一种-实时数据源页面-点击「数据源中心」按钮跳转
第二种-左侧数栈产品功能栏-基础服务-点击「公共管理」进行页面跳转
进入【数据源中心】首页
配置Mysql关系型数据库
步骤:
- 点击「新增数据源」按钮进入【新增数据源】页面
- 选择数据源:Mysql数据源,版本5x、8x(具体根据实际版本配置)
- 产品授权:全部项目(默认全部项目:支持租户下全部项目引入该数据源)
- 信息配置:编辑输入 数据源名称、JDBC URL地址、用户名、密码、数据预览功能默认开启
- 测试连通性:测试连通性通过后,点击「确定」按钮配置数据源成功
- 配置成功返回数据源中心首页查看Mysql数据源配置成功
配置Kafka消息队列
步骤:
- 点击「新增数据源」按钮进入【新增数据源】页面
- 选择数据源:Kafka数据源,版本2.x(具体根据实际版本配置)
- 产品授权:全部项目(默认全部项目:支持租户下全部项目引入该数据源)
- 信息配置:编辑输入 数据源名称、连接方式-Broker地址(支持ZK、Broker两种连接方式)、认证方式-无(Kafka支持开启认证例如:SASL、Kerberos)、数据预览功能默认开启
- 测试连通性:测试连通性通过后,点击「确定」按钮配置数据源成功
- 配置成功返回数据源中心首页查看Mysql数据源配置成功
实时开发项目引入数据源
步骤:
- 返回实时开发-进入【数据源】页面
- 点击「引入数据源」按钮引入Mysql、Kafka数据源,点击「确定」按钮
- 引入成功返回数据源首页查看数据源
创建ODS层实时采集任务采集Mysql 写入Kafka
解释:
在ODS层,我们通常存储从各个源系统直接导入的原始数据,不进行任何处理或只进行简单的清洗。当前实现通过Flink CDC采集从来源Mysql中通过Binlog二进制日志的方式进行采集,并以Json格式输出到Kafka Topic中。
创建Mysql来源表
步骤:
--采集Mysql来源表建表语句,定义来源表名称trades 建表语句:
CREATE TABLE trades (
trade_id INT NOT NULL AUTO_INCREMENT,
stock_symbol VARCHAR(255) NOT NULL,
trade_time TIMESTAMP(3) NOT NULL,
trade_price DOUBLE NOT NULL,
trade_volume INT NOT NULL,
trade_type VARCHAR(255) NOT NULL,
PRIMARY KEY (trade_id)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
字段含义:
- trade_id 是一个自增的整数字段,用于唯一标识每一条交易记录。它是表的主键。
- stock_symbol 是一个字符串字段,用于存储股票代号。
- trade_time 是一个时间戳字段,精度为3,表示交易发生的精确时间到毫秒。
- trade_price 是一个双精度浮点数字段,用于存储交易价格。
- trade_volume 是一个整数字段,用于存储交易数量。
- trade_type 是一个字符串字段,用于存储交易类型,只能是'BUY'或'SELL'。
创建Kafka目标Topic(ods_stock_trade)
步骤:
- 创建Kafka目标Topic方式:
- 实时湖仓-Topic管理创建Topic
- Topic管理:
- 进入-实时湖仓-Topic管理进入Topic管理首页(数据源中心配置完成Kafka数据源自动进入Topic管控)
- 点击「创建Topic」进入创建Topic页面
- 所属Kafka: 选择数据源中心配置的Kafka, Topic名称:ods_stock_trade、分区数:1、副本树:1、存储时间(h) :168 ![image-Topic管理创建Kafka topic.png](assets/images/Topic管理创建Kafka topic.png)
- 点击「确定」按钮添加Topic成功返回Topic管理页面
- Topic管理查询名:ods_stock_trade 的Topic
创建ODS层实时采集任务
步骤:
创建ODS层文件夹
创建实时采集任务
- 创建任务配置:
- 任务名称:Flinkx_Mysql_kafka
- 任务类型:实时采集
- 配置模式:向导模式
- 引擎版本:flink1.16(无法选择则移步至控制台先配置)
- 选择存储位置:DEMO_ODS层
- 创建任务配置:
配置实时采集任务来源表Mysql
- 配置Mysql来源表:
- 数据源类型:MySQL
- 任务类型:Binlog
- 数据还原:不勾选
- 数据源:选择数据源中心配置的Mysql数据源
- 是否分表:不分表
- 表:选择Mysql数据源新建的trades表
- 采集起点:从任务运行时开始
- 数据开发:insert、upsert、delete
- 格式转换:嵌套Json平铺
- 配置Mysql来源表:
TODO:下方案例必须MySQL binlog开启的条件下才能进行,binlog开启步骤参考如下链接:https://blog.csdn.net/king_kgh/article/details/74800513?Readlog
配置实时采集任务目标源Kafka
- 配置Kafka目标Topic
- 数据源:选择数据源中心配置的Kafka数据源
- Topic:选择Topic管理创建的ods_stock_trade
- 数据有序:关闭
- 配置Kafka目标Topic
配置实时采集任务通道控制配置
- 配置通道控制:
- 作业速率上线:不限制上传速率
- 作业读取并发数:1(不支持修改)
- 作业写入并发树:1(根据业务自行调整)
- 配置通道控制:
实时采集预览配置并保存任务
- 预览并保存实时采集任务
任务提交并运行
步骤:
- 实时任务提交调度
- 任务运维-运行实时采集任务
- 任务运维-选择Flinkx_Mysql_kafka点击「提交」按钮运行采集任务
创建DWD层FLinkSQL 任务读取Kafka写入Kafka
解释: DWD 层主要是对 ODS 层的数据进行清洗、去重、格式化等操作,保留所有历史和细节数据。当前对Mysql采集到Kafka Topic的数据进行二次的清洗等操作输出已经清洗完成的数据。
创建Kafka结果Topic(dwd_stock_trade)
步骤:
- 创建Kafka目标Topic方式:
- 实时湖仓-Topic管理创建Topic
- Topic管理:
- 进入-实时湖仓-Topic管理进入Topic管理首页(数据源中心配置完成Kafka数据源自动进入Topic管控)
- 点击「创建Topic」进入创建Topic页面
- 所属Kafka: 选择数据源中心配置的Kafka, Topic名称:dwd_stock_trade、分区数:1、副本树:1、存储时间(h) :168
- 点击「确定」按钮添加Topic成功返回Topic管理页面
- Topic管理查询名:dwd_stock_trade 的Topic
创建DWD层FLinkSQL任务
步骤:
创建DWD层文件夹
创建FlinkSQL任务
- 创建任务配置:
- 任务名称:FlinkSQL_kafka_kafka_DWD
- 任务类型:FlinkSQL
- 配置模式:向导模式
- 引擎版本:flink1.16(无法选择则移步至控制台先配置)
- 选择存储位置:DEMO_DWD层
配置Kafka来源表
-- 字段
after_trade_id INT
after_stock_symbol VARCHAR(255)
after_trade_time TIMESTAMP(3)
after_trade_price DOUBLE
after_trade_volume INT
after_trade_type VARCHAR(255)配置Kafka结果表
-- 字段
trade_id INT
stock_symbol VARCHAR(255)
trade_time TIMESTAMP(3)
trade_price DOUBLE
trade_volume INT
trade_type VARCHAR(255)
cleaned_status BOOLEAN配置FLinkSQL语句
-- SQL语句:
-- 清洗数据后插入DWD层,例如使用Flink SQL的INSERT INTO语句:
INSERT
INTO
DWD_Stock_Trade
SELECT
after_trade_id trade_id,-- 交易ID
after_stock_symbol stock_symbol,-- 股票代号
after_trade_time trade_time,-- 交易时间
after_trade_price trade_price, -- 交易价格
after_trade_volume as trade_volume,-- 交易数量
after_trade_type as trade_type, -- 交易类型,如'BUY'或'SELL'
CASE
WHEN after_trade_price > 0
AND after_trade_volume > 0 THEN TRUE
ELSE FALSE
END AS cleaned_status -- 清洗状态,是否通过清洗
FROM
ODS_Stock_Trade;
任务提交并运行
步骤:
FlinkSQL任务提交调度
任务运维-选择FlinkSQL_kafka_kafka_DWD点击「提交」按钮运行FlinkSQL任务
创建DWS层FLinkSQL 任务读取Kafka写入Kafka
解释: DWS 层是针对特定业务需求进行轻度聚合的数据层。当前的业务中我们需要得到聚合的平均交易价格和交易数量总和,通过FLink 窗口函数进行分钟级别的聚合操作,得到每分钟股票的平均交易价格和交易数量总和。
创建Kafka结果Topic(dws_stock_aggregate)
步骤:
- 创建Kafka目标Topic方式:
- 实时湖仓-Topic管理创建Topic
- Topic管理:
- 进入-实时湖仓-Topic管理进入Topic管理首页(数据源中心配置完成Kafka数据源自动进入Topic管控)
- 点击「创建Topic」进入创建Topic页面
- 所属Kafka: 选择数据源中心配置的Kafka, Topic名称:dws_stock_aggregate、分区数:1、副本树:1、存储时间(h) :168
- 点击「确定」按钮添加Topic成功返回Topic管理页面
- Topic管理查询名:dws_stock_aggregate 的Topic
创建DWS层FLinkSQL任务
步骤:
创建DWS层文件夹
创建FlinkSQL任务
- 创建任务配置:
- 任务名称:FlinkSQL_kafka_kafka_DWS
- 任务类型:FlinkSQL
- 配置模式:向导模式
- 引擎版本:flink1.16(无法选择则移步至控制台先配置)
- 选择存储位置:DEMO_DWS层
- 创建任务配置:
配置Kafka来源表
-- 字段:
trade_id INT
stock_symbol VARCHAR(255)
trade_time TIMESTAMP(3)
trade_price DOUBLE
trade_volume INT
trade_type VARCHAR(255)
cleaned_status BOOLEAN
注意点:
- 时间特征设置EvenTime(EventTime:按照流式数据本身包含的业务时间戳处理)
- 时间列选择字段trade_time(交易时间)
- 最大延迟时间设置1min
流计算事件时间文档: https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/table/sql/queries/joins/
配置Kafka结果表
-- 字段:
stock_symbol VARCHAR(255)
trade_time timestamp(3)
trade_price_avg DOUBLE
trade_volume_sum INT配置FLinkSQL语句
-- SQL语句:
-- 对DWD层数据进行聚合后插入DWS层,例如使用Flink SQL的INSERT INTO语句:
INSERT
INTO
DWS_Stock_Aggregate
SELECT
stock_symbol,-- 股票代号
CURRENT_ROW_TIMESTAMP() AS trade_time,
AVG(trade_price) as trade_price_avg,-- 平均交易价格
SUM(trade_volume) as trade_volume_sum -- 交易数量总和
FROM
DWD_Stock_Trade
GROUP BY
stock_symbol,TUMBLE(trade_time, INTERVAL '1' MINUTE);
####任务提交并运行
步骤:
- FlinkSQL任务提交调度
- 任务运维-选择FlinkSQL_kafka_kafka_DWS点击「提交」按钮运行FlinkSQL任务
创建ADS层FLinkSQL 任务读取Kafka写入Mysql
解释:
ADS 层是为最终的业务应用提供数据的层。通过DWS层获取的每分钟平均交易价格与交易数量总和相乘得到最后的每分钟股票价格总数指标。
创建Mysql结果表
步骤:
-- 采集Mysql来源表建表语句,定义来源表名称stock_monitor:
-- 建表语句
CREATE TABLE stock_monitor (
stock_symbol VARCHAR(10) NOT NULL, -- 股票代号,假设股票代号不超过10个字符
monitored_time TIMESTAMP(3) NOT NULL, -- 监控时间,精确到毫秒
price_total DOUBLE NOT NULL -- 价格总数
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; -- 使用InnoDB存储引擎和utf8mb4字符
创建ADS层FLinkSQL任务
步骤:
创建ADS层文件夹
创建FlinkSQL任务
- 创建任务配置:
- 任务名称:FlinkSQL_kafka_Mysql_ADS
- 任务类型:FlinkSQL
- 配置模式:向导模式
- 引擎版本:flink1.16(无法选择则移步至控制台先配置)
- 选择存储位置:DEMO_ADS层
- 创建任务配置:
配置Kafka来源表
-- 字段:
stock_symbol VARCHAR(255)
trade_price_avg DOUBLE
trade_volume_sum INT
trade_time TIMESTAMP(3)配置Mysql结果表
配置FLinkSQL语句
-- SQL语句:
-- desc 根据DWS层数据进行实时监控,并将结果插入ADS层,例如使用Flink SQL的INSERT INTO语句:
INSERT
INTO
ADS_Stock_Monitor
SELECT
stock_symbol,
trade_time AS monitored_time,
trade_price_avg * trade_volume_sum AS price_total
FROM
DWS_Stock_Aggregate;
任务提交并运行
步骤:
- FlinkSQL任务提交调度
- 任务运维-选择FlinkSQL_kafka_Mysql_ADS点击「提交」按钮运行FlinkSQL任务
运行任务验证数据正确性
任务运行数据写入后数据流转
步骤:
- 实时采集Mysql trades表插入语句
INSERT INTO trades VALUES (1, "AAPL", CURRENT_TIMESTAMP, 50.0, 100, "BUY");
INSERT INTO trades VALUES (2, "AAPL", CURRENT_TIMESTAMP, 100.0, 100, "BUY");
INSERT INTO trades VALUES (3, "AAPL", CURRENT_TIMESTAMP, 150.0, 100, "BUY");
INSERT INTO trades VALUES (4, "AAPL", CURRENT_TIMESTAMP, 150.0, 100, "BUY");
- 实时采集任务将Mysql trades表中数据采集输出kafka Topic(ods_stock_trade)
- FlinkSQL_kafka_kafka_DWD任务从原始数据表ODS_Stock_Trade中选择数据,对数据进行清洗检查确保交易价格和交易数量都是正数并将清洗后数据写入数据仓库的DWD_Stock_Trade表中
- FlinkSQL_kafka_kafka_DWS任务目的是对DWD_Stock_Trade Topic中的数据进行聚合,通过窗口函数计算每个股票代号每分钟的平均交易价格和交易数量总和,并将结果插入到DWS_Stock_Aggregate Topic中
- FlinkSQL_kafka_Mysql_ADS任务对DWS_Stock_Aggregate Topic中 trade_price_avg与trade_volume_sum相乘得到交易总额并插入Mysql
stock_monitor
表中
注意:
窗口函数配置的一分钟聚合一次,写入stock_monitor
时,需要重新插入一批数据后,上一次计算结果才会被写入成功,时间间隔1分钟。例如:
INSERT INTO trades VALUES (5, "AAPL", CURRENT_TIMESTAMP, 50.0, 100, "BUY");
- 任务运行数据输出
- Mysql 查询语句:select * from stock_monitor;
- 检查输出price_total数据正确
- Mysql 查询语句:select * from stock_monitor;