‘壹’ Ku:Spark sql操作Ku
摘要: Spark SQL , Ku
参考 https://github.com/xieenze/SparkOnKu/blob/master/src/main/scala/com/spark/test/KuCRUD.scala
引入 spark-core_2.11 , spark-sql_2.11 , ku-spark2_2.11 , hadoop-client 依赖包
指定 ku.master" , ku.table ,如果读取超时加入 ku.operation.timeout.ms 参数
或者
写入数据可以使用dataframe的 write 方法,也可以使用 kuContext 的 updateRows , insertRows , upsertRows , insertIgnoreRows 方法
直接调用dataframe的write方法指定 ku.master , ku.table ,只支持 append 模式,对已有key的数据自动更新
调用kuContext的 upsertRows 方法,效果和dataframe调用write append模式一样
调用kuContext insertRows , insertIgnoreRows 方法,如果插入的数据key已存在insertRows直接报错,insertIgnoreRows忽略已存在的key,只插入不存在的key
调用kuContext updateRows 方法,对已经存在的key数据做更新,如果key不存在直接报错
使用已有dataframe的schema建表
使用 StructType 自定义schema
删除表和判断表是否存在
‘贰’ 如何使用 Spark SQL
一、启动方法
/data/spark-1.4.0-bin-cdh4/bin/spark-sql --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2
注:/data/spark-1.4.0-bin-cdh4/为spark的安装路径
/data/spark-1.4.0-bin-cdh4/bin/spark-sql –help 查看启动选项
--master MASTER_URL 指定master url
--executor-memory MEM 每个executor的内存,默认为1G
--total-executor-cores NUM 所有executor的总核数
-e <quoted-query-string> 直接执行查询SQL
-f <filename> 以文件方式批量执行SQL
二、Spark sql对hive支持的功能
1、查询语句:SELECT GROUP BY ORDER BY CLUSTER BY SORT BY
2、hive操作运算:
1) 关系运算:= ==, <>, <, >, >=, <=
2) 算术运算:+, -, *, /, %
3) 逻辑运算:AND, &&, OR, ||
4) 复杂的数据结构
5) 数学函数:(sign, ln, cos, etc)
6) 字符串函数:
3、 UDF
4、 UDAF
5、 用户定义的序列化格式
6、join操作:JOIN {LEFT|RIGHT|FULL} OUTER JOIN LEFT SEMI JOIN CROSS JOIN
7、 unions操作:
8、 子查询: SELECT col FROM ( SELECT a + b AS col from t1) t2
9、Sampling
10、 Explain
11、 分区表
12、 视图
13、 hive ddl功能:CREATE TABLE、CREATE TABLE AS SELECT、ALTER TABLE
14、 支持的数据类型:TINYINT SMALLINT INT BIGINT BOOLEAN FLOAT DOUBLE STRING BINARY TIMESTAMPDATE ARRAY MAP STRUCT
三、Spark sql 在客户端编程方式进行查询数据
1、启动spark-shell
./spark-shell --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2
2、编写程序
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("../examples/src/main/resources/people.json")
查看所有数据:df.show()
查看表结构:df.printSchema()
只看name列:df.select("name").show()
对数据运算:df.select(df("name"), df("age") + 1).show()
过滤数据:df.filter(df("age") > 21).show()
分组统计:df.groupBy("age").count().show()
1、查询txt数据
import sqlContext.implicits._
case class Person(name: String, age: Int)
val people = sc.textFile("../examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
2、parquet文件
val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")
3、hdfs文件
val df = sqlContext.read.load("hdfs://namenode.Hadoop:9000/user/hive/warehouse/spark_test.db/test_parquet/part-r-00001.gz.parquet")
4、保存查询结果数据
val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet“)
四、Spark sql性能调优
缓存数据表:sqlContext.cacheTable("tableName")
取消缓存表:sqlContext.uncacheTable("tableName")
spark.sql.inMemoryColumnarStorage.compressedtrue当设置为true时,Spark SQL将为基于数据统计信息的每列自动选择一个压缩算法。
spark.sql.inMemoryColumnarStorage.batchSize10000柱状缓存的批数据大小。更大的批数据可以提高内存的利用率以及压缩效率,但有OOMs的风险
‘叁’ Spark SQL CBO 基于代价的优化
Spark CBO 背景
本文将介绍 CBO,它充分考虑了数据本身的特点(如大小、分布)以及操作算子的特点(中间结果集的分布及大小)及代价,从而更好的选择执行代价最小的物理执行计划,即 SparkPlan。
Spark CBO 原理
CBO 原理是计算所有可能的物理计划的代价,并挑选出代价最小的物理执行计划。其核心在于评估一个给定的物理执行计划的代价。
物理执行计划是一个树状结构,其代价等于每个执行节点的代价总合,如下图所示。
而每个执行节点的代价,分为两个部分
每个操作算子的代价相对固定,可用规则来描述。而执行节点输出数据集的大小与分布,分为两个部分:1) 初始数据集,也即原始表,其数据集的大小与分布可直接通过统计得到;2)中间节点输出数据集的大小与分布可由其输入数据集的信息与操作本身的特点推算。
所以,最终主要需要解决两个问题
Statistics 收集
通过如下 SQL 语句,可计算出整个表的记录总数以及总大小
从如下示例中,Statistics 一行可见, customer 表数据总大小为 37026233 字节,即 35.3MB,总记录数为 28万,与事实相符。
通过如下 SQL 语句,可计算出指定列的统计信息
从如下示例可见,customer 表的 c_customer_sk 列最小值为 1, 最大值为 280000,null 值个数为 0,不同值个数为 274368,平均列长度为 8,最大列长度为 8。
除上述示例中的统计信息外,Spark CBO 还直接等高直方图。在上例中,histogram 为 NULL。其原因是,spark.sql.statistics.histogram.enabled 默认值为 false,也即 ANALYZE 时默认不计算及存储 histogram。
下例中,通过 SET spark.sql.statistics.histogram.enabled=true; 启用 histogram 后,完整的统计信息如下。
从上图可见,生成的 histogram 为 equal-height histogram,且高度为 1102.36,bin 数为 254。其中 bin 个数可由 spark.sql.statistics.histogram.numBins 配置。对于每个 bin,匀记录其最小值,最大值,以及 distinct count。
值得注意的是,这里的 distinct count 并不是精确值,而是通过 HyperLogLog 计算出来的近似值。使用 HyperLogLog 的原因有二
算子对数据集影响估计
对于中间算子,可以根据输入数据集的统计信息以及算子的特性,可以估算出输出数据集的统计结果。
本节以 Filter 为例说明算子对数据集的影响。
对于常见的 Column A < value B Filter,可通过如下方式估算输出中间结果的统计信息
上述估算的前提是,字段 A 数据均匀分布。但很多时候,数据分布并不均匀,且当数据倾斜严重是,上述估算误差较大。此时,可充分利用 histogram 进行更精确的估算
启用 Historgram 后,Filter Column A < value B 的估算方法为
在上图中,B.value = 15,A.min = 0,A.max = 32,bin 个数为 10。Filter 后 A.ndv = ndv(<B.value) = ndv(<15)。该值可根据 A < 15 的 5 个 bin 的 ndv 通过 HyperLogLog 合并而得,无须重新计算所有 A < 15 的数据。
算子代价估计
SQL 中常见的操作有 Selection(由 select 语句表示),Filter(由 where 语句表示)以及笛卡尔乘积(由 join 语句表示)。其中代价最高的是 join。
Spark SQL 的 CBO 通过如下方法估算 join 的代价
其中 rows 即记录行数代表了 CPU 代价,size 代表了 IO 代价。weight 由 *spark.sql.cbo.joinReorder.card.weight *决定,其默认值为 0.7。
Build侧选择
对于两表Hash Join,一般选择小表作为build size,构建哈希表,另一边作为 probe side。未开启 CBO 时,根据表原始数据大小选择 t2 作为build side
而开启 CBO 后,基于估计的代价选择 t1 作为 build side。更适合本例
优化 Join 类型
Spark SQL 中,Join 可分为 Shuffle based Join 和 BroadcastJoin。Shuffle based Join 需要引入 Shuffle,代价相对较高。BroadcastJoin 无须 Join,但要求至少有一张表足够小,能通过 Spark 的 Broadcast 机制广播到每个 Executor 中。
在不开启 CBO 中,Spark SQL 通过 spark.sql.autoBroadcastJoinThreshold 判断是否启用 BroadcastJoin。其默认值为 10485760 即 10 MB。
并且该判断基于参与 Join 的表的原始大小。
在下图示例中,Table 1 大小为 1 TB,Table 2 大小为 20 GB,因此在对二者进行 join 时,由于二者都远大于自动 BroatcastJoin 的阈值,因此 Spark SQL 在未开启 CBO 时选用 SortMergeJoin 对二者进行 Join。
而开启 CBO 后,由于 Table 1 经过 Filter 1 后结果集大小为 500 GB,Table 2 经过 Filter 2 后结果集大小为 10 MB 低于自动 BroatcastJoin 阈值,因此 Spark SQL 选用 BroadcastJoin。
优化多表 Join 顺序
未开启 CBO 时,Spark SQL 按 SQL 中 join 顺序进行 Join。极端情况下,整个 Join 可能是 left-deep tree。在下图所示 TPC-DS Q25 中,多路 Join 存在如下问题,因此耗时 241 秒。
开启 CBO 后, Spark SQL 将执行计划优化如下
优化后的 Join 有如下优势,因此执行时间降至 71 秒
总结
5万人关注的大数据成神之路,不来了解一下吗?
5万人关注的大数据成神之路,真的不来了解一下吗?
5万人关注的大数据成神之路,确定真的不来了解一下吗?
‘肆’ 2019-03-05 SparkSQL集群性能调优 CheatSheet
0.买高性能机器,增加节点
1.设置磁盘文件预读值大小为16384,使用linux命令:
echo 16384 > /sys/block/{磁盘名}/queue/read_ahead_kb
2. Spark 任务序列化只支持JavaSerializer,数据序列化支持JavaSerializer和 KryoSerializer 。KryoSerializer能达到JavaSerializer的十倍。
3.在spark.driver.extraJavaOptions和spark.executor.extraJavaOptions配置项中添加参数:" -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps ",如果频繁出现Full GC,需要优化GC。把RDD做Cache操作,通过日志查看RDD在内存中的大小,如果数据太大,需要改变RDD的存储级别来优化。
4.一般并行度设置为集群CPU总和的2-3倍
5.大表和小表做join操作时可以把小表Broadcast到各个节点,从而就可以把join操作转变成普通的操作,减少了shuffle操作。
6. 合理设计DAG,减少shuffle //TODO
7.使用 mapPartitions 可以更灵活地操作数据,例如对一个很大的数据求TopN,当N不是很大时,可以先使用mapPartitions对每个partition求TopN,collect结果到本地之后再做排序取TopN。这样相比直接对全量数据做排序取TopN效率要高很多。
8.当之前的操作有很多filter时,使用 coalesce 减少空运行的任务数量
9.当任务数过大时候Shuffle压力太大导致程序挂住不动,或者出现linux资源受限的问题。此时需要对数据重新进行分区,使用 repartition 。
10.配置多个磁盘给 localDir ,shuffle时写入数据速度增快
11. 别collect大数据量,数据会回到driver端,容易OOM。非要collect,请配置 spark.sql.bigdata.thriftServer.useHdfsCollect 为true,会存在hdfs再读
12.尽量用receByKey,会在Map端做本地聚合
13. broadcase set/map而不是Iterator, set/map 查询效率O(1) ,iteratorO(n)
14. 数据发生倾斜,repartition大法 ,查出key,salt it
15.使用Hash Shuffle时,通过设置 spark.shuffle.consolidateFiles 为true,来合并shuffle中间文件,减少shuffle文件的数量,减少文件IO操作以提升性能
16.Spark SQL 小表join,把小表broadcast出去。配置 spark.sql.autoBroadcastJoinThreshold 和 spark.sql.bigdata.useExecutorBroadcast 。小表在join 右端。
17.SparkSQL数据倾斜,配置 spark.sql.planner.skewJoin 和 spark.sql.planner.skewJoin.threshold
18. SparkSQL 小文件,配置 spark.sql.small.file.combine 和 spark.sql.small.file.split.size