refactor(agent): improve logging
- add a new timeout option
- update dependencies & GHAs
PierreDemailly committed Dec 10, 2023
1 parent 83d4dca commit 651b7cd
Showing 17 changed files with 603 additions and 393 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/codeql.yml
@@ -54,7 +54,7 @@ jobs:

# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@407ffafae6a767df3e0230c3df91b6443ae8df75 # v2.22.8
uses: github/codeql-action/init@c0d1daa7f7e14667747d73a7dbbe8c074bc8bfe2 # v2.22.9
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
@@ -68,7 +68,7 @@ jobs:
# Autobuild attempts to build any compiled languages (C/C++, C#, Go, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
uses: github/codeql-action/autobuild@407ffafae6a767df3e0230c3df91b6443ae8df75 # v2.22.8
uses: github/codeql-action/autobuild@c0d1daa7f7e14667747d73a7dbbe8c074bc8bfe2 # v2.22.9

# ℹ️ Command-line programs to run using the OS shell.
# 📚 See https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_idstepsrun
@@ -81,6 +81,6 @@ jobs:
# ./location_of_script_within_repo/buildscript.sh

- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@407ffafae6a767df3e0230c3df91b6443ae8df75 # v2.22.8
uses: github/codeql-action/analyze@c0d1daa7f7e14667747d73a7dbbe8c074bc8bfe2 # v2.22.9
with:
category: "/language:${{matrix.language}}"
2 changes: 1 addition & 1 deletion .github/workflows/scorecard.yml
@@ -73,6 +73,6 @@ jobs:

# Upload the results to GitHub's code scanning dashboard.
- name: "Upload to code-scanning"
uses: github/codeql-action/upload-sarif@407ffafae6a767df3e0230c3df91b6443ae8df75 # v2.22.8
uses: github/codeql-action/upload-sarif@c0d1daa7f7e14667747d73a7dbbe8c074bc8bfe2 # v2.22.9
with:
sarif_file: results.sarif
689 changes: 437 additions & 252 deletions package-lock.json

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions package.json
@@ -21,13 +21,13 @@
"src/pattern"
],
"devDependencies": {
"@nodesecure/eslint-config": "^1.8.0",
"@types/node": "^20.9.4",
"@nodesecure/eslint-config": "^1.9.0",
"@types/node": "^20.10.4",
"c8": "^8.0.1",
"cross-env": "^7.0.3",
"glob": "^10.3.10",
"tsup": "^8.0.1",
"tsx": "^4.4.0",
"typescript": "^5.3.2"
"tsx": "^4.6.2",
"typescript": "^5.3.3"
}
}
2 changes: 2 additions & 0 deletions src/agent/README.md
@@ -77,6 +77,7 @@ Run Sigyn agent. It will fetch logs depending your rules `polling` and send alert
- `location: string` Optional, defaults to `process.cwd()`. The path to your SQLite database; the file will be created if it doesn't exist, but the directory **must** exist.
- `options.logger: Logger` Optional, defaults to `pino`. You can use your own logger, which must be an object with these 3 methods: `debug`, `info` & `error`.
- `options.level` Optional, only used when no logger is given. Sets the log level: `"info" | "debug" | "error"`.
- `options.timeout` Optional, the timeout for Grafana API requests, in milliseconds. Default: `30_000`.

The returned scheduler instance allows you to add extra logic if needed; see [API for scheduler](https://github.com/kibertoad/toad-scheduler/blob/main/README.md#api-for-scheduler).

@@ -92,6 +93,7 @@ interface Logger {
interface StartOptions {
logger?: Logger;
level?: "info" | "debug" | "error";
timeout?: number;
}
```

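The README hunk above documents the new `options.timeout` field on `StartOptions`. A minimal usage sketch follows; the `@sigyn/agent` import path and the chosen values are illustrative assumptions, not taken from this commit.

```ts
import { start } from "@sigyn/agent";

// `location` is the directory holding sigyn.config.json and the SQLite database.
const scheduler = await start(process.cwd(), {
  level: "debug",
  // New in this commit: timeout (in milliseconds) for Grafana API requests.
  timeout: 10_000
});

// The returned toad-scheduler instance can be stopped on shutdown.
process.on("SIGINT", () => scheduler.stop());
```
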
6 changes: 4 additions & 2 deletions src/agent/package.json
@@ -40,8 +40,10 @@
"author": "GENTILHOMME Thomas <[email protected]>",
"license": "MIT",
"dependencies": {
"@myunisoft/loki": "^2.1.0",
"better-sqlite3": "^9.1.1",
"@myunisoft/httpie": "^3.0.0",
"@myunisoft/loki": "^3.0.0",
"@openally/result": "^1.2.0",
"better-sqlite3": "^9.2.2",
"cron-parser": "^4.9.0",
"croner": "^7.0.5",
"dayjs": "^1.11.10",
8 changes: 4 additions & 4 deletions src/agent/src/datasource.ts
@@ -1,19 +1,19 @@
// Import Third-party Dependencies
import { GrafanaLoki } from "@myunisoft/loki";
import { GrafanaApi } from "@myunisoft/loki";

export class Datasource {
#lokiApi: GrafanaLoki;
#lokiApi: GrafanaApi;

private static datasource: Datasource;

private constructor(host: string) {
this.#lokiApi = new GrafanaLoki({
this.#lokiApi = new GrafanaApi({
remoteApiURL: host
});
}

fetchDatasources() {
return this.#lokiApi.datasources();
return this.#lokiApi.Datasources.all();
}

static async Loki(host: string) {
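
The same `@myunisoft/loki` v2 → v3 rename (`GrafanaLoki` becomes `GrafanaApi`, with calls moved under the `Datasources` and `Loki` namespaces) recurs in the files below. A sketch of the mapping, limited to the calls visible in this commit; the host, LogQL query and `start` value are placeholders.

```ts
import { GrafanaApi } from "@myunisoft/loki";

const api = new GrafanaApi({ remoteApiURL: "https://grafana.example.com" });

// v2: new GrafanaLoki({ remoteApiURL }).datasources()
const datasources = await api.Datasources.all();

// v2: lokiApi.queryRangeStream<string>(logql, { start })
const { logs } = await api.Loki.queryRangeStream<string>(`{app="placeholder"}`, {
  start: Date.now() - 60_000
});
```
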
10 changes: 7 additions & 3 deletions src/agent/src/index.ts
@@ -4,7 +4,8 @@ import timers from "node:timers/promises";

// Import Third-party Dependencies
import { initConfig } from "@sigyn/config";
import { GrafanaLoki } from "@myunisoft/loki";
import { GrafanaApi } from "@myunisoft/loki";
import { Agent, setGlobalDispatcher } from "@myunisoft/httpie";
import { ToadScheduler, CronJob, SimpleIntervalJob } from "toad-scheduler";
import { pino } from "pino";
import ms from "ms";
@@ -27,6 +28,7 @@ export interface Logger {
export interface StartOptions {
logger?: Logger;
level?: "info" | "debug" | "error";
timeout?: number;
}

function defaultLogger(level: StartOptions["level"]) {
@@ -42,17 +44,19 @@ export async function start(
location = process.cwd(),
options: StartOptions = {}
) {
const { logger, level = "info" } = options;
const { logger, level = "info", timeout = 30_000 } = options;
const agentLogger = logger ?? defaultLogger(level);

agentLogger.info(`Starting sigyn agent at '${location}'`);

setGlobalDispatcher(new Agent({ connect: { timeout } }));
initDB(agentLogger);

const { rules, loki } = await initConfig(
path.join(location, "/sigyn.config.json")
);

const lokiApi = new GrafanaLoki({
const lokiApi = new GrafanaApi({
remoteApiURL: loki.apiUrl
});

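
Note how the new `timeout` option is wired in: the agent registers an `Agent` with a connect timeout as the global dispatcher, so subsequent requests issued through `@myunisoft/httpie` (including the Grafana/Loki calls) inherit it. The pattern in isolation, mirroring the lines added above:

```ts
import { Agent, setGlobalDispatcher } from "@myunisoft/httpie";

// Connect timeout in milliseconds; 30_000 mirrors the agent's new default.
// The global dispatcher is process-wide, so this affects every request made
// through the same HTTP stack, not only the agent's own calls.
setGlobalDispatcher(new Agent({ connect: { timeout: 30_000 } }));
```
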
124 changes: 64 additions & 60 deletions src/agent/src/rules.ts
@@ -6,6 +6,7 @@ import dayjs from "dayjs";
import ms from "ms";
import cronParser from "cron-parser";
import { Database } from "better-sqlite3";
import { Result, Ok, Err } from "@openally/result";

// Import Internal Dependencies
import { DbRule, DbRuleLabel, getDB, getOldestLabelTimestamp } from "./database";
@@ -21,6 +22,7 @@ export class Rule {
config: SigynInitializedRule;
#logger: Logger;
#lastFetchedStream: Record<string, string> | null = null;
#now: number;

constructor(rule: SigynInitializedRule, options: RuleOptions) {
const { logger } = options;
@@ -90,71 +92,36 @@
}
}

async walkOnLogs(logs: LokiStreamResult[]): Promise<boolean> {
walkOnLogs(logs: LokiStreamResult<string>[]): Result<true, string> {
this.#lastFetchedStream = null;
this.#now = dayjs().valueOf();

const db = getDB();
const now = dayjs().valueOf();
const rule = this.getRuleFromDatabase();
if (rule.muteUntil > now) {
return false;
}

const ruleLabels = this.getDistinctLabelsFromDatabase(rule.id);
const lastCounter = rule.counter;
const existingLabels = new Set();
for (const label of ruleLabels) {
existingLabels.add(`${label.key}:${label.value}`);
if (rule.muteUntil > this.#now) {
return Err("Rule is muted by higher level composite rule");
}

const ruleLabelsInsertStmt = db.prepare("INSERT INTO ruleLabels (ruleId, key, value, timestamp) VALUES (?, ?, ?, ?)");
const ruleLogInsertStmt = db.prepare("INSERT INTO ruleLogs (ruleId, log, timestamp) VALUES (?, ?, ?)");

db.transaction(() => {
for (const { stream, values } of logs) {
if (this.#lastFetchedStream === null) {
this.#lastFetchedStream = stream;
}
for (const [key, value] of Object.entries(stream)) {
if (!(this.#lastFetchedStream![key] ??= value).split(",").includes(value)) {
this.#lastFetchedStream![key] += `,${value}`;
}

// If the rule is based on a label, insert as many labels as there are values
// because we receive only one stream for N values (but the stream is the same for each value)
if (this.config.alert.on.label === key) {
let insertedCount = 0;

while (insertedCount++ < values.length) {
ruleLabelsInsertStmt.run(rule.id, key, value, now);
}
}
else if (!existingLabels.has(`${key}:${value}`)) {
ruleLabelsInsertStmt.run(rule.id, key, value, now);
existingLabels.add(`${key}:${value}`);
}
}

for (const log of values) {
ruleLogInsertStmt.run(rule.id, log, now);
}
}
})();
this.#insertLogsInDB(logs);

if (this.config.alert.on.label) {
return this.#checkLabelThreshold(rule);
const checkLabelThreshold = this.#checkLabelThreshold(rule);

return checkLabelThreshold ? Ok(true) : Err("Label threshold does not match");
}

const timeThreshold = utils.cron
.durationOrCronToDate(this.config.alert.on.interval!, "subtract")
.valueOf();

if (rule.lastIntervalReset === null || rule.lastIntervalReset - timeThreshold < 0) {
db.prepare("UPDATE rules SET lastIntervalReset = ?, firstReset = ? WHERE id = ?").run(now, rule.lastIntervalReset === null ? 1 : 0, rule.id);
db.prepare("UPDATE rules SET lastIntervalReset = ?, firstReset = ? WHERE id = ?").run(this.#now, rule.lastIntervalReset === null ? 1 : 0, rule.id);
rule.firstReset = rule.lastIntervalReset === null ? 1 : 0;
rule.lastIntervalReset = now;
rule.lastIntervalReset = this.#now;
}

const previousCounter = rule.counter;
rule.counter = (
db.prepare("SELECT COUNT(id) as counter FROM ruleLogs WHERE ruleId = ? AND processed = 0 AND timestamp >= ?")
.get(rule.id, timeThreshold) as { counter: null | number }
@@ -163,37 +130,74 @@
db.prepare("UPDATE rules SET counter = ? WHERE id = ?").run(rule.counter, rule.id);

const alertThreshold = this.config.alert.on.count!;
this.#logger.info(`[${rule.name}](state: handle|logs: ${logs.reduce((acc, curr) => acc + curr.values.length, 0)}|polling: ${this.#getCurrentPolling()[1]}|previous: ${lastCounter}|new: ${rule.counter - lastCounter}|next: ${rule.counter}|alertThreshold: ${alertThreshold}|timeThreshold: ${timeThreshold})`);
this.#logger.info(`[${rule.name}](state: handle|polling: ${this.#getCurrentPolling()[1]}|previous: ${previousCounter}|new: ${rule.counter - previousCounter}|next: ${rule.counter}|alertThreshold: ${alertThreshold}|timeThreshold: ${timeThreshold})`);

const [operator, value] = utils.rules.countThresholdOperator(alertThreshold);

if (operator.startsWith("<")) {
// we're checking for a max value, so we want to wait the whole interval before sending an alert
if (rule.lastIntervalReset !== now || rule.firstReset === 1) {
return false;
}

if (!utils.rules.countMatchOperator(operator, rule.counter, value)) {
return false;
}
// we're checking for a max value, so we want to wait the whole interval before sending an alert
if (operator.startsWith("<") && (rule.lastIntervalReset !== this.#now || rule.firstReset === 1)) {
return Err("Waiting the whole interval before comparing logs");
}
else if (!utils.rules.countMatchOperator(operator, rule.counter, value)) {
return false;
return Err(`Logs does not match operator value (o:${operator}|c:${rule.counter}|v:${value})`);
}

const cancelAlert = this.#checkThrottle(rule, db);
if (cancelAlert) {
return false;
return Err(`Rule throttle activated`);
}

this.#logger.error(`[${rule.name}](state: alert|threshold: ${alertThreshold}|actual: ${rule.counter})`);

db.transaction(() => {
db.prepare("UPDATE rules SET counter = 0, threshold = ?, lastIntervalReset = ? WHERE id = ?").run(rule.counter, now, rule.id);
db.prepare("UPDATE rules SET counter = 0, threshold = ?, lastIntervalReset = ? WHERE id = ?").run(rule.counter, this.#now, rule.id);
db.prepare("UPDATE ruleLogs SET processed = 1 WHERE ruleId = ?").run(rule.id);
})();

return true;
return Ok(true);
}

#insertLogsInDB(logs: LokiStreamResult<string>[]): void {
const rule = this.getRuleFromDatabase();
const ruleLabels = this.getDistinctLabelsFromDatabase(rule.id);
const existingLabels = new Set();
for (const label of ruleLabels) {
existingLabels.add(`${label.key}:${label.value}`);
}
const db = getDB();
const ruleLabelsInsertStmt = db.prepare("INSERT INTO ruleLabels (ruleId, key, value, timestamp) VALUES (?, ?, ?, ?)");
const ruleLogInsertStmt = db.prepare("INSERT INTO ruleLogs (ruleId, log, timestamp) VALUES (?, ?, ?)");

db.transaction(() => {
for (const { stream, values } of logs) {
if (this.#lastFetchedStream === null) {
this.#lastFetchedStream = stream;
}
for (const [key, value] of Object.entries(stream)) {
if (!(this.#lastFetchedStream![key] ??= value).split(",").includes(value)) {
this.#lastFetchedStream![key] += `,${value}`;
}

// If the rule is based on a label, insert as many labels as there are values
// because we receive only one stream for N values (but the stream is the same for each value)
if (this.config.alert.on.label === key) {
let insertedCount = 0;

while (insertedCount++ < values.length) {
ruleLabelsInsertStmt.run(rule.id, key, value, this.#now);
}
}
else if (!existingLabels.has(`${key}:${value}`)) {
ruleLabelsInsertStmt.run(rule.id, key, value, this.#now);
existingLabels.add(`${key}:${value}`);
}
}

for (const log of values) {
ruleLogInsertStmt.run(rule.id, log, this.#now);
}
}
})();
}

#checkLabelThreshold(rule: DbRule): boolean {
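
`walkOnLogs` now returns `Result<true, string>` from `@openally/result` instead of a plain boolean, so callers can log why no alert was created (see the `asyncTask` change below). A small illustrative sketch of that convention; the `shouldAlert` helper is hypothetical and not part of this codebase.

```ts
import { Result, Ok, Err } from "@openally/result";

// Ok(true) means "raise an alert"; Err(reason) carries a human-readable skip reason.
function shouldAlert(counter: number, threshold: number): Result<true, string> {
  return counter >= threshold
    ? Ok(true)
    : Err(`threshold not reached (${counter}/${threshold})`);
}

const result = shouldAlert(5, 10);
if (result.ok) {
  // raise the alert
}
else {
  console.debug(result.val); // "threshold not reached (5/10)"
}
```
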
22 changes: 16 additions & 6 deletions src/agent/src/tasks/asyncTask.ts
@@ -1,7 +1,11 @@
// Import Node.js Dependencies
import { performance } from "node:perf_hooks";

// Import Third-party Dependencies
import { SigynInitializedRule } from "@sigyn/config";
import { GrafanaLoki } from "@myunisoft/loki";
import { GrafanaApi } from "@myunisoft/loki";
import { AsyncTask } from "toad-scheduler";
import ms from "ms";

// Import Internal Dependencies
import { Rule } from "../rules";
@@ -12,7 +16,7 @@ import { handleAgentFailure } from "../utils/selfMonitoring";
export interface AsyncTaskOptions {
logger: Logger;
rule: Rule;
lokiApi: GrafanaLoki;
lokiApi: GrafanaApi;
}

export function asyncTask(ruleConfig: SigynInitializedRule, options: AsyncTaskOptions) {
@@ -26,19 +30,25 @@ export function asyncTask(ruleConfig: SigynInitializedRule, options: AsyncTaskOp

logger.info(`[${ruleConfig.name}](state: polling|start: ${start}|int: ${Date.now() - start}|query: ${ruleConfig.logql})`);

const t0 = performance.now();
try {
const { logs } = await lokiApi.queryRangeStream<string>(ruleConfig.logql, {
const { logs } = await lokiApi.Loki.queryRangeStream<string>(ruleConfig.logql, {
start
});
const logsCount = logs.reduce((acc, curr) => acc + curr.values.length, 0);
logger.info(`[${ruleConfig.name}](logs: ${logsCount}|execTime: ${ms(performance.now() - t0)})`);

const createAlert = await rule.walkOnLogs(logs);
if (createAlert) {
const createAlertResult = rule.walkOnLogs(logs);
if (createAlertResult.ok) {
createRuleAlert(rule.getAlertFormattedRule(), ruleConfig, logger);
rule.clearLabels();
}
else {
logger.debug(`[${ruleConfig.name}](debug: ${createAlertResult.val})`);
}
}
catch (error) {
logger.error(`[${ruleConfig.name}](error: ${error.message})`);
logger.error(`[${ruleConfig.name}](error: ${error.message}|execTime: ${ms(performance.now() - t0)})`);
logger.debug(error);

handleAgentFailure(error.message, rule, logger);