Supporting Purview Column Mapping in Excel
Renamed the column lineage tab and parser to FineGrainColumnLineage to avoid confusion (breaking change).
In addition, updated the README.
Removed a few unnecessary imports from the other Excel samples.
Added a sample demonstrating the column mapping and UpdateLineage tabs together.
Closes wjohnson#123
wjohnson committed Jul 23, 2021
1 parent de879a4 commit 17a9e88
Showing 13 changed files with 303 additions and 65 deletions.
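
For anyone consuming this breaking change, a minimal before/after sketch of the renamed parser methods (names are taken from this diff; the file path and arguments are placeholders):

```python
from pyapacheatlas.readers import ExcelConfiguration, ExcelReader

ec = ExcelConfiguration()  # column_sheet now defaults to "FineGrainColumnLineage"
reader = ExcelReader(ec)

# Before this commit:
# reader.parse_column_lineage("demo.xlsx", atlas_entities, atlas_typedefs)
# reader.parse_lineages("demo.xlsx", atlas_typedefs)

# After this commit:
# reader.parse_finegrain_column_lineage("demo.xlsx", atlas_entities, atlas_typedefs)
# reader.parse_table_finegrain_column_lineages("demo.xlsx", atlas_typedefs)
```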
2 changes: 1 addition & 1 deletion README.md
@@ -9,7 +9,7 @@ The Excel template provides a means to:
* Supports adding glossary terms to entities.
* Supports adding classifications to entities.
* Supports creating relationships between entities (e.g. columns of a table).
-* Creating custom lineage between two existing entities.
+* Creating custom lineage between two existing entities and using the Azure Purview Column Mappings / Lineage feature.
* Bulk upload of type definitions.
* Bulk upload of classification definitions (Purview Classification rules are not currently supported).
* Creating custom table and complex column level lineage in the [Hive Bridge style](https://atlas.apache.org/0.8.3/Bridge-Hive.html).
56 changes: 46 additions & 10 deletions pyapacheatlas/readers/excel.py
@@ -38,7 +38,7 @@ class ExcelConfiguration(ReaderConfiguration):
represents the transformation for a specific column.
"""

-def __init__(self, column_sheet="ColumnsLineage",
+def __init__(self, column_sheet="FineGrainColumnLineage",
table_sheet="TablesLineage",
entityDef_sheet="EntityDefs", bulkEntity_sheet="BulkEntities",
classificationDef_sheet="ClassificationDefs",
@@ -156,7 +156,7 @@ def parse_entity_defs(self, filepath):
# TODO: Add in classificationDefs and relationshipDefs
return output

-def parse_column_lineage(self, filepath, atlas_entities, atlas_typedefs, use_column_mapping=False):
+def parse_finegrain_column_lineage(self, filepath, atlas_entities, atlas_typedefs, use_column_mapping=False):
"""
Read a given excel file that conforms to the excel atlas template and
parse the columns into column lineages.
@@ -198,7 +198,7 @@ def parse_column_lineage(self, filepath, atlas_entities, atlas_typedefs, use_col
column_sheet = wb[self.config.column_sheet]
json_columns = ExcelReader._parse_spreadsheet(column_sheet)

-entities = super().parse_column_lineage(
+entities = super().parse_finegrain_column_lineage(
json_columns,
atlas_entities,
atlas_typedefs,
@@ -241,7 +241,7 @@ def parse_table_lineage(self, filepath):

return entities

-def parse_lineages(self, filepath, atlas_typedefs, use_column_mapping=False):
+def parse_table_finegrain_column_lineages(self, filepath, atlas_typedefs, use_column_mapping=False):
"""
Read a given excel file that conforms to the excel atlas template and
parse the tables, processes, and columns into table and column
@@ -276,7 +276,7 @@ def parse_lineages(self, filepath, atlas_typedefs, use_column_mapping=False):
entities.extend(table_entities)

# Modifies table_entities if use_column_mapping is True
-column_entities = self.parse_column_lineage(
+column_entities = self.parse_finegrain_column_lineage(
filepath,
table_entities,
atlas_typedefs,
@@ -324,13 +324,15 @@ def parse_update_lineage(self, filepath):
wb.close()

return entities

def parse_column_mapping(self, filepath):
"""
Read a given excel file that conforms to the excel atlas template and
parse the (default) ColumnMapping tab into existing process entities.
Assumes these process entities and any referenced entities exist.
This will not update the inputs and outputs; it will update the name
and columnMapping fields.
:param str filepath:
The xlsx file that contains your table and columns.
@@ -356,6 +358,38 @@

return entities

def parse_update_lineage_with_mappings(self, filepath):
"""
Read a given excel file that conforms to the excel atlas template and
parse the (default) UpdateLineage and ColumnMapping tabs into existing process entities.
Assumes these process entities and any referenced entities exist.
:param str filepath:
The xlsx file that contains your table and columns.
:return:
A list of Atlas Process entities representing the spreadsheet's
contents.
:rtype: list(dict)
"""

lineage = self.parse_update_lineage(filepath)
mappings = self.parse_column_mapping(filepath)
seen_qualifiedNames = {}
for working_entity in lineage + mappings:
qn = working_entity["attributes"]["qualifiedName"]
if qn in seen_qualifiedNames:
# If we have seen this entity before, check whether the
# working entity carries a columnMapping attribute;
# if it does, update the existing entity.
if "columnMapping" in working_entity["attributes"]:
seen_qualifiedNames[qn]["attributes"]["columnMapping"] = working_entity["attributes"]["columnMapping"]
else:
# If we haven't seen it before, just record the entity
seen_qualifiedNames[qn] = working_entity

return list(seen_qualifiedNames.values())

def parse_classification_defs(self, filepath):
"""
Read a given excel file that conforms to the excel atlas template and
@@ -380,8 +414,10 @@ def parse_classification_defs(self, filepath):
# Getting classificationDef if the user provided a name of the sheet
if self.config.classificationDef_sheet:
classificationDef_sheet = wb[self.config.classificationDef_sheet]
-json_classificationdefs = ExcelReader._parse_spreadsheet(classificationDef_sheet)
-classificationDefs_generated = super().parse_classification_defs(json_classificationdefs)
+json_classificationdefs = ExcelReader._parse_spreadsheet(
+    classificationDef_sheet)
+classificationDefs_generated = super(
+).parse_classification_defs(json_classificationdefs)
output.update(classificationDefs_generated)

wb.close()
@@ -425,10 +461,10 @@ def make_template(filepath):
entityDefsSheet = wb.create_sheet("EntityDefs")
classificationDefsSheet = wb.create_sheet("ClassificationDefs")
tablesSheet = wb.create_sheet("TablesLineage")
-columnsSheet = wb.create_sheet("ColumnsLineage")
+columnsSheet = wb.create_sheet("FineGrainColumnLineage")

ExcelReader._update_sheet_headers(
-Reader.TEMPLATE_HEADERS["ColumnsLineage"], columnsSheet
+Reader.TEMPLATE_HEADERS["FineGrainColumnLineage"], columnsSheet
)
ExcelReader._update_sheet_headers(
Reader.TEMPLATE_HEADERS["TablesLineage"], tablesSheet
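
The new `parse_update_lineage_with_mappings` method added above is a thin merge over the two tab parsers: entities are keyed by qualifiedName, and when the same process arrives from both tabs, only the columnMapping attribute is copied onto the entity parsed from UpdateLineage. A sketch of that merge pattern in isolation (the entity dicts are illustrative):

```python
def merge_by_qualified_name(lineage, mappings):
    """Keep the first entity per qualifiedName; copy columnMapping from later duplicates."""
    seen = {}
    for entity in lineage + mappings:
        qn = entity["attributes"]["qualifiedName"]
        if qn in seen:
            if "columnMapping" in entity["attributes"]:
                seen[qn]["attributes"]["columnMapping"] = entity["attributes"]["columnMapping"]
        else:
            seen[qn] = entity
    return list(seen.values())

# One process parsed from UpdateLineage, the same process parsed from ColumnMapping
lineage = [{"typeName": "Process", "attributes": {"qualifiedName": "proc://copy", "name": "Copy"}}]
mappings = [{"typeName": "Process", "attributes": {"qualifiedName": "proc://copy", "columnMapping": "[]"}}]
print(merge_by_qualified_name(lineage, mappings))  # one entity, now carrying columnMapping
```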
49 changes: 27 additions & 22 deletions pyapacheatlas/readers/lineagemixin.py
@@ -243,7 +243,7 @@ def _insert_column_entity(self, prefix, row, column_type, headers, column_entiti

return _qual_name

-def parse_column_lineage(self, json_rows, atlas_entities, atlas_typedefs, use_column_mapping=False):
+def parse_finegrain_column_lineage(self, json_rows, atlas_entities, atlas_typedefs, use_column_mapping=False):
"""
:param json_rows:
A list of dicts that contain the converted rows of your column
@@ -518,32 +518,36 @@ def parse_update_lineage(self, json_rows):
temp_out = temp_proc.outputs
# If we have an input, check if it already exists
if inputs and len(inputs) > 0:
input_qn = self._header_qn(inputs[0])
if input_qn not in [self._header_qn(x) for x in temp_in]:
temp_in.extend(inputs)
temp_proc.inputs = temp_in

else:
warnings.warn(f"Input '{input_qn}' is repeated in Process '{process_qual_name}'. Only the earliest entry is kept.")
warnings.warn(
f"Input '{input_qn}' is repeated in Process '{process_qual_name}'. Only the earliest entry is kept.")
elif isinstance(inputs, list):
# We have an empty list, as the input, meaning destroy the input
if len(temp_in) > 0:
warnings.warn(f"Process '{process_qual_name}' has conflicting inputs and N/A values and will possibly be overwritten.")
warnings.warn(
f"Process '{process_qual_name}' has conflicting inputs and N/A values and will possibly be overwritten.")
temp_proc.inputs = inputs

if outputs:
output_qn = self._header_qn(outputs[0])
if output_qn not in [self._header_qn(x) for x in temp_out]:
temp_out.extend(outputs)
temp_proc.outputs = temp_out
else:
warnings.warn(f"Output '{output_qn}' is repeated in Process '{process_qual_name}'. Only the earliest entry is kept.")
warnings.warn(
f"Output '{output_qn}' is repeated in Process '{process_qual_name}'. Only the earliest entry is kept.")
elif isinstance(outputs, list):
# We have an empty list, as the output, meaning destroy the output
if len(temp_out) > 0:
warnings.warn(f"Process '{process_qual_name}' has conflicting outputs and N/A values and will possibly be overwritten.")
warnings.warn(
f"Process '{process_qual_name}' has conflicting outputs and N/A values and will possibly be overwritten.")
temp_proc.outputs = outputs

else:
proc = AtlasProcess(
name=process_name,
Expand All @@ -554,7 +558,7 @@ def parse_update_lineage(self, json_rows):
outputs=outputs
)
processes_seen[process_qual_name] = proc

results = [v.to_json() for v in processes_seen.values()]
return results
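
As the warnings above imply, repeated rows for the same Process qualifiedName accumulate distinct inputs and outputs, exact repeats are skipped with a warning, and an N/A row (parsed as an empty list) clears that side of the process. A sketch of two rows that extend the same process, assuming the UpdateLineage template headers (all qualified names are made up):

```python
# Two UpdateLineage-style rows for one process; the second contributes a second input.
rows = [
    {"Target typeName": "azure_sql_table", "Target qualifiedName": "mssql://srv/db/sales",
     "Source typeName": "azure_blob_path", "Source qualifiedName": "https://acct/container/a.csv",
     "Process name": "Copy Sales", "Process qualifiedName": "adf://pipelines/copy_sales",
     "Process typeName": "Process"},
    {"Target typeName": "azure_sql_table", "Target qualifiedName": "mssql://srv/db/sales",
     "Source typeName": "azure_blob_path", "Source qualifiedName": "https://acct/container/b.csv",
     "Process name": "Copy Sales", "Process qualifiedName": "adf://pipelines/copy_sales",
     "Process typeName": "Process"},
]
# A later row with Source typeName "N/A" would instead clear the accumulated inputs.
```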

@@ -593,37 +597,38 @@ def parse_column_mapping(self, json_rows):
except KeyError:
raise Exception(
"This row does not contain all of the required fields (" +
', '.join([f"{tp} column", f"{tp} qualifiedName", f"{sp} column", f"{sp} column", f"{pp} name", f"{tp} typeName", f"{pp} qualifiedName"]) +'): ' +
', '.join([f"{tp} column", f"{tp} qualifiedName", f"{sp} column", f"{sp} column", f"{pp} name", f"{tp} typeName", f"{pp} qualifiedName"]) + '): ' +
json.dumps(row)
)

dataset_key = f"{source_qual_name}|{target_qual_name}"
column_mapping = {"Source":source_col, "Sink":target_col}
column_mapping = {"Source": source_col, "Sink": target_col}
if process_qual_name in processes_seen:
# Updating the entity
working_process = processes_seen[process_qual_name]
if dataset_key in working_process["mappings"]:
working_process["mappings"][dataset_key].append(column_mapping)
working_process["mappings"][dataset_key].append(
column_mapping)
else:
working_process["mappings"][dataset_key] = [column_mapping]
else:
# Creating a new one
processes_seen[process_qual_name] = {
"mappings":{dataset_key:[column_mapping]},
"mappings": {dataset_key: [column_mapping]},
"processType": process_type,
"processName": process_name
}

for proc_qn, proc in processes_seen.items():
columnMapping = []
for dataset in proc["mappings"]:
# Split apart the pipe delimited data
-src_data, sink_data = dataset.split("|",1)
+src_data, sink_data = dataset.split("|", 1)
# Create the column mapping data structure
columnMapping.append(
{
"DatasetMapping":{"Source":src_data, "Sink":sink_data},
"ColumnMapping":proc["mappings"][dataset]
"DatasetMapping": {"Source": src_data, "Sink": sink_data},
"ColumnMapping": proc["mappings"][dataset]
}
)

@@ -632,10 +637,10 @@
typeName=proc["processType"],
qualified_name=proc_qn,
guid=self.guidTracker.get_guid(),
-inputs = None,
-outputs = None,
-attributes = {"columnMapping": json.dumps(columnMapping)}
+inputs=None,
+outputs=None,
+attributes={"columnMapping": json.dumps(columnMapping)}
)
results.append(proc_entity.to_json())

return {"entities":results}
return results
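
The parser above serializes the mappings into the process's columnMapping attribute as a JSON string, grouped by a Source|Sink dataset pair. A hand-built sketch of what one parsed process ends up carrying (names and URIs are invented):

```python
import json

# One dataset pair (input file -> output table) with two column-level mappings
columnMapping = [
    {
        "DatasetMapping": {
            "Source": "https://acct.blob.core.windows.net/container/input.csv",
            "Sink": "mssql://srv.database.windows.net/db/dbo/sales",
        },
        "ColumnMapping": [
            {"Source": "cust_id", "Sink": "CustomerId"},
            {"Source": "amt", "Sink": "Amount"},
        ],
    }
]

process_attributes = {"columnMapping": json.dumps(columnMapping)}
```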
2 changes: 1 addition & 1 deletion pyapacheatlas/readers/reader.py
@@ -38,7 +38,7 @@ class Reader(LineageMixIn):
The base Reader with functionality that supports python dicts.
"""
TEMPLATE_HEADERS = {
"ColumnsLineage": [
"FineGrainColumnLineage": [
"Target table", "Target column", "Target classifications",
"Source table", "Source column", "Source classifications",
"transformation"
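
Rows read from the renamed FineGrainColumnLineage tab reach the Reader as plain dicts keyed by the headers above, roughly like this (the values are invented for illustration):

```python
json_row = {
    "Target table": "DestTable",
    "Target column": "total",
    "Target classifications": None,
    "Source table": "SourceTable",
    "Source column": "amount",
    "Source classifications": None,
    "transformation": "amount * 1.1",  # free-text description of the transformation
}
```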
59 changes: 49 additions & 10 deletions samples/excel/README.md
@@ -10,27 +10,66 @@ There are four key features of the PyApacheAtlas package with respect to the Exc
from pyapacheatlas.readers import ExcelReader

ExcelReader.make_template("./my_file_path.xlsx")

```
* The below samples all create their own template file and fill it with demo data.
* Look for the "ACTUAL WORK" comments to see what code you'd run if you filled in the spreadsheet yourself.
* Each tab has its own `parse_*` function in the `ExcelReader` class.

```python
from pyapacheatlas.readers import ExcelReader, ExcelConfiguration

ec = ExcelConfiguration() # Supports customization of tab names and header prefixes
reader = ExcelReader(ec)

file_path = 'some.xlsx'
# Create entities (most common thing you'll do)
reader.parse_bulk_entities(file_path)
# Connect entities / assets with custom lineage
reader.parse_update_lineage(file_path)
# For Azure Purview users, update existing process entities with column mappings
reader.parse_column_mapping(file_path)
# For Azure Purview users, create custom lineage with column mappings at the same time
reader.parse_update_lineage_with_mappings(file_path)

# Create entity type definitions
reader.parse_entity_defs(file_path)
# Create classification type definitions
reader.parse_classification_defs(file_path)

# Create table level lineage
reader.parse_table_lineage(file_path)
# Create advanced column level lineage that supports fine grain attributes on the transformation
# Requires using the column_lineage_scaffold() types
reader.parse_finegrain_column_lineage(file_path)
# Parse table and finegrain column lineage all together
reader.parse_table_finegrain_column_lineages(file_path)
```
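
After any of the parse calls, the resulting entities still need to be uploaded to the catalog. A minimal sketch, assuming a service principal and the environment variables described below (`PurviewClient` and `upload_entities` follow the package's usual pattern):

```python
import os
from pyapacheatlas.auth import ServicePrincipalAuthentication
from pyapacheatlas.core import PurviewClient

auth = ServicePrincipalAuthentication(
    tenant_id=os.environ.get("TENANT_ID", ""),
    client_id=os.environ.get("CLIENT_ID", ""),
    client_secret=os.environ.get("CLIENT_SECRET", "")
)
client = PurviewClient(
    account_name=os.environ.get("PURVIEW_NAME", ""),
    authentication=auth
)

results = reader.parse_update_lineage_with_mappings(file_path)
print(client.upload_entities(batch=results))
```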

* **Bulk upload entities**
* [Bulk Entities Excel Sample](./excel_bulk_entities_upload.py)
* You want to dump entity information into excel and upload.
* You want to provide some simple relationship mapping (e.g. the columns of a table).
* Your entities may exist and you want them updated or they do not exist and you want them created.
* **Create Lineage Between Two Existing Entities**
* [Update / Create Lineage Between Existing Entities](./excel_update_lineage_upload.py)
* You have two existing entities (an input and output) but there is no lineage between them.
* You want to create a "process entity" that represents the process that ties the two tables together.
* **Create Lineage Between Two Existing Entities with Column Mappings**
* [Update / Create Lineage Between Existing Entities with Column Mappings](./excel_update_lineage_with_mappings_upload.py)
* You have two existing entities (an input and output) but there is no lineage between them.
* You want to create a "process entity" that represents the process that ties the two tables together.
* In addition, you want to use the Azure Purview Column Mapping / Column Lineage UI feature.
* You'll do this across the `UpdateLineage` and `ColumnMapping` tabs.
* **Creating Custom DataSet Types**
* [Custom Type Excel Sample](./excel_custom_type_and_entity_upload.py)
* You have a custom dataset type you want to create with many attributes.
* You want to upload an entity using that custom type as well.
* **Hive Bridge Style Table and Column Lineage**
* [Custom Table and Column Lineage Excel Sample](./excel_custom_table_column_lineage.py)
* You are willing to use a custom type to capture more data about lineage.
* You are interested in capturing more complex column level lineage.
* None of the entities you want to upload exist in your catalog.

Each sample linked above is standalone: it creates an Excel spreadsheet with all of the data to be uploaded, parses that spreadsheet, and uploads the results to your data catalog.

@@ -42,7 +81,7 @@ I would strongly encourage you to run this in a dev / sandbox environment since
* You'll need a Service Principal (for Azure Data Catalog) with the Catalog Admin role.
* You'll need to set the following environment variables.

-```
+```bash
set TENANT_ID=YOUR_TENANT_ID
set CLIENT_ID=YOUR_SERVICE_PRINCIPAL_CLIENT_ID
set CLIENT_SECRET=YOUR_SERVICE_PRINCIPAL_CLIENT_SECRET
@@ -55,7 +94,7 @@ set PURVIEW_NAME=YOUR_PURVIEW_ACCOUNT_SERVICE_NAME
* If you're following along with the built-in demos, search for 'pyapacheatlas' to find the majority of the entities.
* To find the guid, select the asset from your search and grab the guid from the URL.

-```
+```python
# Delete an Entity
client.delete_entity(guid=["myguid1","myguid2"])

1 change: 0 additions & 1 deletion samples/excel/excel_bulk_entities_upload.py
@@ -1,7 +1,6 @@
import json
import os

-import openpyxl
from openpyxl import Workbook
from openpyxl import load_workbook

2 changes: 1 addition & 1 deletion samples/excel/excel_custom_table_column_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def fill_in_workbook(filepath, excel_config):

# Generate the atlas entities!

-excel_results = excel_reader.parse_lineages(
+excel_results = excel_reader.parse_table_finegrain_column_lineages(
file_path,
atlas_type_defs,
use_column_mapping=True
1 change: 0 additions & 1 deletion samples/excel/excel_custom_type_and_entity_upload.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import json
import os

-import openpyxl
from openpyxl import Workbook
from openpyxl import load_workbook

1 change: 0 additions & 1 deletion samples/excel/excel_update_lineage_upload.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import json
import os

-import openpyxl
from openpyxl import Workbook
from openpyxl import load_workbook
