Concurrency
Coroutines
A Kotlin coroutine is a “suspendable computation”.
- This is a language level concept - the operating system scheduler has no idea coroutines exist.
- Most of the time there’s an M:N mapping of coroutines to system threads.
- Different coroutine dispatchers use different thread pools.
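The thread a coroutine runs on can be observed directly. The following is a minimal sketch, assuming a JVM target with kotlinx.coroutines on the classpath; threadNameOn is a hypothetical helper, not part of the library. Thread names depend on the runtime, so no output is shown.

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical helper: runs on the given dispatcher and reports the thread used.
suspend fun threadNameOn(dispatcher: CoroutineDispatcher): String =
    withContext(dispatcher) { Thread.currentThread().name }

fun main() = runBlocking {
    // runBlocking runs its coroutine on the calling thread
    println("runBlocking:         ${Thread.currentThread().name}")
    // Dispatchers.Default hands the work to a thread from its own pool
    println("Dispatchers.Default: ${threadNameOn(Dispatchers.Default)}")
}
```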
Each coroutine is created inside a CoroutineScope. Multiple coroutines can be created inside the same scope. The CoroutineScope’s main purpose is to provide coroutine builder functions like launch and async and to delimit the lifetime of the coroutines created with them.
The scope also acts as a wrapper around other data structures related to the coroutines inside it, such as the CoroutineContext.
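As a rough illustration of a scope delimiting lifetimes, the sketch below uses the coroutineScope builder, which only returns once every coroutine started inside it has finished. The function name sumOfTwo and the delays are made up for the example.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.time.Duration.Companion.milliseconds

// Hypothetical function: starts three coroutines in one scope and
// returns only after all of them have completed.
suspend fun sumOfTwo(): Int = coroutineScope {
    // launch: fire-and-forget, returns a Job
    launch {
        delay(50.milliseconds)
        println("side task done")
    }
    // async: returns a Deferred that carries a result
    val first = async { delay(100.milliseconds); 1 }
    val second = async { delay(100.milliseconds); 2 }
    first.await() + second.await()
}

fun main() = runBlocking {
    println("sum: ${sumOfTwo()}")
    // prints "side task done", then "sum: 3"
}
```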
The CoroutineContext is a kind of map that uses element types as keys to hold properties related to the scope it’s in. In general code, the only property that should be accessed is the Job.
Jobs are used for structured concurrency. They handle the lifecycle state of the coroutine. A Job can be started lazily, which allows creating the coroutine up front but only starting it when needed. Jobs can also be canceled.
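A lazily started Job can be sketched as follows. This assumes the function is called from runBlocking, so everything runs on a single thread and the coroutine body cannot run until the caller suspends. lazyJobStates is a hypothetical name.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Job
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns the job's state before start, after start,
// and after completion.
suspend fun lazyJobStates(): List<Boolean> = coroutineScope {
    val job = launch(start = CoroutineStart.LAZY) {
        // The Job is looked up in the context with the Job type as the key.
        println("running, Job in context: ${coroutineContext[Job] != null}")
    }
    val activeBeforeStart = job.isActive // created but not started
    job.start()
    val activeAfterStart = job.isActive  // started, body not yet finished
    job.join()                           // suspend until the body has run
    listOf(activeBeforeStart, activeAfterStart, job.isCompleted)
}

fun main() = runBlocking {
    println(lazyJobStates())
    // prints "running, Job in context: true", then "[false, true, true]"
}
```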
Canceling Jobs
When a job is canceled, all of its children are also canceled. This code creates a tree of 3 coroutines with one parent and two children.
import kotlinx.coroutines.*
import kotlin.time.Duration.Companion.seconds

fun main() = runBlocking<Unit> {
    val job1 = launch {
        println("starting 1")
        launch {
            println("starting 1.1")
            delay(3.seconds)
            println("done 1.1")
        }
        launch {
            println("starting 1.2")
            delay(3.seconds)
            println("done 1.2")
        }
        delay(1.seconds)
        println("done 1")
    }
    delay(2.seconds)
    println("job 1 active: ${job1.isActive}")
    job1.cancelAndJoin()
}
Even though the body of the parent (job1) has finished, the job is still active while it waits for its children to complete. When job1 is canceled the children are canceled as well, which completes the coroutine.
The output is:
starting 1
starting 1.1
starting 1.2
done 1
job 1 active: true
Shared mutable state
Coroutines capture their context but, just like threads, variables outside of the scope are shared and access to them must be synchronized.
In the following code internalData is thread-safe for the wrapped coroutine (inside launch) but externalData is not. Access to externalData would require some form of synchronization.
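import kotlinx.coroutines.*

// visible to every coroutine (and thread)
var externalData = 0

fun main() = runBlocking<Unit> {
    launch {
        // local to this coroutine
        var internalData = 0
    }
}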
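One possible form of synchronization is a Mutex from kotlinx.coroutines.sync, which suspends waiting coroutines instead of blocking their threads. The sketch below is an illustration and not necessarily what these notes had in mind; externalDataLock and incrementManyTimes are hypothetical names.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

var externalData = 0
val externalDataLock = Mutex() // hypothetical name: guards externalData

// Starts `coroutines` coroutines on a multi-threaded dispatcher, each
// incrementing the shared counter `timesEach` times under the lock.
suspend fun incrementManyTimes(coroutines: Int, timesEach: Int) = coroutineScope {
    repeat(coroutines) {
        launch(Dispatchers.Default) {
            repeat(timesEach) {
                externalDataLock.withLock { externalData++ }
            }
        }
    }
}

fun main() = runBlocking {
    incrementManyTimes(coroutines = 100, timesEach = 1_000)
    println("externalData = $externalData") // externalData = 100000
}
```

Without the lock the final value would typically be lower, because concurrent increments can overwrite each other.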
Exception Handling
import kotlinx.coroutines.*
import kotlin.time.Duration.Companion.seconds

fun main() = runBlocking<Unit> {
    // the exception rethrown by coroutineScope is captured here
    runCatching {
        coroutineScope {
            launch {
                val proc1 = async {
                    delay(1.seconds)
                    println("done task 1.1")
                    1
                }
                val proc2 = async {
                    delay(2.seconds)
                    throw Exception("boom!")
                    //never reached
                    println("done task 1.2")
                    2
                }
                // not the same as `this.map { it.await() }`
                val (result1, result2) = listOf(proc1, proc2).awaitAll()
                println("result is: ${result1 + result2}")
            }
            // canceled when proc2 fails, so "done task 2" is never printed
            launch {
                delay(3.seconds)
                println("done task 2")
            }
        }
    }
    println("done")
}
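The same propagation can be shown in a smaller form: a failing child cancels the enclosing coroutineScope and its siblings, and the scope rethrows the exception to its caller. This is a condensed sketch of that behavior; runSiblings and the events list are hypothetical and exist only to make the outcome observable.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.time.Duration.Companion.milliseconds

// Hypothetical helper: runs two sibling tasks and records what happened.
suspend fun runSiblings(events: MutableList<String>): Result<Unit> = runCatching {
    coroutineScope<Unit> {
        launch {
            delay(50.milliseconds)
            events += "task 1 failing"
            throw IllegalStateException("boom!")
        }
        launch {
            delay(500.milliseconds)
            events += "task 2 done" // never reached: canceled when task 1 fails
        }
    }
}

fun main() = runBlocking {
    val events = mutableListOf<String>()
    val result = runSiblings(events)
    println(events)                            // [task 1 failing]
    println(result.exceptionOrNull()?.message) // boom!
}
```

Note that runCatching catches every Throwable, including CancellationException, so wrapping suspending code in it can also swallow a cancellation of the calling coroutine.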
Flows
TODO
Channels
Channels are like a BlockingQueue in that a message can be put onto a channel and, depending on the channel configuration, the send will:
- suspend until there’s a receiver actively polling for new messages (RENDEZVOUS)
- suspend until there’s capacity in the channel (BUFFERED)
- always succeed (CONFLATED or UNLIMITED)
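The configurations can be compared with trySend, the non-suspending variant of send, which reports whether the message was accepted immediately. The sketch below uses an explicit capacity of 2 in place of BUFFERED (which uses a larger default capacity) to keep the example short.

```kotlin
import kotlinx.coroutines.channels.Channel

fun main() {
    // RENDEZVOUS: no buffer, so sending fails while no receiver is waiting
    val rendezvous = Channel<Int>(Channel.RENDEZVOUS)
    println(rendezvous.trySend(1).isSuccess) // false

    // Buffered with room for two messages (assumed capacity for the example)
    val buffered = Channel<Int>(capacity = 2)
    println(buffered.trySend(1).isSuccess)   // true
    println(buffered.trySend(2).isSuccess)   // true
    println(buffered.trySend(3).isSuccess)   // false: the buffer is full

    // CONFLATED: always accepts, keeping only the most recent message
    val conflated = Channel<Int>(Channel.CONFLATED)
    conflated.trySend(1)
    conflated.trySend(2)
    println(conflated.tryReceive().getOrNull()) // 2

    // UNLIMITED: always accepts, the buffer grows as needed
    val unlimited = Channel<Int>(Channel.UNLIMITED)
    println((1..1_000).all { unlimited.trySend(it).isSuccess }) // true
}
```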
Channels are used for communicating between coroutines in an asynchronous, thread-safe way.
A channel carries a single message type: the type of data passed on it must be declared when the channel is created, although subtypes of that type can be sent as well.
import kotlinx.coroutines.channels.Channel

data class Message(val id: Int, val message: String)
// create a rendezvous channel
val channel = Channel<Message>()
Channels can be used in normal 1:1, fan-in, and fan-out models.
Any read operation on a channel consumes the message, so two separate channels are needed for bidirectional communication between coroutines.
Reading from a channel can be done in a normal for loop. When done this way, closing the channel will automatically exit the loop once the remaining messages have been received.
for (message in channel) {
    // process message
}
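Putting the pieces together, a simple 1:1 producer and consumer over a rendezvous channel might look like the sketch below. collectMessages is a hypothetical function written for this example.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

data class Message(val id: Int, val message: String)

// Hypothetical helper: one coroutine sends `count` messages while the
// calling coroutine receives them until the channel is closed.
suspend fun collectMessages(count: Int): List<Message> = coroutineScope {
    val channel = Channel<Message>() // rendezvous channel

    // Producer: each send suspends until the consumer is ready to receive.
    launch {
        repeat(count) { i -> channel.send(Message(i, "message $i")) }
        channel.close() // lets the consumer's for loop finish
    }

    // Consumer: the loop exits once the channel is closed and drained.
    val received = mutableListOf<Message>()
    for (message in channel) {
        received += message
    }
    received
}

fun main() = runBlocking {
    println(collectMessages(2))
    // [Message(id=0, message=message 0), Message(id=1, message=message 1)]
}
```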
Testing
Use runTest when creating unit tests for coroutines. This behaves like runBlocking except that it will skip delays.
Use StandardTestDispatcher in combination with TestCoroutineScheduler to override uses of custom dispatchers in tested code. This will order coroutine execution in a known way and allow stepping through time.
Use advanceUntilIdle to move the virtual time forward until there are no more tasks queued in the scheduler.
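A test combining these pieces could look like the following sketch. It assumes the kotlinx-coroutines-test artifact is available. Refresher is a hypothetical class under test that receives its dispatcher as a constructor parameter, and check stands in for the assertion functions of whichever test framework is used.

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.*
import kotlin.time.Duration.Companion.seconds

// Hypothetical class under test: the dispatcher is injected so that a
// test can replace it.
class Refresher(private val dispatcher: CoroutineDispatcher) {
    var refreshed = false
        private set

    fun refreshIn(scope: CoroutineScope) {
        scope.launch(dispatcher) {
            delay(10.seconds)
            refreshed = true
        }
    }
}

@OptIn(ExperimentalCoroutinesApi::class)
fun refresherTest() = runTest {
    // Share the test's scheduler so both dispatchers use the same virtual clock.
    val refresher = Refresher(StandardTestDispatcher(testScheduler))
    refresher.refreshIn(this)

    check(!refresher.refreshed)   // nothing runs until the scheduler is advanced
    advanceUntilIdle()            // runs queued tasks, skipping the 10 s delay
    check(refresher.refreshed)
    check(currentTime == 10_000L) // virtual time in milliseconds
}
```

In a real test suite refresherTest would carry the framework's test annotation.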