From 475eb42b95bae477abcf85be6bb51aada68276f4 Mon Sep 17 00:00:00 2001 From: Shiva Kayathi Date: Thu, 9 Jan 2025 12:16:46 -0500 Subject: [PATCH] mws-3953 -- bulk api with subscribers and topic details --- .env-example | 6 +- Dockerfile | 5 +- controllers/bulkApiMailer.js | 153 +++++++++++++++++++++++++++++++++ controllers/mailing.js | 38 +++++--- controllers/subscriptions.js | 2 +- controllers/workerSendEmail.js | 2 + docker-compose.yml | 34 +++++--- notifyQueue.js | 2 +- package.json | 4 +- server.js | 8 +- setup.md | 3 +- 11 files changed, 225 insertions(+), 32 deletions(-) create mode 100644 controllers/bulkApiMailer.js diff --git a/.env-example b/.env-example index 7ef5042..2793378 100644 --- a/.env-example +++ b/.env-example @@ -28,7 +28,7 @@ password=password keySalt=salt validHosts=["localhost:8080"] - +BASE_URL="http://localhost:8080" # Setting require to enable 50k more download CDS_NOTIFY_END_POINT= @@ -40,10 +40,12 @@ AWS_ACCESS_KEY= AWS_SECRET_ACCESS_KEY= AWS_BUCKET= +# used by bulk api, make sure has a space at end +BULK_GC_NOTIFY_PREPEND="ApiKey-v1 " # REDIS REDIS_ENV=stage -REDIS_URI=x-notify-redis +REDIS_URI=notify-redis-1 REDIS_PORT=6379 REDIS_SENTINEL_1_URI=127.0.0.1 REDIS_SENTINEL_1_PORT=26379 diff --git a/Dockerfile b/Dockerfile index 0512be5..a8bc06e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -9,11 +9,12 @@ COPY package*.json ./ RUN npm install -g nodemon RUN npm install +RUN export NODE_OPTIONS=--max_old_space_size=4096 #4GB COPY . . -WORKDIR ./x-notify/ - COPY ./.env-example ./.env +WORKDIR ./ + CMD [ "npm", "start" ] diff --git a/controllers/bulkApiMailer.js b/controllers/bulkApiMailer.js new file mode 100644 index 0000000..5a83222 --- /dev/null +++ b/controllers/bulkApiMailer.js @@ -0,0 +1,153 @@ +const fetch = require('node-fetch'); +const mailingManager = require('./mailing'); +const mailSend = require("../helpers/mailSend"); + +const Queue = require('bull'); +const redisUri = process.env.REDIS_URI || 'notify-redis-1'; +const redisPort = process.env.REDIS_PORT || '6379'; +const redisSentinel1Uri = process.env.REDIS_SENTINEL_1_URI || '127.0.0.1'; +const redisSentinel1Port = process.env.REDIS_SENTINEL_1_PORT || '26379'; +const redisSentinel2Uri = process.env.REDIS_SENTINEL_2_URI || '127.0.0.1'; +const redisSentinel2Port = process.env.REDIS_SENTINEL_2_PORT || '26379'; +const redisMasterName = process.env.REDIS_MASTER_NAME || 'x-notify-master'; + +var maxCompletedJobs = process.env.COMPLETED_JOBS_TO_KEEP || 300; +const BASE_URL = process.env.BASE_URL || "https://apps.canada.ca/x-notify"; +const bulkAPI = "https://api.notification.canada.ca/v2/notifications/bulk"; +const BULK_GC_NOTIFY_PREPEND = process.env.BULK_GC_NOTIFY_PREPEND || "ApiKey-v1 "; + +let redisConf = {}; +if (process.env.NODE_ENV === 'prod') { + redisConf = { + redis: { + sentinels: [ + { host: redisSentinel1Uri, port: redisSentinel1Port }, + { host: redisSentinel2Uri, port: redisSentinel2Port } + ], + name: redisMasterName, + host: redisUri, + port: redisPort + } + } +} else { + redisConf = { + redis: { + host: redisUri, + port: redisPort, + } + } +} + +const bulkQueue = new Queue('bulk-api', redisConf); +exports.bulkQueue = bulkQueue; + +// Process jobs +bulkQueue.process(async (job) => { + try { + let mailingState = mailingManager.mailingState; + let jobData = job.data; + + // Making the Bulk API POST request + let response = await fetch(bulkAPI, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + "Authorization" : BULK_GC_NOTIFY_PREPEND + jobData.notifyKey + }, + body: JSON.stringify( jobData.bulkEmailBody ), + }).then(response => { + if (!response.ok) { + throw new Error(`HTTP Error Status: ${response.status}`); + } + return response.json(); + }) + .then( result => { + mailingManager.mailingUpdate( jobData.mailingId, mailingState.sent, { historyState: mailingState.sending } ); + }) + + } catch (error) { + if (error.message.includes('HTTP Error Status: 5')) { + throw new Error('Retryable error'); // Ensures Bull retries + } + } +}); + +// Listen for failures +bulkQueue.on('failed', (job, err) => { + console.error(`Job ${job.id} failed: ${err.message}`); +}); + +exports.sendBulkEmails = async ( mailingId, topicId, subject, mailingBody ) => { + try { + let mailing_name = "Bulk_email-" + topicId; + let mailingTopic = await mailingManager.getTopic( topicId ); + + if ( !mailingTopic ) { + console.log( " Bulkmailer -- sendBulkEmails mailingTopic" ); + console.log( e ); + throw new Error( "Bulkmailer sendBulkEmails: Can't find the topic: " + topicId ); + } + + if ( !mailingTopic.nTemplateMailingId || !mailingTopic.notifyKey ) { + console.log( " Bulkmailer -- sendBulkEmails : check mailingTopic details" ); + console.log( e ); + throw new Error( "Bulkmailer sendBulkEmails: Can't find the topic: " + topicId ); + } + + let subscribers = await mailSend.getConfirmedSubscriberAsArray( topicId ); + if ( !subscribers.length) { + console.log( " Bulkmailer -- sendBulkEmails : No subscribers" ); + console.log( e ); + throw new Error( "Bulkmailer sendBulkEmails: No subscribers for the topic: " + topicId ); + } + + let formattedSubsArray = await formatSubsArray( subscribers, mailingBody, subject); + let bulkEmailBody = { + "name": mailing_name, + "template_id": mailingTopic.nTemplateMailingId, + "rows": formattedSubsArray + } + + bulkQueue.add( + { + bulkEmailBody: bulkEmailBody, + notifyKey: mailingTopic.notifyKey, + mailingId: mailingId, + }, + { + attempts: 5, // Maximum number of retries + backoff: { + type: 'exponential', // Use exponential backoff + delay: 5000 // Initial delay of 1 second (doubles each retry) + } + } + ); + } catch (err) { + throw Error('sendBulkEmails error: ' + err, 500) + } +} + +formatSubsArray = async ( listEmail, mailingBody, subject) => { + + let i, i_len = listEmail.length, subscriber; + let subsArray = [ + ["subject", "email address", "body", "unsub_link"] + ] + for( i = 0; i !== i_len; i++) { + subscriber = listEmail[ i ]; + + const { email, _id } = subscriber; + + const userCodeUrl = ( _id ? _id.toHexString() : _id ); + + if ( !email || !userCodeUrl ) { + continue; + } + + let unsub_link = BASE_URL + "/subs/remove/" + userCodeUrl + subsArray.push( [subject, email, mailingBody, unsub_link] ) + } + + return subsArray + +} diff --git a/controllers/mailing.js b/controllers/mailing.js index eda1ae8..b6aac29 100644 --- a/controllers/mailing.js +++ b/controllers/mailing.js @@ -11,6 +11,7 @@ const dbConn = module.parent.parent.exports.dbConn; const ObjectId = require('mongodb').ObjectId; const { Worker } = require('worker_threads'); +const bulkApiMailer = require('./bulkApiMailer'); const _mailingState = { cancelled: "cancelled", @@ -232,20 +233,33 @@ exports.mailingCancelSendToSub = async ( mailingId ) => { } exports.mailingSendToSub = async ( mailingId ) => { - // Need to be in current state "approved" + let mailing = await dbConn.collection( "mailing" ).findOne( { _id: ObjectId( mailingId ) } ); + if ( !mailing ) { + console.log( "mailingSendToSub: Invalid mailing id: " + mailingId ); + throw new Error( "mailingSendToSub: Mailing unavailable" ); + } - const rDoc = await mailingUpdate( mailingId, _mailingState.sending, { historyState: _mailingState.approved } ); - + let topic = await getTopic( mailing.topicId ); + if ( !topic ) { + console.log( "mailingSendToSub: no topic: " + mailing.topicId ); + throw Error( "mailingSendToSub : no topic with topicId: " + mailing.topicId); + } + // Need to be in current state "approved" + const rDoc = await mailingUpdate( mailingId, _mailingState.sending, { historyState: _mailingState.approved } ); // Check if the operation was successful, if not we know the error is already logged if ( !rDoc ) { return true; } - - // Do the sending - sendMailingToSubs( mailingId, rDoc.topicId, rDoc.subject, rDoc.body ); - - + + //if the bulkMail flag is set emails are delivered using bulk api + if ( topic.bulkMail ) { + bulkApiMailer.sendBulkEmails( mailingId, rDoc.topicId, rDoc.subject, rDoc.body ); + } else { + // Do the sending + sendMailingToSubs( mailingId, rDoc.topicId, rDoc.subject, rDoc.body ); + } + // When completed, change state to "sent" } @@ -389,7 +403,7 @@ async function mailingUpdate( mailingId, newHistoryState, options ) { // Send the mailing to the "approval email list" return rDoc.value; } - +exports.mailingUpdate = mailingUpdate; // Simple worker to send mailing async function sendMailingToSubs ( mailingId, topicId, mailingSubject, mailingBody ) { @@ -492,7 +506,8 @@ getTopic = ( topicId ) => { unsubURL: 1, thankURL: 1, failURL: 1, - inputErrURL: 1 + inputErrURL: 1, + bulkMail: 1, } } ).catch( (e) => { console.log( "getTopic" ); @@ -512,4 +527,5 @@ getTopic = ( topicId ) => { return topic; -} \ No newline at end of file +} +exports.getTopic = getTopic diff --git a/controllers/subscriptions.js b/controllers/subscriptions.js index 422023c..57c6a5a 100644 --- a/controllers/subscriptions.js +++ b/controllers/subscriptions.js @@ -34,7 +34,7 @@ const processEnv = process.env, _subsLinkSuffix = processEnv.subsLinkSuffix || "853e0212b92a127"; -const redisUri = process.env.REDIS_URI || 'x-notify-redis'; +const redisUri = process.env.REDIS_URI || 'notify-redis-1'; const redisPort = process.env.REDIS_PORT || '6379'; const redisSentinel1Uri = process.env.REDIS_SENTINEL_1_URI || '127.0.0.1'; const redisSentinel1Port = process.env.REDIS_SENTINEL_1_PORT || '26379'; diff --git a/controllers/workerSendEmail.js b/controllers/workerSendEmail.js index d20abfe..04f0cb6 100644 --- a/controllers/workerSendEmail.js +++ b/controllers/workerSendEmail.js @@ -287,3 +287,5 @@ getConfirmedSubscriberAsArray = async ( topicId ) => { return docsItems; }; + +exports.getConfirmedSubscriberAsArray = getConfirmedSubscriberAsArray; \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 4a5d3d9..cd5534d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,38 +1,48 @@ version: '3.8' services: - mongo: + mongo-1: image: mongo:4.2 - container_name: x-notify-mongo + container_name: mongo ports: - target: 27017 published: 27016 protocol: tcp mode: host networks: - - x-notify-net - redis: + - notify-net-1 + volumes: + - mongo_data:/data/db # Persistent volume for MongoDB + - mongo_config:/data/configdb # Configuration volume + + notify-redis-1: image: redis:6.0.1 - container_name: x-notify-redis + container_name: notify-redis-1 ports: - "6379:6379" networks: - - x-notify-net - x-notify: + - notify-net-1 + notify-node-1: build: ./ - container_name: x-notify + container_name: notify-node-1 ports: - "8080:8080" restart: on-failure environment: - MONGODB_URI=mongodb://mongo:27017/test - NODE_ENV=development + env_file: ".env" depends_on: - - mongo + - mongo-1 + - notify-redis-1 volumes: - - .:/x-notify + - .:/notify-1 - /node_modules networks: - - x-notify-net + - notify-net-1 +volumes: + mongo_data: + mongo_config: + networks: - x-notify-net: + notify-net-1: driver: bridge diff --git a/notifyQueue.js b/notifyQueue.js index 329d386..2ba14e4 100644 --- a/notifyQueue.js +++ b/notifyQueue.js @@ -3,7 +3,7 @@ const { createBullBoard } = require('@bull-board/api'); const { BullAdapter } = require('@bull-board/api/bullAdapter'); const { ExpressAdapter } = require('@bull-board/express'); -const redisUri = process.env.REDIS_URI || 'x-notify-redis'; +const redisUri = process.env.REDIS_URI || 'notify-redis-1'; const redisPort = process.env.REDIS_PORT || '6379'; const redisSentinel1Uri = process.env.REDIS_SENTINEL_1_URI || '127.0.0.1'; const redisSentinel1Port = process.env.REDIS_SENTINEL_1_PORT || '26379'; diff --git a/package.json b/package.json index 69c06ce..c2edeb4 100644 --- a/package.json +++ b/package.json @@ -10,7 +10,8 @@ "author": "Government of Canada", "contributors": [ "duboisp", - "GormFrank" + "GormFrank", + "shiva-sc" ], "scripts": { "start": "nodemon -L server.js", @@ -37,6 +38,7 @@ "mustache": "^4.0.1", "nodemailer": "^6.4.6", "notifications-node-client": "^4.8.0", + "node-fetch": "^2.7.0", "passport": "^0.4.1", "passport-http": "^0.3.0", "passport-local": "^1.0.0" diff --git a/server.js b/server.js index 3f02148..9cf46bf 100644 --- a/server.js +++ b/server.js @@ -79,7 +79,8 @@ MongoClient.connect( processEnv.MONGODB_URI || '', {useUnifiedTopology: true} ). const adminController = require('./controllers/admin'); const mailingController = require('./controllers/mailing_view'); const userController = require('./controllers/user'); - + const bulkApiController = require('./controllers/bulkApiMailer'); + /** * Express configuration. @@ -248,6 +249,11 @@ MongoClient.connect( processEnv.MONGODB_URI || '', {useUnifiedTopology: true} ). userController.isAuthenticated, mailingController.v_mailingGetTopicStats); + //bulk-api + app.post('/v2/bulk-mail', + //userController.isAuthenticated, + bulkApiController.sendBulkEmails); + /** * SMTP Mail routes. */ diff --git a/setup.md b/setup.md index bbca47c..4bf4884 100644 --- a/setup.md +++ b/setup.md @@ -143,7 +143,7 @@ Note: We need to set the Service ID associated to the topic details (field: `nSe ### REDIS Default Configuration * `REDIS_ENV` Set environment value for Redis. Default: `stage` and `prod` which would leverage the redis-sentinel in production environment -* `REDIS_URI` Redis URI, the alias or the IP of the server host. Default: `x-notify-redis` +* `REDIS_URI` Redis URI, the alias or the IP of the server host. Default: `notify-redis-1` * `REDIS_PORT` Port of Redis server. Default: `6379` * `REDIS_SENTINEL_1_URI` Redis Sentinel 1 URI. Default: `127.0.0.1` * `REDIS_SENTINEL_1_PORT` Redis Sentinel 1 PORT. Default: `26379` @@ -168,6 +168,7 @@ topics * failURL: Failure URL for server error, * inputErrURL: Failure URL for filling out the form incorrectly * nTemplateMailingId: template ID for sending a corresponding mailing +* bulkMail: When set to true, enables usage bulk api from GCNotify topics_details