Merge remote-tracking branch 'origin/release-1.5.0' into dataset-aliasing
JeraldJF committed Jan 16, 2025
2 parents 49868a1 + b41d459 commit 88eec14
Showing 23 changed files with 936 additions and 92 deletions.
1 change: 1 addition & 0 deletions api-service/package.json
@@ -42,6 +42,7 @@
"busboy": "^1.6.0",
"compression": "^1.7.4",
"dateformat": "2.0.0",
"dayjs": "^1.11.13",
"express": "^5.0.0-beta.3",
"http-errors": "^2.0.0",
"http-status": "^1.5.3",
9 changes: 7 additions & 2 deletions api-service/src/configs/Config.ts
@@ -98,7 +98,7 @@ export const config = {
"command_service_config": {
"host": process.env.command_service_host || "http://localhost",
"port": parseInt(process.env.command_service_port || "8000"),
"path": process.env.command_service_path || "/system/v1/dataset/command"
"paths": JSON.parse(process.env.command_service_paths || '{"dataset":"/system/v1/dataset/command","connector":"/connector/v1/register","analyzePII":"/system/data/v1/analyze/pii"}')
},
"flink_job_configs": {
"pipeline_merged_job_manager_url": process.env.pipeline_merged_job_manager_url || "http://localhost:8081",
@@ -119,5 +119,10 @@ export const config = {
"enable": process.env.otel_enable || "false",
"collector_endpoint": process.env.otel_collector_endpoint || "http://localhost:4318"
},
"storage_types": process.env.storage_types || '{"lake_house":true,"realtime_store":true}'
"storage_types": process.env.storage_types || '{"lake_house":true,"realtime_store":true}',
"data_observability": {
"default_freshness_threshold": process.env.default_freshness_threshold ? parseInt(process.env.default_freshness_threshold) : 5, // in minutes
"data_out_query_time_period": process.env.data_out_query_time_period || "1d",
"default_query_time_period": process.env.default_query_time_period ? parseInt(process.env.default_query_time_period) : 7, // in days
}
}
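
For deployments, a minimal sketch of overriding the new command_service_paths value, assuming the variable is set before Config.ts is first imported. Because the JSON.parse fallback replaces the default object wholesale rather than merging, an override has to supply all three keys:

// Hypothetical override; omitting any key would leave that path undefined downstream.
process.env.command_service_paths = JSON.stringify({
    dataset: "/system/v1/dataset/command",
    connector: "/connector/v1/register",
    analyzePII: "/system/data/v1/analyze/pii"
});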
4 changes: 2 additions & 2 deletions api-service/src/configs/IngestionConfig.ts
@@ -84,12 +84,12 @@ export const rawIngestionSpecDefaults = {
"flattenSpec": [
{
"type": "path",
"expr": "$.obsrv_meta.source.connector",
"expr": "$.obsrv_meta.source.entry_source",
"name": "obsrv.meta.source.connector"
},
{
"type": "path",
"expr": "$.obsrv_meta.source.connectorInstance",
"expr": "$.obsrv_meta.source.id",
"name": "obsrv.meta.source.id"
}
],
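
For context, a hedged sketch of the event fragment these JSONPath expressions now target; the field values are illustrative, not from this commit:

// Hypothetical event fragment. The updated flattenSpec maps
// $.obsrv_meta.source.entry_source -> dimension "obsrv.meta.source.connector"
// and $.obsrv_meta.source.id -> dimension "obsrv.meta.source.id".
const sampleEvent = {
    obsrv_meta: {
        source: { entry_source: "api", id: "kafka-connector-01" }
    }
};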
18 changes: 13 additions & 5 deletions api-service/src/connections/commandServiceConnection.ts
@@ -5,21 +5,29 @@ import { v4 } from "uuid";

const commandHost = _.get(config, ["command_service_config", "host"])
const commandPort = _.get(config, ["command_service_config", "port"])
const commandPath = _.get(config, ["command_service_config", "path"])
const commandPaths = _.get(config, ["command_service_config", "paths"])
const datasetPath = _.get(commandPaths, ["dataset"])
const connectorRegisterPath = _.get(commandPaths, ["connector"])
const analyzePIIPath = _.get(commandPaths, ["analyzePII"])

export const commandHttpService = axios.create({ baseURL: `${commandHost}:${commandPort}`, headers: { "Content-Type": "application/json" } });

export const executeCommand = async (id: string, command: string) => {
export const executeCommand = async (id: string, command: string, userToken: string) => {
const payload = {
"id": v4(),
"data": {
"dataset_id": id,
"command": command
}
}
return commandHttpService.post(commandPath, payload)
return commandHttpService.post(datasetPath, payload, { headers: { Authorization: userToken }})
}

export const registerConnector = async (requestBody: any) => {
return commandHttpService.post("/connector/v1/register", requestBody)
export const registerConnector = async (requestBody: any, userToken: string) => {
return commandHttpService.post(connectorRegisterPath, requestBody, { headers: { Authorization: userToken }})
}

export const detectPII = async (requestBody: any, userToken: string) => {
console.log(`analyzePIIPath : ${analyzePIIPath}`)
return commandHttpService.post(analyzePIIPath, requestBody, { headers: { Authorization: userToken }})
}
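
A hedged sketch of how a caller is now expected to thread the user's token through executeCommand; the route handler and the "PUBLISH_DATASET" command value are illustrative assumptions, not part of this commit:

import { Request } from "express";
import { executeCommand } from "../connections/commandServiceConnection";

// Hypothetical caller: forwards the incoming Authorization header so the
// command service can authorize the request as the same user.
const publishDataset = async (req: Request, datasetId: string) => {
    const userToken = req.get("authorization") as string;
    return executeCommand(datasetId, "PUBLISH_DATASET", userToken);
};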
@@ -22,7 +22,8 @@ const connectorRegisterController = async (req: Request, res: Response) => {
relative_path: uploadStreamResponse[0]
}
logger.info({ apiId, resmsgid, message: `File uploaded to cloud provider successfully` })
const registryResponse = await registerConnector(payload);
const userToken = req.get('authorization') as string;
const registryResponse = await registerConnector(payload, userToken);
logger.info({ apiId, resmsgid, message: `Connector registered successfully` })
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: { message: registryResponse?.data?.message } })
} catch (error: any) {
@@ -118,4 +119,4 @@ const extractFileNameFromPath = (filePath: string): string[] => {
return filePath.match(regex) || [];
};

export default connectorRegisterController;
export default connectorRegisterController;
@@ -0,0 +1,26 @@
import { Request, Response } from "express";
import { ResponseHandler } from "../../helpers/ResponseHandler";
import httpStatus from "http-status";
import * as _ from "lodash";
import logger from "../../logger";
import { detectPII } from "../../connections/commandServiceConnection";

const code = "FAILED_TO_DETECT_PII";
export const dataAnalyzePII = async (req: Request, res: Response) => {
const apiId = _.get(req, 'id')
try {
const userToken = req.get('authorization') as string;
const piiSuggestionsResponse = await detectPII(_.get(req, ['body', 'data']), userToken);
logger.info({apiId , message: `Detected PII successfully` })
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: piiSuggestionsResponse?.data})
} catch (error: any) {
const errMessage = _.get(error, "response.data.detail")
logger.error(error, apiId, code);
let errorMessage = error;
const statusCode = _.get(error, "status")
if (!statusCode || statusCode == 500) {
errorMessage = { code, message: errMessage || "Failed to detect pii" }
}
ResponseHandler.errorResponse(errorMessage, req, res);
}
}
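
A hedged example of exercising this new controller over HTTP; the host, port, route, and body shape are assumptions inferred from the analyzePII command-service path above, not confirmed by this commit:

// Hypothetical client call; endpoint and payload shape are assumptions.
const res = await fetch("http://localhost:3000/data/v1/analyze/pii", {
    method: "POST",
    headers: { "Content-Type": "application/json", Authorization: "Bearer <token>" },
    body: JSON.stringify({ data: [{ name: "Jane Doe", email: "jane@example.com" }] })
});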
10 changes: 5 additions & 5 deletions api-service/src/controllers/DataExhaust/DataExhaustController.ts
@@ -11,7 +11,7 @@ import { cloudProvider } from "../../services/CloudServices";

export const dataExhaust = async (req: Request, res: Response) => {
const { params } = req;
const { datasetId } = params;
const { dataset_id } = params;
const { type }: any = req.query;
const momentFormat = "YYYY-MM-DD";

@@ -28,12 +28,12 @@ export const dataExhaust = async (req: Request, res: Response) => {
return resData || {};
}

if (type && config.cloud_config.exclude_exhaust_types.includes(datasetId)) {
if (type && config.cloud_config.exclude_exhaust_types.includes(dataset_id)) {
return ResponseHandler.errorResponse({ statusCode: 404, message: "Record not found", errCode: httpStatus["404_NAME"] }, req, res)
}
const datasetRecord = await verifyDatasetExists(datasetId);
const datasetRecord = await verifyDatasetExists(dataset_id);
if (datasetRecord === null) {
logger.error(`Dataset with ${datasetId} not found in live table`)
logger.error(`Dataset with ${dataset_id} not found in live table`)
return ResponseHandler.errorResponse({ statusCode: 404, message: "Record not found", errCode: httpStatus["404_NAME"] }, req, res)
}
const dateRange = getDateRange(req);
@@ -47,7 +47,7 @@ export const dataExhaust = async (req: Request, res: Response) => {
return ResponseHandler.errorResponse({ statusCode: 400, message: `Invalid date range! make sure your range cannot be more than ${config.cloud_config.maxQueryDateRange} days`, errCode: "BAD_REQUEST" }, req, res)
}

const resData: any = await getFromStorage(type, dateRange, datasetId);
const resData: any = await getFromStorage(type, dateRange, dataset_id);
if (_.isEmpty(resData.files)) {
logger.error("Date range provided does not have any backup files")
return ResponseHandler.errorResponse({ statusCode: 404, message: "Date range provided does not have any backup files", errCode: "NOT_FOUND" }, req, res);
@@ -27,7 +27,7 @@ const apiId = "api.data.in";
const dataIn = async (req: Request, res: Response) => {

const requestBody = req.body;
const datasetId = req.params.datasetId.trim();
const datasetId = req.params.dataset_id.trim();

const isValidSchema = schemaValidation(requestBody, validationSchema)
if (!isValidSchema?.isValid) {
38 changes: 38 additions & 0 deletions api-service/src/controllers/DataMetrics/DataMetricsController.ts
@@ -0,0 +1,38 @@
import { Request, Response } from "express";
import _ from "lodash";
import { executeNativeQuery } from "../../connections/druidConnection";
import { ResponseHandler } from "../../helpers/ResponseHandler";
import vaidationSchema from "./DataMetricsValidationSchema.json"
import { schemaValidation } from "../../services/ValidationService";
import logger from "../../logger";
import { obsrvError } from "../../types/ObsrvError";
import axios from "axios";
import { config } from "../../configs/Config";

const getBaseUrl = (url: string) => {
if (_.startsWith(url, "/prom")) return config.query_api.prometheus.url + _.replace(url, "/prom", "")
}

const dataMetrics = async (req: Request, res: Response) => {
const isValidSchema = schemaValidation(req.body, vaidationSchema);
if (!isValidSchema?.isValid) {
logger.error({ message: isValidSchema?.message, code: "INVALID_QUERY" })
throw obsrvError("", "INVALID_QUERY", isValidSchema.message, "BAD_REQUEST", 400)
}
const { query } = req.body || {};
const endpoint = query.url;
if (_.startsWith(endpoint, "/prom")) {
query.url = getBaseUrl(endpoint)
const { url, method, headers = {}, body = {}, params = {}, ...rest } = query;
const apiResponse = await axios.request({ url, method, headers, params, data: body, ...rest })
const data = _.get(apiResponse, "data");
return res.json(data);
}
else {
const query = _.get(req, ["body", "query", "body", "query"]);
const response = await executeNativeQuery(query);
ResponseHandler.successResponse(req, res, { status: 200, data: _.get(response, "data") });
}
}

export default dataMetrics;
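
A hedged example of a request body this controller accepts on its Prometheus branch; the PromQL expression is illustrative. URLs prefixed with /prom are rewritten against config.query_api.prometheus.url and proxied as-is:

// Hypothetical payload; for any query without the /prom prefix, the nested
// query.body.query is instead executed as a Druid native query.
const promRequest = {
    query: {
        url: "/prom/api/v1/query",
        method: "GET",
        params: { query: "sum(rate(http_requests_total[5m]))" }
    }
};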
2 changes: 1 addition & 1 deletion api-service/src/controllers/DataOut/DataOutController.ts
@@ -9,7 +9,7 @@ import { executeNativeQuery, executeSqlQuery } from "../../connections/druidConn

export const apiId = "api.data.out";
const dataOut = async (req: Request, res: Response) => {
const datasetId = req.params?.datasetId;
const datasetId = req.params?.dataset_id;
const requestBody = req.body;
const msgid = _.get(req, "body.params.msgid");
const isValidSchema = schemaValidation(requestBody, validationSchema);
61 changes: 61 additions & 0 deletions api-service/src/controllers/DatasetMetrics/DatasetMetrics.json
@@ -0,0 +1,61 @@
{
"type": "object",
"properties": {
"id": {
"type": "string",
"enum": [
"api.dataset.metrics"
]
},
"ver": {
"type": "string"
},
"ts": {
"type": "string"
},
"params": {
"type": "object",
"properties": {
"msgid": {
"type": "string"
}
},
"required": [
"msgid"
],
"additionalProperties": false
},
"request": {
"type": "object",
"properties": {
"dataset_id": {
"type": "string"
},
"category": {
"type": "array",
"items": {
"type": "string",
"enum": [
"data_freshness",
"data_observability",
"data_volume",
"data_lineage",
"connectors",
"data_quality"
]
},
"minItems": 1
},
"query_time_period":{
"type": "integer",
"minimum": 1
}
},
"required": [
"category",
"dataset_id"
]
}
},
"required": ["id", "ver", "ts", "params", "request"]
}
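
A hedged example of a request envelope that validates against this schema; the identifiers and timestamp are illustrative:

// Hypothetical request for two metric categories over a 7-day window.
const sampleRequest = {
    id: "api.dataset.metrics",
    ver: "v1",
    ts: "2025-01-16T10:00:00+05:30",
    params: { msgid: "4a7f14c3-d61e-4d4f-be78-181834eeff6d" },
    request: {
        dataset_id: "telemetry-events",
        category: ["data_freshness", "data_volume"],
        query_time_period: 7
    }
};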
@@ -1,38 +1,81 @@
import { Request, Response } from "express";
import _ from "lodash";
import { executeNativeQuery } from "../../connections/druidConnection";
import * as _ from "lodash"
import { ResponseHandler } from "../../helpers/ResponseHandler";
import vaidationSchema from "./DatasetMetricsValidationSchema.json"
import { schemaValidation } from "../../services/ValidationService";
import dayjs from 'dayjs';
import logger from "../../logger";
import { obsrvError } from "../../types/ObsrvError";
import axios from "axios";
import { schemaValidation } from "../../services/ValidationService";
import validationSchema from "./DatasetMetrics.json";
import { config } from "../../configs/Config";
import { datasetService } from "../../services/DatasetService";
import { getConnectors, getDataFreshness, getDataLineage, getDataObservability, getDataQuality, getDataVolume } from "../../services/DatasetMetricsService";

const getBaseUrl = (url: string) => {
if (_.startsWith(url, "/prom")) return config.query_api.prometheus.url + _.replace(url, "/prom", "")
}

const apiId = "api.dataset.metrics";
const datasetMetrics = async (req: Request, res: Response) => {
const isValidSchema = schemaValidation(req.body, vaidationSchema);
const msgid = _.get(req, "body.params.msgid");
const requestBody = req.body;
const dataset_id = _.get(req, "body.request.dataset_id");
const timePeriod = _.get(req, "body.request.query_time_period") || config?.data_observability?.default_query_time_period;

const { category }: any = req.body.request;
const defaultThreshold = (typeof config?.data_observability?.default_freshness_threshold === 'number' ? config?.data_observability?.default_freshness_threshold : 5) * 60 * 1000; // 5 minutes in milliseconds
const dateFormat = 'YYYY-MM-DDTHH:mm:ss';
const endDate = dayjs().add(1, 'day').format(dateFormat);
const startDate = dayjs(endDate).subtract(timePeriod, 'day').format(dateFormat);
const intervals = `${startDate}/${endDate}`;
const isValidSchema = schemaValidation(requestBody, validationSchema);
const results = [];

if (!isValidSchema?.isValid) {
logger.error({ message: isValidSchema?.message, code: "INVALID_QUERY" })
throw obsrvError("", "INVALID_QUERY", isValidSchema.message, "BAD_REQUEST", 400)
logger.error({ apiId, datasetId: dataset_id, msgid, requestBody, message: isValidSchema?.message, code: "DATA_OUT_INVALID_INPUT" })
return ResponseHandler.errorResponse({ message: isValidSchema?.message, statusCode: 400, errCode: "BAD_REQUEST", code: "DATA_OUT_INVALID_INPUT" }, req, res);
}
const { query } = req.body || {};
const endpoint = query.url;
if (_.startsWith(endpoint, "/prom")) {
query.url = getBaseUrl(endpoint)
const { url, method, headers = {}, body = {}, params = {}, ...rest } = query;
const apiResponse = await axios.request({ url, method, headers, params, data: body, ...rest })
const data = _.get(apiResponse, "data");
return res.json(data);

const dataset = await datasetService.getDataset(dataset_id, ["id"], true)
if (!dataset) {
logger.error({ apiId, message: `Dataset with id ${dataset_id} not found in live table`, code: "DATASET_NOT_FOUND" })
return ResponseHandler.errorResponse({ message: `Dataset with id ${dataset_id} not found in live table`, code: "DATASET_NOT_FOUND", statusCode: 404, errCode: "NOT_FOUND" }, req, res);
}
else {
const query = _.get(req, ["body", "query", "body", "query"]);
const response = await executeNativeQuery(query);
ResponseHandler.successResponse(req, res, { status: 200, data: _.get(response, "data") });

try {
if (!category || category.includes("data_freshness")) {
const dataFreshnessResult = await getDataFreshness(dataset_id, intervals, defaultThreshold);
results.push(dataFreshnessResult);
}

if (!category || category.includes("data_observability")) {
const dataObservabilityResult = await getDataObservability(dataset_id, intervals);
results.push(dataObservabilityResult);
}

if (!category || category.includes("data_volume")) {
const dataVolumeResult = await getDataVolume(dataset_id, timePeriod, dateFormat);
results.push(dataVolumeResult);
}

if (!category || category.includes("data_lineage")) {
const dataLineageResult = await getDataLineage(dataset_id, intervals);
results.push(dataLineageResult);
}

if (!category || category.includes("connectors")) {
const connectorsResult = await getConnectors(dataset_id, intervals);
results.push(connectorsResult);
}

if (!category || category.includes("data_quality")) {
const connectorsResult = await getDataQuality(dataset_id, intervals);
results.push(connectorsResult);
}

logger.info({ apiId, msgid, requestBody, datasetId: dataset_id, message: "Metrics fetched successfully" })
return ResponseHandler.successResponse(req, res, { status: 200, data: results });

}
catch (error: any) {
logger.error({ apiId, msgid, requestBody: req?.body, datasetId: dataset_id, message: "Error while fetching metrics", code: "FAILED_TO_FETCH_METRICS", error });
return ResponseHandler.errorResponse({ message: "Error while fetching metrics", statusCode: 500, errCode: "FAILED", code: "FAILED_TO_FETCH_METRICS" }, req, res);
}

}

export default datasetMetrics;
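
As a hedged worked example of the interval arithmetic above, with the default query_time_period of 7 and a fixed clock for illustration:

import dayjs from "dayjs";

// endDate is tomorrow, startDate is query_time_period days before that,
// producing a Druid-style "start/end" interval string.
const dateFormat = "YYYY-MM-DDTHH:mm:ss";
const endDate = dayjs("2025-01-16T10:00:00").add(1, "day").format(dateFormat); // "2025-01-17T10:00:00"
const startDate = dayjs(endDate).subtract(7, "day").format(dateFormat);        // "2025-01-10T10:00:00"
const intervals = `${startDate}/${endDate}`; // "2025-01-10T10:00:00/2025-01-17T10:00:00"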
