diff --git a/Sources/Core/Emitter/Emitter.swift b/Sources/Core/Emitter/Emitter.swift index 1c51f973b..30e4cc59b 100644 --- a/Sources/Core/Emitter/Emitter.swift +++ b/Sources/Core/Emitter/Emitter.swift @@ -276,11 +276,14 @@ class Emitter: EmitterEventProcessing { } /// Insert a Payload object into the buffer to be sent to collector. - /// This method will add the payload to the database and flush (send all events). + /// This method will add the payload to the database and flush (send all events) when the buffer is full. /// - Parameter eventPayload: A Payload containing a completed event to be added into the buffer. func addPayload(toBuffer eventPayload: Payload) { self.eventStore.addEvent(eventPayload) - self.flush() + + if self.eventStore.count() >= self.bufferOption.rawValue { + self.flush() + } } /// Empties the buffer of events using the respective HTTP request method. @@ -377,46 +380,43 @@ class Emitter: EmitterEventProcessing { requests.append(request) } } else { - var i = 0 - while i < events.count { - var eventArray: [Payload] = [] - var indexArray: [Int64] = [] - - let iUntil = min(i + bufferOption.rawValue, events.count) - for j in i.. separate requests + if isOversize(payload, byteLimit: byteLimit) { + let request = Request(payload: payload, emitterEventId: emitterEventId, oversize: true) + requests.append(request) } - - // Check if all payloads have been processed - if eventArray.count != 0 { - let request = Request(payloads: eventArray, emitterEventIds: indexArray) + // Events up to this one are oversize -> create request for them + else if isOversize(payload, byteLimit: byteLimit, previousPayloads: eventPayloads) { + let request = Request(payloads: eventPayloads, emitterEventIds: eventIds) requests.append(request) + + // Clear collection and build a new POST + eventPayloads = [] + eventIds = [] + + // Build and store the request + eventPayloads.append(payload) + eventIds.append(emitterEventId) } - i += bufferOption.rawValue + // Add to the list of events for the request + else { + eventPayloads.append(payload) + eventIds.append(emitterEventId) + } + } + + // Check if there are any remaining events not in a request + if !eventPayloads.isEmpty { + let request = Request(payloads: eventPayloads, emitterEventIds: eventIds) + requests.append(request) } } return requests diff --git a/Sources/Snowplow/Configurations/EmitterConfiguration.swift b/Sources/Snowplow/Configurations/EmitterConfiguration.swift index 2d04f1f5b..b369862b5 100644 --- a/Sources/Snowplow/Configurations/EmitterConfiguration.swift +++ b/Sources/Snowplow/Configurations/EmitterConfiguration.swift @@ -16,7 +16,7 @@ import Foundation @objc(SPEmitterConfigurationProtocol) public protocol EmitterConfigurationProtocol: AnyObject { /// Sets whether the buffer should send events instantly or after the buffer - /// has reached it's limit. By default, this is set to BufferOption Default. + /// has reached it's limit. By default, this is set to BufferOption single. @objc var bufferOption: BufferOption { get set } /// Maximum number of events collected from the EventStore to be sent in a request. diff --git a/Sources/Snowplow/Emitter/BufferOption.swift b/Sources/Snowplow/Emitter/BufferOption.swift index 36d12a51c..c6cd7ac2c 100644 --- a/Sources/Snowplow/Emitter/BufferOption.swift +++ b/Sources/Snowplow/Emitter/BufferOption.swift @@ -16,14 +16,16 @@ import Foundation /// An enum for buffer options. @objc(SPBufferOption) public enum BufferOption : Int { - /// Sends both GET and POST requests with only a single event. Can cause a spike in - /// network traffic if used in correlation with a large amount of events. + /// Sends both GET and POST requests with only a single event. + /// This is the default setting. + /// Can cause a spike in network traffic if used in correlation with a large amount of events. case single = 1 - /// Sends POST requests in groups of 10 events. This is the default amount of events too - /// package into a POST. All GET requests will still emit one at a time. - case defaultGroup = 10 - /// Sends POST requests in groups of 25 events. Useful for situations where many events - /// need to be sent. All GET requests will still emit one at a time. + /// Sends POST requests in groups of 10 events. + /// All GET requests will still emit one at a time. + case smallGroup = 10 + /// Sends POST requests in groups of 25 events. + /// Useful for situations where many events need to be sent. + /// All GET requests will still emit one at a time. case largeGroup = 25 } @@ -32,8 +34,12 @@ extension BufferOption { switch value { case "Single": return .single + case "SmallGroup": + return .smallGroup case "DefaultGroup": - return .defaultGroup + return .smallGroup + case "LargeGroup": + return .largeGroup case "HeavyGroup": return .largeGroup default: diff --git a/Sources/Snowplow/Emitter/EmitterDefaults.swift b/Sources/Snowplow/Emitter/EmitterDefaults.swift index c4d36d060..7937c1414 100644 --- a/Sources/Snowplow/Emitter/EmitterDefaults.swift +++ b/Sources/Snowplow/Emitter/EmitterDefaults.swift @@ -16,11 +16,11 @@ import Foundation public class EmitterDefaults { public private(set) static var httpMethod: HttpMethodOptions = .post public private(set) static var httpProtocol: ProtocolOptions = .https - public private(set) static var emitRange = 150 + public private(set) static var emitRange = BufferOption.largeGroup.rawValue public private(set) static var emitThreadPoolSize = 15 public private(set) static var byteLimitGet = 40000 public private(set) static var byteLimitPost = 40000 public private(set) static var serverAnonymisation = false - public private(set) static var bufferOption: BufferOption = .defaultGroup + public private(set) static var bufferOption: BufferOption = .single public private(set) static var retryFailedRequests = true } diff --git a/Sources/Snowplow/Network/DefaultNetworkConnection.swift b/Sources/Snowplow/Network/DefaultNetworkConnection.swift index 143339764..9eafd8d2f 100644 --- a/Sources/Snowplow/Network/DefaultNetworkConnection.swift +++ b/Sources/Snowplow/Network/DefaultNetworkConnection.swift @@ -96,47 +96,68 @@ public class DefaultNetworkConnection: NSObject, NetworkConnection { @objc public func sendRequests(_ requests: [Request]) -> [RequestResult] { + let urlRequests = requests.map { _httpMethod == .get ? buildGet($0) : buildPost($0) } + var results: [RequestResult] = [] - - for request in requests { - let urlRequest = _httpMethod == .get - ? buildGet(request) - : buildPost(request) - - dataOperationQueue.addOperation({ - //source: https://forums.developer.apple.com/thread/11519 - var httpResponse: HTTPURLResponse? = nil - var connectionError: Error? = nil - var sem: DispatchSemaphore - - sem = DispatchSemaphore(value: 0) - - URLSession.shared.dataTask(with: urlRequest) { data, urlResponse, error in - connectionError = error - httpResponse = urlResponse as? HTTPURLResponse - sem.signal() - }.resume() - - let _ = sem.wait(timeout: .distantFuture) - var statusCode: NSNumber? - if let httpResponse = httpResponse { statusCode = NSNumber(value: httpResponse.statusCode) } - - let result = RequestResult(statusCode: statusCode, oversize: request.oversize, storeIds: request.emitterEventIds) - if !result.isSuccessful { - logError(message: "Connection error: " + (connectionError?.localizedDescription ?? "-")) - } - - objc_sync_enter(self) + // if there is only one request, make it directly + if requests.count == 1 { + if let request = requests.first, let urlRequest = urlRequests.first { + let result = DefaultNetworkConnection.makeRequest( + request: request, + urlRequest: urlRequest + ) + results.append(result) - objc_sync_exit(self) - }) + } + } + // if there are more than 1 request, use the operation queue + else if requests.count > 1 { + for (request, urlRequest) in zip(requests, urlRequests) { + dataOperationQueue.addOperation({ + let result = DefaultNetworkConnection.makeRequest( + request: request, + urlRequest: urlRequest + ) + + objc_sync_enter(self) + results.append(result) + objc_sync_exit(self) + }) + } + dataOperationQueue.waitUntilAllOperationsAreFinished() } - dataOperationQueue.waitUntilAllOperationsAreFinished() + return results } // MARK: - Private methods + private static func makeRequest(request: Request, urlRequest: URLRequest) -> RequestResult { + //source: https://forums.developer.apple.com/thread/11519 + var httpResponse: HTTPURLResponse? = nil + var connectionError: Error? = nil + var sem: DispatchSemaphore + + sem = DispatchSemaphore(value: 0) + + URLSession.shared.dataTask(with: urlRequest) { data, urlResponse, error in + connectionError = error + httpResponse = urlResponse as? HTTPURLResponse + sem.signal() + }.resume() + + let _ = sem.wait(timeout: .distantFuture) + var statusCode: NSNumber? + if let httpResponse = httpResponse { statusCode = NSNumber(value: httpResponse.statusCode) } + + let result = RequestResult(statusCode: statusCode, oversize: request.oversize, storeIds: request.emitterEventIds) + if !result.isSuccessful { + logError(message: "Connection error: " + (connectionError?.localizedDescription ?? "-")) + } + + return result + } + private func setup() { // Decode url to extract protocol let url = URL(string: _urlString) diff --git a/Tests/Legacy Tests/LegacyTestEmitter.swift b/Tests/TestEmitter.swift similarity index 79% rename from Tests/Legacy Tests/LegacyTestEmitter.swift rename to Tests/TestEmitter.swift index 1477bce80..c4b72d0cb 100644 --- a/Tests/Legacy Tests/LegacyTestEmitter.swift +++ b/Tests/TestEmitter.swift @@ -14,29 +14,9 @@ import XCTest @testable import SnowplowTracker -//class BrokenNetworkConnection: NetworkConnection { -// func sendRequests(_ requests: [Request]) -> [RequestResult] { -// NSException.raise("BrokenNetworkConnection", format: "Fake exception on network connection.") -// return nil -// } -// -// var urlEndpoint: URL? { -// NSException.raise("BrokenNetworkConnection", format: "Fake exception on network connection.") -// return nil -// } -// -// var httpMethod: HttpMethodOptions { -// NSException.raise("BrokenNetworkConnection", format: "Fake exception on network connection.") -// return .get -// } -//} - -//#pragma clang diagnostic push -//#pragma clang diagnostic ignored "-Wdeprecated-declarations" - let TEST_SERVER_EMITTER = "www.notarealurl.com" -class LegacyTestEmitter: XCTestCase { +class TestEmitter: XCTestCase { override func setUp() { super.setUp() Logger.logLevel = .verbose @@ -120,26 +100,12 @@ class LegacyTestEmitter: XCTestCase { // MARK: - Emitting tests -// func testEmitEventWithBrokenNetworkConnectionDoesntFreezeStatus() { -// let networkConnection = SPBrokenNetworkConnection() -// let emitter = self.emitter(with: networkConnection, bufferOption: SPBufferOptionSingle) -// emitter?.addPayload(toBuffer: generatePayloads(1)?.first) -// -// Thread.sleep(forTimeInterval: 1) -// -// XCTAssertFalse(emitter?.getSendingStatus()) -// -// emitter?.flush() -// } - func testEmitSingleGetEventWithSuccess() { let networkConnection = MockNetworkConnection(requestOption: .get, statusCode: 200) let emitter = self.emitter(with: networkConnection, bufferOption: .single) addPayload(generatePayloads(1).first!, emitter) - for _ in 0..<10 { - Thread.sleep(forTimeInterval: 1) - } + Thread.sleep(forTimeInterval: 1) XCTAssertEqual(1, networkConnection.previousResults.count) XCTAssertEqual(1, networkConnection.previousResults.first!.count) @@ -154,9 +120,7 @@ class LegacyTestEmitter: XCTestCase { let emitter = self.emitter(with: networkConnection, bufferOption: .single) addPayload(generatePayloads(1).first!, emitter) - for _ in 0..<10 { - Thread.sleep(forTimeInterval: 1) - } + Thread.sleep(forTimeInterval: 1) XCTAssertEqual(1, networkConnection.previousResults.count) XCTAssertEqual(1, networkConnection.previousResults.first!.count) @@ -174,11 +138,10 @@ class LegacyTestEmitter: XCTestCase { addPayload(payload, emitter) } - for _ in 0..<10 { - Thread.sleep(forTimeInterval: 1) - } + Thread.sleep(forTimeInterval: 1) XCTAssertEqual(0, dbCount(emitter)) + XCTAssertEqual(2, networkConnection.previousResults.count) var totEvents = 0 for results in networkConnection.previousResults { for result in results { @@ -199,9 +162,7 @@ class LegacyTestEmitter: XCTestCase { addPayload(payload, emitter) } - for _ in 0..<10 { - Thread.sleep(forTimeInterval: 1) - } + Thread.sleep(forTimeInterval: 1) XCTAssertEqual(2, dbCount(emitter)) for results in networkConnection.previousResults { @@ -219,9 +180,7 @@ class LegacyTestEmitter: XCTestCase { addPayload(generatePayloads(1).first!, emitter) - for _ in 0..<10 { - Thread.sleep(forTimeInterval: 1) - } + Thread.sleep(forTimeInterval: 1) XCTAssertEqual(1, networkConnection.previousResults.count) XCTAssertEqual(1, networkConnection.previousResults.first?.count) @@ -235,24 +194,21 @@ class LegacyTestEmitter: XCTestCase { let payloads = generatePayloads(15) let networkConnection = MockNetworkConnection(requestOption: .post, statusCode: 500) - let emitter = self.emitter(with: networkConnection, bufferOption: .defaultGroup) + let emitter = self.emitter(with: networkConnection, bufferOption: .smallGroup) for i in 0..<14 { addPayload(payloads[i], emitter) } - for _ in 0..<10 { - Thread.sleep(forTimeInterval: 1) - } + // wait longer than the stop sending timeout + Thread.sleep(forTimeInterval: 6) XCTAssertEqual(14, dbCount(emitter)) networkConnection.statusCode = 200 let prevSendingCount = networkConnection.sendingCount addPayload(payloads[14], emitter) - for _ in 0..<10 { - Thread.sleep(forTimeInterval: 1) - } + Thread.sleep(forTimeInterval: 1) XCTAssertEqual(0, dbCount(emitter)) var totEvents = 0 @@ -274,7 +230,7 @@ class LegacyTestEmitter: XCTestCase { func testEmitOversizeEventsPostAsGroup() { let networkConnection = MockNetworkConnection(requestOption: .post, statusCode: 500) - let emitter = self.emitter(with: networkConnection, bufferOption: .defaultGroup) + let emitter = self.emitter(with: networkConnection, bufferOption: .single) emitter.byteLimitPost = 5 let payloads = generatePayloads(15) @@ -282,18 +238,14 @@ class LegacyTestEmitter: XCTestCase { addPayload(payloads[i], emitter) } - for _ in 0..<10 { - Thread.sleep(forTimeInterval: 1) - } + Thread.sleep(forTimeInterval: 1) XCTAssertEqual(0, dbCount(emitter)) networkConnection.statusCode = 200 _ = networkConnection.sendingCount addPayload(payloads[14], emitter) - for _ in 0..<10 { - Thread.sleep(forTimeInterval: 1) - } + Thread.sleep(forTimeInterval: 1) XCTAssertEqual(0, dbCount(emitter)) @@ -369,6 +321,80 @@ class LegacyTestEmitter: XCTestCase { flush(emitter) } + + func testDoesntMakeRequestUnlessBufferSizeIsReached() { + let networkConnection = MockNetworkConnection(requestOption: .post, statusCode: 200) + let emitter = self.emitter(with: networkConnection, bufferOption: .smallGroup) + emitter.retryFailedRequests = false + + for payload in generatePayloads(9) { + addPayload(payload, emitter) + } + + Thread.sleep(forTimeInterval: 1) + + // all events waiting in queue + XCTAssertEqual(9, dbCount(emitter)) + + addPayload(generatePayloads(1).first!, emitter) + + Thread.sleep(forTimeInterval: 1) + + // all events sent + XCTAssertEqual(0, dbCount(emitter)) + + flush(emitter) + } + + func testNumberOfRequestsMatchesEmitRangeAndOversize() { + let networkConnection = MockNetworkConnection(requestOption: .post, statusCode: 200) + let emitter = self.emitter(with: networkConnection, bufferOption: .single) + emitter.emitRange = 20 + + InternalQueue.sync { emitter.pauseEmit() } + for payload in generatePayloads(20) { + addPayload(payload, emitter) + } + InternalQueue.sync { emitter.resumeEmit() } + + Thread.sleep(forTimeInterval: 0.5) + + // made a single request + XCTAssertEqual(1, networkConnection.sendingCount) + XCTAssertEqual(1, networkConnection.previousRequests.first?.count ?? 0) + + networkConnection.clear() + + InternalQueue.sync { emitter.pauseEmit() } + for payload in generatePayloads(40) { + addPayload(payload, emitter) + } + InternalQueue.sync { emitter.resumeEmit() } + + Thread.sleep(forTimeInterval: 0.5) + + // made two requests one after the other + XCTAssertEqual(2, networkConnection.sendingCount) + XCTAssertEqual(1, networkConnection.previousRequests.map { $0.count }.max()) + + networkConnection.clear() + + // test with oversize requests + emitter.byteLimitPost = 5 + InternalQueue.sync { emitter.pauseEmit() } + for payload in generatePayloads(2) { + addPayload(payload, emitter) + } + InternalQueue.sync { emitter.resumeEmit() } + + Thread.sleep(forTimeInterval: 0.5) + + // made two requests at once + XCTAssertEqual(1, networkConnection.sendingCount) + XCTAssertEqual(2, networkConnection.previousRequests.first?.count ?? 0) + + flush(emitter) + } // MARK: - Emitter builder @@ -412,4 +438,3 @@ class LegacyTestEmitter: XCTestCase { } } } -//#pragma clang diagnostic pop diff --git a/Tests/Utils/MockNetworkConnection.swift b/Tests/Utils/MockNetworkConnection.swift index 220cd5608..ce72ff9f6 100644 --- a/Tests/Utils/MockNetworkConnection.swift +++ b/Tests/Utils/MockNetworkConnection.swift @@ -48,4 +48,9 @@ class MockNetworkConnection: NSObject, NetworkConnection { previousResults.append(requestResults) return requestResults } + + func clear() { + previousRequests = [] + previousResults = [] + } }