diff --git a/README.md b/README.md index 001969a..88d1680 100644 --- a/README.md +++ b/README.md @@ -43,6 +43,8 @@ optional arguments: script. --newest Keep the newest dataset and remove older ones (by default the oldest is kept) + --geospatial This flag will allow us to toggle between identifier and guid; + it is defaulted to False which will use identifier. --update-name Update the name of the kept package to be the standard shortest name, whether that was the duplicate package name or the to be kept package name. diff --git a/dedupe/ckan_api.py b/dedupe/ckan_api.py index 19513f0..4ef44c6 100644 --- a/dedupe/ckan_api.py +++ b/dedupe/ckan_api.py @@ -50,7 +50,7 @@ class CkanApiClient(object): Represents a client to query and submit requests to the CKAN API. ''' - def __init__(self, api_url, api_key, dry_run=True): + def __init__(self, api_url, api_key, dry_run=True, identifier_type='identifier'): self.api_url = api_url self.dry_run = dry_run self.client = requests.Session() @@ -59,6 +59,7 @@ def __init__(self, api_url, api_key, dry_run=True): self.client.headers.update(Authorization=api_key) # Set the auth_tkt cookie to talk to admin API self.client.cookies = requests.cookies.cookiejar_from_dict(dict(auth_tkt='1')) + self.identifier_type = identifier_type def request(self, method, path, **kwargs): url = '%s/api%s' % (self.api_url, path) @@ -85,8 +86,8 @@ def get(self, path, **kwargs): def get_dataset(self, organization_name, identifier, is_collection, sort_order='asc'): filter_query = \ - 'identifier:"%s" AND organization:"%s" AND type:dataset' % \ - (identifier, organization_name) + '%s:"%s" AND organization:"%s" AND type:dataset' % \ + (self.identifier_type, identifier, organization_name) if is_collection: filter_query = '%s AND collection_package_id:*' % filter_query @@ -118,19 +119,19 @@ def get_duplicate_identifiers(self, organization_name, is_collection): response = self.get('/3/action/package_search', params={ 'fq': filter_query, - 'facet.field': 
'["identifier"]', + 'facet.field': '["'+self.identifier_type+'"]', 'facet.limit': -1, 'facet.mincount': 2, 'rows': 0, }) return \ - response.json()['result']['search_facets']['identifier']['items'] + response.json()['result']['search_facets'][self.identifier_type]['items'] def get_dataset_count(self, organization_name, identifier, is_collection): filter_query = \ - 'identifier:"%s" AND organization:"%s" AND type:dataset' % \ - (identifier, organization_name) + '%s:"%s" AND organization:"%s" AND type:dataset' % \ + (self.identifier_type, identifier, organization_name) if is_collection: filter_query = '%s AND collection_package_id:*' % filter_query @@ -159,8 +160,8 @@ def get_datasets_in_collection(self, package_id): def get_datasets(self, organization_name, identifier, start=0, rows=1000, is_collection=False): filter_query = \ - 'identifier:"%s" AND organization:"%s" AND type:dataset' % \ - (identifier, organization_name) + '%s:"%s" AND organization:"%s" AND type:dataset' % \ + (self.identifier_type, identifier, organization_name) if is_collection: filter_query = '%s AND collection_package_id:*' % filter_query diff --git a/dedupe/deduper.py b/dedupe/deduper.py index f6c91a3..f51a86b 100644 --- a/dedupe/deduper.py +++ b/dedupe/deduper.py @@ -34,7 +34,8 @@ def __init__(self, collection_package_log=None, run_id=None, oldest=True, - update_name=False): + update_name=False, + identifier_type='identifier'): self.organization_name = organization_name self.ckan_api = ckan_api self.log = ContextLoggerAdapter(module_log, {'organization': organization_name}) @@ -44,6 +45,7 @@ def __init__(self, self.stopped = False self.oldest = oldest self.update_name = update_name + self.identifier_type = identifier_type if not run_id: run_id = datetime.now().strftime('%Y%m%d%H%M%S') @@ -76,7 +78,7 @@ def _fetch_and_dedupe_identifiers(is_collection): try: identifiers = self.ckan_api.get_duplicate_identifiers(self.organization_name, is_collection) - except CkanApiFailureException, exc: + 
except CkanApiFailureException as exc: self.log.error('Failed to fetch %s dataset identifiers for organization', label) self.log.exception(exc) # continue onto the next organization @@ -94,18 +96,18 @@ def _fetch_and_dedupe_identifiers(is_collection): if self.stopped: raise DeduperStopException() - self.log.info('Deduplicating identifier=%s progress=%r', - identifier, (next(count), len(identifiers))) + self.log.info('Deduplicating %s=%s progress=%r', + self.identifier_type, identifier, (next(count), len(identifiers))) try: duplicate_count += self.dedupe_identifier(identifier, is_collection) except CkanApiFailureException: - self.log.error('Failed to dedupe identifier=%s', identifier) + self.log.error('Failed to dedupe %s=%s', self.identifier_type, identifier) # Move on to next identifier continue except CkanApiCountException: self.log.error('Got an invalid count, this may not be a duplicate or there ' 'could be inconsistencies between db and solr. Try running the ' - 'db_solr_sync job. identifier=%s', identifier) + 'db_solr_sync job. 
%s=%s', self.identifier_type, identifier) # Move on to next identifier continue @@ -243,7 +245,7 @@ def dedupe_identifier(self, identifier, is_collection=False): log = ContextLoggerAdapter( module_log, - {'organization': self.organization_name, 'identifier': identifier}, + {'organization': self.organization_name, self.identifier_type: identifier}, ) log.debug('Fetching number of datasets for unique identifier') @@ -257,7 +259,8 @@ def dedupe_identifier(self, identifier, is_collection=False): sort_order = 'asc' if self.oldest else 'desc' # We want to keep the oldest dataset - self.log.debug('Fetching %s dataset for identifier=%s', 'oldest' if self.oldest else 'newest', identifier) + self.log.debug('Fetching %s dataset for %s=%s', 'oldest' if self.oldest else 'newest', + self.identifier_type, identifier) retained_dataset = self.ckan_api.get_dataset(self.organization_name, identifier, is_collection, @@ -283,8 +286,8 @@ def get_datasets(total, rows=1000): start = 0 while start < total: log.debug( - 'Batch fetching datasets for identifier offset=%d rows=%d total=%d', - start, rows, total) + 'Batch fetching datasets for %s offset=%d rows=%d total=%d', + self.identifier_type, start, rows, total) datasets = self.ckan_api.get_datasets(self.organization_name, identifier, start, rows, is_collection) if len(datasets) < 1: log.warning('Got zero datasets from API offset=%d total=%d', start, total) @@ -312,7 +315,7 @@ def get_datasets(total, rows=1000): duplicate_count += 1 try: self.remove_duplicate(dataset, retained_dataset) - except CkanApiFailureException, e: + except CkanApiFailureException as e: log.error('Failed to remove dataset status_code=%s package=%r', e.response.status_code, (dataset['id'], dataset['name'])) continue diff --git a/duplicates-identifier-api.py b/duplicates-identifier-api.py index d39a61b..e80fc7a 100644 --- a/duplicates-identifier-api.py +++ b/duplicates-identifier-api.py @@ -63,6 +63,9 @@ def run(): help='Include verbose log output.') 
parser.add_argument('organization_name', nargs='*', help='Names of the organizations to deduplicate.') + parser.add_argument('--geospatial', action='store_true', + help='Use guid instead of identifier as the unique identifier field.') + args = parser.parse_args() @@ -76,8 +79,10 @@ def run(): if dry_run: log.info('Dry-run enabled') + identifier_type = 'guid' if args.geospatial else 'identifier' + log.info('run_id=%s', args.run_id) - ckan_api = CkanApiClient(args.api_url, args.api_key, dry_run=dry_run) + ckan_api = CkanApiClient(args.api_url, args.api_key, dry_run=dry_run, identifier_type=identifier_type) duplicate_package_log = DuplicatePackageLog(api_url=args.api_url, run_id=args.run_id) removed_package_log = RemovedPackageLog(run_id=args.run_id) @@ -110,7 +115,8 @@ def run(): duplicate_package_log, run_id=args.run_id, oldest=not args.newest, - update_name=args.update_name) + update_name=args.update_name, + identifier_type=identifier_type) deduper.dedupe()