BIGO 是一家面向海外的以短視頻直播業(yè)務(wù)為主的公司, 目前公司的主要業(yè)務(wù)包括 BigoLive (全球直播服務(wù)),Likee (短視頻創(chuàng)作分享平臺),IMO (免費通信工具) 三部分,在全球范圍內(nèi)擁有 4 億用戶。
一、業(yè)務(wù)背景
BIGO 是一家面向海外的以短視頻直播業(yè)務(wù)為主的公司, 目前公司的主要業(yè)務(wù)包括 BigoLive (全球直播服務(wù)),Likee (短視頻創(chuàng)作分享平臺),IMO (免費通信工具) 三部分,在全球范圍內(nèi)擁有 4 億用戶。伴隨著業(yè)務(wù)的發(fā)展,對數(shù)據(jù)平臺處理能力的要求也是越來越高,平臺所面臨的問題也是日益凸顯,接下來將介紹 BIGO 大數(shù)據(jù)平臺及其所面臨的問題。BIGO 大數(shù)據(jù)平臺的數(shù)據(jù)流轉(zhuǎn)圖如下所示:用戶在 APP,Web 頁面上的行為日志數(shù)據(jù),以及關(guān)系數(shù)據(jù)庫的 Binlog 數(shù)據(jù)會被同步到 BIGO 大數(shù)據(jù)平臺消息隊列,以及離線存儲系統(tǒng)中,然后通過實時的,離線的數(shù)據(jù)分析手段進(jìn)行計算,以應(yīng)用于實時推薦、監(jiān)控、即席查詢等使用場景。然而存在以下幾個問題:OLAP 分析平臺入口不統(tǒng)一:Presto/Spark 分析任務(wù)入口并存,用戶不清楚自己的 SQL 查詢適合哪個引擎執(zhí)行,盲目選擇,體驗不好;另外,用戶會在兩個入口同時提交相同查詢,以更快的獲取查詢結(jié)果,導(dǎo)致資源浪費;
離線任務(wù)計算時延高,結(jié)果產(chǎn)出太慢:典型的如 ABTest 業(yè)務(wù),經(jīng)常計算到下午才計算出結(jié)果;
各個業(yè)務(wù)方基于自己的業(yè)務(wù)場景獨立開發(fā)應(yīng)用,實時任務(wù)煙囪式的開發(fā),缺少數(shù)據(jù)分層,數(shù)據(jù)血緣。
面對以上的問題,BIGO 大數(shù)據(jù)平臺建設(shè)了 OneSQL OLAP 分析平臺,以及實時數(shù)倉。- 通過 OneSQL OLAP 分析平臺,統(tǒng)一 OLAP 查詢?nèi)肟冢瑴p少用戶盲目選擇,提升平臺的資源利用率;
- 通過 Flink 構(gòu)建實時數(shù)倉任務(wù),通過 Kafka/Pulsar 進(jìn)行數(shù)據(jù)分層;
- 將部分離線計算慢的任務(wù)遷移到 Flink 流式計算任務(wù)上,加速計算結(jié)果的產(chǎn)出。
另外建設(shè)實時計算平臺 Bigoflow 管理這些實時計算任務(wù),建設(shè)實時任務(wù)的血緣關(guān)系。
二、落地實踐 & 特色改進(jìn)
2.1 OneSQL OLAP 分析平臺實踐和優(yōu)化
OneSQL OLAP 分析平臺是一個集 Flink、Spark、Presto 于一體的 OLAP 查詢分析引擎。用戶提交的 OLAP 查詢請求通過 OneSQL 后端轉(zhuǎn)發(fā)到不同執(zhí)行引擎的客戶端,然后提交對應(yīng)的查詢請求到不同的集群上執(zhí)行。其整體架構(gòu)圖如下:
該分析平臺整體結(jié)構(gòu)從上到下分為入口層、轉(zhuǎn)發(fā)層、執(zhí)行層、資源管理層。為了優(yōu)化用戶體驗,減少執(zhí)行失敗的概率,提升各集群的資源利用率,OneSQL OLAP 分析平臺實現(xiàn)了以下功能:- 統(tǒng)一查詢?nèi)肟冢?/span>入口層,用戶通過統(tǒng)一的 Hue 查詢頁面入口以 Hive SQL 語法為標(biāo)準(zhǔn)提交查詢;
- 統(tǒng)一查詢語法:集 Flink、Spark、Presto 等多種查詢引擎于一體,不同查詢引擎通過適配 Hive SQL 語法來執(zhí)行用戶的 SQL 查詢?nèi)蝿?wù);
- 智能路由:在選擇執(zhí)行引擎的過程中,會根據(jù)歷史 SQL 查詢執(zhí)行的情況 (在各引擎上是否執(zhí)行成功,以及執(zhí)行耗時),各集群的繁忙情況,以及各引擎對該 SQL 語法的是否兼容,來選擇合適的引擎提交查詢;
- 失敗重試:OneSQL 后臺會監(jiān)控 SQL 任務(wù)的執(zhí)行情況,如果 SQL 任務(wù)在執(zhí)行過程中失敗,將選擇其他的引擎執(zhí)行重試提交任務(wù)。
如此一來,通過 OneSQL OLAP 分析平臺,BIGO 大數(shù)據(jù)平臺實現(xiàn)了 OLAP 分析入口的統(tǒng)一,減少用戶的盲目選擇,同時充分利用各個集群的資源,減少資源空閑情況。2.1.1 Flink OLAP 分析系統(tǒng)建設(shè)
在 OneSQL 分析平臺上,F(xiàn)link 也作為 OLAP 分析引擎的一部分。Flink OLAP 系統(tǒng)分成兩個組成部分:Flink SQL Gateway 和 Flink Session 集群;SQL Gateway 作為 SQL 提交的入口,查詢 SQL 經(jīng)過 Gateway 提交到 Flink Session 集群上執(zhí)行,同時獲取 SQL 執(zhí)行查詢的進(jìn)度,以及返回查詢的結(jié)果給客戶端。其執(zhí)行 SQL 查詢的流程如下:首先用戶提交過來的 SQL,在 SQL Gateway 進(jìn)行判斷:是否需要將結(jié)果持久化寫入到 Hive 表,如果需要,則會先通過 HiveCatalog 的接口創(chuàng)建一個 Hive 表,用于持久化查詢?nèi)蝿?wù)的計算結(jié)果;之后,任務(wù)通過 SQL Gateway 上執(zhí)行 SQL 解析,設(shè)置作業(yè)運行的并行度,生成 Pipeline 并提交到 Session 集群上執(zhí)行。
為了保證整個 Flink OLAP 系統(tǒng)的穩(wěn)定性,以及高效的執(zhí)行 SQL 查詢,在這個系統(tǒng)中,進(jìn)行了以下功能增強:
穩(wěn)定性:
- 基于 zookeeper HA 來保證 Flink Session 集群的可靠性,SQL Gateway 監(jiān)聽 Zookeeper 節(jié)點,感知 Session 集群;
- 控制查詢掃描 Hive 表的數(shù)據(jù)量,分區(qū)個數(shù),以及返回結(jié)果數(shù)據(jù)量,防止 Session 集群的 JobManager,TaskManager 因此出現(xiàn) OOM 情況。
性能:
Flink Session 集群預(yù)分配資源,減少作業(yè)提交后申請資源所需的時間;
Flink JobManager 異步解析 Split,Split 邊解析任務(wù)邊執(zhí)行,減少由于解析 Split 阻塞任務(wù)執(zhí)行的時間;
控制作業(yè)提交過程中掃描分區(qū),以及 Split 最大的個數(shù),減少設(shè)置任務(wù)并行所需要的時間。
Hive SQL 兼容:
針對 Flink 對于 Hive SQL 語法的兼容性進(jìn)行改進(jìn),目前針對 Hive SQL 的兼容性大致為 80%。監(jiān)控告警:
監(jiān)控 Flink Session 集群的 JobManager,TaskManager,以及 SQL Gateway 的內(nèi)存,CPU 使用情況,以及任務(wù)的提交情況,一旦出現(xiàn)問題,及時告警和處理。
2.1.2 OneSQL OLAP 分析平臺取得的成果
基于以上實現(xiàn)的 OneSQL OLAP 分析平臺,取得了以下幾個收益:- 統(tǒng)一查詢?nèi)肟?,減少用戶的盲目選擇,用戶執(zhí)行出錯率下降 85.7%,SQL 執(zhí)行的成功率提升 3%;
- SQL 執(zhí)行時間縮短 10%,充分利用了各個集群的資源,減少任務(wù)排隊等待的時間;
- Flink 作為 OLAP 分析引擎的一部分,實時計算集群的資源利用率提升了 15%。
2.2 實時數(shù)倉建設(shè)和優(yōu)化
為了提升 BIGO 大數(shù)據(jù)平臺上某些業(yè)務(wù)指標(biāo)的產(chǎn)出效率,以及更好的管理 Flink 實時任務(wù),BIGO 大數(shù)據(jù)平臺建設(shè)了實時計算平臺 Bigoflow,并將部分計算慢的任務(wù)遷移到實時計算平臺上,通過 Flink 流式計算的方式來執(zhí)行,通過消息隊列 Kafka/Pulsar 來進(jìn)行數(shù)據(jù)分層,構(gòu)建實時數(shù)倉;在 Bigoflow 上針對實時數(shù)倉的任務(wù)進(jìn)行平臺化管理,建立統(tǒng)一的實時任務(wù)接入入口,并基于該平臺管理實時任務(wù)的元數(shù)據(jù),構(gòu)建實時任務(wù)的血緣關(guān)系。2.2.1 建設(shè)方案
BIGO 大數(shù)據(jù)平臺主要基于 Flink + ClickHouse 建設(shè)實時數(shù)倉,大致方案如下:按照傳統(tǒng)數(shù)據(jù)倉庫的數(shù)據(jù)分層方法,將數(shù)據(jù)劃分成 ODS、DWD、DWS、ADS 等四層數(shù)據(jù):ODS 層:基于用戶的行為日志,業(yè)務(wù)日志等作為原始數(shù)據(jù),存放于 Kafka/Pulsar 等消息隊列中;
DWD 層:這部分?jǐn)?shù)據(jù)根據(jù)用戶的 UserId 經(jīng)過 Flink 任務(wù)進(jìn)行聚合后,形成不同用戶的行為明細(xì)數(shù)據(jù),保存到 Kafka/Pulsar 中;
DWS 層:用戶行為明細(xì)的 Kafka 流表與用戶 Hive/MySQL 維表進(jìn)行流維表 JOIN,然后將 JOIN 之后產(chǎn)生的多維明細(xì)數(shù)據(jù)輸出到 ClickHouse 表中;
ADS 層:針對 ClickHouse 中多維明細(xì)數(shù)據(jù)按照不同維度進(jìn)行匯總,然后應(yīng)用于不同的業(yè)務(wù)中。
按照以上方案建設(shè)實時數(shù)據(jù)倉庫的過程中,遇到了一些問題:- 將離線任務(wù)轉(zhuǎn)為實時計算任務(wù)后,計算邏輯較為復(fù)雜 (多流 JOIN,去重),導(dǎo)致作業(yè)狀態(tài)太大,作業(yè)出現(xiàn) OOM (內(nèi)存溢出) 異?;蛘咦鳂I(yè)算子背壓太大;
- 維表 Join 過程中,明細(xì)流表與大維表 Join,維表數(shù)據(jù)過多,加載到內(nèi)存后 OOM,作業(yè)失敗無法運行;
- Flink 將流維表 Join 產(chǎn)生的多維明細(xì)數(shù)據(jù)寫入到 ClickHouse,無法保證 Exactly-once,一旦作業(yè)出現(xiàn) Failover,就會導(dǎo)致數(shù)據(jù)重復(fù)寫入。
2.2.2 問題解決 & 優(yōu)化
優(yōu)化作業(yè)執(zhí)行邏輯,減小狀態(tài)離線的計算任務(wù)邏輯較為復(fù)雜,涉及多個 Hive 表之間的 Join 以及去重操作,其大致邏輯如下:當(dāng)將離線的作業(yè)轉(zhuǎn)為 Flink 的流式任務(wù)之后,原先離線 Join 多個 Hive 表的場景就轉(zhuǎn)變?yōu)?Join 多個 Kafka Topic 的場景。由于 Join 的 Kafka topic 的流量較大,且 Join 的窗口時間較長 (窗口最長的為 1 天),當(dāng)作業(yè)運行一段時間內(nèi),Join 算子上就積累了大量的狀態(tài) (一小時后狀態(tài)就接近 1T),面對如此大的狀態(tài),F(xiàn)link 作業(yè)采取 Rocksdb State Backend 來存放狀態(tài)數(shù)據(jù),但是仍然避免不了 Rocksdb 內(nèi)存使用超過導(dǎo)致被 YARN kill 的問題,或者是 Rocksdb State 上存的狀態(tài)太多,吞吐下降導(dǎo)致作業(yè)嚴(yán)重背壓。針對這個問題,我們將這多個 Topic,按照相同的 Schema 進(jìn)行 Unoin all 處理,得到一個大的數(shù)據(jù)流,然后在這個大的數(shù)據(jù)流中,再根據(jù)不同事件流的 event_id 進(jìn)行判斷,就能知道這條數(shù)據(jù)來自哪一個事件流的 Topic,再進(jìn)行聚合計算,獲取對應(yīng)事件流上的計算指標(biāo)。這樣一來,通過 UNION ALL 代替 JOIN,避免了因為 JOIN 計算帶來的大 State 帶來的影響。另外,在計算任務(wù)中還存在有比較多的 count distinct 計算,類似如下:
這些 count distinct 計算在同一個 group by 中,并基于相同的 postid 進(jìn)行去重計算,因而可以讓這些 distinct state 可以共享一組 key 來進(jìn)行去重計算,那么就可以通過一個 MapState 來存儲這若干個 count distinct 的狀態(tài),如下:這些 count distinct 函數(shù)去重的 key 相同,因而可以共享 MapState 中的 key 值,從而優(yōu)化存儲空間;而 Mapstate 的 Value 是 Byte 數(shù)組,每個 Byte 8 個 bit,每個 bit 為 0 或者 1,第 n 個 bit 對應(yīng)了 n 個 count distinct 函數(shù)在該 key 上的取值:1 表示該 count disitnct 函數(shù)在對應(yīng)的 key 上需要進(jìn)行計數(shù),0 表示不需要計數(shù);當(dāng)計算聚合結(jié)果的時候,則將所有 key 第 n 位的數(shù)字相加,即為第 n 個 count distinct 的取值,這樣一來,就更進(jìn)一步節(jié)約了狀態(tài)的存儲空間。通過以上優(yōu)化,成功的將 ABTest 的離線任務(wù)遷移到 Flink 流式計算任務(wù)上,將作業(yè)的狀態(tài)控制在 100GB 以內(nèi),讓作業(yè)正常的運行起來。流維表 JOIN 優(yōu)化
生成多維明細(xì)寬表的過程中,需要進(jìn)行流維表 JOIN, 使用了 Flink Join Hive 維表的功能:Hive 維表的數(shù)據(jù)會被加載到任務(wù)的 HashMap 的內(nèi)存數(shù)據(jù)結(jié)構(gòu)中,流表中的數(shù)據(jù)再根據(jù) Join Key 與 HashMap 中的數(shù)據(jù)進(jìn)行 Join。但是面對上億,十億行的 Hive 大維表,加載到內(nèi)存的數(shù)據(jù)量太大,很容易導(dǎo)致 OOM (內(nèi)存溢出)。針對以上問題,我們將 Hive 大維表按照 Join Key 進(jìn)行 Hash 分片,如下圖:這樣一來,Hive 大維表的數(shù)據(jù)經(jīng)過 Hash 函數(shù)計算后分布到 Flink 作業(yè)的不同并行子任務(wù)的 HashMap 中,每個 HashMap 只存放大維表的一部分?jǐn)?shù)據(jù),只要作業(yè)的并行度夠大,就能夠?qū)⒋缶S表的數(shù)據(jù)拆分成足夠多份,進(jìn)行分片保存;對于一些太大的維表,也可以采取 Rocksdb Map State 來保存分片數(shù)據(jù)。Kafka 流表中的數(shù)據(jù),當(dāng)要下發(fā)到不同的 subtask 上進(jìn)行 Join 時,也通過相同的 Join Key 按照相同的 Hash 函數(shù)進(jìn)行計算,從而將數(shù)據(jù)分配到對應(yīng)的 subtask 進(jìn)行 Join,輸出 Join 后的結(jié)果。通過以上優(yōu)化,成功 Join 了一些 Hive 大維表任務(wù)來執(zhí)行流維表 Join 計算,最大的維表超過 10 億行。ClickHouse Sink 的 Exactly-Once 語義支持將流維表 Join 生成的多維明細(xì)數(shù)據(jù)輸出到 ClickHouse 表的過程中,由于社區(qū)的 ClickHouse 不支持事務(wù),所以沒辦法保證數(shù)據(jù) sink 到 ClickHouse 過程中的 Exactly-Once 語義。在此過程中,一旦出現(xiàn)作業(yè) Failover,數(shù)據(jù)就會重復(fù)寫入到 ClickHouse。針對這個問題,BIGO ClickHouse 實現(xiàn)了一個二階段提交事務(wù)機制:當(dāng)需要寫入數(shù)據(jù)到 ClickHouse 時,可以先設(shè)置寫入的模式為 temporary,表明現(xiàn)在寫入的數(shù)據(jù)是臨時數(shù)據(jù);當(dāng)數(shù)據(jù)執(zhí)行插入完成后,返回一個 Insert id,然后根據(jù)該 Insert id 執(zhí)行 Commit 操作,那么臨時數(shù)據(jù)就轉(zhuǎn)為正式數(shù)據(jù)。基于 BIGO ClickHouse 的二階段提交事務(wù)機制,并結(jié)合 Flink 的 checkpoint 機制,實現(xiàn)了一個 ClickHouse Connector,保證 ClickHouse Sink 的 Exactly Once 寫入語義,如下:在正常寫入的情況下,Connector 隨機選擇 ClickHouse 的某一個 shard 寫入,根據(jù)用戶配置寫單副本,或者雙副本來執(zhí)行 insert 操作,并記錄寫入后的 insert id;在兩次 checkpoint 之間就會有多次這種 insert 操作,從而產(chǎn)生多個 insert id,當(dāng) checkpoint 完成時,再將這些 insert id 批量提交,將臨時數(shù)據(jù)轉(zhuǎn)為正式數(shù)據(jù),即完成了兩次 checkpoint 間數(shù)據(jù)的寫入;
一旦作業(yè)出現(xiàn) Failover,F(xiàn)link 作業(yè) Failover 重啟完成后,將從最近一次完成的 checkpoint 來恢復(fù)狀態(tài),此時 ClickHouse Sink 中的 Operator State 可能會包含上一次還沒有來得及提交完成的 Insert id,針對這些 insert id 進(jìn)行重試提交;針對那些數(shù)據(jù)已經(jīng)寫入 ClickHouse 中之后,但是 insert id 并沒有記錄到 Opeator State 中的數(shù)據(jù),由于是臨時數(shù)據(jù),在 ClickHouse 中并不會被查詢到,一段時間后,將會由 ClickHouse 的過期清理機制,被清理掉,從而保證了狀態(tài)回滾到上一次 checkpoint 之后,數(shù)據(jù)不會重復(fù)。
通過以上機制,成功保證了數(shù)據(jù)從 Kafka 經(jīng)過 Flink 計算后寫入到 ClickHouse 整個鏈路中端到端的 Exactly-Once 語義,數(shù)據(jù)不重復(fù)也不丟失。2.2.3 平臺建設(shè)
為了更好的管理 BIGO 大數(shù)據(jù)平臺的實時計算任務(wù),公司內(nèi)部建設(shè)了 BIGO 實時計算平臺 Bigoflow,為用戶提供統(tǒng)一的 Flink實時任務(wù)接入,平臺建設(shè)如下:
- 支持 Flink JAR、SQL、Python 等多種類型作業(yè);支持不同的 Flink 版本,覆蓋公司內(nèi)部大部分實時計算相關(guān)業(yè)務(wù);
- 一站式管理:集作業(yè)開發(fā)、提交、運行、歷史展示、監(jiān)控、告警于一體,便于隨時查看作業(yè)的運行狀態(tài)和發(fā)現(xiàn)問題;
- 血緣關(guān)系:方便查詢每個作業(yè)的數(shù)據(jù)源、數(shù)據(jù)目的、數(shù)據(jù)計算的來龍去脈。
三、應(yīng)用場景
3.1 Onesql OLAP 分析平臺應(yīng)用場景
Onesql OLAP 分析平臺在公司內(nèi)部的應(yīng)用場景是:應(yīng)用于 AdHoc 查詢,如下:用戶通過 Hue 頁面提交的 SQL,通過 OneSQL 后端轉(zhuǎn)發(fā)給 Flink SQL Gateway,并提交到 Flink Session 集群上執(zhí)行查詢?nèi)蝿?wù),F(xiàn)link SQL Gateway 獲取查詢?nèi)蝿?wù)的執(zhí)行進(jìn)度返回給 Hue 頁面,并返回查詢結(jié)果。
3.2 實時數(shù)據(jù)倉庫應(yīng)用場景
實時數(shù)據(jù)倉庫應(yīng)用場景目前主要是 ABTest 業(yè)務(wù),如下:用戶的原始行為日志數(shù)據(jù)經(jīng)過 Flink 任務(wù)聚合后生成用戶明細(xì)數(shù)據(jù),然后與維表數(shù)據(jù)進(jìn)行流維表 JOIN,輸出到 ClickHouse 生成多維明細(xì)寬表,按照不同維度匯總后,應(yīng)用于不同的業(yè)務(wù)。通過改造 ABTest 業(yè)務(wù),將該業(yè)務(wù)的結(jié)果指標(biāo)的生成時間提前了 8 個小時,同時減少了使用資源一倍以上。
四、未來規(guī)劃
為了更好的建設(shè) OneSQL OLAP 分析平臺以及 BIGO 實時數(shù)據(jù)倉庫,實時計算平臺的規(guī)劃如下:
完善 Flink OLAP 分析平臺,完善 Hive SQL 語法支持,以及解決計算過程中出現(xiàn)的 JOIN 數(shù)據(jù)傾斜問題;
完善實時數(shù)倉建設(shè),引入數(shù)據(jù)湖技術(shù),解決實時數(shù)倉中任務(wù)數(shù)據(jù)的可重跑回溯范圍小的問題;
基于 Flink 打造流批一體的數(shù)據(jù)計算平臺。
立即登錄,閱讀全文