SoFunction
Updated on 2025-03-09

Exception handling in Kotlin coroutine Flow, with examples

Example

The code is as follows:

launch {
    // Part 1: catch
    flow {
        emit(1)
        throw NullPointerException("e")
    }.catch {
        Log.d("liduo", "onCreate1: $it")
    }.collect {
        Log.d("liduo", "onCreate2: $it")
    }
    // Part 2: onCompletion
    flow {
        emit(1)
    }.onCompletion {
        Log.d("liduo", "onCreate3: $it")
    }.collect {
        Log.d("liduo", "onCreate4: $it")
    }
    // Part 3: retryWhen
    flow {
        emit(1)
        throw NullPointerException("e")
    }.retryWhen { cause, attempt ->
        // Retry only for exceptions other than NullPointerException, at most 3 times
        cause !is NullPointerException && attempt <= 2
    }.collect {
        Log.d("liduo", "onCreate5: $it")
    }
}

1. catch method

The catch method catches exceptions thrown by the upstream flow. Its code is as follows:

public fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(cause: Throwable) -> Unit): Flow<T> =
    flow { // Create the Flow object
        // Trigger execution of the upstream flow and catch any exception
        val exception = catchImpl(this)
        // If an exception was caught, invoke the action callback with it
        if (exception != null) action(exception)
    }

The catch method is an extension method of the Flow interface and returns a Flow. Internally, it creates that Flow by calling the flow method.

The core of the catch method is the call to catchImpl, which does the actual exception catching. If an exception is caught, the action callback is invoked with it. The action parameter is an extension function on the FlowCollector interface, so inside it you can still call emit to send a value downstream.
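Because action runs with the downstream FlowCollector as its receiver, a catch block can replace a failure with a fallback value. The following is a minimal sketch of that idea, assuming kotlinx.coroutines is on the classpath; the helper name collectWithFallback and the fallback value -1 are made up for illustration.

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collects a failing flow whose catch block emits a fallback value.
fun collectWithFallback(): List<Int> = runBlocking {
    flow<Int> {
        emit(1)
        throw NullPointerException("e")
    }.catch {
        // `this` is the downstream FlowCollector, so emit is available here
        emit(-1)
    }.toList()
}

fun main() {
    println(collectWithFallback()) // prints [1, -1]
}
```

The value emitted before the failure still reaches the collector; the fallback is appended after it and the flow then completes normally.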

catchImpl method

When the downstream calls collect, the Flow created by the catch method starts executing and calls catchImpl. The code is as follows:

internal suspend fun <T> Flow<T>.catchImpl(
    collector: FlowCollector<T>
): Throwable? {
    // Holds an exception thrown while the downstream was executing
    var fromDownstream: Throwable? = null
    try {
        // Trigger execution of the upstream flow
        collect {
            try {
                // Pass the value emitted upstream on to the downstream collector
                collector.emit(it)
            } catch (e: Throwable) {
                // The downstream failed: remember the exception and rethrow it
                fromDownstream = e
                throw e
            }
        }
    } catch (e: Throwable) {
        // The exception may come from upstream (thrown inside collect)
        // or from downstream (thrown by emit).
        // Rethrow it if it came from downstream or is the coroutine's cancellation cause
        if (e.isSameExceptionAs(fromDownstream) || e.isCancellationCause(coroutineContext)) {
            throw e // Leave it to the downstream to handle
        } else {
            // An upstream exception that is not a cancellation: successfully caught
            return e
        }
    }
    // No exception was caught
    return null
}

The catchImpl method is an extension method of the Flow interface, so the collect call inside it triggers execution of the upstream flow. Its core is to pass each value emitted upstream to the downstream collector and to wrap that whole process in exception handling. Only exceptions that originate upstream and are not cancellation causes are returned; exceptions thrown by the downstream are rethrown.
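The rethrow branch means that catch does not see failures raised by the collector below it. The sketch below illustrates this, assuming kotlinx.coroutines 1.6 or newer (where collect accepts a lambda without an extra import); the helper name whereIsDownstreamFailureHandled and the label strings are hypothetical.

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical helper: reports which handler ended up seeing the downstream exception.
fun whereIsDownstreamFailureHandled(): String = runBlocking {
    var handledBy = "nobody"
    try {
        flow<Int> {
            emit(1)
        }.catch {
            // Not reached: the exception does not originate upstream
            handledBy = "catch operator"
        }.collect {
            // Thrown by the downstream collector, not by the upstream flow
            throw IllegalStateException("downstream failure")
        }
    } catch (e: IllegalStateException) {
        handledBy = "try/catch around collect"
    }
    handledBy
}

fun main() {
    println(whereIsDownstreamFailureHandled()) // prints try/catch around collect
}
```

To handle a failure in the collecting code with catch, one option is to move that code into an operator placed above the catch call, so that it becomes part of the upstream.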

2. onCompletion method

The onCompletion method runs an action last, after the upstream flow has finished executing, whether it completed normally or failed. The code is as follows:

public fun <T> Flow<T>.onCompletion(
    action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T> = unsafeFlow { // Create the Flow object
    try {
        // Trigger execution of the upstream flow
        // `this` is the downstream FlowCollector
        collect(this)
    } catch (e: Throwable) {
        // Execution failed: wrap the exception in a ThrowingCollector
        // and invoke the action callback with it
        ThrowingCollector(e).invokeSafely(action, e)
        // Rethrow the exception
        throw e
    }
    // Execution reaches this point only on normal completion
    val sc = SafeCollector(this, currentCoroutineContext())
    try {
        // Invoke the action callback with a null cause
        sc.action(null)
    } finally {
        sc.releaseIntercepted()
    }
}

The onCompletion method is an extension method of the Flow interface, so the collect call inside it triggers execution of the upstream flow. The argument passed to collect is this, which is the FlowCollector that the downstream handed to the Flow created by unsafeFlow. The core of the onCompletion method is that the Flow it creates sits between the upstream and the downstream. The inner collect call only returns once the whole flow has finished executing, and only throws if an exception occurred along the way; in either case execution then continues and action is invoked.

unsafeFlow method

The unsafeFlow method creates an object of type Flow. Unlike the SafeFlow class described in the earlier article "Kotlin coroutine: Basic principles of Flow", the Flow created by unsafeFlow does not check the execution context. The code is as follows:

@PublishedApi
internal inline fun <T> unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
    // Return an anonymous object implementing Flow
    return object : Flow<T> {
        // collect simply runs block with the collector as its receiver
        override suspend fun collect(collector: FlowCollector<T>) {
            collector.block()
        }
    }
}

Although the onCompletion method creates its Flow with unsafeFlow, it invokes action through a SafeCollector. As described in the earlier article "Kotlin coroutine: Basic principles of Flow", the emit method of the SafeCollector class checks the context. The actual effect is therefore the same as using the SafeFlow class.

ThrowingCollector class

The ThrowingCollector class is also a FlowCollector. It wraps an exception, and calling its emit method throws that exception. The code is as follows:

private class ThrowingCollector(private val e: Throwable) : FlowCollector<Any?> {
    override suspend fun emit(value: Any?) {
        // Throw the wrapped exception
        throw e
    }
}

Why create a new ThrowingCollector object instead of using the downstream FlowCollector object?

This prevents the action passed to onCompletion from sending data downstream after execution has already failed. If action could still emit to the real downstream collector at that point, onCompletion would no longer be guaranteed to be the last thing executed, which is the behavior expected when it is used as a "finally" block.
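The effect of ThrowingCollector can be observed from the outside: an emit attempted inside onCompletion after a failure rethrows the original exception instead of delivering a value. A minimal sketch, assuming kotlinx.coroutines 1.6 or newer; the helper name emitAfterFailure and the value 99 are hypothetical.

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns the values the collector received and the failure message.
fun emitAfterFailure(): Pair<List<Int>, String?> = runBlocking {
    val received = mutableListOf<Int>()
    var failure: String? = null
    try {
        flow<Int> {
            emit(1)
            throw IllegalStateException("upstream failed")
        }.onCompletion { cause ->
            // On failure the receiver is a ThrowingCollector: emit rethrows `cause`
            if (cause != null) emit(99)
        }.collect { received.add(it) }
    } catch (e: IllegalStateException) {
        failure = e.message
    }
    received to failure
}

fun main() {
    println(emitAfterFailure()) // prints ([1], upstream failed)
}
```

The value 99 never reaches the collector, so nothing is delivered after the point at which the flow failed.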

Combining the onCompletion method with the catch method achieves the effect of a try-catch-finally block.
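As a rough illustration of that combination, the sketch below places catch before onCompletion so that events are recorded in try, catch, finally order. It assumes kotlinx.coroutines 1.6 or newer; the helper name runTryCatchFinally and the event strings are made up for this example.

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records the order in which the handlers run.
fun runTryCatchFinally(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    flow<Int> {
        emit(1)
        throw NullPointerException("e")
    }.catch { cause ->
        // Plays the role of the catch block
        events.add("catch: ${cause.javaClass.simpleName}")
    }.onCompletion { cause ->
        // Plays the role of the finally block; cause is null because catch handled the failure
        events.add("finally: failed=${cause != null}")
    }.collect { value ->
        events.add("value: $value")
    }
    events
}

fun main() {
    println(runTryCatchFinally()) // prints [value: 1, catch: NullPointerException, finally: failed=false]
}
```

Operator order matters here. If onCompletion were placed before catch, its action would run first and would receive the exception as its cause.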

3. retryWhen method

The retryWhen method is similar to the catch method and can also catch exceptions thrown by the upstream flow. The difference is that retryWhen can decide, based on the exception type and the number of retries so far, whether to trigger execution of the upstream flow again. When retryWhen decides not to retry, it rethrows the caught exception. The code is as follows:

// Parameter cause is the caught exception
// Parameter attempt is the number of retries so far
// If predicate returns true, execution of the upstream flow is triggered again
public fun <T> Flow<T>.retryWhen(predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean): Flow<T> =
    // Create the Flow object
    flow {
        // Number of retries so far
        var attempt = 0L
        // Whether to trigger the upstream again
        var shallRetry: Boolean
        do {
            // Reset to false
            shallRetry = false
            // Trigger execution of the upstream flow and catch any exception
            val cause = catchImpl(this)
            // If an exception was caught
            if (cause != null) {
                // Let the caller decide whether to retry
                if (predicate(cause, attempt)) {
                    // Mark that a retry is wanted
                    shallRetry = true
                    // Increment the retry count
                    attempt++
                } else {
                    // The caller does not want a retry: rethrow the exception
                    throw cause
                }
            }
        // Loop again if a retry was requested
        } while (shallRetry)
    }

The retryWhen method is an extension method of the Flow interface. Its core is to trigger the upstream flow and catch its exceptions through catchImpl, and to add retry logic whose decision is left to the caller.
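The retry loop can be seen in action with an upstream that fails on its first two runs. This is a minimal sketch, assuming kotlinx.coroutines is available; the helper name retryUntilSuccess, the run counter, and the use of IOException as the "transient" failure are illustrative choices, not part of the original example.

```kotlin
import java.io.IOException
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.retryWhen
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns the collected values and how many times the upstream ran.
fun retryUntilSuccess(): Pair<List<Int>, Int> = runBlocking {
    var runs = 0 // how many times the upstream block has been executed
    val values = flow<Int> {
        runs++
        emit(runs)
        // Fail on the first two runs, succeed on the third
        if (runs < 3) throw IOException("transient failure #$runs")
    }.retryWhen { cause, attempt ->
        // attempt starts at 0 and counts the retries already performed
        cause is IOException && attempt < 2
    }.toList()
    values to runs
}

fun main() {
    println(retryUntilSuccess()) // prints ([1, 2, 3], 3)
}
```

Each retry re-runs the whole upstream block, so values emitted before a failure are delivered again on the next run. Downstream code has to tolerate that repetition.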
