diff --git a/.github/workflows/build_and_deploy.yaml b/.github/workflows/build_and_deploy.yaml index 0972fbdd..ede65190 100644 --- a/.github/workflows/build_and_deploy.yaml +++ b/.github/workflows/build_and_deploy.yaml @@ -14,7 +14,7 @@ jobs: - name: Check if tag is one in list of current releases id: tag-checker run: | - (echo -n TRIGGER_ALLOWED= && echo 'print("${{ github.ref_name }}".split("_")[0] in "${{ vars.CURRENT_RELEASE }}")' | python3) >> "$GITHUB_OUTPUT" + (echo -n TRIGGER_ALLOWED= && echo 'print("${{ github.ref_name }}".split("_")[0] not in "${{ vars.CURRENT_RELEASE }}")' | python3) >> "$GITHUB_OUTPUT" docker-build-api-service: needs: check-tag if: needs.check-tag.outputs.ALLOWED_TAG == 'True' diff --git a/api-service/package-lock.json b/api-service/package-lock.json index a66580df..9cae2448 100644 --- a/api-service/package-lock.json +++ b/api-service/package-lock.json @@ -13,6 +13,7 @@ "@aws-sdk/credential-providers": "^3.309.0", "@aws-sdk/lib-storage": "^3.182.0", "@aws-sdk/s3-request-presigner": "^3.173.0", + "@azure/storage-blob": "^12.16.0", "@google-cloud/storage": "^6.5.2", "@project-sunbird/logger": "^0.0.9", "ajv": "^8.11.2", @@ -972,6 +973,140 @@ "node": ">=14.0.0" } }, + "node_modules/@azure/abort-controller": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/@azure/abort-controller/-/abort-controller-1.1.0.tgz", + "integrity": "sha512-TrRLIoSQVzfAJX9H1JeFjzAoDGcoK1IYX1UImfceTZpsyYfWr09Ss1aHW1y5TrrR3iq6RZLBwJ3E24uwPhwahw==", + "dependencies": { + "tslib": "^2.2.0" + }, + "engines": { + "node": ">=12.0.0" + } + }, + "node_modules/@azure/core-auth": { + "version": "1.5.0", + "resolved": "https://registry.npmjs.org/@azure/core-auth/-/core-auth-1.5.0.tgz", + "integrity": "sha512-udzoBuYG1VBoHVohDTrvKjyzel34zt77Bhp7dQntVGGD0ehVq48owENbBG8fIgkHRNUBQH5k1r0hpoMu5L8+kw==", + "dependencies": { + "@azure/abort-controller": "^1.0.0", + "@azure/core-util": "^1.1.0", + "tslib": "^2.2.0" + }, + "engines": { + "node": ">=14.0.0" + } + }, + "node_modules/@azure/core-http": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/@azure/core-http/-/core-http-3.0.3.tgz", + "integrity": "sha512-QMib3wXotJMFhHgmJBPUF9YsyErw34H0XDFQd9CauH7TPB+RGcyl9Ayy7iURtJB04ngXhE6YwrQsWDXlSLrilg==", + "dependencies": { + "@azure/abort-controller": "^1.0.0", + "@azure/core-auth": "^1.3.0", + "@azure/core-tracing": "1.0.0-preview.13", + "@azure/core-util": "^1.1.1", + "@azure/logger": "^1.0.0", + "@types/node-fetch": "^2.5.0", + "@types/tunnel": "^0.0.3", + "form-data": "^4.0.0", + "node-fetch": "^2.6.7", + "process": "^0.11.10", + "tslib": "^2.2.0", + "tunnel": "^0.0.6", + "uuid": "^8.3.0", + "xml2js": "^0.5.0" + }, + "engines": { + "node": ">=14.0.0" + } + }, + "node_modules/@azure/core-http/node_modules/uuid": { + "version": "8.3.2", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-8.3.2.tgz", + "integrity": "sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==", + "bin": { + "uuid": "dist/bin/uuid" + } + }, + "node_modules/@azure/core-lro": { + "version": "2.5.4", + "resolved": "https://registry.npmjs.org/@azure/core-lro/-/core-lro-2.5.4.tgz", + "integrity": "sha512-3GJiMVH7/10bulzOKGrrLeG/uCBH/9VtxqaMcB9lIqAeamI/xYQSHJL/KcsLDuH+yTjYpro/u6D/MuRe4dN70Q==", + "dependencies": { + "@azure/abort-controller": "^1.0.0", + "@azure/core-util": "^1.2.0", + "@azure/logger": "^1.0.0", + "tslib": "^2.2.0" + }, + "engines": { + "node": ">=14.0.0" + } + }, + "node_modules/@azure/core-paging": { + "version": "1.5.0", + "resolved": 
"https://registry.npmjs.org/@azure/core-paging/-/core-paging-1.5.0.tgz", + "integrity": "sha512-zqWdVIt+2Z+3wqxEOGzR5hXFZ8MGKK52x4vFLw8n58pR6ZfKRx3EXYTxTaYxYHc/PexPUTyimcTWFJbji9Z6Iw==", + "dependencies": { + "tslib": "^2.2.0" + }, + "engines": { + "node": ">=14.0.0" + } + }, + "node_modules/@azure/core-tracing": { + "version": "1.0.0-preview.13", + "resolved": "https://registry.npmjs.org/@azure/core-tracing/-/core-tracing-1.0.0-preview.13.tgz", + "integrity": "sha512-KxDlhXyMlh2Jhj2ykX6vNEU0Vou4nHr025KoSEiz7cS3BNiHNaZcdECk/DmLkEB0as5T7b/TpRcehJ5yV6NeXQ==", + "dependencies": { + "@opentelemetry/api": "^1.0.1", + "tslib": "^2.2.0" + }, + "engines": { + "node": ">=12.0.0" + } + }, + "node_modules/@azure/core-util": { + "version": "1.5.0", + "resolved": "https://registry.npmjs.org/@azure/core-util/-/core-util-1.5.0.tgz", + "integrity": "sha512-GZBpVFDtQ/15hW1OgBcRdT4Bl7AEpcEZqLfbAvOtm1CQUncKWiYapFHVD588hmlV27NbOOtSm3cnLF3lvoHi4g==", + "dependencies": { + "@azure/abort-controller": "^1.0.0", + "tslib": "^2.2.0" + }, + "engines": { + "node": ">=14.0.0" + } + }, + "node_modules/@azure/logger": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/@azure/logger/-/logger-1.0.4.tgz", + "integrity": "sha512-ustrPY8MryhloQj7OWGe+HrYx+aoiOxzbXTtgblbV3xwCqpzUK36phH3XNHQKj3EPonyFUuDTfR3qFhTEAuZEg==", + "dependencies": { + "tslib": "^2.2.0" + }, + "engines": { + "node": ">=14.0.0" + } + }, + "node_modules/@azure/storage-blob": { + "version": "12.16.0", + "resolved": "https://registry.npmjs.org/@azure/storage-blob/-/storage-blob-12.16.0.tgz", + "integrity": "sha512-jz33rUSUGUB65FgYrTRgRDjG6hdPHwfvHe+g/UrwVG8MsyLqSxg9TaW7Yuhjxu1v1OZ5xam2NU6+IpCN0xJO8Q==", + "dependencies": { + "@azure/abort-controller": "^1.0.0", + "@azure/core-http": "^3.0.0", + "@azure/core-lro": "^2.2.0", + "@azure/core-paging": "^1.1.1", + "@azure/core-tracing": "1.0.0-preview.13", + "@azure/logger": "^1.0.0", + "events": "^3.0.0", + "tslib": "^2.2.0" + }, + "engines": { + "node": ">=14.0.0" + } + }, "node_modules/@babel/code-frame": { "version": "7.22.13", "resolved": "https://registry.npmjs.org/@babel/code-frame/-/code-frame-7.22.13.tgz", @@ -1723,6 +1858,14 @@ "@jridgewell/sourcemap-codec": "1.4.14" } }, + "node_modules/@opentelemetry/api": { + "version": "1.6.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/api/-/api-1.6.0.tgz", + "integrity": "sha512-OWlrQAnWn9577PhVgqjUvMr1pg57Bc4jv0iL4w0PRuOSRvq67rvHW9Ie/dZVMvCzhSCB+UxhcY/PmCmFj33Q+g==", + "engines": { + "node": ">=8.0.0" + } + }, "node_modules/@project-sunbird/logger": { "version": "0.0.9", "resolved": "https://registry.npmjs.org/@project-sunbird/logger/-/logger-0.0.9.tgz", @@ -2601,9 +2744,17 @@ }, "node_modules/@types/node": { "version": "18.15.3", - "dev": true, "license": "MIT" }, + "node_modules/@types/node-fetch": { + "version": "2.6.6", + "resolved": "https://registry.npmjs.org/@types/node-fetch/-/node-fetch-2.6.6.tgz", + "integrity": "sha512-95X8guJYhfqiuVVhRFxVQcf4hW/2bCuoPwDasMf/531STFoNoWTT7YDnWdXHEZKqAGUigmpG31r2FE70LwnzJw==", + "dependencies": { + "@types/node": "*", + "form-data": "^4.0.0" + } + }, "node_modules/@types/pg": { "version": "8.6.6", "dev": true, @@ -2647,6 +2798,14 @@ "resolved": "https://registry.npmjs.org/@types/triple-beam/-/triple-beam-1.3.3.tgz", "integrity": "sha512-6tOUG+nVHn0cJbVp25JFayS5UE6+xlbcNF9Lo9mU7U0zk3zeUShZied4YEQZjy1JBF043FSkdXw8YkUJuVtB5g==" }, + "node_modules/@types/tunnel": { + "version": "0.0.3", + "resolved": "https://registry.npmjs.org/@types/tunnel/-/tunnel-0.0.3.tgz", + "integrity": 
"sha512-sOUTGn6h1SfQ+gbgqC364jLFBw2lnFqkgF3q0WovEHRLMrVD1sd5aufqi/aJObLekJO+Aq5z646U4Oxy6shXMA==", + "dependencies": { + "@types/node": "*" + } + }, "node_modules/@types/uuid": { "version": "9.0.5", "resolved": "https://registry.npmjs.org/@types/uuid/-/uuid-9.0.5.tgz", @@ -6424,6 +6583,14 @@ "node": ">=6" } }, + "node_modules/process": { + "version": "0.11.10", + "resolved": "https://registry.npmjs.org/process/-/process-0.11.10.tgz", + "integrity": "sha512-cdGef/drWFoydD1JsMzuFf8100nZl+GT+yacc2bEced5f9Rjk4z+WtFUTBu9PhOi9j/jfmBPu0mMEY4wIdAF8A==", + "engines": { + "node": ">= 0.6.0" + } + }, "node_modules/process-nextick-args": { "version": "2.0.1", "license": "MIT" @@ -7432,6 +7599,14 @@ "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.6.2.tgz", "integrity": "sha512-AEYxH93jGFPn/a2iVAwW87VuUIkR1FVUKB77NwMF7nBTDkDrrT/Hpt/IrCJ0QXhW27jTBDcf5ZY7w6RiqTMw2Q==" }, + "node_modules/tunnel": { + "version": "0.0.6", + "resolved": "https://registry.npmjs.org/tunnel/-/tunnel-0.0.6.tgz", + "integrity": "sha512-1h/Lnq9yajKY2PEbBadPXj3VxsDDu844OnaAo52UVmIzIvwwtBPIuNvkjuzBlTWpfJyUbG3ez0KSBibQkj4ojg==", + "engines": { + "node": ">=0.6.11 <=0.7.0 || >=0.7.3" + } + }, "node_modules/tunnel-agent": { "version": "0.6.0", "resolved": "https://registry.npmjs.org/tunnel-agent/-/tunnel-agent-0.6.0.tgz", @@ -8706,6 +8881,112 @@ "tslib": "^2.5.0" } }, + "@azure/abort-controller": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/@azure/abort-controller/-/abort-controller-1.1.0.tgz", + "integrity": "sha512-TrRLIoSQVzfAJX9H1JeFjzAoDGcoK1IYX1UImfceTZpsyYfWr09Ss1aHW1y5TrrR3iq6RZLBwJ3E24uwPhwahw==", + "requires": { + "tslib": "^2.2.0" + } + }, + "@azure/core-auth": { + "version": "1.5.0", + "resolved": "https://registry.npmjs.org/@azure/core-auth/-/core-auth-1.5.0.tgz", + "integrity": "sha512-udzoBuYG1VBoHVohDTrvKjyzel34zt77Bhp7dQntVGGD0ehVq48owENbBG8fIgkHRNUBQH5k1r0hpoMu5L8+kw==", + "requires": { + "@azure/abort-controller": "^1.0.0", + "@azure/core-util": "^1.1.0", + "tslib": "^2.2.0" + } + }, + "@azure/core-http": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/@azure/core-http/-/core-http-3.0.3.tgz", + "integrity": "sha512-QMib3wXotJMFhHgmJBPUF9YsyErw34H0XDFQd9CauH7TPB+RGcyl9Ayy7iURtJB04ngXhE6YwrQsWDXlSLrilg==", + "requires": { + "@azure/abort-controller": "^1.0.0", + "@azure/core-auth": "^1.3.0", + "@azure/core-tracing": "1.0.0-preview.13", + "@azure/core-util": "^1.1.1", + "@azure/logger": "^1.0.0", + "@types/node-fetch": "^2.5.0", + "@types/tunnel": "^0.0.3", + "form-data": "^4.0.0", + "node-fetch": "^2.6.7", + "process": "^0.11.10", + "tslib": "^2.2.0", + "tunnel": "^0.0.6", + "uuid": "^8.3.0", + "xml2js": "^0.5.0" + }, + "dependencies": { + "uuid": { + "version": "8.3.2", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-8.3.2.tgz", + "integrity": "sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==" + } + } + }, + "@azure/core-lro": { + "version": "2.5.4", + "resolved": "https://registry.npmjs.org/@azure/core-lro/-/core-lro-2.5.4.tgz", + "integrity": "sha512-3GJiMVH7/10bulzOKGrrLeG/uCBH/9VtxqaMcB9lIqAeamI/xYQSHJL/KcsLDuH+yTjYpro/u6D/MuRe4dN70Q==", + "requires": { + "@azure/abort-controller": "^1.0.0", + "@azure/core-util": "^1.2.0", + "@azure/logger": "^1.0.0", + "tslib": "^2.2.0" + } + }, + "@azure/core-paging": { + "version": "1.5.0", + "resolved": "https://registry.npmjs.org/@azure/core-paging/-/core-paging-1.5.0.tgz", + "integrity": 
"sha512-zqWdVIt+2Z+3wqxEOGzR5hXFZ8MGKK52x4vFLw8n58pR6ZfKRx3EXYTxTaYxYHc/PexPUTyimcTWFJbji9Z6Iw==", + "requires": { + "tslib": "^2.2.0" + } + }, + "@azure/core-tracing": { + "version": "1.0.0-preview.13", + "resolved": "https://registry.npmjs.org/@azure/core-tracing/-/core-tracing-1.0.0-preview.13.tgz", + "integrity": "sha512-KxDlhXyMlh2Jhj2ykX6vNEU0Vou4nHr025KoSEiz7cS3BNiHNaZcdECk/DmLkEB0as5T7b/TpRcehJ5yV6NeXQ==", + "requires": { + "@opentelemetry/api": "^1.0.1", + "tslib": "^2.2.0" + } + }, + "@azure/core-util": { + "version": "1.5.0", + "resolved": "https://registry.npmjs.org/@azure/core-util/-/core-util-1.5.0.tgz", + "integrity": "sha512-GZBpVFDtQ/15hW1OgBcRdT4Bl7AEpcEZqLfbAvOtm1CQUncKWiYapFHVD588hmlV27NbOOtSm3cnLF3lvoHi4g==", + "requires": { + "@azure/abort-controller": "^1.0.0", + "tslib": "^2.2.0" + } + }, + "@azure/logger": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/@azure/logger/-/logger-1.0.4.tgz", + "integrity": "sha512-ustrPY8MryhloQj7OWGe+HrYx+aoiOxzbXTtgblbV3xwCqpzUK36phH3XNHQKj3EPonyFUuDTfR3qFhTEAuZEg==", + "requires": { + "tslib": "^2.2.0" + } + }, + "@azure/storage-blob": { + "version": "12.16.0", + "resolved": "https://registry.npmjs.org/@azure/storage-blob/-/storage-blob-12.16.0.tgz", + "integrity": "sha512-jz33rUSUGUB65FgYrTRgRDjG6hdPHwfvHe+g/UrwVG8MsyLqSxg9TaW7Yuhjxu1v1OZ5xam2NU6+IpCN0xJO8Q==", + "requires": { + "@azure/abort-controller": "^1.0.0", + "@azure/core-http": "^3.0.0", + "@azure/core-lro": "^2.2.0", + "@azure/core-paging": "^1.1.1", + "@azure/core-tracing": "1.0.0-preview.13", + "@azure/logger": "^1.0.0", + "events": "^3.0.0", + "tslib": "^2.2.0" + } + }, "@babel/code-frame": { "version": "7.22.13", "resolved": "https://registry.npmjs.org/@babel/code-frame/-/code-frame-7.22.13.tgz", @@ -9250,6 +9531,11 @@ "@jridgewell/sourcemap-codec": "1.4.14" } }, + "@opentelemetry/api": { + "version": "1.6.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/api/-/api-1.6.0.tgz", + "integrity": "sha512-OWlrQAnWn9577PhVgqjUvMr1pg57Bc4jv0iL4w0PRuOSRvq67rvHW9Ie/dZVMvCzhSCB+UxhcY/PmCmFj33Q+g==" + }, "@project-sunbird/logger": { "version": "0.0.9", "resolved": "https://registry.npmjs.org/@project-sunbird/logger/-/logger-0.0.9.tgz", @@ -9977,8 +10263,16 @@ } }, "@types/node": { - "version": "18.15.3", - "dev": true + "version": "18.15.3" + }, + "@types/node-fetch": { + "version": "2.6.6", + "resolved": "https://registry.npmjs.org/@types/node-fetch/-/node-fetch-2.6.6.tgz", + "integrity": "sha512-95X8guJYhfqiuVVhRFxVQcf4hW/2bCuoPwDasMf/531STFoNoWTT7YDnWdXHEZKqAGUigmpG31r2FE70LwnzJw==", + "requires": { + "@types/node": "*", + "form-data": "^4.0.0" + } }, "@types/pg": { "version": "8.6.6", @@ -10018,6 +10312,14 @@ "resolved": "https://registry.npmjs.org/@types/triple-beam/-/triple-beam-1.3.3.tgz", "integrity": "sha512-6tOUG+nVHn0cJbVp25JFayS5UE6+xlbcNF9Lo9mU7U0zk3zeUShZied4YEQZjy1JBF043FSkdXw8YkUJuVtB5g==" }, + "@types/tunnel": { + "version": "0.0.3", + "resolved": "https://registry.npmjs.org/@types/tunnel/-/tunnel-0.0.3.tgz", + "integrity": "sha512-sOUTGn6h1SfQ+gbgqC364jLFBw2lnFqkgF3q0WovEHRLMrVD1sd5aufqi/aJObLekJO+Aq5z646U4Oxy6shXMA==", + "requires": { + "@types/node": "*" + } + }, "@types/uuid": { "version": "9.0.5", "resolved": "https://registry.npmjs.org/@types/uuid/-/uuid-9.0.5.tgz", @@ -12579,6 +12881,11 @@ "which-pm-runs": "^1.0.0" } }, + "process": { + "version": "0.11.10", + "resolved": "https://registry.npmjs.org/process/-/process-0.11.10.tgz", + "integrity": 
"sha512-cdGef/drWFoydD1JsMzuFf8100nZl+GT+yacc2bEced5f9Rjk4z+WtFUTBu9PhOi9j/jfmBPu0mMEY4wIdAF8A==" + }, "process-nextick-args": { "version": "2.0.1" }, @@ -13286,6 +13593,11 @@ "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.6.2.tgz", "integrity": "sha512-AEYxH93jGFPn/a2iVAwW87VuUIkR1FVUKB77NwMF7nBTDkDrrT/Hpt/IrCJ0QXhW27jTBDcf5ZY7w6RiqTMw2Q==" }, + "tunnel": { + "version": "0.0.6", + "resolved": "https://registry.npmjs.org/tunnel/-/tunnel-0.0.6.tgz", + "integrity": "sha512-1h/Lnq9yajKY2PEbBadPXj3VxsDDu844OnaAo52UVmIzIvwwtBPIuNvkjuzBlTWpfJyUbG3ez0KSBibQkj4ojg==" + }, "tunnel-agent": { "version": "0.6.0", "resolved": "https://registry.npmjs.org/tunnel-agent/-/tunnel-agent-0.6.0.tgz", diff --git a/api-service/package.json b/api-service/package.json index 37c3b926..e29ac044 100644 --- a/api-service/package.json +++ b/api-service/package.json @@ -15,6 +15,7 @@ "@aws-sdk/credential-providers": "^3.309.0", "@aws-sdk/lib-storage": "^3.182.0", "@aws-sdk/s3-request-presigner": "^3.173.0", + "@azure/storage-blob": "^12.16.0", "@google-cloud/storage": "^6.5.2", "@project-sunbird/logger": "^0.0.9", "ajv": "^8.11.2", @@ -33,12 +34,12 @@ "knex": "^2.4.2", "lodash": "^4.17.21", "moment": "^2.29.4", + "multiparty": "4.2.1", "pg": "^8.8.0", "prom-client": "^14.2.0", - "winston": "~2.4.3", - "winston-daily-rotate-file": "~3.2.1", "uuid": "3.1.0", - "multiparty": "4.2.1" + "winston": "~2.4.3", + "winston-daily-rotate-file": "~3.2.1" }, "overrides": { "semver": "^7.5.3", @@ -47,7 +48,6 @@ "devDependencies": { "@types/chai": "^4.3.3", "@types/chai-as-promised": "^7.1.5", - "@types/moment": "^2.13.0", "@types/chai-spies": "^1.0.3", "@types/compression": "^1.7.2", "@types/express": "^4.17.14", @@ -57,6 +57,7 @@ "@types/lodash": "^4.14.190", "@types/mocha": "^10.0.0", "@types/mock-knex": "^0.4.4", + "@types/moment": "^2.13.0", "@types/node": "^18.11.9", "@types/pg": "^8.6.6", "@types/uuid": "^9.0.1", diff --git a/api-service/src/configs/Config.ts b/api-service/src/configs/Config.ts index 9313956a..f1196f07 100644 --- a/api-service/src/configs/Config.ts +++ b/api-service/src/configs/Config.ts @@ -3,6 +3,7 @@ export const config = { "env": process.env.system_env || "local", "api_port": process.env.api_port || 3000, "body_parser_limit": process.env.body_parser_limit || "100mb", + "version": "1.0", "query_api": { "druid": { "host": process.env.druid_host || "http://localhost", diff --git a/api-service/src/connectors/DbConnector.ts b/api-service/src/connectors/DbConnector.ts index b51b227b..30909995 100644 --- a/api-service/src/connectors/DbConnector.ts +++ b/api-service/src/connectors/DbConnector.ts @@ -7,6 +7,15 @@ import { config as appConfig } from "../configs/Config" import _ from 'lodash' import { wrapperService } from "../routes/Router"; const schemaMerger = new SchemaMerger() + +const OP_TYPES = { + INSERT: "insert", + UPDATE: "update", + UPSERT: "upsert", + READ: "read", + LIST: "list", + DELETE: "delete", +} export class DbConnector implements IConnector { public pool: Knex private config: DbConnectorConfig @@ -43,7 +52,13 @@ export class DbConnector implements IConnector { public async insertRecord(table: string, fields: any) { await this.pool.transaction(async (dbTransaction) => { await this.submit_ingestion(_.get(fields, 'ingestion_spec'), table) - await dbTransaction(table).insert(fields) + await dbTransaction(table).insert(fields).on('query-error', (error: any) => { + this.log_error(OP_TYPES.INSERT, error, table, fields); + throw {...constants.FAILED_RECORD_CREATE, "errCode": error.code} + 
}).on('error', (error: any) => {
+                this.log_error(OP_TYPES.INSERT, error, table, fields);
+                throw {...constants.FAILED_RECORD_CREATE, "errCode": error.code}
+            });
        })
    }

@@ -53,7 +68,13 @@
            const currentRecord = await dbTransaction(table).select(Object.keys(values)).where(filters).first()
            if (_.isUndefined(currentRecord)) { throw constants.FAILED_RECORD_UPDATE }
            if (!_.isUndefined(currentRecord.tags)) { delete currentRecord.tags }
-            await dbTransaction(table).where(filters).update(schemaMerger.mergeSchema(currentRecord, values))
+            await dbTransaction(table).where(filters).update(schemaMerger.mergeSchema(currentRecord, values)).on('query-error', (error: any) => {
+                this.log_error(OP_TYPES.UPDATE, error, table, values);
+                throw {...constants.FAILED_RECORD_UPDATE, "errCode": error.code}
+            }).on('error', (error: any) => {
+                this.log_error(OP_TYPES.UPDATE, error, table, values);
+                throw {...constants.FAILED_RECORD_UPDATE, "errCode": error.code}
+            });
        })
    }

@@ -63,12 +84,24 @@
        if (!_.isUndefined(existingRecord)) {
            await this.pool.transaction(async (dbTransaction) => {
                await this.submit_ingestion(_.get(values, 'ingestion_spec'), table)
-                await dbTransaction(table).where(filters).update(schemaMerger.mergeSchema(existingRecord, values))
+                await dbTransaction(table).where(filters).update(schemaMerger.mergeSchema(existingRecord, values)).on('query-error', (error: any) => {
+                    this.log_error(OP_TYPES.UPSERT, error, table, values);
+                    throw {...constants.FAILED_RECORD_UPDATE, "errCode": error.code}
+                }).on('error', (error: any) => {
+                    this.log_error(OP_TYPES.UPSERT, error, table, values);
+                    throw {...constants.FAILED_RECORD_UPDATE, "errCode": error.code}
+                });
            })
        } else {
            await this.pool.transaction(async (dbTransaction) => {
                await this.submit_ingestion(_.get(values, 'ingestion_spec'), table)
-                await dbTransaction(table).insert(values)
+                await dbTransaction(table).insert(values).on('query-error', (error: any) => {
+                    this.log_error(OP_TYPES.UPSERT, error, table, values);
+                    throw {...constants.FAILED_RECORD_CREATE, "errCode": error.code}
+                }).on('error', (error: any) => {
+                    this.log_error(OP_TYPES.UPSERT, error, table, values);
+                    throw {...constants.FAILED_RECORD_CREATE, "errCode": error.code}
+                });
            })
        }
    }
@@ -84,13 +117,25 @@
            }
        }
        delete filters.status;
-            builder.where(filters);
+            builder.where(filters).on('query-error', (error: any) => {
+                this.log_error(OP_TYPES.READ, error, table, filters);
+                throw {...constants.FAILED_RECORD_FETCH, "errCode": error.code}
+            }).on('error', (error: any) => {
+                this.log_error(OP_TYPES.READ, error, table, filters);
+                throw {...constants.FAILED_RECORD_FETCH, "errCode": error.code}
+            });
        })
        return await query
    }

    public async listRecords(table: string) {
-        return await this.pool.select('*').from(table)
+        return await this.pool.select('*').from(table).on('query-error', (error: any) => {
+            this.log_error(OP_TYPES.LIST, error, table, {});
+            throw {...constants.FAILED_RECORD_FETCH, "errCode": error.code}
+        }).on('error', (error: any) => {
+            this.log_error(OP_TYPES.LIST, error, table, {});
+            throw {...constants.FAILED_RECORD_FETCH, "errCode": error.code}
+        })
    }

    private async submit_ingestion(ingestion_spec: Record<string, any>, table: string) {
@@ -98,9 +143,18 @@
        return await wrapperService.submitIngestion(ingestion_spec)
            .catch((error: any) => {
                console.error(constants.INGESTION_FAILED_ON_SAVE)
-                throw constants.FAILED_RECORD_UPDATE
+                throw {...constants.FAILED_RECORD_UPDATE, "errCode": error.code}
            })
        }
        return
    }
-}
\ No newline at end of file
+
+    private log_error(op_type: string, error: any, table: string, values: any) {
+        console.log(`
+            Error occurred for operation ${op_type} -
+            Table - ${table}
+            Values - ${JSON.stringify(values)}
+            Error - ${JSON.stringify(error)}
+        `);
+    }
+}
diff --git a/api-service/src/helpers/DatasetSourceConfigs.ts b/api-service/src/helpers/DatasetSourceConfigs.ts
index f429e80b..b31e0602 100644
--- a/api-service/src/helpers/DatasetSourceConfigs.ts
+++ b/api-service/src/helpers/DatasetSourceConfigs.ts
@@ -39,7 +39,7 @@ export class DatasetSourceConfigs {
    }

    public removeNullValues(payload: any) {
-        Object.keys(payload).forEach((value) => {
+        Object.keys(payload).map((value) => {
            if (_.isEmpty(payload[value])) delete payload[value]
        })
        return payload
@@ -48,4 +48,4 @@
    public getDefaults() {
        return {...defaultConfig.sourceConfig}
    }
-}
\ No newline at end of file
+}
diff --git a/api-service/src/helpers/Datasets.ts b/api-service/src/helpers/Datasets.ts
index 2f3b817f..c9296494 100644
--- a/api-service/src/helpers/Datasets.ts
+++ b/api-service/src/helpers/Datasets.ts
@@ -54,7 +54,7 @@
    }

    public removeNullValues(payload: any) {
-        Object.keys(payload).forEach((value) => {
+        Object.keys(payload).map((value) => {
            if (_.isEmpty(payload[value])) delete payload[value]
        })
        return payload
@@ -68,4 +68,4 @@
        return {...defaultConfig.dataset}
    }
 }
-}
\ No newline at end of file
+}
diff --git a/api-service/src/helpers/Datasources.ts b/api-service/src/helpers/Datasources.ts
index a9aca4a3..59c1197f 100644
--- a/api-service/src/helpers/Datasources.ts
+++ b/api-service/src/helpers/Datasources.ts
@@ -48,7 +48,7 @@
    }

    public removeNullValues(payload: any) {
-        Object.keys(payload).forEach((value) => {
+        Object.keys(payload).map((value) => {
            if (_.isEmpty(payload[value])) delete payload[value]
        })
        return payload
diff --git a/api-service/src/helpers/ErrorResponseHandler.ts b/api-service/src/helpers/ErrorResponseHandler.ts
new file mode 100644
index 00000000..de6b93c9
--- /dev/null
+++ b/api-service/src/helpers/ErrorResponseHandler.ts
@@ -0,0 +1,32 @@
+import { NextFunction, Request, Response } from "express";
+import httpStatus from "http-status";
+import { setAuditState } from "../services/telemetry";
+
+export class ErrorResponseHandler {
+    private serviceName: string;
+    constructor(serviceName: string) {
+        this.serviceName = serviceName;
+    }
+    public handleError(req: Request, res: Response, next: NextFunction, error: any, audit: boolean = true): any {
+        console.error("Error in " + this.serviceName)
+        console.error(JSON.stringify({
+            "ts": Date.now(),
+            "body": req.body,
+            "headers": req.headers,
+            "url": req.url,
+            "error": {
+                "message": error.message,
+                "stack": error.stack,
+                "data": error.data,
+                "code": error.code,
+                "error": error,
+            }
+        }));
+        if(audit) setAuditState("failed", req);
+        next({
+            statusCode: error.status || httpStatus.INTERNAL_SERVER_ERROR,
+            message: error.message,
+            errCode: httpStatus[`${error.status}_NAME`] || httpStatus["500_NAME"],
+        });
+    }
+};
diff --git a/api-service/src/helpers/prometheus/index.ts b/api-service/src/helpers/prometheus/index.ts
index 2108ca56..206cc139 100644
--- a/api-service/src/helpers/prometheus/index.ts
+++ b/api-service/src/helpers/prometheus/index.ts
@@ -7,7 +7,7 @@
 const register = new client.Registry();

 const configureRegistry = (register: client.Registry) => {
     register.setDefaultLabels({
         release: 'monitoring'
     });
-    metrics.forEach(metric => {
+    metrics.map(metric => {
         register.registerMetric(metric);
     })
 }
diff --git a/api-service/src/lib/client-cloud-services/AWSStorageService.js b/api-service/src/lib/client-cloud-services/AWSStorageService.js
index a5079a17..d8661ec4 100644
--- a/api-service/src/lib/client-cloud-services/AWSStorageService.js
+++ b/api-service/src/lib/client-cloud-services/AWSStorageService.js
@@ -173,7 +173,7 @@ class AWSStorageService extends BaseStorageService {
        }
        async.parallel(getBlogRequest, (err, results) => {
            if (results) {
-                results.forEach((blob) => {
+                results.map((blob) => {
                    if (blob.error) {
                        responseData[_.get(blob, "error.reportname")] = blob.error;
                    } else {
@@ -412,8 +412,8 @@ class AWSStorageService extends BaseStorageService {
            }))
        }
        let S3Objects = await Promise.all(promises);
-        S3Objects.forEach((S3Object) => {
-            S3Object.Contents?.forEach((content) => {
+        S3Objects.map((S3Object) => {
+            S3Object.Contents?.map((content) => {
                result.push((content.Key || ""));
            })
        });
diff --git a/api-service/src/lib/client-cloud-services/AzureStorageService.js b/api-service/src/lib/client-cloud-services/AzureStorageService.js
index 352563e0..4c2e053c 100644
--- a/api-service/src/lib/client-cloud-services/AzureStorageService.js
+++ b/api-service/src/lib/client-cloud-services/AzureStorageService.js
@@ -2,345 +2,404 @@
  * @file - Azure Storage Service
  * @exports - `AzureStorageService`
  * @since - 5.0.1
- * @version - 1.0.0
+ * @version - 2.0.0
  * @implements - BaseStorageService
+ *
+ * @see {@link https://learn.microsoft.com/en-us/javascript/api/@azure/storage-blob/?view=azure-node-latest | Azure Blob Documentation}
+ * @see {@link https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/storage/storage-blob/MigrationGuide.md#uploading-a-blob-to-the-container | Azure Migration Guide}
  */
 const BaseStorageService = require("./BaseStorageService");
-const azure = require("azure-storage");
 const { logger } = require("@project-sunbird/logger");
-const async = require("async");
 const _ = require("lodash");
-const dateFormat = require("dateformat");
-const uuidv1 = require("uuid/v1");
-const multiparty = require("multiparty");
+const { TextDecoder } = require("util");
+const { config: globalConfig } = require("../../configs/Config");
+const moment = require("moment");
+const {
+    BlobServiceClient,
+    StorageSharedKeyCredential,
+    generateBlobSASQueryParameters,
+    ContainerClient,
+} = require("@azure/storage-blob");
+const { getFileKey } = require("../../utils/common");
+const READ = "r";

 class AzureStorageService extends BaseStorageService {
-    constructor(config) {
-        super();
-        if (!_.get(config, "identity") || !_.get(config, "credential")) {
-            throw new Error("Azure__StorageService :: Required configuration is missing");
+    constructor(config) {
+        super();
+        if (!_.get(config, "identity") || !_.get(config, "credential")) {
+            throw new Error(
+                "Azure__StorageService :: Required configuration is missing"
+            );
+        }
+        try {
+            this.sharedKeyCredential = new StorageSharedKeyCredential(
+                config?.identity,
+                config?.credential
+            );
+            this.blobService = new BlobServiceClient(
+                `https://${config?.identity}.blob.core.windows.net`,
+                this.sharedKeyCredential
+            );
+            this.containerClient = new ContainerClient(
+                `https://${config?.identity}.blob.core.windows.net/${globalConfig?.exhaust_config?.container}`,
+                this.sharedKeyCredential
+            );
+        } catch (error) {
+            logger.error({
+                msg: "Azure__StorageService - Unable to 
create Azure client", + }); + } } - this.reportsContainer = _.get(config, "reportsContainer")?.toString(); - this.blobService = azure.createBlobService(config?.identity, config?.credential); - } - fileExists(container, fileToGet, callback) { - if (!container || !fileToGet || !callback) throw new Error("Invalid arguments"); - logger.info({ msg: "Azure__StorageService - fileExists called for container " + container + " for file " + fileToGet }); - this.blobService.doesBlobExist(container, fileToGet, (err, response) => { - if (err) { - callback(err); - } else { - callback(null, response); - } - }); - } - /** - * @description - Retrieves a shared access signature token - * @param { string } container - Container name - * @param { string } blob - Blob to be fetched - * @param { azure.common.SharedAccessPolicy } sharedAccessPolicy - Shared access policy - * @param { azure.common.ContentSettingsHeaders } headers - Optional header values to set for a blob returned wth this SAS - * @return { string } - The shared access signature - */ - generateSharedAccessSignature(container, blob, sharedAccessPolicy, headers) { - return this.blobService.generateSharedAccessSignature(container, blob, sharedAccessPolicy, headers); - } + async fileExists(container, fileToGet, callback) { + if (!container || !fileToGet || !callback) + throw new Error("Invalid arguments"); + logger.info({ + msg: + "Azure__StorageService - fileExists called for container " + + container + + " for file " + + fileToGet, + }); + const blobClient = this.blobService + .getContainerClient(container) + .getBlobClient(fileToGet); + try { + const blobProperties = await blobClient.getProperties(); + if (blobProperties) { + const response = { + exists: true, + }; + callback(null, response); + } + } catch (error) { + callback(error); + } + } - /** - * @description - Retrieves a blob or container URL - * @param { string } container - Container name - * @param { string } blob - Blob to be fetched - * @param { string } SASToken - Shared Access Signature token - * @return { string } - Formatted URL string - */ - getUrl(container, blob, SASToken) { - return this.blobService.getUrl(container, blob, SASToken); - } + /** + * @description - Retrieves a shared access signature token + * @param { string } container - Container name + * @param { string } blob - Blob to be fetched + * @param { azure.common.SharedAccessPolicy } sharedAccessPolicy - Shared access policy + * @param { azure.common.ContentSettingsHeaders } headers - Optional header values to set for a blob returned wth this SAS + * @return { string } - The shared access signature + */ + generateSharedAccessSignature( + container, + blob, + sharedAccessPolicy, + headers + ) { + const sasToken = generateBlobSASQueryParameters( + { + containerName: container, + blobName: blob, + ...sharedAccessPolicy.AccessPolicy, + ...headers, + }, + this.sharedKeyCredential + ).toString(); + return sasToken; + } - fileReadStream(container = undefined, fileToGet = undefined) { - return (req, res, next) => { - let container = this.reportsContainer; - let fileToGet = req.params.slug.replace("__", "/") + "/" + req.params.filename; - logger.info({ msg: "Azure__StorageService - fileReadStream called for container " + container + " for file " + fileToGet }); - if (fileToGet.includes(".json")) { - const readStream = this.blobService.createReadStream(container, fileToGet); - readStream.pipe(res); - readStream.on("end", () => { - res.end(); - }); - readStream.on("error", (error) => { - if (error && error.statusCode === 404) { 
- logger.error({ msg: "Azure__StorageService : readStream error - Error with status code 404", error: error }); - const response = { - responseCode: "CLIENT_ERROR", - params: { - err: "CLIENT_ERROR", - status: "failed", - errmsg: "Blob not found", - }, - result: {}, - }; - res.status(404).send(this.apiResponse(response)); - } else { - logger.error({ msg: "Azure__StorageService : readStream error - Error 500", error: error }); - const response = { - responseCode: "SERVER_ERROR", - params: { - err: "SERVER_ERROR", - status: "failed", - errmsg: "Failed to display blob", - }, - result: {}, - }; - res.status(500).send(this.apiResponse(response)); - } + /** + * @description - Retrieves a blob or container URL + * @param { string } container - Container name + * @param { string } blob - Blob to be fetched + * @param { string } SASToken - Shared Access Signature token + * @return { string } - Formatted URL string + */ + getUrl(container, blob, SASToken) { + const blobClient = this.blobService + .getContainerClient(container) + .getBlobClient(blob); + return `${blobClient.url}?${SASToken}`; + } + + async getBlobProperties(request, callback) { + logger.info({ + msg: + "Azure__StorageService - getBlobProperties called for container " + + request.container + + " for file " + + request.file, }); - } else { + const blobClient = this.blobService + .getContainerClient(request.container) + .getBlobClient(request.file); + try { + const blobProperties = await blobClient.getProperties(); + if (blobProperties) { + blobProperties.reportname = request.reportname; + blobProperties.filename = request.file; + blobProperties.statusCode = 200; + callback(null, blobProperties); + } + } catch (error) { + logger.error({ + msg: "Azure__StorageService : readStream error - Error with status code 404", + }); + callback({ + msg: "NotFound", + statusCode: error.statusCode, + filename: request.file, + reportname: request.reportname, + }); + } + } + + async getFileAsText( + container = undefined, + fileToGet = undefined, + prefix = undefined, + callback + ) { + const blobClient = this.blobService + .getContainerClient(container) + .getBlobClient(fileToGet); + try { + const downloadResponse = await blobClient.download(0); + const textDecoder = new TextDecoder("utf-8"); + const content = []; + for await (const chunk of downloadResponse.readableStreamBody) { + content.push(textDecoder.decode(chunk)); + } + const text = content.join(""); + logger.info({ + msg: + "Azure__StorageService : getFileAsText success for container " + + container + + " for file " + + fileToGet, + }); + callback(null, text); + } catch (error) { + logger.error({ + msg: "Azure__StorageService : getFileAsText error => ", + error, + }); + delete error.request; + delete error.response; + delete error.details; + callback(error); + } + } + + upload(container, fileName, filePath, callback) { + throw new Error("AzureStorageService :: upload() must be implemented"); + } + + async getPreSignedUrl(container, fileName, prefix = undefined) { + if (prefix) { + fileName = prefix + fileName; + } + const presignedURL = await this.getSignedUrl( + container, + fileName, + globalConfig.exhaust_config.storage_url_expiry + ); + return presignedURL; + } + + /** + * @description - Generates a pre-signed URL for a specific operation on a file in Azure storage. + * @param {string} container - Azure container or bucket name. + * @param {string} filePath - Path to the file within the container. + * @param {number} expiresIn - Optional. 
Number of seconds before the pre-signed URL expires. + * @param {string} permission - Optional. The permission for the operation (e.g., READ, WRITE). + * @returns {Promise} - A promise that resolves to the pre-signed URL. + */ + getSignedUrl(container, filePath, expiresIn = 3600, permission = "") { let startDate = new Date(); let expiryDate = new Date(startDate); - expiryDate.setMinutes(startDate.getMinutes() + 3600); - startDate.setMinutes(startDate.getMinutes() - 3600); + expiryDate.setMinutes(startDate.getMinutes() + expiresIn); + startDate.setMinutes(startDate.getMinutes() - expiresIn); let sharedAccessPolicy = { - AccessPolicy: { - Permissions: azure.BlobUtilities.SharedAccessPermissions.READ, - Start: startDate, - Expiry: expiryDate, - }, + AccessPolicy: { + permissions: permission !== "" ? permission : READ, + startsOn: startDate, + expiresOn: expiryDate, + }, }; - this.fileExists(container, fileToGet, (err, resp) => { - if (err || !_.get(resp, "exists")) { - logger.error({ msg: "Azure__StorageService : doesBlobExist error - Error with status code 404", error: err }); - const response = { - responseCode: "CLIENT_ERROR", - params: { - err: "CLIENT_ERROR", - status: "failed", - errmsg: "Blob not found", - }, - result: {}, - }; - res.status(404).send(this.apiResponse(response)); - } else { - let azureHeaders = {}; - if (req.headers["content-disposition"] == "attachment" && req.headers.filename) azureHeaders.contentDisposition = `attachment;filename=${req.headers.filename}`; - let token = this.generateSharedAccessSignature(container, fileToGet, sharedAccessPolicy, azureHeaders); - let sasUrl = this.getUrl(container, fileToGet, token); - const response = { - responseCode: "OK", - params: { - err: null, - status: "success", - errmsg: null, - }, - result: { - signedUrl: sasUrl, - }, - }; - res.status(200).send(this.apiResponse(response)); - } - }); - } - }; - } - - async getBlobProperties(request, callback) { - logger.info({ msg: "Azure__StorageService - getBlobProperties called for container " + request.container + " for file " + request.file }); - this.blobService.getBlobProperties(request.container, request.file, function (err, result, response) { - if (err) { - logger.error({ msg: "Azure__StorageService : readStream error - Error with status code 404" }); - callback({ msg: err.message, statusCode: err.statusCode, filename: request.file, reportname: request.reportname }); - } else if (!response.isSuccessful) { - console.error("Blob %s wasn't found container %s", file, request.container); - callback({ msg: err.message, statusCode: err.statusCode, filename: request.file, reportname: request.reportname }); - } else { - result.reportname = request.reportname; - result.statusCode = 200; - callback(null, result); - } - }); - } + let azureHeaders = {}; + let token = this.generateSharedAccessSignature( + container, + filePath, + sharedAccessPolicy, + azureHeaders + ); + let sasUrl = this.getUrl(container, filePath, token); + return Promise.resolve(sasUrl); + } - getFileProperties(container = undefined, fileToGet = undefined) { - return (req, res, next) => { - const container = this.reportsContainer; - const fileToGet = JSON.parse(req.query.fileNames); - logger.info({ msg: "Azure__StorageService - getFileProperties called for container " + container + " for file " + fileToGet }); - const responseData = {}; - if (Object.keys(fileToGet).length > 0) { - const getBlogRequest = []; - for (const [key, file] of Object.entries(fileToGet)) { - const req = { - container: container, - file: file, - 
reportname: key, - }; - getBlogRequest.push( - async.reflect((callback) => { - this.getBlobProperties(req, callback); - }) - ); - } - async.parallel(getBlogRequest, (err, results) => { - if (results) { - results.forEach((blob) => { - if (blob.error) { - responseData[_.get(blob, "error.reportname")] = blob.error; - } else { - responseData[_.get(blob, "value.reportname")] = { - lastModified: _.get(blob, "value.lastModified"), - reportname: _.get(blob, "value.reportname"), - statusCode: _.get(blob, "value.statusCode"), - fileSize: _.get(blob, "value.contentLength"), - }; - } - }); - const finalResponse = { - responseCode: "OK", - params: { - err: null, - status: "success", - errmsg: null, - }, - result: responseData, - }; - res.status(200).send(this.apiResponse(finalResponse)); - } - }); - } - }; - } + /** + * @description - Generates a pre-signed URL for downloading a file from the Azure storage. + * @param {string} container - Azure container or bucket name. + * @param {string} filePath - Path to the file within the container. + * @param {number} expiresIn - Optional. Number of seconds before the pre-signed URL expires. + * @returns {Promise} - A promise that resolves to the downloadable URL. + */ + getDownloadableUrl(container, filePath, expiresIn = 3600) { + let startDate = new Date(); + let expiryDate = new Date(startDate); + expiryDate.setMinutes(startDate.getMinutes() + expiresIn); + let sharedAccessPolicy = { + AccessPolicy: { + permissions: READ, + startsOn: startDate, + expiresOn: expiryDate, + }, + }; + let azureHeaders = {}; + let token = this.generateSharedAccessSignature( + container, + filePath, + sharedAccessPolicy, + azureHeaders + ); + let downloadableUrl = this.getUrl(container, filePath, token); + return Promise.resolve(downloadableUrl); + } - getFileAsText(container = undefined, fileToGet = undefined, callback) { - this.blobService.getBlobToText(container, fileToGet, (error, result, response) => { - if (error) { - logger.error({ msg: "Azure__StorageService : getFileAsText error => ", error }); - callback(error); - } else if (result) { - logger.info({ msg: "Azure__StorageService : getFileAsText success for container " + container + " for file " + fileToGet }); - callback(null, result); - } else if (response) { - callback(null, null, response); - logger.info({ - msg: "Azure__StorageService : getFileAsText success response for container " + container + " for file " + fileToGet + " response " + response, - }); - } - }); - } + /** + * @description - Generates a ingestion specification for a file. + * @param {string} container - Bucket name. + * @param {string} filePath - Path to the file in the bucket. + * @returns {Promise} - A Promise that resolves to the Druid ingestion specification. 
+ */ + getFileUrlForIngestion(container, filePath) { + let druidSpec = { + type: "azure", + uris: [`azure://${container}/${filePath}`], + }; + return Promise.resolve(druidSpec); + } - blockStreamUpload(uploadContainer = undefined) { - return (req, res) => { - try { - const blobFolderName = new Date().toLocaleDateString(); - let form = new multiparty.Form(); - form.on("part", (part) => { - if (part.filename) { - var size = part.byteCount; - var name = `${_.get(req, "query.deviceId")}_${Date.now()}.${_.get(part, "filename")}`; - logger.info({ - msg: "Azure__StorageService : blockStreamUpload Uploading file to container " + uploadContainer + " to folder " + blobFolderName + " for file name " + name + " with size " + size, - }); - this.blobService.createBlockBlobFromStream(uploadContainer, `${blobFolderName}/${name}`, part, size, (error) => { - if (error && error.statusCode === 403) { - const response = { - responseCode: "FORBIDDEN", - params: { - err: "FORBIDDEN", - status: "failed", - errmsg: "Unable to authorize to azure blob", - }, - result: req.file, - }; - logger.error({ - msg: "Azure__StorageService : blockStreamUpload Unable to authorize to azure blob for uploading desktop crash logs", - error: error, - }); - return res.status(403).send(this.apiResponse(response, "api.desktop.upload.crash.log")); - } else if (error) { - const response = { - responseCode: "SERVER_ERROR", - params: { - err: "SERVER_ERROR", - status: "failed", - errmsg: "Failed to upload to blob", - }, - result: {}, - }; - logger.error({ - msg: "Azure__StorageService : blockStreamUpload Failed to upload desktop crash logs to blob", - error: error, - }); - return res.status(500).send(this.apiResponse(response, "api.desktop.upload.crash.log")); - } else { - const response = { - responseCode: "OK", - params: { - err: null, - status: "success", - errmsg: null, - }, - result: { - message: "Successfully uploaded to blob", - }, - }; - return res.status(200).send(this.apiResponse(response, "api.desktop.upload.crash.log")); - } + /** + * @description - Function to get file download URLs from S3 bucket + * @param {string} container - Bucket name to fetch the files from + * @param {Array} filesList - List of file keys obtained for generating signed urls for download + */ + async getFilesSignedUrls(container, filesList) { + const signedUrlsPromises = filesList.map((fileNameWithPrefix) => { + return new Promise(async (resolve, reject) => { + const presignedURL = await this.getPreSignedUrl( + container, + fileNameWithPrefix + ); + const fileName = fileNameWithPrefix.split("/").pop(); + resolve({ [fileName]: presignedURL }); }); - } }); - form.parse(req); - } catch (error) { - const response = { - responseCode: "SERVER_ERROR", - params: { - err: "SERVER_ERROR", - status: "failed", - errmsg: "Failed to upload to blob", - }, - result: {}, - }; - logger.error({ - msg: "Azure__StorageService : blockStreamUpload Failed to upload desktop crash logs to blob", - error: error, + const signedUrlsList = await Promise.all(signedUrlsPromises); + const periodWiseFiles = {}; + const files = []; + // Formatting response + signedUrlsList.map(async (fileObject) => { + const fileDetails = _.keys(fileObject); + const fileUrl = _.values(fileObject)[0]; + const period = getFileKey(fileDetails[0]); + if (_.has(periodWiseFiles, period)) + periodWiseFiles[period].push(fileUrl); + else { + periodWiseFiles[period] = []; + periodWiseFiles[period].push(fileUrl); + } + files.push(fileUrl); }); - return res.status(500).send(this.apiResponse(response, 
"api.desktop.upload.crash.log")); - } - }; - } + return { + expiresAt: moment() + .add(globalConfig.exhaust_config.storage_url_expiry, "seconds") + .toISOString(), + files, + periodWiseFiles, + }; + } - apiResponse({ responseCode, result, params: { err, errmsg, status } }, id = "api.report") { - return { - id: id, - ver: "1.0", - ts: dateFormat(new Date(), "yyyy-mm-dd HH:MM:ss:lo"), - params: { - resmsgid: uuidv1(), - msgid: null, - status: status, - err: err, - errmsg: errmsg, - }, - responseCode: responseCode, - result: result, - }; - } + /** + * @description - Function to get file names from container for a specific date range + * @param {string} container - container name to fetch the files from + * @param {string} container_prefix - Prefix of the path if the files are nested + * @param {string} type - Folder name/Type of data to fetch the files for + * @param {string} dateRange - Range of time interval, to get the files for + * @param {string} datasetId - Dataset Id to fetch the files for + */ + async filterDataByRange( + container, + container_prefix, + type, + dateRange, + datasetId + ) { + let startDate = moment(dateRange.from); + let endDate = moment(dateRange.to); + let result = []; + let promises = []; + for ( + let analysisDate = startDate; + analysisDate <= endDate; + analysisDate = analysisDate.add(1, "days") + ) { + promises.push( + new Promise(async (resolve, reject) => { + const pathPrefix = `${container_prefix}/${type}/${datasetId}/${analysisDate.format( + "YYYY-MM-DD" + )}`; + try { + const items = this.containerClient.listBlobsByHierarchy( + "/", + { prefix: pathPrefix } + ); + for await (const item of items) { + if (item && item.kind === "blob") resolve(item); + else resolve(null); + return; + } + } catch (err) { + console.log( + `Unable to list the blobs present in directory ${pathPrefix}` + ); + console.log(err); + reject(err); + return; + } + }) + ); + } + try { + result = await Promise.all(promises); + if(result.length > 0) result = result.map((item) => item.name); + return result; + } catch (err) { + console.log(err); + return []; + } + } - upload(container, fileName, filePath, options = {}, callback) { - this.blobService.createBlockBlobFromLocalFile(container, fileName, filePath, options, (error, result, response) => { - if (error) { - logger.error({ msg: "Azure__StorageService : upload error => ", error }); - callback(error); - } else if (result) { - logger.info({ msg: "Azure__StorageService : upload success for container " + container + " for file " + fileName }); - callback(null, result); - } else if (response) { - callback(null, null, response); - logger.info({ - msg: "Azure__StorageService : upload success response for container " + container + " for file " + filename + " response " + response, - }); - } - }); - } + /** + * @description - Function to get file names S3 bucket for a specific date range + * @param {String} container - Bucket name to fetch the files from + * @param {String} container_prefix - Prefix of the path if the files are nested + * @param {String} type - Folder name/Type of data to fetch the files for + * @param {String} dateRange - Range of time interval, to get the files for + * @param {String} datasetId - Dataset Id to fetch the files for + */ + async getFiles(container, container_prefix, type, dateRange, datasetId) { + const filesList = await this.filterDataByRange( + container, + container_prefix, + type, + dateRange, + datasetId + ); + const signedUrlsList = await this.getFilesSignedUrls( + container, + filesList + ); + return 
signedUrlsList;
+    }
 }
+
 module.exports = AzureStorageService;
diff --git a/api-service/src/lib/client-cloud-services/GCPStorageService.js b/api-service/src/lib/client-cloud-services/GCPStorageService.js
index 8c77195b..4efb2245 100644
--- a/api-service/src/lib/client-cloud-services/GCPStorageService.js
+++ b/api-service/src/lib/client-cloud-services/GCPStorageService.js
@@ -164,7 +164,7 @@ class GCPStorageService extends BaseStorageService {
        }
        async.parallel(getBlogRequest, (err, results) => {
            if (results) {
-                results.forEach((blob) => {
+                results.map((blob) => {
                    if (blob.error) {
                        responseData[_.get(blob, "error.reportname")] = blob.error;
                    } else {
diff --git a/api-service/src/lib/client-cloud-services/index.js b/api-service/src/lib/client-cloud-services/index.js
index b62cd9c0..abf31642 100644
--- a/api-service/src/lib/client-cloud-services/index.js
+++ b/api-service/src/lib/client-cloud-services/index.js
@@ -7,7 +7,7 @@
  * @version - 1.0.0
  */

-// const AzureStorageService = require("./AzureStorageService");
+const AzureStorageService = require("./AzureStorageService");
 const AWSStorageService = require("./AWSStorageService");
 const GCPStorageService = require("./GCPStorageService");

@@ -18,9 +18,9 @@

 function init(provider) {
     switch (provider) {
-        // case "azure":
-        //     return AzureStorageService;
-        //     break;
+        case "azure":
+            return AzureStorageService;
+            break;
         case "aws":
             return AWSStorageService;
             break;
diff --git a/api-service/src/lib/services/TelemetryService.js b/api-service/src/lib/services/TelemetryService.js
index 812c836b..1e81e6df 100644
--- a/api-service/src/lib/services/TelemetryService.js
+++ b/api-service/src/lib/services/TelemetryService.js
@@ -18,8 +18,14 @@ class TelemetryService {
         message.did = req.get("x-device-id");
         message.channel = req.get("x-channel-id");
         message.pid = req.get("x-app-id");
         if (!message.mid) message.mid = uuidv1();
         message.syncts = new Date().getTime();
+
+        // add obsrv meta
+        const source = {meta: {id: "", connector_type: "api", version: config.version, entry_source: "api"}, trace_id: uuidv1()};
+        const obsrvMeta = {syncts: new Date().getTime(), processingStartTime: new Date().getTime(), flags: {}, timespans: {}, error: {}, source: source};
+        message.obsrv_meta = obsrvMeta;
+
         const data = JSON.stringify(message);
         if (this.config.localStorageEnabled === "true" || this.config.telemetryProxyEnabled === "true") {
             if (this.config.localStorageEnabled === "true" && this.config.telemetryProxyEnabled !== "true") {
diff --git a/api-service/src/resources/Constants.json b/api-service/src/resources/Constants.json
index 035787fe..d0c4fef4 100644
--- a/api-service/src/resources/Constants.json
+++ b/api-service/src/resources/Constants.json
@@ -40,11 +40,26 @@
         "status": 400,
         "code": "BAD_REQUEST"
     },
+    "FAILED_RECORD_CREATE": {
+        "message": "Failed to save Record",
+        "status": 400,
+        "code": "BAD_REQUEST"
+    },
+    "FAILED_RECORD_FETCH": {
+        "message": "Failed to fetch Records",
+        "status": 400,
+        "code": "BAD_REQUEST"
+    },
     "EMPTY_DATASET_ID": {
         "message": "datasetId parameter in url cannot be empty",
         "status": 400,
         "code": "BAD_REQUEST"
     },
+    "INVALID_DATASET_CONFIG": {
+        "message": "Invalid dataset config provided for ingestion",
+        "status": 
400, + "code": "BAD_REQUEST" + }, "RECORD_SAVED": "The Record has been saved successfully", "RECORD_UPDATED": "The Record has been update successfully", "TABLE_NOT_FOUND": { diff --git a/api-service/src/services/DataSourceService.ts b/api-service/src/services/DataSourceService.ts index 6b114e2c..d0366d10 100644 --- a/api-service/src/services/DataSourceService.ts +++ b/api-service/src/services/DataSourceService.ts @@ -1,34 +1,31 @@ import { Request, Response, NextFunction } from "express"; -import httpStatus from "http-status"; import _ from 'lodash' import { Datasources } from "../helpers/Datasources"; import { IConnector } from "../models/IngestionModels"; -import { findAndSetExistingRecord, setAuditState } from "./telemetry"; +import { findAndSetExistingRecord } from "./telemetry"; import { DbUtil } from "../helpers/DbUtil"; import constants from "../resources/Constants.json"; import { ingestorService } from "../routes/Router"; +import { ErrorResponseHandler } from "../helpers/ErrorResponseHandler"; export class DataSourceService { private table: string private dbConnector: IConnector; private dbUtil: DbUtil + private errorHandler: ErrorResponseHandler; constructor(dbConnector: IConnector, table: string) { this.dbConnector = dbConnector this.table = table this.dbUtil = new DbUtil(dbConnector, table) + this.errorHandler = new ErrorResponseHandler("DataSourceService"); } + public save = async (req: Request, res: Response, next: NextFunction) => { try { const datasources = new Datasources(req.body) const payload: any = datasources.setValues() await this.validateDatasource(payload) await this.dbUtil.save(req, res, next, payload) - } - catch (error: any) { - console.error(error.message) - setAuditState("failed", req); - next({ statusCode: error.status || httpStatus.INTERNAL_SERVER_ERROR, message: error.message, errCode: error.code || httpStatus["500_NAME"] }); - - } + } catch (error: any) { this.errorHandler.handleError(req, res, next, error) } } public update = async (req: Request, res: Response, next: NextFunction) => { try { @@ -37,32 +34,20 @@ export class DataSourceService { await this.validateDatasource(payload) await findAndSetExistingRecord({ dbConnector: this.dbConnector, table: this.table, request: req, filters: { "id": payload.id }, object: { id: payload.id, type: "datasource" } }); await this.dbUtil.upsert(req, res, next, payload) - } - catch (error: any) { - console.error(error.message) - setAuditState("failed", req); - next({ statusCode: error.status || httpStatus.INTERNAL_SERVER_ERROR, message: error.message, errCode: error.code || httpStatus["500_NAME"] }); - } + } catch (error: any) { this.errorHandler.handleError(req, res, next, error) } } public read = async (req: Request, res: Response, next: NextFunction) => { try { let status: any = req.query.status || "ACTIVE" const id = req.params.datasourceId await this.dbUtil.read(req, res, next, { id, status }) - } - catch (error: any) { - console.error(error.message) - next({ statusCode: error.status || httpStatus.INTERNAL_SERVER_ERROR, message: error.message, errCode: error.code || httpStatus["500_NAME"] }); - } + } catch (error: any) { this.errorHandler.handleError(req, res, next, error, false) } } public list = async (req: Request, res: Response, next: NextFunction) => { try { const payload = req.body await this.dbUtil.list(req, res, next, payload) - } catch (error: any) { - console.error(error.message) - next({ statusCode: error.status || httpStatus.INTERNAL_SERVER_ERROR, message: error.message, errCode: error.code || 
httpStatus["500_NAME"] }); - } + } catch (error: any) { this.errorHandler.handleError(req, res, next, error, false) } } public validateDatasource = async (payload: Record) => { let datasetRecord = await ingestorService.getDatasetConfig(payload.dataset_id); @@ -78,4 +63,4 @@ export class DataSourceService { } } -} \ No newline at end of file +} diff --git a/api-service/src/services/DatasetService.ts b/api-service/src/services/DatasetService.ts index b726c7d5..82b62477 100644 --- a/api-service/src/services/DatasetService.ts +++ b/api-service/src/services/DatasetService.ts @@ -1,32 +1,29 @@ import { Request, Response, NextFunction } from "express"; -import { ResponseHandler } from "../helpers/ResponseHandler"; -import httpStatus from "http-status"; import _ from 'lodash' import { Datasets } from "../helpers/Datasets"; import { IConnector } from "../models/IngestionModels"; -import { findAndSetExistingRecord, setAuditState } from "./telemetry"; +import { findAndSetExistingRecord } from "./telemetry"; import { DbUtil } from "../helpers/DbUtil"; import { refreshDatasetConfigs } from "../helpers/DatasetConfigs"; +import { ErrorResponseHandler } from "../helpers/ErrorResponseHandler"; export class DatasetService { private table: string private dbConnector: IConnector; - private dbUtil: DbUtil + private dbUtil: DbUtil; + private errorHandler: ErrorResponseHandler; constructor(dbConnector: IConnector, table: string) { this.dbConnector = dbConnector this.table = table this.dbUtil = new DbUtil(dbConnector, table) + this.errorHandler = new ErrorResponseHandler("DatasetService"); } + public save = async (req: Request, res: Response, next: NextFunction) => { try { const dataset = new Datasets(req.body) const payload: any = dataset.setValues() await this.dbUtil.save(req, res, next, payload) - } - catch (error: any) { - console.error(error.message) - setAuditState("failed", req); - next({ statusCode: error.status || httpStatus.INTERNAL_SERVER_ERROR, message: error.message, errCode: error.code || httpStatus["500_NAME"] }); - } + } catch (error: any) { this.errorHandler.handleError(req, res, next, error) } } public update = async (req: Request, res: Response, next: NextFunction) => { try { @@ -35,33 +32,19 @@ export class DatasetService { await findAndSetExistingRecord({ dbConnector: this.dbConnector, table: this.table, request: req, filters: { "id": payload.id }, object: { id: payload.id, type: "dataset" } }); await this.dbUtil.update(req, res, next, payload) await refreshDatasetConfigs() - } - catch (error: any) { - console.log(error.message) - setAuditState("failed", req); - next({ statusCode: error.status || httpStatus.INTERNAL_SERVER_ERROR, message: error.message, errCode: error.code || httpStatus["500_NAME"] }); - } + } catch (error: any) { this.errorHandler.handleError(req, res, next, error) } } public read = async (req: Request, res: Response, next: NextFunction) => { try { let status: any = req.query.status || "ACTIVE" const id = req.params.datasetId  await this.dbUtil.read(req, res, next, { id, status }) - } - catch (error: any) { - console.log(error.message) - next({ statusCode: error.status || httpStatus.INTERNAL_SERVER_ERROR, message: error.message, errCode: error.code || httpStatus["500_NAME"] }); - } + } catch (error: any) { this.errorHandler.handleError(req, res, next, error, false) } } public list = async (req: Request, res: Response, next: NextFunction) => { - try{ - const payload = req.body - await this.dbUtil.list(req, res, next, payload) - - } - catch (error: any) { - 
-            console.log(error.message)
-            next({ statusCode: error.status || httpStatus.INTERNAL_SERVER_ERROR, message: error.message, errCode: error.code || httpStatus["500_NAME"] });
-        }
+        } catch (error: any) { this.errorHandler.handleError(req, res, next, error, false) }
     }
     public list = async (req: Request, res: Response, next: NextFunction) => {
-        try{
-            const payload = req.body
-            await this.dbUtil.list(req, res, next, payload)
-
-        }
-        catch (error: any) {
-            console.log(error.message)
-            next({ statusCode: error.status || httpStatus.INTERNAL_SERVER_ERROR, message: error.message, errCode: error.code || httpStatus["500_NAME"] });
+        try {
+            const payload = req.body
+            await this.dbUtil.list(req, res, next, payload)
+        } catch (error: any) { this.errorHandler.handleError(req, res, next, error, false) }
         }
     }
-}
\ No newline at end of file
diff --git a/api-service/src/services/DatasetSourceConfigService.ts b/api-service/src/services/DatasetSourceConfigService.ts
index 8ca052b0..cc80faab 100644
--- a/api-service/src/services/DatasetSourceConfigService.ts
+++ b/api-service/src/services/DatasetSourceConfigService.ts
@@ -1,31 +1,29 @@
 import { Request, Response, NextFunction } from "express";
-import httpStatus from "http-status";
 import _ from 'lodash'
 import { DatasetSourceConfigs } from "../helpers/DatasetSourceConfigs";
 import { IConnector } from "../models/IngestionModels";
 import { DbUtil } from "../helpers/DbUtil";
-import { findAndSetExistingRecord, setAuditState } from "./telemetry";
+import { findAndSetExistingRecord } from "./telemetry";
+import { ErrorResponseHandler } from "../helpers/ErrorResponseHandler";
 
 export class DatasetSourceConfigService {
     private table: string
     private dbConnector: IConnector;
     private dbUtil: DbUtil
+    private errorHandler: ErrorResponseHandler;
     constructor(dbConnector: IConnector, table: string) {
         this.dbConnector = dbConnector
         this.table = table
         this.dbUtil = new DbUtil(dbConnector, table)
+        this.errorHandler = new ErrorResponseHandler("DatasetSourceConfigService");
     }
+
     public save = async (req: Request, res: Response, next: NextFunction) => {
         try {
             const datasetSourceConfig = new DatasetSourceConfigs(req.body)
             const payload: any = datasetSourceConfig.setValues()
             await this.dbUtil.save(req, res, next, payload)
-        }
-        catch (error: any) {
-            console.error(error.message)
-            setAuditState("failed", req);
-            next({ statusCode: error.status || httpStatus.INTERNAL_SERVER_ERROR, message: error.message, errCode: error.code || httpStatus["500_NAME"] });
-        }
+        } catch (error: any) { this.errorHandler.handleError(req, res, next, error) }
     }
     public update = async (req: Request, res: Response, next: NextFunction) => {
         try {
@@ -33,33 +31,19 @@ export class DatasetSourceConfigService {
             const payload: Record<string, any> = datasetSourceConfig.setValues()
             await findAndSetExistingRecord({ dbConnector: this.dbConnector, table: this.table, request: req, filters: { "id": payload.id }, object: { id: payload.id, type: "datasetSourceConfig" } });
             await this.dbUtil.upsert(req, res, next, payload)
-        }
-        catch (error: any) {
-            console.error(error.message)
-            setAuditState("failed", req);
-            next({ statusCode: error.status || httpStatus.INTERNAL_SERVER_ERROR, message: error.message, errCode: error.code || httpStatus["500_NAME"] });
-        }
+        } catch (error: any) { this.errorHandler.handleError(req, res, next, error) }
     }
     public read = async (req: Request, res: Response, next: NextFunction) => {
         try {
             let status: any = req.query.status || "ACTIVE"
             const id = req.params.datasetId
             await this.dbUtil.read(req, res, next, { id, status })
-        }
-        catch (error: any) {
-            console.error(error.message)
-            next({ statusCode: error.status || httpStatus.INTERNAL_SERVER_ERROR, message: error.message, errCode: error.code || httpStatus["500_NAME"] });
-        }
+        } catch (error: any) { this.errorHandler.handleError(req, res, next, error, false) }
     }
     public list = async (req: Request, res: Response, next: NextFunction) => {
         try {
             const payload = req.body
             await this.dbUtil.list(req, res, next, payload)
-        }
-        catch (error: any) {
-            console.error(error.message)
-            next({ statusCode: error.status || httpStatus.INTERNAL_SERVER_ERROR, message: error.message, errCode: error.code || httpStatus["500_NAME"] });
-        }
-
+        } catch (error: any) { this.errorHandler.handleError(req, res, next, error, false) }
     }
-}
\ No newline at end of file
+}
diff --git a/api-service/src/services/IngestorService.ts b/api-service/src/services/IngestorService.ts
index b8064200..84414665 100644
--- a/api-service/src/services/IngestorService.ts
+++ b/api-service/src/services/IngestorService.ts
@@ -2,19 +2,17 @@ import { Request, Response, NextFunction } from "express";
 import constants from "../resources/Constants.json"
 import { ResponseHandler } from "../helpers/ResponseHandler";
 import _ from 'lodash'
-import httpStatus from "http-status";
 import { globalCache } from "../routes/Router";
 import { refreshDatasetConfigs } from "../helpers/DatasetConfigs";
 import { IConnector } from "../models/DatasetModels";
-import { config } from '../configs/Config'
-import { AxiosInstance } from "axios";
 import { wrapperService } from "../routes/Router";
+import { ErrorResponseHandler } from "../helpers/ErrorResponseHandler";
 
 export class IngestorService {
     private kafkaConnector: IConnector;
-    private httpConnector: AxiosInstance
-    constructor(kafkaConnector: IConnector, httpConnector: IConnector) {
+    private errorHandler: ErrorResponseHandler;
+    constructor(kafkaConnector: IConnector, httpConnector: IConnector,) {
         this.kafkaConnector = kafkaConnector
-        this.httpConnector = httpConnector.connect()
+        this.errorHandler = new ErrorResponseHandler("IngestorService");
         this.init()
     }
     public init() {
@@ -26,29 +24,23 @@ export class IngestorService {
             console.log("error while connecting to kafka", error.message)
         })
     }
+
     public create = async (req: Request, res: Response, next: NextFunction) => {
         try {
             const datasetId = this.getDatasetId(req);
+            const validData = await this.validateData(req.body.data, datasetId);
             req.body = { ...req.body.data, dataset: datasetId };
             const topic = await this.getTopic(datasetId);
             await this.kafkaConnector.execute(req, res, topic);
             ResponseHandler.successResponse(req, res, { status: 200, data: { message: constants.DATASET.CREATED } });
-        } catch (error: any) {
-            console.error(error.message)
-            next({ statusCode: error.status || httpStatus.INTERNAL_SERVER_ERROR, message: error.message || "", errCode: error.code || httpStatus["500_NAME"] });
-        }
-
+        } catch (error: any) { this.errorHandler.handleError(req, res, next, error, false) }
     }
     public submitIngestion = async (req: Request, res: Response, next: NextFunction) => {
         try {
             await wrapperService.submitIngestion(req.body)
             ResponseHandler.successResponse(req, res, { status: 200, data: { message: constants.INGESTION_SUBMITTED } });
         }
-        catch (error: any) {
-            let errorMessage = error?.response?.data?.error || "Internal Server Error"
-            console.error(errorMessage)
-            next({ statusCode: error.status || httpStatus.INTERNAL_SERVER_ERROR, message: errorMessage, errCode: error.code || httpStatus[ "500_NAME" ] });
-        }
+        catch (error: any) { this.errorHandler.handleError(req, res, next, error, false) }
     }
     private getDatasetId(req: Request) {
         let datasetId = req.params.datasetId.trim()
@@ -74,11 +66,26 @@ export class IngestorService {
     private async getTopic(datasetId: string) {
         const datasetRecord = await this.getDatasetConfig(datasetId);
-        if (!datasetRecord) {
-            throw constants.DATASET_ID_NOT_FOUND;
+        if (!datasetRecord) throw constants.DATASET_ID_NOT_FOUND;
+        return datasetRecord.dataset_config.entry_topic;
+    }
+
+    private async validateData(data: any, datasetId: string) {
+        const datasetRecord = await this.getDatasetConfig(datasetId);
+        if (!datasetRecord) throw constants.DATASET_ID_NOT_FOUND;
+        if(_.has(datasetRecord, "extraction_config") && _.get(datasetRecord, ["extraction_config", "is_batch_event"])) {
+            if(
+                _.has(data, _.get(datasetRecord, ["extraction_config", "extraction_key"])) &&
+                _.has(data, _.get(datasetRecord, ["extraction_config", "batch_id"]))
+            )
+                return data;
+            else if (_.has(data, "event"))
+                return data;
+            else throw constants.INVALID_DATASET_CONFIG;
         } else {
-            return datasetRecord.dataset_config.entry_topic;
+            if(_.has(data, "event"))
+                return data;
+            else throw constants.INVALID_DATASET_CONFIG;
         }
     }
-
 }
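The new `validateData` gate is the behavioural heart of this PR: for datasets flagged with `extraction_config.is_batch_event`, a payload is accepted only if it carries both the configured `extraction_key` and `batch_id` fields, or if it is a single `event`; anything else is rejected with `INVALID_DATASET_CONFIG`, which the updated tests below expect to surface as HTTP 400 (presumably via the `BAD_REQUEST` constant added to Constants.json above). Datasets without batch extraction accept only single `event` payloads. A standalone rendering of the same rule, with simplified inline data (the `extraction_config` values mirror the test fixture; everything else is made up for the example):

```typescript
// Illustration of the acceptance rule implemented by validateData above.
import _ from "lodash";

const extractionConfig = { is_batch_event: true, extraction_key: "events", batch_id: "id" };

const batchPayload = { id: "beckn-batch-1", events: [{ context: {} }, { context: {} }] };
const individualPayload = { event: { context: {} } };
const invalidPayload = { rows: [] };

for (const data of [batchPayload, individualPayload, invalidPayload]) {
    // Batch datasets accept either the configured batch envelope or a single event
    const ok =
        (_.has(data, extractionConfig.extraction_key) && _.has(data, extractionConfig.batch_id)) ||
        _.has(data, "event");
    console.log(ok ? "accepted" : "rejected: INVALID_DATASET_CONFIG");
}
// -> accepted, accepted, rejected: INVALID_DATASET_CONFIG
```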
diff --git a/api-service/src/services/QueryService.ts b/api-service/src/services/QueryService.ts
index a636d222..fc5f7668 100644
--- a/api-service/src/services/QueryService.ts
+++ b/api-service/src/services/QueryService.ts
@@ -1,22 +1,17 @@
 import { AxiosInstance } from "axios";
 import { NextFunction, Request, Response } from "express";
-import errorResponse from "http-errors";
-import httpStatus from "http-status";
 import _ from "lodash";
 import { config } from "../configs/Config";
 import { ResponseHandler } from "../helpers/ResponseHandler";
 import { IConnector } from "../models/DatasetModels";
+import { ErrorResponseHandler } from "../helpers/ErrorResponseHandler";
 
 export class QueryService {
     private connector: AxiosInstance;
+    private errorHandler: ErrorResponseHandler;
     constructor(connector: IConnector) {
-        this.connector = connector.connect();;
-    }
-
-    private handleError = (error: any, next: NextFunction) => {
-        console.error(error.message);
-        console.log(error.data);
-        next(errorResponse(httpStatus.INTERNAL_SERVER_ERROR, error.message));
+        this.connector = connector.connect();
+        this.errorHandler = new ErrorResponseHandler("QueryService");
     }
 
     public executeNativeQuery = async (req: Request, res: Response, next: NextFunction) => {
@@ -30,13 +25,13 @@
             }
             ResponseHandler.successResponse(req, res, { status: result.status, data: _.flatten(mergedResult) });
-        } catch (error: any) { this.handleError(error, next); }
+        } catch (error: any) { this.errorHandler.handleError(req, res, next, error, false); }
     };
 
     public executeSqlQuery = async (req: Request, res: Response, next: NextFunction) => {
         try {
             const result = await this.connector.post(config.query_api.druid.sql_query_path, req.body.querySql);
             ResponseHandler.successResponse(req, res, { status: result.status, data: result.data });
-        } catch (error: any) { this.handleError(error, next); }
+        } catch (error: any) { this.errorHandler.handleError(req, res, next, error, false); }
     }
-}
\ No newline at end of file
+}
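A side effect of this change is that query failures no longer collapse to a blanket 500 via `http-errors`; the shared handler can propagate the upstream status. For context, `executeSqlQuery` forwards `req.body.querySql` verbatim to the Druid path configured at `config.query_api.druid.sql_query_path`. A plausible request body, assuming Druid's standard SQL API shape (`{"query": "..."}`) and a hypothetical datasource name:

```typescript
// Hypothetical request body for the SQL passthrough. Only the `querySql` key
// is visible in the diff; the inner shape follows Druid's SQL API, which this
// service forwards as-is. The datasource name "telemetry" is made up.
const sqlRequest = {
    querySql: {
        query: "SELECT COUNT(*) AS \"events\" FROM \"telemetry\" WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '1' DAY"
    }
};
```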
diff --git a/api-service/src/services/WrapperService.ts b/api-service/src/services/WrapperService.ts
index 03d0be96..22b5d650 100644
--- a/api-service/src/services/WrapperService.ts
+++ b/api-service/src/services/WrapperService.ts
@@ -1,19 +1,15 @@
 import axios from "axios";
 import { NextFunction, Request, Response } from "express";
-import errorResponse from "http-errors";
-import httpStatus from "http-status";
 import _ from "lodash";
 import { config } from "../configs/Config";
 import { ResponseHandler } from "../helpers/ResponseHandler";
+import { ErrorResponseHandler } from "../helpers/ErrorResponseHandler";
 
 export class WrapperService {
-    constructor() { }
-
-    private handleError = (error: any, next: NextFunction) => {
-        console.error(error.message);
-        console.log(error.data);
-        next(errorResponse(httpStatus.INTERNAL_SERVER_ERROR, error.message));
-    };
+    private errorHandler: ErrorResponseHandler;
+    constructor() {
+        this.errorHandler = new ErrorResponseHandler("WrapperService");
+    }
 
     public forwardSql = async (
         req: Request,
@@ -30,7 +26,7 @@
             }
         );
         ResponseHandler.flatResponse(req, res, result);
-        } catch (error: any) { this.handleError(error, next); }
+        } catch (error: any) { this.errorHandler.handleError(req, res, next, error, false); }
     };
 
     public forwardNative = async (
@@ -47,7 +43,7 @@
             req.body,
             { headers, }
         );
         ResponseHandler.flatResponse(req, res, result);
-        } catch (error: any) { this.handleError(error, next); }
+        } catch (error: any) { this.errorHandler.handleError(req, res, next, error); }
     };
 
     public forwardNativeDel = async (
@@ -66,7 +62,7 @@
             }
         );
         ResponseHandler.flatResponse(req, res, result);
-        } catch (error: any) { this.handleError(error, next); }
+        } catch (error: any) { this.errorHandler.handleError(req, res, next, error, false); }
     };
 
     public forwardNativeGet = async (
@@ -86,7 +82,7 @@
             }
         );
         ResponseHandler.flatResponse(req, res, result);
-        } catch (error: any) { this.handleError(error, next); }
+        } catch (error: any) { this.errorHandler.handleError(req, res, next, error, false); }
     };
 
     public nativeStatus = async (
@@ -100,10 +96,10 @@
             `${config.query_api.druid.host}:${config.query_api.druid.port}/status`
         );
         ResponseHandler.flatResponse(req, res, result);
-        } catch (error: any) { this.handleError(error, next); }
+        } catch (error: any) { this.errorHandler.handleError(req, res, next, error, false); }
     };
 
     public submitIngestion = async (ingestionSpec: object) => {
         return await axios.post(`${config.query_api.druid.host}:${config.query_api.druid.port}/${config.query_api.druid.submit_ingestion}`, ingestionSpec)
-    }
+    }
 }
diff --git a/api-service/src/services/telemetry.ts b/api-service/src/services/telemetry.ts
index e25777d4..7abfd3ef 100644
--- a/api-service/src/services/telemetry.ts
+++ b/api-service/src/services/telemetry.ts
@@ -70,7 +70,7 @@ const transformProps = (body: Record<string, any>) => {
 
 export const setAuditState = (state: string, req: Request) => {
     if (state && req) {
-        req.auditEvent.toState = state;
+        _.set(req.auditEvent, "toState", state);
     }
 }
 
@@ -153,4 +153,4 @@ export const findAndSetExistingRecord = async ({ dbConnector, table, filters, re
         console.log(error);
     }
 }
-}
\ No newline at end of file
+}
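One small but meaningful change rides along in telemetry.ts: `setAuditState` now writes `toState` through lodash's `_.set`, which silently no-ops when the target object is nullish instead of throwing. That matters for code paths where the error handler fires before any audit middleware has attached `req.auditEvent`. A minimal illustration:

```typescript
import _ from "lodash";

const req: any = {};                          // auditEvent was never initialised
// req.auditEvent.toState = "failed";         // TypeError: cannot set properties of undefined
_.set(req.auditEvent, "toState", "failed");   // lodash returns the nullish target untouched, no crash
console.log(req.auditEvent);                  // still undefined, but the request survives
```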
"currency": "INR", "value": "58.2936244525222" }, "type": "item", "title": "Fare" }, { "price": { "currency": "INR", "value": "10.492852401453995" }, "type": "item", "title": "Tax" } ], "price": { "currency": "INR", "value": "68.7864768539762" } }, "provider": { "locations": [ { "gps": "12.973437,77.608771", "id": "./mobility/ind.blr/17@taxi.becknprotocol.io.provider_location" } ], "id": "./mobility/ind.blr/7@taxi.becknprotocol.io.provider", "descriptor": { "images": [ "https://taxi.becknprotocol.io/companies/view/7" ], "name": "Best Taxies" }, "categories": [ { "id": "./mobility/ind.blr/1@taxi.becknprotocol.io.category", "descriptor": { "name": "Premium Taxi" } } ], "items": [ { "category_id": "./mobility/ind.blr/1@taxi.becknprotocol.io.category", "price": { "currency": "INR", "value": "68.7864768539762" }, "descriptor": { "images": [ "https://taxi.becknprotocol.io/resources/images/car.png" ], "code": "Premium Taxi-FuelType:Diesel,Make:Maruti,NameOfModel:Brezza,VehicleType:Premium Taxi", "name": "Premium Taxi-FuelType:Diesel,Make:Maruti,NameOfModel:Brezza,VehicleType:Premium Taxi" }, "id": "./mobility/ind.blr/17@taxi.becknprotocol.io.item", "fulfillment_id": "./mobility/ind.blr/6285@taxi.becknprotocol.io.fulfillment", "tags": { "NameOfModel": "Brezza", "VehicleType": "Premium Taxi", "Make": "Maruti", "FuelType": "Diesel" } } ] }, "id": "./mobility/ind.blr/6285@taxi.becknprotocol.io.order", "state": "Awaiting Driver acceptance", "fulfillment": { "agent": { "phone": "+919082233441", "name": "Michel MJ" }, "start": { "location": { "gps": "12.973437,77.608771" } }, "end": { "location": { "gps": "12.935193,77.624481" } }, "id": "./mobility/ind.blr/6285@taxi.becknprotocol.io.fulfillment", "vehicle": { "registration": "KA 05 3456" }, "customer": { "person": { "name": "./Rajat/Mr./Rajat/ /Kumar/" }, "contact": { "phone": "+919867654322", "email": "er.rjtkumar@gmail.com" } } }, "items": [ { "category_id": "./mobility/ind.blr/1@taxi.becknprotocol.io.category", "price": { "currency": "INR", "value": "68.7864768539762" }, "descriptor": { "images": [ "https://taxi.becknprotocol.io/resources/images/car.png" ], "code": "Premium Taxi-FuelType:Diesel,Make:Maruti,NameOfModel:Brezza,VehicleType:Premium Taxi", "name": "Premium Taxi-FuelType:Diesel,Make:Maruti,NameOfModel:Brezza,VehicleType:Premium Taxi" }, "id": "./mobility/ind.blr/17@taxi.becknprotocol.io.item", "fulfillment_id": "./mobility/ind.blr/6285@taxi.becknprotocol.io.fulfillment", "tags": { "NameOfModel": "Brezza", "VehicleType": "Premium Taxi", "Make": "Maruti", "FuelType": "Diesel" } } ], "billing": { "address": { "country": "IND", "door": "MBT", "city": "std:080", "area_code": "560078", "name": "RajatKumar", "locality": "", "building": ",A33" }, "phone": "+919867654322", "name": "./Rajat/Mr./Rajat/ /Kumar/", "email": "er.rjtkumar@gmail.com" } } } } }} public static SAMPLE_INPUT = { "data": { "id": "beckn-batch-1", "events": [ { "context": { "transaction_id": "3d3bac46-d252-4da0-9290-afdd524d0214", "country": "IND", "bpp_id": "becknify.humbhionline.in.mobility.BPP/beckn_open/app1-succinct-in", "city": "std:080", "message_id": "52dcf5a9-8986-47ff-a9d0-f380b23e3dfe", "core_version": "0.9.1", "ttl": "PT1M", "bap_id": "mobilityreferencebap.becknprotocol.io", "domain": "nic2004:60221", "bpp_uri": "https://becknify.humbhionline.in/mobility/beckn_open/app1-succinct-in/bpp", "action": "on_status", "bap_uri": "https://mobilityreferencebap.becknprotocol.io", "timestamp": "2023-02-22T19:06:27.887Z" }, "message": { "order": { "quote": { "breakup": [ { 
"price": { "currency": "INR", "value": "58.2936244525222" }, "type": "item", "title": "Fare" }, { "price": { "currency": "INR", "value": "10.492852401453995" }, "type": "item", "title": "Tax" } ], "price": { "currency": "INR", "value": "68.7864768539762" } }, "provider": { "locations": [ { "gps": "12.973437,77.608771", "id": "./mobility/ind.blr/17@taxi.becknprotocol.io.provider_location" } ], "id": "./mobility/ind.blr/7@taxi.becknprotocol.io.provider", "descriptor": { "images": [ "https://taxi.becknprotocol.io/companies/view/7" ], "name": "Best Taxies" }, "categories": [ { "id": "./mobility/ind.blr/1@taxi.becknprotocol.io.category", "descriptor": { "name": "Premium Taxi" } } ], "items": [ { "category_id": "./mobility/ind.blr/1@taxi.becknprotocol.io.category", "price": { "currency": "INR", "value": "68.7864768539762" }, "descriptor": { "images": [ "https://taxi.becknprotocol.io/resources/images/car.png" ], "code": "Premium Taxi-FuelType:Diesel,Make:Maruti,NameOfModel:Brezza,VehicleType:Premium Taxi", "name": "Premium Taxi-FuelType:Diesel,Make:Maruti,NameOfModel:Brezza,VehicleType:Premium Taxi" }, "id": "./mobility/ind.blr/17@taxi.becknprotocol.io.item", "fulfillment_id": "./mobility/ind.blr/6285@taxi.becknprotocol.io.fulfillment", "tags": { "NameOfModel": "Brezza", "VehicleType": "Premium Taxi", "Make": "Maruti", "FuelType": "Diesel" } } ] }, "id": "./mobility/ind.blr/6285@taxi.becknprotocol.io.order", "state": "Awaiting Driver acceptance", "fulfillment": { "agent": { "phone": "+919082233441", "name": "Michel MJ" }, "start": { "location": { "gps": "12.973437,77.608771" } }, "end": { "location": { "gps": "12.935193,77.624481" } }, "id": "./mobility/ind.blr/6285@taxi.becknprotocol.io.fulfillment", "vehicle": { "registration": "KA 05 3456" }, "customer": { "person": { "name": "./Rajat/Mr./Rajat/ /Kumar/" }, "contact": { "phone": "+919867654322", "email": "er.rjtkumar@gmail.com" } } }, "items": [ { "category_id": "./mobility/ind.blr/1@taxi.becknprotocol.io.category", "price": { "currency": "INR", "value": "68.7864768539762" }, "descriptor": { "images": [ "https://taxi.becknprotocol.io/resources/images/car.png" ], "code": "Premium Taxi-FuelType:Diesel,Make:Maruti,NameOfModel:Brezza,VehicleType:Premium Taxi", "name": "Premium Taxi-FuelType:Diesel,Make:Maruti,NameOfModel:Brezza,VehicleType:Premium Taxi" }, "id": "./mobility/ind.blr/17@taxi.becknprotocol.io.item", "fulfillment_id": "./mobility/ind.blr/6285@taxi.becknprotocol.io.fulfillment", "tags": { "NameOfModel": "Brezza", "VehicleType": "Premium Taxi", "Make": "Maruti", "FuelType": "Diesel" } } ], "billing": { "address": { "country": "IND", "door": "MBT", "city": "std:080", "area_code": "560078", "name": "RajatKumar", "locality": "", "building": ",A33" }, "phone": "+919867654322", "name": "./Rajat/Mr./Rajat/ /Kumar/", "email": "er.rjtkumar@gmail.com" } } } }, { "context": { "domain": "nic2004:60221", "country": "IND", "city": "std:080", "core_version": "0.9.1", "action": "track", "bap_id": "mobilityreferencebap.becknprotocol.io", "bap_uri": "https://mobilityreferencebap.becknprotocol.io", "bpp_id": "becknify.humbhionline.in.mobility.BPP/beckn_open/app1-succinct-in", "bpp_uri": "https://becknify.humbhionline.in/mobility/beckn_open/app1-succinct-in/bpp", "transaction_id": "3d3bac46-d252-4da0-9290-afdd524d0214", "message_id": "b52878f3-28ed-4c31-8ebb-8989f33c3220", "timestamp": "2023-02-22T19:07:07.887Z", "ttl": "PT1M" }, "message": { "order_id": "./mobility/ind.blr/6285@taxi.becknprotocol.io.order" } } ] } } } class TestDataset { diff --git 
diff --git a/api-service/src/test/IngestorTestService.spec.ts b/api-service/src/test/IngestorTestService.spec.ts
index 8c1a78eb..146f4af1 100644
--- a/api-service/src/test/IngestorTestService.spec.ts
+++ b/api-service/src/test/IngestorTestService.spec.ts
@@ -25,7 +25,7 @@ describe("DATA INGEST API", () => {
             return Promise.resolve([ {} ])
         })
         chai.spy.on(globalCache, 'get', () => {
-            return [ { "id": ":datasetId", "dataset_config": { "entry_topic": "topic" } } ]
+            return [ { "id": ":datasetId", "dataset_config": { "entry_topic": "topic" }, "extraction_config": { "is_batch_event": true, "extraction_key": "events", "batch_id": "id" } } ]
         })
         chai.spy.on(kafkaConnector.telemetryService, "dispatch", () => {
             return Promise.resolve("data ingested")
@@ -47,7 +47,34 @@ describe("DATA INGEST API", () => {
             done()
         })
     });
-    it("it should not ingest data successfully", (done) => {
+    it("it should ingest data successfully for batch even for individual events", (done) => {
+        chai.spy.on(dbConnector, "listRecords", () => {
+            return Promise.resolve([ {} ])
+        })
+        chai.spy.on(globalCache, 'get', () => {
+            return [ { "id": ":datasetId", "dataset_config": { "entry_topic": "topic" }, "extraction_config": { "is_batch_event": true, "extraction_key": "events", "batch_id": "id" } } ]
+        })
+        chai.spy.on(kafkaConnector.telemetryService, "dispatch", () => {
+            return Promise.resolve("data ingested")
+        })
+        chai
+            .request(app)
+            .post(config.apiDatasetIngestEndPoint)
+            .send(TestDataIngestion.SAMPLE_INDIVIDUAL_EVENT)
+            .end((err, res) => {
+                res.should.have.status(httpStatus.OK);
+                res.body.should.be.a("object");
+                res.body.responseCode.should.be.eq(httpStatus[ "200_NAME" ]);
+                res.body.should.have.property("result");
+                res.body.id.should.be.eq(routesConfig.data_ingest.api_id);
+                res.body.params.status.should.be.eq(constants.STATUS.SUCCESS)
+                chai.spy.restore(dbConnector, "listRecords")
+                chai.spy.restore(globalCache, 'get')
+                chai.spy.restore(kafkaConnector.telemetryService, "dispatch")
+                done()
+            })
+    });
+    it("it should not ingest data successfully when kafka is unable to connect", (done) => {
         chai.spy.on(dbConnector, "listRecords", () => {
             return Promise.resolve([ {} ])
         })
@@ -62,9 +89,33 @@ describe("DATA INGEST API", () => {
             .post(config.apiDatasetIngestEndPoint)
             .send(TestDataIngestion.SAMPLE_INPUT)
             .end((err, res) => {
-                res.should.have.status(httpStatus.INTERNAL_SERVER_ERROR);
+                res.should.have.status(httpStatus.BAD_REQUEST);
                 res.body.should.be.a("object");
-                res.body.responseCode.should.be.eq(httpStatus[ "500_NAME" ]);
+                res.body.responseCode.should.be.eq(httpStatus[ "400_NAME" ]);
+                res.body.should.have.property("result");
+                res.body.id.should.be.eq(routesConfig.data_ingest.api_id);
+                res.body.params.status.should.be.eq(constants.STATUS.FAILURE)
+                chai.spy.restore(dbConnector, "listRecords")
+                chai.spy.restore(globalCache, 'get')
+                chai.spy.restore(kafkaConnector.telemetryService, "dispatch")
+                done()
+            })
+    });
+    it("it should not ingest data when invalid extraction config present for batch", (done) => {
+        chai.spy.on(dbConnector, "listRecords", () => {
+            return Promise.resolve([ {} ])
+        })
+        chai.spy.on(globalCache, 'get', () => {
+            return [ { "id": ":datasetId", "dataset_config": { "entry_topic": "topic" }, "extraction_config": { "is_batch_event": true, "extraction_key": "eventas", "batch_id": "ids" } } ]
+        })
+        chai
+            .request(app)
+            .post(config.apiDatasetIngestEndPoint)
+            .send(TestDataIngestion.SAMPLE_INPUT)
+            .end((err, res) => {
+                res.should.have.status(httpStatus.BAD_REQUEST);
+                res.body.should.be.a("object");
+                res.body.responseCode.should.be.eq(httpStatus[ "400_NAME" ]);
                 res.body.should.have.property("result");
                 res.body.id.should.be.eq(routesConfig.data_ingest.api_id);
                 res.body.params.status.should.be.eq(constants.STATUS.FAILURE)
@@ -221,4 +272,4 @@ describe("SUBMIT INGESTION ERROR SCENARIOS", ()=>{
         done()
     })
 })
-})
\ No newline at end of file
+})
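Taken together, the updated assertions pin down the failure envelope the API now returns for bad ingest payloads: HTTP 400 instead of the previous blanket 500. As a rough sketch of that envelope (field names are taken from the assertions above; the concrete id and status strings are placeholders, since routesConfig and constants are defined elsewhere in the repo and not visible in this diff):

```typescript
// Rough sketch of the failure body the updated tests assert on.
const expectedFailureBody = {
    id: "<routesConfig.data_ingest.api_id>",        // asserted via res.body.id
    params: { status: "<constants.STATUS.FAILURE>" }, // asserted via res.body.params.status
    responseCode: "BAD_REQUEST",                    // httpStatus["400_NAME"]
    result: {}                                      // asserted only to exist
};
```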