當前位置︰技術分享 > 技術參考 > 正文

流式數據處理在百度數據工廠的應用與實踐2019-06-20 16:55:08 | 編輯︰hely | 查看︰ | 評論︰0

流式數據處理在百度數據工廠的應用與實踐

本文整理自百度李俊卿在QCon上的演講︰《流式數據處理在百度數據工廠的應用與實踐》。

百度數據工廠最原先用Hive引擎,進行離線批量數據分析和PB級別的查詢,處理一些核心報表數據。但是在我們推廣過程中發現,用戶其實還是有復雜分析、實時處理、數據挖掘的請求,我們在Spark1.0推出的時候,就開始跟進Spark。在Spark1.6時候徹底在團隊中推廣起來,當時是SparkStreaming。當時Spark1.6的API基于RDD,它和Spark批式處理的API並不同步。後來在Spark2.2、2.3發布的時候,Spark推出了StructStreaming,它的API和批處理API已經完全一致,這時候我們迎來一次完整的架構升級,基于Spark做了一次更強大的封裝;以Spark為基礎,加上其他的CRM,PFS等各種模塊來做統一元數據處理;統一的資源調度;以Spark為基礎做了統一的一個計算引擎,以前Hive的一套也有完全融入到Spark里來;包括多種提交方式;安全管理等等。最後形成一套完整的成品。

 

 

這就是我們目前的一個整體的技術架構。左下角是統一元數據處理,包括文件類的元數據處理和結構類數據,比如說Hive,或者MySQL的元數據處理,這都是作為統一的處理層的。還有就是統一調度引擎處理,我們會有統一的調度引擎,由用戶注冊隊列,執行的時候只需要顯示隊列,去執行具體的資源。目前也支持K8S等。再往上一層是統一的Spark引擎。Spark引擎里面我們現在目前支持了SQL、DatasetAPI,可在上面跑各種復雜的處理。

再往上一層就是由Jupyter提供的工作空間,還有由我們自研提供的調度,自研的一套流式計算作業處理,整個一套就構成了現在完整的數據工廠。

流式數據處理在百度數據工廠的應用

接下來是比較最核心的部分,百度在Spark流批處理上做的哪些內容,主要是Spark流式SQL問題、實時轉離線問題、實時轉大屏展示問題。

 

 

首先Spark本身有一套完整的API的,有專門的引擎用來分析,所有的流、Batch分析,信息都會經過這套API進行一系列的處理,包括語義分析、語法分析、一些優化等。大家可以注意到,右下角是空缺的,Spark目前是沒有提供這部分內容的。而用戶轉型過程中很多用戶都是由Hive過來的,是熟悉應用SQL的用戶,他們要處理流式數據,就會迫切的需要有一個SQL的引擎,來幫他將Hive的SQL轉型成一個流式的SQL。

 

 

我們針對這個問題進行了一些開發。首先大家看一下,上面是SparkAPI層,Spark在流批處理上的API已經做得很完善了,是通過read和readStream來進行區分的。批到SQL處理,其實就是,可以完全看到它是首先是Sparkread一個Source,而它會映射成一個FromTable,具體的處理會映射成select、join、union等各種操作。然後WriteTable這種,在最後映射成了kafka_Table。我們流式SQL也可以映射這種類型的。

比如說這個例子,它從Kafka讀取,輸出到HDFS,映射的流式SQL的就是Kafka_Table。我們可以專門定義一個KafkaTable。用戶的處理,我們會變成。。大家注意,我中間加了一個stream關鍵字。Spark在處理的時候,做了統一引擎的處理。只有API層用戶寫readStream的時候,它才是一個流式的處理,如果它用戶寫一個Read,它就是一個批處理。相對來說,SQL層我們也做了相應的處理,跑的同樣的SQL,用戶如果沒加Stream關鍵字,它就是批處理的一個SQL,如果加Stream關鍵字,它就變成一個流式處理。但里面處理的引擎是完全一致的,只不過最後轉換Plan的時候,將最後原Plan的部分轉換成了流的plan。

 

 

這里也涉及了Table是怎麼存儲的。Table存儲,如圖所示,一個批量的Table存儲的時候,無非就是Tablename、Schema、Properties,還有一些定義,比如說location等一些配置的信息。同樣可以把我們創建的KafkaTable也可以映射進去,比如說我創建的Table名稱可以映射TableName等,和Meta這邊進行一一對應,這樣我們在Hive那一側,已經有了完善的一套Table的存儲信息。它會生成一個虛擬的路徑,其實並不會真正去創建,並不會真正去對它執行的。在數據工廠中,我們是這樣定義一個Table的。一個Table定義出來以後為要能被多個人使用,但臨時Table除外,你能夠授權給其他人去讀,也能夠去進行列裁減,能夠對它進行一些如數脫敏的數據分析,這時候就需要考慮Table的通用性。

首先第一點,我們創建的Table不能存放多個數據源。比如說存放三個數據源,這時完全可以定義三個Table,不同的數據源做Join,做Unit分析等,這在Spark里面都是OK的。

第二點只能定義通用配置。給別人使用,授權,計算的時候,肯定會需要考慮他自己的應用場景,這里面不能去攙雜何與應用場景相關信息,比如說watermark配置,這就是屬于與應用場景相關的信息,這部分信息都是不能存放于Table里面的。

另外就是Spark原生是支持Kafka,流式的Table。它要既能夠去流式的去讀,也能夠批量的去讀,既然定義了Table我們怎麼去讀呢?這就是我們在Spark里面的改進,主要改動的部分就是改動了語義分析層的一個邏輯。大家可以看一下右側整個邏輯框架。

 

 

我們主要改動就改動在語義層分析上。在這個規則上增加一個語義分析FindDataSouce,完成流式表解析,和增加了一個專門的可執行類SQLStreamingSink,用來專門讀取。比如說讀取Watermark或各種的配置,最終產生一個流式的Plan去執行。最後經過語義層分析之後,就變成了一個SQLStreamingSink。到這一步,一旦執行起來,它和正常API調度是一樣的,這樣API和SQL在語義層之後已經達到完整一致了。整體只在語義層這個地方做了一些區別而已。

 

 

總結來說,首先我們提供了一個統一的Hive元數據存儲,我們所有的Table都是基于Hive的,當然Hive只是一種方案,主要是用數據倉庫的形式來對數據進行統一管理。我們升級了FindDataSource,用來處理Table到Source一個定義。比如說用戶的groupby,filter,sum等操作都會通過語義解析的時候,會對它進行分析。現在這套方案,我們已經提供給社區。如果大家有興趣的可以一塊看看有什麼更好的方案解決StreamingJoinBach的處理,在Spark的Patch中搜SQLStreaming可以看到目前正在討論的鏈接,歡迎一起討論相關方案。

實時轉離線問題

很多人會說實時轉離線用的場景多嗎?其實在百度內的場景用的不少的,因為比如說用戶的點擊日志數據,大部分都是實時存放在日志中的,他會進行流式處理時候會全部打到Kafka消息隊列里面,它還是有些需求,對數據進行一些比如說天級別,月級別,甚至年級別的數據分析。這時候應該怎麼辦呢?于是我們就提供了實時轉離線的功能,其實Spark本身已經提供了一個實時轉離線的,就是右側的這個代碼。

 

 

比如說定義了一個CSV,定義一下它的分區路徑,定義partition、location等,去執行寫的操作時是有問題的。首先它輸出信息完全是由人為去記在如一個卡片里,記在一個統一的文檔里,下游去用的時候,問上游詢要。一旦上游改變了,里面信息肯定就完全不對稱了。

另外一個就是遷移升級問題。在百度內部存在集群遷移的問題,一個機房一般用幾年之後,都會下線,這時候就會存在機型遷移。在輸出路徑上,或者在輸出格式上,需要做出改變。而現在,一般來說,就是很多數據開發的人員,更期望的是開發的代碼,能長時間運行。如果我用當前的實時轉離線方案的話,肯定需要拿到原生的代碼進行修改,再搭架構,再執行,這也是現實場景中用戶吐槽最多的。

另外就是一個拓展文件格式,比如說甲方要求輸出sequenceFile,並指定了格式。但下游需要的sequenceFile格式是不同,怎麼去拓展?這樣開發代價很大,很多用戶是比較抵制的,我們提供的方案是什麼呢?就是實時轉數倉的方案。

 

 

大家知道Hive保存了很多信息,里面這些元數據信息,比如說有統一的管理員進行管理之後,這些信息都可以很明晰,很明了的被其他人看到。但輸出升級或遷移了,這時候修改location就可以了,代碼比較好改。具體代碼是什麼樣呢?就是右下角的這行代碼。

 

 

這個方案具體實現是這樣的,原生有一個FileSink,讀取CSV的數據,然後對數據進行處理之後,進行流式輸出。只不過我們改動變成了Hive的FileFormat,以批量的形式進行寫入,然後輸出出去。我們在創建FileSink的時候,我們會讀一下用戶填入的Table,拿到它的信息,注入進去,進行一個寫操作。大家可以注意到有一個分析監听器,在運行完每一個批量,都有一個信息反饋,如到底輸出了哪些partition,能拿到相應的分區信息。拿到這個分區信息之後,可以對它進行一些處理,最後注入到Hive中,這樣的話到底生產了哪些partition,完全可以在Hive里面可以去查到的。

基本上這就是這完整的一套實時轉數倉方案。我們基于這一套方案,在百度內部已經推廣到了很多業務上。在百度內部還有很多細粒度的權限處理使用實時轉數倉方案。一旦寫入數倉,下游在訂閱的時候會有一些數據脫敏,或者是讀寫權限控制,或者列權限控制,都能用這套方案進行控制,所以實時轉數倉方案在百度內部還是比較受歡迎的。

實時轉大屏展示

我們在推廣過程中,用戶是也些大屏展示的使用場景的。

 

 

這頁顯示的這是一個通用的用戶使用場景,比如說將Streaming實時的數據輸出到一個OLAP里面,最終展示給大屏。但是這個場景有什麼問題呢?這里需要額外使用其他的系統,比如Kafka系統、OLAP系統,最後再接入到大屏展示里面。每個系統,都需要找不同的負責人去處理,中間一旦出問題,需要反復去找負責人,與他們相互協調。中間會很復雜,一旦有一個網絡出現故障,數據就會延時,負責人都需要處理自己的業務瓶頸。處理瓶頸是在用戶使用場景中比較難辦的問題。

 

 

我們是如何處理呢?現有的一套解決方案提供了一套原生的基于SparkSQL,Sparkshell的系統。我們自研了一套就是交互式的分析,可以在本地,通過LivyJBDCAPI,當用戶select*或者什麼的時候,它完全可以交互式的提交到集群上處理,然後反饋給用戶。這時候結合SparkMemorySink去處理,將數據進行一些復雜的分析,具體的業務邏輯處理之後,它會將數據寫入到集群內存里面。前端通過LivyJDBCAPI去交互式的去查詢Spark集群內存的數據。這樣數據是直接落到集群內存的,沒有任何產出,是保存在內存里面的。Livy去查的時候直接從里面去拿出來,相當于直接走內存,不會走經過落盤,網絡等。這套方案就在實時處理和部署上,比較受歡迎。

 

 

然後這就是,我們整個的流批的處理,用戶通過各種方式傳輸數據,我們經過流到流,或者是流到批,或者是批到批的處理,當然中間我們可以完全映射成Table,就是流式的Table到批的Table,再到具體的業務輸出方。或者流式的Table到流式的Table,或者是批的Table的到批的Table,這完全是OK的。

流式數據處理在百度數據工廠的實踐

基于這套數據處理,我給大家簡單介紹一下百度內部的一些實踐。下圖是流式的第一版本的產品化界面。我們已經在設計第三版了。左邊是一個流式SQL的一個提交界面,用戶可以在里面添加SQL,右邊是流式監控。和現在Spark比較,和Spark監控頁面比較類似的,監控當天有什麼數據,顯示實際處理量、數據時延等。

 

 

廣告物料分析實踐案例是我們一個比較典型的流批處理的使用場景。在實際的產品運維過程中,存在廣告主投放廣告的場景。廣告主投放廣告付錢是要看真實的點擊率、曝光率和轉化率的,而且很多廣告主是根據曝光量、點擊量、轉化量來付錢的。

這種情況下,我們就需要專門針對廣告物料進行分析,根據點擊、曝光日志和轉化數據生成廣告的pv、uv、點擊率和轉化率,並根據計費數據生成廣告收益。

 

 

可以看到我們會通過Streaming直接輸出到大屏展示。廣告主會直接看到實時的用戶量、當前的產出、收益。另外一部分數據產出到離線里面,通過實時轉離線輸出出來,進行一個日級別,天級別的分析,生成一些天級別,還有月級別的一些PV量,供後台做策略人員分析,以調整廣告的投放策略。

講師︰李俊卿,百度高級研發工程師,數據工廠流式數據處理負責人。

上一篇︰如何基于 Apache Pulsar 和 Spark 進行批流一體的彈性數據處理? 三行Python代碼,可以讓你的數據處理快別人4倍下一篇︰