Skip to content

Commit

Permalink
Ensure signal combinators are unsubscribed
Browse files Browse the repository at this point in the history
  • Loading branch information
Josh Bassett committed Dec 27, 2017
1 parent e945f63 commit 0f6dd57
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 34 deletions.
63 changes: 43 additions & 20 deletions src/signal.js
Original file line number Diff line number Diff line change
Expand Up @@ -332,9 +332,20 @@ export default class Signal {
* @returns A new signal.
*/
concatMap (f) {
let subscription2

return new Signal(emit => {
const next = a => f(a).subscribe(emit.next, emit.error)
return this.subscribe({...emit, next})
const next = a => {
if (subscription2) { subscription2.unsubscribe() }
subscription2 = f(a).subscribe(emit)
}

const subscription1 = this.subscribe({...emit, next})

return () => {
subscription1.unsubscribe()
if (subscription2) { subscription2.unsubscribe() }
}
})
}

Expand Down Expand Up @@ -428,7 +439,7 @@ export default class Signal {
* return a + b
* }, 0)
*
* @param f A binary function.
* @param f A ternary function.
* @param a A starting value.
* @returns A new signal.
*
Expand Down Expand Up @@ -463,12 +474,15 @@ export default class Signal {
if (++count > ss.length) { emit.complete() }
}

this.subscribe({...emit, complete})
const subscription = this.subscribe({...emit, complete})

// Emit values from any signal.
const subscriptions = ss.map(s => s.subscribe({...emit, complete}))

return () => subscriptions.forEach(s => s.unsubscribe())
return () => {
subscription.unsubscribe()
subscriptions.forEach(s => s.unsubscribe())
}
})
}

Expand Down Expand Up @@ -512,11 +526,13 @@ export default class Signal {
if (++count >= 2) { emit.complete() }
}

this.subscribe(a => next(a, 0), emit.error, complete)
const subscription1 = this.subscribe(a => next(a, 0), emit.error, complete)
const subscription2 = s.subscribe(a => next(a, 1), emit.error, complete)

const subscription = s.subscribe(a => next(a, 1), emit.error, complete)

return () => subscription.unsubscribe()
return () => {
subscription1.unsubscribe()
subscription2.unsubscribe()
}
})
}

Expand Down Expand Up @@ -547,13 +563,16 @@ export default class Signal {
// Buffer the value.
const next = a => { lastValue = a }

this.subscribe({...emit, next})
const subscription1 = this.subscribe({...emit, next})

// Emit the buffered value.
const subscription = s.subscribe(a => emit.next(f(lastValue, a)), emit.error)
const subscription2 = s.subscribe(a => emit.next(f(lastValue, a)), emit.error)

// Unsubscribe the sampler.
return () => subscription.unsubscribe()
return () => {
subscription1.unsubscribe()
subscription2.unsubscribe()
}
})
}

Expand Down Expand Up @@ -582,13 +601,16 @@ export default class Signal {
return new Signal(emit => {
const next = a => { if (!lastValue) { emit.next(a) } }

this.subscribe({...emit, next})
const subscription1 = this.subscribe({...emit, next})

// Store the hold value.
const subscription = s.subscribe(a => { lastValue = p(a) }, emit.error)
const subscription2 = s.subscribe(a => { lastValue = p(a) }, emit.error)

// Unsubscribe the sampler.
return () => subscription.unsubscribe()
return () => {
subscription1.unsubscribe()
subscription2.unsubscribe()
}
})
}

Expand Down Expand Up @@ -621,19 +643,20 @@ export default class Signal {
* @returns A new signal.
*/
switch () {
let subscription
let subscription2

return new Signal(emit => {
const next = a => {
if (subscription) { subscription.unsubscribe() }
if (subscription2) { subscription2.unsubscribe() }
if (!(a instanceof Signal)) { throw new Error('Signal value must be a signal') }
subscription = a.subscribe(emit)
subscription2 = a.subscribe(emit)
}

this.subscribe({...emit, next})
const subscription1 = this.subscribe({...emit, next})

return () => {
if (subscription) { subscription.unsubscribe() }
subscription1.unsubscribe()
if (subscription2) { subscription2.unsubscribe() }
}
})
}
Expand Down
71 changes: 57 additions & 14 deletions test/signal_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,28 @@ describe('Signal', () => {
s.concatMap(always()).subscribe({error: errorSpy})
assert.isTrue(errorSpy.calledOnce)
})

it('unmounts the original signal when it is unsubscribed', () => {
const unmount = sinon.spy()
const s = new Signal(() => unmount)
const f = a => Signal.of(a)
const a = s.concatMap(f).subscribe(always())

a.unsubscribe()

assert.isTrue(unmount.calledOnce)
})

it('unmounts the returned signal when it is unsubscribed', () => {
const unmount = sinon.spy()
const s = Signal.of(0)
const f = a => new Signal(() => unmount)
const a = s.concatMap(f).subscribe(always())

a.unsubscribe()

assert.isTrue(unmount.calledOnce)
})
})

describe('#map', () => {
Expand Down Expand Up @@ -472,7 +494,18 @@ describe('Signal', () => {
assert.isTrue(errorSpy.calledTwice)
})

it('unmounts the signal when it is unsubscribed', () => {
it('unmounts the original signal when it is unsubscribed', () => {
const unmount = sinon.spy()
const s = new Signal(() => unmount)
const t = Signal.never()
const a = s.merge(t).subscribe(always())

a.unsubscribe()

assert.isTrue(unmount.calledOnce)
})

it('unmounts the merged signal when it is unsubscribed', () => {
const unmount = sinon.spy()
const s = Signal.never()
const t = new Signal(() => unmount)
Expand Down Expand Up @@ -544,7 +577,18 @@ describe('Signal', () => {
assert.isTrue(errorSpy.calledTwice)
})

it('unmounts the signal when it is unsubscribed', () => {
it('unmounts the original signal when it is unsubscribed', () => {
const unmount = sinon.spy()
const s = new Signal(() => unmount)
const t = Signal.never()
const a = s.zipWith(always(), t).subscribe(always())

a.unsubscribe()

assert.isTrue(unmount.calledOnce)
})

it('unmounts the zipped signal when it is unsubscribed', () => {
const unmount = sinon.spy()
const s = Signal.never()
const t = new Signal(() => unmount)
Expand Down Expand Up @@ -687,7 +731,17 @@ describe('Signal', () => {
assert.isTrue(nextSpy.secondCall.calledWithExactly('bar'))
})

it('unmounts the signal when it is unsubscribed', () => {
it('unmounts the original signal when it is unsubscribed', () => {
const unmount = sinon.spy()
const s = new Signal(() => unmount)
const a = s.switch().subscribe(always())

a.unsubscribe()

assert.isTrue(unmount.calledOnce)
})

it('unmounts the returned signal when it is unsubscribed', () => {
const unmount = sinon.spy()
const s = new Signal(() => unmount)
const t = Signal.of(s)
Expand Down Expand Up @@ -718,16 +772,5 @@ describe('Signal', () => {
clock.tick(1000)
assert.isTrue(nextSpy.secondCall.calledWithExactly('bar'))
})

it('unmounts the signal when it is unsubscribed', () => {
const unmount = sinon.spy()
const s = Signal.of(0)
const t = new Signal(() => unmount)
const a = s.encode(t).subscribe(always())

a.unsubscribe()

assert.isTrue(unmount.calledOnce)
})
})
})

0 comments on commit 0f6dd57

Please sign in to comment.