From c891117bed78efcf2bfa803bffa0ce00eedcfd3c Mon Sep 17 00:00:00 2001 From: Max Gruenfelder Date: Mon, 8 Jan 2024 14:01:36 +0100 Subject: [PATCH] Fix update tenant hash (#101) * fix update tenant hash * more fixes for updates * more fixes for updates * cleanup --------- Co-authored-by: Max Gruenfelder --- CHANGELOG.md | 7 + src/periodicEvents.js | 6 +- src/runner.js | 3 +- test-integration/runner.test.js | 157 ++++++++++++++++++ .../__snapshots__/periodicEvents.test.js.snap | 37 +++++ test/periodicEvents.test.js | 16 ++ 6 files changed, 223 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c98abbd0..37c520b3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). +## v1.0.3 - 2024-01-08 + +### Fixed + +- update tenant hash for newly onboarded tenants +- consider running periodic events during update of periodic events + ## v1.0.2 - 2024-01-05 ### Added diff --git a/src/periodicEvents.js b/src/periodicEvents.js index 2ae45bed..64e6dfdd 100644 --- a/src/periodicEvents.js +++ b/src/periodicEvents.js @@ -22,8 +22,10 @@ const checkAndInsertPeriodicEvents = async (context) => { }, "AND", { ref: ["status"] }, - "=", - { val: EventProcessingStatus.Open }, + "IN", + { + list: [{ val: EventProcessingStatus.Open }, { val: EventProcessingStatus.InProgress }], + }, ]) .columns(["ID", "type", "subType", "startAfter"]); const currentPeriodEvents = await tx.run(baseCqn); diff --git a/src/runner.js b/src/runner.js index 3c77160e..f10f9b23 100644 --- a/src/runner.js +++ b/src/runner.js @@ -88,6 +88,7 @@ const _checkAndTriggerPeriodicEventUpdate = (tenantIds) => { return; } if (tenantIdHash && tenantIdHash !== hash) { + tenantIdHash = hash; cds.log(COMPONENT_NAME).info("tenant id hash changed, triggering updating periodic events!"); _multiTenancyPeriodicEvents().catch((err) => { cds.log(COMPONENT_NAME).error("Error during triggering updating periodic events!", err); @@ -300,7 +301,7 @@ const _checkPeriodicEventsSingleTenant = async (context = {}) => { return; } try { - logger.info("executing updating periotic events", { + logger.info("executing updating periodic events", { tenantId: context.tenant, subdomain: context.http?.req.authInfo.getSubdomain(), }); diff --git a/test-integration/runner.test.js b/test-integration/runner.test.js index f8162de1..7d396d95 100644 --- a/test-integration/runner.test.js +++ b/test-integration/runner.test.js @@ -1,5 +1,7 @@ "use strict"; +const { promisify } = require("util"); + const cds = require("@sap/cds"); cds.test(__dirname + "/_env"); @@ -283,4 +285,159 @@ describe("redisRunner", () => { jest.useRealTimers(); }); }); + + describe("tenant hash", () => { + it("should trigger update periodic events once per tenant", async () => { + let counter = 0; + let acquireLockSpy; + const promise = new Promise((resolve) => { + acquireLockSpy = jest.spyOn(distributedLock, "acquireLock").mockImplementation(async (context, key) => { + if (key === "EVENT_QUEUE_RUN_PERIODIC_EVENT") { + counter++; + } + if (counter === tenantIds.length) { + resolve(); + } + }); + }); + getAllTenantIdsSpy.mockResolvedValueOnce(tenantIds).mockResolvedValueOnce(tenantIds); + const p1 = runner._._multiTenancyDb(); + await p1.then((p) => Promise.allSettled(p)); + await promise; + expect(acquireLockSpy.mock.calls.filter(([, key]) => key === "EVENT_QUEUE_RUN_PERIODIC_EVENT")).toHaveLength(3); + + acquireLockSpy.mockRestore(); + }); + + it("should not trigger update again if tenant ids have not been changed", async () => { + let counter = 0; + let acquireLockSpy; + const promise = new Promise((resolve) => { + acquireLockSpy = jest.spyOn(distributedLock, "acquireLock").mockImplementation(async (context, key) => { + if (key === "EVENT_QUEUE_RUN_PERIODIC_EVENT") { + counter++; + } + if (counter === tenantIds.length) { + resolve(); + } + }); + }); + getAllTenantIdsSpy.mockResolvedValueOnce(tenantIds).mockResolvedValueOnce(tenantIds); + const p1 = runner._._multiTenancyDb(); + await p1.then((p) => Promise.allSettled(p)); + await promise; + expect(acquireLockSpy.mock.calls.filter(([, key]) => key === "EVENT_QUEUE_RUN_PERIODIC_EVENT")).toHaveLength(3); + + // second run + getAllTenantIdsSpy.mockResolvedValueOnce(tenantIds); + const p2 = runner._._multiTenancyDb(); + await p2.then((p) => Promise.allSettled(p)); + await promisify(setTimeout)(500); + + expect(acquireLockSpy.mock.calls.filter(([, key]) => key === "EVENT_QUEUE_RUN_PERIODIC_EVENT")).toHaveLength(3); + acquireLockSpy.mockRestore(); + }); + + it("should trigger update again if tenant ids have been changed", async () => { + let counter = 0; + let acquireLockSpy; + const promise = new Promise((resolve) => { + acquireLockSpy = jest.spyOn(distributedLock, "acquireLock").mockImplementation(async (context, key) => { + if (key === "EVENT_QUEUE_RUN_PERIODIC_EVENT") { + counter++; + } + if (counter === tenantIds.length) { + resolve(); + } + }); + }); + getAllTenantIdsSpy.mockResolvedValueOnce(tenantIds).mockResolvedValueOnce(tenantIds); + const p1 = runner._._multiTenancyDb(); + await p1.then((p) => Promise.allSettled(p)); + await promise; + expect(acquireLockSpy.mock.calls.filter(([, key]) => key === "EVENT_QUEUE_RUN_PERIODIC_EVENT")).toHaveLength(3); + + // second run with changed tenant ids + getAllTenantIdsSpy + .mockResolvedValueOnce(tenantIds.concat("e9bb8ec0-c85e-4035-b7cf-1b11ba8e5792")) + .mockResolvedValueOnce(tenantIds.concat("e9bb8ec0-c85e-4035-b7cf-1b11ba8e5792")); + + const promise2 = new Promise((resolve) => { + counter = 0; + acquireLockSpy.mockRestore(); + acquireLockSpy = jest.spyOn(distributedLock, "acquireLock").mockImplementation(async (context, key) => { + if (key === "EVENT_QUEUE_RUN_PERIODIC_EVENT") { + counter++; + } + if (counter === tenantIds.length) { + resolve(); + } + }); + }); + + const p2 = runner._._multiTenancyDb(); + await p2.then((p) => Promise.allSettled(p)); + await promise2; + + expect(acquireLockSpy.mock.calls.filter(([, key]) => key === "EVENT_QUEUE_RUN_PERIODIC_EVENT")).toHaveLength(4); + acquireLockSpy.mockRestore(); + }); + + it("should trigger update again if tenant ids have been changed and third run should not trigger an update", async () => { + let counter = 0; + let acquireLockSpy; + const promise = new Promise((resolve) => { + acquireLockSpy = jest.spyOn(distributedLock, "acquireLock").mockImplementation(async (context, key) => { + if (key === "EVENT_QUEUE_RUN_PERIODIC_EVENT") { + counter++; + } + if (counter === tenantIds.length) { + resolve(); + } + }); + }); + getAllTenantIdsSpy.mockResolvedValueOnce(tenantIds).mockResolvedValueOnce(tenantIds); + const p1 = runner._._multiTenancyDb(); + await p1.then((p) => Promise.allSettled(p)); + await promise; + expect(acquireLockSpy.mock.calls.filter(([, key]) => key === "EVENT_QUEUE_RUN_PERIODIC_EVENT")).toHaveLength(3); + + // second run with changed tenant ids + getAllTenantIdsSpy + .mockResolvedValueOnce(tenantIds.concat("e9bb8ec0-c85e-4035-b7cf-1b11ba8e5792")) + .mockResolvedValueOnce(tenantIds.concat("e9bb8ec0-c85e-4035-b7cf-1b11ba8e5792")); + + const promise2 = new Promise((resolve) => { + counter = 0; + acquireLockSpy.mockRestore(); + acquireLockSpy = jest.spyOn(distributedLock, "acquireLock").mockImplementation(async (context, key) => { + if (key === "EVENT_QUEUE_RUN_PERIODIC_EVENT") { + counter++; + } + if (counter === tenantIds.length) { + resolve(); + } + }); + }); + + const p2 = runner._._multiTenancyDb(); + await p2.then((p) => Promise.allSettled(p)); + await promise2; + + expect(acquireLockSpy.mock.calls.filter(([, key]) => key === "EVENT_QUEUE_RUN_PERIODIC_EVENT")).toHaveLength(4); + acquireLockSpy.mockReset(); + + // thirds run with same tenant ids + getAllTenantIdsSpy + .mockResolvedValueOnce(tenantIds.concat("e9bb8ec0-c85e-4035-b7cf-1b11ba8e5792")) + .mockResolvedValueOnce(tenantIds.concat("e9bb8ec0-c85e-4035-b7cf-1b11ba8e5792")); + + const p3 = runner._._multiTenancyDb(); + await p3.then((p) => Promise.allSettled(p)); + await promisify(setTimeout)(500); + + expect(acquireLockSpy.mock.calls.filter(([, key]) => key === "EVENT_QUEUE_RUN_PERIODIC_EVENT")).toHaveLength(0); + acquireLockSpy.mockRestore(); + }); + }); }); diff --git a/test/__snapshots__/periodicEvents.test.js.snap b/test/__snapshots__/periodicEvents.test.js.snap index 22d09cb2..2bea9e94 100644 --- a/test/__snapshots__/periodicEvents.test.js.snap +++ b/test/__snapshots__/periodicEvents.test.js.snap @@ -91,6 +91,43 @@ exports[`baseFunctionality delta insert 2`] = ` ] `; +exports[`baseFunctionality if periodic event is in progress - no insert should happen 1`] = ` +[ + [ + "1/1 | inserting chunk of changed or new periodic events", + { + "events": [ + { + "interval": 10, + "subType": "DB", + "type": "HealthCheck_PERIODIC", + }, + { + "interval": 86400, + "subType": "DELETE_EVENTS", + "type": "EVENT_QUEUE_BASE_PERIODIC", + }, + ], + }, + ], +] +`; + +exports[`baseFunctionality if periodic event is in progress - no insert should happen 2`] = ` +[ + { + "attempts": 0, + "startAfter": "2023-11-13T11:00:00.000Z", + "status": 1, + }, + { + "attempts": 0, + "startAfter": "2023-11-13T11:00:00.000Z", + "status": 1, + }, +] +`; + exports[`baseFunctionality interval changed 1`] = ` [ [ diff --git a/test/periodicEvents.test.js b/test/periodicEvents.test.js index cd509665..f082e955 100644 --- a/test/periodicEvents.test.js +++ b/test/periodicEvents.test.js @@ -9,6 +9,7 @@ const { Logger: mockLogger } = require("./mocks/logger"); const { checkAndInsertPeriodicEvents } = require("../src/periodicEvents"); const config = require("../src/config"); const { selectEventQueueAndReturn } = require("./helper"); +const { EventProcessingStatus } = require("../src/constants"); const project = __dirname + "/.."; // The project's root folder cds.test(project); @@ -89,4 +90,19 @@ describe("baseFunctionality", () => { expect(loggerMock.calls().info).toMatchSnapshot(); expect(await selectEventQueueAndReturn(tx, { expectedLength: 2 })).toMatchSnapshot(); }); + + it("if periodic event is in progress - no insert should happen", async () => { + await checkAndInsertPeriodicEvents(context); + + await tx.run( + UPDATE.entity("sap.eventqueue.Event").set({ + status: EventProcessingStatus.InProgress, + }) + ); + await checkAndInsertPeriodicEvents(context); + + expect(loggerMock.callsLengths().error).toEqual(0); + expect(loggerMock.calls().info).toMatchSnapshot(); + expect(await selectEventQueueAndReturn(tx, { expectedLength: 2 })).toMatchSnapshot(); + }); });