-
-
Notifications
You must be signed in to change notification settings - Fork 218
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Management commands #29600
Draft
kaapstorm
wants to merge
7
commits into
nh/rep/create_stubs
Choose a base branch
from
nh/rep/commands
base: nh/rep/create_stubs
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Management commands #29600
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
b1eebf4
Update fire_repeaters command
kaapstorm 3a402fe
iter_sql_repeat_records_by_domain() safe prefetch
kaapstorm 4e21cd2
Update delete_duplicate_cancelled_records command
kaapstorm 5047662
Update generate_repeater_report command
kaapstorm 1705056
Update update_cancelled_records command
kaapstorm 7d8b9bd
⏪ Command to revert RepeatRecord migration
kaapstorm 181c70c
📝 Update docs
kaapstorm File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,7 @@ | ||
import datetime | ||
from typing import Iterator, List, Optional, Tuple | ||
|
||
from django.db.models import QuerySet | ||
|
||
from dimagi.utils.parsing import json_format_datetime | ||
|
||
|
@@ -138,6 +141,59 @@ def get_paged_sql_repeat_records(domain, skip, limit, repeater_id=None, state=No | |
.prefetch_related('sqlrepeatrecordattempt_set')) | ||
|
||
|
||
def iter_sql_repeat_records_by_domain( | ||
domain: str, | ||
repeater_id: Optional[str] = None, | ||
states: Optional[List[str]] = None, | ||
order_by: Optional[List[str]] = None, | ||
) -> Tuple[Iterator['SQLRepeatRecord'], int]: | ||
""" | ||
Returns an iterator of SQLRepeatRecords, and the total count | ||
""" | ||
from corehq.motech.repeaters.models import SQLRepeatRecord | ||
|
||
queryset = SQLRepeatRecord.objects.filter(domain=domain) | ||
if repeater_id: | ||
queryset = queryset.filter(repeater__couch_id=repeater_id) | ||
if states: | ||
queryset = queryset.filter(state__in=states) | ||
record_count = queryset.count() | ||
if order_by: | ||
queryset = queryset.order_by(order_by) | ||
|
||
return ( | ||
prefetch_attempts(queryset, record_count), | ||
record_count, | ||
) | ||
|
||
|
||
def prefetch_attempts( | ||
queryset: QuerySet, | ||
record_count: int, | ||
chunk_size: int = 1000, | ||
) -> Iterator['SQLRepeatRecord']: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. F821 undefined name 'SQLRepeatRecord' |
||
""" | ||
Prefetches SQLRepeatRecordAttempts for SQLRepeatRecords. Paginates | ||
querysets because prefetching loads both the primary queryset and | ||
the prefetched queryset into memory. | ||
""" | ||
for start, end in _pages(record_count, chunk_size): | ||
yield from (queryset[start:end] | ||
.prefetch_related('sqlrepeatrecordattempt_set')) | ||
|
||
|
||
def _pages(total: int, page_size: int) -> Iterator[Tuple[int, int]]: | ||
""" | ||
Return an interator of start-end pairs, given a total and page size. | ||
|
||
>>> list(_pages(10, 4)) | ||
[(0, 4), (4, 8), (8, 10)] | ||
""" | ||
for start in range(0, total, page_size): | ||
end = min(start + page_size, total) | ||
yield start, end | ||
|
||
|
||
def iter_repeat_records_by_domain(domain, repeater_id=None, state=None, chunk_size=1000): | ||
from .models import RepeatRecord | ||
kwargs = { | ||
|
158 changes: 78 additions & 80 deletions
158
corehq/motech/repeaters/management/commands/delete_duplicate_cancelled_records.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,106 +1,104 @@ | ||
import csv | ||
import datetime | ||
from collections import defaultdict | ||
from contextlib import contextmanager | ||
from inspect import cleandoc | ||
|
||
from django.core.management.base import BaseCommand | ||
|
||
from memoized import memoized | ||
|
||
from corehq.motech.repeaters.const import ( | ||
RECORD_CANCELLED_STATE, | ||
RECORD_SUCCESS_STATE, | ||
) | ||
from corehq.motech.repeaters.dbaccessors import iter_repeat_records_by_domain | ||
from corehq.motech.repeaters.models import Repeater, RepeatRecord | ||
from corehq.util.couch import IterDB | ||
from corehq.motech.repeaters.dbaccessors import ( | ||
iter_sql_repeat_records_by_domain, | ||
) | ||
from corehq.motech.repeaters.models import Repeater | ||
|
||
|
||
class Command(BaseCommand): | ||
help = """ | ||
help = cleandoc(""" | ||
If there are multiple cancelled repeat records for a given payload id, this | ||
will delete all but one for each payload, reducing the number of requests | ||
that must be made. It will also delete any cancelled repeat records for | ||
which there is a more recent successful record with the same payload_id. | ||
""" | ||
""") | ||
|
||
def add_arguments(self, parser): | ||
parser.add_argument( | ||
'domain', | ||
) | ||
parser.add_argument( | ||
'repeater_id', | ||
) | ||
|
||
@property | ||
@memoized | ||
def most_recent_success(self): | ||
res = {} | ||
for record in iter_repeat_records_by_domain( | ||
self.domain, repeater_id=self.repeater_id, state=RECORD_SUCCESS_STATE): | ||
if record.last_checked: | ||
res[record.payload_id] = max(res.get(record.payload_id, datetime.datetime.min), | ||
record.last_checked) | ||
return res | ||
parser.add_argument('domain') | ||
parser.add_argument('repeater_id') | ||
|
||
def handle(self, domain, repeater_id, *args, **options): | ||
self.domain = domain | ||
self.repeater_id = repeater_id | ||
repeater = Repeater.get(repeater_id) | ||
print("Looking up repeat records for '{}'".format(repeater.friendly_name)) | ||
# This differs from the original code as follows: | ||
# 1. It does not prompt for confirmation | ||
# 2. If a successful record has been resent, and that resent | ||
# record is cancelled, this function will keep the most | ||
# recent (cancelled) record and delete the older (successful) | ||
# record. The original code would keep the successful record | ||
# and delete the cancelled record. One could argue for both | ||
# approaches. The current behaviour respects the decision to | ||
# resend the successful payload. | ||
|
||
records, __ = iter_sql_repeat_records_by_domain( | ||
domain, | ||
repeater_id, | ||
states=[RECORD_SUCCESS_STATE, RECORD_CANCELLED_STATE], | ||
order_by=['payload_id', '-registered_at'], | ||
) | ||
last_payload_id = None | ||
with csv_log_writer(domain, repeater_id) as writer: | ||
for record in records: | ||
if record.payload_id != last_payload_id: | ||
last_payload_id = record.payload_id | ||
succeeded = record.state == RECORD_SUCCESS_STATE | ||
writer.writerow(get_latest_record_row(record)) | ||
continue | ||
writer.writerow(get_duplicate_record_row(record, succeeded)) | ||
record.delete() | ||
|
||
redundant_records = [] | ||
records_by_payload_id = defaultdict(list) | ||
records = iter_repeat_records_by_domain(domain, repeater_id=repeater_id, state=RECORD_CANCELLED_STATE) | ||
total_records = 0 | ||
for record in records: | ||
total_records += 1 | ||
most_recent_success = self.most_recent_success.get(record.payload_id) | ||
if most_recent_success and record.last_checked < most_recent_success: | ||
# another record with this payload has succeeded after this record failed | ||
redundant_records.append(record) | ||
else: | ||
records_by_payload_id[record.payload_id].append(record) | ||
|
||
unique_payloads = len(records_by_payload_id) | ||
redundant_payloads = len(redundant_records) | ||
print ("There are {total} total cancelled records, {redundant} with payloads which " | ||
"have since succeeded, and {unique} unsent unique payload ids." | ||
.format(total=total_records, | ||
redundant=redundant_payloads, | ||
unique=unique_payloads)) | ||
print("Delete {} duplicate records?".format(total_records - unique_payloads)) | ||
if not input("(y/n)") == 'y': | ||
print("Aborting") | ||
return | ||
@contextmanager | ||
def csv_log_writer(domain, repeater_id): | ||
repeater = Repeater.get(repeater_id) | ||
assert repeater.domain == domain | ||
filename = "cancelled_{}_records-{}.csv".format( | ||
repeater.__class__.__name__, | ||
datetime.datetime.utcnow().isoformat()) | ||
print("Writing log of changes to {}".format(filename)) | ||
with open(filename, 'w', encoding='utf-8') as f: | ||
writer = csv.writer(f) | ||
writer.writerow(( | ||
'RepeatRecord ID', | ||
'Payload ID', | ||
'Failure Reason', | ||
'Deleted?', | ||
'Reason', | ||
)) | ||
yield writer | ||
|
||
redundant_log = self.delete_already_successful_records(redundant_records) | ||
duplicates_log = self.resolve_duplicates(records_by_payload_id) | ||
|
||
filename = "cancelled_{}_records-{}.csv".format( | ||
repeater.__class__.__name__, | ||
datetime.datetime.utcnow().isoformat()) | ||
print("Writing log of changes to {}".format(filename)) | ||
with open(filename, 'w', encoding='utf-8') as f: | ||
writer = csv.writer(f) | ||
writer.writerow(('RepeatRecord ID', 'Payload ID', 'Failure Reason', 'Deleted?', 'Reason')) | ||
writer.writerows(redundant_log) | ||
writer.writerows(duplicates_log) | ||
def get_latest_record_row(record): | ||
if record.state == RECORD_CANCELLED_STATE: | ||
failure_reason = list(record.attempts)[-1].message | ||
else: | ||
failure_reason = '' | ||
return ( | ||
record.pk, | ||
record.payload_id, | ||
failure_reason, | ||
'No', | ||
'', | ||
) | ||
|
||
def resolve_duplicates(self, records_by_payload_id): | ||
log = [] | ||
with IterDB(RepeatRecord.get_db()) as iter_db: | ||
for payload_id, records in records_by_payload_id.items(): | ||
log.append((records[0]._id, payload_id, records[0].failure_reason, 'No', '')) | ||
if len(records) > 1: | ||
for record in records[1:]: | ||
iter_db.delete(record) | ||
log.append((record._id, payload_id, record.failure_reason, 'Yes', 'Duplicate')) | ||
return log | ||
|
||
def delete_already_successful_records(self, redundant_records): | ||
log = [] | ||
with IterDB(RepeatRecord.get_db()) as iter_db: | ||
for record in redundant_records: | ||
iter_db.delete(record) | ||
log.append((record._id, record.payload_id, record.failure_reason, 'Yes', 'Already Sent')) | ||
return log | ||
def get_duplicate_record_row(record, succeeded): | ||
if record.state == RECORD_CANCELLED_STATE: | ||
failure_reason = list(record.attempts)[-1].message | ||
else: | ||
failure_reason = '' | ||
return ( | ||
record.pk, | ||
record.payload_id, | ||
failure_reason, | ||
'Yes', | ||
'Already Sent' if succeeded else 'Duplicate', | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
F821 undefined name 'SQLRepeatRecord'