diff --git a/corehq/apps/couch_sql_migration/casediff.py b/corehq/apps/couch_sql_migration/casediff.py index 31bbb1cb9d0d..5f23d5d4d8b2 100644 --- a/corehq/apps/couch_sql_migration/casediff.py +++ b/corehq/apps/couch_sql_migration/casediff.py @@ -1,7 +1,9 @@ +import json import logging from collections import defaultdict from contextlib import contextmanager from functools import partial +from xml.sax.saxutils import unescape import attr @@ -32,6 +34,7 @@ from corehq.util.metrics import metrics_counter from .diff import filter_case_diffs, filter_ledger_diffs +from .diffrule import ANY from .rebuildcase import rebuild_and_diff_cases from .statedb import Change from .util import retry_on_sql_error @@ -115,7 +118,7 @@ def diff(couch_json, sql_json): return couch_case, diffs, changes diffs = diff(couch_case, sql_json) if diffs: - if is_case_patched(diffs): + if is_case_patched(case_id, diffs): return couch_case, [], [] form_diffs = diff_case_forms(couch_case, sql_json) if form_diffs: @@ -312,15 +315,58 @@ def iter_stock_transactions(self, form_id): yield report.report_type, tx -def is_case_patched(diffs): - from .casepatch import PatchForm - if not (len(diffs) == 1 - and diffs[0].diff_type == "set_mismatch" - and list(diffs[0].path) == ["xform_ids", "[*]"] - and diffs[0].new_value): +def is_case_patched(case_id, diffs): + """Check if case has been patched + + The case has been patched if at least one patch form has been + applied to the SQL case and if all of the given diffs are + unpatchable and match an unpatchable diff encoded in one of the + patch forms. + + Additionally, diffs having a MISSING `old_value` are patched with an + empty string, which is semantically equivalent to removing the case + property in CommCare. However, a difference is detectable at the + storage level even after the patch has been applied, and therefore + these subsequent patch diffs are considered to be patched. + + The "xform_ids" diff is a special exception because it is not + patchable and is not required to be present in the patch form. + + :returns: True if the case has been patched else False. + """ + def is_patched(form_ids): + forms = get_sql_forms(form_ids, ordered=True) + for form in reversed(forms): + if form.xmlns == PatchForm.xmlns: + discard_expected_diffs(form.form_data.get("diff")) + if not unpatched: + return True return False - form_ids = diffs[0].new_value.split(",") - return any(f.xmlns == PatchForm.xmlns for f in get_sql_forms(form_ids)) + + def discard_expected_diffs(patch_data): + data = json.loads(unescape(patch_data)) if patch_data else {} + if data.get("case_id") != case_id: + return + for diff in data.get("diffs", []): + diff.pop("reason", None) + path = tuple(diff["path"]) + if path in unpatched and diff_to_json(unpatched[path], ANY) == diff: + unpatched.pop(path) + + def expected_patch_diff(diff): + return not is_patchable(diff) or ( + diff.old_value is MISSING and diff.new_value == "") + + from .casepatch import PatchForm, is_patchable, diff_to_json + unpatched = {tuple(d.path): d for d in diffs if expected_patch_diff(d)} + xform_ids = unpatched.pop(("xform_ids", "[*]"), None) + return ( + xform_ids is not None + and xform_ids.diff_type == "set_mismatch" + and xform_ids.new_value + and len(diffs) == len(unpatched) + 1 # false if any diffs are patchable + and is_patched(xform_ids.new_value.split(",")) + ) def diff_case_forms(couch_json, sql_json): @@ -497,5 +543,5 @@ def get_couch_form(form_id): @retry_on_sql_error -def get_sql_forms(form_id): - return FormAccessorSQL.get_forms(form_id) +def get_sql_forms(form_id, **kw): + return FormAccessorSQL.get_forms(form_id, **kw) diff --git a/corehq/apps/couch_sql_migration/casepatch.py b/corehq/apps/couch_sql_migration/casepatch.py index d62367465c27..45b5ab968572 100644 --- a/corehq/apps/couch_sql_migration/casepatch.py +++ b/corehq/apps/couch_sql_migration/casepatch.py @@ -1,7 +1,9 @@ +import json import logging from datetime import datetime from functools import partial, wraps from uuid import uuid4 +from xml.sax.saxutils import escape from django.template.loader import render_to_string @@ -98,7 +100,7 @@ def __attrs_post_init__(self): updates.extend([const.CASE_ACTION_CREATE, const.CASE_ACTION_UPDATE]) self._dynamic_properties = self.case.dynamic_case_properties() else: - if cannot_patch(self.diffs): + if has_illegal_props(self.diffs): raise CannotPatch(self.diffs) props = dict(iter_dynamic_properties(self.diffs)) self._dynamic_properties = props @@ -133,8 +135,42 @@ def indices(self): yield CommCareCaseIndex.wrap(diff.old_value) -ILLEGAL_PROPS = {"actions", "*"} -IGNORE_PROPS = {"opened_by", "external_id"} +def has_illegal_props(diffs): + return any(d.path[0] in ILLEGAL_PROPS for d in diffs) + + +def has_known_props(diffs): + return any(d.path[0] in KNOWN_PROPERTIES for d in diffs) + + +def iter_dynamic_properties(diffs): + for diff in diffs: + name = diff.path[0] + if name in STATIC_PROPS: + continue + if diff.old_value is MISSING: + value = "" + elif len(diff.path) > 1 or not isinstance(diff.old_value, str): + raise CannotPatch([diff]) + else: + value = diff.old_value + yield name, value + + +ILLEGAL_PROPS = {"actions", "case_id", "domain", "*"} +UNPATCHABLE_PROPS = { + "closed_by", + "closed_on", + "deleted_on", + "deletion_id", + "external_id", + "modified_by", + "modified_on", + "opened_by", + "opened_on", + "server_modified_on", + "xform_ids", +} STATIC_PROPS = { "case_id", "closed", @@ -167,25 +203,6 @@ def indices(self): } -def cannot_patch(diffs): - return any(d.path[0] in ILLEGAL_PROPS for d in diffs) \ - or all(d.path[0] in IGNORE_PROPS for d in diffs) - - -def has_known_props(diffs): - return any(d.path[0] in KNOWN_PROPERTIES for d in diffs) - - -def iter_dynamic_properties(diffs): - for diff in diffs: - name = diff.path[0] - if name in STATIC_PROPS: - continue - if len(diff.path) > 1 or not isinstance(diff.old_value, str): - raise CannotPatch([diff]) - yield name, diff.old_value - - @attr.s class PatchForm: _case = attr.ib() @@ -204,9 +221,10 @@ def __getattr__(self, name): def get_xml(self): updates = self._case._updates case_block = get_case_xml(self._case, updates, version='2.0') + diff_block = get_diff_block(self._case) return render_to_string('hqcase/xml/case_block.xml', { 'xmlns': self.xmlns, - 'case_block': case_block.decode('utf-8'), + 'case_block': case_block.decode('utf-8') + diff_block, 'time': json_format_datetime(self.received_on), 'uid': self.form_id, 'username': "", @@ -245,6 +263,52 @@ def add_patch_operation(sql_form): )) +def get_diff_block(case): + """Get XML element containing case diff data + + Some early patch forms were submitted without this element. + + :param case: `PatchCase` instance. + :returns: A "" XML element string containing XML-escaped + JSON-encoded case diff data, some of which may be patched. + + ```json + { + "case_id": case.case_id, + "diffs": [ + { + "path": diff.path, + "old": diff.old_value, # omitted if old_value is MISSING + "new": diff.new_value, # omitted if new_value is MISSING + "patch": true if patched else false + "reason": "...", # omitted if reason for change is unknown + }, + ... + ] + } + ``` + """ + diffs = [diff_to_json(d) for d in sorted(case.diffs, key=lambda d: d.path)] + data = {"case_id": case.case_id, "diffs": diffs} + return f"{escape(json.dumps(data))}" + + +def diff_to_json(diff, new_value=None): + assert diff.old_value is not MISSING or diff.new_value is not MISSING, diff + obj = {"path": list(diff.path), "patch": is_patchable(diff)} + if diff.old_value is not MISSING: + obj["old"] = diff.old_value + if diff.new_value is not MISSING: + obj["new"] = diff.new_value if new_value is None else new_value + if getattr(diff, "reason", ""): + obj["reason"] = diff.reason + return obj + + +def is_patchable(diff): + return diff.path[0] not in UNPATCHABLE_PROPS + + class CannotPatch(Exception): def __init__(self, json_diffs): diff --git a/corehq/apps/couch_sql_migration/management/commands/couch_sql_diff.py b/corehq/apps/couch_sql_migration/management/commands/couch_sql_diff.py index 07d32514fed6..ab19f1de20b5 100644 --- a/corehq/apps/couch_sql_migration/management/commands/couch_sql_diff.py +++ b/corehq/apps/couch_sql_migration/management/commands/couch_sql_diff.py @@ -5,29 +5,35 @@ import sys from collections import defaultdict from itertools import groupby +from xml.sax.saxutils import unescape from django.conf import settings from django.core.management.base import BaseCommand, CommandError +from django.db.models import Q from dimagi.utils.chunked import chunked from dimagi.utils.couch.database import retry_on_couch_error from corehq.apps.domain.models import Domain +from corehq.apps.tzmigration.timezonemigration import MISSING from corehq.form_processor.backends.couch.dbaccessors import FormAccessorCouch from corehq.form_processor.backends.sql.dbaccessors import ( CaseAccessorSQL, FormAccessorSQL, ) +from corehq.form_processor.models import XFormInstanceSQL from corehq.form_processor.utils import should_use_sql_backend +from corehq.sql_db.util import paginate_query_across_partitioned_databases from corehq.util.log import with_progress_bar from ...casediff import get_couch_cases from ...casedifftool import do_case_diffs, do_case_patch, format_diffs, get_migrator +from ...casepatch import PatchForm from ...couchsqlmigration import setup_logging from ...diff import filter_case_diffs, filter_form_diffs from ...missingdocs import MissingIds from ...rewind import IterationState -from ...statedb import Counts, StateDB, open_state_db +from ...statedb import Change, Counts, StateDB, open_state_db log = logging.getLogger(__name__) @@ -83,6 +89,9 @@ def add_arguments(self, parser): parser.add_argument('--changes', dest="changes", action='store_true', default=False, help="Show changes instead of diffs. Only valid with 'show' action") + parser.add_argument('--patched', + dest="patched", action='store_true', default=False, + help="Show case diffs recorded in patch forms.") parser.add_argument('--csv', dest="csv", action='store_true', default=False, help="Output diffs to stdout in CSV format.") @@ -120,6 +129,7 @@ def handle(self, domain, action, **options): "select", "stop", "changes", + "patched", "csv", "batch_size", "reset", @@ -165,7 +175,9 @@ def do_show(self, domain): statedb = self.open_state_db(domain) print(f"showing diffs from {statedb}", file=sys.stderr) select = self.get_select_kwargs() - if self.changes: + if self.patched: + items = iter_patch_form_diffs(domain, **select) + elif self.changes: items = statedb.iter_doc_changes(**select) else: items = statedb.iter_doc_diffs(**select) @@ -272,6 +284,8 @@ def with_progress(self, doc_diffs, statedb, select): count = len(select["doc_ids"]) elif "by_kind" in select: count = sum(len(v) for v in select["by_kind"].values() if v) + elif self.patched: + count = None elif select: count = counts.get(select["kind"], Counts()) count = count.changes if self.changes else count.diffs @@ -500,5 +514,68 @@ def get_couch_forms(form_ids): return FormAccessorCouch.get_forms(form_ids) +def iter_patch_form_diffs(domain, *, kind=None, doc_ids=None, by_kind=None): + if kind: + if by_kind: + raise ValueError("cannot query 'kind' and 'by_kind' together") + if kind not in ["forms", "cases"]: + raise ValueError(f"kind must be 'forms' or 'cases'; got {kind}") + if not doc_ids: + raise ValueError(f"please specify doc ids: --select={kind}:id,...") + by_kind = {kind: doc_ids} + if by_kind: + if by_kind.keys() - {"forms", "cases"}: + kinds = list(by_kind) + raise ValueError(f"valid kinds 'forms' and 'cases'; got {kinds}") + form_ids = by_kind.get("forms", []) + case_ids = by_kind.get("cases", []) + if case_ids: + # may be inefficient for cases with many forms + for case in CaseAccessorSQL.get_cases(case_ids): + form_ids.extend(case.xform_ids) + forms = (f for f in FormAccessorSQL.get_forms(form_ids) + if f.xmlns == PatchForm.xmlns) + else: + # based on iter_form_ids_by_xmlns + q_expr = Q(domain=domain, xmlns=PatchForm.xmlns) + forms = paginate_query_across_partitioned_databases( + XFormInstanceSQL, q_expr, load_source='couch_to_sql_migration') + for form in forms: + yield from iter_doc_diffs(form) + + +def iter_doc_diffs(form): + """Yield doc diffs loaded from patch from XML + + See ...casepatch.get_diff_block for diff JSON structure. + """ + def get_doc_diff(diff, case_id): + old = diff.get("old", MISSING) + new = diff.get("new", MISSING) + if old is MISSING or new is MISSING: + diff_type = "missing" + elif type(old) != type(new): + diff_type = "type" + else: + diff_type = "diff" + return Change( + kind="CommCareCase", + doc_id=case_id, + reason=diff.get("reason", ""), + diff_type=diff_type, + path=diff["path"], + old_value=old, + new_value=new, + ) + + diff_data = form.form_data.get("diff") + if diff_data is None: + return + data = json.loads(unescape(diff_data)) + case_id = data.get("case_id", ""), + diffs = [get_doc_diff(diff, case_id) for diff in data.get("diffs", [])] + yield "CommCareCase", case_id, diffs + + def confirm(msg): return input(msg + " (Y/n) ").lower().strip() in ['', 'y', 'yes'] diff --git a/corehq/apps/couch_sql_migration/tests/test_casedifftool.py b/corehq/apps/couch_sql_migration/tests/test_casedifftool.py index a880b6626016..e383bf723253 100644 --- a/corehq/apps/couch_sql_migration/tests/test_casedifftool.py +++ b/corehq/apps/couch_sql_migration/tests/test_casedifftool.py @@ -1,5 +1,7 @@ +import json from contextlib import contextmanager from datetime import datetime, timedelta +from xml.sax.saxutils import unescape from mock import patch @@ -13,11 +15,13 @@ from corehq.form_processor.utils.general import ( clear_local_domain_sql_backend_override, ) +from corehq.util.dates import iso_string_to_datetime from corehq.util.test_utils import capture_log_output from .test_migration import BaseMigrationTestCase, Diff, IGNORE, make_test_form from .. import casediff from .. import casedifftool as mod +from ..diffrule import ANY from ..statedb import open_state_db @@ -145,6 +149,31 @@ def test_couch_with_missing_forms(self): self.compare_diffs(changes=[ Diff('test-case', 'missing', ['thing'], old=MISSING, new='1', reason='rebuild case'), ]) + self.do_migration(patch=True, diffs=[]) + + def test_couch_missing_create_case(self): + with self.skip_case_and_ledger_updates("thing-form"): + self.submit_form(THING_FORM) + self.submit_form(UPDATE_FORM) + case = self._get_case("test-case") + # simulate null properties seen in the wild + object.__setattr__(case, "name", None) + object.__setattr__(case, "type", None) + case.save() + with self.diff_without_rebuild(): + self.do_migration() + self.compare_diffs([ + Diff('test-case', 'missing', ['thing'], old=MISSING, new='1'), + Diff('test-case', 'set_mismatch', ['xform_ids', '[*]'], old='', new='thing-form'), + Diff('test-case', 'type', ['name'], old=None, new='Thing'), + Diff('test-case', 'type', ['type'], old=None, new='testing'), + ]) + self.do_migration(patch=True, diffs=[]) + case = self._get_case("test-case") + self.assertEqual(case.name, "") + self.assertEqual(case.type, "") + self.assertEqual(case.dynamic_case_properties()["thing"], "") + self.assertEqual(case.xform_ids, ['thing-form', 'update-form', ANY]) def test_case_with_deleted_form(self): # form state=normal / deleted -> missing case @@ -214,6 +243,56 @@ def test_patch_known_properties(self): self.do_migration(forms="missing", case_diff="patch") self.assertEqual(self._get_case("case-1").opened_on, open_date) + def test_unpatchable_properties(self): + date1 = "2018-07-13T11:20:11.381000Z" + self.submit_form(make_test_form("form-1", case_id="case-1")) + case = self._get_case("case-1") + user = case.user_id + case.closed = True + case.closed_by = "someone" + case.closed_on = iso_string_to_datetime(date1) + case.external_id = "ext" + case.name = "Zena" + case.opened_by = "someone" + case.server_modified_on = iso_string_to_datetime(date1) + case.user_id = "person" + case.save() + self.do_migration(diffs=[ + Diff('case-1', 'diff', ['closed'], old=True, new=False), + Diff('case-1', 'diff', ['closed_by'], old='someone', new=''), + Diff('case-1', 'diff', ['external_id'], old='ext', new=''), + Diff('case-1', 'diff', ['name'], old='Zena', new='Xeenax'), + Diff('case-1', 'diff', ['opened_by'], old='someone', new=user), + Diff('case-1', 'diff', ['user_id'], old='person', new=user), + Diff('case-1', 'type', ['closed_on'], old=date1, new=None), + ]) + self.do_migration(patch=True, diffs=[]) + close2 = iso_string_to_datetime("2015-08-04T18:25:56.656Z") + case = self._get_case("case-1") + self.assertEqual(case.closed, True) # patched + self.assertEqual(case.closed_by, "person") # unpatched + self.assertEqual(case.closed_on, close2) # unpatched + self.assertEqual(case.external_id, 'ext') # patched, not sure how/why + self.assertEqual(case.name, "Zena") # patched + self.assertEqual(case.opened_by, user) # unpatched + self.assertEqual(case.user_id, "person") # patched + self.assertNotEqual(case.server_modified_on, + iso_string_to_datetime(date1)) # unpatched + form = self._get_form(case.xform_ids[-1]) + diffs = json.loads(unescape(form.form_data["diff"])) + self.assertEqual(diffs, { + "case_id": "case-1", + "diffs": [ + {"path": ["closed"], "old": True, "new": False, "patch": True}, + {"path": ["closed_by"], "old": "someone", "new": "", "patch": False}, + {"path": ["closed_on"], "old": date1, "new": None, "patch": False}, + {"path": ["external_id"], "old": "ext", "new": "", "patch": False}, + {"path": ["name"], "old": "Zena", "new": "Xeenax", "patch": True}, + {"path": ["opened_by"], "old": "someone", "new": user, "patch": False}, + {"path": ["user_id"], "old": "person", "new": user, "patch": True}, + ], + }) + def test_patch_closed_case(self): from casexml.apps.case.cleanup import close_case self.submit_form(make_test_form("form-1", case_id="case-1")) @@ -356,6 +435,40 @@ def assert_patched_cases(self, case_ids=None): """.strip() + +UPDATE_FORM = """ + + + 27 + + + 27 + + + + cloudcare + 2015-07-13T11:20:11.381Z + 2015-08-04T18:25:56.656Z + jeremy + 3fae4ea4af440efaa53441b5 + update-form + 2.0 + + +""".strip() + + LEDGER_FORM = """ {actual_patched}\n" + f"{sep.join(repr(d) for d in patch_diffs)}\n\n{form.diff_block}" + ) class FakeCase: case_id = "fake" closed = False + + +@attr.s +class FakeForm: + diff_block = attr.ib() + xmlns = mod.PatchForm.xmlns + + @property + def form_data(self): + xml = self.diff_block + assert xml.startswith(""), xml + assert xml.endswith(""), xml + return {"diff": xml[6:-7]}