Preparing for release 0.1.0
wjohnson committed Dec 7, 2020
2 parents d05ab8f + 0f17280 commit 8b41812
Showing 19 changed files with 1,144 additions and 138 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/python-package.yml
@@ -33,9 +33,9 @@ jobs:
- name: Lint with flake8
run: |
# stop the build if there are Python syntax errors or undefined names
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
flake8 ./pyapacheatlas --count --select=E9,F63,F7,F82 --show-source --statistics
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
flake8 ./pyapacheatlas --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
- name: Test with pytest
run: |
pytest
64 changes: 30 additions & 34 deletions README.md
@@ -3,21 +3,27 @@
A python package to work with the Apache Atlas API and support bulk loading, custom lineage, and more from a Pythonic set of classes and Excel templates.

The package currently supports:
* Creating a column lineage scaffolding as in the [Hive Bridge style](https://atlas.apache.org/0.8.3/Bridge-Hive.html).
* Creating and reading from an excel template file
* From Excel, constructing the defined entities and column lineages.
* Table entities
* Column entities
* Table lineage processes
* Column lineage processes
* From Excel, bulk uploading entities, creating / updating lineage, and creating custom types.
* Supports Azure Purview ColumnMapping attributes.
* Bulk upload of entities.
* Bulk upload of type definitions.
* Creating custom lineage between two existing entities.
* Creating custom table and complex column level lineage in the [Hive Bridge style](https://atlas.apache.org/0.8.3/Bridge-Hive.html).
* Supports Azure Purview ColumnMapping Attributes.
* Creating a column lineage scaffolding as in the Hive Bridge Style.
* Performing "What-If" analysis to check if...
* Your entities are valid types.
* Your entities are missing required attributes.
* Your entities are using undefined attributes.
* Working with the glossary.
* Uploading terms.
* Downloading individual or all terms.
* Working with relationships.
* Able to create arbitrary relationships between entities.
* e.g. associating a given column with a table.
* Able to upload relationship definitions.
* Deleting types (by name) or entities (by guid).
* Search (only for Azure Purview advanced search).
* Authentication to Azure Purview via Service Principal.
* Authentication using basic authentication of username and password.
* Authentication using basic authentication of username and password for open source Atlas.
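For context, a minimal sketch of the authentication options listed above (service principal for Azure Purview, basic auth for open source Atlas), using class names assumed from the package's auth and core modules; exact parameter names may differ by version:

```
from pyapacheatlas.auth import ServicePrincipalAuthentication, BasicAuthentication
from pyapacheatlas.core import AtlasClient

# Azure Purview: authenticate with an Azure AD service principal.
azure_auth = ServicePrincipalAuthentication(
    tenant_id="<your-tenant-id>",
    client_id="<your-client-id>",
    client_secret="<your-client-secret>"
)

# Open source Apache Atlas: basic username / password authentication.
oss_auth = BasicAuthentication(username="admin", password="admin")

# Point the client at an Atlas v2 REST endpoint with whichever auth applies.
client = AtlasClient(
    endpoint_url="https://MyAtlasServer:21000/api/atlas/v2",
    authentication=oss_auth
)
```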

## Quickstart

@@ -26,7 +32,13 @@ The package currently supports:
Create a wheel distribution file and install it in your environment.

```
<<<<<<< HEAD
python -m pip install pyapacheatlas
=======
python -m pip install wheel
python setup.py bdist_wheel
python -m pip install ./dist/pyapacheatlas-0.0b19-py3-none-any.whl
>>>>>>> 0f17280c7c75190a3b6b03ac42b8055b5ccc8e60
```

### Create a Client Connection
@@ -80,35 +92,19 @@ upload_results = client.upload_entities([ae.to_json()])

### Create Entities from Excel

Read from a standardized excel template to create table, column, table process, and column lineage entities. Follows / Requires the hive bridge style of column lineages.
Read from a standardized excel template that supports...

```
from pyapacheatlas.core import TypeCategory
from pyapacheatlas.scaffolding import column_lineage_scaffold
from pyapacheatlas.readers import ExcelConfiguration, ExcelReader
file_path = "./atlas_excel_template.xlsx"
# Create the Excel Template
ExcelReader.make_template(file_path)
# Populate the excel file manually!
* Bulk uploading entities into your data catalog.
* Creating custom table and column level lineage.
* Creating custom type definitions for datasets.
* Creating custom lineage between existing assets / entities in your data catalog.

# Generate the base atlas type defs
all_type_defs = client.get_typedefs(TypeCategory.ENTITY)
See end to end samples for each scenario in the [excel samples](./samples/excel/README.md).

# Create objects for the Excel configuration and reader
ec = ExcelConfiguration()
excel_reader = ExcelReader(ec)
# Read from excel file and convert to entities
entities = excel_reader.parse_lineage(file_path, all_type_defs)
upload_results = client.upload_entities(entities)
print(json.dumps(upload_results, indent=1))
```
Learn more about the Excel [features and configuration in the wiki](https://github.com/wjohnson/pyapacheatlas/wiki/Excel-Template-and-Configuration).

## Additional Resources

* Learn more about this package in the github wiki.
* Learn more about this package in the [github wiki](https://github.com/wjohnson/pyapacheatlas/wiki/Excel-Template-and-Configuration).
* The [Apache Atlas client in Python](https://pypi.org/project/pyatlasclient/)
* The [Apache Atlas REST API](http://atlas.apache.org/api/v2/)
4 changes: 4 additions & 0 deletions pyapacheatlas/__init__.py
@@ -1 +1,5 @@
<<<<<<< HEAD
__version__ = "0.1.0"
=======
__version__ = "0.0b19"
>>>>>>> 0f17280c7c75190a3b6b03ac42b8055b5ccc8e60
64 changes: 59 additions & 5 deletions pyapacheatlas/core/client.py
@@ -73,28 +73,82 @@ def delete_entity(self, guid):

return results

def get_entity(self, guid):
def delete_type(self, name):
"""
Delete a type based on the given name.
:param str name: The name of the type you want to remove.
:return:
No content, should receive a 204 status code.
:rtype: None
"""
results = None

atlas_endpoint = self.endpoint_url + \
f"/types/typedef/name/{name}"
deleteType = requests.delete(
atlas_endpoint,
headers=self.authentication.get_authentication_headers())

try:
deleteType.raise_for_status()
except requests.RequestException:
raise Exception(deleteType.text)

results = {"message": f"successfully deleted {name}"}
return results
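Assuming a client like the one sketched earlier, the new delete_type call could be exercised as follows (the type name is illustrative):

```
# Remove a custom type definition by name; Atlas should answer with a 204.
result = client.delete_type(name="custom_demo_process")
print(result)  # a small confirmation message dict
```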

def get_entity(self, guid=None, qualifiedName=None, typeName=None):
"""
Retrieve one or many guids from your Atlas backed Data Catalog.
:param guid: The guid or guids you want to retrieve
:param guid:
The guid or guids you want to retrieve. Not used if using typeName
and qualifiedName.
:type guid: Union[str, list(str)]
:param qualifiedName:
The qualified name of the entity you want to find. Must provide
typeName if using qualifiedName. You may search for multiple
qualified names under the same type. Ignored if using guid
parameter.
:type qualifiedName: Union[str, list(str)]
:param str typeName:
The type name of the entity you want to find. Must provide
qualifiedName if using typeName. Ignored if using guid parameter.
:return:
An AtlasEntitiesWithExtInfo object which includes a list of
entities and accessible with the "entities" key.
:rtype: dict(str, Union[list(dict),dict])
"""
results = None
parameters = {}

if isinstance(guid, list):
guid_str = '&guid='.join(guid)
else:
guid_str = guid

atlas_endpoint = self.endpoint_url + \
"/entity/bulk?guid={}".format(guid_str)
qualifiedName_params = dict()
if isinstance(qualifiedName, list):
qualifiedName_params = {
f"attr_{idx}:qualifiedName": qname
for idx, qname in enumerate(qualifiedName)
}
else:
qualifiedName_params = {"attr_0:qualifiedName": qualifiedName}

if qualifiedName and typeName:
atlas_endpoint = self.endpoint_url + \
f"/entity/bulk/uniqueAttribute/type/{typeName}"
parameters.update(qualifiedName_params)

else:
atlas_endpoint = self.endpoint_url + \
"/entity/bulk?guid={}".format(guid_str)

getEntity = requests.get(
atlas_endpoint,
params=parameters,
headers=self.authentication.get_authentication_headers()
)
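To illustrate both lookup paths of the revised get_entity (the guids and qualified names below are placeholders):

```
# Look up by one or more guids, as before.
by_guid = client.get_entity(guid=["111-222-333", "444-555-666"])

# New in this change: look up by qualifiedName(s) plus a single typeName.
by_qualified_name = client.get_entity(
    qualifiedName="pyapacheatlas://demo_table",
    typeName="hive_table"
)
for entity in by_qualified_name.get("entities", []):
    print(entity["guid"], entity["attributes"]["qualifiedName"])
```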

@@ -187,7 +241,7 @@ def get_glossary(self, name="Glossary", guid=None, detailed=False):
for all glossaries.
Use detailed = True to return the full detail of terms
(AtlasGlossaryTerm) accessible via "termInfo" key.
:param str name:
The name of the glossary to use, defaults to "Glossary". Not
required if using the guid parameter.
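A possible call based on the docstring above, using the method's default glossary name:

```
# Fetch the default glossary and include the full detail of each term.
glossary = client.get_glossary(name="Glossary", detailed=True)
term_info = glossary.get("termInfo", {})
print("{} terms in the glossary".format(len(term_info)))
```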
2 changes: 2 additions & 0 deletions pyapacheatlas/core/entity.py
@@ -26,6 +26,8 @@ def __init__(self, name, typeName, qualified_name, guid=None, **kwargs):
self.guid = guid
self.attributes = kwargs.get("attributes", {})
self.attributes.update({"name": name, "qualifiedName": qualified_name})
if "description" in kwargs:
self.attributes.update({"description": kwargs["description"]})
# Example Relationship Attribute
# {"relationshipName": {
# "qualifiedName": "",
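A small sketch of the new description keyword on AtlasEntity (the qualified name and type are illustrative):

```
from pyapacheatlas.core import AtlasEntity

# The description kwarg now lands in the entity's attributes dict.
ae = AtlasEntity(
    name="demo_table",
    typeName="hive_table",
    qualified_name="pyapacheatlas://demo_table",
    guid=-100,
    description="A table used to demonstrate the description attribute."
)
print(ae.to_json()["attributes"]["description"])
```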
43 changes: 43 additions & 0 deletions pyapacheatlas/readers/excel.py
@@ -25,6 +25,7 @@ class ExcelConfiguration(ReaderConfiguration):
def __init__(self, column_sheet="ColumnsLineage",
table_sheet="TablesLineage",
entityDef_sheet="EntityDefs", bulkEntity_sheet="BulkEntities",
updateLineage_sheet="UpdateLineage",
**kwargs):
"""
The following parameters apply to the
@@ -52,6 +53,7 @@ def __init__(self, column_sheet="ColumnsLineage",
self.table_sheet = table_sheet
self.entityDef_sheet = entityDef_sheet
self.bulkEntity_sheet = bulkEntity_sheet
self.updateLineage_sheet = updateLineage_sheet


class ExcelReader(Reader):
@@ -285,6 +287,43 @@ def parse_lineages(self, filepath, atlas_typedefs, use_column_mapping=False):

return output

def parse_update_lineage(self, filepath):
"""
Read a given excel file that conforms to the excel atlas template and
parse the (default) UpdateLineage tab into existing process entities.
Assumes these process entities and any referenced entity exists.
Leave the qualifiedName cell blank on source or target to leave the
existing input or output (respectively) unchanged.
Use 'N/A' in the qualifiedName on source or target to 'destroy' the
existing input or output and overwrite with an empty list.
:param str filepath:
The xlsx file that contains your update lineage information.
:return:
A list of Atlas Process entities representing the spreadsheet's
contents.
:rtype: list(dict)
"""
wb = load_workbook(filepath)

entities = []

if self.config.updateLineage_sheet not in wb.sheetnames:
raise KeyError("The sheet {} was not found".format(
self.config.updateLineage_sheet))

# Getting table entities
updateLineage_sheet = wb[self.config.updateLineage_sheet]
json_sheet = ExcelReader._parse_spreadsheet(updateLineage_sheet)
entities = super().parse_update_lineage(json_sheet)

wb.close()

return entities

@staticmethod
def _update_sheet_headers(headers, worksheet):
"""
@@ -321,6 +360,7 @@ def make_template(filepath):
tablesSheet = wb.create_sheet("TablesLineage")
entityDefsSheet = wb.create_sheet("EntityDefs")
bulkEntitiesSheet = wb.create_sheet("BulkEntities")
updateLineageSheet = wb.create_sheet("UpdateLineage")

ExcelReader._update_sheet_headers(
Reader.TEMPLATE_HEADERS["ColumnsLineage"], columnsSheet
@@ -334,6 +374,9 @@
ExcelReader._update_sheet_headers(
Reader.TEMPLATE_HEADERS["BulkEntities"], bulkEntitiesSheet
)
ExcelReader._update_sheet_headers(
Reader.TEMPLATE_HEADERS["UpdateLineage"], updateLineageSheet
)

wb.save(filepath)
wb.close()
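Pulling the excel.py changes together, a sketch of the intended round trip, assuming the client created earlier and a hypothetical file path:

```
from pyapacheatlas.readers import ExcelConfiguration, ExcelReader

file_path = "./update_lineage_template.xlsx"

# make_template now emits an UpdateLineage sheet alongside the existing ones.
ExcelReader.make_template(file_path)

# ...fill in the UpdateLineage tab by hand, then parse it back...
ec = ExcelConfiguration()  # default sheet name is "UpdateLineage"
reader = ExcelReader(ec)
processes = reader.parse_update_lineage(file_path)

# Each parsed row becomes a mutated AtlasProcess ready for upload.
upload_results = client.upload_entities(processes)
```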
70 changes: 70 additions & 0 deletions pyapacheatlas/readers/lineagemixin.py
@@ -445,3 +445,73 @@ def parse_column_lineage(self, json_rows, atlas_entities, atlas_typedefs, use_column_mapping=False):
)
output = [e for e in list(columnEntitiesOutput.values())]
return output

def _determine_dataset_to_use(self, qual_name, typeName):
results = []
if qual_name is None:
# This will allow us to do a partial update.
results = None
elif qual_name == "N/A":
# N/A is a special keyword that forces us to override
# and delete the existing input/output
results = []
else:
# This is an AtlasObjectId, necessary if we don't have the
# guid of the existing object or have it as a referenced object
results = [{"typeName": typeName,
"uniqueAttributes": {"qualifiedName": qual_name}}]
return results
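The three branches above map to these outcomes; calling the private helper directly on the reader instance from the earlier sketch is for illustration only:

```
# None: leave the existing inputs/outputs of the process untouched.
assert reader._determine_dataset_to_use(None, "hive_table") is None

# "N/A": wipe the existing side by forcing an empty list.
assert reader._determine_dataset_to_use("N/A", "hive_table") == []

# Anything else: an AtlasObjectId-style reference by type and qualifiedName.
print(reader._determine_dataset_to_use("pyapacheatlas://table_a", "hive_table"))
```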

def parse_update_lineage(self, json_rows):
"""
Take in UpdateLineage dictionaries and create the mutated Process
entities to be uploaded. All referenced entities must already exist
and be identified by their type and qualifiedName.
Assumes a None entry for target or source qualifiedNames means
"no change" to the existing entity. Using 'N/A' for the target or
source qualifiedNames will reset the existing input or output to an
empty list.
:param json_rows:
A list of dicts that contain the converted rows of your update
lineage spreadsheet.
:type json_rows: list(dict(str,str))
:return:
A list of Atlas Processes as dictionaries representing the updated
process entity.
:rtype: list(dict)
"""
results = []
sp = self.config.source_prefix
tp = self.config.target_prefix
pp = self.config.process_prefix
for row in json_rows:
try:
target_type = row[f"{tp} typeName"]
target_qual_name = row[f"{tp} qualifiedName"]
source_type = row[f"{sp} typeName"]
source_qual_name = row[f"{sp} qualifiedName"]
process_type = row[f"{pp} typeName"]
process_qual_name = row[f"{pp} qualifiedName"]
process_name = row[f"{pp} name"]
except KeyError:
raise Exception(json.dumps(row))

# Determine whether this should destroy one side, partial update
# (one side), or full update (both sides).
inputs = self._determine_dataset_to_use(
source_qual_name, source_type)
outputs = self._determine_dataset_to_use(
target_qual_name, target_type)
# Convert the target / source into an AtlasProcess entity
proc = AtlasProcess(
name=process_name,
typeName=process_type,
qualified_name=process_qual_name,
guid=self.guidTracker.get_guid(),
inputs=inputs,
outputs=outputs
)
results.append(proc.to_json())
return results
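As a concrete illustration of the row semantics above, a hypothetical json_rows payload using the reader's default Source/Target/Process column prefixes (assumed here):

```
rows = [
    {
        # Full update: both the input and the output are replaced.
        "Source typeName": "hive_table",
        "Source qualifiedName": "pyapacheatlas://table_a",
        "Target typeName": "hive_table",
        "Target qualifiedName": "pyapacheatlas://table_b",
        "Process typeName": "custom_process",
        "Process qualifiedName": "pyapacheatlas://proc_ab",
        "Process name": "proc_ab"
    },
    {
        # None leaves the existing inputs alone; "N/A" wipes the outputs.
        "Source typeName": None,
        "Source qualifiedName": None,
        "Target typeName": None,
        "Target qualifiedName": "N/A",
        "Process typeName": "custom_process",
        "Process qualifiedName": "pyapacheatlas://proc_ab",
        "Process name": "proc_ab"
    }
]

# processes = reader.parse_update_lineage(rows)
```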