Skip to content

Commit

Permalink
Rewrite promises to allow callback option (#19)
Browse files Browse the repository at this point in the history
* Rewrite promises to allow callback option

* setFailureType for Never

* Add more error transformation

* Add init that flattens PromiseError

* Promise from Publisher as func, not var, so it can be made generic

* Oops, fix wrong generic usage

* Oops, fix wrong generics usage

* More Promise extensions

* Promises to use NonEmptyPublisher as upstream

* Improve tests

* Promise Zip to not automatically map

* Promise timeout

* Add generic constraint
  • Loading branch information
luizmb authored Apr 22, 2021
1 parent 683b5fc commit c030f23
Show file tree
Hide file tree
Showing 14 changed files with 847 additions and 198 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ import Foundation

@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension Publisher {
public func flatMapResult<T>(maxPublishers: Subscribers.Demand = .unlimited,
_ transform: @escaping (Self.Output) -> Result<T, Failure>
public func flatMapResult<T>(
maxPublishers: Subscribers.Demand = .unlimited,
_ transform: @escaping (Self.Output) -> Result<T, Failure>
) -> Publishers.FlatMap<Result<T, Failure>.Publisher, Self> {
flatMap(maxPublishers: maxPublishers) { value in
transform(value).publisher
Expand Down
61 changes: 61 additions & 0 deletions Sources/FoundationExtensions/Promise/NonEmptyPublisher.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
//
// NonEmptyPublisher.swift
// FoundationExtensions
//
// Created by Luiz Rodrigo Martins Barbosa on 19.04.21.
// Copyright © 2021 Lautsprecher Teufel GmbH. All rights reserved.
//

#if canImport(Combine)
import Combine
import Foundation

/// A Publisher that ensures that at least 1 value will be emitted before successful completion, but not necessarily in case of failure completion.
/// This requires a fallback success or error in case the upstream decides to terminate empty.
@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public struct NonEmptyPublisher<Upstream: Publisher>: Publisher {
public typealias Output = Upstream.Output
public typealias Failure = Upstream.Failure

private let upstream: Upstream
let fallback: () -> Result<Output, Failure>

public init(upstream: Upstream, onEmpty fallback: @escaping () -> Result<Output, Failure>) {
self.upstream = upstream
self.fallback = fallback
}

private enum EmptyStream {
case empty
case someValue(Output)
}

public func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
upstream
.map(EmptyStream.someValue)
.replaceEmpty(with: EmptyStream.empty)
.flatMap { valueType -> AnyPublisher<Output, Failure> in
switch valueType {
case .empty:
switch fallback() {
case let .success(fallbackValue):
return Just(fallbackValue).setFailureType(to: Failure.self).eraseToAnyPublisher()
case let .failure(fallbackError):
return Fail(error: fallbackError).eraseToAnyPublisher()
}
case let .someValue(value):
return Just(value).setFailureType(to: Failure.self).eraseToAnyPublisher()
}
}
.subscribe(subscriber)
}
}

@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension NonEmptyPublisher where Upstream: PromiseType {
public init(upstream: Upstream) {
self.upstream = upstream
self.fallback = { fatalError() }
}
}
#endif
46 changes: 46 additions & 0 deletions Sources/FoundationExtensions/Promise/Promise+PerformInQueue.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
//
// Promise+PerformInQueue.swift
// FoundationExtensions
//
// Created by Luiz Rodrigo Martins Barbosa on 17.04.21.
// Copyright © 2021 Lautsprecher Teufel GmbH. All rights reserved.
//

#if canImport(Combine)
import Combine
import Dispatch
import Foundation

extension DispatchWorkItem: Cancellable { }

@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension Publishers.Promise {
public static func perform(in queue: DispatchQueue, operation: @escaping () throws -> Output) -> Self where Failure == Error {
.init { completion in
let workItem = DispatchWorkItem {
do {
let value = try operation()
completion(.success(value))
} catch {
completion(.failure(error))
}
}

queue.async(execute: workItem)
return workItem
}
}

public static func perform(in queue: DispatchQueue, operation: @escaping () -> Output) -> Self where Failure == Never {
.init { completion in
let workItem = DispatchWorkItem {
let value = operation()
completion(.success(value))
}

queue.async(execute: workItem)
return workItem
}
}
}
#endif
82 changes: 63 additions & 19 deletions Sources/FoundationExtensions/Promise/Promise.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import class Foundation.NSRecursiveLock

@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension Publishers {

/// A Promise is a Publisher that lives in between First and Deferred.
/// It will listen to one and only one event, and finish successfully, or finish with error without any successful output.
/// It will hang until an event arrives from upstream. If the upstream is eager, it will be deferred, that means it won't
Expand All @@ -28,15 +27,57 @@ extension Publishers {
@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public struct Promise<Success, Failure: Error>: PromiseType {
public typealias Output = Success
public typealias Failure = Failure
public typealias CompletionHandler = (Result<Success, Failure>) -> Void
public typealias OperationClosure = (@escaping CompletionHandler) -> Cancellable

struct SinkNotification {
let receiveCompletion: (Subscribers.Completion<Failure>) -> Void
let receiveValue: (Output) -> Void
}
typealias SinkClosure = (SinkNotification) -> AnyCancellable

private let upstream: () -> AnyPublisher<Success, Failure>
private let operation: SinkClosure

/// A promise from an upstream publisher. Because this is an closure parameter, the upstream will become a factory
/// and its creation will be deferred until there's some positive demand from the downstream.
/// - Parameter upstream: a closure that creates an upstream publisher. This is an closure, so creation will be deferred.
public init<P: Publisher>(_ upstream: @escaping () -> P) where P.Output == Success, P.Failure == Failure {
self.upstream = { upstream().eraseToAnyPublisher() }
public init<P: Publisher>(_ upstream: @escaping () -> NonEmptyPublisher<P>) where P.Output == Success, P.Failure == Failure {
self.init(upstreamUncheckedForEmptiness: upstream)
}

// https://www.fivestars.blog/articles/disfavoredOverload/
@_disfavoredOverload
internal init<P: Publisher>(upstreamUncheckedForEmptiness upstream: @escaping () -> P) where P.Output == Success, P.Failure == Failure {
self.operation = { sinkNotification in
upstream()
.first()
.sink(
receiveCompletion: sinkNotification.receiveCompletion,
receiveValue: sinkNotification.receiveValue
)
}
}

public init(_ promise: @escaping () -> Promise<Success, Failure>) {
self = promise()
}

public init(operation: @escaping OperationClosure) {
self.operation = { sinkNotification in
let cancellable = operation { result in
switch result {
case let .failure(error):
sinkNotification.receiveCompletion(.failure(error))
case let .success(value):
sinkNotification.receiveValue(value)
sinkNotification.receiveCompletion(.finished)
}
}

return AnyCancellable {
cancellable.cancel()
}
}
}

/// This function is called to attach the specified `Subscriber` to this `Publisher` by `subscribe(_:)`
Expand All @@ -46,10 +87,10 @@ extension Publishers {
/// - subscriber: The subscriber to attach to this `Publisher`.
/// once attached it can begin to receive values.
public func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Success == S.Input {
subscriber.receive(subscription: Subscription.init(upstream: upstream, downstream: subscriber))
subscriber.receive(subscription: Subscription(operation: operation, downstream: subscriber))
}

public func asPromise() -> Publishers.Promise<Success, Failure> {
public var promise: Publishers.Promise<Success, Failure> {
self
}
}
Expand All @@ -60,12 +101,12 @@ extension Publishers.Promise {
class Subscription<Downstream: Subscriber>: Combine.Subscription where Output == Downstream.Input, Failure == Downstream.Failure {
private let lock = NSRecursiveLock()
private var hasStarted = false
private let upstream: () -> AnyPublisher<Success, Failure>
private let operation: SinkClosure
private let downstream: Downstream
private var cancellable: AnyCancellable?

public init(upstream: @escaping () -> AnyPublisher<Output, Failure>, downstream: Downstream) {
self.upstream = upstream
public init(operation: @escaping SinkClosure, downstream: Downstream) {
self.operation = operation
self.downstream = downstream
}

Expand All @@ -79,17 +120,20 @@ extension Publishers.Promise {

guard shouldRun else { return }

cancellable = upstream()
.first()
.sink(
receiveCompletion: { [weak self] completion in
self?.downstream.receive(completion: completion)
},
receiveValue: { [weak self] value in
_ = self?.downstream.receive(value)
cancellable = operation(SinkNotification(
receiveCompletion: { [weak self] result in
switch result {
case .finished:
self?.downstream.receive(completion: .finished)
case let .failure(error):
self?.downstream.receive(completion: .failure(error))
}
)
},
receiveValue: { [weak self] value in
_ = self?.downstream.receive(value)
self?.downstream.receive(completion: .finished)
}
))
}

func cancel() {
Expand Down
Loading

0 comments on commit c030f23

Please sign in to comment.