diff --git a/src/signal.js b/src/signal.js index 87b682b..eed3ed5 100644 --- a/src/signal.js +++ b/src/signal.js @@ -332,9 +332,20 @@ export default class Signal { * @returns A new signal. */ concatMap (f) { + let subscription2 + return new Signal(emit => { - const next = a => f(a).subscribe(emit.next, emit.error) - return this.subscribe({...emit, next}) + const next = a => { + if (subscription2) { subscription2.unsubscribe() } + subscription2 = f(a).subscribe(emit) + } + + const subscription1 = this.subscribe({...emit, next}) + + return () => { + subscription1.unsubscribe() + if (subscription2) { subscription2.unsubscribe() } + } }) } @@ -428,7 +439,7 @@ export default class Signal { * return a + b * }, 0) * - * @param f A binary function. + * @param f A ternary function. * @param a A starting value. * @returns A new signal. * @@ -463,12 +474,15 @@ export default class Signal { if (++count > ss.length) { emit.complete() } } - this.subscribe({...emit, complete}) + const subscription = this.subscribe({...emit, complete}) // Emit values from any signal. const subscriptions = ss.map(s => s.subscribe({...emit, complete})) - return () => subscriptions.forEach(s => s.unsubscribe()) + return () => { + subscription.unsubscribe() + subscriptions.forEach(s => s.unsubscribe()) + } }) } @@ -512,11 +526,13 @@ export default class Signal { if (++count >= 2) { emit.complete() } } - this.subscribe(a => next(a, 0), emit.error, complete) + const subscription1 = this.subscribe(a => next(a, 0), emit.error, complete) + const subscription2 = s.subscribe(a => next(a, 1), emit.error, complete) - const subscription = s.subscribe(a => next(a, 1), emit.error, complete) - - return () => subscription.unsubscribe() + return () => { + subscription1.unsubscribe() + subscription2.unsubscribe() + } }) } @@ -547,13 +563,16 @@ export default class Signal { // Buffer the value. const next = a => { lastValue = a } - this.subscribe({...emit, next}) + const subscription1 = this.subscribe({...emit, next}) // Emit the buffered value. - const subscription = s.subscribe(a => emit.next(f(lastValue, a)), emit.error) + const subscription2 = s.subscribe(a => emit.next(f(lastValue, a)), emit.error) // Unsubscribe the sampler. - return () => subscription.unsubscribe() + return () => { + subscription1.unsubscribe() + subscription2.unsubscribe() + } }) } @@ -582,13 +601,16 @@ export default class Signal { return new Signal(emit => { const next = a => { if (!lastValue) { emit.next(a) } } - this.subscribe({...emit, next}) + const subscription1 = this.subscribe({...emit, next}) // Store the hold value. - const subscription = s.subscribe(a => { lastValue = p(a) }, emit.error) + const subscription2 = s.subscribe(a => { lastValue = p(a) }, emit.error) // Unsubscribe the sampler. - return () => subscription.unsubscribe() + return () => { + subscription1.unsubscribe() + subscription2.unsubscribe() + } }) } @@ -621,19 +643,20 @@ export default class Signal { * @returns A new signal. */ switch () { - let subscription + let subscription2 return new Signal(emit => { const next = a => { - if (subscription) { subscription.unsubscribe() } + if (subscription2) { subscription2.unsubscribe() } if (!(a instanceof Signal)) { throw new Error('Signal value must be a signal') } - subscription = a.subscribe(emit) + subscription2 = a.subscribe(emit) } - this.subscribe({...emit, next}) + const subscription1 = this.subscribe({...emit, next}) return () => { - if (subscription) { subscription.unsubscribe() } + subscription1.unsubscribe() + if (subscription2) { subscription2.unsubscribe() } } }) } diff --git a/test/signal_test.js b/test/signal_test.js index bb847e0..dfd5192 100644 --- a/test/signal_test.js +++ b/test/signal_test.js @@ -323,6 +323,28 @@ describe('Signal', () => { s.concatMap(always()).subscribe({error: errorSpy}) assert.isTrue(errorSpy.calledOnce) }) + + it('unmounts the original signal when it is unsubscribed', () => { + const unmount = sinon.spy() + const s = new Signal(() => unmount) + const f = a => Signal.of(a) + const a = s.concatMap(f).subscribe(always()) + + a.unsubscribe() + + assert.isTrue(unmount.calledOnce) + }) + + it('unmounts the returned signal when it is unsubscribed', () => { + const unmount = sinon.spy() + const s = Signal.of(0) + const f = a => new Signal(() => unmount) + const a = s.concatMap(f).subscribe(always()) + + a.unsubscribe() + + assert.isTrue(unmount.calledOnce) + }) }) describe('#map', () => { @@ -472,7 +494,18 @@ describe('Signal', () => { assert.isTrue(errorSpy.calledTwice) }) - it('unmounts the signal when it is unsubscribed', () => { + it('unmounts the original signal when it is unsubscribed', () => { + const unmount = sinon.spy() + const s = new Signal(() => unmount) + const t = Signal.never() + const a = s.merge(t).subscribe(always()) + + a.unsubscribe() + + assert.isTrue(unmount.calledOnce) + }) + + it('unmounts the merged signal when it is unsubscribed', () => { const unmount = sinon.spy() const s = Signal.never() const t = new Signal(() => unmount) @@ -544,7 +577,18 @@ describe('Signal', () => { assert.isTrue(errorSpy.calledTwice) }) - it('unmounts the signal when it is unsubscribed', () => { + it('unmounts the original signal when it is unsubscribed', () => { + const unmount = sinon.spy() + const s = new Signal(() => unmount) + const t = Signal.never() + const a = s.zipWith(always(), t).subscribe(always()) + + a.unsubscribe() + + assert.isTrue(unmount.calledOnce) + }) + + it('unmounts the zipped signal when it is unsubscribed', () => { const unmount = sinon.spy() const s = Signal.never() const t = new Signal(() => unmount) @@ -687,7 +731,17 @@ describe('Signal', () => { assert.isTrue(nextSpy.secondCall.calledWithExactly('bar')) }) - it('unmounts the signal when it is unsubscribed', () => { + it('unmounts the original signal when it is unsubscribed', () => { + const unmount = sinon.spy() + const s = new Signal(() => unmount) + const a = s.switch().subscribe(always()) + + a.unsubscribe() + + assert.isTrue(unmount.calledOnce) + }) + + it('unmounts the returned signal when it is unsubscribed', () => { const unmount = sinon.spy() const s = new Signal(() => unmount) const t = Signal.of(s) @@ -718,16 +772,5 @@ describe('Signal', () => { clock.tick(1000) assert.isTrue(nextSpy.secondCall.calledWithExactly('bar')) }) - - it('unmounts the signal when it is unsubscribed', () => { - const unmount = sinon.spy() - const s = Signal.of(0) - const t = new Signal(() => unmount) - const a = s.encode(t).subscribe(always()) - - a.unsubscribe() - - assert.isTrue(unmount.calledOnce) - }) }) })