
Commit
Merge remote-tracking branch 'origin/main' into metrics-fix
JeraldJF committed Jan 15, 2025
2 parents 3901131 + 3ddf7c7 commit 4015581
Showing 26 changed files with 947 additions and 90 deletions.
1 change: 1 addition & 0 deletions api-service/package.json
@@ -42,6 +42,7 @@
"busboy": "^1.6.0",
"compression": "^1.7.4",
"dateformat": "2.0.0",
"dayjs": "^1.11.13",
"express": "^5.0.0-beta.3",
"http-errors": "^2.0.0",
"http-status": "^1.5.3",
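The only new dependency is dayjs, presumably backing the date math for the data-observability windows introduced below (the actual call sites are in files not shown in this excerpt). A minimal sketch of the kind of calculation it enables, assuming the config keys added in Config.ts below:

import dayjs from "dayjs";
import { config } from "./src/configs/Config";

// Hypothetical freshness cutoff: events older than this would count as stale.
// default_freshness_threshold is in minutes (5 by default, per Config.ts below).
const thresholdMinutes = config.data_observability.default_freshness_threshold;
const freshnessCutoff = dayjs().subtract(thresholdMinutes, "minute").toISOString();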
8 changes: 7 additions & 1 deletion api-service/src/configs/Config.ts
@@ -98,7 +98,7 @@ export const config = {
"command_service_config": {
"host": process.env.command_service_host || "http://localhost",
"port": parseInt(process.env.command_service_port || "8000"),
"path": process.env.command_service_path || "/system/v1/dataset/command"
"paths": JSON.parse(process.env.command_service_paths || '{"dataset":"/system/v1/dataset/command","connector":"/connector/v1/register","analyzePII":"/system/data/v1/analyze/pii"}')
},
"flink_job_configs": {
"pipeline_merged_job_manager_url": process.env.pipeline_merged_job_manager_url || "http://localhost:8081",
@@ -118,5 +118,11 @@ export const config = {
"otel": {
"enable": process.env.otel_enable || "false",
"collector_endpoint": process.env.otel_collector_endpoint || "http://localhost:4318"
},
+ "storage_types": process.env.storage_types || '{"lake_house":true,"realtime_store":true}',
+ "data_observability": {
+ "default_freshness_threshold": process.env.default_freshness_threshold ? parseInt(process.env.default_freshness_threshold) : 5, // in minutes
+ "data_out_query_time_period": process.env.data_out_query_time_period || "1d",
+ "default_query_time_period": process.env.default_query_time_period ? parseInt(process.env.default_query_time_period) : 7, // in days
+ }
}
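Two things worth noting about the new keys: command_service_paths is run through JSON.parse at module load, so a malformed override fails the service at startup rather than at request time; storage_types, by contrast, is kept as a raw JSON string, so consumers must parse it themselves. A sketch of an equivalent override (paths copied from the defaults above):

// Hypothetical override, e.g. set before the config module is imported.
// Must be valid JSON carrying all three keys, since the default is replaced wholesale.
process.env.command_service_paths = JSON.stringify({
    dataset: "/system/v1/dataset/command",
    connector: "/connector/v1/register",
    analyzePII: "/system/data/v1/analyze/pii"
});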
4 changes: 2 additions & 2 deletions api-service/src/configs/IngestionConfig.ts
@@ -84,12 +84,12 @@ export const rawIngestionSpecDefaults = {
"flattenSpec": [
{
"type": "path",
"expr": "$.obsrv_meta.source.connector",
"expr": "$.obsrv_meta.source.entry_source",
"name": "obsrv.meta.source.connector"
},
{
"type": "path",
"expr": "$.obsrv_meta.source.connectorInstance",
"expr": "$.obsrv_meta.source.id",
"name": "obsrv.meta.source.id"
}
],
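The flattenSpec now reads the renamed envelope fields while keeping the old Druid dimension names. For an event shaped like the sketch below (values illustrative; field names match the source object built in DataInController later in this diff), Druid would expose entry_source as obsrv.meta.source.connector and id as obsrv.meta.source.id:

// Illustrative ingested-event envelope (dataset id and timestamp are made up).
const sampleEvent = {
    obsrv_meta: {
        syncts: 1736899200000,
        source: {
            entry_source: "api", // -> dimension "obsrv.meta.source.connector"
            id: "api.data.in"    // -> dimension "obsrv.meta.source.id"
        }
    },
    dataset: "telemetry-events"
};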
18 changes: 13 additions & 5 deletions api-service/src/connections/commandServiceConnection.ts
@@ -5,21 +5,29 @@ import { v4 } from "uuid";

const commandHost = _.get(config, ["command_service_config", "host"])
const commandPort = _.get(config, ["command_service_config", "port"])
- const commandPath = _.get(config, ["command_service_config", "path"])
+ const commandPaths = _.get(config, ["command_service_config", "paths"])
+ const datasetPath = _.get(commandPaths, ["dataset"])
+ const connectorRegisterPath = _.get(commandPaths, ["connector"])
+ const analyzePIIPath = _.get(commandPaths, ["analyzePII"])

export const commandHttpService = axios.create({ baseURL: `${commandHost}:${commandPort}`, headers: { "Content-Type": "application/json" } });

- export const executeCommand = async (id: string, command: string) => {
+ export const executeCommand = async (id: string, command: string, userToken: string) => {
const payload = {
"id": v4(),
"data": {
"dataset_id": id,
"command": command
}
}
- return commandHttpService.post(commandPath, payload)
+ return commandHttpService.post(datasetPath, payload, { headers: { Authorization: userToken }})
}

- export const registerConnector = async (requestBody: any) => {
- return commandHttpService.post("/connector/v1/register", requestBody)
+ export const registerConnector = async (requestBody: any, userToken: string) => {
+ return commandHttpService.post(connectorRegisterPath, requestBody, { headers: { Authorization: userToken }})
}

+ export const detectPII = async (requestBody: any, userToken: string) => {
+ console.log(`analyzePIIPath : ${analyzePIIPath}`)
+ return commandHttpService.post(analyzePIIPath, requestBody, { headers: { Authorization: userToken }})
+ }
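All three helpers now take the caller's token and forward it verbatim as the Authorization header. A hedged usage sketch (the command name and dataset id are made up; the forwarding pattern is taken from the controllers in this commit):

import { Request } from "express";
import { executeCommand } from "./commandServiceConnection";

// Hypothetical caller: lift the raw Authorization header off the incoming
// request and pass it through unchanged, as ConnectorRegisterController does.
const publishDataset = async (req: Request) => {
    const userToken = req.get("authorization") as string;
    return executeCommand("telemetry-events", "PUBLISH_DATASET", userToken);
};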
@@ -22,7 +22,8 @@ const connectorRegisterController = async (req: Request, res: Response) => {
relative_path: uploadStreamResponse[0]
}
logger.info({ apiId, resmsgid, message: `File uploaded to cloud provider successfully` })
- const registryResponse = await registerConnector(payload);
+ const userToken = req.get('authorization') as string;
+ const registryResponse = await registerConnector(payload, userToken);
logger.info({ apiId, resmsgid, message: `Connector registered successfully` })
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: { message: registryResponse?.data?.message } })
} catch (error: any) {
@@ -118,4 +119,4 @@ const extractFileNameFromPath = (filePath: string): string[] => {
return filePath.match(regex) || [];
};

- export default connectorRegisterController;
\ No newline at end of file
+ export default connectorRegisterController;
@@ -0,0 +1,26 @@
import { Request, Response } from "express";
import { ResponseHandler } from "../../helpers/ResponseHandler";
import httpStatus from "http-status";
import * as _ from "lodash";
import logger from "../../logger";
import { detectPII } from "../../connections/commandServiceConnection";

const code = "FAILED_TO_DETECT_PII";
export const dataAnalyzePII = async (req: Request, res: Response) => {
const apiId = _.get(req, 'id')
try {
const userToken = req.get('authorization') as string;
const piiSuggestionsResponse = await detectPII(_.get(req, ['body', 'data']), userToken);
logger.info({apiId , message: `Detected PII successfully` })
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: piiSuggestionsResponse?.data})
} catch (error: any) {
const errMessage = _.get(error, "response.data.detail")
logger.error(error, apiId, code);
let errorMessage = error;
const statusCode = _.get(error, "status")
if (!statusCode || statusCode == 500) {
errorMessage = { code, message: errMessage || "Failed to detect pii" }
}
ResponseHandler.errorResponse(errorMessage, req, res);
}
}
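This new controller forwards req.body.data verbatim to the command service's analyze-PII path and relays the suggestions back; failures without a status, or with a 500, are mapped to FAILED_TO_DETECT_PII. A sketch of a request body it would accept (only the { data: ... } envelope is read here; the inner shape and values are made up):

// Hypothetical payload for the PII-analysis endpoint.
const piiAnalyzeRequest = {
    data: [
        { name: "Jane Doe", email: "jane.doe@example.com", phone: "9999999999" }
    ]
};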
10 changes: 5 additions & 5 deletions api-service/src/controllers/DataExhaust/DataExhaustController.ts
@@ -11,7 +11,7 @@ import { cloudProvider } from "../../services/CloudServices";

export const dataExhaust = async (req: Request, res: Response) => {
const { params } = req;
- const { datasetId } = params;
+ const { dataset_id } = params;
const { type }: any = req.query;
const momentFormat = "YYYY-MM-DD";

@@ -28,12 +28,12 @@ export const dataExhaust = async (req: Request, res: Response) => {
return resData || {};
}

- if (type && config.cloud_config.exclude_exhaust_types.includes(datasetId)) {
+ if (type && config.cloud_config.exclude_exhaust_types.includes(dataset_id)) {
return ResponseHandler.errorResponse({ statusCode: 404, message: "Record not found", errCode: httpStatus["404_NAME"] }, req, res)
}
- const datasetRecord = await verifyDatasetExists(datasetId);
+ const datasetRecord = await verifyDatasetExists(dataset_id);
if (datasetRecord === null) {
- logger.error(`Dataset with ${datasetId} not found in live table`)
+ logger.error(`Dataset with ${dataset_id} not found in live table`)
return ResponseHandler.errorResponse({ statusCode: 404, message: "Record not found", errCode: httpStatus["404_NAME"] }, req, res)
}
const dateRange = getDateRange(req);
@@ -47,7 +47,7 @@
return ResponseHandler.errorResponse({ statusCode: 400, message: `Invalid date range! make sure your range cannot be more than ${config.cloud_config.maxQueryDateRange} days`, errCode: "BAD_REQUEST" }, req, res)
}

- const resData: any = await getFromStorage(type, dateRange, datasetId);
+ const resData: any = await getFromStorage(type, dateRange, dataset_id);
if (_.isEmpty(resData.files)) {
logger.error("Date range provided does not have any backup files")
return ResponseHandler.errorResponse({ statusCode: 404, message: "Date range provided does not have any backup files", errCode: "NOT_FOUND" }, req, res);
@@ -27,7 +27,7 @@ const apiId = "api.data.in";
const dataIn = async (req: Request, res: Response) => {

const requestBody = req.body;
- const datasetId = req.params.datasetId.trim();
+ const datasetId = req.params.dataset_id.trim();

const isValidSchema = schemaValidation(requestBody, validationSchema)
if (!isValidSchema?.isValid) {
@@ -39,34 +39,34 @@ const dataIn = async (req: Request, res: Response) => {
logger.error({ apiId, message: `Dataset with id ${datasetId} not found in live table`, code: "DATASET_NOT_FOUND" })
return ResponseHandler.errorResponse(errorObject.datasetNotFound, req, res);
}
- const { entry_topic, dataset_config, api_version } = dataset
+ const { entry_topic, dataset_config, extraction_config, api_version } = dataset
const entryTopic = api_version !== "v2" ? _.get(dataset_config, "entry_topic") : entry_topic
if (!entryTopic) {
logger.error({ apiId, message: "Entry topic not found", code: "TOPIC_NOT_FOUND" })
return ResponseHandler.errorResponse(errorObject.topicNotFound, req, res);
}
- await send(addMetadataToEvents(datasetId, requestBody), entryTopic)
+ await send(addMetadataToEvents(datasetId, requestBody, extraction_config), entryTopic)
ResponseHandler.successResponse(req, res, { status: 200, data: { message: "Data ingested successfully" } });

}

- const addMetadataToEvents = (datasetId: string, payload: any) => {
+ const addMetadataToEvents = (datasetId: string, payload: any, extraction_config: any) => {
const validData = _.get(payload, "data");
const now = Date.now();
const mid = _.get(payload, "params.msgid");
const source = { id: "api.data.in", version: config?.version, entry_source: "api" };
const obsrvMeta = { syncts: now, flags: {}, timespans: {}, error: {}, source: source };
if (Array.isArray(validData)) {
const payloadRef = validData.map((event: any) => {
- const payload = {
- event,
+ const extraction_key: string = _.get(extraction_config, "extraction_key", 'events');
+ const dedup_key: string = _.get(extraction_config, "dedup_config.dedup_key", 'id');
+ const payload: any = {
"obsrv_meta": obsrvMeta,
"dataset": datasetId,
"msgid": mid
- }
+ };
+ payload[extraction_key] = validData;
+ payload[dedup_key] = mid
return payload;
})
return payloadRef;
}
else {
return ({
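With this change, batch events are wrapped using the dataset's extraction_config instead of a hard-coded event key. Note that, as committed, the map attaches the full validData array to each wrapped payload rather than the single event from the callback. For a dataset using the default extraction_key "events" and dedup key "id", each element of the result would look roughly like this (values illustrative):

// Illustrative output of addMetadataToEvents for one mapped element
// (msgid, version, and event values are made up).
const wrappedPayload = {
    obsrv_meta: { syncts: 1736899200000, flags: {}, timespans: {}, error: {}, source: { id: "api.data.in", version: "1.0.0", entry_source: "api" } },
    dataset: "telemetry-events",
    msgid: "c3f8aa10-0000-0000-0000-000000000000",
    events: [{ eid: "IMPRESSION" }, { eid: "INTERACT" }], // keyed by extraction_key
    id: "c3f8aa10-0000-0000-0000-000000000000"            // keyed by dedup_key, mirrors msgid
};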
38 changes: 38 additions & 0 deletions api-service/src/controllers/DataMetrics/DataMetricsController.ts
@@ -0,0 +1,38 @@
import { Request, Response } from "express";
import _ from "lodash";
import { executeNativeQuery } from "../../connections/druidConnection";
import { ResponseHandler } from "../../helpers/ResponseHandler";
import vaidationSchema from "./DataMetricsValidationSchema.json"
import { schemaValidation } from "../../services/ValidationService";
import logger from "../../logger";
import { obsrvError } from "../../types/ObsrvError";
import axios from "axios";
import { config } from "../../configs/Config";

const getBaseUrl = (url: string) => {
if (_.startsWith(url, "/prom")) return config.query_api.prometheus.url + _.replace(url, "/prom", "")
}

const dataMetrics = async (req: Request, res: Response) => {
const isValidSchema = schemaValidation(req.body, vaidationSchema);
if (!isValidSchema?.isValid) {
logger.error({ message: isValidSchema?.message, code: "INVALID_QUERY" })
throw obsrvError("", "INVALID_QUERY", isValidSchema.message, "BAD_REQUEST", 400)
}
const { query } = req.body || {};
const endpoint = query.url;
if (_.startsWith(endpoint, "/prom")) {
query.url = getBaseUrl(endpoint)
const { url, method, headers = {}, body = {}, params = {}, ...rest } = query;
const apiResponse = await axios.request({ url, method, headers, params, data: body, ...rest })
const data = _.get(apiResponse, "data");
return res.json(data);
}
else {
const query = _.get(req, ["body", "query", "body", "query"]);
const response = await executeNativeQuery(query);
ResponseHandler.successResponse(req, res, { status: 200, data: _.get(response, "data") });
}
}

export default dataMetrics;
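The controller handles two request shapes: queries whose url starts with /prom are proxied to the configured Prometheus endpoint, and anything else is treated as a Druid native query pulled from body.query.body.query. Hedged examples of each (metric names, datasource, and intervals are made up; the envelope fields come from the controller above):

// Prometheus proxy: "/prom" is stripped and the remainder is appended to
// config.query_api.prometheus.url before the request is forwarded.
const promRequest = {
    query: {
        url: "/prom/api/v1/query",
        method: "GET",
        params: { query: "sum(rate(node_cpu_seconds_total[5m]))" }
    }
};

// Druid native query: only body.query.body.query reaches executeNativeQuery.
const druidRequest = {
    query: {
        body: {
            query: {
                queryType: "timeseries",
                dataSource: "telemetry-events",
                intervals: ["2025-01-01/2025-01-08"],
                granularity: "day",
                aggregations: [{ type: "count", name: "count" }]
            }
        }
    }
};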
2 changes: 1 addition & 1 deletion api-service/src/controllers/DataOut/DataOutController.ts
@@ -9,7 +9,7 @@ import { executeNativeQuery, executeSqlQuery } from "../../connections/druidConnection";

export const apiId = "api.data.out";
const dataOut = async (req: Request, res: Response) => {
- const datasetId = req.params?.datasetId;
+ const datasetId = req.params?.dataset_id;
const requestBody = req.body;
const msgid = _.get(req, "body.params.msgid");
const isValidSchema = schemaValidation(requestBody, validationSchema);
3 changes: 2 additions & 1 deletion api-service/src/controllers/DatasetCopy/DatasetCopy.ts
@@ -4,7 +4,7 @@ import { ResponseHandler } from "../../helpers/ResponseHandler";
import * as _ from "lodash";
import { schemaValidation } from "../../services/ValidationService";
import validationSchema from "./RequestValidationSchema.json";
- import { datasetService, getLiveDatasetConfigs } from "../../services/DatasetService";
+ import { datasetService, getLiveDatasetConfigs, validateStorageSupport } from "../../services/DatasetService";
import { updateRecords } from "./DatasetCopyHelper";
import { obsrvError } from "../../types/ObsrvError";

@@ -40,6 +40,7 @@ const datasetCopy = async (req: Request, res: Response) => {
validateRequest(req);
const newDatasetId = _.get(req, "body.request.destination.datasetId");
const dataset = await fetchDataset(req);
+ validateStorageSupport(dataset);
const userID = (req as any)?.userID;
_.set(dataset, "created_by", userID);
_.set(dataset, "updated_by", userID);
3 changes: 2 additions & 1 deletion api-service/src/controllers/DatasetCreate/DatasetCreate.ts
@@ -1,7 +1,7 @@
import _ from "lodash";
import { Request, Response } from "express";
import httpStatus from "http-status";
- import { datasetService } from "../../services/DatasetService";
+ import { datasetService, validateStorageSupport } from "../../services/DatasetService";
import DatasetCreate from "./DatasetCreateValidationSchema.json";
import { schemaValidation } from "../../services/ValidationService";
import { ResponseHandler } from "../../helpers/ResponseHandler";
@@ -28,6 +28,7 @@ const validateRequest = async (req: Request) => {
throw obsrvError(datasetId, "DATASET_DUPLICATE_DENORM_KEY", "Duplicate denorm output fields found.", "BAD_REQUEST", 400, undefined, {duplicateKeys: duplicateDenormKeys})
}

+ validateStorageSupport(_.get(req, ["body", "request"]))
}

const datasetCreate = async (req: Request, res: Response) => {
3 changes: 2 additions & 1 deletion api-service/src/controllers/DatasetImport/DatasetImport.ts
@@ -2,7 +2,7 @@ import { Request, Response } from "express";
import { ResponseHandler } from "../../helpers/ResponseHandler";
import httpStatus from "http-status";
import _ from "lodash";
- import { datasetService } from "../../services/DatasetService";
+ import { datasetService, validateStorageSupport } from "../../services/DatasetService";
import { datasetImportValidation, migrateExportedDatasetV1 } from "./DatasetImportHelper";
import { obsrvError } from "../../types/ObsrvError";

@@ -21,6 +21,7 @@ const datasetImport = async (req: Request, res: Response) => {
const { updatedDataset, ignoredFields } = await datasetImportValidation({ ...requestBody, "request": datasetPayload })
const { successMsg, partialIgnored } = getResponseData(ignoredFields)

+ validateStorageSupport(updatedDataset);
const dataset = await importDataset(updatedDataset, overwrite, userID);
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: { message: successMsg, data: dataset, ...(!_.isEmpty(partialIgnored) && { ignoredFields: partialIgnored }) } });
}
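Create, copy, and import all now gate on validateStorageSupport before writing. Its implementation lives in DatasetService and is not part of this excerpt; a minimal sketch consistent with the storage_types config added above might look like the following (the field carrying the dataset's requested stores is an assumption):

import * as _ from "lodash";
import { config } from "../configs/Config";
import { obsrvError } from "../types/ObsrvError";

// Hypothetical sketch — NOT the committed implementation. Assumes the dataset
// payload lists its requested stores under dataset_config.storage_types.
export const validateStorageSupport = (dataset: Record<string, any>) => {
    const supported = JSON.parse(config.storage_types); // config keeps this as a JSON string
    const requested: string[] = _.get(dataset, ["dataset_config", "storage_types"], []);
    const disabled = requested.filter((store) => !_.get(supported, store, false));
    if (!_.isEmpty(disabled)) {
        throw obsrvError(_.get(dataset, "dataset_id"), "DATASET_UNSUPPORTED_STORAGE_TYPE",
            `Storage type(s) not enabled on this deployment: ${disabled.join(", ")}`, "BAD_REQUEST", 400);
    }
};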
61 changes: 61 additions & 0 deletions api-service/src/controllers/DatasetMetrics/DatasetMetrics.json
@@ -0,0 +1,61 @@
{
"type": "object",
"properties": {
"id": {
"type": "string",
"enum": [
"api.dataset.metrics"
]
},
"ver": {
"type": "string"
},
"ts": {
"type": "string"
},
"params": {
"type": "object",
"properties": {
"msgid": {
"type": "string"
}
},
"required": [
"msgid"
],
"additionalProperties": false
},
"request": {
"type": "object",
"properties": {
"dataset_id": {
"type": "string"
},
"category": {
"type": "array",
"items": {
"type": "string",
"enum": [
"data_freshness",
"data_observability",
"data_volume",
"data_lineage",
"connectors",
"data_quality"
]
},
"minItems": 1
},
"query_time_period":{
"type": "integer",
"minimum": 1
}
},
"required": [
"category",
"dataset_id"
]
}
},
"required": ["id", "ver", "ts", "params", "request"]
}
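A request that validates against this schema (ids, timestamps, and dataset name are illustrative; query_time_period is optional and, per Config.ts above, presumably falls back to default_query_time_period when omitted):

const datasetMetricsRequest = {
    id: "api.dataset.metrics",
    ver: "v2",                         // any string is accepted
    ts: "2025-01-15T10:00:00+05:30",
    params: { msgid: "e2f6b0e4-0000-0000-0000-000000000000" },
    request: {
        dataset_id: "telemetry-events",
        category: ["data_freshness", "data_volume"],
        query_time_period: 7           // optional; integer >= 1, in days
    }
};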