Rx to Coroutines Concepts, Part 4: Cold Flows

Part 1: Async vs. Reactive; Part 2: Structured Concurrency; Part 2.1: Exceptions; Part 3: Deferred & Channels

In this post, we’re finally going to start to address the main tool used in Kotlin coroutines to represent asynchronous sequences of data: the Flow.

But before I get going on this topic, I should briefly address the elephant in the room: in a series about migrating from RxJava to Coroutines, I’ve avoided covering Flow until now, four posts in. Why?

When I started, all I knew was that I wanted to show how to migrate from the reactive paradigm to the structured concurrent paradigm. It is easy to blindly migrate from one reactive toolkit (RxJava) to another (Flow), but doing so misses a huge opportunity. So my only thought was, “I can’t start with Flow. If I do, coroutines will just look like another reactive toolkit.”

With the benefit of hindsight, the reason is clearer: Flow is built on Kotlin’s coroutine-based concurrency, just as RxJava is built on vanilla Java concurrency. Trying to understand Flow first will result in a lot of confusion. So if you’ve skipped to this post to get to the “meat” of the series, you may find it less meaty than you expect.

RxJava: The Observable

Okay, enough of that. Let’s get into it by reviewing RxJava one more time.

In RxJava, all asynchronous sequences of data are represented the same way: by the Observable.

val subscription = featureFlagManager
  .values(FeatureFlagManager.FeatureFlag.BtcDynamicLimits)
  .doOnNext { println("Received an item in the stream: $it") }
  .doOnComplete { println("Stream completed") }
  .doOnError { println("Stream errored") }
  .subscribe()

At its heart, an Observable is a stream of events: payload objects, error events, and completion events. The stream will either complete or error, but it shouldn’t do both. (This is one of the tricky bits of RxJava: it’s easy to build an Observable that violates this rule if you use the wrong tools.)

Those events are always accessed through callbacks — usually (but not always) asynchronous callbacks. You subscribe to the observable, and at a later time, those callbacks are invoked. This means that subscribing to an observable will often implicitly add concurrency to your code. Like this:

observable1.subscribe {
  println("Current value for stream 1: $it")
}
observable2.subscribe {
  println("Current value for stream 2: $it")
}

They may be running on the same thread, but these two subscriptions are both conceptually running at the same time, adding concurrency to your app.

Observables can be either “hot” or “cold” — a “cold” observable only starts when you subscribe to it. Like this:

// Print out 1, 2, 3
Observable.fromArray("1", "2", "3").subscribe { println(it) }

…and a “hot” observable like a PublishSubject is running whether you subscribe to it or not:

// Print out 1, 2, 3
val publishSubject = PublishSubject.create<String>()
// Publish subject is "running" here, even though no one 
// is subscribed
publishSubject.onNext("not printed")

publishSubject.subscribe { println(it) }
publishSubject.onNext("1")
publishSubject.onNext("2")
publishSubject.onNext("3")

There’s a lot more going on with RxJava, but those are the foundational concepts to understand. And as we’ll see in a moment, *none* of them are true for a Flow, even though Flow does basically the same job as RxJava. Weird!

Inline Callbacks

If you’ve read the previous posts in this series, you may realize that this callback subscription approach cannot work for coroutines. Callbacks are bad style in structured concurrent coroutines code. There is one key exception, though: inline callbacks.

By “inline callback,” I don’t necessarily mean a callback that is inlined at compile time. I mean a callback that is invoked before the method returns. You have used callbacks like these if you’ve ever used forEach:

listOf(1, 2, 3).forEach { item ->
  println("Check it out: $item")
}

The code to call println here is a callback, but it doesn’t break control flow. It’s run inside of forEach, and when forEach returns, the callback won’t be called anymore.

Let’s take that forEach method and put it on an interface. And let’s also add the suspend keyword to everything — the callback, and the method itself:

interface Producer<T> {
  suspend fun forEach(callback: suspend (T) -> Unit)
}

Now let’s create a simple implementation of that interface that imitates listOf(1, 2, 3). Calling forEach should emit the numbers 1, 2, 3, just like listOf(1, 2, 3):

val producer = object: Producer<Int> {
  override suspend fun forEach(callback: suspend (Int) -> Unit) {
    callback(1)
    callback(2)
    callback(3)
  }
}

To use it, call forEach with a callback, just like the Iterable.forEach extension method, and you will get the same results:

producer.forEach { value ->
  println("Received: $value")
}
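To try the Producer idea end to end, here is a self-contained sketch of the interface and the listOf(1, 2, 3) imitation. It assumes kotlinx.coroutines is on the classpath, used only for runBlocking; the toList extension and collectedFrom are hypothetical helper names, not part of any library.

```kotlin
import kotlinx.coroutines.runBlocking

// The same shape as the Producer interface above: a suspending forEach.
interface Producer<T> {
    suspend fun forEach(callback: suspend (T) -> Unit)
}

// Imitates listOf(1, 2, 3): every callback runs before forEach returns.
val producer = object : Producer<Int> {
    override suspend fun forEach(callback: suspend (Int) -> Unit) {
        callback(1)
        callback(2)
        callback(3)
    }
}

// Hypothetical helper: drains a Producer into a List.
suspend fun <T> Producer<T>.toList(): List<T> {
    val items = mutableListOf<T>()
    forEach { items += it }
    return items
}

// Blocks the calling thread until the producer has finished emitting.
fun collectedFrom(source: Producer<Int>): List<Int> = runBlocking { source.toList() }
```

collectedFrom cannot return until every callback has run, which is the same guarantee List.forEach gives.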

This little abstraction is pretty powerful. Because everything is marked suspend, it can do asynchronous work. For example, you can add a delay between items:

val producerWithDelays = object: Producer<Int> {
  override suspend fun forEach(callback: suspend (Int) -> Unit) {
    callback(1)
    delay(1000)
    callback(2)
    delay(1000)
    callback(3)
  }
}

Or transform an existing producer, adding new values — say, by interpolating between values like this:

val interpolator = object: Producer<Float> {
  override suspend fun forEach(callback: suspend (Float) -> Unit) {
    var prev: Float? = null
    producer.forEach { next ->
      val nextFloat = next.toFloat()
      // prev is captured and reassigned by this lambda, so read it
      // with ?.let instead of relying on a smart cast.
      prev?.let { callback((nextFloat + it) / 2) }
      callback(nextFloat)
      prev = nextFloat
    }
  }
}
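A runnable sketch of the interpolator, assuming kotlinx.coroutines for delay and runBlocking. The delays are shortened to 10 ms, and interpolate and interpolatedValues are hypothetical names. For the input 1, 2, 3 it should produce 1.0, 1.5, 2.0, 2.5, 3.0.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

interface Producer<T> {
    suspend fun forEach(callback: suspend (T) -> Unit)
}

// Emits 1, 2, 3 with a short pause between items.
val producerWithDelays = object : Producer<Int> {
    override suspend fun forEach(callback: suspend (Int) -> Unit) {
        callback(1)
        delay(10)
        callback(2)
        delay(10)
        callback(3)
    }
}

// Wraps any Producer<Int>, inserting the midpoint between consecutive values.
fun interpolate(source: Producer<Int>): Producer<Float> = object : Producer<Float> {
    override suspend fun forEach(callback: suspend (Float) -> Unit) {
        var prev: Float? = null
        source.forEach { next ->
            val nextFloat = next.toFloat()
            prev?.let { callback((nextFloat + it) / 2) } // midpoint, skipped for the first item
            callback(nextFloat)
            prev = nextFloat
        }
    }
}

// Collects the interpolated stream into a list.
fun interpolatedValues(): List<Float> = runBlocking {
    val out = mutableListOf<Float>()
    interpolate(producerWithDelays).forEach { out += it }
    out
}
```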

It can represent everything an Observable can represent.

It’s simpler than an Observable, because there’s no need for “completion” or “error” events: completion is forEach returning, and an error is forEach throwing. And if the implementation is structured concurrent (as it should be), then it also cannot introduce concurrency like an Observable can.

The Flow API

That idea is the foundation for the Flow API. “A suspending forEach” works pretty well as a mental model for Flow.

Flow includes a bit more than that, though. Let’s dive into the weeds to see what makes Flow different. Here’s the core Flow API:

interface Flow<out T> {
    @InternalCoroutinesApi
    suspend fun collect(collector: FlowCollector<T>)
}

interface FlowCollector<in T> {
    suspend fun emit(value: T)
}

suspend inline fun <T> Flow<T>.collect(
  crossinline action: suspend (value: T) -> Unit): Unit

With this API, you always call the Flow<T>.collect extension method, never the collect method on the interface.

You never implement the interface yourself, either. (That’s why it’s marked @InternalCoroutinesApi.) Instead, you use functions that construct Flow instances for you. Functions like flow:

val producer = flow {
  emit(1)
  emit(2)
  emit(3)
}
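This is what “cold” means in practice: building a Flow with flow runs nothing, and every collection runs the block again from the top. A minimal sketch, assuming kotlinx.coroutines on the classpath; the builderRuns counter and collectTwice are hypothetical, added only to make the re-execution visible.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical counter: how many times the flow body has started.
var builderRuns = 0

// Nothing runs here: flow {} only stores the block.
val producer: Flow<Int> = flow {
    builderRuns++ // executes once per collection
    emit(1)
    emit(2)
    emit(3)
}

// Each terminal call (toList) is a separate collection, so the body runs again.
fun collectTwice(): Pair<List<Int>, List<Int>> = runBlocking {
    producer.toList() to producer.toList()
}
```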

And that’s pretty much the API.

There is one important difference to know between this and Observable, or any other non-coroutines API: calls to Flow.collect are always sequential, never concurrent.

This may make sense if you’ve followed along with this series so far: structured concurrent coroutines code never introduces concurrency without launching a coroutine into a scope. You can collect a Flow concurrently with other code, or use concurrency under the hood of a Flow, but it’s not baked into the API and never happens automatically. Concurrency is orthogonal to Flow: like a for loop, collect can be run concurrently or not. It doesn’t care, and won’t force you in either direction.

So in the following code:

flow1.collect {
  println("Current value for stream 1: $it")
}
flow2.collect {
  println("Current value for stream 2: $it")
}

…unlike the Rx example above, the values for flow2 will not start printing until every value in flow1 has been collected. (Just like a for loop.)

To add concurrency and get the equivalent of RxJava’s subscribe, use the tools covered previously in this series: CoroutineScope, and launch.

launch {
  flow.collect {
    println("Received item: $it")
  }
}
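The difference between sequential collect calls and collection inside launch can be checked directly. This sketch assumes kotlinx.coroutines 1.6 or later (where collect accepts a lambda without an extra import); letters, sequentialOrder, and concurrentOrder are hypothetical names. The yield() inside the flow hands the single runBlocking thread to the other coroutine, so the interleaving is deterministic here.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Two items with a yield() in between, giving other coroutines a chance to run.
fun letters(prefix: String): Flow<String> = flow {
    emit("${prefix}1")
    yield()
    emit("${prefix}2")
}

// Back-to-back collect calls: the second starts only after the first finishes.
fun sequentialOrder(): List<String> = runBlocking {
    val seen = mutableListOf<String>()
    letters("a").collect { seen += it }
    letters("b").collect { seen += it }
    seen
}

// Each collect launched into its own coroutine: the two collections interleave.
fun concurrentOrder(): List<String> = runBlocking {
    val seen = mutableListOf<String>()
    val first = launch { letters("a").collect { seen += it } }
    val second = launch { letters("b").collect { seen += it } }
    joinAll(first, second)
    seen
}
```

sequentialOrder() should return a1, a2, b1, b2, while concurrentOrder() should return a1, b1, a2, b2.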

The convenience function Flow<T>.launchIn(CoroutineScope) will launch and collect in a single call. I recommend avoiding it because I like separating the concurrency out into its own launch call: combining the two into a single asynchronous stream of events pushes my code towards a reactive style of programming that I’ve been trying to learn to live without as much as I can.

Flow Invariants

The flow function from above is wonderful. It has a simple API, and in most scenarios it is simple to use. It’s the most basic tool: with some thought, flow can be used to build any kind of valid cold Flow. (It might not be the simplest choice, though.)

However: flow is not simply a convenience notation for writing object: Flow { ... }. Valid Flow implementations also abide by additional guarantees (or invariants). These invariants are all valuable and necessary to make collect work the way forEach does.

The type system can’t enforce these invariants, though. So flow includes runtime guardrails to detect some (but not all) violations of these invariants. Knowing the guardrails is okay, but the only way to be truly safe is to know and abide by the invariants themselves. So here they are, to put under your pillow: exception transparency, context preservation, and sequentiality.

Let’s go over each one.

Exception transparency. When you collect on a Flow, your code might throw an exception. Like this:

states.collect { value ->
  println("viewModel translation: ${value!!.toViewModel()}")
}

If value is null (which it can be, unlike in RxJava), value!! will throw a NullPointerException. The exception transparency invariant says that when this happens, that NullPointerException will bubble up and be thrown by states.collect.

How would you violate that guarantee? Like this:

val states: Flow<State?> = flow {
  try {
    emit(null)
  } catch (e: Exception) {
    println("Whew! We’re safe.")
  }
  emit(State.Default)
}

states.collect { value ->
  println("viewModel translation: ${value!!.toViewModel()}")
}

When the exception is thrown by value!! in this version, it is caught inside the implementation of states. The exception never makes it to collect. That’s a violation of exception transparency.

flow can detect this violation, but not always. I’ll leave the details of when as an exercise for the reader.
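One exception-transparent way to recover from failures is the catch operator from kotlinx.coroutines: it only handles exceptions thrown upstream of it, so an exception from the collector’s own code still reaches the caller of collect. A sketch, assuming kotlinx.coroutines 1.6 or later; flaky, recoveredUpstream, and downstreamFailure are hypothetical names.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// An upstream that fails after its first item.
val flaky: Flow<Int> = flow {
    emit(1)
    throw IllegalStateException("upstream failed")
}

// catch sees the upstream failure and may replace it with a fallback item.
fun recoveredUpstream(): List<Int> = runBlocking {
    flaky.catch { emit(-1) }.toList()
}

// An exception thrown inside the collect body is NOT swallowed by catch:
// it propagates out of collect, as exception transparency requires.
fun downstreamFailure(): Throwable? = runBlocking {
    runCatching {
        flowOf(1)
            .catch { emit(-1) }
            .collect { throw IllegalArgumentException("collector failed") }
    }.exceptionOrNull()
}
```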

Context preservation. When you collect a Flow, the code inside collect should run in the same context as the code that calls collect. So when you write code like this:

withContext(Dispatchers.Default) {
  states.collect {
    println("Current state: $it")
  }
}

The println above should run on Dispatchers.Default, just like a forEach would. But if states were written like this:

val states = flow {
  withContext(Dispatchers.Main) {
    emit(State.Default)
  }
}

…it won’t. (In this case, it won’t even run: flow will detect the invariant violation and throw an exception.)
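When the upstream really does need another dispatcher, the flowOn operator is the invariant-preserving way to get one: it changes the context of the code above it, while the collect body keeps running in the collector’s context. A minimal sketch, assuming kotlinx.coroutines 1.6 or later; threadsUsed is a hypothetical helper that records which thread ran each side.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking

// Returns (thread that ran the flow body, thread that ran the collect body).
fun threadsUsed(): Pair<Thread, Thread> = runBlocking {
    var upstream: Thread? = null
    var downstream: Thread? = null
    flow {
        upstream = Thread.currentThread() // runs on a Dispatchers.Default worker
        emit(Unit)
    }
        .flowOn(Dispatchers.Default) // changes the context of the upstream only
        .collect { downstream = Thread.currentThread() } // stays in the caller's context
    upstream!! to downstream!!
}
```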

Sequentiality. When you collect a Flow, your callback should be run sequentially, not concurrently. It should never be the case that two callback invocations are running at the same time.

So if you wrote this code:

withContext(Dispatchers.Default) {
  states.collect {
    println("Collecting a state")
    delay(10)
    println("Ready for next state")
  }
}

…you should be able to rely on the fact that Collecting a state will come first, then Ready for next state, then Collecting a state, and so on.

You can violate this invariant by emitting from multiple coroutines simultaneously:

val states = flow {
  // coroutineScope provides a scope to launch in
  coroutineScope {
    launch {
      emit(State(1))
    }
    launch {
      emit(State(2))
    }
  }
}

Both states would be emitted concurrently, resulting in two invocations of the collect body possibly running at the same time. Two printouts of Collecting a state would occur, and then two printouts of Ready for next state. (Once again, this simple example won’t run: flow’s guardrails will break it.)
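When you really do need several coroutines to produce values, the channelFlow builder from kotlinx.coroutines is designed for it: each launched child sends into a channel, and the collector still sees one item at a time. A minimal sketch, assuming kotlinx.coroutines 1.6 or later; collectAndMeasure and its counters are hypothetical, added only to observe that the collect body never overlaps with itself.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Two coroutines produce values; channelFlow funnels both through one channel.
val states: Flow<Int> = channelFlow {
    launch { send(1) }
    launch { send(2) }
}

// Collects states while tracking how many collect-body invocations overlap.
fun collectAndMeasure(): Pair<Set<Int>, Int> = runBlocking {
    val received = mutableSetOf<Int>()
    var active = 0
    var maxActive = 0
    states.collect { value ->
        active++
        maxActive = maxOf(maxActive, active)
        delay(10) // suspends mid-callback, like the sequentiality example
        received += value
        active--
    }
    received to maxActive
}
```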

Other Flow construction tools

For simple flows, flow is the best and most transparent tool. It should be your first choice for constructing a Flow. But for more sophisticated flows that launch their own coroutines or interact with external callbacks, other builders, such as channelFlow and callbackFlow, are better.
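For flows that wrap a listener-style API, callbackFlow from kotlinx.coroutines is the usual choice: the listener pushes values with trySend, and awaitClose keeps the flow open until the collector stops, then runs the cleanup. A sketch assuming kotlinx.coroutines 1.5 or later; HypotheticalTicker stands in for a real callback API, and firstThreeTicks is a hypothetical driver.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical listener-style API, standing in for a real SDK.
class HypotheticalTicker {
    val listeners = mutableListOf<(Int) -> Unit>()
    fun register(listener: (Int) -> Unit) { listeners += listener }
    fun unregister(listener: (Int) -> Unit) { listeners -= listener }
    fun publish(value: Int) = listeners.toList().forEach { it(value) }
}

// Cold adapter: registers when collection starts, unregisters when it stops.
fun HypotheticalTicker.values(): Flow<Int> = callbackFlow {
    val listener: (Int) -> Unit = { value -> trySend(value) }
    register(listener)
    awaitClose { unregister(listener) } // suspends until the collector goes away
}

// Takes three ticks, then reports them along with how many listeners remain.
fun firstThreeTicks(): Pair<List<Int>, Int> = runBlocking {
    val ticker = HypotheticalTicker()
    val ticks = async { ticker.values().take(3).toList() }
    while (ticker.listeners.isEmpty()) yield() // wait for the collector to register
    (1..5).forEach { ticker.publish(it) }
    ticks.await() to ticker.listeners.size
}
```

Because take(3) cancels the upstream after the third value, awaitClose runs its block and the listener is unregistered, so firstThreeTicks should report [1, 2, 3] and zero remaining listeners.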

Flow Transformation Techniques

In RxJava, transforming an Observable means only one thing: using an operator. Under the hood, the operator constructs a new Observable for you with the desired properties. Building an Observable is not easy, so “Observable transformation techniques” just means “learn your operators”.

That’s not the case with Flow. Flow also has a rich operator set (which you can read about in the reference documentation), but the techniques used to implement those operators are within the grasp of the average engineer.

So: let’s get to know them. This section is a bit of a cookbook of code examples, many of them simplified versions of implementations from Kotlin’s library. So don’t worry if you feel like you need to return to some examples later, or if they take several readthroughs to understand.

Collect And Emit

The first one is the simplest: using flow or channelFlow, call collect on a source Flow and emit items in a new and different way.

Using this technique with a Flow as a receiver is an easy way to write your own operators. Here’s a toy example that transforms a flow of Int into a sequence of Unit emissions:

fun Flow<Int>.toUnit() = flow {
  collect { number ->
    repeat(number) { emit(Unit) }
  }
}

This is how map works under the hood, as well as the even more useful Flow-specific transform operator.
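Here is the collect-and-emit technique applied twice: the post’s toUnit operator, and a hand-written map. mapByHand and mapViaTransform are hypothetical names, and this is a simplified sketch rather than the library’s own implementation. It assumes kotlinx.coroutines 1.6 or later.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.runBlocking

// The post's toy operator: each Int n becomes n Unit emissions.
fun Flow<Int>.toUnit(): Flow<Unit> = flow {
    collect { number ->
        repeat(number) { emit(Unit) }
    }
}

// A hypothetical map written with collect-and-emit.
fun <T, R> Flow<T>.mapByHand(transformValue: suspend (T) -> R): Flow<R> = flow {
    collect { value -> emit(transformValue(value)) }
}

// The same operator again, built on the library's transform.
fun <T, R> Flow<T>.mapViaTransform(transformValue: suspend (T) -> R): Flow<R> =
    transform { value -> emit(transformValue(value)) }

fun demoOperators(): Triple<Int, List<Int>, List<Int>> = runBlocking {
    Triple(
        flowOf(2, 1).toUnit().toList().size, // 2 + 1 Unit emissions
        flowOf(1, 2, 3).mapByHand { it * 10 }.toList(),
        flowOf(1, 2, 3).mapViaTransform { it * 10 }.toList(),
    )
}
```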

Collate

To produce the items from a Flow, you must call collect on it. That means that to monitor multiple flows at the same time, or run logic in parallel with a Flow, you must collect that Flow in its own coroutine. To get the results out, you then collate them in a Channel.

merge and combine are implemented this way:

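import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch

fun <T> merge(a: Flow<T>, b: Flow<T>): Flow<T> = flow {
  val combined = Channel<T>()
  coroutineScope {
    launch {
      // Collect both sources, then close the channel so the
      // consumer below knows no more items are coming.
      coroutineScope {
        launch { a.collect { combined.send(it) } }
        launch { b.collect { combined.send(it) } }
      }
      combined.close()
    }
    combined.consumeAsFlow().collect { emit(it) }
  }
}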

Collating is a powerful tool. In fact, channelFlow is essentially a collating tool under the hood: it creates and consumes combined for you, saving a bit of code. Most examples of the collating technique can be written more simply with channelFlow.
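As the paragraph above says, channelFlow removes most of the collating boilerplate. Here is the same merge idea written with it: channelFlow creates the channel, closes it once the builder block and all of its launched children finish, and emits whatever was sent. A sketch assuming kotlinx.coroutines 1.6 or later; mergeViaChannelFlow and mergedValues are hypothetical names, distinct from the library’s own merge.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Each source is collected in its own child coroutine; both send into
// the channel that channelFlow owns and closes for us.
fun <T> mergeViaChannelFlow(a: Flow<T>, b: Flow<T>): Flow<T> = channelFlow {
    launch { a.collect { send(it) } }
    launch { b.collect { send(it) } }
}

fun mergedValues(): List<Int> = runBlocking {
    mergeViaChannelFlow(flowOf(1, 2), flowOf(3, 4)).toList()
}
```

The relative order between the two sources is not guaranteed, but each source’s own order is preserved.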

Collating is also used to implement flowOn, the operator that runs a Flow on a different CoroutineContext:

fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> = channelFlow {
  coroutineScope {
    launch (context) { collect { send(it) } }
  }
}

Here’s an even more advanced example: throttleLatest (which isn’t currently part of the standard library) can be implemented with the same technique.

fun <T> Flow<T>.throttleLatest(
  timeoutMillis: Long,
): Flow<T> = channelFlow {
  coroutineScope {
    val nextValue = Channel<T>(CONFLATED)

    val sender = launch {
      while (true) {
        send(nextValue.receive())
        delay(timeoutMillis)
      }
    }

    collect { value ->
      nextValue.send(value)
    }
    sender.cancel()
  }
}

Here the nextValue channel is CONFLATED, so if it receives multiple elements during the delay, it drops all except the most recent one. (Compare to RxJava’s version if you want to see how much nicer this makes things.)

flattenMerge, which flatMapMerge is built on, uses collation to collect inner flows in parallel. The full implementation adds useful guardrails and uses some shared code, but this is what it is doing at heart:

fun <T> Flow<Flow<T>>.flattenMerge(): Flow<T> = channelFlow {
  coroutineScope {
    collect { subFlow ->
      launch {
        subFlow.collect { send(it) }
      }
    }
  }
}

Programmatic Cancellation

The last technique is a fun one: you can collect a value, start a coroutine that operates on that value, and then programmatically cancel it when some new value comes in.

This is handy in lots of real world situations. Say, for example, that you have a tool that polls a service periodically (like a service exposing Boosts), and a Flow that tells you whether you should be polling or not:

val shouldBePolling: Flow<Boolean>

suspend fun pollForBoosts() {
  while (true) {
    try {
      currentBoosts.value = service.getLatestBoosts().boosts
    } catch (e: Exception) {
      Timber.e(e, "Unable to fetch boosts")
    }
    delay(TimeUnit.MINUTES.toMillis(5))
  }
}

We can run a coroutine that calls pollForBoosts(), and cancel it when it should stop running. Like this:

var pollingJob: Job? = null
shouldBePolling.distinctUntilChanged().collect { isPolling ->
  pollingJob?.cancel()
  if (isPolling) {
    pollingJob = launch { pollForBoosts() }
  }
}

This is programmatic cancellation. Operators suffixed with *Latest (like collectLatest, or mapLatest) use programmatic cancellation under the hood to achieve this same effect:

shouldBePolling.distinctUntilChanged().collectLatest { isPolling ->
  if (isPolling) {
    pollForBoosts()
  }
}
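A runnable sketch of the collectLatest version, assuming kotlinx.coroutines 1.6 or later. fakePoll is a hypothetical stand-in for pollForBoosts() that only records when it starts and when it is cancelled, and the shouldBePolling flow here is a scripted example, not a real signal.

```kotlin
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for pollForBoosts(): records when it starts and stops.
suspend fun fakePoll(log: MutableList<String>) {
    log += "polling started"
    try {
        awaitCancellation() // a real poller would loop with delay() here
    } finally {
        log += "polling cancelled"
    }
}

fun pollingLog(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val shouldBePolling: Flow<Boolean> = flow {
        emit(true)
        delay(50)
        emit(true) // duplicate, dropped by distinctUntilChanged
        delay(50)
        emit(false) // cancels the running fakePoll
    }
    shouldBePolling.distinctUntilChanged().collectLatest { isPolling ->
        if (isPolling) fakePoll(log)
    }
    log
}
```

The log should read “polling started”, then “polling cancelled”: the duplicate true never reaches collectLatest, and the later false cancels the running block before it is handled.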

So do Compose-based tools like Compose UI and Molecule:

@Composable
override fun models(events: Flow<Event>): ViewModel {
  var isRunning by remember { mutableStateOf(false) }
  LaunchedEffect(events) {
    events.collect { event ->
      when(event) {
        is SubmitClicked -> isRunning = true
        is CancelClicked -> isRunning = false
      }
    }
  }
  if (isRunning) {
    LaunchedEffect("submit result") {
      appService.submit(args.result)
    }
  }
  return ViewModel(prompt = args.prompt)
}

While isRunning is true, LaunchedEffect("submit result") will run a coroutine. But when it is false, the running effect will be cancelled.

These effects are analogous to RxJava’s switchMap operator, but not identical. To use these tools effectively, it’s well worth reading up on Kotlin’s cooperative cancellation, which I didn’t cover in much detail earlier in the series.
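Kotlin cancellation is cooperative: cancelling a Job only requests cancellation, and the coroutine reacts at its next suspension point or explicit check. A busy loop that never suspends must therefore check for itself. A minimal sketch, assuming kotlinx.coroutines on the classpath; cancelBusyLoop is a hypothetical name, and isActive or yield() would work in place of ensureActive().

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// A CPU-bound loop never suspends, so it must check for cancellation itself.
fun cancelBusyLoop(): Boolean = runBlocking {
    val job = launch(Dispatchers.Default) {
        var iterations = 0L
        while (true) {
            ensureActive() // throws CancellationException once this job is cancelled
            iterations++
        }
    }
    delay(20)
    job.cancelAndJoin() // would wait forever if the loop never checked
    job.isCancelled
}
```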

DIY: You Can If You Need To

These examples are here to demystify Flow: these implementations are not complex, and you really can write the code that does the same thing as these basic operators. In practice, the standard library implementations of these usually have little features that can help make code safer and more efficient (like fusibility) that an implementation written from scratch won’t have.

However, if it’s a choice between using a complicated string of operators and writing the same thing with a single transform or flow, it is better to avoid the complicated string of operators. Straight-line logic is easier to read than operator chaining.

A Dark Secret: Non-Flow Async Values

It’s not a story the Jedi would tell you. However…

If you don’t like the flow invariants, and you can safely protect against flow invariant violations, you don’t have to create a Flow. You can use a bare suspend (T)->Unit inline callback API instead.

Now, the flow invariants exist for a very good reason. But at Cash App, we have used the following signature for coroutines presenters:

suspend fun produceModels(
  events: Flow<ViewEvent>,
  emit: suspend (ViewModel)->Unit,
)

When you call produceModels, you have to keep in mind that those flow invariants may be broken: emit can be invoked from any coroutine on any context, and the presenter may eat exceptions. We found that loosening those restrictions made presenters simpler to write. And since presenters were consumed by shared tooling, this choice made a lot of sense.

Channels Versus Flows

Before I wrap this up, I want to talk about channels one last time. Now that we’ve covered Flow, I can show how they relate to one another:

A Flow is an asynchronous stream of values as a data abstraction: that is, a Flow instance represents a stream of values that any component can read. But a Channel is an asynchronous stream of values as a communication tool: that is, it represents a stream of values that one coroutine is sending to another.

Communication is how concurrent code is written in coroutines. It is what you use instead of synchronized, Mutex, AtomicReference, and all the other Java tools you may have learned to make warding signs against. This post has quite a few examples of concurrent code; not one of them uses anything other than Channel to coordinate that work.

There’s a famous saying in the golang community: Do not communicate by sharing memory; share memory by communicating. Their example uses low-level shared memory, but the same lesson applies elsewhere: in coroutines, tough concurrent reasoning can be made simple and clear by using channels instead of locking or synchronized blocks. Solutions that use tools like synchronized, Mutex, and AtomicReference to share memory instead tend to be longer, more brittle, and more difficult to understand.

So channels and flows are often working together: if a Flow is doing something concurrent, a Channel under the hood is almost always used to do it. With a little thought, you may be able to write the implementation yourself.

Next Stop: Shared Flows

In this post, we covered the foundations of the Flow API: how to use suspend callbacks to model an asynchronous stream of events, and the different flow invariants that make those callbacks safe to call into. We also glossed over the tools Flow gives you to transform any flow, and talked a bit about how channels and flows play together.

In the next post, we’ll talk about the equivalent of “hot” observables in Flow: shared flows.