Skip to content

Commit

Permalink
test: cover consumer with tests
Browse files Browse the repository at this point in the history
  • Loading branch information
floydspace committed Nov 22, 2024
1 parent 43423f5 commit 84269f0
Show file tree
Hide file tree
Showing 7 changed files with 302 additions and 27 deletions.
32 changes: 32 additions & 0 deletions src/Consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,38 @@ export const serve: {
>;
} = internal.serve;

/**
* @since 0.5.0
* @category accessors
*/
export const serveOnceEffect: {
/**
* @since 0.5.0
* @category accessors
*/
(
options: Consumer.ConsumerOptions,
): <E, R>(
app: MessageRouter.MessageRouter<E, R>,
) => Effect.Effect<
void,
Error.ConnectionException,
Scope.Scope | KafkaInstance.KafkaInstance | Exclude<R, ConsumerRecord.ConsumerRecord>
>;
/**
* @since 0.5.0
* @category accessors
*/
<E, R>(
app: MessageRouter.MessageRouter<E, R>,
options: Consumer.ConsumerOptions,
): Effect.Effect<
void,
Error.ConnectionException,
Scope.Scope | KafkaInstance.KafkaInstance | Exclude<R, ConsumerRecord.ConsumerRecord>
>;
} = internal.serveOnceEffect;

/**
* @since 0.1.0
* @category accessors
Expand Down
6 changes: 6 additions & 0 deletions src/ConsumerRecord.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,9 @@ export declare namespace ConsumerRecord {
* @category constructors
*/
export const make: (payload: internal.ConsumerRecordConstructorProps) => ConsumerRecord = internal.make;

/**
* @since 0.5.0
* @category constructors
*/
export const empty: ConsumerRecord = internal.empty;
47 changes: 47 additions & 0 deletions src/internal/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,53 @@ export const makeConsumer = (
return yield* instance.consumer(options);
});

/** @internal */
export const serveOnceEffect = dual<
{
(
options: Consumer.Consumer.ConsumerOptions,
): <E, R>(
app: MessageRouter.MessageRouter<E, R>,
) => Effect.Effect<
void,
Error.ConnectionException,
KafkaInstance.KafkaInstance | Scope.Scope | Exclude<R, ConsumerRecord.ConsumerRecord>
>;
},
{
<E, R>(
app: MessageRouter.MessageRouter<E, R>,
options: Consumer.Consumer.ConsumerOptions,
): Effect.Effect<
void,
Error.ConnectionException,
KafkaInstance.KafkaInstance | Scope.Scope | Exclude<R, ConsumerRecord.ConsumerRecord>
>;
}
>(
(args) => Effect.isEffect(args[0]),
<E, R>(
app: MessageRouter.MessageRouter<E, R>,
options: Consumer.Consumer.ConsumerOptions,
): Effect.Effect<
void,
Error.ConnectionException,
KafkaInstance.KafkaInstance | Scope.Scope | Exclude<R, ConsumerRecord.ConsumerRecord>
> =>
Effect.gen(function* () {
const instance = yield* instanceTag;
const consumer = yield* instance.consumer(options);

const topics = Chunk.toArray(app.routes).map((route) => route.topic);

yield* consumer.subscribe(topics);

const queue = yield* consumer.consume();

yield* app.pipe(Effect.provideServiceEffect(consumerRecordTag, queue)).pipe(Effect.orDie);
}),
);

/** @internal */
export const serveEffect = dual<
{
Expand Down
18 changes: 17 additions & 1 deletion src/internal/consumerRecord.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Context, Effect } from "effect";
import { Context, Effect, String } from "effect";
import type * as ConsumerRecord from "../ConsumerRecord.js";

/** @internal */
Expand Down Expand Up @@ -29,3 +29,19 @@ export type ConsumerRecordConstructorProps = {
/** @internal */
export const make = (payload: ConsumerRecordConstructorProps): ConsumerRecord.ConsumerRecord =>
Object.assign(Object.create(consumerRecordProto), payload);

const noop = () => Effect.void;

/** @internal */
export const empty: ConsumerRecord.ConsumerRecord = make({
topic: String.empty,
partition: 0,
highWatermark: "0",
key: null,
value: null,
timestamp: "0",
attributes: 0,
offset: "0",
heartbeat: noop,
commit: noop,
});
138 changes: 138 additions & 0 deletions test/Consumer.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import { afterEach, beforeEach, describe, expect, it } from "@effect/vitest";
import Substitute, { Arg, SubstituteOf } from "@fluffy-spoon/substitute";
import { Cause, Effect, Exit } from "effect";
import { Consumer, ConsumerRecord, MessageRouter } from "../src";
import { TestConsumer, TestInstance, testKafkaInstanceLayer } from "./mocks/TestKafkaInstance";

describe("Consumer", () => {
let kafkaSub: SubstituteOf<TestInstance>;
let consumerSub: SubstituteOf<TestConsumer>;

beforeEach(() => {
kafkaSub = Substitute.for<TestInstance>();
consumerSub = Substitute.for<TestConsumer>();
consumerSub.connect().returns(Effect.void);
consumerSub.disconnect().returns(Effect.void);
consumerSub.subscribe(Arg.any()).returns(Effect.void);
consumerSub.results.returns?.([
ConsumerRecord.make({
...ConsumerRecord.empty,
topic: "test-topic",
}),
]);
kafkaSub.consumer().returns(consumerSub);
});

afterEach(() => {
// Assert that scope was released
consumerSub.received(1).disconnect();
});

it.effect("should receive message", () =>
Effect.gen(function* () {
const record = ConsumerRecord.make({
...ConsumerRecord.empty,
topic: "test-topic",
});
consumerSub.results.returns?.([record]);

expect.assertions(1);

yield* MessageRouter.empty.pipe(
MessageRouter.subscribe(
"test-topic",
Effect.tap(ConsumerRecord.ConsumerRecord, (r) => {
expect(r).toStrictEqual(record);
}),
),
Consumer.serveOnceEffect({ groupId: "group" }),
Effect.scoped,
);

consumerSub.received(1).subscribe(["test-topic"]);
}).pipe(Effect.provide(testKafkaInstanceLayer(kafkaSub))),
);

it.effect("should catchTag handle error", () =>
Effect.gen(function* () {
expect.assertions(2);

yield* MessageRouter.empty.pipe(
MessageRouter.subscribe(
"test-topic",
Effect.flatMap(ConsumerRecord.ConsumerRecord, () => new Cause.UnknownException("Error processing message")),
),
MessageRouter.catchTag("UnknownException", (e) => {
expect(e).toBeInstanceOf(Cause.UnknownException);
expect(e.error).toBe("Error processing message");
return Effect.void;
}),
Consumer.serveOnceEffect({ groupId: "group" }),
Effect.scoped,
);

consumerSub.received(1).subscribe(["test-topic"]);
}).pipe(Effect.provide(testKafkaInstanceLayer(kafkaSub))),
);

it.effect("should catchAll handle error", () =>
Effect.gen(function* () {
expect.assertions(2);

yield* MessageRouter.empty.pipe(
MessageRouter.subscribe(
"test-topic",
Effect.flatMap(ConsumerRecord.ConsumerRecord, () => new Cause.UnknownException("Error processing message")),
),
MessageRouter.catchAll((e) => {
expect(e).toBeInstanceOf(Cause.UnknownException);
expect(e.error).toBe("Error processing message");
return Effect.void;
}),
Consumer.serveOnceEffect({ groupId: "group" }),
Effect.scoped,
);

consumerSub.received(1).subscribe(["test-topic"]);
}).pipe(Effect.provide(testKafkaInstanceLayer(kafkaSub))),
);

it.effect("should catchAllCause handle error", () =>
Effect.gen(function* () {
expect.assertions(1);

yield* MessageRouter.empty.pipe(
MessageRouter.subscribe(
"test-topic",
Effect.flatMap(ConsumerRecord.ConsumerRecord, () => new Cause.UnknownException("Error processing message")),
),
MessageRouter.catchAllCause((e) => {
expect(e).toStrictEqual(Cause.fail(new Cause.UnknownException("Error processing message")));
return Effect.void;
}),
Consumer.serveOnceEffect({ groupId: "group" }),
Effect.scoped,
);

consumerSub.received(1).subscribe(["test-topic"]);
}).pipe(Effect.provide(testKafkaInstanceLayer(kafkaSub))),
);

it("should not handle error", async () => {
expect.assertions(1);

const program = await MessageRouter.empty.pipe(
MessageRouter.subscribe(
"test-topic",
Effect.flatMap(ConsumerRecord.ConsumerRecord, () => new Cause.UnknownException("Error processing message")),
),
Consumer.serveOnceEffect({ groupId: "group" }),
Effect.scoped,
Effect.provide(testKafkaInstanceLayer(kafkaSub)),
Effect.runPromiseExit,
);

consumerSub.received(1).subscribe(["test-topic"]);
expect(program).toStrictEqual(Exit.die(new Cause.UnknownException("Error processing message")));
});
});
31 changes: 15 additions & 16 deletions test/Producer.test.ts
Original file line number Diff line number Diff line change
@@ -1,31 +1,30 @@
import { afterEach, beforeEach, describe, expect, it } from "@effect/vitest";
import Substitute, { Arg, SubstituteOf } from "@fluffy-spoon/substitute";
import { Effect } from "effect";
import type * as KafkaJS from "kafkajs";
import { Producer } from "../src";
import { testKafkaInstanceLayer } from "./mocks/TestKafkaInstance";
import { TestInstance, testKafkaInstanceLayer, TestProducer } from "./mocks/TestKafkaInstance";

describe("Producer", () => {
let kafkaSub: SubstituteOf<KafkaJS.Kafka>;
let producerSub: SubstituteOf<KafkaJS.Producer>;
let kafkaSub: SubstituteOf<TestInstance>;
let producerSub: SubstituteOf<TestProducer>;

beforeEach(() => {
kafkaSub = Substitute.for<KafkaJS.Kafka>();
producerSub = Substitute.for<KafkaJS.Producer>();
producerSub.connect().resolves();
producerSub.disconnect().resolves();
producerSub.send(Arg.any()).resolves([]);
producerSub.sendBatch(Arg.any()).resolves([]);
kafkaSub.producer(Arg.any()).returns(producerSub);
kafkaSub = Substitute.for<TestInstance>();
producerSub = Substitute.for<TestProducer>();
producerSub.connect().returns(Effect.void);
producerSub.disconnect().returns(Effect.void);
producerSub.send(Arg.any()).returns(Effect.succeed([]));
producerSub.sendBatch(Arg.any()).returns(Effect.succeed([]));
kafkaSub.producer().returns(producerSub);
});

afterEach(() => {
// Assert that scope was released
producerSub.received(1).disconnect();
});

it.effect("send", () =>
Effect.gen(function* (_) {
it.effect("should send message", () =>
Effect.gen(function* () {
const result = yield* Producer.send({
topic: "test-topic",
messages: [{ value: "Hello, effect-kafka user!" }, { value: "How are you, effect-kafka user?" }],
Expand All @@ -43,15 +42,15 @@ describe("Producer", () => {
}).pipe(Effect.provide(Producer.layer()), Effect.provide(testKafkaInstanceLayer(kafkaSub))),
);

it.effect("sendScoped", () =>
Effect.gen(function* (_) {
it.effect("should send message scoped", () =>
Effect.gen(function* () {
const result = yield* Producer.sendScoped({
topic: "test-topic",
messages: [{ value: "Hello, effect-kafka user!" }, { value: "How are you, effect-kafka user?" }],
}).pipe(Effect.scoped);

expect(result).toEqual([]);
kafkaSub.received(1).producer({});
kafkaSub.received(1).producer();
producerSub.received(1).connect();
producerSub.received(1).disconnect();
producerSub.received(1).send({
Expand Down
Loading

0 comments on commit 84269f0

Please sign in to comment.