Skip to content

Commit

Permalink
Merge pull request #305 from Sanketika-Obsrv/data-ingestion-fix
Browse files Browse the repository at this point in the history
fix #OBS-I479 ingestion batch structure fix
  • Loading branch information
ravismula authored Jan 9, 2025
2 parents b78add1 + ce5b774 commit ad844b9
Showing 1 changed file with 9 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,34 +39,34 @@ const dataIn = async (req: Request, res: Response) => {
logger.error({ apiId, message: `Dataset with id ${datasetId} not found in live table`, code: "DATASET_NOT_FOUND" })
return ResponseHandler.errorResponse(errorObject.datasetNotFound, req, res);
}
const { entry_topic, dataset_config, api_version } = dataset
const { entry_topic, dataset_config, extraction_config, api_version } = dataset
const entryTopic = api_version !== "v2" ? _.get(dataset_config, "entry_topic") : entry_topic
if (!entryTopic) {
logger.error({ apiId, message: "Entry topic not found", code: "TOPIC_NOT_FOUND" })
return ResponseHandler.errorResponse(errorObject.topicNotFound, req, res);
}
await send(addMetadataToEvents(datasetId, requestBody), entryTopic)
await send(addMetadataToEvents(datasetId, requestBody, extraction_config), entryTopic)
ResponseHandler.successResponse(req, res, { status: 200, data: { message: "Data ingested successfully" } });

}

const addMetadataToEvents = (datasetId: string, payload: any) => {
const addMetadataToEvents = (datasetId: string, payload: any, extraction_config: any) => {
const validData = _.get(payload, "data");
const now = Date.now();
const mid = _.get(payload, "params.msgid");
const source = { id: "api.data.in", version: config?.version, entry_source: "api" };
const obsrvMeta = { syncts: now, flags: {}, timespans: {}, error: {}, source: source };
if (Array.isArray(validData)) {
const payloadRef = validData.map((event: any) => {
const payload = {
event,
const extraction_key: string = _.get(extraction_config, "extraction_key", 'events');
const dedup_key: string = _.get(extraction_config, "dedup_config.dedup_key", 'id');
const payload: any = {
"obsrv_meta": obsrvMeta,
"dataset": datasetId,
"msgid": mid
}
};
payload[extraction_key] = validData;
payload[dedup_key] = mid
return payload;
})
return payloadRef;
}
else {
return ({
Expand Down

0 comments on commit ad844b9

Please sign in to comment.