diff --git a/api-service/src/controllers/DataIngestion/DataIngestionController.ts b/api-service/src/controllers/DataIngestion/DataIngestionController.ts index 46f829e9..5044bb43 100644 --- a/api-service/src/controllers/DataIngestion/DataIngestionController.ts +++ b/api-service/src/controllers/DataIngestion/DataIngestionController.ts @@ -39,34 +39,34 @@ const dataIn = async (req: Request, res: Response) => { logger.error({ apiId, message: `Dataset with id ${datasetId} not found in live table`, code: "DATASET_NOT_FOUND" }) return ResponseHandler.errorResponse(errorObject.datasetNotFound, req, res); } - const { entry_topic, dataset_config, api_version } = dataset + const { entry_topic, dataset_config, extraction_config, api_version } = dataset const entryTopic = api_version !== "v2" ? _.get(dataset_config, "entry_topic") : entry_topic if (!entryTopic) { logger.error({ apiId, message: "Entry topic not found", code: "TOPIC_NOT_FOUND" }) return ResponseHandler.errorResponse(errorObject.topicNotFound, req, res); } - await send(addMetadataToEvents(datasetId, requestBody), entryTopic) + await send(addMetadataToEvents(datasetId, requestBody, extraction_config), entryTopic) ResponseHandler.successResponse(req, res, { status: 200, data: { message: "Data ingested successfully" } }); } -const addMetadataToEvents = (datasetId: string, payload: any) => { +const addMetadataToEvents = (datasetId: string, payload: any, extraction_config: any) => { const validData = _.get(payload, "data"); const now = Date.now(); const mid = _.get(payload, "params.msgid"); const source = { id: "api.data.in", version: config?.version, entry_source: "api" }; const obsrvMeta = { syncts: now, flags: {}, timespans: {}, error: {}, source: source }; if (Array.isArray(validData)) { - const payloadRef = validData.map((event: any) => { - const payload = { - event, + const extraction_key: string = _.get(extraction_config, "extraction_key", 'events'); + const dedup_key: string = _.get(extraction_config, "dedup_config.dedup_key", 'id'); + const payload: any = { "obsrv_meta": obsrvMeta, "dataset": datasetId, "msgid": mid - } + }; + payload[extraction_key] = validData; + payload[dedup_key] = mid return payload; - }) - return payloadRef; } else { return ({