Asynchronous and reactive programming in Kotlin made easy
- Flow is a stream that produces values asynchronously
- Flow uses coroutines internally, so it gets all the benefits of structured concurrency
- When you cancel a scope, every coroutine running in it is cancelled. The same rule applies to a Flow collected in that scope: collection stops and the Flow is disposed of. You don’t have to free up resources manually
In simple words, a Flow performs some work and emits items as they become available.
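To make the cancellation point concrete, here is a minimal sketch. The names `ticker` and `cancelDemo` are hypothetical and exist only for this illustration; it assumes the `kotlinx-coroutines-core` library is on the classpath. It shows that cancelling the scope is enough to stop an endless flow, with no manual cleanup.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical endless flow, used only for this illustration.
fun ticker(): Flow<Int> = flow {
    var i = 0
    while (true) {
        emit(i++)
        delay(100)
    }
}

// Collects ticker() in its own scope, cancels the scope, and reports
// whether the flow was terminated by cancellation.
suspend fun cancelDemo(): Boolean {
    val scope = CoroutineScope(Dispatchers.Default)
    var stoppedByCancellation = false
    val job = scope.launch {
        ticker()
            .onCompletion { cause -> stoppedByCancellation = cause is CancellationException }
            .collect { println("tick $it") }
    }
    delay(350)
    scope.cancel() // cancels the collecting coroutine, and the flow with it
    job.join()     // wait until the collector has actually finished
    return stoppedByCancellation
}

fun main() = runBlocking {
    println("stopped by cancellation: ${cancelDemo()}")
}
```

The number of ticks printed before cancellation depends on timing, so it is not shown here.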
Here are a few ways to create a flow using the flow builders
1.flow{}
- Using the flow builder function with a lambda expression to emit values to the flow. This is the most common way to create a flow
val myFlow = flow {
delay(1000)
emit(1) // Emit values to the flow
}
myFlow.collect { it ->
print(it)
}
//1
2.flowOf
- Using the flowOf function to create a flow from a fixed set of values.
// collect returns Unit, so there is nothing useful to assign to a variable
flowOf(1, 2, 3).collect {
print(it)
}
//123
3.asFlow
- Using the asFlow extension function to convert collections, ranges, or sequences to flows
(1..5).asFlow().collect { print(it) }
//12345
4.channelFlow
- Using the channelFlow function to create a flow that can emit values from multiple coroutines concurrently
val myFlow = channelFlow {
send(1) // Emit values to the flow
}
myFlow.collect { print(it) }
//1
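The snippet above sends from a single coroutine, so it does not show what sets channelFlow apart. The following is a minimal sketch of the concurrent case; `numbersFromTwoCoroutines` is a hypothetical name chosen for illustration. Two child coroutines send into the same flow, which the plain flow builder does not allow (calling emit from a different coroutine there fails with an IllegalStateException).

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical example: two child coroutines send into the same flow.
fun numbersFromTwoCoroutines(): Flow<Int> = channelFlow {
    launch {
        delay(100)
        send(1)
    }
    launch {
        delay(50)
        send(2)
    }
    // the flow completes once both child coroutines have finished
}

fun main() = runBlocking {
    val values = numbersFromTwoCoroutines().toList()
    // Arrival order depends on timing, so sort before printing
    println(values.sorted()) // [1, 2]
}
```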
- Operators transform the data emitted by a flow from one form to another.
Here are some of the commonly used flow operators:
1.map
- Transforms each emitted value of the flow using a provided function.
flow{
(1..3).forEach{
emit(it)
}
}.map{
it * it
}.collect{
print(it)
}
//149
2.filter
- Filters the values emitted by the flow based on a provided predicate function.
flow{
(1..3).forEach{
emit(it)
}
}.filter{
it > 1
}.collect{
print(it)
}
//23
3.take
- Limits the number of values emitted by the flow to a specified count, for example myFlow.take(5).
flow{
(1..3).forEach{
emit(it)
}
}.take(1)
.collect{
print(it)
}
//1
4.zip
- Combines two flows into a new flow by pairing their values in order and applying the given function to each pair.
val flow1 = flowOf(1, 2, 3)
val flow2 = flowOf("one", "two", "three")
flow1.zip(flow2) { a, b -> "$a -> $b" }.collect {
println(it)
}
// 1 -> one
// 2 -> two
// 3 -> three
5.flowOn
- Controls the dispatcher, and therefore the thread, on which the upstream flow runs.
val flow = flow {
// Run on Background Thread (Dispatchers.Default)
(0..10).forEach {
// emit items with 500 milliseconds delay
delay(500)
emit(it)
}
}
.flowOn(Dispatchers.Default) // controls the thread the upstream code runs on
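A small sketch makes the effect of flowOn visible. It assumes the JVM (it reads thread names), and `upstreamThreadName` is a hypothetical helper: the flow emits the name of the thread running the builder code, and the collector prints its own thread name next to it.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Emits the name of the thread that runs the upstream (flow builder) code.
fun upstreamThreadName(): Flow<String> = flow {
    emit(Thread.currentThread().name)
}.flowOn(Dispatchers.Default) // only the code above this line is moved

fun main() = runBlocking {
    upstreamThreadName().collect { upstream ->
        println("emitted on:   $upstream")
        println("collected on: ${Thread.currentThread().name}")
    }
}
```

The two names should differ: flowOn changes the context of the upstream code only, while collect keeps running in the caller's context.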
6.flatMapConcat
- Transforms each element emitted by the original flow into another flow by applying transform, then concatenates and flattens the resulting flows in order.
private val firstNameFlow = flow {
emit("Ahmad")
}
firstNameFlow.flatMapConcat { firstName ->
flow {
emit(firstName)
emit("Kazimi")
}
}.collect {
println("My name is : $it ")
}
//My name is : Ahmad
//My name is : Kazimi
7.retry
- Retries collection of the upstream flow when it fails.
doLongRunningTask()
.retry(retries = 3) { cause -> // retry at most 3 times
cause is IOException // retry only on I/O failures
}.catch {
// error
}.collect {
// success
}
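Since doLongRunningTask() is not defined here, the following self-contained sketch substitutes a hypothetical `FlakyTask` class that fails a fixed number of times before succeeding. It is only meant to show how retry re-collects the upstream flow.

```kotlin
import java.io.IOException
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical flaky task: fails with IOException on the first `failures`
// collections, then emits a result.
class FlakyTask(private val failures: Int) {
    var attempts = 0
        private set

    fun run(): Flow<String> = flow {
        attempts++
        if (attempts <= failures) throw IOException("attempt $attempts failed")
        emit("done after $attempts attempts")
    }
}

fun main() = runBlocking {
    val task = FlakyTask(failures = 2)
    task.run()
        .retry(retries = 3) { cause -> cause is IOException }
        .catch { emit("gave up: ${it.message}") } // reached only if retries run out
        .collect { println(it) } // done after 3 attempts
}
```

Each retry starts the flow builder from the beginning, which is why the attempt counter increases on every failure.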
8.debounce
searchView.getQueryTextChangeStateFlow()
.debounce(300)
- Mainly used in search functionality
- When the user types “a”, “ab”, “abc” in quick succession, every keystroke would trigger a network call. But the user is only interested in the result of the search “abc”, so the results for “a” and “ab” must be discarded. Ideally, there should be no network calls for “a” and “ab” at all.
- This is where the debounce operator comes to the rescue. It waits for the provided time (300 ms here) and emits a value only if no newer value arrives in that window
9. distinctUntilChanged
- Used to avoid duplicate network calls by dropping a value that is equal to the previous one.
- Let’s say the last ongoing search query was “abc”, and the user deleted “c” and typed “c” again, so the query is “abc” once more. If a network call is already in progress for “abc”, no duplicate call is made for the same query.
searchView.getQueryTextChangeStateFlow()
.distinctUntilChanged()
10. flatMapLatest
- Used to discard network results that are no longer needed for display.
- Let’s say the last search query was “ab”, there is an ongoing network call for “ab”, and the user types “abc”. We are no longer interested in the result for “ab”, only in the result for “abc”. This is where flatMapLatest comes to the rescue. When a new query arrives it cancels the flow for the previous query, so it only provides the result for the most recent query and ignores the rest.
searchView.getQueryTextChangeStateFlow()
.flatMapLatest { query ->
dataFromNetwork(query)
.catch {
emitAll(flowOf(""))
}
}
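The three search operators are usually chained together. The sketch below does that without a real SearchView or network: `typedQueries` simulates the typing and `fakeSearch` stands in for the network call. Both are hypothetical helpers, not part of any library.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for a network search; not a real API.
fun fakeSearch(query: String): Flow<String> = flow {
    delay(100) // simulated network latency
    emit("results for $query")
}

// Simulates a user typing "a", "ab", "abc" quickly and then pausing.
fun typedQueries(): Flow<String> = flow {
    emit("a")
    delay(50)
    emit("ab")
    delay(50)
    emit("abc")
    delay(1000) // the user stops typing
}

@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun searchResults(queries: Flow<String>): Flow<String> =
    queries
        .debounce(300)          // wait for a 300 ms pause in typing
        .distinctUntilChanged() // skip a query equal to the previous one
        .flatMapLatest { query -> fakeSearch(query) } // cancel stale searches

fun main() = runBlocking {
    searchResults(typedQueries()).collect { println(it) } // results for abc
}
```

Only “abc” survives the debounce window, so a single search runs. The opt-in annotation is there because debounce and flatMapLatest are marked as preview or experimental API in kotlinx.coroutines.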
11. reduce
- This operator applies a given operation to all the emitted items and returns the final result.
val sum = flowOf(1, 2, 3).reduce { a, b -> a + b }
print(sum)
//6
12. toList()
- Collects all the items emitted by the flow and returns them as a list.
val list = flowOf(1, 2, 3).toList()
print(list)
//[1, 2, 3]
13. fold
- This operator is similar to reduce, but it also takes an initial value and applies the given operation to the initial value and all the emitted items.
val sum = flowOf(1, 2, 3).fold(1) { acc, value -> acc + value }
print(sum)
//7
- The collector collects the items emitted by the flow builder after they have been transformed by the operators.
flowOf(4, 2, 5, 1, 7)
.collect {
//here we collect items which are emitted by flow
}
Cold flow | Hot flow |
---|---|
It emits data only when there is a collector. | It emits data even when there is no collector. |
Each collector gets its own independent run of the flow: one flow execution per collector. | It can have multiple collectors that share the same emissions: one flow for many collectors. |
fun coldFlowExample(): Flow<Int> = flow {
for (i in 1..5) {
delay(1000)
emit(i)
}
}
runBlocking {
val coldFlow = coldFlowExample()
coldFlow.collect { print(it) }
delay(2000)
coldFlow.collect { print(it) }
}
//1234512345
//each collect call restarts the flow, so all values are printed again, one per second
fun hotFlowExample(): Flow<Long> = flow {
var count = 0L
while (true) {
emit(count++)
delay(1000)
}
}.shareIn(GlobalScope, SharingStarted.Eagerly, 1)
runBlocking {
val hotFlow = hotFlowExample()
val job1 = launch {
hotFlow.collect { println("Subscriber 1: $it") }
}
delay(5000)
val job2 = launch {
hotFlow.collect { println("Subscriber 2: $it") }
}
delay(5000)
job1.cancel()
job2.cancel()
}
//Subscriber 1: 0
//Subscriber 1: 1
//Subscriber 1: 2
//Subscriber 1: 3
//Subscriber 1: 4
//Subscriber 2: 4
//Subscriber 1: 5
//Subscriber 2: 5
//Subscriber 1: 6
//Subscriber 2: 6
//Subscriber 1: 7
//Subscriber 2: 7
//Subscriber 1: 8
//Subscriber 2: 8
//Subscriber 1: 9
//Subscriber 2: 9
//Notice that subscriber 2 does not receive the values emitted before it started collecting, apart from the single replayed value
- It's a hot flow that holds and emits only the last known value.
- It needs an initial value and emits the current value as soon as a collector starts collecting.
Note: On Android, we should collect a StateFlow inside repeatOnLifecycle to make the collection lifecycle-aware.
val stateFlow = MutableStateFlow(0)
stateFlow.collect {
println(it)
}
//0
//it takes an initial value and emits it immediately.
stateFlow.value = 1
stateFlow.value = 2
stateFlow.value = 2
stateFlow.value = 1
stateFlow.value = 3
//1
//2
//1
//3
//Notice that we get "2" only once, not twice, because StateFlow does not emit consecutive repeated values.
//Suppose we add a new collector now:
stateFlow.collect {
println(it)
}
//3
//StateFlow stores the last value and emits it as soon as a new collector starts collecting.
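The walkthrough above is conceptual: collect on a StateFlow never returns, so in real code the collector has to run in its own coroutine. A runnable sketch of the same steps follows; `stateFlowDemo` is a hypothetical name, and the short delays are an assumption added so the collector has time to observe each value (StateFlow is conflated, so a slow collector can miss intermediate values).

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns what an early collector saw, plus the first value a late collector gets.
suspend fun stateFlowDemo(): Pair<List<Int>, Int> = coroutineScope {
    val stateFlow = MutableStateFlow(0)
    val seen = mutableListOf<Int>()

    // collect never completes on a StateFlow, so it runs in its own coroutine
    val job = launch { stateFlow.collect { seen += it } }

    for (value in listOf(1, 2, 2, 1, 3)) {
        delay(10) // give the collector time to observe the previous value
        stateFlow.value = value
    }
    delay(10)
    job.cancel()

    val late = stateFlow.first() // a new collector immediately gets the last value
    seen.toList() to late
}

fun main() = runBlocking {
    val (seen, late) = stateFlowDemo()
    println(seen) // [0, 1, 2, 1, 3]
    println(late) // 3
}
```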
- It's a hot flow that does not need an initial value, so it does not emit anything by default.
- It emits every value without comparing it to the previous one, so consecutive repeated values are emitted too.
val sharedFlow = MutableSharedFlow<Int>()
sharedFlow.collect {
println(it)
}
//we get nothing because it does not take an initial value.
sharedFlow.emit(1)
sharedFlow.emit(2)
sharedFlow.emit(2)
sharedFlow.emit(1)
sharedFlow.emit(3)
//1
//2
//2
//1
//3
//Notice that we get "2" twice, because SharedFlow also emits consecutive repeated values.
//Suppose we add a new collector now:
sharedFlow.collect {
println(it)
}
//We will not get anything, as SharedFlow with its default replay of 0 does not store the last value
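As with StateFlow, the SharedFlow steps above can be turned into a runnable sketch. `sharedFlowDemo` is a hypothetical name, and the 100 ms timeout for the late collector is an arbitrary choice for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns what an early collector saw, plus what a late collector gets
// within 100 ms (null if nothing arrives).
suspend fun sharedFlowDemo(): Pair<List<Int>, Int?> = coroutineScope {
    val sharedFlow = MutableSharedFlow<Int>() // replay = 0 by default
    val seen = mutableListOf<Int>()

    val job = launch { sharedFlow.collect { seen += it } }
    delay(10) // let the collector subscribe before emitting

    for (value in listOf(1, 2, 2, 1, 3)) sharedFlow.emit(value)
    delay(10)
    job.cancel()

    // Nothing is replayed to a new collector, so this times out
    val late = withTimeoutOrNull(100) { sharedFlow.first() }
    seen.toList() to late
}

fun main() = runBlocking {
    val (seen, late) = sharedFlowDemo()
    println(seen) // [1, 2, 2, 1, 3]
    println(late) // null
}
```

The initial delay matters: a value emitted while there are no subscribers is simply dropped when replay is 0.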
We have a StateFlow in our ViewModel:
val usersStateFlow = MutableStateFlow<UiState<List<User>>>(UiState.Loading)
and a collector in our Activity:
usersStateFlow.collect {
}
As soon as we open the activity, the ViewModel fetches the data from the network and sets it on usersStateFlow:
usersStateFlow.value = UiState.Success(usersFromNetwork)
Now, if the orientation changes, StateFlow still holds the last value, so there is no need for a new network call.
Now, let's try to use the SharedFlow in place of the StateFlow.
val usersSharedFlow = MutableSharedFlow<UiState<List<User>>>()
usersSharedFlow.collect {
}
As soon as we open the activity, the ViewModel fetches the data from the network and emits it to usersSharedFlow:
usersSharedFlow.emit(UiState.Success(usersFromNetwork))
Now, if the orientation changes, SharedFlow has not stored any data, so we will have to make a new network call.
So, in this case, we should use StateFlow instead of SharedFlow.
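Now consider a one-time event, such as showing a Snackbar when a task fails. With SharedFlow, our code looks like this: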
val showSnackbarSharedFlow = MutableSharedFlow<Boolean>()
showSnackbarSharedFlow.collect {
}
As soon as we open the activity, the ViewModel starts the task and it fails. It emits true to showSnackbarSharedFlow:
showSnackbarSharedFlow.emit(true)
Now, if the orientation changes, SharedFlow has not stored any data, so the Snackbar is not shown again. Now, let's try to use StateFlow in place of SharedFlow.
val showSnackbarStateFlow = MutableStateFlow(false)
showSnackbarStateFlow.collect {
}
As soon as we open the activity, the ViewModel starts the task and it fails. It sets the value of showSnackbarStateFlow to true:
showSnackbarStateFlow.value = true
Now, if the orientation changes, StateFlow still holds the last value, so the Snackbar is shown again. So, in this case, we should use SharedFlow instead of StateFlow.
- Used to convert a callback-based API into a Flow
A simple callback:
val locationListener = object : LocationListener {
override fun onLocationUpdate(location: Location) {
// do something with the updated location
}
}
//register location
LocationManager.registerForLocation(locationListener)
//unregister location
LocationManager.unregisterForLocation(locationListener)
Using callbackFlow:
fun getLocationFlow(): Flow<Location> {
return callbackFlow {
val locationListener = object : LocationListener {
override fun onLocationUpdate(location: Location) {
trySend(location)
}
}
LocationManager.registerForLocation(locationListener)
awaitClose {
LocationManager.unregisterForLocation(locationListener)
}
}
}
//use it like,
runBlocking {
getLocationFlow()
.collect { location ->
// you will get updated location here
}
}
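The location example depends on a LocationManager that is not defined here, so it cannot be run as-is. The following sketch shows the same pattern against a hypothetical in-memory `FakeSensor` class (invented for this illustration, not a real API). It also checks that the awaitClose block unregisters the listener once the collector stops.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*

// Hypothetical callback-based API, invented for this illustration.
class FakeSensor {
    private val listeners = mutableSetOf<(Int) -> Unit>()
    val listenerCount: Int get() = listeners.size
    fun register(listener: (Int) -> Unit) { listeners += listener }
    fun unregister(listener: (Int) -> Unit) { listeners -= listener }
    fun publish(value: Int) { listeners.toList().forEach { it(value) } }
}

// Wraps the callback API in a Flow.
fun FakeSensor.readings(): Flow<Int> = callbackFlow {
    val listener: (Int) -> Unit = { value -> trySend(value) }
    register(listener)
    // Suspends until the collector stops, then cleans up the listener
    awaitClose { unregister(listener) }
}

fun main() = runBlocking {
    val sensor = FakeSensor()
    val collected = async { sensor.readings().take(3).toList() }
    delay(50) // give the collector time to register its listener
    listOf(10, 20, 30, 40).forEach { sensor.publish(it) }
    println(collected.await())    // [10, 20, 30]
    println(sensor.listenerCount) // 0
}
```

trySend is used instead of send because the callback is a regular, non-suspending function.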