diff --git a/README.md b/README.md index 313bee7..606b72f 100644 --- a/README.md +++ b/README.md @@ -22,14 +22,18 @@ The package currently supports: ### Build and Install from Source +Create a wheel distribution file and install it in your environment. + ``` +python -m pip install wheel python setup.py bdist_wheel -python -m pip install ./dist/pyapacheatlas-0.0.2-py3-none-any.whl +python -m pip install ./dist/pyapacheatlas-0.0b10-py3-none-any.whl ``` ### Create a Client Connection -Provides connectivity to your Atlas / Data Catalog service. Supports getting and uploading entities and type defs. +Provides connectivity to your Atlas / Data Catalog service. +Supports getting and uploading entities and type defs. ``` from pyapacheatlas.auth import ServicePrincipalAuthentication @@ -41,6 +45,9 @@ auth = ServicePrincipalAuthentication( client_secret = "" ) +# Azure Data Catalog Endpoints are: +# https://{your_catalog_name}.catalog.babylon.azure.com/api/atlas/v2 + client = AtlasClient( endpoint_url = "https://MYENDPOINT/api/atlas/v2", auth = auth @@ -77,15 +84,13 @@ upload_results = client.upload_entities([ae.to_json()]) Read from a standardized excel template to create table, column, table process, and column lineage entities. Follows / Requires the hive bridge style of column lineages. ``` -from pyapacheatlas.core import GuidTracker, TypeCategory +from pyapacheatlas.core import TypeCategory from pyapacheatlas.scaffolding import column_lineage_scaffold -from pyapacheatlas.scaffolding.templates import excel_template -from pyapacheatlas import from_excel -from pyapacheatlas.readers.excel import ExcelConfiguration +from pyapacheatlas.readers import ExcelConfiguration, ExcelReader file_path = "./atlas_excel_template.xlsx" # Create the Excel Template -excel_template(file_path) +ExcelReader.make_template(file_path) # Populate the excel file manually! @@ -93,15 +98,12 @@ excel_template(file_path) all_type_defs = client.get_typedefs(TypeCategory.ENTITY) # Create objects for -guid_tracker = GuidTracker() -excel_config = ExcelConfiguration() +ec = ExcelConfiguration() +excel_reader = ExcelReader(ec) # Read from excel file and convert to -entities = from_excel(file_path, excel_config, guid_tracker) - -# Prepare a batch by converting everything to json -batch = {"entities":[e.to_json() for e in entities]} +entities = excel_reader.parse_lineage(file_path, all_type_defs) -upload_results = client.upload_entities(batch) +upload_results = client.upload_entities(entities) print(json.dumps(upload,results,indent=1)) ``` diff --git a/docs/source/index.rst b/docs/source/index.rst index d578569..929ee2d 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -16,7 +16,6 @@ Welcome to PyApacheAtlas's documentation! pyapacheatlas.core.rst pyapacheatlas.readers.rst pyapacheatlas.scaffolding.rst - pyapacheatlas.scaffolding.templates.rst Indices and tables ================== diff --git a/docs/source/pyapacheatlas.readers.core.rst b/docs/source/pyapacheatlas.readers.core.rst deleted file mode 100644 index 56eaea3..0000000 --- a/docs/source/pyapacheatlas.readers.core.rst +++ /dev/null @@ -1,18 +0,0 @@ -pyapacheatlas.readers.core package -================================== - -pyapacheatlas.readers.core.column module ----------------------------------------- - -.. automodule:: pyapacheatlas.readers.core.column - :members: - :undoc-members: - :show-inheritance: - -pyapacheatlas.readers.core.table module ---------------------------------------- - -.. 
automodule:: pyapacheatlas.readers.core.table - :members: - :undoc-members: - :show-inheritance: diff --git a/docs/source/pyapacheatlas.readers.rst b/docs/source/pyapacheatlas.readers.rst index 2bda3f1..de73bef 100644 --- a/docs/source/pyapacheatlas.readers.rst +++ b/docs/source/pyapacheatlas.readers.rst @@ -1,16 +1,24 @@ pyapacheatlas.readers package ============================= -Subpackages ------------ +Submodules +---------- -.. toctree:: - :maxdepth: 4 +pyapacheatlas.readers.reader module +----------------------------------- - pyapacheatlas.readers.core +.. automodule:: pyapacheatlas.readers.reader + :members: + :undoc-members: + :show-inheritance: -Submodules ----------- +pyapacheatlas.readers.lineagemixin module +----------------------------------------- + +.. automodule:: pyapacheatlas.readers.lineagemixin + :members: + :undoc-members: + :show-inheritance: pyapacheatlas.readers.excel module ---------------------------------- diff --git a/docs/source/pyapacheatlas.scaffolding.rst b/docs/source/pyapacheatlas.scaffolding.rst index 640c79f..51c266b 100644 --- a/docs/source/pyapacheatlas.scaffolding.rst +++ b/docs/source/pyapacheatlas.scaffolding.rst @@ -1,14 +1,6 @@ pyapacheatlas.scaffolding package ================================= -Subpackages ------------ - -.. toctree:: - :maxdepth: 4 - - pyapacheatlas.scaffolding.templates - Submodules ---------- diff --git a/docs/source/pyapacheatlas.scaffolding.templates.rst b/docs/source/pyapacheatlas.scaffolding.templates.rst deleted file mode 100644 index 8e6fbc4..0000000 --- a/docs/source/pyapacheatlas.scaffolding.templates.rst +++ /dev/null @@ -1,10 +0,0 @@ -pyapacheatlas.scaffolding.templates package -=========================================== - -pyapacheatlas.scaffolding.templates.excel module ------------------------------------------------- - -.. 
automodule:: pyapacheatlas.scaffolding.templates.excel - :members: - :undoc-members: - :show-inheritance: diff --git a/pyapacheatlas/__init__.py b/pyapacheatlas/__init__.py index dfeab16..03ff1f9 100644 --- a/pyapacheatlas/__init__.py +++ b/pyapacheatlas/__init__.py @@ -1,3 +1 @@ -from .readers import from_excel - -__version__ = "0.0b9" +__version__ = "0.0b10" diff --git a/pyapacheatlas/core/__init__.py b/pyapacheatlas/core/__init__.py index 61a4b0f..db86797 100644 --- a/pyapacheatlas/core/__init__.py +++ b/pyapacheatlas/core/__init__.py @@ -1,8 +1,8 @@ from .client import AtlasClient -from .entity import AtlasEntity -from .entity import AtlasProcess +from .entity import AtlasEntity, AtlasProcess from .typedef import ( + AtlasAttributeDef, EntityTypeDef, + RelationshipTypeDef, TypeCategory ) -from .util import GuidTracker diff --git a/pyapacheatlas/readers/__init__.py b/pyapacheatlas/readers/__init__.py index 570698b..06012a7 100644 --- a/pyapacheatlas/readers/__init__.py +++ b/pyapacheatlas/readers/__init__.py @@ -1 +1 @@ -from .excel import excel_typeDefs, from_excel +from .excel import ExcelConfiguration, ExcelReader diff --git a/pyapacheatlas/readers/core/__init__.py b/pyapacheatlas/readers/core/__init__.py deleted file mode 100644 index 2e09090..0000000 --- a/pyapacheatlas/readers/core/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from .column import to_column_entities -from .table import to_table_entities -from .entitydef import to_entityDefs -from .bulkEntities import to_bulkEntities \ No newline at end of file diff --git a/pyapacheatlas/readers/core/bulkEntities.py b/pyapacheatlas/readers/core/bulkEntities.py deleted file mode 100644 index 9786ede..0000000 --- a/pyapacheatlas/readers/core/bulkEntities.py +++ /dev/null @@ -1,40 +0,0 @@ -from warnings import warn -from ...core import AtlasEntity -from ...core import GuidTracker -from ...readers.util import string_to_classification - - -def to_bulkEntities(json_rows, starting_guid=-1000): - """ - Create an AtlasTypeDef consisting of entities and their attributes for the given json_rows. - - :param list(dict(str,str)) json_rows: - A list of dicts containing at least `Entity TypeName` and `name` - that represents the metadata for a given entity type's attributeDefs. - Extra metadata will be ignored. - :return: An AtlasTypeDef with entityDefs for the provided rows. - :rtype: dict(str, list(dict)) - """ - # For each row, - # Extract the - # Extract any additional attributes - req_attribs = ["typeName", "name", "qualifiedName", "classifications"] - gt = GuidTracker(starting_guid) - output = {"entities": []} - for row in json_rows: - # Remove any cell with a None / Null attribute - # Remove the required attributes so they're not double dipping. 
- custom_attributes = { - k: v for k, v in row.items() - if k not in req_attribs and v is not None - } - entity = AtlasEntity( - name=row["name"], - typeName=row["typeName"], - qualified_name=row["qualifiedName"], - guid=gt.get_guid(), - attributes=custom_attributes, - classifications=string_to_classification(row["classifications"]) - ).to_json() - output["entities"].append(entity) - return output diff --git a/pyapacheatlas/readers/core/column.py b/pyapacheatlas/readers/core/column.py deleted file mode 100644 index 2106da9..0000000 --- a/pyapacheatlas/readers/core/column.py +++ /dev/null @@ -1,240 +0,0 @@ -import json -from ...core import AtlasEntity, AtlasProcess -from ..util import * - - -def to_column_entities(json_rows, excel_config, guid_tracker, atlas_entities, atlas_typedefs, use_column_mapping=False): - """ - :param json_rows: - A list of dicts that contain the converted rows of your column spreadsheet. - :type json_rows: list(dict(str,str)) - :param ~pyapacheatlas.readers.excel.ExcelConfiguration excel_config: - An excel configuration object to indicate any customizations to the template excel. - :param ~pyapacheatlas.core.util.GuidTracker guid_tracker: - A guid tracker to be used in incrementing / decrementing the guids in use. - :param atlas_entities: - A list of :class:`~pyapacheatlas.core.entity.AtlasEntity` containing the referred entities. - :type atlas_entities: list(:class:`~pyapacheatlas.core.entity.AtlasEntity`) - :param dict(str,list(dict)) atlas_typedefs: - The results of requesting all type defs from Apache Atlas, including - entityDefs, relationshipDefs, etc. relationshipDefs are the only - values used. - :param bool use_column_mapping: - Should the table processes include the columnMappings attribute - that represents Column Lineage in Azure Data Catalog. - Defaults to False. - :return: - A list of atlas entities that represent your column source, target, - and column lineage processes. 
- :rtype: list(:class:`~pyapacheatlas.core.entity.AtlasEntity`) - - """ - # Required attributes - # NOTE: Classification is not actually required but it's being included to avoid being roped in as an attribute - source_table_name_header = excel_config.entity_source_prefix + " Table" - source_column_name_header = excel_config.entity_source_prefix + " Column" - source_column_classifications_header = excel_config.entity_source_prefix + " Classifications" - required_source_headers = [source_column_name_header, - source_table_name_header, source_column_classifications_header] - - target_table_name_header = excel_config.entity_target_prefix + " Table" - target_column_name_header = excel_config.entity_target_prefix + " Column" - target_column_classifications_header = excel_config.entity_target_prefix + " Classifications" - required_target_headers = [target_column_name_header, - target_table_name_header, target_column_classifications_header] - - transformation_column_header = excel_config.column_transformation_name - # No required process headers - - output = [] - tables = {} # Stores all of the table entities by their name seen in the loop for faster lookup - # table_process_guid: {"input_table":"","output_table":"","columnMapping":[]} - dataset_mapping = {} - # Caches all of the table processes seen in the loop for faster lookup - table_and_proc_mappings = {} - - for row in json_rows: - # Set up defaults - target_entity, source_entity, process_entity = None, None, None - target_entity_table_name, source_entity_table_name = None, None - # Given the existing table entity in atlas_entities, look up the appropriate column type - target_entity_table_name = row[target_table_name_header] - if target_entity_table_name not in tables: - target_table_entity = first_entity_matching_attribute( - "name", target_entity_table_name, atlas_entities) - tables[target_entity_table_name] = target_table_entity - - columns_relationship = first_relationship_that_matches( - end_def="endDef1", - end_def_type=target_table_entity.typeName, - end_def_name="columns", - relationship_typedefs=atlas_typedefs["relationshipDefs"] - ) - target_col_type = columns_relationship["endDef2"]["type"] - - # There should always be a target - target_entity = AtlasEntity( - name=row[target_column_name_header], - typeName=target_col_type, - # qualifiedName can be overwritten via the attributes functionality - qualified_name=target_entity_table_name + \ - "#" + row[target_column_name_header], - guid=guid_tracker.get_guid(), - attributes=columns_matching_pattern( - row, excel_config.entity_target_prefix, does_not_match=required_target_headers), - # TODO: Make the relationship name more dynamic instead of hard coding table - relationshipAttributes={ - "table": tables[target_entity_table_name].to_json(minimum=True)}, - classifications=string_to_classification( - row.get(target_column_classifications_header)) - ) - if target_entity in output: - poppable_index = output.index(target_entity) - popped_target = output.pop(poppable_index) - target_entity.merge(popped_target) - # Add to outputs - output.append(target_entity) - - # Source Column is optiona in the spreadsheet - if row[source_table_name_header] is not None: - # Given the existing source table entity in atlas_entities, look up the appropriate column type - source_entity_table_name = row[source_table_name_header] - - if source_entity_table_name not in tables: - source_table_entity = first_entity_matching_attribute( - "name", source_entity_table_name, atlas_entities) - 
tables[source_entity_table_name] = source_table_entity - - columns_relationship_source = first_relationship_that_matches( - end_def="endDef1", - end_def_type=source_table_entity.typeName, - end_def_name="columns", - relationship_typedefs=atlas_typedefs["relationshipDefs"] - ) - source_col_type = columns_relationship_source["endDef2"]["type"] - - source_entity = AtlasEntity( - name=row[source_column_name_header], - typeName=source_col_type, - # qualifiedName can be overwritten via the attributes functionality - qualified_name=source_entity_table_name + \ - "#" + row[source_column_name_header], - guid=guid_tracker.get_guid(), - attributes=columns_matching_pattern( - row, excel_config.entity_source_prefix, does_not_match=required_source_headers), - # TODO: Make the relationship name more dynamic instead of hard coding query - relationshipAttributes={ - "table": tables[source_entity_table_name].to_json(minimum=True)}, - classifications=string_to_classification( - row.get(source_column_classifications_header)) - ) - if source_entity in output: - poppable_index = output.index(source_entity) - popped_source = output.pop(poppable_index) - source_entity.merge(popped_source) - # Add to outputs - output.append(source_entity) - - # Given the existing process that with target table and source table types, - # look up the appropriate column_lineage type - # LIMITATION: Prevents you from specifying multiple processes for the same input and output tables - try: - table_process = first_process_containing_io( - source_entity_table_name, target_entity_table_name, atlas_entities) - except ValueError as e: - # ValueError means we didn't find anything that matched - # Try using a wildcard search if the source entity is none - # to match to any process that at least includes the target entity table - if source_entity_table_name is None: - table_process = first_process_containing_io( - "*", target_entity_table_name, atlas_entities) - else: - raise e - - if table_process.get_name() in table_and_proc_mappings: - process_type = table_and_proc_mappings[table_process.get_name( - )]["column_lineage_type"] - else: - process_type = from_process_lookup_col_lineage( - table_process.get_name(), - atlas_entities, - atlas_typedefs["relationshipDefs"] - ) - table_and_proc_mappings[table_process.get_name()] = { - "column_lineage_type": process_type - } - - # Assuming there is always a Process for adding at least the target table - process_attributes = columns_matching_pattern( - row, excel_config.entity_process_prefix) - process_attributes.update({"dependencyType": "SIMPLE"}) - if row[transformation_column_header] is not None: - process_attributes.update( - {"dependencyType": "EXPRESSION", "expression": row[transformation_column_header]}) - - process_entity = AtlasProcess( - name=table_process.get_name(), - typeName=process_type, - # qualifiedName can be overwritten via the attributes functionality - qualified_name=table_process.get_name() + "@derived_column:{}".format( - target_entity.get_name() - ), - guid=guid_tracker.get_guid(), - # Assuming always a single output - inputs=[] if source_entity is None else [ - source_entity.to_json(minimum=True)], - outputs=[target_entity.to_json(minimum=True)], - attributes=process_attributes, - # TODO: Make the relationship name more dynamic instead of hard coding query - relationshipAttributes={ - "query": table_process.to_json(minimum=True)} - ) - if process_entity in output: - poppable_index = output.index(process_entity) - popped_process = output.pop(poppable_index) - 
process_entity.merge(popped_process) - output.append(process_entity) - - if use_column_mapping: - # Handles multiple source columns from multiple source datasets - col_map_source_col = "*" if source_entity is None else source_entity.get_name() - col_map_target_col = target_entity.get_name() - col_map_source_table = row[source_table_name_header] or "*" - col_map_target_table = row[target_table_name_header] - hash_key = hash(col_map_source_table + col_map_target_table) - col_map_dict = {"Source": col_map_source_col, - "Sink": col_map_target_col} - data_map_dict = {"Source": col_map_source_table, - "Sink": col_map_target_table} - - if table_process.guid in dataset_mapping: - if hash_key in dataset_mapping[table_process.guid]: - # Hash Key has not been seen before - dataset_mapping[table_process.guid][hash_key]["ColumnMapping"].append( - col_map_dict) - else: - # Hash key has been seen before - dataset_mapping[table_process.guid][hash_key] = { - "ColumnMapping": [col_map_dict], - "DatasetMapping": data_map_dict - } - else: - # This guid has never been seen before - dataset_mapping[table_process.guid] = { - hash_key: { - "ColumnMapping": [col_map_dict], - "DatasetMapping": data_map_dict - } - } - # Update the passed in atlas_entities if we are using column mapping - if use_column_mapping: - for entity in atlas_entities: - if entity.guid in dataset_mapping: - # hash_key: {DSMap:{}, ColumnMapping} - column_mapping_attribute = [ - mappings for mappings in dataset_mapping[entity.guid].values()] - entity.attributes.update( - {"columnMapping": json.dumps(column_mapping_attribute)} - ) - - return output diff --git a/pyapacheatlas/readers/core/entitydef.py b/pyapacheatlas/readers/core/entitydef.py deleted file mode 100644 index 3e73ed2..0000000 --- a/pyapacheatlas/readers/core/entitydef.py +++ /dev/null @@ -1,62 +0,0 @@ -from warnings import warn -from ...core.typedef import AtlasAttributeDef, EntityTypeDef - - -def to_entityDefs(json_rows): - """ - Create an AtlasTypeDef consisting of entityDefs for the given json_rows. - - :param list(dict(str,str)) json_rows: - A list of dicts containing at least `Entity TypeName` and `name` - that represents the metadata for a given entity type's attributeDefs. - Extra metadata will be ignored. - :return: An AtlasTypeDef with entityDefs for the provided rows. - :rtype: dict(str, list(dict)) - """ - entities = dict() - attribute_metadata_seen = set() - output = {"entityDefs": []} - # Required attributes - # Get all the attributes it's expecting official camel casing - # with the exception of "Entity TypeName" - for row in json_rows: - try: - entityTypeName = row["Entity TypeName"] - except KeyError: - raise KeyError("Entity TypeName not foud in {}".format(row)) - - _ = row.pop("Entity TypeName") - # Update all seen attribute metadata - columns_in_row = list(row.keys()) - attribute_metadata_seen = attribute_metadata_seen.union( - set(columns_in_row)) - # Remove any null cells, otherwise the AttributeDefs constructor - # doesn't use the defaults. - for column in columns_in_row: - if row[column] is None: - _ = row.pop(column) - - json_entity_def = AtlasAttributeDef(**row).to_json() - - if entityTypeName not in entities: - entities[entityTypeName] = [] - - entities[entityTypeName].append(json_entity_def) - - # Create the entitydefs - for entityType in entities: - local_entity_def = EntityTypeDef( - name=entityType, - attributeDefs=entities[entityType] - ).to_json() - output["entityDefs"].append(local_entity_def) - - # Extra attribute metadata (e.g. 
extra columns / json entries) are ignored. - # Warn the user that this metadata will be ignored. - extra_metadata_warnings = [ - i for i in attribute_metadata_seen if i not in AtlasAttributeDef.propertiesEnum] - for extra_metadata in extra_metadata_warnings: - warn("The attribute metadata \"{}\" is not a part of the Atlas Attribute Def and will be ignored.".format( - extra_metadata)) - - return output diff --git a/pyapacheatlas/readers/core/table.py b/pyapacheatlas/readers/core/table.py deleted file mode 100644 index dc6aecd..0000000 --- a/pyapacheatlas/readers/core/table.py +++ /dev/null @@ -1,112 +0,0 @@ -from ...core import AtlasEntity, AtlasProcess -from ..util import * - - -def to_table_entities(json_rows, excel_config, guid_tracker): - """ - Converts the "tables" information into Atlas Entities for Target, Source, - and Process types. Currently only support one target from one source. - - :param json_rows: - A list of dicts that contain the converted tables of your column spreadsheet. - :type json_rows: list(dict(str,str)) - :param ~pyapacheatlas.readers.excel.ExcelConfiguration excel_config: - An excel configuration object to indicate any customizations to the template excel. - :param ~pyapacheatlas.core.util.GuidTracker guid_tracker: - A guid tracker to be used in incrementing / decrementing the guids in use. - :return: - A list of atlas entities that represent your source, target, - and table processes. - :rtype: list(:class:`~pyapacheatlas.core.entity.AtlasEntity`) - - """ - # Required attributes - # NOTE: Classification is not actually required but it's being included to avoid being roped in as an attribute - source_table_name_header = excel_config.entity_source_prefix + " Table" - source_table_type_column = excel_config.entity_source_prefix + " Type" - source_table_classifications_header = excel_config.entity_source_prefix + " Classifications" - required_source_headers = [source_table_name_header, - source_table_type_column, source_table_classifications_header] - - target_table_name_header = excel_config.entity_target_prefix + " Table" - target_table_type_column = excel_config.entity_target_prefix + " Type" - target_table_classifications_header = excel_config.entity_target_prefix + " Classifications" - required_target_headers = [target_table_name_header, - target_table_type_column, target_table_classifications_header] - - process_name_column = excel_config.entity_process_prefix + " Name" - process_type_column = excel_config.entity_process_prefix + " Type" - required_process_headers = [process_name_column, process_type_column] - - # Read in all Source and Target entities - output = list() # TODO: Change to a dict to facilitate lookups - for row in json_rows: - # Set up defaults - target_entity, source_entity, process_entity = None, None, None - # Always expecting a TARGET in the sheet - target_entity = AtlasEntity( - name=row[target_table_name_header], - typeName=row[target_table_type_column], - # qualifiedName can be overwritten via the attributes functionality - qualified_name=row[target_table_name_header], - guid=guid_tracker.get_guid(), - attributes=columns_matching_pattern( - row, excel_config.entity_target_prefix, does_not_match=required_target_headers), - classifications=string_to_classification( - row.get(target_table_classifications_header)) - ) - # TODO: Look up if this is in the output append if not; update attributes and classifications if it is present. 
- if target_entity in output: - # Assumes things like name, type name, are consistent - poppable_index = output.index(target_entity) - popped_target = output.pop(poppable_index) - target_entity.merge(popped_target) - - output.append(target_entity) - - if row[source_table_name_header] is not None: - # There is a source table - source_entity = AtlasEntity( - name=row[source_table_name_header], - typeName=row[source_table_type_column], - # qualifiedName can be overwritten via the attributes functionality - qualified_name=row[source_table_name_header], - guid=guid_tracker.get_guid(), - attributes=columns_matching_pattern( - row, excel_config.entity_source_prefix, does_not_match=required_source_headers), - classifications=string_to_classification( - row.get(source_table_classifications_header)) - ) - if source_entity in output: - # Assumes things like name, type name, are consistent - poppable_index = output.index(source_entity) - popped_source = output.pop(poppable_index) - source_entity.merge(popped_source) - - output.append(source_entity) - - # Map the source and target tables to a process - if row[process_name_column] is not None: - # There is a process - process_entity = AtlasProcess( - name=row[process_name_column], - typeName=row[process_type_column], - qualified_name=row[process_name_column], - guid=guid_tracker.get_guid(), - inputs=[] if source_entity is None else [ - source_entity.to_json(minimum=True)], - outputs=[target_entity.to_json(minimum=True)], - attributes=columns_matching_pattern( - row, excel_config.entity_process_prefix, does_not_match=required_process_headers) - ) - # TODO: Lookup if it exists already and if it does, update the inputs and outputs and attributes - if process_entity in output: - # Assumes things like name, type name, are consistent - poppable_index = output.index(process_entity) - popped_process = output.pop(poppable_index) - process_entity.merge(popped_process) - - output.append(process_entity) - - # Return all entities - return output diff --git a/pyapacheatlas/readers/excel.py b/pyapacheatlas/readers/excel.py index 5118e98..3bb0fb9 100644 --- a/pyapacheatlas/readers/excel.py +++ b/pyapacheatlas/readers/excel.py @@ -1,13 +1,12 @@ +from string import ascii_uppercase + +from openpyxl import Workbook from openpyxl import load_workbook -from ..core.util import GuidTracker -from .core import to_column_entities -from .core import to_table_entities -from .core import to_entityDefs -from .core import to_bulkEntities +from .reader import Reader, ReaderConfiguration -class ExcelConfiguration(): +class ExcelConfiguration(ReaderConfiguration): """ A configuration utility to understand how your Excel file is structured. @@ -45,198 +44,277 @@ def __init__(self, column_sheet="ColumnsLineage", represents the transformation for a specific column. 
""" - super().__init__() + super().__init__(**kwargs) # Required attributes: # qualifiedName, column, transformation, table self.column_sheet = column_sheet self.table_sheet = table_sheet self.entityDef_sheet = entityDef_sheet self.bulkEntity_sheet = bulkEntity_sheet - self.entity_source_prefix = kwargs.get( - "entity_source_prefix", "Source") - self.entity_target_prefix = kwargs.get( - "entity_target_prefix", "Target") - self.entity_process_prefix = kwargs.get( - "entity_process_prefix", "Process") - self.column_transformation_name = kwargs.get( - "column_transformation_name", "Transformation") -def from_excel(filepath, excel_config, atlas_typedefs, use_column_mapping=False): +class ExcelReader(Reader): """ - Wrapper for excel_columnLineage function. To be later used as a wrapper - for the entire excel_* family. - - :param str filepath: The xlsx file that contains your table and columns. - :param excel_config: - An excel configuration object that is customized to - your intended spreadsheet. - :type: :class:`~pyapacheatlas.readers.excel.ExcelConfiguration` - :param dict(str,list(dict)) atlas_typedefs: - The results of requesting all type defs from Apache Atlas, including - entityDefs, relationshipDefs, etc. relationshipDefs are the only - values used. - :param bool use_column_mapping: - Should the table processes include the columnMappings attribute - that represents Column Lineage in Azure Data Catalog. - Defaults to False. - :return: A list of Atlas Entities representing the spreadsheet's inputs as their json dicts. - :rtype: list(dict) + Read in Excel files that follow the excel template tab structure. + Expects an :class:`~pyapacheatlas.readers.excel.ExcelConfiguration` object + to determine the naming conventions of tabs and headers. """ + @staticmethod + def _parse_spreadsheet(worksheet): + """ + Standardizes the excel worksheet into a json format. - output = excel_columnLineage( - filepath, excel_config, atlas_typedefs, use_column_mapping) + :param openpyxl.workbook.Workbook worksheet: + A worksheet class from openpyxl. + :return: The standardized version of the excel spreadsheet in json form. + :rtype: list(dict(str,str)) + """ + # Standardize the column header + # TODO: Allow for skip lines + column_headers = list( + zip( + range(0, len(worksheet[1])), + [str(c.value).strip() for c in worksheet[1]] + ) + ) - return output + output = [] + for row in worksheet.iter_rows(min_row=2, max_row=worksheet.max_row, + min_col=1, max_col=worksheet.max_column): + output.append( + {k: row[idx].value for idx, k in column_headers}) + return output -def excel_columnLineage(filepath, excel_config, atlas_typedefs, use_column_mapping=False): - """ - Read a given excel file that conforms to the excel atlas template and - parse the tables, processes, and columns into table and column lineages. - Requires that the relationship attributes are already defined in the - provided atlas type defs. - - Infers column type from the target table type and an assumed "columns" - relationship attribute on the table type. - - Infers the column lineage process based on the provided table process - (provided in the template's table excel sheet). Looks for the first - relationship type def with an endDef2 of `columnLineages`. - - :param str filepath: The xlsx file that contains your table and columns. - :param excel_config: - An excel configuration object that is customized to - your intended spreadsheet. 
- :type: :class:`~pyapacheatlas.readers.excel.ExcelConfiguration` - :param dict(str,list(dict)) atlas_typedefs: - The results of requesting all type defs from Apache Atlas, including - entityDefs, relationshipDefs, etc. relationshipDefs are the only - values used. - :param bool use_column_mapping: - Should the table processes include the columnMappings attribute - that represents Column Lineage in Azure Data Catalog. - Defaults to False. - :return: A list of Atlas Entities representing the spreadsheet's inputs as their json dicts. - :rtype: list(dict) - """ + def parse_bulk_entities(self, filepath): + """ + Generate a set of entities from an excel template file. - wb = load_workbook(filepath) + :param str filepath: The xlsx file that contains your table and columns. + :return: An AtlasTypeDef with entityDefs for the provided rows. + :rtype: dict(str, list(dict)) + """ + wb = load_workbook(filepath) + # A user may omit the entityDef_sheet by providing the config with None + if self.config.bulkEntity_sheet and self.config.bulkEntity_sheet not in wb.sheetnames: + raise KeyError("The sheet {} was not found".format( + self.config.entityDef_sheet)) - guid_tracker = GuidTracker(-5000) + output = dict() - entities = [] + if self.config.bulkEntity_sheet: + bulkEntity_sheet = wb[self.config.bulkEntity_sheet] + json_bulkEntities = ExcelReader._parse_spreadsheet( + bulkEntity_sheet) + bulkEntities_generated = super().parse_bulk_entities(json_bulkEntities) + output.update(bulkEntities_generated) - if excel_config.table_sheet not in wb.sheetnames: - raise KeyError("The sheet {} was not found".format( - excel_config.table_sheet)) - if excel_config.column_sheet not in wb.sheetnames: - raise KeyError("The sheet {} was not found".format( - excel_config.column_sheet)) + wb.close() - # Getting table entities - table_sheet = wb[excel_config.table_sheet] - json_sheet = _parse_spreadsheet(table_sheet) - entities.extend(to_table_entities(json_sheet, excel_config, guid_tracker)) + return output - # Getting column entities - column_sheet = wb[excel_config.column_sheet] - json_columns = _parse_spreadsheet(column_sheet) + def parse_entity_defs(self, filepath): + """ + Read a given excel file that conforms to the excel atlas template and + parse the type def tab(s) into a set of entity defs that can be uploaded. - _temp_columns = to_column_entities( - json_columns, excel_config, guid_tracker, entities, atlas_typedefs, use_column_mapping=use_column_mapping) - entities.extend(_temp_columns) + Currently, only entityDefs are supported. - output = [e.to_json() for e in entities] + :param str filepath: The xlsx file that contains your table and columns. + :return: An AtlasTypeDef with entityDefs for the provided rows. 
+ :rtype: dict(str, list(dict)) + """ + wb = load_workbook(filepath) + # A user may omit the entityDef_sheet by providing the config with None + if self.config.entityDef_sheet and self.config.entityDef_sheet not in wb.sheetnames: + raise KeyError("The sheet {} was not found".format( + self.config.entityDef_sheet)) + + output = dict() + + # Getting entityDefinitions if the user provided a name of the sheet + if self.config.entityDef_sheet: + entityDef_sheet = wb[self.config.entityDef_sheet] + json_entitydefs = ExcelReader._parse_spreadsheet(entityDef_sheet) + entityDefs_generated = super().parse_entity_defs(json_entitydefs) + output.update(entityDefs_generated) + + wb.close() + # TODO: Add in classificationDefs and relationshipDefs + return output + + def parse_column_lineage(self, filepath, atlas_entities, atlas_typedefs, use_column_mapping=False): + """ + Read a given excel file that conforms to the excel atlas template and + parse the columns into column lineages. + Requires that the relationship attributes are already defined in the + provided atlas type defs. + + Infers column type from the target table type and an assumed "columns" + relationship attribute on the table type. + + Infers the column lineage process based on the provided table process + (provided in the template's table excel sheet). Looks for the first + relationship type def with an endDef2 of `columnLineages`. + + :param str filepath: The xlsx file that contains your table and columns. + :param list() atlas_entities: + A list of AtlasEntity objects representing + :param dict(str,list(dict)) atlas_typedefs: + The results of requesting all type defs from Apache Atlas, including + entityDefs, relationshipDefs, etc. relationshipDefs are the only + values used. + :param bool use_column_mapping: + Should the table processes include the columnMappings attribute + that represents Column Lineage in Azure Data Catalog. + Defaults to False. + :return: A list of Atlas Entities representing the spreadsheet's inputs as their json dicts. + :rtype: list(:class:`~pyapacheatlas.core.entity.AtlasEntity`) + """ - return output + wb = load_workbook(filepath) + if self.config.column_sheet not in wb.sheetnames: + raise KeyError("The sheet {} was not found".format( + self.config.column_sheet)) -def excel_typeDefs(filepath, excel_config): - """ - Read a given excel file that conforms to the excel atlas template and - parse the type def tab(s) into a set of entity defs that can be uploaded. - - Currently, only entityDefs are supported. - - :param str filepath: The xlsx file that contains your table and columns. - :param excel_config: - An excel configuration object that is customized to - your intended spreadsheet. - :type: :class:`~pyapacheatlas.readers.excel.ExcelConfiguration` - :return: An AtlasTypeDef with entityDefs for the provided rows. 
- :rtype: dict(str, list(dict)) - """ - wb = load_workbook(filepath) - # A user may omit the entityDef_sheet by providing the config with None - if excel_config.entityDef_sheet and excel_config.entityDef_sheet not in wb.sheetnames: - raise KeyError("The sheet {} was not found".format( - excel_config.entityDef_sheet)) + # Getting column entities + column_sheet = wb[self.config.column_sheet] + json_columns = ExcelReader._parse_spreadsheet(column_sheet) - output = dict() + entities = super().parse_column_lineage( + json_columns, + atlas_entities, + atlas_typedefs, + use_column_mapping=use_column_mapping + ) - # Getting entityDefinitions if the user provided a name of the sheet - if excel_config.entityDef_sheet: - entityDef_sheet = wb[excel_config.entityDef_sheet] - json_entitydefs = _parse_spreadsheet(entityDef_sheet) - entityDefs_generated = to_entityDefs(json_entitydefs) - output.update(entityDefs_generated) + wb.close() - # TODO: Add in classificationDefs and relationshipDefs - return output + return entities + def parse_table_lineage(self, filepath): + """ + Read a given excel file that conforms to the excel atlas template and + parse the tables and processes table lineages. + Requires that the relationship attributes are already defined in the + provided atlas type defs. -def excel_bulkEntities(filepath, excel_config): - """ - Generate a set of entities from an excel template file. - - :param str filepath: The xlsx file that contains your table and columns. - :param excel_config: - An excel configuration object that is customized to - your intended spreadsheet. - :type: :class:`~pyapacheatlas.readers.excel.ExcelConfiguration` - :return: An AtlasTypeDef with entityDefs for the provided rows. - :rtype: dict(str, list(dict)) - """ - wb = load_workbook(filepath) - # A user may omit the entityDef_sheet by providing the config with None - if excel_config.bulkEntity_sheet and excel_config.bulkEntity_sheet not in wb.sheetnames: - raise KeyError("The sheet {} was not found".format( - excel_config.entityDef_sheet)) + :param str filepath: The xlsx file that contains your table and columns. - output = dict() + :return: A list of Atlas Entities representing the spreadsheet's inputs as their json dicts. + :rtype: list(:class:`~pyapacheatlas.core.entity.AtlasEntity`) + """ - if excel_config.bulkEntity_sheet: - bulkEntity_sheet = wb[excel_config.bulkEntity_sheet] - json_bulkEntities = _parse_spreadsheet(bulkEntity_sheet) - bulkEntities_generated = to_bulkEntities(json_bulkEntities) - output.update(bulkEntities_generated) + wb = load_workbook(filepath) - return output + entities = [] + if self.config.table_sheet not in wb.sheetnames: + raise KeyError("The sheet {} was not found".format( + self.config.table_sheet)) -def _parse_spreadsheet(worksheet): - """ - Standardizes the excel worksheet into a json format. + # Getting table entities + table_sheet = wb[self.config.table_sheet] + json_sheet = ExcelReader._parse_spreadsheet(table_sheet) + entities = super().parse_table_lineage(json_sheet) - :param openpyxl.workbook.Workbook worksheet: - A worksheet class from openpyxl. - :return: The standardized version of the excel spreadsheet in json form. 
- :rtype: list(dict(str,str)) - """ + wb.close() + + return entities - # Standardize the column header - # TODO: Allow for skip lines - column_headers = list( - zip( - range(0, len(worksheet[1])), - [str(c.value).strip() for c in worksheet[1]] + def parse_lineages(self, filepath, atlas_typedefs, use_column_mapping=False): + """ + Read a given excel file that conforms to the excel atlas template and + parse the tables, processes, and columns into table and column lineages. + Requires that the relationship attributes are already defined in the + provided atlas type defs. + + Infers column type from the target table type and an assumed "columns" + relationship attribute on the table type. + + Infers the column lineage process based on the provided table process + (provided in the template's table excel sheet). Looks for the first + relationship type def with an endDef2 of `columnLineages`. + + :param str filepath: The xlsx file that contains your table and columns. + :param dict(str,list(dict)) atlas_typedefs: + The results of requesting all type defs from Apache Atlas, including + entityDefs, relationshipDefs, etc. relationshipDefs are the only + values used. + :param bool use_column_mapping: + Should the table processes include the columnMappings attribute + that represents Column Lineage in Azure Data Catalog. + Defaults to False. + :return: A list of Atlas Entities representing the spreadsheet's inputs as their json dicts. + :rtype: list(dict) + """ + entities = [] + + table_entities = self.parse_table_lineage(filepath) + entities.extend(table_entities) + + # Modifies table_entities if use_column_mapping is True + column_entities = self.parse_column_lineage( + filepath, + table_entities, + atlas_typedefs, + use_column_mapping ) - ) + entities.extend(column_entities) + + output = [e.to_json() for e in entities] + + return output - output = [] - for row in worksheet.iter_rows(min_row=2, max_row=worksheet.max_row, min_col=1, max_col=worksheet.max_column): - output.append( - {k: row[idx].value for idx, k in column_headers}) + @staticmethod + def _update_sheet_headers(headers, worksheet): + """ + For the given worksheet, make the first row equal to the list passed + in as the headers. + + :param list headers: A list of column headers to use for this sheet. + :param worksheet: + The worksheet you are updating. + :type worksheet: + :class:`~openpyxl.worksheet.worksheet.Worksheet` + """ + for idx, val in enumerate(headers): + # TODO: Not the best way once we get past 26 columns in the template + active_column = ascii_uppercase[idx] + active_value = headers[idx] + active_cell = "{}1".format(active_column) + worksheet[active_cell] = active_value + worksheet.column_dimensions[active_column].width = len( + active_value) + + @staticmethod + def make_template(filepath): + """ + Generate an Excel template file and write it out to the given filepath. + + :param str filepath: The file path to store an XLSX file with the + template Tables and Columns sheets. 
+ """ + wb = Workbook() + columnsSheet = wb.active + columnsSheet.title = "ColumnsLineage" + tablesSheet = wb.create_sheet("TablesLineage") + entityDefsSheet = wb.create_sheet("EntityDefs") + bulkEntitiesSheet = wb.create_sheet("BulkEntities") + + ExcelReader._update_sheet_headers( + Reader.TEMPLATE_HEADERS["ColumnsLineage"], columnsSheet + ) + ExcelReader._update_sheet_headers( + Reader.TEMPLATE_HEADERS["TablesLineage"], tablesSheet + ) + ExcelReader._update_sheet_headers( + Reader.TEMPLATE_HEADERS["EntityDefs"], entityDefsSheet + ) + ExcelReader._update_sheet_headers( + Reader.TEMPLATE_HEADERS["BulkEntities"], bulkEntitiesSheet + ) - return output + wb.save(filepath) + wb.close() diff --git a/pyapacheatlas/readers/lineagemixin.py b/pyapacheatlas/readers/lineagemixin.py new file mode 100644 index 0000000..8980634 --- /dev/null +++ b/pyapacheatlas/readers/lineagemixin.py @@ -0,0 +1,380 @@ +import json + +from ..core import AtlasEntity, AtlasProcess +from .util import * + +class LineageMixIn(): + """ + A MixIn to support the :class:`~pyapacheatlas.readers.reader.Reader` with + the table and column level lineages. + """ + def _update_entity_and_array(self, entity, mutableOutput): + """ + Take in an AtlasEntity and list of AtlasEntities. If the entity + exists in the mutableOutput, remove the entity from the list and + merge the incoming entity with the popped entity. Append that + merged entity onto the mutableOutput. + + :param entity: + The entity to look up in the mutableOutput. + :param mutableOutput: + A list of Atlas Entities to search through and update if + `entity` is found. + :type entity: + :class:`~pyapacheatlas.core.entity.AtlasEntity` + :type mutableOutput: + list(:class:`~pyapacheatlas.core.entity.AtlasEntity`) + """ + if entity in mutableOutput: + # Assumes things like name, type name, are consistent + poppable_index = mutableOutput.index(entity) + popped_target = mutableOutput.pop(poppable_index) + # Update the newly created entity with the previous versions + # attributes + entity.merge(popped_target) + mutableOutput.append(entity) + + def _add_table_lineage_entity(self, row, header, attributes, mutableOutput): + """ + Create an AtlasEntity object and add it to the mutableOutput with the + desired attributes. + """ + entity = AtlasEntity( + name=row[header["Table"]], + typeName=row[header["Type"]], + # qualifiedName can be overwritten via the attributes + # functionality + qualified_name=row[header["Table"]], + guid=self.guidTracker.get_guid(), + attributes=attributes, + classifications=string_to_classification( + row.get(header["Classifications"]), + self.config.value_separator + ) + ) + + self._update_entity_and_array(entity, mutableOutput) + + return entity + + def parse_table_lineage(self, json_rows): + """ + Converts the "tables" information into Atlas Entities for Target, + Source, and Process types. Currently only support one target from + one source. + + :param json_rows: + A list of dicts that contain the converted tables of your column + spreadsheet. + :type json_rows: list(dict(str,str)) + :return: + A list of atlas entities that represent your source, target, + and table processes. 
+ :rtype: list(:class:`~pyapacheatlas.core.entity.AtlasEntity`) + + """ + # Required attributes + # NOTE: Classification is not actually required but it's being + # included to avoid being roped in as an attribute + _required_headers = ["Table", "Type", "Classifications"] + source_header = {k: "{} {}".format( + self.config.source_prefix, k) for k in _required_headers} + target_header = {k: "{} {}".format( + self.config.target_prefix, k) for k in _required_headers} + process_header = {k: "{} {}".format( + self.config.process_prefix, k) for k in ["Name", "Type"]} + + # Read in all Source and Target entities + output = list() # TODO: Change to a dict to facilitate lookups + for row in json_rows: + # Set up defaults + target_entity, source_entity, process_entity = None, None, None + + # Always expecting a TARGET in the sheet + target_attributes = columns_matching_pattern( + row, self.config.target_prefix, + does_not_match=list(target_header.values()) + ) + target_entity = self._add_table_lineage_entity( + row, target_header, target_attributes, output + ) + + if row[source_header["Table"]] is not None: + # There is a source table + source_attributes = columns_matching_pattern( + row, self.config.source_prefix, + does_not_match=list(source_header.values()) + ) + source_entity = self._add_table_lineage_entity( + row, source_header, source_attributes, output + ) + + # Map the source and target tables to a process + if row[process_header["Name"]] is not None: + # There is a process + inputs_to_process = [] if source_entity is None else [ + source_entity.to_json(minimum=True)] + process_attributes = columns_matching_pattern( + row, self.config.process_prefix, + does_not_match=list(process_header.values()) + ) + + process_entity = AtlasProcess( + name=row[process_header["Name"]], + typeName=row[process_header["Type"]], + qualified_name=row[process_header["Name"]], + guid=self.guidTracker.get_guid(), + inputs=inputs_to_process, + outputs=[target_entity.to_json(minimum=True)], + attributes=process_attributes + ) + + self._update_entity_and_array(process_entity, output) + + # Return all entities + return output + + def parse_column_lineage(self, json_rows, atlas_entities, atlas_typedefs, use_column_mapping=False): + """ + :param json_rows: + A list of dicts that contain the converted rows of your column + spreadsheet. + :type json_rows: list(dict(str,str)) + :param atlas_entities: + A list of :class:`~pyapacheatlas.core.entity.AtlasEntity` + containing the referred entities. + :type atlas_entities: + list(:class:`~pyapacheatlas.core.entity.AtlasEntity`) + :param dict(str,list(dict)) atlas_typedefs: + The results of requesting all type defs from Apache Atlas, + including entityDefs, relationshipDefs, etc. relationshipDefs + are the only values used. + :param bool use_column_mapping: + Should the table processes include the columnMappings attribute + that represents Column Lineage in Azure Data Catalog. + Defaults to False. + :return: + A list of atlas entities that represent your column source, target, + and column lineage processes. 
+ :rtype: list(:class:`~pyapacheatlas.core.entity.AtlasEntity`) + + """ + # Required attributes + # NOTE: Classification is not actually required but it's being + # included to avoid being roped in as an attribute + _required_headers = ["Table", "Column", "Classifications"] + source_header = {k: "{} {}".format( + self.config.source_prefix, k) for k in _required_headers} + target_header = {k: "{} {}".format( + self.config.target_prefix, k) for k in _required_headers} + process_header = {k: "{} {}".format( + self.config.target_prefix, k) for k in ["Name", "Type"]} + + transformation_column_header = self.config.column_transformation_name + # No required process headers + + output = [] + # `tables` Stores all of the table entities by their name seen in the + # loop for faster lookup + tables = {} + # table_process_guid: + # {"input_table":"","output_table":"","columnMapping":[]} + dataset_mapping = {} + # Caches all of the table processes seen in the loop for faster lookup + table_and_proc_mappings = {} + + for row in json_rows: + # Set up defaults + target_entity, source_entity, process_entity = None, None, None + target_entity_table_name, source_entity_table_name = None, None + # Given the existing table entity in atlas_entities, + # look up the appropriate column type + target_entity_table_name = row[target_header["Table"]] + if target_entity_table_name not in tables: + target_table_entity = first_entity_matching_attribute( + "name", target_entity_table_name, atlas_entities) + tables[target_entity_table_name] = target_table_entity + + columns_relationship = first_relationship_that_matches( + end_def="endDef1", + end_def_type=target_table_entity.typeName, + end_def_name="columns", + relationship_typedefs=atlas_typedefs["relationshipDefs"] + ) + target_col_type = columns_relationship["endDef2"]["type"] + + # There should always be a target + target_entity = AtlasEntity( + name=row[target_header["Column"]], + typeName=target_col_type, + # qualifiedName can be overwritten via the attributes + # functionality + qualified_name=target_entity_table_name + \ + "#" + row[target_header["Column"]], + guid=self.guidTracker.get_guid(), + attributes=columns_matching_pattern( + row, self.config.target_prefix, + does_not_match=list(target_header.values()) + ), + # TODO: Make the relationship name dynamic instead of only table + relationshipAttributes={ + "table": tables[target_entity_table_name].to_json(minimum=True)}, + classifications=string_to_classification( + row.get(target_header["Classifications"])) + ) + if target_entity in output: + poppable_index = output.index(target_entity) + popped_target = output.pop(poppable_index) + target_entity.merge(popped_target) + # Add to outputs + output.append(target_entity) + + # Source Column is optiona in the spreadsheet + if row[source_header["Table"]] is not None: + # Given the existing source table entity in atlas_entities, + # look up the appropriate column type + source_entity_table_name = row[source_header["Table"]] + + if source_entity_table_name not in tables: + source_table_entity = first_entity_matching_attribute( + "name", source_entity_table_name, atlas_entities) + tables[source_entity_table_name] = source_table_entity + + columns_relationship_source = first_relationship_that_matches( + end_def="endDef1", + end_def_type=source_table_entity.typeName, + end_def_name="columns", + relationship_typedefs=atlas_typedefs["relationshipDefs"] + ) + source_col_type = columns_relationship_source["endDef2"]["type"] + + source_entity = AtlasEntity( + 
name=row[source_header["Column"]], + typeName=source_col_type, + # qualifiedName can be overwritten via the attributes functionality + qualified_name=source_entity_table_name + \ + "#" + row[source_header["Column"]], + guid=self.guidTracker.get_guid(), + attributes=columns_matching_pattern( + row, self.config.source_prefix, does_not_match=list(source_header.values())), + # TODO: Make the relationship name more dynamic instead of hard coding query + relationshipAttributes={ + "table": tables[source_entity_table_name].to_json(minimum=True)}, + classifications=string_to_classification( + row.get(source_header["Classifications"])) + ) + if source_entity in output: + poppable_index = output.index(source_entity) + popped_source = output.pop(poppable_index) + source_entity.merge(popped_source) + # Add to outputs + output.append(source_entity) + + # Given the existing process that with target table and source + # table types, look up the appropriate column_lineage type + # LIMITATION: Prevents you from specifying multiple processes + # for the same input and output tables + try: + table_process = first_process_containing_io( + source_entity_table_name, target_entity_table_name, atlas_entities) + except ValueError as e: + # ValueError means we didn't find anything that matched + # Try using a wildcard search if the source entity is none + # to match to any process that at least includes the target + # entity table + if source_entity_table_name is None: + table_process = first_process_containing_io( + "*", target_entity_table_name, atlas_entities) + else: + raise e + + if table_process.get_name() in table_and_proc_mappings: + process_type = table_and_proc_mappings[table_process.get_name( + )]["column_lineage_type"] + else: + process_type = from_process_lookup_col_lineage( + table_process.get_name(), + atlas_entities, + atlas_typedefs["relationshipDefs"] + ) + table_and_proc_mappings[table_process.get_name()] = { + "column_lineage_type": process_type + } + + # Assuming there is always a Process for adding at least the + # target table + process_attributes = columns_matching_pattern( + row, self.config.process_prefix) + process_attributes.update({"dependencyType": "SIMPLE"}) + if row[transformation_column_header] is not None: + process_attributes.update( + {"dependencyType": "EXPRESSION", + "expression": row[transformation_column_header] + }) + + process_entity = AtlasProcess( + name=table_process.get_name(), + typeName=process_type, + # qualifiedName can be overwritten via the attributes functionality + qualified_name=table_process.get_name() + "@derived_column:{}".format( + target_entity.get_name() + ), + guid=self.guidTracker.get_guid(), + # Assuming always a single output + inputs=[] if source_entity is None else [ + source_entity.to_json(minimum=True)], + outputs=[target_entity.to_json(minimum=True)], + attributes=process_attributes, + # TODO: Make the relationship name more dynamic instead of hard coding query + relationshipAttributes={ + "query": table_process.to_json(minimum=True)} + ) + if process_entity in output: + poppable_index = output.index(process_entity) + popped_process = output.pop(poppable_index) + process_entity.merge(popped_process) + output.append(process_entity) + + if use_column_mapping: + # Handles multiple source columns from multiple source datasets + col_map_source_col = "*" if source_entity is None else source_entity.get_name() + col_map_target_col = target_entity.get_name() + col_map_source_table = row[source_header["Table"]] or "*" + col_map_target_table = 
row[target_header["Table"]] + hash_key = hash(col_map_source_table + col_map_target_table) + col_map_dict = {"Source": col_map_source_col, + "Sink": col_map_target_col} + data_map_dict = {"Source": col_map_source_table, + "Sink": col_map_target_table} + + if table_process.guid in dataset_mapping: + if hash_key in dataset_mapping[table_process.guid]: + # Hash Key has not been seen before + dataset_mapping[table_process.guid][hash_key]["ColumnMapping"].append( + col_map_dict) + else: + # Hash key has been seen before + dataset_mapping[table_process.guid][hash_key] = { + "ColumnMapping": [col_map_dict], + "DatasetMapping": data_map_dict + } + else: + # This guid has never been seen before + dataset_mapping[table_process.guid] = { + hash_key: { + "ColumnMapping": [col_map_dict], + "DatasetMapping": data_map_dict + } + } + # Update the passed in atlas_entities if we are using column mapping + if use_column_mapping: + for entity in atlas_entities: + if entity.guid in dataset_mapping: + # hash_key: {DSMap:{}, ColumnMapping} + column_mapping_attribute = [ + mappings for mappings in dataset_mapping[entity.guid].values()] + entity.attributes.update( + {"columnMapping": json.dumps(column_mapping_attribute)} + ) + + return output diff --git a/pyapacheatlas/readers/reader.py b/pyapacheatlas/readers/reader.py new file mode 100644 index 0000000..29fa9ef --- /dev/null +++ b/pyapacheatlas/readers/reader.py @@ -0,0 +1,182 @@ +from warnings import warn + +from ..core.util import GuidTracker +from ..core import ( + AtlasAttributeDef, + AtlasEntity, + AtlasProcess, + EntityTypeDef +) + +from .lineagemixin import LineageMixIn +from .util import * + +class ReaderConfiguration(): + """ + A base configuration for the Reader class. Allows you to customize + headers with a source_prefix, target_prefix, and process_prefix for + parsing table and column lineages. + """ + + def __init__(self, **kwargs): + super().__init__() + self.value_separator = kwargs.get('value_separator', ';') + self.source_prefix = kwargs.get( + "source_prefix", "Source") + self.target_prefix = kwargs.get( + "target_prefix", "Target") + self.process_prefix = kwargs.get( + "process_prefix", "Process") + self.column_transformation_name = kwargs.get( + "column_transformation_name", "Transformation") + + +class Reader(LineageMixIn): + """ + The base Reader with functionality that supports python dicts. + """ + TEMPLATE_HEADERS = { + "ColumnsLineage": [ + "Target Table", "Target Column", "Target Classifications", + "Source Table", "Source Column", "Source Classifications", + "Transformation" + ], + "TablesLineage": [ + "Target Table", "Target Type", "Target Classifications", + "Source Table", "Source Type", "Source Classifications", + "Process Name", "Process Type" + ], + "EntityDefs": [ + "Entity TypeName", "name", "description", + "isOptional", "isUnique", "defaultValue", + "typeName", "displayName", "valuesMinCount", + "valuesMaxCount", "cardinality", "includeInNotification", + "indexType", "isIndexable" + ], + "BulkEntities": [ + "typeName", "name", "qualifiedName", "classifications" + ] + } + + def __init__(self, configuration, guid=-1000): + """ + Creates the base Reader with functionality that supports python dicts. + + :param configuration: + A list of dicts containing at least `Entity TypeName` and `name` + :type configuration: + :class:`~pyapacheatlas.readers.reader.ReaderConfiguration` + :param int guid: + A negative integer to use as the starting counter for entities + created by this reader. 
+        """
+        super().__init__()
+        self.config = configuration
+        self.guidTracker = GuidTracker(guid)
+
+    def parse_bulk_entities(self, json_rows):
+        """
+        Create an AtlasEntity for each row in json_rows and wrap them in a
+        dict that is ready to be uploaded as a batch of entities.
+
+        :param list(dict(str,str)) json_rows:
+            A list of dicts containing at least `typeName`, `name`, and
+            `qualifiedName` that represents the entity to be created.
+            Extra columns are carried over as entity attributes.
+        :return: A dict with an `entities` key containing the parsed entities.
+        :rtype: dict(str, list(dict))
+        """
+        # For each row, extract the required fields (typeName, name,
+        # qualifiedName, classifications) and treat any remaining,
+        # non-null columns as additional attributes.
+        req_attribs = ["typeName", "name", "qualifiedName", "classifications"]
+        output = {"entities": []}
+        for row in json_rows:
+            # Remove any cell with a None / Null attribute
+            # Remove the required attributes so they're not double dipping.
+            custom_attributes = {
+                k: v for k, v in row.items()
+                if k not in req_attribs and v is not None
+            }
+            entity = AtlasEntity(
+                name=row["name"],
+                typeName=row["typeName"],
+                qualified_name=row["qualifiedName"],
+                guid=self.guidTracker.get_guid(),
+                attributes=custom_attributes,
+                classifications=string_to_classification(
+                    row["classifications"],
+                    sep=self.config.value_separator
+                )
+            ).to_json()
+            output["entities"].append(entity)
+        return output
+
+    def parse_entity_defs(self, json_rows):
+        """
+        Create an AtlasTypeDef consisting of entityDefs for the given json_rows.
+
+        :param list(dict(str,str)) json_rows:
+            A list of dicts containing at least `Entity TypeName` and `name`
+            that represents the metadata for a given entity type's attributeDefs.
+            Extra metadata will be ignored.
+        :return: An AtlasTypeDef with entityDefs for the provided rows.
+        :rtype: dict(str, list(dict))
+        """
+        entities = dict()
+        attribute_metadata_seen = set()
+        output = {"entityDefs": []}
+        # Required attributes
+        # Get all the attributes; they're expected to use the official
+        # camel casing, with the exception of "Entity TypeName"
+        for row in json_rows:
+            try:
+                entityTypeName = row["Entity TypeName"]
+            except KeyError:
+                raise KeyError("Entity TypeName not found in {}".format(row))
+
+            _ = row.pop("Entity TypeName")
+            # Update all seen attribute metadata
+            columns_in_row = list(row.keys())
+            attribute_metadata_seen = attribute_metadata_seen.union(
+                set(columns_in_row))
+            # Remove any null cells, otherwise the AttributeDefs constructor
+            # doesn't use the defaults.
+            for column in columns_in_row:
+                if row[column] is None:
+                    _ = row.pop(column)
+
+            json_entity_def = AtlasAttributeDef(**row).to_json()
+
+            if entityTypeName not in entities:
+                entities[entityTypeName] = []
+
+            entities[entityTypeName].append(json_entity_def)
+
+        # Create the entitydefs
+        for entityType in entities:
+            local_entity_def = EntityTypeDef(
+                name=entityType,
+                attributeDefs=entities[entityType]
+            ).to_json()
+            output["entityDefs"].append(local_entity_def)
+
+        # Extra attribute metadata (e.g. extra columns / json entries) are ignored.
+        # Warn the user that this metadata will be ignored.
+        extra_metadata_warnings = [
+            i for i in attribute_metadata_seen if i not in AtlasAttributeDef.propertiesEnum]
+        for extra_metadata in extra_metadata_warnings:
+            warn(("The attribute metadata \"{}\" is not a part of the Atlas" +
+                  " Attribute Def and will be ignored.").format(
+                extra_metadata))
+
+        return output
+
+
+    @staticmethod
+    def make_template():
+        """
+        Generate a template for the given reader.
+ """ + raise NotImplementedError + + \ No newline at end of file diff --git a/pyapacheatlas/readers/util.py b/pyapacheatlas/readers/util.py index be15158..726c76e 100644 --- a/pyapacheatlas/readers/util.py +++ b/pyapacheatlas/readers/util.py @@ -1,3 +1,21 @@ + +def string_to_classification(string, sep=";"): + """ + Converts a string of text into classifications. + + :param str string: The string that contains one or more classifications. + :param str sep: The separator to split the `string` parameter on. + :return: A list of AtlasClassification objects as dicts. + :rtype: list(dict) + """ + if string is None: + return [] + # TODO: How do we bring in attributes if they're required? + results = [{"typeName": s.strip(), "attributes": {}} + for s in string.split(sep) if s.strip() != ""] + return results + + def columns_matching_pattern(row, starts_with, does_not_match=[]): """ Takes in a json "row" and filters the keys to match the `starts_with` @@ -18,7 +36,7 @@ def columns_matching_pattern(row, starts_with, does_not_match=[]): if bad_key in candidates: candidates.pop(bad_key) candidates = {k[len(starts_with):].strip(): v for k, - v in candidates.items()} + v in candidates.items()} return candidates @@ -120,13 +138,13 @@ def first_process_containing_io(input_name, output_name, atlas_entities): ((input_name is None) and (num_inputs == 0)) or ((input_name is not None) and (num_inputs > 0) and (any([e["qualifiedName"] == input_name for e in entity.get_inputs()])) - ) + ) ) output_matches = ( ((output_name is None) and (num_outputs == 0)) or ((output_name is not None) and (num_outputs > 0) and (any([e["qualifiedName"] == output_name for e in entity.get_outputs()])) - ) + ) ) if input_matches and output_matches: output_entity = entity @@ -174,20 +192,3 @@ def from_process_lookup_col_lineage(process_name, atlas_entities, relationship_t column_lineage_type = lineage_relationship["endDef1"]["type"] return column_lineage_type - - -def string_to_classification(string, sep=";"): - """ - Converts a string of text into classifications. - - :param str string: The string that contains one or more classifications. - :param str sep: The separator to split the `string` parameter on. - :return: A list of AtlasClassification objects as dicts. - :rtype: list(dict) - """ - if string is None: - return [] - # TODO: How do we bring in attributes if they're required? 
- results = [{"typeName": s.strip(), "attributes": {}} - for s in string.split(sep) if s.strip() != ""] - return results diff --git a/pyapacheatlas/scaffolding/templates/__init__.py b/pyapacheatlas/scaffolding/templates/__init__.py deleted file mode 100644 index 400d7f9..0000000 --- a/pyapacheatlas/scaffolding/templates/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .excel import excel_template diff --git a/pyapacheatlas/scaffolding/templates/excel.py b/pyapacheatlas/scaffolding/templates/excel.py deleted file mode 100644 index acac1da..0000000 --- a/pyapacheatlas/scaffolding/templates/excel.py +++ /dev/null @@ -1,66 +0,0 @@ -from openpyxl import Workbook -from string import ascii_uppercase - -COLUMN_TEMPLATE = [ - "Target Table", "Target Column", "Target Classifications", - "Source Table", "Source Column", "Source Classifications", - "Transformation" -] -TABLE_TEMPLATE = [ - "Target Table", "Target Type", "Target Classifications", - "Source Table", "Source Type", "Source Classifications", - "Process Name", "Process Type" -] -ENTITYDEF_TEMPLATE = [ - "Entity TypeName", "name", "description", - "isOptional", "isUnique", "defaultValue", - "typeName", "displayName", "valuesMinCount", - "valuesMaxCount", "cardinality", "includeInNotification", - "indexType", "isIndexable" -] - -BULKENTITY_TEMPLATE = [ - "typeName", "name", "qualifiedName", "classifications" -] - -def _update_sheet_headers(headers, worksheet): - """ - For the given worksheet, make the first row equal to the list passed - in as the headers. - - :param list headers: A list of column headers to use for this sheet. - :param worksheet: - The worksheet you are updating. - :type worksheet: - :class:`~openpyxl.worksheet.worksheet.Worksheet` - """ - for idx, val in enumerate(headers): - # TODO: Not the best way once we get past 26 columns in the template - active_column = ascii_uppercase[idx] - active_value = headers[idx] - active_cell = "{}1".format(active_column) - worksheet[active_cell] = active_value - worksheet.column_dimensions[active_column].width = len(active_value) - - -def excel_template(filepath): - """ - Generate an Excel template file and write it out to the given filepath. - - :param str filepath: The file path to store an XLSX file with the - template Tables and Columns sheets. - """ - wb = Workbook() - columnsSheet = wb.active - columnsSheet.title = "ColumnsLineage" - tablesSheet = wb.create_sheet("TablesLineage") - entityDefsSheet = wb.create_sheet("EntityDefs") - bulkEntitiesSheet = wb.create_sheet("BulkEntities") - - _update_sheet_headers(COLUMN_TEMPLATE, columnsSheet) - _update_sheet_headers(TABLE_TEMPLATE, tablesSheet) - _update_sheet_headers(ENTITYDEF_TEMPLATE, entityDefsSheet) - _update_sheet_headers(BULKENTITY_TEMPLATE, bulkEntitiesSheet) - - wb.save(filepath) - wb.close() diff --git a/samples/create_templates.py b/samples/create_templates.py index 940ff44..aaebb45 100644 --- a/samples/create_templates.py +++ b/samples/create_templates.py @@ -3,28 +3,31 @@ import sys # PyApacheAtlas packages -from pyapacheatlas.scaffolding import column_lineage_scaffold # Create dummy types -from pyapacheatlas.scaffolding.templates import excel_template # Create the excel template file to be populated +from pyapacheatlas.scaffolding import column_lineage_scaffold # Create dummy types +# Create the excel template file to be populated +from pyapacheatlas.readers import ExcelReader if __name__ == "__main__": """ Generates the demo scaffolding and excel template file. 
""" if len(sys.argv) == 2: - column_map_switch = True if "YES".startswith(sys.argv[1].upper()) else False + column_map_switch = True if "YES".startswith( + sys.argv[1].upper()) else False print("INFO: Using column mapping on the table lineage processes") else: column_map_switch = False print("INFO: NOT using column mapping on the table lineage processes") - + # Create the demo scaffolding print("Creating the scaffolding json file") - scaffold = column_lineage_scaffold("demo", use_column_mapping=column_map_switch) + scaffold = column_lineage_scaffold( + "demo", use_column_mapping=column_map_switch) with open("./demo_scaffold.json", 'w') as fp: fp.write( json.dumps(scaffold, indent=1) ) - + # Create the excel template file print("Creating the excel template file") - excel_template("./demo_excel_template.xlsx") + ExcelReader.make_template("./demo_excel_template.xlsx") diff --git a/samples/end_to_end_excel_sample.py b/samples/end_to_end_excel_sample.py index 4375768..fb4d117 100644 --- a/samples/end_to_end_excel_sample.py +++ b/samples/end_to_end_excel_sample.py @@ -10,12 +10,9 @@ from pyapacheatlas.auth import ServicePrincipalAuthentication from pyapacheatlas.core import AtlasClient # Communicate with your Atlas server from pyapacheatlas.scaffolding import column_lineage_scaffold # Create dummy types -# Create the excel template file to be populated -from pyapacheatlas.scaffolding.templates import excel_template # Read in the populated excel file. -from pyapacheatlas.readers import from_excel # Customize header prefixes (e.g. "Sink" rather than "Target") and sheet names -from pyapacheatlas.readers.excel import ExcelConfiguration +from pyapacheatlas.readers import ExcelConfiguration, ExcelReader from pyapacheatlas.core.whatif import WhatIfValidator # To do what if analysis if __name__ == "__main__": @@ -39,7 +36,9 @@ # Create an empty excel template to be populated file_path = "./atlas_excel_template.xlsx" excel_config = ExcelConfiguration() - excel_template(file_path) + excel_reader = ExcelReader(excel_config) + + excel_reader.make_template(file_path) wb = load_workbook(file_path) table_sheet = wb[excel_config.table_sheet] @@ -125,15 +124,18 @@ # Upload scaffolded type defs and view the results of upload _upload_typedef = client.upload_typedefs( - atlas_type_defs, + atlas_type_defs, force_update=False ) - print(json.dumps(_upload_typedef,indent=2)) + print(json.dumps(_upload_typedef, indent=2)) # Instantiate some required objects and generate the atlas entities! - excel_results = from_excel( - file_path, excel_config, atlas_type_defs, use_column_mapping=True) + excel_results = excel_reader.parse_lineages( + file_path, + atlas_type_defs, + use_column_mapping=True + ) print("Results from excel transformation") print(json.dumps(excel_results, indent=2)) diff --git a/samples/existing_excel_and_types_validate.py b/samples/existing_excel_and_types_validate.py index 3def3af..83b577e 100644 --- a/samples/existing_excel_and_types_validate.py +++ b/samples/existing_excel_and_types_validate.py @@ -1,8 +1,7 @@ import json import sys -from pyapacheatlas.readers import from_excel -from pyapacheatlas.readers.excel import ExcelConfiguration +from pyapacheatlas.readers import ExcelConfiguration, ExcelReader from pyapacheatlas.core.whatif import WhatIfValidator if __name__ == "__main__": @@ -12,26 +11,34 @@ produces the batch of results and provides the what if validation. 
""" if len(sys.argv) != 3: - raise ValueError("ERROR: There should be an excel_file_path and type_def_path provided on the CLI") + raise ValueError( + "ERROR: There should be an excel_file_path and type_def_path provided on the CLI") excel_path = sys.argv[1] if excel_path is None: - raise ValueError("No excel file path was provided on the command line.") + raise ValueError( + "No excel file path was provided on the command line.") json_path = sys.argv[2] if json_path is None: - raise ValueError("No type definition file path was provided on the command line.") - + raise ValueError( + "No type definition file path was provided on the command line.") + with open(json_path, 'r') as fp: type_defs = json.load(fp) excel_config = ExcelConfiguration() + excel_reader = ExcelReader(excel_config) whatif = WhatIfValidator(type_defs=type_defs) - results = from_excel(excel_path, excel_config, type_defs, use_column_mapping=True) + results = excel_reader.parse_lineages( + excel_path, + type_defs, + use_column_mapping=True + ) report = whatif.validate_entities(results) print("===REPORT===") - print(json.dumps(report,indent=2)) + print(json.dumps(report, indent=2)) print("===EXCEL RESULTS===") - print(json.dumps(results,indent=2)) + print(json.dumps(results, indent=2)) diff --git a/samples/lineage_processes_typedefs.py b/samples/lineage_processes_typedefs.py index 37ffef4..6c73ff7 100644 --- a/samples/lineage_processes_typedefs.py +++ b/samples/lineage_processes_typedefs.py @@ -1,6 +1,6 @@ import json -from pyapacheatlas.core.typedef import EntityTypeDef, RelationshipTypeDef +from pyapacheatlas.core import EntityTypeDef, RelationshipTypeDef if __name__ == "__main__": @@ -8,7 +8,7 @@ This sample provides an example of generating the bare minimum table and columns scaffolding with a relationship definition between the table and column types. 
- """ + """ datasource = "generic" src_table_columns_typeName = "{}_table_columns".format(datasource) @@ -18,30 +18,30 @@ name="{}_column_lineage".format(datasource), superTypes=["Process"], attributeDefs=( - [ - { - "name": "dependencyType", - "typeName": "string", - "isOptional": False, - "cardinality": "SINGLE", - "valuesMinCount": 1, - "valuesMaxCount": 1, - "isUnique": False, - "isIndexable": False, - "includeInNotification": False - }, - { - "name": "expression", - "typeName": "string", - "isOptional": True, - "cardinality": "SINGLE", - "valuesMinCount": 0, - "valuesMaxCount": 1, - "isUnique": False, - "isIndexable": False, - "includeInNotification": False - } - ] + [ + { + "name": "dependencyType", + "typeName": "string", + "isOptional": False, + "cardinality": "SINGLE", + "valuesMinCount": 1, + "valuesMaxCount": 1, + "isUnique": False, + "isIndexable": False, + "includeInNotification": False + }, + { + "name": "expression", + "typeName": "string", + "isOptional": True, + "cardinality": "SINGLE", + "valuesMinCount": 0, + "valuesMaxCount": 1, + "isUnique": False, + "isIndexable": False, + "includeInNotification": False + } + ] ) ) @@ -49,14 +49,14 @@ table_process_entity = EntityTypeDef( name="{}_process".format(datasource), superTypes=["Process"], - attributeDefs = [ + attributeDefs=[ { - "name": "columnMapping", - "typeName": "string", - "cardinality": "SINGLE", - "isIndexable": False, - "isOptional": True, - "isUnique": False + "name": "columnMapping", + "typeName": "string", + "cardinality": "SINGLE", + "isIndexable": False, + "isOptional": True, + "isUnique": False } ] ) @@ -64,15 +64,15 @@ # Define {datasource}_process_column_lineage table_process_column_lineage_relationship = RelationshipTypeDef( name="{}_process_column_lineage".format(datasource), - relationshipCategory = "COMPOSITION", - endDef1 = { + relationshipCategory="COMPOSITION", + endDef1={ "type": column_lineage_process_entity.name, "name": "query", "isContainer": False, "cardinality": "SINGLE", "isLegacyAttribute": True - }, - endDef2 = { + }, + endDef2={ "type": table_process_entity.name, "name": "columnLineages", "isContainer": True, @@ -81,15 +81,14 @@ } ) - # Output composite entity output = { - "entityDefs":[ + "entityDefs": [ column_lineage_process_entity.to_json(), table_process_entity.to_json() ], - "relationshipDefs":[ + "relationshipDefs": [ table_process_column_lineage_relationship.to_json() ] } - print(json.dumps(output)) \ No newline at end of file + print(json.dumps(output, indent=2)) diff --git a/samples/table_columns.py b/samples/table_columns.py index f15c200..8446e5c 100644 --- a/samples/table_columns.py +++ b/samples/table_columns.py @@ -1,6 +1,6 @@ import json -from pyapacheatlas.core.typedef import EntityTypeDef, RelationshipTypeDef +from pyapacheatlas.core import EntityTypeDef, RelationshipTypeDef if __name__ == "__main__": @@ -55,4 +55,4 @@ table_column_relationship.to_json(), ] } - print(json.dumps(output)) \ No newline at end of file + print(json.dumps(output, indent=2)) \ No newline at end of file diff --git a/tests/readers/core/test_entitydefs.py b/tests/readers/core/test_entitydefs.py deleted file mode 100644 index 63e447f..0000000 --- a/tests/readers/core/test_entitydefs.py +++ /dev/null @@ -1,61 +0,0 @@ -import warnings -import pytest - -from pyapacheatlas.core.typedef import AtlasAttributeDef -from pyapacheatlas.readers.core.entitydef import to_entityDefs - -def test_entityDefs(): - # All attribute keys should be converted to camel case except "Entity TypeName" - inputData = [ - 
{"Entity TypeName":"generic", "name":"attrib1", "description":"desc1", - "isOptional":"True", "isUnique":"False", "defaultValue":None}, - {"Entity TypeName":"generic", "name":"attrib2", "description":"desc2", - "isOptional":"True", "isUnique":"False", "defaultValue":None, - "cardinality":"SINGLE"}, - {"Entity TypeName":"demo", "name":"attrib3", "description":"desc3", - "isOptional":"False", "isUnique":"False","cardinality":"SET"} - ] - - output = to_entityDefs(inputData) - # It is an AtlasTypesDef composite wrapper - assert("entityDefs" in output.keys()) - # There are two entity typenames specified so there should be only two entityDefs - assert (len(output["entityDefs"]) == 2) - - genericEntityDef = None - demoEntityDef = None - - for entityDef in output["entityDefs"]: - if entityDef["name"] == "generic": - genericEntityDef = entityDef - elif entityDef["name"] == "demo": - demoEntityDef = entityDef - - # Generic has two attributes - assert(len(genericEntityDef["attributeDefs"]) == 2) - - # Demo has one attribute - assert(len(demoEntityDef["attributeDefs"]) == 1) - - assert( - demoEntityDef["attributeDefs"][0] == AtlasAttributeDef( - name="attrib3", **{"description":"desc3","isOptional":"False", - "isUnique":"False","cardinality":"SET"} - ).to_json() - ) - - - -def test_entityDefs_warns_with_extra_params(): - # All attribute keys should be converted to camel case except "Entity TypeName" - inputData = [ - {"Entity TypeName":"generic", "name":"attrib1", "description":"desc1", - "isOptional":"True", "isUnique":"False", "defaultValue":None}, - {"Entity TypeName":"generic", "name":"attrib2", "description":"desc2", - "isOptional":"True", "isUnique":"False", "defaultValue":None, - "cardinality":"SINGLE","randomAttrib":"foobar"} - ] - - # Assert that a UserWarning occurs when adding an extra attribute - pytest.warns(UserWarning, to_entityDefs, **{"json_rows":inputData}) - \ No newline at end of file diff --git a/tests/readers/core/test_table_column.py b/tests/readers/core/test_table_column.py deleted file mode 100644 index 7a2a0f0..0000000 --- a/tests/readers/core/test_table_column.py +++ /dev/null @@ -1,299 +0,0 @@ -import json - -from pyapacheatlas.core import AtlasProcess -from pyapacheatlas.core.util import GuidTracker -from pyapacheatlas.readers.util import * -from pyapacheatlas.readers.core import ( - to_column_entities, - to_table_entities -) -from pyapacheatlas.readers.excel import ExcelConfiguration - -# Set up some cross-test objects and functions -EXCEL_CONFIG = ExcelConfiguration() - -def setupto_column_entities(): - json_tables = [ - { - "Target Table":"table1", "Target Type": "demo_table", - "Source Table":"table0", "Source Type": "demo_table", - "Process Name":"proc01", "Process Type": "demo_process" - } - ] - - json_columns = [ - { - "Target Column":"col1","Target Table": "table1", - "Source Column":"col0","Source Table": "table0", - "Transformation":None - } - ] - - atlas_typedefs = {"entityDefs":[ - {"typeName":"demo_table","relationshipAttributeDefs":[{"relationshipTypeName":"demo_table_columns","name":"columns","typeName":"array"}]}, - {"typeName":"demo_process","relationshipAttributeDefs":[{"relationshipTypeName":"demo_process_column_lineage","name":"columnLineages","typeName":"array"}]} - ], - "relationshipDefs":[ - {"name": "demo_table_columns","endDef1": {"type": "demo_table","name": "columns"}, - "endDef2": {"type": "demo_column","name": "table"} - }, - {"name": "demo_process_column_lineage","endDef1": {"type": "demo_column_lineage","name": "query"}, - "endDef2": {"type": 
"demo_process","name": "columnLineages"} - } - ] - } - return json_tables, json_columns, atlas_typedefs - - -# Begin actual tests -def test_to_table_entities(): - guid_tracker = GuidTracker(-1000) - json_rows = [ - { - "Target Table":"table1", "Target Type": "demo_type", - "Source Table":"table0", "Source Type": "demo_type2", - "Process Name":"proc01", "Process Type": "proc_type" - } - ] - - results = to_table_entities(json_rows, EXCEL_CONFIG, guid_tracker) - - assert(results[0].to_json(minimum = True) == {"typeName":"demo_type", "guid":-1001, "qualifiedName": "table1"}) - assert(results[1].to_json(minimum = True) == {"typeName":"demo_type2", "guid":-1002, "qualifiedName": "table0"}) - assert(results[2].to_json(minimum = True) == {"typeName":"proc_type", "guid":-1003, "qualifiedName": "proc01"}) - - -def test_to_table_entities_with_attributes(): - guid_tracker = GuidTracker(-1000) - json_rows = [ - { - "Target Table":"table1", "Target Type": "demo_type","Target data_type":"str", - "Source Table":"table0", "Source Type": "demo_type2","Source foo":"bar", - "Process Name":"proc01", "Process Type": "proc_type", "Process fizz":"buzz" - } - ] - - results = to_table_entities(json_rows, EXCEL_CONFIG, guid_tracker) - - assert(results[0].attributes["data_type"] == "str") - assert(results[1].attributes["foo"] == "bar") - assert(results[2].attributes["fizz"] == "buzz") - - -def test_to_table_entities_multiple_inputs(): - guid_tracker = GuidTracker(-1000) - json_tables = [ - { - "Target Table":"table1", "Target Type": "demo_type", - "Source Table":"table0", "Source Type": "demo_type", - "Process Name":"proc01", "Process Type": "proc_type" - }, - { - "Target Table":"table1", "Target Type": "demo_type", - "Source Table":"tableB", "Source Type": "demo_type", - "Process Name":"proc01", "Process Type": "proc_type" - } - ] - - results = to_table_entities(json_tables, EXCEL_CONFIG, guid_tracker) - - assert(len(results) == 4) - assert(results[3].to_json(minimum = True) == {"typeName":"proc_type", "guid":-1003, "qualifiedName": "proc01"}) - process_inputs_qualified_names = [p["qualifiedName"] for p in results[3].get_inputs()] - process_outputs_qualified_names = [p["qualifiedName"] for p in results[3].get_outputs()] - assert(len(process_inputs_qualified_names) == 2) - assert(len(process_outputs_qualified_names) == 1) - - assert(set(process_inputs_qualified_names) == set(["table0","tableB"])) - assert(set(process_outputs_qualified_names) == set(["table1"])) - - - - -def test_to_column_entities(): - guid_tracker = GuidTracker(-1000) - - json_tables, json_columns, atlas_typedefs = setupto_column_entities() - - # Outputs -1003 as the last guid - tables_and_processes = to_table_entities(json_tables, EXCEL_CONFIG, guid_tracker) - - results = to_column_entities(json_columns, EXCEL_CONFIG, guid_tracker, tables_and_processes, atlas_typedefs) - - # Two column entities - # One process entity - target_col_entity = results[0].to_json() - source_col_entity = results[1].to_json() - col_lineage_entity = results[2].to_json() - - assert(target_col_entity["typeName"] == "demo_column") - assert(target_col_entity["relationshipAttributes"]["table"]["typeName"] == "demo_table") - assert(source_col_entity["typeName"] == "demo_column") - assert(source_col_entity["relationshipAttributes"]["table"]["typeName"] == "demo_table") - assert(col_lineage_entity["typeName"] == "demo_column_lineage") - - for entity in col_lineage_entity["attributes"]["inputs"] + col_lineage_entity["attributes"]["outputs"]: - assert(entity["typeName"] == 
"demo_column") - - # Check that this points to the correct table process with a (default) query reference in relationshipAttribs - proc_relationship_query_is_demo_process = False - assert("query" in col_lineage_entity["relationshipAttributes"]) - if "query" in col_lineage_entity["relationshipAttributes"]: - proc_relationship_query_is_demo_process = col_lineage_entity["relationshipAttributes"]["query"]["typeName"] == "demo_process" - assert(proc_relationship_query_is_demo_process) - - - -def test_to_column_entities_with_attributes(): - guid_tracker = GuidTracker(-1000) - - json_tables, json_columns, atlas_typedefs = setupto_column_entities() - - # Update target to include an attribute - json_columns[0].update({"Target test_attrib1":"value", "Target test_attrib2":"value2", "Source foo":"bar"}) - - # Outputs -1003 as the last guid - tables_and_processes = to_table_entities(json_tables, EXCEL_CONFIG, guid_tracker) - - results = to_column_entities(json_columns, EXCEL_CONFIG, guid_tracker, tables_and_processes, atlas_typedefs) - - # Two column entities - # One process entity - target_col_entity = results[0] - source_col_entity = results[1] - col_lineage_entity = results[2] - - assert(target_col_entity.attributes["test_attrib1"] == "value") - assert(target_col_entity.attributes["test_attrib2"] == "value2") - assert(source_col_entity.attributes["foo"] == "bar") - -def test_to_column_entities_with_classifications(): - guid_tracker = GuidTracker(-1000) - - json_tables, json_columns, atlas_typedefs = setupto_column_entities() - - # Update target to include a classification - json_columns[0].update({"Target Classifications":"CustomerInfo; PII", "Source Classifications":""}) - - # Outputs -1003 as the last guid - tables_and_processes = to_table_entities(json_tables, EXCEL_CONFIG, guid_tracker) - - results = to_column_entities(json_columns, EXCEL_CONFIG, guid_tracker, tables_and_processes, atlas_typedefs) - - # Two column entities - # One process entity - target_col_entity = results[0] - source_col_entity = results[1] - col_lineage_entity = results[2] - - assert(len(target_col_entity.classifications) == 2) - assert({"typeName":"CustomerInfo","attributes":{}} in target_col_entity.classifications) - assert({"typeName":"PII","attributes":{}} in target_col_entity.classifications) - assert(len(source_col_entity.classifications) == 0) - - -def test_to_column_entities_with_attributes(): - guid_tracker = GuidTracker(-1000) - - json_tables, json_columns, atlas_typedefs = setupto_column_entities() - - # Update target to include an attribute - json_columns[0].update({"Target test_attrib1":"value", "Target test_attrib2":"value2", "Source foo":"bar"}) - - # Outputs -1003 as the last guid - tables_and_processes = to_table_entities(json_tables, EXCEL_CONFIG, guid_tracker) - - results = to_column_entities(json_columns, EXCEL_CONFIG, guid_tracker, tables_and_processes, atlas_typedefs) - - # Two column entities - # One process entity - target_col_entity = results[0] - source_col_entity = results[1] - col_lineage_entity = results[2] - - assert(target_col_entity.attributes["test_attrib1"] == "value") - assert(target_col_entity.attributes["test_attrib2"] == "value2") - assert(source_col_entity.attributes["foo"] == "bar") - -def test_to_column_entities_with_columnMapping(): - guid_tracker = GuidTracker(-1000) - expected_obj = [ - {"ColumnMapping":[{"Source":"col0","Sink":"col1"}, {"Source":"col90","Sink":"col99"}], - "DatasetMapping":{"Source":"table0", "Sink":"table1"} - } - ] - expected = json.dumps(expected_obj)# 
"[{\"ColumnMapping\": [{\"Source\": \"col0\", \"Sink\": \"col1\"}], \"DatasetMapping\": {\"Source\": \"table0\", \"Sink\": \"table1\"}}]" - - json_tables, json_columns, atlas_typedefs = setupto_column_entities() - - json_columns.append({ - "Target Column":"col99","Target Table": "table1", - "Source Column":"col90","Source Table": "table0", - "Transformation":"col90 + 1" - } - ) - - # Outputs -1003 as the last guid - tables_and_processes = to_table_entities(json_tables, EXCEL_CONFIG, guid_tracker) - - results = to_column_entities(json_columns, EXCEL_CONFIG, guid_tracker, tables_and_processes, atlas_typedefs, use_column_mapping=True) - - # Demonstrating column lineage - assert("columnMapping" in tables_and_processes[2].attributes) - assert(tables_and_processes[2].attributes["columnMapping"] == expected) - - -def test_to_column_entities_when_multi_tabled_inputs(): - guid_tracker = GuidTracker(-1000) - json_tables, json_columns, atlas_typedefs = setupto_column_entities() - # Adding in an extra table - json_tables.append( - { - "Target Table":"table1", "Target Type": "demo_table", - "Source Table":"tableB", "Source Type": "demo_table", - "Process Name":"proc01", "Process Type": "demo_process" - } - ) - json_columns[0].update({"Transformation":"colB + col0"}) - # Adding in an extra column - json_columns.append( - { - "Target Column":"col1","Target Table": "table1", - "Source Column":"colB","Source Table": "tableB", - "Transformation":"colB + col0" - } - ) - expected_col_map_obj = [ - {"ColumnMapping":[{"Source":"col0","Sink":"col1"}], - "DatasetMapping":{"Source":"table0", "Sink":"table1"} - }, - {"ColumnMapping":[{"Source":"colB","Sink":"col1"}], - "DatasetMapping":{"Source":"tableB", "Sink":"table1"} - } - ] - - table_entities = to_table_entities(json_tables,EXCEL_CONFIG, guid_tracker) - column_entities = to_column_entities(json_columns, EXCEL_CONFIG, guid_tracker, - table_entities, atlas_typedefs, use_column_mapping=True) - - # Three columns and one process entity - assert(len(column_entities) == 4) - process_entities = [e for e in column_entities if isinstance(e, AtlasProcess)] - assert(len(process_entities) == 1) - process_entity = process_entities[0] - - process_inputs_qualified_names = [p["qualifiedName"] for p in process_entity.get_inputs()] - process_outputs_qualified_names = [p["qualifiedName"] for p in process_entity.get_outputs()] - assert(len(process_inputs_qualified_names) == 2) - assert(len(process_outputs_qualified_names) == 1) - - assert(set(process_inputs_qualified_names) == set(["table0#col0","tableB#colB"])) - assert(set(process_outputs_qualified_names) == set(["table1#col1"])) - - table_process_entities = [e for e in table_entities if isinstance(e, AtlasProcess)] - table_process_entity = table_process_entities[0] - # Should now contain the expected column Mappings - assert("columnMapping" in table_process_entity.attributes) - resulting_colmap = json.loads(table_process_entity.attributes["columnMapping"]) - assert(len(expected_col_map_obj) == len(resulting_colmap)) - assert(all([res in expected_col_map_obj for res in resulting_colmap])) \ No newline at end of file diff --git a/tests/readers/test_excel.py b/tests/readers/test_excel.py index 17ddf8a..2ffa935 100644 --- a/tests/readers/test_excel.py +++ b/tests/readers/test_excel.py @@ -1,3 +1,4 @@ + import json import os @@ -5,26 +6,36 @@ from openpyxl import Workbook from openpyxl import load_workbook -from pyapacheatlas.scaffolding.templates.excel import ( - excel_template, - ENTITYDEF_TEMPLATE, - BULKENTITY_TEMPLATE -) -from 
pyapacheatlas.readers.excel import ( - ExcelConfiguration, - excel_bulkEntities, - excel_typeDefs -) -from pyapacheatlas.scaffolding.templates.excel import _update_sheet_headers +from pyapacheatlas.readers.excel import ExcelConfiguration, ExcelReader +from pyapacheatlas.scaffolding.column_lineage import column_lineage_scaffold + +def test_verify_template_sheets(): + # Setup + temp_path = "./temp_verfiysheets.xlsx" + ExcelReader.make_template(temp_path) + + # Expected + expected_sheets = set(["ColumnsLineage", "TablesLineage", + "EntityDefs", "BulkEntities" + ]) + + wb = load_workbook(temp_path) + difference = set(wb.sheetnames).symmetric_difference(expected_sheets) + try: + assert(len(difference) == 0) + finally: + wb.close() + os.remove(temp_path) def setup_workbook_custom_sheet(filepath, sheet_name, headers, json_rows): wb = Workbook() customSheet = wb.active customSheet.title = sheet_name - _update_sheet_headers(headers, customSheet) + ExcelReader._update_sheet_headers(headers, customSheet) row_counter = 0 + # TODO: Clear the column headers # Add the data to the sheet for row in customSheet.iter_rows(min_row=2, max_col=len(headers), max_row=len(json_rows)+1): for idx, cell in enumerate(row): @@ -36,7 +47,8 @@ def setup_workbook_custom_sheet(filepath, sheet_name, headers, json_rows): def setup_workbook(filepath, sheet_name, max_col, json_rows): - excel_template(filepath) + if not os.path.exists(filepath): + ExcelReader.make_template(filepath) wb = load_workbook(filepath) active_sheet = wb[sheet_name] @@ -58,7 +70,8 @@ def remove_workbook(filepath): def test_excel_typeDefs_entityTypes(): temp_filepath = "./temp_test_typeDefs_entityTYpes.xlsx" ec = ExcelConfiguration() - max_cols = len(ENTITYDEF_TEMPLATE) + reader = ExcelReader(ec) + max_cols = len(ExcelReader.TEMPLATE_HEADERS["BulkEntities"]) # "Entity TypeName", "name", "description", # "isOptional", "isUnique", "defaultValue", # "typeName", "displayName", "valuesMinCount", @@ -74,7 +87,7 @@ def test_excel_typeDefs_entityTypes(): ] setup_workbook(temp_filepath, "EntityDefs", max_cols, json_rows) - results = excel_typeDefs(temp_filepath, ec) + results = reader.parse_entity_defs(temp_filepath) assert("entityDefs" in results) assert(len(results["entityDefs"]) == 1) @@ -86,7 +99,8 @@ def test_excel_typeDefs_entityTypes(): def test_excel_bulkEntities(): temp_filepath = "./temp_test_excel_bulkEntities.xlsx" ec = ExcelConfiguration() - max_cols = len(BULKENTITY_TEMPLATE) + reader = ExcelReader(ec) + max_cols = len(ExcelReader.TEMPLATE_HEADERS["BulkEntities"]) # "typeName", "name", # "qualifiedName", "classifications" json_rows = [ @@ -99,7 +113,7 @@ def test_excel_bulkEntities(): ] setup_workbook(temp_filepath, "BulkEntities", max_cols, json_rows) - results = excel_bulkEntities(temp_filepath, ec) + results = reader.parse_bulk_entities(temp_filepath) try: assert("entities" in results) @@ -111,7 +125,8 @@ def test_excel_bulkEntities(): def test_excel_bulkEntities_withClassifications(): temp_filepath = "./temp_test_excel_bulkEntitiesWithClassifications.xlsx" ec = ExcelConfiguration() - max_cols = len(BULKENTITY_TEMPLATE) + reader = ExcelReader(ec) + max_cols = len(ExcelReader.TEMPLATE_HEADERS["BulkEntities"]) # "typeName", "name", # "qualifiedName", "classifications" json_rows = [ @@ -125,7 +140,7 @@ def test_excel_bulkEntities_withClassifications(): setup_workbook(temp_filepath, "BulkEntities", max_cols, json_rows) - results = excel_bulkEntities(temp_filepath, ec) + results = reader.parse_bulk_entities(temp_filepath) try: assert("entities" in 
results) @@ -148,8 +163,10 @@ def test_excel_bulkEntities_withClassifications(): def test_excel_bulkEntities_dynamicAttributes(): temp_filepath = "./temp_test_excel_bulkEntitieswithAttributes.xlsx" ec = ExcelConfiguration() + reader = ExcelReader(ec) - headers = BULKENTITY_TEMPLATE + ["attrib1", "attrib2"] + headers = ExcelReader.TEMPLATE_HEADERS["BulkEntities"] + \ + ["attrib1", "attrib2"] # "typeName", "name", # "qualifiedName", "classifications" # "attrib1", "attrib2" @@ -167,7 +184,7 @@ def test_excel_bulkEntities_dynamicAttributes(): setup_workbook_custom_sheet( temp_filepath, "BulkEntities", headers, json_rows) - results = excel_bulkEntities(temp_filepath, ec) + results = reader.parse_bulk_entities(temp_filepath) try: assert("entities" in results) @@ -186,3 +203,119 @@ def test_excel_bulkEntities_dynamicAttributes(): finally: remove_workbook(temp_filepath) + + +def test_excel_table_lineage(): + temp_filepath = "./temp_test_excel_table_lineage.xlsx" + ec = ExcelConfiguration() + reader = ExcelReader(ec) + max_cols = len(ExcelReader.TEMPLATE_HEADERS["TablesLineage"]) + + # "Target Table", "Target Type", "Target Classifications", + # "Source Table", "Source Type", "Source Classifications", + # "Process Name", "Process Type" + + json_rows = [ + ["table1", "demo_type", None, + "table0", "demo_type2", None, + "proc01", "proc_type" + ] + ] + + setup_workbook(temp_filepath, "TablesLineage", max_cols, json_rows) + + results = reader.parse_table_lineage(temp_filepath) + + try: + assert(results[0].to_json(minimum=True) == { + "typeName": "demo_type", "guid": -1001, "qualifiedName": "table1"}) + assert(results[1].to_json(minimum=True) == { + "typeName": "demo_type2", "guid": -1002, "qualifiedName": "table0"}) + assert(results[2].to_json(minimum=True) == { + "typeName": "proc_type", "guid": -1003, "qualifiedName": "proc01"}) + finally: + remove_workbook(temp_filepath) + + +def test_excel_column_lineage(): + temp_filepath = "./temp_test_excel_column_lineage.xlsx" + ec = ExcelConfiguration() + reader = ExcelReader(ec) + max_cols_tl = len(ExcelReader.TEMPLATE_HEADERS["TablesLineage"]) + max_cols_cl = len(ExcelReader.TEMPLATE_HEADERS["ColumnsLineage"]) + + # "Target Table", "Target Type", "Target Classifications", + # "Source Table", "Source Type", "Source Classifications", + # "Process Name", "Process Type" + + json_rows = [ + ["table1", "demo_table", None, + "table0", "demo_table", None, + "proc01", "demo_process" + ] + ] + + # "Target Table", "Target Column", "Target Classifications", + # "Source Table", "Source Column", "Source Classifications", + # "Transformation" + json_rows_col = [ + ["table1", "t00", None, + "table0", "t00", None, + None], + ["table1", "tcombo", None, + "table0", "tA", None, + None], + ["table1", "tcombo", None, + "table0", "tB", None, + None], + ] + + setup_workbook(temp_filepath, "TablesLineage", max_cols_tl, json_rows) + setup_workbook(temp_filepath, "ColumnsLineage", max_cols_cl, json_rows_col) + + atlas_types = column_lineage_scaffold("demo") + + table_entities = reader.parse_table_lineage(temp_filepath) + + # For column mappings, table_entities do not contain columnMapping + assert(all(["columnMapping" not in e.attributes for e in table_entities])) + + column_entities = reader.parse_column_lineage(temp_filepath, + table_entities, + atlas_types, + use_column_mapping= True + ) + + try: + table1 = None + table0 = None + proc01 = None + t00 = None + table1_t00 = None + table0_t00 = None + col_lineage_process = None + table_lookup = {e.get_name():e for e in table_entities} 
+ column_lookup = {e.get_name():e for e in column_entities} + + # We have five columns (t00 > t00) + ((tA + tB) > tcombo) + # and two processes + assert(len(column_entities) == 7) + + # Because of column mappings is TRUE, table entities are modified + assert("columnMapping" in table_lookup["proc01"].attributes) + resulting_col_map = json.loads(table_lookup["proc01"].attributes["columnMapping"])[0] + expected_col_map = { + "DatasetMapping":{"Source":"table0", "Sink":"table1"}, + "ColumnMapping":[ + {"Source":"t00","Sink":"t00"}, + {"Source":"tA","Sink":"tcombo"}, + {"Source":"tB","Sink":"tcombo"} + ] + } + assert(resulting_col_map["DatasetMapping"] == expected_col_map["DatasetMapping"]) + assert(len(resulting_col_map["ColumnMapping"]) == 3) + assert(resulting_col_map["ColumnMapping"][0] in expected_col_map["ColumnMapping"]) + assert(resulting_col_map["ColumnMapping"][1] in expected_col_map["ColumnMapping"]) + assert(resulting_col_map["ColumnMapping"][2] in expected_col_map["ColumnMapping"]) + finally: + remove_workbook(temp_filepath) diff --git a/tests/readers/test_reader.py b/tests/readers/test_reader.py new file mode 100644 index 0000000..c1d2a1e --- /dev/null +++ b/tests/readers/test_reader.py @@ -0,0 +1,141 @@ +import warnings + +import pytest + +from pyapacheatlas.core.typedef import AtlasAttributeDef +from pyapacheatlas.readers.reader import Reader, ReaderConfiguration + + +def test_parse_bulk_entities(): + rc = ReaderConfiguration() + reader = Reader(rc) + # "typeName", "name", + # "qualifiedName", "classifications" + json_rows = [ + {"typeName":"demoType", "name":"entityNameABC", + "qualifiedName":"qualifiedNameofEntityNameABC", "classifications":None + }, + {"typeName":"demoType", "name":"entityNameGHI", + "qualifiedName":"qualifiedNameofEntityNameGHI", "classifications":"PII;CLASS2" + }, + {"typeName":"demoType", "name":"entityNameJKL", + "qualifiedName":"qualifiedNameofEntityNameJKL", "classifications":"PII" + }, + {"typeName":"demoType", "name":"entityNameDynamic", + "qualifiedName":"qualifiedNameofEntityNameDynamic", "classifications":None, + "dynamicAttrib1":"foo", "dynamicAttrib2":"bar" + } + ] + results = reader.parse_bulk_entities(json_rows) + + assert("entities" in results) + assert(len(results["entities"]) == len(json_rows)) + abc = results["entities"][0] + ghi = results["entities"][1] + jkl = results["entities"][2] + dynamic = results["entities"][3] + + assert("classifications" not in abc) + assert(len(ghi["classifications"]) == 2) + assert(len(jkl["classifications"]) == 1) + + assert(jkl["classifications"][0]["typeName"] == "PII") + ghi_classification_types = set( + [x["typeName"] for x in ghi["classifications"]] + ) + assert(set(["PII", "CLASS2"]) == ghi_classification_types) + + assert ("dynamicAttrib1" in dynamic["attributes"]) + assert (dynamic["attributes"]["dynamicAttrib1"] == "foo") + assert ("dynamicAttrib2" in dynamic["attributes"]) + assert (dynamic["attributes"]["dynamicAttrib2"] == "bar") + +def test_parse_entity_defs(): + rc = ReaderConfiguration() + reader = Reader(rc) + # "Entity TypeName", "name", "description", + # "isOptional", "isUnique", "defaultValue", + # "typeName", "displayName", "valuesMinCount", + # "valuesMaxCount", "cardinality", "includeInNotification", + # "indexType", "isIndexable" + json_rows = [ + { + "Entity TypeName":"demoType", + "name":"attrib1", + "description":"Some desc", + "isOptional":"True", + "isUnique":"False", + "defaultValue": None, + "typeName":"string", + "displayName": None, + "valuesMinCount": None, + "valuesMaxCount": 
None, + "cardinality": None, + "includeInNotification": None, + "indexType": None, + "isIndexable": None + } + ] + + results = reader.parse_entity_defs(json_rows) + + assert("entityDefs" in results) + assert(len(results["entityDefs"]) == 1) + assert(results["entityDefs"][0]["attributeDefs"][0]["name"] == "attrib1") + +def test_parse_entity_defs_extended(): + rc = ReaderConfiguration() + reader = Reader(rc) + json_rows = [ + {"Entity TypeName":"generic", "name":"attrib1", "description":"desc1", + "isOptional":"True", "isUnique":"False", "defaultValue":None}, + {"Entity TypeName":"generic", "name":"attrib2", "description":"desc2", + "isOptional":"True", "isUnique":"False", "defaultValue":None, + "cardinality":"SINGLE"}, + {"Entity TypeName":"demo", "name":"attrib3", "description":"desc3", + "isOptional":"False", "isUnique":"False","cardinality":"SET"} + ] + + output = reader.parse_entity_defs(json_rows) + # It is an AtlasTypesDef composite wrapper + assert("entityDefs" in output.keys()) + # There are two entity typenames specified so there should be only two entityDefs + assert (len(output["entityDefs"]) == 2) + + genericEntityDef = None + demoEntityDef = None + + for entityDef in output["entityDefs"]: + if entityDef["name"] == "generic": + genericEntityDef = entityDef + elif entityDef["name"] == "demo": + demoEntityDef = entityDef + + # Generic has two attributes + assert(len(genericEntityDef["attributeDefs"]) == 2) + + # Demo has one attribute + assert(len(demoEntityDef["attributeDefs"]) == 1) + + assert( + demoEntityDef["attributeDefs"][0] == AtlasAttributeDef( + name="attrib3", **{"description":"desc3","isOptional":"False", + "isUnique":"False","cardinality":"SET"} + ).to_json() + ) + +def test_entityDefs_warns_with_extra_params(): + rc = ReaderConfiguration() + reader = Reader(rc) + # All attribute keys should be converted to camel case except "Entity TypeName" + inputData = [ + {"Entity TypeName":"generic", "name":"attrib1", "description":"desc1", + "isOptional":"True", "isUnique":"False", "defaultValue":None}, + {"Entity TypeName":"generic", "name":"attrib2", "description":"desc2", + "isOptional":"True", "isUnique":"False", "defaultValue":None, + "cardinality":"SINGLE","randomAttrib":"foobar"} + ] + + # Assert that a UserWarning occurs when adding an extra attribute + pytest.warns(UserWarning, reader.parse_entity_defs, **{"json_rows":inputData}) + \ No newline at end of file diff --git a/tests/readers/test_table_column.py b/tests/readers/test_table_column.py new file mode 100644 index 0000000..134bceb --- /dev/null +++ b/tests/readers/test_table_column.py @@ -0,0 +1,330 @@ +import json + +from pyapacheatlas.core import AtlasProcess +from pyapacheatlas.readers.util import * + +from pyapacheatlas.readers.reader import Reader, ReaderConfiguration + +# Set up some cross-test objects and functions +READER_CONFIG = ReaderConfiguration() + + +def setup_column_lineage_entities(): + json_tables = [ + { + "Target Table": "table1", "Target Type": "demo_table", + "Source Table": "table0", "Source Type": "demo_table", + "Process Name": "proc01", "Process Type": "demo_process" + } + ] + + json_columns = [ + { + "Target Column": "col1", "Target Table": "table1", + "Source Column": "col0", "Source Table": "table0", + "Transformation": None + } + ] + + atlas_typedefs = {"entityDefs": [ + {"typeName": "demo_table", "relationshipAttributeDefs": [ + {"relationshipTypeName": "demo_table_columns", "name": "columns", + "typeName": "array"}]}, + {"typeName": "demo_process", "relationshipAttributeDefs": [ + 
{"relationshipTypeName": "demo_process_column_lineage", + "name": "columnLineages", + "typeName": "array"}]} + ], + "relationshipDefs": [ + {"name": "demo_table_columns", + "endDef1": {"type": "demo_table", "name": "columns"}, + "endDef2": {"type": "demo_column", "name": "table"} + }, + {"name": "demo_process_column_lineage", + "endDef1": {"type": "demo_column_lineage", "name": "query"}, + "endDef2": {"type": "demo_process", "name": "columnLineages"} + } + ] + } + return json_tables, json_columns, atlas_typedefs + + +# Begin actual tests +def test_table_lineage(): + reader = Reader(READER_CONFIG) + json_rows = [ + { + "Target Table": "table1", "Target Type": "demo_type", + "Source Table": "table0", "Source Type": "demo_type2", + "Process Name": "proc01", "Process Type": "proc_type" + } + ] + + results = reader.parse_table_lineage(json_rows) + + assert(results[0].to_json(minimum=True) == { + "typeName": "demo_type", "guid": -1001, "qualifiedName": "table1"}) + assert(results[1].to_json(minimum=True) == { + "typeName": "demo_type2", "guid": -1002, "qualifiedName": "table0"}) + assert(results[2].to_json(minimum=True) == { + "typeName": "proc_type", "guid": -1003, "qualifiedName": "proc01"}) + + +def test_table_lineage_with_attributes(): + reader = Reader(READER_CONFIG) + json_rows = [ + { + "Target Table": "table1", "Target Type": "demo_type", + "Target data_type": "str", "Source Table": "table0", + "Source Type": "demo_type2", "Source foo": "bar", + "Process Name": "proc01", "Process Type": "proc_type", + "Process fizz": "buzz" + } + ] + + results = reader.parse_table_lineage(json_rows) + + assert(results[0].attributes["data_type"] == "str") + assert(results[1].attributes["foo"] == "bar") + assert(results[2].attributes["fizz"] == "buzz") + + +def test_table_lineage_multiple_inputs(): + reader = Reader(READER_CONFIG) + json_tables = [ + { + "Target Table": "table1", "Target Type": "demo_type", + "Source Table": "table0", "Source Type": "demo_type", + "Process Name": "proc01", "Process Type": "proc_type" + }, + { + "Target Table": "table1", "Target Type": "demo_type", + "Source Table": "tableB", "Source Type": "demo_type", + "Process Name": "proc01", "Process Type": "proc_type" + } + ] + + results = reader.parse_table_lineage(json_rows=json_tables) + + assert(len(results) == 4) + assert(results[3].to_json(minimum=True) == { + "typeName": "proc_type", "guid": -1003, "qualifiedName": "proc01"}) + process_inputs_qualified_names = [ + p["qualifiedName"] for p in results[3].get_inputs()] + process_outputs_qualified_names = [ + p["qualifiedName"] for p in results[3].get_outputs()] + assert(len(process_inputs_qualified_names) == 2) + assert(len(process_outputs_qualified_names) == 1) + + assert(set(process_inputs_qualified_names) == set(["table0", "tableB"])) + assert(set(process_outputs_qualified_names) == set(["table1"])) + + +def test_column_lineage_entities(): + reader = Reader(READER_CONFIG) + + json_tables, json_columns, atlas_typedefs = setup_column_lineage_entities() + + # Outputs -1003 as the last guid + tables_and_processes = reader.parse_table_lineage(json_tables) + + results = reader.parse_column_lineage( + json_columns, tables_and_processes, atlas_typedefs) + + # Two column entities + # One process entity + target_col_entity = results[0].to_json() + source_col_entity = results[1].to_json() + col_lineage_entity = results[2].to_json() + + assert(target_col_entity["typeName"] == "demo_column") + assert(target_col_entity["relationshipAttributes"] + ["table"]["typeName"] == "demo_table") + 
assert(source_col_entity["typeName"] == "demo_column") + assert(source_col_entity["relationshipAttributes"] + ["table"]["typeName"] == "demo_table") + assert(col_lineage_entity["typeName"] == "demo_column_lineage") + + for entity in col_lineage_entity["attributes"]["inputs"] + col_lineage_entity["attributes"]["outputs"]: + assert(entity["typeName"] == "demo_column") + + # Check that this points to the correct table process with a (default) query reference in relationshipAttribs + proc_relationship_query_is_demo_process = False + assert("query" in col_lineage_entity["relationshipAttributes"]) + if "query" in col_lineage_entity["relationshipAttributes"]: + proc_relationship_query_is_demo_process = col_lineage_entity[ + "relationshipAttributes"]["query"]["typeName"] == "demo_process" + assert(proc_relationship_query_is_demo_process) + + +def test_column_lineage_entities_with_attributes(): + reader = Reader(READER_CONFIG) + + json_tables, json_columns, atlas_typedefs = setup_column_lineage_entities() + + # Update target to include an attribute + json_columns[0].update({"Target test_attrib1": "value", + "Target test_attrib2": "value2", "Source foo": "bar"}) + + # Outputs -1003 as the last guid + tables_and_processes = reader.parse_table_lineage(json_tables) + + results = reader.parse_column_lineage( + json_columns, tables_and_processes, atlas_typedefs) + + # Two column entities + # One process entity + target_col_entity = results[0] + source_col_entity = results[1] + col_lineage_entity = results[2] + + assert(target_col_entity.attributes["test_attrib1"] == "value") + assert(target_col_entity.attributes["test_attrib2"] == "value2") + assert(source_col_entity.attributes["foo"] == "bar") + + +def test_column_lineage_entities_with_classifications(): + reader = Reader(READER_CONFIG) + + json_tables, json_columns, atlas_typedefs = setup_column_lineage_entities() + + # Update target to include a classification + json_columns[0].update( + {"Target Classifications": "CustomerInfo; PII", "Source Classifications": ""}) + + # Outputs -1003 as the last guid + tables_and_processes = reader.parse_table_lineage(json_tables) + + results = reader.parse_column_lineage( + json_columns, tables_and_processes, atlas_typedefs) + + # Two column entities + # One process entity + target_col_entity = results[0] + source_col_entity = results[1] + col_lineage_entity = results[2] + + assert(len(target_col_entity.classifications) == 2) + assert({"typeName": "CustomerInfo", "attributes": {}} + in target_col_entity.classifications) + assert({"typeName": "PII", "attributes": {}} + in target_col_entity.classifications) + assert(len(source_col_entity.classifications) == 0) + + +def test_column_lineage_entities_with_attributes(): + reader = Reader(READER_CONFIG) + + json_tables, json_columns, atlas_typedefs = setup_column_lineage_entities() + + # Update target to include an attribute + json_columns[0].update({"Target test_attrib1": "value", + "Target test_attrib2": "value2", "Source foo": "bar"}) + + # Outputs -1003 as the last guid + tables_and_processes = reader.parse_table_lineage(json_tables) + + results = reader.parse_column_lineage( + json_columns, tables_and_processes, atlas_typedefs) + + # Two column entities + # One process entity + target_col_entity = results[0] + source_col_entity = results[1] + col_lineage_entity = results[2] + + assert(target_col_entity.attributes["test_attrib1"] == "value") + assert(target_col_entity.attributes["test_attrib2"] == "value2") + assert(source_col_entity.attributes["foo"] == "bar") + + 
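+# The columnMapping tests below assert on the JSON structure that
+# parse_column_lineage serializes onto the table-level process when
+# use_column_mapping=True, for example (illustrative values):
+# [{"ColumnMapping": [{"Source": "col0", "Sink": "col1"}],
+#   "DatasetMapping": {"Source": "table0", "Sink": "table1"}}]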
+def test_column_lineage_entities_with_columnMapping(): + reader = Reader(READER_CONFIG) + expected_obj = [ + {"ColumnMapping": [{"Source": "col0", "Sink": "col1"}, {"Source": "col90", "Sink": "col99"}], + "DatasetMapping": {"Source": "table0", "Sink": "table1"} + } + ] + # "[{\"ColumnMapping\": [{\"Source\": \"col0\", \"Sink\": \"col1\"}], \"DatasetMapping\": {\"Source\": \"table0\", \"Sink\": \"table1\"}}]" + expected = json.dumps(expected_obj) + + json_tables, json_columns, atlas_typedefs = setup_column_lineage_entities() + + json_columns.append({ + "Target Column": "col99", "Target Table": "table1", + "Source Column": "col90", "Source Table": "table0", + "Transformation": "col90 + 1" + } + ) + + # Outputs -1003 as the last guid + tables_and_processes = reader.parse_table_lineage(json_tables) + + results = reader.parse_column_lineage( + json_columns, tables_and_processes, atlas_typedefs, use_column_mapping=True) + + # Demonstrating column lineage + assert("columnMapping" in tables_and_processes[2].attributes) + assert(tables_and_processes[2].attributes["columnMapping"] == expected) + + +def test_column_lineage_entities_when_multi_tabled_inputs(): + reader = Reader(READER_CONFIG) + json_tables, json_columns, atlas_typedefs = setup_column_lineage_entities() + # Adding in an extra table + json_tables.append( + { + "Target Table": "table1", "Target Type": "demo_table", + "Source Table": "tableB", "Source Type": "demo_table", + "Process Name": "proc01", "Process Type": "demo_process" + } + ) + json_columns[0].update({"Transformation": "colB + col0"}) + # Adding in an extra column + json_columns.append( + { + "Target Column": "col1", "Target Table": "table1", + "Source Column": "colB", "Source Table": "tableB", + "Transformation": "colB + col0" + } + ) + expected_col_map_obj = [ + {"ColumnMapping": [{"Source": "col0", "Sink": "col1"}], + "DatasetMapping": {"Source": "table0", "Sink": "table1"} + }, + {"ColumnMapping": [{"Source": "colB", "Sink": "col1"}], + "DatasetMapping": {"Source": "tableB", "Sink": "table1"} + } + ] + + table_entities = reader.parse_table_lineage(json_tables) + column_entities = reader.parse_column_lineage( + json_columns, table_entities, atlas_typedefs, use_column_mapping=True) + + # Three columns and one process entity + assert(len(column_entities) == 4) + process_entities = [ + e for e in column_entities if isinstance(e, AtlasProcess)] + assert(len(process_entities) == 1) + process_entity = process_entities[0] + + process_inputs_qualified_names = [p["qualifiedName"] + for p in process_entity.get_inputs()] + process_outputs_qualified_names = [ + p["qualifiedName"] for p in process_entity.get_outputs()] + assert(len(process_inputs_qualified_names) == 2) + assert(len(process_outputs_qualified_names) == 1) + + assert(set(process_inputs_qualified_names) == + set(["table0#col0", "tableB#colB"])) + assert(set(process_outputs_qualified_names) == set(["table1#col1"])) + + table_process_entities = [ + e for e in table_entities if isinstance(e, AtlasProcess)] + table_process_entity = table_process_entities[0] + # Should now contain the expected column Mappings + assert("columnMapping" in table_process_entity.attributes) + resulting_colmap = json.loads( + table_process_entity.attributes["columnMapping"]) + assert(len(expected_col_map_obj) == len(resulting_colmap)) + assert(all([res in expected_col_map_obj for res in resulting_colmap])) diff --git a/tests/scaffolding/test_excel_template.py b/tests/scaffolding/test_excel_template.py deleted file mode 100644 index 2081261..0000000 
--- a/tests/scaffolding/test_excel_template.py
+++ /dev/null
@@ -1,24 +0,0 @@
-import os
-
-from openpyxl import load_workbook
-
-from pyapacheatlas.scaffolding.templates import excel_template
-
-def test_verify_template_sheets():
-    # Setup
-    temp_path = "./temp_verfiysheets.xlsx"
-    excel_template(temp_path)
-
-    # Expected
-    expected_sheets = set(["ColumnsLineage", "TablesLineage",
-                           "EntityDefs", "BulkEntities"
-                           ])
-
-    wb = load_workbook(temp_path)
-    difference = set(wb.sheetnames).symmetric_difference(expected_sheets)
-    try:
-        assert(len(difference) == 0)
-    finally:
-        wb.close()
-        os.remove(temp_path)
-
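For orientation, here is a minimal sketch of how the dict-based `Reader` added in `pyapacheatlas/readers/reader.py` above is intended to be driven. The type names, qualified names, and row values are illustrative and not part of the patch; only the `Reader`, `ReaderConfiguration`, `parse_bulk_entities`, and `parse_entity_defs` names come from the code changes.

```
from pyapacheatlas.readers.reader import Reader, ReaderConfiguration

# Header prefixes and the classification separator are configurable;
# the defaults match the "Source" / "Target" / "Process" template headers.
config = ReaderConfiguration(value_separator=";")
reader = Reader(config, guid=-1000)

# Rows shaped like the BulkEntities template headers become an
# {"entities": [...]} payload; extra columns turn into attributes and
# the classifications cell is split on the value separator.
bulk_rows = [
    {"typeName": "demo_type", "name": "table01",
     "qualifiedName": "demo://table01", "classifications": "PII;CLASS2",
     "description": "An illustrative entity"}
]
entities_payload = reader.parse_bulk_entities(bulk_rows)

# Rows shaped like the EntityDefs template headers become an
# {"entityDefs": [...]} payload, grouped by "Entity TypeName".
entity_def_rows = [
    {"Entity TypeName": "demo_type", "name": "attrib1",
     "description": "An illustrative attribute", "isOptional": "True"}
]
typedefs_payload = reader.parse_entity_defs(entity_def_rows)
```

`ExcelReader`, used in the samples and tests above, exposes the same `parse_*` methods but takes a workbook file path instead of a list of dicts, and adds `make_template` for generating the spreadsheet to populate.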