Rx to Coroutines Concepts, Part 1

Part 2, Structured Concurrency, Part 2.1, Exceptions, Part 3, Deferred & Channels, Part 4, Cold Flows Part 5, Shared Flows_

Our Android codebase at Cash App is thoroughly invested in the strange and wonderful land of RxJava. Almost all of our presenter layer and a fair amount of code outside of it is written entirely in Observables and compositions of Observables.

By and large, this is great. Writing code in a reactive paradigm has given us superpowers that we can’t give up. Whether we’re wiring up a toggle in our debug drawer, a changing user interface element, a value written to SharedPreferences or a value pulled from a customer’s sync’d profile, for the most part everything Just Works™ without us doing any additional work to make it so or worrying about what thread a value was emitted on.

This is an incredible superpower. If you haven’t experienced it already, you’re missing out. And if it’s part of your day-to-day work, no doubt you’d rather not go back to the days when it wasn’t. Nothing beats a tool that gives you useful functionality for free, and that’s what the reactive paradigm does.

There is a cost, though. The price we paid was that we had to stop writing natural code, and switch to expressing ourselves in Rx-ese.

How things used to work

Let me show you what I mean. Here is a simple example of some straightline logic adapted from our codebase:

viewModel.accept(viewModel.value!!.copy(isLoading = true, isError = false))
val result = aliasRegistrar.register(registerArgs)
if (result is AliasRegistrar.Result.Successful) {
  val sms = alias.redact()
  val nextData = args.blockersData
    .copy(
      flowToken = flowToken,
      smsNumber = sms
    )
    .updateFromResponseContext(result.responseContext!!)
  goTo.accept(blockersNavigator.getNext(args, nextData))
}

This is straightline code, just as we would have written when we first learned to program imperatively. There are no callbacks or threads, just blocking requests.

This is also why this code isn’t written this way in Cash App’s codebase, nor can it be written this way in any UX. The call to viewModel.accept drives an immutable viewModel instance into the View, updating the user interface, while the call to aliasRegistrar.register(registerArgs) is a blocking call to a web service. Running both of these on the same thread isn’t feasible: we’ll either update the UI from the background thread and explode, or run a web request on the UI thread and hang.

This is such old news that I hate recapitulating it, but it’s worth saying: if this worked, we’d still write code this way.

But it doesn’t. This is why we started write code in callbacks instead.

Preemptibility: The Hero’s Call

This failure of straightline code to even work is our call to write code that is preemptible instead of greedy with respect to computational resources.

Here’s what I mean: The old way of writing code is greedy because when you run it, it uses all of the thread you run it in to do its job. It starts at the start, and it continues on to the end, no matter what.

Preemptible code tries to be the opposite: instead of using all of the thread you give it, it tries to get out of the way so that you can use the thread for other things. That’s what the humble callback does:

viewModel.accept(viewModel.value!!.copy(isLoading = true, isError = false))
aliasRegistrar.register(registerArgs) { result ->
  if (result is AliasRegistrar.Result.Successful) {
    val sms = alias.redact()
    val nextData = args.blockersData
      .copy(
        flowToken = flowToken,
        smsNumber = sms
      )
      .updateFromResponseContext(result.responseContext!!)
    goTo.accept(blockersNavigator.getNext(args, nextData))
  }
}

The callback works because it gives the programmer two important capabilities:

You can choose which threading resources to use. The humble callback doesn’t make those resources explicit or parameterizable, but it does let you say, “Hey, run this registration on your background thread and get back to me on my current thread.” You can split up your code into pieces that can be run one at a time.

It solves the problem, but it’s not pretty, especially when chaining multiple callbacks together. And each callback API has to independently expose how to schedule the work and assign it to a thread.

Translating to Rx, the High Elvish of Callbacks

RxJava and other reactive frameworks emerged out of the functional programming world, but in the Android context they operate more like a sort of uber-callback tool. Once the solution is expressed as a reactive component like an Observable or Single

aliasRegistrar.register(registerArgs).subscribe { result -> 
  ...
}

…one can take this uber-callback and chain on operators or other callbacks to do all kinds of things. The data flowing through the callback can be transformed, side effects can be triggered, additional callbacks can be triggered — it’s a whole language, a data transformation High Elvish that we can translate the original example into:

disposables += aliasRegistrar.register(registerArgs)
  .maybeUntil(signOut)
  .subscribeOn(backgroundScheduler)
  .toObservable()
  .startWith(AliasRegistrar.Result.Loading)
  .errorHandlingSubscribe { result ->
    when (result) {
      is AliasRegistrar.Result.Loading -> {
        viewModel.accept(viewModel.value!!.copy(isLoading = true, isError = false))
      }
      is AliasRegistrar.Result.Successful -> {
        val sms = alias.redact()
        val nextData = args.blockersData
          .copy(
            flowToken = flowToken,
            smsNumber = sms
          )
          .updateFromResponseContext(result.responseContext!!)
        goTo.accept(blockersNavigator.getNext(args, nextData))
      }
      else -> {

      }
    }
  }

The grammar of this example is different: the subject is usually not, say, the response, but a stream of data containing the response. To the uninitiated it may seem unnatural, but the strange powers it grants are irresistable. (And fun.) Namely, it does the two things callbacks let you do: it lets you split up your code into independently schedulable morsels, and it lets you assign those morsels to a particular thread. Delicious.

Work for a long time in RxJava, and eventually one starts to think in this grammar. It’s easy to forget that it doesn’t correspond to how any of us would describe the operation of the machine to a lay person, or to most programmers. If someone asks what the linear control flow of your reactive presenter is, you are better off encouraging them to rethink their question in terms of reactive streams than giving a straight answer. Because if you do, that answer might look like this:

Don’t think about Rx this way! It’s a bad idea. Unfortunately, this way of thinking is the best fit for many real world tasks

Returning to the Common Tongue With Coroutines

RxJava converts like me often zero in on Kotlins’ reactive Flow tools, which allow one to go on speaking foreign grammar. But coroutines allow you to discard it and write what is essentially straight up Kotlin code. We could replicate the original example like this:

val scope = CoroutineScope(dispatchers.io)
scope.launch {
  viewModel.accept(viewModel.value!!.copy(isLoading = true, isError = false))
  val result = aliasRegistrar.register(registerArgs)
  if (result is AliasRegistrar.Result.Successful) {
    val sms = alias.redact()
    val nextData = args.blockersData
      .copy(
        flowToken = flowToken,
        smsNumber = sms
      )
      .updateFromResponseContext(result.responseContext!!)
    goTo.accept(blockersNavigator.getNext(args, nextData))
  }
}
disposables += signOut.errorHandlingSubscribe { scope.cancel() }

We retain the same grammar as the original example, and while we could use Flow to express this reactively we are not required to as we were in RxJava. We don’t have to give up our superpowers to do it, either: we can assign work to different threads (with different CoroutineDispatchers, or wrapping a section of work in withContext), and we can split up our work into little morsels of – hold up, where are our morsels?

This is why coroutines had to be a language feature rather than a library like RxJava: the compiler morselizes your code based on the methods it calls. aliasRegistrar.register(registerArgs) is a suspend fun in the example above. Being suspend signals to the compiler that it should split your method before and after that method call, creating a morsel on either side. (See Coroutines: Suspending State Machines for all the details on how that happens.)

How big a deal is it, really?

The wins laid out here are mostly in clear expression and comprehension. If nothing else, never having to explain the difference between subscribeOn and observeOn would save a lot of heartache in the future. But the ability to clearly express how a system works in code is invaluable, and RxJava denies it to us by forcing us into a reactive paradigm.

There are other benefits to coroutines, too. But that’s for a later post.