From 32d6477ffca34872496db3a7c10adc24cc3b6733 Mon Sep 17 00:00:00 2001 From: jbrown-xentity Date: Fri, 5 Nov 2021 15:20:25 -0600 Subject: [PATCH 1/5] Use sort to run multiple instances --- dedupe/ckan_api.py | 8 ++++++-- dedupe/deduper.py | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/dedupe/ckan_api.py b/dedupe/ckan_api.py index 19513f0..4f0fb4f 100644 --- a/dedupe/ckan_api.py +++ b/dedupe/ckan_api.py @@ -123,9 +123,13 @@ def get_duplicate_identifiers(self, organization_name, is_collection): 'facet.mincount': 2, 'rows': 0, }) + + dupes = response.json()['result']['facets']['identifier'] + # If you want to run 2 scripts in parallel, run one version with normal sort + # and another with `reverse=True` - return \ - response.json()['result']['search_facets']['identifier']['items'] + return sorted(dupes, reverse=True) + def get_dataset_count(self, organization_name, identifier, is_collection): filter_query = \ diff --git a/dedupe/deduper.py b/dedupe/deduper.py index f6c91a3..760b96c 100644 --- a/dedupe/deduper.py +++ b/dedupe/deduper.py @@ -90,7 +90,7 @@ def _fetch_and_dedupe_identifiers(is_collection): count = itertools.count(start=1) # Work with the identifer name, since that's all we need and it's a # little cleaner. 
- for identifier in (i['name'] for i in identifiers): + for identifier in identifiers: if self.stopped: raise DeduperStopException() From 663e93093e2f115337df3a7efa7e9ae4a7f4ab3a Mon Sep 17 00:00:00 2001 From: jbrown-xentity Date: Wed, 10 Nov 2021 16:05:06 -0700 Subject: [PATCH 2/5] Implement reverse functionality Allow the reverse flag to reverse-sort the identifiers to de-dupe, allowing 2 scripts to run in parallel --- dedupe/ckan_api.py | 10 ++++++---- dedupe/deduper.py | 6 ++++-- duplicates-identifier-api.py | 5 ++++- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/dedupe/ckan_api.py b/dedupe/ckan_api.py index 4f0fb4f..a503859 100644 --- a/dedupe/ckan_api.py +++ b/dedupe/ckan_api.py @@ -61,7 +61,10 @@ def __init__(self, api_url, api_key, dry_run=True): self.client.cookies = requests.cookies.cookiejar_from_dict(dict(auth_tkt='1')) def request(self, method, path, **kwargs): - url = '%s/api%s' % (self.api_url, path) + if method == 'POST': + url = '%s/api%s' % (self.api_url, path) + else: + url = '%s/api%s' % ("https://catalog.data.gov", path) if self.dry_run and method not in READ_ONLY_METHODS: raise DryRunException('Cannot call method in dry_run method=%s' % method) @@ -111,7 +114,7 @@ def get_dataset(self, organization_name, identifier, is_collection, sort_order=' return results[0] - def get_duplicate_identifiers(self, organization_name, is_collection): + def get_duplicate_identifiers(self, organization_name, is_collection, reverse=False): filter_query = 'organization:"%s" AND type:dataset' % organization_name if is_collection: filter_query = '%s AND collection_package_id:*' % filter_query @@ -127,8 +130,7 @@ def get_duplicate_identifiers(self, organization_name, is_collection): dupes = response.json()['result']['facets']['identifier'] # If you want to run 2 scripts in parallel, run one version with normal sort # and another with `reverse=True` - - return sorted(dupes, reverse=True) + return sorted(dupes, reverse=reverse) def get_dataset_count(self, 
organization_name, identifier, is_collection): diff --git a/dedupe/deduper.py b/dedupe/deduper.py index 760b96c..07b999a 100644 --- a/dedupe/deduper.py +++ b/dedupe/deduper.py @@ -34,7 +34,8 @@ def __init__(self, collection_package_log=None, run_id=None, oldest=True, - update_name=False): + update_name=False, + reverse=False): self.organization_name = organization_name self.ckan_api = ckan_api self.log = ContextLoggerAdapter(module_log, {'organization': organization_name}) @@ -44,6 +45,7 @@ def __init__(self, self.stopped = False self.oldest = oldest self.update_name = update_name + self.reverse = reverse if not run_id: run_id = datetime.now().strftime('%Y%m%d%H%M%S') @@ -75,7 +77,7 @@ def _fetch_and_dedupe_identifiers(is_collection): self.log.debug('Fetching %s dataset identifiers with duplicates', label) try: identifiers = self.ckan_api.get_duplicate_identifiers(self.organization_name, - is_collection) + is_collection, reverse=self.reverse) except CkanApiFailureException, exc: self.log.error('Failed to fetch %s dataset identifiers for organization', label) self.log.exception(exc) diff --git a/duplicates-identifier-api.py b/duplicates-identifier-api.py index d39a61b..655aac4 100644 --- a/duplicates-identifier-api.py +++ b/duplicates-identifier-api.py @@ -53,6 +53,8 @@ def run(): help='Treat the API as writeable and commit the changes.') parser.add_argument('--newest', action='store_true', help='Keep the newest dataset and remove older ones (default keeps oldest)') + parser.add_argument('--reverse', action='store_true', + help='Reverse the order of ids to parse (for running with another script in parallel)') parser.add_argument('--update-name', action='store_true', help='Update the name of the kept package to be the standard shortest name, whether that was the duplicate package name or the to be kept package name.') parser.add_argument('--debug', action='store_true', @@ -110,7 +112,8 @@ def run(): duplicate_package_log, run_id=args.run_id, oldest=not args.newest, - 
update_name=args.update_name) + update_name=args.update_name, + reverse=args.reverse) deduper.dedupe() From bc08118cda77ed3ddfa54110d4e1b5c4f2d6a00f Mon Sep 17 00:00:00 2001 From: jbrown-xentity Date: Fri, 12 Nov 2021 09:51:48 -0700 Subject: [PATCH 3/5] Update reverse functionality --- dedupe/ckan_api.py | 15 ++++++++++----- dedupe/deduper.py | 6 ++---- duplicates-identifier-api.py | 11 ++++++++--- 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/dedupe/ckan_api.py b/dedupe/ckan_api.py index a503859..35075fe 100644 --- a/dedupe/ckan_api.py +++ b/dedupe/ckan_api.py @@ -50,9 +50,14 @@ class CkanApiClient(object): Represents a client to query and submit requests to the CKAN API. ''' - def __init__(self, api_url, api_key, dry_run=True): + def __init__(self, api_url, api_key, dry_run=True, api_read_url=None, reverse=False): self.api_url = api_url + if api_read_url is None: + self.api_read_url = api_url + else: + self.api_read_url = api_read_url self.dry_run = dry_run + self.reverse = reverse self.client = requests.Session() adapter = requests.adapters.HTTPAdapter(max_retries=3) self.client.mount('https://', adapter) @@ -64,7 +69,7 @@ def request(self, method, path, **kwargs): if method == 'POST': url = '%s/api%s' % (self.api_url, path) else: - url = '%s/api%s' % ("https://catalog.data.gov", path) + url = '%s/api%s' % (self.api_read_url, path) if self.dry_run and method not in READ_ONLY_METHODS: raise DryRunException('Cannot call method in dry_run method=%s' % method) @@ -114,7 +119,7 @@ def get_dataset(self, organization_name, identifier, is_collection, sort_order=' return results[0] - def get_duplicate_identifiers(self, organization_name, is_collection, reverse=False): + def get_duplicate_identifiers(self, organization_name, is_collection): filter_query = 'organization:"%s" AND type:dataset' % organization_name if is_collection: filter_query = '%s AND collection_package_id:*' % filter_query @@ -129,8 +134,8 @@ def get_duplicate_identifiers(self, 
organization_name, is_collection, reverse=Fa dupes = response.json()['result']['facets']['identifier'] # If you want to run 2 scripts in parallel, run one version with normal sort - # and another with `reverse=True` - return sorted(dupes, reverse=reverse) + # and another with `--reverse` flag + return sorted(dupes, reverse=self.reverse) def get_dataset_count(self, organization_name, identifier, is_collection): diff --git a/dedupe/deduper.py b/dedupe/deduper.py index 07b999a..760b96c 100644 --- a/dedupe/deduper.py +++ b/dedupe/deduper.py @@ -34,8 +34,7 @@ def __init__(self, collection_package_log=None, run_id=None, oldest=True, - update_name=False, - reverse=False): + update_name=False): self.organization_name = organization_name self.ckan_api = ckan_api self.log = ContextLoggerAdapter(module_log, {'organization': organization_name}) @@ -45,7 +44,6 @@ def __init__(self, self.stopped = False self.oldest = oldest self.update_name = update_name - self.reverse = reverse if not run_id: run_id = datetime.now().strftime('%Y%m%d%H%M%S') @@ -77,7 +75,7 @@ def _fetch_and_dedupe_identifiers(is_collection): self.log.debug('Fetching %s dataset identifiers with duplicates', label) try: identifiers = self.ckan_api.get_duplicate_identifiers(self.organization_name, - is_collection, reverse=self.reverse) + is_collection) except CkanApiFailureException, exc: self.log.error('Failed to fetch %s dataset identifiers for organization', label) self.log.exception(exc) diff --git a/duplicates-identifier-api.py b/duplicates-identifier-api.py index 655aac4..b061c9d 100644 --- a/duplicates-identifier-api.py +++ b/duplicates-identifier-api.py @@ -49,6 +49,8 @@ def run(): parser.add_argument('--api-key', default=os.getenv('CKAN_API_KEY', None), help='Admin API key') parser.add_argument('--api-url', default='https://admin-catalog-next.data.gov', help='The API base URL to query') + parser.add_argument('--api-read-url', default=None, + help='The API base URL to query read-only info, for faster 
processing') parser.add_argument('--commit', action='store_true', help='Treat the API as writeable and commit the changes.') parser.add_argument('--newest', action='store_true', @@ -79,7 +81,11 @@ def run(): log.info('Dry-run enabled') log.info('run_id=%s', args.run_id) - ckan_api = CkanApiClient(args.api_url, args.api_key, dry_run=dry_run) + ckan_api = CkanApiClient(args.api_url, + args.api_key, + dry_run=dry_run, + api_read_url=args.api_read_url, + reverse=args.reverse) duplicate_package_log = DuplicatePackageLog(api_url=args.api_url, run_id=args.run_id) removed_package_log = RemovedPackageLog(run_id=args.run_id) @@ -112,8 +118,7 @@ def run(): duplicate_package_log, run_id=args.run_id, oldest=not args.newest, - update_name=args.update_name, - reverse=args.reverse) + update_name=args.update_name) deduper.dedupe() From 04970cdecc0a4360dd69da2af01122e4cdcea507 Mon Sep 17 00:00:00 2001 From: jbrown-xentity Date: Fri, 12 Nov 2021 10:04:58 -0700 Subject: [PATCH 4/5] Add readme updates --- README.md | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 001969a..b6a1af5 100644 --- a/README.md +++ b/README.md @@ -34,15 +34,21 @@ positional arguments: organization_name Names of the organizations to deduplicate. optional arguments: - -h, --help show this help message and exit - --api-key API_KEY Admin API key - --api-url API_URL The API base URL to query - --commit Treat the API as writeable and commit the changes. - --debug Include debug output from urllib3. - --run-id RUN_ID An identifier for a single run of the deduplication - script. - --newest Keep the newest dataset and remove older ones - (by default the oldest is kept) + -h, --help show this help message and exit + --api-key API_KEY Admin API key + --api-url API_URL The API base URL to query + --api-read-url API_READ_URL The API URL to use for read-only queries, to limit + the load on the read-write URL. 
Defaults to the + api-url, which defaults to read-write catalog. + --commit Treat the API as writeable and commit the changes. + --debug Include debug output from urllib3. + --run-id RUN_ID An identifier for a single run of the deduplication + script. + --newest Keep the newest dataset and remove older ones + (by default the oldest is kept) + --reverse Reverse the order of unique identifiers the script runs + through de-duping. Used when running twice in parallel. + --update-name Update the name of the kept package to be the standard shortest name, whether that was the duplicate package name or the to be kept package name. From e6ebde180ac1fb19b0b4bcc4def3905e09bf39f4 Mon Sep 17 00:00:00 2001 From: jbrown-xentity Date: Fri, 12 Nov 2021 10:10:26 -0700 Subject: [PATCH 5/5] Cleanup readme and geospatial flag --- README.md | 3 ++- duplicates-identifier-api.py | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 688b61b..a36bbd5 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,8 @@ optional arguments: (by default the oldest is kept) --reverse Reverse the order of unique identifiers the script runs through de-duping. Used when running twice in parallel. - + --geospatial This flag will allow us to toggle between identifier and guid; + it is defaulted to identifier. --update-name Update the name of the kept package to be the standard shortest name, whether that was the duplicate package name or the to be kept package name. 
diff --git a/duplicates-identifier-api.py b/duplicates-identifier-api.py index bf6677d..94e9538 100644 --- a/duplicates-identifier-api.py +++ b/duplicates-identifier-api.py @@ -67,8 +67,8 @@ def run(): help='Include verbose log output.') parser.add_argument('organization_name', nargs='*', help='Names of the organizations to deduplicate.') - parser.add_argument('--geospatial', default=False, - help='Identifier type') + parser.add_argument('--geospatial', action='store_true', + help='If the organization has geospatial metadata that should be de-duped') args = parser.parse_args() @@ -83,7 +83,7 @@ def run(): if dry_run: log.info('Dry-run enabled') - identifier_type = 'guid' if args.geospatial == 'True' else 'identifier' + identifier_type = 'guid' if args.geospatial else 'identifier' log.info('run_id=%s', args.run_id) ckan_api = CkanApiClient(args.api_url,