當前位置︰技術分享 > 技術參考 > 正文

深入理解 Spark SQL 的 Catalyst 優化器2019-07-24 10:25:48 | 編輯︰hely | 查看︰ | 評論︰0

在本篇博客,我們將重新發表論文中的部分內容,為廣大讀者解釋Catalyst 優化器的內部原理。

Spark SQL是Spark最新且技術最復雜的組件之一。它同時支持SQL查詢和新的DataFrame API。Spark SQL的核心是Catalyst優化器,它以一種全新的方式利用高級語言的特性(例如︰Scala的模式匹配和Quasiquotes ?)構建一個可擴展的查詢優化器。

最近我們在SIGMOD 2015 發表了一篇論文(合作者︰Davies Liu,Joseph K. Bradley,Xiangrui Meng,Tomer Kaftan,Michael J. Franklin 和 Ali Ghodsi)。在本篇博客,我們將重新發表論文中的部分內容,為廣大讀者解釋Catalyst 優化器的內部原理。

為了實現Spark SQL,我們基于Scala函數式編程結構,設計了一個新的可擴展的優化器Catalyst。其可擴展設計有兩個目的︰首先,我們希望能夠非常容易地為Spark SQL添加新的優化技術和特性,尤其是為了應對我們遇到的大數據中的各種問題(例如︰半結構化數據和高級分析);其次,我們希望外部的開發者可以擴展優化器。例如,為數據源添加特定的規則從而使過濾或聚合操作下推到外部的存儲系統,或者支持新的數據類型。Catalyst 同時支持基于規則和基于成本的優化(CBO)。

Catalyst核心是樹和操作樹的規則的一個通用庫。在框架的頂層,我們構建了專門用于關系型查詢處理的庫(例如,表達式,邏輯查詢計劃),以及處理查詢執行的不同階段的幾組規則︰分析,邏輯優化,物理計劃和將部分查詢編譯為Java字節碼的代碼生成。對于後者,我們使用了另一個Scala特性Quasiquotes,它使得在運行時從組合表達式生成代碼機器變得非常簡單。最後,Catalyst 提供了若干公共的擴展點,包括擴展數據源和用戶自定義類型。

樹(Trees)

Catalyst 主要的數據類型是由節點對象構成的樹。每個節點由一個節點類型和零到多個子節點組成。節點類型在Scala 中被定義為TreeNode 類的子類。這些對象是不可變的,可以使用函數式的轉換對其進行操作,我們將在下一小節繼續討論。

舉個簡單的例子,假設我們有以下三個節點類型,可以用更簡化的表達式表示為︰

這些類構建成樹;例如,表達式x+(1+2),可以在Scala代碼中表示為︰

 

 

規則(Rules)

規則用于對樹進行操作,其實際上是一個將一棵樹轉換為另外一棵樹的方法。雖然規則可以在其輸入樹上運行任意的代碼(假定該樹只是一個Scala 對象),但最常見的方式是使用一組模式匹配函數,找到並替換特定結構的子樹。

模式匹配是許多函數式編程語言的特性,允許從代數數據類型的嵌套結構中進行值提取。在Catalyst,樹提供的轉換方法可以遞歸地應用模式匹配函數到樹的所有節點。例如,我們可以實現一個常量之間疊加操作的規則︰

應用這條規則到樹x+(1+2) 就會產生一棵新樹x+3。這里case 關鍵字是Scala 標準模式匹配的語法,可被用于匹配對象的類型以及命名值提取(這里是c1和c2)。

被傳遞給轉換操作的模式匹配表達式是一個偏函數?,其只需要匹配所有可能的輸入樹的子集即可。Catalyst 將測試規則會應用到樹的哪個部分,並自動跳過並下降到還沒有匹配的子樹。這樣的能力意味著規則只需要對優化適用的樹進行推理。因此,即使添加新的操作類型到系統中,也不需要修改規則。

規則(通常是 Scala 的模式匹配)可以在相同的轉換調用中匹配多個模式,這使得一次實現多個轉換操作非常的簡單︰

實踐中,規則可能需要執行多次才能完全轉換一棵樹。Catalyst 將規則分成批次,執行各個批次直到達到一個固定的點,即應用規則之後樹不再更新為止。執行規則達到一個固定的點,意味著每條規則可以非常簡單且是自包含的,但是,最終仍會在樹上產生比較大的全局效果。在上面的例子中,重復地應用規則將不斷折疊一棵大樹,如(x+0)+(3+3)。另一個例子,第一個批次也許分析一個表達式並將類型賦給所有屬性,而第二個批次可能使用這些類型進行不斷折疊。每個批次之後,開發者還可以在生成的新樹上運行健全性檢查(例如,查看所有的屬性都指定了類型),通常這也同樣通過遞歸匹配來編寫。

最後,規則條件及其實現可以包含具體的Scala 代碼。這使得Catalyst 比優化器 DSL 更加強大,同時保持了規則的簡潔性。

根據我們的經驗,對不可變樹執行函數式轉換操作使得整個優化器非常易于推理和調試。同時也使得優化器的轉換操作可以並行化,盡管我們還沒有把它利用起來。

在Spark SQL 中使用Catalyst

我們在四個階段使用了Catalyst 通用樹轉換操作框架,如下所示︰

分析邏輯計劃解析引用
邏輯計劃優化
物理計劃

代碼生成,編譯部分查詢為Java 字節碼

 

 

分析(Analysis)

Spark SQL 以一個需要計算的關系開始,其要麼來自SQL 解析器返回的抽象語法樹(AST),要麼來自使用API 構造的DataFrame 對象。在兩種情況下,關系可能包含未解析的屬性引用或關系︰例如,在SQL 查詢

的類型,甚至是否是一個合法的列名,在我們查詢表sales 之前都是未知的。如果我們不知道其類型或者沒有匹配到輸入表(或別名),那麼這個屬性就未被解析。Spark SQL 使用Catalyst 規則和一個Catalyst 對象去追蹤所有數據源的表來解析這些屬性。從未綁定的屬性和數據類型構建一個“未解析的邏輯計劃”,然後應用規則執行下面的步驟︰

通過名字從Catalog 中查找關系。

映射命名屬性,如col,到輸入的給定操作符子項。

檢查哪些屬性引用了相同的值給它們一個相同的ID(之後允許針對col = col 這樣的表達式進行優化)。

通過表達式傳遞和強制類型︰舉個例子,我們無法知道1+col 的返回類型,直到解析col 並可能將其子表達式轉換為兼容類型。

總共,分析器相關的規則大概1000行代碼。

邏輯優化( Logical Optimizations)

邏輯優化階段對邏輯優化應用了標準的基于規則的優化方式。(執行基于成本的優化,通過使用規則生成多個計劃並計算他們的成本。)包括︰常量折疊(Constant Folding)、謂詞下推(Predicate Pushdown)、投影裁剪(Projection Pruning)、空傳遞(Null Propagation)、布爾表達式簡化(Boolean Expression Simplification)和其它規則。總的來說,我們發現為各種情形添加新的規則都極為簡單。例如,當我們添加固定精度的DECIMAL 類型到Spark SQL時,以低精度的方式優化對DECIMAL的如SUM和AVG的聚合操作;只要12行代碼編寫一條規則在SUM和AVG表達式中找到這樣的DECIMAL,然後將他們轉換為64 位的LONG 類型進行聚合操作,最後將結果轉換回來。下面是一個僅優化了SUM表達式的簡化版本:

另外一個例子,一條12行的規則通過簡單的正則表達式將LIKE表達式優化為調用。在規則中使用任意Scala代碼的自由,使得這些優化超越了模式匹配子樹結構,更易于表達。

總共,邏輯優化規則大概800行代碼。

物理計劃(Physical Planning)

物理計劃階段,Spark SQL將一個邏輯計劃使用匹配的Spark執行引擎的物理操作符生成一個或更多的物理計劃。然後選擇一個計劃應用成本模型。此時,基于成本的優化器只用于選擇連接算法︰對于已知的很小的關系,Spark SQL使用broadcast join,使用Spark里可用的點對點的廣播工具。框架支持更廣泛的使用基于成本的優化,這是因為成本可以通過對整棵樹使用規則來遞歸估計。所以,未來我們打算實現更豐富的基于成本的優化。

物理計劃同樣執行基于規則的物理優化,如在一個Spark的map操作執行流水線投影(Piplining Projection)或過濾。除此之外,還可以從邏輯計劃將操作推到支持謂詞或投影下推的數據源。我們將在之後的章節描述這些數據源的 API。

總共,物理計劃規則大概500行代碼。

代碼生成(Code Generation)

查詢優化的最後階段涉及生成運行在各台機器上的Java字節碼。由于Spark SQL通常是運行在內存數據集上,其處理受限于CPU,因此我們希望支持代碼生成來加快執行速度。然而,構建代碼生成引擎非常的復雜,尤其是編譯器。Catalyst依賴于Scala語言特定的屬性Quasiquotes使得代碼生成更加簡單。Quasiquotes 允許在Scala語言中使用編程的方式構建抽象語法樹(ASTs),然後可以在運行時提供給Scala編譯器生成字節碼。我們使用Calalyst將SQL表達式的樹轉換為Scala代碼的AST評估表達式,然後編譯並運行生成的代碼。

舉一個簡單的例子,回憶4.2節介紹的屬性和字面量樹節點Add,這使得我們能夠寫出表達式(x+y)+1。如果沒有代碼生成,這樣的表達書不得不解析每一行數據,一直走到Add樹,屬性和字面量節點。

大量的分支和虛函數調用將減慢執行速度。通過代碼生成,我們可以向下面,寫一個函數將特定的表達式樹轉換為Scala AST︰

以q開頭的字符串就是Quasiquotes,雖然長得像字符串,但是Scala編譯器會在編譯時解析它們,並表示代碼中的ASTs。Quasiquotes支持變量或其它ASTs 片段拼接,使用$進行表示。舉個例子,Literal(1) 變成了Scala AST中的1,而Attribute("x") 變成了row.get("x")。最後,像是Add(Literal(1), Attribute("x")) 的樹變成了Scala 表達式AST 1+row.get("x")。

Quasiquotes會在編譯時進行類型檢查以確保只有合適的ASTs或者字面量能夠被替換,這比字符串連接更有用,而且是直接生成Scala AST 樹而不是在運行時運行Scala解析器。此外,由于每個節點代碼的生成規則不需要知曉其子節點是如何構建的,因此它們是高度可組合的。最後,如果Catalyst缺少表達式級別的優化,Scala編譯器會對代碼進行進一步的優化。下圖展示了Quasiquotes生成的代碼性能近似于手動優化的程序性能。

 

 

我們發現了Quasiquotes可以直接用于代碼生成,而且我們觀察到即使是Spark SQL新的提交者也可以快速增加新的表達式類型規則。Quasiquotes也與我們運行在原生Java對象的目標相契合︰當需要訪問對象中的字段時,我們通過代碼生成直接訪問需要的字段,而不必拷貝對象到一個Spark SQL的Row中然後使用Row的訪問方法。最後,將代碼生成評估與沒有生成代碼的表達式解析評估結合起來也非常便捷,因為我們編譯的Scala代碼可直接在表達式解析器中調用。

總共, Catalyst代碼生成大概700行代碼。

本篇博客覆蓋了Spark SQL的Catalyst優化器的內部實現。全新的簡單的設計使得Spark社區可以快速建立原型,實現和擴展引擎。可以通過論文中剩余的部分。如果你參加今年的 SIGMOD,請來參加我們的分享吧!

本文原文︰

來源︰過往記憶https://www.iteblog.com/

上一篇︰用Python操作Word文檔 Hadoop YARN︰調度性能優化實踐下一篇︰