Skip to content

Commit

Permalink
Make RibEvents' MutableSharedFlows buffers configurable.
Browse files Browse the repository at this point in the history
  • Loading branch information
psteiger committed Oct 12, 2024
1 parent d2711d8 commit a7cdef5
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,54 @@ import io.reactivex.Observable
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.rx2.asObservable

public object RibEvents {
private var extraBufferCapacity: Int = Channel.UNLIMITED

private val mutableRouterEvents =
MutableSharedFlow<RibRouterEvent>(0, Channel.UNLIMITED, BufferOverflow.DROP_OLDEST)
private val mutableRibDurationEvents =
MutableSharedFlow<RibActionInfo>(0, Channel.UNLIMITED, BufferOverflow.DROP_OLDEST)
/**
* Sets the extra buffer capacity for [routerEventsFlow] and [ribActionEventsFlow].
*
* This function must be called on the main thread, and before any usage of:
* 1. [routerEventsFlow]
* 2. [routerEvents]
* 3. [ribActionEventsFlow]
* 4. [ribActionEvents]
*/
@JvmStatic
public fun setExtraBufferCapacity(capacity: Int) {
extraBufferCapacity = capacity
}

private val mutableRouterEvents by lazy {
MutableSharedFlow<RibRouterEvent>(0, extraBufferCapacity, BufferOverflow.DROP_OLDEST)
}

private val mutableRibDurationEvents by lazy {
MutableSharedFlow<RibActionInfo>(0, extraBufferCapacity, BufferOverflow.DROP_OLDEST)
}

@JvmStatic
public val routerEvents: Observable<RibRouterEvent> = mutableRouterEvents.asObservable()
public val routerEventsFlow: SharedFlow<RibRouterEvent> by lazy {
mutableRouterEvents.asSharedFlow()
}

@JvmStatic
public val routerEvents: Observable<RibRouterEvent> by lazy {
mutableRouterEvents.asObservable()
}

@JvmStatic
public val ribActionEvents: Observable<RibActionInfo> = mutableRibDurationEvents.asObservable()
public val ribActionEventsFlow: SharedFlow<RibActionInfo> by lazy {
mutableRibDurationEvents.asSharedFlow()
}

@JvmStatic
public val ribActionEvents: Observable<RibActionInfo> by lazy {
mutableRibDurationEvents.asObservable()
}

/** Indicates if [ribActionEvents] will be emitting. */
public var areRibActionEmissionsAllowed: Boolean = false
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (C) 2024. Uber Technologies
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.rib.core

import com.google.common.truth.Truth.assertThat
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest
import org.junit.Test
import org.mockito.kotlin.mock

@OptIn(ExperimentalCoroutinesApi::class)
class RibEventsTest {
@Test
fun setExtraBufferCapacityTest() = runTest {
val extraBufferCapacity = 16
RibEvents.setExtraBufferCapacity(extraBufferCapacity)
val results = mutableListOf<RibRouterEvent>()
backgroundScope.launch { RibEvents.routerEventsFlow.collect(results::add) }
runCurrent()
repeat(32) { RibEvents.emitRouterEvent(RibEventType.ATTACHED, mock(), mock()) }
runCurrent()
assertThat(results.size).isEqualTo(16)
}
}

0 comments on commit a7cdef5

Please sign in to comment.