diff --git a/converter/es_connector.py b/converter/es_connector.py
index b6b229c5..c4a1f530 100644
--- a/converter/es_connector.py
+++ b/converter/es_connector.py
@@ -168,6 +168,26 @@ def setPermissions(self, uuid, permissions) -> bool:
             return True
         except ApiException as e:
             return False
+
+    def setNodeBinaryData(self, uuid, item) -> bool:
+        if "binary" in item:
+            url = (
+                get_project_settings().get("EDU_SHARING_BASE_URL")
+                + "rest/node/v1/nodes/-home-/"
+                + uuid
+                + "/content?mimetype="
+                + item["lom"]["technical"]["format"]
+            )
+            logging.info(url)
+            files = {"file": item["binary"]}
+            response = requests.post(
+                url,
+                headers=self.getHeaders(None),
+                files=files,
+            )
+            return response.status_code == 200
+        else:
+            return False
 
     def setNodePreview(self, uuid, item) -> bool:
         if "thumbnail" in item:
@@ -243,8 +263,9 @@ def transformItem(self, uuid, spider, item):
             "ccm:objecttype": item["type"],
             "ccm:replicationsourceuuid": uuid,
             "cm:name": item["lom"]["general"]["title"],
-            "ccm:wwwurl": item["lom"]["technical"]["location"],
-            "cclom:location": item["lom"]["technical"]["location"],
+            "ccm:wwwurl": item["lom"]["technical"]["location"] if "location" in item["lom"]["technical"] else None,
+            "cclom:location": item["lom"]["technical"]["location"] if "location" in item["lom"]["technical"] else None,
+            "cclom:format": item["lom"]["technical"]["format"] if "format" in item["lom"]["technical"] else None,
             "cclom:title": item["lom"]["general"]["title"],
         }
         if "notes" in item:
@@ -491,7 +512,8 @@ def insertItem(self, spider, uuid, item):
         node = self.syncNode(spider, "ccm:io", self.transformItem(uuid, spider, item))
         self.setNodePermissions(node["ref"]["id"], item)
         self.setNodePreview(node["ref"]["id"], item)
-        self.setNodeText(node["ref"]["id"], item)
+        if not self.setNodeBinaryData(node["ref"]["id"], item):
+            self.setNodeText(node["ref"]["id"], item)
 
     def updateItem(self, spider, uuid, item):
         self.insertItem(spider, uuid, item)
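
For reference, the upload endpoint used by setNodeBinaryData can be exercised standalone. A minimal sketch, assuming an illustrative base URL and node UUID (real calls go through getHeaders() for session authentication):

    import requests

    base_url = "https://example.org/edu-sharing/"  # illustrative EDU_SHARING_BASE_URL
    node_id = "123e4567-e89b-12d3-a456-426614174000"  # illustrative node UUID
    with open("example.pdf", "rb") as file:
        response = requests.post(
            base_url + "rest/node/v1/nodes/-home-/" + node_id + "/content?mimetype=application/pdf",
            headers={"Accept": "application/json"},  # illustrative; auth headers omitted here
            files={"file": file.read()},
        )
    print(response.status_code == 200)  # True on a successful upload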
item["lom"]["general"]: title = str(item["lom"]["general"]["title"]) - entryUUID = EduSharing.buildUUID(item["response"]["url"]) + entryUUID = EduSharing.buildUUID(item["response"]["url"] if "url" in item["response"] else item["hash"]) self.insertItem(spider, entryUUID, item) logging.info("item " + entryUUID + " inserted/updated") diff --git a/converter/spiders/niedersachsen_abi_spider.py b/converter/spiders/niedersachsen_abi_spider.py new file mode 100644 index 00000000..de895dd4 --- /dev/null +++ b/converter/spiders/niedersachsen_abi_spider.py @@ -0,0 +1,209 @@ +import logging +import os +from _datetime import datetime + +import scrapy + +from .base_classes import LomBase +from .scripts.lower_saxony_abi.directory_routine import DirectoryInitializer, UnZipper, \ + DirectoryScanner +from .scripts.lower_saxony_abi.keyword_mapper import LoSaxKeywordMapper +from ..constants import Constants +from ..items import BaseItemLoader, LomBaseItemloader, LomGeneralItemloader, LomTechnicalItemLoader, \ + LomLifecycleItemloader, LomEducationalItemLoader, LicenseItemLoader, ResponseItemLoader, \ + ValuespaceItemLoader + + +class NiedersachsenAbiSpider(scrapy.Spider, LomBase): + name = 'niedersachsen_abi_spider' + + allowed_domains = ['za-aufgaben.nibis.de'] + start_urls = ['https://za-aufgaben.nibis.de'] + version = "0.0.2" + # Default values for the 2 expected parameters. Parameter "filename" is always required, "skip_unzip" is optional. + filename = None + skip_unzip = False + pdf_dictionary_general = dict() + pdf_dictionary_additional = dict() + + # Running the crawler from the command line with the exact filename as a parameter: + # scrapy crawl niedersachsen_abi_spider -a filename="za-download-6e05cbbb6e07250c69ebe95ae972fe8a.zip" + # -a skip_unzip="yes" + # Make sure that there is a corresponding .zip file inside the /zip_download/-folder in the project root + + # def start_requests(self): + # yield self.parse(None) + + def __init__(self, **kwargs): + super().__init__(**kwargs) + # logging.basicConfig(level=logging.DEBUG, format=' %(asctime)s - %(levelname)s - %(message)s') + # logging.disable(logging.DEBUG) + directory_paths = DirectoryInitializer() + zip_file_dictionary = directory_paths.check_download_folder_for_zip_files() + + # only extract files if a "filename"-parameter was given: + if self.filename is not None: + zip_selection = self.filename + + logging.debug(f"Selected .zip file by CLI-parameter: {zip_selection}") + logging.debug(f"User wants to skip the unzipping? 
{self.skip_unzip}") + # by default, the script should always unzip the desired .zip file, + # but unzipping the nested .zip files is only done when requested by parameter + if self.skip_unzip == "no": + self.skip_unzip = False + if self.skip_unzip == "yes": + self.skip_unzip = True + logging.debug(f"skip_unzip variable: {self.skip_unzip}") + + if self.skip_unzip is False: + un_zipper = UnZipper() + un_zipper.directory_paths = directory_paths.get_path_storage() + un_zipper.zip_file_dictionary = zip_file_dictionary + zip_file_chosen_by_user = \ + un_zipper.compare_selected_zip_file_with_recognized_files(zip_selection=zip_selection) + + if zip_file_chosen_by_user is not None: + un_zipper.unzip_all_zips_within_the_initial_zip(zip_file=zip_file_chosen_by_user, + skip_unzip=self.skip_unzip) + + logging.debug(f"Extracted the following zip files:") + logging.debug(un_zipper.zip_files_already_extracted) + + # always scan the /zip_extract/-directory for pdfs and try to extract metadata + print( + f"Analyzing file paths for '.pdf'-files inside " + f"{directory_paths.path_storage.path_to_extraction_directory}") + pdfs_in_directory: dict = \ + DirectoryScanner.scan_directory_for_pdfs(directory_paths.path_storage.path_to_extraction_directory) + # logging.debug(pp.pformat(pdfs_in_directory)) + print(f"Total .pdf items in the above mentioned directory: {len(pdfs_in_directory.keys())}") + if len(pdfs_in_directory.keys()) == 0: + raise Exception(f"No .pdf files found inside {directory_paths.path_storage.path_to_extraction_directory}. " + f"Please make sure that you've run the crawler with '-a filename=' " + f"parameter first and that there's actual .pdf files inside the extraction directory") + kw_mapper = LoSaxKeywordMapper() + pdf_dict1, pdf_dict2 = kw_mapper.extract_pdf_metadata(pdfs_in_directory) + self.pdf_dictionary_general = pdf_dict1 + self.pdf_dictionary_additional = pdf_dict2 + + def getId(self, response=None) -> str: + pass + + def getHash(self, response=None) -> str: + pass + + def parse(self, response, **kwargs): + # print(f"filename = {self.filename}") + # print(f"skip_unzip = {self.skip_unzip}") + logging.debug(f"The .pdf (general) dictionary has {len(self.pdf_dictionary_general.keys())} files") + logging.debug(f"The dictionary for additional .pdf files has " + f"{len(self.pdf_dictionary_additional.keys())} entries") + + # first we're scraping all the .pdf files that follow the more general RegEx syntax + for pdf_item in self.pdf_dictionary_general: + current_dict: dict = self.pdf_dictionary_general.get(pdf_item) + # pprint.pprint(current_dict) + base = BaseItemLoader() + base.add_value('sourceId', pdf_item) + hash_temp = str(f"{datetime.now().isoformat()}{self.version}") + base.add_value('hash', hash_temp) + base.add_value('type', Constants.TYPE_MATERIAL) + base.add_value('binary', self.get_binary(current_dict, pdf_item)) + + lom = LomBaseItemloader() + + general = LomGeneralItemloader() + title_long: str = ' '.join(current_dict.get('keywords')) + general.add_value('title', title_long) + general.add_value('identifier', pdf_item) + general.add_value('keyword', current_dict.get('keywords')) + lom.add_value('general', general.load_item()) + + technical = LomTechnicalItemLoader() + technical.add_value('format', 'application/pdf') + lom.add_value('technical', technical.load_item()) + + lifecycle = LomLifecycleItemloader() + lifecycle.add_value('role', 'publisher') + lifecycle.add_value('organization', 'Niedersächsisches Kultusministerium') + lom.add_value('lifecycle', lifecycle.load_item()) + + 
diff --git a/converter/spiders/niedersachsen_abi_spider.py b/converter/spiders/niedersachsen_abi_spider.py
new file mode 100644
index 00000000..de895dd4
--- /dev/null
+++ b/converter/spiders/niedersachsen_abi_spider.py
@@ -0,0 +1,209 @@
+import logging
+import os
+from datetime import datetime
+
+import scrapy
+
+from .base_classes import LomBase
+from .scripts.lower_saxony_abi.directory_routine import DirectoryInitializer, UnZipper, \
+    DirectoryScanner
+from .scripts.lower_saxony_abi.keyword_mapper import LoSaxKeywordMapper
+from ..constants import Constants
+from ..items import BaseItemLoader, LomBaseItemloader, LomGeneralItemloader, LomTechnicalItemLoader, \
+    LomLifecycleItemloader, LomEducationalItemLoader, LicenseItemLoader, ResponseItemLoader, \
+    ValuespaceItemLoader
+
+
+class NiedersachsenAbiSpider(scrapy.Spider, LomBase):
+    name = 'niedersachsen_abi_spider'
+    allowed_domains = ['za-aufgaben.nibis.de']
+    start_urls = ['https://za-aufgaben.nibis.de']
+    version = "0.0.2"
+    # Default values for the two expected parameters: "filename" is always required, "skip_unzip" is optional.
+    filename = None
+    skip_unzip = False
+    pdf_dictionary_general = dict()
+    pdf_dictionary_additional = dict()
+
+    # Run the crawler from the command line with the exact filename as a parameter:
+    #   scrapy crawl niedersachsen_abi_spider -a filename="za-download-6e05cbbb6e07250c69ebe95ae972fe8a.zip"
+    #     -a skip_unzip="yes"
+    # Make sure that there is a corresponding .zip file inside the /zip_download/ folder in the project root.
+
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+        directory_paths = DirectoryInitializer()
+        zip_file_dictionary = directory_paths.check_download_folder_for_zip_files()
+
+        # only extract files if a "filename" parameter was given:
+        if self.filename is not None:
+            zip_selection = self.filename
+            logging.debug(f"Selected .zip file by CLI parameter: {zip_selection}")
+            logging.debug(f"User wants to skip the unzipping? {self.skip_unzip}")
+            # By default the script always unzips the selected .zip file, but unzipping the nested .zip files
+            # is only done when requested by parameter:
+            if self.skip_unzip == "no":
+                self.skip_unzip = False
+            if self.skip_unzip == "yes":
+                self.skip_unzip = True
+            logging.debug(f"skip_unzip variable: {self.skip_unzip}")
+
+            if self.skip_unzip is False:
+                un_zipper = UnZipper()
+                un_zipper.directory_paths = directory_paths.get_path_storage()
+                un_zipper.zip_file_dictionary = zip_file_dictionary
+                zip_file_chosen_by_user = \
+                    un_zipper.compare_selected_zip_file_with_recognized_files(zip_selection=zip_selection)
+
+                if zip_file_chosen_by_user is not None:
+                    un_zipper.unzip_all_zips_within_the_initial_zip(zip_file=zip_file_chosen_by_user,
+                                                                    skip_unzip=self.skip_unzip)
+                    logging.debug("Extracted the following zip files:")
+                    logging.debug(un_zipper.zip_files_already_extracted)
+
+        # always scan the /zip_extract/ directory for PDFs and try to extract metadata
+        print(f"Analyzing file paths for '.pdf' files inside "
+              f"{directory_paths.path_storage.path_to_extraction_directory}")
+        pdfs_in_directory: dict = \
+            DirectoryScanner.scan_directory_for_pdfs(directory_paths.path_storage.path_to_extraction_directory)
+        print(f"Total .pdf items in the above-mentioned directory: {len(pdfs_in_directory.keys())}")
+        if len(pdfs_in_directory.keys()) == 0:
+            raise Exception(f"No .pdf files found inside {directory_paths.path_storage.path_to_extraction_directory}. "
+                            f"Please make sure that you've run the crawler with the '-a filename=' parameter "
+                            f"first and that there are actual .pdf files inside the extraction directory.")
+        kw_mapper = LoSaxKeywordMapper()
+        pdf_dict1, pdf_dict2 = kw_mapper.extract_pdf_metadata(pdfs_in_directory)
+        self.pdf_dictionary_general = pdf_dict1
+        self.pdf_dictionary_additional = pdf_dict2
+
+    def getId(self, response=None) -> str:
+        pass
+
+    def getHash(self, response=None) -> str:
+        pass
+
+    def parse(self, response, **kwargs):
+        logging.debug(f"The .pdf (general) dictionary has {len(self.pdf_dictionary_general.keys())} files")
+        logging.debug(f"The dictionary for additional .pdf files has "
+                      f"{len(self.pdf_dictionary_additional.keys())} entries")
+
+        # first we scrape all the .pdf files whose names follow the more general RegEx syntax
+        for pdf_item in self.pdf_dictionary_general:
+            current_dict: dict = self.pdf_dictionary_general.get(pdf_item)
+            base = BaseItemLoader()
+            base.add_value('sourceId', pdf_item)
+            hash_temp = str(f"{datetime.now().isoformat()}{self.version}")
+            base.add_value('hash', hash_temp)
+            base.add_value('type', Constants.TYPE_MATERIAL)
+            base.add_value('binary', self.get_binary(current_dict, pdf_item))
+
+            lom = LomBaseItemloader()
+
+            general = LomGeneralItemloader()
+            title_long: str = ' '.join(current_dict.get('keywords'))
+            general.add_value('title', title_long)
+            general.add_value('identifier', pdf_item)
+            general.add_value('keyword', current_dict.get('keywords'))
+            lom.add_value('general', general.load_item())
+
+            technical = LomTechnicalItemLoader()
+            technical.add_value('format', 'application/pdf')
+            lom.add_value('technical', technical.load_item())
+
+            lifecycle = LomLifecycleItemloader()
+            lifecycle.add_value('role', 'publisher')
+            lifecycle.add_value('organization', 'Niedersächsisches Kultusministerium')
+            lom.add_value('lifecycle', lifecycle.load_item())
+
+            educational = LomEducationalItemLoader()
+            lom.add_value('educational', educational.load_item())
+
+            base.add_value('lom', lom.load_item())
+
+            vs = ValuespaceItemLoader()
+            if current_dict.get('discipline') is not None:
+                vs.add_value('discipline', current_dict.get('discipline'))
+            if current_dict.get('intendedEndUserRole') is not None:
+                vs.add_value('intendedEndUserRole', current_dict.get('intendedEndUserRole'))
+            base.add_value('valuespaces', vs.load_item())
+
+            lic = LicenseItemLoader()
+            base.add_value('license', lic.load_item())
+
+            permissions = LomBase.getPermissions(self)
+            base.add_value('permissions', permissions.load_item())
+
+            response_loader = ResponseItemLoader()
+            base.add_value('response', response_loader.load_item())
+
+            yield base.load_item()
+
+        # make sure that we also grab the additional .pdf files that don't follow the general filename syntax
+        for pdf_item in self.pdf_dictionary_additional:
+            current_dict: dict = self.pdf_dictionary_additional.get(pdf_item)
+            base = BaseItemLoader()
+            base.add_value('sourceId', pdf_item)
+            hash_temp = str(f"{datetime.now().isoformat()}{self.version}")
+            base.add_value('hash', hash_temp)
+            base.add_value('type', Constants.TYPE_MATERIAL)
+            base.add_value('binary', self.get_binary(current_dict, pdf_item))
+
+            lom = LomBaseItemloader()
+
+            general = LomGeneralItemloader()
+            general.add_value('title', os.path.splitext(pdf_item)[0])
+            general.add_value('identifier', pdf_item)
+            general.add_value('keyword', current_dict.get('keywords'))
+            lom.add_value('general', general.load_item())
+
+            technical = LomTechnicalItemLoader()
+            technical.add_value('format', 'application/pdf')
+            lom.add_value('technical', technical.load_item())
+
+            lifecycle = LomLifecycleItemloader()
+            lifecycle.add_value('role', 'publisher')
+            lifecycle.add_value('organization', 'Niedersächsisches Kultusministerium')
+            lom.add_value('lifecycle', lifecycle.load_item())
+
+            educational = LomEducationalItemLoader()
+            lom.add_value('educational', educational.load_item())
+
+            base.add_value('lom', lom.load_item())
+
+            vs = ValuespaceItemLoader()
+            if current_dict.get('discipline') is not None:
+                vs.add_value('discipline', current_dict.get('discipline'))
+            base.add_value('valuespaces', vs.load_item())
+
+            lic = LicenseItemLoader()
+            base.add_value('license', lic.load_item())
+
+            permissions = LomBase.getPermissions(self)
+            base.add_value('permissions', permissions.load_item())
+
+            response_loader = ResponseItemLoader()
+            base.add_value('response', response_loader.load_item())
+
+            yield base.load_item()
+
+    @staticmethod
+    def get_binary(current_dict, pdf_item):
+        filepath_full = os.path.join(current_dict.get('pdf_path'), pdf_item)
+        with open(filepath_full, mode='rb') as file:
+            binary = file.read()
+        return binary
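
parse() consumes the nested dictionaries produced by LoSaxKeywordMapper (defined below). An illustrative entry, to make the expected shape explicit (the filename and all values are hypothetical):

    pdf_dictionary_general = {
        '2019MatheBGGTREAAufg1.pdf': {
            'discipline': 'Mathematik',
            'year': '2019',
            'pdf_path': '/home/user/oeh-search-etl/zip_download/zip_extract/mathe',
            'keywords': ['Schriftliche Abituraufgaben Niedersachsen', '2019', 'Mathematik',
                         'Berufsgymnasium (BG)', 'Grafikfähiger Taschenrechner (GTR)',
                         'Kurs auf erhöhtem Anforderungsniveau (eA)', 'Aufgabe 1'],
            'intendedEndUserRole': 'learner',
        }
    }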
diff --git a/converter/spiders/scripts/__init__.py b/converter/spiders/scripts/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/converter/spiders/scripts/lower_saxony_abi/__init__.py b/converter/spiders/scripts/lower_saxony_abi/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/converter/spiders/scripts/lower_saxony_abi/directory_routine.py b/converter/spiders/scripts/lower_saxony_abi/directory_routine.py
new file mode 100644
index 00000000..cd869528
--- /dev/null
+++ b/converter/spiders/scripts/lower_saxony_abi/directory_routine.py
@@ -0,0 +1,246 @@
+import logging
+import os
+import pprint
+import zipfile
+from dataclasses import dataclass
+
+
+@dataclass
+class PathStorage:
+    parent_directory: str = None
+    path_to_download_directory: str = None
+    path_to_extraction_directory: str = None
+
+    pp = pprint.PrettyPrinter(indent=4)
+
+    def print_all_directories(self):
+        self.pp.pprint("Working directories that this script will be using:")
+        self.pp.pprint(self.parent_directory)
+        self.pp.pprint(self.path_to_download_directory)
+        self.pp.pprint(self.path_to_extraction_directory)
+
+
+class DirectoryInitializer:
+    """
+    This class makes sure that the three frequently used directories actually exist - and if they don't,
+    creates them and saves their paths to the 'PathStorage' dataclass.
+    After the DirectoryInitializer is done with its work, the folder structure should look like this:
+    <project_root>/
+    <project_root>/zip_download/             <- this is where the 'to be extracted' .zips should be
+    <project_root>/zip_download/zip_extract/ <- this is where the extracted files end up
+    """
+    path_storage = PathStorage()
+
+    def __init__(self):
+        self.initialize_required_directories()
+
+    def check_download_folder_for_zip_files(self) -> dict:
+        """
+        Checks the /zip_download/ folder for .zip files and returns a dict that maps each filename to its
+        file size in megabytes.
+        """
+        file_dict = dict()
+        os.chdir(self.path_storage.path_to_download_directory)
+        logging.debug("Checking " + os.getcwd() + " for zip files")
+        if os.getcwd().endswith('zip_download'):
+            temp_list = os.listdir(os.getcwd())
+            # since temp_list will hold folder names as well, we're checking for files only:
+            file_list = list()
+            for file_entry in temp_list:
+                if os.path.isfile(file_entry):
+                    if file_entry.endswith('.zip'):
+                        file_list.append(file_entry)
+            for file in file_list:
+                file_size_temp = os.path.getsize(file)
+                file_size_megabyte = file_size_temp / (1000 * 1000)
+                file_size_megabyte = str(file_size_megabyte) + "MB"
+                file_dict_entry = {
+                    file: file_size_megabyte
+                }
+                file_dict.update(file_dict_entry)
+        logging.debug(".zip files detected inside the '/zip_download/' directory: ")
+        logging.debug(file_dict)
+        return file_dict
+
+    def create_zip_download_directory(self):
+        os.chdir(self.path_storage.parent_directory)
+        logging.debug("Creating '/zip_download/' directory ...")
+        os.mkdir('zip_download')
+        if os.path.exists('zip_download'):
+            print("Please provide a suitable .zip file inside the '/zip_download/' directory and rerun the script")
+        self.path_storage.path_to_download_directory = os.path.join(os.getcwd(), 'zip_download')
+
+    def create_zip_extraction_directory(self):
+        os.chdir(self.path_storage.path_to_download_directory)
+        logging.debug("Creating '/zip_extract/' directory ...")
+        os.mkdir('zip_extract')
+        os.chdir('zip_extract')
+        self.path_storage.path_to_extraction_directory = os.getcwd()
+        os.chdir('..')
+
+    def detect_extraction_directory(self):
+        """
+        Checks if there is a /zip_extract/ subdirectory inside the /zip_download/ folder and saves the folder
+        path to the class attributes. If the subdirectory doesn't exist yet, it is created by the
+        corresponding method.
+        """
+        logging.debug("Detecting 'zip_extract' sub-folder ...")
+        os.chdir(self.path_storage.path_to_download_directory)
+        if os.path.exists('zip_extract'):
+            logging.debug("SUCCESS! Detected '/zip_extract/' directory, continuing ...")
+            os.chdir('zip_extract')
+            self.path_storage.path_to_extraction_directory = os.getcwd()
+            os.chdir('..')
+        else:
+            self.create_zip_extraction_directory()
+
+    def detect_zip_directory(self) -> bool:
+        if os.path.exists('zip_download'):
+            os.chdir('zip_download')
+            zip_directory = os.path.join(os.getcwd())
+            logging.debug("SUCCESS! Detected 'zip_download' directory in: " + zip_directory)
+            self.path_storage.path_to_download_directory = zip_directory
+            return True
+        else:
+            self.create_zip_download_directory()
+            return False
+
+    def get_path_storage(self):
+        return self.path_storage
+
+    def initialize_folders(self):
+        logging.debug("Looking for 'zip_download/' directory ...")
+        if self.detect_zip_directory():
+            self.detect_extraction_directory()
+
+    def initialize_required_directories(self):
+        self.path_storage.parent_directory = os.getcwd()
+        self.initialize_folders()
+        self.path_storage.print_all_directories()
+        os.chdir(self.path_storage.parent_directory)
+        return self
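+
+# A usage sketch of the directory setup above (all paths are illustrative, not actual output):
+#
+#   paths = DirectoryInitializer().get_path_storage()
+#   paths.parent_directory              # e.g. '/home/user/oeh-search-etl'
+#   paths.path_to_download_directory    # e.g. '/home/user/oeh-search-etl/zip_download'
+#   paths.path_to_extraction_directory  # e.g. '/home/user/oeh-search-etl/zip_download/zip_extract'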
+
+
+class UnZipper:
+    directory_paths: PathStorage = None
+    zip_file_dictionary: dict = None
+    zip_files_already_extracted = set()
+    zip_files_to_extract = set()
+    zip_files_to_extract_dict = dict()
+
+    pp = pprint.PrettyPrinter(indent=4)
+
+    def compare_selected_zip_file_with_recognized_files(self, zip_selection=None):
+        # TODO: maybe prettify the zip list output
+        self.pp.pprint(f"The following .zip files were recognized by the script: {self.zip_file_dictionary}")
+        if zip_selection is not None:
+            if zip_selection in self.zip_file_dictionary.keys():
+                zip_file_name = zip_selection
+                zip_file_size_megabytes = self.zip_file_dictionary.get(zip_selection)
+                print(f"Selected the following file:\t {zip_file_name} \t size: {zip_file_size_megabytes}")
+                zip_file = zipfile.ZipFile(zip_file_name)
+                return zip_file
+            else:
+                logging.warning(f"Selected .zip file '{zip_selection}' not found in "
+                                f"'{self.directory_paths.path_to_download_directory}'!\n"
+                                f"These are the available .zip files: {self.zip_file_dictionary}.\n"
+                                f"Please make sure that your CLI input for -a filename='file.zip' is valid.")
+
+    def unzip_all_zips_within_the_initial_zip(self, zip_file: zipfile.ZipFile, skip_unzip=False):
+        """
+        Unzips the initially selected .zip file and checks if the user also wants to extract all .zip files
+        in its subdirectories.
+        Keeps track of which files were already extracted by using a set() of their filenames.
+        :param zip_file: the user-specified zip file that needs extraction
+        :param skip_unzip: in case the user wants to only unzip the initial .zip file and nothing else
+        :return: a list() of all .zip files that were found within the initial .zip file
+        """
+        zips_inside_zip: list = list()
+        zip_files_list: list = zip_file.namelist()
+        zip_file.extractall(path='zip_extract')
+        filename_full_path = os.path.abspath(zip_file.filename)
+        self.zip_files_already_extracted.add(filename_full_path)
+
+        for zip_item in zip_files_list:
+            if zip_item.endswith('.zip'):
+                zips_inside_zip.append(zip_item)
+
+        if len(zips_inside_zip) > 0:
+            logging.debug(f"Found additional .zip files inside {zip_file.filename}:")
+            logging.debug(zips_inside_zip)
+            if skip_unzip is False:
+                self.unzip_everything(self.directory_paths.path_to_extraction_directory)
+            else:
+                print(f"Okay. Skipping extraction of nested .zip files within {zip_file.filename}")
+        return zips_inside_zip
+
+    def unzip_everything(self, directory_as_string):
+        """
+        Tries to recursively unzip all .zip files within a directory.
+        :param directory_as_string: the filepath in which to look for .zip files
+        """
+        extract_dir = directory_as_string
+        os.chdir(extract_dir)
+        zip_inside_zip_counter = 0
+        for folder_name, sub_folder, filenames in os.walk(extract_dir):
+            if len(sub_folder) == 0 and folder_name.endswith('zip_extract'):
+                for filename_top_level in filenames:
+                    current_full_path = os.path.abspath(filename_top_level)
+                    if filename_top_level.endswith(
+                            '.zip') and current_full_path not in self.zip_files_already_extracted:
+                        logging.debug(folder_name)
+                        logging.debug(filename_top_level)
+                        self.zip_files_already_extracted.add(current_full_path)
+                        current_zip = zipfile.ZipFile(filename_top_level)
+                        zip_files_inside = current_zip.namelist()
+                        for zip_file_inside in zip_files_inside:
+                            if zip_file_inside.endswith('.zip'):
+                                zip_inside_zip_counter += 1
+                        current_zip.extractall()
+                        if zip_inside_zip_counter > 0:
+                            if extract_dir is not None:
+                                self.unzip_everything(extract_dir)
+                            else:
+                                extract_dir = self.directory_paths.path_to_extraction_directory
+                                self.unzip_everything(extract_dir)
+            for _ in sub_folder:
+                for filename in filenames:
+                    current_full_path = os.path.abspath(filename)
+                    if filename.endswith('.zip') and current_full_path not in self.zip_files_already_extracted:
+                        self.zip_files_to_extract.add(filename)
+                        self.zip_files_to_extract_dict.update({filename: folder_name})
+
+        for item in self.zip_files_to_extract_dict.keys():
+            if item not in self.zip_files_already_extracted:
+                print(f"Unzipping: {item}")
+                temp_filepath_full = os.path.join(self.zip_files_to_extract_dict.get(item), item)
+                temp_path = self.zip_files_to_extract_dict.get(item)
+                temp_zip = zipfile.ZipFile(temp_filepath_full)
+                temp_zip.extractall(path=temp_path)
+                self.zip_files_already_extracted.add(item)
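+
+# A usage sketch of the unzip routine above, assuming 'example.zip' sits inside /zip_download/
+# (the filename and size entry are illustrative):
+#
+#   un_zipper = UnZipper()
+#   un_zipper.directory_paths = DirectoryInitializer().get_path_storage()
+#   un_zipper.zip_file_dictionary = {'example.zip': '1.5MB'}
+#   chosen_zip = un_zipper.compare_selected_zip_file_with_recognized_files(zip_selection='example.zip')
+#   if chosen_zip is not None:
+#       un_zipper.unzip_all_zips_within_the_initial_zip(zip_file=chosen_zip, skip_unzip=False)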
+
+
+class DirectoryScanner:
+
+    @staticmethod
+    def scan_directory_for_pdfs(target_directory):
+        """
+        Returns a dict() of .pdf files and their filepaths.
+        :param target_directory: the directory in which to look for .pdf files
+        :return: a dictionary consisting of two strings per entry: a unique filename and the corresponding
+            directory, e.g.: dict() = { filename : directory }
+        """
+        directory_to_scan = target_directory
+        pdf_list = set()
+        pdf_dictionary_temp = dict()
+        for folder_name, sub_folders, filenames in os.walk(directory_to_scan):
+            for filename in filenames:
+                if filename.endswith('.pdf') and filename not in pdf_list:
+                    pdf_list.add(filename)
+                    pdf_dictionary_temp.update({filename: folder_name})
+        return pdf_dictionary_temp
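
The scanner's return value maps each unique filename to the directory it was found in. A quick usage sketch (paths and filename are illustrative):

    pdfs = DirectoryScanner.scan_directory_for_pdfs('/home/user/oeh-search-etl/zip_download/zip_extract')
    # e.g. {'2019MatheBGGTREAAufg1.pdf': '/home/user/oeh-search-etl/zip_download/zip_extract/mathe'}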
+ """ + discipline_mapping = { + # SkoHub discipline Mapping, see: https://github.com/openeduhub/oeh-metadata-vocabs/blob/master/discipline.ttl + 'BRC': 'Wirtschaft und Verwaltung', + 'BVW': 'Wirtschaftskunde', + 'Ernaehrung': 'Ernährung und Hauswirtschaft', + 'EvReligion': 'Religion', + 'Franz': 'Französisch', + 'GesPfl': 'Gesundheit', + 'KathReligion': 'Religion', + 'Mathe': 'Mathematik', + 'MatheTech': 'Mathematik', + 'MatheWirt': 'Mathematik', + 'PaedPsych': 'Pädagogik', + 'PolitikWirtschaft': 'Politik', + 'VW': 'Wirtschaftskunde', + 'WerteNormen': 'Ethik', + } + + keyword_mapping = { + # additional discipline information, specific for Lower Saxony: + 'BRC': 'Betriebswirtschaft mit Rechnungswesen-Controlling', + 'BVW': 'Betriebs- und Volkswirtschaft', + 'Ernaehrung': 'Ernährung und Hauswirtschaft', + 'EvReligion': 'Evangelische Religion', + 'Franz': 'Französisch', + 'GesPfl': 'Gesundheit-Pflege', + 'KathReligion': 'Katholische Religion', + 'Mathe': 'Mathematik', + 'MatheTech': 'Mathematik - Berufliches Gymnasium - Technik', + 'MatheWirt': 'Mathematik - Berufliches Gymnasium - Wirtschaft / Gesundheit und Soziales', + 'PaedPsych': 'Pädagogik-Psychologie', + 'PolitikWirtschaft': 'Politik-Wirtschaft', + 'VW': 'Volkswirtschaft', + 'WerteNormen': 'Werte und Normen', + # additional keywords + 'Neu': 'Neubeginn', + 'BG': 'Berufsgymnasium (BG)', + 'ZBW': 'Zweiter Bildungsweg (ZBW) / Freie Waldorfschulen / Nichtschüler', + 'CAS': 'Computer Algebra System (CAS)', + 'GTR': 'Grafikfähiger Taschenrechner (GTR)', + 'WTR': 'Wissenschaftlicher Taschenrechner', + 'EA': 'Kurs auf erhöhtem Anforderungsniveau (eA)', + 'GA': 'Kurs auf grundlegendem Anforderungsniveau (gA)', + 'HV': 'Hörverständnis', + 'ME': 'Material', # for students or teachers + 'mitExp': 'mit Experimentieren', + 'ohneExp': 'ohne Experimentieren', + 'mitExpElektrik': 'mit Experimentieren - Elektrik', + 'mitExpOptik': 'mit Experimentieren - Optik', + 'mitExpWellen': 'mit Experimentieren - Wellen', + '_ALLGE': 'Allgemein (ALLGE)', + '_LA': 'Lineare Algebra (LA)', + '_LA_AG': 'Lineare Algebra / Analytische Geometrie (LA_AG)', + '_STOCH': 'Stochastik (STOCH)', + 'AnlagenTSP': 'Anlagen - Thematische Schwerpunkte', + 'TS': 'Thematische Schwerpunkte / Themenschwerpunkte', + 'TSP': 'Thematische Schwerpunkte / Themenschwerpunkte' + } + # For Debugging: + logging.basicConfig(level=logging.DEBUG, format=' %(asctime)s - %(levelname)s - %(message)s') + pp = pprint.PrettyPrinter(indent=4) + + def extract_pdf_metadata(self, pdf_dictionary): + """ + expects a pdf_dictionary consisting of two strings: {'filename': 'path_to_file'} + then does a 3 step conversion: + + - sorting the pdf_entries into either 'general' or 'additional' .pdf files + - using RegEx to extract metadata from the filename into a pdf dictionary + - cleaning up the dictionary of 'None'-Types + - mapping keywords + + afterwards returns two final pdf_dictionary for 'normal' and 'additional' .pdf files, where + + - key = 'unique_filename_of_a_pdf_file.pdf' + - values = nested dictionary (with keys like 'discipline', 'year', 'pdf_path', 'keywords' + + :param pdf_dictionary: dict + :return: pdf_dictionary_general, pdf_dictionary_additional_files + """ + pdf_dictionary_raw = pdf_dictionary + pdf_temp = dict() + pdf_additional_files = dict() + for pdf_item in pdf_dictionary_raw.keys(): + logging.debug(self.pp.pformat(pdf_item)) + if pdf_item.startswith('Anlage') or pdf_item.startswith('TSP'): + logging.debug(f"Filtered out {pdf_item} from {pdf_dictionary_raw.get(pdf_item)}") + 
+
+    def extract_pdf_metadata(self, pdf_dictionary):
+        """
+        Expects a pdf_dictionary consisting of two strings per entry: {'filename': 'path_to_file'},
+        then converts it in several steps:
+
+        - sorting the pdf entries into either 'general' or 'additional' .pdf files
+        - using RegEx to extract metadata from the filename into a pdf dictionary
+        - cleaning up the dictionary of 'None' types
+        - mapping keywords
+
+        Afterwards returns two final pdf dictionaries for 'general' and 'additional' .pdf files, where
+
+        - key = 'unique_filename_of_a_pdf_file.pdf'
+        - values = nested dictionary (with keys like 'discipline', 'year', 'pdf_path', 'keywords')
+
+        :param pdf_dictionary: dict
+        :return: pdf_dictionary_general, pdf_dictionary_additional_files
+        """
+        pdf_dictionary_raw = pdf_dictionary
+        pdf_temp = dict()
+        pdf_additional_files = dict()
+        for pdf_item in pdf_dictionary_raw.keys():
+            logging.debug(self.pp.pformat(pdf_item))
+            if pdf_item.startswith('Anlage') or pdf_item.startswith('TSP'):
+                logging.debug(f"Filtered out {pdf_item} from {pdf_dictionary_raw.get(pdf_item)}")
+                pdf_additional_files.update({pdf_item: pdf_dictionary_raw.get(pdf_item)})
+            else:
+                # Only the 'year', 'discipline' and 'teacher' groups are read downstream; the other
+                # group names are descriptive placeholders.
+                regex_general = re.compile(r'(?P<year>\d{4})'
+                                           r'(?P<discipline>.+?)'
+                                           r'(?P<new_beginner>Neu)?'  # Neubeginner (2. Fremdsprache als Neubeginner)
+                                           r'(?P<school_type>BG|ZBW|FWS)?'
+                                           # Berufsgymnasium / Zweiter Bildungsweg / Freie Waldorfschulen?
+                                           r'(?P<mandatory>Pflicht)?'  # Pflichtfach
+                                           r'(?P<calculator>CAS|GTR|WTR)?'
+                                           # ComputerAlgebraSystem / Grafikfähiger Taschenrechner /
+                                           # Wissenschaftlicher TR
+                                           r'(?P<level>EA|GA)?'
+                                           r'(?P<listening>HV)?'  # Hörverständnis
+                                           r'(?P<material>M|ME)?'
+                                           # Material (für Schüler) oder Erwartungshorizont (für Lehrer)
+                                           r'(?P<experiment>(mitExp)?(Elektrik|Optik|Wellen)?'
+                                           r'|ohneExp)?'
+                                           r'(?P<new_beginner_alt>Neu)?'
+                                           r'(?P<math_topic>_ALLGE|_LA|_LA_AG|_STOCH)?'
+                                           # Allgemein / LinAlg / analytische Geometrie / Stochastik
+                                           r'(?P<tsp>Anlagen|AnlagenTSP|TS|TS\d{4})?'
+                                           # TSP bzw. TS = Thematische Schwerpunkte / Themenschwerpunkte
+                                           r'(?P<task>Aufg\d)?'
+                                           r'(?P<teacher>Lehrer)?'
+                                           r'(\.pdf)')
+                if regex_general.search(pdf_item) is not None:
+                    regex_result_dict = regex_general.search(pdf_item).groupdict()
+
+                    # For debugging - in case we want to see the individual (raw) RegEx results:
+                    logging.debug(self.pp.pformat(regex_result_dict))
+
+                    # filtering out the invalid (NoneType) values from the initial regex results
+                    # with a temporary list:
+                    only_valid_values = list()
+                    for value in regex_result_dict.values():
+                        if value is not None and value != '':
+                            only_valid_values.append(value)
+
+                    # discipline mapping to the SkoHub vocabulary:
+                    if regex_result_dict.get('discipline') in self.discipline_mapping.keys():
+                        regex_result_dict.update(
+                            {'discipline': self.discipline_mapping.get(regex_result_dict.get('discipline'))})
+                    # mapping 'Lehrer.pdf' to the SkoHub intendedEndUserRole:
+                    if regex_result_dict.get('teacher') is None:
+                        regex_result_dict.update({'intendedEndUserRole': 'learner'})
+                    elif regex_result_dict.get('teacher') == "Lehrer":
+                        regex_result_dict.update({'intendedEndUserRole': 'teacher'})
+
+                    # For debugging - this is the 'working list' of keywords without any of the 'None' types:
+                    logging.debug(f"PDF file: {pdf_item} // only_valid_keywords: {only_valid_values}")
+
+                    keywords_cleaned_and_mapped = list()
+                    keywords_cleaned_and_mapped.append('Schriftliche Abituraufgaben Niedersachsen')
+                    for potential_keyword in only_valid_values:
+                        if potential_keyword in self.keyword_mapping:
+                            potential_keyword = self.keyword_mapping.get(potential_keyword)
+                        if potential_keyword.startswith('Aufg'):
+                            potential_keyword = potential_keyword.replace('Aufg', 'Aufgabe ')
+                        keywords_cleaned_and_mapped.append(potential_keyword)
+                    logging.debug(self.pp.pformat(keywords_cleaned_and_mapped))
+
+                    # TODO: keywords
+                    #  - Erwartungshorizont für Lehrer
+                    #  - relative / absolute path?
+                    dict_of_current_pdf = {
+                        pdf_item.split(os.path.sep)[-1]: {
+                            'discipline': regex_result_dict.get('discipline'),
+                            'year': regex_result_dict.get('year'),
+                            'pdf_path': pdf_dictionary_raw.get(pdf_item),
+                            'keywords': keywords_cleaned_and_mapped,
+                            'intendedEndUserRole': regex_result_dict.get('intendedEndUserRole')
+                        }
+                    }
+                    pdf_temp.update(dict_of_current_pdf)
+
+        logging.debug(self.pp.pformat(pdf_temp))
+        logging.debug(f"length of pdf_temp: {len(pdf_temp)}")
+        logging.debug(f"amount of filtered out (additional) pdfs: {len(pdf_additional_files)}")
+        logging.debug(f"Filtered out pdf items: {pdf_additional_files.items()}")
+        if len(pdf_additional_files) > 0:
+            pdf_additional_files = self.extract_pdf_metadata_from_additional_files(pdf_dictionary=pdf_additional_files)
+        return pdf_temp, pdf_additional_files
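+
+    # A hypothetical example of what the RegEx above extracts: a filename such as
+    # '2019MatheBGGTREAAufg1Lehrer.pdf' should yield groupdict() values along the lines of
+    #   {'year': '2019', 'discipline': 'Mathe', 'school_type': 'BG', 'calculator': 'GTR',
+    #    'level': 'EA', 'task': 'Aufg1', 'teacher': 'Lehrer'}  # None-valued groups omitted
+    # which the mappings above then turn into discipline 'Mathematik' and intendedEndUserRole 'teacher'.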
+
+    def extract_pdf_metadata_from_additional_files(self, pdf_dictionary):
+        """
+        Since not all '.pdf' filenames follow the same naming syntax, this method processes the filenames
+        that can't be parsed by the more generic extract_pdf_metadata() method.
+
+        Expects a pdf_dictionary consisting of two strings per entry: {'filename': 'path_to_file'},
+        then uses a (more lenient) RegEx to extract metadata from each filename, cleans up the results of
+        'None' types and maps keywords.
+
+        :param pdf_dictionary: dict
+        :return: nested dict = { '.pdf filename': {
+                                     'discipline': '...',
+                                     'year': '...',
+                                     'pdf_path': '...',
+                                     'keywords': '...' }
+                               }
+        """
+        pdf_working_dict = pdf_dictionary
+        pdf_filenames_and_metadata_dict = dict()
+        for pdf_filename in pdf_working_dict.keys():
+            # Only the 'discipline' and 'year' groups are read downstream; the other group names are
+            # descriptive placeholders.
+            regex_additional_files = re.compile(r'(?P<prefix>Anlage .+ im Fach|TSP)?'
+                                                r'(?P<discipline>.+?)'
+                                                r'(?P<ts>TS)?'
+                                                r'(?P<year>\d{4})?'
+                                                r'(?P<appendix>Anlagen)?'
+                                                r'(\.pdf)')
+            if regex_additional_files.search(pdf_filename) is not None:
+                regex_result_dict_temporary: dict = regex_additional_files.search(pdf_filename).groupdict()
+                logging.debug(self.pp.pformat(regex_result_dict_temporary))
+
+                # extract and clean up the keyword list:
+                only_valid_values = list()
+                for value in regex_result_dict_temporary.values():
+                    if value is not None and value != '':
+                        only_valid_values.append(value)
+                logging.debug(only_valid_values)
+
+                # discipline mapping to the SkoHub vocabulary:
+                if regex_result_dict_temporary.get('discipline') in self.discipline_mapping.keys():
+                    regex_result_dict_temporary.update(
+                        {'discipline': self.discipline_mapping.get(regex_result_dict_temporary.get('discipline'))})
+
+                keywords_cleaned_and_mapped = list()
+                for potential_keyword in only_valid_values:
+                    if potential_keyword in self.keyword_mapping:
+                        potential_keyword = self.keyword_mapping.get(potential_keyword)
+                    keywords_cleaned_and_mapped.append(potential_keyword)
+                keywords_cleaned_and_mapped.append('Schriftliche Abituraufgaben Niedersachsen')
+
+                logging.debug(self.pp.pformat(keywords_cleaned_and_mapped))
+                dict_of_current_pdf = {
+                    pdf_filename: {
+                        'discipline': regex_result_dict_temporary.get('discipline'),
+                        'year': regex_result_dict_temporary.get('year'),
+                        'pdf_path': pdf_working_dict.get(pdf_filename),
+                        'keywords': keywords_cleaned_and_mapped
+                    }
+                }
+                pdf_filenames_and_metadata_dict.update(dict_of_current_pdf)
+        return pdf_filenames_and_metadata_dict
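
As a sanity check for the lenient second pattern, a filename from the 'additional' bucket should decompose as follows (the filename is hypothetical, and all group names except 'discipline' and 'year' are the placeholders used above):

    import re

    regex_additional_files = re.compile(r'(?P<prefix>Anlage .+ im Fach|TSP)?'
                                        r'(?P<discipline>.+?)'
                                        r'(?P<ts>TS)?'
                                        r'(?P<year>\d{4})?'
                                        r'(?P<appendix>Anlagen)?'
                                        r'(\.pdf)')
    print(regex_additional_files.search('TSPDeutsch2020.pdf').groupdict())
    # {'prefix': 'TSP', 'discipline': 'Deutsch', 'ts': None, 'year': '2020', 'appendix': None}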
diff --git a/zip_download/.gitignore b/zip_download/.gitignore
new file mode 100644
index 00000000..1a0e445b
--- /dev/null
+++ b/zip_download/.gitignore
@@ -0,0 +1,3 @@
+zip_extract/
+*.zip
+*.json
\ No newline at end of file