Skip to content

Commit

Permalink
Merge pull request #285 from Sanketika-Obsrv/open-telemetry
Browse files Browse the repository at this point in the history
Open Telemetry Integration
  • Loading branch information
HarishGangula authored Nov 27, 2024
2 parents 318fcd7 + 640a845 commit d6a49f6
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 7 deletions.
11 changes: 11 additions & 0 deletions api-service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,17 @@
"@azure/storage-blob": "^12.17.0",
"@google-cloud/storage": "^7.9.0",
"@jsonhero/schema-infer": "^0.1.5",
"@opentelemetry/api": "^1.9.0",
"@opentelemetry/exporter-logs-otlp-http": "^0.53.0",
"@opentelemetry/exporter-metrics-otlp-http": "^0.53.0",
"@opentelemetry/exporter-trace-otlp-http": "^0.55.0",
"@opentelemetry/resources": "^1.28.0",
"@opentelemetry/sdk-logs": "^0.53.0",
"@opentelemetry/sdk-metrics": "^1.26.0",
"@opentelemetry/sdk-node": "^0.53.0",
"@opentelemetry/sdk-trace-base": "^1.28.0",
"@opentelemetry/sdk-trace-node": "^1.28.0",
"@opentelemetry/semantic-conventions": "^1.28.0",
"@project-sunbird/logger": "^0.0.9",
"ajv": "^8.11.2",
"ajv-formats": "^2.1.1",
Expand Down
17 changes: 11 additions & 6 deletions api-service/src/app.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
import express, { Application } from "express";
import {router as v2Router} from "./routes/Router"
import { metricRouter } from "./routes/MetricRouter"
import { druidProxyRouter } from "./routes/DruidProxyRouter"
import { druidProxyRouter } from "./routes/DruidProxyRouter";
import { metricRouter } from "./routes/MetricRouter";
import { router as v2Router } from "./routes/Router";

import bodyParser from "body-parser";
import { errorHandler, obsrvErrorHandler } from "./middlewares/errors";
import { ResponseHandler } from "./helpers/ResponseHandler";
import { config } from "./configs/Config";
import { ResponseHandler } from "./helpers/ResponseHandler";
import { errorHandler, obsrvErrorHandler } from "./middlewares/errors";
import { OTelService } from "./services/otel/OTelService";
import { alertsRouter } from "./routes/AlertsRouter";
import { interceptAuditEvents } from "./services/telemetry";
import _ from "lodash";



const app: Application = express();

((config.otel && _.toLower(config?.otel?.enable) === "true")) && OTelService.init() // Initialisation of Open telemetry Service.

app.use(bodyParser.json({ limit: config.body_parser_limit}));
app.use(express.text());
app.use(express.json());
Expand Down
6 changes: 6 additions & 0 deletions api-service/src/configs/Config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,10 @@ export const config = {
},
"user_token_public_key": process.env.user_token_public_key || "",
"is_RBAC_enabled": process.env.is_rbac_enabled || "false",
"otel": {
"enable": process.env.OTEL_ENABLE || "true",
"collector_endpoint": process.env.OTEL_COLLECTOR_ENDPOINT || "http://localhost:4318"
}


}
155 changes: 155 additions & 0 deletions api-service/src/services/otel/OTelService.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import { Counter, diag, DiagConsoleLogger, DiagLogLevel, Meter, metrics } from '@opentelemetry/api';
import * as logsAPI from '@opentelemetry/api-logs';
import { OTLPLogExporter } from '@opentelemetry/exporter-logs-otlp-http';
import { OTLPMetricExporter } from '@opentelemetry/exporter-metrics-otlp-http';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';
import { Resource } from '@opentelemetry/resources';
import { BatchLogRecordProcessor, LoggerProvider } from '@opentelemetry/sdk-logs';
import { MeterProvider, PeriodicExportingMetricReader } from '@opentelemetry/sdk-metrics';
import { BatchSpanProcessor } from '@opentelemetry/sdk-trace-base';
import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node';
import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions';
import logger from '../../logger';
import * as _ from "lodash";
import { config } from "../../configs/Config";
const collectorEndpoint = _.get(config, "otel.collector_endpoint", "http://localhost:4318");

export class OTelService {
private static meterProvider: MeterProvider;
private static loggerProvider: LoggerProvider;
private static tracerProvider: NodeTracerProvider;

public static init() {
this.tracerProvider = this.createTracerProvider(collectorEndpoint);
this.meterProvider = this.createMeterProvider(collectorEndpoint);
this.loggerProvider = this.createLoggerProvider(collectorEndpoint);

// Register the global tracer, meter, and logger providers
this.tracerProvider.register();
this.setGlobalMeterProvider(this.meterProvider);

logger.info("OpenTelemetry Service Initialized");

// Add shutdown hook
process.on('SIGTERM', async () => {
await this.tracerProvider.shutdown();
await this.meterProvider.shutdown();
await this.loggerProvider.shutdown();
});
}

private static createTracerProvider(endpoint: string) {
const traceExporter = new OTLPTraceExporter({
url: `${endpoint}/v1/traces`,
});

const tracerProvider = new NodeTracerProvider({
resource: this.createServiceResource('obsrv-api-service'),
});

tracerProvider.addSpanProcessor(new BatchSpanProcessor(traceExporter));

return tracerProvider;
}

private static createMeterProvider(endpoint: string) {
const metricExporter = new OTLPMetricExporter({
url: `${endpoint}/v1/metrics`,
});

const meterProvider = new MeterProvider({
resource: this.createServiceResource('obsrv-api-service'),
});

meterProvider.addMetricReader(
new PeriodicExportingMetricReader({
exporter: metricExporter,
exportIntervalMillis: 10000,
})
);

return meterProvider;
}

private static createLoggerProvider(endpoint: string) {
const logExporter = new OTLPLogExporter({
url: `${endpoint}/v1/logs`,
});

const loggerProvider = new LoggerProvider({
resource: this.createServiceResource('obsrv-api-service'),
});

loggerProvider.addLogRecordProcessor(
new BatchLogRecordProcessor(logExporter)
);

return loggerProvider;
}

// Helper method to create a Resource with service name
private static createServiceResource(serviceName: string) {
return new Resource({
[SemanticResourceAttributes.SERVICE_NAME]: serviceName,
});
}

private static setGlobalMeterProvider(meterProvider: MeterProvider) {
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.INFO);
diag.info('Registering MeterProvider globally.');
metrics.setGlobalMeterProvider(meterProvider);
}

// Method to create a counter metric
public static createCounterMetric(name: string): Counter {
const meter = this.getMeterProvider(); // Use the updated getMeterProvider method
const counter = meter.createCounter(name, {
description: 'Counts the number of API calls',
});
return counter;
}

public static getMeterProvider(): Meter {
return this.meterProvider.getMeter('obsrv-api-service');
}

public static getLoggerProvider(): LoggerProvider {
return this.loggerProvider;
}

public static getTracerProvider(): NodeTracerProvider {
return this.tracerProvider;
}

// Method to record the counter metric
public static recordCounter(counter: Counter, value: number) {
counter.add(value, {
service: 'obsrv-api-service',
});
}


public static generateOTelLog(auditLog: Record<string, any>, severity: 'INFO' | 'WARN' | 'ERROR', logType?: string) {
const loggerInstance = this.loggerProvider.getLogger('obsrv-api-service');

const severityMapping: Record<string, number> = {
INFO: logsAPI.SeverityNumber.INFO,
WARN: logsAPI.SeverityNumber.WARN,
ERROR: logsAPI.SeverityNumber.ERROR,
};

const severityNumber = severityMapping[severity] || logsAPI.SeverityNumber.INFO;

const logRecord = {
severityNumber,
severityText: severity,
body: JSON.stringify(auditLog),
attributes: {
'log.type': logType || 'console',
...auditLog,
},
};
loggerInstance.emit(logRecord);
}

}
4 changes: 3 additions & 1 deletion api-service/src/services/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { v4 } from "uuid";
import _ from "lodash";
import { config as appConfig } from "../configs/Config";
import {send} from "../connections/kafkaConnection"
import { OTelService } from "./otel/OTelService";

const {env, version} = _.pick(appConfig, ["env","version"])
const telemetryTopic = _.get(appConfig, "telemetry_dataset");
Expand Down Expand Up @@ -50,7 +51,8 @@ const getDefaultEdata = ({ action }: any) => ({
})

const sendTelemetryEvents = async (event: Record<string, any>) => {
send({ messages: [{ value: JSON.stringify(event) }] }, telemetryTopic).catch(console.log);
OTelService.generateOTelLog(event, 'INFO', 'audit-log');
send(event, telemetryTopic).catch(console.log);
}

const transformProps = (body: Record<string, any>) => {
Expand Down

0 comments on commit d6a49f6

Please sign in to comment.