From b5df08c3d4f6acef2a595e8e3aa7ce64f01c4d8f Mon Sep 17 00:00:00 2001 From: Ian Dundas Date: Sat, 5 Mar 2016 12:01:05 +0700 Subject: [PATCH] Added take(x) operator & unit tests --- ReactiveKit.xcodeproj/project.pbxproj | 2 +- ReactiveKit/Streams/StreamType.swift | 25 +++++++++ ReactiveKitTests/StreamSpec.swift | 75 +++++++++++++++++++++++++++ 3 files changed, 101 insertions(+), 1 deletion(-) diff --git a/ReactiveKit.xcodeproj/project.pbxproj b/ReactiveKit.xcodeproj/project.pbxproj index dd88645..32fc498 100644 --- a/ReactiveKit.xcodeproj/project.pbxproj +++ b/ReactiveKit.xcodeproj/project.pbxproj @@ -199,7 +199,7 @@ ECBCCE011BEB6BBE00723476 /* Queue.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Queue.swift; sourceTree = ""; }; ECBCCE041BEB6BBE00723476 /* ActiveStream.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ActiveStream.swift; sourceTree = ""; }; ECBCCE051BEB6BBE00723476 /* Stream.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Stream.swift; sourceTree = ""; }; - ECBCCE061BEB6BBE00723476 /* StreamType.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = StreamType.swift; sourceTree = ""; }; + ECBCCE061BEB6BBE00723476 /* StreamType.swift */ = {isa = PBXFileReference; fileEncoding = 4; indentWidth = 2; lastKnownFileType = sourcecode.swift; path = StreamType.swift; sourceTree = ""; tabWidth = 2; }; ECBCCE2A1BEB6BE100723476 /* NoError.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; name = NoError.swift; path = ../Operation/NoError.swift; sourceTree = ""; }; ECBCCE2B1BEB6BE100723476 /* Stream+Operation.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "Stream+Operation.swift"; sourceTree = ""; }; ECBCCE2C1BEB6BE100723476 /* Operation.swift */ = {isa = PBXFileReference; fileEncoding = 4; indentWidth = 2; lastKnownFileType = sourcecode.swift; path = Operation.swift; sourceTree = ""; tabWidth = 2; }; diff --git a/ReactiveKit/Streams/StreamType.swift b/ReactiveKit/Streams/StreamType.swift index ba30d3e..d14a34d 100644 --- a/ReactiveKit/Streams/StreamType.swift +++ b/ReactiveKit/Streams/StreamType.swift @@ -136,6 +136,31 @@ extension StreamType { } } + @warn_unused_result + public func take(count:Int) -> Stream { + return create { observer in + + var remainder = count + let outerDisposable = CompositeDisposable() + + let disposable = self.observe(on: nil, observer: { event in + if remainder > 0{ + remainder-- + + observer(event) + } + if remainder == 0{ + outerDisposable.dispose() + } + }) + + outerDisposable.addDisposable(disposable) + return outerDisposable + } + } + + + @warn_unused_result public func skip(var count: Int) -> Stream { return create { observer in diff --git a/ReactiveKitTests/StreamSpec.swift b/ReactiveKitTests/StreamSpec.swift index 08c14e8..8bd49b9 100644 --- a/ReactiveKitTests/StreamSpec.swift +++ b/ReactiveKitTests/StreamSpec.swift @@ -184,6 +184,81 @@ class StreamSpec: QuickSpec { } } } + + context("when taking 0") { + var observedEvents: [Int] = [] + var disposable: DisposableType! + + beforeEach { + disposable = stream.take(0).observe(on: ImmediateExecutionContext) { + observedEvents.append($0) + } + } + + it("does takes nothing") { + expect(observedEvents) == [] + } + + describe("can be disposed") { + beforeEach { + disposable.dispose() + } + + it("is disposed") { + expect(simpleDisposable.isDisposed).to(beTrue()) + } + } + } + + context("when taking 1") { + var observedEvents: [Int] = [] + var disposable: DisposableType! + + beforeEach { + disposable = stream.take(1).observe(on: ImmediateExecutionContext) { + observedEvents.append($0) + } + } + + it("does take only 1") { + expect(observedEvents) == [1] + } + + describe("can be disposed") { + beforeEach { + disposable.dispose() + } + + it("is disposed") { + expect(simpleDisposable.isDisposed).to(beTrue()) + } + } + } + + context("when taking 2") { + var observedEvents: [Int] = [] + var disposable: DisposableType! + + beforeEach { + disposable = stream.take(2).observe(on: ImmediateExecutionContext) { + observedEvents.append($0) + } + } + + it("does takes 1 and 2") { + expect(observedEvents) == [1,2] + } + + describe("can be disposed") { + beforeEach { + disposable.dispose() + } + + it("is disposed") { + expect(simpleDisposable.isDisposed).to(beTrue()) + } + } + } context("when skip by 1") { var observedEvents: [Int] = []