Skip to content

Commit

Permalink
Upload module improvements (#135)
Browse files Browse the repository at this point in the history
  • Loading branch information
ArielDemarco authored Dec 4, 2024
1 parent dbcf4b2 commit 7ae30ac
Show file tree
Hide file tree
Showing 13 changed files with 546 additions and 398 deletions.
26 changes: 1 addition & 25 deletions Sources/EmbraceUploadInternal/Cache/EmbraceUploadCache.swift
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,7 @@ class EmbraceUploadCache {
/// Removes stale data based on size or date, if they're limited in options.
@discardableResult public func clearStaleDataIfNeeded() throws -> UInt {
let limitDays = options.cacheDaysLimit
let limitSize = options.cacheSizeLimit
let recordsBasedOnDate = limitDays > 0 ? try fetchRecordsToDeleteBasedOnDate(maxDays: limitDays) : []
let recordsBasedOnSize = limitSize > 0 ? try fetchRecordsToDeleteBasedOnSize(maxSize: limitSize) : []

let recordsToDelete = Array(Set(recordsBasedOnDate + recordsBasedOnSize))

let recordsToDelete = limitDays > 0 ? try fetchRecordsToDeleteBasedOnDate(maxDays: limitDays) : []
let deleteCount = recordsToDelete.count

if deleteCount > 0 {
Expand Down Expand Up @@ -163,25 +158,6 @@ class EmbraceUploadCache {
}
}

/// Sorts Upload Cache by descending order and goes through it adding the space taken by each record.
/// Once the __maxSize__ has been reached, all the following record IDs will be returned indicating those need to be deleted.
/// - Parameter maxSize: The maximum allowed size in bytes for the Database.
/// - Returns: An array of IDs of the oldest records which are making the DB go above the target maximum size.
func fetchRecordsToDeleteBasedOnSize(maxSize: UInt) throws -> [String] {
let sqlQuery = """
WITH t AS (SELECT id, date, SUM(LENGTH(data)) OVER (ORDER BY date DESC,id) total_size FROM uploads)
SELECT id FROM t WHERE total_size>=\(maxSize) ORDER BY date DESC;
"""

var result: [String] = []

try dbQueue.read { db in
result = try String.fetchAll(db, sql: sqlQuery)
}

return result
}

/// Fetches all records that should be deleted based on them being older than __maxDays__ days
/// - Parameter db: The database where to pull the data from, assumes the records to be UploadDataRecord.
/// - Parameter maxDays: The maximum allowed days old a record is allowed to be cached.
Expand Down
145 changes: 112 additions & 33 deletions Sources/EmbraceUploadInternal/EmbraceUpload.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,31 @@ public class EmbraceUpload: EmbraceLogUploader {
public private(set) var options: Options
public private(set) var logger: InternalLogger
public private(set) var queue: DispatchQueue
@ThreadSafe
private(set) var isRetryingCache: Bool = false

private let urlSession: URLSession
let cache: EmbraceUploadCache
let urlSession: URLSession
let operationQueue: OperationQueue
var reachabilityMonitor: EmbraceReachabilityMonitor?
let semaphore: DispatchSemaphore
private var reachabilityMonitor: EmbraceReachabilityMonitor?

/// Returns an `EmbraceUpload` instance
/// - Parameters:
/// - options: `EmbraceUpload.Options` instance
/// - logger: `EmbraceConsoleLogger` instance
/// - queue: `DispatchQueue` to be used for all upload operations
public init(options: Options, logger: InternalLogger, queue: DispatchQueue) throws {
public init(
options: Options,
logger: InternalLogger,
queue: DispatchQueue,
semaphore: DispatchSemaphore = .init(value: 2)
) throws {

self.options = options
self.logger = logger
self.queue = queue
self.semaphore = semaphore

cache = try EmbraceUploadCache(options: options.cache, logger: logger)

Expand All @@ -54,25 +63,46 @@ public class EmbraceUpload: EmbraceLogUploader {
/// Attempts to upload all the available cached data.
public func retryCachedData() {
queue.async { [weak self] in
guard let self = self else { return }
do {
guard let cachedObjects = try self?.cache.fetchAllUploadData() else {
// in place mechanism to not retry sending cache data at the same time
guard !self.isRetryingCache else {
return
}

self.isRetryingCache = true

defer {
// on finishing everything, allow to retry cache (i.e. reconnection)
self.isRetryingCache = false
}

// clear data from cache that shouldn't be retried as it's stale
self.clearCacheFromStaleData()

// get all the data cached first, is the only thing that could throw
let cachedObjects = try self.cache.fetchAllUploadData()

// create a sempahore to allow only to send two request at a time so we don't
// get throttled by the backend on cases where cache has many failed requests.

for uploadData in cachedObjects {
guard let type = EmbraceUploadType(rawValue: uploadData.type) else {
continue
}
self.semaphore.wait()

self?.uploadData(
self.reUploadData(
id: uploadData.id,
data: uploadData.data,
type: type,
attemptCount: uploadData.attemptCount,
completion: nil)
attemptCount: uploadData.attemptCount
) {
self.semaphore.signal()
}
}
} catch {
self?.logger.debug("Error retrying cached upload data: \(error.localizedDescription)")
self.logger.debug("Error retrying cached upload data: \(error.localizedDescription)")
}
}
}
Expand All @@ -84,7 +114,12 @@ public class EmbraceUpload: EmbraceLogUploader {
/// - completion: Completion block called when the data is successfully cached, or when an `Error` occurs
public func uploadSpans(id: String, data: Data, completion: ((Result<(), Error>) -> Void)?) {
queue.async { [weak self] in
self?.uploadData(id: id, data: data, type: .spans, completion: completion)
self?.uploadData(
id: id,
data: data,
type: .spans,
completion: completion
)
}
}

Expand All @@ -95,7 +130,12 @@ public class EmbraceUpload: EmbraceLogUploader {
/// - completion: Completion block called when the data is successfully cached, or when an `Error` occurs
public func uploadLog(id: String, data: Data, completion: ((Result<(), Error>) -> Void)?) {
queue.async { [weak self] in
self?.uploadData(id: id, data: data, type: .log, completion: completion)
self?.uploadData(
id: id,
data: data,
type: .log,
completion: completion
)
}
}

Expand All @@ -105,6 +145,7 @@ public class EmbraceUpload: EmbraceLogUploader {
data: Data,
type: EmbraceUploadType,
attemptCount: Int = 0,
retryCount: Int? = nil,
completion: ((Result<(), Error>) -> Void)?) {

// validate identifier
Expand All @@ -123,7 +164,7 @@ public class EmbraceUpload: EmbraceLogUploader {
let cacheOperation = BlockOperation { [weak self] in
do {
try self?.cache.saveUploadData(id: id, type: type, data: data)
completion?(.success(()))
completion?(.success(()))
} catch {
self?.logger.debug("Error caching upload data: \(error.localizedDescription)")
completion?(.failure(error))
Expand All @@ -133,23 +174,23 @@ public class EmbraceUpload: EmbraceLogUploader {
// upload operation
let uploadOperation = EmbraceUploadOperation(
urlSession: urlSession,
queue: queue,
metadataOptions: options.metadata,
endpoint: endpoint(for: type),
identifier: id,
data: data,
retryCount: options.redundancy.automaticRetryCount,
retryCount: retryCount ?? options.redundancy.automaticRetryCount,
exponentialBackoffBehavior: options.redundancy.exponentialBackoffBehavior,
attemptCount: attemptCount,
logger: logger) { [weak self] (cancelled, count, error) in

logger: logger) { [weak self] (result, attemptCount) in
self?.queue.async { [weak self] in
self?.handleOperationFinished(
id: id,
type: type,
cancelled: cancelled,
attemptCount: count,
error: error
result: result,
attemptCount: attemptCount
)
self?.cleanCacheFromStaleData()
self?.clearCacheFromStaleData()
}
}

Expand All @@ -159,37 +200,75 @@ public class EmbraceUpload: EmbraceLogUploader {
operationQueue.addOperation(uploadOperation)
}

func reUploadData(id: String,
data: Data,
type: EmbraceUploadType,
attemptCount: Int,
completion: @escaping (() -> Void)) {
let totalPendingRetries = options.redundancy.maximumAmountOfRetries - attemptCount
let retries = min(options.redundancy.automaticRetryCount, totalPendingRetries)

let uploadOperation = EmbraceUploadOperation(
urlSession: urlSession,
queue: queue,
metadataOptions: options.metadata,
endpoint: endpoint(for: type),
identifier: id,
data: data,
retryCount: retries,
exponentialBackoffBehavior: options.redundancy.exponentialBackoffBehavior,
attemptCount: attemptCount,
logger: logger) { [weak self] (result, attemptCount) in
self?.queue.async { [weak self] in
self?.handleOperationFinished(
id: id,
type: type,
result: result,
attemptCount: attemptCount
)
completion()
}
}
operationQueue.addOperation(uploadOperation)
}

private func handleOperationFinished(
id: String,
type: EmbraceUploadType,
cancelled: Bool,
attemptCount: Int,
error: Error?) {

// error?
if cancelled == true || error != nil {
// update attempt count in cache
operationQueue.addOperation { [weak self] in
do {
try self?.cache.updateAttemptCount(id: id, type: type, attemptCount: attemptCount)
} catch {
self?.logger.debug("Error updating cache: \(error.localizedDescription)")
result: EmbraceUploadOperationResult,
attemptCount: Int
) {
switch result {
case .success:
addDeleteUploadDataOperation(id: id, type: type)
case .failure(let isRetriable):
if isRetriable, attemptCount < options.redundancy.maximumAmountOfRetries {
operationQueue.addOperation { [weak self] in
do {
try self?.cache.updateAttemptCount(id: id, type: type, attemptCount: attemptCount)
} catch {
self?.logger.debug("Error updating cache: \(error.localizedDescription)")
}
}
return
}
return

addDeleteUploadDataOperation(id: id, type: type)
}
}

// success -> clear cache
private func addDeleteUploadDataOperation(id: String, type: EmbraceUploadType) {
operationQueue.addOperation { [weak self] in
do {
try self?.cache.deleteUploadData(id: id, type: type)
} catch {
self?.logger.debug("Error deleting cache: \(error.localizedDescription)")
}
}

}

private func cleanCacheFromStaleData() {
private func clearCacheFromStaleData() {
operationQueue.addOperation { [weak self] in
do {
try self?.cache.clearStaleDataIfNeeded()
Expand Down
Loading

0 comments on commit 7ae30ac

Please sign in to comment.