Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upload module improvements #135

Merged
merged 6 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,11 @@
BlueprintName = "EmbraceCoreTests"
ReferencedContainer = "container:">
</BuildableReference>
<SkippedTests>
<Test
Identifier = "EmbraceCoreTests">
</Test>
</SkippedTests>
</TestableReference>
<TestableReference
skipped = "NO">
Expand Down
26 changes: 1 addition & 25 deletions Sources/EmbraceUploadInternal/Cache/EmbraceUploadCache.swift
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,7 @@ class EmbraceUploadCache {
/// Removes stale data based on size or date, if they're limited in options.
@discardableResult public func clearStaleDataIfNeeded() throws -> UInt {
let limitDays = options.cacheDaysLimit
let limitSize = options.cacheSizeLimit
let recordsBasedOnDate = limitDays > 0 ? try fetchRecordsToDeleteBasedOnDate(maxDays: limitDays) : []
let recordsBasedOnSize = limitSize > 0 ? try fetchRecordsToDeleteBasedOnSize(maxSize: limitSize) : []

let recordsToDelete = Array(Set(recordsBasedOnDate + recordsBasedOnSize))

let recordsToDelete = limitDays > 0 ? try fetchRecordsToDeleteBasedOnDate(maxDays: limitDays) : []
let deleteCount = recordsToDelete.count

if deleteCount > 0 {
Expand Down Expand Up @@ -163,25 +158,6 @@ class EmbraceUploadCache {
}
}

/// Sorts Upload Cache by descending order and goes through it adding the space taken by each record.
/// Once the __maxSize__ has been reached, all the following record IDs will be returned indicating those need to be deleted.
/// - Parameter maxSize: The maximum allowed size in bytes for the Database.
/// - Returns: An array of IDs of the oldest records which are making the DB go above the target maximum size.
func fetchRecordsToDeleteBasedOnSize(maxSize: UInt) throws -> [String] {
let sqlQuery = """
WITH t AS (SELECT id, date, SUM(LENGTH(data)) OVER (ORDER BY date DESC,id) total_size FROM uploads)
SELECT id FROM t WHERE total_size>=\(maxSize) ORDER BY date DESC;
"""

var result: [String] = []

try dbQueue.read { db in
result = try String.fetchAll(db, sql: sqlQuery)
}

return result
}

/// Fetches all records that should be deleted based on them being older than __maxDays__ days
/// - Parameter db: The database where to pull the data from, assumes the records to be UploadDataRecord.
/// - Parameter maxDays: The maximum allowed days old a record is allowed to be cached.
Expand Down
145 changes: 112 additions & 33 deletions Sources/EmbraceUploadInternal/EmbraceUpload.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,31 @@ public class EmbraceUpload: EmbraceLogUploader {
public private(set) var options: Options
public private(set) var logger: InternalLogger
public private(set) var queue: DispatchQueue
@ThreadSafe
private(set) var isRetryingCache: Bool = false

private let urlSession: URLSession
let cache: EmbraceUploadCache
let urlSession: URLSession
let operationQueue: OperationQueue
var reachabilityMonitor: EmbraceReachabilityMonitor?
let semaphore: DispatchSemaphore
private var reachabilityMonitor: EmbraceReachabilityMonitor?

/// Returns an `EmbraceUpload` instance
/// - Parameters:
/// - options: `EmbraceUpload.Options` instance
/// - logger: `EmbraceConsoleLogger` instance
/// - queue: `DispatchQueue` to be used for all upload operations
public init(options: Options, logger: InternalLogger, queue: DispatchQueue) throws {
public init(
options: Options,
logger: InternalLogger,
queue: DispatchQueue,
semaphore: DispatchSemaphore = .init(value: 2)
) throws {

self.options = options
self.logger = logger
self.queue = queue
self.semaphore = semaphore

cache = try EmbraceUploadCache(options: options.cache, logger: logger)

Expand All @@ -54,25 +63,46 @@ public class EmbraceUpload: EmbraceLogUploader {
/// Attempts to upload all the available cached data.
public func retryCachedData() {
queue.async { [weak self] in
guard let self = self else { return }
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • ⚠️ Conditional statements should always return on the next line (conditional_returns_on_newline)

do {
guard let cachedObjects = try self?.cache.fetchAllUploadData() else {
// in place mechanism to not retry sending cache data at the same time
guard !self.isRetryingCache else {
return
}

self.isRetryingCache = true

defer {
// on finishing everything, allow to retry cache (i.e. reconnection)
self.isRetryingCache = false
}

// clear data from cache that shouldn't be retried as it's stale
self.clearCacheFromStaleData()

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • ⚠️ Lines should not have trailing whitespace (trailing_whitespace)

// get all the data cached first, is the only thing that could throw
let cachedObjects = try self.cache.fetchAllUploadData()

// create a sempahore to allow only to send two request at a time so we don't
// get throttled by the backend on cases where cache has many failed requests.

for uploadData in cachedObjects {
guard let type = EmbraceUploadType(rawValue: uploadData.type) else {
continue
}
self.semaphore.wait()

self?.uploadData(
self.reUploadData(
id: uploadData.id,
data: uploadData.data,
type: type,
attemptCount: uploadData.attemptCount,
completion: nil)
attemptCount: uploadData.attemptCount
) {
self.semaphore.signal()
}
}
} catch {
self?.logger.debug("Error retrying cached upload data: \(error.localizedDescription)")
self.logger.debug("Error retrying cached upload data: \(error.localizedDescription)")
}
}
}
Expand All @@ -84,7 +114,12 @@ public class EmbraceUpload: EmbraceLogUploader {
/// - completion: Completion block called when the data is successfully cached, or when an `Error` occurs
public func uploadSpans(id: String, data: Data, completion: ((Result<(), Error>) -> Void)?) {
queue.async { [weak self] in
self?.uploadData(id: id, data: data, type: .spans, completion: completion)
self?.uploadData(
id: id,
data: data,
type: .spans,
completion: completion
)
}
}

Expand All @@ -95,7 +130,12 @@ public class EmbraceUpload: EmbraceLogUploader {
/// - completion: Completion block called when the data is successfully cached, or when an `Error` occurs
public func uploadLog(id: String, data: Data, completion: ((Result<(), Error>) -> Void)?) {
queue.async { [weak self] in
self?.uploadData(id: id, data: data, type: .log, completion: completion)
self?.uploadData(
id: id,
data: data,
type: .log,
completion: completion
)
}
}

Expand All @@ -105,6 +145,7 @@ public class EmbraceUpload: EmbraceLogUploader {
data: Data,
type: EmbraceUploadType,
attemptCount: Int = 0,
retryCount: Int? = nil,
completion: ((Result<(), Error>) -> Void)?) {

// validate identifier
Expand All @@ -123,7 +164,7 @@ public class EmbraceUpload: EmbraceLogUploader {
let cacheOperation = BlockOperation { [weak self] in
do {
try self?.cache.saveUploadData(id: id, type: type, data: data)
completion?(.success(()))
completion?(.success(()))
} catch {
self?.logger.debug("Error caching upload data: \(error.localizedDescription)")
completion?(.failure(error))
Expand All @@ -133,23 +174,23 @@ public class EmbraceUpload: EmbraceLogUploader {
// upload operation
let uploadOperation = EmbraceUploadOperation(
urlSession: urlSession,
queue: queue,
metadataOptions: options.metadata,
endpoint: endpoint(for: type),
identifier: id,
data: data,
retryCount: options.redundancy.automaticRetryCount,
retryCount: retryCount ?? options.redundancy.automaticRetryCount,
exponentialBackoffBehavior: options.redundancy.exponentialBackoffBehavior,
attemptCount: attemptCount,
logger: logger) { [weak self] (cancelled, count, error) in

logger: logger) { [weak self] (result, attemptCount) in
self?.queue.async { [weak self] in
self?.handleOperationFinished(
id: id,
type: type,
cancelled: cancelled,
attemptCount: count,
error: error
result: result,
attemptCount: attemptCount
)
self?.cleanCacheFromStaleData()
self?.clearCacheFromStaleData()
}
}

Expand All @@ -159,37 +200,75 @@ public class EmbraceUpload: EmbraceLogUploader {
operationQueue.addOperation(uploadOperation)
}

func reUploadData(id: String,
data: Data,
type: EmbraceUploadType,
attemptCount: Int,
completion: @escaping (() -> Void)) {
let totalPendingRetries = options.redundancy.maximumAmountOfRetries - attemptCount
let retries = min(options.redundancy.automaticRetryCount, totalPendingRetries)

let uploadOperation = EmbraceUploadOperation(
urlSession: urlSession,
queue: queue,
metadataOptions: options.metadata,
endpoint: endpoint(for: type),
identifier: id,
data: data,
retryCount: retries,
exponentialBackoffBehavior: options.redundancy.exponentialBackoffBehavior,
attemptCount: attemptCount,
logger: logger) { [weak self] (result, attemptCount) in
self?.queue.async { [weak self] in
self?.handleOperationFinished(
id: id,
type: type,
result: result,
attemptCount: attemptCount
)
completion()
}
}
operationQueue.addOperation(uploadOperation)
}

private func handleOperationFinished(
id: String,
type: EmbraceUploadType,
cancelled: Bool,
attemptCount: Int,
error: Error?) {

// error?
if cancelled == true || error != nil {
// update attempt count in cache
operationQueue.addOperation { [weak self] in
do {
try self?.cache.updateAttemptCount(id: id, type: type, attemptCount: attemptCount)
} catch {
self?.logger.debug("Error updating cache: \(error.localizedDescription)")
result: EmbraceUploadOperationResult,
attemptCount: Int
) {
switch result {
case .success:
addDeleteUploadDataOperation(id: id, type: type)
case .failure(let isRetriable):
if isRetriable, attemptCount < options.redundancy.maximumAmountOfRetries {
operationQueue.addOperation { [weak self] in
do {
try self?.cache.updateAttemptCount(id: id, type: type, attemptCount: attemptCount)
} catch {
self?.logger.debug("Error updating cache: \(error.localizedDescription)")
}
}
return
}
return

addDeleteUploadDataOperation(id: id, type: type)
}
}

// success -> clear cache
private func addDeleteUploadDataOperation(id: String, type: EmbraceUploadType) {
operationQueue.addOperation { [weak self] in
do {
try self?.cache.deleteUploadData(id: id, type: type)
} catch {
self?.logger.debug("Error deleting cache: \(error.localizedDescription)")
}
}

}

private func cleanCacheFromStaleData() {
private func clearCacheFromStaleData() {
operationQueue.addOperation { [weak self] in
do {
try self?.cache.clearStaleDataIfNeeded()
Expand Down
Loading
Loading