单表校验远程触发
本节主要说明 远程触发 的应用场景和实现原理。
一、远程触发的概念
由第三方系统通过接口调用的形式来触发数据质量校验任务执行
二、典型的场景
在自建的大数据平台上对某张表进行ETL任务处理,需要在ETL任务处理前, 对此表的数据质量进行校验,校验通过后再启动后续的计算任务 (防止错误数据流入下一个计算环节,同时可以节约计算资源)。
三、配置流程
- 在本平台配置表的数据质量检测任务,并在规则配置页面点击列表中的表名,在右侧面板中切换到远程触发标签。
- 点击新增远程触发,选择已配置的分区并勾选所需的规则,并点击生产远程调用API。
- 在远程触发列表中可查看触发接口和结果获取接口2个字段,即完成了远程触发接口的配置。
四、外部平台调用规则 远程触发模块需要由外部系统触发,此处使用POSTMAN作为工具来测试调用并获取结果,在POSTMAN中需新建2个接口,一个接口触发规则的运行,另一个接口获取质量校验的结果:
- 调用方式:POST
- 将远程触发的URL从数据质量中复制到POSTMAN
- 请求Body中,配置为Raw,并选择JSON格式;
- 触发规则接口、结果获取接口都做如上配置,并先后点击Send,即可在POSTMAN页面看到运行结果;
- 规则触发运行后,可在数据质量的任务查询页面看到新增一条记录,即刚刚触发运行的规则;
五、平台调用规则
- 新建Python类型的任务
- 用Python调用远程触发的接口,根据返回的值判断,是否将本任务置为失败
- 将此Python任务配置在多个任务的依赖流程中
- Python示例代码如下:
#coding=utf-8
import requests
import json
import time
# 任务状态
INIT_STATUS = "INIT"
RUNNING_STATUS = "RUNNING"
FAILED_STATUS = "FAILED"
PASS_STATUS = "PASS"
UN_PASS_STATUS = "UN_PASS"
# 请求间隔时间(秒)
SLEEP_TIME = 2
# 任务触发地址
trigger_url = "${dq.task.trigger_url}"
# 任务运行结果查询地址
result_url = "${dq.task.result_url}"
# 发送请求
def send_post(url):
try:
resp = requests.post(url)
except BaseException as e:
raise RuntimeError("发送请求到:\n" + url + "\n时发生异常:\n" + str(e))
else:
json_result = json.loads(resp.text)
if json_result["code"] == 1:
return json_result["data"]
else:
raise RuntimeError('接口:\n' + url + "\n调用失败:" + json_result["message"])
# 发送请求触发任务执行
send_post(trigger_url)
check_result = INIT_STATUS
# 任务处于初始状态和运行状态时,每隔 SLEEP_TIME 秒查询一次任务运行结果
while check_result == INIT_STATUS or check_result == RUNNING_STATUS:
time.sleep(SLEEP_TIME)
response = send_post(result_url)
check_result = response["status"]
if check_result == PASS_STATUS:
# 用户代码块
print("任务校验通过")
elif check_result == UN_PASS_STATUS:
# 用户代码块
raise RuntimeError("任务校验不通过")
elif check_result == FAILED_STATUS:
# 用户代码块
raise RuntimeError("任务运行失败")
PYTHONCopied!