Skip to content

Commit

Permalink
Make network requests serially in network connection (close #846)
Browse files Browse the repository at this point in the history
  • Loading branch information
matus-tomlein committed Nov 30, 2023
1 parent 7cb7e54 commit 3c8d1d8
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 133 deletions.
2 changes: 1 addition & 1 deletion Examples
71 changes: 34 additions & 37 deletions Sources/Core/Emitter/Emitter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -380,46 +380,43 @@ class Emitter: EmitterEventProcessing {
requests.append(request)
}
} else {
var i = 0
while i < events.count {
var eventArray: [Payload] = []
var indexArray: [Int64] = []

let iUntil = min(i + bufferOption.rawValue, events.count)
for j in i..<iUntil {
let event = events[j]

let payload = event.payload
let emitterEventId = event.storeId
addSendingTime(to: payload, timestamp: sendingTime)

if isOversize(payload, byteLimit: byteLimit) {
let request = Request(payload: payload, emitterEventId: emitterEventId, oversize: true)
requests.append(request)
} else if isOversize(payload, byteLimit: byteLimit, previousPayloads: eventArray) {
let request = Request(payloads: eventArray, emitterEventIds: indexArray)
requests.append(request)

// Clear collection and build a new POST
eventArray = []
indexArray = []

// Build and store the request
eventArray.append(payload)
indexArray.append(emitterEventId)
} else {
// Add event to collections
eventArray.append(payload)
indexArray.append(emitterEventId)
}
var eventPayloads: [Payload] = []
var eventIds: [Int64] = []

for event in events {
let payload = event.payload
let emitterEventId = event.storeId
addSendingTime(to: payload, timestamp: sendingTime)

// Oversize event -> separate requests
if isOversize(payload, byteLimit: byteLimit) {
let request = Request(payload: payload, emitterEventId: emitterEventId, oversize: true)
requests.append(request)
}

// Check if all payloads have been processed
if eventArray.count != 0 {
let request = Request(payloads: eventArray, emitterEventIds: indexArray)
// Events up to this one are oversize -> create request for them
else if isOversize(payload, byteLimit: byteLimit, previousPayloads: eventPayloads) {
let request = Request(payloads: eventPayloads, emitterEventIds: eventIds)
requests.append(request)

// Clear collection and build a new POST
eventPayloads = []
eventIds = []

// Build and store the request
eventPayloads.append(payload)
eventIds.append(emitterEventId)
}
i += bufferOption.rawValue
// Add to the list of events for the request
else {
eventPayloads.append(payload)
eventIds.append(emitterEventId)
}
}

// Check if there are any remaining events not in a request
if !eventPayloads.isEmpty {
let request = Request(payloads: eventPayloads, emitterEventIds: eventIds)
requests.append(request)
}
}
return requests
Expand Down
2 changes: 1 addition & 1 deletion Sources/Snowplow/Emitter/EmitterDefaults.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import Foundation
public class EmitterDefaults {
public private(set) static var httpMethod: HttpMethodOptions = .post
public private(set) static var httpProtocol: ProtocolOptions = .https
public private(set) static var emitRange = 150
public private(set) static var emitRange = BufferOption.largeGroup.rawValue
public private(set) static var emitThreadPoolSize = 15
public private(set) static var byteLimitGet = 40000
public private(set) static var byteLimitPost = 40000
Expand Down
87 changes: 54 additions & 33 deletions Sources/Snowplow/Network/DefaultNetworkConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -96,47 +96,68 @@ public class DefaultNetworkConnection: NSObject, NetworkConnection {

@objc
public func sendRequests(_ requests: [Request]) -> [RequestResult] {
let urlRequests = requests.map { _httpMethod == .get ? buildGet($0) : buildPost($0) }

var results: [RequestResult] = []

for request in requests {
let urlRequest = _httpMethod == .get
? buildGet(request)
: buildPost(request)

dataOperationQueue.addOperation({
//source: https://forums.developer.apple.com/thread/11519
var httpResponse: HTTPURLResponse? = nil
var connectionError: Error? = nil
var sem: DispatchSemaphore

sem = DispatchSemaphore(value: 0)

URLSession.shared.dataTask(with: urlRequest) { data, urlResponse, error in
connectionError = error
httpResponse = urlResponse as? HTTPURLResponse
sem.signal()
}.resume()

let _ = sem.wait(timeout: .distantFuture)
var statusCode: NSNumber?
if let httpResponse = httpResponse { statusCode = NSNumber(value: httpResponse.statusCode) }

let result = RequestResult(statusCode: statusCode, oversize: request.oversize, storeIds: request.emitterEventIds)
if !result.isSuccessful {
logError(message: "Connection error: " + (connectionError?.localizedDescription ?? "-"))
}

objc_sync_enter(self)
// if there is only one request, make it directly
if requests.count == 1 {
if let request = requests.first, let urlRequest = urlRequests.first {
let result = DefaultNetworkConnection.makeRequest(
request: request,
urlRequest: urlRequest
)

results.append(result)
objc_sync_exit(self)
})
}
}
// if there are more than 1 request, use the operation queue
else if requests.count > 1 {
for (request, urlRequest) in zip(requests, urlRequests) {
dataOperationQueue.addOperation({
let result = DefaultNetworkConnection.makeRequest(
request: request,
urlRequest: urlRequest
)

objc_sync_enter(self)
results.append(result)
objc_sync_exit(self)
})
}
dataOperationQueue.waitUntilAllOperationsAreFinished()
}
dataOperationQueue.waitUntilAllOperationsAreFinished()

return results
}

// MARK: - Private methods

private static func makeRequest(request: Request, urlRequest: URLRequest) -> RequestResult {
//source: https://forums.developer.apple.com/thread/11519
var httpResponse: HTTPURLResponse? = nil
var connectionError: Error? = nil
var sem: DispatchSemaphore

sem = DispatchSemaphore(value: 0)

URLSession.shared.dataTask(with: urlRequest) { data, urlResponse, error in
connectionError = error
httpResponse = urlResponse as? HTTPURLResponse
sem.signal()
}.resume()

let _ = sem.wait(timeout: .distantFuture)
var statusCode: NSNumber?
if let httpResponse = httpResponse { statusCode = NSNumber(value: httpResponse.statusCode) }

let result = RequestResult(statusCode: statusCode, oversize: request.oversize, storeIds: request.emitterEventIds)
if !result.isSuccessful {
logError(message: "Connection error: " + (connectionError?.localizedDescription ?? "-"))
}

return result
}

private func setup() {
// Decode url to extract protocol
let url = URL(string: _urlString)
Expand Down
Loading

0 comments on commit 3c8d1d8

Please sign in to comment.