diff --git a/lib/lambda/delayedEmailProcessor.ts b/lib/lambda/delayedEmailProcessor.ts new file mode 100644 index 000000000..6436a3587 --- /dev/null +++ b/lib/lambda/delayedEmailProcessor.ts @@ -0,0 +1,338 @@ +import { SQSEvent } from "aws-lambda"; +import { SESClient, SendEmailCommand, SendEmailCommandInput } from "@aws-sdk/client-ses"; +import { EmailAddresses, KafkaRecord, Events, SEATOOL_STATUS, opensearch } from "shared-types"; +import { decodeBase64WithUtf8, getSecret } from "shared-utils"; +import { getEmailTemplates, getAllStateUsers } from "libs/email"; +import * as os from "libs/opensearch-lib"; +import { EMAIL_CONFIG, getCpocEmail, getSrtEmails } from "libs/email/content/email-components"; +import { htmlToText, HtmlToTextOptions } from "html-to-text"; +import pLimit from "p-limit"; +import { getOsNamespace } from "libs/utils"; + +interface ProcessEmailConfig { + emailAddressLookupSecretName: string; + applicationEndpointUrl: string; + osDomain: string; + indexNamespace?: string; + region: string; + DLQ_URL: string; + userPoolId: string; + configurationSetName: string; + isDev: boolean; +} + +/** + * Main SQS handler: parse each SQS message to get the original Kafka payload, + * then do OpenSearch lookups and send emails. + */ +export const handler = async (event: SQSEvent) => { + const requiredEnvVars = [ + "emailAddressLookupSecretName", + "applicationEndpointUrl", + "osDomain", + "region", + "DLQ_URL", + "userPoolId", + "isDev", + "configurationSetName", + ] as const; + + const missing = requiredEnvVars.filter((e) => !process.env[e]); + if (missing.length > 0) { + throw new Error(`Missing environment variables: ${missing.join(", ")}`); + } + + const emailConfig: ProcessEmailConfig = { + emailAddressLookupSecretName: process.env.emailAddressLookupSecretName!, + applicationEndpointUrl: process.env.applicationEndpointUrl!, + osDomain: `https://${process.env.osDomain!}`, + indexNamespace: process.env.indexNamespace, + region: process.env.region!, + DLQ_URL: process.env.DLQ_URL!, + userPoolId: process.env.userPoolId!, + configurationSetName: process.env.configurationSetName!, + isDev: process.env.isDev === "true", + }; + + for (const sqsRecord of event.Records) { + try { + const kafkaRecord = JSON.parse(sqsRecord.body) as KafkaRecord; + + const { key, value } = kafkaRecord; // sanity check + if (!key || !value) { + console.log("No key/value present. Possibly a tombstone or invalid data."); + continue; + } + + await processRecord(kafkaRecord, emailConfig); + } catch (error) { + console.error("Error processing SQS record:", error); + throw error; // Let Lambda handle retries / DLQ + } + } +}; + +/** + * Takes a KafkaRecord, decodes data, and triggers the actual + * "processAndSendEmails" function. + */ +async function processRecord(kafkaRecord: KafkaRecord, config: ProcessEmailConfig) { + console.log("Processing record from SQS => KafkaRecord:", JSON.stringify(kafkaRecord, null, 2)); + + const { key, value, timestamp } = kafkaRecord; + const id: string = decodeBase64WithUtf8(key); // decode the key + + if (kafkaRecord.topic === "aws.seatool.ksql.onemac.three.agg.State_Plan") { + const safeID = id.replace(/^"|"$/g, ""); + const seatoolRecord: Document = { + safeID, + ...JSON.parse(decodeBase64WithUtf8(value)), + }; + const safeSeatoolRecord = opensearch.main.seatool.transform(safeID).safeParse(seatoolRecord); + + if (safeSeatoolRecord.data?.seatoolStatus === SEATOOL_STATUS.WITHDRAWN) { + try { + const item = await os.getItem(config.osDomain, getOsNamespace("main"), safeID); + + if (!item?.found || !item?._source) { + console.log(`The package was not found for id: ${id} in mako. Doing nothing.`); + return; + } + + const recordToPass = { + timestamp, + ...safeSeatoolRecord.data, + submitterName: item._source.submitterName, + submitterEmail: item._source.submitterEmail, + event: "seatool-withdraw" as const, + proposedEffectiveDate: safeSeatoolRecord.data?.proposedDate, + origin: "seatool", + attachments: {}, + authority: item._source.authority, + } as unknown as Events[keyof Events]; + + await processAndSendEmails(recordToPass, safeID, config); + } catch (error) { + console.error("Error processing record:", JSON.stringify(error, null, 2)); + throw error; + } + } + return; + } + + if (!value) { + console.log("Tombstone detected, nothing to do."); + return; + } + + const recordBody = JSON.parse(decodeBase64WithUtf8(value)); + + // Example check: only process if origin is "mako" + if (recordBody.origin !== "mako") { + console.log("Kafka event is not of 'mako' origin. Skipping."); + return; + } + + // Combine the original fields for passing to the next step + const eventObj = { + ...recordBody, + timestamp, + }; + + try { + await processAndSendEmails(eventObj as Events[keyof Events], id, config); + } catch (error) { + console.error("Error in processAndSendEmails:", error); + throw error; + } +} + +/** + * Main email processing function: + * 1. Retrieves the relevant email templates + * 2. Loads user data & secrets + * 3. Queries OpenSearch for the item + * 4. Renders & sends emails with concurrency limiting + */ +export async function processAndSendEmails( + record: Events[keyof Events], + id: string, + config: ProcessEmailConfig, +) { + const templates = await getEmailTemplates(record); + if (!templates) { + console.log(`No email templates for event '${record.event}'. Skipping.`); + return; + } + + const territory = id.slice(0, 2); + const allStateUsers = await getAllStateUsers({ + userPoolId: config.userPoolId, + state: territory, + }); + + const secretString = await getSecret(config.emailAddressLookupSecretName); + const emails: EmailAddresses = JSON.parse(secretString); + + // Retrieve package from OpenSearch + const item = await os.getItem(config.osDomain, getOsNamespace("main"), id); + if (!item?.found || !item?._source) { + console.log(`OpenSearch item not found for id: ${id}. Skipping.`); + return; + } + + const cpocEmail = getCpocEmail(item); + const srtEmails = getSrtEmails(item); + + // Gather variables to pass into templates + const templateVariables = { + ...record, + id, + territory, + applicationEndpointUrl: config.applicationEndpointUrl, + emails: { + ...emails, + cpocEmail, + srtEmails, + }, + allStateUsersEmails: allStateUsers.map((user) => user.formattedEmailAddress), + }; + + console.log("Template variables:", JSON.stringify(templateVariables, null, 2)); + + // Concurrency limit for sending + const limit = pLimit(5); + + const sendTasks = templates.map((tmpl) => + limit(async () => { + const filledTemplate = await tmpl(templateVariables); + validateEmailTemplate(filledTemplate); + const params = createEmailParams( + filledTemplate, + emails.sourceEmail, + config.applicationEndpointUrl, + config.isDev, + config.configurationSetName, + ); + return sendEmail(params, config.region); + }), + ); + + try { + await Promise.all(sendTasks); + console.log("All emails sent successfully for id:", id); + } catch (err) { + console.error("Email sending failed:", err); + throw err; + } +} + +/** + * Ensure the template has the necessary fields before sending. + */ +export function validateEmailTemplate(tmpl: { + to: string[]; + subject: string; + body: string; + cc?: string[]; +}) { + const requiredFields: (keyof typeof tmpl)[] = ["to", "subject", "body"]; + const missing = requiredFields.filter((f) => !tmpl[f]); + if (missing.length > 0) { + throw new Error(`Email template missing fields: ${missing.join(", ")}`); + } +} + +/** + * Create the SES params. Optionally add BCC in dev. + */ +export function createEmailParams( + filledTemplate: { + to: string[]; + subject: string; + body: string; + cc?: string[]; + }, + sourceEmail: string, + baseUrl: string, + isDev: boolean, + configurationSetName?: string, +): SendEmailCommandInput { + return { + Destination: { + ToAddresses: filledTemplate.to, + CcAddresses: filledTemplate.cc || [], + BccAddresses: isDev ? [`Dev BCC <${EMAIL_CONFIG.DEV_EMAIL}>`] : [], + }, + Message: { + Body: { + Html: { + Data: filledTemplate.body, + Charset: "UTF-8", + }, + Text: { + Data: htmlToText(filledTemplate.body, htmlToTextOptions(baseUrl)), + Charset: "UTF-8", + }, + }, + Subject: { + Data: filledTemplate.subject, + Charset: "UTF-8", + }, + }, + Source: sourceEmail, + ConfigurationSetName: configurationSetName, + }; +} + +/** + * Send the email via AWS SES. + */ +export async function sendEmail(params: SendEmailCommandInput, region: string) { + const sesClient = new SESClient({ region }); + const command = new SendEmailCommand(params); + const result = await sesClient.send(command); + console.log("SES send result:", result); + return result; +} + +/** + * Options for converting HTML to text + */ +export function htmlToTextOptions(baseUrl: string): HtmlToTextOptions { + return { + wordwrap: 80, + preserveNewlines: true, + selectors: [ + { + selector: "h1", + options: { uppercase: true, leadingLineBreaks: 2, trailingLineBreaks: 1 }, + }, + { + selector: "img", + options: { ignoreHref: true, src: true }, + }, + { + selector: "p", + options: { leadingLineBreaks: 1, trailingLineBreaks: 1 }, + }, + { + selector: "a", + options: { + linkBrackets: ["[", "]"], + baseUrl, + hideLinkHrefIfSameAsText: true, + }, + }, + ], + limits: { + maxInputLength: 50000, + ellipsis: "...", + maxBaseElements: 1000, + }, + longWordSplit: { + forceWrapOnLimit: false, + wrapCharacters: ["-", "/"], + }, + }; +} diff --git a/lib/lambda/delayedEmailProcessorHandler.test.ts b/lib/lambda/delayedEmailProcessorHandler.test.ts new file mode 100644 index 000000000..dfa6beb81 --- /dev/null +++ b/lib/lambda/delayedEmailProcessorHandler.test.ts @@ -0,0 +1,283 @@ +import { describe, it, expect } from "vitest"; +import { SQSEvent } from "aws-lambda"; +import { handler } from "./delayedEmailProcessor"; +import { KafkaRecord } from "shared-types"; +import { Authority } from "shared-types"; +import { SIMPLE_ID, WITHDRAW_RAI_ITEM_B, WITHDRAW_RAI_ITEM_C } from "mocks"; +import { uploadSubsequentDocuments } from "lib/libs/email/content"; +const nms = "new-medicaid-submission"; +const ncs = "new-chip-submission"; +const tempExtension = "temporary-extension"; +const withdrawPackage = "withdraw-package"; +const contractingInitial = "contracting-initial"; +const capitatedInitial = "capitated-initial"; +const withdrawRai = "withdraw-rai"; +const respondToRai = "respond-to-rai"; +const appk = "app-k"; + +it.each([ + [ + `should send an email for ${respondToRai} with ${Authority.MED_SPA}`, + Authority.MED_SPA, + respondToRai, + SIMPLE_ID, + ], + [ + `should send an email for ${respondToRai} with ${Authority.CHIP_SPA}`, + Authority.CHIP_SPA, + respondToRai, + SIMPLE_ID, + ], + [ + `should send an email for ${respondToRai} with ${Authority["1915b"]}`, + Authority["1915b"], + respondToRai, + SIMPLE_ID, + ], + [ + `should send an email for ${respondToRai} with ${Authority["1915c"]}`, + Authority["1915c"], + respondToRai, + SIMPLE_ID, + ], + [`should send an email for ${nms} with ${Authority.MED_SPA}`, Authority.MED_SPA, nms, SIMPLE_ID], + [ + `should send an email for ${nms} with ${Authority.CHIP_SPA}`, + Authority.CHIP_SPA, + nms, + SIMPLE_ID, + ], + [ + `should send an email for ${nms} with ${Authority["1915b"]}`, + Authority["1915b"], + nms, + SIMPLE_ID, + ], + [ + `should send an email for ${nms} with ${Authority["1915c"]}`, + Authority["1915c"], + nms, + SIMPLE_ID, + ], + [`should send an email for ${ncs} with ${Authority.MED_SPA}`, Authority.MED_SPA, ncs, SIMPLE_ID], + [ + `should send an email for ${ncs} with ${Authority.CHIP_SPA}`, + Authority.CHIP_SPA, + ncs, + SIMPLE_ID, + ], + [ + `should send an email for ${ncs} with ${Authority["1915b"]}`, + Authority["1915b"], + ncs, + SIMPLE_ID, + ], + [ + `should send an email for ${ncs} with ${Authority["1915c"]}`, + Authority["1915c"], + ncs, + SIMPLE_ID, + ], + [ + `should send an email for ${tempExtension} with ${Authority["1915b"]}`, + Authority["1915b"], + tempExtension, + SIMPLE_ID, + ], + [ + `should send an email for ${tempExtension} with ${Authority["1915c"]}`, + Authority["1915c"], + tempExtension, + SIMPLE_ID, + ], + [ + `should send an email for ${withdrawPackage} with ${Authority.MED_SPA}`, + Authority.MED_SPA, + withdrawPackage, + SIMPLE_ID, + ], + [ + `should send an email for ${withdrawPackage} with ${Authority.CHIP_SPA}`, + Authority.CHIP_SPA, + withdrawPackage, + SIMPLE_ID, + ], + [ + `should send an email for ${withdrawPackage} for ${ncs} with ${Authority["1915b"]}`, + Authority["1915b"], + withdrawPackage, + SIMPLE_ID, + ], + [ + `should send an email for ${appk} with ${Authority["1915c"]}`, + Authority["1915c"], + appk, + SIMPLE_ID, + ], + [ + `should send an email for ${contractingInitial} with ${Authority.MED_SPA}`, + Authority.MED_SPA, + contractingInitial, + SIMPLE_ID, + ], + [ + `should send an email for ${contractingInitial} with ${Authority.CHIP_SPA}`, + Authority.CHIP_SPA, + contractingInitial, + SIMPLE_ID, + ], + [ + `should send an email for ${contractingInitial} with ${Authority["1915b"]}`, + Authority["1915b"], + contractingInitial, + SIMPLE_ID, + ], + [ + `should send an email for ${contractingInitial} with ${Authority["1915c"]}`, + Authority["1915c"], + contractingInitial, + SIMPLE_ID, + ], + [ + `should send an email for ${capitatedInitial} with ${Authority.MED_SPA}`, + Authority.MED_SPA, + capitatedInitial, + SIMPLE_ID, + ], + [ + `should send an email for ${capitatedInitial} with ${Authority.CHIP_SPA}`, + Authority.CHIP_SPA, + capitatedInitial, + SIMPLE_ID, + ], + [ + `should send an email for ${capitatedInitial} with ${Authority["1915b"]}`, + Authority["1915b"], + capitatedInitial, + SIMPLE_ID, + ], + [ + `should send an email for ${appk} with ${Authority["1915c"]}`, + Authority["1915c"], + appk, + SIMPLE_ID, + ], + [ + `should send an email for ${appk} with ${Authority["1915b"]}`, + Authority["1915b"], + appk, + SIMPLE_ID, + ], + [ + `should send an email for ${withdrawRai} with ${Authority["1915b"]}`, + Authority["1915b"], + withdrawRai, + WITHDRAW_RAI_ITEM_B, + ], + [ + `should send an email for ${withdrawRai} with ${Authority["1915c"]}`, + Authority["1915c"], + withdrawRai, + WITHDRAW_RAI_ITEM_C, + ], + [ + `should send an email for ${uploadSubsequentDocuments} with ${Authority.CHIP_SPA}`, + Authority.CHIP_SPA, + uploadSubsequentDocuments, + WITHDRAW_RAI_ITEM_B, + ], + [ + `should send an email for ${uploadSubsequentDocuments} with ${Authority["1915c"]}`, + Authority["1915c"], + uploadSubsequentDocuments, + WITHDRAW_RAI_ITEM_C, + ], +])("%s", async (_, auth, eventType, id) => { + const mockEvent: SQSEvent = { + Records: [ + { + messageId: "test-message-id", + receiptHandle: "test-receipt-handle", + body: JSON.stringify({ + key: Buffer.from(id).toString("base64"), + value: Buffer.from( + JSON.stringify({ + origin: "mako", + event: eventType, + authority: auth, + }), + ).toString("base64"), + headers: {}, + timestamp: 1732645041557, + offset: "0", + partition: 0, + topic: "mock-topic", + }), + attributes: { + ApproximateReceiveCount: "1", + SentTimestamp: "1732645041557", + SenderId: "test-sender", + ApproximateFirstReceiveTimestamp: "1732645041557", + }, + messageAttributes: {}, + md5OfBody: "test-md5", + eventSource: "aws:sqs", + eventSourceARN: "test:arn", + awsRegion: "us-east-1", + }, + ], + }; + await handler(mockEvent); +}); +describe("process emails Handler failures", () => { + it.each([ + [ + `should send an email for ${withdrawRai} with ${Authority["1915b"]} and fail due to not finding it`, + Authority["1915b"], + withdrawRai, + SIMPLE_ID, + ], + [ + `should send an email for ${withdrawRai} with ${Authority["1915c"]} and fail due to not finding it`, + Authority["1915c"], + withdrawRai, + SIMPLE_ID, + ], + ])("%s", async (_, auth, eventType, id = SIMPLE_ID) => { + const mockEvent: SQSEvent = { + Records: [ + { + messageId: "test-message-id", + receiptHandle: "test-receipt-handle", + body: JSON.stringify({ + key: Buffer.from(id).toString("base64"), + value: Buffer.from( + JSON.stringify({ + origin: "mako", + event: eventType, + authority: auth, + }), + ).toString("base64"), + headers: {}, + timestamp: 1732645041557, + offset: "0", + partition: 0, + topic: "mock-topic", + } as unknown as KafkaRecord), + attributes: { + ApproximateReceiveCount: "1", + SentTimestamp: "1732645041557", + SenderId: "test-sender", + ApproximateFirstReceiveTimestamp: "1732645041557", + }, + messageAttributes: {}, + md5OfBody: "test-md5", + eventSource: "aws:sqs", + eventSourceARN: "test:arn", + awsRegion: "us-east-1", + }, + ], + }; + await expect(() => handler(mockEvent)).rejects.toThrow(); + }); +}); diff --git a/lib/lambda/index.ts b/lib/lambda/index.ts index 78215891e..ad5c459b8 100644 --- a/lib/lambda/index.ts +++ b/lib/lambda/index.ts @@ -7,7 +7,7 @@ export * as getSubTypes from "./getSubTypes"; export * as getTypes from "./getTypes"; export * as getUploadUrl from "./getUploadUrl"; export * as mapRole from "./mapRole"; -export * as processEmails from "./processEmails"; +export * as processEmails from "./delayedEmailProcessor.ts"; export * as runReindex from "./runReindex"; export * as search from "./search"; export * as setupIndex from "./setupIndex"; diff --git a/lib/lambda/kafkaToSqs.ts b/lib/lambda/kafkaToSqs.ts new file mode 100644 index 000000000..cebc423cd --- /dev/null +++ b/lib/lambda/kafkaToSqs.ts @@ -0,0 +1,35 @@ +import { Handler } from "aws-lambda"; +import { KafkaEvent } from "shared-types"; +import { SQSClient, SendMessageCommand } from "@aws-sdk/client-sqs"; + +const sqsClient = new SQSClient({ region: process.env.region }); + +export const handler: Handler = async (event) => { + const queueUrl = process.env.DELAY_QUEUE_URL; + if (!queueUrl) { + throw new Error("DELAY_QUEUE_URL is not set"); + } + + for (const [topic, records] of Object.entries(event.records)) { + for (const record of records) { + if (record.value) { + await sqsClient.send( + new SendMessageCommand({ + QueueUrl: queueUrl, + MessageBody: JSON.stringify({ + topic, + key: record.key, + value: record.value, + timestamp: record.timestamp || Date.now(), + }), + }), + ); + } + } + } + + return { + statusCode: 200, + body: JSON.stringify({ message: "Messages sent to SQS" }), + }; +}; diff --git a/lib/lambda/processEmails.test.ts b/lib/lambda/kafkaToSqsHandler.test.ts similarity index 94% rename from lib/lambda/processEmails.test.ts rename to lib/lambda/kafkaToSqsHandler.test.ts index b5a5b1eee..794bceca3 100644 --- a/lib/lambda/processEmails.test.ts +++ b/lib/lambda/kafkaToSqsHandler.test.ts @@ -1,8 +1,9 @@ import { describe, it, expect, vi } from "vitest"; import { Context } from "aws-lambda"; import { SESClient } from "@aws-sdk/client-ses"; -import { sendEmail, validateEmailTemplate, handler } from "./processEmails"; +import { handler } from "./kafkaToSqs.ts"; import { KafkaRecord, KafkaEvent } from "shared-types"; +import { sendEmail, validateEmailTemplate } from "./delayedEmailProcessor.ts"; describe("process emails Handler", () => { it("should return 200 with a proper email", async () => { @@ -16,7 +17,7 @@ describe("process emails Handler", () => { ConfigurationSetName: "test-config", }; const test = await sendEmail(params, "us-east-1"); - expect(test.status).toStrictEqual(200); + expect(test.$metadata.httpStatusCode).toStrictEqual(200); }); it("should throw an error", async () => { @@ -34,20 +35,18 @@ describe("process emails Handler", () => { it("should validate the email template and throw an error for missing fields", async () => { const template = { - to: "Person", - from: "Other Guy", + to: ["Person"], body: "body", // missing required 'subject' field }; - expect(() => validateEmailTemplate(template)).toThrowError( + expect(() => validateEmailTemplate(template as any)).toThrowError( "Email template missing required fields: subject", ); }); it("should validate a complete email template without throwing", () => { const template = { - to: "Person", - from: "Other Guy", + to: ["Person"], body: "body", subject: "Test Subject", }; diff --git a/lib/lambda/processEmailsHandler.test.ts b/lib/lambda/processEmailsHandler.test.ts deleted file mode 100644 index 4d16398be..000000000 --- a/lib/lambda/processEmailsHandler.test.ts +++ /dev/null @@ -1,278 +0,0 @@ -import { describe, it, expect, vi } from "vitest"; -import { Context } from "aws-lambda"; -import { SESClient } from "@aws-sdk/client-ses"; -import { handler } from "./processEmails"; -import { KafkaRecord, KafkaEvent } from "shared-types"; -import { Authority } from "shared-types"; -import { SIMPLE_ID, WITHDRAW_RAI_ITEM_B, WITHDRAW_RAI_ITEM_C } from "mocks"; -const nms = "new-medicaid-submission"; -const ncs = "new-chip-submission"; -const tempExtension = "temporary-extension"; -const withdrawPackage = "withdraw-package"; -const contractingInitial = "contracting-initial"; -const capitatedInitial = "capitated-initial"; -const withdrawRai = "withdraw-rai"; -const respondToRai = "respond-to-rai"; -const appk = "app-k"; -const uploadSubsequentDocuments = "upload-subsequent-documents"; - -describe("process emails Handler", () => { - it.each([ - [ - `should send an email for ${respondToRai} with ${Authority.MED_SPA}`, - Authority.MED_SPA, - respondToRai, - SIMPLE_ID, - ], - [ - `should send an email for ${respondToRai} with ${Authority.CHIP_SPA}`, - Authority.CHIP_SPA, - respondToRai, - SIMPLE_ID, - ], - [ - `should send an email for ${respondToRai} with ${Authority["1915b"]}`, - Authority["1915b"], - respondToRai, - SIMPLE_ID, - ], - [ - `should send an email for ${respondToRai} with ${Authority["1915c"]}`, - Authority["1915c"], - respondToRai, - SIMPLE_ID, - ], - [ - `should send an email for ${nms} with ${Authority.MED_SPA}`, - Authority.MED_SPA, - nms, - SIMPLE_ID, - ], - [ - `should send an email for ${nms} with ${Authority.CHIP_SPA}`, - Authority.CHIP_SPA, - nms, - SIMPLE_ID, - ], - [ - `should send an email for ${nms} with ${Authority["1915b"]}`, - Authority["1915b"], - nms, - SIMPLE_ID, - ], - [ - `should send an email for ${nms} with ${Authority["1915c"]}`, - Authority["1915c"], - nms, - SIMPLE_ID, - ], - [ - `should send an email for ${ncs} with ${Authority.MED_SPA}`, - Authority.MED_SPA, - ncs, - SIMPLE_ID, - ], - [ - `should send an email for ${ncs} with ${Authority.CHIP_SPA}`, - Authority.CHIP_SPA, - ncs, - SIMPLE_ID, - ], - [ - `should send an email for ${ncs} with ${Authority["1915b"]}`, - Authority["1915b"], - ncs, - SIMPLE_ID, - ], - [ - `should send an email for ${ncs} with ${Authority["1915c"]}`, - Authority["1915c"], - ncs, - SIMPLE_ID, - ], - [ - `should send an email for ${tempExtension} with ${Authority["1915b"]}`, - Authority["1915b"], - tempExtension, - SIMPLE_ID, - ], - [ - `should send an email for ${tempExtension} with ${Authority["1915c"]}`, - Authority["1915c"], - tempExtension, - SIMPLE_ID, - ], - [ - `should send an email for ${withdrawPackage} with ${Authority.MED_SPA}`, - Authority.MED_SPA, - withdrawPackage, - SIMPLE_ID, - ], - [ - `should send an email for ${withdrawPackage} with ${Authority.CHIP_SPA}`, - Authority.CHIP_SPA, - withdrawPackage, - SIMPLE_ID, - ], - [ - `should send an email for ${withdrawPackage} for ${ncs} with ${Authority["1915b"]}`, - Authority["1915b"], - withdrawPackage, - SIMPLE_ID, - ], - [ - `should send an email for ${appk} with ${Authority["1915c"]}`, - Authority["1915c"], - appk, - SIMPLE_ID, - ], - [ - `should send an email for ${contractingInitial} with ${Authority.MED_SPA}`, - Authority.MED_SPA, - contractingInitial, - SIMPLE_ID, - ], - [ - `should send an email for ${contractingInitial} with ${Authority.CHIP_SPA}`, - Authority.CHIP_SPA, - contractingInitial, - SIMPLE_ID, - ], - [ - `should send an email for ${contractingInitial} with ${Authority["1915b"]}`, - Authority["1915b"], - contractingInitial, - SIMPLE_ID, - ], - [ - `should send an email for ${contractingInitial} with ${Authority["1915c"]}`, - Authority["1915c"], - contractingInitial, - SIMPLE_ID, - ], - [ - `should send an email for ${capitatedInitial} with ${Authority.MED_SPA}`, - Authority.MED_SPA, - capitatedInitial, - SIMPLE_ID, - ], - [ - `should send an email for ${capitatedInitial} with ${Authority.CHIP_SPA}`, - Authority.CHIP_SPA, - capitatedInitial, - SIMPLE_ID, - ], - [ - `should send an email for ${capitatedInitial} with ${Authority["1915b"]}`, - Authority["1915b"], - capitatedInitial, - SIMPLE_ID, - ], - [ - `should send an email for ${appk} with ${Authority["1915c"]}`, - Authority["1915c"], - appk, - SIMPLE_ID, - ], - [ - `should send an email for ${appk} with ${Authority["1915b"]}`, - Authority["1915b"], - appk, - SIMPLE_ID, - ], - [ - `should send an email for ${withdrawRai} with ${Authority["1915b"]}`, - Authority["1915b"], - withdrawRai, - WITHDRAW_RAI_ITEM_B, - ], - [ - `should send an email for ${withdrawRai} with ${Authority["1915c"]}`, - Authority["1915c"], - withdrawRai, - WITHDRAW_RAI_ITEM_C, - ], - [ - `should send an email for ${uploadSubsequentDocuments} with ${Authority.CHIP_SPA}`, - Authority.CHIP_SPA, - uploadSubsequentDocuments, - WITHDRAW_RAI_ITEM_B, - ], - [ - `should send an email for ${uploadSubsequentDocuments} with ${Authority["1915c"]}`, - Authority["1915c"], - uploadSubsequentDocuments, - WITHDRAW_RAI_ITEM_C, - ], - ])("%s", async (_, auth, eventType, id) => { - const callback = vi.fn(); - const secSPY = vi.spyOn(SESClient.prototype, "send"); - const mockEvent: KafkaEvent = { - records: { - "mock-topic": [ - { - key: Buffer.from(id).toString("base64"), - value: Buffer.from( - JSON.stringify({ - origin: "mako", - event: eventType, - authority: auth, - }), - ).toString("base64"), - headers: {}, - timestamp: 1732645041557, - offset: "0", - partition: 0, - topic: "mock-topic", - } as unknown as KafkaRecord, - ], - }, - eventSource: "", - bootstrapServers: "", - }; - await handler(mockEvent, {} as Context, callback); - expect(secSPY).toHaveBeenCalledTimes(2); - }); -}); -describe("process emails Handler failures", () => { - it.each([ - [ - `should send an email for ${withdrawRai} with ${Authority["1915b"]} and fail due to not finding it`, - Authority["1915b"], - withdrawRai, - SIMPLE_ID, - ], - [ - `should send an email for ${withdrawRai} with ${Authority["1915c"]} and fail due to not finding it`, - Authority["1915c"], - withdrawRai, - SIMPLE_ID, - ], - ])("%s", async (_, auth, eventType, id = SIMPLE_ID) => { - const callback = vi.fn(); - const mockEvent: KafkaEvent = { - records: { - "mock-topic": [ - { - key: Buffer.from(id).toString("base64"), - value: Buffer.from( - JSON.stringify({ - origin: "mako", - event: eventType, - authority: auth, - }), - ).toString("base64"), - headers: {}, - timestamp: 1732645041557, - offset: "0", - partition: 0, - topic: "mock-topic", - } as unknown as KafkaRecord, - ], - }, - eventSource: "", - bootstrapServers: "", - }; - await expect(() => handler(mockEvent, {} as Context, callback)).rejects.toThrow(); - }); -}); diff --git a/lib/libs/email/preview/InitialSubmissions/State/Temp_Extension.tsx b/lib/libs/email/preview/InitialSubmissions/State/Temp_Extension.tsx index 871363349..0a52bfcb9 100644 --- a/lib/libs/email/preview/InitialSubmissions/State/Temp_Extension.tsx +++ b/lib/libs/email/preview/InitialSubmissions/State/Temp_Extension.tsx @@ -1,13 +1,25 @@ -import { emailTemplateValue } from "../../../mock-data/temp-extension"; import { TempExtStateEmail } from "../../../content/tempExtension/emailTemplates/TempExtState"; +import { Events, CommonEmailVariables } from "node_modules/shared-types"; -const TempExtStatePreview = () => { +type TempExtStateEmailProps = Events["TemporaryExtension"] & CommonEmailVariables; + +const TempExtStatePreview = (variables: TempExtStateEmailProps) => { return ( ); diff --git a/lib/libs/utils.ts b/lib/libs/utils.ts index 5c6cabc28..e197472f2 100644 --- a/lib/libs/utils.ts +++ b/lib/libs/utils.ts @@ -1,10 +1,5 @@ import { BaseIndex, Index } from "lib/packages/shared-types/opensearch"; -/** - * Returns the `osDomain` - * @throws if env variables are not defined, `getDomain` throws error indicating if variable is missing - * @returns the value of `osDomain` - */ export function getDomain(): string { const domain = process.env.osDomain; if (domain === undefined) { @@ -13,13 +8,6 @@ export function getDomain(): string { return domain; } -/** - * Returns the `indexNamespace` and `baseIndex` combined - * process.env.indexNamespace (THIS SHOULD BE THE BRANCH NAME & SHOULD ALWAYS BE DEFINED) - * @throws if process.env.indexNamespace not defined. - * @returns the value of `indexNamespace` and `baseIndex` combined - */ - export function getOsNamespace(baseIndex: BaseIndex): Index { const indexNamespace = process.env.indexNamespace; @@ -30,15 +18,6 @@ export function getOsNamespace(baseIndex: BaseIndex): Index { return `${indexNamespace}${baseIndex}`; } -/** - * Gets both the OpenSearch domain and namespace combined with the base index - * @param baseIndex - The base index to combine with the namespace - * @throws {Error} If required environment variables are not defined - * @returns Object containing: - * - domain: The OpenSearch domain from environment variables - * - index: The namespace and base index combined - */ - export function getDomainAndNamespace(baseIndex: BaseIndex) { const domain = getDomain(); const index = getOsNamespace(baseIndex); diff --git a/lib/stacks/email.ts b/lib/stacks/email.ts index 0f761c056..10f3cdd90 100644 --- a/lib/stacks/email.ts +++ b/lib/stacks/email.ts @@ -5,7 +5,11 @@ import { NodejsFunction } from "aws-cdk-lib/aws-lambda-nodejs"; import { CfnEventSourceMapping } from "aws-cdk-lib/aws-lambda"; import { commonBundlingOptions } from "../config/bundling-config"; import { DeploymentConfigProperties } from "lib/config/deployment-config"; +import { SqsEventSource } from "aws-cdk-lib/aws-lambda-event-sources"; +/** + * Interface defining the required properties for the Email Service Stack + */ interface EmailServiceStackProps extends cdk.StackProps { project: string; stage: string; @@ -24,10 +28,13 @@ interface EmailServiceStackProps extends cdk.StackProps { userPool: cdk.aws_cognito.UserPool; } +/** + * Configuration interface for environment-specific settings + */ interface EnvironmentConfig { memorySize: number; - timeout: number; - logRetention: number; + timeout: number; // in minutes + logRetention: number; // in days maxRetryAttempts: number; dailySendQuota: number; } @@ -75,18 +82,16 @@ export class Email extends cdk.NestedStack { throw new Error("Invalid broker string format"); } - // SES Configuration Set - new cdk.aws_ses.CfnConfigurationSet(this, "ConfigurationSet", { - name: `${project}-${stage}-${stack}-email-configuration-set`, - reputationOptions: { - reputationMetricsEnabled: true, - }, - sendingOptions: { - sendingEnabled: true, - }, - suppressionOptions: { - suppressedReasons: ["BOUNCE", "COMPLAINT"], - }, + const environmentType = isDev ? "dev" : "prod"; + + // ------------------------------------------------------------------------- + // SQS Queues: main (delayed) + DLQ + // ------------------------------------------------------------------------- + const emailQueue = new cdk.aws_sqs.Queue(this, "EmailQueue", { + queueName: `${project}-${stage}-${stack}-email-queue`, + encryption: cdk.aws_sqs.QueueEncryption.KMS_MANAGED, + deliveryDelay: cdk.Duration.seconds(60), + visibilityTimeout: cdk.Duration.seconds(720), }); const dlq = new cdk.aws_sqs.Queue(this, "DeadLetterQueue", { @@ -96,8 +101,10 @@ export class Email extends cdk.NestedStack { visibilityTimeout: cdk.Duration.seconds(300), }); - // IAM Role for Lambda - const lambdaRole = new cdk.aws_iam.Role(this, "LambdaExecutionRole", { + // ------------------------------------------------------------------------- + // IAM Role for both Lambdas + // ------------------------------------------------------------------------- + const lambdaRole = new cdk.aws_iam.Role(this, "EmailLambdaExecutionRole", { assumedBy: new cdk.aws_iam.ServicePrincipal("lambda.amazonaws.com"), managedPolicies: [ cdk.aws_iam.ManagedPolicy.fromAwsManagedPolicyName( @@ -110,6 +117,7 @@ export class Email extends cdk.NestedStack { inlinePolicies: { EmailServicePolicy: new cdk.aws_iam.PolicyDocument({ statements: [ + // OpenSearch new cdk.aws_iam.PolicyStatement({ effect: cdk.aws_iam.Effect.ALLOW, actions: [ @@ -122,6 +130,7 @@ export class Email extends cdk.NestedStack { ], resources: [`${openSearchDomainArn}/*`], }), + // SES + other new cdk.aws_iam.PolicyStatement({ actions: [ "ses:SendEmail", @@ -134,31 +143,37 @@ export class Email extends cdk.NestedStack { ], resources: ["*"], }), + // EC2 (VPC networking calls) new cdk.aws_iam.PolicyStatement({ effect: cdk.aws_iam.Effect.ALLOW, actions: ["ec2:DescribeSecurityGroups", "ec2:DescribeVpcs"], resources: ["*"], }), + // Secrets Manager new cdk.aws_iam.PolicyStatement({ effect: cdk.aws_iam.Effect.ALLOW, actions: ["secretsmanager:DescribeSecret", "secretsmanager:GetSecretValue"], resources: [`arn:aws:secretsmanager:${this.region}:${this.account}:secret:*`], }), + // Cognito user pool list users new cdk.aws_iam.PolicyStatement({ effect: cdk.aws_iam.Effect.ALLOW, actions: ["cognito-idp:ListUsers"], resources: [userPool.userPoolArn], }), + // Deny creation of Log Groups if you have them pre-created new cdk.aws_iam.PolicyStatement({ effect: cdk.aws_iam.Effect.DENY, actions: ["logs:CreateLogGroup"], resources: ["*"], }), + // Permissions to send messages to the DLQ new cdk.aws_iam.PolicyStatement({ effect: cdk.aws_iam.Effect.ALLOW, actions: ["sqs:SendMessage"], resources: [dlq.queueArn], }), + // VPC ENI permissions new cdk.aws_iam.PolicyStatement({ effect: cdk.aws_iam.Effect.ALLOW, actions: [ @@ -170,30 +185,88 @@ export class Email extends cdk.NestedStack { ], resources: ["*"], }), + // Permissions to read/write from the main queue + new cdk.aws_iam.PolicyStatement({ + effect: cdk.aws_iam.Effect.ALLOW, + actions: [ + "sqs:SendMessage", + "sqs:ReceiveMessage", + "sqs:DeleteMessage", + "sqs:GetQueueAttributes", + ], + resources: [emailQueue.queueArn], + }), ], }), }, }); - const processEmailsLambda = new NodejsFunction(this, "ProcessEmailsLambda", { - functionName: `${project}-${stage}-${stack}-processEmails`, + // ------------------------------------------------------------------------- + // SES ConfigurationSet + // ------------------------------------------------------------------------- + const emailConfig: cdk.aws_ses.CfnConfigurationSetProps = { + name: `${project}-${stage}-${stack}-email-configuration-set`, + reputationOptions: { + reputationMetricsEnabled: true, + }, + sendingOptions: { + sendingEnabled: true, + }, + suppressionOptions: { + suppressedReasons: ["BOUNCE", "COMPLAINT"], + }, + }; + const emailConfigSet = new cdk.aws_ses.CfnConfigurationSet( + this, + "ConfigurationSet", + emailConfig, + ); + + // ------------------------------------------------------------------------- + // Lambda A: Kafka -> SQS + // ------------------------------------------------------------------------- + const kafkaToSqsLambda = new NodejsFunction(this, "KafkaToSqsLambda", { + functionName: `${project}-${stage}-${stack}-kafkaToSqs`, deadLetterQueue: dlq, depsLockFilePath: join(__dirname, "../../bun.lockb"), - entry: join(__dirname, "../lambda/processEmails.ts"), + entry: join(__dirname, "../lambda/kafkaToSqs.ts"), handler: "handler", - runtime: cdk.aws_lambda.Runtime.NODEJS_18_X, - memorySize: envConfig[props.isDev ? "dev" : "prod"].memorySize, - timeout: cdk.Duration.minutes(envConfig[props.isDev ? "dev" : "prod"].timeout), + runtime: cdk.aws_lambda.Runtime.NODEJS_20_X, + memorySize: envConfig[environmentType].memorySize, + timeout: cdk.Duration.minutes(envConfig[environmentType].timeout), role: lambdaRole, - vpc: vpc, - vpcSubnets: { - subnets: privateSubnets, + vpc, + vpcSubnets: { subnets: privateSubnets }, + logRetention: envConfig[environmentType].logRetention, + securityGroups: [lambdaSecurityGroup], + environment: { + region: cdk.Stack.of(this).region, + DELAY_QUEUE_URL: emailQueue.queueUrl, }, - logRetention: envConfig[isDev ? "dev" : "prod"].logRetention, + bundling: commonBundlingOptions, + tracing: cdk.aws_lambda.Tracing.ACTIVE, + }); + + // ------------------------------------------------------------------------- + // Lambda B: SQS -> OS -> Email + // ------------------------------------------------------------------------- + const delayedEmailLambda = new NodejsFunction(this, "DelayedEmailLambda", { + functionName: `${project}-${stage}-${stack}-delayedEmailProcessor`, + deadLetterQueue: dlq, + depsLockFilePath: join(__dirname, "../../bun.lockb"), + entry: join(__dirname, "../lambda/delayedEmailProcessor.ts"), + handler: "handler", + runtime: cdk.aws_lambda.Runtime.NODEJS_20_X, + memorySize: envConfig[environmentType].memorySize, + timeout: cdk.Duration.minutes(10), + role: lambdaRole, + vpc, + vpcSubnets: { subnets: privateSubnets }, + logRetention: envConfig[environmentType].logRetention, securityGroups: [lambdaSecurityGroup], environment: { region: cdk.Stack.of(this).region, - configurationSetName: `${project}-${stage}-${stack}-email-configuration-set`, + configurationSetName: emailConfigSet.name!, stage, isDev: isDev.toString(), indexNamespace, @@ -202,27 +275,15 @@ export class Email extends cdk.NestedStack { emailAddressLookupSecretName, userPoolId: userPool.userPoolId, DLQ_URL: dlq.queueUrl, - VPC_ID: vpc.vpcId, - SECURITY_GROUP_ID: lambdaSecurityGroup.securityGroupId, }, bundling: commonBundlingOptions, tracing: cdk.aws_lambda.Tracing.ACTIVE, }); - const alarmTopic = new cdk.aws_sns.Topic(this, "EmailErrorAlarmTopic"); - - const alarm = new cdk.aws_cloudwatch.Alarm(this, "EmailErrorAlarm", { - actionsEnabled: true, - metric: processEmailsLambda.metricErrors(), - threshold: 1, - evaluationPeriods: 1, - alarmDescription: "Email processing lambda errors", - treatMissingData: cdk.aws_cloudwatch.TreatMissingData.NOT_BREACHING, - }); - - alarm.addAlarmAction(new cdk.aws_cloudwatch_actions.SnsAction(alarmTopic)); - - new CfnEventSourceMapping(this, "SinkSESTriggerOnemac", { + // ------------------------------------------------------------------------- + // Event Source Mapping: Kafka -> kafkaToSqsLambda + // ------------------------------------------------------------------------- + new CfnEventSourceMapping(this, "KafkaToSqsEventSourceMapping", { batchSize: 1, enabled: true, selfManagedEventSource: { @@ -230,7 +291,7 @@ export class Email extends cdk.NestedStack { kafkaBootstrapServers: brokerString.split(","), }, }, - functionName: processEmailsLambda.functionName, + functionName: kafkaToSqsLambda.functionName, sourceAccessConfigurations: [ ...privateSubnets.map((subnet) => ({ type: "VPC_SUBNET", @@ -250,54 +311,59 @@ export class Email extends cdk.NestedStack { }, }); - new CfnEventSourceMapping(this, "SinkSESTriggerSEATool", { - batchSize: 1, - enabled: true, - selfManagedEventSource: { - endpoints: { - kafkaBootstrapServers: brokerString.split(","), - }, - }, - functionName: processEmailsLambda.functionName, - sourceAccessConfigurations: [ - ...privateSubnets.map((subnet) => ({ - type: "VPC_SUBNET", - uri: subnet.subnetId, - })), - { - type: "VPC_SECURITY_GROUP", - uri: `security_group:${lambdaSecurityGroup.securityGroupId}`, - }, - ], - startingPosition: "LATEST", - topics: [`aws.seatool.ksql.onemac.three.agg.State_Plan`], - destinationConfig: { - onFailure: { - destination: dlq.queueArn, - }, - }, - }); + // ------------------------------------------------------------------------- + // SQS Event Source: emailQueue -> delayedEmailLambda + // ------------------------------------------------------------------------- + emailQueue.grantConsumeMessages(delayedEmailLambda); + delayedEmailLambda.addEventSource( + new SqsEventSource(emailQueue, { + batchSize: 10, + }), + ); + + // ------------------------------------------------------------------------- + // CloudWatch Alarms & SNS + // ------------------------------------------------------------------------- + const alarmTopic = new cdk.aws_sns.Topic(this, "EmailErrorAlarmTopic"); - // Add CloudWatch alarms - new cdk.aws_cloudwatch.Alarm(this, "EmailProcessingErrors", { - metric: processEmailsLambda.metricErrors(), + // KafkaToSqs Lambda alarms + const kafkaErrorsAlarm = new cdk.aws_cloudwatch.Alarm(this, "KafkaToSqsErrors", { + metric: kafkaToSqsLambda.metricErrors(), + threshold: 1, + evaluationPeriods: 1, + alarmDescription: "KafkaToSqs lambda errors", + actionsEnabled: true, + treatMissingData: cdk.aws_cloudwatch.TreatMissingData.NOT_BREACHING, + }); + const kafkaThrottlingAlarm = new cdk.aws_cloudwatch.Alarm(this, "KafkaToSqsThrottling", { + metric: kafkaToSqsLambda.metricThrottles(), threshold: 1, evaluationPeriods: 1, - alarmDescription: "Email processing lambda errors", + alarmDescription: "KafkaToSqs lambda throttled", actionsEnabled: true, treatMissingData: cdk.aws_cloudwatch.TreatMissingData.NOT_BREACHING, }); - new cdk.aws_cloudwatch.Alarm(this, "EmailLambdaThrottling", { - metric: processEmailsLambda.metricThrottles(), + // DelayedEmailLambda alarms + const delayedEmailErrors = new cdk.aws_cloudwatch.Alarm(this, "DelayedEmailErrors", { + metric: delayedEmailLambda.metricErrors(), threshold: 1, evaluationPeriods: 1, - alarmDescription: "Email processing lambda is being throttled", + alarmDescription: "Delayed email processing lambda errors", + actionsEnabled: true, + treatMissingData: cdk.aws_cloudwatch.TreatMissingData.NOT_BREACHING, + }); + const delayedEmailThrottles = new cdk.aws_cloudwatch.Alarm(this, "DelayedEmailThrottling", { + metric: delayedEmailLambda.metricThrottles(), + threshold: 1, + evaluationPeriods: 1, + alarmDescription: "Delayed email processing lambda is throttled", actionsEnabled: true, treatMissingData: cdk.aws_cloudwatch.TreatMissingData.NOT_BREACHING, }); - new cdk.aws_cloudwatch.Alarm(this, "EmailDLQMessages", { + // DLQ alarm + const dlqAlarm = new cdk.aws_cloudwatch.Alarm(this, "EmailDLQMessages", { metric: dlq.metricNumberOfMessagesReceived(), threshold: 1, evaluationPeriods: 1, @@ -306,18 +372,28 @@ export class Email extends cdk.NestedStack { treatMissingData: cdk.aws_cloudwatch.TreatMissingData.NOT_BREACHING, }); - processEmailsLambda.node.addDependency(lambdaSecurityGroup); - - new cdk.aws_cloudwatch.Alarm(this, "SESSendQuotaAlarm", { + // SES Quota alarm + const sesQuotaAlarm = new cdk.aws_cloudwatch.Alarm(this, "SESSendQuotaAlarm", { metric: new cdk.aws_cloudwatch.Metric({ namespace: "AWS/SES", metricName: "Daily24HourSend", statistic: "Sum", period: cdk.Duration.hours(24), }), - threshold: envConfig[props.isDev ? "dev" : "prod"].dailySendQuota * 0.8, // 80% of quota + threshold: envConfig[environmentType].dailySendQuota * 0.8, // 80% usage evaluationPeriods: 1, comparisonOperator: cdk.aws_cloudwatch.ComparisonOperator.GREATER_THAN_THRESHOLD, }); + + // SNS action + const snsAction = new cdk.aws_cloudwatch_actions.SnsAction(alarmTopic); + + // Add actions + kafkaErrorsAlarm.addAlarmAction(snsAction); + kafkaThrottlingAlarm.addAlarmAction(snsAction); + delayedEmailErrors.addAlarmAction(snsAction); + delayedEmailThrottles.addAlarmAction(snsAction); + dlqAlarm.addAlarmAction(snsAction); + sesQuotaAlarm.addAlarmAction(snsAction); } }