diff --git a/.changeset/cold-goats-repair.md b/.changeset/cold-goats-repair.md new file mode 100644 index 0000000..820005d --- /dev/null +++ b/.changeset/cold-goats-repair.md @@ -0,0 +1,5 @@ +--- +"@effect-rx/rx": patch +--- + +use Channel for stream rx diff --git a/packages/rx/src/Rx.ts b/packages/rx/src/Rx.ts index 0943f6d..07df5a8 100644 --- a/packages/rx/src/Rx.ts +++ b/packages/rx/src/Rx.ts @@ -3,12 +3,14 @@ */ /* eslint-disable @typescript-eslint/no-empty-object-type */ import { NoSuchElementException } from "effect/Cause" +import type * as Cause from "effect/Cause" +import * as Channel from "effect/Channel" import * as Chunk from "effect/Chunk" import * as Duration from "effect/Duration" import * as Effect from "effect/Effect" import * as Either from "effect/Either" import * as Exit from "effect/Exit" -import { dual, pipe } from "effect/Function" +import { constVoid, dual, pipe } from "effect/Function" import { globalValue } from "effect/GlobalValue" import * as Hash from "effect/Hash" import * as Inspectable from "effect/Inspectable" @@ -654,15 +656,24 @@ function makeStream( ): Result.Result { const previous = ctx.self>() - const cancel = runCallbackSync(runtime)( - Stream.runForEach( - stream, - (a) => Effect.sync(() => ctx.setSelfSync(Result.waiting(Result.success(a)))) - ), - (exit) => { - if (exit._tag === "Failure") { - ctx.setSelfSync(Result.failureWithPrevious(exit.cause, previous)) - } else { + const writer: Channel.Channel, never, E> = Channel.readWithCause({ + onInput(input: Chunk.Chunk) { + return Channel.suspend(() => { + const last = Chunk.last(input) + if (last._tag === "Some") { + ctx.setSelfSync(Result.success(last.value, true)) + } + return writer + }) + }, + onFailure(cause: Cause.Cause) { + return Channel.suspend(() => { + ctx.setSelfSync(Result.failureWithPrevious(cause, previous)) + return Channel.void + }) + }, + onDone(_done: unknown) { + return Channel.suspend(() => { pipe( ctx.self>(), Option.flatMap(Result.value), @@ -671,8 +682,14 @@ function makeStream( onSome: (a) => ctx.setSelfSync(Result.success(a)) }) ) - } + return Channel.void + }) } + }) + + const cancel = runCallbackSync(runtime)( + Channel.runDrain(Channel.pipeTo(Stream.toChannel(stream), writer)), + constVoid ) if (cancel !== undefined) { ctx.addFinalizer(cancel)