Skip to content

Commit

Permalink
Approve claims (#26)
Browse files Browse the repository at this point in the history
Make it possible to approve claims
  • Loading branch information
anneschuth authored Feb 22, 2025
1 parent 586104b commit a580f34
Show file tree
Hide file tree
Showing 17 changed files with 874 additions and 175 deletions.
5 changes: 4 additions & 1 deletion machine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ async def evaluate(
sources: dict[str, pd.DataFrame] | None = None,
calculation_date=None,
requested_output: str | None = None,
approved: bool = False,
) -> dict[str, Any]:
"""Evaluate rules using service context and sources"""
parameters = parameters or {}
Expand All @@ -181,7 +182,9 @@ async def evaluate(
claims = None
if "BSN" in parameters:
bsn = parameters["BSN"]
claims = self.service_provider.claim_manager.get_claim_by_bsn_service_law(bsn, self.service_name, self.law)
claims = self.service_provider.claim_manager.get_claim_by_bsn_service_law(
bsn, self.service_name, self.law, approved=approved
)

context = RuleContext(
definitions=self.definitions,
Expand Down
28 changes: 26 additions & 2 deletions machine/events/case/aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class CaseStatus(str, Enum):
OBJECTED = "OBJECTED"


class ClaimStatusTranscoding(Transcoding):
class CaseStatusTranscoding(Transcoding):
@staticmethod
def can_handle(obj: object) -> bool:
return isinstance(obj, CaseStatus | str)
Expand All @@ -29,7 +29,7 @@ def decode(data: str) -> str:
return data # Keep it as a string


Transcoding.register(ClaimStatusTranscoding)
Transcoding.register(CaseStatusTranscoding)


class Case(Aggregate):
Expand All @@ -43,6 +43,7 @@ def __init__(
claimed_result: dict,
rulespec_uuid: str,
) -> None:
self.claim_ids = None
self.bsn = bsn
self.service = service_type
self.law = law
Expand Down Expand Up @@ -190,3 +191,26 @@ def can_appeal(self) -> bool:
if not hasattr(self, "appeal_status") or self.appeal_status is None:
return False
return bool(self.appeal_status.get("possible", False))

@event("ClaimCreated")
def add_claim(self, claim_id: str) -> None:
"""Record when a new claim is created for this case"""
if not hasattr(self, "claim_ids") or self.claim_ids is None:
self.claim_ids = set()
self.claim_ids.add(claim_id)

@event("ClaimApproved")
def approve_claim(self, claim_id: str) -> None:
"""Record when a claim is approved"""
if not hasattr(self, "claim_ids") or self.claim_ids is None:
self.claim_ids = set()
if claim_id not in self.claim_ids:
self.claim_ids.add(claim_id)

@event("ClaimRejected")
def reject_claim(self, claim_id: str) -> None:
"""Record when a claim is rejected"""
if not hasattr(self, "claim_ids") or self.claim_ids is None:
self.claim_ids = set()
if claim_id not in self.claim_ids:
self.claim_ids.add(claim_id)
39 changes: 35 additions & 4 deletions machine/events/claim/aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,44 @@ def __init__(
self.status = ClaimStatus.PENDING
self.created_at = datetime.now()

@event("Reset")
def reset(
self,
service: str,
key: str,
new_value: Any,
reason: str,
claimant: str,
law: str,
bsn: str,
case_id: str | None = None,
old_value: Any | None = None,
evidence_path: str | None = None,
) -> None:
self.service = service
self.key = key
self.old_value = old_value
self.new_value = new_value
self.reason = reason
self.evidence_path = evidence_path
self.claimant = claimant
self.case_id = case_id
self.law = law
self.bsn = bsn
self.status = ClaimStatus.PENDING
self.created_at = datetime.now()

@event("AutoApproved")
def auto_approve(self, verified_by: str, verified_value: Any) -> None:
"""Approve the claim with potentially adjusted value"""
self.status = ClaimStatus.APPROVED
self.verified_by = verified_by
self.verified_value = verified_value
self.verified_at = datetime.now()

@event("Approved")
def approve(self, verified_by: str, verified_value: Any) -> None:
"""Approve the claim with potentially adjusted value"""
if self.status != ClaimStatus.PENDING:
raise ValueError("Can only approve pending claims")
self.status = ClaimStatus.APPROVED
self.verified_by = verified_by
self.verified_value = verified_value
Expand All @@ -70,8 +103,6 @@ def approve(self, verified_by: str, verified_value: Any) -> None:
@event("Rejected")
def reject(self, rejected_by: str, rejection_reason: str) -> None:
"""Reject the claim with a reason"""
if self.status != ClaimStatus.PENDING:
raise ValueError("Can only reject pending claims")
self.status = ClaimStatus.REJECTED
self.rejected_by = rejected_by
self.rejection_reason = rejection_reason
Expand Down
168 changes: 126 additions & 42 deletions machine/events/claim/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ def __init__(self, rules_engine, **kwargs) -> None:
self._service_index: dict[str, list[str]] = {} # service -> [claim_ids]
self._case_index: dict[str, list[str]] = {} # case_id -> [claim_ids]
self._claimant_index: dict[str, list[str]] = {} # claimant -> [claim_ids]
self._bsn_index: dict[str, list[str]] = {} # claimant -> [claim_ids]
self._status_index: dict[ClaimStatus, list[str]] = {status: [] for status in ClaimStatus}
self._bsn_service_law_index: dict[tuple[str, str, str], dict[str, str]] = {} # (service, key) -> claim_id
self._case_manager = None

def _index_claim(self, claim: Claim) -> None:
"""Add claim to all indexes"""
Expand All @@ -42,19 +44,19 @@ def _index_claim(self, claim: Claim) -> None:
self._claimant_index[claim.claimant] = []
self._claimant_index[claim.claimant].append(claim_id)

# Status index
self._status_index[claim.status].append(claim_id)
# BSN index
if claim.bsn not in self._bsn_index:
self._bsn_index[claim.bsn] = []
self._bsn_index[claim.bsn].append(claim_id)

# Service-Key index
if (claim.bsn, claim.service, claim.law) not in self._bsn_service_law_index:
self._bsn_service_law_index[(claim.bsn, claim.service, claim.law)] = {}
self._bsn_service_law_index[(claim.bsn, claim.service, claim.law)][claim.key] = claim_id

def _update_status_index(self, claim: Claim, old_status: ClaimStatus) -> None:
"""Update status index when claim status changes"""
claim_id = str(claim.id)
self._status_index[old_status].remove(claim_id)
self._status_index[claim.status].append(claim_id)
@property
def case_manager(self):
return self._case_manager

def submit_claim(
self,
Expand All @@ -68,42 +70,84 @@ def submit_claim(
case_id: str | None = None,
old_value: Any | None = None,
evidence_path: str | None = None,
auto_approve: bool = False, # Add this parameter
) -> str:
"""
Submit a new claim. Can be linked to an existing case or standalone.
If auto_approve is True, the claim will be automatically approved.
"""
claim = Claim(
service=service,
key=key,
new_value=new_value,
reason=reason,
claimant=claimant,
case_id=case_id,
old_value=old_value,
evidence_path=evidence_path,
law=law,
bsn=bsn,
)
existing_claims = self.get_claim_by_bsn_service_law(bsn, service, law, include_rejected=True)
if existing_claims and key in existing_claims:
claim = existing_claims[key]
claim.reset(
service=service,
key=key,
new_value=new_value,
reason=reason,
claimant=claimant,
case_id=case_id,
old_value=old_value,
evidence_path=evidence_path,
law=law,
bsn=bsn,
)
self.save(claim)
else:
claim = Claim(
service=service,
key=key,
new_value=new_value,
reason=reason,
claimant=claimant,
case_id=case_id,
old_value=old_value,
evidence_path=evidence_path,
law=law,
bsn=bsn,
)
self.save(claim)
self._index_claim(claim)

case = None
if claim.case_id:
case = self.case_manager.get_case_by_id(claim.case_id)
if case:
case.add_claim(claim.id)
self.case_manager.save(case)

# Auto-approve if requested
if auto_approve:
claim.auto_approve(verified_by=claimant, verified_value=new_value)
self.save(claim)
if case:
case.approve_claim(claim.id)
self.case_manager.save(case)

self.save(claim)
self._index_claim(claim)
return str(claim.id)

def approve_claim(self, claim_id: str, verified_by: str, verified_value: Any) -> None:
"""Approve a claim with verified value"""
claim = self.get_claim(claim_id)
old_status = claim.status
claim.approve(verified_by, verified_value)
self.save(claim)
self._update_status_index(claim, old_status)

if claim.case_id:
case = self.case_manager.get_case_by_id(claim.case_id)
if case:
case.approve_claim(claim.id)
self.case_manager.save(case)

def reject_claim(self, claim_id: str, rejected_by: str, rejection_reason: str) -> None:
"""Reject a claim with reason"""
claim = self.get_claim(claim_id)
old_status = claim.status
claim.reject(rejected_by, rejection_reason)
self.save(claim)
self._update_status_index(claim, old_status)

if claim.case_id:
case = self.case_manager.get_case_by_id(claim.case_id)
if case:
case.reject_claim(claim.id)
self.case_manager.save(case)

def link_case(self, claim_id: str, case_id: str) -> None:
"""Link an existing claim to a case"""
Expand All @@ -127,27 +171,67 @@ def get_claim(self, claim_id: str) -> Claim:
"""Get claim by ID"""
return self.repository.get(UUID(claim_id))

def get_claims_by_service(self, service: str) -> list[Claim]:
"""Get all claims for a service"""
return [self.get_claim(claim_id) for claim_id in self._service_index.get(service, [])]
@staticmethod
def _filter_claims_by_status(claims: list[Claim], approved: bool, include_rejected: bool = False) -> list[Claim]:
"""
Helper method to filter claims based on approved parameter.
def get_claims_by_case(self, case_id: str) -> list[Claim]:
"""Get all claims for a case"""
return [self.get_claim(claim_id) for claim_id in self._case_index.get(case_id, [])]
Args:
claims: List of claims to filter
approved: If True, only return approved claims. If False, return approved and submitted claims.
include_rejected: If True, also include rejected claims in the results.
"""
if approved:
return [claim for claim in claims if claim.status == ClaimStatus.APPROVED]

def get_claims_by_claimant(self, claimant: str) -> list[Claim]:
"""Get all claims made by a claimant"""
return [self.get_claim(claim_id) for claim_id in self._claimant_index.get(claimant, [])]
allowed_statuses = {ClaimStatus.APPROVED, ClaimStatus.PENDING}
if include_rejected:
allowed_statuses.add(ClaimStatus.REJECTED)

def get_claims_by_status(self, status: ClaimStatus) -> list[Claim]:
"""Get all claims with a specific status"""
return [self.get_claim(claim_id) for claim_id in self._status_index.get(status, [])]
return [claim for claim in claims if claim.status in allowed_statuses]

def get_claim_by_bsn_service_law(self, bsn: str, service: str, law: str) -> dict[str:Claim] | None:
def get_claims_by_service(
self, service: str, approved: bool = False, include_rejected: bool = False
) -> list[Claim]:
"""
Get a dictionary with claims
Get all claims for a service, filtered by status
Args:
service: Service to filter by
approved: If True, only return approved claims
include_rejected: If True, also include rejected claims
"""
claims = [self.get_claim(claim_id) for claim_id in self._service_index.get(service, [])]
return self._filter_claims_by_status(claims, approved, include_rejected)

def get_claims_by_case(self, case_id: str, approved: bool = False, include_rejected: bool = False) -> list[Claim]:
"""Get all claims for a case, filtered by status"""
claims = [self.get_claim(claim_id) for claim_id in self._case_index.get(case_id, [])]
return self._filter_claims_by_status(claims, approved, include_rejected)

def get_claims_by_claimant(
self, claimant: str, approved: bool = False, include_rejected: bool = False
) -> list[Claim]:
"""Get all claims made by a claimant, filtered by status"""
claims = [self.get_claim(claim_id) for claim_id in self._claimant_index.get(claimant, [])]
return self._filter_claims_by_status(claims, approved, include_rejected)

def get_claims_by_bsn(self, bsn: str, approved: bool = False, include_rejected: bool = False) -> list[Claim]:
"""Get all claims for a BSN, filtered by status"""
claims = [self.get_claim(claim_id) for claim_id in self._bsn_index.get(bsn, [])]
return self._filter_claims_by_status(claims, approved, include_rejected)

def get_claim_by_bsn_service_law(
self, bsn: str, service: str, law: str, approved: bool = False, include_rejected: bool = False
) -> dict[str:Claim] | None:
"""Get a dictionary with claims filtered by status"""
key_index = self._bsn_service_law_index.get((bsn, service, law))
if key_index:
return {key: self.get_claim(claim_id) for key, claim_id in key_index.items()}
return None
if not key_index:
return None
claims = {key: self.get_claim(claim_id) for key, claim_id in key_index.items()}
filtered_claims = {
key: claim
for key, claim in claims.items()
if claim in self._filter_claims_by_status([claim], approved, include_rejected)
}
return filtered_claims if filtered_claims else None
22 changes: 0 additions & 22 deletions machine/events/claim/processor.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from eventsourcing.dispatch import singledispatchmethod
from eventsourcing.system import ProcessApplication

from .aggregate import Claim


class ClaimProcessor(ProcessApplication):
"""Process application for handling claim events"""
Expand All @@ -22,23 +20,3 @@ def case_manager(self):
@singledispatchmethod
def policy(self, domain_event, process_event) -> None:
"""Sync policy that processes events"""

@policy.register(Claim.Approved)
async def handle_claim_approved(self, domain_event, process_event) -> None:
"""
When a claim is approved:
1. If linked to a case, update the case
2. Run any applicable rules
"""
claim = self.repository.get(domain_event.originator_id)

# If claim is linked to a case, update it
if claim.case_id:
case = self.case_manager.get_case_by_id(claim.case_id)
if case:
# Update the case parameter
case.update_parameter(key=claim.key, new_value=domain_event.verified_value)
self.case_manager.save(case)

# Run any applicable rules
await self.rules_engine.apply_rules(domain_event)
Loading

0 comments on commit a580f34

Please sign in to comment.