隨著IT技術(shù)與大數(shù)據(jù)的不斷發(fā)展,越來越多的企業(yè)開始意識(shí)到數(shù)據(jù)的價(jià)值,通過大數(shù)據(jù)分析,可以幫助企業(yè)更深入地了解用戶需求、更好地洞察市場趨勢。目前大數(shù)據(jù)分析在每個(gè)業(yè)務(wù)運(yùn)營中都發(fā)揮著重要作用,成為企業(yè)提升市場競爭力的關(guān)鍵舉措之一。通常企業(yè)會(huì)構(gòu)建數(shù)據(jù)湖倉,將多個(gè)數(shù)據(jù)源通過數(shù)據(jù)集成技術(shù),匯集一起進(jìn)行數(shù)據(jù)分析。由此,數(shù)據(jù)集成成為了構(gòu)建數(shù)據(jù)湖倉的必經(jīng)之路,然而企業(yè)在數(shù)據(jù)集成過程中卻面臨很多棘手問題。
·全量+增量數(shù)據(jù)集成割裂
傳統(tǒng)的數(shù)據(jù)集成大多僅支持全量數(shù)據(jù),對(duì)于全量+增量的一并集成,則需要分別部署鏈路,獲取到數(shù)據(jù)后再手動(dòng)合并。
·多個(gè)數(shù)據(jù)源頭,操作與維護(hù)復(fù)雜
表結(jié)構(gòu)頻繁變更,無法自動(dòng)同步表結(jié)構(gòu)變更到數(shù)據(jù)湖倉,手動(dòng)維護(hù)成本高。另外無法”一鍵”整庫同步,追加同步對(duì)象操作復(fù)雜等。
·數(shù)據(jù)獲取時(shí)效性差
傳統(tǒng)的數(shù)據(jù)集成技術(shù)建模路徑較長,按照T+1的方式同步到數(shù)據(jù)倉庫中,時(shí)效性差。需要做到實(shí)時(shí)數(shù)據(jù)集成和分析,才能幫助用戶根據(jù)最新的數(shù)據(jù)做出更快、更準(zhǔn)確的決策。
基于數(shù)據(jù)集成的核心痛點(diǎn)和用戶訴求,近期騰訊云數(shù)據(jù)傳輸服務(wù)DTS重磅發(fā)布全新數(shù)據(jù)集成方案,該方案采取全增量數(shù)據(jù)一起的同步方式,將數(shù)據(jù)源先同步到Ckafka,再從Ckafka消費(fèi)數(shù)據(jù)投遞到數(shù)據(jù)湖倉,可以有效幫助用戶解決數(shù)據(jù)湖倉建設(shè)前期數(shù)據(jù)集成的問題。
關(guān)于DTS
選擇DTS做數(shù)據(jù)集成是因?yàn)镈TS有著技術(shù)上的天然優(yōu)勢。
2.1 DTS簡介
DTS是騰訊云自主研發(fā)的專注于數(shù)據(jù)庫傳輸服務(wù)的工具,具有高傳輸性能、高可用、安全連接、操作便捷等特點(diǎn),可以實(shí)現(xiàn)數(shù)據(jù)源在業(yè)務(wù)不停服狀態(tài)下的實(shí)時(shí)數(shù)據(jù)同步,整個(gè)數(shù)據(jù)同步過程對(duì)源庫業(yè)務(wù)無影響。DTS已成功應(yīng)用于金融、醫(yī)療、娛樂、泛互聯(lián)網(wǎng)等多個(gè)行業(yè)場景,幫助用戶實(shí)現(xiàn)不同系統(tǒng)的數(shù)據(jù)打通和自由流動(dòng),如數(shù)據(jù)庫遷移上云、數(shù)據(jù)庫異地備份、異地多活等。
2.2 DTS的技術(shù)優(yōu)勢
首先,DTS本身已支持多種數(shù)據(jù)源的同步,涵蓋MySQL、MariaDB、Percona、TDSQL-C MySQL版、TDSQL MySQL版、PostgreSQL、Redis、MongoDB、SQL Server等,對(duì)各種類型的數(shù)據(jù)庫以及對(duì)應(yīng)的數(shù)據(jù)格式都“了如指掌”,可以保證數(shù)據(jù)同步結(jié)果的正確性。
在DTS已有的技術(shù)積累中,已支持了無鎖同步技術(shù),即在同步過程中,不會(huì)對(duì)源庫加全局只讀鎖(FTWRL),避免影響源庫的寫入。在數(shù)據(jù)同步過程中,源庫若發(fā)生主從切換、重啟等,任務(wù)都可以正常運(yùn)行,不會(huì)被中斷。這些技術(shù)都對(duì)源庫非常友好,保證使用DTS同步數(shù)據(jù)的同時(shí),不影響源庫業(yè)務(wù)的正常運(yùn)行。
其次,提供全增量一體的數(shù)據(jù)集成能力是當(dāng)前業(yè)界的主流發(fā)展方向,而DTS本身就具備此能力,DTS在數(shù)據(jù)庫之間的同步機(jī)制,原生就采用全增量無縫銜接的同步機(jī)制,既能保證數(shù)據(jù)一致性,又能保證數(shù)據(jù)的實(shí)時(shí)性。這個(gè)能力可以避免因使用不同工具分別集成全量或增量導(dǎo)致的難以保證數(shù)據(jù)連貫性和一致性的問題。
最后,基于DTS的操作和維護(hù)都是Web界面,用戶只需簡單的3-4步即可完成配置,非常便利。
基于DTS本身具有的技術(shù)優(yōu)勢,且技術(shù)日漸成熟和完善,加上用戶對(duì)大數(shù)據(jù)集成訴求的日益突出,DTS構(gòu)建數(shù)據(jù)集成的能力也就應(yīng)時(shí)而生。
2.3基于DTS的數(shù)據(jù)集成方案
DTS在做數(shù)據(jù)集成方案的初期,產(chǎn)研團(tuán)隊(duì)做了非常充分的調(diào)研,并分析出了用戶的核心訴求,主要聚焦以下四個(gè)方面:
支持全量+增量數(shù)據(jù)同步:方便快速將全量+增量數(shù)據(jù)全部同步至下游數(shù)據(jù)分析工具中。
按序消費(fèi):數(shù)據(jù)消費(fèi)時(shí)需要按照數(shù)據(jù)生產(chǎn)的順序進(jìn)行。
數(shù)據(jù)不丟失:數(shù)據(jù)在下游消費(fèi)時(shí)至少要出現(xiàn)一次,不能丟失業(yè)務(wù)數(shù)據(jù)。
維護(hù)便捷:庫表結(jié)構(gòu)變更,或者庫表對(duì)象追加需要方便操作。
DTS的「數(shù)據(jù)訂閱」模塊可以應(yīng)用于數(shù)據(jù)集成并分發(fā)到下游的場景中,但訂閱模塊主要處理增量數(shù)據(jù),無法實(shí)現(xiàn)全量+增量一起同步。經(jīng)過多次的技術(shù)探討和驗(yàn)證后,我們最終決定基于「數(shù)據(jù)同步」模塊來做數(shù)據(jù)集成,技術(shù)方案:數(shù)據(jù)源先通過DTS同步數(shù)據(jù)到Ckafka,再從Ckafka消費(fèi)數(shù)據(jù)投遞到數(shù)據(jù)湖倉。
不過實(shí)際落地中,我們還是遇到了一些挑戰(zhàn)。
2.3.1全量部分?jǐn)?shù)據(jù)塊很大,如何提升導(dǎo)出導(dǎo)入效率?
使用DTS數(shù)據(jù)同步模塊來做數(shù)據(jù)集成,可以滿足全量+增量一起同步的訴求,但在大數(shù)據(jù)場景下,又不得不面臨兩個(gè)問題:對(duì)于大表(如10億行以上),如何提升同步作業(yè)效率?對(duì)于超大的存量數(shù)據(jù),在全量階段遇到任務(wù)中斷時(shí),如何確保數(shù)據(jù)重入?
基于以上問題,DTS設(shè)計(jì)了分塊導(dǎo)出方案,針對(duì)大表場景(如10億行以上),從源庫導(dǎo)出數(shù)據(jù)時(shí)將一張大表分為多個(gè)分塊,一個(gè)分塊連接一個(gè)線程,這樣一張大表就可實(shí)現(xiàn)多分塊同時(shí)導(dǎo)出,提升大表的同步效率。
在導(dǎo)入到目標(biāo)kafka時(shí),也是按照分塊導(dǎo)入的,同時(shí)這些分塊都會(huì)進(jìn)行標(biāo)記,如果kafka發(fā)生重啟,可以根據(jù)標(biāo)記來識(shí)別中斷的分塊位置,從中斷的分塊開始繼續(xù)向目標(biāo)kafka寫入。使用這個(gè)方式,在遇到kafka異常時(shí),就不需要從頭重新寫,大大提升用戶體驗(yàn)。
2.3.2多分區(qū),如何保證按序消費(fèi)?
為了提升用戶消費(fèi)的速率,消息投遞到Kafka時(shí)一般采用投遞到kafka的多個(gè)分區(qū)的形式,多個(gè)分區(qū)可以并行消費(fèi)以提升消費(fèi)速率,但在多分區(qū)處理過程中,會(huì)涉及投遞順序的問題,需要保證投遞到每個(gè)分區(qū)的消息與業(yè)務(wù)生產(chǎn)的消息順序保持一致。
在實(shí)現(xiàn)中,DTS向Kafka投遞消息時(shí),按照源庫日志解析后的順序來寫入,因此可以實(shí)現(xiàn)寫入Kafka順序與業(yè)務(wù)生成順序的一致。
·全局順序性
DTS在拉取源庫的binlog日志時(shí),采用單線程機(jī)制,先保證日志解析結(jié)果與業(yè)務(wù)生產(chǎn)順序保持一致,等寫入到kafka的多個(gè)分區(qū)時(shí),再按照多線程并發(fā),最終實(shí)現(xiàn)了每個(gè)分區(qū)的消息都是按序排列。
這里需要說明下,投遞到多Topic+多分區(qū)這種形式中,每個(gè)分區(qū)內(nèi)的消息都是按順序投遞的,但是多個(gè)分區(qū)同時(shí)消費(fèi)時(shí),無法保證分區(qū)間按序消費(fèi),如果用戶對(duì)消費(fèi)到的消息順序有嚴(yán)格要求,建議選擇投遞到單Topic+單分區(qū)的形式。
·表級(jí)別順序性
在選擇按表名分區(qū)的場景中,源庫同一個(gè)表的數(shù)據(jù)變更都會(huì)投遞到目標(biāo)Topic下的同一個(gè)分區(qū)中,因?yàn)槿罩镜慕馕鍪前葱蚺帕校酝哆f到Topic分區(qū)中的消息也是按序排列。
總之,不論選擇哪種分區(qū)策略,DTS都可以保證投遞到各分區(qū)中消息的順序性。
2.3.3如何保證數(shù)據(jù)不丟?
要保證同步到Kafka的數(shù)據(jù)一條都不丟,那么所有的數(shù)據(jù)就需要有跡可循,哪些已經(jīng)同步過了、哪些還沒有同步過,都必須清楚可查。于是DTS通過對(duì)數(shù)據(jù)做標(biāo)記,標(biāo)識(shí)數(shù)據(jù)同步位置,以此來實(shí)現(xiàn)數(shù)據(jù)準(zhǔn)確同步。
全量階段,數(shù)據(jù)按照分塊機(jī)制進(jìn)行導(dǎo)出導(dǎo)入,DTS導(dǎo)入到目標(biāo)端Kafka的每個(gè)分塊都會(huì)進(jìn)行標(biāo)記,kafka異常時(shí),可以識(shí)別中斷的分塊位置繼續(xù)導(dǎo)入。
增量階段,DTS內(nèi)部處理源庫的日志解析時(shí)會(huì)插入標(biāo)記,來識(shí)別數(shù)據(jù)寫入到Kafka的位置,如果任務(wù)中斷再恢復(fù),通過DTS內(nèi)部標(biāo)記,可以找到中斷的位置,繼續(xù)增量同步。
2.3.4庫表變更,能否靈活同步?
業(yè)務(wù)數(shù)據(jù)庫經(jīng)常會(huì)有庫表結(jié)構(gòu)的變更,而數(shù)據(jù)集成需要能識(shí)別并自動(dòng)同步這些變更字段,否則,庫表結(jié)構(gòu)每變更一次,就需要手動(dòng)改一次集成程序,這個(gè)維護(hù)工作量非常大。在DTS以前的鏈路傳輸中,庫表結(jié)構(gòu)變更的自動(dòng)同步能力就已經(jīng)具備了,直接集成即可。但是我們本次需要解決的是,當(dāng)同步任務(wù)已經(jīng)啟動(dòng),用戶想要追加/刪除一個(gè)新的庫表對(duì)象,如何做到一鍵化操作,讓用戶便捷維護(hù)。
這里,我們以追加一個(gè)表對(duì)象為例,同步任務(wù)已經(jīng)在進(jìn)行中,但是運(yùn)行過程中發(fā)現(xiàn)需要新增一個(gè)表對(duì)象(例如表A),對(duì)用戶來說,只需要在DTS任務(wù)列表頁,進(jìn)行一步可視化點(diǎn)擊操作即可完成。
動(dòng)態(tài)修改同步對(duì)象的過程中,其實(shí)DTS底層做了很多工作,對(duì)用戶操作層面進(jìn)行了簡化,如上述操作案例:新增一個(gè)表對(duì)象(例如表A),DTS需要同步表A的歷史存量數(shù)據(jù),同時(shí),已有的同步任務(wù)1還不能受影響。所以在實(shí)現(xiàn)中,我們?cè)贒TS后臺(tái)構(gòu)造了一個(gè)臨時(shí)任務(wù)2,來負(fù)責(zé)同步表A的存量數(shù)據(jù),當(dāng)任務(wù)2完成后,再將任務(wù)1和任務(wù)2合并,以此來實(shí)現(xiàn)動(dòng)態(tài)追加同步對(duì)象的效果。
相對(duì)于一般的集成工具,DTS在庫表結(jié)構(gòu)的變更,庫表對(duì)象增加/刪除等方面都是非常友好的,用戶只需要在Web界面進(jìn)行操作,一次配置,即可享受長期便利,大大減少用戶的維護(hù)成本。
接下來,給大家重點(diǎn)介紹DTS的數(shù)據(jù)集成方案是如何配置的。
DTS+Ckafka+數(shù)據(jù)湖倉生產(chǎn)實(shí)踐
3.1實(shí)踐場景
數(shù)據(jù)源頭為MySQL,通過DTS獲取MySQL的全量+增量數(shù)據(jù)到消息隊(duì)列Ckafka,然后適配消費(fèi)Demo,將消息投遞到數(shù)據(jù)湖倉。
3.2前期準(zhǔn)備
·準(zhǔn)備騰訊云Ckafka實(shí)例,并創(chuàng)建好消費(fèi)組和消費(fèi)topic。
·準(zhǔn)備源數(shù)據(jù)庫MySQL。
·準(zhǔn)備執(zhí)行DTS任務(wù)的賬號(hào),并授權(quán)源庫和目標(biāo)庫的對(duì)應(yīng)權(quán)限。
·準(zhǔn)備數(shù)據(jù)湖倉。
3.3數(shù)據(jù)同步
DTS的操作比較簡單,在騰訊云Web界面進(jìn)行4個(gè)步驟即可,無需環(huán)境部署。
步驟1:創(chuàng)建DTS任務(wù)
購買一個(gè)DTS任務(wù),源庫選擇MySQL,目標(biāo)庫選擇Ckafka。
步驟2:設(shè)置同步源和目標(biāo)數(shù)據(jù)庫
配置DTS連接源庫和目標(biāo)庫,源庫配置中填入MySQL的主機(jī)地址/端口/用戶名/密碼,目標(biāo)庫選擇Ckafka實(shí)例ID。
這個(gè)步驟主要是驗(yàn)證DTS到源和目標(biāo)庫的網(wǎng)絡(luò)是否打通,對(duì)應(yīng)的用戶權(quán)限是否滿足要求,如果源庫有安全組設(shè)置需要允許DTS IP訪問,否則網(wǎng)絡(luò)不通。
步驟3:配置數(shù)據(jù)同步選項(xiàng)
這個(gè)步驟主要是選擇同步的數(shù)據(jù)格式(Avro、JSON)、數(shù)據(jù)投遞到Ckafka的哪個(gè)topic下、分區(qū)策略等。
·對(duì)于庫表結(jié)構(gòu)的變更,一鍵勾選DDL,即可在后續(xù)自動(dòng)同步庫表結(jié)構(gòu)的變更數(shù)據(jù)。
·選定同步的庫表對(duì)象后,如果有需要追加,在任務(wù)啟動(dòng)后通過修改任務(wù)即可添加。
步驟4:校驗(yàn)任務(wù)
上述配置完成后,DTS會(huì)對(duì)源和目標(biāo)庫的各項(xiàng)參數(shù)進(jìn)行預(yù)校驗(yàn),如Binlog必須開啟,并且binlog_format需要設(shè)置為row模式等等,以保證數(shù)據(jù)同步結(jié)果的正確性。預(yù)校驗(yàn)通過后同步任務(wù)就可以啟動(dòng)了。
3.4數(shù)據(jù)消費(fèi)和投遞
步驟1:下載消費(fèi)Demo樣例
DTS同步任務(wù)正常運(yùn)行后,下載DTS消費(fèi)Demo樣例,將Demo包解壓后運(yùn)行,進(jìn)行數(shù)據(jù)消費(fèi)。
這里以Go語言為例,解壓Demo包后運(yùn)行g(shù)o build-o subscribe./main/main.go,生成可執(zhí)行文件subscribe。
然后運(yùn)行./subscribe--brokers=xxx--topic=xxx--group=xxx--trans2sql=true。這里的brokers、topic、group分別填入Ckafka的地址、消費(fèi)topic名稱、消費(fèi)組名稱。
運(yùn)行結(jié)果顯示如下,表示Kafka正常連接,消費(fèi)鏈路已打通。
步驟2:測試數(shù)據(jù)結(jié)果
在源數(shù)據(jù)庫上插入一條數(shù)據(jù)。
在消費(fèi)端即可查看到對(duì)應(yīng)數(shù)據(jù)。
步驟3:修改Demo,增加適配到后端數(shù)倉的代碼邏輯
DTS提供的消費(fèi)Demo僅對(duì)數(shù)據(jù)做了打印處理,用戶需要在Demo基礎(chǔ)上自行編寫數(shù)據(jù)處理到后端數(shù)據(jù)湖倉的適配邏輯。
實(shí)踐效果
使用DTS同步到Kafka的鏈路形式替代之前使用Canal組件的鏈路,最終實(shí)現(xiàn)高性能傳輸、高穩(wěn)定性保障的同時(shí)有效降低了運(yùn)維成本。
傳輸性能高:DTS的傳輸性能與用戶實(shí)際網(wǎng)絡(luò)延時(shí)、帶寬、數(shù)據(jù)本身的規(guī)格配置都有關(guān)系,在用戶源端和目標(biāo)端規(guī)格都比較高,網(wǎng)絡(luò)無瓶頸的情況下,項(xiàng)目實(shí)測DTS全量階段的RPS最高可達(dá)30萬/s,增量階段最高可達(dá)1.5萬/s。
穩(wěn)定性強(qiáng):DTS可提供高SLA保證,任務(wù)穩(wěn)定性極強(qiáng)。
運(yùn)維成本低:用戶之前使用Canal組件時(shí),平均每月大概需要半個(gè)人力投入到研發(fā)和運(yùn)維中,改用DTS后,任務(wù)配置完成后基本無需運(yùn)維人員投入,大大減少運(yùn)維成本。
DTS提供的同步到Kafka數(shù)據(jù)集成方案具有通用性,目前已成功應(yīng)用在出行、零售、游戲、互聯(lián)網(wǎng)、金融等多個(gè)行業(yè),并收獲了用戶的良好口碑。
總結(jié)和展望
DTS目前已上線了MySQL系列數(shù)據(jù)庫同步到kafka的鏈路,為用戶在大數(shù)據(jù)集成中提供了便捷的技術(shù)通道,后續(xù)為了滿足用戶更多的需求和更高的使用體驗(yàn),DTS將聚焦「數(shù)據(jù)庫生態(tài)」和「產(chǎn)品體驗(yàn)」上持續(xù)發(fā)力。
數(shù)據(jù)庫生態(tài)方面:持續(xù)拓寬數(shù)據(jù)庫生態(tài),支持其他類型的數(shù)據(jù)庫同步到kafka,如MongoDB,Oracle,PostgreSQL等同步到kafka。
產(chǎn)品體驗(yàn)方面:支持更多高階特性,如全量階段支持?jǐn)?shù)據(jù)可重入,投遞到多topic的策略優(yōu)化等等。