diff --git a/gradle.properties b/gradle.properties index 4452c98..2935a36 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=1.5.15 +version=2.0.0 GROUP=com.spotify.mobius diff --git a/gradle/binary_compatibility.gradle b/gradle/binary_compatibility.gradle index d00d8e9..d4dd7d8 100644 --- a/gradle/binary_compatibility.gradle +++ b/gradle/binary_compatibility.gradle @@ -15,7 +15,7 @@ buildscript { } File baselineJar = null -def baselineVersion = "1.5.14" +def baselineVersion = "2.0.0" def projectGroup = project.group def projectName = project.name diff --git a/mobius-coroutines/src/main/java/com/spotify/mobius/coroutines/CoroutinesSubtypeEffectHandlerBuilder.kt b/mobius-coroutines/src/main/java/com/spotify/mobius/coroutines/CoroutinesSubtypeEffectHandlerBuilder.kt index 23dc080..82de054 100644 --- a/mobius-coroutines/src/main/java/com/spotify/mobius/coroutines/CoroutinesSubtypeEffectHandlerBuilder.kt +++ b/mobius-coroutines/src/main/java/com/spotify/mobius/coroutines/CoroutinesSubtypeEffectHandlerBuilder.kt @@ -2,10 +2,18 @@ package com.spotify.mobius.coroutines import com.spotify.mobius.Connectable import com.spotify.mobius.Connection +import com.spotify.mobius.coroutines.CoroutinesSubtypeEffectHandlerBuilder.EffectsHandler +import com.spotify.mobius.coroutines.CoroutinesSubtypeEffectHandlerBuilder.ExecutionPolicy.RunSequentially import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.FlowCollector +import kotlinx.coroutines.flow.flow +import java.util.concurrent.ConcurrentHashMap import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext import kotlin.reflect.KClass /** @@ -23,68 +31,319 @@ import kotlin.reflect.KClass * instances of the builder are mutable and not thread-safe. */ class CoroutinesSubtypeEffectHandlerBuilder { - private val effectHandlersMap = mutableMapOf, suspend (F) -> Flow>() + private val effectsHandlersMap = mutableMapOf, EffectsHandler>() + /** + * Adds an "action lambda" for handling effects of a given type. The action will be invoked once + * for every received effect object that extends the given class. + * + *

Adding handlers for two effect classes where one is a super-class of the other is + * considered a collision and is not allowed. Registering the same class twice is also + * considered a collision. + * + * @param G the class to handle + * @param executionPolicy the [ExecutionPolicy] to use when running effects of the given type + * @param action the action that should be invoked for the effect + * @return this builder + * @throws IllegalStateException if there is a handler collision + */ inline fun addAction( + executionPolicy: ExecutionPolicy = RunSequentially(), crossinline action: suspend () -> Unit - ) = addFlowProducer { + ) = addEffectHandler(executionPolicy) { _, _ -> action.invoke() - flowOf() } + /** + * Adds a "consumer lambda" for handling effects of a given type. The consumer will be invoked + * once for every received effect object that extends the given class. + * + *

Adding handlers for two effect classes where one is a super-class of the other is + * considered a collision and is not allowed. Registering the same class twice is also + * considered a collision. + * + * @param G the class to handle + * @param executionPolicy the [ExecutionPolicy] to use when running effects of the given type + * @param consumer the consumer that should be invoked for the effect + * @return this builder + * @throws IllegalStateException if there is a handler collision + */ inline fun addConsumer( + executionPolicy: ExecutionPolicy = RunSequentially(), crossinline consumer: suspend (G) -> Unit - ) = addFlowProducer { effect -> + ) = addEffectHandler(executionPolicy) { effect, _ -> consumer.invoke(effect) - flowOf() } + /** + * Adds a "producer lambda" for handling effects of a given type. The producer will be invoked + * once for every received effect object that extends the given class. The returned event will + * be forwarded to the Mobius loop. + * + *

Adding handlers for two effect classes where one is a super-class of the other is + * considered a collision and is not allowed. Registering the same class twice is also + * considered a collision. + * + * @param G the class to handle + * @param executionPolicy the [ExecutionPolicy] to use when running effects of the given type + * @param producer the producer that should be invoked for the effect + * @return this builder + * @throws IllegalStateException if there is a handler collision + */ inline fun addProducer( + executionPolicy: ExecutionPolicy = RunSequentially(), crossinline producer: suspend () -> E - ) = addFlowProducer { + ) = addEffectHandler(executionPolicy) { _, eventsChannel -> val event = producer.invoke() - flowOf(event) + eventsChannel.send(event) } + /** + * Adds a "function lambda" for handling effects of a given type. The function will be invoked + * once for every received effect object that extends the given class. The returned event will + * be forwarded to the Mobius loop. + * + *

Adding handlers for two effect classes where one is a super-class of the other is + * considered a collision and is not allowed. Registering the same class twice is also + * considered a collision. + * + * @param G the class to handle + * @param executionPolicy the [ExecutionPolicy] to use when running effects of the given type + * @param function the function that should be invoked for the effect + * @return this builder + * @throws IllegalStateException if there is a handler collision + */ inline fun addFunction( + executionPolicy: ExecutionPolicy = RunSequentially(), crossinline function: suspend (G) -> E - ): CoroutinesSubtypeEffectHandlerBuilder = addFlowProducer { effect -> + ) = addEffectHandler(executionPolicy) { effect, eventsChannel -> val event = function.invoke(effect) - flowOf(event) + eventsChannel.send(event) } + /** + * Adds a "flow collector function lambda" for handling effects of a given type. A flow will be created and + * the flow collector function will be invoked once for every received effect object that extends the given class. + * The emitted events will be forwarded to the Mobius loop. + * + *

Adding handlers for two effect classes where one is a super-class of the other is + * considered a collision and is not allowed. Registering the same class twice is also + * considered a collision. + * + * @param G the class to handle + * @param executionPolicy the [ExecutionPolicy] to use when running effects of the given type + * @param flowCollectorFunction the function that should be invoked for the effect + * @return this builder + * @throws IllegalStateException if there is a handler collision + */ + inline fun addFlow( + executionPolicy: ExecutionPolicy = RunSequentially(), + crossinline flowCollectorFunction: suspend FlowCollector.(G) -> Unit + ) = addEffectHandler(executionPolicy) { effect, eventsChannel -> + flow { flowCollectorFunction(effect) } + .collect { event -> eventsChannel.send(event) } + } + + /** + * Adds a "flow producer lambda" for handling effects of a given type. The flow producer function will be invoked + * once for every received effect object that extends the given class. The emitted events + * will be forwarded to the Mobius loop. + * + *

Adding handlers for two effect classes where one is a super-class of the other is + * considered a collision and is not allowed. Registering the same class twice is also + * considered a collision. + * + * @param G the class to handle + * @param executionPolicy the [ExecutionPolicy] to use when running effects of the given type + * @param function the function that should be invoked for the effect to create the flow + * @return this builder + * @throws IllegalStateException if there is a handler collision + */ inline fun addFlowProducer( + executionPolicy: ExecutionPolicy = RunSequentially(), crossinline function: suspend (G) -> Flow + ) = addEffectHandler(executionPolicy) { effect, eventsChannel -> + function.invoke(effect) + .collect { event -> eventsChannel.send(event) } + } + + /** + * Adds an [EffectHandler] for handling effects of a given type. The [EffectHandler.handleEffect] function will + * be invoked once for every received effect object that extends the given class. TThe events sent to + * the eventsChannel will be forwarded to the Mobius loop. + * + *

Adding handlers for two effect classes where one is a super-class of the other is + * considered a collision and is not allowed. Registering the same class twice is also + * considered a collision. + * + * @param G the class to handle + * @param executionPolicy the [ExecutionPolicy] to use when running effects of the given type + * @param effectHandler the [EffectHandler] that should be invoked for the effect + * @return this builder + * @throws IllegalStateException if there is a handler collision + */ + inline fun addEffectHandler( + executionPolicy: ExecutionPolicy = RunSequentially(), + effectHandler: EffectHandler, ): CoroutinesSubtypeEffectHandlerBuilder { - addEffectHandler(G::class) { effect -> - function.invoke(effect as G) - } + addEffectHandler(G::class, executionPolicy.createEffectsHandler(effectHandler)) return this } - fun addEffectHandler(kClass: KClass, function: suspend (F) -> Flow) { - val previousValue = effectHandlersMap.put(kClass, function) + /** + * Adds an [EffectsHandler] for handling effects of a given type. The [EffectsHandler.handleEffects] function will + * be invoked only once, when the first effect that extends the given class is emitted. All effects from the + * Mobius loop, will be forwarded to the effectsChannel. The events sent to the eventsChannel will be forwarded + * back to the Mobius loop. + * + *

Adding handlers for two effect classes where one is a super-class of the other is + * considered a collision and is not allowed. Registering the same class twice is also + * considered a collision. + * + * @param kClass the class to handle + * @param effectsHandler the [EffectsHandler] that should be invoked for all the effects + * @return this builder + * @throws IllegalStateException if there is a handler collision + */ + fun addEffectHandler(kClass: KClass, effectsHandler: EffectsHandler) { + val previousValue = effectsHandlersMap.put(kClass, effectsHandler) if (previousValue != null) error("Trying to add more than one handler for the effect ${kClass.simpleName}") } - fun build(coroutineContext: CoroutineContext) = build(CoroutineScope(coroutineContext)) + /** + * Creates a [Connectable] to be used as an effect handler. It is backed by an internal [CoroutineScope] created + * with the given [CoroutineContext]. All coroutines will be canceled when the [Connection.dispose] method + * is called in a [Connection] created by this [Connectable]. + * + * @param coroutineContext the context where the effects will run. + * @return a [Connectable] to be used as an effect handler. + * */ + fun build(coroutineContext: CoroutineContext = EmptyCoroutineContext) = Connectable { eventConsumer -> + val scope = CoroutineScope(coroutineContext) + val eventsChannel = Channel() + val subEffectChannels = ConcurrentHashMap, Channel>() + + // Connects the eventConsumer + scope.launch { + for (event in eventsChannel) { + if (isActive) eventConsumer.accept(event) + } + } - private fun build(scope: CoroutineScope) = Connectable { eventConsumer -> object : Connection { override fun accept(effect: F) { scope.launch { - if (scope.isActive) { - val effectHandler = effectHandlersMap[effect::class] ?: error("No effectHandler for $effect") - effectHandler.invoke(effect).collect { - if (scope.isActive) eventConsumer.accept(it) + // Creates an effectChannel if this is the first time the effect is processed + val subEffectChannel = subEffectChannels.computeIfAbsent(effect::class) { + val subEffectChannel = Channel() + val effectHandler = + effectsHandlersMap[effect::class] ?: error("No effectHandler for $effect") + // Connects the effectHandler if this is the first time the effect is processed + scope.launch { + if (isActive) effectHandler.handleEffects(subEffectChannel, eventsChannel) } + subEffectChannel } + + if (isActive) subEffectChannel.send(effect) } } override fun dispose() { - scope.cancel(CancellationException("Effect Handler disposed")) + scope.cancel("Effect Handler disposed") + eventsChannel.close() + subEffectChannels.forEachValue(1) { it.close() } } } } + + /** + * An execution policy defines how effects of the same type are executed. It is used to create + * an [EffectsHandler] from an [EffectHandler] implementing its own concurrency execution policy. + * */ + @Suppress("UNCHECKED_CAST") + fun interface ExecutionPolicy { + + /** + * Creates an [EffectsHandler] from an [EffectHandler] implementing its own concurrency execution policy. + * + * @param effectHandler the [EffectHandler] to use when handling single effects of type [G] + * @return an [EffectsHandler] that handles all effects of type [G] + * */ + fun createEffectsHandler(effectHandler: EffectHandler): EffectsHandler + + /** + * Implementation of [ExecutionPolicy] where all effects of the same type wait for the previous one + * to finish executing before being executed. + * */ + class RunSequentially : ExecutionPolicy { + override fun createEffectsHandler(effectHandler: EffectHandler) = + EffectsHandler { effectChannel, eventsChannel -> + for (effect in effectChannel) { + effectHandler.handleEffect(effect as G, eventsChannel) + } + } + } + + /** + * Implementation of [ExecutionPolicy] where all effects of the same type are executed immediately + * and concurrently. + * */ + class RunConcurrently : ExecutionPolicy { + override fun createEffectsHandler(effectHandler: EffectHandler) = + EffectsHandler { effectChannel, eventsChannel -> + coroutineScope { + for (effect in effectChannel) { + launch { effectHandler.handleEffect(effect as G, eventsChannel) } + } + } + } + } + + /** + * Implementation of [ExecutionPolicy] where a new effect cancels the execution of any previously running + * effect of the same type and start executing immediately. + * */ + class CancelPrevious : ExecutionPolicy { + override fun createEffectsHandler(effectHandler: EffectHandler) = + EffectsHandler { effectChannel, eventsChannel -> + coroutineScope { + var currentJob: Job = Job() + for (effect in effectChannel) { + currentJob.cancel() + currentJob = launch { effectHandler.handleEffect(effect as G, eventsChannel) } + } + } + } + } + } + + /** + * An [EffectHandler] is a function that handles a single effect of type [F] and sends events of type [E]. + * It is used in conjunction with an [ExecutionPolicy] to create an [EffectsHandler] for handling all effects of + * the same type. + * */ + fun interface EffectHandler { + /** + * Handles a single effect of type [F] and sends events of type [E]. + * + * + * @param effect the effect to handle + * @param eventsChannel the channel where the events should be sent + * */ + suspend fun handleEffect(effect: F, eventsChannel: SendChannel) + } + + /** + * An [EffectsHandler] is a function that handles all effects of type [F] and sends events of type [E]. + * */ + fun interface EffectsHandler { + /** + * Handles all effects of type [F] and sends events of type [E]. + * + * @param effectsChannel the channel where the effects are received + * @param eventsChannel the channel where the events should be sent + * */ + suspend fun handleEffects(effectsChannel: ReceiveChannel, eventsChannel: SendChannel) + } } diff --git a/mobius-coroutines/src/main/java/com/spotify/mobius/coroutines/MobiusCoroutines.kt b/mobius-coroutines/src/main/java/com/spotify/mobius/coroutines/MobiusCoroutines.kt index 4dd85d9..97d2941 100644 --- a/mobius-coroutines/src/main/java/com/spotify/mobius/coroutines/MobiusCoroutines.kt +++ b/mobius-coroutines/src/main/java/com/spotify/mobius/coroutines/MobiusCoroutines.kt @@ -1,12 +1,42 @@ package com.spotify.mobius.coroutines +import com.spotify.mobius.Connectable +import com.spotify.mobius.Connection import com.spotify.mobius.MobiusLoop +import com.spotify.mobius.functions.Consumer +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.cancel +import kotlinx.coroutines.launch import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext /** Factory methods for wrapping Mobius core classes in coroutines transformers. */ interface MobiusCoroutines { companion object { + /** + * Creates a [Connectable] holding a scope to use as a simple effect handler. + * Each effect will be launched in a new coroutine concurrently. + * + * @param the effect type + * @param the event type + */ + fun effectHandler( + coroutineContext: CoroutineContext = EmptyCoroutineContext, + onEffect: suspend (effect: F, eventConsumer: Consumer) -> Unit + ) = Connectable { eventConsumer -> + val scope = CoroutineScope(coroutineContext) + object : Connection { + override fun accept(value: F) { + scope.launch { onEffect.invoke(value, eventConsumer) } + } + + override fun dispose() { + scope.cancel("Effect Handler disposed") + } + } + } + /** * Create a [CoroutinesSubtypeEffectHandlerBuilder] for handling effects based on their type. * diff --git a/mobius-coroutines/src/test/java/com/spotify/mobius/coroutines/CoroutinesSubtypeEffectHandlerBuilderTest.kt b/mobius-coroutines/src/test/java/com/spotify/mobius/coroutines/CoroutinesSubtypeEffectHandlerBuilderTest.kt index 6457f8e..ef79a39 100644 --- a/mobius-coroutines/src/test/java/com/spotify/mobius/coroutines/CoroutinesSubtypeEffectHandlerBuilderTest.kt +++ b/mobius-coroutines/src/test/java/com/spotify/mobius/coroutines/CoroutinesSubtypeEffectHandlerBuilderTest.kt @@ -1,19 +1,16 @@ package com.spotify.mobius.coroutines import com.google.common.truth.Truth.assertThat +import com.spotify.mobius.coroutines.CoroutinesSubtypeEffectHandlerBuilder.ExecutionPolicy import com.spotify.mobius.coroutines.MobiusCoroutines.Companion.subtypeEffectHandler import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.Job import kotlinx.coroutines.delay import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.test.StandardTestDispatcher -import kotlinx.coroutines.test.advanceTimeBy -import kotlinx.coroutines.test.advanceUntilIdle -import kotlinx.coroutines.test.runTest +import kotlinx.coroutines.test.* import org.junit.Assert.assertThrows import org.junit.Test -@ExperimentalCoroutinesApi +@OptIn(ExperimentalCoroutinesApi::class) class CoroutinesSubtypeEffectHandlerBuilderTest { @Test @@ -41,7 +38,7 @@ class CoroutinesSubtypeEffectHandlerBuilderTest { runTest { val effectHandler = subtypeEffectHandler() - effectHandler.build(coroutineContext) + effectHandler.build(UnconfinedTestDispatcher(testScheduler)) .connect { } .accept(Effect.Simple) advanceUntilIdle() @@ -60,7 +57,8 @@ class CoroutinesSubtypeEffectHandlerBuilderTest { val effectHandler = subtypeEffectHandler() .addAction { actionCalled = true } - effectHandler.build(coroutineContext) + + effectHandler.build(UnconfinedTestDispatcher(testScheduler)) .connect { } .accept(Effect.Simple) advanceUntilIdle() @@ -80,7 +78,7 @@ class CoroutinesSubtypeEffectHandlerBuilderTest { assertThrows(RuntimeException::class.java) { runTest { - effectHandler.build(coroutineContext) + effectHandler.build(UnconfinedTestDispatcher(testScheduler)) .connect { } .accept(Effect.Simple) advanceUntilIdle() @@ -99,7 +97,7 @@ class CoroutinesSubtypeEffectHandlerBuilderTest { val effectHandler = subtypeEffectHandler() .addConsumer { effectConsumed = it } - effectHandler.build(coroutineContext) + effectHandler.build(UnconfinedTestDispatcher(testScheduler)) .connect { } .accept(Effect.SingleValue("Effect to consume")) advanceUntilIdle() @@ -123,7 +121,7 @@ class CoroutinesSubtypeEffectHandlerBuilderTest { Event.SingleValue("Produced event") } - effectHandler.build(coroutineContext) + effectHandler.build(UnconfinedTestDispatcher(testScheduler)) .connect { eventProduced = it } .accept(Effect.Simple) advanceUntilIdle() @@ -148,7 +146,7 @@ class CoroutinesSubtypeEffectHandlerBuilderTest { Event.SingleValue("Produced event") } - effectHandler.build(coroutineContext) + effectHandler.build(UnconfinedTestDispatcher(testScheduler)) .connect { eventProduced = it } .accept(Effect.SingleValue("Effect to produce event")) advanceUntilIdle() @@ -157,6 +155,35 @@ class CoroutinesSubtypeEffectHandlerBuilderTest { assertThat(eventProduced).isEqualTo(Event.SingleValue("Produced event")) } + @Test + @Requirement( + given = "A connectable with a flow as effect handler", + `when` = "a matching effect is produced", + then = "the effect is consumed successfully " + + "AND all the produced events are propagated" + ) + fun flowEffectHandler() = runTest { + val eventsProduced = mutableListOf() + var effectConsumed: Effect? = null + val effectHandler = subtypeEffectHandler() + .addFlow { effect -> + effectConsumed = effect + effect.tokens.forEach { token -> emit(Event.SingleValue(token)) } + } + + effectHandler.build(UnconfinedTestDispatcher(testScheduler)) + .connect { eventsProduced.add(it) } + .accept(Effect.ValueList(listOf("token1", "token2", "token3"))) + advanceUntilIdle() + + assertThat(effectConsumed).isEqualTo(Effect.ValueList(listOf("token1", "token2", "token3"))) + assertThat(eventsProduced).containsExactly( + Event.SingleValue("token1"), + Event.SingleValue("token2"), + Event.SingleValue("token3"), + ) + } + @Test @Requirement( given = "A connectable with a flow producer as effect handler", @@ -175,7 +202,7 @@ class CoroutinesSubtypeEffectHandlerBuilderTest { } } - effectHandler.build(coroutineContext) + effectHandler.build(UnconfinedTestDispatcher(testScheduler)) .connect { eventsProduced.add(it) } .accept(Effect.ValueList(listOf("token1", "token2", "token3"))) advanceUntilIdle() @@ -204,8 +231,9 @@ class CoroutinesSubtypeEffectHandlerBuilderTest { effectFinished = true } - val connection = effectHandler.build(StandardTestDispatcher(testScheduler) + Job()) + val connection = effectHandler.build(StandardTestDispatcher(testScheduler)) .connect { } + connection.accept(Effect.Simple) advanceTimeBy(500) connection.dispose() @@ -215,10 +243,120 @@ class CoroutinesSubtypeEffectHandlerBuilderTest { assertThat(effectFinished).isFalse() } + @Test + @Requirement( + given = "An effect handler using RunSequentially cancellation policy", + `when` = "several matching effects are produced", + then = "all the effect are started successfully" + + "AND the effects are consumed sequentially" + ) + fun processEffectHandlerSequentially() = runTest { + val effectsConsumed = mutableListOf() + val effectHandler = subtypeEffectHandler() + .addConsumer(executionPolicy = ExecutionPolicy.RunSequentially()) { effect -> + delay(effect.delayMillis) + effectsConsumed.add(effect) + } + + + val connection = effectHandler.build(StandardTestDispatcher(testScheduler)) + .connect { } + connection.accept(Effect.DelayAction(300)) + connection.accept(Effect.DelayAction(200)) + connection.accept(Effect.DelayAction(100)) + advanceUntilIdle() + + assertThat(effectsConsumed).containsExactly( + Effect.DelayAction(300), + Effect.DelayAction(200), + Effect.DelayAction(100) + ).inOrder() + } + + @Test + @Requirement( + given = "An effect handler using RunConcurrently cancellation policy", + `when` = "several matching effects are produced", + then = "all the effect are started successfully" + + "AND the effects are consumed concurrently" + ) + fun processEffectHandlerConcurrently() = runTest { + val effectsConsumed = mutableListOf() + val effectHandler = subtypeEffectHandler() + .addConsumer(executionPolicy = ExecutionPolicy.RunConcurrently()) { effect -> + delay(effect.delayMillis) + effectsConsumed.add(effect) + } + + val connection = effectHandler.build(StandardTestDispatcher(testScheduler)) + .connect { } + connection.accept(Effect.DelayAction(300)) + connection.accept(Effect.DelayAction(200)) + connection.accept(Effect.DelayAction(100)) + advanceUntilIdle() + + assertThat(effectsConsumed).containsExactly( + Effect.DelayAction(100), + Effect.DelayAction(200), + Effect.DelayAction(300) + ).inOrder() + } + + @Test + @Requirement( + given = "An effect handler using CancelPrevious cancellation policy", + `when` = "several matching effects are produced", + then = "all the effect are started successfully" + + "AND new effects cancel previous effects while running" + ) + fun processEffectHandlerCancelPrevious() = runTest { + val effectsStarted = mutableListOf() + val effectsFinished = mutableListOf() + val effectHandler = subtypeEffectHandler() + .addConsumer(executionPolicy = ExecutionPolicy.CancelPrevious()) { effect -> + effectsStarted.add(effect) + delay(effect.delayMillis) + effectsFinished.add(effect) + } + + val connection = effectHandler.build(StandardTestDispatcher(testScheduler)) + .connect { } + + connection.accept(Effect.DelayAction(300)) + advanceTimeBy(50) + assertThat(effectsStarted).containsExactly( + Effect.DelayAction(300), + ) + assertThat(effectsFinished).isEmpty() + + connection.accept(Effect.DelayAction(200)) + advanceTimeBy(210) + assertThat(effectsStarted).containsExactly( + Effect.DelayAction(300), + Effect.DelayAction(200), + ).inOrder() + assertThat(effectsFinished).containsExactly( + Effect.DelayAction(200), + ) + + connection.accept(Effect.DelayAction(100)) + advanceUntilIdle() + assertThat(effectsStarted).containsExactly( + Effect.DelayAction(300), + Effect.DelayAction(200), + Effect.DelayAction(100) + ).inOrder() + assertThat(effectsFinished).containsExactly( + Effect.DelayAction(200), + Effect.DelayAction(100), + ).inOrder() + } + private sealed interface Effect { data object Simple : Effect data class SingleValue(val id: String) : Effect data class ValueList(val tokens: List) : Effect + data class DelayAction(val delayMillis: Long) : Effect } private sealed interface Event { diff --git a/mobius-coroutines/src/test/java/com/spotify/mobius/coroutines/MobiusCoroutinesTest.kt b/mobius-coroutines/src/test/java/com/spotify/mobius/coroutines/MobiusCoroutinesTest.kt new file mode 100644 index 0000000..ca3f62e --- /dev/null +++ b/mobius-coroutines/src/test/java/com/spotify/mobius/coroutines/MobiusCoroutinesTest.kt @@ -0,0 +1,131 @@ +package com.spotify.mobius.coroutines + +import com.google.common.truth.Ordered +import com.google.common.truth.Truth.assertThat +import com.spotify.mobius.functions.Consumer +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.delay +import kotlinx.coroutines.test.* +import org.junit.Test + +@OptIn(ExperimentalCoroutinesApi::class) +class MobiusCoroutinesTest { + + @Test + @Requirement( + given = "an effect handler created from MobiusCoroutines.effectHandler", + `when` = "an effect is dispatched", + then = "the effect is processed" + ) + fun effectHandlerProcessEffects() = runTest { + val testEventConsumer = TestEventConsumer() + + val connection = createEffectHandler().connect(testEventConsumer) + connection.accept(Effect.ComputeSuccess("success1", 100)) + advanceUntilIdle() + + testEventConsumer.assertConsumedEvents( + Event.Loading("success1"), + Event.Success("success1"), + ).inOrder() + } + + + @Test + @Requirement( + given = "an effect handler created from MobiusCoroutines.effectHandler", + `when` = "an effect is dispatched AND the effect handler is disposed before the effect processing completes", + then = "the effect is cancelled" + ) + fun effectHandlerCancelsEffects() = runTest { + val testEventConsumer = TestEventConsumer() + + val connection = createEffectHandler().connect(testEventConsumer) + connection.accept(Effect.ComputeSuccess("success1", 100)) + + advanceTimeBy(50) + testEventConsumer.assertConsumedEvents( + Event.Loading("success1"), + ) + + connection.dispose() + advanceUntilIdle() + testEventConsumer.assertConsumedEvents( + Event.Loading("success1"), + ) + } + + @Test + @Requirement( + given = "an effect handler created from MobiusCoroutines.effectHandler", + `when` = "several effects are dispatched", + then = "the effects are processed concurrently" + ) + fun effectHandlerProcessEffectsConcurrently() = runTest { + val testEventConsumer = TestEventConsumer() + + val connection = createEffectHandler().connect(testEventConsumer) + connection.accept(Effect.ComputeSuccess("success1", 100)) + connection.accept(Effect.ComputeFailure("failure1", 150)) + + advanceTimeBy(50) + testEventConsumer.assertConsumedEvents( + Event.Loading("success1"), + Event.Loading("failure1"), + ) + + advanceTimeBy(60) + testEventConsumer.assertConsumedEvents( + Event.Loading("success1"), + Event.Loading("failure1"), + Event.Success("success1"), + ) + + advanceUntilIdle() + testEventConsumer.assertConsumedEvents( + Event.Loading("success1"), + Event.Loading("failure1"), + Event.Success("success1"), + Event.Failure("failure1"), + ) + } + + private fun TestScope.createEffectHandler() = + MobiusCoroutines.effectHandler(StandardTestDispatcher(testScheduler)) { effect, eventConsumer -> + when (effect) { + is Effect.ComputeFailure -> { + eventConsumer.accept(Event.Loading(effect.id)) + delay(effect.delayMillis) + eventConsumer.accept(Event.Failure(effect.id)) + } + + is Effect.ComputeSuccess -> { + eventConsumer.accept(Event.Loading(effect.id)) + delay(effect.delayMillis) + eventConsumer.accept(Event.Success(effect.id)) + } + } + } + + sealed class Event { + data class Loading(val id: String) : Event() + data class Success(val id: String) : Event() + data class Failure(val id: String) : Event() + } + + sealed class Effect { + data class ComputeSuccess(val id: String, val delayMillis: Long) : Effect() + data class ComputeFailure(val id: String, val delayMillis: Long) : Effect() + } + + private class TestEventConsumer : Consumer { + val events = mutableListOf() + override fun accept(value: Event) { + events.add(value) + } + + fun assertConsumedEvents(vararg expectedEvents: Event): Ordered = + assertThat(events).containsExactly(*expectedEvents) + + } +} \ No newline at end of file