Skip to content

Commit

Permalink
Release 1.0.6-GA (#163)
Browse files Browse the repository at this point in the history
* default config for dataset_config (#99)

* Sanketika-Obsrv/issue-tracker#14: feat: validate sql query (#100)

* Issue #304 datasource configuration changes to support Hudi schema. (#111)

* Sanketika-obsrv/issue-tracker#304: modified mandatory fields for datasource create request (#112)

* Configure the vuln scan for obsrv-api-service (#117)

* Revert "Sanketika-obsrv/issue-tracker#304: modified mandatory fields for data…" (#121)

This reverts commit 0789f3f.

* Revert "Issue #304 datasource configuration changes to support Hudi schema. (…" (#122)

This reverts commit ef918f6.

* Sanketika-Obsrv/issue-tracker#182: added event validation service against schema for particular dataset

* Sanketika-Obsrv/issue-tracker#182: added table name from config

* Sanketika-Obsrv/issue-tracker#182: added validation status in response handler

* Sanketika-Obsrv/issue-tracker#180 feat: API changes to support hudi changes in obsrv (#160)

* Sanketika-obsrv/issue-tracker#304: modified name for lakehouse spec

* #0000 feat: added type column

* #273 feat: API changes to support lakehouse queries

* Sanketika-obsrv/issue-tracker#273 feat: modified datasource apis to default type column and fixed sql query api

* Sanketika-obsrv/issue-tracker#273 feat: updated postman collection and swagger doc

* Sanketika-obsrv/issue-tracker#273 feat: updated testcases for latest changes

* Sanketika-obsrv/issue-tracker#273 feat: removed unnecessary log statements

* Sanketika-obsrv/issue-tracker#273 feat: removed unnecessary flatmapping and await

* Sanketika-obsrv/issue-tracker#273 feat: replaced special characters with underscore in table name

* Sanketika-Obsrv/issue-tracker#180 fix: removed unused constants and table checks in hudi schema

* Sanketika-Obsrv/issue-tracker#180 fix: removed unused constants and table checks in hudi schema

* Release  1.0.6-GA merge fix (#164)

* Release 1.0.4-GA (#105)

* default config for dataset_config (#99)

* Sanketika-Obsrv/issue-tracker#14: feat: validate sql query (#100)

---------

Co-authored-by: harishkumar gangula <[email protected]>
Co-authored-by: Shreyas Bhaktharam <[email protected]>

* Release 1.0.5-GA (#147)

---------

Co-authored-by: Manjunath Davanam <[email protected]>
Co-authored-by: Shreyas Bhaktharam <[email protected]>
Co-authored-by: Ravi Mula <[email protected]>

---------

Co-authored-by: harishkumar gangula <[email protected]>
Co-authored-by: Shreyas Bhaktharam <[email protected]>
Co-authored-by: GayathriSrividya <[email protected]>
Co-authored-by: divyagovindaiah <[email protected]>
Co-authored-by: yashashk <[email protected]>
Co-authored-by: Manjunath Davanam <[email protected]>
  • Loading branch information
7 people authored May 27, 2024
1 parent 9d5fbd8 commit bfafddf
Show file tree
Hide file tree
Showing 26 changed files with 2,006 additions and 761 deletions.
29 changes: 23 additions & 6 deletions api-service/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions api-service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"multiparty": "4.2.1",
"pg": "^8.8.0",
"prom-client": "^14.2.0",
"trino-client": "^0.2.2",
"uuid": "3.1.0",
"winston": "~2.4.3",
"winston-daily-rotate-file": "~3.2.1"
Expand Down

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions api-service/src/configs/Config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,21 @@ export const config = {
"version": "1.0",
"query_api": {
"druid": {
"queryType": "realtime",
"host": process.env.druid_host || "http://localhost",
"port": process.env.druid_port || 8888,
"sql_query_path": "/druid/v2/sql/",
"native_query_path": "/druid/v2",
"list_datasources_path": "/druid/v2/datasources",
"submit_ingestion": "druid/indexer/v1/supervisor"
},
"lakehouse": {
"queryType": "datalake",
"host": process.env.lakehouse_host || "http://localhost",
"port": process.env.lakehouse_port || 8080,
"catalog": process.env.lakehouse_catalog || "hudi_connector",
"schema": process.env.lakehouse_schema || "obsrv",
"default_user": process.env.lakehouse_default_user || "admin"
}
},
"db_connector_config": {
Expand Down Expand Up @@ -58,6 +67,10 @@ export const config = {
normalDataset: "dataset",
masterDataset: "master-dataset"
},
"datasource_storage_types": {
druid: "druid",
datalake: "datalake"
},
"redis_config": {
"redis_host": process.env.redis_host || 'localhost',
"redis_port": process.env.redis_port || 6379
Expand Down
6 changes: 6 additions & 0 deletions api-service/src/configs/RoutesConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,12 @@ export const routesConfig = {
api_id: "api.health",
method: "get",
path: "/health"
},
schema_validator:{
api_id: "api.schema.validate",
method: "post",
path: "/data/v1/schema/validate",
validation_schema: "SchemaValidatorReq.json"
}
}

10 changes: 5 additions & 5 deletions api-service/src/connectors/DbConnector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export class DbConnector implements IConnector {

public async insertRecord(table: string, fields: any) {
await this.pool.transaction(async (dbTransaction) => {
await this.submit_ingestion(_.get(fields, 'ingestion_spec'), table)
await this.submit_ingestion(_.get(fields, 'ingestion_spec'), table, _.get(fields, "type"))
await dbTransaction(table).insert(fields).on('query-error', (error: any) => {
this.log_error(OP_TYPES.INSERT, error, table, fields);
throw {...constants.FAILED_RECORD_CREATE, "errCode": error.code}
Expand Down Expand Up @@ -89,7 +89,7 @@ export class DbConnector implements IConnector {
const existingRecord = await this.pool(table).select().where(filters).first()
if (!_.isUndefined(existingRecord)) {
await this.pool.transaction(async (dbTransaction) => {
await this.submit_ingestion(_.get(values, 'ingestion_spec'), table)
await this.submit_ingestion(_.get(values, 'ingestion_spec'), table, _.get(fields, "type"))
await dbTransaction(table).where(filters).update(schemaMerger.mergeSchema(existingRecord, values)).on('query-error', (error: any) => {
this.log_error(OP_TYPES.UPSERT, error, table, values);
throw {...constants.FAILED_RECORD_UPDATE, "errCode": error.code}
Expand All @@ -100,7 +100,7 @@ export class DbConnector implements IConnector {
})
} else {
await this.pool.transaction(async (dbTransaction) => {
await this.submit_ingestion(_.get(values, 'ingestion_spec'), table)
await this.submit_ingestion(_.get(values, 'ingestion_spec'), table, _.get(fields, "type"))
await dbTransaction(table).insert(values).on('query-error', (error: any) => {
this.log_error(OP_TYPES.UPSERT, error, table, values);
throw {...constants.FAILED_RECORD_CREATE, "errCode": error.code}
Expand Down Expand Up @@ -144,8 +144,8 @@ export class DbConnector implements IConnector {
})
}

private async submit_ingestion(ingestion_spec: Record<string, any>, table: string) {
if (appConfig.table_names.datasources === table) {
private async submit_ingestion(ingestion_spec: Record<string, any>, table: string, storage_type: string) {
if (appConfig.table_names.datasources === table && storage_type === appConfig.datasource_storage_types.druid) {
return await wrapperService.submitIngestion(ingestion_spec)
.catch((error: any) => {
console.error(constants.INGESTION_FAILED_ON_SAVE)
Expand Down
4 changes: 3 additions & 1 deletion api-service/src/helpers/Datasources.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export class Datasources {
private id: string
private dataset_id: string
private ingestion_spec: object
private type: string
private datasource: string
private datasource_ref: string
private retention_period: object
Expand All @@ -30,6 +31,7 @@ export class Datasources {
}
this.dataset_id = payload.dataset_id
this.ingestion_spec = payload.ingestion_spec
this.type = payload.type
this.datasource = payload.datasource
this.datasource_ref = payload.datasource_ref
this.retention_period = payload.retentionPeriod
Expand All @@ -44,7 +46,7 @@ export class Datasources {
this.metadata = payload.metadata
}
public getValues() {
return Object.assign(this.removeNullValues({ id: this.id, dataset_id: this.dataset_id, ingestion_spec: this.ingestion_spec, datasource: this.datasource, datasource_ref: this.datasource_ref, retention_period: this.retention_period, archival_policy: this.archival_policy, purge_policy: this.purge_policy, backup_config: this.backup_config, status: this.status, version: this.version, created_by: this.created_by, updated_by: this.updated_by, published_date: this.published_date, metadata: this.metadata }), { "updated_date": new Date })
return Object.assign(this.removeNullValues({ id: this.id, dataset_id: this.dataset_id, ingestion_spec: this.ingestion_spec, type: this.type, datasource: this.datasource, datasource_ref: this.datasource_ref, retention_period: this.retention_period, archival_policy: this.archival_policy, purge_policy: this.purge_policy, backup_config: this.backup_config, status: this.status, version: this.version, created_by: this.created_by, updated_by: this.updated_by, published_date: this.published_date, metadata: this.metadata }), { "updated_date": new Date })
}

public setValues() {
Expand Down
51 changes: 51 additions & 0 deletions api-service/src/helpers/LakehouseUtil.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { Trino, BasicAuth } from 'trino-client';
import _ from 'lodash';
import { config } from '../configs/Config';

const trino: Trino = Trino.create({
server: `${config.query_api.lakehouse.host}:${config.query_api.lakehouse.port}`,
catalog: config.query_api.lakehouse.catalog,
schema: config.query_api.lakehouse.schema,
auth: new BasicAuth(config.query_api.lakehouse.default_user),
});


const getFormattedData = (data: any[], columnData: any[]) => {
const formattedData: any[] = [];
for (let i = 0; i < data.length; i++) {
const row = data[ i ];
const jsonRow: any = {};
for (let j = 0; j < row.length; j++) {
// assign column only if doesn't start with _hoodie_
const colName = columnData[ j ];
if (_.startsWith(colName, "_hoodie_")) {
continue;
}
jsonRow[ colName ] = row[ j ];
}
formattedData.push(jsonRow);
}
return formattedData;
}


export const executeLakehouseQuery = async (query: string) => {
const iter = await trino.query(query);
let queryResult: any = []
for await (let data of iter) {
if(!_.isEmpty(data.error)){
throw {
status: 400,
message: data.error.message.replace(/line.*: /, ''),
code: "BAD_REQUEST"
}
}
queryResult = [ ...queryResult, ...(data?.data?.length ? data.data : []) ]
}
let columns = await iter.map((r: any) => r.columns ?? []).next();
let finalColumns = columns.value.map((column: any) => {
return column.name;
});
const formattedData = getFormattedData(queryResult, finalColumns);
return formattedData
}
12 changes: 12 additions & 0 deletions api-service/src/helpers/ValidationService.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import Ajv from "ajv";
const validator = new Ajv({ strict: false });

export const schemaValidation = (payload: Record<string, any>, schema: Record<string, any>): Record<string, any> => {
const isValid = validator.validate(schema, payload)
if (!isValid) {
const error: any = validator.errors;
const errorMessage = error[0]?.schemaPath?.replace("/", "") + " " + error[0]?.message || "Invalid Request Body";
return { isValid, message: errorMessage }
}
return { isValid, message: "success" }
}
7 changes: 6 additions & 1 deletion api-service/src/resources/Constants.json
Original file line number Diff line number Diff line change
Expand Up @@ -102,5 +102,10 @@
"code": "BAD_REQUEST"
},
"INGESTION_SUBMITTED": "ingestion spec has been submitted successfully",
"INGESTION_FAILED_ON_SAVE": "Failed to submit Ingestion Spec, record is not saved"
"INGESTION_FAILED_ON_SAVE": "Failed to submit Ingestion Spec, record is not saved",
"FAILED_EXECUTING_QUERY": {
"status": 500,
"message": "Something went wrong while executing the query. Please try again later.",
"code": "INTERNAL_SERVER_ERROR"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@
"metadata": {
"aggregated": false,
"granularity": "day"
}
},
"type": "druid"
}
Loading

0 comments on commit bfafddf

Please sign in to comment.