Skip to content

Commit

Permalink
feat: implement throttle mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
pehlicd committed Apr 11, 2024
1 parent 585dcc4 commit 87008cd
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 12 deletions.
27 changes: 26 additions & 1 deletion keep/api/core/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import logging
import os
import uuid
from datetime import datetime, timedelta, timezone
from typing import List, Tuple
from uuid import uuid4
Expand Down Expand Up @@ -141,7 +142,7 @@ def create_db_and_tables():
logger.info("Migrating WorkflowToAlertExecution table")
# get the foreign key constraint name
results = session.exec(
f"SELECT CONSTRAINT_NAME FROM information_schema.KEY_COLUMN_USAGE WHERE TABLE_SCHEMA = '{engine.url.database}' AND TABLE_NAME = 'workflowtoalertexecution' AND COLUMN_NAME = 'alert_fingerprint';"
f"SELECT CONSTRAINT_NAME FROM information_schema.KEY_COLUMN_USAGE WHERE TABLE_SCHEMA = '{engine.url.database}' AND TABLE_NAME = 'workflowtoalertexecution' AND COLUMN_NAME = 'alert_fingerprint' AND COLUMN_NAME = 'event_id';"
)
# now remove it
for row in results:
Expand Down Expand Up @@ -273,6 +274,7 @@ def create_workflow_execution(
tenant_id: str,
triggered_by: str,
execution_number: int = 1,
event_id: str = None,
fingerprint: str = None,
) -> WorkflowExecution:
with Session(engine) as session:
Expand All @@ -294,6 +296,7 @@ def create_workflow_execution(
workflow_to_alert_execution = WorkflowToAlertExecution(
workflow_execution_id=workflow_execution.id,
alert_fingerprint=fingerprint,
event_id=event_id,
)
session.add(workflow_to_alert_execution)

Expand Down Expand Up @@ -489,6 +492,16 @@ def add_or_update_workflow(
return existing_workflow if existing_workflow else workflow


def get_workflow_to_alert_execution_by_workflow_execution_id(
    workflow_execution_id: str
) -> WorkflowToAlertExecution:
    """
    Fetch the alert link row stored for a workflow execution.

    Args:
        workflow_execution_id (str): Value matched against
            WorkflowToAlertExecution.workflow_execution_id.

    Returns:
        WorkflowToAlertExecution: The first matching row; the query's
            ``first()`` yields None when no row matches.
    """
    with Session(engine) as session:
        matching_rows = session.query(WorkflowToAlertExecution).filter_by(
            workflow_execution_id=workflow_execution_id
        )
        return matching_rows.first()

def get_last_workflow_workflow_to_alert_executions(
session: Session, tenant_id: str
) -> list[WorkflowToAlertExecution]:
Expand Down Expand Up @@ -1035,6 +1048,18 @@ def get_alerts_by_fingerprint(tenant_id: str, fingerprint: str, limit=1) -> List
return alerts


def get_alert_by_fingerprint_and_event_id(tenant_id: str, fingerprint: str, event_id: str) -> Alert:
    """
    Look up a single alert by tenant, fingerprint and event id.

    Args:
        tenant_id (str): Tenant the alert must belong to.
        fingerprint (str): Fingerprint the alert must carry.
        event_id (str): Alert id as a UUID string (a uuid.UUID is accepted too).

    Returns:
        Alert: The first matching alert, or None when nothing matches or when
            event_id is missing / not a valid UUID.
    """
    # event_id may be None (create_workflow_execution defaults it to None) or
    # malformed; such a value cannot match any alert, so report "not found"
    # instead of letting uuid.UUID() raise.
    try:
        alert_id = event_id if isinstance(event_id, uuid.UUID) else uuid.UUID(event_id)
    except (TypeError, ValueError, AttributeError):
        return None

    with Session(engine) as session:
        return (
            session.query(Alert)
            .filter(Alert.tenant_id == tenant_id)
            .filter(Alert.fingerprint == fingerprint)
            .filter(Alert.id == alert_id)
            .first()
        )


def get_previous_alert_by_fingerprint(tenant_id: str, fingerprint: str) -> Alert:
# get the previous alert for a given fingerprint
with Session(engine) as session:
Expand Down
1 change: 1 addition & 0 deletions keep/api/models/db/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class WorkflowToAlertExecution(SQLModel, table=True):
id: Optional[int] = Field(primary_key=True, default=None)
workflow_execution_id: str = Field(foreign_key="workflowexecution.id")
alert_fingerprint: str
event_id: str
workflow_execution: WorkflowExecution = Relationship(
back_populates="workflow_to_alert_execution"
)
Expand Down
4 changes: 1 addition & 3 deletions keep/contextmanager/contextmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,7 @@ def set_step_context(self, step_id, results, foreach=False):
self.steps_context_size = asizeof(self.steps_context)

def get_last_workflow_run(self, workflow_id):
    """
    Return the last recorded run of a workflow for this context's tenant.

    The former stub returned an empty dict unconditionally, which made the
    real lookup below unreachable and disabled throttling.

    Args:
        workflow_id (str): The id of the workflow to look up.

    Returns:
        Whatever get_last_workflow_execution_by_workflow_id returns for
        (workflow_id, self.tenant_id); callers treat a falsy result as
        "no previous run".
    """
    return get_last_workflow_execution_by_workflow_id(workflow_id, self.tenant_id)

def dump(self):
self.logger.info("Dumping logs to db")
Expand Down
5 changes: 3 additions & 2 deletions keep/step/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,9 @@ def _check_throttling(self, action_name):
throttle = ThrottleFactory.get_instance(
self.context_manager, throttling_type, throttling_config
)
alert_id = self.context_manager.get_workflow_id()
return throttle.check_throttling(action_name, alert_id)
workflow_id = self.context_manager.get_workflow_id()
event_id = self.context_manager.event_context.event_id
return throttle.check_throttling(action_name, workflow_id, event_id)

def _get_foreach_items(self) -> list | list[list]:
"""Get the items to iterate over, when using the `foreach` attribute (see foreach.md)"""
Expand Down
5 changes: 3 additions & 2 deletions keep/throttles/base_throttle.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@ def __init__(
self.context_manager = context_manager

@abc.abstractmethod
def check_throttling(self, action_name, workflow_id, event_id, **kwargs) -> bool:
    """
    Decide whether an action should be throttled.

    Args:
        action_name (str): The name of the action to check throttling for.
        workflow_id (str): The id of the workflow to check throttling for.
        event_id (str): The id of the event to check throttling for.
        **kwargs: Extra options a concrete throttle may accept.

    Returns:
        bool: The throttling decision made by the concrete implementation.

    Raises:
        NotImplementedError: Always, in this abstract base method.
    """
    raise NotImplementedError("check_throttling() method not implemented")
23 changes: 19 additions & 4 deletions keep/throttles/one_until_resolved_throttle.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from keep.api.core.db import get_alert_by_fingerprint_and_event_id, \
get_workflow_to_alert_execution_by_workflow_execution_id
from keep.throttles.base_throttle import BaseThrottle
from keep.contextmanager.contextmanager import ContextManager

Expand All @@ -12,12 +14,25 @@ class OneUntilResolvedThrottle(BaseThrottle):
def __init__(self, context_manager: ContextManager, throttle_type, throttle_config):
super().__init__(context_manager=context_manager, throttle_type=throttle_type, throttle_config=throttle_config)

def check_throttling(self, action_name, workflow_id, event_id, **kwargs) -> bool:
    """
    Throttle an action while the alert behind the last workflow run is unresolved.

    Args:
        action_name (str): The name of the action to check throttling for.
        workflow_id (str): The id of the workflow to check throttling for.
        event_id (str): The id of the event to check throttling for
            (not consulted here; the alert of the last run is used instead).

    Returns:
        bool: True when the action should be throttled, i.e. the alert that
            triggered the last run is stored with a status other than
            "resolved". False when there is no previous run, no linked alert,
            no readable status, or the alert was resolved.
    """
    last_workflow_run = self.context_manager.get_last_workflow_run(workflow_id)
    if not last_workflow_run:
        return False

    # the link row tells which alert (fingerprint + event id) triggered the last run
    last_workflow_alert_execution = get_workflow_to_alert_execution_by_workflow_execution_id(
        last_workflow_run.id
    )
    if not last_workflow_alert_execution:
        return False

    alert = get_alert_by_fingerprint_and_event_id(
        self.context_manager.tenant_id,
        last_workflow_alert_execution.alert_fingerprint,
        last_workflow_alert_execution.event_id,
    )
    if not alert:
        return False

    status = alert.event.get("status")
    # without a readable status we cannot tell that the alert is still firing,
    # so let the action run rather than crash on None.lower() or suppress it
    if not isinstance(status, str):
        return False

    # if the alert was resolved the last time it triggered the workflow, do not throttle
    if status.lower() == "resolved":
        return False

    # otherwise throttle, because the alert is already firing
    return True
2 changes: 2 additions & 0 deletions keep/workflowmanager/workflowscheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ def handle_manual_event_workflow(
triggered_by=f"manually by {triggered_by_user}",
execution_number=unique_execution_number,
fingerprint=alert.fingerprint,
event_id=alert.event_id,
)
self.logger.info(f"Workflow execution id: {workflow_execution_id}")
# This is kinda WTF exception since create_workflow_execution shouldn't fail for manual
Expand Down Expand Up @@ -256,6 +257,7 @@ def _handle_event_workflows(self):
triggered_by=triggered_by,
execution_number=workflow_execution_number,
fingerprint=event.fingerprint,
event_id=event.event_id,
)
# This is kinda wtf exception since create workflow execution shouldn't fail for events other than interval
except IntegrityError:
Expand Down

0 comments on commit 87008cd

Please sign in to comment.