diff --git a/CHANGELOG.md b/CHANGELOG.md index e71f5f3..3f46e52 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,7 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). -## v1.8.0 - 2024-11-XX +## v1.8.0 - 2024-12-05 ### Added @@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. delaying processing when the event isn't ready. - Added an option to filter tenant lists when checking for open events. Published events are still processed for all tenants, but periodic events can be filtered. + Distributed events via Redis that are not part of the current configuration to support DWC use cases where the same + app might have different software states. ### Changed diff --git a/src/redis/redisSub.js b/src/redis/redisSub.js index b961454..cb0acb7 100644 --- a/src/redis/redisSub.js +++ b/src/redis/redisSub.js @@ -41,14 +41,14 @@ const _messageHandlerProcessEvents = async (messageData) => { const service = await cds.connect.to(subType); cds.outboxed(service); } catch (err) { - logger.error("could not connect to outboxed service", err, { + logger.warn("could not connect to outboxed service", err, { type, subType, }); return; } } else { - logger.error("cannot find configuration for published event. Event won't be processed", { + logger.warn("cannot find configuration for published event. Event won't be processed", { type, subType, }); diff --git a/src/runner/openEvents.js b/src/runner/openEvents.js index 7f6e3e5..0fd4c2b 100644 --- a/src/runner/openEvents.js +++ b/src/runner/openEvents.js @@ -5,6 +5,8 @@ const cds = require("@sap/cds"); const eventConfig = require("../config"); const { EventProcessingStatus } = require("../constants"); +const MS_IN_DAYS = 24 * 60 * 60 * 1000; + const getOpenQueueEntries = async (tx, filterAppSpecificEvents = true) => { const startTime = new Date(); const refDateStartAfter = new Date(startTime.getTime() + eventConfig.runInterval * 1.2); @@ -21,7 +23,11 @@ const getOpenQueueEntries = async (tx, filterAppSpecificEvents = true) => { EventProcessingStatus.InProgress, "AND lastAttemptTimestamp <=", new Date(startTime.getTime() - eventConfig.globalTxTimeout).toISOString(), - ") )" + ") ) AND (createdAt >=", + new Date(startTime.getTime() - 30 * MS_IN_DAYS).toISOString(), + " OR startAfter >=", + new Date(startTime.getTime() - 30 * MS_IN_DAYS).toISOString(), + ")" ) .columns("type", "subType") .groupBy("type", "subType") @@ -30,9 +36,13 @@ const getOpenQueueEntries = async (tx, filterAppSpecificEvents = true) => { const result = []; for (const { type, subType } of entries) { if (eventConfig.isCapOutboxEvent(type)) { - await cds.connect + cds.connect .to(subType) .then((service) => { + if (!filterAppSpecificEvents) { + return; // will be done in finally + } + if (!service) { return; } @@ -45,7 +55,12 @@ const getOpenQueueEntries = async (tx, filterAppSpecificEvents = true) => { result.push({ type, subType }); } }) - .catch(() => {}); + .catch(() => {}) + .finally(() => { + if (!filterAppSpecificEvents) { + result.push({ type, subType }); + } + }); } else { if (filterAppSpecificEvents) { if ( @@ -55,7 +70,7 @@ const getOpenQueueEntries = async (tx, filterAppSpecificEvents = true) => { result.push({ type, subType }); } } else { - eventConfig.getEventConfig(type, subType) && result.push({ type, subType }); + result.push({ type, subType }); } } } diff --git a/src/runner/runner.js b/src/runner/runner.js index 14f2e88..274a7f1 100644 --- a/src/runner/runner.js +++ b/src/runner/runner.js @@ -328,7 +328,7 @@ const _singleTenantRedis = async () => { "get-openEvents-and-publish", async () => { return await cds.tx({}, async (tx) => { - const entries = await openEvents.getOpenQueueEntries(tx); + const entries = await openEvents.getOpenQueueEntries(tx, false); logger.info("broadcasting events for run", { entries: entries.length, }); diff --git a/test/__snapshots__/baseFunctionality.test.js.snap b/test/__snapshots__/baseFunctionality.test.js.snap index d1e9643..ca7006f 100644 --- a/test/__snapshots__/baseFunctionality.test.js.snap +++ b/test/__snapshots__/baseFunctionality.test.js.snap @@ -141,6 +141,43 @@ exports[`baseFunctionality getOpenQueueEntries event types in progress should be ] `; +exports[`baseFunctionality getOpenQueueEntries filterAppSpecificEvents return open event types 2`] = ` +[ + { + "subType": "AppInstance", + "type": "AppSpecific_PERIODIC", + }, + { + "subType": "AppName", + "type": "AppSpecific_PERIODIC", + }, + { + "subType": "both", + "type": "AppSpecific_PERIODIC", + }, + { + "subType": "DELETE_EVENTS", + "type": "EVENT_QUEUE_BASE_PERIODIC", + }, + { + "subType": "DB", + "type": "HealthCheck_PERIODIC", + }, + { + "subType": "Task", + "type": "Notifications", + }, + { + "subType": "everyFiveMin", + "type": "TimeSpecificEveryFiveMin_PERIODIC", + }, + { + "subType": "cron", + "type": "TimeSpecificEveryMin_PERIODIC", + }, +] +`; + exports[`baseFunctionality getOpenQueueEntries return open event types 2`] = ` [ { diff --git a/test/__snapshots__/eventQueueOutbox.test.js.snap b/test/__snapshots__/eventQueueOutbox.test.js.snap index 6e21ba7..cf67969 100644 --- a/test/__snapshots__/eventQueueOutbox.test.js.snap +++ b/test/__snapshots__/eventQueueOutbox.test.js.snap @@ -103,10 +103,6 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true req.data should be stored exports[`event-queue outbox monkeyPatchCAPOutbox=true return open event types 1`] = ` [ - { - "subType": "NotificationService", - "type": "CAP_OUTBOX", - }, { "subType": "DELETE_EVENTS", "type": "EVENT_QUEUE_BASE_PERIODIC", @@ -127,6 +123,10 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true return open event types 1` "subType": "cron", "type": "TimeSpecificEveryMin_PERIODIC", }, + { + "subType": "NotificationService", + "type": "CAP_OUTBOX", + }, ] `; diff --git a/test/baseFunctionality.test.js b/test/baseFunctionality.test.js index a884ca3..98d4efa 100644 --- a/test/baseFunctionality.test.js +++ b/test/baseFunctionality.test.js @@ -534,6 +534,61 @@ describe("baseFunctionality", () => { }); describe("getOpenQueueEntries", () => { + describe("filterAppSpecificEvents", () => { + test("return open event types", async () => { + await testHelper.insertEventEntry(tx); + await checkAndInsertPeriodicEvents(context); + const result = await getOpenQueueEntries(tx, false); + expect(result.length).toMatchInlineSnapshot(`8`); // 1 ad-hoc and 4 periodic + expect(result).toMatchSnapshot(); + }); + + test("should also return not existing events", async () => { + await testHelper.insertEventEntry(tx); + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ type: "123" })); + const result = await getOpenQueueEntries(tx, false); + expect(result.length).toMatchInlineSnapshot(`1`); + }); + + test("should not return not existing events if cratedAt older than 30 days", async () => { + await testHelper.insertEventEntry(tx); + await tx.run( + UPDATE.entity("sap.eventqueue.Event").set({ + type: "123", + createdAt: new Date(Date.now() - 31 * 24 * 60 * 60 * 1000).toISOString(), + }) + ); + const result = await getOpenQueueEntries(tx, false); + expect(result.length).toMatchInlineSnapshot(`0`); + }); + + test("should return existing events if startAfter within 30 days but createdAt older than 30 days", async () => { + await testHelper.insertEventEntry(tx); + await tx.run( + UPDATE.entity("sap.eventqueue.Event").set({ + type: "123", + createdAt: new Date(Date.now() - 31 * 24 * 60 * 60 * 1000).toISOString(), + startAfter: new Date(Date.now() - 29 * 24 * 60 * 60 * 1000).toISOString(), + }) + ); + const result = await getOpenQueueEntries(tx, false); + expect(result.length).toMatchInlineSnapshot(`1`); + }); + + test("should return not existing events if startAfter and createdAt is after 30 days", async () => { + await testHelper.insertEventEntry(tx); + await tx.run( + UPDATE.entity("sap.eventqueue.Event").set({ + type: "123", + createdAt: new Date(Date.now() - 31 * 24 * 60 * 60 * 1000).toISOString(), + startAfter: new Date(Date.now() - 29 * 24 * 60 * 60 * 1000).toISOString(), + }) + ); + const result = await getOpenQueueEntries(tx, false); + expect(result.length).toMatchInlineSnapshot(`1`); + }); + }); + test("return open event types", async () => { await testHelper.insertEventEntry(tx); await checkAndInsertPeriodicEvents(context); diff --git a/test/eventQueueOutbox.test.js b/test/eventQueueOutbox.test.js index c8bea11..4578d7e 100644 --- a/test/eventQueueOutbox.test.js +++ b/test/eventQueueOutbox.test.js @@ -603,7 +603,7 @@ describe("event-queue outbox", () => { ); config = eventQueue.config.getEventConfig(type, subType); expect(config).toBeUndefined(); - expect(loggerMock.callsLengths().error).toEqual(1); + expect(loggerMock.callsLengths().warn).toEqual(1); expect(runEventCombinationForTenantSpy).toHaveBeenCalledTimes(0); }); });