Skip to main content

PyFlink

基于Flink官方提供的Python API,开发人员可以在Flink上开发Python任务,更好地进行数据分析。

info

该功能仅旗舰版支持

开发环境准备

在平台开发、运行PyFlink任务前提是要准备Python环境。可以在【项目管理-PyFlink环境管理】模块上传准备好的环境文件。下文将以Python3.8为例,为您介绍如何准备虚拟环境。

  • 生成环境包

    1. 在本地准备/build/setup-pyflink-virtual-env-linux.sh脚本,其内容如下:
    set -e
    # 下载Python 3.8 Miniconda3-py38_4.11.0-Linux-x86_64.sh 脚本。
    wget "https://repo.anaconda.com/miniconda/Miniconda3-py38_4.11.0-Linux-x86_64.sh" -O "Miniconda3-py38_4.11.0-Linux-x86_64.sh"

    # 为Python 3.8 miniconda.sh脚本添加执行权限。
    chmod +x Miniconda3-py38_4.11.0-Linux-x86_64.sh

    # 创建Python的虚拟环境。
    ./Miniconda3-py38_4.11.0-Linux-x86_64.sh -b -p linux_venv

    # 激活Conda Python虚拟环境。
    source linux_venv/bin/activate ""

    # 安装PyFlink依赖。
    # update the PyFlink version if needed
    pip install "apache-flink==1.12.5"

    # 关闭Conda Python虚拟环境。
    conda deactivate

    # 删除缓存的包。
    rm -rf linux_venv/pkgs

    # 将准备好的Conda Python虚拟环境打包。
    zip -r linux_venv.zip linux_venv
    1. 在本地准备 linux_build.sh 脚本,其内容如下:
    #!/bin/bash
    set -e -x
    yum install -y zip wget

    cd /root/
    bash /build/setup-pyflink-virtual-env-linux.sh
    mv linux_venv.zip /build/
    1. 在CMD命令行,执行如下命令:
    docker run -it --rm -v $PWD:/build  -w /build quay.io/pypa/manylinux2014_x86_64 ./linux_build.sh
    1. 执行完该命令后,会生成一个名字为venv.zip的文件,即为Python 3.8的虚拟环境。
  • 上传环境包
    1. 登录实时平台后,进入项目管理 - PyFlink环境管理 菜单。
    2. 点击上传文件,上传刚刚生成的环境包venv.zip文件。
      • 平台上传文件大小的限制为200 MB,而Python虚拟环境的大小通常会超过该限制。因此,您需要修改配置文件中的相关参数大小
      • 打开平台配置文件: vim .../DTFront/tengine/conf/nginx.conf
      • 修改client_max_body_size参数的大小
    3. 创建PyFlink任务时,选择venv.zip文件。
    4. 通过任务数量,可查看当前环境被使用的任务列表。 没有任务引用该环境时,可删除虚拟环境。

开发任务

  • 任务开发

    • 新建PyFlink任务:直接在Web端编辑和维护Python代码(Flink1.16)

    pyflink-1.png

    • Python脚本
    import sys

    from pyflink.common import Row
    from pyflink.common.serialization import SimpleStringSchema
    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors import FlinkKafkaConsumer


    def kafka_datastream_api_demo(a: str = 'default_a', b: str = 'default_b'):
    # 1. create a StreamExecutionEnvironment
    print(' a 参数为:', a)
    print(' b 参数为:', b)
    print(' datastream_api_demo 参数个数为:', len(sys.argv), '个参数。')
    print(' datastream_api_demo 参数列表:', str(sys.argv))
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    kafka_source = FlinkKafkaConsumer(
    topics='test1',
    deserialization_schema=SimpleStringSchema(),
    properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test1'})
    # setStartFromGroupOffsets(默认方法):从 Kafka brokers 中的 consumer 组(consumer 属性中的 group.id 设置)提交的偏移量中开始读取分区。 如果找不到分区的偏移量,那么将会使用配置中的 auto.offset.reset 设置。
    # setStartFromEarliest() 或者 setStartFromLatest():从最早或者最新的记录开始消费,在这些模式下,Kafka 中的 committed offset 将被忽略,不会用作起始位置。
    # setStartFromTimestamp(long):从指定的时间戳开始。对于每个分区,其时间戳大于或等于指定时间戳的记录将用作起始位置。如果一个分区的最新记录早于指定的时间戳,则只从最新记录读取该分区数据。在这种模式下,Kafka 中的已提交 offset 将被忽略,不会用作起始位置。
    kafka_source.set_start_from_earliest()
    ds = env.add_source(kafka_source)

    # 3. define the execution logic
    ds = ds.map(lambda a: Row( a.split(',')[1] , 1), output_type=Types.ROW([Types.STRING(), Types.INT()])) \
    .key_by(lambda a: a[0]) \
    .reduce(lambda a, b: Row(a[0], a[1] + b[1]))

    # 4. create sink and emit result to sink
    ds.print()

    # 5. execute the job
    env.execute('datastream_api_demo')


    if __name__ == '__main__':
    print('参数个数为:', len(sys.argv), '个参数。')
    print('参数列表:', str(sys.argv))
    if (len(sys.argv) >= 3) :
    kafka_datastream_api_demo(sys.argv[1],sys.argv[2])
    else:
    kafka_datastream_api_demo()
    • 环境配置:修改Python相关环境配置参数
    ## client 提交端的 python 环境路径
    python.client.executable=linux_venv/bin/python3
    ## taskmanager 端的 python 环境路径
    python.executable=linux_venv.zip/linux_venv/bin/python3

    pyflink-1.png

    • 任务运行:PyFlink任务提交调度运行

    • Kafka 输入:

    {"id":1,"order_id":1,"item_id":1,"behavior":"content_dasta","real_price":188,"channel_id":1,"name":"name_1","age":"1"}
    {"id":1,"user_id":1,"item_id":1,"behavior":"content_dasta","real_price":188,"channel_id":1,"name":"name_1","age":"1"}
    • 查看taskmaneger.stout:查看输出, 脚本中Json字段数量3个则取Kafka Json中, 对数据集进行处理,程序将数据集中的元素按照第二个部分进行分组,并对相同key的元素进行累加操作。

    pyflink-1.png

参数说明
操作模式资源上传:自己线下完成的Python API作业,必须为.py文件。
Web编辑:在实时开发平台的IDE中完成代码编辑。
Python环境选择在上一步中上传的Python环境
Python入参作业参数,在 python 里面可以通过 sys.argv[1] , sys.argv[2] 去使用。
附加依赖包如果您的Flink Python作业中使用了Java类,例如作业中使用了Connector或者Java自定义函数时,可以通过如下方式来指定Connector或者Java自定义函数的JAR包。
目前只支持传一个 jar 包, 如果要引用多个。需要自己打包多个 jar 为一个 jar,然后上传。这个 jar 会被添加到 classpath 里面去。
第三方Python包在python环境中未打包或者只是该任务需要使用的第三方Python包,可通过此处选择。 第三方Python包为.whl格式

任务管理

note

PyFlink任务必须在环境参数中维护两个python环境的路径,如下,具体路径请根据实际情况修改:

  • taskmanager 端的 python 环境路径

    python.executable=venv.zip/linux_venv/bin/python3 --venv.zip 是上述PyFlink环境zip包
  • client 提交端的 python 环境路径

    python.client.executable=linux_venv/bin/python3 --linux_venv 是上述虚拟环境路径