From 9f14593e3c19f1a76f2c85e8d4951b9dae26f7ed Mon Sep 17 00:00:00 2001
From: Victor Korzunin <5180700+floydspace@users.noreply.github.com>
Date: Thu, 28 Nov 2024 12:43:01 +0100
Subject: [PATCH] test: improve consumer testing
---
test/Consumer.test.ts | 208 ++++++++++++++++++++++--------------------
1 file changed, 110 insertions(+), 98 deletions(-)
diff --git a/test/Consumer.test.ts b/test/Consumer.test.ts
index 398aafd..a68d923 100644
--- a/test/Consumer.test.ts
+++ b/test/Consumer.test.ts
@@ -1,9 +1,17 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "@effect/vitest";
import Substitute, { Arg, SubstituteOf } from "@fluffy-spoon/substitute";
-import { Cause, Effect, Exit } from "effect";
+import { Cause, Effect, Exit, Fiber, Layer } from "effect";
import { Consumer, ConsumerRecord, MessageRouter } from "../src";
import { TestConsumer, TestInstance, testKafkaInstanceLayer } from "./mocks/TestKafkaInstance";
+const runPromiseWithInterruption = (effect: Effect.Effect) => {
+ const fiber = Effect.runFork(effect);
+ return Effect.sleep(0).pipe(
+ Effect.andThen(() => Fiber.interrupt(fiber)),
+ Effect.runPromise,
+ );
+};
+
describe("Consumer", () => {
let kafkaSub: SubstituteOf;
let consumerSub: SubstituteOf;
@@ -33,107 +41,111 @@ describe("Consumer", () => {
consumerSub.received(1).disconnect();
});
- it.effect("should receive message", () =>
- Effect.gen(function* () {
- expect.assertions(3);
-
- const procedure = vi.fn();
-
- yield* MessageRouter.empty.pipe(
- MessageRouter.subscribe(
- "test-topic",
- Effect.tap(ConsumerRecord.ConsumerRecord, (r) => {
- procedure(r);
- }),
- ),
- Consumer.serveUpToEffect(2, { groupId: "group" }),
- Effect.scoped,
- );
-
- consumerSub.received(1).subscribe(["test-topic"]);
- expect(procedure).toHaveBeenCalledTimes(2);
- expect(procedure).toHaveBeenNthCalledWith(
- 1,
- ConsumerRecord.make({
- ...ConsumerRecord.empty,
- topic: "test-topic",
- }),
- );
- expect(procedure).toHaveBeenNthCalledWith(
- 2,
- ConsumerRecord.make({
- ...ConsumerRecord.empty,
- offset: "1",
- topic: "test-topic",
- }),
- );
- }).pipe(Effect.provide(testKafkaInstanceLayer(kafkaSub))),
- );
+ it("should receive message", async () => {
+ expect.assertions(4);
- it.effect("should catchTag handle error", () =>
- Effect.gen(function* () {
- expect.assertions(2 * 2); // 2 assertions for each error
-
- yield* MessageRouter.empty.pipe(
- MessageRouter.subscribe(
- "test-topic",
- Effect.flatMap(ConsumerRecord.ConsumerRecord, () => new Cause.UnknownException("Error processing message")),
- ),
- MessageRouter.catchTag("UnknownException", (e) => {
- expect(e).toBeInstanceOf(Cause.UnknownException);
- expect(e.error).toBe("Error processing message");
- return Effect.void;
+ const procedure = vi.fn();
+
+ const program = await MessageRouter.empty.pipe(
+ MessageRouter.subscribe(
+ "test-topic",
+ Effect.tap(ConsumerRecord.ConsumerRecord, (r) => {
+ procedure(r);
}),
- Consumer.serveUpToEffect(2, { groupId: "group" }),
- Effect.scoped,
- );
+ ),
+ Consumer.serve({ groupId: "group" }),
+ Layer.launch,
+ Effect.provide(testKafkaInstanceLayer(kafkaSub)),
+ runPromiseWithInterruption,
+ );
- consumerSub.received(1).subscribe(["test-topic"]);
- }).pipe(Effect.provide(testKafkaInstanceLayer(kafkaSub))),
- );
+ consumerSub.received(1).subscribe(["test-topic"]);
+ expect(Exit.isInterrupted(program)).toBe(true);
+ expect(procedure).toHaveBeenCalledTimes(2);
+ expect(procedure).toHaveBeenNthCalledWith(
+ 1,
+ ConsumerRecord.make({
+ ...ConsumerRecord.empty,
+ topic: "test-topic",
+ }),
+ );
+ expect(procedure).toHaveBeenNthCalledWith(
+ 2,
+ ConsumerRecord.make({
+ ...ConsumerRecord.empty,
+ offset: "1",
+ topic: "test-topic",
+ }),
+ );
+ });
- it.effect("should catchAll handle error", () =>
- Effect.gen(function* () {
- expect.assertions(2 * 2); // 2 assertions for each error
-
- yield* MessageRouter.empty.pipe(
- MessageRouter.subscribe(
- "test-topic",
- Effect.flatMap(ConsumerRecord.ConsumerRecord, () => new Cause.UnknownException("Error processing message")),
- ),
- MessageRouter.catchAll((e) => {
- expect(e).toBeInstanceOf(Cause.UnknownException);
- expect(e.error).toBe("Error processing message");
- return Effect.void;
- }),
- Consumer.serveUpToEffect(2, { groupId: "group" }),
- Effect.scoped,
- );
+ it("should catchTag handle error", async () => {
+ expect.assertions(2 * 2 + 1); // 2 assertions for each error + 1 for interruption
- consumerSub.received(1).subscribe(["test-topic"]);
- }).pipe(Effect.provide(testKafkaInstanceLayer(kafkaSub))),
- );
+ const program = await MessageRouter.empty.pipe(
+ MessageRouter.subscribe(
+ "test-topic",
+ Effect.flatMap(ConsumerRecord.ConsumerRecord, () => new Cause.UnknownException("Error processing message")),
+ ),
+ MessageRouter.catchTag("UnknownException", (e) => {
+ expect(e).toBeInstanceOf(Cause.UnknownException);
+ expect(e.error).toBe("Error processing message");
+ return Effect.void;
+ }),
+ Consumer.serve({ groupId: "group" }),
+ Layer.launch,
+ Effect.provide(testKafkaInstanceLayer(kafkaSub)),
+ runPromiseWithInterruption,
+ );
- it.effect("should catchAllCause handle error", () =>
- Effect.gen(function* () {
- expect.assertions(1 * 2); // 1 assertion for each error
-
- yield* MessageRouter.empty.pipe(
- MessageRouter.subscribe(
- "test-topic",
- Effect.flatMap(ConsumerRecord.ConsumerRecord, () => new Cause.UnknownException("Error processing message")),
- ),
- MessageRouter.catchAllCause((e) => {
- expect(e).toStrictEqual(Cause.fail(new Cause.UnknownException("Error processing message")));
- return Effect.void;
- }),
- Consumer.serveUpToEffect(2, { groupId: "group" }),
- Effect.scoped,
- );
+ consumerSub.received(1).subscribe(["test-topic"]);
+ expect(Exit.isInterrupted(program)).toBe(true);
+ });
- consumerSub.received(1).subscribe(["test-topic"]);
- }).pipe(Effect.provide(testKafkaInstanceLayer(kafkaSub))),
- );
+ it("should catchAll handle error", async () => {
+ expect.assertions(2 * 2 + 1); // 2 assertions for each error + 1 for interruption
+
+ const program = await MessageRouter.empty.pipe(
+ MessageRouter.subscribe(
+ "test-topic",
+ Effect.flatMap(ConsumerRecord.ConsumerRecord, () => new Cause.UnknownException("Error processing message")),
+ ),
+ MessageRouter.catchAll((e) => {
+ expect(e).toBeInstanceOf(Cause.UnknownException);
+ expect(e.error).toBe("Error processing message");
+ return Effect.void;
+ }),
+ Consumer.serve({ groupId: "group" }),
+ Layer.launch,
+ Effect.provide(testKafkaInstanceLayer(kafkaSub)),
+ runPromiseWithInterruption,
+ );
+
+ consumerSub.received(1).subscribe(["test-topic"]);
+ expect(Exit.isInterrupted(program)).toBe(true);
+ });
+
+ it("should catchAllCause handle error", async () => {
+ expect.assertions(1 * 2 + 1); // 1 assertion for each error + 1 for interruption
+
+ const program = await MessageRouter.empty.pipe(
+ MessageRouter.subscribe(
+ "test-topic",
+ Effect.flatMap(ConsumerRecord.ConsumerRecord, () => new Cause.UnknownException("Error processing message")),
+ ),
+ MessageRouter.catchAllCause((e) => {
+ expect(e).toStrictEqual(Cause.fail(new Cause.UnknownException("Error processing message")));
+ return Effect.void;
+ }),
+ Consumer.serve({ groupId: "group" }),
+ Layer.launch,
+ Effect.provide(testKafkaInstanceLayer(kafkaSub)),
+ runPromiseWithInterruption,
+ );
+
+ consumerSub.received(1).subscribe(["test-topic"]);
+ expect(Exit.isInterrupted(program)).toBe(true);
+ });
it("should not handle error", async () => {
expect.assertions(1);
@@ -143,10 +155,10 @@ describe("Consumer", () => {
"test-topic",
Effect.flatMap(ConsumerRecord.ConsumerRecord, () => new Cause.UnknownException("Error processing message")),
),
- Consumer.serveUpToEffect(2, { groupId: "group" }),
- Effect.scoped,
+ Consumer.serve({ groupId: "group" }),
+ Layer.launch,
Effect.provide(testKafkaInstanceLayer(kafkaSub)),
- Effect.runPromiseExit,
+ runPromiseWithInterruption,
);
consumerSub.received(1).subscribe(["test-topic"]);