Management commands #29600

Draft: wants to merge 7 commits into base: nh/rep/create_stubs
28 changes: 21 additions & 7 deletions corehq/apps/cleanup/management/commands/fire_repeaters.py
@@ -1,8 +1,13 @@
import datetime
import sys

from django.core.management.base import BaseCommand

from corehq.motech.repeaters.models import RepeatRecord
from corehq.motech.repeaters.const import (
RECORD_FAILURE_STATE,
RECORD_PENDING_STATE,
)
from corehq.motech.repeaters.models import RepeaterStub, domain_can_forward
from corehq.motech.repeaters.tasks import process_repeater_stub


class Command(BaseCommand):
@@ -12,8 +17,17 @@ def add_arguments(self, parser):
parser.add_argument('domain')

def handle(self, domain, **options):
next_year = datetime.datetime.utcnow() + datetime.timedelta(days=365)
records = RepeatRecord.all(domain=domain, due_before=next_year) # Excludes succeeded and cancelled
for record in records:
record.fire(force_send=True)
print('{} {}'.format(record._id, 'successful' if record.succeeded else 'failed'))
if not domain_can_forward(domain):
print('Domain does not have Data Forwarding or Zapier Integration '
'enabled.', file=sys.stderr)
sys.exit(1)
for repeater_stub in RepeaterStub.objects.filter(
domain=domain,
is_paused=False,
repeat_records__state__in=(RECORD_PENDING_STATE,
RECORD_FAILURE_STATE)
# Compare filters to RepeaterStubManager.all_ready(). This
# command will ignore whether RepeaterStub is waiting for a
# retry interval to pass.
):
process_repeater_stub.delay(repeater_stub)
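
A minimal usage sketch of the rewritten command, assuming it stays registered under its module name fire_repeaters; call_command is Django's standard entry point for invoking management commands from Python:

from django.core.management import call_command

def fire_repeaters_for(domain):
    # Raises SystemExit if the domain cannot forward (the command calls
    # sys.exit(1)); otherwise queues one process_repeater_stub task per
    # unpaused RepeaterStub that has pending or failed repeat records.
    call_command('fire_repeaters', domain)
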
56 changes: 56 additions & 0 deletions corehq/motech/repeaters/dbaccessors.py
@@ -1,4 +1,7 @@
import datetime
from typing import Iterator, List, Optional, Tuple

from django.db.models import QuerySet

from dimagi.utils.parsing import json_format_datetime

@@ -138,6 +141,59 @@ def get_paged_sql_repeat_records(domain, skip, limit, repeater_id=None, state=No
.prefetch_related('sqlrepeatrecordattempt_set'))


def iter_sql_repeat_records_by_domain(
domain: str,
repeater_id: Optional[str] = None,
states: Optional[List[str]] = None,
order_by: Optional[List[str]] = None,
) -> Tuple[Iterator['SQLRepeatRecord'], int]:

Review comment: F821 undefined name 'SQLRepeatRecord'

"""
Returns an iterator of SQLRepeatRecords, and the total count
"""
from corehq.motech.repeaters.models import SQLRepeatRecord

queryset = SQLRepeatRecord.objects.filter(domain=domain)
if repeater_id:
queryset = queryset.filter(repeater__couch_id=repeater_id)
if states:
queryset = queryset.filter(state__in=states)
record_count = queryset.count()
if order_by:
queryset = queryset.order_by(*order_by)

return (
prefetch_attempts(queryset, record_count),
record_count,
)


def prefetch_attempts(
queryset: QuerySet,
record_count: int,
chunk_size: int = 1000,
) -> Iterator['SQLRepeatRecord']:

Review comment: F821 undefined name 'SQLRepeatRecord'

"""
Prefetches SQLRepeatRecordAttempts for SQLRepeatRecords. Paginates
querysets because prefetching loads both the primary queryset and
the prefetched queryset into memory.
"""
for start, end in _pages(record_count, chunk_size):
yield from (queryset[start:end]
.prefetch_related('sqlrepeatrecordattempt_set'))


def _pages(total: int, page_size: int) -> Iterator[Tuple[int, int]]:
"""
Return an iterator of start-end pairs, given a total and page size.

>>> list(_pages(10, 4))
[(0, 4), (4, 8), (8, 10)]
"""
for start in range(0, total, page_size):
end = min(start + page_size, total)
yield start, end


def iter_repeat_records_by_domain(domain, repeater_id=None, state=None, chunk_size=1000):
from .models import RepeatRecord
kwargs = {
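
A hedged usage sketch of the new helper; the domain and repeater id below are placeholders, and RECORD_FAILURE_STATE is the constant imported in fire_repeaters.py above:

from corehq.motech.repeaters.const import RECORD_FAILURE_STATE
from corehq.motech.repeaters.dbaccessors import iter_sql_repeat_records_by_domain

records, count = iter_sql_repeat_records_by_domain(
    'example-domain',              # placeholder domain
    repeater_id='abc123',          # placeholder Couch repeater id
    states=[RECORD_FAILURE_STATE],
    order_by=['-registered_at'],
)
print('{} failed records'.format(count))
for record in records:
    # Attempts are prefetched in pages of 1000 by prefetch_attempts()
    print(record.payload_id, record.num_attempts)
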
@@ -1,106 +1,104 @@
import csv
import datetime
from collections import defaultdict
from contextlib import contextmanager
from inspect import cleandoc

from django.core.management.base import BaseCommand

from memoized import memoized

from corehq.motech.repeaters.const import (
RECORD_CANCELLED_STATE,
RECORD_SUCCESS_STATE,
)
from corehq.motech.repeaters.dbaccessors import iter_repeat_records_by_domain
from corehq.motech.repeaters.models import Repeater, RepeatRecord
from corehq.util.couch import IterDB
from corehq.motech.repeaters.dbaccessors import (
iter_sql_repeat_records_by_domain,
)
from corehq.motech.repeaters.models import Repeater


class Command(BaseCommand):
help = """
help = cleandoc("""
If there are multiple cancelled repeat records for a given payload id, this
will delete all but one for each payload, reducing the number of requests
that must be made. It will also delete any cancelled repeat records for
which there is a more recent successful record with the same payload_id.
"""
""")

def add_arguments(self, parser):
parser.add_argument(
'domain',
)
parser.add_argument(
'repeater_id',
)

@property
@memoized
def most_recent_success(self):
res = {}
for record in iter_repeat_records_by_domain(
self.domain, repeater_id=self.repeater_id, state=RECORD_SUCCESS_STATE):
if record.last_checked:
res[record.payload_id] = max(res.get(record.payload_id, datetime.datetime.min),
record.last_checked)
return res
parser.add_argument('domain')
parser.add_argument('repeater_id')

def handle(self, domain, repeater_id, *args, **options):
self.domain = domain
self.repeater_id = repeater_id
repeater = Repeater.get(repeater_id)
print("Looking up repeat records for '{}'".format(repeater.friendly_name))
# This differs from the original code as follows:
# 1. It does not prompt for confirmation
# 2. If a successful record has been resent, and that resent
# record is cancelled, this function will keep the most
# recent (cancelled) record and delete the older (successful)
# record. The original code would keep the successful record
# and delete the cancelled record. One could argue for both
# approaches. The current behaviour respects the decision to
# resend the successful payload.

records, __ = iter_sql_repeat_records_by_domain(
domain,
repeater_id,
states=[RECORD_SUCCESS_STATE, RECORD_CANCELLED_STATE],
order_by=['payload_id', '-registered_at'],
)
last_payload_id = None
with csv_log_writer(domain, repeater_id) as writer:
for record in records:
if record.payload_id != last_payload_id:
last_payload_id = record.payload_id
succeeded = record.state == RECORD_SUCCESS_STATE
writer.writerow(get_latest_record_row(record))
continue
writer.writerow(get_duplicate_record_row(record, succeeded))
record.delete()

redundant_records = []
records_by_payload_id = defaultdict(list)
records = iter_repeat_records_by_domain(domain, repeater_id=repeater_id, state=RECORD_CANCELLED_STATE)
total_records = 0
for record in records:
total_records += 1
most_recent_success = self.most_recent_success.get(record.payload_id)
if most_recent_success and record.last_checked < most_recent_success:
# another record with this payload has succeeded after this record failed
redundant_records.append(record)
else:
records_by_payload_id[record.payload_id].append(record)

unique_payloads = len(records_by_payload_id)
redundant_payloads = len(redundant_records)
print ("There are {total} total cancelled records, {redundant} with payloads which "
"have since succeeded, and {unique} unsent unique payload ids."
.format(total=total_records,
redundant=redundant_payloads,
unique=unique_payloads))
print("Delete {} duplicate records?".format(total_records - unique_payloads))
if not input("(y/n)") == 'y':
print("Aborting")
return
@contextmanager
def csv_log_writer(domain, repeater_id):
repeater = Repeater.get(repeater_id)
assert repeater.domain == domain
filename = "cancelled_{}_records-{}.csv".format(
repeater.__class__.__name__,
datetime.datetime.utcnow().isoformat())
print("Writing log of changes to {}".format(filename))
with open(filename, 'w', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow((
'RepeatRecord ID',
'Payload ID',
'Failure Reason',
'Deleted?',
'Reason',
))
yield writer

redundant_log = self.delete_already_successful_records(redundant_records)
duplicates_log = self.resolve_duplicates(records_by_payload_id)

filename = "cancelled_{}_records-{}.csv".format(
repeater.__class__.__name__,
datetime.datetime.utcnow().isoformat())
print("Writing log of changes to {}".format(filename))
with open(filename, 'w', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow(('RepeatRecord ID', 'Payload ID', 'Failure Reason', 'Deleted?', 'Reason'))
writer.writerows(redundant_log)
writer.writerows(duplicates_log)
def get_latest_record_row(record):
if record.state == RECORD_CANCELLED_STATE:
failure_reason = list(record.attempts)[-1].message
else:
failure_reason = ''
return (
record.pk,
record.payload_id,
failure_reason,
'No',
'',
)

def resolve_duplicates(self, records_by_payload_id):
log = []
with IterDB(RepeatRecord.get_db()) as iter_db:
for payload_id, records in records_by_payload_id.items():
log.append((records[0]._id, payload_id, records[0].failure_reason, 'No', ''))
if len(records) > 1:
for record in records[1:]:
iter_db.delete(record)
log.append((record._id, payload_id, record.failure_reason, 'Yes', 'Duplicate'))
return log

def delete_already_successful_records(self, redundant_records):
log = []
with IterDB(RepeatRecord.get_db()) as iter_db:
for record in redundant_records:
iter_db.delete(record)
log.append((record._id, record.payload_id, record.failure_reason, 'Yes', 'Already Sent'))
return log
def get_duplicate_record_row(record, succeeded):
if record.state == RECORD_CANCELLED_STATE:
failure_reason = list(record.attempts)[-1].message
else:
failure_reason = ''
return (
record.pk,
record.payload_id,
failure_reason,
'Yes',
'Already Sent' if succeeded else 'Duplicate',
)
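
A standalone sketch of the deduplication rule the new handle() applies, assuming records are ordered by ('payload_id', '-registered_at') as in the queryset above: the most recently registered record per payload id is kept and the rest are deleted.

def split_keep_and_delete(records):
    """Return (kept, deleted) for records sorted by payload_id, newest first."""
    kept, deleted = [], []
    last_payload_id = None
    for record in records:
        if record.payload_id != last_payload_id:
            # First (most recently registered) record for this payload: keep it.
            last_payload_id = record.payload_id
            kept.append(record)
        else:
            # Older success or cancellation with the same payload_id: redundant.
            deleted.append(record)
    return kept, deleted
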
@@ -1,25 +1,25 @@
from datetime import datetime
import csv
from datetime import datetime
from inspect import cleandoc

from django.core.management.base import BaseCommand, CommandError

from couchdbkit import ResourceNotFound
from openpyxl import Workbook

from corehq.motech.repeaters.dbaccessors import (
get_repeat_record_count,
iter_repeat_records_by_domain,
iter_sql_repeat_records_by_domain,
)
from corehq.motech.repeaters.models import RepeatRecord
from corehq.motech.repeaters.models import SQLRepeatRecord
from corehq.util.log import with_progress_bar


class Command(BaseCommand):
help = """
help = cleandoc("""
Pass Repeater ID along with domain and State(Optional) or
Pass a csv file path with a list of repeat records IDs to get a report(xlsx)
with final state and message for all attempts(if available)
"""
""")

def __init__(self, *args, **kwargs):
super(Command, self).__init__(*args, **kwargs)
@@ -54,10 +54,14 @@ def add_arguments(self, parser):
)

def _add_row(self, repeat_record):
row = [repeat_record.get_id, repeat_record.payload_id, repeat_record.state, repeat_record.failure_reason]
repeat_record_attempts_count = len(repeat_record.attempts)
if repeat_record_attempts_count > self.max_attempts_in_sheet:
self.max_attempts_in_sheet = repeat_record_attempts_count
row = [
repeat_record.pk,
repeat_record.payload_id,
repeat_record.state,
repeat_record.failure_reason,
]
if repeat_record.num_attempts > self.max_attempts_in_sheet:
self.max_attempts_in_sheet = repeat_record.num_attempts
for attempt in repeat_record.attempts:
row.append(attempt.message)
self.ws.append(row)
@@ -105,16 +109,19 @@ def handle(self, *args, **options):
records = self.record_ids
record_count = len(records)
elif domain and repeater_id:
records = iter_repeat_records_by_domain(domain, repeater_id=repeater_id, state=state)
record_count = get_repeat_record_count(domain, repeater_id=repeater_id, state=state)
records, record_count = iter_sql_repeat_records_by_domain(
domain,
repeater_id,
[state] if state else None,
)
else:
raise CommandError("Insufficient Arguments")

for record in with_progress_bar(records, length=record_count):
if isinstance(record, str):
record_id = record
try:
record = RepeatRecord.get(record_id)
record = SQLRepeatRecord.objects.get(pk=record_id)
except ResourceNotFound:
self.ws.append([record_id, '', 'Not Found'])
continue
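
One hedged note on the Couch-to-SQL switch in the loop above: Django's Model.objects.get() raises SQLRepeatRecord.DoesNotExist rather than couchdbkit's ResourceNotFound, so the except clause may need widening. A minimal sketch of the lookup:

from corehq.motech.repeaters.models import SQLRepeatRecord

def get_record_or_none(record_id):
    # Returns None when the id does not resolve, mirroring the 'Not Found'
    # row that the command writes to the worksheet.
    try:
        return SQLRepeatRecord.objects.get(pk=record_id)
    except SQLRepeatRecord.DoesNotExist:
        return None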