Skip to content

Commit

Permalink
feat: replace serveOnceEffect to serveUpToEffect
Browse files Browse the repository at this point in the history
  • Loading branch information
floydspace committed Nov 23, 2024
1 parent edca0d6 commit 1b3b5b0
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 21 deletions.
8 changes: 6 additions & 2 deletions src/Consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,15 +187,18 @@ export const serve: {
} = internal.serve;

/**
* Serves consumer to receive up to a maximum number of records. Useful for testing and non-blocking consumption.
*
* @since 0.6.0
* @category accessors
*/
export const serveOnceEffect: {
export const serveUpToEffect: {
/**
* @since 0.6.0
* @category accessors
*/
(
max: number,
options: Consumer.ConsumerOptions,
): <E, R>(
app: MessageRouter.MessageRouter<E, R>,
Expand All @@ -210,13 +213,14 @@ export const serveOnceEffect: {
*/
<E, R>(
app: MessageRouter.MessageRouter<E, R>,
max: number,
options: Consumer.ConsumerOptions,
): Effect.Effect<
void,
Error.ConnectionException,
Scope.Scope | KafkaInstance.KafkaInstance | Exclude<R, ConsumerRecord.ConsumerRecord>
>;
} = internal.serveOnceEffect;
} = internal.serveUpToEffect;

/**
* @since 0.1.0
Expand Down
9 changes: 7 additions & 2 deletions src/internal/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ export const makeConsumer = (
});

/** @internal */
export const serveOnceEffect = dual<
export const serveUpToEffect = dual<
{
(
max: number,
options: Consumer.Consumer.ConsumerOptions,
): <E, R>(
app: MessageRouter.MessageRouter<E, R>,
Expand All @@ -53,6 +54,7 @@ export const serveOnceEffect = dual<
{
<E, R>(
app: MessageRouter.MessageRouter<E, R>,
max: number,
options: Consumer.Consumer.ConsumerOptions,
): Effect.Effect<
void,
Expand All @@ -64,6 +66,7 @@ export const serveOnceEffect = dual<
(args) => Effect.isEffect(args[0]),
<E, R>(
app: MessageRouter.MessageRouter<E, R>,
max: number,
options: Consumer.Consumer.ConsumerOptions,
): Effect.Effect<
void,
Expand All @@ -80,7 +83,9 @@ export const serveOnceEffect = dual<

const queue = yield* consumer.consume();

yield* app.pipe(Effect.provideServiceEffect(consumerRecordTag, queue)).pipe(Effect.orDie);
yield* Effect.forEach(yield* queue.takeUpTo(max), (record) =>
app.pipe(Effect.provideService(consumerRecordTag, record)),
).pipe(Effect.orDie);
}),
);

Expand Down
49 changes: 33 additions & 16 deletions test/Consumer.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { afterEach, beforeEach, describe, expect, it } from "@effect/vitest";
import { afterEach, beforeEach, describe, expect, it, vi } from "@effect/vitest";
import Substitute, { Arg, SubstituteOf } from "@fluffy-spoon/substitute";
import { Cause, Effect, Exit } from "effect";
import { Consumer, ConsumerRecord, MessageRouter } from "../src";
Expand All @@ -19,6 +19,11 @@ describe("Consumer", () => {
...ConsumerRecord.empty,
topic: "test-topic",
}),
ConsumerRecord.make({
...ConsumerRecord.empty,
offset: "1",
topic: "test-topic",
}),
]);
kafkaSub.consumer().returns(consumerSub);
});
Expand All @@ -30,32 +35,44 @@ describe("Consumer", () => {

it.effect("should receive message", () =>
Effect.gen(function* () {
const record = ConsumerRecord.make({
...ConsumerRecord.empty,
topic: "test-topic",
});
consumerSub.results.returns?.([record]);
expect.assertions(3);

expect.assertions(1);
const procedure = vi.fn();

yield* MessageRouter.empty.pipe(
MessageRouter.subscribe(
"test-topic",
Effect.tap(ConsumerRecord.ConsumerRecord, (r) => {
expect(r).toStrictEqual(record);
procedure(r);
}),
),
Consumer.serveOnceEffect({ groupId: "group" }),
Consumer.serveUpToEffect(2, { groupId: "group" }),
Effect.scoped,
);

consumerSub.received(1).subscribe(["test-topic"]);
expect(procedure).toHaveBeenCalledTimes(2);
expect(procedure).toHaveBeenNthCalledWith(
1,
ConsumerRecord.make({
...ConsumerRecord.empty,
topic: "test-topic",
}),
);
expect(procedure).toHaveBeenNthCalledWith(
2,
ConsumerRecord.make({
...ConsumerRecord.empty,
offset: "1",
topic: "test-topic",
}),
);
}).pipe(Effect.provide(testKafkaInstanceLayer(kafkaSub))),
);

it.effect("should catchTag handle error", () =>
Effect.gen(function* () {
expect.assertions(2);
expect.assertions(2 * 2); // 2 assertions for each error

yield* MessageRouter.empty.pipe(
MessageRouter.subscribe(
Expand All @@ -67,7 +84,7 @@ describe("Consumer", () => {
expect(e.error).toBe("Error processing message");
return Effect.void;
}),
Consumer.serveOnceEffect({ groupId: "group" }),
Consumer.serveUpToEffect(2, { groupId: "group" }),
Effect.scoped,
);

Expand All @@ -77,7 +94,7 @@ describe("Consumer", () => {

it.effect("should catchAll handle error", () =>
Effect.gen(function* () {
expect.assertions(2);
expect.assertions(2 * 2); // 2 assertions for each error

yield* MessageRouter.empty.pipe(
MessageRouter.subscribe(
Expand All @@ -89,7 +106,7 @@ describe("Consumer", () => {
expect(e.error).toBe("Error processing message");
return Effect.void;
}),
Consumer.serveOnceEffect({ groupId: "group" }),
Consumer.serveUpToEffect(2, { groupId: "group" }),
Effect.scoped,
);

Expand All @@ -99,7 +116,7 @@ describe("Consumer", () => {

it.effect("should catchAllCause handle error", () =>
Effect.gen(function* () {
expect.assertions(1);
expect.assertions(1 * 2); // 1 assertion for each error

yield* MessageRouter.empty.pipe(
MessageRouter.subscribe(
Expand All @@ -110,7 +127,7 @@ describe("Consumer", () => {
expect(e).toStrictEqual(Cause.fail(new Cause.UnknownException("Error processing message")));
return Effect.void;
}),
Consumer.serveOnceEffect({ groupId: "group" }),
Consumer.serveUpToEffect(2, { groupId: "group" }),
Effect.scoped,
);

Expand All @@ -126,7 +143,7 @@ describe("Consumer", () => {
"test-topic",
Effect.flatMap(ConsumerRecord.ConsumerRecord, () => new Cause.UnknownException("Error processing message")),
),
Consumer.serveOnceEffect({ groupId: "group" }),
Consumer.serveUpToEffect(2, { groupId: "group" }),
Effect.scoped,
Effect.provide(testKafkaInstanceLayer(kafkaSub)),
Effect.runPromiseExit,
Expand Down
2 changes: 1 addition & 1 deletion test/mocks/TestKafkaInstance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export const testKafkaInstanceLayer = (kafka: TestInstance) =>
subscribe: (topics) => consumer.subscribe(topics),
consume: () =>
Effect.gen(function* () {
const queue = yield* Queue.bounded<ConsumerRecord.ConsumerRecord>(1);
const queue = yield* Queue.unbounded<ConsumerRecord.ConsumerRecord>();

yield* Queue.offerAll(queue, consumer.results);

Expand Down

0 comments on commit 1b3b5b0

Please sign in to comment.