Data Catalog讓您能夠通過交互式接口攝取和編輯業(yè)務(wù)元數(shù)據(jù)。它包括可用于實(shí)現(xiàn)常見任務(wù)自動(dòng)化的編程接口。許多企業(yè)必須使用Data Catalog定義和采集一組元數(shù)據(jù),因此,我們將在這里提供一些關(guān)于如何從長期角度聲明、創(chuàng)建和維護(hù)這類元數(shù)據(jù)的最佳實(shí)踐。
在以往的文章中,我們介紹了標(biāo)簽?zāi)0迥軌蛉绾瓮ㄟ^描述用于對數(shù)據(jù)資產(chǎn)進(jìn)行分類的詞匯表來促進(jìn)數(shù)據(jù)發(fā)現(xiàn)、治理和質(zhì)量控制。在本文中,我們將探討如何使用標(biāo)簽?zāi)0鍖?shù)據(jù)進(jìn)行標(biāo)記。標(biāo)記指創(chuàng)建一個(gè)標(biāo)簽?zāi)0宓膶?shí)例并為模板字段分配值,以對特定數(shù)據(jù)資產(chǎn)進(jìn)行分類。撰寫本文時(shí),Data Catalog支持三種存儲(chǔ)后端:BigQuery、Cloud Storage和Pub/Sub。在此,我們將著重介紹如何對在這些后端中存儲(chǔ)的資產(chǎn)進(jìn)行標(biāo)記,例如,表、列、文件和消息主題。
我們將介紹適合在數(shù)據(jù)湖和數(shù)據(jù)倉庫環(huán)境中標(biāo)記數(shù)據(jù)的三種使用模式:配置新數(shù)據(jù)源、處理派生數(shù)據(jù)以及更新標(biāo)簽和模板。對于每種應(yīng)用場景,您將了解到我們推薦的用于大規(guī)模標(biāo)記數(shù)據(jù)的方法。
1.配置數(shù)據(jù)源
配置數(shù)據(jù)源通常涉及幾種活動(dòng):根據(jù)存儲(chǔ)后端創(chuàng)建表或者文件、利用一些初始數(shù)據(jù)填充它們以及對這些資源設(shè)置訪問權(quán)限。我們在此基礎(chǔ)至上還多增加了一種活動(dòng):在Data Catalog中標(biāo)記新創(chuàng)建的資源。以下是涉及的具體步驟:
標(biāo)記數(shù)據(jù)源需要了解擬使用的標(biāo)簽?zāi)0宓暮x以及數(shù)據(jù)源中的數(shù)據(jù)語義的領(lǐng)域?qū)<??;谒哂械闹R(shí),領(lǐng)域?qū)<視?huì)選擇附加哪些模板以及從這些模板創(chuàng)建哪類標(biāo)簽。鑒于許多決策依賴于標(biāo)簽的準(zhǔn)確性,人的參與至關(guān)重要。
基于我們與客戶的合作經(jīng)驗(yàn),我們觀察到兩種類型的標(biāo)簽。一種類型稱為static(靜態(tài)),因?yàn)椋侄沃凳穷A(yù)先知道的,并且預(yù)計(jì)很少變更。另一種類型稱為dynamic(動(dòng)態(tài)),因?yàn)樽侄沃禃?huì)根據(jù)基礎(chǔ)數(shù)據(jù)的內(nèi)容定期變更。靜態(tài)標(biāo)簽的一個(gè)示例是包括data_domain(數(shù)據(jù)域)、data confidentiality(數(shù)據(jù)保密性)和data_retention(數(shù)據(jù)保留)的數(shù)據(jù)治理字段的集合。這些字段的值由組織的數(shù)據(jù)使用策略決定。它們通常在數(shù)據(jù)源創(chuàng)建時(shí)便已知曉,而且不會(huì)頻繁變更。動(dòng)態(tài)標(biāo)簽的一個(gè)示例是數(shù)據(jù)質(zhì)量字段的集合,例如,number_values(數(shù)值)、unique_values(唯一值)、min_value(最小值)和max_value(最大值)。每當(dāng)運(yùn)行新的負(fù)載或者對數(shù)據(jù)源進(jìn)行修改時(shí),這些字段值預(yù)計(jì)會(huì)頻繁變更。
除了這些差異,靜態(tài)標(biāo)簽還有級(jí)聯(lián)屬性,表明其字段應(yīng)當(dāng)以何種方式傳播——從源到派生數(shù)據(jù)。(在后續(xù)部分,我們將進(jìn)一步詳解這一概念。)與此形成對照的是,動(dòng)態(tài)標(biāo)簽有查詢表達(dá)式和刷新屬性,指示應(yīng)當(dāng)用于計(jì)算字段值的查詢以及計(jì)算的頻率。在第一個(gè)代碼段顯示了一個(gè)靜態(tài)標(biāo)簽的配置示例,第二個(gè)代碼段顯示的是動(dòng)態(tài)標(biāo)簽的示例。
tag_config:
template:
- template_id: dg_template
- project_id: sandbox
- region: us-central1
fields:
- {name: data_domain, value: HR, cascade: true}
- {name: data_confidentiality, value: SENSITIVE, cascade: true}
- {name: data_retention, value: 30_DAYS, cascade: false}
lineage:
- template_id: derived_template
- parents_field: parents
rules:
- included_uri_patterns: bigquery/project/sandbox/dataset/covid/*
- excluded_uri_patterns: bigquery/project/sandbox/dataset/covid/Employee_input_*
- included_uri_patterns: pubsub/project/sandbox/subscriptions/employee-RTO
基于YAML的靜態(tài)標(biāo)簽配置
tag_config:
template:
- template_id: dg_template
- project_id: sandbox
- region: us-central1
refresh: 1-hour
fields:
- {name: count, query_expression: select count(rto) from $$}
- {name: unique_values, query_expression: select distinct rto from $$}
- {name: null_values, query_expression: select count(*) from $$ where rto is null}
rules:
- included_uri_patterns: bigquery/project/sandbox/dataset/covid/Employee_RTO.rto
基于YAML的動(dòng)態(tài)標(biāo)簽配置
所前所述,當(dāng)他們?yōu)閿?shù)據(jù)源設(shè)置標(biāo)記時(shí),領(lǐng)域?qū)<覍檫@些配置提供輸入。更具體而言,他們首先選擇要為數(shù)據(jù)源附加的模板。其次,他們會(huì)選擇擬使用的標(biāo)簽類型,即,靜態(tài)還是動(dòng)態(tài)。接下來,他們會(huì)輸入各字段的值,如果類型是靜態(tài)的,還要輸入級(jí)聯(lián)設(shè)置,如果類型是動(dòng)態(tài)的,要輸入查詢表達(dá)式和刷新設(shè)置。這些輸入通過UI提供,因此,領(lǐng)域?qū)<覠o需編寫原始YAML文件。
一旦生成YAML文件,工具將基于規(guī)范在Data Catalog中解析配置并創(chuàng)建實(shí)際標(biāo)簽。工具還會(huì)根據(jù)刷新設(shè)置安排動(dòng)態(tài)標(biāo)簽的重新計(jì)算。盡管需要領(lǐng)域?qū)<疫M(jìn)行初始輸入,但實(shí)際的標(biāo)記任務(wù)可完全自動(dòng)化。我們推薦采用以下這種方法,這樣,不僅能在發(fā)布時(shí)對新創(chuàng)建的數(shù)據(jù)源進(jìn)行標(biāo)記,而且無需人工操作即可對標(biāo)簽進(jìn)行長期維護(hù)。
2.處理派生數(shù)據(jù)
除了標(biāo)記數(shù)據(jù)源,能夠?qū)ε缮鷶?shù)據(jù)進(jìn)行大規(guī)模標(biāo)記同樣至關(guān)重要。我們將派生數(shù)據(jù)寬泛地定義為以從一個(gè)或多個(gè)數(shù)據(jù)源轉(zhuǎn)換的方式創(chuàng)建的任何數(shù)據(jù)段。這種類型的數(shù)據(jù)與數(shù)據(jù)湖和數(shù)據(jù)倉庫場景尤其具有相關(guān)性,在這類環(huán)境中,數(shù)據(jù)產(chǎn)品通常從各種數(shù)據(jù)源派生。
派生數(shù)據(jù)的標(biāo)簽應(yīng)當(dāng)由原數(shù)據(jù)源和應(yīng)用于數(shù)據(jù)的轉(zhuǎn)換類型組成。原數(shù)據(jù)源的URI被保持在標(biāo)簽中,并且在標(biāo)簽中還存儲(chǔ)一個(gè)或多個(gè)轉(zhuǎn)換類型——例如,聚合、匿名化、歸一化等。我們建議將標(biāo)記創(chuàng)建邏輯融入生成派生數(shù)據(jù)的管道中。利用Airflow DAGs和Beam,這是可行的。例如,如果一個(gè)數(shù)據(jù)管道連接兩個(gè)數(shù)據(jù)源,聚合結(jié)果并將其存儲(chǔ)到表中,您可以參考兩個(gè)原數(shù)據(jù)源以及aggregation:true,基于結(jié)果表創(chuàng)建標(biāo)簽。以下,您可以看到創(chuàng)建此標(biāo)簽的Beam管道的代碼段:
with beam.Pipeline(options=pipeline_options) as p: sql = 'select covid_county, covid_state, sum_new_cases from views.v_covid_new_cases' bq_source = beam.io.BigQuerySource(query=sql, use_standard_sql=True) covid_query_results = p | 'Read from BigQuery' >> beam.io.Read(bq_source) subscription_name = 'projects/scohen-sandbox/subscriptions/employee-RTO' message = p | 'Read message' >> ReadFromPubSub(topic=None, subscription=subscription_name, timestamp_attribute=None) emp_pcoll = message | 'Get Age' >> beam.ParDo(GetAge()) joined_emp_pcoll = emp_pcoll | 'Join Data' >> beam.ParDo(Join(), beam.pvalue.AsList(covid_query_results)) batch_joined_pcoll = joined_emp_pcoll | 'Batch Join' >> BatchElements(min_batch_size=10, max_batch_size=20) masked_dob_pcoll = batch_joined_pcoll | 'Mask DOB' >> beam.ParDo(MaskDOB()) batch_masked_pcoll = masked_dob_pcoll| 'Batch Mask' >> BatchElements(min_batch_size=10, max_batch_size=20) bucket_age_pcoll = batch_masked_pcoll | 'Bucket Age' >> beam.ParDo(BucketAge()) batch_age_pcoll = bucket_age_pcoll | 'Batch Bucket Age' >> BatchElements(min_batch_size=4, max_batch_size=5) hash_id_pcoll = batch_age_pcoll | 'Hash Id' >> beam.ParDo(HashId()) hash_id_pcoll | 'Write Table' >> WriteToBigQuery(table, schema) # Tag Employee_RTO table with Derived Data template template = 'derived_template' dc_client = datacatalog_v1.DataCatalogClient() tag.template = dc_client.tag_template_path(project_id, region, template) tag = datacatalog_v1.types.Tag() table_resource = '//bigquery.googleapis.com/projects/' + project_id + '/datasets/' + dataset + '/tables/' + short_table_name table_entry = dc_client.lookup_entry(linked_resource=table_resource) tag.fields['parents'].string_value = 'pubsub/project/sandbox/subscriptions/employee-RTO,bigquery/project/sandbox/dataset/views/v_covid_new_cases' tag.fields['aggregated_data'].bool_value = False tag.fields['pseudo_anonymized_data'].bool_value = True tag.fields['anonymized_data'].bool_value = False tag.fields['origin_product'].enum_value.display_name = 'DATAFLOW' long_ts = datetime.now(tz.gettz("America/Chicago")).isoformat() ts = timestamp_value[0:19] + timestamp_value[26:32] tag.fields['date_data_processed'].timestamp_value.FromJsonString(ts) response = dc_client.create_tag(parent=table_entry.name, tag=tag)
帶標(biāo)記邏輯的Beam管道
一旦利用其原數(shù)據(jù)源對派生數(shù)據(jù)進(jìn)行標(biāo)記,您可以使用此信息傳播附加到原數(shù)據(jù)源的靜態(tài)標(biāo)簽。這是cascade屬性發(fā)揮作用的地方,指示哪些字段應(yīng)當(dāng)被傳播到其派生數(shù)據(jù)。在以上所示的第一個(gè)代碼段中顯示了cascade屬性的示例,其中data_domain和data_confidentiality字段被傳播,而data_retention字段未被傳播。這意味著,BigQuery中派生的任何表將使用dg_template利用data_domain:HR和data_confidentiality:CONFIDENTIAL進(jìn)行標(biāo)記。
3.處理更新
有幾種場景需要針對標(biāo)簽和模板的更新能力。例如,如果業(yè)務(wù)分析師發(fā)現(xiàn)標(biāo)簽中的一個(gè)錯(cuò)誤,需要對一個(gè)或多個(gè)值進(jìn)行更正。如果要采用新的數(shù)據(jù)使用策略,可能需要為模板添加新的字段并對現(xiàn)有字段重命名或者刪除。
我們?yōu)闃?biāo)記和模板更新提供配置,如下圖所示。標(biāo)記更新配置指定將變更的每個(gè)字段的當(dāng)前值和新值。工具處理配置并基于規(guī)范更新標(biāo)簽中的字段的值。如果更新的標(biāo)簽是靜態(tài)的,工具還會(huì)將變更傳播至派生數(shù)據(jù)的相同標(biāo)簽。
模板更新配置制定變更的字段名、字段類型以及任何枚舉值。工具通過首先確定變更的性質(zhì)來處理更新。撰寫本文時(shí),Data Catalog支持對模板添加和刪除字段以及添加枚舉值,但尚不支持字段重命名或者類型變更。因此,如果需要簡單添加或者刪除,工具會(huì)對現(xiàn)有模板進(jìn)行修改。否則,必須重新創(chuàng)建整個(gè)模板以及所有從屬標(biāo)簽。
tag_config: template: - template_id: dg_template - project_id: sandbox - region: us-central1 fields: - {name: data_confidentiality, current: SENSITIVE, new: SHARED_INTERNALLY, cascade: true} - {name: data_retention, current: 30_DAYS, new: 60_DAYS, cascade: false} rules: - included_uri_patterns: bigquery/project/sandbox/dataset/covid/* - excluded_uri_patterns: bigquery/project/sandbox/dataset/covid/Employee_input_* - included_uri_patterns: pubsub/project/sandbox/subscriptions/employee-RTO
基于YAML的標(biāo)簽更新配置
template_config: - template_id: dg_template - project_id: sandbox - region: us-central1 fields: - {name: data_confidentiality, type: enum, values: {SENSITIVE, SHARED_INTERNALLY, SHARED_EXTERNALLY, PUBLIC, UNKNOWN} - {name: data_retention, type: enum, values: {30_DAYS, 60_DAYS, 90_DAYS, 120_DAYS, 1_YEAR, 2_YEARS, 5_YEARS, UNKNOWN}
基于YAML的模板更新配置
我們已經(jīng)開始對這些方法進(jìn)行原型創(chuàng)建,以發(fā)布一個(gè)開源工具,實(shí)現(xiàn)按照我們建議的使用模式在Data Catalog中創(chuàng)建和維護(hù)標(biāo)簽所涉及的許多任務(wù)的自動(dòng)化。