‘壹’ 可能是全网最详细的 Spark sql Aggregate 源码剖析
纵观 Spark Sql 源码,聚合的实现是其中较为复杂的部分,本文希望能以例子结合流程图的方式来说清楚整个过程。这里仅关注 Aggregate 在物理执行计划相关的内容,之前的 parse、analyze 及 optimize 阶段暂不做分析。在 Spark Sql 中,有一个专门的 Aggregation strategy 用来处理聚合,我们先来看看这个策略。
本文暂不讨论 distinct Aggregate 的实现(有兴趣的可以看看另一篇博文 https://www.jianshu.com/p/77e0a70db8cd ),我们来看看 AggUtils#planAggregateWithoutDistinct 是如何生成聚合的物理执行计划的
创建聚合分为两个阶段:
AggregateExpression 共有以下几种 mode:
Q:是否支持使用 hash based agg 是如何判断的?
摘自我另一篇文章: https://www.jianshu.com/p/77e0a70db8cd
为了说明最常用也是最复杂的的 hash based agg,本小节暂时将示例 sql 改为
这样就能进入 HashAggregateExec 的分支
构造函数主要工作就是对 groupingExpressions、aggregateExpressions、aggregateAttributes、resultExpressions 进行了初始化
在 enable code gen 的情况下,会调用 HashAggregateExec#inputRDDs 来生成 RDD,为了分析 HashAggregateExec 是如何生成 RDD 的,我们设置 spark.sql.codegen.wholeStage 为 false 来 disable code gen,这样就会调用 HashAggregateExec#doExecute 来生成 RDD,如下:
可以看到,关键的部分就是根据 child.execute() 生成的 RDD 的每一个 partition 的迭代器转化生成一个新的 TungstenAggregationIterator ,即 HashAggregateExec 生成的 RDD 的各个 partition。由于 TungstenAggregationIterator 涉及内容非常多,我们单开一大节来进行介绍。
此迭代器:
注:UnsafeKVExternalSorter 的实现可以参考:
UnsafeRow 是 InternalRow(表示一行记录) 的 unsafe 实现,由原始内存(byte array)而不是 Java 对象支持,由三个区域组成:
使用 UnsafeRow 的收益:
构造函数的主要流程已在上图中说明,需要注意的是:当内存不足时(毕竟每个 grouping 对应的 agg buffer 直接占用内存,如果 grouping 非常多,或者 agg buffer 较大,容易出现内存用尽)会从 hash based aggregate 切换为 sort based aggregate(会 spill 数据到磁盘),后文会进行详述。先来看看最关键的 processInputs 方法的实现
上图中,需要注意的是:hashMap 中 get 一个 groupingKey 对应的 agg buffer 时,若已经存在该 buffer 则直接返回;若不存在,尝试申请内存新建一个:
上图中,用于真正处理一条 row 的 AggregationIterator#processRow 还需进一步展开分析。在此之前,我们先来看看 AggregateFunction 的分类
AggregateFunction 可以分为 DeclarativeAggregate 和 ImperativeAggregate 两大类,具体的聚合函数均为这两类的子类。
DeclarativeAggregate 是一类直接由 Catalyst 中的 Expressions 构成的聚合函数,主要逻辑通过调用 4 个表达式完成,分别是:
我们再次以容易理解的 Count 来举例说明:
通常来讲,实现一个基于 Expressions 的 DeclarativeAggregate 函数包含以下几个重要的组成部分:
再来看看 AggregationIterator#processRow
AggregationIterator#processRow 会调用
生成用于处理一行数据(row)的函数
说白了 processRow 生成了函数才是直接用来接受一条 input row 来更新对应的 agg buffer,具体是根据 mode 及 aggExpression 中的 aggFunction 的类型调用其 updateExpressions 或 mergeExpressions 方法:
比如,对于 aggFunction 为 DeclarativeAggregate 类型的 Partial 下的 Count 来说就是调用其 updateExpressions 方法,即:
对于 Final 的 Count 来说就是调用其 mergeExpressions 方法,即:
对于 aggFunction 为 ImperativeAggregate 类型的 Partial 下的 Collect 来说就是调用其 update 方法,即:
对于 Final 的 Collect 来说就是调用其 merge 方法,即:
我们都知道,读取一个迭代器的数据,是要不断调用 hasNext 方法进行 check 是否还有数据,当该方法返回 true 的时候再调用 next 方法取得下一条数据。所以要知道如何读取 TungstenAggregationIterator 的数据,就得分析其这两个方法。
分为两种情况,分别是:
Agg 的实现确实复杂,本文虽然篇幅已经很长,但还有很多方面没有 cover 到,但基本最核心、最复杂的点都详细介绍了,如果对于未 cover 的部分有兴趣,请自行阅读源码进行分析~
‘贰’ Spark Sql 源码剖析(二): TreeNode
使用 object CurrentOrigin 为 TreeNodes 提供一个可以查找上下文的地方,比如当前正在解析哪行 code。
object CurrentOrigin 主要包含一个 private val value = new ThreadLocal[Origin]() ,目前 CurrentOrigin 仅在 parser 中使用,在 visit 每个节点的时候都会使用,记录当前 parse 的节点是哪行哪列
另外,从 value 是 ThreadLocal 类型可以看出,在 Spark SQL 中,parse sql 时都是在单独的 thread 里进行的(不同的 sql 不同的 thread)
返回该节点的 seq of children,children 是不可变的。有三种情况:
查找第一个符合 f 条件(比如某个类型的)的 TreeNode,先序遍历。
将函数 f 递归应用于节点及其子节点
与 foreach 不同的是,foreach 先应用于 parent,再应用与 child;而 foreachUp 是先应用于 child 再应用与 parent
调用 foreach,foreach 中应用的函数是 ret += f(_) ,最终返回一个 seq,包含将 f 通过 foreach 方式应用于所有节点并 add 到 ret。其中 f 本身是 BaseType => A 类型
原理与 map 一致,只是 f 变成了 BaseType => TraversableOnce[A]
PartialFunction#lift :将 partial func 转换为一个返回 Option 结果的函数。将 pf 函数应用于符合 pf 定义的节点(即 pf.lift(node)返回的 Option 不是 None )并都 add 到 ret = new collection.mutable.ArrayBuffer[B] 以 Seq 形式返回
以 Seq 的形式返回 tree 的所有叶子节点
def collectFirst[B](pf: PartialFunction[BaseType, B]): Option[B] :注意,因为可能没有符合 pf 定义的节点,所有返回的 Option 可能是 None
相当于 proctIterator.map(f).toArray ,即对于 proctIterator 每个元素执行 f 然后将 ret 组成一个 arr 返回
注意:TreeNode 没有实现 Proct 相关方法,都由其子类自行实现
使用 new children 替换并返回该节点的拷贝。该方法会对 proctElement 每个元素进行模式匹配,根据节点类型及一定规则进行替换。
调用 transformDown
rule: PartialFunction[BaseType, BaseType]
返回 f 应用于所有子节点(非递归,一般将递归操作放在调用该函数的地方)后该节点的 。其内部的原理是调用 mapProctIterator,对每一个 proctElement(i) 进行各种模式匹配,若能匹配上某个再根据一定规则进行转换,核心匹配转换如下:
以上都是适用于有 children 的 node,如果是 children 为 null 的 node 直接返回
反射生成节点副本
返回该类型 TreeNode 的 name,默认为 class name;注意,会移除物理操作的 Exec$ 前缀
所有应该以该节点内嵌套树表示的 nodes,比如,可以被用来表示 sub-queries
(children ++ innerChildren).toSet[TreeNode[_]]
主要用于交互式 debug,返回该 tree 指定下标的节点,num 可以在 numberedTreeString 找到。最终调用的
我的博客即将搬运同步至腾讯云+社区,邀请大家一同入驻: https://cloud.tencent.com/developer/support-plan?invite_code=x2lzoxh4s5hi
‘叁’ 大数据专业主要学什么
大数据专业
全称:数据科学与大数据技术,强调交叉学科特点,以大数据分析为核心,以统计学、计算机科学和数学为三大基础支撑性学科,培养面向多层次应用需求的复合型人才。
开设课程:
数学分析、高等代数、普通物理数学与信息科学概论、数据结构、数据科学导论、程序设计导论、程序设计实践、离散数学、概率与统计、算法分析与设计、数据计算智能、数据库系统概论、计算机系统基础、并行体系结构与编程、非结构化大数据分析等。
‘肆’ 大数据分析应该掌握哪些基础知识
Java基础语法
· 分支结构if/switch
· 循环结构for/while/do while
· 方法声明和调用
· 方法重载
· 数组的使用
· 命令行参数、可变参数
IDEA
· IDEA常用设置、常用快捷键
· 自定义模板
· 关联Tomcat
· Web项目案例实操
面向对象编程
· 封装、继承、多态、构造器、包
· 异常处理机制
· 抽象类、接口、内部类
· 常有基础API、集合List/Set/Map
· 泛型、线程的创建和启动
· 深入集合源码分析、常见数据结构解析
· 线程的安全、同步和通信、IO流体系
· 反射、类的加载机制、网络编程
Java8/9/10/11新特性
· Lambda表达式、方法引用
· 构造器引用、StreamAPI
· jShell(JShell)命令
· 接口的私有方法、Optional加强
· 局部变量的类型推断
· 更简化的编译运行程序等
MySQL
· DML语言、DDL语言、DCL语言
· 分组查询、Join查询、子查询、Union查询、函数
· 流程控制语句、事务的特点、事务的隔离级别等
JDBC
· 使用JDBC完成数据库增删改查操作
· 批处理的操作
· 数据库连接池的原理及应用
· 常见数据库连接池C3P0、DBCP、Druid等
Maven
· Maven环境搭建
· 本地仓库&中央仓库
· 创建Web工程
· 自动部署
· 持续继承
· 持续部署
Linux
· VI/VIM编辑器
· 系统管理操作&远程登录
· 常用命令
· 软件包管理&企业真题
Shell编程
· 自定义变量与特殊变量
· 运算符
· 条件判断
· 流程控制
· 系统函数&自定义函数
· 常用工具命令
· 面试真题
Hadoop
· Hadoop生态介绍
· Hadoop运行模式
· 源码编译
· HDFS文件系统底层详解
· DN&NN工作机制
· HDFS的API操作
· MapRece框架原理
· 数据压缩
· Yarn工作机制
· MapRece案例详解
· Hadoop参数调优
· HDFS存储多目录
· 多磁盘数据均衡
· LZO压缩
· Hadoop基准测试
Zookeeper
· Zookeeper数据结果
· 内部原理
· 选举机制
· Stat结构体
· 监听器
· 分布式安装部署
· API操作
· 实战案例
· 面试真题
· 启动停止脚本
HA+新特性
· HDFS-HA集群配置
Hive
· Hive架构原理
· 安装部署
· 远程连接
· 常见命令及基本数据类型
· DML数据操作
· 查询语句
· Join&排序
· 分桶&函数
· 压缩&存储
· 企业级调优
· 实战案例
· 面试真题
Flume
· Flume架构
· Agent内部原理
· 事务
· 安装部署
· 实战案例
· 自定义Source
· 自定义Sink
· Ganglia监控
Kafka
· 消息队列
· Kafka架构
· 集群部署
· 命令行操作
· 工作流程分析
· 分区分配策略
· 数据写入流程
· 存储策略
· 高阶API
· 低级API
· 拦截器
· 监控
· 高可靠性存储
· 数据可靠性和持久性保证
· ISR机制
· Kafka压测
· 机器数量计算
· 分区数计算
· 启动停止脚本
DataX
· 安装
· 原理
· 数据一致性
· 空值处理
· LZO压缩处理
Scala
· Scala基础入门
· 函数式编程
· 数据结构
· 面向对象编程
· 模式匹配
· 高阶函数
· 特质
· 注解&类型参数
· 隐式转换
· 高级类型
· 案例实操
Spark Core
· 安装部署
· RDD概述
· 编程模型
· 持久化&检查点机制
· DAG
· 算子详解
· RDD编程进阶
· 累加器&广播变量
Spark SQL
· SparkSQL
· DataFrame
· DataSet
· 自定义UDF&UDAF函数
Spark Streaming
· SparkStreaming
· 背压机制原理
· Receiver和Direct模式原理
· Window原理及案例实操
· 7x24 不间断运行&性能考量
Spark内核&优化
· 内核源码详解
· 优化详解
Hbase
· Hbase原理及架构
· 数据读写流程
· API使用
· 与Hive和Sqoop集成
· 企业级调优
Presto
· Presto的安装部署
· 使用Presto执行数仓项目的即席查询模块
Ranger2.0
· 权限管理工具Ranger的安装和使用
Azkaban3.0
· 任务调度工具Azkaban3.0的安装部署
· 使用Azkaban进行项目任务调度,实现电话邮件报警
Kylin3.0
· Kylin的安装部署
· Kylin核心思想
· 使用Kylin对接数据源构建模型
Atlas2.0
· 元数据管理工具Atlas的安装部署
Zabbix
· 集群监控工具Zabbix的安装部署
DolphinScheler
· 任务调度工具DolphinScheler的安装部署
· 实现数仓项目任务的自动化调度、配置邮件报警
Superset
· 使用SuperSet对数仓项目的计算结果进行可视化展示
Echarts
· 使用Echarts对数仓项目的计算结果进行可视化展示
Redis
· Redis安装部署
· 五大数据类型
· 总体配置
· 持久化
· 事务
· 发布订阅
· 主从复制
Canal
· 使用Canal实时监控MySQL数据变化采集至实时项目
Flink
· 运行时架构
· 数据源Source
· Window API
· Water Mark
· 状态编程
· CEP复杂事件处理
Flink SQL
· Flink SQL和Table API详细解读
Flink 内核
· Flink内核源码讲解
· 经典面试题讲解
Git&GitHub
· 安装配置
· 本地库搭建
· 基本操作
· 工作流
· 集中式
ClickHouse
· ClickHouse的安装部署
· 读写机制
· 数据类型
· 执行引擎
DataV
· 使用DataV对实时项目需求计算结果进行可视化展示
sugar
· 结合Springboot对接网络sugar实现数据可视化大屏展示
Maxwell
· 使用Maxwell实时监控MySQL数据变化采集至实时项目
ElasticSearch
· ElasticSearch索引基本操作、案例实操
Kibana
· 通过Kibana配置可视化分析
Springboot
· 利用Springboot开发可视化接口程序
‘伍’ 《深入理解SPARK核心思想与源码分析》epub下载在线阅读,求百度网盘云资源
《深入理解SPARK》(耿嘉安)电子书网盘下载免费在线阅读
资源链接:
链接:https://pan..com/s/1x42N8QDPGv5-KPMDdZ4krA
书名:深入理解SPARK
作者:耿嘉安
豆瓣评分:7.2
出版社:机械工业出版社
出版年份:2016-1-1
页数:469
内容简介:
《深入理解SPARK:核心思想与源码分析》结合大量图和示例,对Spark的架构、部署模式和工作模块的设计理念、实现源码与使用技巧进行了深入的剖析与解读。
《深入理解SPARK:核心思想与源码分析》一书对Spark1.2.0版本的源代码进行了全面而深入的分析,旨在为Spark的优化、定制和扩展提供原理性的指导。阿里巴巴集团专家鼎力推荐、阿里巴巴资深Java开发和大数据专家撰写。
本书分为三篇:
准备篇(第1~2章),介绍了Spark的环境搭建、设计理念与基本架构,帮助读者了解一些背景知识。
核心设计篇(第3~7章),着重讲解SparkContext的初始化、存储体系、任务提交与执行、计算引擎及部署模式的原理和源码分析。通过这部分的内容,读者可以通过源码剖析更加深入理解Spark的核心设计与实现,以便在实际使用中能够快速解决线上问题并对性能进行调优。
扩展篇(第8~11章),主要讲解基于Spark核心的各种扩展及应用,包括SQL处理引擎、Hive处理、流式计算框架Spark Streaming、图计算框架GraphX、机器学习库MLlib等内容。通过阅读这部分内容,读者可以扩展实际项目中对Spark的应用场景,让Spark焕发活力。
作者简介:
耿嘉安,10年IT行业相关经验。就职于阿里巴巴商家业务事业部,任资深Java工程师,专注于开源和大数据领域,目前与小伙伴们基于ODPS构建阿里的大数据商业解决方案——御膳房。在大量的工作实践中,对J2EE、JVM、Tomcat、Spring、Hadoop、Spark、MySQL、Redis都有深入研究,尤其喜欢剖析开源项目的源码实现。早期从事J2EE企业级应用开发,对Java相关技术有独到见解。业余时间喜欢研究中国古代历史,古诗词,旅游,足球等。
‘陆’ 《大数据Spark企业级实战》pdf下载在线阅读全文,求百度网盘云资源
《大数据Spark企业级实战》网络网盘pdf最新全集下载:
链接:https://pan..com/s/1ZKawITVbG7MADTW0Q-b4jw
简介:《大数据Spark企业级实战》详细解析了企业级Spark开发所需的几乎所有技术内容,涵盖Spark的架构设计、Spark的集群搭建、Spark内核的解析、Spark SQL、MLLib、GraphX、Spark Streaming、Tachyon、SparkR、Spark多语言编程
‘柒’ Spark SQL(十):Hive On Spark
Hive是目前大数据领域,事实上的SQL标准。其底层默认是基于MapRece实现的,但是由于MapRece速度实在比较慢,因此这几年,陆续出来了新的SQL查询引擎,包括Spark SQL,Hive On Tez,Hive On Spark等。
Spark SQL与Hive On Spark是不一样的。Spark SQL是Spark自己研发出来的针对各种数据源,包括Hive、JSON、Parquet、JDBC、RDD等都可以执行查询的,一套基于Spark计算引擎的查询引擎。因此它是Spark的一个项目,只不过提供了针对Hive执行查询的工功能而已,适合在一些使用Spark技术栈的大数据应用类系统中使用。
而Hive On Spark,是Hive的一个项目,它是将Spark作为底层的查询引擎(不通过MapRece作为唯一的查询引擎)。Hive On Spark,只适用于Hive,在可预见的未来,很有可能Hive默认的底层引擎就从MapRece切换为Spark了;适合于将原有的Hive数据仓库以及数据统计分析替换为Spark引擎,作为全公司通用的大数据统计分析引擎。
Hive On Spark做了一些优化:
1、Map Join
Spark SQL默认对join是支持使用broadcast机制将小表广播到各个节点上,以进行join的。但是问题是,这会给Driver和Worker带来很大的内存开销。因为广播的数据要一直保留在Driver内存中。所以目前采取的是,类似乎MapRece的Distributed Cache机制,即提高HDFS replica factor的复制因子,以让数据在每个计算节点上都有一个备份,从而可以在本地进行数据读取。
2、Cache Table
对于某些需要对一张表执行多次操作的场景,Hive On Spark内部做了优化,即将要多次操作的表cache到内存中,以便于提升性能。但是这里要注意,并不是对所有的情况都会自动进行cache。所以说,Hive On Spark还有很多不完善的地方。
Hive QL语句 =>
语法分析 => AST =>
生成逻辑执行计划 => Operator Tree =>
优化逻辑执行计划 => Optimized Operator Tree =>
生成物理执行计划 => Task Tree =>
优化物理执行计划 => Optimized Task Tree =>
执行优化后的Optimized Task Tree
‘捌’ 2022-02-26-Spark-45(性能调优通用SQL调优)
RDD 的核心痛点是优化空间有限,它指的是 RDD 高阶算子中封装的函数对于 Spark 来说完全透明,因此 Spark 对于计算逻辑的优化无从下手。相比 RDD,DataFrame 是携带 Schema 的分布式数据集,只能封装结构化数据。DataFrame 的算子大多数都是普通的标量函数,以消费数据列为主。但是,DataFrame 更弱的表示能力和表达能力,反而为 Spark 引擎的内核优化打开了全新的空间。根据 DataFrame 简单的标量算子和明确的 Schema 定义,借助 Catalyst 优化器和 Tungsten,Spark SQL 有能力在运行时,构建起一套端到端的优化机制。这套机制运用启发式的规则与策略和运行时的执行信息,将原本次优、甚至是低效的查询计划转换为高效的执行计划,从而提升端到端的执行性能
这里的 Cache 指的就是我们常说的分布式数据缓存。想要对数据进行缓存,你可以调用 DataFrame 的.cache 或.persist,或是在 SQL 语句中使用“cache table”关键字。Cache Manager 其实很简单,它的主要职责是维护与缓存有关的信息。具体来说,Cache Manager 维护了一个 Mapping 映射字典,字典的 Key 是逻辑计划,Value 是对应的 Cache 元信息。当 Catalyst 尝试对逻辑计划做优化时,会先尝试对 Cache Manager 查找,看看当前的逻辑计划或是逻辑计划分支,是否已经被记录在 Cache Manager 的字典里。如果在字典中可以查到当前计划或是分支,Catalyst 就用 InMemoryRelation 节点来替换整个计划或是计划的一部分,从而充分利用已有的缓存数据做优化。
从 Spark Plan 到 Physical Plan 的转换,需要几组叫做 Preparation Rules 的规则。这些规则坚守最后一班岗,负责生成 Physical Plan。那么,这些规则都是什么,它们都做了哪些事情呢?
AQE 是 Spark SQL 的一种动态优化机制,在运行时,每当 Shuffle Map 阶段执行完毕,AQE 都会结合这个阶段的统计信息,基于既定的规则动态地调整、修正尚未执行的逻辑计划和物理计划,来完成对原始查询语句的运行时优化。
DemoteBroadcastHashJoin 规则的作用,是把 Shuffle Joins 降级为 Broadcast Joins。需要注意的是,这个规则仅适用于 Shuffle Sort Merge Join 这种关联机制,其他机制如 Shuffle Hash Join、Shuffle Nested Loop Join 都不支持。
在 Rece 阶段,当 Rece Task 从全网把数据分片拉回,AQE 按照分区编号的顺序,依次把小于目标尺寸的分区合并在一起
在 Rece 阶段,当 Rece Task 所需处理的分区尺寸大于一定阈值时,利用 OptimizeSkewedJoin 策略,AQE 会把大分区拆成多个小分区。
相比于谓词下推,分区剪裁往往能更好地提升磁盘访问的 I/O 效率。谓词下推操作往往是根据文件注脚中的统计信息完成对文件的过滤,过滤效果取决于文件中内容的“纯度”。分区剪裁则不同,它的分区表可以把包含不同内容的文件,隔离到不同的文件系统目录下。这样一来,包含分区键的过滤条件能够以文件系统目录为粒度对磁盘文件进行过滤,从而大幅提升磁盘访问的 I/O 效率。
动态分区剪裁运作的背后逻辑,是把维度表中的过滤条件,通过关联关系传导到事实表,来完成事实表的优化。在数据关联的场景中,开发者要想利用好动态分区剪裁特性,需要注意 3 点:
NLJ 是采用“嵌套循环”的方式来实现关联的。也就是说,NLJ 会使用内、外两个嵌套的 for 循环依次扫描外表和内表中的数据记录,判断关联条件是否满足
SMJ 的思路是先排序、再归并。具体来说,就是参与 Join 的两张表先分别按照 Join Key 做升序排序。然后,SMJ 会使用两个独立的游标对排好序的两张表完成归并关联。
先用 Hash Key 取代 Join Keys,再清除内表冗余数据。Hash Key 实际上是 Join Keys 拼接之后的哈希值。既然存在哈希运算,我们就必须要考虑哈希冲突的问题。
AQE 允许 Spark SQL 在运行时动态地调整 Join 策略。我们刚好可以利用这个特性,把最初制定的 SMJ 策略转化为 BHJ 策略
当参与 Join 的两张表尺寸相差悬殊且小表数据分布均匀的时候,SHJ 往往比 SMJ 的执行效率更高。这种情况下,我们不妨使用 Join Hints 来强制 Spark SQL 去选择 SHJ 策略进行关联计算
先把一个复杂任务拆解成多个简单任务,再合并多个简单任务的计算结果。首先,我们要根据两张表的尺寸大小区分出外表和内表。一般来说,内表是尺寸较小的那一方。然后,我们人为地在内表上添加过滤条件,把内表划分为多个不重复的完整子集。接着,我们让外表依次与这些子集做关联,得到部分计算结果。最后,再用 Union 操作把所有的部分结果合并到一起,得到完整的计算结果,这就是端到端的关联计算。
“分而治之”中一个关键的环节就是内表拆分,我们要求每一个子表的尺寸相对均匀,且都小到可以放进广播变量。拆分的关键在于拆分列的选取,为了让子表足够小,拆分列的基数(Cardinality)要足够大才行。
对于外表参与的每一个子关联,在逻辑上,我们完全可以只扫描那些与内表子表相关的外表数据,并不需要每次都扫描外表的全量数据。
有了 AQE 的自动倾斜处理特性,在应对数据倾斜问题的时候,我们确实能够大幅节省开发成本。不过,天下没有免费的午餐,AQE 的倾斜处理是以 Task 为粒度的,这意味着原本 Executors 之间的负载倾斜并没有得到根本改善
“两阶段 Shuffle”
“两阶段 Shuffle”指的是,通过“加盐、Shuffle、关联、聚合”与“去盐化、Shuffle、聚合”这两个阶段的计算过程,在不破坏原有关联关系的前提下,在集群范围内以 Executors 为粒度平衡计算负载 。