Data Catalog讓您能夠通過交互式接口攝取和編輯業(yè)務元數(shù)據(jù)。它包括可用于實現(xiàn)常見任務自動化的編程接口。許多企業(yè)必須使用Data Catalog定義和采集一組元數(shù)據(jù),因此,我們將在這里提供一些關(guān)于如何從長期角度聲明、創(chuàng)建和維護這類元數(shù)據(jù)的最佳實踐。
在以往的文章中,我們介紹了標簽模板能夠如何通過描述用于對數(shù)據(jù)資產(chǎn)進行分類的詞匯表來促進數(shù)據(jù)發(fā)現(xiàn)、治理和質(zhì)量控制。在本文中,我們將探討如何使用標簽模板對數(shù)據(jù)進行標記。標記指創(chuàng)建一個標簽模板的實例并為模板字段分配值,以對特定數(shù)據(jù)資產(chǎn)進行分類。撰寫本文時,Data Catalog支持三種存儲后端:BigQuery、Cloud Storage和Pub/Sub。在此,我們將著重介紹如何對在這些后端中存儲的資產(chǎn)進行標記,例如,表、列、文件和消息主題。
我們將介紹適合在數(shù)據(jù)湖和數(shù)據(jù)倉庫環(huán)境中標記數(shù)據(jù)的三種使用模式:配置新數(shù)據(jù)源、處理派生數(shù)據(jù)以及更新標簽和模板。對于每種應用場景,您將了解到我們推薦的用于大規(guī)模標記數(shù)據(jù)的方法。
1.配置數(shù)據(jù)源
配置數(shù)據(jù)源通常涉及幾種活動:根據(jù)存儲后端創(chuàng)建表或者文件、利用一些初始數(shù)據(jù)填充它們以及對這些資源設置訪問權(quán)限。我們在此基礎至上還多增加了一種活動:在Data Catalog中標記新創(chuàng)建的資源。以下是涉及的具體步驟:
標記數(shù)據(jù)源需要了解擬使用的標簽模板的含義以及數(shù)據(jù)源中的數(shù)據(jù)語義的領域?qū)<??;谒哂械闹R,領域?qū)<視x擇附加哪些模板以及從這些模板創(chuàng)建哪類標簽。鑒于許多決策依賴于標簽的準確性,人的參與至關(guān)重要。
基于我們與客戶的合作經(jīng)驗,我們觀察到兩種類型的標簽。一種類型稱為static(靜態(tài)),因為,字段值是預先知道的,并且預計很少變更。另一種類型稱為dynamic(動態(tài)),因為字段值會根據(jù)基礎數(shù)據(jù)的內(nèi)容定期變更。靜態(tài)標簽的一個示例是包括data_domain(數(shù)據(jù)域)、data confidentiality(數(shù)據(jù)保密性)和data_retention(數(shù)據(jù)保留)的數(shù)據(jù)治理字段的集合。這些字段的值由組織的數(shù)據(jù)使用策略決定。它們通常在數(shù)據(jù)源創(chuàng)建時便已知曉,而且不會頻繁變更。動態(tài)標簽的一個示例是數(shù)據(jù)質(zhì)量字段的集合,例如,number_values(數(shù)值)、unique_values(唯一值)、min_value(最小值)和max_value(最大值)。每當運行新的負載或者對數(shù)據(jù)源進行修改時,這些字段值預計會頻繁變更。
除了這些差異,靜態(tài)標簽還有級聯(lián)屬性,表明其字段應當以何種方式傳播——從源到派生數(shù)據(jù)。(在后續(xù)部分,我們將進一步詳解這一概念。)與此形成對照的是,動態(tài)標簽有查詢表達式和刷新屬性,指示應當用于計算字段值的查詢以及計算的頻率。在第一個代碼段顯示了一個靜態(tài)標簽的配置示例,第二個代碼段顯示的是動態(tài)標簽的示例。
tag_config:
template:
- template_id: dg_template
- project_id: sandbox
- region: us-central1
fields:
- {name: data_domain, value: HR, cascade: true}
- {name: data_confidentiality, value: SENSITIVE, cascade: true}
- {name: data_retention, value: 30_DAYS, cascade: false}
lineage:
- template_id: derived_template
- parents_field: parents
rules:
- included_uri_patterns: bigquery/project/sandbox/dataset/covid/*
- excluded_uri_patterns: bigquery/project/sandbox/dataset/covid/Employee_input_*
- included_uri_patterns: pubsub/project/sandbox/subscriptions/employee-RTO
基于YAML的靜態(tài)標簽配置
tag_config:
template:
- template_id: dg_template
- project_id: sandbox
- region: us-central1
refresh: 1-hour
fields:
- {name: count, query_expression: select count(rto) from $$}
- {name: unique_values, query_expression: select distinct rto from $$}
- {name: null_values, query_expression: select count(*) from $$ where rto is null}
rules:
- included_uri_patterns: bigquery/project/sandbox/dataset/covid/Employee_RTO.rto
基于YAML的動態(tài)標簽配置
所前所述,當他們?yōu)閿?shù)據(jù)源設置標記時,領域?qū)<覍檫@些配置提供輸入。更具體而言,他們首先選擇要為數(shù)據(jù)源附加的模板。其次,他們會選擇擬使用的標簽類型,即,靜態(tài)還是動態(tài)。接下來,他們會輸入各字段的值,如果類型是靜態(tài)的,還要輸入級聯(lián)設置,如果類型是動態(tài)的,要輸入查詢表達式和刷新設置。這些輸入通過UI提供,因此,領域?qū)<覠o需編寫原始YAML文件。
一旦生成YAML文件,工具將基于規(guī)范在Data Catalog中解析配置并創(chuàng)建實際標簽。工具還會根據(jù)刷新設置安排動態(tài)標簽的重新計算。盡管需要領域?qū)<疫M行初始輸入,但實際的標記任務可完全自動化。我們推薦采用以下這種方法,這樣,不僅能在發(fā)布時對新創(chuàng)建的數(shù)據(jù)源進行標記,而且無需人工操作即可對標簽進行長期維護。
2.處理派生數(shù)據(jù)
除了標記數(shù)據(jù)源,能夠?qū)ε缮鷶?shù)據(jù)進行大規(guī)模標記同樣至關(guān)重要。我們將派生數(shù)據(jù)寬泛地定義為以從一個或多個數(shù)據(jù)源轉(zhuǎn)換的方式創(chuàng)建的任何數(shù)據(jù)段。這種類型的數(shù)據(jù)與數(shù)據(jù)湖和數(shù)據(jù)倉庫場景尤其具有相關(guān)性,在這類環(huán)境中,數(shù)據(jù)產(chǎn)品通常從各種數(shù)據(jù)源派生。
派生數(shù)據(jù)的標簽應當由原數(shù)據(jù)源和應用于數(shù)據(jù)的轉(zhuǎn)換類型組成。原數(shù)據(jù)源的URI被保持在標簽中,并且在標簽中還存儲一個或多個轉(zhuǎn)換類型——例如,聚合、匿名化、歸一化等。我們建議將標記創(chuàng)建邏輯融入生成派生數(shù)據(jù)的管道中。利用Airflow DAGs和Beam,這是可行的。例如,如果一個數(shù)據(jù)管道連接兩個數(shù)據(jù)源,聚合結(jié)果并將其存儲到表中,您可以參考兩個原數(shù)據(jù)源以及aggregation:true,基于結(jié)果表創(chuàng)建標簽。以下,您可以看到創(chuàng)建此標簽的Beam管道的代碼段:
with beam.Pipeline(options=pipeline_options) as p: sql = 'select covid_county, covid_state, sum_new_cases from views.v_covid_new_cases' bq_source = beam.io.BigQuerySource(query=sql, use_standard_sql=True) covid_query_results = p | 'Read from BigQuery' >> beam.io.Read(bq_source) subscription_name = 'projects/scohen-sandbox/subscriptions/employee-RTO' message = p | 'Read message' >> ReadFromPubSub(topic=None, subscription=subscription_name, timestamp_attribute=None) emp_pcoll = message | 'Get Age' >> beam.ParDo(GetAge()) joined_emp_pcoll = emp_pcoll | 'Join Data' >> beam.ParDo(Join(), beam.pvalue.AsList(covid_query_results)) batch_joined_pcoll = joined_emp_pcoll | 'Batch Join' >> BatchElements(min_batch_size=10, max_batch_size=20) masked_dob_pcoll = batch_joined_pcoll | 'Mask DOB' >> beam.ParDo(MaskDOB()) batch_masked_pcoll = masked_dob_pcoll| 'Batch Mask' >> BatchElements(min_batch_size=10, max_batch_size=20) bucket_age_pcoll = batch_masked_pcoll | 'Bucket Age' >> beam.ParDo(BucketAge()) batch_age_pcoll = bucket_age_pcoll | 'Batch Bucket Age' >> BatchElements(min_batch_size=4, max_batch_size=5) hash_id_pcoll = batch_age_pcoll | 'Hash Id' >> beam.ParDo(HashId()) hash_id_pcoll | 'Write Table' >> WriteToBigQuery(table, schema) # Tag Employee_RTO table with Derived Data template template = 'derived_template' dc_client = datacatalog_v1.DataCatalogClient() tag.template = dc_client.tag_template_path(project_id, region, template) tag = datacatalog_v1.types.Tag() table_resource = '//bigquery.googleapis.com/projects/' + project_id + '/datasets/' + dataset + '/tables/' + short_table_name table_entry = dc_client.lookup_entry(linked_resource=table_resource) tag.fields['parents'].string_value = 'pubsub/project/sandbox/subscriptions/employee-RTO,bigquery/project/sandbox/dataset/views/v_covid_new_cases' tag.fields['aggregated_data'].bool_value = False tag.fields['pseudo_anonymized_data'].bool_value = True tag.fields['anonymized_data'].bool_value = False tag.fields['origin_product'].enum_value.display_name = 'DATAFLOW' long_ts = datetime.now(tz.gettz("America/Chicago")).isoformat() ts = timestamp_value[0:19] + timestamp_value[26:32] tag.fields['date_data_processed'].timestamp_value.FromJsonString(ts) response = dc_client.create_tag(parent=table_entry.name, tag=tag)
帶標記邏輯的Beam管道
一旦利用其原數(shù)據(jù)源對派生數(shù)據(jù)進行標記,您可以使用此信息傳播附加到原數(shù)據(jù)源的靜態(tài)標簽。這是cascade屬性發(fā)揮作用的地方,指示哪些字段應當被傳播到其派生數(shù)據(jù)。在以上所示的第一個代碼段中顯示了cascade屬性的示例,其中data_domain和data_confidentiality字段被傳播,而data_retention字段未被傳播。這意味著,BigQuery中派生的任何表將使用dg_template利用data_domain:HR和data_confidentiality:CONFIDENTIAL進行標記。
3.處理更新
有幾種場景需要針對標簽和模板的更新能力。例如,如果業(yè)務分析師發(fā)現(xiàn)標簽中的一個錯誤,需要對一個或多個值進行更正。如果要采用新的數(shù)據(jù)使用策略,可能需要為模板添加新的字段并對現(xiàn)有字段重命名或者刪除。
我們?yōu)闃擞浐湍0甯绿峁┡渲?,如下圖所示。標記更新配置指定將變更的每個字段的當前值和新值。工具處理配置并基于規(guī)范更新標簽中的字段的值。如果更新的標簽是靜態(tài)的,工具還會將變更傳播至派生數(shù)據(jù)的相同標簽。
模板更新配置制定變更的字段名、字段類型以及任何枚舉值。工具通過首先確定變更的性質(zhì)來處理更新。撰寫本文時,Data Catalog支持對模板添加和刪除字段以及添加枚舉值,但尚不支持字段重命名或者類型變更。因此,如果需要簡單添加或者刪除,工具會對現(xiàn)有模板進行修改。否則,必須重新創(chuàng)建整個模板以及所有從屬標簽。
tag_config: template: - template_id: dg_template - project_id: sandbox - region: us-central1 fields: - {name: data_confidentiality, current: SENSITIVE, new: SHARED_INTERNALLY, cascade: true} - {name: data_retention, current: 30_DAYS, new: 60_DAYS, cascade: false} rules: - included_uri_patterns: bigquery/project/sandbox/dataset/covid/* - excluded_uri_patterns: bigquery/project/sandbox/dataset/covid/Employee_input_* - included_uri_patterns: pubsub/project/sandbox/subscriptions/employee-RTO
基于YAML的標簽更新配置
template_config: - template_id: dg_template - project_id: sandbox - region: us-central1 fields: - {name: data_confidentiality, type: enum, values: {SENSITIVE, SHARED_INTERNALLY, SHARED_EXTERNALLY, PUBLIC, UNKNOWN} - {name: data_retention, type: enum, values: {30_DAYS, 60_DAYS, 90_DAYS, 120_DAYS, 1_YEAR, 2_YEARS, 5_YEARS, UNKNOWN}
基于YAML的模板更新配置
我們已經(jīng)開始對這些方法進行原型創(chuàng)建,以發(fā)布一個開源工具,實現(xiàn)按照我們建議的使用模式在Data Catalog中創(chuàng)建和維護標簽所涉及的許多任務的自動化。