Skip to content

Commit

Permalink
test: improve consumer testing
Browse files Browse the repository at this point in the history
  • Loading branch information
floydspace committed Nov 28, 2024
1 parent cf1cfd8 commit 9f14593
Showing 1 changed file with 110 additions and 98 deletions.
208 changes: 110 additions & 98 deletions test/Consumer.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "@effect/vitest";
import Substitute, { Arg, SubstituteOf } from "@fluffy-spoon/substitute";
import { Cause, Effect, Exit } from "effect";
import { Cause, Effect, Exit, Fiber, Layer } from "effect";
import { Consumer, ConsumerRecord, MessageRouter } from "../src";
import { TestConsumer, TestInstance, testKafkaInstanceLayer } from "./mocks/TestKafkaInstance";

const runPromiseWithInterruption = <A, E>(effect: Effect.Effect<A, E>) => {
const fiber = Effect.runFork(effect);
return Effect.sleep(0).pipe(
Effect.andThen(() => Fiber.interrupt(fiber)),
Effect.runPromise,
);
};

describe("Consumer", () => {
let kafkaSub: SubstituteOf<TestInstance>;
let consumerSub: SubstituteOf<TestConsumer>;
Expand Down Expand Up @@ -33,107 +41,111 @@ describe("Consumer", () => {
consumerSub.received(1).disconnect();
});

it.effect("should receive message", () =>
Effect.gen(function* () {
expect.assertions(3);

const procedure = vi.fn();

yield* MessageRouter.empty.pipe(
MessageRouter.subscribe(
"test-topic",
Effect.tap(ConsumerRecord.ConsumerRecord, (r) => {
procedure(r);
}),
),
Consumer.serveUpToEffect(2, { groupId: "group" }),
Effect.scoped,
);

consumerSub.received(1).subscribe(["test-topic"]);
expect(procedure).toHaveBeenCalledTimes(2);
expect(procedure).toHaveBeenNthCalledWith(
1,
ConsumerRecord.make({
...ConsumerRecord.empty,
topic: "test-topic",
}),
);
expect(procedure).toHaveBeenNthCalledWith(
2,
ConsumerRecord.make({
...ConsumerRecord.empty,
offset: "1",
topic: "test-topic",
}),
);
}).pipe(Effect.provide(testKafkaInstanceLayer(kafkaSub))),
);
it("should receive message", async () => {
expect.assertions(4);

it.effect("should catchTag handle error", () =>
Effect.gen(function* () {
expect.assertions(2 * 2); // 2 assertions for each error

yield* MessageRouter.empty.pipe(
MessageRouter.subscribe(
"test-topic",
Effect.flatMap(ConsumerRecord.ConsumerRecord, () => new Cause.UnknownException("Error processing message")),
),
MessageRouter.catchTag("UnknownException", (e) => {
expect(e).toBeInstanceOf(Cause.UnknownException);
expect(e.error).toBe("Error processing message");
return Effect.void;
const procedure = vi.fn();

const program = await MessageRouter.empty.pipe(
MessageRouter.subscribe(
"test-topic",
Effect.tap(ConsumerRecord.ConsumerRecord, (r) => {
procedure(r);
}),
Consumer.serveUpToEffect(2, { groupId: "group" }),
Effect.scoped,
);
),
Consumer.serve({ groupId: "group" }),
Layer.launch,
Effect.provide(testKafkaInstanceLayer(kafkaSub)),
runPromiseWithInterruption,
);

consumerSub.received(1).subscribe(["test-topic"]);
}).pipe(Effect.provide(testKafkaInstanceLayer(kafkaSub))),
);
consumerSub.received(1).subscribe(["test-topic"]);
expect(Exit.isInterrupted(program)).toBe(true);
expect(procedure).toHaveBeenCalledTimes(2);
expect(procedure).toHaveBeenNthCalledWith(
1,
ConsumerRecord.make({
...ConsumerRecord.empty,
topic: "test-topic",
}),
);
expect(procedure).toHaveBeenNthCalledWith(
2,
ConsumerRecord.make({
...ConsumerRecord.empty,
offset: "1",
topic: "test-topic",
}),
);
});

it.effect("should catchAll handle error", () =>
Effect.gen(function* () {
expect.assertions(2 * 2); // 2 assertions for each error

yield* MessageRouter.empty.pipe(
MessageRouter.subscribe(
"test-topic",
Effect.flatMap(ConsumerRecord.ConsumerRecord, () => new Cause.UnknownException("Error processing message")),
),
MessageRouter.catchAll((e) => {
expect(e).toBeInstanceOf(Cause.UnknownException);
expect(e.error).toBe("Error processing message");
return Effect.void;
}),
Consumer.serveUpToEffect(2, { groupId: "group" }),
Effect.scoped,
);
it("should catchTag handle error", async () => {
expect.assertions(2 * 2 + 1); // 2 assertions for each error + 1 for interruption

consumerSub.received(1).subscribe(["test-topic"]);
}).pipe(Effect.provide(testKafkaInstanceLayer(kafkaSub))),
);
const program = await MessageRouter.empty.pipe(
MessageRouter.subscribe(
"test-topic",
Effect.flatMap(ConsumerRecord.ConsumerRecord, () => new Cause.UnknownException("Error processing message")),
),
MessageRouter.catchTag("UnknownException", (e) => {
expect(e).toBeInstanceOf(Cause.UnknownException);
expect(e.error).toBe("Error processing message");
return Effect.void;
}),
Consumer.serve({ groupId: "group" }),
Layer.launch,
Effect.provide(testKafkaInstanceLayer(kafkaSub)),
runPromiseWithInterruption,
);

it.effect("should catchAllCause handle error", () =>
Effect.gen(function* () {
expect.assertions(1 * 2); // 1 assertion for each error

yield* MessageRouter.empty.pipe(
MessageRouter.subscribe(
"test-topic",
Effect.flatMap(ConsumerRecord.ConsumerRecord, () => new Cause.UnknownException("Error processing message")),
),
MessageRouter.catchAllCause((e) => {
expect(e).toStrictEqual(Cause.fail(new Cause.UnknownException("Error processing message")));
return Effect.void;
}),
Consumer.serveUpToEffect(2, { groupId: "group" }),
Effect.scoped,
);
consumerSub.received(1).subscribe(["test-topic"]);
expect(Exit.isInterrupted(program)).toBe(true);
});

consumerSub.received(1).subscribe(["test-topic"]);
}).pipe(Effect.provide(testKafkaInstanceLayer(kafkaSub))),
);
it("should catchAll handle error", async () => {
expect.assertions(2 * 2 + 1); // 2 assertions for each error + 1 for interruption

const program = await MessageRouter.empty.pipe(
MessageRouter.subscribe(
"test-topic",
Effect.flatMap(ConsumerRecord.ConsumerRecord, () => new Cause.UnknownException("Error processing message")),
),
MessageRouter.catchAll((e) => {
expect(e).toBeInstanceOf(Cause.UnknownException);
expect(e.error).toBe("Error processing message");
return Effect.void;
}),
Consumer.serve({ groupId: "group" }),
Layer.launch,
Effect.provide(testKafkaInstanceLayer(kafkaSub)),
runPromiseWithInterruption,
);

consumerSub.received(1).subscribe(["test-topic"]);
expect(Exit.isInterrupted(program)).toBe(true);
});

it("should catchAllCause handle error", async () => {
expect.assertions(1 * 2 + 1); // 1 assertion for each error + 1 for interruption

const program = await MessageRouter.empty.pipe(
MessageRouter.subscribe(
"test-topic",
Effect.flatMap(ConsumerRecord.ConsumerRecord, () => new Cause.UnknownException("Error processing message")),
),
MessageRouter.catchAllCause((e) => {
expect(e).toStrictEqual(Cause.fail(new Cause.UnknownException("Error processing message")));
return Effect.void;
}),
Consumer.serve({ groupId: "group" }),
Layer.launch,
Effect.provide(testKafkaInstanceLayer(kafkaSub)),
runPromiseWithInterruption,
);

consumerSub.received(1).subscribe(["test-topic"]);
expect(Exit.isInterrupted(program)).toBe(true);
});

it("should not handle error", async () => {
expect.assertions(1);
Expand All @@ -143,10 +155,10 @@ describe("Consumer", () => {
"test-topic",
Effect.flatMap(ConsumerRecord.ConsumerRecord, () => new Cause.UnknownException("Error processing message")),
),
Consumer.serveUpToEffect(2, { groupId: "group" }),
Effect.scoped,
Consumer.serve({ groupId: "group" }),
Layer.launch,
Effect.provide(testKafkaInstanceLayer(kafkaSub)),
Effect.runPromiseExit,
runPromiseWithInterruption,
);

consumerSub.received(1).subscribe(["test-topic"]);
Expand Down

0 comments on commit 9f14593

Please sign in to comment.