当前位置:首页 » 硬盘大全 » flink缓存变量
扩展阅读
webinf下怎么引入js 2023-08-31 21:54:13
堡垒机怎么打开web 2023-08-31 21:54:11

flink缓存变量

发布时间: 2023-01-26 09:20:44

1. Flink 指标(二)

通过 conf/flink-conf.yaml 文件配置一个或多个 Reporters 来暴露度量值给外部系统,这些 Reporter 将在作业和任务启动的时候实例化。

所有的 Reporter 配置至少需要配置 class 属性,还有一些允许配置记录间隔。下面是一些 Reporter 的配置实例:

包含 Reporter 的 jar 必须放到 /lib 文件夹,这样 Flink 就可以访问到这些 jar。
可以通过继承 org.apache.flink.metrics.reporter.MetricReporter 接口来实现自己的 Reporter,如果需要定期发送记录,需要继承 Scheled 接口。

下面是一些支持的 Reporter:

不需要添加额外的依赖就可以支持 JMX Reporter,默认是不激活的。

参数:

配置示例:

通过 JMX 公开的度量由域(domain)和键属性列表(key-properties)标识,这些属性一起构成对象名。

域始终以 org.apache.flink 开头,后跟一个通用的度量标识符。与通常的标识符不同,它不受作用域格式的影响,不包含任何变量,并且在跨作业时也是常量。例子: org.apache.flink.job.task.numbytesout 。

键属性列表包含与给定指标关联的所有变量的值,无论配置的作用域格式如何。例子: host=localhost,job_name=myjob,task_name=mytask 。

因此,域标识一个度量类,键属性列表标识该度量的一个(或多个)实例。

要使用此 Reporter,必须复制 /opt/flink-metrics-ganglia-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夹下。

参数:

配置示例:

要使用此 Reporter,必须复制 /opt/flink-metrics-graphite-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夹下。

参数:

配置示例:

要使用此 Reporter,必须复制 /opt/flink-metrics-prometheus-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夹下。

参数:

配置示例:

Flink 度量类型映射到 Prometheus 度量类型,如下所示:

要使用此 Reporter,必须复制 /opt/flink-metrics-prometheus-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夹下。

参数:

配置示例:

PrometheusPushGatewayReporter 将指标推送到 Pushgateway,可由 Prometheus 抓取。

要使用此 Reporter,必须复制 /opt/flink-metrics-statsd-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夹下。
参数:

配置示例:

要使用此 Reporter,必须复制 /opt/flink-metrics-datadog-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夹下。
Flink 指标,如任何变量 <host> , <job_name> , <tm_id> , <subtask_index> , <task_name> 和 <operator_name> ,将被发送到 Datadog 作为标签。标签看起来像 host:localhost 和 job_name:myjobname 。

参数:

配置示例:

要使用此 Reporter,必须复制 /opt/flink-metrics-slf4j-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夹下。

配置示例:

Flink 默认会收集当前状态的指标,下文的表格中包括以下5列:

请注意,“infix” 和 “Metrics” 列中所有的点根据 “metrics.delimiter” 设置变化。

因此,为了推断指标的标识符:

Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/metrics.html

2. 5 一文看完flink的内存管理

1)java对象的存储密度比较低,对象主要包含 对象头,对象数据,对齐填充。 其中对齐填充是没用的,纯粹是为了让对象的大小到达8的倍数

2)Full GC非常影响性能,对大数据量的计算来说,fullGC可能会持续很久(秒级甚至分钟级)

3)OOM导致JVM崩溃,因为是大数据计算,很有可能会分配出大的对象。

4)缓存未命中,CPU在进行计算时,会先从CPU的缓存中抓取数据,但是jvm堆上的内存不是连续的,会导致CPU缓存不命中,CPU空转,影响效率。

5)传输过程,要序列化和反序列化

Flink将对象存储在堆外内存中,或者存在 memorySegment上

memorySegment: 

1 翻译为内存段,存储序列化后的对象

2 它是一段固定长度的内存(大小为32KB)

3 是FLink中最小的内存分配单元

4 读写非常高效,很多算子可以直接操作其二进制数据,不需要反序列化

5 Java本身自带的序列化和反序列化的功能,但是辅助信息占用空间比较大,在序列化对象时记录了过多的类信息。

Flink实现了自己的序列化框架,使用TypeInformation表示每种数据类型,所以可以只保存一份对象Schema信息,节省存储空间。又因为对象类型固定,所以可以通过偏移量存取。

jobmanager.heap.size:1024m

jobmanager.memory.process.size:1600m

主要包含 堆内存和非堆内存,相对比较简单一些。

关于rocksDb内存管理:

由于rocksdb分配的是堆外内存,内存量理论上不受jvm控制。于是产生问题,如果进程的内存使用超过容器限定的量,就会被资源管理器杀死。

3. Flink的处理机制以及侧输出应用

Flink EventTime和Watermark
https://www.jianshu.com/p/5e735b63fb5b

Flink只要不用时间窗口函数,就是基于事件处理,对于事件驱动的任务,我们需要关心的点,尤其是存在shuffle和聚合的时候:
<1> 是否存在数据倾斜
<2> 是否会存在某些节点状态过大 (例如使用状态时,不配置过期时间,那么状态会一直缓存,就会导致内容一直增加,带来gc等问题)

基于窗口操作。

watermark机制,就可以理解为驱动flink基于事件时间处理的机制

一旦允许了元素滞后,那么滞后元素在截止时间前到达后也会触发计算并输出结果。
注意:滞后元素触发计算应该视为之前计算的更新结果,也即是相同计算会有多个结果。根据你的应用逻辑,要考虑是否会重复计算或者进行去重。

就是数据什么时候计算并落地。
Triggers定义了何时开始使用窗口计算函数计算窗口。每个WindowAssigner都会有一个默认的Trigger。如果,默认的Trigger不能满足你的需求,你可以指定一个自定义的trigger()。

抽象类Trigger:

onElement():进入窗口的每个元素都会调用该方法。
onEventTime():事件时间timer触发的时候被调用。
onProcessingTime():处理时间timer触发的时候会被调用。
onMerge():有状态的触发器相关,并在它们相应的窗口合并时合并两个触发器的状态,例如使用会话窗口。
clear():该方法主要是执行窗口的删除操作。

1).前三方法决定着如何通过返回一个TriggerResult来操作输入事件。
CONTINUE:什么都不做。
FIRE:触发计算。
PURE:清除窗口的元素。
FIRE_AND_PURE:触发计算和清除窗口元素。

2). 这些方法中的任何一个都可用于为将来的操作注册处理或事件时间计时器

所有基于事件时间的窗口分配器都用EventTimeTrigger作为默认触发器。该触发器会在watermark达到窗口的截止时间(window.maxTimestamp())时直接触发计算输出。

注意:GlobalWindows的默认触发器是NeverTrigger,这个是不会触发计算的。因此,你需要为GlobalWindows自己定义一个触发器。

注意:通过调用trigger()方法指定触发器,就可以覆盖掉默认的触发器。例如,如果把TumblingEventTimeWindows的触发器指定为CountTrigger,那么就不会在根据时间进度触发窗口计算了。如果想同时支持时间和计数进行计算触发,那么就需要你自定义触发器了。

trigger()方法====》
在DataStream调用过窗口函数后,返回WindowedStream,就可以调用trigger()方法,指定新的Trigger替代默认Trigger。

窗口生命周期
简单来说,窗口创建于属于该窗口的第一个元素到达,结束于事件时间或者处理时间达到了窗口的结尾时间加上用户自定义的允许延迟时间。

分流:使用split函数;
异常值捕获:(比如空值,字段缺失数据,异常大值等),滞后数据处理等可以用sideoutput。

场景举例:
比如现在有一篇文章吧,单词长度不一,但是我们想对单词长度小于5的单词进行wordcount操作,同时又想记录下来哪些单词的长度大于了5,那么我们该如何做呢?

普遍的做法是:
datastream.filter(word.length>=5); //获取不统计的单词,也即是单词长度大于等于5。

datastream.filter(word.length <5);// 获取需要进行wordcount的单词。

这样数据,然后每次筛选都要保留整个流,然后遍历整个流,显然很浪费性能,假如能够在一个流了多次输出就好了, flink的侧输出提供了这个功能,侧输出的输出(sideoutput)类型可以与主流不同,可以有多个侧输出(sideoutput),每个侧输出不同的类型。

在使用侧输出的时候需要先定义OutputTag
如:

OutputTag有两个构造函数,上面例子构造函数只有一个id参数,还有一个构造函数包括两个参数,id,TypeInformation信息。

要使用侧输出, 在处理数据的时候除了要定义相应类型的OutputTag外,还要使用特定的函数,主要是有四个
ProcessFunction、CoProcessFunction、
ProcessWindowFunction、ProcessAllWindowFunction
1.8之后添加了 KeyedProcessFunction

这里滞后数据的类型应该需要与DataStream中的类型保持一致。

4. Flink | Checkpoint 机制详解

一、Checkpoint 简介

Flink 的 Checkpoint 机制是其 可靠性 的基石。当一个任务在运行过程中出现故障时,可以根据 Checkpoint 的信息恢复到故障之前的某一状态,然后从该状态恢复任务的运行。 在 Flink 中,Checkpoint 机制采用的是 chandy-lamport (分布式快照)算法,通过 Checkpoint 机制,保证了 Flink 程序内部的 Exactly Once 语义。

二、Checkpoint 机制流程详解

1. 任务启动

我们假设任务从 Kafka 的某个 Topic 中读取数据,该Topic 有 2 个 Partition,故任务的并行度为 2。根据读取到数据的奇偶性,将数据分发到两个 task 进行求和。

某一时刻,状态如下:

2.启动Checkpoint

JobManager 根据 Checkpoint 间隔时间,启动 Checkpoint。此时会给每个 Source 发送一个 barrier 消息,消息中的数值表示 Checkpoint 的序号,每次启动新的 Checkpoint 该值都会递增。

3. Source启动Checkpoint

当Source接收到barrier消息,会将当前的状态(Partition、Offset)保存到 StateBackend,然后向 JobManager 报告Checkpoint 完成。之后Source会将barrier消息广播给下游的每一个 task:

4.task 接收 barrier

当task接收到某个上游(如这里的Source1)发送来的barrier,会将该上游barrier之前的数据继续进行处理,而barrier之后发送来的消息不会进行处理,会被缓存起来。

之前对barrier的理解比较模糊,直到看到了下面这幅图。barrier的作用和这里 "欢迎光临" 牌子的作用类似,用于区分流中的数据属于哪一个 Checkpoint:

我们可以理解为:barrier之前的数据属于本次Checkpoint,barrier之后的数据属于下一次Checkpoint,所以下次Checkpoint的数据是不应该在本次Checkpoint过程中被计算的,因此会将数据进行缓存。

5.barrier对齐

如果某个task有多个上游输入,如这里的 sum_even 有两个 Source 源,当接收到其中一个 Source 的barrier后,会等待其他 Source 的 barrier 到来。在此期间,接收到 barrier 的 Source 发来的数据不会处理,只会缓存(如下图中的数据4)。而未接收到 barrier 的 Source 发来的数据依然会进行处理,直到接收到该Source 发来的 barrier,这个过程称为 barrier的对齐

barrier是否对齐决定了程序实现的是 Exactly Once 还是 At Least Once:

如果不进行barrier对齐,那么这里 sum_even 在接收 Source2 的 barrier 之前,对于接收到 Source1的 数据4 ,不会进行缓存,而是直接进行计算,sum_even 的状态改为12,当接收到 Source2 的barrier,会将 sum_even 的状态 sum=12 进行持久化。如果本次Checkpoint成功,在进行下次 Checkpoint 前任务崩溃,会根据本次Checkpoint进行恢复。此时状态如下:

从这里我们就可以看出, Source1的数据4被计算了两次 。因此,Exactly Once语义下,必须进行barrier的对齐,而 At Least Once语义下 barrier 可以不对齐。

注意:barrier对齐只会发生在多对一的Operator(如 join)或者一对多的Operator(如 reparation/shuffle)。如果是一对一的Operator,如map、flatMap 或 filter 等,则没有对齐这个概念,都会实现Exactly Once语义,即使程序中配置了At Least Once 。

6.处理缓存数据

当task接收到所有上游发送来的barrier,即可以认为当前task收到了本次 Checkpoint 的所有数据。之后 task 会将 barrier 继续发送给下游,然后处理缓存的数据,比如这里 sum_even 会处理 Source1 发送来的数据4. 而且,在这个过程中 Source 会 继续读取数据 发送给下游,并不会中断。

7.上报Checkpoint完成

当sink收到barrier后,会向JobManager上报本次Checkpoint完成。至此,本次Checkpoint结束,各阶段的状态均进行了持久化,可以用于后续的故障恢复。

5. Flink 分布式缓存原理及使用

在1.9.1版本中分布式缓存并未拷贝HDFS下的文件到TM,运行时抛出如下异常。

升级到1.10.1版本,能正常使用。借此,学习下Flink 分布式缓存相关知识。

官网对 distributed cache 的定义:

意思是通过Flink程序注册一个本地或者Hdfs文件,程序在运行时,Flink会自动将该文件拷贝到每个tm中,每个函数可以通过注册的名称获取该文件。

官网给出的使用案例:

参考flink1.10.1版本的源码,了解实现流程。

StreamGraphGenerator-->StreamGraph

4. yarnPerjob 模式部署jobGraph时,如果是本地文件则上传本地zip,返回该文件所在的hdfs路径。如果缓存文件为hdfs已存在路径,则直接写入配置文件。

6. flink流处理特点

flink的流处理特性:

7. Flink 原理详解

Flink 是一个流处理框架,支持流处理和批处理,特点是流处理有限,可容错,可扩展,高吞吐,低延迟。

流处理是处理一条,立马下一个节点会从缓存中取出,在下一个节点进行计算

批处理是只有处理一批完成后,才会经过网络传输到下一个节点

流处理的优点是低延迟 批处理的优点是高吞吐

flink同时支持两种,flink的网络传输是设计固定的缓存块为单位,用户可以设置缓存块的超时值来决定换存块什么时候进行传输。 数据大于0 进行处理就是流式处理。
如果设置为无限大就是批处理模型。

Flink 集群包括 JobManager 和 TaskManager .

JobManager 主要负责调度 Job 并协调 Task 做 checkpoint,职责上很像 Storm 的 Nimbus。从 Client 处接收到 Job 和 JAR 包 等资源后,会生成优化后的执行计划,并以 Task 的单元调度到各个 TaskManager 去执行。

TaskManager 在启动的时候就设置好了槽位数(Slot),每个 slot 能启动一个 Task,Task 为线程。从 JobManager 处接收需要 部署的 Task,部署启动后,与自己的上游建立 Netty 连接,接收数据并处理。

flink on yarn 是由client 提交 app到 RM 上, 然后RM 分配一个 AppMaster负责运行 Flink JobManager 和 Yarn AppMaster, 然后 AppMaster 分配 容器去运行 Flink TaskManger

SparkStreaming 是将流处理分成微批处理的作业, 最后的处理引擎是spark job

Spark Streaming把实时输入数据流以时间片Δt (如1秒)为单位切分成块,Spark Streaming会把每块数据作为一个RDD,并使用RDD操作处理每一小块数据。每个块都会生成一个Spark Job处理,然后分批次提交job到集群中去运行,运行每个 job的过程和真正的spark 任务没有任何区别。

JobScheler, 负责 Job的调度通过定时器每隔一段时间根据Dstream的依赖关系生一个一个DAG图

ReceiverTracker负责数据的接收,管理和分配
ReceiverTracker在启动Receiver的时候他有ReceiverSupervisor,其实现是ReceiverSupervisorImpl, ReceiverSupervisor本身启 动的时候会启动Receiver,Receiver不断的接收数据,通过BlockGenerator将数据转换成Block。定时器会不断的把Block数据通会不断的把Block数据通过BlockManager或者WAL进行存储,数据存储之后ReceiverSupervisorlmpl会把存储后的数据的元数据Metadate汇报给ReceiverTracker,其实是汇报给ReceiverTracker中的RPC实体ReceiverTrackerEndpoin

spark on yarn 的cluster模式, Spark client 向RM提交job请求, RM会分配一个 AppMaster, driver 和 运行在AppMAster节点里, AM然后把Receiver作为一个Task提交给Spark Executor 节点, Receive启动接受数据,生成数据块,并通知Spark Appmaster, AM会根据数据块生成相应的Job, 并把Job 提交给空闲的 Executor 去执行。

1:需要关注流数据是否需要进行状态管理
2:At-least-once或者Exectly-once消息投递模式是否有特殊要求
3:对于小型独立的项目,并且需要低延迟的场景,建议使用storm
4:如果你的项目已经使用了spark,并且秒级别的实时处理可以满足需求的话,建议使用sparkStreaming
5:要求消息投递语义为 Exactly Once 的场景;数据量较大,要求高吞吐低延迟的场景;需要进行状态管理或窗口统计的场景,建议使用flink

Flink 提供的Api右 DataStream 和 DataSet ,他们都是不可变的数据集合,不可以增加删除中的元素, 通过 Source 创建 DataStream 和 DataSet

在创建运行时有:

Flink的每一个Operator称为一个任务, Operator 的每一个实例称为子任务,每一个任务在JVM线程中执行。可以将多个子任务链接成一个任务,减少上下文切换的开销,降低延迟。

source 和 算子map 如果是 one by one 的关系,他们的数据交换可以通过缓存而不是网络通信

TaskManager 为控制执行任务的数量,将计算资源划分多个slot,每个slot独享计算资源,这种静态分配利于任务资源隔离。

同一个任务可以共享一个slot, 不同作业不可以。

这里因为 Source 和 Map 并行度都是4 采用直连方式,他们的数据通信采用缓存形式

所以一共需要两个TaskManager source,Map 一个,rece一个, 每个TaskManager 要3个slot

JobManager 将 JobGraph 部署 ExecutionGraph

设置的并行度,可以让一个ExecJobVertex 对应 多个并行的ExecVertex 实例。

Flink通过状态机管理 ExecGraph的作业执行进度。

Flink 将对象序列化为固定数量的预先分配的内存段,而不是直接把对象放在堆内存上。
Flink TaskManager 是由几个内部组件组成的:actor 系统(负责与 Flink master 协调)、IOManager(负责将数据溢出到磁盘并将其读取回来)、MemoryManager(负责协调内存使用。

数据源:

Sink:

时间:

处理时间:取自Operator的机器系统时间

事件时间: 由数据源产生

进入时间: 被Source节点观察时的系统时间

如果数据源没有自己正确创建水印,程序必须自己生成水印来确保基于事件的时间窗口可以正常工作。。

DataStream 提供了 周期性水印,间歇式水印,和递增式水印

8. 哪位好心人能提供个最新flink视频学习教程,感谢

大数据教程flink从入门到精通
了解Flink,了解集群环境搭建运维,学习Flink中重要概念、原理和API的用法,通过知识点 + 案例教学法帮助小白快速掌握Flink。

课程内容:

1、Flink框架简介

2、Flink集群搭建运维

3、Flink Dataset开发

4、Flink 广播变量,分布式缓存,累加器

5、Flink Datastream开发

6、Flink Window操作

7、Flink watermark与侧道输出

8、Flink状态计算

9、Flink容错checkpoint与一致性语义

10、Flink进阶 异步IO,背压,内存管理

11、Flink Table API与SQL

9. Flink的类加载器解析

在运行 Flink 应用程序时,JVM 会随着时间的推移加载各种类。 这些类可以根据它们的来源分为三组:

作为一般规则,无论何时您先启动 Flink 进程然后再提交作业,作业的类都会动态加载。 如果 Flink 进程与作业/应用程序一起启动,或者如果应用程序产生 Flink 组件(JobManager、TaskManager 等),那么所有作业的类都在 Java 类路径中。

插件组件中的代码由每个插件的专用类加载器动态加载一次。

以下是有关不同部署模式的更多详细信息:

当作为独立会话启动 Flink 集群时,JobManagers 和 TaskManagers 使用 Java 类路径中的 Flink 框架类启动。 针对会话(通过 REST / CLI)提交的所有作业/应用程序中的类都是动态加载的。

Docker / Kubernetes 设置首先启动一组 JobManagers / TaskManagers,然后通过 REST 或 CLI 提交作业/应用程序,其行为类似于独立会话:Flink 的代码位于 Java 类路径中,插件组件和作业代码在启动时动态加载。

YARN 类加载在单个作业部署和会话之间有所不同:

当直接向 YARN 提交 Flink 作业/应用程序时(通过 bin/flink run -m yarn-cluster ...),将为该作业启动专用的 TaskManager 和 JobManager。 这些 JVM 在 Java 类路径中具有用户代码类。 这意味着在这种情况下,作业不涉及动态类加载。

当启动一个 YARN 会话时,JobManagers 和 TaskManagers 是用 classpath 中的 Flink 框架类启动的。 针对会话提交的所有作业的类都是动态加载的。

在涉及动态类加载的设置中(插件组件、会话设置中的 Flink 作业),通常有两个类加载器的层次结构:(1)Java 的应用程序类加载器,它包含类路径中的所有类,以及(2)动态插件/ 用户代码类加载器。 用于从插件或用户代码 jar 加载类。 动态 ClassLoader 将应用程序类加载器作为其父级。

默认情况下,Flink 反转类加载顺序,这意味着它首先查看动态类加载器,如果类不是动态加载代码的一部分,则仅查看父类(应用程序类加载器)。

反向类加载的好处是插件和作业可以使用与 Flink 核心本身不同的库版本,这在不同版本的库不兼容时非常有用。 该机制有助于避免常见的依赖冲突错误,如 IllegalAccessError 或 NoSuchMethodError。 代码的不同部分只是具有单独的类副本(Flink 的核心或其依赖项之一可以使用与用户代码或插件代码不同的副本)。 在大多数情况下,这运行良好,不需要用户进行额外配置。

但是,在某些情况下,反向类加载会导致问题(请参阅下文,“X cannot be cast to X”)。 对于用户代码类加载,您可以通过在 Flink 配置中通过 classloader.resolve-order 将 ClassLoader 解析顺序配置为 parent-first(从 Flink 的默认 child-first)来恢复到 Java 的默认模式。

请注意,某些类总是以父级优先的方式解析(首先通过父类加载器),因为它们在 Flink 的核心和插件/用户代码或面向插件/用户代码的 API 之间共享。 这些类的包是通过 classloader.parent-first-patterns-default 和 classloader.parent-first-patterns-additional 配置的。 要添加父级优先加载的新包,请设置 classloader.parent-first-patterns-additional 配置选项。

所有组件(JobManger、TaskManager、Client、ApplicationMaster 等)在启动时记录它们的类路径设置。 它们可以作为日志开头的环境信息的一部分找到。

当运行 JobManager 和 TaskManagers 专用于一项特定作业的设置时,可以将用户代码 JAR 文件直接放入 /lib 文件夹中,以确保它们是类路径的一部分而不是动态加载。

通常将作业的 JAR 文件放入 /lib 目录中。 JAR 将成为类路径(AppClassLoader)和动态类加载器(FlinkUserCodeClassLoader)的一部分。 因为 AppClassLoader 是 FlinkUserCodeClassLoader 的父级(并且 Java 加载父级,默认情况下),这应该导致类只加载一次。

对于无法将作业的 JAR 文件放入 /lib 文件夹的设置(例如因为安装程序是由多个作业使用的会话),仍然可以将公共库放入 /lib 文件夹,并避免动态为那些类进行加载。

在某些情况下,转换函数、源或接收器需要手动加载类(通过反射动态加载)。 为此,它需要能够访问作业类的类加载器。

在这种情况下,函数(或源或接收器)可以成为 RichFunction(例如 RichMapFunction 或 RichWindowFunction)并通过 getRuntimeContext().getUserCodeClassLoader() 访问用户代码类加载器。

在使用动态类加载的设置中,您可能会看到 com.foo.X cannot be cast to com.foo.X 样式中的异常。 这意味着 com.foo.X 类的多个版本已被不同的类加载器加载,并且该类的类型试图相互分配。

一个常见的原因是库与 Flink 的反向类加载方法不兼容。 您可以关闭反向类加载来验证这一点(在 Flink 配置中设置 classloader.resolve-order: parent-first)或从反向类加载中排除库(在 Flink 配置中设置 classloader.parent-first-patterns-additional)。

另一个原因可能是缓存对象实例,如 Apache Avro 之类的某些库或通过注册(例如通过 Guava 的 Interners)生成的对象实例。 这里的解决方案是要么在没有任何动态类加载的情况下进行设置,要么确保相应的库完全是动态加载代码的一部分。 后者意味着该库不能被添加到 Flink 的 /lib 文件夹中,而必须是应用程序的 fat-jar/uber-jar 的一部分

所有涉及动态用户代码类加载(会话)的场景都依赖于再次卸载类。 类卸载意味着垃圾收集器发现类中不存在任何对象,因此删除该类(代码、静态变量、元数据等)。

每当 TaskManager 启动(或重新启动)一个任务时,它将加载该特定任务的代码。 除非可以卸载类,否则这将成为内存泄漏,因为加载了新版本的类,并且加载的类总数会随着时间的推移而累积。 这通常通过 OutOfMemoryError: Metaspace 表现出来。

类泄漏的常见原因和建议的修复:

卸载动态加载类的一个有用工具是用户代码类加载器释放钩子。 这些是在卸载类加载器之前执行的钩子。 通常建议关闭和卸载资源作为常规函数生命周期的一部分(通常是 close() 方法)。 但在某些情况下(例如对于静态字段),最好在不再需要类加载器时卸载。

类加载器释放钩子可以通过 RuntimeContext.() 方法注册。

从应用程序开发人员的角度解决依赖冲突的一种方法是通过隐藏它们来避免暴露依赖关系。

Apache Maven 提供了 maven-shade-plugin,它允许在编译后更改类的包(因此您编写的代码不受阴影影响)。 例如,如果您的用户代码 jar 中有来自 aws sdk 的 com.amazonaws 包,则 shade 插件会将它们重新定位到 org.myorg.shaded.com.amazonaws 包中,以便您的代码调用您的 aws sdk 版本。

注意 Flink 的大部分依赖,比如 guava、netty、jackson 等,都被 Flink 的维护者屏蔽掉了,所以用户通常不用担心。

10. Flink内存管理

java所有数据类型对应的字节大小

java对象的组成 : 对象头,实例数据,对齐部分

jvm 序列化缺点

上面图为TaskManager内存模型,左边为细分的内存模型,右边为整体内存模型,该图摘自Flink官网

heap内存在jvm启动的时候申请的一块不变的内存区域,该内存实际上是Flink和task公用的一块区域,在flink层面通过控制来区分框架使用和task内存,heap内存管理起来是比较容易的,实际上non-heap的内存是难管理的一块,如果管理不当或者使用不当可能造成内存泄漏或者内存无限增长等问题

内存参数配置

在flink中对内存进行了抽象成了MemorySegment,�默认情况下,一个 MemorySegment 对应着一个 32KB 大小的内存块,这块内存既可以是堆上内存( byte数组) ,也可以是堆外内存(nio的ByteBufferr ) .

同时MemorySegment也提供了对二进制数据的操作方法,以及读取字节数组序列化以及序列化字节数组的方法等

下面是类继承图,该类有两MemorySegment实现类有两个分别为使用heap的以及混合的即有heap和non-heap,对于内存的访问有子类具体的实现

MemorySemgent是flink内存分配的最小单元了,对于数据夸MemorySemgent保存,那么对于上层的使用者来说,需要考虑考虑所有的细节,由于过于繁琐,所以在MemorySemgent上又抽象了一层内存也,内存也是在MemorySemgent数据访问上的视图,对数据输入和输出分别抽象为DataInputView/DataOutputView,有了这一层,上层使用者无需关心跨MemorySemgent的细节问题,内存也对自动处理跨MemorySemgent的内存操作

DataInputView

DataInputView继承DataInput,DataInputView是对MemorySemgent读取的抽象视图,提供一系列读取二进制数据不同类型的方法,AbstractPageInputView是DataInputView的一个抽象实现类,并且基本所有InputView都实现了该类,即所有实现该类的InputView都支持Page

InputView持有了多个MemorySemgent的引用(可以基于数组,list,deque等),这些MemorySemgent被视为一个内存页,可以顺序,随机等方式读取数据,要基于不同的实现类,实现类不同读取方式不同

方法图

DataOutputView

与DataInputView相对应,继承Output,并有一个拥有Page功能的抽象类(AbstractPagedOutputView),其大部outputView的实现都是继承自该抽象类,对一组MemorySemgent提供一个基于页的写入功能

方法图

类继承图

用于网络io数据的包装,每个buffer持有一个MemorySegment的引用,resultPartition写数据的时候,会向LocalBufferPool申请Buffer,会返回BufferBuilder,通过BufferBuilder想Buffe r<实际写入的是MemorySegment> 写数据

BufferBuilder是在上游Task中,负责想Buffer写入数据,BufferConsumer位于下游,与BufferBuilder相对应,用于消费Buffer的数据,每个bufferBuilder对应一个bufferConsumer

常用参数介绍

buffer申请

buffer回收

当buffer用完之后需要进行回收比如在netty的clientHandler收到响应之后进行处理就会把buffer回收掉,buffer回收之后并不会释放memorySegment,而是放回池中,变为可用内存,反复使用

flink托管的内存,托管内存使用堆外内存,用于批处理缓存排序等以及提供rocksDB内存

NetworkBufferPool是一个固定大小的MemorySegment实例吃,用于网络栈中,NettyBufferPool会为每个ResultPartition创建属于自己的LocalBufferPool,NettyBufferPool会作为全局的pool来提供内存,LocalBufferPool会通过限制来控制自己内存的申请,防止过多申请

LocalBufferPool继承关系,实现了bufferRecycler的接口,用于回收自己持有的buffer

在数据接收的时候会将数据封装成NettyBuffer,在数据发送的时候会通过BufferBilder向MemorySegment写入数据,然后通过BufferConsumer读取MemorySegment的数据

BufferManager主要用于为RemoteInputChannel提供buffer的,bufferManager在启动的时候会向全局bufferPool请求自己的独有buffer,当bufferManager的buffer不够的时候,则会向localBufferPool请求buffer,此时请求的buffer为浮动buffer

实际上提供的buffer是