Skip to content

Commit

Permalink
Completing WhatIf and Samples (wjohnson#12)
Browse files Browse the repository at this point in the history
Wrapping up the WhatIfValidator class with a single method to run all the validations and provide an output.

Adding an additional sample file to create templates.

Cleaned up column_scaffolding tests to reflect the columnMapping feature.
  • Loading branch information
wjohnson authored Jul 30, 2020
1 parent 2fc8631 commit 0634e98
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 18 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ The package currently supports:

## Quickstart

### Build and Install from Source

```
python setup.py bdist_wheel
python -m pip install ./dist/pyapacheatlas-0.0.2-py3-none-any.whl
```

### Create a Client Connection

Provides connectivity to your Atlas / Data Catalog service. Supports getting and uploading entities and type defs.
Expand Down
61 changes: 60 additions & 1 deletion pyapacheatlas/core/whatif.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,35 @@

class WhatIfValidator():
"""
Provides a simple way to validate that your entities will successfully
upload. Provides functions to validate the type, check if required
    attributes are missing, and check if superfluous attributes are included.
"""
ASSET_ATTRIBUTES = ["name", "description", "owner"]
REFERENCABLE_ATTRIBUTES = ["qualifiedName"]

ATLAS_MODEL = {
"ASSET": ["name", "description", "owner"],
"REFERENCABLE": ["qualifiedName"],
"PROCESS": ["inputs", "outputs"] + ASSET_ATTRIBUTES + REFERENCABLE_ATTRIBUTES,
"DATASET": ASSET_ATTRIBUTES + REFERENCABLE_ATTRIBUTES,
"INFRASTRUCTURE": ASSET_ATTRIBUTES + REFERENCABLE_ATTRIBUTES
}

def __init__(self, type_defs = {}, existing_entities = []):
"""
:param dict type_defs:
        The dict of type definitions to be validated against. Should be
in the form of an AtlasTypeDef composite wrapper.
:param list(dict) existing_entities:
The existing entities that should be validated against.
"""
if len(type_defs) == 0 and len(existing_entities) == 0:
warnings.warn("WARNING: Provided type_defs and existing_entities are empty. All validations will pass.")

self.classification_defs = type_defs.get("classificationDefs", [])
self.entity_defs = type_defs.get("entityDefs", [])
# Create a dict of all entities by name, then find the name of all attributes and whether they are optional
# entity_fields = {e["name"]: [{"name":attr["name"], "isOptional":attr.get("isOptional", True)} for attr in e] for e in self.entity_defs}
entity_fields = {e["name"]:[EntityField(attr.get("name"), attr.get("isOptional")) for attr in e.get("attributeDefs", {})] for e in self.entity_defs}
# Adding Qualified Name to the set of valid fields as it doesn't show up in the entity type def
self.entity_valid_fields = {k: set([field.name for field in v ]+["qualifiedName"]) for k,v in entity_fields.items()}
Expand Down Expand Up @@ -71,7 +90,14 @@ def entity_has_invalid_attributes(self, entity):
"""
current_attributes = set(entity.get("attributes", {}).keys())
valid_attributes = set(self.entity_valid_fields[entity["typeName"]])
# Append inherited attributes:
_entity_type = entity["typeName"]
# Assuming only one entity matches and only one super type
super_type = [e["superTypes"] for e in self.entity_defs if e["name"] == _entity_type][0][0].upper()
if super_type in self.ATLAS_MODEL:
valid_attributes = valid_attributes.union(self.ATLAS_MODEL[super_type])
invalid_attributes = current_attributes.difference(valid_attributes)

if len(invalid_attributes) > 0:
return True
else:
Expand All @@ -94,3 +120,36 @@ def entity_would_overwrite(self, entity):
return True
else:
return False


def validate_entities(self, entities):
    """
    Produce a what-if report for the given entities.

    Each entity is checked against the loaded type definitions and the
    offending guids are grouped into three buckets: TypeDoesNotExist,
    UsingInvalidAttributes, and MissingRequiredAttributes.  An entity whose
    type is unknown is skipped for the attribute checks, since those checks
    are meaningless without a type definition.

    :param list(dict) entities: The entities to validate.
    :return:
        A dict with a "counts" dict (bucket name to number of offending
        entities), a "values" dict (bucket name to list of guids), and a
        "total" summing all failed checks.
    :rtype: dict
    """
    type_missing = []
    bad_attributes = []
    missing_required = []

    for candidate in entities:
        if not self.entity_type_exists(candidate):
            # Unknown type: skip the attribute checks for this entity.
            type_missing.append(candidate["guid"])
            continue
        if self.entity_has_invalid_attributes(candidate):
            bad_attributes.append(candidate["guid"])

        if self.entity_missing_attributes(candidate):
            missing_required.append(candidate["guid"])

    report = {
        "TypeDoesNotExist": type_missing,
        "UsingInvalidAttributes": bad_attributes,
        "MissingRequiredAttributes": missing_required
    }
    counts = {bucket: len(guids) for bucket, guids in report.items()}

    return {
        "counts": counts,
        "values": report,
        "total": sum(counts.values())
    }


11 changes: 6 additions & 5 deletions pyapacheatlas/scaffolding/column_lineage.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from ..core.typedef import *

def column_lineage_scaffold(datasource,
useColumnMapping = False,
use_column_mapping = False,
column_attributes = None,
table_attributes = None,
table_column_relationship_attributes = None,
Expand All @@ -16,7 +16,7 @@ def column_lineage_scaffold(datasource,
:param str datasource: The name of the data source. Acts as a prefix
for all other type defs.
:param bool useColumnMapping: If True, add the columnMapping attribute
:param bool use_column_mapping: If True, add the columnMapping attribute
to the table process.
:param list(dict), optional column_attributes:
Attribute Defs to add to the column entity type.
Expand Down Expand Up @@ -74,9 +74,9 @@ def column_lineage_scaffold(datasource,
# Define {datasource}_column_lineage
column_lineage_process_entity = EntityTypeDef(
name="{}_column_lineage".format(datasource),
attributeDefs=column_lineage_process_attributes,
superTypes=["Process"],
attributes=[
attributeDefs=((column_lineage_process_attributes or []) +
[
{
"name": "dependencyType",
"typeName": "string",
Expand All @@ -100,6 +100,7 @@ def column_lineage_scaffold(datasource,
"includeInNotification": False
}
]
)
)

# Define {datasource}_process
Expand All @@ -108,7 +109,7 @@ def column_lineage_scaffold(datasource,
superTypes=["Process"],
attributeDefs=table_process_attributes
)
if useColumnMapping:
if use_column_mapping:
table_process_entity.attributeDefs.append(
{
"name": "columnMapping",
Expand Down
19 changes: 19 additions & 0 deletions samples/create_templates.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import json
import os

# PyApacheAtlas packages
from pyapacheatlas.scaffolding import column_lineage_scaffold # Create dummy types
from pyapacheatlas.scaffolding.templates import excel_template # Create the excel template file to be populated

if __name__ == "__main__":
    # Build the demo column-lineage type definitions and persist them as
    # pretty-printed JSON next to the script.
    print("Creating the scaffolding json file")
    demo_types = column_lineage_scaffold("demo")
    with open("./demo_scaffold.json", 'w') as out_file:
        out_file.write(json.dumps(demo_types, indent=1))

    # Produce the blank excel workbook that users populate with entities.
    print("Creating the excel template file")
    excel_template("./demo_excel_template.xlsx")
30 changes: 22 additions & 8 deletions samples/sample_excel.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,18 @@
from pyapacheatlas.scaffolding.templates import excel_template # Create the excel template file to be populated
from pyapacheatlas.readers import from_excel # Read in the populated excel file.
from pyapacheatlas.readers.excel import ExcelConfiguration # Customize header prefixes (e.g. "Sink" rather than "Target") and sheet names
from pyapacheatlas.core.whatif import WhatIfValidator # To do what if analysis

if __name__ == "__main__":

# Authenticate against your Atlas server
oauth = ServicePrincipalAuthentication(
tenant_id = os.environ.get("TENANT_ID"),
client_id = os.environ.get("CLIENT_ID"),
client_secret = os.environ.get("CLIENT_SECRET")
tenant_id = os.environ.get("TENANT_ID", ""),
client_id = os.environ.get("CLIENT_ID", ""),
client_secret = os.environ.get("CLIENT_SECRET", "")
)
atlas_client = AtlasClient(
endpoint_url = os.environ.get("ENDPOINT_URL"),
endpoint_url = os.environ.get("ENDPOINT_URL", ""),
authentication = oauth
)

Expand Down Expand Up @@ -80,18 +81,31 @@
wb.save(file_path)

# Generate the base atlas type defs for the demo of table and column lineage
atlas_type_defs = column_lineage_scaffold("demo", useColumnMapping=True)
atlas_type_defs = column_lineage_scaffold("demo", use_column_mapping=True)
# Alternatively, you can get all atlas types via...
# all_type_defs = client.get_all_typedefs()
# atlas_type_defs = client.get_all_typedefs()

# Upload scaffolded type defs and view the results of upload
_upload_typedef = client.upload_typedefs(atlas_type_defs)
print(json.dumps(_upload_typedef,indent=2))
# _upload_typedef = client.upload_typedefs(atlas_type_defs)
# print(json.dumps(_upload_typedef,indent=2))

# Instantiate some required objects and generate the atlas entities!
excel_config = ExcelConfiguration()
excel_results = from_excel(file_path, excel_config, atlas_type_defs, use_column_mapping=True)

print(excel_results)

# Validate What IF
whatif = WhatIfValidator(type_defs=atlas_type_defs)

report = whatif.validate_entities(excel_results)

if sum([len(v) for v in report.items()]) > 0:
print("There were errors in the provided typedefs")
print(json.dumps(report))
exit(1)


# Upload excel file's content to Atlas and view the guid assignments to confirm successful upload
_upload_entities = client.upload_entities(excel_results)
print(json.dumps(_upload_entities,indent=2))
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setuptools.setup(
name="pyapacheatlas",
version="0.0.1",
version="0.0.2",
author="Will Johnson",
author_email="[email protected]",
description="A package to simplify working with the Apache Atlas REST APIs and support bulk loading from files.",
Expand Down
27 changes: 25 additions & 2 deletions tests/scaffolding/test_scaffolding_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@


def test_column_lineage_scaffolding():
scaffolding = column_lineage_scaffold("demo", useColumnMapping=True)
scaffolding = column_lineage_scaffold("demo", use_column_mapping=True)

results = scaffolding

Expand All @@ -30,7 +30,30 @@ def test_column_lineage_scaffolding():
{
"category": "ENTITY",
"name": "demo_column_lineage",
"attributeDefs": [],
"attributeDefs": [
{
"name": "dependencyType",
"typeName": "string",
"isOptional": false,
"cardinality": "SINGLE",
"valuesMinCount": 1,
"valuesMaxCount": 1,
"isUnique": false,
"isIndexable": false,
"includeInNotification": false
},
{
"name": "expression",
"typeName": "string",
"isOptional": true,
"cardinality": "SINGLE",
"valuesMinCount": 0,
"valuesMaxCount": 1,
"isUnique": false,
"isIndexable": false,
"includeInNotification": false
}
],
"relationshipAttributeDefs": [],
"superTypes": [
"Process"
Expand Down
37 changes: 36 additions & 1 deletion tests/test_whatif.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def test_using_invalid_attributes():
'attributeDefs': [
{"name":"req_attrib","isOptional":False},
{"name":"name","isOptional":False},
{"qualifiedName":"name","isOptional":False},
{"name":"qualifiedName","isOptional":False},
],
'relationshipAttributeDefs': [], 'superTypes': ['DataSet']}]}

Expand All @@ -67,3 +67,38 @@ def test_would_it_overwrite():
results = local_what_if.entity_would_overwrite(new_entity)

assert(results)


def test_whatif_validation():
    """Exercise validate_entities end to end: one entity per failure bucket
    plus one fully valid entity, verifying the aggregated report."""

    demo_table_type = {"entityDefs": [{'category': 'ENTITY', 'name': 'demo_table',
        'attributeDefs': [
            {"name": "req_attrib", "isOptional": False},
            {"name": "name", "isOptional": False},
            {"name": "qualifiedName", "isOptional": False},
        ],
        'relationshipAttributeDefs': [], 'superTypes': ['DataSet']}]}

    local_what_if = WhatIfValidator(demo_table_type)

    entities = [
        # Passes every check.
        AtlasEntity("dummy1", "demo_table", "dummy1", -99, attributes={"req_attrib": "1"}).to_json(),
        # Lacks the required req_attrib.
        AtlasEntity("dummy10", "demo_table", "dummy10", -98, attributes={}).to_json(),
        # Carries an attribute the type does not define.
        AtlasEntity("dummy20", "demo_table", "dummy20", -100, attributes={"foo": "bar", "req_attrib": "abc"}).to_json(),
        # References a type that was never defined.
        AtlasEntity("dummy30", "bad_table", "dummy30", -101, attributes={"foo": "bar"}).to_json()
    ]

    results = local_what_if.validate_entities(entities)

    expected = {
        "counts": {"TypeDoesNotExist": 1, "UsingInvalidAttributes": 1, "MissingRequiredAttributes": 1},
        "total": 3,
        "values": {"TypeDoesNotExist": [-101], "UsingInvalidAttributes": [-100], "MissingRequiredAttributes": [-98]}
    }

    assert set(local_what_if.entity_required_fields["demo_table"]) == {"req_attrib", "name", "qualifiedName"}
    assert results == expected

0 comments on commit 0634e98

Please sign in to comment.