news 2026/3/5 18:54:59

Kotlin 协程 Flow 取消的 N 种方法

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Kotlin 协程 Flow 取消的 N 种方法

前言:为什么 Flow 取消很重要

今天,我们来深入探讨 Kotlin Flow 中的一个重要操作:流的取消。日常开发中我们需要保持一个 Flow 的长时间运行,确保 Flow 的及时取消,有非常重要的意义::

  • 资源管理 (Resource Management):防止内存泄漏和不必要的 CPU 占用。

  • 用户体验 (User Experience):当用户离开界面时,停止过时的操作。

  • 网络效率 (Network Efficiency):取消不再需要的挂起请求。

  • 电池寿命 (Battery Life):减少移动设备上的后台处理。

Flow 的取消操作不仅仅是调用cancel()然后期望一切顺利那么简单。多种手段都可以达到取消的意图,但每种手段都有其特定的使用场景和细微差别。接下来让我们一起探索可取消 Flow 的世界吧!

方法 1:Job 取消:基础用法

取消一个 Flow 最基本的方式就是通过Job取消。每个协程都有一个Job,当你取消这个Job时,所有在该协程作用域内运行的 Flow 都会被取消。看一个实际的例子:

suspend fun main() { val job = CoroutineScope(Dispatchers.Default).launch { createNumberFlow() .collect { value -> println("Received: $value") } } delay(3.seconds) println("Cancelling job...") job.cancel() delay(1.seconds) println("Program finished") } fun createNumberFlow() = flow { repeat(10) { i -> println("Emitting: $i") emit(i) delay(1.seconds) } }

输出:

Emitting: 0 Received: 0 Emitting: 1 Received: 1 Emitting: 2 Received: 2 Cancelling job... Program finished

底层原理是什么?

当你调用job.cancel()时,它会向协程发送一个取消信号。Flow 构建器 (flow { ... }) 是“取消协作”的,这意味着它会在挂起点 (suspension points) 检查取消状态,例如delay()emit()。一旦被取消,Flow 将停止发射新值,收集器也将停止接收它们。

但这里有个有趣的问题:如果你的 Flow 没有挂起点会怎么样?

fun nonCancellableFlow() = flow { repeat(1000000) { i -> emit(i) } }

这个 Flow 不会响应取消,因为它没有任何挂起点。要解决这个问题,你可以使用ensureActive()

fun cancellableFlow() = flow { repeat(1000000) { i -> ensureActive() emit(i) } }

方法 2:Scope 取消:结构化并发

这可以理解为方法一的进阶版,对 CoroutineScope 进行取消,可以停止基于此 Scope 启动的所有协程任务。即所谓的“结构化并发”的优势

下面是一个通过 Scope 取消的使用示例:

class DataRepository { privateval scope = CoroutineScope(SupervisorJob() + Dispatchers.IO) fun fetchDataStream(): Flow<String> = flow { repeat(Int.MAX_VALUE) { i -> emit("Data item $i") delay(500.milliseconds) } }.flowOn(Dispatchers.IO) fun startFetching(): Job { return scope.launch { fetchDataStream() .catch { e -> println("Error: ${e.message}") } .collect { data -> println("Processing: $data") } } } fun cleanup() { scope.cancel("Repository is being cleaned up") } } // 调用 suspendfun main() { val repository = DataRepository() val fetchJob = repository.startFetching() delay(3.seconds) println("Cleaning up repository...") repository.cleanup() delay(1.seconds) println("Done") }

输出:

Processing: Data item 0 Processing: Data item 1 Processing: Data item 2 Processing: Data item 3 Processing: Data item 4 Processing: Data item 5 Cleaning up repository... Done

方法 3:withTimeout:基于时间的取消

有时,你希望在一个 Flow 操作耗时过长的情况下取消它。withTimeout非常适合这种场景。它会创建一个定时炸弹 ⏰ —— 如果操作未在指定时间内完成,它将抛出一个TimeoutCancellationException

suspend fun main() { try { withTimeout(5.seconds) { slowDataFlow() .collect { value -> println("Received: $value") } } } catch (e: TimeoutCancellationException) { println("Operation timed out: ${e.message}") } } fun slowDataFlow() = flow { repeat(10) { i -> println("Emitting: $i") emit(i) delay(1.seconds) // Each emission takes 1 second } }

输出:

Emitting: 0 Received: 0 Emitting: 1 Received: 1 Emitting: 2 Received: 2 Emitting: 3 Received: 3 Emitting: 4 Received: 4 Operation timed out: Timed out waiting for 5000 ms

方法 4:withTimeoutOrNull:更优雅的超时处理

如果你不希望处理异常,可以使用withTimeoutOrNull。它会在超时后返回null,而不是抛出异常。

suspend fun main() { val result = withTimeoutOrNull(5.seconds) { slowDataFlow() .collect { value -> println("Received: $value") } "Done"// 完成后返回一个值 } if (result == null) { println("Operation timed out") } else { println("Operation finished: $result") } }

输出:

Emitting: 0 Received: 0 Emitting: 1 Received: 1 Emitting: 2 Received: 2 Emitting: 3 Received: 3 Emitting: 4 Received: 4 Operation timed out

方法 5:使用布尔标志手动取消

你可以使用一个简单的布尔标志来控制 Flow 的发射。当需要对取消逻辑进行精细控制时,这非常有用。

class ManualCancellableFlow { privateval shouldStop = AtomicBoolean(false) fun createFlow() = flow { var i = 0 while (!shouldStop.get()) { emit("Item: $i") i++ delay(1.seconds) } } fun stop() { shouldStop.set(true) } } suspendfun main() { val manualFlow = ManualCancellableFlow() val job = CoroutineScope(Dispatchers.Default).launch { manualFlow.createFlow().collect { println(it) } } delay(3.5.seconds) manualFlow.stop() println("Stopped manually") job.join() }

输出:

Item: 0 Item: 1 Item: 2 Stopped manually

方法 6:使用cancellable操作符使 Flow 可取消

cancellable()用于使一个默认不可取消的 Flow 变得可取消。

默认情况下,由flow { ... }构建器创建的 Flow 是通过检查协程的isActive状态来协作式地取消的。但如果一个 Flow 的实现不包含挂起点(如delay)或者不主动检查isActive,那么它可能不会响应取消请求。asFlow()扩展函数创建的 Flow 就是一个例子,它会快速地发出所有元素,不检查取消状态。

cancellable操作符通过在下游消费者每次请求一个值时,在内部调用ensureActive()来确保 Flow 是可取消的。

代码示例:

我们来对比一下使用cancellable()前后的区别。

suspend fun main() = coroutineScope { println("--- Without cancellable() ---") val job1 = launch { (1..5).asFlow() .onEach { delay(100) } // 模拟一些处理 .collect { value -> println("Collecting $value") } } delay(250) job1.cancelAndJoin() println("Job 1 cancelled") println("\n--- With cancellable() ---") val job2 = launch { (1..5).asFlow() .cancellable() // 使 Flow 可取消 .onEach { delay(100) } // 模拟一些处理 .collect { value -> println("Collecting $value") } } delay(250) job2.cancelAndJoin() println("Job 2 cancelled") }

输出:

--- Without cancellable() --- Collecting 1 Collecting 2 Collecting 3 Collecting 4 Collecting 5 Job 1 cancelled --- With cancellable() --- Collecting 1 Collecting 2 Job 2 cancelled

底层原理是什么?

加上cancellable()后,Flow 在发射每个元素之前都会检查协程是否仍然处于活动状态。当job.cancel()被调用时,协程状态变为cancellingcancellable()内部的ensureActive()会抛出CancellationException,从而停止 Flow 的执行。

方法 7:使用taketakeWhile

take操作符限制了 Flow 发射的 item 数量。takeWhile操作符则在给定谓词为 true 的情况下持续发射值。

使用take:

suspend fun main() { createNumberFlow() .take(3) // 只取前 3 项 .collect { value -> println("Received: $value") } }

输出:

Emitting: 0 Received: 0 Emitting: 1 Received: 1 Emitting: 2 Received: 2

使用takeWhile:

suspend fun main() { createNumberFlow() .takeWhile { it < 3 } // 当值小于 3 时持续获取 .collect { value -> println("Received: $value") } }

输出:

Emitting: 0 Received: 0 Emitting: 1 Received: 1 Emitting: 2 Received: 2 Emitting: 3

方法 8:first()和条件终端操作符

first()是一个终端操作符,它启动 Flow 的收集,等待第一个元素到达,然后立即取消 Flow 的上游。这在您只需要处理数据流的第一个有效项时非常有用,例如,从多个来源获取数据,但只关心最先返回的结果。

代码示例:

suspend fun main() { val numberFlow = flow { println("Flow started") emit(1) println("This will not be printed") emit(2) } val firstNumber = numberFlow.first() println("First number is: $firstNumber") }

输出:

Flow started First number is: 1

底层原理是什么?

first()在内部收集 Flow。一旦它收到第一个元素,就会抛出一个特殊的CancellationException来通知上游生产者停止发送更多的元素。这种机制确保了不必要的计算和资源消耗被立即终止。

方法 9:single()只期望一个元素

single()操作符比first()更为严格。它期望 Flow只发射一个元素。如果 Flow 完成时没有发射任何元素,或者发射了多个元素,single()会抛出异常。

  • 零个元素: 抛出NoSuchElementException

  • 多个元素: 抛出IllegalStateException

当业务逻辑严格要求数据流必须产生且仅产生一个结果时,这个操作符非常有用。

代码示例:

suspend fun main() { // 场景 1: 成功 val singleElementFlow = flowOf(42) println("Single element: ${singleElementFlow.single()}") // 场景 2: 抛出 IllegalStateException try { val multipleElementsFlow = flowOf(1, 2, 3) multipleElementsFlow.single() } catch (e: IllegalStateException) { println("Caught expected exception: ${e.message}") } // 场景 3: 抛出 NoSuchElementException try { val emptyFlow = emptyFlow<Int>() emptyFlow.single() } catch (e: NoSuchElementException) { println("Caught expected exception: ${e.message}") } }

输出:

Single element: 42 Caught expected exception: Flow has more than one element Caught expected exception: Flow is empty

方法 10:any(),all(),none():基于布尔条件的取消

这些终端操作符用于检查 Flow 中的元素是否满足特定条件,并在得到结论后立即取消 Flow。

  • any { ... }: 当找到第一个匹配谓词的元素时,返回true并取消 Flow。

  • all { ... }: 当遇到第一个不匹配谓词的元素时,返回false并取消 Flow。如果所有元素都匹配,返回true

  • none { ... }: 当找到第一个匹配谓词的元素时,返回false并取消 Flow。如果所有元素都不匹配,返回true

代码示例 (any):

suspend fun main() { val numberFlow = flow { emit(1) println("Checking 1") emit(2) println("Checking 2") emit(3) // 满足条件 (it > 2) println("This will not be printed") emit(4) } val hasNumberGreaterThanTwo = numberFlow.any { it > 2 } println("Has a number greater than two: $hasNumberGreaterThanTwo") }

输出:

Checking 1 Checking 2 Has a number greater than two: true

方法 11:transformWhile:高级条件转换

transformWhile是一个功能强大的操作符,它结合了transformtakeWhile的特点。它允许你在转换元素的同时,根据条件决定是否继续处理 Flow。在 lambda 表达式中,你可以发射任意数量的值,最后返回一个布尔值来指示是否继续。

  • 返回true: 继续处理下一个元素。

  • 返回false: 立即取消上游 Flow。

代码示例:

假设我们想处理数字,直到遇到第一个偶数,并且只输出它们的平方。

import kotlinx.coroutines.flow.* suspendfun main() { (1..10).asFlow() .transformWhile { value -> if (value % 2 == 0) { // 遇到偶数,停止 false } else { // 是奇数,转换并继续 emit(value * value) true } } .collect { println(it) } }

输出:

1 9

方法 12:collectLatest

collectLatest是一个非常特殊的终端操作符,主要用于 UI 编程或处理需要“最新值”的场景。当一个新的值被发射时,它会取消前一个值的处理逻辑

想象一下,用户在搜索框中快速输入文本。你只想为最新的输入文本执行网络搜索,而忽略之前的输入。collectLatest正是为此而生。

代码示例:

import kotlinx.coroutines.* import kotlinx.coroutines.flow.* suspendfun main() = coroutineScope { val queryFlow = flow { emit("A") delay(100) emit("AB") delay(100) emit("ABC") delay(300) } queryFlow.collectLatest { query -> println("Searching for '$query'") delay(200) // 模拟网络请求 println("Finished search for '$query'") } }

输出:

Searching for 'A' Searching for 'AB' Searching for 'ABC' Finished search for 'ABC'

注意,"Finished search for 'A'" 和 "Finished search for 'AB'" 从未被打印,因为在它们的delay(200)完成之前,新的值就已经发射,导致它们的处理逻辑被取消。

方法 13:使用SharedFlowStateFlow进行自定义取消

SharedFlowStateFlow这样的热流(Hot Flow)与冷流(Cold Flow)不同。它们独立于收集器的存在而存在。当一个收集器取消时,SharedFlowStateFlow本身并不会停止。

因此,取消的责任落在了生产者这边。生产者协程需要一种方法来知道何时应该停止向 Flow 发射数据。这通常通过检查其所在协程的isActive状态来实现。

代码示例:

在这个例子中,我们创建一个DataProducer,它在一个独立的协程中向MutableSharedFlow发射数据。我们可以通过取消这个生产者协程的Job来停止数据的产生。

class DataProducer(privateval scope: CoroutineScope) { privateval _sharedFlow = MutableSharedFlow<Int>() val sharedFlow: SharedFlow<Int> = _sharedFlow.asSharedFlow() privatevar producerJob: Job? = null fun start() { producerJob = scope.launch(Dispatchers.Default) { var i = 0 while (isActive) { // 关键检查点 println("Producing ${i}") _sharedFlow.emit(i++) delay(1000) } } } fun stop() { producerJob?.cancel() } } suspendfun main() = coroutineScope { val producer = DataProducer(this) val collectorJob = launch { producer.sharedFlow.collect { println("Collector 1 received: $it") } } producer.start() delay(3500) println("Stopping producer...") producer.stop() // 取消生产者协程 delay(1000) collectorJob.cancel() // 取消收集器 println("Done") }

输出:

Producing 0 Collector 1 received: 0 Producing 1 Collector 1 received: 1 Producing 2 Collector 1 received: 2 Producing 3 Collector 1 received: 3 Stopping producer... Done

当 Flow 成功取消后,我们也要合理善后,最后看几个取消后处理的方法:

后处理 1:使用catch处理异常

catch操作符用于捕获 Flow 上游发生的异常。它也可以用来处理CancellationException,但你必须非常小心。

重要提示:如果你在catch块中“消耗”了CancellationException(即没有重新抛出它),你可能会阻止取消操作的正确传播,从而导致协程结构化并发的破坏。

一个常见的模式是在catch块中检查异常类型,如果它是CancellationException,就重新抛出它,以确保协程的取消机制正常工作。

代码示例:

suspend fun main() = coroutineScope { flow { emit(1) delay(100) emit(2) // 模拟一个非取消异常 throw IOException("Something went wrong") } .onEach { println("onEach: $it") } .catch { e -> println("Caught exception: ${e.javaClass.simpleName}") if (e is CancellationException) { throw e // 必须重新抛出 CancellationException } } .onCompletion { cause -> // 'cause' 为 null,因为异常被 'catch' 处理了 println("onCompletion with cause: $cause") } .collect { value -> println("Collected $value") } }

输出:

onEach: 1 Collected 1 onEach: 2 Caught exception: IOException onCompletion with cause: null

后处理 2:使用onCompletion清理资源

onCompletion操作符在 Flow 完成或被取消时调用。这是一个清理资源的绝佳位置,例如关闭文件或数据库连接。

onCompletionlambda 表达式会接收一个可空的Throwable作为参数。

  • 如果 Flow 正常完成,cause将为null

  • 如果 Flow 由于异常(包括取消)而终止,cause将是该异常。

代码示例:

suspend fun main() = coroutineScope { val job = launch { (1..5).asFlow() .onEach { delay(100) println("Emitting $it") } .onCompletion { cause -> if (cause != null) { println("Flow was cancelled with ${cause.javaClass.simpleName}") } else { println("Flow completed normally") } } .collect { value -> println("Collecting $value") } } delay(250) job.cancelAndJoin() println("Job cancelled") }

输出:

Emitting 1 Collecting 1 Emitting 2 Collecting 2 Flow was cancelled with CancellationException Job cancelled

从输出中可以看到,在作业被取消后,onCompletion块被执行,并且causeCancellationException

总结:Flow 取消的核心要点

在 Kotlin Coroutines 中,正确处理 Flow 的取消至关重要,这不仅能防止内存和计算资源的泄漏,还能提升应用的响应能力和用户体验。相信通过本文介绍的各种方法,从基础的 Job 取消和结构化并发,到高级的超时、手动标志甚至transformWhile操作符,大家可以掌握在各种场景下有效管理 Flow 生命周期的能力:

核心要点

  • Job 取消: Flow 的取消是协作式的,依赖于挂起点或ensureActive()

  • 结构化并发: 在CoroutineScope中启动 Flow,可以确保其生命周期与作用域绑定,实现自动取消。

  • withTimeout/withTimeoutOrNull: 为 Flow 操作提供简单可靠的超时机制。

  • 手动标志: 在无法依赖标准取消机制时,使用AtomicBoolean等标志位提供灵活的控制。

  • take/takeWhile: 根据元素数量或条件来完成 Flow,并自动取消。

  • cancellable: 使本身不可取消的 Flow 能够响应取消请求。

  • onCompletion: 在 Flow 完成或取消时执行清理操作。

  • catch: 处理上游异常,同时注意正确处理CancellationException

转自:Kotlin 协程 Flow 取消的 N 种方法

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/3/5 6:38:59

AB下载管理器技术创新:重新定义开源下载工具的未来

你是否曾经在下载大文件时看着缓慢的进度条焦急等待&#xff1f;是否因为网络不稳定导致下载中断而前功尽弃&#xff1f;在数字内容爆炸式增长的今天&#xff0c;传统的下载方式已经无法满足我们对效率和稳定性的需求。开源下载工具AB下载管理器正在通过一系列技术突破&#xf…

作者头像 李华
网站建设 2026/3/3 19:14:51

免费Windows系统安全神器:OpenArk完整使用手册与深度解析

在Windows系统安全防护领域&#xff0c;传统杀毒软件往往难以应对深度隐藏的Rootkit威胁。OpenArk作为新一代免费开源的反Rootkit工具&#xff0c;凭借其强大的系统底层分析能力和直观的操作界面&#xff0c;为普通用户和专业技术人员提供了全方位的系统安全解决方案。 【免费下…

作者头像 李华
网站建设 2026/2/26 15:57:58

EmotiVoice在车载语音系统中的适配性测试报告

EmotiVoice在车载语音系统中的适配性测试报告 在高端车型的智能座舱演示中&#xff0c;你是否曾被一句温柔提醒“您已连续驾驶两小时&#xff0c;建议休息片刻”所打动&#xff1f;那声音或许不是预录的人声&#xff0c;而是由AI实时生成、带着轻微疲惫感语调的个性化语音——它…

作者头像 李华
网站建设 2026/3/5 12:45:11

PDFMathTranslate终极技巧:快速修复学术论文翻译中的文字重叠问题

PDFMathTranslate终极技巧&#xff1a;快速修复学术论文翻译中的文字重叠问题 【免费下载链接】PDFMathTranslate PDF scientific paper translation with preserved formats - 基于 AI 完整保留排版的 PDF 文档全文双语翻译&#xff0c;支持 Google/DeepL/Ollama/OpenAI 等服务…

作者头像 李华
网站建设 2026/3/5 13:58:52

UKB(UK Biobank)的RAP平台获取数据和下载数据流程

首先进入RAP网址&#xff1a; https://ukbiobank.dnanexus.com1、找到后缀为dataset&#xff0c;点击进入 2、点击data previer,然后点击add column 3、找到需要获取或者下载的数据列名 &#xff08;这个不知道自己想要的列名在哪&#xff0c;可以进入 https://biobank.cts…

作者头像 李华
网站建设 2026/3/4 16:36:20

No2.1 信息系统工程错题集

1. 诺兰模型中数据库技术的应用阶段题目诺兰将计算机信息系统的发展道路划分为六个阶段&#xff0c;采用数据库&#xff08;Data Base, DB&#xff09;技术属于 () 阶段的主要特点。A. 控制阶段 B. 集成阶段 C. 数据管理阶段 D. 成熟阶段&#xff08;正确答案&#xff1a;A&…

作者头像 李华