㈠ spark与hive查询得出的数据不同
在实际工作的情况中,经常有spark与hive查询出来的数据存在不一样的情况,基本的原因如下: 1、由于精度不一样导致的 2、更多的时候确实是由于元数据混乱导致的 (就是说hive中能读到这个字段的值,但是在spark中却无法读取到该字段的值。 很多时候可能还是由于大小写的混乱所导致的) 同一条sql,hive能生成表,而spark却生成的一张空表,或者数据缺少,存在null值,与hive结果不一致 设置 spark.sql.hive.convertMetastoreOrc=false convertMetastoreParquet=false 原因: spark用自己的格式读取hive文件后进行自动转换后进行操作 官方说明㈡ 给定关键词,怎么使用spark sql进行查询,然后返回查询的结果。就跟搜索引擎类似,怎么将请求发送给集群
UPDATE m_data SET m_type = '1' WHERE (m_name LIKE '%食品%')
加上条件就行了
㈢ Spark Sql 源码剖析(二): TreeNode
使用 object CurrentOrigin 为 TreeNodes 提供一个可以查找上下文的地方,比如当前正在解析哪行 code。
object CurrentOrigin 主要包含一个 private val value = new ThreadLocal[Origin]() ,目前 CurrentOrigin 仅在 parser 中使用,在 visit 每个节点的时候都会使用,记录当前 parse 的节点是哪行哪列
另外,从 value 是 ThreadLocal 类型可以看出,在 Spark SQL 中,parse sql 时都是在单独的 thread 里进行的(不同的 sql 不同的 thread)
返回该节点的 seq of children,children 是不可变的。有三种情况:
查找第一个符合 f 条件(比如某个类型的)的 TreeNode,先序遍历。
将函数 f 递归应用于节点及其子节点
与 foreach 不同的是,foreach 先应用于 parent,再应用与 child;而 foreachUp 是先应用于 child 再应用与 parent
调用 foreach,foreach 中应用的函数是 ret += f(_) ,最终返回一个 seq,包含将 f 通过 foreach 方式应用于所有节点并 add 到 ret。其中 f 本身是 BaseType => A 类型
原理与 map 一致,只是 f 变成了 BaseType => TraversableOnce[A]
PartialFunction#lift :将 partial func 转换为一个返回 Option 结果的函数。将 pf 函数应用于符合 pf 定义的节点(即 pf.lift(node)返回的 Option 不是 None )并都 add 到 ret = new collection.mutable.ArrayBuffer[B] 以 Seq 形式返回
以 Seq 的形式返回 tree 的所有叶子节点
def collectFirst[B](pf: PartialFunction[BaseType, B]): Option[B] :注意,因为可能没有符合 pf 定义的节点,所有返回的 Option 可能是 None
相当于 proctIterator.map(f).toArray ,即对于 proctIterator 每个元素执行 f 然后将 ret 组成一个 arr 返回
注意:TreeNode 没有实现 Proct 相关方法,都由其子类自行实现
使用 new children 替换并返回该节点的拷贝。该方法会对 proctElement 每个元素进行模式匹配,根据节点类型及一定规则进行替换。
调用 transformDown
rule: PartialFunction[BaseType, BaseType]
返回 f 应用于所有子节点(非递归,一般将递归操作放在调用该函数的地方)后该节点的 。其内部的原理是调用 mapProctIterator,对每一个 proctElement(i) 进行各种模式匹配,若能匹配上某个再根据一定规则进行转换,核心匹配转换如下:
以上都是适用于有 children 的 node,如果是 children 为 null 的 node 直接返回
反射生成节点副本
返回该类型 TreeNode 的 name,默认为 class name;注意,会移除物理操作的 Exec$ 前缀
所有应该以该节点内嵌套树表示的 nodes,比如,可以被用来表示 sub-queries
(children ++ innerChildren).toSet[TreeNode[_]]
主要用于交互式 debug,返回该 tree 指定下标的节点,num 可以在 numberedTreeString 找到。最终调用的
我的博客即将搬运同步至腾讯云+社区,邀请大家一同入驻: https://cloud.tencent.com/developer/support-plan?invite_code=x2lzoxh4s5hi
㈣ spark sql 字符串转化为精确到日日期格式
字符转换为日期时,Style的使用
--1. Style=101时,表示日期字符串为:mm/dd/yyyy格式
SELECT CONVERT(datetime,'11/1/2003',101)
--结果:2003-11-01 00:00:00.000
--2. Style=101时,表示日期字符串为:dd/mm/yyyy格式
SELECT CONVERT(datetime,'11/1/2003',103)
--结果:2003-01-11 00:00:00.000
/*== 日期转换为字符串 ==*/
DECLARE @dt datetime
SET @dt='2003-1-11'
㈤ spark sql 字符串转化为精确到日日期格式
1、首先打开SQL SERVER的管理工具,然后我们选定一个数据库,点击新建查询。
㈥ 如何使用 Spark SQL
一、启动方法
/data/spark-1.4.0-bin-cdh4/bin/spark-sql --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2
注:/data/spark-1.4.0-bin-cdh4/为spark的安装路径
/data/spark-1.4.0-bin-cdh4/bin/spark-sql –help 查看启动选项
--master MASTER_URL 指定master url
--executor-memory MEM 每个executor的内存,默认为1G
--total-executor-cores NUM 所有executor的总核数
-e <quoted-query-string> 直接执行查询SQL
-f <filename> 以文件方式批量执行SQL
二、Spark sql对hive支持的功能
1、查询语句:SELECT GROUP BY ORDER BY CLUSTER BY SORT BY
2、hive操作运算:
1) 关系运算:= ==, <>, <, >, >=, <=
2) 算术运算:+, -, *, /, %
3) 逻辑运算:AND, &&, OR, ||
4) 复杂的数据结构
5) 数学函数:(sign, ln, cos, etc)
6) 字符串函数:
3、 UDF
4、 UDAF
5、 用户定义的序列化格式
6、join操作:JOIN {LEFT|RIGHT|FULL} OUTER JOIN LEFT SEMI JOIN CROSS JOIN
7、 unions操作:
8、 子查询: SELECT col FROM ( SELECT a + b AS col from t1) t2
9、Sampling
10、 Explain
11、 分区表
12、 视图
13、 hive ddl功能:CREATE TABLE、CREATE TABLE AS SELECT、ALTER TABLE
14、 支持的数据类型:TINYINT SMALLINT INT BIGINT BOOLEAN FLOAT DOUBLE STRING BINARY TIMESTAMPDATE ARRAY MAP STRUCT
三、Spark sql 在客户端编程方式进行查询数据
1、启动spark-shell
./spark-shell --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2
2、编写程序
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("../examples/src/main/resources/people.json")
查看所有数据:df.show()
查看表结构:df.printSchema()
只看name列:df.select("name").show()
对数据运算:df.select(df("name"), df("age") + 1).show()
过滤数据:df.filter(df("age") > 21).show()
分组统计:df.groupBy("age").count().show()
1、查询txt数据
import sqlContext.implicits._
case class Person(name: String, age: Int)
val people = sc.textFile("../examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
2、parquet文件
val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")
3、hdfs文件
val df = sqlContext.read.load("hdfs://namenode.Hadoop:9000/user/hive/warehouse/spark_test.db/test_parquet/part-r-00001.gz.parquet")
4、保存查询结果数据
val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet“)
四、Spark sql性能调优
缓存数据表:sqlContext.cacheTable("tableName")
取消缓存表:sqlContext.uncacheTable("tableName")
spark.sql.inMemoryColumnarStorage.compressedtrue当设置为true时,Spark SQL将为基于数据统计信息的每列自动选择一个压缩算法。
spark.sql.inMemoryColumnarStorage.batchSize10000柱状缓存的批数据大小。更大的批数据可以提高内存的利用率以及压缩效率,但有OOMs的风险
㈦ 可能是全网最详细的 Spark Sql Aggregate 源码剖析
纵观 Spark Sql 源码,聚合的实现是其中较为复杂的部分,本文希望能以例子结合流程图的方式来说清楚整个过程。这里仅关注 Aggregate 在物理执行计划相关的内容,之前的 parse、analyze 及 optimize 阶段暂不做分析。在 Spark Sql 中,有一个专门的 Aggregation strategy 用来处理聚合,我们先来看看这个策略。
本文暂不讨论 distinct Aggregate 的实现(有兴趣的可以看看另一篇博文 https://www.jianshu.com/p/77e0a70db8cd ),我们来看看 AggUtils#planAggregateWithoutDistinct 是如何生成聚合的物理执行计划的
创建聚合分为两个阶段:
AggregateExpression 共有以下几种 mode:
Q:是否支持使用 hash based agg 是如何判断的?
摘自我另一篇文章: https://www.jianshu.com/p/77e0a70db8cd
为了说明最常用也是最复杂的的 hash based agg,本小节暂时将示例 sql 改为
这样就能进入 HashAggregateExec 的分支
构造函数主要工作就是对 groupingExpressions、aggregateExpressions、aggregateAttributes、resultExpressions 进行了初始化
在 enable code gen 的情况下,会调用 HashAggregateExec#inputRDDs 来生成 RDD,为了分析 HashAggregateExec 是如何生成 RDD 的,我们设置 spark.sql.codegen.wholeStage 为 false 来 disable code gen,这样就会调用 HashAggregateExec#doExecute 来生成 RDD,如下:
可以看到,关键的部分就是根据 child.execute() 生成的 RDD 的每一个 partition 的迭代器转化生成一个新的 TungstenAggregationIterator ,即 HashAggregateExec 生成的 RDD 的各个 partition。由于 TungstenAggregationIterator 涉及内容非常多,我们单开一大节来进行介绍。
此迭代器:
注:UnsafeKVExternalSorter 的实现可以参考:
UnsafeRow 是 InternalRow(表示一行记录) 的 unsafe 实现,由原始内存(byte array)而不是 Java 对象支持,由三个区域组成:
使用 UnsafeRow 的收益:
构造函数的主要流程已在上图中说明,需要注意的是:当内存不足时(毕竟每个 grouping 对应的 agg buffer 直接占用内存,如果 grouping 非常多,或者 agg buffer 较大,容易出现内存用尽)会从 hash based aggregate 切换为 sort based aggregate(会 spill 数据到磁盘),后文会进行详述。先来看看最关键的 processInputs 方法的实现
上图中,需要注意的是:hashMap 中 get 一个 groupingKey 对应的 agg buffer 时,若已经存在该 buffer 则直接返回;若不存在,尝试申请内存新建一个:
上图中,用于真正处理一条 row 的 AggregationIterator#processRow 还需进一步展开分析。在此之前,我们先来看看 AggregateFunction 的分类
AggregateFunction 可以分为 DeclarativeAggregate 和 ImperativeAggregate 两大类,具体的聚合函数均为这两类的子类。
DeclarativeAggregate 是一类直接由 Catalyst 中的 Expressions 构成的聚合函数,主要逻辑通过调用 4 个表达式完成,分别是:
我们再次以容易理解的 Count 来举例说明:
通常来讲,实现一个基于 Expressions 的 DeclarativeAggregate 函数包含以下几个重要的组成部分:
再来看看 AggregationIterator#processRow
AggregationIterator#processRow 会调用
生成用于处理一行数据(row)的函数
说白了 processRow 生成了函数才是直接用来接受一条 input row 来更新对应的 agg buffer,具体是根据 mode 及 aggExpression 中的 aggFunction 的类型调用其 updateExpressions 或 mergeExpressions 方法:
比如,对于 aggFunction 为 DeclarativeAggregate 类型的 Partial 下的 Count 来说就是调用其 updateExpressions 方法,即:
对于 Final 的 Count 来说就是调用其 mergeExpressions 方法,即:
对于 aggFunction 为 ImperativeAggregate 类型的 Partial 下的 Collect 来说就是调用其 update 方法,即:
对于 Final 的 Collect 来说就是调用其 merge 方法,即:
我们都知道,读取一个迭代器的数据,是要不断调用 hasNext 方法进行 check 是否还有数据,当该方法返回 true 的时候再调用 next 方法取得下一条数据。所以要知道如何读取 TungstenAggregationIterator 的数据,就得分析其这两个方法。
分为两种情况,分别是:
Agg 的实现确实复杂,本文虽然篇幅已经很长,但还有很多方面没有 cover 到,但基本最核心、最复杂的点都详细介绍了,如果对于未 cover 的部分有兴趣,请自行阅读源码进行分析~
㈧ 一次sparksql问题排查记录
问题: 在调试一个sparksql左连接查询时发现数据结果不正确,经过一天折腾才发现使用子查询方式能够得到正确的结果,分析执行计划发现第一种写法的优化后的执行计划将where t.ip is null and t.dn条件错误的加到了左表子查询中了,即红色标出的地方,这样导致左表子查询查不出数据来。
结论: 过滤条件写在where条件中时,spark会将sql优化为inner join, 如果连接条件中的字段出现在最后的where条件中,那么该条件在做谓词下推时也会被加到左表和右表中,此时就不符合预拍闹期结果,即会导致左表中的查不到预期的数据,但是将过滤数据用的限定条件写到子查询中时查出的结果是正确的,执行计划也是正确的,原因不详,怀疑碧告是spark执行计划优化中的bug;袭慧罩
过程数据记录
1、条件在where中
select
oneday.dn, oneday.ip, ', '
from
(
select
ip,dn
from dwd_dns.t_ip_dn_his_rel2
where dt = '
group by ip,dn
) oneday left join dwd_dns.t_ip_dn_first t on t.ip = oneday.ip and t.dn = oneday.dn
where t.ip is null and t.dn is null and t.dt = '
执行计划:
== Optimized Logical Plan ==
InsertIntoHiveTable dwd_dns . t_ip_dn_first , org.apache.hadoop.hive.ql.io.orc.OrcSerde, Map(dt -> None), true, false, [dn, ip, first_time, dt]
+- Project [dn#1, ip#2, 20201202 AS first_time#28, 20201202 AS dt#29]
+- Join Inner, ((ip#8 = ip#2) && (dn#7 = dn#1))
:- Aggregate [ip#2, dn#1], [ip#2, dn#1]
: +- Project [dn#1, ip#2]
: +- Filter (((((isnotnull(dt#6) && (dt#6 = 20201202)) && isnull(dn#1)) && isnull(ip#2)) && isnotnull(ip#2)) && isnotnull(dn#1))
: +- Relation[uid#0,dn#1,ip#2,cname#3,dnsip#4,probe_time#5,dt#6] orc
+- Project [dn#7, ip#8]
+- Filter (((((isnotnull(dt#10) && isnull(ip#8)) && isnull(dn#7)) && (dt#10 = 20201001)) && isnotnull(ip#8)) && isnotnull(dn#7))
+- Relation[dn#7,ip#8,first_time#9,dt#10] orc
2、条件在子查询中
select
/ + REPARTITION(10) /
oneday.dn, oneday.ip, ', '
from
(
select
ip,dn
from dwd_dns.t_ip_dn_his_rel2
where dt = '
group by ip,dn
) oneday left join
(
select dn, ip
from
dwd_dns.t_ip_dn_first
where dt = '
) t on t.ip = oneday.ip and t.dn = oneday.dn
where t.ip is null and t.dn is null
执行计划:
== Optimized Logical Plan ==
InsertIntoHiveTable dwd_dns . t_ip_dn_first , org.apache.hadoop.hive.ql.io.orc.OrcSerde, Map(dt -> None), true, false, [dn, ip, first_time, dt]
+- Project [dn#1, ip#2, 20201202 AS first_time#28, 20201202 AS dt#29]
+- Repartition 10, true
+- Project [dn#1, ip#2]
+- Filter (isnull(ip#8) && isnull(dn#7))
+- Join LeftOuter, ((ip#8 = ip#2) && (dn#7 = dn#1))
:- Aggregate [ip#2, dn#1], [ip#2, dn#1]
: +- Project [dn#1, ip#2]
: +- Filter (isnotnull(dt#6) && (dt#6 = 20201202))
: +- Relation[uid#0,dn#1,ip#2,cname#3,dnsip#4,probe_time#5,dt#6] orc
+- Project [dn#7, ip#8]
+- Filter (((isnotnull(dt#10) && (dt#10 = 20201001)) && isnotnull(ip#8)) && isnotnull(dn#7))
+- Relation[dn#7,ip#8,first_time#9,dt#10] orc