Flink分布式缓存Distrubuted Cache
- Flink提供了一个分布式缓存,类似于hadoop,可以使用户在并行函数中很方便的读取本地文件,并把它放在taskmanager节点中,防止task重复拉取。
- 此缓存的工作机制如下:程序注册一个文件或者目录(本地或者远程文件系统,例如hdfs或者s3),通过ExecutionEnvironment注册缓存文件并为它起一个名称。当程序执行,Flink自动将文件或者目录复制到所有taskmanager节点的本地文件系统,仅会执行一次。用户可以通过这个指定的名称查找文件或者目录,然后从taskmanager节点的本地文件系统访问它。
官网介绍:Distributed Cache
使用方法
注册缓存文件
通过registerCachedFile方法注册
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// register a file from HDFS
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
// register a local executable file (script, executable, ...)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)
// define your program and execute
...
DataSet<String> input = ...
DataSet<Integer> result = input.map(new MyMapper());
...
env.execute();
通过配置参数注册
pipeline.cached-files: name:hdfsFile,path:hdfs:///path/to/your/file,executable:true;name:localExecFile,path:file:///path/to/exec/file
Flink中org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#configure()
方法会读取该配置,并注册配置中的文件。
获取缓存文件
// extend a RichFunction to have access to the RuntimeContext
public final class MyMapper extends RichMapFunction<String, Integer> {
@Override
public void open(Configuration config) {
// access cached file via RuntimeContext and DistributedCache
File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
// read the file (or navigate the directory)
...
}
@Override
public Integer map(String value) throws Exception {
// use content of cached file
...
}
}
流程
Client
- org.apache.flink.client.program.OptimizerPlanEnvironment#getPipeline()中反射调用户main方法,将文件注册至userArtifacts中。
- 将userArtifacts中的文件设置到JobGraph中。
- client在org.apache.flink.client.program.rest.RestClusterClient#submitJob提交job时将注册的文件上传。
JobManager
- jobmanager收到提交的任务请求,开始处理userArtifacts文件。
- 将userArtifacts文件上传至blobServer。
- 获取返回的blobKey,并设置回JobGraph中
- 通过AkkaRpcActor将请求发送至taskmanager。
TaskManager
- AkkaRpcActor接收到jobmanager中的任务请求,创建Task任务。
- 创建一个线程,启动task任务。
- 根据RPC发送的配置信息开始下载userArtifacts文件,并缓存至distributedCacheEntries中。
至此,整个流程结束,用户通过getRuntimeContext().getDistributedCache().getFile("hdfsFile")方法获取,实质就是从缓存的distributedCacheEntries中获取对应的文件信息。