《Spark的RDD原理以及2.0特性的介紹》要點:
本文介紹了Spark的RDD原理以及2.0特性的介紹,希望對您有用。如果有疑問,可以聯系我們。
Spark 是 Apache 頂級項目里面最火的大數據處理的計算引擎,它目前是負責大數據計算的工作.包括離線計算或交互式查詢、數據挖掘算法、流式計算以及圖計算等.全世界有許多公司和組織使用或給社區貢獻代碼,社區的活躍度見 www.github.com/apache/spark.
2013 年開始 Spark開發團隊成立 Databricks,來對 Spark 進行運作和管理,并提供 Cloud 服務.Spark 社區基本保持一個季度一個版本,不出意外的話 Spark 2.0 將在五月底發布.
與 Mapreduce 相比,Spark 具備 DAG 執行引擎以及基于內存的多輪迭代計算等優勢,在SQL 層面上,比 Hive/Pig 相比,引入關系數據庫的許多特性,以及內存管理技術.另外在 Spark 上所有的計算模型最終都統一基于 RDD 之上運行執行,包括流式和離線計算.Spark 基于磁盤的性能是 MR 的 10 倍,基于內存的性能是 MR 的 100 倍 ? (見文后參考閱讀? ,下同)?.
Spark 提供 SQL、機器學習庫 MLlib、流計算 Streaming 和圖計算 Graphx,同時也支持 Scala、Java、Python 和 R 語言開發的基于 API 的應用程序.
RDD,英文全稱叫 Resilient Distributed Datasets.
an RDD is a read-only, partitioned collection of records?.?字面意思是只讀的分布式數據集.
但其實個人覺得可以把 RDD 理解為關系數據庫 里的一個個操作,比如 map,filter,Join 等.在 Spark 里面實現了許多這樣的 RDD 類,即可以看成是操作類.當我們調用一個 map 接口,底層實現是會生成一個 MapPartitionsRDD 對象,當 RDD 真正執行時,會調用 MapPartitionsRDD 對象里面的 compute 方法來執行這個操作的計算邏輯.但是不同的是,RDD 是 lazy 模式,只有像 count,saveasText 這種 action 動作被調用后再會去觸發 runJob 動作.
RDD 分為二類:transformation 和 action.
val file = sc.textFile(args(0))
val words = file.flatMap(line => line.split(” “))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _, 2) wordCounts.saveAsTextFile(args(1))
這段代碼生成的 RDD 的執行樹是如下圖所示:
最終在 saveAsTextFile 方法時才會將整個 RDD 的執行圖提交給 DAG 執行引擎,根據相關信息切分成一個一個 Stage,每個 Stage 去執行多個 task,最終完成整個 Job 的執行.
還有一個區別就是,RDD 計算后的中間結果是可以被持久化,當下一次需要使用時,可以直接使用之前持久化好的結果,而不是重新計算,并且這些結果被存儲在各個結點的 executor 上.下一次使用時,調度器可以直接把 task 分發到存儲持久化數據的結點上,減少數據的網絡傳輸開稍.這種場景在數據挖掘迭代計算是經常出現.如下代碼
val links = spark.textFile(…).map(…).persist() var ranks = // RDD of (URL, rank) pairs
for (i <- 1 to ITERATIONS) {
// Build an RDD of (targetURL, float) pairs // with the contributions sent by each page val contribs = links.join(ranks).flatMap {
(url, (links, rank)) =>
links.map(dest => (dest, rank/links.size)) }
// Sum contributions by URL and get new ranks
?
ranks = contribs.reduceByKey((x,y) => x+y)
.mapValues(sum => a/N + (1-a)*sum) }
以上代碼生成的 RDD 執行樹如下圖所示:
計算 contribs-0 時需要使用 links 的計算邏輯,當 links 每個分片計算完后,會將這個結果保存到本地內存或磁盤上,下一次 contribs-1 計算要使用 links 的數據時,直接從上一次保存的內存和磁盤上讀取就可以了.這個持久化系統叫做 blockManager,類似于在內部再構建了一個 KV 系統,K 表示每個分區 ID 號,V 表示這個分區計算后的結果.
另外在 streaming 計算時,每個 batch 會去消息隊列上拉取這個時間段的數據,每個 Recevier 接收過來數據形成 block 塊并存放到 blockManager 上,為了可靠性,這個 block 塊可以遠程備份,后續的 batch 計算就直接在之前已讀取的 block 塊上進行計算,這樣不斷循環迭代來完成流處理.
一個 RDD 一般會有以下四個函數組成.
定義為:
def compute(split: Partition, context: TaskContext): Iterator[T]
如在 MapPartitionsRDD 里的實現是如下:
override def compute(split: Partition, context: TaskContext): Iterator[U] = f(context, split.index, firstParent[T].iterator(split, context))
函數定義
f: (TaskContext, Int, Iterator[T]) => Iterator[U]
protected def getPartitions: Array[Partition]?
即這個操作的數據劃分為多少個分 區.跟 mapreduce 里的 map 上的 split 類似的.
protected def getDependencies:?Seq[Dependency[_]]?
依賴分二種:如果 RDD 的每個分區最多只能被一個 Child RDD 的一個分區使用,則稱之為 narrow dependency;若依賴于多個 Child RDD 分區,則稱之為 wide dependency.不同的操作根據其特性,可能會產生不同的依賴??.如下圖所示
map 操作前后二個 RDD 操作之間的分區是一對一的關系,故產生 narrow dependency,而 join 操作的分區分別對應于它的二個子操作相對應的分區,故產生 wide dependency.當最后要生成具體的 task 運行時,就需要利用這個依賴關系也生成 Stage 的 DAG 圖.
4. 獲取該操作對應數據的存放位置信息,主要是針對 HDFS 這類有數據源的 RDD.
protected def getPreferredLocations(split: Partition): Seq[String]
Spark 的執行模式有 local、Yarn、Standalone、Mesos 四類.后面三個分別有 cluster 和 client 二種.client 和 cluster 的區別就是指 Driver 是在程序提交客戶端還是在集群的 AM 上. 比如常見的 Yarn-cluster 模式如下圖所示:
一般來說,運行簡單測試或 UT 用的是 local 模式運行,其實就是用多線程模似分布式執行. 如果業務部門較少且不需要對部門或組之間的資源做劃分和優先級調度的話,可以使用 Standalone 模式來部署.
當如果有多個部門或組,且希望每個組織可以限制固定運行的最大資源,另外組或者任務需要有優先級執行的話,可以選擇 Yarn 或 Mesos.
Unifying DataFrames and Datasets in Scala/Java
DataFrame? 和 Dataset? 的功能是什么?
它們都是提供給用戶使用,包括各類操作接口的 API.1.3 版本引入 DataFrame,1.6 版本引入 Dataset,2.0 提供的功能是將二者統一,即保留 Dataset,而把 DataFrame 定義為 Dataset[Row],即是 Dataset 里的元素對象為 Row 的一種(SPARK-13485).
在參考資料? 中有介紹 DataFrame,它就是提供了一系列操作 API,與 RDD API 相比較,DataFrame 里操作的數據都是帶有 Schema 信息,所以 DataFrame 里的所有操作是可以享受 Spark SQL Catalyst optimizer 帶來的性能提升,比如 code generation 以及 Tungsten??等.執行過程如下圖所示
但是 DataFrame 出來后發現有些情況下 RDD 可以表達的邏輯用 DataFrame 無法表達.比如 要對 group by 或 join 后的結果用自定義的函數,可能用 SQL 是無法表達的.如下代碼:
case class ClassData(a: String, b: Int)
case class ClassNullableData(a: String, b: Integer)
val ds = Seq(ClassData(“a”, 1), ClassData(“a”, 2)).toDS()
val agged = ds.groupByKey(d => ClassNullableData(d.a, null))
.mapGroups {
case (key, values) => key.a + values.map(_.b).sum
}
中間處理過程的數據是自定義的類型,并且 groupby 后的聚合邏輯也是自定義的,故用 SQL?比較難以表達,所以提出了 Dataset API.Dataset API 擴展 DataFrame API 支持靜態類型和運行已經存在的 Scala 或 Java 語言的用戶自定義函數.同時 Dataset 也能享受 Spark SQL 里所有性能 帶來的提升.
那么后面發現 Dataset 是包含了 DataFrame 的功能,這樣二者就出現了很大的冗余,故在 2.0 時將二者統一,保留 Dataset API,把 DataFrame 表示為 Dataset[Row],即 Dataset 的子集.
因此我們在使用 API 時,優先選擇 DataFrame & Dataset,因為它的性能很好,而且以后的優化它都可以享受到,但是為了兼容早期版本的程序,RDD API 也會一直保留著.后續 Spark 上層的庫將全部會用 DataFrame,比如 MLlib、Streaming、Graphx 等.
Whole-stage code generation
在參考資料 9 中有幾個例子的代碼比較,我們看其中一個例子:
elect count(*) from store_sales where ss_item_sk = 1000
那么在翻譯成計算引擎的執行計劃如下圖:
而通常物理計劃的代碼是這樣實現的:
class Filter {
def next(): Boolean = {
var found = false
while (!found && child.next()) {
found = predicate(child.fetch())
}
return found
}
def fetch(): InternalRow = {
child.fetch()
}…
}
但是真正如果我們用 hard code 寫的話,代碼是這樣的:
var count = 0
for (ss_item_sk in store_sales) {
if (ss_item_sk == 1000) {
count += 1
}
}
發現二者相關如下圖所示:
那么如何使得計算引擎的物理執行速度能達到 hard code 的性能呢?這就提出了 whole-stage code generation,即對物理執行的多次調用轉換為代碼 for 循環,類似 hard code 方式,減少中間執行的函數調用次數,當數據記錄多時,這個調用次數是很大. 最后這個優化帶來的性能提升如下圖所示:
從 benchmark 的結果可以看出,使用了該特性后各操作的性能都有很大的提升.
Spark Streaming 是把流式計算看成一個一個的離線計算來完成流式計算,提供了一套 Dstream 的流 API,相比于其他的流式計算,Spark Streaming 的優點是容錯性和吞吐量上要有優勢?,關于 Spark Streaming 的詳細設計思想和分析,可以到 https://github.com/lw-lin/CoolplaySpark 進行詳細學習和了解.
在 2.0 以前的版本,用戶在使用時,如果有流計算,又有離線計算,就需要用二套 API 去編寫程序,一套是 RDD API,一套是 Dstream API.而且 Dstream API 在易用性上遠不如 SQL 或 DataFrame.
為了真正將流式計算和離線計算在編程 API 上統一,同時也讓 Streaming 作業能夠享受 DataFrame/Dataset 上所帶來的優勢:性能提升和 API 易用,于是提出了 Structured Streaming.最后我們只需要基于 DataFrame/Dataset 可以開發離線計算和流式計算的程序,很容易使得 Spark 在 API 跟業界所說的 DataFlow 來統一離線計算和流式計算效果一樣.
比如在做 Batch Aggregation 時我們可以寫成下面的代碼
那么對于流式計算時,我們僅僅是調用了 DataFrame/Dataset 的不同函數代碼,如下:
最后,在 DataFrame/Dataset 這個 API 上可以完成如下圖所示的所有應用:
在 http://geek.csdn.net/news/detail/70162 提到的 1.6 問題中 Spillable 集合內存溢出問題在 SPARK-4452 里已解決,BlockManager 死鎖問題在 SPARK-12757 里已解決.
最后 2.0 版本還有一些其他的特性,如:
文/王聯輝
文章出處——高可用架構微信公眾號
轉載請注明本頁網址:
http://www.snjht.com/jiaocheng/4521.html