Rx to Coroutines Concepts, Part 2.1: Exceptions
Part 1: Async vs. Reactive; Part 2: Structured Concurrency; Part 3: Deferred & Channels; Part 4: Cold Flows; Part 5: Shared Flows
When I wrote the last entry in this series, I thought I had covered the important points about exceptions in structured concurrent coroutines code.
It turns out that’s not the case! So here we are: entry 2.1 in the series. There won’t be much talk of RxJava in this post, just exceptions. I’ll start with the fundamentals, show how structured concurrent control flow is built on top of those fundamentals, and then try to arrive at some best-practice takeaways.
Exceptions In Bare Coroutines
Let’s say I launch a coroutine and immediately throw an exception:
launch {
    error("I blew up!")
}
We’ve got a coroutine that has blown up. What else should blow up when that happens?
For normal, non-concurrent Java/Kotlin exception handling, we say “If a piece of code blows up, any code that depends on its successful completion also blows up.” Since Kotlin code is structured logic, the exception will jump to exactly one place: the nearest enclosing try/catch block, or, if there isn’t one, the nearest catching try/catch block up the stack.
fun functionA(isThis: Boolean, isThat: Boolean) {
    // Block A
    if (isThis) {
        // Block B
        if (isThat) {
            functionB()
        }
    }
}
In other words, if functionB blows up, that blows up block B, which blows up block A, which blows up functionA. It’s like a chain of firecrackers: satisfyingly logical and destructive.
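For contrast, here is a minimal runnable sketch of that single-owner behavior in plain Kotlin. The body of functionB, the Boolean flags, and the caller function are illustrative stand-ins rather than code from the original; the point is that exactly one catch block up the stack sees the exception:

```kotlin
// Illustrative stand-in for any call that fails.
fun functionB(): Unit = error("I blew up!")

fun functionA(isThis: Boolean, isThat: Boolean) {
    // Block A
    if (isThis) {
        // Block B
        if (isThat) {
            functionB() // the exception unwinds Block B, Block A, then functionA
        }
    }
}

// Hypothetical caller: the nearest catching try/catch up the stack owns the exception.
fun caller(): String =
    try {
        functionA(isThis = true, isThat = true)
        "completed normally"
    } catch (e: IllegalStateException) {
        "caught once in caller: ${e.message}"
    }
```

Calling caller() returns "caught once in caller: I blew up!"; nothing else in the program gets a chance to handle that exception.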
For a coroutine, the same logic applies: “If a piece of code blows up, any code that depends on its successful completion also blows up.”
Unlike plain Kotlin code, though, we don’t have a simple call stack. Multiple call sites can depend on successful completion of a coroutine:
val job = launch {
    error("I blew up!")
}
launch {
    println("Waiting for job to finish...")
    job.join()
    doSomeAdditionalWork()
}
launch {
    println("Waiting for job to finish...")
    job.join()
    doMoreAdditionalWork()
}
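A call to job.join() waits for the job to finish. If the job throws an exception instead of completing successfully, any call to job.join() will throw too. (In practice the waiter sees a CancellationException rather than the original error: the failed job cancels the parent it shares with the waiting coroutines, and that cancellation reaches them.)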
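A minimal sketch of that behavior, assuming kotlinx-coroutines-core is on the classpath; the function name and event strings are illustrative. The waiter’s join() is interrupted by the scope’s cancellation, and the scope itself then rethrows the original error:

```kotlin
import kotlinx.coroutines.*

// Sketch: a sibling suspended in join() is cancelled when the joined job fails.
fun joinAfterFailure(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    try {
        coroutineScope {
            val job = launch {
                yield() // let the waiter below start and suspend in join() first
                error("I blew up!")
            }
            launch {
                try {
                    job.join()
                    events += "join returned normally"
                } catch (e: CancellationException) {
                    events += "join threw CancellationException"
                    throw e // never swallow cancellation
                }
            }
        }
    } catch (e: IllegalStateException) {
        // coroutineScope rethrows the failed child's original exception
        events += "scope failed: ${e.message}"
    }
    events
}
```

joinAfterFailure() returns ["join threw CancellationException", "scope failed: I blew up!"].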
async and await work the same way:
val loginResult = async {
    loginService.login(credentials)
}
launch {
    db.saveLoginToken(loginResult.await().token)
}
launch {
    sessionManager.startSession(loginResult.await().user)
}
Both async and launch create a Job (async returns a Deferred, which is a Job that also carries a result). The only difference is that you can await the job you start when you call async.
In either case, multiple call sites can await or join on the same coroutine, which means that multiple call sites can throw the same exception. This is different from the custody semantics we are familiar with from plain old Java, where every exception is handled by exactly one catch block.
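A minimal sketch of that fan-out, assuming kotlinx-coroutines-core on the classpath. A CompletableDeferred completed with an exception stands in for a failed async job, so no scope teardown gets in the way; the call-site names echo the login example and are otherwise hypothetical:

```kotlin
import kotlinx.coroutines.*

// Sketch: every call site awaiting one failed Deferred receives the failure.
fun awaitFromTwoCallSites(): List<String> = runBlocking {
    // Not a child of runBlocking, so failing it does not cancel anything else.
    val loginResult = CompletableDeferred<String>()
    loginResult.completeExceptionally(IllegalStateException("I blew up!"))

    val seen = mutableListOf<String>()
    val saveToken = launch {
        try {
            loginResult.await()
        } catch (e: IllegalStateException) {
            seen += "saveToken saw: ${e.message}"
        }
    }
    val startSession = launch {
        try {
            loginResult.await()
        } catch (e: IllegalStateException) {
            seen += "startSession saw: ${e.message}"
        }
    }
    joinAll(saveToken, startSession)
    seen
}
```

Both entries in the returned list carry "I blew up!": two catch blocks, one failure.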
Avoid Catching Awaits and Joins
Because exceptions thrown by coroutines have no clear owner, it’s a bad idea to write logic around the exceptions thrown by await() or join().
To see what I mean, take a look at the following attempt to add some analytics around failed login attempts:
launch {
    try {
        db.saveLoginToken(loginResult.await().token)
    } catch (e: Exception) {
        if (e.isFailedLoginAttempt()) {
            analytics.logFailedLoginAttempt(e)
        }
    }
}
This seems okay. But is it, really? This same exception will also be thrown here:
launch {
    sessionManager.startSession(loginResult.await().user)
}
It’s not entirely clear which await() is the right place to put this code. Some misguided soul might add the same code in both locations, or put some code in one and some in the other.
Truth be told, neither call site is the right place to “handle” the exception, because neither of them owns the exception. If you wish to “handle” the error scenario represented by the exception, that code should live within the original coroutine:
val loginResult = async {
    try {
        loginService.login(credentials)
    } catch (e: Exception) {
        if (e.isFailedLoginAttempt()) {
            analytics.logFailedLoginAttempt(e)
        }
    }
}
It’s only within the coroutine that you can maintain a structured guarantee that you’ll take action on that exception exactly once. So put that kind of exception handling within the coroutine, not outside of it.
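A minimal sketch of handling inside the coroutine, assuming kotlinx-coroutines-core. LoginFailed and login() are hypothetical stand-ins for loginService.login(credentials), and a counter stands in for analytics.logFailedLoginAttempt. Because the catch lives in the async block, the failure is logged once even though two call sites await the result:

```kotlin
import kotlinx.coroutines.*

// Hypothetical stand-ins for loginService.login(credentials) and its failure type.
class LoginFailed(message: String) : Exception(message)

suspend fun login(): String {
    delay(10)
    throw LoginFailed("bad password")
}

fun handleInsideCoroutine(): Pair<Int, List<String?>> = runBlocking {
    var failedAttemptsLogged = 0 // stands in for analytics.logFailedLoginAttempt(e)
    val loginResult: Deferred<String?> = async {
        try {
            login()
        } catch (e: LoginFailed) {
            failedAttemptsLogged++
            null // a null token tells awaiters that login failed
        }
    }
    val saveToken = async { loginResult.await() }
    val startSession = async { loginResult.await() }
    // Await both call sites before reading the counter.
    val tokens = listOf(saveToken.await(), startSession.await())
    failedAttemptsLogged to tokens
}
```

handleInsideCoroutine() returns (1, [null, null]). Catching the specific LoginFailed type rather than Exception is a deliberate choice in this sketch: a catch-all would also swallow CancellationException.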
Using the exception to indicate that the coroutine failed to complete (for example, by rethrowing it after handling, as below), on the other hand, is still perfectly fine:
val loginResult = async {
    try {
        loginService.login(credentials)
    } catch (e: Exception) {
        if (e.isFailedLoginAttempt()) {
            analytics.logFailedLoginAttempt(e)
        }
        throw e // rethrow so awaiters still see the failure
    }
}
However, in a moment you’ll see that you cannot expect that your caller will be able to treat this as anything other than an uncontrolled failure. Coroutines that throw exceptions tend to tear down their peers.
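A sketch of the rethrowing variant under the same assumptions (kotlinx-coroutines-core, hypothetical LoginFailed and login(), a counter in place of analytics). The failure is still logged exactly once, but the rethrown exception fails the async job and the coroutineScope around it, so the caller only sees the scope blow up:

```kotlin
import kotlinx.coroutines.*

// Hypothetical failure type and login call standing in for loginService.login(credentials).
class LoginFailed(message: String) : Exception(message)

suspend fun login(): String {
    delay(10)
    throw LoginFailed("bad password")
}

fun logThenRethrow(): Pair<Int, String?> = runBlocking {
    var failedAttemptsLogged = 0
    val scopeError: String? = try {
        coroutineScope {
            val loginResult = async {
                try {
                    login()
                } catch (e: LoginFailed) {
                    failedAttemptsLogged++ // handled once, inside the owning coroutine
                    throw e                // ...then reported as a failure
                }
            }
            loginResult.await()
        }
        null
    } catch (e: LoginFailed) {
        e.message // the whole scope failed with the login error
    }
    failedAttemptsLogged to scopeError
}
```

logThenRethrow() returns (1, "bad password").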
Exceptions In Structured Concurrency
One of the core concepts that structured concurrency gives you is a hierarchy of ownership for failed coroutines: the “job hierarchy”. That hierarchy is built by launching coroutines within scopes.
With the foundations of coroutine exception propagation in our heads, let’s take a look at how it works in structured concurrency. The login service example above needs a coroutineScope around it, so let’s add it and see what happens:
coroutineScope {
    val loginResult = async {
        loginService.login(credentials)
    }
    launch {
        db.saveLoginToken(loginResult.await().token)
    }
    launch {
        sessionManager.startSession(loginResult.await().user)
    }
}
coroutineScope behaves as if it were built on top of join. When control flow reaches the end of coroutineScope, it waits for all its child coroutines to finish, much the same way that you or I would: by calling join on them.
And if your child coroutine terminates with an exception? coroutineScope treats that as a failure of the whole scope. As soon as the child fails, the scope cancels all the other coroutines in it, and once they have finished, coroutineScope rethrows the child’s exception. This means that any uncaught exception in any child coroutine terminates the whole coroutineScope.
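A minimal sketch of that teardown, assuming kotlinx-coroutines-core; the function name and event strings are illustrative. The failing async is never awaited, yet its failure still cancels the long-running peer and makes coroutineScope throw:

```kotlin
import kotlinx.coroutines.*

fun scopeTearsDownPeers(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    try {
        coroutineScope {
            launch {
                try {
                    delay(10_000) // long-running peer work
                    events += "peer finished"
                } catch (e: CancellationException) {
                    events += "peer cancelled"
                    throw e
                }
            }
            async<String> {
                delay(10)
                error("I blew up!") // nobody ever awaits this
            }
        }
    } catch (e: IllegalStateException) {
        events += "scope failed: ${e.message}"
    }
    events
}
```

scopeTearsDownPeers() returns ["peer cancelled", "scope failed: I blew up!"] after roughly 10 ms rather than 10 seconds.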
So say that you try to handle these exceptions on the await calls as described above:
coroutineScope {
    val loginResult = async {
        loginService.login(credentials)
    }
    launch {
        try {
            db.saveLoginToken(loginResult.await().token)
        } catch (e: Exception) {
            if (e.isFailedLoginAttempt()) {
                analytics.logFailedLoginAttempt(e)
            }
        }
    }
    launch {
        sessionManager.startSession(loginResult.await().user)
    }
}
When loginService.login throws an exception, its coroutine finishes with that exception. Its containing coroutineScope treats this as a failure of the scope and tears down all of its other child coroutines. (This happens even if nobody calls await.)
As a result, the exception handling code above isn’t guaranteed to run: if its coroutine is torn down first by the dying coroutineScope, it won’t. Even if it does run, it’s racing against the impending cancellation: if it ever suspends, it won’t pick up where it left off, because a cancellable suspension point in a cancelled coroutine throws a CancellationException.
This is the default behavior, and it’s what you should design for when you write code that launches coroutines. As a result, it is unsafe to handle exceptions thrown by join and await. If you do, plan on your code being subject to cancellation.
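A sketch of that race, assuming kotlinx-coroutines-core, with hypothetical LoginFailed and login() stand-ins and delay(1) standing in for a suspending analytics call. The catch block around await() starts, but its first suspension point throws because its coroutine has already been cancelled:

```kotlin
import kotlinx.coroutines.*

// Hypothetical failure type and login call standing in for loginService.login(credentials).
class LoginFailed(message: String) : Exception(message)

suspend fun login(): String {
    delay(10)
    throw LoginFailed("bad password")
}

fun handlerRacesCancellation(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    try {
        coroutineScope {
            val loginResult = async { login() }
            launch {
                try {
                    loginResult.await()
                } catch (e: Exception) { // mirrors the catch-all in the article
                    events += "handler started"
                    delay(1) // throws: this coroutine is already cancelled
                    events += "handler finished"
                }
            }
        }
    } catch (e: LoginFailed) {
        events += "scope failed: ${e.message}"
    }
    events
}
```

handlerRacesCancellation() returns ["handler started", "scope failed: bad password"]; "handler finished" never appears.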
SupervisorScope
Okay, that covers all of the important points about coroutine exception handling. Now let’s talk about supervisorScope. TL;DR: don’t use it.
supervisorScope uses a SupervisorJob as its parent job instead of a regular Job. And SupervisorJob exists to do the opposite of coroutineScope: child coroutines of a SupervisorJob will not bring down their peer coroutines when they throw an exception.
So, for example, if we switch to supervisorScope in the previous example:
supervisorScope {
    val loginResult = async {
        loginService.login(credentials)
    }
    launch {
        try {
            db.saveLoginToken(loginResult.await().token)
        } catch (e: Exception) {
            if (e.isFailedLoginAttempt()) {
                analytics.logFailedLoginAttempt(e)
            }
        }
    }
    launch {
        sessionManager.startSession(loginResult.await().user)
    }
}
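…the exception handling code is guaranteed to run, and the enclosing scope will not throw an exception. (The second launch, which doesn’t catch, still fails: its exception goes to the CoroutineExceptionHandler in its context or, failing that, to the thread’s uncaught exception handler.)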
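A sketch of the supervisorScope version, assuming kotlinx-coroutines-core and the same hypothetical LoginFailed and login() stand-ins. The catching launch handles the failure and the scope returns normally, while the non-catching launch reports its exception to a CoroutineExceptionHandler installed here purely for illustration:

```kotlin
import kotlinx.coroutines.*

// Hypothetical failure type and login call standing in for loginService.login(credentials).
class LoginFailed(message: String) : Exception(message)

suspend fun login(): String {
    delay(10)
    throw LoginFailed("bad password")
}

fun supervisorDemo(): List<String> {
    val events = mutableListOf<String>()
    // Illustrative handler: receives failures of launch children that nobody caught.
    val handler = CoroutineExceptionHandler { _, e -> events += "uncaught: ${e.message}" }
    runBlocking(handler) {
        supervisorScope {
            val loginResult = async { login() }
            launch {
                try {
                    loginResult.await()
                } catch (e: LoginFailed) {
                    events += "handled: ${e.message}"
                }
            }
            launch {
                loginResult.await() // no try/catch here
            }
        }
        events += "scope returned normally"
    }
    return events
}
```

The "handled" and "uncaught" entries may appear in either order, but "scope returned normally" comes last.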
Why To Avoid SupervisorJob
In newly written coroutines code, usage of SupervisorJob or supervisorScope should be avoided:
- It is often used to patch over sloppy exception handling.
- coroutineScope is more flexible than supervisorScope: if you’re launching your own coroutines, you can achieve what supervisorScope does by catching those exceptions before they bring down the coroutine. But if you’ve got a supervisorScope, you can’t achieve what coroutineScope does.
- If you’re writing code that takes in a scope as a parameter, you don’t know whether it’s a supervisorScope or a coroutineScope. Since you don’t know, you have to assume that it’s a coroutineScope anyway to defend against aggressive teardown when a peer coroutine throws. So why not just use coroutineScope everywhere?
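A sketch of the flexibility point, assuming kotlinx-coroutines-core and hypothetical LoginFailed and login() stand-ins. A plain coroutineScope gets supervisor-like isolation because the launched coroutine catches its own expected failure, so it never fails and its peer keeps running:

```kotlin
import kotlinx.coroutines.*

// Hypothetical failure type and login call standing in for loginService.login(credentials).
class LoginFailed(message: String) : Exception(message)

suspend fun login(): String {
    delay(10)
    throw LoginFailed("bad password")
}

fun isolateWithCoroutineScope(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    coroutineScope {
        launch {
            try {
                login()
            } catch (e: LoginFailed) {
                // Caught before it can fail this coroutine, so the scope never sees it.
                events += "contained: ${e.message}"
            }
        }
        launch {
            delay(20)
            events += "peer finished"
        }
    }
    events += "scope returned normally"
    events
}
```

isolateWithCoroutineScope() returns ["contained: bad password", "peer finished", "scope returned normally"].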
In short, supervisorScope changes exception propagation in a way that can be unexpected, and it doesn’t accomplish anything that can’t also be accomplished with coroutineScope. So keep things consistent and stick with coroutineScope.
TL;DR Takeaways
I wanted to give you takeaways, and nearly forgot to! So here they are:
- Don’t expect to be able to recover from exceptions thrown by a join or await. The failure behind them will tear down their enclosing CoroutineScope, including your exception handling code.
- Wrap exceptions from async jobs in wrappers, e.g. a sealed class with Success/Failure options, by catching them inside the job.
- Avoid using SupervisorJob to work around this behavior. It’s not what you want.
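A sketch of the wrapper takeaway, assuming kotlinx-coroutines-core. LoginOutcome, LoginFailed, and login() are hypothetical names chosen for illustration. The async job catches the expected failure itself and returns a Success or Failure value, so every awaiter gets a result to branch on instead of an exception to race against:

```kotlin
import kotlinx.coroutines.*

// Hypothetical failure type and login call standing in for loginService.login(credentials).
class LoginFailed(message: String) : Exception(message)

suspend fun login(password: String): String {
    delay(10)
    if (password != "hunter2") throw LoginFailed("bad password")
    return "token-123"
}

// The wrapper: awaiters receive a value describing the outcome instead of an exception.
sealed class LoginOutcome {
    data class Success(val token: String) : LoginOutcome()
    data class Failure(val error: LoginFailed) : LoginOutcome()
}

fun loginAndFanOut(password: String): List<String> = runBlocking {
    val loginResult = async {
        // Catch the expected failure inside the coroutine that owns it.
        try {
            LoginOutcome.Success(login(password))
        } catch (e: LoginFailed) {
            LoginOutcome.Failure(e)
        }
    }
    val saveToken = async {
        when (val outcome = loginResult.await()) {
            is LoginOutcome.Success -> "saved ${outcome.token}"
            is LoginOutcome.Failure -> "skipped save: ${outcome.error.message}"
        }
    }
    val startSession = async {
        when (loginResult.await()) {
            is LoginOutcome.Success -> "session started"
            is LoginOutcome.Failure -> "no session"
        }
    }
    listOf(saveToken.await(), startSession.await())
}
```

loginAndFanOut("hunter2") returns ["saved token-123", "session started"], and any other password returns ["skipped save: bad password", "no session"]. Neither path cancels the scope.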