-
Notifications
You must be signed in to change notification settings - Fork 431
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Port groupBy
from REX
#200
Conversation
Sources/SignalProducer.swift
Outdated
let producer = SignalProducer(signal).replayLazily(upTo: Int.max) | ||
|
||
// Start the buffering immediately. | ||
producer.start() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be disposed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok 👍
Sources/SignalProducer.swift
Outdated
let key = grouping(value) | ||
var group: Signal<Value, Error>.Observer? | ||
groups.modify { | ||
group = $0[key] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[Nit] For multi-line modifications, it'd be better to use named parameter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure
Sources/SignalProducer.swift
Outdated
switch event { | ||
case let .value(value): | ||
let key = grouping(value) | ||
var group: Signal<Value, Error>.Observer? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can dodge the need of an optional by using:
let group: Signal<Value, Error>.Observer = groups.modify { groups in
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice, didn't know about this!
} | ||
} | ||
group!.send(value: value) | ||
case let .failed(error): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[Nit] Blank line before the pattern.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
Sources/SignalProducer.swift
Outdated
return SignalProducer<(Key, SignalProducer<Value, Error>), Error> { observer, disposable in | ||
let groups = Atomic<[Key: Signal<Value, Error>.Observer]>([:]) | ||
|
||
self.start { event in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The start disposable should be added to the producer disposable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
/// to the group to which the value belongs to (as determined by the key) | ||
public func groupBy<Key: Hashable>(_ grouping: @escaping (Value) -> Key) -> SignalProducer<(Key, SignalProducer<Value, Error>), Error> { | ||
return SignalProducer<(Key, SignalProducer<Value, Error>), Error> { observer, disposable in | ||
let groups = Atomic<[Key: Signal<Value, Error>.Observer]>([:]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't seem necessary to specialise it. Is it a workaround to the compiler's weirdness or something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm afraid I don't know what exactly you mean.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean if you really need to write SignalProducer<(Key, SignalProducer<Value, Error>), Error>
when the type parameters can be inferred.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh that line. Yeah that works without explicit annotations 👍
@@ -1666,5 +1666,47 @@ class SignalProducerLiftingSpec: QuickSpec { | |||
expect(latestValues?.1) == 2 | |||
} | |||
} | |||
|
|||
describe("groupBy") { | |||
let (signal, observer) = Signal<Int, NoError>.pipe() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please wrap test cases in it
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
woops
|
||
disposable += producer | ||
.groupBy { $0 % 2 == 0 } | ||
.start(Observer(value: { key, group in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like start
that takes an event action would be nicer here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
alright! 👍
disposable.dispose() | ||
|
||
observer.send(value: 1) | ||
expect(interrupted) == true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These should be checked immediately after the disposal.
@@ -1666,5 +1666,47 @@ class SignalProducerLiftingSpec: QuickSpec { | |||
expect(latestValues?.1) == 2 | |||
} | |||
} | |||
|
|||
describe("groupBy") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be great if there are specialized test cases than one giant one. At least this single test case apparently did not catch the undisposed upstreams, caused by this line.
Say we could use a few test cases on how the inner producers behaves with regard to an outer terminal event, and keep this test case focusing on values.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah right, i've split this up into 2 separated cases and extended them to check the lifetime of the inner producers - now the cases catch the disposal issue you've found!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps one per terminal event, hmm?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure I can do that.
Should we check for all events that only this event is sent (e.g. when sending completed, also check that interrupted was not received), or can we trust that when completed
is received, interrupted
can not be received?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think asserting just the specific event is enough. The Signal
contract guarantees only one would ever be sent. If multiple terminal events are sent, this would be a fault in the Signal
basics and should have been caught somewhere else.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alright
expect(evens) == [2] | ||
expect(odds) == [1, 3] | ||
} | ||
it("should terminate correctly on disposal") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[Nit] A blank line please. 🙏
} else { | ||
group.startWithValues { odds.append($0)} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[Nit] This close bracket could be aligned better.
Sources/SignalProducer.swift
Outdated
/// - returns: A producer of producers amits one producer for each group and forwards | ||
/// each value from the original producer to the inner producer corresponding | ||
/// to the group to which the value belongs to (as determined by the key) | ||
public func groupBy<Key: Hashable>(_ grouping: @escaping (Value) -> Key) -> SignalProducer<(Key, SignalProducer<Value, Error>), Error> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By the API convention though, it should be group(by:)
. Would the parameter be better named classifier
or something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've tried that for a short moment, but on the call site that would then look like producer.group { ... }
instead of producer.groupBy { ... }
when using trailing closure syntax.
Not sure if thats a problem though...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We already have skip(while:)
and take(while:)
working in this way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alright! 👍
@andersio all done 😃 |
expect(oddCompleted) == true | ||
} | ||
|
||
it("should terminate correctly receiving an error event") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/error/failed/
@mdiep ping |
Oops! Sorry @iv-mexx. I put this back on my todo list and will get to it this week. |
return group | ||
} else { | ||
let (signal, innerObserver) = Signal<Value, Error>.pipe() | ||
let producer = SignalProducer(signal).replayLazily(upTo: Int.max) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is replaying values an essential part of group(by:)
? This is my primary hesitation. In general, we try to discourage the use of replaying (which is why buffer
was removed).
(You could just as easily send the Signal
and not send values on the Signal
until it's been sent—giving observers a chance to observe the signal.)
If you feel that it is, I'd love to see a larger code sample that demonstrates why.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think so, otherwise consumers could miss events without realizing it. Hence why I erred on the side of buffering. I could be convinced otherwise, but I'm not sure how to best communicate those semantics at the operator level.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think returning Signal
s instead of SignalProducer
s communicates the semantics.
But I think seeing an actual example of this operator will make it easier to see whether that's a viable change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've posted an example how we use the groupBy
in a project in the Issue: #197 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I'm reading this correctly, RxJS is using Hot Observables for the groups (via refCount
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As much as I agree with @mdiep about the semantics, in my experience having buffering in this operator (specifically RxJava
's Observable#groupBy
) has been valuable .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps should the buffering be an option?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(You could just as easily send the Signal and not send values on the Signal until it's been sent—giving observers a chance to observe the signal.)
While this is the ideal case, it seems the buffering is essential, say when using with flatten(.concat)
, which can delay the establishment of the observation.
Edit: I'd suggest to annotate it with warnings for the potential of indefinite buffering, and perhaps offer a non-buffering variant.
Edit: If we have to find a middle ground, perhaps the emitted inner producers should buffer until & only replay values at the first time it started (future work: a configurable #?). Subsequent starts would be invalid and emit interrupted
. This should satisfy the common use cases, e.g. @iv-mexx's example, which do not retain and repeatedly start the inner producers.
Sources/SignalProducer.swift
Outdated
/// - parameters: | ||
/// - grouping: a closure that determines the grouping key for a given value | ||
/// | ||
/// - returns: A producer of producers amits one producer for each group and forwards |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo amits
I've been thinking about this quite a bit. The replaying of values makes me pretty uncomfortable, so I've felt a bit stuck and unsure how to resolve this. But after thinking about it, I no longer think that class BluetoothManager {
private let allBeaconsSignal: Signal<CLBeacon, NoError>
private var _beacons: [(Int, Int): (Signal<_, _>, Signal<_, _>.Observer)]
public let beacons: Signal<(CLBeacon, Signal<_, _>), NoError>
private let observer: Signal<(CLBeacon, Signal<_, _>), NoError>.Observer
init() {
// I'm not exactly sure how this should all work. I was working off the given example code,
// which is tightly focused on the groupBy. But this will hopefully communicate the idea,
// which is that the class is responsible for grouping values, notifying interested parties
// that new values have come in and giving them an opportunity to subscribe to events.
let (beacons, observer) = Signal<(CLBeacon, Signal), NoError>.pipe()
allBeaconsSignal.observeValues { beacon in
let identifier = (beacon.major, beacon.minor)
if _beacons[identifier] == nil {
let (signal, observer) = Signal<_, _>.pipe()
self.observer.send(value: (beacon, signal))
}
}
}
} But if the rest of the core team feels that this does make sense to add, they can definitely override me. 😄 |
In lieu of the rise of unidirectional data flow in the Swift community, perhaps the class of problems intended for Closing because of inactivity. |
As per #197, this is a Port of
groupBy
from REX.I've only changed it to use
Atomic
instead of theNSRecursiveLock