Initial Functionality Commit
The package is capable of connecting to an Apache Atlas endpoint and downloading and uploading entities and typedefs.

It supports column lineage scaffolding similar to the Hive Bridge approach to column lineage.

Entity and Relationship Type Defs are supported as class objects. Relationships have a default table and column
mapping in the Hive Bridge style (endDef1 points at the table and creates the columns attribute; endDef2 points
at the column and creates the table attribute).

Finally, the pyapacheatlas package can be installed after building the wheel files.
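
For illustration, a minimal sketch of that workflow (assuming the wheel has been built and installed, and using placeholder credentials and a hypothetical Atlas endpoint URL, not values from this commit):

from pyapacheatlas.auth import OAuthMSFT
from pyapacheatlas.core import AtlasClient

# Placeholder service principal values; substitute your own.
auth = OAuthMSFT("my-tenant-id", "my-client-id", "my-client-secret")

# Hypothetical Atlas REST base URL (the /api/atlas/v2 endpoint).
client = AtlasClient("https://myatlas.example.com/api/atlas/v2",
                     authentication=auth)

# Download an entity by guid; upload_entities works the same way in reverse.
entity = client.get_entity("1111-2222-3333")
print(entity)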
wjohnson committed Jul 11, 2020
0 parents commit d02b81b
Showing 19 changed files with 809 additions and 0 deletions.
144 changes: 144 additions & 0 deletions .gitignore
@@ -0,0 +1,144 @@
# VS Code
.vscode

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock

# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# Personal edits
hive-reference/
20 changes: 20 additions & 0 deletions LICENSE
@@ -0,0 +1,20 @@
MIT License

Copyright (c) 2020 Will Johnson

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE
OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
3 changes: 3 additions & 0 deletions README.md
@@ -0,0 +1,3 @@
# PyApacheAtlas

A Python package to work with the Apache Atlas API and support bulk loading from different file types.
2 changes: 2 additions & 0 deletions pyapacheatlas/__init__.py
@@ -0,0 +1,2 @@
# TODO
# from .readers import from_excel
1 change: 1 addition & 0 deletions pyapacheatlas/auth/__init__.py
@@ -0,0 +1 @@
from .ouathmsft import OAuthMSFT
4 changes: 4 additions & 0 deletions pyapacheatlas/auth/base.py
@@ -0,0 +1,4 @@
class AtlasAuthBase():

def __init__(self):
super().__init__()
37 changes: 37 additions & 0 deletions pyapacheatlas/auth/ouathmsft.py
@@ -0,0 +1,37 @@
from datetime import datetime, timedelta
import json
import requests

class OAuthMSFT():

def __init__(self, tenant_id, client_id, client_secret):
super().__init__()

self.ouath_url = "https://login.microsoftonline.com/" + tenant_id + "/oauth2/token"
self.data = {"resource": "https://management.core.windows.net/",
"client_id": client_id,
"grant_type": "client_credentials",
"client_secret": client_secret}
self.access_token = None
self.expiration = datetime.now()


def _set_access_token(self):
authResponse = requests.post(self.ouath_url, data=self.data)
if authResponse.status_code != 200:
authResponse.raise_for_status()

authJson = json.loads(authResponse.text)

self.access_token = authJson["access_token"]
        # expires_in is a number of seconds until expiry, not a Unix
        # timestamp, so compute the expiration relative to now.
        self.expiration = datetime.now() + timedelta(seconds=int(authJson["expires_in"]))


def get_headers(self):
if self.expiration <= datetime.now():
self._set_access_token()

return {
"Authorization": "Bearer " + self.access_token,
"Content-Type": "application/json"
}
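
A minimal sketch of using this authentication class on its own (the credentials and endpoint URL below are placeholders, not values from this commit):

import requests

from pyapacheatlas.auth import OAuthMSFT

# Placeholder service principal values for illustration only.
auth = OAuthMSFT("my-tenant-id", "my-client-id", "my-client-secret")

# The first call posts to the OAuth endpoint and caches the token;
# later calls reuse it until the stored expiration has passed.
headers = auth.get_headers()

# The headers can be passed straight to requests, as AtlasClient does.
resp = requests.get("https://myatlas.example.com/api/atlas/v2/types/typedefs",
                    headers=headers)
print(resp.status_code)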
6 changes: 6 additions & 0 deletions pyapacheatlas/core/__init__.py
@@ -0,0 +1,6 @@
from .client import AtlasClient
from .entity import AtlasEntity
from .typedef import (
EntityTypeDef,
TypeCategory
)
109 changes: 109 additions & 0 deletions pyapacheatlas/core/client.py
@@ -0,0 +1,109 @@
import json
import requests

from .typedef import TypeCategory
from .entity import AtlasEntity

class AtlasClient():

def __init__(self, endpoint_url, authentication = None):
super().__init__()
self.authentication = authentication
self.endpoint_url = endpoint_url


def get_entity(self, guid, use_cache = False):
results = None

if isinstance(guid, list):
guid_str = '&guid='.join(guid)
else:
guid_str = guid

atlas_endpoint = self.endpoint_url + "/entity/bulk?guid={}".format(guid_str)
getEntity = requests.get(atlas_endpoint, headers=self.authentication.get_headers())
results = json.loads(getEntity.text)

return results


def get_typedef(self, type_category, guid = None, name = None, use_cache = False):
results = None
atlas_endpoint = self.endpoint_url + "/types/{}def".format(type_category.value)

if guid:
atlas_endpoint = atlas_endpoint + '/guid/{}'.format(guid)
elif name:
atlas_endpoint = atlas_endpoint + '/name/{}'.format(name)

getEntity = requests.get(atlas_endpoint, headers=self.authentication.get_headers())
results = json.loads(getEntity.text)

return results


def upload_typedefs(self, typedefs):
# Should this take a list of type defs and figure out the formatting by itself?
# Should you pass in a AtlasTypesDef object and be forced to build it yourself?
results = None
atlas_endpoint = self.endpoint_url + "/types/typedefs"

payload = typedefs
required_keys = ["classificationDefs", "entityDefs", "enumDefs", "relationshipDefs", "structDefs"]
current_keys = list(typedefs.keys())

# Does the typedefs conform to the required pattern?
if not any([req in current_keys for req in required_keys]):
# Assuming this is a single typedef
payload = {typedefs.category.lower()+"Defs":[typedefs]}

postTypeDefs = requests.post(atlas_endpoint, json=payload,
headers=self.authentication.get_headers()
)
results = json.loads(postTypeDefs.text)

return results

@staticmethod
def _prepare_entity_upload(batch):
payload = batch
required_keys = ["entities"]

if isinstance(batch, list):
# It's a list, so we're assuming it's a list of entities
# TODO Incorporate AtlasEntity
payload = {"entities":batch}
elif isinstance(batch, dict):
current_keys = list(batch.keys())

# Does the dict entity conform to the required pattern?
if not any([req in current_keys for req in required_keys]):
# Assuming this is a single entity
# TODO Incorporate AtlasEntity
payload = {"entities":[batch]}
elif isinstance(batch, AtlasEntity):
payload = {"entities":[batch.to_json()]}

return payload

@staticmethod
def validate_entities(batch):
raise NotImplementedError


def upload_entities(self,batch):
# TODO Include a Do Not Overwrite call
results = None
atlas_endpoint = self.endpoint_url + "/entity/bulk"

payload = AtlasClient._prepare_entity_upload(batch)

postBulkEntities = requests.post(atlas_endpoint, json=payload,
headers=self.authentication.get_headers()
)
results = json.loads(postBulkEntities.text)

return results
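
As a sketch of how the client's entity methods fit together (the credentials, endpoint URL, and guids below are hypothetical), get_entity accepts a single guid or a list of guids, and upload_entities accepts a list of dicts, a single dict, or an AtlasEntity:

from pyapacheatlas.auth import OAuthMSFT
from pyapacheatlas.core import AtlasClient, AtlasEntity

# Placeholder credentials and a hypothetical endpoint for illustration.
auth = OAuthMSFT("my-tenant-id", "my-client-id", "my-client-secret")
client = AtlasClient("https://myatlas.example.com/api/atlas/v2",
                     authentication=auth)

# A single guid or a list of guids both hit the /entity/bulk endpoint.
one = client.get_entity("1111-aaaa")
many = client.get_entity(["1111-aaaa", "2222-bbbb"])

# _prepare_entity_upload wraps any of the accepted forms into the
# {"entities": [...]} payload the bulk endpoint expects.
table = AtlasEntity(
    name="sales",
    typeName="hive_table",
    qualified_name="sales@cluster"
)
results = client.upload_entities(table)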



37 changes: 37 additions & 0 deletions pyapacheatlas/core/entity.py
@@ -0,0 +1,37 @@
class AtlasEntity():

def __init__(self, name, typeName, qualified_name, guid = None, **kwargs):
super().__init__()
self.typeName = typeName
self.guid = guid
self.attributes = kwargs.get("attributes", {})
self.attributes.update({"name": name, "qualifiedName": qualified_name})
# Example Relationship Attribute
# {"relationshipName": {
# "qualifiedName": "",
# "guid": "",
# "typeName": ""
# }}
self.relationshipAttributes = kwargs.get("relationshipAttributes", {})

def to_json(self, minimum=False):
if minimum:
output = {
"typeName": self.typeName,
"guid": self.guid,
"qualifiedName": self.attributes["qualifiedName"]
}
else:
output = {
"typeName": self.typeName,
"guid": self.guid,
"attributes": self.attributes,
"relationshipAttributes": self.relationshipAttributes
}
return output

class AtlasProcess(AtlasEntity):

def __init__(self, name, typeName, qualified_name, inputs, outputs, guid=None, **kwargs):
super().__init__(name, typeName, qualified_name, guid=guid, **kwargs)
self.attributes.update({"inputs": inputs, "outputs": outputs})
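
A short sketch of these entity classes in use (the names, type names, and guid value are hypothetical placeholders):

from pyapacheatlas.core.entity import AtlasEntity, AtlasProcess

# A table entity; guid=-1 is just a placeholder value for the sketch.
table = AtlasEntity(
    name="sales",
    typeName="hive_table",
    qualified_name="sales@cluster",
    guid=-1
)

# A process linking inputs to outputs, in the lineage style the commit
# message describes.
proc = AtlasProcess(
    name="daily_sales_load",
    typeName="Process",
    qualified_name="daily_sales_load@cluster",
    inputs=[table.to_json(minimum=True)],
    outputs=[]
)

# minimum=True emits only typeName, guid, and qualifiedName.
print(table.to_json(minimum=True))
print(proc.to_json())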
