Skip to content

Commit

Permalink
Merge pull request #1743 from embrace-io/simplify-delivery-loop
Browse files Browse the repository at this point in the history
Simplify delivery loop logic
  • Loading branch information
fractalwrench authored Dec 9, 2024
2 parents 9b5b93e + 6c214b9 commit f3c3d7f
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ internal sealed class DeliveryTraceState {
/**
* The delivery loop was started
*/
internal class StartDeliveryLoop(private val loopAlreadyActive: Boolean) : DeliveryTraceState() {
override fun toString(): String = "StartDeliveryLoop loopAlreadyActive=$loopAlreadyActive"
internal object StartDeliveryLoop : DeliveryTraceState() {
override fun toString(): String = "StartDeliveryLoop"
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ class DeliveryTracer {
}.joinToString("\n")
}

fun onStartDeliveryLoop(sendLoopActive: Boolean) {
events.add(DeliveryTraceState.StartDeliveryLoop(sendLoopActive))
fun onStartDeliveryLoop() {
events.add(DeliveryTraceState.StartDeliveryLoop)
}

fun onPayloadQueueCreated(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import io.embrace.android.embracesdk.internal.logging.InternalErrorType
import io.embrace.android.embracesdk.internal.worker.BackgroundWorker
import java.io.InputStream
import java.util.Collections
import java.util.LinkedList
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
Expand All @@ -32,14 +31,11 @@ class SchedulingServiceImpl(

private val blockedEndpoints: MutableMap<Endpoint, Long> = ConcurrentHashMap()
private val hasNetwork = AtomicBoolean(true)
private val sendLoopActive = AtomicBoolean(false)
private val queryForPayloads = AtomicBoolean(true)
private val activeSends: MutableSet<StoredTelemetryMetadata> = Collections.newSetFromMap(ConcurrentHashMap())
private val deleteInProgress: MutableSet<StoredTelemetryMetadata> = Collections.newSetFromMap(ConcurrentHashMap())
private val payloadsToRetry: MutableMap<StoredTelemetryMetadata, RetryInstance> = ConcurrentHashMap()

override fun onPayloadIntake() {
queryForPayloads.set(true)
startDeliveryLoop()
}

Expand All @@ -66,13 +62,9 @@ class SchedulingServiceImpl(
private fun startDeliveryLoop() {
// When a payload arrives, check to see if there's already an active job try to deliver payloads
// If not, schedule job. If so, do nothing.
if (sendLoopActive.compareAndSet(false, true)) {
deliveryTracer?.onStartDeliveryLoop(true)
schedulingWorker.submit {
deliveryLoop()
}
} else {
deliveryTracer?.onStartDeliveryLoop(false)
deliveryTracer?.onStartDeliveryLoop()
schedulingWorker.submit {
deliveryLoop()
}
}

Expand All @@ -82,9 +74,8 @@ class SchedulingServiceImpl(
private fun deliveryLoop() {
val failedPayloads = mutableSetOf<StoredTelemetryMetadata>()
try {
var deliveryQueue = createPayloadQueue()
while (deliveryQueue.isNotEmpty() && readyToSend()) {
val payload = deliveryQueue.poll()
var payload: StoredTelemetryMetadata? = findNextPayload()
while (payload != null && readyToSend()) {
runCatching {
payload?.run {
if (shouldSendPayload() && readyToSend()) {
Expand All @@ -109,23 +100,19 @@ class SchedulingServiceImpl(
throwable = IllegalStateException("Failed to queue payload with file name $fileName", error)
)
}

if (queryForPayloads.compareAndSet(true, false) || deliveryQueue.isEmpty()) {
deliveryQueue = createPayloadQueue(failedPayloads)
}
payload = findNextPayload(failedPayloads)
}
} catch (t: Throwable) {
// This block catches unhandled errors resulting from the recreation of a queue of payloads to be delivered
// When this type of error encountered, we abort the delivery loop and wait for the next retry or intake
// to retry any pending payloads.
logger.trackInternalError(InternalErrorType.DELIVERY_SCHEDULING_FAIL, t)
} finally {
sendLoopActive.set(false)
scheduleDeliveryLoopForNextRetry()
}
}

private fun createPayloadQueue(exclude: Set<StoredTelemetryMetadata> = emptySet()): LinkedList<StoredTelemetryMetadata> {
private fun findNextPayload(exclude: Set<StoredTelemetryMetadata> = emptySet()): StoredTelemetryMetadata? {
val payloadsByPriority = storageService.getPayloadsByPriority()
val payloadsToSend = payloadsByPriority
.filter { it.shouldSendPayload() && !exclude.contains(it) }
Expand All @@ -134,7 +121,7 @@ class SchedulingServiceImpl(
payloadsByPriority,
payloadsToSend,
)
return LinkedList(payloadsToSend)
return payloadsToSend.firstOrNull()
}

private fun queueDelivery(payload: StoredTelemetryMetadata): Future<ExecutionResult> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ internal class SchedulingServiceImplTest {
schedulingExecutor.blockingMode = true
schedulingService.onPayloadIntake()
schedulingService.onPayloadIntake()
assertEquals(1, schedulingExecutor.submitCount)
assertEquals(2, schedulingExecutor.submitCount)
}

@Test
Expand Down

0 comments on commit f3c3d7f

Please sign in to comment.