BIGO 是一家面向海外的以短視頻直播業(yè)務為主的公司, 目前公司的主要業(yè)務包括 BigoLive (全球直播服務),Likee (短視頻創(chuàng)作分享平臺),IMO (免費通信工具) 三部分,在全球范圍內擁有 4 億用戶。
一、業(yè)務背景
BIGO 是一家面向海外的以短視頻直播業(yè)務為主的公司, 目前公司的主要業(yè)務包括 BigoLive (全球直播服務),Likee (短視頻創(chuàng)作分享平臺),IMO (免費通信工具) 三部分,在全球范圍內擁有 4 億用戶。伴隨著業(yè)務的發(fā)展,對數(shù)據(jù)平臺處理能力的要求也是越來越高,平臺所面臨的問題也是日益凸顯,接下來將介紹 BIGO 大數(shù)據(jù)平臺及其所面臨的問題。BIGO 大數(shù)據(jù)平臺的數(shù)據(jù)流轉圖如下所示:
用戶在 APP,Web 頁面上的行為日志數(shù)據(jù),以及關系數(shù)據(jù)庫的 Binlog 數(shù)據(jù)會被同步到 BIGO 大數(shù)據(jù)平臺消息隊列,以及離線存儲系統(tǒng)中,然后通過實時的,離線的數(shù)據(jù)分析手段進行計算,以應用于實時推薦、監(jiān)控、即席查詢等使用場景。然而存在以下幾個問題:OLAP 分析平臺入口不統(tǒng)一:Presto/Spark 分析任務入口并存,用戶不清楚自己的 SQL 查詢適合哪個引擎執(zhí)行,盲目選擇,體驗不好;另外,用戶會在兩個入口同時提交相同查詢,以更快的獲取查詢結果,導致資源浪費;
離線任務計算時延高,結果產出太慢:典型的如 ABTest 業(yè)務,經常計算到下午才計算出結果;
各個業(yè)務方基于自己的業(yè)務場景獨立開發(fā)應用,實時任務煙囪式的開發(fā),缺少數(shù)據(jù)分層,數(shù)據(jù)血緣。
面對以上的問題,BIGO 大數(shù)據(jù)平臺建設了 OneSQL OLAP 分析平臺,以及實時數(shù)倉。- 通過 OneSQL OLAP 分析平臺,統(tǒng)一 OLAP 查詢入口,減少用戶盲目選擇,提升平臺的資源利用率;
- 通過 Flink 構建實時數(shù)倉任務,通過 Kafka/Pulsar 進行數(shù)據(jù)分層;
- 將部分離線計算慢的任務遷移到 Flink 流式計算任務上,加速計算結果的產出。
另外建設實時計算平臺 Bigoflow 管理這些實時計算任務,建設實時任務的血緣關系。
二、落地實踐 & 特色改進
2.1 OneSQL OLAP 分析平臺實踐和優(yōu)化
OneSQL OLAP 分析平臺是一個集 Flink、Spark、Presto 于一體的 OLAP 查詢分析引擎。用戶提交的 OLAP 查詢請求通過 OneSQL 后端轉發(fā)到不同執(zhí)行引擎的客戶端,然后提交對應的查詢請求到不同的集群上執(zhí)行。其整體架構圖如下:

該分析平臺整體結構從上到下分為入口層、轉發(fā)層、執(zhí)行層、資源管理層。為了優(yōu)化用戶體驗,減少執(zhí)行失敗的概率,提升各集群的資源利用率,OneSQL OLAP 分析平臺實現(xiàn)了以下功能:- 統(tǒng)一查詢入口:入口層,用戶通過統(tǒng)一的 Hue 查詢頁面入口以 Hive SQL 語法為標準提交查詢;
- 統(tǒng)一查詢語法:集 Flink、Spark、Presto 等多種查詢引擎于一體,不同查詢引擎通過適配 Hive SQL 語法來執(zhí)行用戶的 SQL 查詢任務;
- 智能路由:在選擇執(zhí)行引擎的過程中,會根據(jù)歷史 SQL 查詢執(zhí)行的情況 (在各引擎上是否執(zhí)行成功,以及執(zhí)行耗時),各集群的繁忙情況,以及各引擎對該 SQL 語法的是否兼容,來選擇合適的引擎提交查詢;
- 失敗重試:OneSQL 后臺會監(jiān)控 SQL 任務的執(zhí)行情況,如果 SQL 任務在執(zhí)行過程中失敗,將選擇其他的引擎執(zhí)行重試提交任務。
如此一來,通過 OneSQL OLAP 分析平臺,BIGO 大數(shù)據(jù)平臺實現(xiàn)了 OLAP 分析入口的統(tǒng)一,減少用戶的盲目選擇,同時充分利用各個集群的資源,減少資源空閑情況。2.1.1 Flink OLAP 分析系統(tǒng)建設
在 OneSQL 分析平臺上,F(xiàn)link 也作為 OLAP 分析引擎的一部分。Flink OLAP 系統(tǒng)分成兩個組成部分:Flink SQL Gateway 和 Flink Session 集群;SQL Gateway 作為 SQL 提交的入口,查詢 SQL 經過 Gateway 提交到 Flink Session 集群上執(zhí)行,同時獲取 SQL 執(zhí)行查詢的進度,以及返回查詢的結果給客戶端。其執(zhí)行 SQL 查詢的流程如下:
首先用戶提交過來的 SQL,在 SQL Gateway 進行判斷:是否需要將結果持久化寫入到 Hive 表,如果需要,則會先通過 HiveCatalog 的接口創(chuàng)建一個 Hive 表,用于持久化查詢任務的計算結果;之后,任務通過 SQL Gateway 上執(zhí)行 SQL 解析,設置作業(yè)運行的并行度,生成 Pipeline 并提交到 Session 集群上執(zhí)行。
為了保證整個 Flink OLAP 系統(tǒng)的穩(wěn)定性,以及高效的執(zhí)行 SQL 查詢,在這個系統(tǒng)中,進行了以下功能增強:
穩(wěn)定性:
- 基于 zookeeper HA 來保證 Flink Session 集群的可靠性,SQL Gateway 監(jiān)聽 Zookeeper 節(jié)點,感知 Session 集群;
- 控制查詢掃描 Hive 表的數(shù)據(jù)量,分區(qū)個數(shù),以及返回結果數(shù)據(jù)量,防止 Session 集群的 JobManager,TaskManager 因此出現(xiàn) OOM 情況。
性能:
Flink Session 集群預分配資源,減少作業(yè)提交后申請資源所需的時間;
Flink JobManager 異步解析 Split,Split 邊解析任務邊執(zhí)行,減少由于解析 Split 阻塞任務執(zhí)行的時間;
控制作業(yè)提交過程中掃描分區(qū),以及 Split 最大的個數(shù),減少設置任務并行所需要的時間。
Hive SQL 兼容:
針對 Flink 對于 Hive SQL 語法的兼容性進行改進,目前針對 Hive SQL 的兼容性大致為 80%。監(jiān)控告警:
監(jiān)控 Flink Session 集群的 JobManager,TaskManager,以及 SQL Gateway 的內存,CPU 使用情況,以及任務的提交情況,一旦出現(xiàn)問題,及時告警和處理。
2.1.2 OneSQL OLAP 分析平臺取得的成果
基于以上實現(xiàn)的 OneSQL OLAP 分析平臺,取得了以下幾個收益:- 統(tǒng)一查詢入口,減少用戶的盲目選擇,用戶執(zhí)行出錯率下降 85.7%,SQL 執(zhí)行的成功率提升 3%;
- SQL 執(zhí)行時間縮短 10%,充分利用了各個集群的資源,減少任務排隊等待的時間;
- Flink 作為 OLAP 分析引擎的一部分,實時計算集群的資源利用率提升了 15%。
2.2 實時數(shù)倉建設和優(yōu)化
為了提升 BIGO 大數(shù)據(jù)平臺上某些業(yè)務指標的產出效率,以及更好的管理 Flink 實時任務,BIGO 大數(shù)據(jù)平臺建設了實時計算平臺 Bigoflow,并將部分計算慢的任務遷移到實時計算平臺上,通過 Flink 流式計算的方式來執(zhí)行,通過消息隊列 Kafka/Pulsar 來進行數(shù)據(jù)分層,構建實時數(shù)倉;在 Bigoflow 上針對實時數(shù)倉的任務進行平臺化管理,建立統(tǒng)一的實時任務接入入口,并基于該平臺管理實時任務的元數(shù)據(jù),構建實時任務的血緣關系。2.2.1 建設方案
BIGO 大數(shù)據(jù)平臺主要基于 Flink + ClickHouse 建設實時數(shù)倉,大致方案如下:
按照傳統(tǒng)數(shù)據(jù)倉庫的數(shù)據(jù)分層方法,將數(shù)據(jù)劃分成 ODS、DWD、DWS、ADS 等四層數(shù)據(jù):ODS 層:基于用戶的行為日志,業(yè)務日志等作為原始數(shù)據(jù),存放于 Kafka/Pulsar 等消息隊列中;
DWD 層:這部分數(shù)據(jù)根據(jù)用戶的 UserId 經過 Flink 任務進行聚合后,形成不同用戶的行為明細數(shù)據(jù),保存到 Kafka/Pulsar 中;
DWS 層:用戶行為明細的 Kafka 流表與用戶 Hive/MySQL 維表進行流維表 JOIN,然后將 JOIN 之后產生的多維明細數(shù)據(jù)輸出到 ClickHouse 表中;
ADS 層:針對 ClickHouse 中多維明細數(shù)據(jù)按照不同維度進行匯總,然后應用于不同的業(yè)務中。
按照以上方案建設實時數(shù)據(jù)倉庫的過程中,遇到了一些問題:- 將離線任務轉為實時計算任務后,計算邏輯較為復雜 (多流 JOIN,去重),導致作業(yè)狀態(tài)太大,作業(yè)出現(xiàn) OOM (內存溢出) 異?;蛘咦鳂I(yè)算子背壓太大;
- 維表 Join 過程中,明細流表與大維表 Join,維表數(shù)據(jù)過多,加載到內存后 OOM,作業(yè)失敗無法運行;
- Flink 將流維表 Join 產生的多維明細數(shù)據(jù)寫入到 ClickHouse,無法保證 Exactly-once,一旦作業(yè)出現(xiàn) Failover,就會導致數(shù)據(jù)重復寫入。
2.2.2 問題解決 & 優(yōu)化
優(yōu)化作業(yè)執(zhí)行邏輯,減小狀態(tài)離線的計算任務邏輯較為復雜,涉及多個 Hive 表之間的 Join 以及去重操作,其大致邏輯如下:
當將離線的作業(yè)轉為 Flink 的流式任務之后,原先離線 Join 多個 Hive 表的場景就轉變?yōu)?Join 多個 Kafka Topic 的場景。由于 Join 的 Kafka topic 的流量較大,且 Join 的窗口時間較長 (窗口最長的為 1 天),當作業(yè)運行一段時間內,Join 算子上就積累了大量的狀態(tài) (一小時后狀態(tài)就接近 1T),面對如此大的狀態(tài),F(xiàn)link 作業(yè)采取 Rocksdb State Backend 來存放狀態(tài)數(shù)據(jù),但是仍然避免不了 Rocksdb 內存使用超過導致被 YARN kill 的問題,或者是 Rocksdb State 上存的狀態(tài)太多,吞吐下降導致作業(yè)嚴重背壓。針對這個問題,我們將這多個 Topic,按照相同的 Schema 進行 Unoin all 處理,得到一個大的數(shù)據(jù)流,然后在這個大的數(shù)據(jù)流中,再根據(jù)不同事件流的 event_id 進行判斷,就能知道這條數(shù)據(jù)來自哪一個事件流的 Topic,再進行聚合計算,獲取對應事件流上的計算指標。
這樣一來,通過 UNION ALL 代替 JOIN,避免了因為 JOIN 計算帶來的大 State 帶來的影響。另外,在計算任務中還存在有比較多的 count distinct 計算,類似如下:

這些 count distinct 計算在同一個 group by 中,并基于相同的 postid 進行去重計算,因而可以讓這些 distinct state 可以共享一組 key 來進行去重計算,那么就可以通過一個 MapState 來存儲這若干個 count distinct 的狀態(tài),如下:
這些 count distinct 函數(shù)去重的 key 相同,因而可以共享 MapState 中的 key 值,從而優(yōu)化存儲空間;而 Mapstate 的 Value 是 Byte 數(shù)組,每個 Byte 8 個 bit,每個 bit 為 0 或者 1,第 n 個 bit 對應了 n 個 count distinct 函數(shù)在該 key 上的取值:1 表示該 count disitnct 函數(shù)在對應的 key 上需要進行計數(shù),0 表示不需要計數(shù);當計算聚合結果的時候,則將所有 key 第 n 位的數(shù)字相加,即為第 n 個 count distinct 的取值,這樣一來,就更進一步節(jié)約了狀態(tài)的存儲空間。通過以上優(yōu)化,成功的將 ABTest 的離線任務遷移到 Flink 流式計算任務上,將作業(yè)的狀態(tài)控制在 100GB 以內,讓作業(yè)正常的運行起來。流維表 JOIN 優(yōu)化
生成多維明細寬表的過程中,需要進行流維表 JOIN, 使用了 Flink Join Hive 維表的功能:Hive 維表的數(shù)據(jù)會被加載到任務的 HashMap 的內存數(shù)據(jù)結構中,流表中的數(shù)據(jù)再根據(jù) Join Key 與 HashMap 中的數(shù)據(jù)進行 Join。但是面對上億,十億行的 Hive 大維表,加載到內存的數(shù)據(jù)量太大,很容易導致 OOM (內存溢出)。針對以上問題,我們將 Hive 大維表按照 Join Key 進行 Hash 分片,如下圖:
這樣一來,Hive 大維表的數(shù)據(jù)經過 Hash 函數(shù)計算后分布到 Flink 作業(yè)的不同并行子任務的 HashMap 中,每個 HashMap 只存放大維表的一部分數(shù)據(jù),只要作業(yè)的并行度夠大,就能夠將大維表的數(shù)據(jù)拆分成足夠多份,進行分片保存;對于一些太大的維表,也可以采取 Rocksdb Map State 來保存分片數(shù)據(jù)。Kafka 流表中的數(shù)據(jù),當要下發(fā)到不同的 subtask 上進行 Join 時,也通過相同的 Join Key 按照相同的 Hash 函數(shù)進行計算,從而將數(shù)據(jù)分配到對應的 subtask 進行 Join,輸出 Join 后的結果。通過以上優(yōu)化,成功 Join 了一些 Hive 大維表任務來執(zhí)行流維表 Join 計算,最大的維表超過 10 億行。ClickHouse Sink 的 Exactly-Once 語義支持將流維表 Join 生成的多維明細數(shù)據(jù)輸出到 ClickHouse 表的過程中,由于社區(qū)的 ClickHouse 不支持事務,所以沒辦法保證數(shù)據(jù) sink 到 ClickHouse 過程中的 Exactly-Once 語義。在此過程中,一旦出現(xiàn)作業(yè) Failover,數(shù)據(jù)就會重復寫入到 ClickHouse。針對這個問題,BIGO ClickHouse 實現(xiàn)了一個二階段提交事務機制:當需要寫入數(shù)據(jù)到 ClickHouse 時,可以先設置寫入的模式為 temporary,表明現(xiàn)在寫入的數(shù)據(jù)是臨時數(shù)據(jù);當數(shù)據(jù)執(zhí)行插入完成后,返回一個 Insert id,然后根據(jù)該 Insert id 執(zhí)行 Commit 操作,那么臨時數(shù)據(jù)就轉為正式數(shù)據(jù)。基于 BIGO ClickHouse 的二階段提交事務機制,并結合 Flink 的 checkpoint 機制,實現(xiàn)了一個 ClickHouse Connector,保證 ClickHouse Sink 的 Exactly Once 寫入語義,如下:
在正常寫入的情況下,Connector 隨機選擇 ClickHouse 的某一個 shard 寫入,根據(jù)用戶配置寫單副本,或者雙副本來執(zhí)行 insert 操作,并記錄寫入后的 insert id;在兩次 checkpoint 之間就會有多次這種 insert 操作,從而產生多個 insert id,當 checkpoint 完成時,再將這些 insert id 批量提交,將臨時數(shù)據(jù)轉為正式數(shù)據(jù),即完成了兩次 checkpoint 間數(shù)據(jù)的寫入;
一旦作業(yè)出現(xiàn) Failover,F(xiàn)link 作業(yè) Failover 重啟完成后,將從最近一次完成的 checkpoint 來恢復狀態(tài),此時 ClickHouse Sink 中的 Operator State 可能會包含上一次還沒有來得及提交完成的 Insert id,針對這些 insert id 進行重試提交;針對那些數(shù)據(jù)已經寫入 ClickHouse 中之后,但是 insert id 并沒有記錄到 Opeator State 中的數(shù)據(jù),由于是臨時數(shù)據(jù),在 ClickHouse 中并不會被查詢到,一段時間后,將會由 ClickHouse 的過期清理機制,被清理掉,從而保證了狀態(tài)回滾到上一次 checkpoint 之后,數(shù)據(jù)不會重復。
通過以上機制,成功保證了數(shù)據(jù)從 Kafka 經過 Flink 計算后寫入到 ClickHouse 整個鏈路中端到端的 Exactly-Once 語義,數(shù)據(jù)不重復也不丟失。2.2.3 平臺建設
為了更好的管理 BIGO 大數(shù)據(jù)平臺的實時計算任務,公司內部建設了 BIGO 實時計算平臺 Bigoflow,為用戶提供統(tǒng)一的 Flink實時任務接入,平臺建設如下:

- 支持 Flink JAR、SQL、Python 等多種類型作業(yè);支持不同的 Flink 版本,覆蓋公司內部大部分實時計算相關業(yè)務;
- 一站式管理:集作業(yè)開發(fā)、提交、運行、歷史展示、監(jiān)控、告警于一體,便于隨時查看作業(yè)的運行狀態(tài)和發(fā)現(xiàn)問題;
- 血緣關系:方便查詢每個作業(yè)的數(shù)據(jù)源、數(shù)據(jù)目的、數(shù)據(jù)計算的來龍去脈。
三、應用場景
3.1 Onesql OLAP 分析平臺應用場景
Onesql OLAP 分析平臺在公司內部的應用場景是:應用于 AdHoc 查詢,如下:
用戶通過 Hue 頁面提交的 SQL,通過 OneSQL 后端轉發(fā)給 Flink SQL Gateway,并提交到 Flink Session 集群上執(zhí)行查詢任務,F(xiàn)link SQL Gateway 獲取查詢任務的執(zhí)行進度返回給 Hue 頁面,并返回查詢結果。
3.2 實時數(shù)據(jù)倉庫應用場景
實時數(shù)據(jù)倉庫應用場景目前主要是 ABTest 業(yè)務,如下:
用戶的原始行為日志數(shù)據(jù)經過 Flink 任務聚合后生成用戶明細數(shù)據(jù),然后與維表數(shù)據(jù)進行流維表 JOIN,輸出到 ClickHouse 生成多維明細寬表,按照不同維度匯總后,應用于不同的業(yè)務。通過改造 ABTest 業(yè)務,將該業(yè)務的結果指標的生成時間提前了 8 個小時,同時減少了使用資源一倍以上。
四、未來規(guī)劃
為了更好的建設 OneSQL OLAP 分析平臺以及 BIGO 實時數(shù)據(jù)倉庫,實時計算平臺的規(guī)劃如下:
完善 Flink OLAP 分析平臺,完善 Hive SQL 語法支持,以及解決計算過程中出現(xiàn)的 JOIN 數(shù)據(jù)傾斜問題;
完善實時數(shù)倉建設,引入數(shù)據(jù)湖技術,解決實時數(shù)倉中任務數(shù)據(jù)的可重跑回溯范圍小的問題;
基于 Flink 打造流批一體的數(shù)據(jù)計算平臺。
立即登錄,閱讀全文