fix(agent/selfMonitoring): infinite throttle (#234)
PierreDemailly authored Apr 9, 2024
1 parent d158f33 commit cf06715
Showing 4 changed files with 124 additions and 9 deletions.
20 changes: 14 additions & 6 deletions src/agent/src/utils/selfMonitoring.ts
@@ -30,12 +30,15 @@ function hasAgentFailureThrottle(throttle: SigynInitializedSelfMonitoring["throt
const agentFailuresAlert = (getDB()
.prepare("SELECT count FROM agentFailures WHERE timestamp >= ? ORDER BY count DESC")
.get(intervalDate) as { count: number });
+ const unprocessedAgentFailuresAlertFromThrottle = (getDB()
+ .prepare("SELECT count FROM agentFailures WHERE timestamp <= ? AND processed = 0 ORDER BY count DESC")
+ .get(intervalDate) as { count: number });
const lastAgentFailureAlert = (getDB()
.prepare("SELECT * FROM agentFailures ORDER BY count DESC LIMIT 1")
.get() as DbAgentFailure);
- getDB().exec("UPDATE agentFailures SET processed = 1");

const agentFailuresAlertCount = agentFailuresAlert?.count ?? 0;
+ const unprocessedAgentFailuresAlertCount = unprocessedAgentFailuresAlertFromThrottle?.count ?? 0;
const countThresholdExceeded = count > 0 && agentFailuresAlertCount - activationThreshold > count;
const activationThresholdExceeded = activationThreshold > 0 && agentFailuresAlertCount <= activationThreshold;
const intervalExceeded = lastAgentFailureAlert.processed && lastAgentFailureAlert.timestamp > intervalDate;
@@ -52,9 +55,12 @@ function hasAgentFailureThrottle(throttle: SigynInitializedSelfMonitoring["throt
return Err(logMessage(false, "interval exceeded"));
}

+ if (unprocessedAgentFailuresAlertCount > 0) {
+ return Err(logMessage(false, "failures older than throttle interval"));
+ }

if (activationThresholdExceeded) {
- return Err(logMessage(false, "activation threshold exceeded"));
+ return Err(logMessage(false, `activation threshold not reached (${agentFailuresAlertCount} <= ${activationThreshold})`));
}

if (countThresholdExceeded) {
@@ -73,6 +79,7 @@ function hasAgentFailureThrottle(throttle: SigynInitializedSelfMonitoring["throt
}

export function handleAgentFailure(errorMessage: string, rule: Rule, logger: Logger) {
+ const db = getDB();
const config = getConfig();
if (!config.selfMonitoring) {
logger.info("[SELF MONITORING](skip: disabled)");
@@ -95,16 +102,16 @@ export function handleAgentFailure(errorMessage: string, rule: Rule, logger: Log

try {
const dbRule = rule.getRuleFromDatabase();
- getDB().exec("UPDATE agentFailures SET count = count + 1");
- getDB()
+ db.exec("UPDATE agentFailures SET count = count + 1");
+ db
.prepare("INSERT INTO agentFailures (ruleId, message, timestamp) VALUES (?, ?, ?)")
.run(
dbRule.id,
errorMessage,
Date.now()
);

- const agentFailures = getDB().prepare("SELECT * FROM agentFailures").all() as DbAgentFailure[];
+ const agentFailures = db.prepare("SELECT * FROM agentFailures").all() as DbAgentFailure[];
if (agentFailures.length > minimumErrorCount) {
const throttle = hasAgentFailureThrottle(config.selfMonitoring.throttle);
logger.info(`[SELF MONITORING]${throttle.val}`);
@@ -114,6 +121,7 @@ export function handleAgentFailure(errorMessage: string, rule: Rule, logger: Log
}

logger.info(`[SELF MONITORING](new alert: ${agentFailures.length} agent failures detected)`);
+ db.exec("UPDATE agentFailures SET processed = 1");

createAgentFailureAlert(agentFailures, config.selfMonitoring, logger);

@@ -122,7 +130,7 @@ export function handleAgentFailure(errorMessage: string, rule: Rule, logger: Log
const intervalDate = interval ?
cronUtils.durationOrCronToDate(interval, "subtract").valueOf() :
Date.now();
- getDB().prepare("DELETE FROM agentFailures WHERE timestamp < ?").run(intervalDate);
+ db.prepare("DELETE FROM agentFailures WHERE timestamp < ?").run(intervalDate);
}
}
catch (error) {
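In short, the patch makes the throttle release whenever failures predate the current throttle window and were never covered by a sent alert, and it only marks rows as processed after an alert actually goes out. The following is a minimal, hypothetical TypeScript model of that check, not code from the repository: only the processed flag and the timestamp <= intervalDate comparison come from the diff above, while the record shape and names are illustrative.

// Hypothetical in-memory model of the agentFailures bookkeeping added above.
interface AgentFailureRecord {
  timestamp: number;
  processed: boolean;
}

// True when at least one failure is older than the throttle window and was
// never part of a sent alert — the case that previously kept the throttle
// active forever (the "infinite throttle" this commit fixes).
function hasUnprocessedOlderFailures(
  failures: AgentFailureRecord[],
  intervalDate: number
): boolean {
  return failures.some(
    (failure) => failure.timestamp <= intervalDate && !failure.processed
  );
}

// Example: one failure landed before the window and no alert covered it,
// so the next evaluation must alert instead of staying throttled.
const windowStart = Date.now() - 2_000;
const failures: AgentFailureRecord[] = [
  { timestamp: windowStart - 500, processed: false }, // predates the window, never alerted
  { timestamp: Date.now(), processed: false }         // current failure
];
console.log(hasUnprocessedOlderFailures(failures, windowStart)); // true -> throttle must release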
42 changes: 42 additions & 0 deletions — new sigyn.config.json test fixture (self-monitoring-activation-threshold-interval)
@@ -0,0 +1,42 @@
{
"loki": {
"apiUrl": "http://localhost:3100"
},
"grafana": {
"apiUrl": "http://localhost:3000"
},
"notifiers": {
"discord": {
"notifier": "../../test/FT/mocks/sigyn-test-notifier.js",
"webhookUrl": "https://discord.com/api/webhooks/aaa/bbb"
}
},
"rules": [
{
"name": "State KO >= 80% with activationThreshold & interval",
"logql": "{app=\"sigyn\"} |~ `state: (ok|ko)` | regexp `state: (?P<state>ok|ko)`",
"polling": "200ms",
"alert": {
"on": {
"label": "state",
"value": "ko",
"percentThreshold": 80,
"minimumLabelCount": 10
},
"template": {
"title": "Alert"
}
}
}
],
"selfMonitoring": {
"notifiers": ["discord"],
"template": {
"title": "foo"
},
"throttle": {
"interval": "2s",
"activationThreshold": 4
}
}
}
4 changes: 4 additions & 0 deletions src/agent/test/FT/helpers.ts
@@ -62,3 +62,7 @@ export class MockLogger {
return void 0;
}
}

+ export function resetAgentFailures() {
+ getDB().prepare("DELETE FROM agentFailures").run();
+ }
67 changes: 64 additions & 3 deletions src/agent/test/FT/selfMonitoring.spec.ts
@@ -5,12 +5,13 @@ import { afterEach, before, beforeEach, describe, it } from "node:test";
import { setTimeout } from "node:timers/promises";

// Import Third-party Dependencies
- import { initConfig } from "@sigyn/config";
+ import { SigynInitializedConfig, initConfig } from "@sigyn/config";
import { AsyncTask } from "toad-scheduler";
import isCI from "is-ci";

// Import Internal Dependencies
import { asyncTask } from "../../src/tasks/asyncTask";
- import { MockLogger } from "./helpers";
+ import { MockLogger, resetAgentFailures } from "./helpers";
import { Rule } from "../../src/rules";
import { getDB, initDB } from "../../src/database";
import { resetCalls, getCalls, getArgs } from "./mocks/sigyn-test-notifier";
@@ -25,6 +26,10 @@ const kRuleNoFiltersConfigLocation = path.join(kFixturePath, "/no-self-monitorin
const kRuleThrottleConfigLocation = path.join(kFixturePath, "/self-monitoring-throttle/sigyn.config.json");
const kRuleActivationThresholdConfigLocation = path.join(kFixturePath, "/self-monitoring-activation-threshold/sigyn.config.json");
const kRuleIntervalThrottleConfigLocation = path.join(kFixturePath, "/self-monitoring-interval/sigyn.config.json");
+ const kRuleActivationThresholdThrottleConfigLocation = path.join(
+ kFixturePath,
+ "/self-monitoring-activation-threshold-interval/sigyn.config.json"
+ );
const kLogger = new MockLogger();
const kMockLokiApi = {
Loki: {
@@ -43,14 +48,14 @@ describe("Self-monitoring", () => {

before(async() => {
process.env.GRAFANA_API_TOKEN = "toto";
+ initDB(kLogger, { databaseFilename: ".temp/test-agent.sqlite3" });
});

afterEach(() => {
getDB().exec("DELETE FROM agentFailures");
});

it("should not send alert when error does not match errorFilters", async() => {
- initDB(kLogger, { databaseFilename: ".temp/test-agent.sqlite3" });
const config = await initConfig(kRuleConfigLocation);
const rule = new Rule(config.rules[0], { logger: kLogger });
rule.init();
@@ -278,4 +283,60 @@
// throttle deactivated, now 2 calls
assert.equal(getCalls(), 2);
});

describe("With both activationThreshold & interval", () => {
let config: SigynInitializedConfig;
let rule: Rule;
let task: AsyncTask;

before(async() => {
resetCalls();
resetAgentFailures();
config = await initConfig(kRuleActivationThresholdThrottleConfigLocation);
rule = new Rule(config.rules[0], { logger: kLogger });
rule.init();

task = asyncTask(
config.rules[0], {
logger: kLogger,
lokiApi: kMockLokiApi as any,
rule
}
);
});

it("should have throttle for 5s once activationThreshold is reached", async() => {
task.execute();
await setTimeout(kTimeout);
assert.equal(getCalls(), 1, "should send a first alert (1 / 4)");

task.execute();
await setTimeout(kTimeout);
assert.equal(getCalls(), 2, "should send alert when activationThreshold is not reached (2 / 4)");

task.execute();
await setTimeout(kTimeout);
assert.equal(getCalls(), 3, "should send alert when activationThreshold is not reached (3 / 4)");

task.execute();
await setTimeout(kTimeout);
assert.equal(getCalls(), 4, "should send alert when activationThreshold is not reached (4 / 4)");

task.execute();
await setTimeout(kTimeout);
assert.equal(getCalls(), 4, "should NOT send alert when activationThreshold is reached (5 / 4)");

function intervalCallback() {
task.execute();
}
const timer = setInterval(intervalCallback, 200);
await setTimeout(2000);
clearInterval(timer[Symbol.toPrimitive]());

task.execute();
await setTimeout(kTimeout);
assert.equal(getCalls(), 5, "should send alert when throttle ends with accumulated failures");
});
});
});
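The new test's cadence maps onto the fixture's throttle settings (interval "2s", activationThreshold 4). Below is a small, hypothetical walk-through of that arithmetic; none of this code exists in the repository, it only mirrors the numbers the assertions rely on.

// Hypothetical timeline for the self-monitoring-activation-threshold-interval fixture.
const activationThreshold = 4; // from the fixture above
const intervalMs = 2_000;      // "2s" throttle interval
const taskCadenceMs = 200;     // how often the test re-executes the task while throttled

// Failures 1-4 are each alerted (threshold not yet reached); the 5th is throttled.
for (let failureCount = 1; failureCount <= 5; failureCount++) {
  const alerted = failureCount <= activationThreshold;
  console.log(`failure ${failureCount}: ${alerted ? "alerted" : "throttled"}`);
}

// Failures produced while throttled stay unprocessed; once the 2s interval has
// elapsed they force one final alert, which is what the last assertion checks.
const accumulatedFailures = intervalMs / taskCadenceMs; // roughly 10
console.log(`${accumulatedFailures} accumulated failures -> one more alert`);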
