BIGO 使用 Flink 做 OLAP 分析及實時數(shù)倉的實踐和優(yōu)化

來源:阿里開發(fā)者
作者:鄒云鶴@BIGO
時間:2023-01-12
2187
BIGO 是一家面向海外的以短視頻直播業(yè)務(wù)為主的公司, 目前公司的主要業(yè)務(wù)包括 BigoLive (全球直播服務(wù)),Likee (短視頻創(chuàng)作分享平臺),IMO (免費通信工具) 三部分,在全球范圍內(nèi)擁有 4 億用戶。

一、業(yè)務(wù)背景

BIGO 是一家面向海外的以短視頻直播業(yè)務(wù)為主的公司, 目前公司的主要業(yè)務(wù)包括 BigoLive (全球直播服務(wù)),Likee (短視頻創(chuàng)作分享平臺),IMO (免費通信工具) 三部分,在全球范圍內(nèi)擁有 4 億用戶。伴隨著業(yè)務(wù)的發(fā)展,對數(shù)據(jù)平臺處理能力的要求也是越來越高,平臺所面臨的問題也是日益凸顯,接下來將介紹 BIGO 大數(shù)據(jù)平臺及其所面臨的問題。BIGO 大數(shù)據(jù)平臺的數(shù)據(jù)流轉(zhuǎn)圖如下所示:

用戶在 APP,Web 頁面上的行為日志數(shù)據(jù),以及關(guān)系數(shù)據(jù)庫的 Binlog 數(shù)據(jù)會被同步到 BIGO 大數(shù)據(jù)平臺消息隊列,以及離線存儲系統(tǒng)中,然后通過實時的,離線的數(shù)據(jù)分析手段進(jìn)行計算,以應(yīng)用于實時推薦、監(jiān)控、即席查詢等使用場景。然而存在以下幾個問題:
  • OLAP 分析平臺入口不統(tǒng)一:Presto/Spark 分析任務(wù)入口并存,用戶不清楚自己的 SQL 查詢適合哪個引擎執(zhí)行,盲目選擇,體驗不好;另外,用戶會在兩個入口同時提交相同查詢,以更快的獲取查詢結(jié)果,導(dǎo)致資源浪費;

  • 離線任務(wù)計算時延高,結(jié)果產(chǎn)出太慢:典型的如 ABTest 業(yè)務(wù),經(jīng)常計算到下午才計算出結(jié)果;

  • 各個業(yè)務(wù)方基于自己的業(yè)務(wù)場景獨立開發(fā)應(yīng)用,實時任務(wù)煙囪式的開發(fā),缺少數(shù)據(jù)分層,數(shù)據(jù)血緣。

面對以上的問題,BIGO 大數(shù)據(jù)平臺建設(shè)了 OneSQL OLAP 分析平臺,以及實時數(shù)倉。
  1. 通過 OneSQL OLAP 分析平臺,統(tǒng)一 OLAP 查詢?nèi)肟冢瑴p少用戶盲目選擇,提升平臺的資源利用率;
  2. 通過 Flink 構(gòu)建實時數(shù)倉任務(wù),通過 Kafka/Pulsar 進(jìn)行數(shù)據(jù)分層;
  3. 將部分離線計算慢的任務(wù)遷移到 Flink 流式計算任務(wù)上,加速計算結(jié)果的產(chǎn)出。

另外建設(shè)實時計算平臺 Bigoflow 管理這些實時計算任務(wù),建設(shè)實時任務(wù)的血緣關(guān)系。


二、落地實踐 & 特色改進(jìn)

2.1 OneSQL OLAP 分析平臺實踐和優(yōu)化

OneSQL OLAP 分析平臺是一個集 Flink、Spark、Presto 于一體的 OLAP 查詢分析引擎。用戶提交的 OLAP 查詢請求通過 OneSQL 后端轉(zhuǎn)發(fā)到不同執(zhí)行引擎的客戶端,然后提交對應(yīng)的查詢請求到不同的集群上執(zhí)行。其整體架構(gòu)圖如下:

該分析平臺整體結(jié)構(gòu)從上到下分為入口層、轉(zhuǎn)發(fā)層、執(zhí)行層、資源管理層。為了優(yōu)化用戶體驗,減少執(zhí)行失敗的概率,提升各集群的資源利用率,OneSQL OLAP 分析平臺實現(xiàn)了以下功能:
  • 統(tǒng)一查詢?nèi)肟冢?/span>入口層,用戶通過統(tǒng)一的 Hue 查詢頁面入口以 Hive SQL 語法為標(biāo)準(zhǔn)提交查詢;
  • 統(tǒng)一查詢語法:集 Flink、Spark、Presto 等多種查詢引擎于一體,不同查詢引擎通過適配 Hive SQL 語法來執(zhí)行用戶的 SQL 查詢?nèi)蝿?wù);
  • 智能路由:在選擇執(zhí)行引擎的過程中,會根據(jù)歷史 SQL 查詢執(zhí)行的情況 (在各引擎上是否執(zhí)行成功,以及執(zhí)行耗時),各集群的繁忙情況,以及各引擎對該 SQL 語法的是否兼容,來選擇合適的引擎提交查詢;
  • 失敗重試:OneSQL 后臺會監(jiān)控 SQL 任務(wù)的執(zhí)行情況,如果 SQL 任務(wù)在執(zhí)行過程中失敗,將選擇其他的引擎執(zhí)行重試提交任務(wù)。
如此一來,通過 OneSQL OLAP 分析平臺,BIGO 大數(shù)據(jù)平臺實現(xiàn)了 OLAP 分析入口的統(tǒng)一,減少用戶的盲目選擇,同時充分利用各個集群的資源,減少資源空閑情況。

2.1.1 Flink OLAP 分析系統(tǒng)建設(shè)

在 OneSQL 分析平臺上,F(xiàn)link 也作為 OLAP 分析引擎的一部分。Flink OLAP 系統(tǒng)分成兩個組成部分:Flink SQL Gateway 和 Flink Session 集群;SQL Gateway 作為 SQL 提交的入口,查詢 SQL 經(jīng)過 Gateway 提交到 Flink Session 集群上執(zhí)行,同時獲取 SQL 執(zhí)行查詢的進(jìn)度,以及返回查詢的結(jié)果給客戶端。其執(zhí)行 SQL 查詢的流程如下:

首先用戶提交過來的 SQL,在 SQL Gateway 進(jìn)行判斷:是否需要將結(jié)果持久化寫入到 Hive 表,如果需要,則會先通過 HiveCatalog 的接口創(chuàng)建一個 Hive 表,用于持久化查詢?nèi)蝿?wù)的計算結(jié)果;之后,任務(wù)通過 SQL Gateway 上執(zhí)行 SQL 解析,設(shè)置作業(yè)運行的并行度,生成 Pipeline 并提交到 Session 集群上執(zhí)行。

為了保證整個 Flink OLAP 系統(tǒng)的穩(wěn)定性,以及高效的執(zhí)行 SQL 查詢,在這個系統(tǒng)中,進(jìn)行了以下功能增強:

  • 穩(wěn)定性:

    • 基于 zookeeper HA 來保證 Flink Session 集群的可靠性,SQL Gateway 監(jiān)聽 Zookeeper 節(jié)點,感知 Session 集群;
    • 控制查詢掃描 Hive 表的數(shù)據(jù)量,分區(qū)個數(shù),以及返回結(jié)果數(shù)據(jù)量,防止 Session 集群的 JobManager,TaskManager 因此出現(xiàn) OOM 情況。
  • 性能:

    • Flink Session 集群預(yù)分配資源,減少作業(yè)提交后申請資源所需的時間;

    • Flink JobManager 異步解析 Split,Split 邊解析任務(wù)邊執(zhí)行,減少由于解析 Split 阻塞任務(wù)執(zhí)行的時間;

    • 控制作業(yè)提交過程中掃描分區(qū),以及 Split 最大的個數(shù),減少設(shè)置任務(wù)并行所需要的時間。

  • Hive SQL 兼容:

    針對 Flink 對于 Hive SQL 語法的兼容性進(jìn)行改進(jìn),目前針對 Hive SQL 的兼容性大致為 80%。
  • 監(jiān)控告警:

    監(jiān)控 Flink Session 集群的 JobManager,TaskManager,以及 SQL Gateway 的內(nèi)存,CPU 使用情況,以及任務(wù)的提交情況,一旦出現(xiàn)問題,及時告警和處理。

2.1.2 OneSQL OLAP 分析平臺取得的成果

基于以上實現(xiàn)的 OneSQL OLAP 分析平臺,取得了以下幾個收益:
  1. 統(tǒng)一查詢?nèi)肟?,減少用戶的盲目選擇,用戶執(zhí)行出錯率下降 85.7%,SQL 執(zhí)行的成功率提升 3%;
  2. SQL 執(zhí)行時間縮短 10%,充分利用了各個集群的資源,減少任務(wù)排隊等待的時間;
  3. Flink 作為 OLAP 分析引擎的一部分,實時計算集群的資源利用率提升了 15%。

2.2 實時數(shù)倉建設(shè)和優(yōu)化

為了提升 BIGO 大數(shù)據(jù)平臺上某些業(yè)務(wù)指標(biāo)的產(chǎn)出效率,以及更好的管理 Flink 實時任務(wù),BIGO 大數(shù)據(jù)平臺建設(shè)了實時計算平臺 Bigoflow,并將部分計算慢的任務(wù)遷移到實時計算平臺上,通過 Flink 流式計算的方式來執(zhí)行,通過消息隊列 Kafka/Pulsar 來進(jìn)行數(shù)據(jù)分層,構(gòu)建實時數(shù)倉;在 Bigoflow 上針對實時數(shù)倉的任務(wù)進(jìn)行平臺化管理,建立統(tǒng)一的實時任務(wù)接入入口,并基于該平臺管理實時任務(wù)的元數(shù)據(jù),構(gòu)建實時任務(wù)的血緣關(guān)系。

2.2.1 建設(shè)方案

BIGO 大數(shù)據(jù)平臺主要基于 Flink + ClickHouse 建設(shè)實時數(shù)倉,大致方案如下:

按照傳統(tǒng)數(shù)據(jù)倉庫的數(shù)據(jù)分層方法,將數(shù)據(jù)劃分成 ODS、DWD、DWS、ADS 等四層數(shù)據(jù):
  • ODS 層:基于用戶的行為日志,業(yè)務(wù)日志等作為原始數(shù)據(jù),存放于 Kafka/Pulsar 等消息隊列中;

  • DWD 層:這部分?jǐn)?shù)據(jù)根據(jù)用戶的 UserId 經(jīng)過 Flink 任務(wù)進(jìn)行聚合后,形成不同用戶的行為明細(xì)數(shù)據(jù),保存到 Kafka/Pulsar 中;

  • DWS 層:用戶行為明細(xì)的 Kafka 流表與用戶 Hive/MySQL 維表進(jìn)行流維表 JOIN,然后將 JOIN 之后產(chǎn)生的多維明細(xì)數(shù)據(jù)輸出到 ClickHouse 表中;

  • ADS 層:針對 ClickHouse 中多維明細(xì)數(shù)據(jù)按照不同維度進(jìn)行匯總,然后應(yīng)用于不同的業(yè)務(wù)中。

按照以上方案建設(shè)實時數(shù)據(jù)倉庫的過程中,遇到了一些問題:
  • 將離線任務(wù)轉(zhuǎn)為實時計算任務(wù)后,計算邏輯較為復(fù)雜 (多流 JOIN,去重),導(dǎo)致作業(yè)狀態(tài)太大,作業(yè)出現(xiàn) OOM (內(nèi)存溢出) 異?;蛘咦鳂I(yè)算子背壓太大;
  • 維表 Join 過程中,明細(xì)流表與大維表 Join,維表數(shù)據(jù)過多,加載到內(nèi)存后 OOM,作業(yè)失敗無法運行;
  • Flink 將流維表 Join 產(chǎn)生的多維明細(xì)數(shù)據(jù)寫入到 ClickHouse,無法保證 Exactly-once,一旦作業(yè)出現(xiàn) Failover,就會導(dǎo)致數(shù)據(jù)重復(fù)寫入。

2.2.2 問題解決 & 優(yōu)化

優(yōu)化作業(yè)執(zhí)行邏輯,減小狀態(tài)
離線的計算任務(wù)邏輯較為復(fù)雜,涉及多個 Hive 表之間的 Join 以及去重操作,其大致邏輯如下:

當(dāng)將離線的作業(yè)轉(zhuǎn)為 Flink 的流式任務(wù)之后,原先離線 Join 多個 Hive 表的場景就轉(zhuǎn)變?yōu)?Join 多個 Kafka Topic 的場景。由于 Join 的 Kafka topic 的流量較大,且 Join 的窗口時間較長 (窗口最長的為 1 天),當(dāng)作業(yè)運行一段時間內(nèi),Join 算子上就積累了大量的狀態(tài) (一小時后狀態(tài)就接近 1T),面對如此大的狀態(tài),F(xiàn)link 作業(yè)采取 Rocksdb State Backend 來存放狀態(tài)數(shù)據(jù),但是仍然避免不了 Rocksdb 內(nèi)存使用超過導(dǎo)致被 YARN kill 的問題,或者是 Rocksdb State 上存的狀態(tài)太多,吞吐下降導(dǎo)致作業(yè)嚴(yán)重背壓。
針對這個問題,我們將這多個 Topic,按照相同的 Schema 進(jìn)行 Unoin all 處理,得到一個大的數(shù)據(jù)流,然后在這個大的數(shù)據(jù)流中,再根據(jù)不同事件流的 event_id 進(jìn)行判斷,就能知道這條數(shù)據(jù)來自哪一個事件流的 Topic,再進(jìn)行聚合計算,獲取對應(yīng)事件流上的計算指標(biāo)。

這樣一來,通過 UNION ALL 代替 JOIN,避免了因為 JOIN 計算帶來的大 State 帶來的影響。

另外,在計算任務(wù)中還存在有比較多的 count distinct 計算,類似如下:

微信圖片_20230112143805.png

這些 count distinct 計算在同一個 group by 中,并基于相同的 postid 進(jìn)行去重計算,因而可以讓這些 distinct state 可以共享一組 key 來進(jìn)行去重計算,那么就可以通過一個 MapState 來存儲這若干個 count distinct 的狀態(tài),如下:

這些 count distinct 函數(shù)去重的 key 相同,因而可以共享 MapState 中的 key 值,從而優(yōu)化存儲空間;而 Mapstate 的 Value 是 Byte 數(shù)組,每個 Byte 8 個 bit,每個 bit 為 0 或者 1,第 n 個 bit 對應(yīng)了 n 個 count distinct 函數(shù)在該 key 上的取值:1 表示該 count disitnct 函數(shù)在對應(yīng)的 key 上需要進(jìn)行計數(shù),0 表示不需要計數(shù);當(dāng)計算聚合結(jié)果的時候,則將所有 key 第 n 位的數(shù)字相加,即為第 n 個 count distinct 的取值,這樣一來,就更進(jìn)一步節(jié)約了狀態(tài)的存儲空間。
通過以上優(yōu)化,成功的將 ABTest 的離線任務(wù)遷移到 Flink 流式計算任務(wù)上,將作業(yè)的狀態(tài)控制在 100GB 以內(nèi),讓作業(yè)正常的運行起來。

流維表 JOIN 優(yōu)化

生成多維明細(xì)寬表的過程中,需要進(jìn)行流維表 JOIN, 使用了 Flink Join Hive 維表的功能:Hive 維表的數(shù)據(jù)會被加載到任務(wù)的 HashMap 的內(nèi)存數(shù)據(jù)結(jié)構(gòu)中,流表中的數(shù)據(jù)再根據(jù) Join Key 與 HashMap 中的數(shù)據(jù)進(jìn)行 Join。但是面對上億,十億行的 Hive 大維表,加載到內(nèi)存的數(shù)據(jù)量太大,很容易導(dǎo)致 OOM (內(nèi)存溢出)。針對以上問題,我們將 Hive 大維表按照 Join Key 進(jìn)行 Hash 分片,如下圖:

這樣一來,Hive 大維表的數(shù)據(jù)經(jīng)過 Hash 函數(shù)計算后分布到 Flink 作業(yè)的不同并行子任務(wù)的 HashMap 中,每個 HashMap 只存放大維表的一部分?jǐn)?shù)據(jù),只要作業(yè)的并行度夠大,就能夠?qū)⒋缶S表的數(shù)據(jù)拆分成足夠多份,進(jìn)行分片保存;對于一些太大的維表,也可以采取 Rocksdb Map State 來保存分片數(shù)據(jù)。
Kafka 流表中的數(shù)據(jù),當(dāng)要下發(fā)到不同的 subtask 上進(jìn)行 Join 時,也通過相同的 Join Key 按照相同的 Hash 函數(shù)進(jìn)行計算,從而將數(shù)據(jù)分配到對應(yīng)的 subtask 進(jìn)行 Join,輸出 Join 后的結(jié)果。
通過以上優(yōu)化,成功 Join 了一些 Hive 大維表任務(wù)來執(zhí)行流維表 Join 計算,最大的維表超過 10 億行。
ClickHouse Sink 的 Exactly-Once 語義支持
將流維表 Join 生成的多維明細(xì)數(shù)據(jù)輸出到 ClickHouse 表的過程中,由于社區(qū)的 ClickHouse 不支持事務(wù),所以沒辦法保證數(shù)據(jù) sink 到 ClickHouse 過程中的 Exactly-Once 語義。在此過程中,一旦出現(xiàn)作業(yè) Failover,數(shù)據(jù)就會重復(fù)寫入到 ClickHouse。
針對這個問題,BIGO ClickHouse 實現(xiàn)了一個二階段提交事務(wù)機制:當(dāng)需要寫入數(shù)據(jù)到 ClickHouse 時,可以先設(shè)置寫入的模式為 temporary,表明現(xiàn)在寫入的數(shù)據(jù)是臨時數(shù)據(jù);當(dāng)數(shù)據(jù)執(zhí)行插入完成后,返回一個 Insert id,然后根據(jù)該 Insert id 執(zhí)行 Commit 操作,那么臨時數(shù)據(jù)就轉(zhuǎn)為正式數(shù)據(jù)。
基于 BIGO ClickHouse 的二階段提交事務(wù)機制,并結(jié)合 Flink 的 checkpoint 機制,實現(xiàn)了一個 ClickHouse Connector,保證 ClickHouse Sink 的 Exactly Once 寫入語義,如下:

  • 在正常寫入的情況下,Connector 隨機選擇 ClickHouse 的某一個 shard 寫入,根據(jù)用戶配置寫單副本,或者雙副本來執(zhí)行 insert 操作,并記錄寫入后的 insert id;在兩次 checkpoint 之間就會有多次這種 insert 操作,從而產(chǎn)生多個 insert id,當(dāng) checkpoint 完成時,再將這些 insert id 批量提交,將臨時數(shù)據(jù)轉(zhuǎn)為正式數(shù)據(jù),即完成了兩次 checkpoint 間數(shù)據(jù)的寫入;

  • 一旦作業(yè)出現(xiàn) Failover,F(xiàn)link 作業(yè) Failover 重啟完成后,將從最近一次完成的 checkpoint 來恢復(fù)狀態(tài),此時 ClickHouse Sink 中的 Operator State 可能會包含上一次還沒有來得及提交完成的 Insert id,針對這些 insert id 進(jìn)行重試提交;針對那些數(shù)據(jù)已經(jīng)寫入 ClickHouse 中之后,但是 insert id 并沒有記錄到 Opeator State 中的數(shù)據(jù),由于是臨時數(shù)據(jù),在 ClickHouse 中并不會被查詢到,一段時間后,將會由 ClickHouse 的過期清理機制,被清理掉,從而保證了狀態(tài)回滾到上一次 checkpoint 之后,數(shù)據(jù)不會重復(fù)。

通過以上機制,成功保證了數(shù)據(jù)從 Kafka 經(jīng)過 Flink 計算后寫入到 ClickHouse 整個鏈路中端到端的 Exactly-Once 語義,數(shù)據(jù)不重復(fù)也不丟失。

2.2.3 平臺建設(shè)

為了更好的管理 BIGO 大數(shù)據(jù)平臺的實時計算任務(wù),公司內(nèi)部建設(shè)了 BIGO 實時計算平臺 Bigoflow,為用戶提供統(tǒng)一的 Flink實時任務(wù)接入,平臺建設(shè)如下:

  • 支持 Flink JAR、SQL、Python 等多種類型作業(yè);支持不同的 Flink 版本,覆蓋公司內(nèi)部大部分實時計算相關(guān)業(yè)務(wù);
  • 一站式管理:集作業(yè)開發(fā)、提交、運行、歷史展示、監(jiān)控、告警于一體,便于隨時查看作業(yè)的運行狀態(tài)和發(fā)現(xiàn)問題;
  • 血緣關(guān)系:方便查詢每個作業(yè)的數(shù)據(jù)源、數(shù)據(jù)目的、數(shù)據(jù)計算的來龍去脈。


三、應(yīng)用場景

3.1 Onesql OLAP 分析平臺應(yīng)用場景

Onesql OLAP 分析平臺在公司內(nèi)部的應(yīng)用場景是:應(yīng)用于 AdHoc 查詢,如下:

用戶通過 Hue 頁面提交的 SQL,通過 OneSQL 后端轉(zhuǎn)發(fā)給 Flink SQL Gateway,并提交到 Flink Session 集群上執(zhí)行查詢?nèi)蝿?wù),F(xiàn)link SQL Gateway 獲取查詢?nèi)蝿?wù)的執(zhí)行進(jìn)度返回給 Hue 頁面,并返回查詢結(jié)果。

3.2 實時數(shù)據(jù)倉庫應(yīng)用場景

實時數(shù)據(jù)倉庫應(yīng)用場景目前主要是 ABTest 業(yè)務(wù),如下:

用戶的原始行為日志數(shù)據(jù)經(jīng)過 Flink 任務(wù)聚合后生成用戶明細(xì)數(shù)據(jù),然后與維表數(shù)據(jù)進(jìn)行流維表 JOIN,輸出到 ClickHouse 生成多維明細(xì)寬表,按照不同維度匯總后,應(yīng)用于不同的業(yè)務(wù)。通過改造 ABTest 業(yè)務(wù),將該業(yè)務(wù)的結(jié)果指標(biāo)的生成時間提前了 8 個小時,同時減少了使用資源一倍以上。


四、未來規(guī)劃

為了更好的建設(shè) OneSQL OLAP 分析平臺以及 BIGO 實時數(shù)據(jù)倉庫,實時計算平臺的規(guī)劃如下:

  • 完善 Flink OLAP 分析平臺,完善 Hive SQL 語法支持,以及解決計算過程中出現(xiàn)的 JOIN 數(shù)據(jù)傾斜問題;

  • 完善實時數(shù)倉建設(shè),引入數(shù)據(jù)湖技術(shù),解決實時數(shù)倉中任務(wù)數(shù)據(jù)的可重跑回溯范圍小的問題;

  • 基于 Flink 打造流批一體的數(shù)據(jù)計算平臺。

立即登錄,閱讀全文
原文鏈接:點擊前往 >
文章來源:阿里開發(fā)者
版權(quán)說明:本文內(nèi)容來自于阿里開發(fā)者,本站不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。文章內(nèi)容系作者個人觀點,不代表快出海對觀點贊同或支持。如有侵權(quán),請聯(lián)系管理員(zzx@kchuhai.com)刪除!
優(yōu)質(zhì)服務(wù)商推薦
更多