Sharp Edges in Kotlin Coroutines Testing Tools

At Cash App, we’re starting to integrate Kotlin coroutines into our app architecture. My coworker Bill, who has taken interest and ownership in this migration, came to me the other day with a problem that left the both of us scratching our heads in confusion. I’d like to tell you about what we discovered.

We use the Model, View, Presenter (MVP) architecture. Some presenters are RxJava-based and some are coroutines-based, but they all communicate with the UI using a common presenter API. This is the interface our coroutines presenters implement:

interface CoroutinePresenter<UiModel : Any, UiEvent : Any> {
  suspend fun produceModels(
    events: Flow<UiEvent>,
    models: FlowCollector<UiModel>
  )
}

Here’s a simplified version of the code that bridges it to the UI layer:

fun CoroutinePresenter.start(
  coroutineContext: CoroutineContext = Dispatchers.Main.immediate,
  events: Flow<UiEvent>,
  modelsCallback: (UiModel) -> Unit
) {
  val scope: CoroutineScope = ...

  scope.launch {
    produceModels(
      events,
      object : FlowCollector<UiModel> {
        override suspend fun emit(value: UiModel) {
          modelsCallback(value)
        }
      }
    )
  }
}

There’s nothing magical going on here. We launch a new coroutine which is responsible for sending model data to our UI. Because our presenters communicate with our UI, we must be on Android’s main thread when we send data from presenter to UI. Bill discovered this wasn’t always true, and attempted to fix it.

The fix seemed easy enough – just invoke the callback in withContext!

        // ...
        override suspend fun emit(value: UiModel) {
          withContext(coroutineContext) {
            modelsCallback(value)
          }
        }

Bill wrote a test, and it failed! He brought me in to investigate:

The Problem

@Test fun modelEmissionsUseCallingDispatcher() {
  runBlocking(TestCoroutineDispatcher()) {
    val coroutinePresenter = object : CoroutinePresenter<MyEvent, String> {
      override suspend fun produceModels(events: Flow<MyEvent>, models: FlowCollector<String>) {
        withContext(newSingleThreadContext("a background thread")) {
          models.emit("my model")
        }
      }
    }

    val outerThread = Thread.currentThread()
    var innerThread: Thread? = null
    val signal = Channel<String>()
    coroutinePresenter.start(coroutineContext, emptyFlow()) { value: String ->
      innerThread = Thread.currentThread()
      signal.trySend(value)    
    }

    withTimeout(1000) { 
      assertThat(signal.receive()).isEqualTo("my model") 
    }


    assertThat(innerThread).isEqualTo(outerThread)
  }
}

The entrypoint to our test is the call to coroutinePresenter.start which launches the coroutine to produce models. It also registers a lambda that gets invoked on every model emission. Once our presenter coroutine is running, it calls produceModels, which then switches threads and emits the string "my model". Control is passed back to the start lambda where we capture the inner thread that the model callback was run on, and send the model value to the signal channel. Then we compare the captured innerThread to outerThread, which was recorded prior to this execution flow.

We expected the threads gathered to be equal, but instead we’re failing with the following difference.

expected: Thread[main @coroutine#1,5,main]
but was : Thread[test,5,main]

After several hours of debugging and several re-reads of some choice coroutines documentation, we isolated the problem.

Narrowing it down

@Test fun isolate() = runBlocking(TestCoroutineDispatcher()) {
  val job = launch {
    val inner = withContext(newSingleThreadContext("test")) {
      Thread.currentThread()
    }
    val outer = Thread.currentThread()
    assertThat(outer).isNotEqualTo(inner)
  }
  job.join()
}

This test inverts the problem. Instead of asserting that withContext changes off the original thread, we assert that when control flow exits withContext execution returns to the original thread. Our inverted assertion still fails because outer and inner end up being the same thread — once we switch to the single threaded context, we never switch off of that thread.

That sounded familiar? After asking around I found myself reading the docs of Dispatchers.Unconfined:

A coroutine dispatcher that is not confined to any specific thread. It executes the initial continuation of a coroutine in the current call-frame and lets the coroutine resume in whatever thread that is used by the corresponding suspending function, without mandating any specific threading policy.

Replacing TestCoroutineDispatcher with Dispatchers.Unconfined in both the isolated and original test cases reproduced the errors we saw before. It turns out their threading policies are similar. Bill and I studied the TestCoroutineDispatcher documentation yet again:

By default, TestCoroutineDispatcher is immediate. That means any tasks scheduled to be run without delay are immediately executed. If they were scheduled with a delay, the virtual clock-time must be advanced via one of the methods on DelayController.

This documentation originally led us to believe that “immediate” referred to the ability for tasks to automatically execute or for the ability to manually control the dispatcher state machine via utilities like DelayController.pauseDispatcher.

We were wrong.

“Immediate” in the context of coroutine dispatchers refers to avoiding CoroutineDispatcher.dispatch calls, and instead executing tasks immediately in whatever the heck thread we’re currently on! For example, Dispatchers.Main.immediate will only dispatch tasks when the calling thread is not the main thread. Otherwise, it will “immediately” execute the current task. The unconfined dispatcher is similar, except that it will always execute tasks “immediately”.

Takeaways

Generally, avoid making assertions on regular threads when verifying coroutines implementations. Not all dispatchers are guaranteed to run on a particular thread! Instead, you should first try to make assertions against CoroutineDispatcher. This can be done by grabbing the current dispatcher from a coroutine context.

assertThat(currentCoroutineContext()[CoroutineDispatcher.Key]).isEqualTo(Dispatchers.Main.immediate)

If a CoroutineContext is unavailable and makes this impossible (like within our start lambda), make sure to avoid immediate dispatchers like TestCoroutineDispatcher during Thread assertions. For us, a simple runBlocking fixed the issue.

Special thanks to Bill Phillips and Uli Luckas who both helped work through this issue.