From 84269f002fc87d3f3bcabdf891208a087aaaa89f Mon Sep 17 00:00:00 2001 From: Victor Korzunin <5180700+floydspace@users.noreply.github.com> Date: Fri, 22 Nov 2024 22:34:41 +0100 Subject: [PATCH] test: cover consumer with tests --- src/Consumer.ts | 32 ++++++++ src/ConsumerRecord.ts | 6 ++ src/internal/consumer.ts | 47 +++++++++++ src/internal/consumerRecord.ts | 18 ++++- test/Consumer.test.ts | 138 ++++++++++++++++++++++++++++++++ test/Producer.test.ts | 31 ++++--- test/mocks/TestKafkaInstance.ts | 57 ++++++++++--- 7 files changed, 302 insertions(+), 27 deletions(-) create mode 100644 test/Consumer.test.ts diff --git a/src/Consumer.ts b/src/Consumer.ts index 3263109..7236090 100644 --- a/src/Consumer.ts +++ b/src/Consumer.ts @@ -186,6 +186,38 @@ export const serve: { >; } = internal.serve; +/** + * @since 0.5.0 + * @category accessors + */ +export const serveOnceEffect: { + /** + * @since 0.5.0 + * @category accessors + */ + ( + options: Consumer.ConsumerOptions, + ): ( + app: MessageRouter.MessageRouter, + ) => Effect.Effect< + void, + Error.ConnectionException, + Scope.Scope | KafkaInstance.KafkaInstance | Exclude + >; + /** + * @since 0.5.0 + * @category accessors + */ + ( + app: MessageRouter.MessageRouter, + options: Consumer.ConsumerOptions, + ): Effect.Effect< + void, + Error.ConnectionException, + Scope.Scope | KafkaInstance.KafkaInstance | Exclude + >; +} = internal.serveOnceEffect; + /** * @since 0.1.0 * @category accessors diff --git a/src/ConsumerRecord.ts b/src/ConsumerRecord.ts index 9ad7368..f967bcc 100644 --- a/src/ConsumerRecord.ts +++ b/src/ConsumerRecord.ts @@ -47,3 +47,9 @@ export declare namespace ConsumerRecord { * @category constructors */ export const make: (payload: internal.ConsumerRecordConstructorProps) => ConsumerRecord = internal.make; + +/** + * @since 0.5.0 + * @category constructors + */ +export const empty: ConsumerRecord = internal.empty; diff --git a/src/internal/consumer.ts b/src/internal/consumer.ts index 3bdd2bc..3a0aa40 100644 --- a/src/internal/consumer.ts +++ b/src/internal/consumer.ts @@ -37,6 +37,53 @@ export const makeConsumer = ( return yield* instance.consumer(options); }); +/** @internal */ +export const serveOnceEffect = dual< + { + ( + options: Consumer.Consumer.ConsumerOptions, + ): ( + app: MessageRouter.MessageRouter, + ) => Effect.Effect< + void, + Error.ConnectionException, + KafkaInstance.KafkaInstance | Scope.Scope | Exclude + >; + }, + { + ( + app: MessageRouter.MessageRouter, + options: Consumer.Consumer.ConsumerOptions, + ): Effect.Effect< + void, + Error.ConnectionException, + KafkaInstance.KafkaInstance | Scope.Scope | Exclude + >; + } +>( + (args) => Effect.isEffect(args[0]), + ( + app: MessageRouter.MessageRouter, + options: Consumer.Consumer.ConsumerOptions, + ): Effect.Effect< + void, + Error.ConnectionException, + KafkaInstance.KafkaInstance | Scope.Scope | Exclude + > => + Effect.gen(function* () { + const instance = yield* instanceTag; + const consumer = yield* instance.consumer(options); + + const topics = Chunk.toArray(app.routes).map((route) => route.topic); + + yield* consumer.subscribe(topics); + + const queue = yield* consumer.consume(); + + yield* app.pipe(Effect.provideServiceEffect(consumerRecordTag, queue)).pipe(Effect.orDie); + }), +); + /** @internal */ export const serveEffect = dual< { diff --git a/src/internal/consumerRecord.ts b/src/internal/consumerRecord.ts index 3c6866d..b25c5ca 100644 --- a/src/internal/consumerRecord.ts +++ b/src/internal/consumerRecord.ts @@ -1,4 +1,4 @@ -import { Context, Effect } from "effect"; +import { Context, Effect, String } from "effect"; import type * as ConsumerRecord from "../ConsumerRecord.js"; /** @internal */ @@ -29,3 +29,19 @@ export type ConsumerRecordConstructorProps = { /** @internal */ export const make = (payload: ConsumerRecordConstructorProps): ConsumerRecord.ConsumerRecord => Object.assign(Object.create(consumerRecordProto), payload); + +const noop = () => Effect.void; + +/** @internal */ +export const empty: ConsumerRecord.ConsumerRecord = make({ + topic: String.empty, + partition: 0, + highWatermark: "0", + key: null, + value: null, + timestamp: "0", + attributes: 0, + offset: "0", + heartbeat: noop, + commit: noop, +}); diff --git a/test/Consumer.test.ts b/test/Consumer.test.ts new file mode 100644 index 0000000..b3405ca --- /dev/null +++ b/test/Consumer.test.ts @@ -0,0 +1,138 @@ +import { afterEach, beforeEach, describe, expect, it } from "@effect/vitest"; +import Substitute, { Arg, SubstituteOf } from "@fluffy-spoon/substitute"; +import { Cause, Effect, Exit } from "effect"; +import { Consumer, ConsumerRecord, MessageRouter } from "../src"; +import { TestConsumer, TestInstance, testKafkaInstanceLayer } from "./mocks/TestKafkaInstance"; + +describe("Consumer", () => { + let kafkaSub: SubstituteOf; + let consumerSub: SubstituteOf; + + beforeEach(() => { + kafkaSub = Substitute.for(); + consumerSub = Substitute.for(); + consumerSub.connect().returns(Effect.void); + consumerSub.disconnect().returns(Effect.void); + consumerSub.subscribe(Arg.any()).returns(Effect.void); + consumerSub.results.returns?.([ + ConsumerRecord.make({ + ...ConsumerRecord.empty, + topic: "test-topic", + }), + ]); + kafkaSub.consumer().returns(consumerSub); + }); + + afterEach(() => { + // Assert that scope was released + consumerSub.received(1).disconnect(); + }); + + it.effect("should receive message", () => + Effect.gen(function* () { + const record = ConsumerRecord.make({ + ...ConsumerRecord.empty, + topic: "test-topic", + }); + consumerSub.results.returns?.([record]); + + expect.assertions(1); + + yield* MessageRouter.empty.pipe( + MessageRouter.subscribe( + "test-topic", + Effect.tap(ConsumerRecord.ConsumerRecord, (r) => { + expect(r).toStrictEqual(record); + }), + ), + Consumer.serveOnceEffect({ groupId: "group" }), + Effect.scoped, + ); + + consumerSub.received(1).subscribe(["test-topic"]); + }).pipe(Effect.provide(testKafkaInstanceLayer(kafkaSub))), + ); + + it.effect("should catchTag handle error", () => + Effect.gen(function* () { + expect.assertions(2); + + yield* MessageRouter.empty.pipe( + MessageRouter.subscribe( + "test-topic", + Effect.flatMap(ConsumerRecord.ConsumerRecord, () => new Cause.UnknownException("Error processing message")), + ), + MessageRouter.catchTag("UnknownException", (e) => { + expect(e).toBeInstanceOf(Cause.UnknownException); + expect(e.error).toBe("Error processing message"); + return Effect.void; + }), + Consumer.serveOnceEffect({ groupId: "group" }), + Effect.scoped, + ); + + consumerSub.received(1).subscribe(["test-topic"]); + }).pipe(Effect.provide(testKafkaInstanceLayer(kafkaSub))), + ); + + it.effect("should catchAll handle error", () => + Effect.gen(function* () { + expect.assertions(2); + + yield* MessageRouter.empty.pipe( + MessageRouter.subscribe( + "test-topic", + Effect.flatMap(ConsumerRecord.ConsumerRecord, () => new Cause.UnknownException("Error processing message")), + ), + MessageRouter.catchAll((e) => { + expect(e).toBeInstanceOf(Cause.UnknownException); + expect(e.error).toBe("Error processing message"); + return Effect.void; + }), + Consumer.serveOnceEffect({ groupId: "group" }), + Effect.scoped, + ); + + consumerSub.received(1).subscribe(["test-topic"]); + }).pipe(Effect.provide(testKafkaInstanceLayer(kafkaSub))), + ); + + it.effect("should catchAllCause handle error", () => + Effect.gen(function* () { + expect.assertions(1); + + yield* MessageRouter.empty.pipe( + MessageRouter.subscribe( + "test-topic", + Effect.flatMap(ConsumerRecord.ConsumerRecord, () => new Cause.UnknownException("Error processing message")), + ), + MessageRouter.catchAllCause((e) => { + expect(e).toStrictEqual(Cause.fail(new Cause.UnknownException("Error processing message"))); + return Effect.void; + }), + Consumer.serveOnceEffect({ groupId: "group" }), + Effect.scoped, + ); + + consumerSub.received(1).subscribe(["test-topic"]); + }).pipe(Effect.provide(testKafkaInstanceLayer(kafkaSub))), + ); + + it("should not handle error", async () => { + expect.assertions(1); + + const program = await MessageRouter.empty.pipe( + MessageRouter.subscribe( + "test-topic", + Effect.flatMap(ConsumerRecord.ConsumerRecord, () => new Cause.UnknownException("Error processing message")), + ), + Consumer.serveOnceEffect({ groupId: "group" }), + Effect.scoped, + Effect.provide(testKafkaInstanceLayer(kafkaSub)), + Effect.runPromiseExit, + ); + + consumerSub.received(1).subscribe(["test-topic"]); + expect(program).toStrictEqual(Exit.die(new Cause.UnknownException("Error processing message"))); + }); +}); diff --git a/test/Producer.test.ts b/test/Producer.test.ts index 88a1e79..4382592 100644 --- a/test/Producer.test.ts +++ b/test/Producer.test.ts @@ -1,22 +1,21 @@ import { afterEach, beforeEach, describe, expect, it } from "@effect/vitest"; import Substitute, { Arg, SubstituteOf } from "@fluffy-spoon/substitute"; import { Effect } from "effect"; -import type * as KafkaJS from "kafkajs"; import { Producer } from "../src"; -import { testKafkaInstanceLayer } from "./mocks/TestKafkaInstance"; +import { TestInstance, testKafkaInstanceLayer, TestProducer } from "./mocks/TestKafkaInstance"; describe("Producer", () => { - let kafkaSub: SubstituteOf; - let producerSub: SubstituteOf; + let kafkaSub: SubstituteOf; + let producerSub: SubstituteOf; beforeEach(() => { - kafkaSub = Substitute.for(); - producerSub = Substitute.for(); - producerSub.connect().resolves(); - producerSub.disconnect().resolves(); - producerSub.send(Arg.any()).resolves([]); - producerSub.sendBatch(Arg.any()).resolves([]); - kafkaSub.producer(Arg.any()).returns(producerSub); + kafkaSub = Substitute.for(); + producerSub = Substitute.for(); + producerSub.connect().returns(Effect.void); + producerSub.disconnect().returns(Effect.void); + producerSub.send(Arg.any()).returns(Effect.succeed([])); + producerSub.sendBatch(Arg.any()).returns(Effect.succeed([])); + kafkaSub.producer().returns(producerSub); }); afterEach(() => { @@ -24,8 +23,8 @@ describe("Producer", () => { producerSub.received(1).disconnect(); }); - it.effect("send", () => - Effect.gen(function* (_) { + it.effect("should send message", () => + Effect.gen(function* () { const result = yield* Producer.send({ topic: "test-topic", messages: [{ value: "Hello, effect-kafka user!" }, { value: "How are you, effect-kafka user?" }], @@ -43,15 +42,15 @@ describe("Producer", () => { }).pipe(Effect.provide(Producer.layer()), Effect.provide(testKafkaInstanceLayer(kafkaSub))), ); - it.effect("sendScoped", () => - Effect.gen(function* (_) { + it.effect("should send message scoped", () => + Effect.gen(function* () { const result = yield* Producer.sendScoped({ topic: "test-topic", messages: [{ value: "Hello, effect-kafka user!" }, { value: "How are you, effect-kafka user?" }], }).pipe(Effect.scoped); expect(result).toEqual([]); - kafkaSub.received(1).producer({}); + kafkaSub.received(1).producer(); producerSub.received(1).connect(); producerSub.received(1).disconnect(); producerSub.received(1).send({ diff --git a/test/mocks/TestKafkaInstance.ts b/test/mocks/TestKafkaInstance.ts index 6a1038b..e556f09 100644 --- a/test/mocks/TestKafkaInstance.ts +++ b/test/mocks/TestKafkaInstance.ts @@ -1,22 +1,59 @@ -import { Effect, Layer } from "effect"; -import type * as KafkaJS from "kafkajs"; +import { Effect, Layer, Queue } from "effect"; +import { Consumer, ConsumerRecord, MessageRouter, Producer } from "../../src"; +import { ProducerConstructorProps } from "../../src/internal/producer"; import * as KafkaInstance from "../../src/KafkaInstance"; -import * as internal from "../../src/KafkaJS/internal/kafkaJSInstance"; -import * as Producer from "../../src/Producer"; -export const testKafkaInstanceLayer = (kafka: KafkaJS.Kafka) => +type Connectable = { + connect: () => Effect.Effect; + disconnect: () => Effect.Effect; +}; + +export interface TestProducer extends ProducerConstructorProps, Connectable {} + +export interface TestConsumer extends Connectable { + readonly subscribe: (topics: MessageRouter.Route.Path[]) => Effect.Effect; + readonly results: ConsumerRecord.ConsumerRecord[]; +} + +export interface TestInstance { + readonly producer: () => TestProducer; + readonly consumer: () => TestConsumer; +} + +export const testKafkaInstanceLayer = (kafka: TestInstance) => Layer.succeed( KafkaInstance.KafkaInstance, KafkaInstance.make({ - producer: (options) => + producer: () => Effect.gen(function* () { - const producer = yield* internal.connectProducerScoped(kafka, options); + const producer = yield* Effect.acquireRelease( + Effect.sync(() => kafka.producer()).pipe(Effect.tap((p) => p.connect())), + (p) => p.disconnect(), + ); return Producer.make({ - send: (record) => internal.send(producer, record), - sendBatch: (batch) => internal.sendBatch(producer, batch), + send: (record) => producer.send(record), + sendBatch: (batch) => producer.sendBatch(batch), + }); + }), + consumer: () => + Effect.gen(function* () { + const consumer = yield* Effect.acquireRelease( + Effect.sync(() => kafka.consumer()).pipe(Effect.tap((p) => p.connect())), + (p) => p.disconnect(), + ); + + return Consumer.make({ + subscribe: (topics) => consumer.subscribe(topics), + consume: () => + Effect.gen(function* () { + const queue = yield* Queue.bounded(1); + + yield* Queue.offerAll(queue, consumer.results); + + return queue; + }), }); }), - consumer: () => Effect.dieMessage("Not implemented"), }), );