Skip to content

Commit

Permalink
develop into release 1.3.0 (#61)
Browse files Browse the repository at this point in the history
* Issue #4 feat: enhance the ingest event to add obsrv and source meta

* #90 fix: Resolve API Issues (#52)

* #90 fix: format error messages, code cleanup

* #90 fix: validate extraction config during ingest

* #90 fix: add test cases for extraction key validation

* #90 fix: move error handler to helpers for standard handling

* #90 fix: update telemetry audit event set function (#58)

* Build and deployment (#57)

* build api image

* Update build_and_deploy.yaml

* Update build_and_deploy.yaml

* Update build_and_deploy.yaml

* Update build_and_deploy.yaml

* build api image

* Update build_and_deploy.yaml

* build api image

* build api image

* build api image

* build and deploy api image

* build and deploy api image

* build and deploy api image

* build and deploy of api service

* build and deploy of api service

* modify docker file

* modify docker file

* modify docker file

* update build and deployment

* update build and deployment

* Update build_and_deploy.yaml

* api service build and deployment

* api service build and deployment

* feat: obsrv api service build and deployment github actions configuration

* #0 fix: update the tag condition in actions

---------

Co-authored-by: ManojCKrishna <[email protected]>
Co-authored-by: Manoj Krishna <[email protected]>
Co-authored-by: Harish Kumar Gangula <[email protected]>
Co-authored-by: Manjunath Davanam <[email protected]>
Co-authored-by: Praveen <[email protected]>

* #0 fix: add azure exhaust support (#44)

* #0 fix: add azure exhaust support

* #0 fix: update azure exhaust service without async

* #126 Feat: Add Querying on Aggregated datasources (#60)

* #126 feat: enable querying on aggregated datasources

* #126 feat: Fix validation issues and update routes for aggregate queries

* #126 fix: remove unused methods

* #126 fix: remove aggregates from endpoint

* #126 fix: add validation for granularity options

* #126 fix: Remove unused routes and validators

* #126 feat: add new property to datasources structure and update rollup querying

---------

Co-authored-by: shiva-rakshith <[email protected]>
Co-authored-by: Praveen Veleneni <[email protected]>
Co-authored-by: Manoj Krishna <[email protected]>
Co-authored-by: ManojCKrishna <[email protected]>
Co-authored-by: Manoj Krishna <[email protected]>
Co-authored-by: Harish Kumar Gangula <[email protected]>
Co-authored-by: Manjunath Davanam <[email protected]>
  • Loading branch information
8 people authored Dec 6, 2023
1 parent aa48b45 commit f0b329e
Show file tree
Hide file tree
Showing 12 changed files with 95 additions and 38 deletions.
4 changes: 2 additions & 2 deletions api-service/src/configs/RoutesConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export const routesConfig = {
native_query_with_params: {
api_id: "obsrv.native.query",
method: "post",
path: "/obsrv/v1/data/query/:datasourceId",
path: "/obsrv/v1/data/query/:datasetId",
validation_schema: "QueryRequest.json",
},
sql_query: {
Expand All @@ -25,7 +25,7 @@ export const routesConfig = {
sql_query_with_params: {
api_id: "obsrv.sql.query",
method: "post",
path: "/obsrv/v1/data/sql-query/:datasourceId",
path: "/obsrv/v1/data/sql-query/:datasetId",
validation_schema: "QueryRequest.json",
},
},
Expand Down
5 changes: 4 additions & 1 deletion api-service/src/helpers/Datasources.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ export class Datasources {
private updated_by: string
private version: string
private published_date: Date
private metadata: object

constructor(payload: any) {
if (payload.id) {
this.id = payload.id
Expand All @@ -38,9 +40,10 @@ export class Datasources {
this.created_by = payload.created_by
this.updated_by = payload.updated_by
this.published_date = payload.published_date
this.metadata = payload.metadata
}
public getValues() {
return Object.assign(this.removeNullValues({ id: this.id, dataset_id: this.dataset_id, ingestion_spec: this.ingestion_spec, datasource: this.datasource, datasource_ref: this.datasource_ref, retention_period: this.retention_period, archival_policy: this.archival_policy, purge_policy: this.purge_policy, backup_config: this.backup_config, status: this.status, version: this.version, created_by: this.created_by, updated_by: this.updated_by, published_date: this.published_date }), { "updated_date": new Date })
return Object.assign(this.removeNullValues({ id: this.id, dataset_id: this.dataset_id, ingestion_spec: this.ingestion_spec, datasource: this.datasource, datasource_ref: this.datasource_ref, retention_period: this.retention_period, archival_policy: this.archival_policy, purge_policy: this.purge_policy, backup_config: this.backup_config, status: this.status, version: this.version, created_by: this.created_by, updated_by: this.updated_by, published_date: this.published_date, metadata: this.metadata }), { "updated_date": new Date })
}

public setValues() {
Expand Down
6 changes: 3 additions & 3 deletions api-service/src/lib/services/TelemetryService.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const uuidv5 = require("uuid/v5"),
const uuidv1 = require("uuid/v1"),
// request = require("request"),
DispatcherClass = require("../dispatcher/dispatcher").Dispatcher;
const axios = require('axios').default;
Expand All @@ -18,11 +18,11 @@ class TelemetryService {
message.did = req.get("x-device-id");
message.channel = req.get("x-channel-id");
message.pid = req.get("x-app-id");
if (!message.mid) message.mid = uuidv5();
if (!message.mid) message.mid = uuidv1();
message.syncts = new Date().getTime();

// add obsrv meta
const source = {meta: {id: "", connector_type: "api", version: config.version, entry_source: "api"}, trace_id: uuidv5()};
const source = {meta: {id: "", connector_type: "api", version: config.version, entry_source: "api"}, trace_id: uuidv1()};
const obsrvMeta = {syncts: new Date().getTime(), processingStartTime: new Date().getTime(), flags: {}, timespans: {}, error: {}, source: source};
message.obsrv_meta = obsrvMeta;

Expand Down
1 change: 1 addition & 0 deletions api-service/src/models/DatasetModels.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { ValidationStatus } from "./ValidationModels";
import { Request, Response } from "express";

export interface ISchemaGenerator {
generate: ((sample: Map<string, any>) => any) |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,9 @@
},
"status": "ACTIVE",
"created_by": "SYSTEM",
"updated_by": "SYSTEM"
}
"updated_by": "SYSTEM",
"metadata": {
"aggregated": false,
"granularity": "day"
}
}
13 changes: 12 additions & 1 deletion api-service/src/resources/schemas/DatasourceSaveReq.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,22 @@
},
"published_date": {
"type": "string"
},
"metadata": {
"type": "object",
"properties": {
"aggregated": {
"type": "boolean"
},
"granularity": {
"type": "string"
}
}
}
},
"required": [
"dataset_id",
"datasource",
"published_date"
]
}
}
13 changes: 12 additions & 1 deletion api-service/src/resources/schemas/DatasourceUpdateReq.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,21 @@
},
"published_date": {
"type": "string"
},
"metadata": {
"type": "object",
"properties": {
"aggregated": {
"type": "boolean"
},
"granularity": {
"type": "string"
}
}
}
},
"required": [
"dataset_id",
"datasource"
]
}
}
13 changes: 6 additions & 7 deletions api-service/src/resources/schemas/QueryRequest.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,24 @@
"properties": {
"dataSource": {
"type": "string"
},
"granularity": {
"type": "string",
"enum": ["five_minute", "ten_minute", "fifteen_minute", "thirty_minute", "hour", "six_hour", "eight_hour", "day", "week", "month", "quarter", "year"]
}
},
"required": [
"dataSource"
]
}
}
},
"oneOf": [
{
"required": [
"context",
"querySql"
]
},
{
"required": [
"context",
"query"
]
}
]
}
}
4 changes: 2 additions & 2 deletions api-service/src/routes/Router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ export const globalCache: any = new Map()
export const router = express.Router()
dbConnector.init()
/** Query API(s) */
router.post([`${routesConfig.query.native_query.path}`, `${routesConfig.query.native_query_with_params.path}`], ResponseHandler.setApiId(routesConfig.query.native_query.api_id), onRequest({ entity: promEntities.data_out }), validationService.validateRequestBody, validationService.validateQuery, queryService.executeNativeQuery);
router.post([`${routesConfig.query.sql_query.path}`, `${routesConfig.query.sql_query_with_params.path}`], ResponseHandler.setApiId(routesConfig.query.sql_query.api_id), onRequest({ entity: promEntities.data_out }), validationService.validateRequestBody, validationService.validateQuery, queryService.executeSqlQuery);
router.post([`${routesConfig.query.native_query.path}`, `${routesConfig.query.native_query_with_params.path}`,], ResponseHandler.setApiId(routesConfig.query.native_query.api_id), onRequest({ entity: promEntities.data_out }), validationService.validateRequestBody, validationService.validateQuery, queryService.executeNativeQuery);
router.post([`${routesConfig.query.sql_query.path}`, `${routesConfig.query.sql_query_with_params.path}`,], ResponseHandler.setApiId(routesConfig.query.sql_query.api_id), onRequest({ entity: promEntities.data_out }), validationService.validateRequestBody, validationService.validateQuery, queryService.executeSqlQuery);

/** Ingestor API */
router.post(`${routesConfig.data_ingest.path}`, ResponseHandler.setApiId(routesConfig.data_ingest.api_id), onRequest({ entity: promEntities.data_in }), validationService.validateRequestBody, ingestorService.create);
Expand Down
6 changes: 3 additions & 3 deletions api-service/src/test/Fixtures.ts

Large diffs are not rendered by default.

54 changes: 38 additions & 16 deletions api-service/src/validators/QueryValidator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,18 @@ export class QueryValidator implements IValidator {
this.httpConnector = new HTTPConnector(`${config.query_api.druid.host}:${config.query_api.druid.port}`).connect()
}
public async validate(data: any, id: string): Promise<ValidationStatus> {
let validationStatus
let datasource
let shouldSkip
let validationStatus, dataSource, shouldSkip;
switch (id) {
case routesConfig.query.native_query.api_id:
validationStatus = await this.validateNativeQuery(data)
datasource = this.getDataSource(data)
shouldSkip = _.includes(config.exclude_datasource_validation, datasource);
return validationStatus.isValid ? (shouldSkip ? validationStatus : this.setDatasourceRef(data)) : validationStatus
dataSource = this.getDataSource(data)
shouldSkip = _.includes(config.exclude_datasource_validation, dataSource);
return validationStatus.isValid ? (shouldSkip ? validationStatus : this.setDatasourceRef(dataSource, data)) : validationStatus
case routesConfig.query.sql_query.api_id:
validationStatus = await this.validateSqlQuery(data)
datasource = this.getDataSource(data)
shouldSkip = _.includes(config.exclude_datasource_validation, datasource);
return validationStatus.isValid ? (shouldSkip ? validationStatus : this.setDatasourceRef(data)) : validationStatus
dataSource = this.getDataSource(data)
shouldSkip = _.includes(config.exclude_datasource_validation, dataSource);
return validationStatus.isValid ? (shouldSkip ? validationStatus : this.setDatasourceRef(dataSource, data)) : validationStatus
default:
return <ValidationStatus>{ isValid: false }
}
Expand All @@ -44,13 +42,21 @@ export class QueryValidator implements IValidator {
let queryObj: IQuery = data;
this.setQueryLimits(data, this.limits.common);
let dataSourceLimits = this.getDataSourceLimits(this.getDataSource(data));
return (!_.isEmpty(dataSourceLimits)) ? this.validateQueryRules(queryObj, dataSourceLimits.queryRules[queryObj.query.queryType as keyof IQueryTypeRules]) : { isValid: true }
try {
return (!_.isEmpty(dataSourceLimits)) ? this.validateQueryRules(queryObj, dataSourceLimits.queryRules[queryObj.query.queryType as keyof IQueryTypeRules]) : { isValid: true }
} catch (error: any) {
return { isValid: false, message: error.message || "error ocuured while validating native query", code: error.code || httpStatus[ "400_NAME" ] };
}
}

private validateSqlQuery(data: any): ValidationStatus {
this.setQueryLimits(data, this.limits.common);
let dataSourceLimits = this.getDataSourceLimits(this.getDataSource(data));
return (!_.isEmpty(dataSourceLimits)) ? this.validateQueryRules(data, dataSourceLimits.queryRules.scan) : { isValid: true };
try {
return (!_.isEmpty(dataSourceLimits)) ? this.validateQueryRules(data, dataSourceLimits.queryRules.scan) : { isValid: true };
} catch (error: any) {
return { isValid: false, message: error.message || "error ocuured while validating native query", code: error.code || httpStatus[ "400_NAME" ] };
}
}

private validateQueryRules(queryPayload: IQuery, limits: IRules): ValidationStatus {
Expand Down Expand Up @@ -140,10 +146,10 @@ export class QueryValidator implements IValidator {
}
return
}
public async setDatasourceRef(payload: any): Promise<ValidationStatus> {
public async setDatasourceRef(dataSource: string, payload: any): Promise<ValidationStatus> {
try {
let dataSource = this.getDataSource(payload)
let dataSourceRef = await this.getDataSourceRef(dataSource)
const granularity = _.get(payload, 'context.granularity')
let dataSourceRef = await this.getDataSourceRef(dataSource, granularity);
await this.validateDatasource(dataSourceRef)
if (payload.querySql) {
payload.querySql.query = payload.querySql.query.replace(dataSource, dataSourceRef)
Expand All @@ -158,8 +164,24 @@ export class QueryValidator implements IValidator {
}
}

public async getDataSourceRef(datasource: string): Promise<string> {
const record: any = await dbConnector.readRecords("datasources", { "filters": { "dataset_id": datasource } })
public async getDataSourceRef(datasource: string, granularity: string | undefined): Promise<string> {
const records: any = await dbConnector.readRecords("datasources", { "filters": { "dataset_id": datasource } })
const record = records.filter((record: any) => {
const aggregatedRecord = _.get(record, "metadata.aggregated")
if(granularity)
return aggregatedRecord && _.get(record, "metadata.granularity") === granularity;
else
return !aggregatedRecord
});

if (record.length == 0) {
const error = { ...constants.INVALID_DATASOURCE }
error.message = error.message.replace('${datasource}', datasource)
if (granularity !== undefined) {
error.message = `Aggregate ${error.message}`
}
throw error
}
return record[0].datasource_ref
}

Expand Down
6 changes: 6 additions & 0 deletions api-service/swagger-doc/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,8 @@ components:
type: string
published_date:
type: string
metadata:
type: object

DataIngest:
description: datasetId in request params is required
Expand Down Expand Up @@ -2655,6 +2657,10 @@ components:
retention_period: {}
archival_policy: {}
purge_policy: {}
metadata: {
"aggregated": false,
"granularity": "day"
}
backup_config:
enabled: true
status: ACTIVE
Expand Down

0 comments on commit f0b329e

Please sign in to comment.