SoFunction
Updated on 2025-04-10

Flow Fusion, Channel Capacity, and Overflow Policies in Kotlin Coroutines

1. Communication between coroutines

When coroutines need to communicate, call the Channel() factory function to create an object that implements the Channel interface, then exchange messages through its send and receive methods. A Channel is conceptually similar to a blocking queue, except that send and receive suspend the calling coroutine instead of blocking a thread, so the basics are not repeated here.
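
As a minimal sketch of the send/receive pairing described above, assuming kotlinx.coroutines is on the classpath (the helper name exchangeThroughChannel is made up for this illustration):

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: one coroutine sends 1..3, the calling coroutine receives them.
fun exchangeThroughChannel(): List<Int> = runBlocking {
    val channel = Channel<Int>() // no argument: capacity defaults to RENDEZVOUS

    launch {
        // With no buffer, each send suspends until the receiver takes the element.
        for (i in 1..3) channel.send(i)
    }

    val received = mutableListOf<Int>()
    repeat(3) { received += channel.receive() } // suspends until an element is available
    received
}

fun main() {
    println(exchangeThroughChannel()) // [1, 2, 3]
}
```

Because neither side blocks a thread, both coroutines can run on the single thread that runBlocking occupies.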

1. Channel capacity

Strictly speaking, send and receive are not declared in the Channel interface itself: they come from the SendChannel and ReceiveChannel interfaces respectively. The Channel interface only declares, in its companion object, a set of constants that describe the capacity policies. The code is as follows:

// Channel inherits from both SendChannel and ReceiveChannel
public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
    // Capacity constants
    public companion object Factory {
        // Unlimited capacity
        public const val UNLIMITED: Int = Int.MAX_VALUE

        // Capacity 0, no buffer
        public const val RENDEZVOUS: Int = 0

        // Capacity 1 with overflow policy DROP_OLDEST:
        // each new element overwrites the previous one
        public const val CONFLATED: Int = -1

        // Capacity is the default value CHANNEL_DEFAULT_CAPACITY.
        // The default overflow policy is SUSPEND, so send suspends when the buffer is full.
        // If the capacity is BUFFERED but the overflow policy is not SUSPEND, the capacity is 1
        public const val BUFFERED: Int = -2

        // Internal marker meaning "no capacity requested"; not exposed publicly
        internal const val OPTIONAL_CHANNEL = -3

        // Name of the system property that overrides the default capacity used by BUFFERED
        public const val DEFAULT_BUFFER_PROPERTY_NAME: String = "kotlinx.coroutines.channels.defaultBuffer"

        // Default capacity used by BUFFERED:
        // 64 by default, minimum 1, maximum Int.MAX_VALUE - 1
        internal val CHANNEL_DEFAULT_CAPACITY = systemProp(DEFAULT_BUFFER_PROPERTY_NAME,
            64, 1, UNLIMITED - 1
        )
    }
}

From the code above we can see that Channel inherits from both SendChannel and ReceiveChannel, so a single Channel object can be used both to send and to receive messages.
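
To make the capacity constants more concrete, here is a small sketch that offers three elements to channels of different capacities without suspending. It assumes kotlinx.coroutines 1.5 or later (for trySend and tryReceive); the helper offerThree is invented for this example. BUFFERED is left out because its size depends on a system property.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: offers 1..3 without suspending, then drains the buffer.
// Returns which offers were accepted and which elements the channel kept.
fun offerThree(capacity: Int): Pair<List<Boolean>, List<Int>> {
    val channel = Channel<Int>(capacity)
    val accepted = (1..3).map { channel.trySend(it).isSuccess }
    // tryReceive fails on an empty channel, getOrNull then ends the sequence.
    val kept = generateSequence { channel.tryReceive().getOrNull() }.toList()
    return accepted to kept
}

fun main() {
    println(offerThree(Channel.RENDEZVOUS)) // ([false, false, false], [])
    println(offerThree(Channel.UNLIMITED))  // ([true, true, true], [1, 2, 3])
    println(offerThree(Channel.CONFLATED))  // ([true, true, true], [3])
    println(offerThree(2))                  // ([true, true, false], [1, 2])
}
```

With RENDEZVOUS nothing is accepted because no receiver is waiting, while CONFLATED accepts everything but keeps only the most recent element.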

2. Overflow strategy

Besides a capacity policy, a Channel also has an overflow policy, which decides what happens when the buffer is full and another element arrives. The overflow policy is defined by the enum class BufferOverflow. The code is as follows:

public enum class BufferOverflow {
    // When the buffer is full, suspend the coroutine that calls send
    SUSPEND,

    // When the buffer is full, drop the oldest element and add the new one;
    // the coroutine that calls send is not suspended
    DROP_OLDEST,

    // When the buffer is full, discard the element being added;
    // the coroutine that calls send is not suspended
    DROP_LATEST
}
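
The three policies can be compared with a short sketch. It assumes kotlinx.coroutines 1.5 or later, and the helper name survivors is made up for this illustration. It pushes five elements into a two-slot channel with trySend, which never suspends, and then reads back what is left.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: pushes 1..5 into a 2-slot channel and returns the buffer contents.
fun survivors(policy: BufferOverflow): List<Int> {
    val channel = Channel<Int>(capacity = 2, onBufferOverflow = policy)
    // trySend never suspends: with SUSPEND it simply fails once the buffer is full.
    for (i in 1..5) channel.trySend(i)
    return generateSequence { channel.tryReceive().getOrNull() }.toList()
}

fun main() {
    println(survivors(BufferOverflow.SUSPEND))     // [1, 2]
    println(survivors(BufferOverflow.DROP_OLDEST)) // [4, 5]
    println(survivors(BufferOverflow.DROP_LATEST)) // [1, 2]
}
```

SUSPEND and DROP_LATEST leave the same buffer contents here; the difference is that a suspending send under SUSPEND would wait for free space, whereas DROP_LATEST discards the new element and returns immediately.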

2. FusibleFlow interface

The FusibleFlow interface inherits from the Flow interface. A class that implements it declares that the flows it creates can be fused with the adjacent upstream or downstream flow. When two flows are fused, the fuse method defined in the interface is called. The code is as follows:

@InternalCoroutinesApi
public interface FusibleFlow<T> : Flow<T> {
    // Fuses this flow with an adjacent operator
    public fun fuse(
        context: CoroutineContext = EmptyCoroutineContext,
        capacity: Int = Channel.OPTIONAL_CHANNEL,
        onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
    ): Flow<T>
}

The fuse method of the FusibleFlow interface has a default capacity of OPTIONAL_CHANNEL and a default overflow policy of SUSPEND.

Fusion of flows

In Flow, fusion is triggered when calls to channelFlow, flowOn, buffer, produceIn, and broadcastIn are adjacent to one another in an operator chain (broadcastIn is deprecated in recent kotlinx.coroutines releases).

Concretely, fusion passes the capacity, overflow policy, and context of the downstream flow to the upstream flow. The upstream flow combines them with its own capacity, overflow policy, and context, computes a new capacity, overflow policy, and context, and returns a fused flow.

3. ChannelFlow class

ChannelFlow is an abstract class that implements the FusibleFlow interface. The following listing analyzes how its fuse method combines the upstream and downstream settings. The code is as follows:

@InternalCoroutinesApi
public abstract class ChannelFlow<T>(
    // Context of the upstream flow
    @JvmField public val context: CoroutineContext,
    // Buffer capacity between upstream and downstream
    @JvmField public val capacity: Int,
    // Overflow policy
    @JvmField public val onBufferOverflow: BufferOverflow
) : FusibleFlow<T> {
    ...
    public override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): Flow<T> {
        // CONFLATED is a composite value: callers must already have split it into
        // capacity = 0, onBufferOverflow = DROP_OLDEST
        assert { capacity != Channel.CONFLATED }
        // Compute the context of the fused flow
        val newContext = context + this.context
        // Capacity of the fused flow
        val newCapacity: Int
        // Overflow policy of the fused flow
        val newOverflow: BufferOverflow
        // SUSPEND is the default overflow policy
        if (onBufferOverflow != BufferOverflow.SUSPEND) {
            // Not the default policy: take the new capacity and policy as they are
            newCapacity = capacity
            newOverflow = onBufferOverflow
        } else {
            // Default policy: compute the combined capacity
            newCapacity = when {
                // The original capacity is the internal marker value: use the new one
                this.capacity == Channel.OPTIONAL_CHANNEL -> capacity
                // The new capacity is the internal marker value: use the original
                capacity == Channel.OPTIONAL_CHANNEL -> this.capacity
                // The original capacity is BUFFERED (CHANNEL_DEFAULT_CAPACITY): use the new one
                this.capacity == Channel.BUFFERED -> capacity
                // The new capacity is BUFFERED (CHANNEL_DEFAULT_CAPACITY): use the original
                capacity == Channel.BUFFERED -> this.capacity
                // Neither capacity is a default value
                else -> {
                    // Both capacities must be greater than or equal to 0
                    assert { this.capacity >= 0 }
                    assert { capacity >= 0 }
                    // Add the original capacity and the new capacity
                    val sum = this.capacity + capacity
                    // A non-negative sum is the new capacity; a negative sum means
                    // Int overflow, so the capacity becomes unlimited
                    if (sum >= 0) sum else Channel.UNLIMITED
                }
            }
            // Keep the original overflow policy
            newOverflow = this.onBufferOverflow
        }
        // If the context, capacity, and overflow policy are all unchanged,
        // return this flow as it is
        if (newContext == this.context && newCapacity == this.capacity && newOverflow == this.onBufferOverflow)
            return this
        // Otherwise create the fused flow from the newly computed values
        return create(newContext, newCapacity, newOverflow)
    }

    // Implemented by subclasses
    protected abstract fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T>
    ...
}

The principle of flow fusion

From the analysis of the fuse method above, we can summarize four principles that it follows when computing the capacity and overflow policy:

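1) Downstream takes precedence over upstream: a non-default downstream overflow policy replaces both the upstream capacity and the upstream overflow policy.

2) Overflow policy takes precedence over capacity: capacities are combined only when the downstream overflow policy is SUSPEND.

3) A non-default value takes precedence over a default value (OPTIONAL_CHANNEL or BUFFERED).

4) If neither the upstream nor the downstream capacity is a default value, the two are added together; if the sum overflows Int, the capacity becomes UNLIMITED.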
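
The four principles can be checked against a simplified model of the capacity and overflow computation. This is only a sketch in plain Kotlin: BufferConfig, Overflow, and the top-level constants are stand-ins invented for this example rather than library API, and the context merging done by the real fuse method is left out.

```kotlin
// Stand-ins for the library constants; the values mirror the Channel listing.
const val OPTIONAL_CHANNEL = -3
const val BUFFERED = -2
const val UNLIMITED = Int.MAX_VALUE

// Stand-in for BufferOverflow.
enum class Overflow { SUSPEND, DROP_OLDEST, DROP_LATEST }

// Hypothetical value type holding the two settings that fusion recomputes.
data class BufferConfig(val capacity: Int, val overflow: Overflow = Overflow.SUSPEND)

// Models only the capacity/overflow branch of ChannelFlow.fuse.
fun fuse(upstream: BufferConfig, downstream: BufferConfig): BufferConfig {
    // Principles 1 and 2: a non-default downstream overflow policy wins outright.
    if (downstream.overflow != Overflow.SUSPEND) return downstream
    val capacity = when {
        // Principle 3: a default value gives way to the other side.
        upstream.capacity == OPTIONAL_CHANNEL -> downstream.capacity
        downstream.capacity == OPTIONAL_CHANNEL -> upstream.capacity
        upstream.capacity == BUFFERED -> downstream.capacity
        downstream.capacity == BUFFERED -> upstream.capacity
        // Principle 4: add explicit capacities, clamping to UNLIMITED on Int overflow.
        else -> {
            val sum = upstream.capacity + downstream.capacity
            if (sum >= 0) sum else UNLIMITED
        }
    }
    return BufferConfig(capacity, upstream.overflow)
}

fun main() {
    // buffer(2) followed by buffer(3): capacities are added
    println(fuse(BufferConfig(2), BufferConfig(3)))
    // prints BufferConfig(capacity=5, overflow=SUSPEND)

    // buffer() followed by buffer(3): the explicit value wins over BUFFERED
    println(fuse(BufferConfig(BUFFERED), BufferConfig(3)))
    // prints BufferConfig(capacity=3, overflow=SUSPEND)

    // buffer(10) followed by a conflating buffer, already split into (0, DROP_OLDEST)
    println(fuse(BufferConfig(10), BufferConfig(0, Overflow.DROP_OLDEST)))
    // prints BufferConfig(capacity=0, overflow=DROP_OLDEST)
}
```

The model takes the upstream settings as an explicit parameter instead of reading them from this, which makes each principle easy to exercise in isolation.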
