Skip to content

Commit

Permalink
use Channel for stream rx
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Dec 9, 2024
1 parent dd102ce commit 3198244
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 11 deletions.
5 changes: 5 additions & 0 deletions .changeset/cold-goats-repair.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect-rx/rx": patch
---

use Channel for stream rx
39 changes: 28 additions & 11 deletions packages/rx/src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
*/
/* eslint-disable @typescript-eslint/no-empty-object-type */
import { NoSuchElementException } from "effect/Cause"
import type * as Cause from "effect/Cause"
import * as Channel from "effect/Channel"
import * as Chunk from "effect/Chunk"
import * as Duration from "effect/Duration"
import * as Effect from "effect/Effect"
import * as Either from "effect/Either"
import * as Exit from "effect/Exit"
import { dual, pipe } from "effect/Function"
import { constVoid, dual, pipe } from "effect/Function"
import { globalValue } from "effect/GlobalValue"
import * as Hash from "effect/Hash"
import * as Inspectable from "effect/Inspectable"
Expand Down Expand Up @@ -654,15 +656,24 @@ function makeStream<A, E>(
): Result.Result<A, E | NoSuchElementException> {
const previous = ctx.self<Result.Result<A, E | NoSuchElementException>>()

const cancel = runCallbackSync(runtime)(
Stream.runForEach(
stream,
(a) => Effect.sync(() => ctx.setSelfSync(Result.waiting(Result.success(a))))
),
(exit) => {
if (exit._tag === "Failure") {
ctx.setSelfSync(Result.failureWithPrevious(exit.cause, previous))
} else {
const writer: Channel.Channel<never, Chunk.Chunk<A>, never, E> = Channel.readWithCause({
onInput(input: Chunk.Chunk<A>) {
return Channel.suspend(() => {
const last = Chunk.last(input)
if (last._tag === "Some") {
ctx.setSelfSync(Result.success(last.value, true))
}
return writer
})
},
onFailure(cause: Cause.Cause<E>) {
return Channel.suspend(() => {
ctx.setSelfSync(Result.failureWithPrevious(cause, previous))
return Channel.void
})
},
onDone(_done: unknown) {
return Channel.suspend(() => {
pipe(
ctx.self<Result.Result<A, E | NoSuchElementException>>(),
Option.flatMap(Result.value),
Expand All @@ -671,8 +682,14 @@ function makeStream<A, E>(
onSome: (a) => ctx.setSelfSync(Result.success(a))
})
)
}
return Channel.void
})
}
})

const cancel = runCallbackSync(runtime)(
Channel.runDrain(Channel.pipeTo(Stream.toChannel(stream), writer)),
constVoid
)
if (cancel !== undefined) {
ctx.addFinalizer(cancel)
Expand Down

0 comments on commit 3198244

Please sign in to comment.