Skip to content

Commit

Permalink
Fb/do not filter events for redis (#258)
Browse files Browse the repository at this point in the history
* fix non integer values for redis

* snapshpt

* lint

* do not filter events for redis

* change error to warn

* changelog

---------

Co-authored-by: Max Gruenfelder <[email protected]>
  • Loading branch information
soccermax and Max Gruenfelder authored Dec 5, 2024
1 parent b22964a commit 74a4bd0
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 13 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## v1.8.0 - 2024-11-XX
## v1.8.0 - 2024-12-05

### Added

- Added a suspended status for events. Events in this status won't be processed until changed back to open. Useful for
delaying processing when the event isn't ready.
- Added an option to filter tenant lists when checking for open events. Published events are still processed for all
tenants, but periodic events can be filtered.
Distributed events via Redis that are not part of the current configuration to support DWC use cases where the same
app might have different software states.

### Changed

Expand Down
4 changes: 2 additions & 2 deletions src/redis/redisSub.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ const _messageHandlerProcessEvents = async (messageData) => {
const service = await cds.connect.to(subType);
cds.outboxed(service);
} catch (err) {
logger.error("could not connect to outboxed service", err, {
logger.warn("could not connect to outboxed service", err, {
type,
subType,
});
return;
}
} else {
logger.error("cannot find configuration for published event. Event won't be processed", {
logger.warn("cannot find configuration for published event. Event won't be processed", {
type,
subType,
});
Expand Down
23 changes: 19 additions & 4 deletions src/runner/openEvents.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ const cds = require("@sap/cds");
const eventConfig = require("../config");
const { EventProcessingStatus } = require("../constants");

const MS_IN_DAYS = 24 * 60 * 60 * 1000;

const getOpenQueueEntries = async (tx, filterAppSpecificEvents = true) => {
const startTime = new Date();
const refDateStartAfter = new Date(startTime.getTime() + eventConfig.runInterval * 1.2);
Expand All @@ -21,7 +23,11 @@ const getOpenQueueEntries = async (tx, filterAppSpecificEvents = true) => {
EventProcessingStatus.InProgress,
"AND lastAttemptTimestamp <=",
new Date(startTime.getTime() - eventConfig.globalTxTimeout).toISOString(),
") )"
") ) AND (createdAt >=",
new Date(startTime.getTime() - 30 * MS_IN_DAYS).toISOString(),
" OR startAfter >=",
new Date(startTime.getTime() - 30 * MS_IN_DAYS).toISOString(),
")"
)
.columns("type", "subType")
.groupBy("type", "subType")
Expand All @@ -30,9 +36,13 @@ const getOpenQueueEntries = async (tx, filterAppSpecificEvents = true) => {
const result = [];
for (const { type, subType } of entries) {
if (eventConfig.isCapOutboxEvent(type)) {
await cds.connect
cds.connect
.to(subType)
.then((service) => {
if (!filterAppSpecificEvents) {
return; // will be done in finally
}

if (!service) {
return;
}
Expand All @@ -45,7 +55,12 @@ const getOpenQueueEntries = async (tx, filterAppSpecificEvents = true) => {
result.push({ type, subType });
}
})
.catch(() => {});
.catch(() => {})
.finally(() => {
if (!filterAppSpecificEvents) {
result.push({ type, subType });
}
});
} else {
if (filterAppSpecificEvents) {
if (
Expand All @@ -55,7 +70,7 @@ const getOpenQueueEntries = async (tx, filterAppSpecificEvents = true) => {
result.push({ type, subType });
}
} else {
eventConfig.getEventConfig(type, subType) && result.push({ type, subType });
result.push({ type, subType });
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/runner/runner.js
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ const _singleTenantRedis = async () => {
"get-openEvents-and-publish",
async () => {
return await cds.tx({}, async (tx) => {
const entries = await openEvents.getOpenQueueEntries(tx);
const entries = await openEvents.getOpenQueueEntries(tx, false);
logger.info("broadcasting events for run", {
entries: entries.length,
});
Expand Down
37 changes: 37 additions & 0 deletions test/__snapshots__/baseFunctionality.test.js.snap
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,43 @@ exports[`baseFunctionality getOpenQueueEntries event types in progress should be
]
`;

exports[`baseFunctionality getOpenQueueEntries filterAppSpecificEvents return open event types 2`] = `
[
{
"subType": "AppInstance",
"type": "AppSpecific_PERIODIC",
},
{
"subType": "AppName",
"type": "AppSpecific_PERIODIC",
},
{
"subType": "both",
"type": "AppSpecific_PERIODIC",
},
{
"subType": "DELETE_EVENTS",
"type": "EVENT_QUEUE_BASE_PERIODIC",
},
{
"subType": "DB",
"type": "HealthCheck_PERIODIC",
},
{
"subType": "Task",
"type": "Notifications",
},
{
"subType": "everyFiveMin",
"type": "TimeSpecificEveryFiveMin_PERIODIC",
},
{
"subType": "cron",
"type": "TimeSpecificEveryMin_PERIODIC",
},
]
`;

exports[`baseFunctionality getOpenQueueEntries return open event types 2`] = `
[
{
Expand Down
8 changes: 4 additions & 4 deletions test/__snapshots__/eventQueueOutbox.test.js.snap
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,6 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true req.data should be stored

exports[`event-queue outbox monkeyPatchCAPOutbox=true return open event types 1`] = `
[
{
"subType": "NotificationService",
"type": "CAP_OUTBOX",
},
{
"subType": "DELETE_EVENTS",
"type": "EVENT_QUEUE_BASE_PERIODIC",
Expand All @@ -127,6 +123,10 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true return open event types 1`
"subType": "cron",
"type": "TimeSpecificEveryMin_PERIODIC",
},
{
"subType": "NotificationService",
"type": "CAP_OUTBOX",
},
]
`;

Expand Down
55 changes: 55 additions & 0 deletions test/baseFunctionality.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,61 @@ describe("baseFunctionality", () => {
});

describe("getOpenQueueEntries", () => {
describe("filterAppSpecificEvents", () => {
test("return open event types", async () => {
await testHelper.insertEventEntry(tx);
await checkAndInsertPeriodicEvents(context);
const result = await getOpenQueueEntries(tx, false);
expect(result.length).toMatchInlineSnapshot(`8`); // 1 ad-hoc and 4 periodic
expect(result).toMatchSnapshot();
});

test("should also return not existing events", async () => {
await testHelper.insertEventEntry(tx);
await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ type: "123" }));
const result = await getOpenQueueEntries(tx, false);
expect(result.length).toMatchInlineSnapshot(`1`);
});

test("should not return not existing events if cratedAt older than 30 days", async () => {
await testHelper.insertEventEntry(tx);
await tx.run(
UPDATE.entity("sap.eventqueue.Event").set({
type: "123",
createdAt: new Date(Date.now() - 31 * 24 * 60 * 60 * 1000).toISOString(),
})
);
const result = await getOpenQueueEntries(tx, false);
expect(result.length).toMatchInlineSnapshot(`0`);
});

test("should return existing events if startAfter within 30 days but createdAt older than 30 days", async () => {
await testHelper.insertEventEntry(tx);
await tx.run(
UPDATE.entity("sap.eventqueue.Event").set({
type: "123",
createdAt: new Date(Date.now() - 31 * 24 * 60 * 60 * 1000).toISOString(),
startAfter: new Date(Date.now() - 29 * 24 * 60 * 60 * 1000).toISOString(),
})
);
const result = await getOpenQueueEntries(tx, false);
expect(result.length).toMatchInlineSnapshot(`1`);
});

test("should return not existing events if startAfter and createdAt is after 30 days", async () => {
await testHelper.insertEventEntry(tx);
await tx.run(
UPDATE.entity("sap.eventqueue.Event").set({
type: "123",
createdAt: new Date(Date.now() - 31 * 24 * 60 * 60 * 1000).toISOString(),
startAfter: new Date(Date.now() - 29 * 24 * 60 * 60 * 1000).toISOString(),
})
);
const result = await getOpenQueueEntries(tx, false);
expect(result.length).toMatchInlineSnapshot(`1`);
});
});

test("return open event types", async () => {
await testHelper.insertEventEntry(tx);
await checkAndInsertPeriodicEvents(context);
Expand Down
2 changes: 1 addition & 1 deletion test/eventQueueOutbox.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ describe("event-queue outbox", () => {
);
config = eventQueue.config.getEventConfig(type, subType);
expect(config).toBeUndefined();
expect(loggerMock.callsLengths().error).toEqual(1);
expect(loggerMock.callsLengths().warn).toEqual(1);
expect(runEventCombinationForTenantSpy).toHaveBeenCalledTimes(0);
});
});
Expand Down

0 comments on commit 74a4bd0

Please sign in to comment.