Skip to main content

Flink分布式缓存Distrubuted Cache

  • Flink提供了一个分布式缓存,类似于hadoop,可以使用户在并行函数中很方便的读取本地文件,并把它放在taskmanager节点中,防止task重复拉取。
  • 此缓存的工作机制如下:程序注册一个文件或者目录(本地或者远程文件系统,例如hdfs或者s3),通过ExecutionEnvironment注册缓存文件并为它起一个名称。当程序执行,Flink自动将文件或者目录复制到所有taskmanager节点的本地文件系统,仅会执行一次。用户可以通过这个指定的名称查找文件或者目录,然后从taskmanager节点的本地文件系统访问它。

官网介绍:Distributed Cache

使用方法

注册缓存文件

通过registerCachedFile方法注册

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// register a file from HDFS
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")

// register a local executable file (script, executable, ...)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)

// define your program and execute
...
DataSet<String> input = ...
DataSet<Integer> result = input.map(new MyMapper());
...
env.execute();

通过配置参数注册

pipeline.cached-files: name:hdfsFile,path:hdfs:///path/to/your/file,executable:true;name:localExecFile,path:file:///path/to/exec/file

Flink中org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#configure()方法会读取该配置,并注册配置中的文件。

registerFile

获取缓存文件

// extend a RichFunction to have access to the RuntimeContext
public final class MyMapper extends RichMapFunction<String, Integer> {

@Override
public void open(Configuration config) {

// access cached file via RuntimeContext and DistributedCache
File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
// read the file (or navigate the directory)
...
}

@Override
public Integer map(String value) throws Exception {
// use content of cached file
...
}
}

流程

Client

  • org.apache.flink.client.program.OptimizerPlanEnvironment#getPipeline()中反射调用户main方法,将文件注册至userArtifacts中。

registerFile1

  • 将userArtifacts中的文件设置到JobGraph中。

registerFile2

  • client在org.apache.flink.client.program.rest.RestClusterClient#submitJob提交job时将注册的文件上传。

registerFile3

JobManager

  • jobmanager收到提交的任务请求,开始处理userArtifacts文件。

jobmanager1

  • 将userArtifacts文件上传至blobServer。

jobmanager2

jobmanager3

  • 获取返回的blobKey,并设置回JobGraph中

jobmanager4

jobmanager5

  • 通过AkkaRpcActor将请求发送至taskmanager。

jobmanager6

TaskManager

  • AkkaRpcActor接收到jobmanager中的任务请求,创建Task任务。

taskmanager1

taskmanager2

  • 创建一个线程,启动task任务。

taskmanager3

taskmanager4

  • 根据RPC发送的配置信息开始下载userArtifacts文件,并缓存至distributedCacheEntries中。

taskmanager5

taskmanager6

至此,整个流程结束,用户通过getRuntimeContext().getDistributedCache().getFile("hdfsFile")方法获取,实质就是从缓存的distributedCacheEntries中获取对应的文件信息。