前言:为什么 Flow 取消很重要
今天,我们来深入探讨 Kotlin Flow 中的一个重要操作:流的取消。日常开发中我们需要保持一个 Flow 的长时间运行,确保 Flow 的及时取消,有非常重要的意义::
资源管理 (Resource Management):防止内存泄漏和不必要的 CPU 占用。
用户体验 (User Experience):当用户离开界面时,停止过时的操作。
网络效率 (Network Efficiency):取消不再需要的挂起请求。
电池寿命 (Battery Life):减少移动设备上的后台处理。
Flow 的取消操作不仅仅是调用cancel()然后期望一切顺利那么简单。多种手段都可以达到取消的意图,但每种手段都有其特定的使用场景和细微差别。接下来让我们一起探索可取消 Flow 的世界吧!
方法 1:Job 取消:基础用法
取消一个 Flow 最基本的方式就是通过Job取消。每个协程都有一个Job,当你取消这个Job时,所有在该协程作用域内运行的 Flow 都会被取消。看一个实际的例子:
suspend fun main() { val job = CoroutineScope(Dispatchers.Default).launch { createNumberFlow() .collect { value -> println("Received: $value") } } delay(3.seconds) println("Cancelling job...") job.cancel() delay(1.seconds) println("Program finished") } fun createNumberFlow() = flow { repeat(10) { i -> println("Emitting: $i") emit(i) delay(1.seconds) } }输出:
Emitting: 0 Received: 0 Emitting: 1 Received: 1 Emitting: 2 Received: 2 Cancelling job... Program finished底层原理是什么?
当你调用job.cancel()时,它会向协程发送一个取消信号。Flow 构建器 (flow { ... }) 是“取消协作”的,这意味着它会在挂起点 (suspension points) 检查取消状态,例如delay()和emit()。一旦被取消,Flow 将停止发射新值,收集器也将停止接收它们。
但这里有个有趣的问题:如果你的 Flow 没有挂起点会怎么样?
fun nonCancellableFlow() = flow { repeat(1000000) { i -> emit(i) } }这个 Flow 不会响应取消,因为它没有任何挂起点。要解决这个问题,你可以使用ensureActive():
fun cancellableFlow() = flow { repeat(1000000) { i -> ensureActive() emit(i) } }方法 2:Scope 取消:结构化并发
这可以理解为方法一的进阶版,对 CoroutineScope 进行取消,可以停止基于此 Scope 启动的所有协程任务。即所谓的“结构化并发”的优势
下面是一个通过 Scope 取消的使用示例:
class DataRepository { privateval scope = CoroutineScope(SupervisorJob() + Dispatchers.IO) fun fetchDataStream(): Flow<String> = flow { repeat(Int.MAX_VALUE) { i -> emit("Data item $i") delay(500.milliseconds) } }.flowOn(Dispatchers.IO) fun startFetching(): Job { return scope.launch { fetchDataStream() .catch { e -> println("Error: ${e.message}") } .collect { data -> println("Processing: $data") } } } fun cleanup() { scope.cancel("Repository is being cleaned up") } } // 调用 suspendfun main() { val repository = DataRepository() val fetchJob = repository.startFetching() delay(3.seconds) println("Cleaning up repository...") repository.cleanup() delay(1.seconds) println("Done") }输出:
Processing: Data item 0 Processing: Data item 1 Processing: Data item 2 Processing: Data item 3 Processing: Data item 4 Processing: Data item 5 Cleaning up repository... Done方法 3:withTimeout:基于时间的取消
有时,你希望在一个 Flow 操作耗时过长的情况下取消它。withTimeout非常适合这种场景。它会创建一个定时炸弹 ⏰ —— 如果操作未在指定时间内完成,它将抛出一个TimeoutCancellationException。
suspend fun main() { try { withTimeout(5.seconds) { slowDataFlow() .collect { value -> println("Received: $value") } } } catch (e: TimeoutCancellationException) { println("Operation timed out: ${e.message}") } } fun slowDataFlow() = flow { repeat(10) { i -> println("Emitting: $i") emit(i) delay(1.seconds) // Each emission takes 1 second } }输出:
Emitting: 0 Received: 0 Emitting: 1 Received: 1 Emitting: 2 Received: 2 Emitting: 3 Received: 3 Emitting: 4 Received: 4 Operation timed out: Timed out waiting for 5000 ms方法 4:withTimeoutOrNull:更优雅的超时处理
如果你不希望处理异常,可以使用withTimeoutOrNull。它会在超时后返回null,而不是抛出异常。
suspend fun main() { val result = withTimeoutOrNull(5.seconds) { slowDataFlow() .collect { value -> println("Received: $value") } "Done"// 完成后返回一个值 } if (result == null) { println("Operation timed out") } else { println("Operation finished: $result") } }输出:
Emitting: 0 Received: 0 Emitting: 1 Received: 1 Emitting: 2 Received: 2 Emitting: 3 Received: 3 Emitting: 4 Received: 4 Operation timed out方法 5:使用布尔标志手动取消
你可以使用一个简单的布尔标志来控制 Flow 的发射。当需要对取消逻辑进行精细控制时,这非常有用。
class ManualCancellableFlow { privateval shouldStop = AtomicBoolean(false) fun createFlow() = flow { var i = 0 while (!shouldStop.get()) { emit("Item: $i") i++ delay(1.seconds) } } fun stop() { shouldStop.set(true) } } suspendfun main() { val manualFlow = ManualCancellableFlow() val job = CoroutineScope(Dispatchers.Default).launch { manualFlow.createFlow().collect { println(it) } } delay(3.5.seconds) manualFlow.stop() println("Stopped manually") job.join() }输出:
Item: 0 Item: 1 Item: 2 Stopped manually方法 6:使用cancellable操作符使 Flow 可取消
cancellable()用于使一个默认不可取消的 Flow 变得可取消。
默认情况下,由flow { ... }构建器创建的 Flow 是通过检查协程的isActive状态来协作式地取消的。但如果一个 Flow 的实现不包含挂起点(如delay)或者不主动检查isActive,那么它可能不会响应取消请求。asFlow()扩展函数创建的 Flow 就是一个例子,它会快速地发出所有元素,不检查取消状态。
cancellable操作符通过在下游消费者每次请求一个值时,在内部调用ensureActive()来确保 Flow 是可取消的。
代码示例:
我们来对比一下使用cancellable()前后的区别。
suspend fun main() = coroutineScope { println("--- Without cancellable() ---") val job1 = launch { (1..5).asFlow() .onEach { delay(100) } // 模拟一些处理 .collect { value -> println("Collecting $value") } } delay(250) job1.cancelAndJoin() println("Job 1 cancelled") println("\n--- With cancellable() ---") val job2 = launch { (1..5).asFlow() .cancellable() // 使 Flow 可取消 .onEach { delay(100) } // 模拟一些处理 .collect { value -> println("Collecting $value") } } delay(250) job2.cancelAndJoin() println("Job 2 cancelled") }输出:
--- Without cancellable() --- Collecting 1 Collecting 2 Collecting 3 Collecting 4 Collecting 5 Job 1 cancelled --- With cancellable() --- Collecting 1 Collecting 2 Job 2 cancelled底层原理是什么?
加上cancellable()后,Flow 在发射每个元素之前都会检查协程是否仍然处于活动状态。当job.cancel()被调用时,协程状态变为cancelling,cancellable()内部的ensureActive()会抛出CancellationException,从而停止 Flow 的执行。
方法 7:使用take和takeWhile
take操作符限制了 Flow 发射的 item 数量。takeWhile操作符则在给定谓词为 true 的情况下持续发射值。
使用take:
suspend fun main() { createNumberFlow() .take(3) // 只取前 3 项 .collect { value -> println("Received: $value") } }输出:
Emitting: 0 Received: 0 Emitting: 1 Received: 1 Emitting: 2 Received: 2使用takeWhile:
suspend fun main() { createNumberFlow() .takeWhile { it < 3 } // 当值小于 3 时持续获取 .collect { value -> println("Received: $value") } }输出:
Emitting: 0 Received: 0 Emitting: 1 Received: 1 Emitting: 2 Received: 2 Emitting: 3方法 8:first()和条件终端操作符
first()是一个终端操作符,它启动 Flow 的收集,等待第一个元素到达,然后立即取消 Flow 的上游。这在您只需要处理数据流的第一个有效项时非常有用,例如,从多个来源获取数据,但只关心最先返回的结果。
代码示例:
suspend fun main() { val numberFlow = flow { println("Flow started") emit(1) println("This will not be printed") emit(2) } val firstNumber = numberFlow.first() println("First number is: $firstNumber") }输出:
Flow started First number is: 1底层原理是什么?
first()在内部收集 Flow。一旦它收到第一个元素,就会抛出一个特殊的CancellationException来通知上游生产者停止发送更多的元素。这种机制确保了不必要的计算和资源消耗被立即终止。
方法 9:single()只期望一个元素
single()操作符比first()更为严格。它期望 Flow只发射一个元素。如果 Flow 完成时没有发射任何元素,或者发射了多个元素,single()会抛出异常。
零个元素: 抛出
NoSuchElementException。多个元素: 抛出
IllegalStateException。
当业务逻辑严格要求数据流必须产生且仅产生一个结果时,这个操作符非常有用。
代码示例:
suspend fun main() { // 场景 1: 成功 val singleElementFlow = flowOf(42) println("Single element: ${singleElementFlow.single()}") // 场景 2: 抛出 IllegalStateException try { val multipleElementsFlow = flowOf(1, 2, 3) multipleElementsFlow.single() } catch (e: IllegalStateException) { println("Caught expected exception: ${e.message}") } // 场景 3: 抛出 NoSuchElementException try { val emptyFlow = emptyFlow<Int>() emptyFlow.single() } catch (e: NoSuchElementException) { println("Caught expected exception: ${e.message}") } }输出:
Single element: 42 Caught expected exception: Flow has more than one element Caught expected exception: Flow is empty方法 10:any(),all(),none():基于布尔条件的取消
这些终端操作符用于检查 Flow 中的元素是否满足特定条件,并在得到结论后立即取消 Flow。
any { ... }: 当找到第一个匹配谓词的元素时,返回true并取消 Flow。all { ... }: 当遇到第一个不匹配谓词的元素时,返回false并取消 Flow。如果所有元素都匹配,返回true。none { ... }: 当找到第一个匹配谓词的元素时,返回false并取消 Flow。如果所有元素都不匹配,返回true。
代码示例 (any):
suspend fun main() { val numberFlow = flow { emit(1) println("Checking 1") emit(2) println("Checking 2") emit(3) // 满足条件 (it > 2) println("This will not be printed") emit(4) } val hasNumberGreaterThanTwo = numberFlow.any { it > 2 } println("Has a number greater than two: $hasNumberGreaterThanTwo") }输出:
Checking 1 Checking 2 Has a number greater than two: true方法 11:transformWhile:高级条件转换
transformWhile是一个功能强大的操作符,它结合了transform和takeWhile的特点。它允许你在转换元素的同时,根据条件决定是否继续处理 Flow。在 lambda 表达式中,你可以发射任意数量的值,最后返回一个布尔值来指示是否继续。
返回
true: 继续处理下一个元素。返回
false: 立即取消上游 Flow。
代码示例:
假设我们想处理数字,直到遇到第一个偶数,并且只输出它们的平方。
import kotlinx.coroutines.flow.* suspendfun main() { (1..10).asFlow() .transformWhile { value -> if (value % 2 == 0) { // 遇到偶数,停止 false } else { // 是奇数,转换并继续 emit(value * value) true } } .collect { println(it) } }输出:
1 9方法 12:collectLatest
collectLatest是一个非常特殊的终端操作符,主要用于 UI 编程或处理需要“最新值”的场景。当一个新的值被发射时,它会取消前一个值的处理逻辑。
想象一下,用户在搜索框中快速输入文本。你只想为最新的输入文本执行网络搜索,而忽略之前的输入。collectLatest正是为此而生。
代码示例:
import kotlinx.coroutines.* import kotlinx.coroutines.flow.* suspendfun main() = coroutineScope { val queryFlow = flow { emit("A") delay(100) emit("AB") delay(100) emit("ABC") delay(300) } queryFlow.collectLatest { query -> println("Searching for '$query'") delay(200) // 模拟网络请求 println("Finished search for '$query'") } }输出:
Searching for 'A' Searching for 'AB' Searching for 'ABC' Finished search for 'ABC'注意,"Finished search for 'A'" 和 "Finished search for 'AB'" 从未被打印,因为在它们的delay(200)完成之前,新的值就已经发射,导致它们的处理逻辑被取消。
方法 13:使用SharedFlow和StateFlow进行自定义取消
像SharedFlow和StateFlow这样的热流(Hot Flow)与冷流(Cold Flow)不同。它们独立于收集器的存在而存在。当一个收集器取消时,SharedFlow或StateFlow本身并不会停止。
因此,取消的责任落在了生产者这边。生产者协程需要一种方法来知道何时应该停止向 Flow 发射数据。这通常通过检查其所在协程的isActive状态来实现。
代码示例:
在这个例子中,我们创建一个DataProducer,它在一个独立的协程中向MutableSharedFlow发射数据。我们可以通过取消这个生产者协程的Job来停止数据的产生。
class DataProducer(privateval scope: CoroutineScope) { privateval _sharedFlow = MutableSharedFlow<Int>() val sharedFlow: SharedFlow<Int> = _sharedFlow.asSharedFlow() privatevar producerJob: Job? = null fun start() { producerJob = scope.launch(Dispatchers.Default) { var i = 0 while (isActive) { // 关键检查点 println("Producing ${i}") _sharedFlow.emit(i++) delay(1000) } } } fun stop() { producerJob?.cancel() } } suspendfun main() = coroutineScope { val producer = DataProducer(this) val collectorJob = launch { producer.sharedFlow.collect { println("Collector 1 received: $it") } } producer.start() delay(3500) println("Stopping producer...") producer.stop() // 取消生产者协程 delay(1000) collectorJob.cancel() // 取消收集器 println("Done") }输出:
Producing 0 Collector 1 received: 0 Producing 1 Collector 1 received: 1 Producing 2 Collector 1 received: 2 Producing 3 Collector 1 received: 3 Stopping producer... Done当 Flow 成功取消后,我们也要合理善后,最后看几个取消后处理的方法:
后处理 1:使用catch处理异常
catch操作符用于捕获 Flow 上游发生的异常。它也可以用来处理CancellationException,但你必须非常小心。
重要提示:如果你在catch块中“消耗”了CancellationException(即没有重新抛出它),你可能会阻止取消操作的正确传播,从而导致协程结构化并发的破坏。
一个常见的模式是在catch块中检查异常类型,如果它是CancellationException,就重新抛出它,以确保协程的取消机制正常工作。
代码示例:
suspend fun main() = coroutineScope { flow { emit(1) delay(100) emit(2) // 模拟一个非取消异常 throw IOException("Something went wrong") } .onEach { println("onEach: $it") } .catch { e -> println("Caught exception: ${e.javaClass.simpleName}") if (e is CancellationException) { throw e // 必须重新抛出 CancellationException } } .onCompletion { cause -> // 'cause' 为 null,因为异常被 'catch' 处理了 println("onCompletion with cause: $cause") } .collect { value -> println("Collected $value") } }输出:
onEach: 1 Collected 1 onEach: 2 Caught exception: IOException onCompletion with cause: null后处理 2:使用onCompletion清理资源
onCompletion操作符在 Flow 完成或被取消时调用。这是一个清理资源的绝佳位置,例如关闭文件或数据库连接。
onCompletionlambda 表达式会接收一个可空的Throwable作为参数。
如果 Flow 正常完成,
cause将为null。如果 Flow 由于异常(包括取消)而终止,
cause将是该异常。
代码示例:
suspend fun main() = coroutineScope { val job = launch { (1..5).asFlow() .onEach { delay(100) println("Emitting $it") } .onCompletion { cause -> if (cause != null) { println("Flow was cancelled with ${cause.javaClass.simpleName}") } else { println("Flow completed normally") } } .collect { value -> println("Collecting $value") } } delay(250) job.cancelAndJoin() println("Job cancelled") }输出:
Emitting 1 Collecting 1 Emitting 2 Collecting 2 Flow was cancelled with CancellationException Job cancelled从输出中可以看到,在作业被取消后,onCompletion块被执行,并且cause是CancellationException。
总结:Flow 取消的核心要点
在 Kotlin Coroutines 中,正确处理 Flow 的取消至关重要,这不仅能防止内存和计算资源的泄漏,还能提升应用的响应能力和用户体验。相信通过本文介绍的各种方法,从基础的 Job 取消和结构化并发,到高级的超时、手动标志甚至transformWhile操作符,大家可以掌握在各种场景下有效管理 Flow 生命周期的能力:
核心要点:
Job 取消: Flow 的取消是协作式的,依赖于挂起点或
ensureActive()。结构化并发: 在
CoroutineScope中启动 Flow,可以确保其生命周期与作用域绑定,实现自动取消。withTimeout/withTimeoutOrNull: 为 Flow 操作提供简单可靠的超时机制。
手动标志: 在无法依赖标准取消机制时,使用
AtomicBoolean等标志位提供灵活的控制。take/takeWhile: 根据元素数量或条件来完成 Flow,并自动取消。
cancellable: 使本身不可取消的 Flow 能够响应取消请求。
onCompletion: 在 Flow 完成或取消时执行清理操作。
catch: 处理上游异常,同时注意正确处理
CancellationException。
转自:Kotlin 协程 Flow 取消的 N 种方法