Rx to Coroutines Concepts, Part 2.1: Exceptions

Part 1, Async vs. Reactive, Part 2, Structured Concurrency, Part 3, Deferred & Channels, Part 4, Cold Flows, Part 5, Shared Flows

When I wrote the last entry in this series, I thought I had covered the important points about exceptions in structured concurrent coroutines code.

It turns out that’s not the case! So here we are: entry 2.1 in the series. There won’t be much talk of RxJava in this post. Just talk about exceptions. I’ll start with the fundamentals, show how structured concurrent control flow is built on top of those fundamentals, and then try and arrive at some best practice takeaways.

Exceptions In Bare Coroutines

Let’s say I launch a coroutine and immediately throw an exception:

launch {
  error(“I blew up!”)
}

We’ve got a coroutine that has blown up. What else should blow up when that happens?

For normal non-concurrent Java/Kotlin exception handling, we say “If a piece of code blows up, any code that depends on its successful completion also blows up.” Since Kotlin code is structured logic, that means that the exception will jump to exactly one place: to the nearest enclosing try/catch block, or up the stack to the nearest catching try/catch block.

fun functionA() {
  // Block A
  if (this) {
    // Block B
    if (that) {
      functionB()
    }
  }
}

In other words, if functionB blows up, that blows up block B, which blows up block A, which blows up functionA. It’s like a chain of firecrackers: satisfyingly logical and destructive.

For a coroutine, the same logic applies: “If a piece of code blows up, any code that depends on its successful completion also blows up.”

Unlike plain Kotlin code, though, we don’t have a simple call stack. Multiple call sites can depend on successful completion of a coroutine:

val job = launch {
  error(“I blew up!”)
}

launch {
  println(“Waiting for job to finish...”)
  job.join()
  doSomeAdditionalWork()
}

launch {
  println(“Waiting for job to finish...”)
  job.join()
  doMoreAdditionalWork()
}

A call to job.join() waits for successful completion of the job. If the job throws an exception instead of completing successfully, any call to job.join() will throw.

async and await work exactly the same way:

val loginResult = async {
  loginService.login(credentials)
}

launch {
  db.saveLoginToken(loginResult.await().token)
}
launch {
  sessionManager.startSession(loginResult.await().user)
}

Both async and launch create a Job. The only difference is that you can await on the job you start when you call async.

In either case, multiple call sites can await or join on the same coroutine, which means that multiple call sites can throw the same exception. This is different from the custody semantics we are familiar with from plain old Java, where every exception is handled by exactly one catch block.

Avoid Catching Awaits and Joins

Because there are no clear owners to exceptions thrown by coroutines, it’s a bad idea to write logic around the exceptions thrown by await() or join().

To see what I mean, take a look at the following attempt to add some analytics around failed login attempts:

launch {
  try {
    db.saveLoginToken(loginResult.await().token)
  } catch (e: Exception) {
    if (e.isFailedLoginAttempt()) {
      analytics.logFailedLoginAttempt(e)
    }
  }
}

This seems okay. But is it, really? This same exception will also be thrown here:

launch {
  sessionManager.startSession(loginResult.await().user)
}

It’s not entirely clear which await() is the right place to put this code. Some misguided soul might add the same code in both locations, or put some code in one and some in the other.

Truth be told, neither call site is the right place to “handle” the exception, because neither of them owns the exception. If you wish to “handle” the error scenario represented by the exception, that code should live within the original coroutine:

val loginResult = async {
  try {
    loginService.login(credentials)
  } catch (e: Exception) {
    if (e.isFailedLoginAttempt()) {
      analytics.logFailedLoginAttempt(e)
    }
  }
}

It’s only within the coroutine that you can maintain a structured guarantee that you’ll take action on that exception exactly once. So put that kind of exception handling within the coroutine, not outside of it.

Using the exception to indicate that the coroutine failed to complete (like by rethrowing, as above), on the other hand, is still perfectly fine:

val loginResult = async {
  try {
    loginService.login(credentials)
  } catch (e: Exception) {
    if (e.isFailedLoginAttempt()) {
      analytics.logFailedLoginAttempt(e)
    }
    throw
  }
}

However, in a moment you’ll see that you cannot expect that your caller will be able to treat this as anything other than an uncontrolled failure. Coroutines that throw exceptions tend to tear down their peers.

Exceptions In Structured Concurrency

One of the core concepts that structured concurrency gives you is a hierarchy of ownership for failed coroutines: the “job hierarchy”. That hierarchy is built by launching coroutines within scopes.

With the foundations of coroutine exception propagation in our heads, let’s take a look at how it works in structured concurrency. The login service example above needs a coroutineScope around it, so let’s add it and see what happens:

coroutineScope {
  val loginResult = async {
    loginService.login(credentials)
  }

  launch {
    db.saveLoginToken(loginResult.await().token)
  }
  launch {
    sessionManager.startSession(loginResult.await().user)
  }
}

coroutineScope is built on top of join. When control flow reaches the end of coroutineScope, it ensures that all its child coroutines have finished running the same way that you or I would: by calling join on them.

And if your child coroutine terminates with an exception? join will throw.

coroutineScope treats a join throwing as a failure of the whole scope. When this happens, it immediately cancels all the other coroutines in the scope. This means that any uncaught exception in any child coroutine terminates the whole coroutineScope.

So say that you try and handle these exceptions on the await calls as described above:

coroutineScope {
  val loginResult = async {
    loginService.login(credentials)
  }

  launch {
    try {
      db.saveLoginToken(loginResult.await().token)
    } catch (e: Exception) {
      if (e.isFailedLoginAttempt()) {
        analytics.logFailedLoginAttempt(e)
      }
    }
  }
  launch {
    sessionManager.startSession(loginResult.await().user)
  }
}

When loginService.login throws an exception, its coroutine will finish with an exception. Its containing coroutineScope will receive it when it calls join and tear down all of its child coroutines. (This will happen even if nobody calls await.)

As a result the exception handling code above isn’t guaranteed to run: if its coroutine is torn down first by the dying coroutineScope, it won’t. Even if it does run, it’s racing against the impending cancellation: if it ever yields the coroutine, it won’t ever come back.

This is the default behavior, and it’s what you should design for when you write code that launches coroutines. As a result, it is unsafe to handle exceptions thrown by join and await. If you do, plan on your code being subject to cancellation.

SupervisorScope

Okay, that covers all of the important points about coroutines exceptions handling. Now let’s talk about SupervisorScope. TL;DR: don’t use it.

supervisorScope uses a SupervisorJob as its parent job instead of a regular Job. And SupervisorJob exists to do the opposite of coroutineScope: child coroutines of a SupervisorJob will not bring down their peer coroutines when they throw an exception. So e.g. if we change to a supervisorScope in the previous example:

supervisorScope {
  val loginResult = async {
    loginService.login(credentials)
  }

  launch {
    try {
      db.saveLoginToken(loginResult.await().token)
    } catch (e: Exception) {
      if (e.isFailedLoginAttempt()) {
        analytics.logFailedLoginAttempt(e)
      }
    }
  }
  launch {
    sessionManager.startSession(loginResult.await().user)
  }
}

…the exception handling code is guaranteed to run, and the enclosing scope will not throw an exception.

Why To Avoid SupervisorJob

In newly written coroutines code, usage of SupervisorJob or supervisorScope should be avoided:

In short, SupervisorScope changes exception propagation in a way that can be unexpected, and it doesn’t accomplish anything that can’t also be accomplished with coroutineScope. So keep things consistent and stick with coroutineScope.

TL;DR Takeaways

I wanted to give you takeaways, and nearly forgot to! So here they are: