Skip to content

Commit

Permalink
feat: add queryRangeMatrix & add missing Unix timestamp (#167)
Browse files Browse the repository at this point in the history
  • Loading branch information
fraxken authored Jun 30, 2024
1 parent 52da390 commit db8f2e6
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 27 deletions.
53 changes: 45 additions & 8 deletions docs/Loki.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ const api = new GrafanaApi({
await api.Loki.series(`{env="production"}`);
```

Note that a TimeRange is defined as following:
Note that a TimeRange is defined as follows:

```ts
export type TimeRange = [first: number, last: number];
```
Expand All @@ -23,7 +24,9 @@ export type TimeRange = [first: number, last: number];

### queryRange< T = string >(logQL: LogQL | string, options?: LokiQueryOptions< T >): Promise< QueryRangeResponse< T > >

Note that you can provide a custom parser to queryRange (by default it inject a NoopParser doing nothing).
The `queryRange` method returns raw logs (without timestamp or metric/stream labels).

You can provide a custom parser to queryRange (by default it injects a **NoopParser** doing nothing).

```ts
const logs = await api.Loki.queryRange(
Expand All @@ -32,7 +35,9 @@ const logs = await api.Loki.queryRange(
console.log(logs);
```

queryRange options is described by the following TypeScript interface
#### options

The queryRange options are described by the following TypeScript interface:

```ts
export interface LokiQueryOptions<T> {
Expand All @@ -47,7 +52,9 @@ export interface LokiQueryOptions<T> {
}
```

<em>start</em> and <em>end</em> arguments can be either a unix timestamp or a duration like `6h`.
<em>start</em> and <em>end</em> arguments can be either a Unix timestamp or a duration like `6h`.

#### response

The response is described by the following interface:
```ts
Expand All @@ -57,6 +64,11 @@ export interface QueryRangeResponse<T extends LokiPatternType> {
}
```

`timerange` is **null** when there are no logs available with the given LogQL.

> [!CAUTION]
> When you use an incorrect pattern, any logs that are not correctly parsed will be removed from the result.
### queryRangeStream< T = string >(logQL: LogQL | string, options?: LokiQueryOptions< T >): Promise< QueryRangeStreamResponse< T > >

Same as `queryRange` but returns the labels key-value pairs stream
Expand All @@ -68,21 +80,46 @@ const logs = await api.Loki.queryRangeStream(
for (const { stream, values } of logs) {
// Record<string, string>
console.log(stream);
// string[]
// [unixEpoch: number, value: T][]
console.log(values);
}
```

The response is described by the following interface:
```ts
export interface QueryRangeStreamResponse<T extends LokiPatternType> {
interface QueryRangeStreamResponse<T extends LokiPatternType> {
logs: LokiStreamResult<LokiLiteralPattern<T>>[];
timerange: TimeRange | null;
}

interface LokiStreamResult<T = string> {
interface LokiStreamResult<T> {
stream: Record<string, string>;
values: T[];
values: [unixEpoch: number, value: T][];
}
```

### queryRangeMatrix< T = string >(logQL: LogQL | string, options?: LokiQueryOptions< T >): Promise< QueryRangeMatrixResponse< T > >

Same as `queryRange` but returns the labels key-value pairs metric

Matrix are returned for [Metric queries](https://grafana.com/docs/loki/latest/query/metric_queries/)
```
count_over_time({ label="value" }[5m])
```

---

The response is described by the following interface:

```ts
interface QueryRangeMatrixResponse<T extends LokiPatternType> {
logs: LokiMatrixResult<LokiLiteralPattern<T>>[];
timerange: TimeRange | null;
}

interface LokiMatrixResult<T> {
metric: Record<string, string>;
values: [unixEpoch: number, value: T][];
}
```

Expand Down
32 changes: 29 additions & 3 deletions src/class/Loki.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import {
LokiStream,
LokiMatrix,
QueryRangeResponse,
QueryRangeStreamResponse
QueryRangeStreamResponse,
QueryRangeMatrixResponse
} from "../types.js";
import { ApiCredential } from "./ApiCredential.class.js";

Expand Down Expand Up @@ -112,6 +113,29 @@ export class Loki {
);
}

async queryRangeMatrix<T extends LokiPatternType = string>(
logQL: LogQL | string,
options: LokiQueryOptions<T> = {}
): Promise<QueryRangeMatrixResponse<T>> {
const { pattern = new NoopPattern() } = options;
const parser: PatternShape<any> = pattern instanceof NoopPattern ?
pattern : new Pattern(pattern);

const { data } = await this.#fetchQueryRange<LokiMatrix>(logQL, options);

return {
logs: data.data.result.map((result) => {
return {
metric: result.metric,
values: result.values
.map(([unixEpoch, log]) => [unixEpoch, ...parser.executeOnLogs([log])])
.filter((log) => log.length > 1) as any[]
};
}),
timerange: utils.streamOrMatrixTimeRange(data.data.result)
};
}

async queryRangeStream<T extends LokiPatternType = string>(
logQL: LogQL | string,
options: LokiQueryOptions<T> = {}
Expand All @@ -126,10 +150,12 @@ export class Loki {
logs: data.data.result.map((result) => {
return {
stream: result.stream,
values: result.values.flatMap(([, log]) => parser.executeOnLogs([log])) as any[]
values: result.values
.map(([unixEpoch, log]) => [unixEpoch, ...parser.executeOnLogs([log])])
.filter((log) => log.length > 1) as any[]
};
}),
timerange: utils.queryRangeStreamTimeRange(data.data.result)
timerange: utils.streamOrMatrixTimeRange(data.data.result)
};
}

Expand Down
16 changes: 13 additions & 3 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,22 @@ import { TimeRange } from "./utils.js";

export interface LokiStream {
stream: Record<string, string>;
values: [unixEpoch: string, log: string][];
values: [unixEpoch: number, log: string][];
}

export interface LokiMatrix {
metric: Record<string, string>;
values: [unixEpoch: string, value: string][];
values: [unixEpoch: number, value: string][];
}

export interface LokiMatrixResult<T> {
metric: Record<string, string>;
values: [unixEpoch: number, value: T][];
}

export interface LokiStreamResult<T> {
stream: Record<string, string>;
values: T[];
values: [unixEpoch: number, value: T][];
}

export interface RawQueryRangeResponse<T = LokiStream> {
Expand Down Expand Up @@ -79,4 +84,9 @@ export interface QueryRangeStreamResponse<T extends LokiPatternType> {
timerange: TimeRange | null;
}

export interface QueryRangeMatrixResponse<T extends LokiPatternType> {
logs: LokiMatrixResult<LokiLiteralPattern<T>>[];
timerange: TimeRange | null;
}

export { TimeRange };
16 changes: 9 additions & 7 deletions src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Import Third-party Dependencies
import dayjs from "dayjs";
import dayjs, { Dayjs } from "dayjs";
import ms from "ms";

// Import Internal Dependencies
Expand All @@ -23,9 +23,9 @@ export function escapeStringRegExp(str: string): string {
return str.replace(/[|\\{}()[\]^$+*?.]/g, "\\$&");
}

export function transformStreamValue(
value: [unixEpoch: string, log: string]
) {
export function transformStreamOrMatrixValue(
value: [unixEpoch: number, log: string]
): { date: Dayjs; log: string } {
const [unixEpoch, log] = value;

return {
Expand All @@ -45,7 +45,7 @@ export function inlineLogs(

const flatLogs = result.data.result
.flatMap(
(host) => host.values.map(transformStreamValue)
(host) => host.values.map(transformStreamOrMatrixValue)
)
.sort((left, right) => (left.date.isBefore(right.date) ? 1 : -1));
if (flatLogs.length === 0) {
Expand All @@ -58,14 +58,16 @@ export function inlineLogs(
};
}

export function queryRangeStreamTimeRange(result: LokiStream[]): [number, number] | null {
export function streamOrMatrixTimeRange(
result: (LokiStream | LokiMatrix)[]
): [number, number] | null {
if (result.length === 0) {
return null;
}

const flatLogs = result
.flatMap(
(host) => host.values.map(transformStreamValue)
(host) => host.values.map(transformStreamOrMatrixValue)
)
.sort((left, right) => (left.date.isBefore(right.date) ? 1 : -1));

Expand Down
60 changes: 54 additions & 6 deletions test/Loki.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import { MockAgent, setGlobalDispatcher, getGlobalDispatcher } from "@myunisoft/
import {
GrafanaApi,
LokiStandardBaseResponse,
RawQueryRangeResponse
RawQueryRangeResponse,
LokiMatrix
} from "../src/index.js";

// CONSTANTS
Expand Down Expand Up @@ -84,12 +85,42 @@ describe("GrafanaApi.Loki", () => {
const sdk = new GrafanaApi({ remoteApiURL: kDummyURL });

const result = await sdk.Loki.queryRangeStream("{app='foo'}");
const resultLogs = result.logs[0]!;

assert.ok(
resultLogs.values.every((arr) => typeof arr[0] === "number")
);
assert.deepEqual(
resultLogs.values.map((arr) => arr[1]),
expectedLogs.slice(0)
);
assert.deepEqual(resultLogs.stream, { foo: "bar" });
});

it("should return expectedLogs with no modification (using NoopParser, queryRangeMatrix)", async() => {
const expectedLogs = ["hello world", "foobar"];

agentPoolInterceptor
.intercept({
path: (path) => path.includes("loki/api/v1/query_range")
})
.reply(200, mockMatrixResponse(expectedLogs), {
headers: { "Content-Type": "application/json" }
});

const sdk = new GrafanaApi({ remoteApiURL: kDummyURL });

const result = await sdk.Loki.queryRangeMatrix("{app='foo'}");
const resultLogs = result.logs[0]!;

assert.ok(
resultLogs.values.every((arr) => typeof arr[0] === "number")
);
assert.deepEqual(
result.logs[0].values,
resultLogs.values.map((arr) => arr[1]),
expectedLogs.slice(0)
);
assert.deepEqual(result.logs[0].stream, { foo: "bar" });
assert.deepEqual(resultLogs.metric, { foo: "bar" });
});

it("should return empty list of logs (using NoopParser, queryRangeStream)", async() => {
Expand Down Expand Up @@ -152,13 +183,14 @@ describe("GrafanaApi.Loki", () => {
const result = await sdk.Loki.queryRangeStream("{app='foo'}", {
pattern: "hello '<name>'"
});
const resultLogs = result.logs[0]!;

assert.strictEqual(result.logs.length, 1);
assert.deepEqual(
result.logs[0].values[0],
resultLogs.values[0][1],
{ name: "Thomas" }
);
assert.deepEqual(result.logs[0].stream, { foo: "bar" });
assert.deepEqual(resultLogs.stream, { foo: "bar" });
});

it("should return empty list of logs (using LogParser, queryRangeStream)", async() => {
Expand Down Expand Up @@ -324,6 +356,22 @@ function mockStreamResponse(logs: string[]): DeepPartial<RawQueryRangeResponse>
};
}

function mockMatrixResponse(logs: string[]): DeepPartial<RawQueryRangeResponse<LokiMatrix>> {
return {
status: "success",
data: {
resultType: "matrix",
result: logs.length > 0 ? [
{
metric: { foo: "bar" },
values: logs.map((log) => [getNanoSecTime(), log])
}
] : [],
stats: {}
}
};
}

function mockLabelResponse<T>(
status: "success" | "failed",
response: T[]
Expand All @@ -337,6 +385,6 @@ function mockLabelResponse<T>(
function getNanoSecTime() {
const hrTime = process.hrtime();

return String((hrTime[0] * 1000000000) + hrTime[1]);
return (hrTime[0] * 1000000000) + hrTime[1];
}

0 comments on commit db8f2e6

Please sign in to comment.