㈠ spark與hive查詢得出的數據不同
在實際工作的情況中,經常有spark與hive查詢出來的數據存在不一樣的情況,基本的原因如下: 1、由於精度不一樣導致的 2、更多的時候確實是由於元數據混亂導致的 (就是說hive中能讀到這個欄位的值,但是在spark中卻無法讀取到該欄位的值。 很多時候可能還是由於大小寫的混亂所導致的) 同一條sql,hive能生成表,而spark卻生成的一張空表,或者數據缺少,存在null值,與hive結果不一致 設置 spark.sql.hive.convertMetastoreOrc=false convertMetastoreParquet=false 原因: spark用自己的格式讀取hive文件後進行自動轉換後進行操作 官方說明㈡ 給定關鍵詞,怎麼使用spark sql進行查詢,然後返回查詢的結果。就跟搜索引擎類似,怎麼將請求發送給集群
UPDATE m_data SET m_type = '1' WHERE (m_name LIKE '%食品%')
加上條件就行了
㈢ Spark Sql 源碼剖析(二): TreeNode
使用 object CurrentOrigin 為 TreeNodes 提供一個可以查找上下文的地方,比如當前正在解析哪行 code。
object CurrentOrigin 主要包含一個 private val value = new ThreadLocal[Origin]() ,目前 CurrentOrigin 僅在 parser 中使用,在 visit 每個節點的時候都會使用,記錄當前 parse 的節點是哪行哪列
另外,從 value 是 ThreadLocal 類型可以看出,在 Spark SQL 中,parse sql 時都是在單獨的 thread 里進行的(不同的 sql 不同的 thread)
返回該節點的 seq of children,children 是不可變的。有三種情況:
查找第一個符合 f 條件(比如某個類型的)的 TreeNode,先序遍歷。
將函數 f 遞歸應用於節點及其子節點
與 foreach 不同的是,foreach 先應用於 parent,再應用與 child;而 foreachUp 是先應用於 child 再應用與 parent
調用 foreach,foreach 中應用的函數是 ret += f(_) ,最終返回一個 seq,包含將 f 通過 foreach 方式應用於所有節點並 add 到 ret。其中 f 本身是 BaseType => A 類型
原理與 map 一致,只是 f 變成了 BaseType => TraversableOnce[A]
PartialFunction#lift :將 partial func 轉換為一個返回 Option 結果的函數。將 pf 函數應用於符合 pf 定義的節點(即 pf.lift(node)返回的 Option 不是 None )並都 add 到 ret = new collection.mutable.ArrayBuffer[B] 以 Seq 形式返回
以 Seq 的形式返回 tree 的所有葉子節點
def collectFirst[B](pf: PartialFunction[BaseType, B]): Option[B] :注意,因為可能沒有符合 pf 定義的節點,所有返回的 Option 可能是 None
相當於 proctIterator.map(f).toArray ,即對於 proctIterator 每個元素執行 f 然後將 ret 組成一個 arr 返回
注意:TreeNode 沒有實現 Proct 相關方法,都由其子類自行實現
使用 new children 替換並返回該節點的拷貝。該方法會對 proctElement 每個元素進行模式匹配,根據節點類型及一定規則進行替換。
調用 transformDown
rule: PartialFunction[BaseType, BaseType]
返回 f 應用於所有子節點(非遞歸,一般將遞歸操作放在調用該函數的地方)後該節點的 。其內部的原理是調用 mapProctIterator,對每一個 proctElement(i) 進行各種模式匹配,若能匹配上某個再根據一定規則進行轉換,核心匹配轉換如下:
以上都是適用於有 children 的 node,如果是 children 為 null 的 node 直接返回
反射生成節點副本
返回該類型 TreeNode 的 name,默認為 class name;注意,會移除物理操作的 Exec$ 前綴
所有應該以該節點內嵌套樹表示的 nodes,比如,可以被用來表示 sub-queries
(children ++ innerChildren).toSet[TreeNode[_]]
主要用於互動式 debug,返回該 tree 指定下標的節點,num 可以在 numberedTreeString 找到。最終調用的
我的博客即將搬運同步至騰訊雲+社區,邀請大家一同入駐: https://cloud.tencent.com/developer/support-plan?invite_code=x2lzoxh4s5hi
㈣ spark sql 字元串轉化為精確到日日期格式
字元轉換為日期時,Style的使用
--1. Style=101時,表示日期字元串為:mm/dd/yyyy格式
SELECT CONVERT(datetime,'11/1/2003',101)
--結果:2003-11-01 00:00:00.000
--2. Style=101時,表示日期字元串為:dd/mm/yyyy格式
SELECT CONVERT(datetime,'11/1/2003',103)
--結果:2003-01-11 00:00:00.000
/*== 日期轉換為字元串 ==*/
DECLARE @dt datetime
SET @dt='2003-1-11'
㈤ spark sql 字元串轉化為精確到日日期格式
1、首先打開SQL SERVER的管理工具,然後我們選定一個資料庫,點擊新建查詢。
㈥ 如何使用 Spark SQL
一、啟動方法
/data/spark-1.4.0-bin-cdh4/bin/spark-sql --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2
註:/data/spark-1.4.0-bin-cdh4/為spark的安裝路徑
/data/spark-1.4.0-bin-cdh4/bin/spark-sql –help 查看啟動選項
--master MASTER_URL 指定master url
--executor-memory MEM 每個executor的內存,默認為1G
--total-executor-cores NUM 所有executor的總核數
-e <quoted-query-string> 直接執行查詢SQL
-f <filename> 以文件方式批量執行SQL
二、Spark sql對hive支持的功能
1、查詢語句:SELECT GROUP BY ORDER BY CLUSTER BY SORT BY
2、hive操作運算:
1) 關系運算:= ==, <>, <, >, >=, <=
2) 算術運算:+, -, *, /, %
3) 邏輯運算:AND, &&, OR, ||
4) 復雜的數據結構
5) 數學函數:(sign, ln, cos, etc)
6) 字元串函數:
3、 UDF
4、 UDAF
5、 用戶定義的序列化格式
6、join操作:JOIN {LEFT|RIGHT|FULL} OUTER JOIN LEFT SEMI JOIN CROSS JOIN
7、 unions操作:
8、 子查詢: SELECT col FROM ( SELECT a + b AS col from t1) t2
9、Sampling
10、 Explain
11、 分區表
12、 視圖
13、 hive ddl功能:CREATE TABLE、CREATE TABLE AS SELECT、ALTER TABLE
14、 支持的數據類型:TINYINT SMALLINT INT BIGINT BOOLEAN FLOAT DOUBLE STRING BINARY TIMESTAMPDATE ARRAY MAP STRUCT
三、Spark sql 在客戶端編程方式進行查詢數據
1、啟動spark-shell
./spark-shell --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2
2、編寫程序
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("../examples/src/main/resources/people.json")
查看所有數據:df.show()
查看錶結構:df.printSchema()
只看name列:df.select("name").show()
對數據運算:df.select(df("name"), df("age") + 1).show()
過濾數據:df.filter(df("age") > 21).show()
分組統計:df.groupBy("age").count().show()
1、查詢txt數據
import sqlContext.implicits._
case class Person(name: String, age: Int)
val people = sc.textFile("../examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
2、parquet文件
val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")
3、hdfs文件
val df = sqlContext.read.load("hdfs://namenode.Hadoop:9000/user/hive/warehouse/spark_test.db/test_parquet/part-r-00001.gz.parquet")
4、保存查詢結果數據
val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet「)
四、Spark sql性能調優
緩存數據表:sqlContext.cacheTable("tableName")
取消緩存表:sqlContext.uncacheTable("tableName")
spark.sql.inMemoryColumnarStorage.compressedtrue當設置為true時,Spark SQL將為基於數據統計信息的每列自動選擇一個壓縮演算法。
spark.sql.inMemoryColumnarStorage.batchSize10000柱狀緩存的批數據大小。更大的批數據可以提高內存的利用率以及壓縮效率,但有OOMs的風險
㈦ 可能是全網最詳細的 Spark Sql Aggregate 源碼剖析
縱觀 Spark Sql 源碼,聚合的實現是其中較為復雜的部分,本文希望能以例子結合流程圖的方式來說清楚整個過程。這里僅關注 Aggregate 在物理執行計劃相關的內容,之前的 parse、analyze 及 optimize 階段暫不做分析。在 Spark Sql 中,有一個專門的 Aggregation strategy 用來處理聚合,我們先來看看這個策略。
本文暫不討論 distinct Aggregate 的實現(有興趣的可以看看另一篇博文 https://www.jianshu.com/p/77e0a70db8cd ),我們來看看 AggUtils#planAggregateWithoutDistinct 是如何生成聚合的物理執行計劃的
創建聚合分為兩個階段:
AggregateExpression 共有以下幾種 mode:
Q:是否支持使用 hash based agg 是如何判斷的?
摘自我另一篇文章: https://www.jianshu.com/p/77e0a70db8cd
為了說明最常用也是最復雜的的 hash based agg,本小節暫時將示例 sql 改為
這樣就能進入 HashAggregateExec 的分支
構造函數主要工作就是對 groupingExpressions、aggregateExpressions、aggregateAttributes、resultExpressions 進行了初始化
在 enable code gen 的情況下,會調用 HashAggregateExec#inputRDDs 來生成 RDD,為了分析 HashAggregateExec 是如何生成 RDD 的,我們設置 spark.sql.codegen.wholeStage 為 false 來 disable code gen,這樣就會調用 HashAggregateExec#doExecute 來生成 RDD,如下:
可以看到,關鍵的部分就是根據 child.execute() 生成的 RDD 的每一個 partition 的迭代器轉化生成一個新的 TungstenAggregationIterator ,即 HashAggregateExec 生成的 RDD 的各個 partition。由於 TungstenAggregationIterator 涉及內容非常多,我們單開一大節來進行介紹。
此迭代器:
註:UnsafeKVExternalSorter 的實現可以參考:
UnsafeRow 是 InternalRow(表示一行記錄) 的 unsafe 實現,由原始內存(byte array)而不是 Java 對象支持,由三個區域組成:
使用 UnsafeRow 的收益:
構造函數的主要流程已在上圖中說明,需要注意的是:當內存不足時(畢竟每個 grouping 對應的 agg buffer 直接佔用內存,如果 grouping 非常多,或者 agg buffer 較大,容易出現內存用盡)會從 hash based aggregate 切換為 sort based aggregate(會 spill 數據到磁碟),後文會進行詳述。先來看看最關鍵的 processInputs 方法的實現
上圖中,需要注意的是:hashMap 中 get 一個 groupingKey 對應的 agg buffer 時,若已經存在該 buffer 則直接返回;若不存在,嘗試申請內存新建一個:
上圖中,用於真正處理一條 row 的 AggregationIterator#processRow 還需進一步展開分析。在此之前,我們先來看看 AggregateFunction 的分類
AggregateFunction 可以分為 DeclarativeAggregate 和 ImperativeAggregate 兩大類,具體的聚合函數均為這兩類的子類。
DeclarativeAggregate 是一類直接由 Catalyst 中的 Expressions 構成的聚合函數,主要邏輯通過調用 4 個表達式完成,分別是:
我們再次以容易理解的 Count 來舉例說明:
通常來講,實現一個基於 Expressions 的 DeclarativeAggregate 函數包含以下幾個重要的組成部分:
再來看看 AggregationIterator#processRow
AggregationIterator#processRow 會調用
生成用於處理一行數據(row)的函數
說白了 processRow 生成了函數才是直接用來接受一條 input row 來更新對應的 agg buffer,具體是根據 mode 及 aggExpression 中的 aggFunction 的類型調用其 updateExpressions 或 mergeExpressions 方法:
比如,對於 aggFunction 為 DeclarativeAggregate 類型的 Partial 下的 Count 來說就是調用其 updateExpressions 方法,即:
對於 Final 的 Count 來說就是調用其 mergeExpressions 方法,即:
對於 aggFunction 為 ImperativeAggregate 類型的 Partial 下的 Collect 來說就是調用其 update 方法,即:
對於 Final 的 Collect 來說就是調用其 merge 方法,即:
我們都知道,讀取一個迭代器的數據,是要不斷調用 hasNext 方法進行 check 是否還有數據,當該方法返回 true 的時候再調用 next 方法取得下一條數據。所以要知道如何讀取 TungstenAggregationIterator 的數據,就得分析其這兩個方法。
分為兩種情況,分別是:
Agg 的實現確實復雜,本文雖然篇幅已經很長,但還有很多方面沒有 cover 到,但基本最核心、最復雜的點都詳細介紹了,如果對於未 cover 的部分有興趣,請自行閱讀源碼進行分析~
㈧ 一次sparksql問題排查記錄
問題: 在調試一個sparksql左連接查詢時發現數據結果不正確,經過一天折騰才發現使用子查詢方式能夠得到正確的結果,分析執行計劃發現第一種寫法的優化後的執行計劃將where t.ip is null and t.dn條件錯誤的加到了左表子查詢中了,即紅色標出的地方,這樣導致左表子查詢查不出數據來。
結論: 過濾條件寫在where條件中時,spark會將sql優化為inner join, 如果連接條件中的欄位出現在最後的where條件中,那麼該條件在做謂詞下推時也會被加到左表和右表中,此時就不符合預拍鬧期結果,即會導致左表中的查不到預期的數據,但是將過濾數據用的限定條件寫到子查詢中時查出的結果是正確的,執行計劃也是正確的,原因不詳,懷疑碧告是spark執行計劃優化中的bug;襲慧罩
過程數據記錄
1、條件在where中
select
oneday.dn, oneday.ip, ', '
from
(
select
ip,dn
from dwd_dns.t_ip_dn_his_rel2
where dt = '
group by ip,dn
) oneday left join dwd_dns.t_ip_dn_first t on t.ip = oneday.ip and t.dn = oneday.dn
where t.ip is null and t.dn is null and t.dt = '
執行計劃:
== Optimized Logical Plan ==
InsertIntoHiveTable dwd_dns . t_ip_dn_first , org.apache.hadoop.hive.ql.io.orc.OrcSerde, Map(dt -> None), true, false, [dn, ip, first_time, dt]
+- Project [dn#1, ip#2, 20201202 AS first_time#28, 20201202 AS dt#29]
+- Join Inner, ((ip#8 = ip#2) && (dn#7 = dn#1))
:- Aggregate [ip#2, dn#1], [ip#2, dn#1]
: +- Project [dn#1, ip#2]
: +- Filter (((((isnotnull(dt#6) && (dt#6 = 20201202)) && isnull(dn#1)) && isnull(ip#2)) && isnotnull(ip#2)) && isnotnull(dn#1))
: +- Relation[uid#0,dn#1,ip#2,cname#3,dnsip#4,probe_time#5,dt#6] orc
+- Project [dn#7, ip#8]
+- Filter (((((isnotnull(dt#10) && isnull(ip#8)) && isnull(dn#7)) && (dt#10 = 20201001)) && isnotnull(ip#8)) && isnotnull(dn#7))
+- Relation[dn#7,ip#8,first_time#9,dt#10] orc
2、條件在子查詢中
select
/ + REPARTITION(10) /
oneday.dn, oneday.ip, ', '
from
(
select
ip,dn
from dwd_dns.t_ip_dn_his_rel2
where dt = '
group by ip,dn
) oneday left join
(
select dn, ip
from
dwd_dns.t_ip_dn_first
where dt = '
) t on t.ip = oneday.ip and t.dn = oneday.dn
where t.ip is null and t.dn is null
執行計劃:
== Optimized Logical Plan ==
InsertIntoHiveTable dwd_dns . t_ip_dn_first , org.apache.hadoop.hive.ql.io.orc.OrcSerde, Map(dt -> None), true, false, [dn, ip, first_time, dt]
+- Project [dn#1, ip#2, 20201202 AS first_time#28, 20201202 AS dt#29]
+- Repartition 10, true
+- Project [dn#1, ip#2]
+- Filter (isnull(ip#8) && isnull(dn#7))
+- Join LeftOuter, ((ip#8 = ip#2) && (dn#7 = dn#1))
:- Aggregate [ip#2, dn#1], [ip#2, dn#1]
: +- Project [dn#1, ip#2]
: +- Filter (isnotnull(dt#6) && (dt#6 = 20201202))
: +- Relation[uid#0,dn#1,ip#2,cname#3,dnsip#4,probe_time#5,dt#6] orc
+- Project [dn#7, ip#8]
+- Filter (((isnotnull(dt#10) && (dt#10 = 20201001)) && isnotnull(ip#8)) && isnotnull(dn#7))
+- Relation[dn#7,ip#8,first_time#9,dt#10] orc