Coroutines in Kotlin

Kotlin 1.1 introduced coroutines, a new way of writing asynchronous, non-blocking code (and much more). One can think of a coroutine as a light-weight thread. Like threads, coroutines can run in parallel, wait for each other and communicate.

How to use coroutines in Kotlin?

kotlinx.coroutines is a rich library for coroutines developed by JetBrains. It contains a number of high-level coroutine-enabled primitives that this guide covers, including launch, async and runBlocking, as well as helpers and wrappers for existing Java libraries. To use it, simply add the dependency to your build.gradle file:

dependencies {
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.4"
}
How did coroutines come into existence?

For decades, as developers, we have been confronted with a problem to solve: how to prevent our applications from blocking. Whether we're developing desktop, mobile, or even server-side applications, we want to avoid having the user wait or, worse, causing bottlenecks that would prevent an application from scaling.

There have been many approaches to solving this problem, including:

  • Threading
  • Callbacks
  • Futures, Promises et al.
  • Coroutines

Let's look at each of these approaches briefly.

Threading

Threads are by far the most well-known approach to keeping applications from blocking.

fun postItem(item: Item) {
    val token = preparePost()
    val post = submitPost(token, item)
    processPost(post)
}

fun preparePost(): Token {
    // makes a request and consequently blocks the main thread
    return token
}

Let's assume in the code above that preparePost is a long-running process that would consequently block the user interface. What we can do is launch it in a separate thread, which would keep the UI from blocking; a sketch of this approach follows the list of drawbacks below. This is a very common technique, but it has a series of drawbacks:

  • Threads aren't cheap. Threads require context switches, which are costly.
  • Threads aren't infinite. The number of threads that can be launched is limited by the underlying operating system. In server-side applications, this could cause a major bottleneck.
  • Threads aren't always available. Some platforms, such as JavaScript, do not even support threads.
  • Threads aren't easy. Debugging threads and avoiding race conditions are common problems we suffer from in multi-threaded programming.
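
For reference, here is a minimal sketch of the threading approach using kotlin.concurrent.thread. The Item, Token and Post types and the post-related functions are hypothetical placeholders standing in for the blocking calls above:

import kotlin.concurrent.thread

// Hypothetical placeholder types and blocking calls, matching the snippet above
class Item
class Token
class Post

fun preparePost(): Token = Token()                      // assume this blocks on a network request
fun submitPost(token: Token, item: Item): Post = Post()
fun processPost(post: Post) = println("Post processed")

fun postItem(item: Item) {
    // Move the blocking work off the calling (UI) thread
    thread {
        val token = preparePost()
        val post = submitPost(token, item)
        processPost(post)
    }
}
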
Callbacks

With callbacks, the idea is to pass one function as a parameter to another function, and have it invoked once the process has completed.

fun postItem(item: Item) {
    preparePostAsync { token -> 
        submitPostAsync(token, item) { post -> 
            processPost(post)
        }
    }
}

fun preparePostAsync(callback: (Token) -> Unit) {
    // make request and return immediately 
    // arrange callback to be invoked later
}

This in principle feels like a much more elegant solution, but once again has several issues:

  • Difficulty of nested callbacks. A function that is used as a callback often ends up needing its own callback. This leads to a series of nested callbacks, which results in incomprehensible code.
  • Error handling is complicated. The nesting model makes error handling and error propagation somewhat more complicated.

Callbacks are quite common in event-loop architectures such as JavaScript, but even there people have generally moved away from them in favor of other approaches such as promises or reactive extensions.

Futures, Promises

The idea behind futures or promises (other terms may be used depending on the language/platform) is that when we make a call, we're promised that at some point it will return an object called a Promise, which can then be operated on.

fun postItem(item: Item) {
    preparePostAsync() 
        .thenCompose { token -> 
            submitPostAsync(token, item)
        }
        .thenAccept { post -> 
            processPost(post)
        }
         
}

fun preparePostAsync(): Promise {
    // makes a request and returns a promise that is completed later
    return promise 
}

This approach requires a series of changes in how we program, in particular:

  • Different programming model. Similar to callbacks, the programming model moves away from a top-down imperative approach to a compositional model with chained calls. Traditional program structures such as loops and exception handling are usually no longer valid in this model.
  • Different APIs. Usually there's a need to learn a completely new API such as thenCompose or thenAccept (a sketch using Java's CompletableFuture follows this list), which can also vary across platforms.
  • Specific return type. The return type moves away from the actual data that we need and instead returns a new type, Promise, which has to be introspected.
  • Error handling can be complicated. The propagation and chaining of errors aren't always straightforward.
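
Here is a minimal sketch of the same flow using Java's CompletableFuture, which provides the thenCompose and thenAccept calls mentioned above. The domain types and post-related functions are hypothetical placeholders:

import java.util.concurrent.CompletableFuture

// Hypothetical placeholder types standing in for the real domain objects
class Item
class Token
class Post

// Each call returns a future immediately; the work completes later on another thread
fun preparePostAsync(): CompletableFuture<Token> =
    CompletableFuture.supplyAsync { Token() }

fun submitPostAsync(token: Token, item: Item): CompletableFuture<Post> =
    CompletableFuture.supplyAsync { Post() }

fun processPost(post: Post) = println("Post processed")

fun postItem(item: Item) {
    preparePostAsync()
        .thenCompose { token -> submitPostAsync(token, item) }  // chain the dependent call
        .thenAccept { post -> processPost(post) }               // consume the final result
}
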
Implementation of coroutines

As mentioned above, one can think of a coroutine as a light-weight thread. The biggest difference is that coroutines are very cheap, almost free: we can create thousands of them and pay very little in terms of performance.

True threads, on the other hand, are expensive to start and keep around. A thousand threads can be a serious challenge for a modern machine.

Let's use the launch {} function:

launch {
    ...
}

This starts a new coroutine. By default, coroutines are run on a shared pool of threads. Threads still exist in a program based on coroutines, but one thread can run many coroutines, so there's no need for too many threads.

Let's look at a full program that uses launch:

println("Start")

// Start a coroutine
GlobalScope.launch {
    delay(1000)
    println("Hello")
}

Thread.sleep(2000) // wait for 2 seconds
println("Stop")

Here we start a coroutine that waits for 1 second and prints Hello.

We are using the delay() function that's like Thread.sleep(), but better: it doesn't block a thread, but only suspends the coroutine itself.

The thread is returned to the pool while the coroutine is waiting, and when the waiting is done, the coroutine resumes on a free thread in the pool.

The main thread (that runs the main() function) must wait until our coroutine completes, otherwise the program ends before Hello is printed.
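
To observe the pool at work, a small sketch (assuming only the kotlinx.coroutines dependency above) can print the thread name before and after the delay; since the thread is released during the suspension, the coroutine may resume on a different pool thread:

import kotlinx.coroutines.*

fun main() {
    GlobalScope.launch {
        println("Before delay: ${Thread.currentThread().name}")
        delay(1000) // suspends the coroutine and releases its thread back to the pool
        println("After delay:  ${Thread.currentThread().name}")
    }
    Thread.sleep(2000) // keep the main thread alive, as in the example above
}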

Let's use the runBlocking {} function:

If we try to use the same non-blocking delay() function directly inside main(), we'll get a compiler error:

Suspend functions are only allowed to be called from a coroutine or another suspend function

This is because we are not inside any coroutine. We can use delay() if we wrap it in runBlocking {}, which starts a coroutine and blocks until it's done:

runBlocking {
    delay(2000)
}

So, first the resulting program prints Start, then it runs a coroutine through launch {}, then it runs another one through runBlocking {} and blocks until it's done, then prints Stop. Meanwhile the first coroutine completes and prints Hello. Just like threads.
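
Putting the pieces together, the full program described above looks roughly like this:

import kotlinx.coroutines.*

fun main() {
    println("Start")

    // Start a coroutine that suspends for 1 second and then prints Hello
    GlobalScope.launch {
        delay(1000)
        println("Hello")
    }

    // Block the main thread for 2 seconds so the coroutine has time to finish
    runBlocking {
        delay(2000)
    }

    println("Stop")
}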

Async: returning a value from a coroutine

Another way of starting a coroutine is async {}. It is like launch {}, but returns an instance of Deferred, which has an await() function that returns the result of the coroutine. Deferred is a very basic future (fully-fledged JDK futures are also supported, but here we'll confine ourselves to Deferred for now).

Let's create a million coroutines, keeping their Deferred objects. There's no need for any shared counter, as we can just return the numbers to be added from our coroutines:

val deferred = (1..1_000_000).map { n ->
    GlobalScope.async {
        n
    }
}

All of these have already started; all we need to do is collect the results:

val sum = deferred.map { it.await().toLong() }.sum()

We simply take every coroutine and await its result here; then all results are added together by the standard library function sum(). But the compiler rightfully complains:

Suspend functions are only allowed to be called from a coroutine or another suspend function

await() cannot be called outside a coroutine, because it needs to suspend until the computation finishes, and only coroutines can suspend in a non-blocking way. So let's put this inside a coroutine:

runBlocking {
    val sum = deferred.map { it.await().toLong() }.sum()
    println("Sum: $sum")
}

Now it prints something sensible: 500000500000, because all coroutines complete. (The sum of the numbers from 1 to 1,000,000 is 1,000,000 × 1,000,001 / 2 = 500,000,500,000.)

Let's also make sure that our coroutines actually run in parallel. If we add a 1-second delay() to each of the async blocks, the resulting program won't run for 1,000,000 seconds (over 11.5 days):

val deferred = (1..1_000_000).map { n ->
    GlobalScope.async {
        delay(1000)
        n
    }
}

This takes about 10 seconds on my machine, so yes, coroutines do run in parallel.
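
To measure this yourself, a minimal sketch that wraps the code above in the standard library's measureTimeMillis could look like this:

import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

fun main() {
    val elapsed = measureTimeMillis {
        runBlocking {
            // A million coroutines, each suspending for a second before producing its number
            val deferred = (1..1_000_000).map { n ->
                GlobalScope.async {
                    delay(1000)
                    n
                }
            }
            val sum = deferred.map { it.await().toLong() }.sum()
            println("Sum: $sum")
        }
    }
    println("Took $elapsed ms") // far less than 1,000,000 seconds
}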

Suspending functions

Now, if we want to extract our workload (which is "wait 1 second and return a number") into a separate function:

fun workload(n: Int): Int {
    delay(1000)
    return n
}

A familiar error pops up:

Suspend functions are only allowed to be called from a coroutine or another suspend function

Let's dig a little into what it means. The biggest merit of coroutines is that they can suspend without blocking a thread. The compiler has to emit some special code to make this possible, so we have to mark functions that may suspend explicitly in the code. We use the suspend modifier for it:

suspend fun workload(n: Int): Int {
    delay(1000)
    return n
}

Now when we call workload() from a coroutine, the compiler knows that it may suspend and will prepare accordingly:

GlobalScope.async {
    workload(n)
}

Our workload() function can be called from a coroutine (or another suspending function), but cannot be called from outside a coroutine. Naturally, delay() and await() that we used above are themselves declared as suspend, and this is why we had to put them inside runBlocking {}, launch {} or async {}.
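
Putting it all together, a complete version of the million-coroutine sum using the extracted suspending function might look like this:

import kotlinx.coroutines.*

suspend fun workload(n: Int): Int {
    delay(1000) // suspends without blocking a thread
    return n
}

fun main() = runBlocking {
    // Each coroutine calls the suspending workload() and returns its result
    val deferred = (1..1_000_000).map { n ->
        GlobalScope.async {
            workload(n)
        }
    }
    val sum = deferred.map { it.await().toLong() }.sum()
    println("Sum: $sum")
}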