From d02b81bcb20bd5edee49b00d5dd5c2e6b50bea09 Mon Sep 17 00:00:00 2001 From: Will Johnson Date: Sat, 11 Jul 2020 10:47:51 -0500 Subject: [PATCH] Initial Functionality Commit The package is capable of connecting to an Apache Atlas endpoint, downloading and uploading entities and typedefs. It supports a column lineage scaffolding that is similar to the Hive Bridge way of doing column lineage. Entity and Relationship Type Defs are supported with the class objects. Realtionships have a default table and column mapping as per the hive bridge style (endDef1 is pointing to a table and creates the columns attribute, endDef2 is pointing at the column and creates the table attribute). Finally, the pyapacheatlas package can be installed after building the wheel files. --- .gitignore | 144 ++++++++++++++++++ LICENSE | 20 +++ README.md | 3 + pyapacheatlas/__init__.py | 2 + pyapacheatlas/auth/__init__.py | 1 + pyapacheatlas/auth/base.py | 4 + pyapacheatlas/auth/ouathmsft.py | 37 +++++ pyapacheatlas/core/__init__.py | 6 + pyapacheatlas/core/client.py | 109 +++++++++++++ pyapacheatlas/core/entity.py | 37 +++++ pyapacheatlas/core/typedef.py | 95 ++++++++++++ pyapacheatlas/core/util.py | 21 +++ pyapacheatlas/scaffolding/__init__.py | 1 + pyapacheatlas/scaffolding/column_lineage.py | 86 +++++++++++ samples/upload_full_typedef.py | 23 +++ setup.py | 22 +++ tests/client/test_prep_validate.py | 72 +++++++++ tests/scaffolding/test_scaffolding_lineage.py | 98 ++++++++++++ tests/test_guid_tracker.py | 28 ++++ 19 files changed, 809 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 README.md create mode 100644 pyapacheatlas/__init__.py create mode 100644 pyapacheatlas/auth/__init__.py create mode 100644 pyapacheatlas/auth/base.py create mode 100644 pyapacheatlas/auth/ouathmsft.py create mode 100644 pyapacheatlas/core/__init__.py create mode 100644 pyapacheatlas/core/client.py create mode 100644 pyapacheatlas/core/entity.py create mode 100644 pyapacheatlas/core/typedef.py create mode 100644 pyapacheatlas/core/util.py create mode 100644 pyapacheatlas/scaffolding/__init__.py create mode 100644 pyapacheatlas/scaffolding/column_lineage.py create mode 100644 samples/upload_full_typedef.py create mode 100644 setup.py create mode 100644 tests/client/test_prep_validate.py create mode 100644 tests/scaffolding/test_scaffolding_lineage.py create mode 100644 tests/test_guid_tracker.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..00e393b --- /dev/null +++ b/.gitignore @@ -0,0 +1,144 @@ +# VS Code +.vscode + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# Personal edits +hive-reference/ diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..de79698 --- /dev/null +++ b/LICENSE @@ -0,0 +1,20 @@ +MIT License + +Copyright (c) 2020 Will Johnson + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +of the Software, and to permit persons to whom the Software is furnished to do +so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, +INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF +CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE +OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..2fdf2e4 --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# PyApacheAtlas + +A python package to work with the Apache Atlas API and support bulk loading from different file types. diff --git a/pyapacheatlas/__init__.py b/pyapacheatlas/__init__.py new file mode 100644 index 0000000..98d9553 --- /dev/null +++ b/pyapacheatlas/__init__.py @@ -0,0 +1,2 @@ +# TODO +# from .readers import from_excel \ No newline at end of file diff --git a/pyapacheatlas/auth/__init__.py b/pyapacheatlas/auth/__init__.py new file mode 100644 index 0000000..99ebd7c --- /dev/null +++ b/pyapacheatlas/auth/__init__.py @@ -0,0 +1 @@ +from .ouathmsft import OAuthMSFT \ No newline at end of file diff --git a/pyapacheatlas/auth/base.py b/pyapacheatlas/auth/base.py new file mode 100644 index 0000000..f9d1a55 --- /dev/null +++ b/pyapacheatlas/auth/base.py @@ -0,0 +1,4 @@ +class AtlasAuthBase(): + + def __init__(self): + super().__init__() \ No newline at end of file diff --git a/pyapacheatlas/auth/ouathmsft.py b/pyapacheatlas/auth/ouathmsft.py new file mode 100644 index 0000000..191fdf2 --- /dev/null +++ b/pyapacheatlas/auth/ouathmsft.py @@ -0,0 +1,37 @@ +from datetime import datetime +import json +import requests + +class OAuthMSFT(): + + def __init__(self, tenant_id, client_id, client_secret): + super().__init__() + + self.ouath_url = "https://login.microsoftonline.com/" + tenant_id + "/oauth2/token" + self.data = {"resource": "https://management.core.windows.net/", + "client_id": client_id, + "grant_type": "client_credentials", + "client_secret": client_secret} + self.access_token = None + self.expiration = datetime.now() + + + def _set_access_token(self): + authResponse = requests.post(self.ouath_url, data=self.data) + if authResponse.status_code != 200: + authResponse.raise_for_status() + + authJson = json.loads(authResponse.text) + + self.access_token = authJson["access_token"] + self.expiration = datetime.fromtimestamp(int(authJson["expires_in"])) + + + def get_headers(self): + if self.expiration <= datetime.now(): + self._set_access_token() + + return { + "Authorization": "Bearer " + self.access_token, + "Content-Type": "application/json" + } \ No newline at end of file diff --git a/pyapacheatlas/core/__init__.py b/pyapacheatlas/core/__init__.py new file mode 100644 index 0000000..d7f61e4 --- /dev/null +++ b/pyapacheatlas/core/__init__.py @@ -0,0 +1,6 @@ +from .client import AtlasClient +from .entity import AtlasEntity +from .typedef import ( + EntityTypeDef, + TypeCategory +) \ No newline at end of file diff --git a/pyapacheatlas/core/client.py b/pyapacheatlas/core/client.py new file mode 100644 index 0000000..7066aa1 --- /dev/null +++ b/pyapacheatlas/core/client.py @@ -0,0 +1,109 @@ +import json +import requests + +from .typedef import TypeCategory +from .entity import AtlasEntity + +class AtlasClient(): + + def __init__(self, endpoint_url, authentication = None): + super().__init__() + self.authentication = authentication + self.endpoint_url = endpoint_url + + + def get_entity(self, guid, use_cache = False): + results = None + + if isinstance(guid, list): + guid_str = '&guid='.join(guid) + else: + guid_str = guid + + atlas_endpoint = self.endpoint_url + "/entity/bulk?guid={}".format(guid_str) + getEntity = requests.get(atlas_endpoint, headers=self.authentication.get_headers()) + results = json.loads(getEntity.text) + + return results + + + def get_typedef(self, type_category, guid = None, name = None, use_cache = False): + results = None + atlas_endpoint = self.endpoint_url + "/types/{}def".format(type_category.value) + + if guid: + atlas_endpoint = atlas_endpoint + '/guid/{}'.format(guid) + elif name: + atlas_endpoint = atlas_endpoint + '/name/{}'.format(name) + + getEntity = requests.get(atlas_endpoint, headers=self.authentication.get_headers()) + results = json.loads(getEntity.text) + + return results + + + def upload_typedefs(self, typedefs): + # Should this take a list of type defs and figure out the formatting by itself? + # Should you pass in a AtlasTypesDef object and be forced to build it yourself? + results = None + atlas_endpoint = self.endpoint_url + "/types/typedefs" + + payload = typedefs + required_keys = ["classificationDefs", "entityDefs", "enumDefs", "relationshipDefs", "structDefs"] + current_keys = list(typedefs.keys()) + + # Does the typedefs conform to the required pattern? + if not any([req in current_keys for req in required_keys]): + # Assuming this is a single typedef + payload = {typedefs.category.lower()+"Defs":[typedefs]} + + postTypeDefs = requests.post(atlas_endpoint, json=payload, + headers=self.authentication.get_headers() + ) + results = json.loads(postTypeDefs.text) + + return results + + @staticmethod + def _prepare_entity_upload(batch): + payload = batch + required_keys = ["entities"] + + if isinstance(batch, list): + # It's a list, so we're assuming it's a list of entities + # TODO Incorporate AtlasEntity + payload = {"entities":batch} + elif isinstance(batch, dict): + current_keys = list(batch.keys()) + + # Does the dict entity conform to the required pattern? + if not any([req in current_keys for req in required_keys]): + # Assuming this is a single entity + # TODO Incorporate AtlasEntity + payload = {"entities":[batch]} + elif isinstance(batch, AtlasEntity): + payload = {"entities":[batch.to_json()]} + + return payload + + @staticmethod + def validate_entities(batch): + raise NotImplementedError + + + def upload_entities(self,batch): + # TODO Include a Do Not Overwrite call + results = None + atlas_endpoint = self.endpoint_url + "/entity/bulk" + + payload = AtlasClient._prepare_entity_upload(batch) + + postBulkEntities = requests.post(atlas_endpoint, json=payload, + headers=self.authentication.get_headers() + ) + results = json.loads(postBulkEntities.text) + + return results + + + \ No newline at end of file diff --git a/pyapacheatlas/core/entity.py b/pyapacheatlas/core/entity.py new file mode 100644 index 0000000..dbdd412 --- /dev/null +++ b/pyapacheatlas/core/entity.py @@ -0,0 +1,37 @@ +class AtlasEntity(): + + def __init__(self, name, typeName, qualified_name, guid = None, **kwargs): + super().__init__() + self.typeName = typeName + self.guid = guid + self.attributes = kwargs.get("attributes", {}) + self.attributes.update({"name": name, "qualifiedName": qualified_name}) + # Example Relationship Attribute + # {"relationshipName": { + # "qualifiedName": "", + # "guid": "", + # "typeName": "" + # }} + self.relationshipAttributes = kwargs.get("relationshipAttributes", {}) + + def to_json(self, minimum=False): + if minimum: + output = { + "typeName": self.typeName, + "guid": self.guid, + "qualifiedName": self.attributes["qualifiedName"] + } + else: + output = { + "typeName": self.typeName, + "guid": self.guid, + "attributes": self.attributes, + "relationshipAttributes": self.relationshipAttributes + } + return output + +class AtlasProcess(AtlasEntity): + + def __init__(self, name, typeName, qualified_name, inputs, outputs, guid=None, **kwargs): + super().__init__(name, typeName, qualified_name, guid=guid, **kwargs) + self.attributes.update({"inputs": inputs, "outputs": outputs}) \ No newline at end of file diff --git a/pyapacheatlas/core/typedef.py b/pyapacheatlas/core/typedef.py new file mode 100644 index 0000000..2f4830b --- /dev/null +++ b/pyapacheatlas/core/typedef.py @@ -0,0 +1,95 @@ +import json +from enum import Enum + +class TypeCategory(Enum): + CLASSIFICATION="classification" + ENTITY="entity" + ENUM="enum" + RELATIONSHIP="relationship" + STRUCT="struct" + +class Cardinality(Enum): + SINGLE="SINGLE" + LIST="LIST" + SET="SET" + +class BaseTypeDef(): + + def __init__(self, name, **kwargs): + super().__init__() + self.category = kwargs.get("category").value.upper() + self.createTime = kwargs.get("createTime") + self.createdBy = kwargs.get("createdBy") + self.dateFormatter = kwargs.get("dateFormatter") + self.description = kwargs.get("description") + self.guid = kwargs.get("guid") + self.name = name + self.options = kwargs.get("options") + self.serviceType = kwargs.get("serviceType") + self.typeVersion = kwargs.get("typeVersion") + self.updateTime = kwargs.get("updateTime") + self.updatedBy = kwargs.get("updatedBy") + self.version = kwargs.get("version") + + def to_json(self, omit_nulls = True): + output = self.__dict__ + if omit_nulls: + output = {k:v for k,v in output.items() if v is not None} + return output + + +class EntityTypeDef(BaseTypeDef): + + def __init__(self, name, **kwargs): + kwargs["category"] = TypeCategory.ENTITY + super().__init__(name, **kwargs) + self.attributeDefs = kwargs.get("attributeDefs", []) + self.relationshipAttributeDefs = kwargs.get("relationshipAttributeDefs", []) + self.superTypes = kwargs.get("superTypes", []) + # Process supertype inherits inputs and outputs relationshipattribute + + def __str__(self): + return self.name + + +class RelationshipTypeDef(BaseTypeDef): + + @staticmethod + def default_columns_endDef(typeName): + return { + "type": typeName, + "name": "columns", + "isContainer": True, + "cardinality": "SET", + "isLegacyAttribute": True + } + + @staticmethod + def default_table_endDef(typeName): + return { + "type": typeName, + "name": "table", + "isContainer": False, + "cardinality": "SINGLE", + "isLegacyAttribute": True + } + + @staticmethod + def _decide_endDef(endDef, default_func): + output = None + + if isinstance(endDef, dict): + output = endDef + elif isinstance(endDef, str): + output = default_func(endDef) + + else: + raise NotImplementedError("endDef1 of type {} is not supported. Use string or dict.".format(type(endDef))) + return output + + def __init__(self, name, endDef1, endDef2, **kwargs): + kwargs["category"] = TypeCategory.RELATIONSHIP + super().__init__(name, **kwargs) + + self.endDef1 = RelationshipTypeDef._decide_endDef(endDef1, RelationshipTypeDef.default_columns_endDef) + self.endDef2 = RelationshipTypeDef._decide_endDef(endDef2, RelationshipTypeDef.default_table_endDef) diff --git a/pyapacheatlas/core/util.py b/pyapacheatlas/core/util.py new file mode 100644 index 0000000..450eda0 --- /dev/null +++ b/pyapacheatlas/core/util.py @@ -0,0 +1,21 @@ +class GuidTracker(): + + def __init__(self, starting, direction="decrease"): + _ALLOWED_DIRECTIONS = ["increase", "decrease"] + + if direction not in _ALLOWED_DIRECTIONS: + raise NotImplementedError("The direction of {} is not supported. Only {}".format(direction, _ALLOWED_DIRECTIONS)) + + self._guid = starting + self._direction = direction + + def _decide_next_guid(self): + return self._guid + (-1 if self._direction == "decrease" else 1) + + def get_guid(self): + self._guid = self._decide_next_guid() + return self._guid + + def peek_next_guid(self): + return self._decide_next_guid() + diff --git a/pyapacheatlas/scaffolding/__init__.py b/pyapacheatlas/scaffolding/__init__.py new file mode 100644 index 0000000..70867b4 --- /dev/null +++ b/pyapacheatlas/scaffolding/__init__.py @@ -0,0 +1 @@ +from .column_lineage import column_lineage_scaffold \ No newline at end of file diff --git a/pyapacheatlas/scaffolding/column_lineage.py b/pyapacheatlas/scaffolding/column_lineage.py new file mode 100644 index 0000000..88be5b9 --- /dev/null +++ b/pyapacheatlas/scaffolding/column_lineage.py @@ -0,0 +1,86 @@ +from ..core.typedef import * + +def column_lineage_scaffold(datasource, + column_attributes = None, + table_attributes = None, + table_column_relationship_attributes = None, + column_lineage_attributes = None, + table_process_attributes = None, + useColumnMapping = False, + column_lineage_process_attributes = None +): + # TODO: Create all combinations of datasource + # Define {datasource}_column + column_entity = EntityTypeDef( + name="{}_column".format(datasource), + superTypes=["DataSet"] + ) + # Define {datasource}_table + table_entity = EntityTypeDef( + name="{}_table".format(datasource), + superTypes=["DataSet"] + ) + # Define {datasource}_table_columns relationship () + table_column_relationship = RelationshipTypeDef( + name="{}_table_columns".format(datasource), + endDef1 = table_entity.name, + endDef2 = column_entity.name + ) + + # Define {datasource}_column_lineage + column_lineage_process_entity = EntityTypeDef( + name="{}_column_lineage".format(datasource), + superTypes=["Process"], + ) + + # Define {datasource}_process + table_process_entity = EntityTypeDef( + name="{}_process".format(datasource), + superTypes=["Process"], + ) + if useColumnMapping: + table_process_entity.attributeDefs.append( + { + "name": "columnMapping", + "typeName": "string", + "cardinality": "SINGLE", + "isIndexable": False, + "isOptional": True, + "isUnique": False + } + ) + + # Define {datasource}_process_column_lineage + table_process_column_lineage_relationship = RelationshipTypeDef( + name="{}_process_column_lineage".format(datasource), + endDef1 = { + "type": column_lineage_process_entity.name, + "name": "query", + "isContainer": False, + "cardinality": "SINGLE", + "isLegacyAttribute": True + }, + endDef2 = { + "type": table_process_entity.name, + "name": "columnLineages", + "isContainer": True, + "cardinality": "SET", + "isLegacyAttribute": False + } + ) + + + # Output composite entity + output = { + "entityDefs":[ + column_entity.to_json(), + table_entity.to_json(), + column_lineage_process_entity.to_json(), + table_process_entity.to_json() + ], + "relationshipDefs":[ + table_column_relationship.to_json(), + table_process_column_lineage_relationship.to_json() + ] + } + return output \ No newline at end of file diff --git a/samples/upload_full_typedef.py b/samples/upload_full_typedef.py new file mode 100644 index 0000000..8669630 --- /dev/null +++ b/samples/upload_full_typedef.py @@ -0,0 +1,23 @@ +import os + +from pyapacheatlas.core import AtlasClient +from pyapacheatlas.auth import OAuthMSFT +import pyapacheatlas as pyaa + +from dotenv import load_env + +if __name__ == "__main__": + load_env() + + oauth = OAuthMSFT( + tenant_id = os.environ.get("TENANT_ID"), + client_id = os.environ.get("CLIENT_ID"), + client_secret = os.environ.get("CLIENT_SECRET") + ) + atlas_client = AtlasClient( + endpoint_url = os.environ.get("ENDPOINT_URL"), + authentication = oauth + ) + + + diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..8b01918 --- /dev/null +++ b/setup.py @@ -0,0 +1,22 @@ +import setuptools + +with open("README.md", "r") as fh: + long_description = fh.read() + +setuptools.setup( + name="pyapacheatlas", + version="0.0.1", + author="Will Johnson", + author_email="will@willj.com", + description="A package to simplify working with the Apache Atlas REST APIs and support bulk loading from files.", + long_description=long_description, + long_description_content_type="text/markdown", + url="https://github.com/wjohnson/pyapacheatlas", + packages=setuptools.find_packages(), + classifiers=[ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", + ], + python_requires='>=3.6', +) \ No newline at end of file diff --git a/tests/client/test_prep_validate.py b/tests/client/test_prep_validate.py new file mode 100644 index 0000000..8becb3f --- /dev/null +++ b/tests/client/test_prep_validate.py @@ -0,0 +1,72 @@ +import json +from pyapacheatlas.core import AtlasClient, AtlasEntity + +sample_entity = { + "typeName": "hive_column", + "attributes": { + "owner": "admin", + "replicatedTo": [], + "replicatedFrom": [], + "qualifiedName": "hivedbtest.point_derived.y_value@primary", + "name": "y_value", + "description": None, + "comment": None, + "position": 1, + "type": "int", + "table": { + "guid": "79e5659a-70c9-4ac9-bced-d28ac86a60cd", + "typeName": "hive_table" + } + }, + "guid": "95f5da92-545b-44ac-8393-427f706cc7bb", + "relationshipAttributes": { + "inputToProcesses": [], + "schema": [], + "attachedSchema": [], + "meanings": [], + "table": { + "guid": "79e5659a-70c9-4ac9-bced-d28ac86a60cd", + "typeName": "hive_table", + "entityStatus": "ACTIVE", + "displayText": "point_derived", + "relationshipType": "hive_table_columns", + "relationshipGuid": "1dc9aed8-011e-4c0f-b879-90ba3c59ef78", + "relationshipStatus": "ACTIVE", + "relationshipAttributes": { + "typeName": "hive_table_columns" + } + } + } +} + + +def test_prepare_bulk_entity_from_list(): + results = AtlasClient._prepare_entity_upload([sample_entity]) + + expected = {"entities": [sample_entity]} + + assert(results == expected) + +def test_prepare_bulk_entity_from_dict(): + results = AtlasClient._prepare_entity_upload({"entities":[sample_entity]}) + + expected = {"entities": [sample_entity]} + + assert(results == expected) + +def test_prepare_bulk_entity_from_atlas_entity(): + + class_entity = AtlasEntity( + name=sample_entity["attributes"]["name"], + typeName=sample_entity["typeName"], + qualified_name=sample_entity["attributes"]["qualifiedName"], + attributes=sample_entity["attributes"], + guid=sample_entity["guid"], + relationshipAttributes= sample_entity["relationshipAttributes"] + ) + + results = AtlasClient._prepare_entity_upload(class_entity) + + expected = {"entities": [sample_entity]} + + assert(results == expected) \ No newline at end of file diff --git a/tests/scaffolding/test_scaffolding_lineage.py b/tests/scaffolding/test_scaffolding_lineage.py new file mode 100644 index 0000000..4043010 --- /dev/null +++ b/tests/scaffolding/test_scaffolding_lineage.py @@ -0,0 +1,98 @@ +import json +from pyapacheatlas.scaffolding import column_lineage_scaffold + + +def test_column_lineage_scaffolding(): + scaffolding = column_lineage_scaffold("demo", useColumnMapping=True) + + results = json.dumps(scaffolding).replace(" ", "") + + expected = """{ + "entityDefs": [ + { + "category": "ENTITY", + "name": "demo_column", + "attributeDefs": [], + "relationshipAttributeDefs": [], + "superTypes": [ + "DataSet" + ] + }, + { + "category": "ENTITY", + "name": "demo_table", + "attributeDefs": [], + "relationshipAttributeDefs": [], + "superTypes": [ + "DataSet" + ] + }, + { + "category": "ENTITY", + "name": "demo_column_lineage", + "attributeDefs": [], + "relationshipAttributeDefs": [], + "superTypes": [ + "Process" + ] + }, + { + "category": "ENTITY", + "name": "demo_process", + "attributeDefs": [ + { + "name": "columnMapping", + "typeName": "string", + "cardinality": "SINGLE", + "isIndexable": false, + "isOptional": true, + "isUnique": false + } + ], + "relationshipAttributeDefs": [], + "superTypes": [ + "Process" + ] + } + ], + "relationshipDefs": [ + { + "category": "RELATIONSHIP", + "name": "demo_table_columns", + "endDef1": { + "type": "demo_table", + "name": "columns", + "isContainer": true, + "cardinality": "SET", + "isLegacyAttribute": true + }, + "endDef2": { + "type": "demo_column", + "name": "table", + "isContainer": false, + "cardinality": "SINGLE", + "isLegacyAttribute": true + } + }, + { + "category": "RELATIONSHIP", + "name": "demo_process_column_lineage", + "endDef1": { + "type": "demo_column_lineage", + "name": "query", + "isContainer": false, + "cardinality": "SINGLE", + "isLegacyAttribute": true + }, + "endDef2": { + "type": "demo_process", + "name": "columnLineages", + "isContainer": true, + "cardinality": "SET", + "isLegacyAttribute": false + } + } + ] +} +""".strip().replace('\n', ' ').replace(" ", "") + assert(expected == results) \ No newline at end of file diff --git a/tests/test_guid_tracker.py b/tests/test_guid_tracker.py new file mode 100644 index 0000000..6396333 --- /dev/null +++ b/tests/test_guid_tracker.py @@ -0,0 +1,28 @@ +import sys +sys.path.append('.') + +from pyapacheatlas.core.util import GuidTracker + +def test_guid_tracker_get_and_decrement(): + + gt = GuidTracker(-100, "decrease") + results = gt.get_guid() + + expected = -101 + + assert(expected == results) + + second_expected = -102 + second_results = gt.get_guid() + + assert(second_expected == second_results) + +def test_peek(): + gt = GuidTracker(-100, "decrease") + peek_results = gt.peek_next_guid() + results = gt.get_guid() + + expected = -101 + + assert(expected == results) + assert(results == peek_results) \ No newline at end of file