Skip to content

Commit

Permalink
Merge pull request #22 from GSA/change_identifier
Browse files Browse the repository at this point in the history
updated identifier to guid
  • Loading branch information
jbrown-xentity authored Nov 12, 2021
2 parents 723ac14 + 643e46c commit db53428
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 22 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ optional arguments:
script.
--newest Keep the newest dataset and remove older ones
(by default the oldest is kept)
--geospatial This flag toggles between identifier and guid;
it defaults to False, which uses identifier.
--update-name Update the name of the kept package to be the standard
shortest name, whether that was the duplicate package
name or the to be kept package name.
Expand Down
19 changes: 10 additions & 9 deletions dedupe/ckan_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class CkanApiClient(object):
Represents a client to query and submit requests to the CKAN API.
'''

def __init__(self, api_url, api_key, dry_run=True):
def __init__(self, api_url, api_key, dry_run=True, identifier_type='identifier'):
self.api_url = api_url
self.dry_run = dry_run
self.client = requests.Session()
Expand All @@ -59,6 +59,7 @@ def __init__(self, api_url, api_key, dry_run=True):
self.client.headers.update(Authorization=api_key)
# Set the auth_tkt cookie to talk to admin API
self.client.cookies = requests.cookies.cookiejar_from_dict(dict(auth_tkt='1'))
self.identifier_type = identifier_type

def request(self, method, path, **kwargs):
url = '%s/api%s' % (self.api_url, path)
Expand All @@ -85,8 +86,8 @@ def get(self, path, **kwargs):

def get_dataset(self, organization_name, identifier, is_collection, sort_order='asc'):
filter_query = \
'identifier:"%s" AND organization:"%s" AND type:dataset' % \
(identifier, organization_name)
'%s:"%s" AND organization:"%s" AND type:dataset' % \
(self.identifier_type, identifier, organization_name)
if is_collection:
filter_query = '%s AND collection_package_id:*' % filter_query

Expand Down Expand Up @@ -118,19 +119,19 @@ def get_duplicate_identifiers(self, organization_name, is_collection):

response = self.get('/3/action/package_search', params={
'fq': filter_query,
'facet.field': '["identifier"]',
'facet.field': '["'+self.identifier_type+'"]',
'facet.limit': -1,
'facet.mincount': 2,
'rows': 0,
})

return \
response.json()['result']['search_facets']['identifier']['items']
response.json()['result']['search_facets'][self.identifier_type]['items']

def get_dataset_count(self, organization_name, identifier, is_collection):
filter_query = \
'identifier:"%s" AND organization:"%s" AND type:dataset' % \
(identifier, organization_name)
'%s:"%s" AND organization:"%s" AND type:dataset' % \
(self.identifier_type, identifier, organization_name)
if is_collection:
filter_query = '%s AND collection_package_id:*' % filter_query

Expand Down Expand Up @@ -159,8 +160,8 @@ def get_datasets_in_collection(self, package_id):
def get_datasets(self, organization_name, identifier, start=0, rows=1000,
is_collection=False):
filter_query = \
'identifier:"%s" AND organization:"%s" AND type:dataset' % \
(identifier, organization_name)
'%s:"%s" AND organization:"%s" AND type:dataset' % \
(self.identifier_type, identifier, organization_name)
if is_collection:
filter_query = '%s AND collection_package_id:*' % filter_query

Expand Down
25 changes: 14 additions & 11 deletions dedupe/deduper.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ def __init__(self,
collection_package_log=None,
run_id=None,
oldest=True,
update_name=False):
update_name=False,
identifier_type='identifier'):
self.organization_name = organization_name
self.ckan_api = ckan_api
self.log = ContextLoggerAdapter(module_log, {'organization': organization_name})
Expand All @@ -44,6 +45,7 @@ def __init__(self,
self.stopped = False
self.oldest = oldest
self.update_name = update_name
self.identifier_type = identifier_type

if not run_id:
run_id = datetime.now().strftime('%Y%m%d%H%M%S')
Expand Down Expand Up @@ -76,7 +78,7 @@ def _fetch_and_dedupe_identifiers(is_collection):
try:
identifiers = self.ckan_api.get_duplicate_identifiers(self.organization_name,
is_collection)
except CkanApiFailureException, exc:
except CkanApiFailureException as exc:
self.log.error('Failed to fetch %s dataset identifiers for organization', label)
self.log.exception(exc)
# continue onto the next organization
Expand All @@ -94,18 +96,18 @@ def _fetch_and_dedupe_identifiers(is_collection):
if self.stopped:
raise DeduperStopException()

self.log.info('Deduplicating identifier=%s progress=%r',
identifier, (next(count), len(identifiers)))
self.log.info('Deduplicating %s=%s progress=%r',
self.identifier_type, identifier, (next(count), len(identifiers)))
try:
duplicate_count += self.dedupe_identifier(identifier, is_collection)
except CkanApiFailureException:
self.log.error('Failed to dedupe identifier=%s', identifier)
self.log.error('Failed to dedupe %s=%s', self.identifier_type, identifier)
# Move on to next identifier
continue
except CkanApiCountException:
self.log.error('Got an invalid count, this may not be a duplicate or there '
'could be inconsistencies between db and solr. Try running the '
'db_solr_sync job. identifier=%s', identifier)
'db_solr_sync job. %s=%s', self.identifier_type, identifier)
# Move on to next identifier
continue

Expand Down Expand Up @@ -243,7 +245,7 @@ def dedupe_identifier(self, identifier, is_collection=False):

log = ContextLoggerAdapter(
module_log,
{'organization': self.organization_name, 'identifier': identifier},
{'organization': self.organization_name, self.identifier_type: identifier},
)

log.debug('Fetching number of datasets for unique identifier')
Expand All @@ -257,7 +259,8 @@ def dedupe_identifier(self, identifier, is_collection=False):

sort_order = 'asc' if self.oldest else 'desc'
# We want to keep the oldest dataset
self.log.debug('Fetching %s dataset for identifier=%s', 'oldest' if self.oldest else 'newest', identifier)
self.log.debug('Fetching %s dataset for %s=%s', 'oldest' if self.oldest else 'newest',
self.identifier_type, identifier)
retained_dataset = self.ckan_api.get_dataset(self.organization_name,
identifier,
is_collection,
Expand All @@ -283,8 +286,8 @@ def get_datasets(total, rows=1000):
start = 0
while start < total:
log.debug(
'Batch fetching datasets for identifier offset=%d rows=%d total=%d',
start, rows, total)
'Batch fetching datasets for %s offset=%d rows=%d total=%d',
self.identifier_type, start, rows, total)
datasets = self.ckan_api.get_datasets(self.organization_name, identifier, start, rows, is_collection)
if len(datasets) < 1:
log.warning('Got zero datasets from API offset=%d total=%d', start, total)
Expand Down Expand Up @@ -312,7 +315,7 @@ def get_datasets(total, rows=1000):
duplicate_count += 1
try:
self.remove_duplicate(dataset, retained_dataset)
except CkanApiFailureException, e:
except CkanApiFailureException as e:
log.error('Failed to remove dataset status_code=%s package=%r',
e.response.status_code, (dataset['id'], dataset['name']))
continue
Expand Down
10 changes: 8 additions & 2 deletions duplicates-identifier-api.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ def run():
help='Include verbose log output.')
parser.add_argument('organization_name', nargs='*',
help='Names of the organizations to deduplicate.')
parser.add_argument('--geospatial', default=False,
help='Identifier type')


args = parser.parse_args()

Expand All @@ -76,8 +79,10 @@ def run():
if dry_run:
log.info('Dry-run enabled')

identifier_type = 'guid' if args.geospatial == 'True' else 'identifier'

log.info('run_id=%s', args.run_id)
ckan_api = CkanApiClient(args.api_url, args.api_key, dry_run=dry_run)
ckan_api = CkanApiClient(args.api_url, args.api_key, dry_run=dry_run, identifier_type=identifier_type)
duplicate_package_log = DuplicatePackageLog(api_url=args.api_url, run_id=args.run_id)
removed_package_log = RemovedPackageLog(run_id=args.run_id)

Expand Down Expand Up @@ -110,7 +115,8 @@ def run():
duplicate_package_log,
run_id=args.run_id,
oldest=not args.newest,
update_name=args.update_name)
update_name=args.update_name,
identifier_type=identifier_type)
deduper.dedupe()


Expand Down

0 comments on commit db53428

Please sign in to comment.