Adding UpdateLineage feature (wjohnson#47)
Enables creating or updating an existing process entity with partial or full changes to its inputs and outputs. Adds a new Excel tab to the template and the parse_update_lineage method to the readers.
wjohnson authored Oct 11, 2020
1 parent b8111f9 commit 8a0c279
Showing 7 changed files with 207 additions and 3 deletions.
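
Taken together, the changes below enable a flow like the following minimal sketch. This is not part of the commit: the file path is illustrative, and `client` stands in for an already-authenticated client created as described in the README.

```
from pyapacheatlas.readers import ExcelConfiguration, ExcelReader

# The default configuration now includes updateLineage_sheet="UpdateLineage".
ec = ExcelConfiguration()
reader = ExcelReader(ec)

# make_template now adds the UpdateLineage tab alongside the existing ones.
ExcelReader.make_template("./update_lineage_demo.xlsx")

# After filling in the UpdateLineage tab, parse it into process entities...
processes = reader.parse_update_lineage("./update_lineage_demo.xlsx")

# ...and upload them with an existing client connection (see the README).
# client.upload_entities(batch=processes)
```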
2 changes: 1 addition & 1 deletion README.md
@@ -27,7 +27,7 @@ Create a wheel distribution file and install it in your environment.
```
python -m pip install wheel
python setup.py bdist_wheel
-python -m pip install ./dist/pyapacheatlas-0.0b15-py3-none-any.whl
+python -m pip install ./dist/pyapacheatlas-0.0b16-py3-none-any.whl
```

### Create a Client Connection
2 changes: 1 addition & 1 deletion pyapacheatlas/__init__.py
@@ -1 +1 @@
__version__ = "0.0b15"
__version__ = "0.0b16"
43 changes: 43 additions & 0 deletions pyapacheatlas/readers/excel.py
@@ -25,6 +25,7 @@ class ExcelConfiguration(ReaderConfiguration):
def __init__(self, column_sheet="ColumnsLineage",
table_sheet="TablesLineage",
entityDef_sheet="EntityDefs", bulkEntity_sheet="BulkEntities",
updateLineage_sheet="UpdateLineage",
**kwargs):
"""
The following parameters apply to the
@@ -52,6 +53,7 @@ def __init__(self, column_sheet="ColumnsLineage",
self.table_sheet = table_sheet
self.entityDef_sheet = entityDef_sheet
self.bulkEntity_sheet = bulkEntity_sheet
self.updateLineage_sheet = updateLineage_sheet


class ExcelReader(Reader):
@@ -285,6 +287,43 @@ def parse_lineages(self, filepath, atlas_typedefs, use_column_mapping=False):

return output

def parse_update_lineage(self, filepath):
"""
Read a given excel file that conforms to the excel atlas template and
parse the (default) UpdateLineage tab into existing process entities.
Assumes these process entities and any referenced entities exist.
Leave the qualifiedName cell blank on source or target to leave the
existing input or output (respectively) unchanged.
Use 'N/A' in the qualifiedName on source or target to 'destroy' the
existing input or output and overwrite with an empty list.
:param str filepath:
The xlsx file that contains your update lineage definitions.
:return:
A list of Atlas Process entities representing the spreadsheet's
contents.
:rtype: list(dict)
"""
wb = load_workbook(filepath)

entities = []

if self.config.updateLineage_sheet not in wb.sheetnames:
raise KeyError("The sheet {} was not found".format(
self.config.updateLineage_sheet))

# Parsing the UpdateLineage sheet into rows
updateLineage_sheet = wb[self.config.updateLineage_sheet]
json_sheet = ExcelReader._parse_spreadsheet(updateLineage_sheet)
entities = super().parse_update_lineage(json_sheet)

wb.close()

return entities

@staticmethod
def _update_sheet_headers(headers, worksheet):
"""
@@ -321,6 +360,7 @@ def make_template(filepath):
tablesSheet = wb.create_sheet("TablesLineage")
entityDefsSheet = wb.create_sheet("EntityDefs")
bulkEntitiesSheet = wb.create_sheet("BulkEntities")
updateLineageSheet = wb.create_sheet("UpdateLineage")

ExcelReader._update_sheet_headers(
Reader.TEMPLATE_HEADERS["ColumnsLineage"], columnsSheet
Expand All @@ -334,6 +374,9 @@ def make_template(filepath):
ExcelReader._update_sheet_headers(
Reader.TEMPLATE_HEADERS["BulkEntities"], bulkEntitiesSheet
)
ExcelReader._update_sheet_headers(
Reader.TEMPLATE_HEADERS["UpdateLineage"], updateLineageSheet
)

wb.save(filepath)
wb.close()
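
One behavioral note from the code above: if the workbook does not contain the configured tab, `parse_update_lineage` raises a `KeyError` naming the missing sheet. A small sketch (the file path and custom sheet name are illustrative):

```
from pyapacheatlas.readers import ExcelConfiguration, ExcelReader

# Point the reader at a custom tab name.
ec = ExcelConfiguration(updateLineage_sheet="MyLineageUpdates")
reader = ExcelReader(ec)

try:
    reader.parse_update_lineage("./some_workbook.xlsx")
except KeyError as e:
    print(e)  # The sheet MyLineageUpdates was not found
```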
70 changes: 70 additions & 0 deletions pyapacheatlas/readers/lineagemixin.py
@@ -445,3 +445,73 @@ def parse_column_lineage(self, json_rows, atlas_entities, atlas_typedefs, use_column_mapping=False):
)
output = [e for e in list(columnEntitiesOutput.values())]
return output

def _determine_dataset_to_use(self, qual_name, typeName):
results = []
if qual_name is None:
# This will allow us to do a partial update.
results = None
elif qual_name == "N/A":
# N/A is a special keyword that forces us to override
# and delete the existing input/output
results = []
else:
# This is an AtlasObjectId, necessary if we don't have the
# guid of the existing object or have it as a referenced object
results = [{"typeName": typeName,
"uniqueAttributes": {"qualifiedName": qual_name}}]
return results

def parse_update_lineage(self, json_rows):
"""
Take in UpdateLineage dictionaries and create the mutated Process
entities to be uploaded. All referenced entities must already exist
and be identified by their type and qualifiedName.
Assumes a None entry for target or source qualifiedNames means
"no change" to the existing entity. Using 'N/A' for the target or
source qualifiedNames will reset the existing input or output to an
empty list.
:param json_rows:
A list of dicts that contain the converted rows of your update
lineage spreadsheet.
:type json_rows: list(dict(str,str))
:return:
A list of Atlas Processes as dictionaries representing the updated
process entity.
:rtype: list(dict)
"""
results = []
sp = self.config.source_prefix
tp = self.config.target_prefix
pp = self.config.process_prefix
for row in json_rows:
try:
target_type = row[f"{tp} typeName"]
target_qual_name = row[f"{tp} qualifiedName"]
source_type = row[f"{sp} typeName"]
source_qual_name = row[f"{sp} qualifiedName"]
process_type = row[f"{pp} typeName"]
process_qual_name = row[f"{pp} qualifiedName"]
process_name = row[f"{pp} name"]
except KeyError:
    # Surface the entire row in the error to make the missing column obvious.
    raise Exception(json.dumps(row))

# Determine whether this should destroy one side, partial update
# (one side), or full update (both sides).
inputs = self._determine_dataset_to_use(
source_qual_name, source_type)
outputs = self._determine_dataset_to_use(
target_qual_name, target_type)
# Convert the target / source into an AtlasProcess entity.
proc = AtlasProcess(
name=process_name,
typeName=process_type,
qualified_name=process_qual_name,
guid=self.guidTracker.get_guid(),
inputs=inputs,
outputs=outputs
)
results.append(proc.to_json())
return results
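
To make the three branches of `_determine_dataset_to_use` concrete, here is a small illustration of the value each kind of qualifiedName produces, assuming a `Reader` instance named `reader` as in the tests further down:

```
# Blank cell (None): leave that side of the process unchanged.
assert reader._determine_dataset_to_use(None, "demo_table") is None

# The special "N/A" keyword: reset that side to an empty list.
assert reader._determine_dataset_to_use("N/A", "demo_table") == []

# A real qualifiedName: reference the existing entity by unique attributes.
assert reader._determine_dataset_to_use("demosource", "demo_table2") == [
    {"typeName": "demo_table2",
     "uniqueAttributes": {"qualifiedName": "demosource"}}
]
```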
5 changes: 5 additions & 0 deletions pyapacheatlas/readers/reader.py
Expand Up @@ -56,6 +56,11 @@ class Reader(LineageMixIn):
],
"BulkEntities": [
"typeName", "name", "qualifiedName", "classifications"
],
"UpdateLineage": [
"Target typeName", "Target qualifiedName", "Source typeName",
"Source qualifiedName", "Process name", "Process qualifiedName",
"Process typeName"
]
}

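
For reference, each row of the UpdateLineage tab reaches `parse_update_lineage` as a dict keyed by the headers above. A sketch with illustrative values (mirroring the tests below):

```
# One parsed row from the UpdateLineage tab, keyed by the template headers.
sample_row = {
    "Target typeName": "demo_table",
    "Target qualifiedName": "demotarget",
    "Source typeName": "demo_table2",
    "Source qualifiedName": "demosource",
    "Process name": "proc01",
    "Process qualifiedName": "procqual01",
    "Process typeName": "Process2",
}
# parse_update_lineage([sample_row]) yields one AtlasProcess dict whose
# inputs reference "demosource" and whose outputs reference "demotarget".
```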
28 changes: 27 additions & 1 deletion tests/readers/test_excel.py
@@ -17,7 +17,8 @@ def test_verify_template_sheets():

# Expected
expected_sheets = set(["ColumnsLineage", "TablesLineage",
"EntityDefs", "BulkEntities"
"EntityDefs", "BulkEntities",
"UpdateLineage"
])

wb = load_workbook(temp_path)
@@ -319,3 +320,28 @@ def test_excel_column_lineage():
assert(resulting_col_map["ColumnMapping"][2] in expected_col_map["ColumnMapping"])
finally:
remove_workbook(temp_filepath)

def test_excel_update_lineage():
temp_filepath = "./temp_test_excel_updateLineage.xlsx"
ec = ExcelConfiguration()
reader = ExcelReader(ec)

headers = ExcelReader.TEMPLATE_HEADERS["UpdateLineage"]

# Same as main test
json_rows = [
[
"demo_table", "demotarget", "demo_table2", "demosource",
"proc01", "procqual01", "Process2"
]
]

setup_workbook_custom_sheet(
temp_filepath, "UpdateLineage", headers, json_rows)

try:
    results = reader.parse_update_lineage(temp_filepath)
    assert(len(results) == 1)
finally:
    remove_workbook(temp_filepath)
60 changes: 60 additions & 0 deletions tests/readers/test_table_column.py
@@ -302,3 +302,63 @@ def test_column_lineage_entities_when_multi_tabled_inputs():
table_process_entity.attributes["columnMapping"])
assert(len(expected_col_map_obj) == len(resulting_colmap))
assert(all([res in expected_col_map_obj for res in resulting_colmap]))


def test_parse_update_lineage():
reader = Reader(READER_CONFIG)
json_rows = [
{"Target typeName": "demo_table", "Target qualifiedName": "demotarget",
"Source typeName": "demo_table2", "Source qualifiedName": "demosource",
"Process name": "proc01", "Process qualifiedName": "procqual01",
"Process typeName": "Process2"
},
{"Target typeName": "demo_table", "Target qualifiedName": "demotarget02",
"Source typeName": None, "Source qualifiedName": None,
"Process name": "proc02", "Process qualifiedName": "procqual02",
"Process typeName": "Process3"
},
{"Target typeName": None, "Target qualifiedName": None,
"Source typeName": "demo_table2", "Source qualifiedName": "demosource03",
"Process name": "proc03", "Process qualifiedName": "procqual03",
"Process typeName": "Process4"
},
{"Target typeName": "N/A", "Target qualifiedName": "N/A",
"Source typeName": "demo_table2", "Source qualifiedName": "demosource03",
"Process name": "proc03", "Process qualifiedName": "procqual03",
"Process typeName": "Process5"
}
]


results = reader.parse_update_lineage(json_rows)

assert(len(results) == 4)
full_update = results[0]
target_update = results[1]
source_update = results[2]
target_destroy = results[3]

assert(full_update["typeName"] == "Process2")
assert(full_update["attributes"]["name"] == "proc01")
assert(len(full_update["attributes"]["inputs"]) == 1)
assert(len(full_update["attributes"]["outputs"]) == 1)

fullupd_input = full_update["attributes"]["inputs"][0]
fullupd_output = full_update["attributes"]["outputs"][0]

assert(fullupd_input == {"typeName": "demo_table2",
"uniqueAttributes": {"qualifiedName": "demosource"}})
assert(fullupd_output == {"typeName": "demo_table",
"uniqueAttributes": {"qualifiedName": "demotarget"}})

# For a partial update, inputs will be set to None
assert(target_update["attributes"]["inputs"] is None)

# For a partial update, outputs will be set to None
assert(source_update["attributes"]["outputs"] is None)

# If they use the "N/A" keyword in qualifiedName, destroy that type
assert(target_destroy["attributes"]["outputs"] == [])
assert(target_destroy["attributes"]["inputs"] == [
{"typeName": "demo_table2",
"uniqueAttributes": {"qualifiedName": "demosource03"}}])
