1. Communication between coroutines
When coroutines need to communicate, the Channel factory function can be called to create an object that implements the Channel interface, and messages are sent and received by calling the send and receive methods of that object. The coroutine implementation of the Channel interface is essentially similar to a blocking queue, so I will not go into it here.
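As a minimal sketch of the send/receive pattern described above, the following example passes a few values from one coroutine to another. It assumes the kotlinx.coroutines library is on the classpath; the function name collectSquares is hypothetical and exists only for illustration.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: sends the squares of 1..count through a channel
// and collects them on the receiving side.
fun collectSquares(count: Int): List<Int> = runBlocking {
    // No arguments: a rendezvous channel (capacity 0)
    val channel = Channel<Int>()

    // Producer coroutine: send suspends until the receiver takes each element
    launch {
        for (i in 1..count) channel.send(i * i)
        channel.close() // signals that no more elements will follow
    }

    // Consumer: iterating the channel calls receive until the channel is closed
    val received = mutableListOf<Int>()
    for (value in channel) received += value
    received
}

fun main() {
    println(collectSquares(5)) // prints [1, 4, 9, 16, 25]
}
```

Closing the channel on the sending side is what ends the for loop on the receiving side; without it the consumer would suspend forever.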
1. Channel capacity
In fact, the send and receive methods are not defined in the Channel interface itself; they are defined in the SendChannel interface and the ReceiveChannel interface respectively. The Channel interface only defines some constants related to the Channel capacity policy in its companion object, and the code is as follows:
// Inherits from both the SendChannel interface and the ReceiveChannel interface
public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
    // Capacity constants
    public companion object Factory {
        // The capacity of the Channel is unlimited
        public const val UNLIMITED: Int = Int.MAX_VALUE

        // The capacity of the Channel is 0, no buffer
        public const val RENDEZVOUS: Int = 0

        // The capacity of the Channel is 1, and the overflow policy is DROP_OLDEST.
        // The next element overwrites the previous one
        public const val CONFLATED: Int = -1

        // The capacity of the Channel is the default value CHANNEL_DEFAULT_CAPACITY.
        // The default overflow policy is SUSPEND, so the send method suspends when the buffer is full.
        // When the capacity policy is BUFFERED and the overflow policy is not SUSPEND, the capacity of the Channel is 1
        public const val BUFFERED: Int = -2

        // A default value used internally by coroutines, not exposed to the outside world
        internal const val OPTIONAL_CHANNEL = -3

        // Name of the system property for manually configuring the default capacity of the BUFFERED policy
        public const val DEFAULT_BUFFER_PROPERTY_NAME: String = "kotlinx.coroutines.channels.defaultBuffer"

        // The default capacity when the capacity policy is BUFFERED.
        // Default 64, minimum 1, maximum Int.MAX_VALUE - 1
        internal val CHANNEL_DEFAULT_CAPACITY = systemProp(DEFAULT_BUFFER_PROPERTY_NAME, 64, 1, UNLIMITED - 1)
    }
}
From the above code, we can see that the Channel interface inherits from both the SendChannel interface and the ReceiveChannel interface. Therefore, an object that implements the Channel interface can be used both to send messages and to receive messages.
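Because Channel extends both interfaces, the same object can be handed to a producer as a SendChannel and to a consumer as a ReceiveChannel, so each side only sees the half it needs. The sketch below illustrates this, assuming kotlinx.coroutines is available; produceNumbers, sumAll, and sumViaChannel are hypothetical names.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// The producer only sees the sending half of the channel
suspend fun produceNumbers(sink: SendChannel<Int>, count: Int) {
    for (i in 1..count) sink.send(i)
    sink.close()
}

// The consumer only sees the receiving half of the channel
suspend fun sumAll(source: ReceiveChannel<Int>): Int {
    var sum = 0
    for (value in source) sum += value
    return sum
}

// One Channel object is passed to both sides under different interface types
fun sumViaChannel(count: Int): Int = runBlocking {
    val channel = Channel<Int>(Channel.BUFFERED)
    launch { produceNumbers(channel, count) }
    sumAll(channel)
}

fun main() {
    println(sumViaChannel(4)) // prints 10
}
```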
2. Overflow strategy
In addition to the capacity policy, a Channel also has an overflow policy, which determines what happens when the Channel is already full and the next message arrives. The overflow policy is defined in the enum class BufferOverflow, and the code is as follows:
public enum class BufferOverflow {
    // When the capacity is full, suspend the coroutine that calls the send method
    SUSPEND,

    // When the capacity is full, delete the oldest data, add the new data,
    // and do not suspend the coroutine that calls the send method
    DROP_OLDEST,

    // When the capacity is full, ignore the data being added
    // and do not suspend the coroutine that calls the send method
    DROP_LATEST
}
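To make the three policies concrete, the following sketch fills a small channel past its capacity without any receiver running and then reads back what survived. It assumes kotlinx.coroutines is available; the helper name drain is hypothetical. trySend is used instead of send so that the SUSPEND case fails fast rather than suspending the only coroutine.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical helper: offers 1..5 to a channel with the given configuration
// while nobody is receiving, then returns the elements left in the buffer.
fun drain(capacity: Int, overflow: BufferOverflow): List<Int> = runBlocking {
    val channel = Channel<Int>(capacity, overflow)
    // trySend never suspends; with SUSPEND it simply fails once the buffer is full
    for (i in 1..5) channel.trySend(i)
    channel.close()

    // Elements already buffered can still be received after close
    val kept = mutableListOf<Int>()
    for (value in channel) kept += value
    kept
}

fun main() {
    println(drain(2, BufferOverflow.SUSPEND))                 // prints [1, 2]
    println(drain(2, BufferOverflow.DROP_OLDEST))             // prints [4, 5]
    println(drain(2, BufferOverflow.DROP_LATEST))             // prints [1, 2]
    println(drain(Channel.CONFLATED, BufferOverflow.SUSPEND)) // prints [5]
}
```

The last call shows the CONFLATED capacity constant behaving like a buffer of one element with the DROP_OLDEST policy, as described in the capacity comments above.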
2. FusibleFlow interface
The FusibleFlow interface inherits from the Flow interface. A class that implements this interface indicates that the flows it creates can be fused with the adjacent upstream or downstream flow. When flows are fused, the fuse method defined in the interface is called. The code is as follows:
@InternalCoroutinesApi
public interface FusibleFlow<T> : Flow<T> {
    // Used to fuse flows
    public fun fuse(
        context: CoroutineContext = EmptyCoroutineContext,
        capacity: Int = Channel.OPTIONAL_CHANNEL,
        onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
    ): Flow<T>
}
The fuse method of the FusibleFlow interface has a default capacity of OPTIONAL_CHANNEL and a default overflow policy of SUSPEND.
Fusion of streams
In Flow, when the channelFlow, flowOn, buffer, produceIn, and broadcastIn methods are called adjacently, fusion of the flows is triggered.
Concretely, fusion passes the capacity, overflow policy, and context of the downstream flow to the upstream flow. The upstream flow combines them with its own capacity, overflow policy, and context to compute a new capacity, overflow policy, and context, and returns a fused flow.
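The call pattern that triggers fusion can be sketched with public operators only. Fusion itself is an internal optimization and is not directly observable from outside, so the example below only shows adjacent buffer and flowOn calls and confirms that the emitted values are unchanged. It assumes kotlinx.coroutines is available; fusedPipeline is a hypothetical name, and the capacity mentioned in the comments follows from the summing behavior of the fuse method rather than from anything the program prints.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: three adjacent operators that are candidates for fusion
fun fusedPipeline(): List<Int> = runBlocking {
    flow { for (i in 1..5) emit(i) }
        .buffer(2)                   // introduces a channel-backed stage with capacity 2
        .flowOn(Dispatchers.Default) // changes the upstream context, no capacity requested
        .buffer(3)                   // both overflow policies are SUSPEND, so capacities are expected to add up to 5
        .toList()
}

fun main() {
    println(fusedPipeline()) // prints [1, 2, 3, 4, 5]
}
```

Whether the three operators end up sharing one channel or not, the collector sees the same elements in the same order; fusion only affects how many intermediate channels and coroutines are created.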
3. ChannelFlow class
The ChannelFlow class is an abstract class that implements the FusibleFlow interface. The following analyzes how its fuse method fuses the upstream and downstream flows. The code is as follows:
@InternalCoroutinesApi
public abstract class ChannelFlow<T>(
    // The context of the upstream flow
    @JvmField public val context: CoroutineContext,
    // The buffer capacity between the upstream and downstream flows
    @JvmField public val capacity: Int,
    // Overflow policy
    @JvmField public val onBufferOverflow: BufferOverflow
) : FusibleFlow<T> {
    ...
    public override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): Flow<T> {
        // CONFLATED is a composite value; callers must decompose it into
        // capacity = 0, onBufferOverflow = DROP_OLDEST
        assert { capacity != Channel.CONFLATED }
        // Calculate the context of the fused flow
        val newContext = context + this.context
        // Holds the capacity of the fused flow
        val newCapacity: Int
        // Holds the overflow policy of the fused flow
        val newOverflow: BufferOverflow
        // SUSPEND is the default overflow policy; if the new overflow policy is not the default
        if (onBufferOverflow != BufferOverflow.SUSPEND) {
            // Use the new capacity and overflow policy directly
            newCapacity = capacity
            newOverflow = onBufferOverflow
        } else {
            // If it is the default policy, calculate and save the new capacity
            newCapacity = when {
                // If the original capacity is the internal default OPTIONAL_CHANNEL, use the new one
                this.capacity == Channel.OPTIONAL_CHANNEL -> capacity
                // If the new capacity is the internal default OPTIONAL_CHANNEL, use the original one
                capacity == Channel.OPTIONAL_CHANNEL -> this.capacity
                // If the original capacity is BUFFERED (the default CHANNEL_DEFAULT_CAPACITY), use the new one
                this.capacity == Channel.BUFFERED -> capacity
                // If the new capacity is BUFFERED (the default CHANNEL_DEFAULT_CAPACITY), use the original one
                capacity == Channel.BUFFERED -> this.capacity
                // Neither capacity is a default value
                else -> {
                    // Check that both capacities are greater than or equal to 0
                    assert { this.capacity >= 0 }
                    assert { capacity >= 0 }
                    // Add the original capacity and the new capacity
                    val sum = this.capacity + capacity
                    // If the sum is greater than or equal to 0, use it;
                    // otherwise the addition overflowed, so the capacity is unlimited
                    if (sum >= 0) sum else Channel.UNLIMITED
                }
            }
            // Keep the original overflow policy
            newOverflow = this.onBufferOverflow
        }
        // If the context, capacity, and overflow policy are all unchanged after fusion
        if (newContext == this.context && newCapacity == this.capacity && newOverflow == this.onBufferOverflow)
            // Return the current flow directly
            return this
        // If anything changed, create the fused flow from the newly calculated parameters
        return create(newContext, newCapacity, newOverflow)
    }

    // Overridden by subclasses
    protected abstract fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T>
    ...
}
The principle of flow fusion
Based on the analysis of the fuse method above, four principles govern how it calculates the capacity and overflow policy:
1) Downstream takes precedence over upstream
2) Overflow strategy takes precedence over capacity
3) Non-default value takes precedence over default value
4) When neither the upstream nor the downstream capacity is a default value, the two capacities are summed
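The four principles can be checked with a small dependency-free model of the capacity and overflow calculation. This is only a sketch: the constants are local stand-ins that mirror the values of the kotlinx.coroutines constants, and Overflow, BufferConfig, and fuseConfig are hypothetical names introduced for illustration, not library API. The context calculation is left out.

```kotlin
// Local stand-ins for the library constants, so the sketch needs no dependencies
const val UNLIMITED = Int.MAX_VALUE
const val BUFFERED = -2
const val OPTIONAL_CHANNEL = -3

enum class Overflow { SUSPEND, DROP_OLDEST, DROP_LATEST }

data class BufferConfig(val capacity: Int, val overflow: Overflow)

// Hypothetical model of the capacity/overflow part of fuse:
// `upstream` plays the role of this.capacity / this.onBufferOverflow,
// `downstream` plays the role of the arguments passed to fuse.
fun fuseConfig(upstream: BufferConfig, downstream: BufferConfig): BufferConfig {
    // Principles 1 and 2: a non-default downstream overflow policy
    // replaces both the upstream capacity and the upstream policy
    if (downstream.overflow != Overflow.SUSPEND) return downstream

    val capacity = when {
        // Principle 3: a non-default capacity wins over a default one
        upstream.capacity == OPTIONAL_CHANNEL -> downstream.capacity
        downstream.capacity == OPTIONAL_CHANNEL -> upstream.capacity
        upstream.capacity == BUFFERED -> downstream.capacity
        downstream.capacity == BUFFERED -> upstream.capacity
        // Principle 4: two explicit capacities are summed,
        // and an Int overflow (negative sum) falls back to UNLIMITED
        else -> {
            val sum = upstream.capacity + downstream.capacity
            if (sum >= 0) sum else UNLIMITED
        }
    }
    // With a default downstream policy, the upstream policy is kept
    return BufferConfig(capacity, upstream.overflow)
}

fun main() {
    val suspend = Overflow.SUSPEND
    println(fuseConfig(BufferConfig(2, suspend), BufferConfig(3, suspend)))
    // prints BufferConfig(capacity=5, overflow=SUSPEND)
    println(fuseConfig(BufferConfig(10, suspend), BufferConfig(1, Overflow.DROP_OLDEST)))
    // prints BufferConfig(capacity=1, overflow=DROP_OLDEST)
    println(fuseConfig(BufferConfig(BUFFERED, suspend), BufferConfig(7, suspend)))
    // prints BufferConfig(capacity=7, overflow=SUSPEND)
}
```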