Merge pull request #276 from Sanketika-Obsrv/hudi-spec-fix
#OBS-I335: dataset update fix
SanthoshVasabhaktula authored Nov 13, 2024
2 parents 812ffbe + 6d1cb44 commit 25ec1f0
Showing 4 changed files with 79 additions and 59 deletions.
Changed file 1 of 4 (file name not shown)
@@ -179,8 +179,8 @@
"default": true
}
},
"required": ["$schema", "type", "properties", "additionalProperties"],
"additionalProperties": false
"required": ["type", "properties"],
"additionalProperties": true
},
"dataset_config": {
"type": "object",
Changed file 2 of 4 (file name not shown)
@@ -188,7 +188,7 @@
"type",
"properties"
],
"additionalProperties": false
"additionalProperties": true
},
"denorm_config": {
"type": "object",
128 changes: 74 additions & 54 deletions api-service/src/controllers/DatasetUpdate/DatasetUpdate.ts
@@ -39,11 +39,11 @@ const validateDataset = (dataset: Record<string, any> | null, req: Request) => {
if (dataset) {
if (dataset.api_version !== "v2") {
throw obsrvError(datasetId, "DATASET_API_VERSION_MISMATCH", "Draft dataset api version is not v2. Perform a read api call with mode=edit to migrate the dataset", "NOT_FOUND", 404)
- }
- if(dataset.version_key !== versionKey) {
+ }
+ if (dataset.version_key !== versionKey) {
throw obsrvError(datasetId, "DATASET_OUTDATED", "The dataset is outdated. Please try to fetch latest changes of the dataset and perform the updates", "CONFLICT", 409)
}
- if(!_.includes([DatasetStatus.Draft, DatasetStatus.ReadyToPublish], dataset.status)) {
+ if (!_.includes([DatasetStatus.Draft, DatasetStatus.ReadyToPublish], dataset.status)) {
throw obsrvError(datasetId, "DATASET_NOT_IN_DRAFT_STATE_TO_UPDATE", "Dataset cannot be updated as it is not in draft state", "BAD_REQUEST", 400)
}
} else {
@@ -52,53 +52,65 @@ const validateDataset = (dataset: Record<string, any> | null, req: Request) => {
}

const datasetUpdate = async (req: Request, res: Response) => {

await validateRequest(req)
const datasetReq = req.body.request;
const datasetModel = await datasetService.getDraftDataset(datasetReq.dataset_id)
validateDataset(datasetModel, req)

const draftDataset = mergeDraftDataset(datasetModel, datasetReq);
const userID = (req as any)?.userID;
- _.set(draftDataset, "updated_by", userID )
+ _.set(draftDataset, "updated_by", userID)
const response = await datasetService.updateDraftDataset(draftDataset);
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: response });
}

const mergeDraftDataset = (datasetModel: Model<any, any> | null, datasetReq: any): Record<string, any> => {

- const prev_dataset_config = _.get(datasetModel, ["dataset_config"])
const currentSchema = _.get(datasetModel, 'data_schema')
const fieldsRemoved = (datasetReq.data_schema) ? getMissingFieldsInNewSchema(datasetReq.data_schema, currentSchema) : []
+ const prev_dataset_config = _.get(datasetModel, ["dataset_config"])
const dataset: Record<string, any> = {
version_key: Date.now().toString(),
name: datasetReq.name || _.get(datasetModel, ["name"]),
id: _.get(datasetModel, ["id"])
}
- if(datasetReq.validation_config) dataset["validation_config"] = datasetReq.validation_config
- if(datasetReq.extraction_config) dataset["extraction_config"] = datasetReq.extraction_config
- if(datasetReq.dedup_config) dataset["dedup_config"] = datasetReq.dedup_config
- if(datasetReq.data_schema) dataset["data_schema"] = datasetReq.data_schema
- if(datasetReq.dataset_config) dataset["dataset_config"] = { ...prev_dataset_config, ...datasetReq.dataset_config }
- if(datasetReq.transformations_config || fieldsRemoved.length > 0)
+ if (datasetReq.validation_config) dataset["validation_config"] = datasetReq.validation_config
+ if (datasetReq.extraction_config) dataset["extraction_config"] = datasetReq.extraction_config
+ if (datasetReq.dedup_config) dataset["dedup_config"] = datasetReq.dedup_config
+ if (datasetReq.data_schema) dataset["data_schema"] = datasetReq.data_schema
+ if (datasetReq.dataset_config) dataset["dataset_config"] = { ...prev_dataset_config, ...datasetReq.dataset_config }
+ if (datasetReq.transformations_config || fieldsRemoved.length > 0)
dataset["transformations_config"] = mergeTransformationsConfig(_.get(datasetModel, ["transformations_config"]), datasetReq.transformations_config, fieldsRemoved)
- if(datasetReq.denorm_config || fieldsRemoved.length > 0) dataset["denorm_config"] = mergeDenormConfig(_.get(datasetModel, ["denorm_config"]), datasetReq.denorm_config, fieldsRemoved)
- if(datasetReq.connectors_config) dataset["connectors_config"] = mergeConnectorsConfig(_.get(datasetModel, ["connectors_config"]), datasetReq.connectors_config)
- if(datasetReq.tags) dataset["tags"] = mergeTags(_.get(datasetModel, ["tags"]), datasetReq.tags)
- if(datasetReq.sample_data) dataset["sample_data"] = datasetReq.sample_data
- if(datasetReq.type) dataset["type"] = datasetReq.type
- if(fieldsRemoved.length > 0) {
+ if (datasetReq.denorm_config || fieldsRemoved.length > 0) dataset["denorm_config"] = mergeDenormConfig(_.get(datasetModel, ["denorm_config"]), datasetReq.denorm_config, fieldsRemoved)
+ if (datasetReq.connectors_config) dataset["connectors_config"] = mergeConnectorsConfig(_.get(datasetModel, ["connectors_config"]), datasetReq.connectors_config)
+ if (datasetReq.tags) dataset["tags"] = mergeTags(_.get(datasetModel, ["tags"]), datasetReq.tags)
+ if (datasetReq.sample_data) dataset["sample_data"] = datasetReq.sample_data
+ if (datasetReq.type) dataset["type"] = datasetReq.type
+ if (fieldsRemoved.length > 0) {
const keys_config = _.get(dataset["dataset_config"] ? dataset["dataset_config"] : prev_dataset_config, ["keys_config"])
- if(keys_config['data_key'] in fieldsRemoved) {
+ let modified = false;
+ if (_.includes(fieldsRemoved, keys_config['data_key'])) {
+ modified = true;
keys_config['data_key'] = '';
}
- if(keys_config['primary_key'] in fieldsRemoved) {
- keys_config['primary_key'] = '';
+ if (_.includes(fieldsRemoved, keys_config['partition_key'])) {
+ modified = true;
+ keys_config['partition_key'] = '';
}
- if(keys_config['timestamp_key'] in fieldsRemoved) {
+ if (_.includes(fieldsRemoved, keys_config['timestamp_key'])) {
+ modified = true;
keys_config['timestamp_key'] = '';
}
_.set(dataset["dataset_config"], 'keys_config', keys_config)
if (modified) {
if (dataset["dataset_config"]) {
_.set(dataset["dataset_config"], 'keys_config', keys_config)
} else {
const keys_config = _.get(prev_dataset_config, ["keys_config"])
dataset["dataset_config"] = { ...prev_dataset_config }
_.set(dataset["dataset_config"], 'keys_config', keys_config)
}
}
}
return dataset;
}
@@ -111,22 +123,30 @@ const getMissingFieldsInNewSchema = (newSchema: any, oldSchema: any) => {
const fullPath = [...path, key].join('.');
if (!(key in newProperties)) {
removedFields.push(fullPath);
- } else if (typeof oldProperties[key] === 'object' && typeof newProperties[key] === 'object') {
+ if (typeof oldProperties[key] === 'object' && oldProperties[key].properties) {
+ removedFields = removedFields.concat(getRemovedPropertiesFieldsNested(oldProperties[key].properties || {}, {}, [...path, key]));
+ }
+ }
+ else if (typeof oldProperties[key] === 'object' && typeof newProperties[key] === 'object') {
removedFields = removedFields.concat(
- getRemovedPropertiesFieldsNested(oldProperties[key].properties || {}, newProperties[key].properties || {}, [...path, key])
+ getRemovedPropertiesFieldsNested(
+ oldProperties[key].properties || {},
+ newProperties[key].properties || {},
+ [...path, key]
+ )
);
}
}

return removedFields;
- }
-
+ };
+
+
const getRemovedPropertiesFields = (oldSchema: any, newSchema: any): string[] => {
const oldProperties = oldSchema.properties || {};
const newProperties = newSchema.properties || {};
return getRemovedPropertiesFieldsNested(oldProperties, newProperties);
}

- // Example usage
const removedFieldsNested = getRemovedPropertiesFields(oldSchema, newSchema);
return removedFieldsNested
Expand All @@ -135,41 +155,41 @@ const getMissingFieldsInNewSchema = (newSchema: any, oldSchema: any) => {
const mergeTransformationsConfig = (currentConfigs: any, newConfigs: any, fieldsRemoved: string[]) => {

let updatedConfigs = currentConfigs;
- if(newConfigs) {
- const removeConfigs = _.map(_.filter(newConfigs, {action: "remove"}), "value.field_key")
- const addConfigs = _.map(_.filter(newConfigs, {action: "upsert"}), "value")
+ if (newConfigs && newConfigs.length) {
+ const removeConfigs = _.map(_.filter(newConfigs, { action: "remove" }), "value.field_key")
+ const addConfigs = _.map(_.filter(newConfigs, { action: "upsert" }), "value")

updatedConfigs = _.unionWith(
addConfigs,
- _.reject(currentConfigs, (config) => { return _.includes(removeConfigs, config.field_key)}),
- (a, b) => {
+ _.reject(currentConfigs, (config) => { return _.includes(removeConfigs, config.field_key) }),
+ (a, b) => {
return a.field_key === b.field_key
- }
+ }
)
}
- if(fieldsRemoved.length > 0) {
- updatedConfigs = _.reject(updatedConfigs, (config) => { return _.includes(fieldsRemoved, config.field_key)})
+ if (fieldsRemoved.length > 0) {
+ updatedConfigs = _.reject(updatedConfigs, (config) => { return _.includes(fieldsRemoved, config.field_key) })
}
return updatedConfigs
}

const mergeDenormConfig = (currentConfig: any, newConfig: any, fieldsRemoved: string[]) => {

let updatedConfigs = currentConfig.denorm_fields;
- if(newConfig) {
- const removeConfigs = _.map(_.filter(newConfig.denorm_fields, {action: "remove"}), "value.denorm_out_field")
- const addConfigs = _.map(_.filter(newConfig.denorm_fields, {action: "upsert"}), "value")
+ if (_.get(newConfig, "denorm_fields")) {
+ const removeConfigs = _.map(_.filter(newConfig.denorm_fields, { action: "remove" }), "value.denorm_out_field")
+ const addConfigs = _.map(_.filter(newConfig.denorm_fields, { action: "upsert" }), "value")

- const denormFields = _.unionWith(
+ updatedConfigs = _.unionWith(
addConfigs,
- _.reject(currentConfig.denorm_fields, (config) => { return _.includes(removeConfigs, config.denorm_out_field)}),
- (a, b) => {
+ _.reject(currentConfig.denorm_fields, (config) => { return _.includes(removeConfigs, config.denorm_out_field) }),
+ (a, b) => {
return a.denorm_out_field === b.denorm_out_field
- }
+ }
)
}
- if(fieldsRemoved.length > 0) {
- updatedConfigs = _.reject(currentConfig.denorm_fields, (config) => { return _.includes(fieldsRemoved, config.denorm_key)})
+ if (fieldsRemoved.length > 0) {
+ updatedConfigs = _.reject(updatedConfigs, (config) => { return _.includes(fieldsRemoved, config.denorm_key) })
}
return {
denorm_fields: updatedConfigs
@@ -178,8 +198,8 @@ const mergeDenormConfig = (currentConfig: any, newConfig: any, fieldsRemoved: st

const mergeConnectorsConfig = (currConfigs: any, newConfigs: any) => {

- const removeConfigs = _.map(_.filter(newConfigs, {action: "remove"}), "value.connector_id")
- const addConfigs = _.map(_.filter(newConfigs, {action: "upsert"}), "value")
+ const removeConfigs = _.map(_.filter(newConfigs, { action: "remove" }), "value.connector_id")
+ const addConfigs = _.map(_.filter(newConfigs, { action: "upsert" }), "value")

return _.unionWith(
_.map(addConfigs, (config) => {
@@ -191,17 +211,17 @@ const mergeConnectorsConfig = (currConfigs: any, newConfigs: any) => {
version: config.version
}
}),
- _.reject(currConfigs, (config) => { return _.includes(removeConfigs, config.connector_id)}),
- (a, b) => {
+ _.reject(currConfigs, (config) => { return _.includes(removeConfigs, config.connector_id) }),
+ (a, b) => {
return a.connector_id === b.connector_id
- }
+ }
)
}

const mergeTags = (currentTags: any, newConfigs: any) => {

- const tagsToRemove = _.map(_.filter(newConfigs, {action: "remove"}), "value")
- const tagsToAdd = _.map(_.filter(newConfigs, {action: "upsert"}), "value")
+ const tagsToRemove = _.map(_.filter(newConfigs, { action: "remove" }), "value")
+ const tagsToAdd = _.map(_.filter(newConfigs, { action: "upsert" }), "value")
return _.union(_.pullAll(currentTags, tagsToRemove), tagsToAdd)
}

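Editorial note on the keys_config change above (not part of the commit): the previous code tested membership with keys_config['data_key'] in fieldsRemoved, but in JavaScript the in operator checks an object's property keys, which for an array are its indices, not its values, so the branch would almost never match. A minimal sketch of the difference, using a hypothetical removed-field list:

import * as _ from "lodash";

// Hypothetical removed-field list, for illustration only
const fieldsRemoved = ["eid", "context.channel"];

// The in operator looks for a property key, so on an array it only matches indices ("0", "1", ...)
console.log("eid" in fieldsRemoved);            // false
// _.includes tests value membership, which is what the update logic intends
console.log(_.includes(fieldsRemoved, "eid"));  // true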
Changed file 4 of 4 (file name not shown)
@@ -170,8 +170,8 @@
"default": true
}
},
"required": ["$schema", "type", "properties", "additionalProperties"],
"additionalProperties": false
"required": ["type", "properties"],
"additionalProperties": true
},
"dataset_config": {
"type": "object",
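For reference, a short sketch of the upsert/remove merge pattern that DatasetUpdate.ts applies in mergeTransformationsConfig, mergeDenormConfig and mergeConnectorsConfig. This is an editorial illustration with made-up sample values and a hypothetical transformation field, not code from the commit:

import * as _ from "lodash";

// Hypothetical existing configs and an incoming request payload
const currentConfigs = [{ field_key: "email" }, { field_key: "name" }];
const newConfigs = [
    { action: "remove", value: { field_key: "email" } },
    { action: "upsert", value: { field_key: "name", transformation: "mask" } }
];

const removeConfigs = _.map(_.filter(newConfigs, { action: "remove" }), "value.field_key");
const addConfigs = _.map(_.filter(newConfigs, { action: "upsert" }), "value");

// Drop removed keys, then union by field_key so upserted entries win over existing ones
const merged = _.unionWith(
    addConfigs,
    _.reject(currentConfigs, (config) => _.includes(removeConfigs, config.field_key)),
    (a, b) => a.field_key === b.field_key
);
// merged => [{ field_key: "name", transformation: "mask" }]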
