From 3c8d1d8f73272c199be8bc4b6965b2cf508aba67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matu=CC=81s=CC=8C=20Tomlein?= Date: Wed, 29 Nov 2023 17:08:47 +0100 Subject: [PATCH] Make network requests serially in network connection (close #846) --- Examples | 2 +- Sources/Core/Emitter/Emitter.swift | 71 +++++----- .../Snowplow/Emitter/EmitterDefaults.swift | 2 +- .../Network/DefaultNetworkConnection.swift | 87 ++++++++----- ...acyTestEmitter.swift => TestEmitter.swift} | 123 +++++++++--------- Tests/Utils/MockNetworkConnection.swift | 5 + 6 files changed, 157 insertions(+), 133 deletions(-) rename Tests/{Legacy Tests/LegacyTestEmitter.swift => TestEmitter.swift} (84%) diff --git a/Examples b/Examples index 511a65329..79e6256e6 160000 --- a/Examples +++ b/Examples @@ -1 +1 @@ -Subproject commit 511a653291dcb237d953a5948086dfc64ebe22c5 +Subproject commit 79e6256e67aef88c3a2d95d5a1afc9c804baee87 diff --git a/Sources/Core/Emitter/Emitter.swift b/Sources/Core/Emitter/Emitter.swift index 3390a24ea..a1d24845c 100644 --- a/Sources/Core/Emitter/Emitter.swift +++ b/Sources/Core/Emitter/Emitter.swift @@ -380,46 +380,43 @@ class Emitter: EmitterEventProcessing { requests.append(request) } } else { - var i = 0 - while i < events.count { - var eventArray: [Payload] = [] - var indexArray: [Int64] = [] - - let iUntil = min(i + bufferOption.rawValue, events.count) - for j in i.. separate requests + if isOversize(payload, byteLimit: byteLimit) { + let request = Request(payload: payload, emitterEventId: emitterEventId, oversize: true) + requests.append(request) } - - // Check if all payloads have been processed - if eventArray.count != 0 { - let request = Request(payloads: eventArray, emitterEventIds: indexArray) + // Events up to this one are oversize -> create request for them + else if isOversize(payload, byteLimit: byteLimit, previousPayloads: eventPayloads) { + let request = Request(payloads: eventPayloads, emitterEventIds: eventIds) requests.append(request) + + // Clear collection and build a new POST + eventPayloads = [] + eventIds = [] + + // Build and store the request + eventPayloads.append(payload) + eventIds.append(emitterEventId) } - i += bufferOption.rawValue + // Add to the list of events for the request + else { + eventPayloads.append(payload) + eventIds.append(emitterEventId) + } + } + + // Check if there are any remaining events not in a request + if !eventPayloads.isEmpty { + let request = Request(payloads: eventPayloads, emitterEventIds: eventIds) + requests.append(request) } } return requests diff --git a/Sources/Snowplow/Emitter/EmitterDefaults.swift b/Sources/Snowplow/Emitter/EmitterDefaults.swift index 98e1e6e9b..7937c1414 100644 --- a/Sources/Snowplow/Emitter/EmitterDefaults.swift +++ b/Sources/Snowplow/Emitter/EmitterDefaults.swift @@ -16,7 +16,7 @@ import Foundation public class EmitterDefaults { public private(set) static var httpMethod: HttpMethodOptions = .post public private(set) static var httpProtocol: ProtocolOptions = .https - public private(set) static var emitRange = 150 + public private(set) static var emitRange = BufferOption.largeGroup.rawValue public private(set) static var emitThreadPoolSize = 15 public private(set) static var byteLimitGet = 40000 public private(set) static var byteLimitPost = 40000 diff --git a/Sources/Snowplow/Network/DefaultNetworkConnection.swift b/Sources/Snowplow/Network/DefaultNetworkConnection.swift index 143339764..9eafd8d2f 100644 --- a/Sources/Snowplow/Network/DefaultNetworkConnection.swift +++ b/Sources/Snowplow/Network/DefaultNetworkConnection.swift @@ -96,47 +96,68 @@ public class DefaultNetworkConnection: NSObject, NetworkConnection { @objc public func sendRequests(_ requests: [Request]) -> [RequestResult] { + let urlRequests = requests.map { _httpMethod == .get ? buildGet($0) : buildPost($0) } + var results: [RequestResult] = [] - - for request in requests { - let urlRequest = _httpMethod == .get - ? buildGet(request) - : buildPost(request) - - dataOperationQueue.addOperation({ - //source: https://forums.developer.apple.com/thread/11519 - var httpResponse: HTTPURLResponse? = nil - var connectionError: Error? = nil - var sem: DispatchSemaphore - - sem = DispatchSemaphore(value: 0) - - URLSession.shared.dataTask(with: urlRequest) { data, urlResponse, error in - connectionError = error - httpResponse = urlResponse as? HTTPURLResponse - sem.signal() - }.resume() - - let _ = sem.wait(timeout: .distantFuture) - var statusCode: NSNumber? - if let httpResponse = httpResponse { statusCode = NSNumber(value: httpResponse.statusCode) } - - let result = RequestResult(statusCode: statusCode, oversize: request.oversize, storeIds: request.emitterEventIds) - if !result.isSuccessful { - logError(message: "Connection error: " + (connectionError?.localizedDescription ?? "-")) - } - - objc_sync_enter(self) + // if there is only one request, make it directly + if requests.count == 1 { + if let request = requests.first, let urlRequest = urlRequests.first { + let result = DefaultNetworkConnection.makeRequest( + request: request, + urlRequest: urlRequest + ) + results.append(result) - objc_sync_exit(self) - }) + } + } + // if there are more than 1 request, use the operation queue + else if requests.count > 1 { + for (request, urlRequest) in zip(requests, urlRequests) { + dataOperationQueue.addOperation({ + let result = DefaultNetworkConnection.makeRequest( + request: request, + urlRequest: urlRequest + ) + + objc_sync_enter(self) + results.append(result) + objc_sync_exit(self) + }) + } + dataOperationQueue.waitUntilAllOperationsAreFinished() } - dataOperationQueue.waitUntilAllOperationsAreFinished() + return results } // MARK: - Private methods + private static func makeRequest(request: Request, urlRequest: URLRequest) -> RequestResult { + //source: https://forums.developer.apple.com/thread/11519 + var httpResponse: HTTPURLResponse? = nil + var connectionError: Error? = nil + var sem: DispatchSemaphore + + sem = DispatchSemaphore(value: 0) + + URLSession.shared.dataTask(with: urlRequest) { data, urlResponse, error in + connectionError = error + httpResponse = urlResponse as? HTTPURLResponse + sem.signal() + }.resume() + + let _ = sem.wait(timeout: .distantFuture) + var statusCode: NSNumber? + if let httpResponse = httpResponse { statusCode = NSNumber(value: httpResponse.statusCode) } + + let result = RequestResult(statusCode: statusCode, oversize: request.oversize, storeIds: request.emitterEventIds) + if !result.isSuccessful { + logError(message: "Connection error: " + (connectionError?.localizedDescription ?? "-")) + } + + return result + } + private func setup() { // Decode url to extract protocol let url = URL(string: _urlString) diff --git a/Tests/Legacy Tests/LegacyTestEmitter.swift b/Tests/TestEmitter.swift similarity index 84% rename from Tests/Legacy Tests/LegacyTestEmitter.swift rename to Tests/TestEmitter.swift index 7940baf44..c4b72d0cb 100644 --- a/Tests/Legacy Tests/LegacyTestEmitter.swift +++ b/Tests/TestEmitter.swift @@ -14,29 +14,9 @@ import XCTest @testable import SnowplowTracker -//class BrokenNetworkConnection: NetworkConnection { -// func sendRequests(_ requests: [Request]) -> [RequestResult] { -// NSException.raise("BrokenNetworkConnection", format: "Fake exception on network connection.") -// return nil -// } -// -// var urlEndpoint: URL? { -// NSException.raise("BrokenNetworkConnection", format: "Fake exception on network connection.") -// return nil -// } -// -// var httpMethod: HttpMethodOptions { -// NSException.raise("BrokenNetworkConnection", format: "Fake exception on network connection.") -// return .get -// } -//} - -//#pragma clang diagnostic push -//#pragma clang diagnostic ignored "-Wdeprecated-declarations" - let TEST_SERVER_EMITTER = "www.notarealurl.com" -class LegacyTestEmitter: XCTestCase { +class TestEmitter: XCTestCase { override func setUp() { super.setUp() Logger.logLevel = .verbose @@ -120,26 +100,12 @@ class LegacyTestEmitter: XCTestCase { // MARK: - Emitting tests -// func testEmitEventWithBrokenNetworkConnectionDoesntFreezeStatus() { -// let networkConnection = SPBrokenNetworkConnection() -// let emitter = self.emitter(with: networkConnection, bufferOption: SPBufferOptionSingle) -// emitter?.addPayload(toBuffer: generatePayloads(1)?.first) -// -// Thread.sleep(forTimeInterval: 1) -// -// XCTAssertFalse(emitter?.getSendingStatus()) -// -// emitter?.flush() -// } - func testEmitSingleGetEventWithSuccess() { let networkConnection = MockNetworkConnection(requestOption: .get, statusCode: 200) let emitter = self.emitter(with: networkConnection, bufferOption: .single) addPayload(generatePayloads(1).first!, emitter) - for _ in 0..<10 { - Thread.sleep(forTimeInterval: 1) - } + Thread.sleep(forTimeInterval: 1) XCTAssertEqual(1, networkConnection.previousResults.count) XCTAssertEqual(1, networkConnection.previousResults.first!.count) @@ -154,9 +120,7 @@ class LegacyTestEmitter: XCTestCase { let emitter = self.emitter(with: networkConnection, bufferOption: .single) addPayload(generatePayloads(1).first!, emitter) - for _ in 0..<10 { - Thread.sleep(forTimeInterval: 1) - } + Thread.sleep(forTimeInterval: 1) XCTAssertEqual(1, networkConnection.previousResults.count) XCTAssertEqual(1, networkConnection.previousResults.first!.count) @@ -174,11 +138,10 @@ class LegacyTestEmitter: XCTestCase { addPayload(payload, emitter) } - for _ in 0..<10 { - Thread.sleep(forTimeInterval: 1) - } + Thread.sleep(forTimeInterval: 1) XCTAssertEqual(0, dbCount(emitter)) + XCTAssertEqual(2, networkConnection.previousResults.count) var totEvents = 0 for results in networkConnection.previousResults { for result in results { @@ -199,9 +162,7 @@ class LegacyTestEmitter: XCTestCase { addPayload(payload, emitter) } - for _ in 0..<10 { - Thread.sleep(forTimeInterval: 1) - } + Thread.sleep(forTimeInterval: 1) XCTAssertEqual(2, dbCount(emitter)) for results in networkConnection.previousResults { @@ -219,9 +180,7 @@ class LegacyTestEmitter: XCTestCase { addPayload(generatePayloads(1).first!, emitter) - for _ in 0..<10 { - Thread.sleep(forTimeInterval: 1) - } + Thread.sleep(forTimeInterval: 1) XCTAssertEqual(1, networkConnection.previousResults.count) XCTAssertEqual(1, networkConnection.previousResults.first?.count) @@ -241,18 +200,15 @@ class LegacyTestEmitter: XCTestCase { addPayload(payloads[i], emitter) } - for _ in 0..<10 { - Thread.sleep(forTimeInterval: 1) - } + // wait longer than the stop sending timeout + Thread.sleep(forTimeInterval: 6) XCTAssertEqual(14, dbCount(emitter)) networkConnection.statusCode = 200 let prevSendingCount = networkConnection.sendingCount addPayload(payloads[14], emitter) - for _ in 0..<10 { - Thread.sleep(forTimeInterval: 1) - } + Thread.sleep(forTimeInterval: 1) XCTAssertEqual(0, dbCount(emitter)) var totEvents = 0 @@ -282,18 +238,14 @@ class LegacyTestEmitter: XCTestCase { addPayload(payloads[i], emitter) } - for _ in 0..<10 { - Thread.sleep(forTimeInterval: 1) - } + Thread.sleep(forTimeInterval: 1) XCTAssertEqual(0, dbCount(emitter)) networkConnection.statusCode = 200 _ = networkConnection.sendingCount addPayload(payloads[14], emitter) - for _ in 0..<10 { - Thread.sleep(forTimeInterval: 1) - } + Thread.sleep(forTimeInterval: 1) XCTAssertEqual(0, dbCount(emitter)) @@ -393,6 +345,56 @@ class LegacyTestEmitter: XCTestCase { flush(emitter) } + + func testNumberOfRequestsMatchesEmitRangeAndOversize() { + let networkConnection = MockNetworkConnection(requestOption: .post, statusCode: 200) + let emitter = self.emitter(with: networkConnection, bufferOption: .single) + emitter.emitRange = 20 + + InternalQueue.sync { emitter.pauseEmit() } + for payload in generatePayloads(20) { + addPayload(payload, emitter) + } + InternalQueue.sync { emitter.resumeEmit() } + + Thread.sleep(forTimeInterval: 0.5) + + // made a single request + XCTAssertEqual(1, networkConnection.sendingCount) + XCTAssertEqual(1, networkConnection.previousRequests.first?.count ?? 0) + + networkConnection.clear() + + InternalQueue.sync { emitter.pauseEmit() } + for payload in generatePayloads(40) { + addPayload(payload, emitter) + } + InternalQueue.sync { emitter.resumeEmit() } + + Thread.sleep(forTimeInterval: 0.5) + + // made two requests one after the other + XCTAssertEqual(2, networkConnection.sendingCount) + XCTAssertEqual(1, networkConnection.previousRequests.map { $0.count }.max()) + + networkConnection.clear() + + // test with oversize requests + emitter.byteLimitPost = 5 + InternalQueue.sync { emitter.pauseEmit() } + for payload in generatePayloads(2) { + addPayload(payload, emitter) + } + InternalQueue.sync { emitter.resumeEmit() } + + Thread.sleep(forTimeInterval: 0.5) + + // made two requests at once + XCTAssertEqual(1, networkConnection.sendingCount) + XCTAssertEqual(2, networkConnection.previousRequests.first?.count ?? 0) + + flush(emitter) + } // MARK: - Emitter builder @@ -436,4 +438,3 @@ class LegacyTestEmitter: XCTestCase { } } } -//#pragma clang diagnostic pop diff --git a/Tests/Utils/MockNetworkConnection.swift b/Tests/Utils/MockNetworkConnection.swift index 220cd5608..ce72ff9f6 100644 --- a/Tests/Utils/MockNetworkConnection.swift +++ b/Tests/Utils/MockNetworkConnection.swift @@ -48,4 +48,9 @@ class MockNetworkConnection: NSObject, NetworkConnection { previousResults.append(requestResults) return requestResults } + + func clear() { + previousRequests = [] + previousResults = [] + } }