当前位置:首页 » 编程语言 » sparksql实例
扩展阅读
webinf下怎么引入js 2023-08-31 21:54:13
堡垒机怎么打开web 2023-08-31 21:54:11

sparksql实例

发布时间: 2023-02-07 05:01:33

㈠ Ku:Spark sql操作Ku

摘要: Spark SQL , Ku

参考 https://github.com/xieenze/SparkOnKu/blob/master/src/main/scala/com/spark/test/KuCRUD.scala

引入 spark-core_2.11 , spark-sql_2.11 , ku-spark2_2.11 , hadoop-client 依赖包

指定 ku.master" , ku.table ,如果读取超时加入 ku.operation.timeout.ms 参数

或者

写入数据可以使用dataframe的 write 方法,也可以使用 kuContext 的 updateRows , insertRows , upsertRows , insertIgnoreRows 方法

直接调用dataframe的write方法指定 ku.master , ku.table ,只支持 append 模式,对已有key的数据自动更新

调用kuContext的 upsertRows 方法,效果和dataframe调用write append模式一样

调用kuContext insertRows , insertIgnoreRows 方法,如果插入的数据key已存在insertRows直接报错,insertIgnoreRows忽略已存在的key,只插入不存在的key

调用kuContext updateRows 方法,对已经存在的key数据做更新,如果key不存在直接报错

使用已有dataframe的schema建表

使用 StructType 自定义schema

删除表和判断表是否存在

㈡ Spark SQL(十):Hive On Spark

Hive是目前大数据领域,事实上的SQL标准。其底层默认是基于MapRece实现的,但是由于MapRece速度实在比较慢,因此这几年,陆续出来了新的SQL查询引擎,包括Spark SQL,Hive On Tez,Hive On Spark等。

Spark SQL与Hive On Spark是不一样的。Spark SQL是Spark自己研发出来的针对各种数据源,包括Hive、JSON、Parquet、JDBC、RDD等都可以执行查询的,一套基于Spark计算引擎的查询引擎。因此它是Spark的一个项目,只不过提供了针对Hive执行查询的工功能而已,适合在一些使用Spark技术栈的大数据应用类系统中使用。

而Hive On Spark,是Hive的一个项目,它是将Spark作为底层的查询引擎(不通过MapRece作为唯一的查询引擎)。Hive On Spark,只适用于Hive,在可预见的未来,很有可能Hive默认的底层引擎就从MapRece切换为Spark了;适合于将原有的Hive数据仓库以及数据统计分析替换为Spark引擎,作为全公司通用的大数据统计分析引擎。

Hive On Spark做了一些优化:
1、Map Join
Spark SQL默认对join是支持使用broadcast机制将小表广播到各个节点上,以进行join的。但是问题是,这会给Driver和Worker带来很大的内存开销。因为广播的数据要一直保留在Driver内存中。所以目前采取的是,类似乎MapRece的Distributed Cache机制,即提高HDFS replica factor的复制因子,以让数据在每个计算节点上都有一个备份,从而可以在本地进行数据读取。

2、Cache Table
对于某些需要对一张表执行多次操作的场景,Hive On Spark内部做了优化,即将要多次操作的表cache到内存中,以便于提升性能。但是这里要注意,并不是对所有的情况都会自动进行cache。所以说,Hive On Spark还有很多不完善的地方。

Hive QL语句 =>
语法分析 => AST =>
生成逻辑执行计划 => Operator Tree =>
优化逻辑执行计划 => Optimized Operator Tree =>
生成物理执行计划 => Task Tree =>
优化物理执行计划 => Optimized Task Tree =>
执行优化后的Optimized Task Tree

㈢ 如何使用 Spark SQL

一、启动方法
/data/spark-1.4.0-bin-cdh4/bin/spark-sql --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2

注:/data/spark-1.4.0-bin-cdh4/为spark的安装路径

/data/spark-1.4.0-bin-cdh4/bin/spark-sql –help 查看启动选项

--master MASTER_URL 指定master url
--executor-memory MEM 每个executor的内存,默认为1G
--total-executor-cores NUM 所有executor的总核数
-e <quoted-query-string> 直接执行查询SQL

-f <filename> 以文件方式批量执行SQL

二、Spark sql对hive支持的功能

1、查询语句:SELECT GROUP BY ORDER BY CLUSTER BY SORT BY
2、hive操作运算:
1) 关系运算:= ==, <>, <, >, >=, <=
2) 算术运算:+, -, *, /, %
3) 逻辑运算:AND, &&, OR, ||
4) 复杂的数据结构
5) 数学函数:(sign, ln, cos, etc)
6) 字符串函数:
3、 UDF
4、 UDAF

5、 用户定义的序列化格式
6、join操作:JOIN {LEFT|RIGHT|FULL} OUTER JOIN LEFT SEMI JOIN CROSS JOIN
7、 unions操作:
8、 子查询: SELECT col FROM ( SELECT a + b AS col from t1) t2
9、Sampling
10、 Explain
11、 分区表
12、 视图
13、 hive ddl功能:CREATE TABLE、CREATE TABLE AS SELECT、ALTER TABLE

14、 支持的数据类型:TINYINT SMALLINT INT BIGINT BOOLEAN FLOAT DOUBLE STRING BINARY TIMESTAMPDATE ARRAY MAP STRUCT

三、Spark sql 在客户端编程方式进行查询数据
1、启动spark-shell
./spark-shell --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2
2、编写程序
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("../examples/src/main/resources/people.json")
查看所有数据:df.show()
查看表结构:df.printSchema()
只看name列:df.select("name").show()
对数据运算:df.select(df("name"), df("age") + 1).show()
过滤数据:df.filter(df("age") > 21).show()

分组统计:df.groupBy("age").count().show()

1、查询txt数据
import sqlContext.implicits._
case class Person(name: String, age: Int)
val people = sc.textFile("../examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
2、parquet文件
val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")
3、hdfs文件

val df = sqlContext.read.load("hdfs://namenode.Hadoop:9000/user/hive/warehouse/spark_test.db/test_parquet/part-r-00001.gz.parquet")
4、保存查询结果数据
val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")

df.select("name", "favorite_color").write.save("namesAndFavColors.parquet“)

四、Spark sql性能调优

缓存数据表:sqlContext.cacheTable("tableName")

取消缓存表:sqlContext.uncacheTable("tableName")

spark.sql.inMemoryColumnarStorage.compressedtrue当设置为true时,Spark SQL将为基于数据统计信息的每列自动选择一个压缩算法。
spark.sql.inMemoryColumnarStorage.batchSize10000柱状缓存的批数据大小。更大的批数据可以提高内存的利用率以及压缩效率,但有OOMs的风险

㈣ Spark Sql 函数使用

round -  保留数据精度 

如 round(col("col1"),0) 对应数值为 21.23 -> 21.0 ;21.73 -> 22.0

如 round(col("col1"),1) 对应数值为 21.23 -> 21.2

如 round(col("col1"),-1) 对应数值为 21.23 -> 20.0

㈤ AWS Glue中使用Spark SQL

AWS Glue 是一项完全托管的提取、转换和加载 (ETL) 服务,让客户能够轻松准备和加载数据进行分析。您只需在 AWS 管理控制台中单击几次,即可创建并运行 ETL 作业。您只需将 AWS Glue 指向存储在 AWS 上的数据,AWS Glue 便会发现您的数据,并将关联的元数据(例如表定义和架构)存储到 AWS Glue 数据目录中。存入目录后,您的数据可立即供 ETL 搜索、查询和使用。

Glue提供了DynamicFrame来操作数据,但如果用户习惯用Spark SQL来做ETL,那是否可行呢? 本文就做了一个尝试:

首先我们创建一个基本的Glue Job,选择Spark,这里要注意在Job parameters里面加上

--enable-glue-datacatalog = true

这是为了在Spark SQL中使用Glue的元数据。

之后其他步骤都随意选择,进入脚本编辑环境,将脚本替换成如下:

这里做了一个简单的insert overwrite操作,从表testdata1中选择数据到表table_6。

尝试运行Job,等待7-8分钟后就可以看到任务完成了。此时去检查table_6的数据,已经有了。

㈥ spark sql 2.3 源码解读 - Execute (7)

终于到了最后一步执行了:

最关键的两个函数便是 doPrepare和 doExecute了。

还是以上一章的sql语句为例,其最终生成的sparkplan为:

看一下SortExec的doPrepare 和 doExecute方法:

下面看child也就是ShuffleExchangeExec:

先看没有exchangeCoordinator的情况,

首先执行:

上面的方法会返回一个ShuffleDependency,ShuffleDependency中最重要的是rddWithPartitionIds,它决定了每一条InternalRow shuffle后的partition id:

接下来:

返回结果是ShuffledRowRDD:

CoalescedPartitioner的逻辑:

再看有exchangeCoordinator的情况:

同样返回的是ShuffledRowRDD:

再看doEstimationIfNecessary:

estimatePartitionStartIndices 函数得到了 partitionStartIndices:

有exchangeCoordinator的情况就生成了partitionStartIndices,从而对分区进行了调整。

最后来一个例子:

未开启exchangeCoordinator的plan:

开启exchangeCoordinator的plan:

不同之处是 两个Exchange都带了coordinator,且都是同一个coordinator。

执行withExchangeCoordinator前:

执行withExchangeCoordinator后:

生成了coordinator,且执行了 doPrepare后,可以看到两个exchange都向其注册了。

doExecute后:

原先的numPartitions是200,经过执行后,生成的partitionStartIndices为[1],也就是只有1个partition,显然在测试数据量很小的情况下,1个partition是更为合理的。这就是ExchangeCoordinator的功劳。

execute 最终的输出是rdd,剩下的结果便是spark对rdd的运算了。其实 spark sql 最终的目标便也是生成rdd,交给spark core来运算。

spark sql的介绍到这里就结束了。

㈦ spark从hive数据仓库中读取的数据可以使用sparksql进行查询吗

1、为了让Spark能够连接到Hive的原有数据仓库,我们需要将Hive中的hive-site.xml文件拷贝到Spark的conf目录下,这样就可以通过这个配置文件找到Hive的元数据以及数据存放。
在这里由于我的Spark是自动安装和部署的,因此需要知道CDH将hive-site.xml放在哪里。经过摸索。该文件默认所在的路径是:/etc/hive/conf 下。
同理,spark的conf也是在/etc/spark/conf。
此时,如上所述,将对应的hive-site.xml拷贝到spark/conf目录下即可
如果Hive的元数据存放在Mysql中,我们还需要准备好Mysql相关驱动,比如:mysql-connector-java-5.1.22-bin.jar。
2、编写测试代码
val conf=new SparkConf().setAppName("Spark-Hive").setMaster("local")
val sc=new SparkContext(conf)

//create hivecontext
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' ") //这里需要注意数据的间隔符

sqlContext.sql("LOAD DATA INPATH '/user/liujiyu/spark/kv1.txt' INTO TABLE src ");

sqlContext.sql(" SELECT * FROM jn1").collect().foreach(println)

sc.stop()

3、下面列举一下出现的问题:
(1)如果没有将hive-site.xml拷贝到spark/conf目录下,会出现:

分析:从错误提示上面就知道,spark无法知道hive的元数据的位置,所以就无法实例化对应的client。
解决的办法就是必须将hive-site.xml拷贝到spark/conf目录下
(2)测试代码中没有加sc.stop会出现如下错误:
ERROR scheler.LiveListenerBus: Listener EventLoggingListener threw an exception
java.lang.reflect.InvocationTargetException
在代码最后一行添加sc.stop()解决了该问题。

㈧ Spark SQL CBO 基于代价的优化

Spark CBO 背景

本文将介绍 CBO,它充分考虑了数据本身的特点(如大小、分布)以及操作算子的特点(中间结果集的分布及大小)及代价,从而更好的选择执行代价最小的物理执行计划,即 SparkPlan。

Spark CBO 原理

CBO 原理是计算所有可能的物理计划的代价,并挑选出代价最小的物理执行计划。其核心在于评估一个给定的物理执行计划的代价。

物理执行计划是一个树状结构,其代价等于每个执行节点的代价总合,如下图所示。

而每个执行节点的代价,分为两个部分

每个操作算子的代价相对固定,可用规则来描述。而执行节点输出数据集的大小与分布,分为两个部分:1) 初始数据集,也即原始表,其数据集的大小与分布可直接通过统计得到;2)中间节点输出数据集的大小与分布可由其输入数据集的信息与操作本身的特点推算。

所以,最终主要需要解决两个问题

Statistics 收集

通过如下 SQL 语句,可计算出整个表的记录总数以及总大小

从如下示例中,Statistics 一行可见, customer 表数据总大小为 37026233 字节,即 35.3MB,总记录数为 28万,与事实相符。

通过如下 SQL 语句,可计算出指定列的统计信息

从如下示例可见,customer 表的 c_customer_sk 列最小值为 1, 最大值为 280000,null 值个数为 0,不同值个数为 274368,平均列长度为 8,最大列长度为 8。

除上述示例中的统计信息外,Spark CBO 还直接等高直方图。在上例中,histogram 为 NULL。其原因是,spark.sql.statistics.histogram.enabled 默认值为 false,也即 ANALYZE 时默认不计算及存储 histogram。

下例中,通过 SET spark.sql.statistics.histogram.enabled=true; 启用 histogram 后,完整的统计信息如下。

从上图可见,生成的 histogram 为 equal-height histogram,且高度为 1102.36,bin 数为 254。其中 bin 个数可由 spark.sql.statistics.histogram.numBins 配置。对于每个 bin,匀记录其最小值,最大值,以及 distinct count。

值得注意的是,这里的 distinct count 并不是精确值,而是通过 HyperLogLog 计算出来的近似值。使用 HyperLogLog 的原因有二

算子对数据集影响估计

对于中间算子,可以根据输入数据集的统计信息以及算子的特性,可以估算出输出数据集的统计结果。

本节以 Filter 为例说明算子对数据集的影响。

对于常见的 Column A < value B Filter,可通过如下方式估算输出中间结果的统计信息

上述估算的前提是,字段 A 数据均匀分布。但很多时候,数据分布并不均匀,且当数据倾斜严重是,上述估算误差较大。此时,可充分利用 histogram 进行更精确的估算

启用 Historgram 后,Filter Column A < value B 的估算方法为

在上图中,B.value = 15,A.min = 0,A.max = 32,bin 个数为 10。Filter 后 A.ndv = ndv(<B.value) = ndv(<15)。该值可根据 A < 15 的 5 个 bin 的 ndv 通过 HyperLogLog 合并而得,无须重新计算所有 A < 15 的数据。

算子代价估计

SQL 中常见的操作有 Selection(由 select 语句表示),Filter(由 where 语句表示)以及笛卡尔乘积(由 join 语句表示)。其中代价最高的是 join。

Spark SQL 的 CBO 通过如下方法估算 join 的代价

其中 rows 即记录行数代表了 CPU 代价,size 代表了 IO 代价。weight 由 *spark.sql.cbo.joinReorder.card.weight *决定,其默认值为 0.7。

Build侧选择

对于两表Hash Join,一般选择小表作为build size,构建哈希表,另一边作为 probe side。未开启 CBO 时,根据表原始数据大小选择 t2 作为build side

而开启 CBO 后,基于估计的代价选择 t1 作为 build side。更适合本例

优化 Join 类型

Spark SQL 中,Join 可分为 Shuffle based Join 和 BroadcastJoin。Shuffle based Join 需要引入 Shuffle,代价相对较高。BroadcastJoin 无须 Join,但要求至少有一张表足够小,能通过 Spark 的 Broadcast 机制广播到每个 Executor 中。

在不开启 CBO 中,Spark SQL 通过 spark.sql.autoBroadcastJoinThreshold 判断是否启用 BroadcastJoin。其默认值为 10485760 即 10 MB。

并且该判断基于参与 Join 的表的原始大小。

在下图示例中,Table 1 大小为 1 TB,Table 2 大小为 20 GB,因此在对二者进行 join 时,由于二者都远大于自动 BroatcastJoin 的阈值,因此 Spark SQL 在未开启 CBO 时选用 SortMergeJoin 对二者进行 Join。

而开启 CBO 后,由于 Table 1 经过 Filter 1 后结果集大小为 500 GB,Table 2 经过 Filter 2 后结果集大小为 10 MB 低于自动 BroatcastJoin 阈值,因此 Spark SQL 选用 BroadcastJoin。

优化多表 Join 顺序

未开启 CBO 时,Spark SQL 按 SQL 中 join 顺序进行 Join。极端情况下,整个 Join 可能是 left-deep tree。在下图所示 TPC-DS Q25 中,多路 Join 存在如下问题,因此耗时 241 秒。

开启 CBO 后, Spark SQL 将执行计划优化如下

优化后的 Join 有如下优势,因此执行时间降至 71 秒

总结

5万人关注的大数据成神之路,不来了解一下吗?

5万人关注的大数据成神之路,真的不来了解一下吗?

5万人关注的大数据成神之路,确定真的不来了解一下吗?

㈨ 可能是全网最详细的 Spark Sql Aggregate 源码剖析

纵观 Spark Sql 源码,聚合的实现是其中较为复杂的部分,本文希望能以例子结合流程图的方式来说清楚整个过程。这里仅关注 Aggregate 在物理执行计划相关的内容,之前的 parse、analyze 及 optimize 阶段暂不做分析。在 Spark Sql 中,有一个专门的 Aggregation strategy 用来处理聚合,我们先来看看这个策略。

本文暂不讨论 distinct Aggregate 的实现(有兴趣的可以看看另一篇博文 https://www.jianshu.com/p/77e0a70db8cd ),我们来看看 AggUtils#planAggregateWithoutDistinct 是如何生成聚合的物理执行计划的

创建聚合分为两个阶段:

AggregateExpression 共有以下几种 mode:

Q:是否支持使用 hash based agg 是如何判断的?

摘自我另一篇文章: https://www.jianshu.com/p/77e0a70db8cd

为了说明最常用也是最复杂的的 hash based agg,本小节暂时将示例 sql 改为

这样就能进入 HashAggregateExec 的分支

构造函数主要工作就是对 groupingExpressions、aggregateExpressions、aggregateAttributes、resultExpressions 进行了初始化

在 enable code gen 的情况下,会调用 HashAggregateExec#inputRDDs 来生成 RDD,为了分析 HashAggregateExec 是如何生成 RDD 的,我们设置 spark.sql.codegen.wholeStage 为 false 来 disable code gen,这样就会调用 HashAggregateExec#doExecute 来生成 RDD,如下:

可以看到,关键的部分就是根据 child.execute() 生成的 RDD 的每一个 partition 的迭代器转化生成一个新的 TungstenAggregationIterator ,即 HashAggregateExec 生成的 RDD 的各个 partition。由于 TungstenAggregationIterator 涉及内容非常多,我们单开一大节来进行介绍。

此迭代器:

注:UnsafeKVExternalSorter 的实现可以参考:

UnsafeRow 是 InternalRow(表示一行记录) 的 unsafe 实现,由原始内存(byte array)而不是 Java 对象支持,由三个区域组成:

使用 UnsafeRow 的收益:

构造函数的主要流程已在上图中说明,需要注意的是:当内存不足时(毕竟每个 grouping 对应的 agg buffer 直接占用内存,如果 grouping 非常多,或者 agg buffer 较大,容易出现内存用尽)会从 hash based aggregate 切换为 sort based aggregate(会 spill 数据到磁盘),后文会进行详述。先来看看最关键的 processInputs 方法的实现

上图中,需要注意的是:hashMap 中 get 一个 groupingKey 对应的 agg buffer 时,若已经存在该 buffer 则直接返回;若不存在,尝试申请内存新建一个:

上图中,用于真正处理一条 row 的 AggregationIterator#processRow 还需进一步展开分析。在此之前,我们先来看看 AggregateFunction 的分类

AggregateFunction 可以分为 DeclarativeAggregate 和 ImperativeAggregate 两大类,具体的聚合函数均为这两类的子类。

DeclarativeAggregate 是一类直接由 Catalyst 中的 Expressions 构成的聚合函数,主要逻辑通过调用 4 个表达式完成,分别是:

我们再次以容易理解的 Count 来举例说明:

通常来讲,实现一个基于 Expressions 的 DeclarativeAggregate 函数包含以下几个重要的组成部分:

再来看看 AggregationIterator#processRow

AggregationIterator#processRow 会调用

生成用于处理一行数据(row)的函数

说白了 processRow 生成了函数才是直接用来接受一条 input row 来更新对应的 agg buffer,具体是根据 mode 及 aggExpression 中的 aggFunction 的类型调用其 updateExpressions 或 mergeExpressions 方法:

比如,对于 aggFunction 为 DeclarativeAggregate 类型的 Partial 下的 Count 来说就是调用其 updateExpressions 方法,即:

对于 Final 的 Count 来说就是调用其 mergeExpressions 方法,即:

对于 aggFunction 为 ImperativeAggregate 类型的 Partial 下的 Collect 来说就是调用其 update 方法,即:

对于 Final 的 Collect 来说就是调用其 merge 方法,即:

我们都知道,读取一个迭代器的数据,是要不断调用 hasNext 方法进行 check 是否还有数据,当该方法返回 true 的时候再调用 next 方法取得下一条数据。所以要知道如何读取 TungstenAggregationIterator 的数据,就得分析其这两个方法。

分为两种情况,分别是:

Agg 的实现确实复杂,本文虽然篇幅已经很长,但还有很多方面没有 cover 到,但基本最核心、最复杂的点都详细介绍了,如果对于未 cover 的部分有兴趣,请自行阅读源码进行分析~

㈩ Spark SQL 到底怎么搭建起来

1、spark1.0的包编译时指定支持hive: ./make-distribution.sh --hadoop 2.3.0-cdh5.0.0 --with-yarn --with-hive --tgz
2、安装完spark1.0;
3、安装与hadoop对应的CDH版本的hive;
Spark SQL 支持Hive案例:
1、将hive-site.xml配置文件拷贝到$SPARK_HOME/conf下
hive-site.xml文件内容形如:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://hadoop000:3306/hive?createDatabaseIfNotExist=true</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>

<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>

<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>root</value>
</property>
</configuration>

2、启动spark: spark-shell
案例来源于spark官方文档: http://spark.apache.org/docs/latest/sql-programming-guide.html
//创建hiveContext
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

// 隐式转换
import hiveContext._

//创建hive表
hql("CREATE TABLE IF NOT EXISTS hive.kv_src (key INT, value STRING)")

//加载数据到hive表
hql("LOAD DATA LOCAL INPATH '/home/spark/app/spark-1.0.0-bin-2.3.0-cdh5.0.0/examples/src/main/resources/kv1.txt' INTO TABLE hive.kv_src")

//通过hql查询
hql("FROM hive.kv_src SELECT key, value").collect().foreach(println)