Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/multi run #23

Merged
merged 7 commits into from
Nov 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 20 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,26 @@ positional arguments:
organization_name Names of the organizations to deduplicate.

optional arguments:
-h, --help show this help message and exit
--api-key API_KEY Admin API key
--api-url API_URL The API base URL to query
--commit Treat the API as writeable and commit the changes.
--debug Include debug output from urllib3.
--run-id RUN_ID An identifier for a single run of the deduplication
script.
--newest Keep the newest dataset and remove older ones
(by default the oldest is kept)
--geospatial This flag will allow us to toggle between identifier and guid;
it is defaulted to False which will use identifier.
--update-name Update the name of the kept package to be the standard
shortest name, whether that was the duplicate package
name or the to be kept package name.
--verbose, -v Include verbose log output.
-h, --help show this help message and exit
--api-key API_KEY Admin API key
--api-url API_URL The API base URL to query
--api-read-url API_READ_URL The API URL to use for read-only queries, to limit
the load on the read-write URL. Defaults to the value of
--api-url, which itself defaults to the read-write catalog.
--commit Treat the API as writeable and commit the changes.
--debug Include debug output from urllib3.
--run-id RUN_ID An identifier for a single run of the deduplication
script.
--newest Keep the newest dataset and remove older ones
(by default the oldest is kept)
--reverse Reverse the order of unique identifiers the script runs
through de-duping. Used when running twice in parallel.
--geospatial Use guid instead of identifier as the identifier type;
the default is identifier.
--update-name Update the name of the kept package to be the standard
shortest name, whether that was the duplicate package
name or the to be kept package name.
--verbose, -v Include verbose log output.
```


Expand Down
21 changes: 16 additions & 5 deletions dedupe/ckan_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,15 @@ class CkanApiClient(object):
Represents a client to query and submit requests to the CKAN API.
'''

def __init__(self, api_url, api_key, dry_run=True, identifier_type='identifier'):
def __init__(self, api_url, api_key, dry_run=True,
identifier_type='identifier', api_read_url=None, reverse=False):
self.api_url = api_url
if api_read_url is None:
self.api_read_url = api_url
else:
self.api_read_url = api_read_url
self.dry_run = dry_run
self.reverse = reverse
self.client = requests.Session()
adapter = requests.adapters.HTTPAdapter(max_retries=3)
self.client.mount('https://', adapter)
Expand All @@ -62,7 +68,10 @@ def __init__(self, api_url, api_key, dry_run=True, identifier_type='identifier')
self.identifier_type = identifier_type

def request(self, method, path, **kwargs):
url = '%s/api%s' % (self.api_url, path)
if method == 'POST':
url = '%s/api%s' % (self.api_url, path)
else:
url = '%s/api%s' % (self.api_read_url, path)

if self.dry_run and method not in READ_ONLY_METHODS:
raise DryRunException('Cannot call method in dry_run method=%s' % method)
Expand Down Expand Up @@ -124,9 +133,11 @@ def get_duplicate_identifiers(self, organization_name, is_collection):
'facet.mincount': 2,
'rows': 0,
})

return \
response.json()['result']['search_facets'][self.identifier_type]['items']

dupes = response.json()['result']['facets'][self.identifier_type]
# To run two instances of the script in parallel, run one with the default sort
# order and the other with the `--reverse` flag so they start from opposite ends.
return sorted(dupes, reverse=self.reverse)

def get_dataset_count(self, organization_name, identifier, is_collection):
filter_query = \
Expand Down
2 changes: 1 addition & 1 deletion dedupe/deduper.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def _fetch_and_dedupe_identifiers(is_collection):
count = itertools.count(start=1)
# Work with the identifier name, since that's all we need and it's a
# little cleaner.
for identifier in (i['name'] for i in identifiers):
for identifier in identifiers:
if self.stopped:
raise DeduperStopException()

Expand Down
18 changes: 14 additions & 4 deletions duplicates-identifier-api.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,14 @@ def run():
parser.add_argument('--api-key', default=os.getenv('CKAN_API_KEY', None), help='Admin API key')
parser.add_argument('--api-url', default='https://admin-catalog-next.data.gov',
help='The API base URL to query')
parser.add_argument('--api-read-url', default=None,
help='The API base URL to query read-only info, for faster processing')
parser.add_argument('--commit', action='store_true',
help='Treat the API as writeable and commit the changes.')
parser.add_argument('--newest', action='store_true',
help='Keep the newest dataset and remove older ones (default keeps oldest)')
parser.add_argument('--reverse', action='store_true',
help='Reverse the order of ids to parse (for running with another script in parallel)')
parser.add_argument('--update-name', action='store_true',
help='Update the name of the kept package to be the standard shortest name, whether that was the duplicate package name or the to be kept package name.')
parser.add_argument('--debug', action='store_true',
Expand All @@ -63,8 +67,8 @@ def run():
help='Include verbose log output.')
parser.add_argument('organization_name', nargs='*',
help='Names of the organizations to deduplicate.')
parser.add_argument('--geospatial', default=False,
help='Identifier type')
parser.add_argument('--geospatial', action='store_true',
help='If the organization has geospatial metadata that should be de-duped')


args = parser.parse_args()
Expand All @@ -79,10 +83,16 @@ def run():
if dry_run:
log.info('Dry-run enabled')

identifier_type = 'guid' if args.geospatial == 'True' else 'identifier'
identifier_type = 'guid' if args.geospatial else 'identifier'

log.info('run_id=%s', args.run_id)
ckan_api = CkanApiClient(args.api_url, args.api_key, dry_run=dry_run, identifier_type=identifier_type)
ckan_api = CkanApiClient(args.api_url,
args.api_key,
dry_run=dry_run,
identifier_type=identifier_type,
api_read_url=args.api_read_url,
reverse=args.reverse)

duplicate_package_log = DuplicatePackageLog(api_url=args.api_url, run_id=args.run_id)
removed_package_log = RemovedPackageLog(run_id=args.run_id)

Expand Down