A curious individual

Kotlin Coroutine Notes

An ongoing collection of notes about coroutines.


Concurrency

concurrent vs parallel

Coroutines

A Kotlin coroutine is a “suspendable computation”.

  • This is a language-level concept: the operating system scheduler has no idea coroutines exist.
  • Most of the time there’s an M:N mapping of coroutines to system threads.
  • Different coroutine dispatchers use different thread pools.
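To make the dispatcher point concrete, here is a minimal sketch that records which thread runs a block under different dispatchers. The function name threadNamesByDispatcher and the thread name "notes-worker" are made up for this illustration, and the exact thread names printed depend on the runtime.

```kotlin
import java.util.concurrent.Executors
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Returns the thread names seen by (the caller, Dispatchers.Default, a custom dispatcher).
fun threadNamesByDispatcher(): Triple<String, String, String> {
    // Hypothetical single-thread pool; "notes-worker" is an illustrative name.
    val custom = Executors
        .newSingleThreadExecutor { runnable -> Thread(runnable, "notes-worker") }
        .asCoroutineDispatcher()

    // use {} closes the dispatcher (and shuts down its executor) when done.
    return custom.use { dispatcher ->
        runBlocking {
            val caller = Thread.currentThread().name
            val onDefault = withContext(Dispatchers.Default) { Thread.currentThread().name }
            val onCustom = withContext(dispatcher) { Thread.currentThread().name }
            Triple(caller, onDefault, onCustom)
        }
    }
}

fun main() {
    val (caller, onDefault, onCustom) = threadNamesByDispatcher()
    println("runBlocking:         $caller")
    println("Dispatchers.Default: $onDefault")
    println("custom dispatcher:   $onCustom")
}
```

The same suspending code hops between threads purely through the dispatcher in its context, which is why the operating system never needs to know about the coroutine itself.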

Model

Each coroutine is created inside a CoroutineScope. Multiple coroutines can be created inside the same scope. The CoroutineScope’s main purpose is to act as the receiver for coroutine builder functions like launch and async and to delimit the lifetime of the coroutines they start.

The scope also acts as a wrapper around other data structures related to the coroutines inside it such as the CoroutineContext.

The CoroutineContext is a kind of map that uses object types as keys to hold properties related to the scope it’s in. In general code, the only property that should be accessed is the Job.
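The map-like lookup can be sketched as follows. This is only an illustration: describeContext and the names "demo" and "child" are invented here, while CoroutineName and Job are the standard context elements from kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.coroutines.CoroutineContext

fun describeContext(): List<String> = runBlocking {
    val lines = mutableListOf<String>()

    // Elements are combined with + and looked up by their type's companion key.
    val standalone: CoroutineContext = Dispatchers.Default + CoroutineName("demo")
    lines += "name = ${standalone[CoroutineName]?.name}"
    lines += "has job = ${standalone[Job] != null}" // no Job was added by hand

    // A coroutine builder adds a Job for the new coroutine to its context.
    launch(CoroutineName("child")) {
        lines += "child name = ${coroutineContext[CoroutineName]?.name}"
        lines += "child has job = ${coroutineContext[Job] != null}"
    }.join()

    lines
}

fun main() {
    describeContext().forEach(::println)
}
```

Looking up a key that is absent returns null rather than throwing, which is why the standalone context reports no Job.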

Jobs are used for structured concurrency. They handle the lifecycle state of the coroutine. Jobs can be started lazily, which allows creating the coroutine upfront but only starting it when needed. Jobs can also be canceled.
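A lazily started job might look like the sketch below. The function name lazyStartDemo and the event strings are made up for illustration; CoroutineStart.LAZY, start() and join() are the library calls being shown.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

fun lazyStartDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()

    // The coroutine is created here but its body does not run yet.
    val job = launch(start = CoroutineStart.LAZY) {
        events += "body ran"
    }

    yield() // let any scheduled coroutines run; the lazy one is not scheduled
    events += "active before start: ${job.isActive}"

    job.start() // schedules the body
    job.join()  // waits for it to finish
    events += "completed after join: ${job.isCompleted}"

    events
}

fun main() {
    lazyStartDemo().forEach(::println)
}
```

One thing to keep in mind with this pattern: a lazy child that is never started or canceled keeps its parent scope from completing.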

Canceling Jobs

When a job is canceled, all of its children are also canceled. This code creates a tree of 3 coroutines with one parent and two children.

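import kotlinx.coroutines.*
import kotlin.time.Duration.Companion.seconds

fun main() = runBlocking<Unit> {
    // Parent coroutine with two children
    val job1 = launch {
        println("starting 1")

        launch {
            println("starting 1.1")
            delay(3.seconds)
            println("done 1.1") // never printed: canceled after 2 seconds
        }
        launch {
            println("starting 1.2")
            delay(3.seconds)
            println("done 1.2") // never printed: canceled after 2 seconds
        }

        delay(1.seconds)
        println("done 1")
    }

    delay(2.seconds)
    // The parent's body has finished, but it is still waiting for its children
    println("job 1 active: ${job1.isActive}")
    job1.cancelAndJoin()
}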

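Even though the body of the parent (job1) finishes after one second, the job is still active while it waits for its children to complete. When job1 is canceled at the two-second mark, both children are canceled as well, which completes the coroutine before either child prints its "done" line.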

The output is:

starting 1
starting 1.1
starting 1.2
done 1
job 1 active: true

Shared mutable state

Kotlin doc

Coroutines capture their context, but just like threads, variables declared outside the coroutine are shared, and access to them must be synchronized.

In the following code, internalData is confined to the coroutine started by launch and is therefore thread-safe, but externalData is not. Access to externalData would require some form of synchronization.

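import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Top-level state: reachable from any coroutine or thread
var externalData = 0

fun main() = runBlocking<Unit> {
    launch {
        // Local to this coroutine: nothing else can reach it
        var internalData = 0
    }
}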
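One way to synchronize access to shared state is a Mutex from kotlinx.coroutines.sync. The sketch below is only an illustration: sharedCounter, counterMutex and incrementConcurrently are invented names, and a Mutex is just one of several options (atomics or confining the state to a single thread are others).

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext

// Shared, top-level state (hypothetical stand-in for externalData)
var sharedCounter = 0
val counterMutex = Mutex()

fun incrementConcurrently(coroutines: Int, incrementsEach: Int): Int = runBlocking {
    sharedCounter = 0
    // Dispatchers.Default is multi-threaded, so the increments really do race.
    withContext(Dispatchers.Default) {
        repeat(coroutines) {
            launch {
                repeat(incrementsEach) {
                    // withLock suspends (rather than blocks) while the lock is held elsewhere
                    counterMutex.withLock { sharedCounter++ }
                }
            }
        }
    } // withContext returns only after all launched children have finished
    sharedCounter
}

fun main() {
    println(incrementConcurrently(coroutines = 100, incrementsEach = 1_000)) // prints 100000
}
```

Without the withLock call the final count would typically fall short of 100000, because sharedCounter++ is a read-modify-write sequence rather than a single atomic step.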

Exception Handling

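import kotlinx.coroutines.*
import kotlin.time.Duration.Companion.seconds

fun main() = runBlocking<Unit> {
    // coroutineScope rethrows the failure of any child; runCatching captures it
    runCatching {
        coroutineScope {
            launch {
                val proc1 = async {
                    delay(1.seconds)
                    println("done task 1.1")
                    1
                }
                val proc2 = async {
                    delay(2.seconds)
                    // The failure cancels the parent launch and, through the
                    // scope, every sibling coroutine as well
                    throw Exception("boom!")

                    //never reached
                    println("done task 1.2")
                    2
                }

                // not the same as `this.map { it.await() }`
                val (result1, result2) = listOf(proc1, proc2).awaitAll()
                println("result is: ${result1 + result2}") // never reached
            }

            launch {
                delay(3.seconds)
                println("done task 2") // never printed: canceled at 2 seconds
            }
        }
    }
    // Prints "done task 1.1" and then "done"
    println("done")
}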
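The comment about awaitAll in the code above can be illustrated with a small sketch. It uses CompletableDeferred values instead of real work so the outcome does not depend on timing: one deferred never completes and the other has already failed. BoomException, compareAwaitStyles and the 200 ms timeout are assumptions made for this example.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical exception type used only by this example
class BoomException : Exception("boom!")

fun compareAwaitStyles(): Pair<String, String> = runBlocking {
    val pending = CompletableDeferred<Int>() // never completed
    val failed = CompletableDeferred<Int>().apply { completeExceptionally(BoomException()) }
    val deferreds = listOf(pending, failed)

    // awaitAll fails as soon as any deferred fails, regardless of list order.
    val viaAwaitAll = withTimeoutOrNull(200) {
        try {
            deferreds.awaitAll()
            "completed"
        } catch (e: BoomException) {
            "failed"
        }
    } ?: "timed out"

    // map { await() } waits on the first deferred before it ever looks at the second.
    val viaMap = withTimeoutOrNull(200) {
        try {
            deferreds.map { it.await() }
            "completed"
        } catch (e: BoomException) {
            "failed"
        }
    } ?: "timed out"

    viaAwaitAll to viaMap
}

fun main() {
    println(compareAwaitStyles()) // prints (failed, timed out)
}
```

The sequential version only notices a failure when it reaches the failing deferred, which is why it is still stuck on the pending one when the timeout fires.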

Flows

TODO

Channels

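Channels are like a BlockingQueue in that messages can be put onto a channel, and the channel's configuration (for example, its capacity) determines what happens to a send when no receiver is ready. Unlike a BlockingQueue, sending and receiving suspend the coroutine instead of blocking the thread.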

Channels are used for communicating between coroutines in an asynchronous, thread-safe way.

A channel carries a single message type. The type of data passed on it must be declared when the channel is created, but subtypes of that type can also be sent.

import kotlinx.coroutines.channels.Channel

data class Message(val id: Int, val message: String)

// create a rendezvous channel (the default capacity)
val channel = Channel<Message>()
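How the configuration changes send behavior can be sketched with the non-suspending trySend, which reports whether the channel accepted the element immediately. The function name capacityDemo and the output strings are made up for this illustration.

```kotlin
import kotlinx.coroutines.channels.Channel

fun capacityDemo(): List<String> {
    val results = mutableListOf<String>()

    // Rendezvous: no buffer, so a send only succeeds when a receiver is waiting.
    val rendezvous = Channel<Int>()
    results += "rendezvous accepted: ${rendezvous.trySend(1).isSuccess}"

    // Buffered: accepts elements until the buffer is full.
    val buffered = Channel<Int>(capacity = 2)
    val accepted = (1..3).map { buffered.trySend(it).isSuccess }
    results += "buffered(2) accepted: $accepted"

    // Conflated: always accepts, but keeps only the most recent element.
    val conflated = Channel<Int>(Channel.CONFLATED)
    (1..3).forEach { conflated.trySend(it) }
    results += "conflated kept: ${conflated.tryReceive().getOrNull()}"

    return results
}

fun main() {
    capacityDemo().forEach(::println)
}
```

With the suspending send, the cases where trySend returns a failure would instead suspend the sender until a receiver or buffer slot becomes available.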

Channels can be used in normal 1:1, fan-in, and fan-out models.

Any read operation on a channel consumes the message it receives, so each message is delivered to only one receiver. This means two separate channels are needed for bidirectional communication between coroutines.

Reading from a channel can be done in a normal for loop. When done this way, closing the channel ends the loop automatically once any remaining messages have been received.

for (message in channel) {
    // process message
}
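Putting the for loop together with a fan-out arrangement, a minimal sketch could look like this. The names fanOutDemo, "worker-1" and "worker-2" are hypothetical, and how the messages are split between the two workers is not something the example relies on.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun fanOutDemo(): Map<String, List<Int>> = runBlocking {
    val channel = Channel<Int>()
    val received = mutableMapOf<String, MutableList<Int>>(
        "worker-1" to mutableListOf(),
        "worker-2" to mutableListOf()
    )

    // Two consumers read from the same channel; each message goes to exactly one of them.
    val workers = received.keys.map { name ->
        launch {
            for (message in channel) {
                received.getValue(name).add(message)
            }
            // Reaching this point means the channel was closed and drained.
        }
    }

    // Single producer: send six messages, then close to end the consumers' loops.
    launch {
        for (i in 1..6) channel.send(i)
        channel.close()
    }

    workers.joinAll()
    received
}

fun main() {
    println(fanOutDemo())
}
```

Everything here runs on the single runBlocking thread, so the plain mutable map is safe; on a multi-threaded dispatcher the shared lists would need synchronization.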

Testing

Use runTest when creating unit tests for coroutines. This behaves like runBlocking except that it will skip delays.

Use StandardTestDispatcher in combination with TestCoroutineScheduler to override uses of custom dispatchers in tested code. This will order coroutine execution in a known way and allow stepping through time.

Use advanceUntilIdle to move the virtual time forward until there are no more tasks queued in the scheduler.
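These pieces can be combined as in the sketch below, which assumes the kotlinx-coroutines-test artifact is on the classpath. The function virtualTimeDemo and the log strings are invented for illustration; in a real project the body of runTest would sit inside a test method of whichever test framework is in use.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.currentTime
import kotlinx.coroutines.test.runTest

@OptIn(ExperimentalCoroutinesApi::class)
fun virtualTimeDemo(): List<String> {
    val log = mutableListOf<String>()
    runTest {
        // Sharing testScheduler ties this dispatcher to the test's virtual clock.
        val dispatcher = StandardTestDispatcher(testScheduler)

        // Stand-in for code under test that would normally use its own dispatcher.
        launch(dispatcher) {
            delay(5_000)
            log += "worker finished"
        }

        log += "before advancing: $currentTime ms"
        advanceUntilIdle() // runs queued tasks, skipping the 5 second delay
        log += "after advancing: $currentTime ms"
    }
    return log
}

fun main() {
    virtualTimeDemo().forEach(::println)
}
```

The five-second delay costs no real time: the scheduler jumps its virtual clock forward to the next queued task instead of waiting.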