【作者】ByConity開源團隊
談到數據倉庫,一定離不開使用Extract-Transform-Load(ETL)或Extract-Load-Transform(ELT)。將來源不同、格式各異的數據提取到數據倉庫中,並進行處理加工。傳統的數據轉換過程一般採用Extract-Transform-Load(ETL)來將業務數據轉換為適合數倉的數據模型,然而,這依賴於獨立於數倉外的ETL系統,因而維護成本較高。
ByConity作為雲原生數據倉庫,從0.2.0版本開始逐步支持Extract-Load-Transform(ELT),使用戶免於維護多套異構數據系統。本文將介紹ByConity在ELT方面的能力規劃,實現原理和使用方式等。
ETL場景和方案
ELT與ETL的區別
ETL:是用來描述將數據從來源端經過抽取、轉置、載入至目的端(數據倉庫)的過程。Transform通常描述在數據倉庫中的前置數據加工過程。
ELT 專註於將最小處理的數據載入到數據倉庫中,而把大部分的轉換操作留給分析階段。相比起前者(ETL),它不需要過多的數據建模,而給分析者提供更靈活的選項。ELT已經成為當今大數據的處理常態,它對數據倉庫也提出了很多新的要求。
資源重複的挑戰
典型的數據鏈路如下:我們將行為數據、日誌、點擊流等通過MQ/Kafka/ Flink將其接入存儲系統當中,存儲系統又可分為域內的HDFS和雲上的OSS&S3 這種遠程儲存系統,然後進行一系列的數倉的ETL操作,提供給OLAP系統完成分析查詢。
但有些業務需要從上述的存儲中做一個分支,因此會在數據分析的某一階段,從整體鏈路中將數據導出,做一些不同於主鏈路的ETL操作,會出現兩份數據存儲。其次在這過程中也會出現兩套不同的ETL邏輯。
當數據量變大,計算冗餘以及存儲冗餘所帶來的成本壓力也會愈發變大,同時,存儲空間的膨脹也會讓彈性擴容變得不便利。
業界解決思路
在業界中,為了解決以上問題,有以下幾類流派:
數據預計算流派:如Kylin等。如果Hadoop系統中出報表較慢或聚合能力較差,可以去做一個數據的預計算,提前將配的指標的cube或一些視圖算好。實際SQL查詢時,可以直接用裡面的cube或視圖做替換,之後直接返回。
流批一體派:如 Flink、Risingwave。在數據流進時,針對一些需要出報表或者需要做大屏的數據直接內存中做聚合。聚合完成後,將結果寫入HBase或MySQL中再去取數據,將數據取出後作展示。Flink還會去直接暴露中間狀態的介面,即queryable state,讓用戶更好的使用狀態數據。但是最後還會與批計算的結果完成對數,如果不一致,需要進行回查操作,整個過程考驗運維/開發同學的功力。
湖倉一體&HxxP:將數據湖與數據倉庫結合起來。
ELTin ByConity
整體執行流程
ELT任務對系統的要求:
整體易擴展:導入和轉換通常需要大量的資源,系統需要通過水平擴展的方式來滿足數據量的快速增長。
可靠性和容錯能力:大量的job能有序調度;出現task偶然失敗(OOM)、container失敗時,能夠拉起重試;能處理一定的數據傾斜
效率&性能:有效利用多核多機並發能力;數據快速導入;內存使用有效(內存管理);CPU優化(向量化、codegen)
生態&可觀測性:可對接多種工具;任務狀態感知;任務進度感知;失敗日誌查詢;有一定可視化能力
ByConity針對ELT任務的要求,以及當前場景遇到的困難,新增了以下特性和優化改進。
分階段執行(Stage-levelScheduling)
原理解析
當前 ClickHouse的 SQL 執行過程如下:
第一階段,Coordinator 收到分散式表查詢後將請求轉換為對 local 表查詢發送給每個 shard 節點;
第二階段,Coordinator 收到各個節點的結果後匯聚起來處理後返回給客戶端;
ClickHouse 將Join操作中的右錶轉換為子查詢,帶來如下幾個問題都很難以解決:
複雜的query有多個子查詢,轉換複雜度高;
Join表較大時,容易造成worker節點的OOM;
聚合階段在Cooridnator,壓力大,容易成為性能瓶頸;
不同於ClickHouse,我們在ByConity中實現了對複雜查詢的執行優化。通過對執行計劃的切分,將之前的兩階段執行模型轉換為分階段執行。在邏輯計劃階段,根據運算元類型插入exchange運算元。執行階段根據exchange運算元將整個執行計划進行DAG切分,並且分stage進行調度。stage之間的exchange運算元負責完成數據傳輸和交換。
關鍵節點:
exchange節點插入
切分stage
stage scheduler
segment executer
exchange manager
這裡重點來講一下exchange的視線。上圖可以看到,最頂層的是queryplan。下面轉換成物理計劃的時候,我們會根據不同的數據分布的要求轉換成不同的運算元。source層是接收數據的節點,基本都是統一的,叫做ExchangeSource。Sink則有不同的實現,BroadcastSink、Local、PartitionSink等,他們是作為maptask的一部分去運行的。如果是跨節點的數據操作,我們在底層使用統一的brpc流式數據傳輸,如果是本地,則使用內存隊列來實現。針對不同的點,我們進行了非常細緻的優化:
數據傳輸層
進程內通過內存隊列,無序列化,zero copy
進程間使用brpc stream rpc,保序、連接復用、狀態碼傳輸、壓縮等
運算元層
批量發送
線程復用,減少線程數量
帶來的收益
因為ByConity徹底採用了多階段的查詢執行方式,整體有很大的收益:
Cooridnator更穩定、更高效
聚合等運算元拆分到worker節點執行
Cooridnator節點只需要聚合最終結果
Worker OOM減少
進行了stage切分,每個stage的計算相對簡單
增加了exchange運算元,減少內存壓力
網路連接更加穩定、高效
exchange運算元有效傳輸
復用連接池
自適應的調度器(AdaptiveScheduler)
AdaptiveScheduler屬於我們在穩定性方面所做的特性。在OLAP場景中可能會發現部分數據不全或數據查詢超時等,原因是每個worker是所有的query共用的,這樣一旦有一個worker較慢就會導致整個query的執行受到影響。
計算節點共用存在的問題:
Scan 所在的節點負載和不同查詢所需的掃描數據量相關,做不到完全平均;
各 Plan Segment 所需資源差異大;
這就導致worker節點之間的負載嚴重不均衡。負載較重的worker節點就會影響query整體的進程。因此我們做了以下的優化方案:
建立 Worker 健康度機制。Server 端建立 Worker 健康度管理類,可以快速獲取 Worker Group 的健康度信息,包括CPU、內存、運行Query數量等信息。
自適應調度:每個SQL 根據 Worker 健康度動態的進行選擇以及計算節點並發度控制。
查詢的隊列機制(QueryQueue)
我們的集群也會出現滿載情況,即所有的worker都是不健康的或者滿載/超載的,就會用查詢隊列來進行優化。
我們直接在server端做了一個manager。每次查詢的時候manager會去check集群的資源,並且持有一個鎖。如果資源不夠用,則等待資源釋放後去喚醒這個鎖。這就避免了Server端不限制的下發計算任務,導致worker節點超載,然後崩掉的情況。
當前實現相對簡單。server是多實例,每個server實例中都有queue,所持有的是一個局部視角,缺乏全局的資源視角。除此之外,每個queue中的查詢狀態沒有持久化,只是簡單的緩存在內存中。
後續,我們會增加server之間的協調,在一個全局的視角上對查詢並發做限制。也會對server實例中query做持久化,增加一些failover的場景支持。
非同步執行(AsyncExecution)
ELT任務的一個典型特徵就是:相對於即時分析,他們的運行時間會相對較長。一般ELT任務執行時長為分鐘級,甚至到達小時級。
目前ClickHouse的客戶端查詢都採用阻塞的方式進行返回。這樣就造成了客戶端長期處於等待的情況,而在這個等待過程中還需要保持和服務端的連接。在不穩定的網路情況下,客戶端和服務端的連接會斷開,從而導致服務端的任務失敗。
為了減少這種不必要的失敗,以及減少客戶端為了維持連接的增加的複雜度。我們開發了非同步執行的功能,它的實現如下:
用戶指定非同步執行。用戶可以通過settings enable_async_query = 1的方式進行per query的指定。也可以通過set enable_async_query = 1的方式進行session級別的指定。
如果是非同步query,則將其放到後台線程池中運行
靜默io。當非同步query執行時,則需要切斷它和客戶端的交互邏輯,比如輸出日誌等。
針對query的初始化還是在session的同步線程中進行。一旦完成初始化,則將query狀態寫入到metastore,並向客戶端返回asyncquery id。客戶端可以用這個id查詢query的狀態。asyncqueryid返回後,則表示完成此次查詢的交互。這種模式下,如果語句是select,那麼後續結果則無法回傳給客戶端。這種情況下我們推薦用戶使用asyncquery + select...into outfile的組合來滿足需求。
未來規劃
針對ELT混合負載,ByConity0.2.0版本目前只是牛刀小試。後續的版本中我們會持續優化查詢相關的能力,ELT為核心的規劃如下:
故障恢復能力
運算元Spill
Sort、Agg、Join 運算元Spill;
Exchange Spill 能力;
Recoverability 容錯恢復
運算元執行恢復:ELT任務運行時長較長時,中間 Task的偶發失敗會導致整個Query失敗,支持Task 級別重試可以極大地降低環境原因導致的偶發失敗;
Stage重試:當節點失敗時,可以進行 Stage級別的重試;
保存隊列作業狀態的能力;
Remote Shuffle Service:當前業界開源的 shuffle service通常為Spark定製,沒有通用的客戶端,比如c++客戶端。後續我們會補充這部分能力。
資源
計算資源可指定:用戶可指定query需要的計算資源;
計算資源預估/預占:可動態預估query需要的計算資源,並通過預占的方式進行調配;
動態申請資源:當前worker均為常駐進程/節點。動態申請資源可以提高利用率;
更細粒度的資源隔離:通過worker group或者進程級別的隔離,減少各query之間相互影響;