[//]: # (title: Asynchronous Flow)

A suspending function asynchronously returns a single value, but how can we return
multiple asynchronously computed values? This is where Kotlin Flows come in.

## Representing multiple values

Multiple values can be represented in Kotlin using [collections]. 
For example, we can have a `simple` function that returns a [List] 
of three numbers and then print them all using [forEach]:

```kotlin
fun simple(): List<Int> = listOf(1, 2, 3)
 
fun main() {
    simple().forEach { value -> println(value) } 
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-01.kt).
>
{type="note"}

This code outputs:

```text
1
2
3
```



### Sequences

If we are computing the numbers with some CPU-consuming blocking code 
(each computation taking 100ms), then we can represent the numbers using a [Sequence]:

```kotlin
fun simple(): Sequence<Int> = sequence { // sequence builder
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it
        yield(i) // yield next value
    }
}

fun main() {
    simple().forEach { value -> println(value) } 
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-02.kt).
>
{type="note"}

This code outputs the same numbers, but it waits 100ms before printing each one.



### Suspending functions

However, this computation blocks the main thread that is running the code.
When these values are computed by asynchronous code, we can mark the `simple` function with the `suspend` modifier
so that it can perform its work without blocking and return the result as a list:

```kotlin
import kotlinx.coroutines.*                 
                           
//sampleStart
suspend fun simple(): List<Int> {
    delay(1000) // pretend we are doing something asynchronous here
    return listOf(1, 2, 3)
}

fun main() = runBlocking {
    simple().forEach { value -> println(value) } 
}
//sampleEnd
```  
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-03.kt).
>
{type="note"}

This code prints the numbers after waiting for a second.



### Flows

Using the `List<Int>` result type means we can only return all the values at once. To represent
the stream of values that are being asynchronously computed, we can use a [`Flow<Int>`][Flow] type, just like we would use the `Sequence<Int>` type for synchronously computed values:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart               
fun simple(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(100) // pretend we are doing something useful here
        emit(i) // emit next value
    }
}

fun main() = runBlocking {
    // Launch a concurrent coroutine to check if the main thread is blocked
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // Collect the flow
    simple().collect { value -> println(value) } 
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-04.kt).
>
{type="note"}

This code waits 100ms before printing each number without blocking the main thread. This is verified
by printing "I'm not blocked" every 100ms from a separate coroutine that is running in the main thread:

```text
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
```



Notice the following differences between the code with the [Flow] and the earlier examples:

* The builder function for the [Flow] type is called [flow][_flow].
* Code inside the `flow { ... }` builder block can suspend.
* The `simple` function is no longer marked with the `suspend` modifier.
* Values are _emitted_ from the flow using the [emit][FlowCollector.emit] function.
* Values are _collected_ from the flow using the [collect][collect] function.

> We can replace [delay] with `Thread.sleep` in the body of `simple`'s `flow { ... }` and see that the main
> thread is blocked in this case. 
>
{type="note"}
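
The note above can be checked with a small sketch. The `blockingSimple` and `recordEvents` names are hypothetical additions for this illustration: the first is the `simple` flow with `delay` replaced by `Thread.sleep`, and the second records events into a list instead of printing them, so the ordering is easy to inspect:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical variant of `simple` that blocks the thread instead of suspending
fun blockingSimple(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // blocks the calling thread, unlike delay
        emit(i)
    }
}

// Hypothetical helper: runs the experiment and returns the events in the order they happened
fun recordEvents(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    // Concurrent coroutine that should interleave with the flow if nothing blocks
    launch {
        for (k in 1..3) {
            events += "I'm not blocked $k"
            delay(100)
        }
    }
    blockingSimple().collect { value -> events += value.toString() }
    events // runBlocking still waits for the launched coroutine before returning
}

fun main() {
    recordEvents().forEach { println(it) }
}
```

Because `runBlocking` runs both coroutines on a single thread, the launched coroutine should get no chance to run while the flow body sleeps, so all three numbers are expected to appear before any "I'm not blocked" line.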

## Flows are cold

Flows are _cold_ streams similar to sequences — the code inside a [flow][_flow] builder does not
run until the flow is collected. This becomes clear in the following example:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart      
fun simple(): Flow<Int> = flow { 
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking {
    println("Calling simple function...")
    val flow = simple()
    println("Calling collect...")
    flow.collect { value -> println(value) } 
    println("Calling collect again...")
    flow.collect { value -> println(value) } 
}
//sampleEnd
```  
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-05.kt).
>
{type="note"}

This prints:

```text
Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
```


 
This is a key reason the `simple` function (which returns a flow) is not marked with the `suspend` modifier.
By itself, a `simple()` call returns quickly and does not wait for anything. The flow starts anew every time it is collected,
which is why we see "Flow started" when we call `collect` again.

## Flow cancellation basics

Flow adheres to the general cooperative cancellation of coroutines. As usual, flow collection can be 
cancelled when the flow is suspended in a cancellable suspending function (like [delay]).
The following example shows how the flow gets cancelled on a timeout when running in a [withTimeoutOrNull] block 
and stops executing its code:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart           
fun simple(): Flow<Int> = flow { 
    for (i in 1..3) {
        delay(100)          
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking {
    withTimeoutOrNull(250) { // Timeout after 250ms 
        simple().collect { value -> println(value) } 
    }
    println("Done")
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-06.kt).
>
{type="note"}

Notice how only two numbers get emitted by the flow in the `simple` function, producing the following output: 

```text
Emitting 1
1
Emitting 2
2
Done
```



See the [Flow cancellation checks](#flow-cancellation-checks) section for more details.

## Flow builders

The `flow { ... }` builder from the previous examples is the most basic one. There are other builders for
easier declaration of flows:

* The [flowOf] builder defines a flow that emits a fixed set of values.
* Various collections and sequences can be converted to flows using the `.asFlow()` extension functions.

So, the example that prints the numbers from 1 to 3 from a flow can be written as:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
//sampleStart
    // Convert an integer range to a flow
    (1..3).asFlow().collect { value -> println(value) }
//sampleEnd 
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-07.kt).
>
{type="note"}
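
The other builders listed above can be sketched in the same way. The function names below (`fixedValues`, `fromList`, `fromSequence`) are hypothetical and exist only for this illustration; `toList` is used to gather each flow's values so they can be printed on one line:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// A flow emitting a fixed set of values
fun fixedValues(): Flow<String> = flowOf("one", "two", "three")

// A flow created from a collection
fun fromList(): Flow<Int> = listOf(1, 2, 3).asFlow()

// A flow created from a sequence
fun fromSequence(): Flow<Int> = sequenceOf(10, 20).asFlow()

fun main() = runBlocking {
    println(fixedValues().toList())  // [one, two, three]
    println(fromList().toList())     // [1, 2, 3]
    println(fromSequence().toList()) // [10, 20]
}
```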



## Intermediate flow operators

Flows can be transformed with operators, just as you would with collections and sequences. 
Intermediate operators are applied to an upstream flow and return a downstream flow. 
These operators are cold, just like flows are. A call to such an operator is not
a suspending function itself. It works quickly, returning the definition of a new transformed flow. 

The basic operators have familiar names like [map] and [filter]. 
The important difference to sequences is that blocks of 
code inside these operators can call suspending functions. 

For example, a flow of incoming requests can be
mapped to the results with the [map] operator, even when performing a request is a long-running
operation that is implemented by a suspending function:   

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart           
suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking {
    (1..3).asFlow() // a flow of requests
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-08.kt).
>
{type="note"}

It produces the following three lines, each appearing one second after the previous one:

```text                                                                    
response 1
response 2
response 3
```



### Transform operator

Among the flow transformation operators, the most general one is called [transform]. It can be used to imitate
simple transformations like [map] and [filter], as well as implement more complex transformations. 
Using the `transform` operator, we can [emit][FlowCollector.emit] arbitrary values an arbitrary number of times.

For example, using `transform` we can emit a string before performing a long-running asynchronous request 
and follow it with a response:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking {
//sampleStart
    (1..3).asFlow() // a flow of requests
        .transform { request ->
            emit("Making request $request") 
            emit(performRequest(request)) 
        }
        .collect { response -> println(response) }
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-09.kt).
>
{type="note"}

The output of this code is:

```text
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
```
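
To illustrate the claim that `transform` can imitate simpler operators, here is a minimal sketch of `map`-like and `filter`-like operators written with `transform` only. The names `mapViaTransform` and `filterViaTransform` are hypothetical and are not part of the library; the library's own `map` and `filter` may be implemented differently:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: emits exactly one transformed value per input value, like map
fun <T, R> Flow<T>.mapViaTransform(block: suspend (T) -> R): Flow<R> =
    transform { value -> emit(block(value)) }

// Hypothetical helper: emits the value zero or one times, like filter
fun <T> Flow<T>.filterViaTransform(predicate: suspend (T) -> Boolean): Flow<T> =
    transform { value ->
        if (predicate(value)) emit(value)
    }

fun main() = runBlocking {
    (1..5).asFlow()
        .filterViaTransform { it % 2 == 1 } // keep odd numbers
        .mapViaTransform { it * 10 }        // multiply each by ten
        .collect { println(it) }            // prints 10, 30, 50 on separate lines
}
```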



### Size-limiting operators

Size-limiting intermediate operators like [take] cancel the execution of the flow when the corresponding limit
is reached. Cancellation in coroutines is always performed by throwing an exception, so that all the resource-management
functions (like `try { ... } finally { ... }` blocks) operate normally in case of cancellation:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun numbers(): Flow<Int> = flow {
    try {                          
        emit(1)
        emit(2) 
        println("This line will not execute")
        emit(3)    
    } finally {
        println("Finally in numbers")
    }
}

fun main() = runBlocking {
    numbers() 
        .take(2) // take only the first two
        .collect { value -> println(value) }
}            
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-10.kt).
>
{type="note"}

The output of this code clearly shows that the execution of the `flow { ... }` body in the `numbers()` function
stopped after emitting the second number:

```text       
1
2
Finally in numbers
```



## Terminal flow operators

Terminal operators on flows are _suspending functions_ that start a collection of the flow.
The [collect] operator is the most basic one, but there are other terminal operators that can make common tasks easier:

* Conversion to various collections like [toList] and [toSet].
* Operators to get the [first] value and to ensure that a flow emits a [single] value.
* Reducing a flow to a value with [reduce] and [fold].

For example:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
//sampleStart         
    val sum = (1..5).asFlow()
        .map { it * it } // squares of numbers from 1 to 5                           
        .reduce { a, b -> a + b } // sum them (terminal operator)
    println(sum)
//sampleEnd     
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-11.kt).
>
{type="note"}

Prints a single number:

```text
55
```
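
The other terminal operators from the list above can be tried in a similar way. In this sketch, `summarize` is a hypothetical helper introduced only for illustration; note that each terminal operator call collects the flow again from the start:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper that applies several terminal operators to the same flow
suspend fun summarize(numbers: Flow<Int>): String {
    val all = numbers.toList()                                    // collect everything into a list
    val first = numbers.first()                                   // take the first value, then stop collecting
    val total = numbers.fold(0) { acc, value -> acc + value }     // accumulate from an initial value
    return "all=$all first=$first total=$total"
}

fun main() = runBlocking {
    println(summarize((1..5).asFlow())) // all=[1, 2, 3, 4, 5] first=1 total=15
    println(flowOf(42).single())        // 42; single() throws if the flow emits zero or several values
}
```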



## Flows are sequential

Each individual collection of a flow is performed sequentially unless special operators that operate
on multiple flows are used. The collection works directly in the coroutine that calls a terminal operator. 
No new coroutines are launched by default. 
Each emitted value is processed by all the intermediate operators from 
upstream to downstream and is then delivered to the terminal operator. 

See the following example that filters the even integers and maps them to strings:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
//sampleStart         
    (1..5).asFlow()
        .filter {
            println("Filter $it")
            it % 2 == 0              
        }              
        .map { 
            println("Map $it")
            "string $it"
        }.collect { 
            println("Collect $it")
        }    
//sampleEnd                  
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-12.kt).
>
{type="note"}

This produces:

```text
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
```



## Flow context

Collection of a flow always happens in the context of the calling coroutine. For example, if there is 
a `simple` flow, then the following code runs in the context specified
by the author of this code, regardless of the implementation details of the `simple` flow:

```kotlin
withContext(context) {
    simple().collect { value ->
        println(value) // run in the specified context 
    }
}
```



This property of a flow is called _context preservation_.

So, by default, code in the `flow { ... }` builder runs in the context that is provided by a collector
of the corresponding flow. For example, consider the implementation of a `simple` function that prints the thread
it is called on and emits three numbers:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
           
//sampleStart
fun simple(): Flow<Int> = flow {
    log("Started simple flow")
    for (i in 1..3) {
        emit(i)
    }
}  

fun main() = runBlocking {
    simple().collect { value -> log("Collected $value") } 
}            
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-13.kt).
>
{type="note"}

Running this code produces:

```text  
[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3
```



Since `simple().collect` is called from the main thread, the body of `simple`'s flow is also called in the main thread.
This is the perfect default for fast-running or asynchronous code that does not care about the execution context and
does not block the caller. 

### Wrong emission withContext

However, long-running CPU-consuming code might need to be executed in the context of [Dispatchers.Default] and UI-updating
code might need to be executed in the context of [Dispatchers.Main]. Usually, [withContext] is used
to change the context in the code using Kotlin coroutines, but code in the `flow { ... }` builder has to honor the context
preservation property and is not allowed to [emit][FlowCollector.emit] from a different context. 

Try running the following code:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
                      
//sampleStart
fun simple(): Flow<Int> = flow {
    // The WRONG way to change context for CPU-consuming code in flow builder
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing it in CPU-consuming way
            emit(i) // emit next value
        }
    }
}

fun main() = runBlocking {
    simple().collect { value -> println(value) } 
}            
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-14.kt).
>
{type="note"}

This code produces the following exception:

```text
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
		Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
		but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, Dispatchers.Default].
		Please refer to 'flow' documentation or use 'flowOn' instead
	at ...
``` 



### flowOn operator
   
The exception refers to the [flowOn] function that should be used to change the context of the flow emission.
The correct way to change the context of a flow is shown in the example below, which also prints the 
names of the corresponding threads to show how it all works:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
           
//sampleStart
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it in CPU-consuming way
        log("Emitting $i")
        emit(i) // emit next value
    }
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder

fun main() = runBlocking {
    simple().collect { value ->
        log("Collected $value") 
    } 
}            
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-15.kt).
>
{type="note"}
  
Notice how `flow { ... }` works in the background thread, while collection happens in the main thread.



Another thing to observe here is that the [flowOn] operator has changed the default sequential nature of the flow.
Now collection happens in one coroutine ("coroutine#1") and emission happens in another coroutine
("coroutine#2") that is running in another thread concurrently with the collecting coroutine. The [flowOn] operator
creates another coroutine for an upstream flow when it has to change the [CoroutineDispatcher] in its context. 

## Buffering

Running different parts of a flow in different coroutines can be helpful from the standpoint of the overall time it takes 
to collect the flow, especially when long-running asynchronous operations are involved. For example, consider a case when
the emission by a `simple` flow is slow, taking 100 ms to produce an element, and the collector is also slow, 
taking 300 ms to process an element. Let's see how long it takes to collect such a flow with three numbers:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

//sampleStart
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking { 
    val time = measureTimeMillis {
        simple().collect { value -> 
            delay(300) // pretend we are processing it for 300 ms
            println(value) 
        } 
    }   
    println("Collected in $time ms")
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-16.kt).
>
{type="note"}

It produces something like this, with the whole collection taking around 1200 ms (three numbers, 400 ms for each):

```text
1
2
3
Collected in 1220 ms
```



We can use a [buffer] operator on a flow to run emitting code of the `simple` flow concurrently with collecting code,
as opposed to running them sequentially:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking { 
//sampleStart
    val time = measureTimeMillis {
        simple()
            .buffer() // buffer emissions, don't wait
            .collect { value -> 
                delay(300) // pretend we are processing it for 300 ms
                println(value) 
            } 
    }   
    println("Collected in $time ms")
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-17.kt).
>
{type="note"}

It produces the same numbers just faster, as we have effectively created a processing pipeline,
having to only wait 100 ms for the first number and then spending only 300 ms to process
each number. This way it takes around 1000 ms to run:

```text
1
2
3
Collected in 1071 ms
```                    



> Note that the [flowOn] operator uses the same buffering mechanism when it has to change a [CoroutineDispatcher],
> but here we explicitly request buffering without changing the execution context.
>
{type="note"}

### Conflation

When a flow represents partial results of an operation or operation status updates, it may not be necessary
to process each value, but only the most recent ones. In this case, the [conflate] operator can be used to skip
intermediate values when a collector is too slow to process them. Building on the previous example:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking { 
//sampleStart
    val time = measureTimeMillis {
        simple()
            .conflate() // conflate emissions, don't process each one
            .collect { value -> 
                delay(300) // pretend we are processing it for 300 ms
                println(value) 
            } 
    }   
    println("Collected in $time ms")
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-18.kt).
>
{type="note"}

We see that while the first number was still being processed, the second and third were already produced, so
the second one was _conflated_ and only the most recent (the third one) was delivered to the collector:

```text
1
3
Collected in 758 ms
```             



### Processing the latest value

Conflation is one way to speed up processing when both the emitter and collector are slow. It does so by dropping emitted values.
The other way is to cancel a slow collector and restart it every time a new value is emitted. There is
a family of `xxxLatest` operators that perform the same essential logic of a `xxx` operator, but cancel the
code in their block on a new value. Let's try changing [conflate] to [collectLatest] in the previous example:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking { 
//sampleStart
    val time = measureTimeMillis {
        simple()
            .collectLatest { value -> // cancel & restart on the latest value
                println("Collecting $value") 
                delay(300) // pretend we are processing it for 300 ms
                println("Done $value") 
            } 
    }   
    println("Collected in $time ms")
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-19.kt).
>
{type="note"}
 
Since the body of [collectLatest] takes 300 ms, but new values are emitted every 100 ms, we see that the block
is run on every value, but completes only for the last value:

```text 
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms
``` 



## Composing multiple flows

There are lots of ways to compose multiple flows.

### Zip

Just like the [Sequence.zip] extension function in the Kotlin standard library, 
flows have a [zip] operator that combines the corresponding values of two flows:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking { 
//sampleStart                                                                           
    val nums = (1..3).asFlow() // numbers 1..3
    val strs = flowOf("one", "two", "three") // strings 
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
        .collect { println(it) } // collect and print
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-20.kt).
>
{type="note"}

This example prints:

```text
1 -> one
2 -> two
3 -> three
```
 


### Combine

When a flow represents the most recent value of a variable or operation (see also the related 
section on [conflation](#conflation)), it might be necessary to perform a computation that depends on
the most recent values of the corresponding flows and to recompute it whenever any of the upstream
flows emit a value. The corresponding family of operators is called [combine].

For example, if the numbers in the previous example update every 300 ms, but strings update every 400 ms, 
then zipping them using the [zip] operator will still produce the same result, 
albeit with results printed every 400 ms:

> We use the [onEach] intermediate operator in this example to delay each element and make the code 
> that emits sample flows more declarative and shorter.
>
{type="note"}

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking { 
//sampleStart                                                                           
    val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
    val startTime = System.currentTimeMillis() // remember the start time 
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-21.kt).
>
{type="note"}



However, when using a [combine] operator here instead of a [zip]:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking { 
//sampleStart                                                                           
    val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms          
    val startTime = System.currentTimeMillis() // remember the start time 
    nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-22.kt).
>
{type="note"}

We get quite a different output, where a line is printed at each emission from either `nums` or `strs` flows:

```text 
1 -> one at 452 ms from start
2 -> one at 651 ms from start
2 -> two at 854 ms from start
3 -> two at 952 ms from start
3 -> three at 1256 ms from start
```



## Flattening flows

Flows represent asynchronously received sequences of values, so it is quite easy to get into a situation where 
each value triggers a request for another sequence of values. For example, we can have the following
function that returns a flow of two strings 500 ms apart:

```kotlin
fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}
```



Now if we have a flow of three integers and call `requestFlow` for each of them like this:

```kotlin
(1..3).asFlow().map { requestFlow(it) }
```



Then we end up with a flow of flows (`Flow<Flow<String>>`) that needs to be _flattened_ into a single flow for 
further processing. Collections and sequences have [flatten][Sequence.flatten] and [flatMap][Sequence.flatMap]
operators for this. However, due to the asynchronous nature of flows, they call for different _modes_ of flattening, 
and so there is a family of flattening operators on flows.

### flatMapConcat

Concatenating mode is implemented by the [flatMapConcat] and [flattenConcat] operators. They are the most direct
analogues of the corresponding sequence operators. They wait for the inner flow to complete before
starting to collect the next one, as the following example shows:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

fun main() = runBlocking { 
//sampleStart
    val startTime = System.currentTimeMillis() // remember the start time 
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
        .flatMapConcat { requestFlow(it) }                                                                           
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-23.kt).
>
{type="note"}

The sequential nature of [flatMapConcat] is clearly seen in the output:

```text                      
1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start
```
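
The same concatenating behavior can be sketched with [flattenConcat] applied to a flow of flows. The `flattened` function is a hypothetical name used only here, and the inner delay is shortened to keep the sketch quick. Depending on the library version, `flattenConcat` may be marked as preview or experimental and produce an opt-in warning:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(50) // shorter than in the example above, an assumption for this sketch
    emit("$i: Second")
}

// Hypothetical helper: map to a Flow<Flow<String>> first, then flatten it in concatenating mode
fun flattened(): Flow<String> =
    (1..3).asFlow()
        .map { requestFlow(it) } // produces a flow of flows
        .flattenConcat()         // collects each inner flow to completion, one after another

fun main() = runBlocking {
    flattened().collect { println(it) }
}
```

Each inner flow is collected fully before the next one starts, so the values are expected in the same order as in the [flatMapConcat] output above.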



### flatMapMerge

Another flattening mode is to concurrently collect all the incoming flows and merge their values into
a single flow so that values are emitted as soon as possible.
It is implemented by the [flatMapMerge] and [flattenMerge] operators. They both accept an optional 
`concurrency` parameter that limits the number of concurrent flows that are collected at the same time
(it is equal to [DEFAULT_CONCURRENCY] by default). 

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

fun main() = runBlocking { 
//sampleStart
    val startTime = System.currentTimeMillis() // remember the start time 
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
        .flatMapMerge { requestFlow(it) }                                                                           
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-24.kt).
>
{type="note"}

The concurrent nature of [flatMapMerge] is obvious:

```text                      
1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start
```                                               



> Note that [flatMapMerge] calls its block of code (`{ requestFlow(it) }` in this example) sequentially, but 
> collects the resulting flows concurrently. It is equivalent to performing a sequential 
> `map { requestFlow(it) }` first and then calling [flattenMerge] on the result.
>
{type="note"}
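The equivalence described in the note, and the effect of the `concurrency` parameter, can be checked with a small sketch. It is not one of the guide's numbered examples: the helper names `viaFlatMapMerge`, `viaFlattenMerge`, and `oneAtATime` are made up for illustration, the 50 ms delay is arbitrary, and the `@OptIn` annotation is only there to silence warnings in library versions that mark these operators as preview or experimental.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(50) // arbitrary short delay for this sketch
    emit("$i: Second")
}

// flatMapMerge with the default concurrency; a Set ignores the arrival order
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun viaFlatMapMerge(): Set<String> = runBlocking {
    (1..3).asFlow().flatMapMerge { requestFlow(it) }.toSet()
}

// The same pipeline spelled as a sequential map followed by flattenMerge
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun viaFlattenMerge(): Set<String> = runBlocking {
    (1..3).asFlow().map { requestFlow(it) }.flattenMerge().toSet()
}

// With concurrency = 1 only one inner flow is collected at a time
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun oneAtATime(): List<String> = runBlocking {
    (1..3).asFlow().flatMapMerge(concurrency = 1) { requestFlow(it) }.toList()
}

fun main() {
    println(viaFlatMapMerge() == viaFlattenMerge()) // true
    println(oneAtATime())
    // [1: First, 1: Second, 2: First, 2: Second, 3: First, 3: Second]
}
```

Limiting `concurrency` to 1 makes the merge collect the inner flows one after another, so the values arrive in the same order as with [flatMapConcat].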

### flatMapLatest   

In a similar way to the [collectLatest] operator, which was shown in the
["Processing the latest value"](#processing-the-latest-value) section, there is the corresponding "Latest" 
flattening mode where the collection of the previous flow is cancelled as soon as a new flow is emitted. 
It is implemented by the [flatMapLatest] operator.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

fun main() = runBlocking { 
//sampleStart
    val startTime = System.currentTimeMillis() // remember the start time 
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
        .flatMapLatest { requestFlow(it) }                                                                           
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-25.kt).
>
{type="note"}

The output of this example is a good demonstration of how [flatMapLatest] works:

```text                      
1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start
```                                               


  
> Note that [flatMapLatest] cancels all the code in its block (`{ requestFlow(it) }` in this example) when a new value is received. 
> It makes no difference in this particular example, because the call to `requestFlow` itself is fast, non-suspending,
> and cannot be cancelled. However, the difference would show up if we were to use suspending functions like `delay` in that block.
>
{type="note"}
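To make that difference visible, here is a minimal sketch (not one of the guide's numbered examples) that puts a `delay` inside the `flatMapLatest` block and removes the delay between upstream numbers. The helper name `latestOnly` and the 50 ms delays are assumptions chosen for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(50) // arbitrary short delay for this sketch
    emit("$i: Second")
}

@OptIn(ExperimentalCoroutinesApi::class)
fun latestOnly(): List<String> = runBlocking {
    (1..3).asFlow() // the numbers arrive back to back this time
        .flatMapLatest { i ->
            delay(50) // suspension point inside the block itself
            requestFlow(i)
        }
        .toList()
}

fun main() {
    println(latestOnly()) // [3: First, 3: Second]
}
```

The blocks for 1 and 2 are cancelled while they are suspended in `delay`, before `requestFlow` is even called for them, so only the flow for the last number is collected.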

## Flow exceptions

Flow collection can complete with an exception when an emitter or code inside the operators throws an exception. 
There are several ways to handle these exceptions.

### Collector try and catch

A collector can use Kotlin's [`try/catch`][exceptions] block to handle exceptions: 

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // emit next value
    }
}

fun main() = runBlocking {
    try {
        simple().collect { value ->         
            println(value)
            check(value <= 1) { "Collected $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    } 
}            
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-26.kt).
>
{type="note"}

This code successfully catches an exception in the [collect] terminal operator and, 
as we see, no more values are emitted after that:

```text 
Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2
```



### Everything is caught

The previous example actually catches any exception happening in the emitter or in any intermediate or terminal operators.
For example, let's change the code so that emitted values are [mapped][map] to strings,
but the corresponding code produces an exception:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<String> = 
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }                 
        "string $value"
    }

fun main() = runBlocking {
    try {
        simple().collect { value -> println(value) }
    } catch (e: Throwable) {
        println("Caught $e")
    } 
}            
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-27.kt).
>
{type="note"}

This exception is still caught and collection is stopped:

```text 
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
```



## Exception transparency

But how can the code of the emitter encapsulate its exception handling behavior?  

Flows must be _transparent to exceptions_, and it is a violation of exception transparency to [emit][FlowCollector.emit] values in the 
`flow { ... }` builder from inside of a `try/catch` block. This guarantees that a collector throwing an exception
can always catch it using `try/catch` as in the previous example.

The emitter can use a [catch] operator that preserves this exception transparency and allows encapsulation
of its exception handling. The body of the `catch` operator can analyze an exception
and react to it in different ways depending on which exception was caught:

* Exceptions can be rethrown using `throw`.
* Exceptions can be turned into emission of values using [emit][FlowCollector.emit] from the body of [catch].
* Exceptions can be ignored, logged, or processed by some other code.

For example, let us emit the text on catching an exception:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<String> = 
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }                 
        "string $value"
    }

fun main() = runBlocking {
//sampleStart
    simple()
        .catch { e -> emit("Caught $e") } // emit on exception
        .collect { value -> println(value) }
//sampleEnd
}            
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-28.kt).
>
{type="note"} 
 
The output of the example is the same, even though we do not have `try/catch` around the code anymore. 
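The body of `catch` can also choose its reaction per exception type, as the list above describes. The following is a minimal sketch under stated assumptions: `failingFlow` and `collectWithFallback` are hypothetical helpers, and the policy of mapping `IllegalStateException` to `-1` while rethrowing everything else is invented for the illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical flow that emits one value and then fails with the given exception
fun failingFlow(failure: Exception): Flow<Int> = flow {
    emit(1)
    throw failure
}

// Hypothetical policy: IllegalStateException becomes the value -1, anything else is rethrown
fun collectWithFallback(failure: Exception): List<Int> = runBlocking {
    failingFlow(failure)
        .catch { e ->
            if (e is IllegalStateException) emit(-1) else throw e
        }
        .toList()
}

fun main() {
    println(collectWithFallback(IllegalStateException("recoverable"))) // [1, -1]
    try {
        collectWithFallback(UnsupportedOperationException("fatal"))
    } catch (e: UnsupportedOperationException) {
        println("Rethrown: ${e.message}") // Rethrown: fatal
    }
}
```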



### Transparent catch

The [catch] intermediate operator, honoring exception transparency, catches only upstream exceptions
(that is, exceptions from all the operators above `catch`, but not below it).
If the block in `collect { ... }` (placed below `catch`) throws an exception, then it escapes:  

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking {
    simple()
        .catch { e -> println("Caught $e") } // does not catch downstream exceptions
        .collect { value ->
            check(value <= 1) { "Collected $value" }                 
            println(value) 
        }
}            
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-29.kt).
>
{type="note"}
 
A "Caught ..." message is not printed despite there being a `catch` operator: 

```text  
Emitting 1
1
Emitting 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
	at ...
```
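Because `catch` leaves downstream exceptions alone, a collector can still rely on its own `try/catch`. The sketch below is an illustration only, with a made-up helper `whoCaught` that records which handler actually saw the exception.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = (1..3).asFlow()

// Hypothetical helper: reports which handler observed the collector's exception
fun whoCaught(): String = runBlocking {
    var caughtBy = "nobody"
    try {
        simple()
            .catch { caughtBy = "catch operator" } // sees upstream exceptions only
            .collect { value -> check(value <= 1) { "Collected $value" } }
    } catch (e: IllegalStateException) {
        caughtBy = "try/catch around collect" // the downstream exception arrives here
    }
    caughtBy
}

fun main() {
    println(whoCaught()) // try/catch around collect
}
```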



### Catching declaratively

We can combine the declarative nature of the [catch] operator with a desire to handle all the exceptions, by moving the body
of the [collect] operator into [onEach] and putting it before the `catch` operator. Collection of this flow must
be triggered by a call to `collect()` without parameters:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking {
//sampleStart
    simple()
        .onEach { value ->
            check(value <= 1) { "Collected $value" }                 
            println(value) 
        }
        .catch { e -> println("Caught $e") }
        .collect()
//sampleEnd
}            
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-30.kt).
>
{type="note"} 
 
Now we can see that a "Caught ..." message is printed and so we can catch all the exceptions without explicitly
using a `try/catch` block: 

```text 
Emitting 1
1
Emitting 2
Caught java.lang.IllegalStateException: Collected 2
```



## Flow completion

When flow collection completes (normally or exceptionally) it may need to execute an action. 
As you may have already noticed, it can be done in two ways: imperative or declarative.

### Imperative finally block

In addition to `try`/`catch`, a collector can also use a `finally` block to execute an action 
upon `collect` completion.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking {
    try {
        simple().collect { value -> println(value) }
    } finally {
        println("Done")
    }
}            
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-31.kt).
>
{type="note"} 

This code prints three numbers produced by the `simple` flow followed by a "Done" string:

```text
1
2
3
Done
```



### Declarative handling

For the declarative approach, flow has the [onCompletion] intermediate operator that is invoked
when the flow has been completely collected.

The previous example can be rewritten using an [onCompletion] operator and produces the same output:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking {
//sampleStart
    simple()
        .onCompletion { println("Done") }
        .collect { value -> println(value) }
//sampleEnd
}            
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-32.kt).
>
{type="note"} 



The key advantage of [onCompletion] is a nullable `Throwable` parameter of the lambda that can be used
to determine whether the flow collection was completed normally or exceptionally. In the following
example the `simple` flow throws an exception after emitting the number 1:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

fun main() = runBlocking {
    simple()
        .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
        .catch { cause -> println("Caught exception") }
        .collect { value -> println(value) }
}            
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-33.kt).
>
{type="note"}

As you may expect, it prints:

```text
1
Flow completed exceptionally
Caught exception
```



The [onCompletion] operator, unlike [catch], does not handle the exception. As we can see from the above
example code, the exception still flows downstream. It will be delivered to further `onCompletion` operators
and can be handled with a `catch` operator.
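As a rough illustration of that propagation, the following sketch chains two `onCompletion` operators before a `catch` and records what each one observes. The helper `observe` and the exception message are assumptions made for this example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException("boom")
}

// Hypothetical helper that records what each operator observes, in order
fun observe(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    simple()
        .onCompletion { cause -> log.add("first onCompletion: ${cause?.message}") }
        .onCompletion { cause -> log.add("second onCompletion: ${cause?.message}") }
        .catch { cause -> log.add("catch: ${cause.message}") }
        .collect { value -> log.add("value: $value") }
    log
}

fun main() {
    observe().forEach { println(it) }
    // value: 1
    // first onCompletion: boom
    // second onCompletion: boom
    // catch: boom
}
```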

### Successful completion

Another difference from the [catch] operator is that [onCompletion] sees all exceptions and receives
a `null` exception only on successful completion of the upstream flow (without cancellation or failure).

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking {
    simple()
        .onCompletion { cause -> println("Flow completed with $cause") }
        .collect { value ->
            check(value <= 1) { "Collected $value" }                 
            println(value) 
        }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-34.kt).
>
{type="note"}

We can see the completion cause is not null, because the flow was aborted due to a downstream exception:

```text 
1
Flow completed with java.lang.IllegalStateException: Collected 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
```
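For contrast, the successful and the failing case can be put side by side. This is a minimal sketch with a hypothetical helper, `completionCause`, whose `failOn` parameter is invented here to switch the collector's failure on and off.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: collects 1..3, optionally failing in the collector,
// and reports the cause that onCompletion received
fun completionCause(failOn: Int?): String = runBlocking {
    var seen = "onCompletion was not called"
    try {
        (1..3).asFlow()
            .onCompletion { cause -> seen = "cause = ${cause?.message}" }
            .collect { value -> check(value != failOn) { "Collected $value" } }
    } catch (e: IllegalStateException) {
        // The downstream exception still propagates; it is ignored here on purpose
    }
    seen
}

fun main() {
    println(completionCause(failOn = null)) // cause = null
    println(completionCause(failOn = 2))    // cause = Collected 2
}
```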



## Imperative versus declarative

Now we know how to collect a flow, and handle its completion and exceptions in both imperative and declarative ways.
The natural question here is, which approach is preferred and why?
As a library, we do not advocate for any particular approach and believe that both options
are valid and should be selected according to your own preferences and code style. 

## Launching flow

It is easy to use flows to represent asynchronous events that are coming from some source.
In this case, we need an analogue of the `addEventListener` function that registers a piece of code with a reaction
for incoming events and continues further work. The [onEach] operator can serve this role. 
However, `onEach` is an intermediate operator. We also need a terminal operator to collect the flow. 
Otherwise, just calling `onEach` has no effect.
 
If we use the [collect] terminal operator after `onEach`, then the code after it will wait until the flow is collected:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking {
    events()
        .onEach { event -> println("Event: $event") }
        .collect() // <--- Collecting the flow waits
    println("Done")
}            
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-35.kt).
>
{type="note"} 
  
As you can see, it prints:

```text 
Event: 1
Event: 2
Event: 3
Done
```    


 
The [launchIn] terminal operator comes in handy here. By replacing `collect` with `launchIn` we can
launch a collection of the flow in a separate coroutine, so that execution of further code 
immediately continues:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

//sampleStart
fun main() = runBlocking {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this) // <--- Launching the flow in a separate coroutine
    println("Done")
}            
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-36.kt).
>
{type="note"} 
  
It prints:

```text          
Done
Event: 1
Event: 2
Event: 3
```    



The required parameter to `launchIn` must specify a [CoroutineScope] in which the coroutine to collect the flow is 
launched. In the above example this scope comes from the [runBlocking]
coroutine builder, so while the flow is running, this [runBlocking] scope waits for completion of its child coroutine
and keeps the main function from returning and terminating this example. 

In actual applications a scope will come from an entity with a limited 
lifetime. As soon as the lifetime of this entity is terminated the corresponding scope is cancelled, cancelling
the collection of the corresponding flow. This way the pair of `onEach { ... }.launchIn(scope)` works
like the `addEventListener`. However, there is no need for the corresponding `removeEventListener` function, 
as cancellation and structured concurrency serve this purpose.

Note that [launchIn] also returns a [Job], which can be used to [cancel][Job.cancel] only the corresponding flow collection
coroutine, without cancelling the whole scope, or to [join][Job.join] it.
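A short sketch of both uses of the returned [Job] follows. The endless `ticks` flow and the helper names `collectAndJoin` and `collectThenCancel` are hypothetical and exist only for this illustration; the delays are arbitrary.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical endless source of events: 1, 2, 3, ... one every 100 ms
fun ticks(): Flow<Int> = flow {
    var i = 1
    while (true) {
        emit(i++)
        delay(100)
    }
}

// join: wait for one particular collection to finish
fun collectAndJoin(): List<Int> = runBlocking {
    val received = mutableListOf<Int>()
    val job = ticks().take(3).onEach { received.add(it) }.launchIn(this)
    job.join()
    received
}

// cancel: stop one particular collection while the scope itself stays active
fun collectThenCancel(): Boolean = runBlocking {
    val job = ticks().onEach { println("Event: $it") }.launchIn(this)
    delay(250)
    job.cancel()
    job.join()
    isActive // still true: only the collecting coroutine was cancelled
}

fun main() {
    println(collectAndJoin())    // [1, 2, 3]
    println(collectThenCancel()) // prints a few events, then true
}
```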

### Flow cancellation checks

For convenience, the [flow][_flow] builder performs additional [ensureActive] checks for cancellation on each emitted value. 
It means that a busy loop emitting from a `flow { ... }` is cancellable:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart           
fun foo(): Flow<Int> = flow { 
    for (i in 1..5) {
        println("Emitting $i") 
        emit(i) 
    }
}

fun main() = runBlocking {
    foo().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-37.kt).
>
{type="note"}

We get only numbers up to 3 and a [CancellationException] after trying to emit number 4:

```text 
Emitting 1
1
Emitting 2
2
Emitting 3
3
Emitting 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@6d7b4f4c
```



However, most other flow operators do not do additional cancellation checks on their own for performance reasons. 
For example, if you use [IntRange.asFlow] extension to write the same busy loop and don't suspend anywhere, 
then there are no checks for cancellation:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart           
fun main() = runBlocking {
    (1..5).asFlow().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-38.kt).
>
{type="note"}

All numbers from 1 to 5 are collected and cancellation gets detected only before return from `runBlocking`:

```text
1
2
3
4
5
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@3327bd23
```



#### Making busy flow cancellable 

In the case where you have a busy loop with coroutines you must explicitly check for cancellation.
You can add `.onEach { currentCoroutineContext().ensureActive() }`, but there is a ready-to-use
[cancellable] operator provided to do that:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart           
fun main() = runBlocking {
    (1..5).asFlow().cancellable().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-39.kt).
>
{type="note"}

With the `cancellable` operator only the numbers from 1 to 3 are collected:

```text
1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@5ec0a365
```
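The explicit `.onEach { currentCoroutineContext().ensureActive() }` variant mentioned above behaves the same way in this scenario. The following sketch uses a made-up helper, `collectedBeforeCancellation`, that gathers the collected values and swallows the final [CancellationException] so that the result can be inspected.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns the values collected before cancellation was detected
fun collectedBeforeCancellation(): List<Int> {
    val collected = mutableListOf<Int>()
    try {
        runBlocking {
            (1..5).asFlow()
                .onEach { currentCoroutineContext().ensureActive() } // explicit check per value
                .collect { value ->
                    if (value == 3) cancel()
                    collected.add(value)
                }
        }
    } catch (e: CancellationException) {
        // runBlocking rethrows the cancellation of its own job; ignored for this sketch
    }
    return collected
}

fun main() {
    println(collectedBeforeCancellation()) // [1, 2, 3]
}
```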



## Flow and Reactive Streams

For those who are familiar with [Reactive Streams](https://www.reactive-streams.org/) or reactive frameworks such as RxJava and Project Reactor, 
the design of Flow may look very familiar.

Indeed, its design was inspired by Reactive Streams and its various implementations. But the main goal of Flow is to have as simple a design as possible, 
be Kotlin- and suspension-friendly, and respect structured concurrency. Achieving this goal would be impossible without reactive pioneers and their tremendous work. You can read the complete story in the [Reactive Streams and Kotlin Flows](https://medium.com/@elizarov/reactive-streams-and-kotlin-flows-bfd12772cda4) article.

While being different, conceptually, Flow *is* a reactive stream and it is possible to convert it to the reactive (spec and TCK compliant) Publisher and vice versa.
Such converters are provided by `kotlinx.coroutines` out-of-the-box and can be found in the corresponding reactive modules (`kotlinx-coroutines-reactive` for Reactive Streams, `kotlinx-coroutines-reactor` for Project Reactor and `kotlinx-coroutines-rx2`/`kotlinx-coroutines-rx3` for RxJava2/RxJava3).
Integration modules include conversions from and to `Flow`, integration with Reactor's `Context`, and suspension-friendly ways to work with various reactive entities.
 


[collections]: https://kotlinlang.org/docs/reference/collections-overview.html 
[List]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/-list/ 
[forEach]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/for-each.html
[Sequence]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/
[Sequence.zip]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/zip.html
[Sequence.flatten]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/flatten.html
[Sequence.flatMap]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/flat-map.html
[exceptions]: https://kotlinlang.org/docs/reference/exceptions.html




[delay]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/delay.html
[withTimeoutOrNull]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-timeout-or-null.html
[Dispatchers.Default]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html
[Dispatchers.Main]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-main.html
[withContext]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-context.html
[CoroutineDispatcher]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-dispatcher/index.html
[CoroutineScope]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html
[runBlocking]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
[Job]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/index.html
[Job.cancel]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/cancel.html
[Job.join]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/join.html
[ensureActive]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/ensure-active.html
[CancellationException]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-cancellation-exception/index.html



[Flow]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/index.html
[_flow]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow.html
[FlowCollector.emit]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow-collector/emit.html
[collect]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/collect.html
[flowOf]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow-of.html
[map]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/map.html
[filter]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/filter.html
[transform]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/transform.html
[take]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/take.html
[toList]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/to-list.html
[toSet]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/to-set.html
[first]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/first.html
[single]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/single.html
[reduce]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/reduce.html
[fold]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/fold.html
[flowOn]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow-on.html
[buffer]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/buffer.html
[conflate]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/conflate.html
[collectLatest]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/collect-latest.html
[zip]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/zip.html
[combine]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/combine.html
[onEach]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-each.html
[flatMapConcat]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-concat.html
[flattenConcat]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flatten-concat.html
[flatMapMerge]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-merge.html
[flattenMerge]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flatten-merge.html
[DEFAULT_CONCURRENCY]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-d-e-f-a-u-l-t_-c-o-n-c-u-r-r-e-n-c-y.html
[flatMapLatest]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-latest.html
[catch]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/catch.html
[onCompletion]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-completion.html
[launchIn]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/launch-in.html
[IntRange.asFlow]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/as-flow.html
[cancellable]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/cancellable.html