diff --git a/src/Consumer.ts b/src/Consumer.ts index 268ee24..8864eb5 100644 --- a/src/Consumer.ts +++ b/src/Consumer.ts @@ -187,15 +187,18 @@ export const serve: { } = internal.serve; /** + * Serves consumer to receive up to a maximum number of records. Useful for testing and non-blocking consumption. + * * @since 0.6.0 * @category accessors */ -export const serveOnceEffect: { +export const serveUpToEffect: { /** * @since 0.6.0 * @category accessors */ ( + max: number, options: Consumer.ConsumerOptions, ): ( app: MessageRouter.MessageRouter, @@ -210,13 +213,14 @@ export const serveOnceEffect: { */ ( app: MessageRouter.MessageRouter, + max: number, options: Consumer.ConsumerOptions, ): Effect.Effect< void, Error.ConnectionException, Scope.Scope | KafkaInstance.KafkaInstance | Exclude >; -} = internal.serveOnceEffect; +} = internal.serveUpToEffect; /** * @since 0.1.0 diff --git a/src/internal/consumer.ts b/src/internal/consumer.ts index 6d136e0..57cba70 100644 --- a/src/internal/consumer.ts +++ b/src/internal/consumer.ts @@ -38,9 +38,10 @@ export const makeConsumer = ( }); /** @internal */ -export const serveOnceEffect = dual< +export const serveUpToEffect = dual< { ( + max: number, options: Consumer.Consumer.ConsumerOptions, ): ( app: MessageRouter.MessageRouter, @@ -53,6 +54,7 @@ export const serveOnceEffect = dual< { ( app: MessageRouter.MessageRouter, + max: number, options: Consumer.Consumer.ConsumerOptions, ): Effect.Effect< void, @@ -64,6 +66,7 @@ export const serveOnceEffect = dual< (args) => Effect.isEffect(args[0]), ( app: MessageRouter.MessageRouter, + max: number, options: Consumer.Consumer.ConsumerOptions, ): Effect.Effect< void, @@ -80,7 +83,9 @@ export const serveOnceEffect = dual< const queue = yield* consumer.consume(); - yield* app.pipe(Effect.provideServiceEffect(consumerRecordTag, queue)).pipe(Effect.orDie); + yield* Effect.forEach(yield* queue.takeUpTo(max), (record) => + app.pipe(Effect.provideService(consumerRecordTag, record)), + ).pipe(Effect.orDie); }), ); diff --git a/test/Consumer.test.ts b/test/Consumer.test.ts index b3405ca..398aafd 100644 --- a/test/Consumer.test.ts +++ b/test/Consumer.test.ts @@ -1,4 +1,4 @@ -import { afterEach, beforeEach, describe, expect, it } from "@effect/vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "@effect/vitest"; import Substitute, { Arg, SubstituteOf } from "@fluffy-spoon/substitute"; import { Cause, Effect, Exit } from "effect"; import { Consumer, ConsumerRecord, MessageRouter } from "../src"; @@ -19,6 +19,11 @@ describe("Consumer", () => { ...ConsumerRecord.empty, topic: "test-topic", }), + ConsumerRecord.make({ + ...ConsumerRecord.empty, + offset: "1", + topic: "test-topic", + }), ]); kafkaSub.consumer().returns(consumerSub); }); @@ -30,32 +35,44 @@ describe("Consumer", () => { it.effect("should receive message", () => Effect.gen(function* () { - const record = ConsumerRecord.make({ - ...ConsumerRecord.empty, - topic: "test-topic", - }); - consumerSub.results.returns?.([record]); + expect.assertions(3); - expect.assertions(1); + const procedure = vi.fn(); yield* MessageRouter.empty.pipe( MessageRouter.subscribe( "test-topic", Effect.tap(ConsumerRecord.ConsumerRecord, (r) => { - expect(r).toStrictEqual(record); + procedure(r); }), ), - Consumer.serveOnceEffect({ groupId: "group" }), + Consumer.serveUpToEffect(2, { groupId: "group" }), Effect.scoped, ); consumerSub.received(1).subscribe(["test-topic"]); + expect(procedure).toHaveBeenCalledTimes(2); + expect(procedure).toHaveBeenNthCalledWith( + 1, + ConsumerRecord.make({ + ...ConsumerRecord.empty, + topic: "test-topic", + }), + ); + expect(procedure).toHaveBeenNthCalledWith( + 2, + ConsumerRecord.make({ + ...ConsumerRecord.empty, + offset: "1", + topic: "test-topic", + }), + ); }).pipe(Effect.provide(testKafkaInstanceLayer(kafkaSub))), ); it.effect("should catchTag handle error", () => Effect.gen(function* () { - expect.assertions(2); + expect.assertions(2 * 2); // 2 assertions for each error yield* MessageRouter.empty.pipe( MessageRouter.subscribe( @@ -67,7 +84,7 @@ describe("Consumer", () => { expect(e.error).toBe("Error processing message"); return Effect.void; }), - Consumer.serveOnceEffect({ groupId: "group" }), + Consumer.serveUpToEffect(2, { groupId: "group" }), Effect.scoped, ); @@ -77,7 +94,7 @@ describe("Consumer", () => { it.effect("should catchAll handle error", () => Effect.gen(function* () { - expect.assertions(2); + expect.assertions(2 * 2); // 2 assertions for each error yield* MessageRouter.empty.pipe( MessageRouter.subscribe( @@ -89,7 +106,7 @@ describe("Consumer", () => { expect(e.error).toBe("Error processing message"); return Effect.void; }), - Consumer.serveOnceEffect({ groupId: "group" }), + Consumer.serveUpToEffect(2, { groupId: "group" }), Effect.scoped, ); @@ -99,7 +116,7 @@ describe("Consumer", () => { it.effect("should catchAllCause handle error", () => Effect.gen(function* () { - expect.assertions(1); + expect.assertions(1 * 2); // 1 assertion for each error yield* MessageRouter.empty.pipe( MessageRouter.subscribe( @@ -110,7 +127,7 @@ describe("Consumer", () => { expect(e).toStrictEqual(Cause.fail(new Cause.UnknownException("Error processing message"))); return Effect.void; }), - Consumer.serveOnceEffect({ groupId: "group" }), + Consumer.serveUpToEffect(2, { groupId: "group" }), Effect.scoped, ); @@ -126,7 +143,7 @@ describe("Consumer", () => { "test-topic", Effect.flatMap(ConsumerRecord.ConsumerRecord, () => new Cause.UnknownException("Error processing message")), ), - Consumer.serveOnceEffect({ groupId: "group" }), + Consumer.serveUpToEffect(2, { groupId: "group" }), Effect.scoped, Effect.provide(testKafkaInstanceLayer(kafkaSub)), Effect.runPromiseExit, diff --git a/test/mocks/TestKafkaInstance.ts b/test/mocks/TestKafkaInstance.ts index e556f09..4fe06d6 100644 --- a/test/mocks/TestKafkaInstance.ts +++ b/test/mocks/TestKafkaInstance.ts @@ -47,7 +47,7 @@ export const testKafkaInstanceLayer = (kafka: TestInstance) => subscribe: (topics) => consumer.subscribe(topics), consume: () => Effect.gen(function* () { - const queue = yield* Queue.bounded(1); + const queue = yield* Queue.unbounded(); yield* Queue.offerAll(queue, consumer.results);