Google Cloud 的 Dataproc 讓您能夠以更簡便、更經(jīng)濟的方式來基于 Google Cloud 運行原生 Apache Spark 和 Hadoop 集群。在本文中,我們將介紹在 Dataproc 的 Component Exchange 中提供的最新可選組件:Docker 和 Apache Flink。
Dataproc 中的 Docker 容器
Docker 是一種廣泛使用的容器技術(shù)。由于它現(xiàn)在是 Dataproc 可選組件,Docker 守護進程 (daemon) 現(xiàn)在可被安裝到 Dataproc 集群的每個節(jié)點。這將使您能夠安裝容器化應(yīng)用程序,并且在集群中輕松地與 Hadoop 集群交互。
此外,Docker 對于支持以下這些功能也至關(guān)重要:
1.通過 YARN 運行容器
2.可移植 Apache Beam 作業(yè)
在 YARN 中運行容器使您能夠單獨管理您的 YARN 應(yīng)用程序的依賴性,并且允許您在 YARN 中創(chuàng)建容器化的服務(wù)??梢浦?Apache Beam 將作業(yè)打包到 Docker 容器,并將其提交至 Flink 集群。
除了默認的 Docker registry,還可對 Docker 可選組件進行配置以使用 Google Container Registry。這使您能夠使用由您的組織管理的容器鏡像。
以下是利用 Docker 可選組件創(chuàng)建 Dataproc 集群的示例:
gcloud beta dataproc clusters create <cluster-name> \
--optional-components=DOCKER \
--image-version=1.5
當您運行 Docker 應(yīng)用程序時,使用 gcplogs 驅(qū)動程序,日志將被傳至 Cloud Logging。
如果您的應(yīng)用程序不依賴任何 Hadoop 服務(wù),核實 Kubernetes 和 Google Kubernetes Engine 是否以原生方式運行容器。要了解有關(guān) Dataproc 使用的更多信息,請參閱我們的相關(guān)文檔。
基于 Dataproc 的 Apache Flink
在流分析技術(shù)中,Apache Beam 和 Apache Flink 更加出色。Apache Flink 是一個基于有狀態(tài)計算的分布式處理引擎。Apache Beam 是定義批處理和流處理管道的統(tǒng)一模式。使用 Apache Flink 作為擴展引擎,除了 Google 的 Cloud Dataflow 服務(wù),您還可以在 Dataproc 中運行 Apache Beam 作業(yè)。
Flink 以及在 Flink 中運行 Beam 適合大規(guī)模連續(xù)作業(yè),可提供:
支持批處理和數(shù)據(jù)流程序的流優(yōu)先運行環(huán)境
同時支持非常高的吞吐量和低事件延遲的運行環(huán)境
具有精確單次處理保證的容錯
流程序中的自然背壓 (back-pressure)
自定義內(nèi)存管理以實現(xiàn)在內(nèi)存和核外數(shù)據(jù)處理算法之間高效、穩(wěn)健的切換
與 YARN 以及 Apache Hadoop 生態(tài)系統(tǒng)的其他組件集成
Google Cloud 的 Dataproc 團隊最近宣布 Flink Operator on Kubernetes 現(xiàn)已可用。它允許您在 Kubernetes 中運行 Apache Flink 作業(yè),具有減少平臺依賴性和產(chǎn)生更好的硬件效率的優(yōu)勢。
基本 Flink 概念
Flink 集群包括 Flink JobManager 以及一組 Flink TaskManager。與 YARN 之類的其他分布式系統(tǒng)中的類似角色相似,JobManager 的“責(zé)任”包括接受作業(yè)、管理資源以及監(jiān)控作業(yè)等。TaskManager 負責(zé)運行實際任務(wù)。
在 Dataproc 中運行 Flink 作業(yè)時,我們將 YARN 用作 Flink 的資源管理器。您可以以兩種方式運行 Flink 作業(yè):作業(yè)集群和會話集群。對于作業(yè)集群,YARN 將為作業(yè)創(chuàng)建 JobManager 和 TaskManagers,并且將在作業(yè)完成時銷毀集群。對于會話集群,YARN 將創(chuàng)建 JobManager 和幾個 TaskManager。集群可服務(wù)多個作業(yè)直至被用戶關(guān)閉。
如何利用 Flink 創(chuàng)建集群
使用以下命令作為開始:
gcloud beta dataproc clusters create <cluster-name> \
--optional-components=FLINK \
--image-version=1.5
如何運行 Flink 作業(yè)
在帶有 Flink 的 Dataproc 集群啟動后,您可以使用 Flink 作業(yè)集群直接將您的 Flink 作業(yè)提交至 YARN。接受作業(yè)后,F(xiàn)link 將在 YARN 中為此作業(yè)啟動 JobManager 和任務(wù)槽。Flink 作業(yè)將在 YARN 集群中運行,直至完成。然后,將關(guān)閉所創(chuàng)建的 JobManager。作業(yè)日志將在常規(guī) YARN 日志中提供。嘗試此命令以運行一個字數(shù)統(tǒng)計示例:
HADOOP_CLASSPATH=`hadoop classpath` flink run -m yarn-cluster /usr/lib/flink/examples/batch/WordCount.jar
默認情況下,Dataproc 集群將不啟動 Flink 會話集群。相反,Dataproc 將創(chuàng)建腳本“/usr/bin/flink-yarn-daemon”,該腳本將啟動 Flink 會話。
如果您要在 Dataproc 創(chuàng)建時啟動 Flink 會話,使用metadata關(guān)鍵詞來允許啟動:
gcloud dataproc clusters create <cluster-name> \
--optional-components=FLINK \
--image-version=1.5 \
--metadata flink-start-yarn-session=true
如果您要在 Dataproc 創(chuàng)建后啟動 Flink 會話,可在主節(jié)點運行下列命令:
$ . /usr/bin/flink-yarn-daemon
向該會話集群提交作業(yè)。您需要獲得 Flink JobManager URL:
HADOOP_CLASSPATH=`hadoop classpath` flink run -m <JOB_MANAGER_HOSTNAME>:<REST_API_PORT> /usr/lib/flink/examples/batch/WordCount.jar
如何運行 Java Beam 作業(yè)
運行以 Java 編寫的 Apache Beam 作業(yè)非常簡單。無需額外的配置。只要您將 Beam 作業(yè)打包為 JAR 文件,不需要進行任何配置即可在 Flink 中運行 Beam。以下是您可以使用的命令:
$ mvn package -Pflink-runner$ bin/flink run -c org.apache.beam.examples.WordCount /path/to/your.jar--runner=FlinkRunner --other-parameters
如何運行以 Python 編寫的 Python Beam 作業(yè)
以 Python 編寫的 Beam 作業(yè)使用不同的執(zhí)行模式。要基于 Dataproc 在 Flink 中運行它們,您還需要啟用 Docker 可選組件。以下是創(chuàng)建集群的示例:
gcloud dataproc clusters create <cluster-name> \ --optional-components=FLINK,DOCKER
您還需要安裝 Beam 所必需的 Python 庫,例如,apache_beam 和 apache_beam[gcp]。您可以傳遞一個 Flink 主 URL,讓它在會話集群中運行。如果您未傳遞 URL,需要使用作業(yè)集群模式來運行此作業(yè):
import apache_beam as beamfrom apache_beam.options.pipeline_options import PipelineOptionsoptions = PipelineOptions([ "--runner=FlinkRunner", "--flink_version=1.9", "--flink_master=localhost:8081", "--environment_type=DOCKER"])with beam.Pipeline(options=options) as p: ...編寫 Python 作業(yè)后,只需運行它以提交:
$ python wordcount.py