Skip to main content

单表校验远程触发

本节主要说明 远程触发 的应用场景和实现原理。

一、远程触发的概念

由第三方系统通过接口调用的形式来触发数据质量校验任务执行

二、典型的场景

在自建的大数据平台上对某张表进行ETL任务处理,需要在ETL任务处理前, 对此表的数据质量进行校验,校验通过后再启动后续的计算任务 (防止错误数据流入下一个计算环节,同时可以节约计算资源)。

三、配置流程

  1. 在本平台配置表的数据质量检测任务,并在规则配置页面点击列表中的表名,在右侧面板中切换到远程触发标签。
  2. 点击新增远程触发,选择已配置的分区并勾选所需的规则,并点击生产远程调用API
  3. 在远程触发列表中可查看触发接口结果获取接口2个字段,即完成了远程触发接口的配置。

四、外部平台调用规则 远程触发模块需要由外部系统触发,此处使用POSTMAN作为工具来测试调用并获取结果,在POSTMAN中需新建2个接口,一个接口触发规则的运行,另一个接口获取质量校验的结果:

  1. 调用方式:POST
  2. 将远程触发的URL从数据质量中复制到POSTMAN
  3. 请求Body中,配置为Raw,并选择JSON格式;
  4. 触发规则接口、结果获取接口都做如上配置,并先后点击Send,即可在POSTMAN页面看到运行结果;
  5. 规则触发运行后,可在数据质量的任务查询页面看到新增一条记录,即刚刚触发运行的规则;

Postman.png

五、平台调用规则

  1. 新建Python类型的任务
  2. 用Python调用远程触发的接口,根据返回的值判断,是否将本任务置为失败
  3. 将此Python任务配置在多个任务的依赖流程中
  4. Python示例代码如下:
#coding=utf-8
import requests
import json
import time

# 任务状态
INIT_STATUS = "INIT"
RUNNING_STATUS = "RUNNING"
FAILED_STATUS = "FAILED"
PASS_STATUS = "PASS"
UN_PASS_STATUS = "UN_PASS"

# 请求间隔时间(秒)
SLEEP_TIME = 2

# 任务触发地址
trigger_url = "${dq.task.trigger_url}"
# 任务运行结果查询地址
result_url = "${dq.task.result_url}"


# 发送请求
def send_post(url):
try:
resp = requests.post(url)
except BaseException as e:
raise RuntimeError("发送请求到:\n" + url + "\n时发生异常:\n" + str(e))
else:
json_result = json.loads(resp.text)
if json_result["code"] == 1:
return json_result["data"]
else:
raise RuntimeError('接口:\n' + url + "\n调用失败:" + json_result["message"])


# 发送请求触发任务执行
send_post(trigger_url)

check_result = INIT_STATUS

# 任务处于初始状态和运行状态时,每隔 SLEEP_TIME 秒查询一次任务运行结果
while check_result == INIT_STATUS or check_result == RUNNING_STATUS:
time.sleep(SLEEP_TIME)
response = send_post(result_url)
check_result = response["status"]
if check_result == PASS_STATUS:
# 用户代码块
print("任务校验通过")
elif check_result == UN_PASS_STATUS:
# 用户代码块
raise RuntimeError("任务校验不通过")
elif check_result == FAILED_STATUS:
# 用户代码块
raise RuntimeError("任务运行失败")
PYTHONCopied!