使用Apache Flink在Pinterest進(jìn)行實(shí)時(shí)實(shí)驗(yàn)分析

來(lái)源:Ververica
作者:Parag Kesar&Ben Liu
時(shí)間:2020-08-20
2491
本文分享Pinterest是如何基于Flink做實(shí)時(shí)分析的實(shí)例。

在Pinterest,我們每天都要進(jìn)行數(shù)千個(gè)實(shí)驗(yàn)。我們主要依靠日常實(shí)驗(yàn)指標(biāo)來(lái)評(píng)估實(shí)驗(yàn)效果。日常實(shí)驗(yàn)管道運(yùn)行一次可能會(huì)花費(fèi)10多個(gè)小時(shí),有時(shí)還會(huì)超時(shí),因此想要驗(yàn)證實(shí)驗(yàn)設(shè)置、觸發(fā)的正確性以及預(yù)期的實(shí)驗(yàn)性能時(shí)就沒(méi)那么方便了。當(dāng)代碼中存在一些錯(cuò)誤時(shí)這個(gè)問(wèn)題尤為突出。有時(shí)可能要花幾天時(shí)間才能發(fā)現(xiàn)錯(cuò)誤,這對(duì)用戶(hù)體驗(yàn)和重要指標(biāo)造成了更大的損害。我們?cè)赑interest開(kāi)發(fā)了一個(gè)近實(shí)時(shí)實(shí)驗(yàn)平臺(tái),以提供更具時(shí)效性的實(shí)驗(yàn)指標(biāo),從而幫助我們盡快發(fā)現(xiàn)這些問(wèn)題??赡艹霈F(xiàn)的問(wèn)題有:

1.實(shí)驗(yàn)導(dǎo)致impression的統(tǒng)計(jì)數(shù)據(jù)顯著下降,因此需要盡快關(guān)閉實(shí)驗(yàn)。

2.與對(duì)照組相比,實(shí)驗(yàn)導(dǎo)致搜索的執(zhí)行次數(shù)顯著增加。

640.webp (6).jpg

圖1-帶有置信區(qū)間的實(shí)時(shí)實(shí)驗(yàn)指標(biāo)

上圖的面板顯示了所選事件的實(shí)驗(yàn)組和對(duì)照組的流量(也就是動(dòng)作數(shù))和傾向(也就是unique user的數(shù)量)。自實(shí)驗(yàn)開(kāi)始以來(lái),這些計(jì)數(shù)已經(jīng)累計(jì)了3天時(shí)間。如果在3天后發(fā)生了re-ramp(分配給實(shí)驗(yàn)組和對(duì)照組的用戶(hù)數(shù)量增加),則計(jì)數(shù)會(huì)歸零0并重新開(kāi)始累計(jì)3天時(shí)間。

為了確保實(shí)驗(yàn)組與對(duì)照組之間的對(duì)比在統(tǒng)計(jì)上是有效的,我們做了一些統(tǒng)計(jì)檢驗(yàn)。由于指標(biāo)是實(shí)時(shí)交付的,因此每次按順序收到新記錄時(shí),我們都必須進(jìn)行這些檢驗(yàn)。這需要與傳統(tǒng)的固定視野檢驗(yàn)不一樣的方法,否則會(huì)帶來(lái)較高的假正率。我們考慮過(guò)幾種順序測(cè)試方法,包括賭徒破產(chǎn)、貝葉斯A/B檢驗(yàn)和Alpha消耗函數(shù)方法。為了保證數(shù)值穩(wěn)定性,我們從t檢驗(yàn)+Boferroni校正(將我們的案例作為多次檢驗(yàn)進(jìn)行處理)開(kāi)始,并為我們的初始實(shí)現(xiàn)預(yù)先確定了檢驗(yàn)次數(shù)。

高階設(shè)計(jì)

640.webp.jpg

圖2-實(shí)時(shí)實(shí)驗(yàn)管道的高階設(shè)計(jì)

實(shí)時(shí)實(shí)驗(yàn)管道包括下列主要組件:

·最近ramp的實(shí)驗(yàn)組作業(yè)→每5分鐘將一個(gè)CSV文件發(fā)布到一個(gè)S3位置。這個(gè)CSV是過(guò)去3天中所分配用戶(hù)有所增加的實(shí)驗(yàn)組的快照。通過(guò)查詢(xún)托管實(shí)驗(yàn)元數(shù)據(jù)的內(nèi)部Analytics(分析)應(yīng)用程序的MySQL數(shù)據(jù)庫(kù),就能獲得這一信息。

·篩選事件作業(yè)→我們分析了Pinterest上的數(shù)百種用戶(hù)動(dòng)作。這一作業(yè)僅保留最關(guān)key的業(yè)務(wù)事件,這些事件已插入“filtered_events”Kafka主題中。這些事件被剝離掉了不需要的字段,因此filtered_events主題相當(dāng)輕巧。該作業(yè)運(yùn)行在Flink processing時(shí)間內(nèi),并且通過(guò)Flink的增量檢查點(diǎn),每隔5秒將其進(jìn)度保存到HDFS中。

·過(guò)濾實(shí)驗(yàn)Activation作業(yè)→每當(dāng)一個(gè)用戶(hù)被觸發(fā)進(jìn)入一個(gè)實(shí)驗(yàn)時(shí),都會(huì)創(chuàng)建一個(gè)Activation(激活)記錄。觸發(fā)規(guī)則取決于實(shí)驗(yàn)邏輯,一名用戶(hù)可以被觸發(fā)進(jìn)入一個(gè)實(shí)驗(yàn)數(shù)百次。我們只需要最近3天啟動(dòng),或組分配增加的實(shí)驗(yàn)的Activation記錄即可。

為了過(guò)濾Activation記錄,此作業(yè)使用Flink的廣播狀態(tài)模式。每10秒檢查一次“最近ramp的實(shí)驗(yàn)組”作業(yè)所發(fā)布的CSV的更改情況,并將其發(fā)布到一個(gè)KeyedBroadcastProcessFunction的所有分區(qū)上,該函數(shù)也消費(fèi)Activation。

KeyedBroadcastProcessFunction將廣播的CSV與Activation流結(jié)合在一起,就可以過(guò)濾掉那些最近3天內(nèi)未ramp-up實(shí)驗(yàn)的Activation記錄。此外,“group-ramp-up-time”已添加到Activation記錄中,并插入“filtered_experiment_activations”kafka主題中。

640.webp (1).jpg

圖3-Scala對(duì)象被插入中間層Kafka主題中

640.webp (2).jpg

圖4-實(shí)時(shí)實(shí)驗(yàn)累積作業(yè)圖

上面是實(shí)時(shí)累積(Aggregation)Flink作業(yè)的高階概覽。這里簡(jiǎn)單提及了一些operator,后文中還將詳細(xì)介紹另一些operator。Source operator從Kafka讀取數(shù)據(jù),而sink使用一個(gè)REST接口寫(xiě)入我們的內(nèi)部Analytics Store上。

刪除重復(fù)事件→這里用一個(gè)KeyedProcessFunction實(shí)現(xiàn),由(event.user_id,event.event_type,event.timestamp)作為key。這里的思想是,如果來(lái)自同一用戶(hù)的相同事件類(lèi)型的事件具有相同的時(shí)間戳,則它們是重復(fù)事件。第一個(gè)這樣的事件被發(fā)送到下游,但也會(huì)緩存進(jìn)狀態(tài)持續(xù)5分鐘時(shí)間。任何后續(xù)事件都將被丟棄。5分鐘后,一個(gè)計(jì)時(shí)器會(huì)啟動(dòng)并清除狀態(tài)。這里的假定是所有重復(fù)事件之間的間隔都在5分鐘之內(nèi)。

查找首次觸發(fā)時(shí)間→這里是一個(gè)Flink KeyedProcessFunction,由(experiment_hash,experiment_group,user_id)作為key。這里的假設(shè)是,為一個(gè)用戶(hù)收到的第一個(gè)實(shí)驗(yàn)Activation記錄也是具有第一個(gè)觸發(fā)時(shí)間的Activation。一個(gè)實(shí)驗(yàn)ramp-up以后,收到的第一個(gè)Activation將發(fā)送至下游,并保存為狀態(tài)并持續(xù)3天時(shí)間(我們累積了實(shí)驗(yàn)組ramp-up以來(lái)為期3天的計(jì)數(shù))。經(jīng)過(guò)3天的ramp時(shí)間后,一個(gè)計(jì)時(shí)器將清除狀態(tài)。

15分鐘的processing時(shí)間tumbling窗口→事件進(jìn)入并向下游發(fā)送結(jié)果時(shí),Numerator Computer和Denominator computer都將累積計(jì)數(shù)。這意味著數(shù)百萬(wàn)條記錄,但是我們不需要如此頻繁地將結(jié)果發(fā)送到Analytics Store上。我們可以在processing時(shí)間內(nèi)運(yùn)行一個(gè)持續(xù)15分鐘的Flink tumbling窗口,這樣效率更高。對(duì)于Numerator Computer來(lái)說(shuō),這個(gè)窗口由(“experiment_hash”,“experiment_group”,“event_type”,“timestamp”)作為key。當(dāng)窗口在15分鐘后觸發(fā)時(shí),將獲取帶有max_users的記錄并將其發(fā)送到下游的Analytics Store sink。

連接事件和Activation

640.webp (3).jpg

圖5-通過(guò)用戶(hù)ID連接Activation流與事件流

我們使用Flink的IntervalJoin operator實(shí)現(xiàn)流到流的連接。IntervalJoin會(huì)在接下來(lái)的3天內(nèi)緩沖每位用戶(hù)的單個(gè)Activation記錄,并且所有匹配事件都將與Activation記錄中的其他實(shí)驗(yàn)元數(shù)據(jù)一起發(fā)送到下游。

這種方法的局限性:

1.對(duì)我們的需求而言,IntervalJoin operator有點(diǎn)不夠靈活,因?yàn)樗拈g隔是固定的而不是動(dòng)態(tài)的。比如說(shuō),用戶(hù)可以在實(shí)驗(yàn)啟動(dòng)2天后加入進(jìn)來(lái),但I(xiàn)ntervalJoin還是會(huì)為這名用戶(hù)運(yùn)行3天時(shí)間,也就是說(shuō)我們停止累積數(shù)據(jù)后還會(huì)運(yùn)行2天時(shí)間。如果3天后組很快re-ramp,則一位用戶(hù)也可以有2個(gè)這樣的連接。這種情況會(huì)在下游處理。

2.事件和Activation不同步:如果Activation作業(yè)失敗并且Activation流被延遲,則可能會(huì)丟失一些數(shù)據(jù),因?yàn)闆](méi)有匹配Activation的事件還會(huì)繼續(xù)流動(dòng)。這將導(dǎo)致計(jì)數(shù)不足。

我們研究了Flink的IntervalJoin源代碼。它會(huì)在“左側(cè)緩沖區(qū)”中緩沖Activation 3天時(shí)間,但事件將被立即刪除。目前似乎無(wú)法通過(guò)配置更改此行為。我們正在研究使用Flink的協(xié)同處理函數(shù)來(lái)實(shí)現(xiàn)這個(gè)Activation到事件的連接,該函數(shù)是用于流到流連接的更通用的函數(shù)。我們可以將事件緩沖X分鐘,這樣即使Activation流延遲了X分鐘,管道也可以處理延遲而不會(huì)出現(xiàn)計(jì)數(shù)不足。這將幫助我們避免同一用戶(hù)的兩次連接,并能形成更加動(dòng)態(tài)的管道,其可以立即感知到實(shí)驗(yàn)組的re-ramp,并支持更多動(dòng)態(tài)行為,例如在組re-ramp時(shí)自動(dòng)擴(kuò)展累積的覆蓋范圍。

Join Results Deduplicator

640.webp (4).jpg

圖6-Join Results Deduplicator

Join Results Deduplicator是一個(gè)Flink KeyedProcessFunction,它由experiment_hash,experiment_group,event_type,user_id作為key。這個(gè)operator的主要目的是在向下游發(fā)送記錄時(shí)插入“user_first_time_seen”標(biāo)志——下游Numerator Computer使用這個(gè)標(biāo)志來(lái)計(jì)算傾向編號(hào)(#unique users),而無(wú)需使用設(shè)置的數(shù)據(jù)結(jié)構(gòu)。

這個(gè)operator將狀態(tài)存儲(chǔ)到last-ramp-time+3天,之后狀態(tài)將被清除。

Numerator Computer

640.webp.jpg

圖7-Numerator Computer

Numerator Computer是一個(gè)KeyedProcessFunction,由experiment_hash,experiment_group,event_type作為key。它會(huì)在最后2小時(shí)內(nèi)一直滾動(dòng)15分鐘的存儲(chǔ)桶(bucket),每當(dāng)有新記錄進(jìn)入時(shí)都會(huì)更新這些桶。對(duì)于流量來(lái)說(shuō),每個(gè)動(dòng)作都很重要;因此對(duì)于每個(gè)事件,動(dòng)作計(jì)數(shù)都會(huì)增加。對(duì)于傾向數(shù)字(unique user)——它取決于"first_time_seen”標(biāo)志(僅在為true時(shí)遞增)。

隨著時(shí)間的流逝,存儲(chǔ)桶會(huì)滾動(dòng)/旋轉(zhuǎn)。每次新事件進(jìn)入時(shí),存儲(chǔ)桶數(shù)據(jù)都會(huì)向下游刷新到15分鐘的tumbling窗口中。

它有一個(gè)時(shí)間為3天的計(jì)時(shí)器(從ramp-up時(shí)間→3天),可在觸發(fā)后清除所有狀態(tài),這樣就能在ramp-up3天后重置/清除計(jì)數(shù),完成歸零。

垃圾消息與處理

為了使我們的流管道具有容錯(cuò)能力,F(xiàn)link的增量檢查點(diǎn)和RocksDB狀態(tài)后端被用來(lái)保存應(yīng)用程序檢查點(diǎn)。我們面臨的一項(xiàng)有趣挑戰(zhàn)是檢查點(diǎn)失敗。問(wèn)題似乎在于檢查點(diǎn)流程需要花費(fèi)很長(zhǎng)時(shí)間,并且最終會(huì)超時(shí)。我們還注意到,在發(fā)生檢查點(diǎn)故障時(shí)通常也會(huì)有很高的背壓。

640.webp (1).jpg

圖8-Flink UI中顯示的檢查點(diǎn)故障

在仔細(xì)檢查了檢查點(diǎn)故障的內(nèi)部機(jī)制之后,我們發(fā)現(xiàn)超時(shí)是由于某些子任務(wù)未將確認(rèn)發(fā)送給檢查點(diǎn)協(xié)調(diào)器而導(dǎo)致的,整個(gè)檢查點(diǎn)流程都卡住了,如下所示。

640.webp (2).jpg

圖9-子任務(wù)未發(fā)送確認(rèn)

然后我們針對(duì)導(dǎo)致失敗的根本原因應(yīng)用了一些調(diào)試步驟:

1.檢查作業(yè)管理日志

2.檢查在檢查點(diǎn)期間卡住的子任務(wù)的任務(wù)管理器日志

3.使用Jstack詳細(xì)查看子任務(wù)

原來(lái)子任務(wù)運(yùn)行很正常,只是抽不出空來(lái)處理消息。結(jié)果,這個(gè)特定的子任務(wù)具有很高的背壓,從而阻止了barrier通過(guò)。沒(méi)有barrier的收據(jù),檢查點(diǎn)流程將無(wú)法進(jìn)行。

在進(jìn)一步檢查所有子任務(wù)的Flink指標(biāo)之后,我們發(fā)現(xiàn)其中一個(gè)子任務(wù)產(chǎn)生的消息數(shù)量比其對(duì)等任務(wù)多100倍。由于消息是通過(guò)user_id在子任務(wù)之間分區(qū)的,這表明有些用戶(hù)產(chǎn)生的消息比其他用戶(hù)多得多,這就意味著那是垃圾消息。臨時(shí)查詢(xún)我們的spam_adjusted數(shù)據(jù)集后也確認(rèn)了這一結(jié)果。

640.webp (3).jpg

圖10-不同子任務(wù)的消息數(shù)

為了緩解該問(wèn)題,我們?cè)凇斑^(guò)濾器事件作業(yè)”中應(yīng)用了一個(gè)上限規(guī)則:對(duì)于一個(gè)小時(shí)內(nèi)的用戶(hù),如果我們看到的消息多于X條,則僅發(fā)送前X條消息。應(yīng)用上限規(guī)則后,檢查點(diǎn)就不再出現(xiàn)故障了。

數(shù)據(jù)穩(wěn)健性和驗(yàn)證

數(shù)據(jù)準(zhǔn)確性對(duì)于實(shí)驗(yàn)指標(biāo)的計(jì)算而言更為重要。為了確保我們的實(shí)時(shí)實(shí)驗(yàn)流程按預(yù)期運(yùn)行,并始終提供準(zhǔn)確的指標(biāo),我們啟動(dòng)了一個(gè)單獨(dú)的每日工作流,其執(zhí)行與流作業(yè)相同的計(jì)算,但使用的是臨時(shí)方式。如果流作業(yè)結(jié)果違反以下任一條件,則會(huì)提醒開(kāi)發(fā)人員:

·在同一累積期間(本例中為3天),計(jì)數(shù)不應(yīng)減少

·如果在第一個(gè)累積期之后進(jìn)行了re-ramp,則計(jì)數(shù)應(yīng)從0開(kāi)始再累積3天

·流結(jié)果與驗(yàn)證流結(jié)果之間的差異不應(yīng)超過(guò)某個(gè)閾值(在我們的例子中為2%)。

通過(guò)查詢(xún)實(shí)驗(yàn)元數(shù)據(jù),我們分別在3種情況下對(duì)實(shí)驗(yàn)進(jìn)行了驗(yàn)證:

1.單次ramp-up實(shí)驗(yàn)

2.在初始累積期間內(nèi)進(jìn)行多次ramp-up實(shí)驗(yàn)

3.在初始累積期后進(jìn)行多次ramp-up實(shí)驗(yàn)

這一流程如下所示:

640.webp (4).jpg

圖11-驗(yàn)證流程

規(guī)模

在這一部分中,我們提供了一些基本統(tǒng)計(jì)信息,展示實(shí)時(shí)實(shí)驗(yàn)管道的規(guī)模:

1.輸入主題流量(一天的平均值):

1597907775(1).png

2.100G檢查點(diǎn)

3.200~300個(gè)實(shí)驗(yàn)

4.8個(gè)master,50個(gè)worker,每個(gè)都是ec2 c5d.9xlarge

5.計(jì)算的并行度為256

未來(lái)計(jì)劃

1.支持更多指標(biāo),例如PWT(pinner等待時(shí)間),這樣如果實(shí)驗(yàn)導(dǎo)致Pinner的延遲異常增加,則可以盡快停止。

2.可能更新管道以使用Flink的協(xié)同處理功能代替“間隔連接”,使管道更具動(dòng)態(tài)性和彈性,以應(yīng)對(duì)事件流和Activation流之間的不同步問(wèn)題。

3.分區(qū):研究分區(qū)可以支持的分區(qū)類(lèi)型,因?yàn)榉謪^(qū)會(huì)導(dǎo)致?tīng)顟B(tài)增加。

4.通過(guò)電子郵件或Slack支持實(shí)時(shí)警報(bào)。

致謝

實(shí)時(shí)實(shí)驗(yàn)分析是Pinterest在生產(chǎn)環(huán)境中的第一個(gè)基于Flink的應(yīng)用程序。非常感謝我們的大數(shù)據(jù)平臺(tái)團(tuán)隊(duì)(特別感謝Steven Bairos-Novak、Jooseong Kim和Ang Zhang)構(gòu)建了Flink平臺(tái)并將其作為服務(wù)提供出來(lái)。同時(shí)還要感謝Analytics Platform團(tuán)隊(duì)(Bo Sun)出色的可視化效果,Logging Platform團(tuán)隊(duì)提供實(shí)時(shí)數(shù)據(jù)提取,以及Data Science團(tuán)隊(duì)(Brian Karfunkel)提供的統(tǒng)計(jì)咨詢(xún)!

立即登錄,閱讀全文
原文鏈接:點(diǎn)擊前往 >
文章來(lái)源:Ververica
版權(quán)說(shuō)明:本文內(nèi)容來(lái)自于Ververica,本站不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。文章內(nèi)容系作者個(gè)人觀點(diǎn),不代表快出海對(duì)觀點(diǎn)贊同或支持。如有侵權(quán),請(qǐng)聯(lián)系管理員(zzx@kchuhai.com)刪除!
優(yōu)質(zhì)服務(wù)商推薦
更多
掃碼登錄
打開(kāi)掃一掃, 關(guān)注公眾號(hào)后即可登錄/注冊(cè)
加載中
二維碼已失效 請(qǐng)重試
刷新
賬號(hào)登錄/注冊(cè)
個(gè)人VIP
小程序
快出海小程序
公眾號(hào)
快出海公眾號(hào)
商務(wù)合作
商務(wù)合作
投稿采訪
投稿采訪
出海管家
出海管家