Skip to content

Commit

Permalink
Fix update tenant hash (#101)
Browse files Browse the repository at this point in the history
* fix update tenant hash

* more fixes for updates

* more fixes for updates

* cleanup

---------

Co-authored-by: Max Gruenfelder <[email protected]>
  • Loading branch information
soccermax and Max Gruenfelder authored Jan 8, 2024
1 parent 0b995cb commit c891117
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 3 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## v1.0.3 - 2024-01-08

### Fixed

- update tenant hash for newly onboarded tenants
- consider running periodic events during update of periodic events

## v1.0.2 - 2024-01-05

### Added
Expand Down
6 changes: 4 additions & 2 deletions src/periodicEvents.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ const checkAndInsertPeriodicEvents = async (context) => {
},
"AND",
{ ref: ["status"] },
"=",
{ val: EventProcessingStatus.Open },
"IN",
{
list: [{ val: EventProcessingStatus.Open }, { val: EventProcessingStatus.InProgress }],
},
])
.columns(["ID", "type", "subType", "startAfter"]);
const currentPeriodEvents = await tx.run(baseCqn);
Expand Down
3 changes: 2 additions & 1 deletion src/runner.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ const _checkAndTriggerPeriodicEventUpdate = (tenantIds) => {
return;
}
if (tenantIdHash && tenantIdHash !== hash) {
tenantIdHash = hash;
cds.log(COMPONENT_NAME).info("tenant id hash changed, triggering updating periodic events!");
_multiTenancyPeriodicEvents().catch((err) => {
cds.log(COMPONENT_NAME).error("Error during triggering updating periodic events!", err);
Expand Down Expand Up @@ -300,7 +301,7 @@ const _checkPeriodicEventsSingleTenant = async (context = {}) => {
return;
}
try {
logger.info("executing updating periotic events", {
logger.info("executing updating periodic events", {
tenantId: context.tenant,
subdomain: context.http?.req.authInfo.getSubdomain(),
});
Expand Down
157 changes: 157 additions & 0 deletions test-integration/runner.test.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"use strict";

const { promisify } = require("util");

const cds = require("@sap/cds");
cds.test(__dirname + "/_env");

Expand Down Expand Up @@ -283,4 +285,159 @@ describe("redisRunner", () => {
jest.useRealTimers();
});
});

describe("tenant hash", () => {
it("should trigger update periodic events once per tenant", async () => {
let counter = 0;
let acquireLockSpy;
const promise = new Promise((resolve) => {
acquireLockSpy = jest.spyOn(distributedLock, "acquireLock").mockImplementation(async (context, key) => {
if (key === "EVENT_QUEUE_RUN_PERIODIC_EVENT") {
counter++;
}
if (counter === tenantIds.length) {
resolve();
}
});
});
getAllTenantIdsSpy.mockResolvedValueOnce(tenantIds).mockResolvedValueOnce(tenantIds);
const p1 = runner._._multiTenancyDb();
await p1.then((p) => Promise.allSettled(p));
await promise;
expect(acquireLockSpy.mock.calls.filter(([, key]) => key === "EVENT_QUEUE_RUN_PERIODIC_EVENT")).toHaveLength(3);

acquireLockSpy.mockRestore();
});

it("should not trigger update again if tenant ids have not been changed", async () => {
let counter = 0;
let acquireLockSpy;
const promise = new Promise((resolve) => {
acquireLockSpy = jest.spyOn(distributedLock, "acquireLock").mockImplementation(async (context, key) => {
if (key === "EVENT_QUEUE_RUN_PERIODIC_EVENT") {
counter++;
}
if (counter === tenantIds.length) {
resolve();
}
});
});
getAllTenantIdsSpy.mockResolvedValueOnce(tenantIds).mockResolvedValueOnce(tenantIds);
const p1 = runner._._multiTenancyDb();
await p1.then((p) => Promise.allSettled(p));
await promise;
expect(acquireLockSpy.mock.calls.filter(([, key]) => key === "EVENT_QUEUE_RUN_PERIODIC_EVENT")).toHaveLength(3);

// second run
getAllTenantIdsSpy.mockResolvedValueOnce(tenantIds);
const p2 = runner._._multiTenancyDb();
await p2.then((p) => Promise.allSettled(p));
await promisify(setTimeout)(500);

expect(acquireLockSpy.mock.calls.filter(([, key]) => key === "EVENT_QUEUE_RUN_PERIODIC_EVENT")).toHaveLength(3);
acquireLockSpy.mockRestore();
});

it("should trigger update again if tenant ids have been changed", async () => {
let counter = 0;
let acquireLockSpy;
const promise = new Promise((resolve) => {
acquireLockSpy = jest.spyOn(distributedLock, "acquireLock").mockImplementation(async (context, key) => {
if (key === "EVENT_QUEUE_RUN_PERIODIC_EVENT") {
counter++;
}
if (counter === tenantIds.length) {
resolve();
}
});
});
getAllTenantIdsSpy.mockResolvedValueOnce(tenantIds).mockResolvedValueOnce(tenantIds);
const p1 = runner._._multiTenancyDb();
await p1.then((p) => Promise.allSettled(p));
await promise;
expect(acquireLockSpy.mock.calls.filter(([, key]) => key === "EVENT_QUEUE_RUN_PERIODIC_EVENT")).toHaveLength(3);

// second run with changed tenant ids
getAllTenantIdsSpy
.mockResolvedValueOnce(tenantIds.concat("e9bb8ec0-c85e-4035-b7cf-1b11ba8e5792"))
.mockResolvedValueOnce(tenantIds.concat("e9bb8ec0-c85e-4035-b7cf-1b11ba8e5792"));

const promise2 = new Promise((resolve) => {
counter = 0;
acquireLockSpy.mockRestore();
acquireLockSpy = jest.spyOn(distributedLock, "acquireLock").mockImplementation(async (context, key) => {
if (key === "EVENT_QUEUE_RUN_PERIODIC_EVENT") {
counter++;
}
if (counter === tenantIds.length) {
resolve();
}
});
});

const p2 = runner._._multiTenancyDb();
await p2.then((p) => Promise.allSettled(p));
await promise2;

expect(acquireLockSpy.mock.calls.filter(([, key]) => key === "EVENT_QUEUE_RUN_PERIODIC_EVENT")).toHaveLength(4);
acquireLockSpy.mockRestore();
});

it("should trigger update again if tenant ids have been changed and third run should not trigger an update", async () => {
let counter = 0;
let acquireLockSpy;
const promise = new Promise((resolve) => {
acquireLockSpy = jest.spyOn(distributedLock, "acquireLock").mockImplementation(async (context, key) => {
if (key === "EVENT_QUEUE_RUN_PERIODIC_EVENT") {
counter++;
}
if (counter === tenantIds.length) {
resolve();
}
});
});
getAllTenantIdsSpy.mockResolvedValueOnce(tenantIds).mockResolvedValueOnce(tenantIds);
const p1 = runner._._multiTenancyDb();
await p1.then((p) => Promise.allSettled(p));
await promise;
expect(acquireLockSpy.mock.calls.filter(([, key]) => key === "EVENT_QUEUE_RUN_PERIODIC_EVENT")).toHaveLength(3);

// second run with changed tenant ids
getAllTenantIdsSpy
.mockResolvedValueOnce(tenantIds.concat("e9bb8ec0-c85e-4035-b7cf-1b11ba8e5792"))
.mockResolvedValueOnce(tenantIds.concat("e9bb8ec0-c85e-4035-b7cf-1b11ba8e5792"));

const promise2 = new Promise((resolve) => {
counter = 0;
acquireLockSpy.mockRestore();
acquireLockSpy = jest.spyOn(distributedLock, "acquireLock").mockImplementation(async (context, key) => {
if (key === "EVENT_QUEUE_RUN_PERIODIC_EVENT") {
counter++;
}
if (counter === tenantIds.length) {
resolve();
}
});
});

const p2 = runner._._multiTenancyDb();
await p2.then((p) => Promise.allSettled(p));
await promise2;

expect(acquireLockSpy.mock.calls.filter(([, key]) => key === "EVENT_QUEUE_RUN_PERIODIC_EVENT")).toHaveLength(4);
acquireLockSpy.mockReset();

// thirds run with same tenant ids
getAllTenantIdsSpy
.mockResolvedValueOnce(tenantIds.concat("e9bb8ec0-c85e-4035-b7cf-1b11ba8e5792"))
.mockResolvedValueOnce(tenantIds.concat("e9bb8ec0-c85e-4035-b7cf-1b11ba8e5792"));

const p3 = runner._._multiTenancyDb();
await p3.then((p) => Promise.allSettled(p));
await promisify(setTimeout)(500);

expect(acquireLockSpy.mock.calls.filter(([, key]) => key === "EVENT_QUEUE_RUN_PERIODIC_EVENT")).toHaveLength(0);
acquireLockSpy.mockRestore();
});
});
});
37 changes: 37 additions & 0 deletions test/__snapshots__/periodicEvents.test.js.snap
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,43 @@ exports[`baseFunctionality delta insert 2`] = `
]
`;

exports[`baseFunctionality if periodic event is in progress - no insert should happen 1`] = `
[
[
"1/1 | inserting chunk of changed or new periodic events",
{
"events": [
{
"interval": 10,
"subType": "DB",
"type": "HealthCheck_PERIODIC",
},
{
"interval": 86400,
"subType": "DELETE_EVENTS",
"type": "EVENT_QUEUE_BASE_PERIODIC",
},
],
},
],
]
`;

exports[`baseFunctionality if periodic event is in progress - no insert should happen 2`] = `
[
{
"attempts": 0,
"startAfter": "2023-11-13T11:00:00.000Z",
"status": 1,
},
{
"attempts": 0,
"startAfter": "2023-11-13T11:00:00.000Z",
"status": 1,
},
]
`;

exports[`baseFunctionality interval changed 1`] = `
[
[
Expand Down
16 changes: 16 additions & 0 deletions test/periodicEvents.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const { Logger: mockLogger } = require("./mocks/logger");
const { checkAndInsertPeriodicEvents } = require("../src/periodicEvents");
const config = require("../src/config");
const { selectEventQueueAndReturn } = require("./helper");
const { EventProcessingStatus } = require("../src/constants");
const project = __dirname + "/.."; // The project's root folder
cds.test(project);

Expand Down Expand Up @@ -89,4 +90,19 @@ describe("baseFunctionality", () => {
expect(loggerMock.calls().info).toMatchSnapshot();
expect(await selectEventQueueAndReturn(tx, { expectedLength: 2 })).toMatchSnapshot();
});

it("if periodic event is in progress - no insert should happen", async () => {
await checkAndInsertPeriodicEvents(context);

await tx.run(
UPDATE.entity("sap.eventqueue.Event").set({
status: EventProcessingStatus.InProgress,
})
);
await checkAndInsertPeriodicEvents(context);

expect(loggerMock.callsLengths().error).toEqual(0);
expect(loggerMock.calls().info).toMatchSnapshot();
expect(await selectEventQueueAndReturn(tx, { expectedLength: 2 })).toMatchSnapshot();
});
});

0 comments on commit c891117

Please sign in to comment.