文章目录
- 有话说
- 概述
- 初识Channel
- Channel种类
- Channel五大金刚
- Send
- Receive
- Closed
- Queue
- Buffer
- Channel的行为
- Channel源码分析
- 发送数据大动脉
- 接收数据大动脉
- 父类默认实现方式(RendezvousChannel)
- 发送流程
- send()函数
- onSend()函数
- 接收流程
- receiveCatching()函数
- onReceiveCatching()函数
- 关闭流程
- 取消流程
有话说
学不好Channel就学不好Flow。也学不好协程。Channel源码理解起来不难,但是逻辑有点多,本文内容比较多,请耐心反复多看几遍,相信你会有很大收获。如有不正确的地方请帮忙在评论区支持,创作不易,如果喜欢请关注收藏,谢谢
概述
引用Go
语言中协程通信的一句话:“Do not communicate by sharing memory; instead, share memory by communicating”。Channel一个“【线程安全】”的可用于不同协程之间通信的一种东西。就像一个管道,有点类似于生产者和消费者的模式,不同之处在于生产者和消费者模式只能运行在不同线程里面,如果消费者和生产者的速度不一致时是通过线程阻塞的方式来实现的。而Channel的发送和接收可以运行在同一个线程里面(当协程的调度器为Dispatchers.Unconfined
)。因为同一个线程里面可以开启多个不同的协程,而且采用的是挂起而非阻塞的方式,性能更优。
Channel
分为支持缓存和不支持缓存两种类型,不支持缓存的Channel
发送一个数据的时候,如果没有接收者在等待接收,那么发送者就需要挂起等待。只有等接受者把数据消费了,发送者才能继续发送。对于支持缓存的Channel
,即使没有接收者,一样可以继续发送,把发送的数据放在缓存Buffer
里面,缓存Buffer
满了在根据异常策略来决定怎么操作,比如挂起,比如丢掉缓存里面最旧的那个,比如丢掉本次发送的新值等。
Channel
属于比较偏底层的东西了,但是同时也很重要,因为Kotlin
里面协程之间需要通过Channel
来通信,Flow
的内部也大量使用了
Channel
,不说完全了解Channel
的整个源码,Channel
基本的运作还是要了解清楚,否则Flow
很多时候你使用起来都不知所以然。想要学好协程需要理解Channel
,想要学好Channel
需要对协程的一些基本原理有一定理解。
初识Channel
Channel
创建出来后,可以往Channel
里面发送东西,然后就可以从Channel
里面取数据,类Channel
是一个接口,他的所有子类的可见性都是Internal
,因此我们在项目中都不能直接 new
出来,只能通过库提供的方法创建,创建一个Channel
用的最多方式有两种:
第一种:
val channel = Channel<Int>()
调用的是一个名为 Channel
的函数,返回一个Channel
接口的实现类:
/*** capacity:缓存容量,在Channel中定义了四种:* (1) RENDEZVOUS 没有缓存* (2) CONFLATED 一个缓存。* (3) BUFFERED 默认为64,如果没用通过系统属性指定,如果系统属性指定值不在1~Int.Max-1之间,也用默认64* (4) UNLIMITED 为Int.MAX_VALUE,Int的最大值* * onBufferOverflow :缓存溢出(满了,不能再放东西)的时候的策略。有三种:* (1)SUSPEND 挂起发送协程* (2)DROP_OLDEST,丢弃缓存中最久的那一个值,不会挂起,协程继续* (3)DROP_LATEST ,丢弃当前发送的这个值,缓存保持不变,不会挂起,协程继续** onUndeliveredElement :一个回调,当发送出去的数据不会被消费时(没有机会被接收方接收)回调。* 不会被消费指的是,已经发送出去的数据(被挂起等待的或者在Buffer中的)没有机会被消费了。* 有三种情况会被调调:* 1.调用了channel的cancel()方法:不管是已经在Buffer中还是挂起等待的数据都会触发回调。* 2.调用协程的cancel方法:只有导致协程挂起的数据会被触发(这里面包括select函数onSend发出未被接收的),* Buffer中的数据不会,因为Buffer中的数据任然有机会被接收。* * 3.Channel被关闭后在往里面发数据:这一次发送也会被回调,因为再也不可能被消费了,都发不进去了。* 同时还会跑出异常(ClosedSendChannelException )* ** 这儿要注意一点一个channel调用了close方法,并不意味这个channel已经发送出去的数据不能被消费。* ***/
public fun <E> Channel(capacity: Int = RENDEZVOUS,onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> ={ }
用一张图总结一下这个三个参数的作用:
第二种:
//通过produce返回了一个receiveChannel
val receiveChannel = GlobalScope.produce<Int> { //在里面发送数据,运行在新开启的协程里面send(1)}//or 通过actor返回了一个sendChannel
val sendChannel = GlobalScope.actor<Int> {//在里面接收数据,运行在新开启的协程里面val value = receiveCatching().getOrNull()}//------ produce和actor----------public fun <E> CoroutineScope.produce(context: CoroutineContext = EmptyCoroutineContext,capacity: Int = 0,@BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> public fun <E> CoroutineScope.actor(context: CoroutineContext = EmptyCoroutineContext,capacity: Int = 0, // todo: Maybe Channel.DEFAULT here?start: CoroutineStart = CoroutineStart.DEFAULT,onCompletion: CompletionHandler? = null,block: suspend ActorScope<E>.() -> Unit
): SendChannel<E>
通过produce
或者actor
函数返回一个Channel
对象。produce
返回值类型是ReceiveChannel
类型,actor
返回类型为SendChannel
类型,这种方式内部也是采用第一种方式创建一个Channel
,然后开启一个新的协程,然后让发送和接收的代码都运行在一个协程里面,比如produce函数源码流程:
Actor
函数的流程和Produce
大同小异。就不在列出了。
produce和actor方法都会开启一个新的协程,让发送或者接收的代码运行在新开启的协程里面。默认新的协程的启动方式为:CoroutineStart.DEFAULT,produce不能重新指定(目前没有开放支持重新指定启动方式的函数),actor可以重新指定。
调度器的情况消费复杂一点,我们可以选择自己想要的调度器。如果我们传入了自己想要的调度器,那新起的协程就采用该调度器。
如果没有传,那就采用父协程同样的调度器,如果没有父协程那就采用默认调度器Dispatchers.Default
先来一个简单的关于Channel
的用法:
通过第一种创建方式:
fun main() = runBlocking{//默认返回的是RendezvousChannel.val channel = Channel<Int>()//开启一个新协程去发送数据val sendCoroutine = launch(Dispatchers.IO) {repeat(5){delay(1000)log("send $it")channel.send(it)}//用完后了,不想用了需要手动关闭。channel.close()}/*** 开启一个新协程去接收数据* 接收时不要用receive方法。容易抛异常ClosedReceiveChannelException* while (!channel.isClosedForReceive){* log(channel.receive())* }*/val receiveCoroutine = launch(Dispatchers.IO) {while (true){val result = channel.receiveCatching()if(result.isClosed){break;}log("receive ${result.getOrNull()}")}}joinAll(sendCoroutine,receiveCoroutine)log("exit")
}
第二种:
//(1)produce
fun main() = runBlocking {//发送的代码运行在produce所启动的新协程里面val receiveChannel = produce<Int> {repeat(5) {delay(1000)log("send $it")send(it)}}while (true) {val result = receiveChannel.receiveCatching()if (result.isClosed) {break;}log("receive ${result.getOrNull()}")}log("exit")
}//(2)actor
fun main() = runBlocking {//接收的代码运行在actor所启动的新协程里面。val sendChannel = actor<Int> {while (true){val result = receiveCatching()if (result.isClosed) {break;}log("receive ${result.getOrNull()}")}}repeat(5) {delay(1000)log("send $it")sendChannel.send(it)}log("exit")
}
Channel种类
不管上面提到的第一种还是第二种创建Channel
的方式,最终都是通过函数Channel()
来创建一个Channel
对象:
public fun <E> Channel(capacity: Int = RENDEZVOUS,onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =when (capacity) {RENDEZVOUS -> {if (onBufferOverflow == BufferOverflow.SUSPEND)RendezvousChannel(onUndeliveredElement) // 第一种,默认参数情况下返回该类型else//之前说RENDEZVOUS相当于没有缓存,前提溢出策略是SUSPEND,如果//是其他两种溢出策略,返回的对象是带有一个缓存的ArrayChannelArrayChannel(1, onBufferOverflow, onUndeliveredElement) //第二种}CONFLATED -> {/*** 这种情况下移除策略必须是默认值,实际上ConflatedChannel它* 不受onBufferOverflow这个参数的影响,你看它构造函数只接收了onUndeliveredElement。* 所以不管你传什么溢出策略都是没影响的,但是ConflatedChannel的实际使用效果* 却和BufferOverflow.DROP_OLDEST效果一样。丢掉旧值,新值覆盖旧值。* 这儿之所以要加这个require判断一下,就是防止你传了其其他两个缓存策略,你还在* 那幻想着没有接收者是挂起的场景,实际上ConflatedChannel根本就不支持。所以* 系统为了让你不要对ConflatedChannel抱有其他不切实际的幻想,才加了require判断* 其目的就是让你如要capacity=CONFLATED ,那就不支持自定义缓存策略,只能是丢掉旧址的行为* require里面的输出信息意思也是 CONFLATED 不能被用于onBufferOverflow不是默认值的情况.* 意思就是说要使用CONFLATED,onBufferOverflow就只能采用默认值。* 其实这个地方还有有点矛盾,既然使用CONFLATED时采用了默认SUSPEND。但是实际使用效果* 却不是SUSPNED的效果。* */require(onBufferOverflow == BufferOverflow.SUSPEND) {"CONFLATED capacity cannot be used with non-default onBufferOverflow"}ConflatedChannel(onUndeliveredElement) //第三种}//不受溢出策略限制,因为是无限,不会 溢出UNLIMITED -> LinkedListChannel(onUndeliveredElement) //第四种/*** CHANNEL_DEFAULT_CAPACITY默认为64,可用通过系统属性配置,大小在(1 ~ Int最大值-1)。*/BUFFERED -> ArrayChannel( //如果不是SUSPEND时,缓存大小为1if (onBufferOverflow == BufferOverflow.SUSPEND) CHANNEL_DEFAULT_CAPACITY else 1,onBufferOverflow, onUndeliveredElement)else -> {//如果缓存大小为1,策略为DROP_OLDEST 使用ConflatedChannel比使用ArrayChannel的性能更好。if (capacity == 1 && onBufferOverflow == BufferOverflow.DROP_OLDEST)ConflatedChannel(onUndeliveredElement) elseArrayChannel(capacity, onBufferOverflow, onUndeliveredElement)}}
从源码可知,我们能使用的Channel
就四种(当然还其他子类,比如BroadcastChannel)。
按照是否支持缓存Buffer
,又可以分为两种类型:
Channel五大金刚
在深入源码分析之前,先要搞清楚Channel
里面几个重要的东西,也就是几个关键的角色。Send
(代表一次发送数据),Receive
(代表一次接收),Closed
(代表Channel
被关闭), Queue
(链表模拟的一个队列),Buffer
(Channel的缓存)这五个。
Send
Send
是一个抽象类,它有好几个实现类型。可以把他理解为是对一次发送数据行为的封装,它里面不仅包括发送的数据,还有可能包含发送方的协程,如果是调用onSend()
发送数据,它里面还包括了Select
实例(SelectBuilderImpl
)。
这里我们要分两种情况来描述Send
。一种是LinkedListChannel
。一种是其他三个Channel
。
先说其他三个Channel:
当我们调用send()
或者onSend()
函数发送一个数据,如果这次要发送的数据不能直接交到接收者手上,也不能丢弃(缓存溢出策略限制),也不能放入Buffer
中(比如没有Buffer,或者Buffer满了),也不能替换Buffer
中的旧值(有Buffer的前提下)。简而言之就是需让本次执行发送行为协程挂起等待。这时候就把本次发送封装成一个SendElement
(send()调用)或者SendSelect
(onSend()调用)放入Queue
中,这两个Send
对象里面都包含了本次要发送的数据和执行发送操作的协程。这样当另一个协程调用Channel
的接收数据的函数来接收数据时,如果发现拿到的是一个Send
对象。就能从Send
对象里面拿到发送的数据和发送方的协程,拿到发送方协程后就可以通过 resume
恢复发送方的协程。
LinkedListChannel:
之所以要把它单独拿出来说,是因为LinkedListChannel
它是第一个无限制缓存的Channel
。不管是send()
还是onSend()
往它里面发送数据永远不会被导致发送方协程挂起。对于他来说,如果没有挂起的接收者在等待,发送的数据都封装成一个SendBuffered
对象存入它独有的缓存(Queue)中,SendBuffered
中只有要发送的数据,不包含发送方协程。它都不会导致发送数据的协程挂起,要协程对象干嘛?。当有接收者来去数据时只需要从SendBuffered中拿到数据即可。
SendBuffered
比较特殊,他是LinkedListChannel
独有的,linkeListChannel
永远不会被挂起,如果没有接收者,LinkedListChannel
每次都会把数据封装到一个SendBuffered
中,然后把SendBuffered
加入到Queue
中。
ConflateChannel
和ArrayChannel
如果Buffer
没有满,那么发送的数据不会进行任何封装,直接把数据存入其Buffer
中。
SendSelect
只有在使用Select
多路复用时并且遇到需要让select
函数挂起时才会被使用。
这儿只是简单介绍了一下Send的几种子类,和什么时候被使用,如果不明白,没关系,先有一个大概印象,等你学习完后面的源码就会很明白了。
Receive
同理当调用Channel
的receive()
或者onReceive()
函数去接收一个数据时,如果发现Channel
中没有可消费的数据时,就把当前接收方协程封装到一个Receivd
的对象中,并把Receive
放入Queue
中。这样当往Channel
中发送数据时,如果发现Queue中有``Receive
对象时,就知道了有接受者在等待,这样就可以从Receive
中取出接收方协程,然后恢复接收方协程,同时把数据也给到接收方。
不管什么类型的Channel
,去接收数据时,如需要把接收方挂起时,在调用不同的接收函数去接收数据时会使用不同Receive
子类:
Closed
接收方和发送方都可以调用Channel
的close()
方法关闭一个Channel
,关闭的时候会创建一个Closed
对象放到Queue
中(加入到队列尾部),起一个标识作用,标识Channel
被关闭了。
对于发送发来说,只要Queue
中有Closed
,再往Channel发送数据就会抛异常。
对于接收者来收,要分情况,即使Channel
关闭了,但是如果Channel
中有挂起等待被消费的数据,或者Buffer
中还有未被消费的数据,接收者也可以先继续接收数据,只有在Channel
中没有可被消费的数据时,接收数据时遇到Channel被关闭才会抛出异常。
Queue
Queue
我们暂且把他理解为Channel
中用来存储Send
,Receive
,Closed
的一个数据结构,实际上他不是一个数据结构,他既不是一个队列,也不是一个链表,他只是一个Node(节点),对没错,它只是链表中的一个节点。在Channel
中实际上只持有了一个节点:
LockFreeLinkedListHead
继承自 LockFreeLinkedListNode
。LockFreeLinkedListNode
它有两个指针prev
和next
。分别用来指向上一个和下一个,new一个这个节点的时候,在没有被被添加到其他节点上时,prev
和 next
都指向自己:
每一个LockFreeLinkedListNode
有一个remove
函数,可以让自己从整个链表中被删除,然后自己的上一个和下一个重新建立连接。为什么要用LockFreeLinkedListHead
而不直接用LockFreeLinkedListNode
呢?是因为LockFreeLinkedListHead
它虽然继承了LockFreeLinkedListNode
,但是它仅仅只重写了LockFreeLinkedListNode
中remove
相关的函数,使其自己不能被删除。这样LockFreeLinkedListHead
就可以作为一个哨兵节点。它永远不会被删除。最开始没有添加任何东西的时候LockFreeLinkedListHead
的prev
和next
同样指向自己。当我们往哨兵节点上添加新节点时,哨兵节点的next
执行第一个新节点,prev
指向最后一个新节点,最后一个新节点的next
执行哨兵节点。这样形成一个双向循环链表:
LockFreeLinkedListNode
通过限制添加和删除方法,使其添加元素时只能添加到尾部,取数据时只能从头开始取,从而达到了和使用队列时先进先出,后进后出的效果。LockFreeLinkedListNode
内部采用了CAS
而非锁的方式保证了添加和删除时的线程安全问题,哨兵节点的prev
和next
分别指向了头结点和尾结点,这样在Channel
中只要我们持有了哨兵节点。就能实现取数据时取第一个节点,添加数据时添加到最后一个节点上。Send
,Receive
,Closed
都是LockFreeLinkedListNode
的子类,因此可以被添加到Queue
中。
通过以上的解释,这就是我为什么说暂且认为它是一个数据结构,你说它一个数据结构吗?但却只是一个节点。你说它不是一个数据结构吧,你又可以通过它存取数据。是什么不重要,重要的是理解了他本质就行。之所以要把它说成是一个队列(Queue
),是为了方便右面内容分析时有个具体的表达方式。
Buffer
Buffer
就是Channel
的缓存,如果一个Channel
支持Buffer
:
发送数据时如果有接收者在等待(那意味着Buffer是空的)直接把数据交给接收者。如果没有接收者在等待那就把数据放入Buffer。如果Buffer满了根据缓存溢出策略决定如何处理。
接收数据时,如果Buffer
里没有数据(那意味着还没有往Channel
发送数据),那接收者挂起,如果Buffer
里有数据,直接从Buffer
里取数据
这五大金刚可以说是Channel的核心,重中之重。Channel的发送,接收,关闭,取消,都离不开他们。
Channel的行为
在分析Channel
源码之前有必要先说一下Channel
的行文,不然你一头扎进源码去了容易迷失方向。只有在清楚Channel
大概思想的前提下再去看它的具体细节才不会迷失,就不会出现还未深入了就被劝退了。
Channel
有好几种不同的实现,有支持缓存Buffer
的,也有不支持缓存Buffer
的。不同Channel
的Buffer
容量不一样,容量满了后处理的方式也不一样(称之为缓存溢出策略)。因此也就导致不同Channel
发送数据和接收数据的流程上也稍许不同,为了更好的理解Channel
的行为时,我们暂且忽略掉Channel
的种类,不用去管在使用中到底是使用的那一种Channel
,我们只要知道我么用的是Channel
就行,我们只关注Channel
的两个重要特征,第一Buffer
的容量,第二缓存溢出策略。我们把Channel
想象成一根管道。这个管道的容量为 0 ~ 无穷大。 即意味着这根管道可以容纳0到无穷多个数据。发送数据就是往Channel
里面扔数据。接收数据就是从Channel
里面取数据。发送数据的协程我把他当做发送者,接收数据的协程当作接收者。不管他们具体怎么实现,只要是一个Channel
,在行为上都可以总结出如下规律(不包括Select多路复用场景):
发送数据时:
- 先发的数据先被接收。
- 发送数据时有接收者在等待(有接收者在等待说明Channel的Buffer中是没有等待被接收的数据),那么唤醒接接收者并把本次要发送的数据交给接收者。不用放入管道(Channel)中(即不管是挂起等待还是放入Buffer)。本次发送成功,发送者(协程)继续执行剩余操作(可能是继续发送也可能是其他和任意操作)。
- 发送数据时如果没有接收者在等待,那要根据自身的Buffer容量和缓存溢出策略来决定:
- 如果Buffer容量0(比如RendezvousChannel)。把发送者(协程)挂起。等待接收者来取
- 如果Buffer容量大于0(最少都有一个),那先看Buffer满了没?如果没有满则放入Buffer中,相当于往Channel中
扔数据成功了。发送者继续执行剩余操作。如果满了根据溢出策略,类决定是否挂起?还用新值替换最旧的那个值?
还是放弃本次发送的数据?只要不是挂起这种策略,也相当于被认为这次发送是成功的,因此发送者继续执行。不会被挂起。
- 发送数据时发现Channel被关闭了,认定为发送失败。需要做一下几步操作:
- 如果设置了发送的数据不会到达接收者的回调(OnUndeliveredElement),那么先回调。
- 如果Queue中任有其他接收者在等待,需要唤醒所有接收者(协程),不然接收者将永远没有机会被唤醒
- 最后发送者(协程)因收到一个发送失败异常(ClosedSendChannelException)而终止。
接收数据时:
- 先到的接收者先接收数据
- 接收数据时支持Buffer和不支持Buffer有所不同:
- 不支持Buffer的Channel.
- 先看有没有发送者挂起等待被接收,如果有从挂起的发送者中取出数据,恢复发送者协程,接收者拿到数据,继续执行。
- 如果没有没有发送者挂起等待,并且Channel没有被关闭,那么接受者挂起等待
- 如果发现Channel中没有发送者挂起等待,并且Channel被关闭了,那么接收者(协程)要么收到一个接收失败异常(ClosedReceiveChannelException)而终止,要么接收到一个带有异常信息的ChannelResult结果。用户可以
根据这个结果自行决定如何处理。
- 支持Buffer的Channel
- 如果Buffer中有数据直接从Buffer中取数据,接收者继续执行。
- 如果Buffer中没有数据(既然Buffer中没有必然没有发送者挂起)。接收者挂起等待。
- 如果Buffer中没有数据,并且发现Channel被关闭了,那么接收者(协程)要么收到一个接收失败异常(ClosedReceiveChannelException)而终止,要么接收到一个带有异常信息的ChannelResult结果。用户可以
根据这个结果自行决定如何处理。
- 不支持Buffer的Channel.
关闭Channel
不管是发送者还是接收者调用的close()时: 都会去看看Channel中有没有挂起等待的接收者。如果有就把挂起的发送者从Channel中移除,并且唤醒接收者的协程,根据接收者接收数据时是调用的receive()方法还是receiveCatching()的不同,接收者被唤醒后的行为不一样,如果是调用receive()被挂起的,接收者协程会抛出一个异常(ClosedReceiveChannelException),如果接收者是调用的receiveCatching()被挂起的,接收者协程会收到一个带有异常的ChannelResult。
被关闭的Channel不能再继续发送数据,但是如果Chananl里面还有没有被消费的数据仍然可以被接收,直到消费完了后才能不再被接收。
取消Channel
可以通过调用Channel的cancel()取消一个Channel。取消一个Chanel的流程分为两步:
第一步:先调用close()关闭Channel。和上面“关闭Channel”一样。
第二步:去看看Channel中有没有没有还未被消费的数据(包括挂起等待的和Buffer中没有被接收的).
- 针对Buffer中未被消费的数据直接移除,如有设置OnUndeliveredElement则回调移除的每一个数据,
- 针对挂起的发送者,同样如果有设置OnUndeliveredElement则回调,最后唤醒发送者协程并抛给发送者一个ClosedSendChannelException
线程安全
Channel不管是发送还是接收都是线程安全的。
看似比较复杂,其实总结下来就一句话:对于发送方来说,优先把数据交给接收者,没有接收者在考虑放进Buffer中,Buffer满了再根据溢出策略执行,对于接收方来收,优先考虑从Buffer中取数据,如果没有再考虑从挂起的发送者取数据。针对关闭的情况,只要关闭了发送方就不能继续发送,但是接收方如果Buffer中有数据任然可以继续接收,直到Buffer中没有数据。
如果针对以上Channel的协程表示看不懂的,没关系,以上的行为是根据Channel的源码终结出来的。学习完后面的源码就明白了。
Channel源码分析
很多朋友在刚接触Channel
的时候就被Channel
里面复杂的源码代码给劝退了,实际上Channel的源码很简单,只要静下心来,耐心的去分析各个类之间的关系,理清楚关系,最后发现也不难。不管怎么说,Channel
涉及的关键类就那么几个。能有多复杂。所以耐心的看完本文,一遍不行就多看几遍,结合源码,我相信人人都能学会。
Channel
是干嘛的,他的主要作用就是在不同协程之间,用来发数据和接收数据的,所以只要我们从发送数据和接收数据两个入口来分析就能把整个Channel
分析透彻。
在前文我们知道了Channel
有四个比较重要的子类,这几个也是我们日常用的比较多的几个。不同的子在发送和接收数据时都有自己的行为和方式。但是大部分逻辑都写在父类AbstractChannel
和AbstractSendChannel
中。
如果让你画一座山,你只有站在远处能看到整个山的全貌你才能画出来,因此在学习Channel
源码的时候,我们先丢掉那些反锁的细节,直接去寻找它的大动脉,至于那些细小的血管,稍后在分析。
发送数据大动脉
以调用send()
函数去送一个数据为例。首先是调用父类的send()
函数,在send()
函数面去调用offerInternal()
,该函数是子类具体如何把一个数据发送到Channel
里面去的实现,至于子类如何发送,不同的子类有不同的实现方式(比如不支持Buffer的Channel只能把数据交给发送者,支持Buffer的可以把数据放入Buffer中),不管怎么发送,细节暂时不管,父类也不关心,反正就是父类说子类你自己去发送吧,把结果告诉我就行,如果子类说发送成功。ok 返回从send()
返回,这次发送结束。如果失败。那父类只能继续处理这个烂摊子,父类就调用自己的sendSuspend()
函数。该方法是一个挂起函数,在sendSuspend()
函数中,父类会把本次要发送的数据和协程包装成一个Send
对象,调用enqueueSend()
函数把Send
对象放入Queue
中(队尾),正常情况下是能加入成功的,加入成功后,sendSuspend()
结束,发送方挂起,如果遇到特殊情况,比如Chanel
被关闭了,sendSuspend()
调用流程里面就会抛出一个异常。
offerInternal这个函数的名字取得特别形象,offer理解为往Channel发送的意思,Internal内部的意思,指的就是子类自己,组合在一起就是子类你先自己先按照你你的方式往Channel里发送数据,搞不定在叫我(父类)。
接收数据大动脉
理解了上面发送数据的大动脉,在来看接收数据的大动脉就容易明白了。有send()
就有receive()
,有offerInternal()
就有pollInternal()
,有sendSuspend()
就有receiveSuspend()
。
当调用receive()
函数去Channel
中接收一个数据时,首先也是调用父类的receive()函数,父类说,儿子你先自己按照你的方式去取数据吧,你怎么取我不管(比如有的儿子就很轴,非得有发送者在哪儿挂起等他他才能取到数据,有的儿子就从Buffer
中取数据)。是取成功了还是取失败了,只要告诉我(父类)结果就行,其他的我不关心。如果子类取成功了,父类就很开心啊,直接把子类取到的结果返回出去。如果取失败了,父类又要开始骂了,你个笨蛋,这都搞不定,还得我来帮忙,然后父类就调用自己的receiveSuspend
挂起函数,在receiveSuspend
中,把接收方协程包装成一个Receive
对象调用enqueueReceive()
函数把Receive
对象放入Queue
中。正常情况下都会放入成功,放入成功,receiveSuspend
函数结束,接收方协程挂起,如果遇到特殊情况,比如Chanel
被关闭了,sendSuspend()
调用流程里面就会抛出一个异常。
同样pollInternal这个函数和OfferInternal一样,poll理解为从Channel中拿,Internal内部的意思,指的就是子类自己,组合在一起就是子类你自己先按照你的方式先从Channel取数据,搞不定在叫我(父类)。
有了前面的所有铺垫,终于可以开始撸源码了,
父类默认实现方式(RendezvousChannel)
AbstractSendChannl
中提供了offerInternal
的默认实现方式,AbstractChannel
提供了默认的pollInternal
实现,这两个默认试下也是RendezvousChannel
的实现。其他几个子类都重写了offer和poll相关方法。只有RendezvousChannel
没有,所以分析父类的offer和poll相关的操作就是在分析RendezvousChannel
的offer和poll操作。为了保持统一,暂且认为RendezvousChannel
也有自己的offer和poll,只是行为上和父类一模一样。
发送流程
Channel发送一个数据有四种方式:
- send(element) : 发送一个数据给Channel,是一个挂起函数
- onSend: 非挂起函数,在多路复用(Select)时使用该方法发送一个数据到Channel.
- trySend(element):非挂起函数,返回值类型为ChannelResult
- 如果发送成功(有接收者等待)ChannelResult.isSuccess = true
- 如果发送失败(没有接收者等待)ChannelResult.isFailure = true
- 如果Channel被关闭了ChannelResult.isClosed = true
- trySendBlocking(element) : 一个阻塞式发送(非挂起),线程会阻塞在此处,直到有结果返回,
返回结果类型为ChannelResult。它内部实际上用到了runBlocking函数,因此不要把它用在协程里面。- 如果有接收者在等待,把数据交给发送者,自己立马返回ChannelResult.isSuccess = true
- 如果Channel被关闭,也立马返回,ChannelResult.isClosed = true
- 即没有接收者也没关闭,那就阻塞等待,直到有接收者消费了再返回ChannelResult.isClosed = true
或者一直没人消费,那就一直阻塞,即使Channel被关了也没用。
本文着重讲send(element)和 onSend()两个,其他两个方相对简单。
send()函数
//AbstractSendChannel.send
public final override suspend fun send(element: E) {/*** 先调用子类的offerInternal让其把数据发送到Channel里面。* 子类如何offer,父类不关心细节,子类只需要把offer的结果告诉父类就行。* 有三种结果:* OFFER_SUCCESS:说明子类把数据offer成功了* OFFER_FAILED:说明子类offer失败:* Closed:说明Channel被关闭了。不能继续发送数据了。* * 如果子类offer成功,父类直接return,本次send流程也就终止了。*/if (offerInternal(element) === OFFER_SUCCESS) return/*** 如果offre的结果不是成功,那就父类继续处理。*/return sendSuspend(element)
}
offerInternal
父类AbstractSendChannel
中提供了offerInternal
的默认实现,该实现方式也是RendezvousChannel
的实现方式,其他几个子类都有自己的实现方式。总的来说,四个子类的offerInternal
实现方式可以分为两大类:
第一类:不支持Buffer的的Channel:
- 有接收者在等待,直接把要发送的数据交给接收者,返回OFFER_SUCCESS
- 没有接收者在等待,返回OFFER_FAILD,意味着offer失败
- 如果Channel被关闭,返回结果为Closed
第二类:支持Buffer的Channel:
- Buffer为空:意味着之前还没有发送过数据或者之前发送的都被消耗了.
- 有接收者在等待,把数据交给接收者,返回OFFER_SUCCESS
- 没有接收者在等待,把数据放入Buffer,返回OFFER_SUCCESS
- 如果Channel被关闭,返回结果为Closed
- Buffer不为空,但是没有满:意味着之前发送的数据还没有被消费完。所以只能继续放入Buffer,返回OFFER_SUCCESS,即使接收者来了也应该先把之前发送的数据消费了。
- Buffer不为空,但是满了:
- 溢出策略为SUSPEND:返回OFFER_FAILD
- 溢出策略DROP_LATEST:丢掉本次的,也相当于本次是成功的,返回OFFER_SUCCESS
- 溢出策略为DROP_OLDEST:丢到最旧的,用本次的数据替换掉Buffer中最早那一个,返回返回OFFER_SUCCESS
/*** AbstructSendChannel.offerInternal* 父类中默认的实现方式,即也是RendezvousChannel的实现方式,是一种不支持Buffer的实现方式* 返回结果:OFFFER_SUCCESS | OFFER_FAILED |Closed*/
protected open fun offerInternal(element: E): Any {while (true) {/*** takeFirstReceiveOrPeekClosed()意思就是如果Queeu中第一个节点是ReceiveOrClosed类型* 就取出,并把该节点从Queue中移除。如果Queue为空或者不为空但是第一个节点不是ReceiveOrClosed* 类型,则返回null. * ReceiveOrClosed是 Receive 和Closed的父类。所以如果返回不为null时,既可能是一个接收者Receive,* 也有可能是一个关闭Closed.** 如果takeFirstReceiveOrPeekClosed()返回null,也就意味着Queue中没有接收者,并且Channel* 没有被关闭,直接return OFFER_FAILED,offerInternal失败,交给父类继续处理* * 如果想看takeFirstReceiveOrPeekClosed的内部流程可以查阅下面 “源码1”*/val receive = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILED /*** 如果是一个Closed对象.tryResumeReceive方法里面什么也不做直接返回 RESUME_TOKEN* 如果receive不是一个Closed,即是一个ReceiveElement或者ReceiveSelect,* (1)如果是ReceiveElement:* ->tryResumeReceive会调用receive中接收方协程continuation的tryResume方法* 去尝试恢复接收方协程。并把数据带过去然后返回RESUME_TOKEN。* * * * tryResumeReceive 和 completeResumeReceive里面分别调用了协程的* tyeResume和completeResume。这两个函数是搭配使用的,我们知道恢复一个协程* 可以调用其resume函数,那这里为什么要用try 和 complete这个组合呢?因为Channel是* 可能存在多线程的情况,很有可能我们拿到这个receive的时候,正准备去resume的时候,receive* 对应的协程被取消了,在其他线程里面调用了协程的cancel把协程给取消了,或者被其他* 线程强先一步把receive对应的协程给resume了,如果只是协程被cannel了就算调用resume也不会* 有任何效果,resume这次调用会被丢弃。* 但是如果是被别的线程先一步resume了,这次再次调用resume就会抛出异常。因此再此处不能* 直接使用协程的resume函数去恢复一个协程。* 协程的tryResume会判断协程是否已经被取消了或者已经resume了,* 如果是就会返回null,反之则返回RESUME_TOKEN,tryReusme只是前期的一个检查操作,如果协程没有* 被取消,没有被resume,返回RESUME_TOKEN的同时会做一个标记,这样别的线程想调用tryResume就会* 返回null 了。* 真正让协程resume的操作是completeResumeReceive里面去调用的。* 一旦一个线程调用了协程的tryResume函数,即使这个线程还没来得及调用compleResume,被另外一个线程* 也调用了该协程的tryResume,另外一个线程的tryResume会返回空,因此不会存造成同一个协程被resume两次。** ReceiveElement部分源码在下面列出“源码2”)* * (2)如果是ReceiveSelect: * tryResumeReceive会让select状态设置为选中。(“具体看源码(2)”)* */val token = receive.tryResumeReceive(element, null)if (token != null) {assert { token === RESUME_TOKEN }/*** 如果是receive一个Closed对象:(Closed源码下面已经列出“源码2”)* tryResumeReceive方法里面什么也不做,是一个空方法。** 如果receive是ReceiveElement:* 那么调用接收方协程continuation的completeResume方法去恢复接收方协程* * 如果receive时ReceiveSelect:* 会开启一个新协程,把要发送的数据传入到新协程里面,新协程执行完后才就会恢复select* 所挂起的协程,新协程的返回值作为select的结果。*/receive.completeResumeReceive(element)/** * 只要不是Close,offerReuslt都为父类Receive中返回的OFFER_SUCCESS(查看下面“源码2”)* 如果recieve是Close,offerResult的结果返回的是Close自己(查看下面“源码2”)*/return receive.offerResult}}
}// --------------------------------------源码(1)------start------------------------------------------/** AbstractSendChannel.takeFirstSendOrPeekClosed** 如果Queue中第一个节点是Receive or Closed 那么返回就返回第一个节点,并把该节点从queue中移除,否则返回null.* */
protected open fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? =//具体的操作调用Queue的方法,传递了一个lambad,这个lambad的意思就是取出的这个节点是否为Closedqueue.removeFirstIfIsInstanceOfOrPeekIf<ReceiveOrClosed<E>>({ it is Closed<*> })/** LockFreeLinkedList.removeFirstIfIsInstanceOfOrPeekIf** 这个方法内部还是有点复杂,主要流程如下:* 如果Queue为空,那么return null.* 对于发送方来说就是Channel没有被关闭,也没有接收者挂起等待* 对于接收方来说就是Channel没有被关闭,也没有发送者挂起等待* 如果Queue不为空,取出第一个节点。看看第一个节点是不是我们想要的 “T”类型* 对于发送方来说 “T” 就是 ReceiveOrClosed类型* 对于接收方来说 “T” 就是 Send类型* 如果不是我们想要的 "T"类型 那就 return null.* 如果是,那在看看是否为Closed类型。如果是直接返回Closed(不从Queue移除),* 如果不是Closed类型,那就是我们需要的"T" 类型,先把第一个节点从 queue中移除。然后把第一个节返回出去。** */public actual inline fun <reified T> removeFirstIfIsInstanceOfOrPeekIf(predicate: (T) -> Boolean): T? {while (true) {//首先从Queue中拿到第一个节点val first = this.next as Node/*** 如果第一个节点和自己(queue)相等,说明了queue是空的,只有一个哨兵节点,都指向自己* 相对于发送方来说就意味着没有接收者挂起等待接收,Channel也没有被关闭。* 相对于接收方来说就意味着没有发送者挂起等待接收,Channel也没有被关闭* 那么直接 return null*/if (first === this) return null /*** 如果有第一个节点,如果第一个节点不是我们想要的类型,也直接 return null* 那么我们想要的类型是什么?T 代表了什么呢?* T 是可推导的泛型:* 对于发送方来说: T就是ReceiveOrClosed* 对于接收方来说: T就是Send*/if (first !is T) return null/*** 我们传过来的lambdab。比如对于发送方来说lambda为{ it is Closed<*>}* predicate为true说明了queue中第一个节点是Closed类型,false则不是。*/if (predicate(first)) {/** * 进入到这里面,代表了first是Closed.* isRemoved()方法实际上是判断fist节点的next节点是否为一个Removed类型节点。* Removed节点是对其他节点的一个封装。它表示了这个节点正在被移除(还没有完全移除)* 也就是说,如果first.isRemoved返回了ture,代表了first的下一个节点正在被其他线程* 正在移除。如果是这种情况,不能直接把first返回。Queue它模拟的是一个队列结构。他是一个线性* 的,先进先出,不能跳过中间的,first.isRemoved = true,说了此时此刻其他线程已经开始消费* Queue中fisrt这个节点的下一个节点了,由于是线性操作原则,因此也就是说first这个节点必然已经被其他* 线程消费掉了。因此如果此时在返回frist就不对了。只能继续走下面的first.removeOrNext()调用* 这个first.removeOrNext()里面会把first的next节点返回出来,next是一个Removed节点。* 然后再走Removed.helpRemovePrev()该方法会把Removed节点从Queeue中删除,这样,while在下* 一次循序的时候Queue中原来的first 和 first的下一个节点都已经不在队列里面了。* ** 如果first是Closed类型,Closed标识了Channel被关闭,那么Closed后面将不会再有其他节点。* Closed的nex节点就是哨兵节点,哨兵节点不是一个人Removed节点,因此返回false.* 所以最终把Closed节点返回出去了。*/if (!first.isRemoved) return first}/*** 如果上面没有return出去。那么对于发送方来说first就是Receive类型* 对于接收方来说first就是Send类型。** removeOrNext()方法做的事情是把first自己从Queue中删除,如果删除成功则return null* 如果自己已经被删除了,那么返回自己的下一个节点。* * 正常情况下肯定是能被删除的,但是也有特殊情况,比如多线程环境下:* 假如线程T1 走到此处,从Queue中拿到了first。此时Queue中结构如下:* [Hea]<=> [receive1] <=> [receive2] <=> [receive3] <=> [Head]* T1 拿到的first = [receive1]* Cpu 切换到T2。T2是一个发送方。T2也执行到该方法,也拿到了first = [receive1]* T2继续执行,调用了first.removeOrNext()方法,把frist从Queue中移除了,* 此时 Queue的情况如下:* 【Hea】 <=> [receive2] <=> [receive3] <=> [Head]* 而在T2调用frist.removeOrNext函数中把reccie1的指向就变成了这样:* [receive1].next = [Removed[ref = receive2]] 即receive1的下一个指向了一个Removed类型* 在Removed中有一个ref变量指向了receive2.* [reveive1].prev = [Head]* T2执行完了。出去了。Cpu切换到T1是。** 在T1中first仍然指向了[receive1], T1线程调用first.removeOrNext再想把first* 从Queue中移除就不会成功,以为[receive1]已经不在Queue中了 * 因此在T1 中调用first.removeOrNext()方法时将不会成功,于是就找到[receive1]的next,* [receive1]的next现在是[Removed[ref = receive2]],发现是一个Removed类型* 就就返回Removed.ref,刚好ref执行了[receive2]。因此最终[receive2]被返回出来了。** * removeOrNext的具体详情,看下面源码*/val next = first.removeOrNext()//如果first从queue中删除成功,把就把fist返回出去,first就是 T 类型,想要的类型if (next === null) return first/*** next != null 说明 first节点已经被删除了。next指向的就是first原来的下一个节点** helpRemovePrev做的事情就是帮助把上一个节点移除(prev),谁的上一个,next的上一个。* next 是 [receive2],那么它的上一个是[receive1],所以意思是帮忙把first从Queue中移除。* 说白了,就是上面的的T1线程拿到[receive1]的时候,发现[receive1]被线程T2抢先一步了。* 因此T1想做的最后事情就是确保[receive1]能被从Queue中移除。实际上有可能[receive1]已经* 被线程T2成功移除了,但是也有可能线程T2对[receive1]的移除操作才执行到一半。cpu就切换到* 线程T1了,这都无所谓,因为LookFreeLinkedListNode采用CAS操作的,是线程安全的,不管谁先,* 都不重要。谁移除了,另外一个就不用更新Queue了。最终的目的就是让[receive1]从Queue中移除。*/next.helpRemovePrev()//然后while循环继续。此时[receive1]已经不在Queue中了。那么下一次while循环first就是[receive2]}}/** LockFreddLinkedListNode.removeOrNext* removeOrNext方法它是LockFreddLinkedListNode提供的一个删除方法,一个节点(Node)调用* 该方法就是把自己从整个链表上移除,移除成功返回null. 如果当前节点已经被移除了(被其他线程)则返回* 当前节点的下一个节点**/
internal fun removeOrNext(): Node? {while (true) { //拿到当前节点的下一个节点,比如接着上面的例子(this 就是 first,也就是Queue中第一个节点)val next = this.next/*** 如果next 是 Rmoved类型,说明当前节点被另一个线程移除了(有点绕)。* * 通过removeOrNext这个方法把节点自己从链表中删除时和我们以前认知的链表操作行有点不一样* 假如有如下链表:* [Head]<=>[node1]<=>[node2]<=>[node3]<=>[Head]* 由于该链表是模拟队列的操作,因此取数据只能从头开始取,所以取的[node1],* 要把[node1]从连变种删除,共分为三步:** 1. 创建一个Removed节点,让Removed节点的ref变量指向自己的下一个节点(node2)* 2. 让自己的next指针指向Removed节点,那么就会形成两条不完整的链表:* 第一条:[node1]的next 指向了[Removed]* [Head]<=>[node1] ->[Removed(ref = node2)]* 第二条:[node2]的prev依然还是指向了[node1]* [Head]<=>[node1] <- [node2]<=>[node3]<=>[Head]* * 3. 然后通过调用[node2]的correctPrev函数来整理链表,把[node1]从链表中踢出去:* [Head]<=> [node2]<=>[node3]<=>[Head]* * 所以如果next是Removed类型,那就说明当前节点(this)被另外一个线程从Queue中移除了。* 有没有把上面的三步走完不知道,但是至少第二部执行完了。* * next.ref 指向的就是当前节点原来的下一个节点。把它返回出去。*/if (next is Removed) return next.ref /*** 只有哨兵节点才会存在他的next指向自己,在我们使用Channel发送和接收数据时不会发生这种情况* */if (next === this) return next /*** 创建一个Removed类型节点,让Removed.ref = next. * 对应上面举例中的第一步*/val removed = (next as Node).removed()/*** 当前节点的"_next"指针原来指向的next变量。现在修改为指向上一步创建的Removed。* 对应上面举例重的第二步。**/if (_next.compareAndSet(next, removed)) {/** 调用correctPrev()更新链表,把当前节点从链表中删除,对应上面举例中的第三部。* 该方法里面实际上并不复杂,就是来回各种修改指向,最终达到删除成功的目的。* 感兴趣的同学可以自己看看源码。*/next.correctPrev(null)//最终帮当前节点删除成功,return null.return null}}}//LockFreeLinkedListNode.remove()
private fun removed(): Removed =_removedRef.value ?: Removed(this).also { _removedRef.lazySet(it) }private class Removed(@JvmField val ref: Node) {override fun toString(): String = "Removed[$ref]"
}// -------------------------------------源码(1)------end---------------------------------------------// --------------------------------------源码(2)------start------------------------------------------internal abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {override val offerResult get() = OFFER_SUCCESS //只要不是close,都为OFFER_SUCCESS,子类没有重写该属性abstract fun resumeReceiveClosed(closed: Closed<*>)open fun resumeOnCancellationFun(value: E): ((Throwable) -> Unit)? = null
}private open class ReceiveElement<in E>(@JvmField val cont: CancellableContinuation<Any?>,@JvmField val receiveMode: Int) : Receive<E>() {// 如果接收方调用的是receiveCatching()方法,那么会把数据封装成ChannelResult对象//如果不是,直接返回原来的valuefun resumeValue(value: E): Any? = when (receiveMode) {RECEIVE_RESULT -> ChannelResult.success(value)else -> value}//要么返回null 要么返回 RESUME_TOKENoverride fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? {//tryResume并不会让协程恢复,它要做的事情就是检查协程有没有被取消,有没有已经resume了。//如果都没有会把发送的数据暂时存起来。val token = cont.tryResume(resumeValue(value), otherOp?.desc, resumeOnCancellationFun(value)) ?: return nullassert { token === RESUME_TOKEN } otherOp?.finishPrepare()return RESUME_TOKEN}//调用接收方的协程的completeResume方法,恢复挂起的发送方协程。并把tryResume时存起来的数据给到//接收方协程。override fun completeResumeReceive(value: E) = cont.completeResume(RESUME_TOKEN)
}private class ReceiveSelect<R, E>(@JvmField val channel: AbstractChannel<E>,@JvmField val select: SelectInstance<R>,@JvmField val block: suspend (Any?) -> R,@JvmField val receiveMode: Int
) : Receive<E>(), DisposableHandle {override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? =/*** 让select状态设置为选中(isSelected = true)。* select选中了某一路后,其他路(其他已经被添加到Queue中的ReceiveSelect)都会被从Queue移除。* 具体源码就不讲了,属于Selcet的内容。*/select.trySelectOther(otherOp) as Symbol?override fun completeResumeReceive(value: E) {/*** block为在select中调用onReceiveXXX时传入的suspend函数类型。* 用block开启一个新协程,并把要发送的数据作为参数出入block中。* 这个新协程的启动模式为Default。调度器与select所在协程的调度器一样。* 新协程执行完后才就会恢复select所挂起的协程,新协程的返回值作为select的结果。** resumeOnCancellationFun(value)返回了一个新协程如果被取消的回调,意思是* 如果这个新协程还没执行完成的时候被取消了,也就意味着这一次去的数据失败了,* 需要回调onUndeliveredElement。新协程会因为select所在的协程被取消而取消。* */block.startCoroutineCancellable(if (receiveMode == RECEIVE_RESULT) ChannelResult.success(value) else value,select.completion,resumeOnCancellationFun(value))}override fun resumeReceiveClosed(closed: Closed<*>) {if (!select.trySelect()) returnwhen (receiveMode) {RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectWithException(closed.receiveException)RECEIVE_RESULT -> block.startCoroutineCancellable(ChannelResult.closed<R>(closed.closeCause), select.completion)}}override fun dispose() { // invoked on select completionif (remove())channel.onReceiveDequeued() // notify cancellation of receive}override fun resumeOnCancellationFun(value: E): ((Throwable) -> Unit)? =channel.onUndeliveredElement?.bindCancellationFun(value, select.completion.context)override fun toString(): String = "ReceiveSelect@$hexAddress[$select,receiveMode=$receiveMode]"}internal class Closed<in E>(@JvmField val closeCause: Throwable?
) : Send(), ReceiveOrClosed<E> {val sendException: Throwable get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)val receiveException: Throwable get() = closeCause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)override val offerResult get() = this //返回自己。override val pollResult get() = thisoverride fun tryResumeSend(otherOp: PrepareOp?): Symbol = RESUME_TOKEN.also { otherOp?.finishPrepare() }override fun completeResumeSend() {}//由于otherOp = null,所以直接返回RESUME_TOKENoverride fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol = RESUME_TOKEN.also { otherOp?.finishPrepare() }override fun completeResumeReceive(value: E) {}override fun resumeSendClosed(closed: Closed<*>) = assert { false } override fun toString(): String = "Closed@$hexAddress[$closeCause]"
}// -------------------------------------源码(2)------end---------------------------------------------
总结: 父类的offerInternal实现就是如果有接收者在等待,把数据交给接收者,唤醒接收者,返回OFFER_SUCCESS,如果Channel被关闭,则把Closed对象返回出来,如果Channel既没被关闭,也没有接收者在等待,则返回OFFER_FAILED
sendSuspend()
父类根据子类offerInternal
的结果,只要结果不是OFFER_SUCCESS
。父类调用sendSuspend
函数继续处理。在sendSuspend
函数中,父类的核心就是把本次要发送的数据和发送方的协程打包成一个Send
对象调用enqueueSend
函数把Send
加入Queue
中,让发送的协程挂起,当然也有可能发生意外,比如有人搞事情,其他线程把Channel关闭了,或者其他线调用了receive
函数来接收数据了,因此父类根据enqueueSend
返回结果有以下行为:
- 返回
null
:说明了Channel没有被关闭,也没有接收者来或者支持Buffer的Channel任然是满的(意思就是不能往Channel里面扔数据)。那么父类就把发送方协程挂起。 - 返回
Closed
:说明在调用enqueueSend
方法时,Queue的尾结点是Closed
。那么意味着Channel
被关闭了,那么sendSuspend
抛出异常(并不会挂起发送到协程,而是直接跑异常),都关闭了还发个毛线。 - 如果返回
ENQUEUE_FAILED
:说明在此期间,支持Bufferd
的Channel
的Buffer
有空间了(被其他线程把原来满的Buffer中的数据给取走了,让Buffer有空间放新发送的数据)。那么重新调用子类的offerInternal
函数,让子类把本次要发送的数据放入Buffer
中。最终sendSuspend
返回Unit,不会把发送方挂起,发送方协程继续执行。 - 如果返回
Receive
对象:说明不支持Buffer
的Channel
有接收者来了,那么重新调用子类的offerInternal
函数。让子类把数据交给接收者,最终sendSuspend
返回Unit,不会把发送方挂起,发送方协程继续执行。
/** AbstructSendChannel.sendSuspend* 针对子类Offer不成功的情况下父类的继续处理逻辑:* 如果对suspendCancellableCoroutineReusable不是很理解的可以查看:* https://blog.csdn.net/Just_keep/article/details/130952896?spm=1001.2014.3001.5501*/
private suspend fun sendSuspend(element: E): Unit = suspendCancellableCoroutineReusable sc@ { cont loop@ while (true) { //一个死循环/** * isFullImpl:* Boolean get() = queue.nextNode !is ReceiveOrClosed<*> && isBufferFull** isBufferFull:缓存是否满了。RendezvousChannel 没有缓存,因此 isBufferFull 永远为true.* protected final override val isBufferFull: Boolean get() = true** isFullImpl = true 也就意味着队列里面第一个节点不是ReceiveOrClosed类型,* 也就是意味着没有接收方在等待,Channel没有被关闭,并且缓存数据buffer已经满了* (isBufferFull = true).那么需要挂起发送方。** */if (isFullImpl) { //把要发送的数据和发送方的协程对象封装到一个Send对象中。// SendElementWithUndeliveredHandler继承SendElement,//只是多了一个onUndeliveredElement回调val send = if (onUndeliveredElement == null)SendElement(element, cont) elseSendElementWithUndeliveredHandler(element, cont, onUndeliveredElement)/*** 把send对象加入到Queue中(尾部)* enqueueResult 有4种返回值情况:* * (1)null : 表示入队成功,发送方协程成功挂起。* (2)Closed :如果Queue尾部节点是一个Closed,说明Chanel被关闭了,* send不会被加入到Queue中,返回Closed,* (3)ENQUEUE_FAILED : 说明缓存数据的buffer没有满,又有空间了。send不入Queue* (4 )Receive:如果是一个Receive类型,说明接收者来了,返回Receive,Send不入Queue* */val enqueueResult = enqueueSend(send)//(查看下面“源码1”)when {//入队成功.enqueueResult == null -> { //给发送方协程注册一个取消监听。当发送方协程取消时从Queue中移除send,//同时如果send 是SendElementWithUndeliveredHandler那么还会调用//onUndeliveredElement的回调,意味着这次挂起的发送将不会到达消费者。cont.removeOnCancellation(send)//从lambda中返回,回到suspendCancellableCoroutineReusable方法里面//回到suspendCancellableCoroutineReusable函数里面把COROUTINE_SUSPENDED返回//到sendSuspend函数,然后sendSuspend函数把COROUTINE_SUSPENDED返回到send函数里面//最后send函数再把COROUTINE_SUSPENDED返回到调用它的地方,协程收到//COROUTINE_SUSPENDED就会挂起。//如果你对suspendCancellableCoroutineReusable里面的原理不是很明白,//可以查看另一篇文章 "你真的了解kotlin中协程的suspendCoroutine原理吗?"// “https://blog.csdn.net/Just_keep/article/details/130952896?spm=1001.2014.3001.5501” return@sc}//如果在入队的时候发现Channel被关闭了。enqueueResult is Closed<*> -> {//helpClose意味着要执行一些帮助Channel关闭的操作。具体看”源码2“//如果有onUndeliveredElement,那么调用它.//在helpCloseAndResumeWithSendException函数里面会调用//协程的(CancellableContinuationImpl)的resumeWithException函数//把Channel关闭的异常暂存入CancellableContinuationImpl中。//当下面'return @sc'执行完成后,在suspendCancellableCoroutineReusable函数//中会调用CancellableContinuationImpl的getResut函数,在getResut函数中//会直接throw 刚才暂存的 SendException。因此发送方协程不会挂起,直接抛异常了//如果你对suspendCancellableCoroutineReusable里面的原理不是很明白,//可以查看另一篇文章 "你真的了解kotlin中协程的suspendCoroutine原理吗?"// “https://blog.csdn.net/Just_keep/article/details/130952896?spm=1001.2014.3001.5501”cont.helpCloseAndResumeWithSendException(element, enqueueResult)return@sc}//如果这个时候缓存数据的buffer有剩余空间了,send就不会被添加到Queue中//那么什么都做,调用子类的fferInternal(element),让子类自己把数据放到缓存Buffer中,//这种情况只有在支持Buffer的Channel中才会发生enqueueResult === ENQUEUE_FAILED -> {} //如果这个时候接收者来了,调用子类的fferInternal(element)让子类把数据放交给接收方enqueueResult is Receive<*> -> {} // try to offer instead//未知错误,直接抛异常。else -> error("enqueueSend returned $enqueueResult")}}//如果上面enqueueResult是ENQUEUE_FAILED 重新把数据放到Buffer中// 如果上面enqueueResult是Receive那就把数据交给Receiveval offerResult = offerInternal(element)when {//子类的offerInternal告诉父类,我成功了.//至于子类你是把数据交给了接收者还是放入Buffer。父类不关心。 offerResult === OFFER_SUCCESS -> {/*** cont是一个CancellableContinuationImpl,调用其resume后,会把“Unit”* 存入cont中,当 "return @sr"后,在suspendCancellableCoroutineReusable函数* 会直接把“Unit”作为结果返回给sendSuspend函数,然后由sendSuspend函数会回给* 调用它的地方,因此sendSuspend不会挂起协程,发送方协程继续。*/cont.resume(Unit)return@sc}/*** 如果失败了回到while继续循环,什么时候会失败呢?不同Channel情况不同:* 1 针对RendezvousChannel来说肯定走了上面isFullImpl的逻辑,而且enqueueResult* 的结果必然是Receive(Queue中有接收者等待)。但是offferInternal依然返回OFFER_FAILED* 又说明了Queue的第一个节点不是Receive 也不是Closed,那就只能是null.* 那也就是说原来的Receive节点被移除了。什么情况会被移除?* (1)接收者所在协程被取消。* (2).另一个协程先一步把数据发送出去了让这个Receive接收到了* * 2 ConflatedChannel由于采用的是类似DROP_OLDEST策略,会不走上面isFullImpl的逻辑* 因此只有两种结果OFFER_SUCCESS | Closed。不会出现OFFER_FAILD** 3 ArrayChannel:对于她来说,能走的sendSuspend方法里面来,那说明在sendSuspend之前* 调用offerInternal的时候要么Buffer满了,要么Channel被关闭了。进入sendSuspend* 后,如果Channel没有被关闭,那么isFullImpl必然等于true. 代码能执行到这里,* 那说明在isFullImpl的逻辑中enqueueResult 的结果只能ENQUEUE_FAILED或者 Receive。* 但是不可能是Receive。因为 对于ArrayChannel来说,有挂起等待只能是还没有发送过数据* 那buffer肯定是空的.如果buffer为null。那也轮不到sendSuspend方方。* 因此在上面isFullImpl逻辑中 enqueueResult为ENQUEUE_FAILED,* 也就说明了Buffer腾出空间了(比如其他协程从Channel中接收了数据)。* 那这一次offerInternal返回OFFER_FAILED,那就只能说明之前腾出空间的Buffer又被其他* 协程发送数据给填满了。因此只能继续while循环把这一次send尝试挂起了。* * 4 LinkedListChannel 无限缓存,offerInternal 只有 OFFER_SUCCESS | Closed两种情况* **/offerResult === OFFER_FAILED -> continue@loop//如果发现接收方把协程关掉了。那就简单了,本次必然发送不成功。offerResult is Closed<*> -> {/*** 1,执行helpClose相关操作,看下面“ 源码2”* 2,如果有onUndeliveredElement,那么调用它。* 3,抛出Channel关闭异常,发送方协程终止。*/cont.helpCloseAndResumeWithSendException(element, offerResult)return@sc}//其他情况,直接抛异常终止发送方协程。else -> error("offerInternal returned $offerResult")}}}// ------------------------------------------源码(1)----start------------------------------------------//AbstructSendChannel.enqueueSend
/*** 把send对象加入到Queue(LockFreeLinkedListHead)尾部* enqueueResult 有4种返回值情况* (1)null : 表示入队成功,* (2)Closed :如果Queue尾部节点是一个Closed,说明Chanel被关闭了,* send不会被加入到Queue中,返回Closed * (3)ENQUEUE_FAILED : 说明缓存数据的buffer没有满,又有空间了。send不入Queue* (4 )Receive:如果是一个Receive类型,说明接收者来了,返回Receive,Send不入Queue*/
protected open fun enqueueSend(send: Send): Any? /*** RendezvousChannel和LinkedListChannel的isBufferAlwaysFull 永远为true* 另外两个永远为false.* 针对Rendezvous来说,它都没有Buffer,因此不用考虑在把Send入队时会存在Buffer腾出空间需要把* 发送的数据放入Buffer而不是把Send入队的情况。* 这对LinkedList来说,他的Buffer就是Queue,无限制大小,因此把Send入队时也不用考虑。*/if (isBufferAlwaysFull ) {/*** addLastIfPrev这个函数的干的事情就是,如果Queue的prev指向的节点(尾结点的意思)* 满足某个条件就把send添加到尾结点.满足神条件呢?就是lambda表达式的返回值为ture.* 在这里,lambda中如果尾结点是ReceiveOrClosed就直接把尾结点直接从enqueueSend* 返回出去,如果不是lambda就返回true.* 也就说明了只要尾结点不是Receivee或者Closed。都会添加成功。*/queue.addLastIfPrev(send) { prev ->if (prev is ReceiveOrClosed<*>) return@enqueueSend prevtrue}} else {//这个addLastIfPrevAndIf方法在addLastIfPrev的基础上又多了一个条件,看这个名字//就知道用And连接了两个if条件。即除了尾结点不是Receive 和Closed,还需要满足,第二个Lambda的//返回值,只有第二个Lambda返回true时,才能添加成功。在这儿第二个Lambda的返回值类型是isBufferFull//的值,这个值表示了Buffer是否满了。如果满了那就是true,那么Send就可以加入队列。如果没满//那就是false,则加入失败,很好理解,都没有满,就不需要Send入队了。直接放到Buffer中,然后发送方继续//最终如果没有把Send入队成功,enqueueSend方法最终的返回值就是ENQUEUE_FAILED 入队失败。if (!queue.addLastIfPrevAndIf(send, { prev ->if (prev is ReceiveOrClosed<*>) return@enqueueSend prevtrue}, { isBufferFull }))return ENQUEUE_FAILED}// 如果Send入队成功,返回nullreturn null}// ------------------------------------------源码(1)----end------------------------------------------// ------------------------------------------源码(2)----start------------------------------------------private fun Continuation<*>.helpCloseAndResumeWithSendException(element: E, closed: Closed<*>) {//把Closed节点前面的Receive节点都移除,并resume Receive节点对应的协程。helpClose(closed) //具体看下面源码//如果调用close()关闭Channel时没有传具体异常,那么就是默认的ClosedSendChannelExceptionval sendException = closed.sendException//如果创建Channel是设置了onUndeliveredElement,那就call 调用onUndeliveredElement//的回调。onUndeliveredElement?.callUndeliveredElementCatchingException(element)?.let {//it 就是UndeliveredElementException,他代表了onUndeliveredElement执行时发生异常//把sendException 追加的就是UndeliveredElementException中it.addSuppressed(sendException)resumeWithException(it)return}/*** resumeWithException会调用CancellableContinuationImpl的resumeWith函数,在它里面会判断是否* 已经调用了CancellableContinuationImpl的getResult函数,如果没有调用,就把异常暂存起来,等待调用* getResult的时候直接throw。如果已经调用了getResult函数,那就说明发送方协程已经被挂起了,* 那么resumeWithException会恢复发送方协程,然后把异常传到发送协程,发送协程收到异常终止,在本例中,getResut* 还没有调用,因此会先把异常存起来。* 如果你对suspendCancellableCoroutineReusable里面的原理不是很明白* 可以查看另一篇文章 "你真的了解kotlin中协程的suspendCoroutine原理吗?"* “https://blog.csdn.net/Just_keep/article/details/130952896?spm=1001.2014.3001.5501” */resumeWithException(sendException)}/** AbstractChannel.helpClose* 顾名思义,帮助关闭,是什么意思呢?* 假如Queue中是如下情况:* head <=> [receive_1] <=> [receive_2] <=> head* 目前Queue中有两个接收者在等待,这个时候:* 线程T1 调用了close()方法,那么Queue:* head <=> [receive_1] <=> [receive_2] <=> [close_1] <=> head* 由于Channel被关闭了,那么不能再往Channel中发送数据了,如果不做任何处理,那receive_1 和receive_2* 所在的协程将不会有机会被ressum.(问题1)* * 这个时候如果线程T2在调用send()发送数据,由于Queue中第一个节点是receive_1,所以不仅不会抛出异常,* 还会把数据交给receive_1.(问题2)* * 所以如果调用chanel的close()方法把Closed加入Queue后,什么都不做就会存在以上的问题,* 因此还需要一个helpClose方法来帮助完成close操作* 也就是处理Queue中Closed左边的Receive节点。把Closed左边的所有Receive节点都从Queue中移除,并resume* 每一个Receive对应的协程(协程恢复后可能会抛异常,也有可能是收到一个ChannelResult类型,里面标识了Channel被关闭)**/
private fun helpClose(closed: Closed<*>) {//暂且就把InlineList当做一个类属于ArrayList的数据结构,实际上内部也用到里ArrayList//初始化一个空的List.里面什么都没有var closedList = InlineList<Receive<E>>()while (true) {//如果Cloesd节点的上一个节点是Receive(代表的是一个挂起的接收者)那就调用其remove方法//把自己从Queue中移除。如果不是就不做任何处理,不是Receive那就有可能是head(哨兵节点)或者Send类型 val previous = closed.prevNode as? Receive<E> ?: break//从Queue中移除自己(也就是previous节点)if (!previous.remove()) {//如果移除不成功,(可能因为线程竞争,多线程问题),调用helpRemove再次移除//目的就是要保证移除成功后才能进行下一次操作。previous.helpRemove() continue}//如果previous成功从Queue中移除,就把previous加入到这个List中,closedList += previous}//循环遍历closedList通知每一个Receive告知其Channel被关闭了//意思就是恢复被挂起的所有Receive对应的协程。被恢复的协程有可能会抛出异常也有可能收到的是//包含异常的ChannelResultclosedList.forEachReversed { it.resumeReceiveClosed(closed) }//看下面源码//这个方法目前在AbstractChannel中是一个空实现,子类也没有重写。先不管。onClosedIdempotent(closed)}//比如ReceiveElement中的实现override fun resumeReceiveClosed(closed: Closed<*>) {when {//如果用的channel的receiveCatching()方法那么receiveMode = RECEIVE_RESULT//意味着恢复的协程eceiveCatching()收到的是一个ChanelResult结果,ChanelResult//中包含了一个receiveException,isClosed = true.receiveMode == RECEIVE_RESULT -> cont.resume(closed.toResult<Any>())//如果调用的channel的receive()方法那么receiveMode = RECEIVE_THROWS_ON_CLOSE//意味着恢复的协程要抛异常else -> cont.resumeWithException(closed.receiveException)}
}
// ------------------------------------------源码(2)----end------------------------------------------
总结:父类中的sendSuspend函数就是针对子类offer失败的最后手段,主要目的就是把本次要发送的数据包装成一个Send对象放入Queue中,等待接受者来取,由于Channel是支持多线程的,那么就有可能在进入sendSuspend的时候时候有接收者来了或者有Buffer的channel Buffer腾出空间了,再或者Channel被其他线程给关闭了。因此在调用enqueueSend的时候:
- 如果发现Channel被关闭了就向发送方协程抛出移除,如果Channel设置了onUndeliveredElement则回调本次发送的数据,同时如果Queue中还有其他等待的接收者则唤醒与之其对应的协程。
- 如果有接收者在等待(就算Channel有Buffer,既然有接收者,说明Buffer是空的),唤醒接收方协程并把数据交给接收者,发送方协程继续
- 如果没有接收者在等待,但是Channel中Buffer有空间,则把数据放入Buffer中,发送方协程继续
- 如果没有接收者在等待,并且也不能往Buffer中放入(满了),而且Channel也没有关闭,则把Send加入Queue中,发送方协程挂起
父类默认的实现(RendezvousChannel
)send代码执行流程如下:
上面的流程图比较接近代码执行的流程,记忆起来比较复杂,下面为简化版的。
onSend()函数
onSend
是在多路复用Select
中使用的,如果还不是很了解多路复用技术的先了解一下再来看一下内容会比较好理解一点,否则看起来会比较费劲。简单介绍一下多路复用是什么?在Select中,一路就是指的一个SelectClause
的invoke
调用。多路就是指有多个SelectClause
的invoke
调用,在invoke
方法中去做想要做的事情。那一路先执行完成(达到想要的目的),那么这一路的结果作为select
函数的返回值,然后恢复select
挂起函数。
在Channel
中的onSend
方法就是用在多路复用中的一种技术。一次onSend
调用可以理解Select
中的一路向Channel
发送一次数据,把数据交给了接收者或者把数据放入Buffer
则认为这一路的目标达成。因此在select
中有多个Channel
的onSend
调用时,谁发送的数据先被接收者接收,或者先放入自己Channel
的Buffer
,那么select
就选择这个Channel
的onSend作为结果。
onSend
的逻辑和send
函数的逻辑有一些区别,首先send
函数时挂起函数,onSend
不是,在send
函数中,首先是调用子类的offerInternal
方法,让自己把数据发送到Channel
中。只有子类offer
失败了,父类才处理。而onSend
方法的逻辑是,先父类处理(把包装的Send
对象加入Queue
)。只有加入不成功(说明有接收者在等待或者Buffer
有空间)才会调用子类的offerSelectInternal
方法让子类把数据发送到Channel
中。
先看例子:
val result = select<Int>{channel1.onSend(1){xxx1 //如果channel1这一路先执达成目的,你希望select的结果是什么?就返回什么,然后select挂起函数恢复}channel2.onSend(1){xxx2 //如果channel2这一路先执达成目的,你希望select的结果是什么?就返回什么,然后select挂起函数恢复}
}
实际上onSend
函数是一个get行为,返回值的类型为SelectClause2
,实际我们调用的是SelectClause2
的invoke
函数,这两个参数也是invoke
函数的参数,只是因为kotlin
的语法糖,让看起来是调用onSend
传入的参数:
//AbstractSendChannle.onSendfinal override val onSend: SelectClause2<E, SendChannel<E>>//返回的一个SelectClause2对象出去get() = object : SelectClause2<E, SendChannel<E>> {override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {registerSelectSend(select, param, block)}}
SelectClause2
的invoke
声明:
public operator fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R)
所以我们可以写成这样:
val result = select<Int>{val selectClause : SelectClause2 = channel.onSend//invoke关键字省略掉了selectClause(1){xxx1 //返回1}}
第一个参数好理解,就是你要发送的数据,关键是第二个参数是干什么用的呢?由于select
函数是一个挂起函数,所以协程执行select
函数时就有可能会被挂起,假如被挂起了后,那谁来恢复呢?这是一个问题。
没错,从select
函数这个挂起点恢复的关键就是第二个参数,我们知道Select
是一个多路复用的技术。在select
函数里面可以同时监听多条路(一个SelectClause
的一次invoke
调用算一路),那一条路先完成,select
就选择这一条路中的第二个参数,然后用这个参数去开启一个协程(block.startCoroutineCancellable
)。这个新协程启执行的就是第二参数里面的代码。当这个新协程执行完成后。就会去恢复select这个挂起点
,并且新协程的返回值就作为select
的结果。
onSend
的完整例子用法:
fun main() {runBlocking {val channel1 = Channel<Int>(){log("channel1 已经发送的 $it 将不会被消费")}val channel2 = Channel<Int>(){log("channel2 已经发送的 $it 将不会被消费")}GlobalScope.launch(Dispatchers.IO) {val result = select<String> {//下面代码先接收的channel1。因此channel1的onSend这一路会先完成。channel1.onSend(1) {"channel1 发送的 1 被接收了"}//虽然channel2发送的数据也会被放到Channel的Queue中,但是由于//channel1的数据线被接收。因此channel2的数据在Channel1的数据//被接收的时候会被从Queue中移除。channel2.onSend(2) {"channel2 发送的 2 被接收了"}}log("select 的返回值为: $result")}//延迟一定时间是为了让Channel1和Channel2的onSend已经把数据放自己的Queue中了,//先接收Chanel1的数据,launch {delay(1000)val value1 = channel1.receive()log("channel1 received value is $value1")}launch {delay(2000)//由于channel1先被接收,select选择了channel1,因此会把//channel2发送的数据(SendSelect)会被从Queue移除.所以chnanel2的//onUndeliveredElement会被回调,//channel2.receive会被挂起,因为Channel2中没有数据可被接收了。val value2 = channel2.receive()log("channel2 received value is $value2")}}
}
//输出结果:
17:13:37:069[ main ] channel2 已经发送的 2 将不会被消费//onUndeliveredElement回调
17:13:37:096[ DefaultDispatcher-worker-1 ] select 的返回值为: channel1 发送的 1 被接收了
17:13:37:096[ main ] channel1 received value is 1
OK,现在正式进入源码:
第一步调用onSend获得SelectClause2
//AbstractSendChannel.onSend
final override val onSend: SelectClause2<E, SendChannel<E>>//返回的一个SelectClause2的匿名内部类的对象get() = object : SelectClause2<E, SendChannel<E>> {override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {registerSelectSend(select, param, block)}
}
第二步调用SelectClause2
的invoke
:
// 在SelectBuilderImpl中具体的实现
override fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R) {//调用了自己的registerSelectClause2方法。//在自己的registerSelectClause2方法中调用了Channel的registerSelectSend方法。registerSelectClause2(this@SelectBuilderImpl, param, block)
}
所以最终调用到了registerSelectSend
里面:
//AbstractSendChannel.registerSelectSend
//第一个参数select为SelectBuilderImpl对象
//第二参数为要发送的数据
//但三个参数为调用SelectClause2的invoke方法里面的第二个参数。private fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend (SendChannel<E>) -> R) {while (true) {//isSeleced = true说明了select中已经有其他路已经完成了//seelect的状态为选中了,那么本次这一路就直接返回,啥也不用做,回到select中继续剩余操作(如果还有)if (select.isSelected) return/*** 这个参数在前面”send()函数“ 发送数据里面已经解释过了,* isFullImpl = true 意思就是没有接收者在等待,Channel也没被关闭* 并且针对有buffer的Channel,Buffer也满了。*/if (isFullImpl) {//封装成一个SendSlect对象val node = SendSelect(element, this, select, block)//加入到Queue的尾部,该方法在前面”send函数发送数据“详细介绍过。val enqueueResult = enqueueSend(node)when {//只要Channel没有被关闭,而且没有接收者在等待,如果有Buffer,Buffer任然是满的都会成功。enqueueResult == null -> { //注册回调,当有其他路被选择了后,把本次加入Queue中的SendSelect移除,//如设置了onUndeliveredElement,那么onUndeliveredElement也会//回调本次发送的数据elementselect.disposeOnSelect(node) //具体查看下面 “源码1”//本次onSend结束,直接回到select中,继续执行select函数中剩余代码return}//加入Queue失败,因为发现Channel被关闭了。enqueueResult is Closed<*> -> //helpClose会帮助处理Closed节点前的Receive节点(如果有)。前面send流程介绍过。//如设置了onUndeliveredElement,那么onUndeliveredElement也会回调本次发送的数据//然后返回一个SendException//直接抛出返回的SendException.那么select所在的协程也就结束了。throw recoverStackTrace(helpCloseAndGetSendException(element, enqueueResult))//加入Queue失败,这种失败是针对有Buffer的Channel. 上面isFullImpl = true说明Buffer满//了,但是在这个期间有可能被其他线程从Buffer里面取出数据导致Buffer有空间了。//那么不处理,走下面的逻辑把数据放入Buffer中。enqueueResult === ENQUEUE_FAILED -> {} // try to offer//加入Queue失败, 上面isFullImpl = true说明是没有接收者的,但是在这个期间//有可能别的线程来接收数据了。那么不处理,走下面的逻辑把数据交给接收者enqueueResult is Receive<*> -> {} // try to offer//加入失败,未知的异常。直接抛异常,.那么select所在的协程也就结束了。else -> error("enqueueSend returned $enqueueResult ")}}/*** 如果上面isFullImpl= true时,没有return出去。或者isFullImpl = false时那么就走下面逻辑。* offerSelectInternal跟offerInternal做的事情是一个意思,就是子类具体如何把一个数据发送* 到Channel中。每个子类都有自己发送方式,不同是offerSelectInternal会多做一件事就是* 如果offer成功了让Select选择当前路(修改SelectBuilderImpl中的_state为null,* 这样select.isSelected的值就会返回ture)。既然选择了当前路作为结果。* select中的其他已经被onSend的路就会全部被从Queue中移除,该回调onUndeliveredElement就回调。* 父类中的默认实现就是把数据交给发送者。父类的实现* 也是RendezvousChannel的实现方式。** 不同的子类offerSelectInternal返回的结果有一些差异:** 1. RendezvousChannel:* 会返回 ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | RETRY_ATOMIC |* Closed.** ALREADY_SELECTED: 有其他路被选择了* OFFER_SUCCESS:成功把数据交给接收者* OFFER_FAILED:没能把数据交给接收者,Queue中没有接收者了(接收者可能因取到别的线程发送* 的数据走了,也有可能是接收方协程被取消了)。Channel也没被关闭。* RETRY_ATOMIC:有多个线程都在调用同一个Channel的onSend,执行稍微慢一步* 的这个线程发现另外一个线程已经开始把数据交给Queue中的接收者了,那么慢一步这个线程就会* 返回RETRY_ATOMIC。* Closed:Channel被关闭了(被别的线程给关了)* * 2.ConflateChannel:* 只会返回 ALREADY_SELECTED | OFFER_SUCCESS | Closed。* 有其他路被选了,返回ALREADY_SELECTED。* Channel被关闭了返回Closed.* 因为ConflateChannel的策略为DROP_OLDED。只要不是另外两种情况,都会发送成功返回* OFFER_SUCCESS(要么是交给接收者,要么是放入Buffer),** 3.ArrayChannel:* 只会返回`ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed。* 只有当Buffer满了的时候,并且没有接收者在等待时,并且缓存溢出策略为SUSPEND,* 并且没有关闭时才会返回OFFER_FAILED* OFFER_SUCCESS:有可能是放入了Buffer中,也有可能是交给了接收者。也有* 可能丢掉了(溢出策略为DROP_LATEST)。** 4.LinkedListChannel:* 只会返回 ALREADY_SELECTED | OFFER_SUCCESS | Closed。* 它是一个无限缓存队列,只要没有其他路被选中,没有关闭,都会发送成功,* OFFER_SUCCESS:有可能是放入了Buffer中,也有可能是交给了接收者。**/val offerResult = offerSelectInternal(element, select) //查看下面”源码2“when {//说明有select中有其他路先一步被选择了,直接返回到select中offerResult === ALREADY_SELECTED -> return/*** OFFER_FAILED 意味着子类没能成功的把数据发送到Channel.那么继续while循环* 把本次要发送的数据包装成SendSelect对象放入Queue中,等待接收者来接收。* */offerResult === OFFER_FAILED -> {} // retry/*** 和OFFER_FAILED一样 retry*/offerResult === RETRY_ATOMIC -> {} // retry//发送数据成功,可能是交给了发送者,也有可能是放入Buffer了,也有可能是丢掉了(溢出策略)offerResult === OFFER_SUCCESS -> {/*** block是调用SelectClause2的invoke方法里面的第二个参数。用这个函数开启一个新的协程。* select.completion 是 SelectBuilderImpl对象,它是Continuation的子类。* 当block开启的这个协程执行完成后,会调用SelectBuilderImpl的resume函数。在* SelectBuilderImpl的resume中会调用select函数所在协程的resume,并把* block的返回值传过去。** 至于select函数是会被挂起还是直接拿到block的值,取决于新协程和* select中最后一行代码谁先执行完成:* -> 如果select中最后一行代码先执行完成,调用那SelectBuilderImpl* 的getResult就返回COROUTINE_SUSPENDED作为select的返回值,* 因此select被挂起了。 等待新协程之心完成后调用resume恢复。** -> 如果新协程先执行完成,调用SelectBuilderImpl的resume函数传入* 的block的返回值会先存起来。当select中最后一行代码执行完成的时候* 调用SelectBuilderImpl的getResult就会把resume时存入的值取出* 作为select函数的返回值。select就不会被挂起了。** */block.startCoroutineUnintercepted(receiver = this, completion = select.completion)//回到select函数中,继续执行select中的剩余代码return}offerResult is Closed<*> ->//跟上面isFullImpl中遇到Channel被关闭时一样throw recoverStackTrace(helpCloseAndGetSendException(element, offerResult))//未知的异常。直接抛异常,.那么select所在的协程也就结束了。else -> error("offerSelectInternal returned $offerResult")}}}// ------------------------------------源码(1)---start--------------------------------------------//SelectBuilderImpl.disposeOnSelect
/*** 参数handle就是SendSelect对象,SendSelect是DisposableHandle的实现类* SelectBuilderImpl 继承了LockFreeLinkedListHead,因此SelectBuilderImpl本身就是一个双向链表的哨兵节点。* 这个函数的作用就是把SendSelect包装成另外一个Node(DisposeNode)存入SelectBuilderImpl这个链表中。** 不管是同一个Channel的多次onSend还是不同Channel的onSend,如果没有接收者他们的数据包装成SendSelect对象后首先会* 加入自己Channel的Queue中,然后在把SendSelect包装成DisposeNode加入SelectBuilderImpl这个链表中。** 一个Channel对应一个自己的Queue.一个Select对应自己一个链表。一个Select可以对应多个不同Channel.* 因此Channel调用onSend发送的数据包装成SendSelect放入自己Channel中的Queue.然后在把SendSelect包装成* DisposeNode存入Select的链表中,那么得出结论:在一个select函数中多个不同Channel的onSend的数据* 先包装成SendSelect放入自己Channel中的Queue,* 然后这些SendSelect再包装成DisposeNode存入同一个链表中(SelectBuilderImpl作为哨兵节点这个链表)*** 所以当Select中某一路被选择后,就会调用SelectBuilderImpl的doAfterSelect方法。在这个方法中* 回去遍历SelectBuilderImpl作为哨兵节点这个链表的所有节点(DisposeNode),然后拿到里面的SendSelect* 对象,然后调用SendSelect对象的dispose方法,把SendSelect对象从Channel中的Queue移除。*/
override fun disposeOnSelect(handle: DisposableHandle) {///*** DisposeNode继续LockFreeLinkedListNode,就是一个简单的对SendSelect进行了包装,* private class DisposeNode(* @JvmField val handle: DisposableHandle) : LockFreeLinkedListNode()** */val node = DisposeNode(handle)//再次确认Select目前还没有某一路被选择if (!isSelected) {/*** 把包装后后的node添加到 SelectBuilderImpl 作为哨兵节点的这个双向链表里面。* 注意这个链表和Channel中的Queue不是同一个链表* * 这儿就说了为什么要对SendSelect进行包装呢?因为SendSelect目前已经被添加到Channel的Queue中,* 也就是说SendSelect的 prev 和 next已经指向了Queue中的其他节点。如果直接把SendSelect添加到 * SelectBuilderImpl这个链表中,那就要让SendSelect的 prev和next重新指向,那不就导致SendSelect* 又被从Channel的Queue中移除了,因此把SendSelect包装到DisposeNode里面。把DisposeNode添加到* SelectBuilderImpl这个链表中,这样SendSelect任然还在Channel的Queue中。*/addLast(node) //再次确认,如果Select还没有其他路被选择,那就成功,然后return .//否则就会走下面的 handle.dispose()if (!isSelected) return }// 如果Select中已经有其他路被选择了,就调用SendSelect的dispose方法。//把SendSelect从Channel的Queue中移除handle.dispose()
}//SendSelect.dispose
override fun dispose() { //把SendSelect自己从Channel的Queue移除。如果自己已经被移除了就会移除失败if (!remove())return//移除成功,如果设置了onUndeliveredElement就进行回调undeliveredElement()
}//SelectBuilderImpl.doAfterSelect
private fun doAfterSelect() {/*** parentHandle是给select函数所在协程注册“取消回调”时返回的一个DisposableHandle对象,* 给协程注册一个“取消回调”的目的是select函数挂起期间如果所在协程被取消了,那么需要把select函数中* 所调用onSend或者onReciveXXX时封装的SendSelect和ReceiveSelect都从Queue中移除。** 调用其dispose方法可以把”取消监听器“移除。意思就是select已经有选择了,那么协程就会从* select函数拐点恢复了,也就不需要再监听协程取消了。*/parentHandle?.dispose()//遍历整个链表,然后拿到SendSelect对象,调用其disopse方法forEach<DisposeNode> {it.handle.dispose()}
}
// ------------------------------------源码(1)---end-------------------------------------------// ------------------------------------源码(2)-----start---------------------------------------
//AbstractSendChannel.offerSelectInternal
//父类默认实现为把数据交给接收者,也是RendezvousChannel的实现方式
//返回结果为:ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | RETRY_ATOMIC | Closed.
protected open fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {// offer atomically with selectval offerOp = describeTryOffer(element)val failure = select.performAtomicTrySelect(offerOp)//上面两步里面的代码比较深了,主要做了两件事://1. 就是通过类似于CAS原子的操作,从Queue中取出Receive,把这个Receive存到offerOp中//2. 让Select选择当前路(修改SelectBuilderImpl中的_state的值由原来的NOT_SELECTED改为null,//这样select.isSelected的值就会返回ture.//既然选择了当前路作为结果。select中的其他已经被onSend的路就会全部被从Queue中移除,该回调//onUndeliveredElement就回调(这一步是在SelectBuilderImple的 doAfterSelect()函数中执行的,// doAfterSelect() 函数是在把Select的状态变量值由原来的NOT_SELECTED改为null时调用的,)/*** 如果上面操作成功了那么failure为null,如果操作失败failure代表了失败的原因* * 错误结果可能是 ALREADY_SELECTED | RETRY_ATOMIC |OFFER_FAILED | Closed* ALREADY_SELECTED : 说明有select中有其他路先一步被选择了,比如说 select中调用了A,B,C* 三个Channel的onSend。A,B 执行的时候没有接收者来取数据,执行C的时候,另外一个线程去接收A的数据* 让A的这一次onSend的数据先被接收了,从而让Select选择了A这一路。C就会返回ALREADY_SELECTED * RETRY_ATOMIC :比如有两个不同的协程都在对同一个Channel onSend数据,就会存在先后问题,执行快的* 那个协程就会先一步把数据交给接收者,慢的这个协程发现有人先一步了正在把数据交给接收者* 就会返回RETRY_ATOMIC ** OFFER_FAILED:Queue中没有等待的接收者了,可能有其他线程往Channel中发送了数据,原来挂起的* 接收者取到数据后就走了。* * Closed: Channel被关闭了。*/if (failure != null) return failure//如果操作成功了,就拿到Receiveval receive = offerOp.result//恢复接收的协程receive.completeResumeReceive(element)// 接收者Receive中offerResul 永远为OFFER_SUCCESSreturn receive.offerResult
}
// ------------------------------------源码(2)-----end---------------------------------------
父类中默认的实现(RendezvousChannel)onSend代码执行流程图
对于RendezvousChannel
总结下来onSend
发送一个数据到Channel
中,onSend
本身不是挂起函数,他所在的select
是挂起函数,发送数据时如果有接收者就把数据交给接收者,如果没有接收者就把数据包装厂一个SendSelect
对象放入Channel
的Queue
中。和send()
函数不同的时,send()
函数发送数据时如果没有接收者在等待那么就需要挂起等待,而onSend()
函数不会挂起,当onSend
把数据包装成SendSelect
对象后,还可以继续onSend
(也就是说即使是同一个Channel
在数据没有被消费掉时可以onSend
多次,而send
不可以)。比如:
val channel = Channel<Int>(){println("$it 没有被消费" )
}launch{//延迟一秒后去接收,确保三次onSend都执行完成,因此Queue中的情况<=>如下:<=> [Head] <=> [SendSelect1] <=> [SendSelect2] <=>[SendSelect3] <=>delay(1000) //接受数据时,先取的是SendSelect1的,也就是 数据 1//Queue中其他几个会因为SendSelect1的接收而导致被从Queue中移除。//起原因是每次onSend把数据加入Queue时都通过select.disposeOnSelect(node)会注册一个监听//意思就是当select中有其他路被选择时,此处node会被从Queue中 移除。上面源码中已经分析过了。val value = channel.receive() //拿到数据}
val resutl = seelct<Int>{//可以调用同一个Channel的多次onSend,channel.onSend(1){1}channel.onSend(2){2}channel.onSend(3){3}
}
onSend
发送的数据被接收者接收后,那么这次onSend
就会被Select
选中。然后调用onSend
时传的第二个参数就会作为一个新协程的代码体被启动执行,协程返回值就是select
函数的结果,协程执行完成后恢复select
函数挂起点,其他没有被select
选择的路就会丢弃,意味着其他onSend
出去的数据都会被丢弃(SendSelect
从Queue
中移除)。被移除时如果设置了onUndeliveredElement
,那么onUndeliveredElement
就会回调这些被移除的数据,因为他们再也没有机会到达接收者了。
**当然在实际使用中我们一般都是在select函数中调用不同Channel的onSend,**谁的onSend
的数据先被接收者消费,那么这次onSend
就被Select
作为选择。
简化版流程:
接收流程
Channel接收一个数据有六种方式:
- receive() :T
是一个挂起函数,从Channel中接收一个数据。遇到Channel关闭时会抛出异常导致接收方协程终止 - tryreceive() :ChannelResult
非挂起函数,立马返回结果,如果有数据(不管是从Send里面还是从Buffer里面)ChannelResult.isSuccess = true。
如果没有数据,返回失败ChannelResult.isFailure = true。
如果被关闭,返回ChannelResult.isClosed = true - receiveCatching() :ChannelResult
receive()函数的替代品,只是在接收数据时如果Channel被关闭了,不会抛出异常,而是ChannelResult.isClosed = true。
如果接收成功ChannelResult.isSuccess = true。 - onReciveCatching():
非挂起函数,在多路复用(Select)时使用该方法发从Channel中接收一个数据 - receiveAsFlow() : Flow
把Channel 转成成一个热流(Hot Flow).当我们订阅Flow的时候Flow就会调用Channel的receiveCatching()方法把接收到的数据emit出去,这样Flow的下游消费者就能收到数据了。 - 通过迭代方式:比如:
for(data in channel){}
本文着重讲解receiveCatching()
和onReciveCatching()
两个,理解了这两个,其他几个就比较简单了了
receiveCatching()函数
创建一个Channel
,调用receiveCatching()
接收数据:
val channel = Channel<Int>() //默认为RendzvousChannelxxxxwhile (true) {//如果发送方没有发送数据,就会挂起。val channelResult = channel.receiveCatching()if(channelResult.isClosed){break}//接收成功就返回数据,否则抛出异常,比如Channel被关闭了,val result = channelResult.getOrThrow()//如果不想因为Channel关闭而抛出异常可以使用 channelResult.getOrNull()}
receiveCatching
为父类AbstructChannel
中的final方法:
public final override suspend fun receiveCatching(): ChannelResult<E> {/*** pollInternal() 和 offerInternal 是相对应的,有offer就有poll.* 子类都有自己的poll方式,先调用子类的pollInternal,让子类自己从Channel中去取数据,* 子类如何取,父类中不关心,只需要把子类取的结果告诉父类就行。* poll的结果有三种:* 1. 具体的数据:表示子类poll成功了。* 2. POLL_FAILED:还没有往Channel中发送数据* 3. Closed : Channel被关闭** 不同的Channel有自己的poll逻辑。具体请看后面内容分析。*/val result = pollInternal()//只要不是POLL_FAILED,就把结果转化成一个ChannelResult对象返回到外部协程(接收者协程),//即使是Closed。也把Closed放入ChannelReuslt中返回。//这儿就是和receive函数不一样的地方,如果是receive函数,遇到result是Closed时,是不会返回。//会继续走下面的receiveSuspend方法,在receiveSuspend方法里面让外部协程(发送者协程)抛出异常的。if (result !== POLL_FAILED) return result.toResult()/*** 如果子类poll失败,再由父类调用receiveSuspend继续处理* * 注意这个参数和receive方法中传的也不一样,receive方法中传的是RECEIVE_THROWS_ON_CLOSE* 这个参数最终导致了接收数据时遇到Channel被关闭的情况,接收方协程是否抛出异常的关键。* * 在遇到Channel被关闭时:* RECEIVE_RESULT:* resume接收方协程时把Channel关闭异常封装到ChannelResult里面丢给接收方协程,接收方协程不会抛异常* * RECEIVE_THROWS_ON_CLOSE: * resume接收方协程时把Channel关闭的异常丢给接收方协程,接收方协程抛异常*/return receiveSuspend(RECEIVE_RESULT)}//简单对比对比一下receive方法的源码:public final override suspend fun receive(): E {val result = pollInternal()//这儿多了一个判断条件result !is Closed<*>if (result !== POLL_FAILED && result !is Closed<*>) return result as Ereturn receiveSuspend(RECEIVE_THROWS_ON_CLOSE)}
pollInternal()
父类中提供的pollInternal
实现方式不光是RendezvousChannel
的实现方式同时还是LinkedListChannel
的实现方式,对于RendezvousChannel
来说就是没有Buffer
的情况,poll
的行为就是从Queue
中取挂起等待发送者。对LinkedListChannel
来说就是有Buffer
的情况,Buffer
就是Queue
,所以就从Queue
中取缓存的数据,LinkedListChannel
他的缓存是无限制的,不存在挂起的发送者。
不管Channel有没有Buffer
,没有Buffer
的Channel
可以认为是有Buffer
但是里面永远没有数据,因此可以把子类的poll行为总结如下:
子类在从Channel
去数据时,如果Buffer
中有数据,优先从Buffer
中取,如果Buffer
中没有数据再从挂起等待的发送者中取,如果没有挂起等待的发送者并且Channel
没有被关闭时则把接收方挂起,如果Channel
关闭了,则抛出异常。
/*** AbstractChannel中pollInternal的逻辑就从Queue中取Send对象* 对于Rendezvous来说是挂起的发送者,对linkedlist来说是缓存在Queue种的数据。* */
protected open fun pollInternal(): Any? {while (true) {/***takeFirstSendOrPeekClosed如果Queue中第一个节点是Send 或者Closed类型则返回。否则返回null.**由于父类中的poll实现代表了Rendezvous和LinkedList两种Channel的实现方式*因此takeFirstSendOrPeekClosed()返回的结果的情况如下:* 1. null: Queue中没有东西,对于Rendezvous来说就是没有挂起的发送者,* 对于LinkedList来说就是没有缓存的数据。* 或者有东西到那时不是Send类型。* 2. Closed:代表了Channel被关闭了,并且Queue中没有需要被接收的数据,* 3. Send:Send类型又分为三种:* (1)SendElement或者其子类:这是Rendezvous调用send函发送方被挂起了。* (2)SendSelect:这是Rendezvous在select函数中调用了onSend。* (3)SendBuffered:这是LinkedList缓存在Buffer中的数据。* * 如果take出来的是null。直接返回POLL_FAILED,意味着子类取数据失败*/val send = takeFirstSendOrPeekClosed() ?: return POLL_FAILED/*** 关于tryResumeSend:** 1.如果取出来的是SendElemeent,那么尝试resume发送方协程。如果tryResume成功返回RESUME_TOKEN* tryRsume失败,返回null.** 2.如果取出来的是Closed。在Closed中 tryResumeSend永远返回RESUME_TOKEN** 3.如果取出来的是SendSelect,那么就把SelectBuilderImpl的状态_state值设置为null.让Select* 的状态变为有选中了,Select选中的就是SendSelect对应的那一次onSend调用,操作成功返回* RESUME_TOKEN,操作失败返回null,比如Select中已有其他路先一步被选择了。* * 4。如果取出来的是SendBuffered,也直接放回RESUME_TOKEN*/val token = send.tryResumeSend(null)if (token != null) {assert { token === RESUME_TOKEN }/*** 关于completeResumeSend:* 1. 如果send是SendElemeent,那就调用发送方协程的completeResume完成resume操作。* 2. 如果Send是Closed或者SendBuffered,completeResumeSend是个空方法。* 3. 如果是SendSelect,那么就开一个协程去执行select函数中channel.onSend时传入的suspend函数* 从而使得select挂起函数被恢复*/send.completeResumeSend()/*** 关于 pollResult:* send 如果是Closed,pollResult的值就是Closed自己,否则pollResult就是要取的数据* * return出结果,告知父类*/return send.pollResult}/*** 对应上面tryResumeSend,只有send是SendElement或者SendSelect时token才有可能为null,* 如果send是SendElement,说明tryResume太迟了,挂起等待的发方法协程已经被取消了 token才会为null.* 如果send是SendSelect,说明慢了一步,Select中有其他路先一步被选择了,token才会为null.** 因此不管是SendElement还是SendSelect,他们发送的这一次数据将被认为不会被接收。所以回调* OnUndeliveredElement*/ send.undeliveredElement()//重新While循环}}
receiveSuspend()
在receiveCatching函数中,父类根据子类pollInternal的结果,来决定下一步如何进行。只要结果不是POLL_FAILED(对应的要么是取到数据,要么是Channel被关闭的Closed)。父类就把结果转换成一个ChannelResult对象返回到接收方协程。一次receiveCatching调用就结束了。如果子类告诉父类poll的结果是POLL_FAILED,父类还将继续调用receiveSuspend函数继续处理,父类的核心就是创建个Receive对象调用enqueueReceive函数把Receive加入到Queue中,让接收方协程挂起。当然也有可能发生意外,比如有人搞事情,其他线程把Channel关闭了,或者其他线程往Channell里面发送了数据,因此父类根据enqueueReceive的结果有如下行为:
- Receive加入Queue成功返回true:
给接收方协程注册一个取消监听,在接收到数据之前如果接收方协程取消了就把Receivec从Queue中移除。然后挂起接收方协程 - Receeive加入Queue失败返回false:
这种情况要么是Channel被关闭了,要么是有人往Channel中发送了数据(有可能是发送者挂起,也有可能是Buffer中有数据了),因此父类将再次调用子类的pollInternal方法,让子类去取数据,等待子类的poll结果:- 子类poll回来说Channnel被关闭了,父类就把包含关闭异常信息的ChannelResult返回给接收方协程
- 子类poll回来说取到数据了,成功了,父类就把包含数据的ChannelResult返回给接收方协程
- 子类poll回来说是失败了,这种情况就是原本Channel中有数据,但是又被别的其他线程先一步把数据取走了,因此父类重新执行while循环,尝试把接收方挂起。
/*** AbstractChannel.receivesSuspend*/
private suspend fun <R> receiveSuspend(receiveMode: Int): R = suspendCancellableCoroutineReusable sc@ { cont ->/*** 根据在获取channel时是否设置了onUndeliveredElement创建不同的Receive对象。* ReceiveElementWithUndeliveredHandler继承至ReceiveElement。* * ReceiveElementWithUndeliveredHandler就是下面代码再次调用pollInternal,取到数据返回成功,* 需要重新resume接收方协程时,设置了一个回调,什么回调呢?重新resume协程时如果遇到协程被取消* ,相当于resume失败,相当于接收方协程嗝屁了,也就意味着poll回来的数据不会被成功交给接收方协程,* 接收方协程都死了。因此便可通过ReceiveElementWithUndeliveredHandler中的* onUndeliveredElement回调告知外界这个数据没有成功到达接收者被消费。* * ** 创建Receive对象是有两个关键参数:* (1)cont : 当前协程,用来被发送方发送数据时唤醒挂起放协程* (2)receiveMode : 这个参数是receiveCatching和receive函数中调用receiveSuspend时传进来的* 它决定了当碰到Channnel关闭时是直接把异常抛给接收方还是把异常放到ChannelReuslt中给到接收方。*/val receive = if (onUndeliveredElement == null)ReceiveElement(cont as CancellableContinuation<Any?>, receiveMode) elseReceiveElementWithUndeliveredHandler(cont as CancellableContinuation<Any?>, receiveMode, onUndeliveredElement)while (true) { //while死循环/*** 把Receive对象加入到Queue中,准备把接收方挂起。返回true表示加入成功,false则失败.* 只要Channel中没有可被接收到的数据(挂起的发送者或者Buffer中的数据)就会加入成功。* 反之既然有数据了就没必要让接收方挂起直接从Channel中取数据,取完就走。* */if (enqueueReceive(receive)) { //查看下面 “源码(1)”//加入成功,给接收方协程注册一个取消监听,在接收到数据之前如果接收方协程取消了//就把Receivec从Queue中移除。removeReceiveOnCancel(cont, receive)return@sc //return出去,接收方协程被挂起了}//加入失败,要么Channel被关闭了,要么Channel中有数据了,//重新调用子类的pollInternal(),让子类去取数据,看看到底是个什么情况val result = pollInternal()//如果是Channel被关闭了if (result is Closed<*>) {/*** 在resumeReceiveClosed函数中,会根据在前面创建Receive对象时的第二参数receiveMode* 来决定以何种方式resume接收方协程。* * receiveMode = RECEIVE_RESULT : resume接收方协程时,给接收方协程一个包含* 关闭异常信息的ChannelResult对象。* * receiveMode != RECEIVE_RESULT: resume接收方协程时会给接收方协程一个异常,* 导致接收方协程终止*/receive.resumeReceiveClosed(result) //查看下面“源码(2)”//return出去,接收方协协程要么收到一个异常终止,要么收到一个包含异常的ChannelResultreturn@sc }//如果子类poll成功了,取到数据了。if (result !== POLL_FAILED) {/*** 调用接收方协程的resume让接收方协程继续,该方法接收两个参数:* 第一个参数:‘receive.resumeValue(result as E)“ 会根据创建Recive对象时的* 参数receiveMode来决定是返回一个包含result的ChannelResult对象,* 还是直接返回result.** 第二个参数:’receive.resumeOnCancellationFun(result as E)‘这个* resumeOnCancellationFun函数在ReceiveElementWithUndeliveredHandler* 有实现,而在ReceiveElement没有实现。其意思就接收方协程自收到结果之前* 如果被协程被取消了,就执行ReceiveElementWithUndeliveredHandler中的* onUndeliveredElement回调* */cont.resume(receive.resumeValue(result as E), //查看下面”源码(3)“receive.resumeOnCancellationFun(result as E))//查看下面”源码(4)“return@sc return出去,接收方协程收到结果。}//至于result == POLLFAILED的情况说明Channel中没有可被消费的数据了,有可能被其他线程先一步//消费了,也有可能被挂起的发送方自己取消了,那就继续while循环}}//------------------------------源码(1) ------------------start--------------------------------//AbstractChannel.enqueueReceive
//直接调用了enqueueReceiveInternal函数
private fun enqueueReceive(receive: Receive<E>) = enqueueReceiveInternal(receive).also { result ->if (result) onReceiveEnqueued //如果加入成,执行onReceiveEnqueued,目前该方法是个空实现,}//AbstractChannel.enqueueReceiveInternal
protected open fun enqueueReceiveInternal(receive: Receive<E>): Boolean = /*** RendezvousChannel和LinkedListChannel的isBufferAlwaysFull 永远为true* 另外两个永远为false.* 针对Rendezvous来说,它都没有Buffer,因此不用考虑在把Receive入队时会存在别的线程* 往Buffer中发送了数据而导致不需要把Receive入队挂起,而是直接从Buffer中取数据的情况。* 这对LinkedList来说,他的Buffer就是Queue,无限制大小,往它里面发送数据如果没有接收者* 时都是把数据打包成SendBuffered对象放入Queue中,因此把Receive入也不用考虑Buffer中会突然* 来数据了。** Rendezvous 和 LinkedList只需要考虑Queue的最后一个节点是不是Send类型即可*/if (isBufferAlwaysEmpty)/*** 这对不需要考虑Buffer中是否有数据的情况,只需要考虑Queue中的最后一个节点是否为Send类型。* Closed也是Send的子类。** 'it !is Send'最后一个节点不是Send类型,说明了Channel没有被关闭,同时Channel中也没有* 可以用来接收的数据,因此满足这个条件的情况下就可以把Receive加入到Queue的尾部让接收者挂起等待。** */queue.addLastIfPrev(receive) { it !is Send }else/*** isBufferAlwaysEmpty = false 说明是另外两个Channel.那么除了需要考虑虑Queue中的最后一个* 节点是否为Send类型。还需要考虑Buffer中是否有数据。** 'it !is Send': 最后一个节点不是Send类型,说明了Channel没有被关闭,同时Channel中也没有* 可以用来接收的数据,* * ‘isBufferEmpty’ : true 说明了Buffer中没有数据,反之false则有** 只有在两个条件都满足时,才能把Receive加入到Queue的尾部让接收者挂起等待。**/queue.addLastIfPrevAndIf(receive, { it !is Send }, { isBufferEmpty })//------------------------------源码(1) ------------------end--------------------------------//------------------------------源码(2) ------------------start--------------------------------//ReceiveElement.resumeReceiveClosed
override fun resumeReceiveClosed(closed: Closed<*>) {when {//closed.toResult<Any>()会创建一个包含关闭异常的ChannelResult对象,//resume函数会让接收方协程的收到一个ChannelResul继续执行,receiveMode == RECEIVE_RESULT -> cont.resume(closed.toResult<Any>())//resumeWithException会让接收方协程收到一个异常而终止else -> cont.resumeWithException(closed.receiveException)}}//------------------------------源码(2) ------------------end--------------------------------//------------------------------源码(3) ------------------start--------------------------------//ReceiveElement.resumeValue
fun resumeValue(value: E): Any? = when (receiveMode) {RECEIVE_RESULT -> ChannelResult.success(value)else -> value
}
//------------------------------源码(3) ------------------end--------------------------------//------------------------------源码(4) ------------------start--------------------------------
//ReceiveElementWithUndeliveredHandler.resumeOnCancellationFun
override fun resumeOnCancellationFun(value: E): ((Throwable) -> Unit)? =//把onUndeliveredElement回调和协程的取消绑定在一块,让协程取消的时候能回调onUndeliveredElement//bindCancellationFun就不继续深入了,代码很简单,感兴趣的就自己看看onUndeliveredElement.bindCancellationFun(value, cont.context)//-----------------------------源码(4) ------------------end--------------------------------
RendezvousChannel 和 LinkeLilstChannel的 receiveCatching 都是采用父类中的默认行为。同样用一种图来总结一下整个过程:
RendezvousChannel的简化版的receiveCatching流程:
onReceiveCatching()函数
onReceiveCatching
用于在select
函数中调用,用于从Channel
中接收一个数据,在select
中,谁的onReceiveCatching
先接收到数据(包括关闭),select
就选择这一路。onReceiveCatching
函数本身不是一个挂起函数,它运行在select
这个挂起函数中。
经过前面onSend
函数的学习,其他废话就不用多输了,直接上源码。
//onReceiveCatching的简单用法,三个channel谁先接收到数据,就把接收到的结果给到result。然后select挂起函数恢复
val result = select<Int?> {channel.onReceiveCatching{it.getOrNull() //it 为 从ChannelResult}channel2.onReceiveCatching{it.getOrNull()}channel3.onReceiveCatching{it.getOrNull()}
}
onReceiveCatching
返回的是一个SelectClause1
,相对于onSend
的 SelectClause2
来说,它少了一个参数。
final override val onReceiveCatching: SelectClause1<ChannelResult<E>>get() = object : SelectClause1<ChannelResult<E>> {@Suppress("UNCHECKED_CAST")override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (ChannelResult<E>) -> R) {registerSelectReceiveMode(select, RECEIVE_RESULT, block as suspend (Any?) -> R)}}
调用到了registerSelectReceiveMode
:
/** AbstractChannel.registerSelectReceiveMode* */
private fun <R> registerSelectReceiveMode(select: SelectInstance<R>, receiveMode: Int, block: suspend (Any?) -> R) {while (true) {//如果有其他路被选择了,直接return 出去if (select.isSelected) return//isEmptyImpl为true代表了Channel没有被关闭 and 没有挂起的发送者 and 如果有Buffer里面也没有数据if (isEmptyImpl) {//把本次onReceivCatching封装成一个 ReceiveSelect对象加入到Queue中、//只要Channel中没有可被消费的数据(挂起的发送者,或者Buffer中的数据),//并且Channel没有关闭,就能加入成功返回true,否则返回false.if (enqueueReceiveSelect(select, block, receiveMode)) //加入成功,回到select函数继续。加入失败一般是指Channel被关闭了或者有可被消费的数据了。return //查看下面“源码(1)”} else {// isEmptyImpl为false,说明要么Channel中有可以用来接收的数据,要么Channel被关闭了//调用子类的pollSelectInternal去从Channel中取数据,子类怎么取不关心,只关注结果。val pollResult = pollSelectInternal(select)when {//子类在取的过程中,发现有其他路被先一步被选择了pollResult === ALREADY_SELECTED -> return//POLL_FAILED代表了Channel中没有可被消费的数据了,可能是被其他线程抢先一步给取走了。//因此回到while循环继续。pollResult === POLL_FAILED -> {} // retry//RETRY_ATOMIC代表了Channel中任然有可被消费的数据,取这个数据的过程是一个otmoic操作。// 操作失败了也继续重新回到while循环,pollResult === RETRY_ATOMIC -> {} // retry/*** 其他情况里面包括了取数据成功和Channel被关闭两种的情,* 在tryStartBlockUnintercepted中** 针对Channel关闭,又有两种情况:* (1)receiveMode = RECEIVE_THROWS_ON_CLOSE:直接throw 异常* (2)receiveMode = RECEIVE_RESULT: 会尝试让select选择当前路,如果尝试成功了* 就开启一个新协程,去执行block代码快并把包含关闭异常的ChannelResult传到block* 代码块中。开启协程后就会回到while循环中,然后在下一次while时因为* select.isSelected = true* 而从registerSelectReceiveMode中返回出去。新协程执行完成后恢复select挂起函数。* 如果尝试失败,说明select选择了别的路,那直接return出来,继续while循环,* 然后会因为select.isSelected = true从registerSelectReceiveMode中返回出去。* * 针对取数据成功的情况也有两种情况,这两种情况都是开启一个新协程去执行block代码快,区别就是在* receiveMode = RECEIVE_RESULT时执行block代码快时传的东西为ChanelResult类型,* receiveMode为其他时传的东西就是取到的原始数据。新协程执行完成后恢复select挂起函数。* * 在tryStartBlockUnintercepted中开启新协程后就会回到while循环里面,由于子类poll的时候* 已经让select选择了当前路,因此下一次while循环就会return出去了。* * tryStartBlockUnintercepted的代码在下面“源码(2)”*/else -> block.tryStartBlockUnintercepted(select, receiveMode, pollResult)}}}
}// ----------------------------源码(1) -----------start ------------------------------------/** AbstractChannel.enqueueReceiveSelect* 把本次onReceivCatching封装成一个 ReceiveSelect对象加入到Queue中、* 只要Channel中没有可被消费的数据(挂起的发送者,或者Buffer中的数据),并且Channel没有关闭,就能加入成功返回true* 否则失败返回false.*/
private fun <R> enqueueReceiveSelect(select: SelectInstance<R>,block: suspend (Any?) -> R,receiveMode: Int): Boolean {/*** this指Channel自己, select中持有了select函数所在的协程。* block为调用onReceiveCatching传的 lambda*/val node = ReceiveSelect(this, select, block, receiveMode)// 把Receive对象加入到Queue中。只要Channel中没有可被消费的数据,并且Channel没有关闭,就能加入成功,否则失败val result = enqueueReceive(node)//如果加入成功,注册一个回调,意思是当有其他路被选择了,把这一次的Receive对象从Queue中移除。// 关于disposeOnSelect的具体细节请参考前面 onSend中的内容。if (result) select.disposeOnSelect(node)return result
}// ↓↓private fun enqueueReceive(receive: Receive<E>) = enqueueReceiveInternal(receive).also { result ->//如果把receive加入Queue成功,回调receive入队成功,该回调目前几个Channel子类均为实现。 if (result) onReceiveEnqueued()
}// ↓↓//把Receive加入到Queeue中,加入成功 return true, 否则 return false.
protected open fun enqueueReceiveInternal(receive: Receive<E>): Boolean =
/*** isBufferAlwaysEmpty 意思就是Channel中的Buffer中是不是永远都不可能有数据,* 对于Rendezvous来说,它不支持Buffer,所以可以认为Buffer中永远不可能有数据,因此Rendezvous 的改值永远为true.* 对于LinkedList来说,它的Buffer为Queue,如果把LinkedList 和 Conflate和Array来比较的话,它没有额外的Buffer。* 因此LinkedList中isBufferAlwaysEmpty也永远为true。* * 对于Conflate和Array来说 isBufferAlwaysEmpty = false.** 也就是说对于 Rendezvous 和 LinkedList 走if ,另外两个走 else 流程。*/
if (isBufferAlwaysEmpty)/*** 如果Queue中最后一个节点不是Send类型,简而言之就是:* 对于Rendezvous来说 Channel没有被关闭,也没有挂起的发送者.* 对于LinkedList来说Queue中没有还未被消费的数据,LinkedList是无限缓存的,不存在挂起的发送者。** 满足这个条件就把Receive加入Queue中,好让发送方协程挂起。*/queue.addLastIfPrev(receive) { it !is Send }
else/*** addLastIfPrevAndIf 相较于 addLastIfPrev多了一个条件 isBufferEmpty 还得为true.* 因为eles里面是 Conflate和Array走的流程,isBufferEmpty 意思就是Buffer是不是空的,* true说明Buffer中没有数据,false则有。**/queue.addLastIfPrevAndIf(receive, { it !is Send }, { isBufferEmpty })// ----------------------------源码(1) -----------end ------------------------------------// ----------------------------源码(2) -----------start ------------------------------------private fun <R> (suspend (Any?) -> R).tryStartBlockUnintercepted(select: SelectInstance<R>, receiveMode: Int, value: Any?) {when (value) {is Closed<*> -> {when (receiveMode) {RECEIVE_THROWS_ON_CLOSE -> {throw recoverStackTrace(value.receiveException)}RECEIVE_RESULT -> {if (!select.trySelect()) returnstartCoroutineUnintercepted(ChannelResult.closed<Any>(value.closeCause), select.completion)}}}else -> {if (receiveMode == RECEIVE_RESULT) {startCoroutineUnintercepted(value.toResult<Any>(), select.completion)} else {startCoroutineUnintercepted(value, select.completion)}}}
}// ----------------------------源码(2) -----------end ------------------------------------
接下里看看在父类AbstractChannel
中对pollSelectInternal
的默认实现是怎么样的(子类Rendezvous
和Linkedlist
都采用了父类的默认行为):
protected open fun pollSelectInternal(select: SelectInstance<*>): Any? {val pollOp = describeTryPoll()val failure = select.performAtomicTrySelect(pollOp)/*** 上面两行代码主要做的事情有两件:* (1)就是在select中还没有某一个路被选择的时候,尝试把数据从Channel中取出,* 这里的取出包括挂起的发送者,也包括缓存在Buffer中的数据,至于具体从什么地方取就看channel的种类了。* (2)成功取出收据后,让Select选择当前这一路为结果。当前这一路被选中成功,就把其他已经通过* enqueueReceive(node)函数加入Queue的ReceiveSelect移除,* * 如果两件事情都成功了failure = null。如果失败failure的值就失败的原因:** POLL_FAILED:比如Channnel_A 里面只有一个可被消费的数据,正准pollSelectInternal的时候,* 很有可能此时有另外一个线程也在从Channel_A中取数据,所以pollSelectInternal就很有可能慢一步取不到了。* 这种情况失败的结果为 POLL_FAILED** ALREADY_SELECTED:假如在select中依次调用了ChannelA,ChannelB,ChannelC的onReceiveCatching,* ChannelA 和 ChannelB执行onReceiveCathcing都没有取到数据,因此他们都被包装成了一个ReceiveSelect对象* 加入到了Queue中。当执行到ChannelC的时候,ChannelC中有可被消费的数据,此时也没有别的线程在取ChannelC的数据,* 当ChannelC进入到pollSelectInternal函数里面的时候,此时有另外换一个线程往ChannelA中发了一个数据,* 于是乎ChannelA就接收到了数据,从而让select选中了ChannelA这一路,因此ChannelC这一次就会失败。* 这种失败的结果是ALREADY_SELECTED** Closed:和POLL_FAILED情况比较类似,只是被别的线程把调用了Channel的cancel函数把Channel取消了* 一个Channel被取消后里面所有数据都会被移除,然后channel变成关闭状态。再或者别的线程先把数据取走了,* 在调用了关闭。总之就是Channel里面没有可被消费数据,并且Channel被关闭了,这种失败的结果为Closed。* * RETRY_ATOMIC:比如有两个select函数分处不同协程(不同线程)都在对同一个Channel进行onReceiveCatching。* 此时就可能因为otomic原子操作失败,比如被别的线程抢先一步取到数据,失败的结果为RETRY_ATOMIC。* 意思是需回到调用pollSelectInternal的registerSelectReceiveMode中重新再while循环一次。* **/if (failure != null) return failure//failure 为 null ,说明取出数据成功,pollOp的result为Send对象。val send = pollOp.result/*** 此处的Send 有三种可能:SendElement | SendBuffered | SendSelect* * SendElement: completeResumeSend就是恢复挂起的发送方协程。* SendBuffered : completeResumeSend里面什么都不做。* SendSelect: completeResumeSend就是开启一个新协程去执行SendSelect中的block代码块也就是调用onSend* 时传的第二个参数。block代码块执行完成后,onSend所在的seelet函数所挂起的协程恢复。*/send.completeResumeSend()//把从Send中的真实数据返回出去return pollOp.result.pollResult
}
关闭流程
关闭一个Channel
采用的方式是给Queue
中添加一个Closed
节点,作为一个关闭标识,一旦Queue
中添加了Closed
节点后,Queue
中就再也不能添加其他Send
或者Receive
节点了。因为对于发送来说,发送数据之前,需要判断Queue
中的队尾是否为Closed
,如果是就不能再发送了,对于接收来收,它会先把缓存的数据取走,如果没有缓存的数据,就需要判断Queue
中的队尾是否为Closed
节点,如果是就不能挂起发送者了,这时候Channel
对接收方来说就是关闭的。
也就是说只要调用了close
,对与发送放来说,Channel
就是关闭了,不能再继续发了,但是对于接收方来说,只有Channel
中没有可被消费的数据后,Channel才是算关闭。
相对前面的发送和接收流程来说,关闭的源码就不要太简单了:
//调用close关闭Channel的时候可以传一个异常,这样做的目的是当有人注册了关闭回调的时候,可以知道Channel是
//因为什么原因关闭的。如果不传,cause默认为null.
public override fun close(cause: Throwable?): Boolean {//创建一个Closed对象val closed = Closed<E>(cause)//如果Queue中队尾没有Closed节点,就把创建的这个Closed对象添加到队尾,返回true,否则返回false.//比如说之前调用过close函数,那么Queue种就已经存在Closed节点了。val closeAdded = queue.addLastIfPrev(closed) { it !is Closed<*> }//actuallyClosed代表了已经被陈工添加到Queue队尾的Closed节点。val actuallyClosed = //如果添加成功,actuallyClosed就等于创建的Closed对象if (closeAdded) closed else //如果失败,actuallyClosed就等于Queue中尾部的Closedqueue.prevNode as Closed<*>//这个函数在前面发送流程里中讲send函数的时候已经详细讲过,此处就不在重复了。意思就是//如果Closed节点前面还有其他Receive节点,就把Receive节点全部移除,然后唤醒这些挂起的接收方协程//这些被唤醒的接收方协程要么收到一个异常终止,要么收到一个包含异常的ChannelResult对象。helpClose(actuallyClosed)//closeAdded加入成功,说名了本次close的调用是第一次。那么调用Channel关闭的回调,//Channel的关闭回调可以通过invokeOnClose设置。if (closeAdded) invokeOnCloseHandler(cause)//如果是第一次调用closed 就返回true。否则返回false.return closeAdded
}
取消流程
调用close
关闭一个channel
后,channel
中只要还有数据,接收方任然可以继续取出数据。那有没有一种方式,把Channel
彻底关闭了,不管是发送方和接收方都不能在继续操作了。答案肯定是有的,其实我们心中已经大概知道了,只要调用close
后,Channel
中不存在未被接收的收据,那么发送方和接收方都不能再继续操作了。cancel
就是这么干的,先对Channel
调用关闭操作,关闭后再对Channel
中可能存在的未被接收的数据做清除。因此我们可以把cancel
看做是close
的加强版。
/*** 和close函数一样,同样可以传入一个异常。因为在cannel内部回去调用close函数。*/
final override fun cancel(cause: CancellationException?) {//如果对于接收方来说Channel关闭了,意思就是Queue中第一个节点是Closed节点,并且Buffer中没有数据。//处于这种情况的Channel,也不能往里面发送数据了,也不能接收数据了,因此不要做额外的操作,直接返回if (isClosedForReceive) return//否则Channel可能没有被关闭,或者关闭了但是任有还未被接收的数据。调用cancelInternal去把Channel关闭//并且把未被接收的数据都清空。cancelInternal(cause ?: CancellationException("$classSimpleName was cancelled"))
}
internal fun cancelInternal(cause: Throwable?): Boolean =//调用close关闭Channelclose(cause).also {//清空Channel中可能存在未被接收的数据onCancelIdempotent(it)}
onCancelIdempotent清空Channel
中未被接收的数据:
- 对于
Rendezvous
如果存在未被接收的数据,那这数据可能是一个SendElement
(挂起的发送者)或者一个或者多个SendSelect
存在于Queue
中. - 对于
LinkedList
来说是一个或者多个SendBuffered
存在与Queue
中。 - 对于
Conflate
来说是存在Buffer
中的数据,Queue
中不会有任何Send
。 - 对于
ArrayList
来说是存在Buffer
中的数据和存在于Queue
中的一个SendElement
(挂起的发送者)或者一个或者多个SendSelect
.
对于不同Channel
类说,未被接收的数据存在的地方是不一样的,清除方式也是不一样的,父类是不知道如何完全清除子类中的数据,父类中能做的就是清除Queue
中的数据,其他的数据只有子类自己知道该如何清除,因此不同的子类都重写了onCancelIdempotent
来清除自己的数据。
//父类中清除Queue中数据的实现
protected open fun onCancelIdempotent(wasClosed: Boolean) {/*** closedForSend 返回一个对于发放方来说的Closed对象,意思就是如果Channel对于发送来说* 是关闭了,就把Queue中尾结点这个Closed对象返回出来。* 在调用onCancelIdempotent之前已经调用了close。因此closed不可能为空*/val closed = closedForSend ?: error("Cannot happen")//InlineList就当做一个简单的Listvar list = InlineList<Send>()while (true) {//获取到closed节点的前一个节点。val previous = closed.prevNode//如果前一个节点是哨兵节点。if (previous is LockFreeLinkedListHead) {//那就说明Queue中不存在Send类型节点。break}//确保previous是Send类型,否则就会抛异常assert { previous is Send }//把Send从Queue中移除,如果Send是SendElementWithUndeliveredHandler类型,移除的时候//还会调用onUndeliveredElement回调告知外界这个数据不会被接收了。if (!previous.remove()) {//如果移除失败,比如多线程问题导致,调用helpRemove()再次移除,总之就是一定要移除previous.helpRemove() continue}//把每一个移除的Send都存放在这个list中,为什么要存在list中呢?因为这些Send对象//很有可能代表了一个挂起发送方协程。如果移除了什么都不做,这些因为调用send挂起的协程将//永远一直挂起,或者因为调用onSend导致select这个挂起点没有机会被恢复。//因此把这些移除的Send存放在List中,然后调用onCancelIdempotentList集中对他们处理。list += previous as Send}onCancelIdempotentList(list, closed)
}
onCancelIdempotentList:就要做的事情就是让挂起的发送协程知道Channel
被取消了。但是这里存在一个问题,如果Send是一个SendBuffered
类型,它是不会导致发送方协程挂起的,因此就没有必要做让发送方协程知道Channel
被取消了,只需要回调onUndeliveredElement
即可,SendBuffered
是LinkedListChannel
独有的,为了解决这个问题,LinkedListChannel
重写了onCancelIdempotentList
函数,做了单独的特殊处理。
//父类中的实现方式能处理的Send类型为: SendElement | SendSelect
protected open fun onCancelIdempotentList(list: InlineList<Send>, closed: Closed<*>) {//遍历List中每一个Send对象list.forEachReversed { /*** SendElement:说明了有挂起的协程,那必须恢复啊,因此在resumeSendClosed中调用协程的* resumeWithException(closed.sendException),恢复协程并传入一个关闭异常,* * SendSelect:它是在一个select函数中调用了onSend生产的,因此在其resumeSendClosed中做两件事:* (1)让select状态变成选中状态,select中所有Channel的onSend发送的数据都将被* onUndeliveredElement回调(如果设置了)。注意是所有onSend的数据,包括本次SendSelect* (2)恢复被select挂起的协程并出入一个关闭异常。**/it.resumeSendClosed(closed) }
}
Channel
的几个子类就LinkedListChannel
重写了onCancelIdempotentList
,因此在此处就直接把他讲了:
//LinkedListChannel中的实现,它里面的Send类型为: SendBuffered | SendSelect
override fun onCancelIdempotentList(list: InlineList<Send>, closed: Closed<*>) {var undeliveredElementException: UndeliveredElementException? = null//同样遍历list中的每一个。list.forEachReversed {when (it) {is SendBuffered<*> -> {//这里做的事情就是,如果设置了onUndeliveredElement就回调,回调过程中如果发送了异常那么就把//异常返回复赋值给undeliveredElementException,否则undeliveredElementException为null.undeliveredElementException = onUndeliveredElement?.callUndeliveredElementCatchingException(it.element as E, undeliveredElementException)}//这里的else就是 SendSelect类型。和父类中处理逻辑一样。else -> it.resumeSendClosed(closed)}}//如果回调onUndeliveredElement时发生了异常,就直接抛出。undeliveredElementException?.let { throw it }
}
onCancelIdempotent
父类中的实现只是对Queue
中的Send
对象做了移除操作。Rendzvous
和Linkelist
他们发送的数据如果没有接收者都是被包装成Send
类型存入Queue
中的,因此他们两都是采用父类的默认实现方式。Conflate
和Array
两个Channel
有自己额外的Buffer
,因此他们有自己的清除方式:
ConflateChannel
的onCancelIdempotent
:
protected override fun onCancelIdempotent(wasClosed: Boolean) {var undeliveredElementException: UndeliveredElementException? = null lock.withLock {//把Buffer置空,并且回调onUndeliveredElement(如果设置了)//回调onUndeliveredElement时如果发生已经就返回,否则返回null.undeliveredElementException = updateValueLocked(EMPTY)}//对于Conflate来说,调用父类的onCancelIdempotent也不会做任何事情。super.onCancelIdempotent(wasClosed)//回调onUndeliveredElement是发生异常了,就直接抛出。undeliveredElementException?.let { throw it }
}
ArrayChannel
的onCancelIdempotent
:
override fun onCancelIdempotent(wasClosed: Boolean) {val onUndeliveredElement = onUndeliveredElementvar undeliveredElementException: UndeliveredElementException? = nulllock.withLock {//循环,循环的次数为Buffer中数据的数量repeat(size.value) {//取出第一个val value = buffer[head]//如果设置了onUndeliveredElement,并且第一个是有值的,if (onUndeliveredElement != null && value !== EMPTY) {//回调onUndeliveredElement,由于这个地方会回调多次,如果多次回调发生的异常都是//同一个异常,那么值保留一个,如果多次回调中发生了不同异常,每一次异常都会被最佳到//前一次异常里面,最终形成了一个异常链。undeliveredElementException = onUndeliveredElement.callUndeliveredElementCatchingException(value as E, undeliveredElementException)}//把Buffer第一个位置置空,buffer[head] = EMPTY//指针加一,继续下一个数据。为什么要对size取余,是因为Buffer存取数据采用的是一个循环的。head = (head + 1) % buffer.size}//把缓存Buffer中的数据都移除后,把Buffer的size 置为 0size.value = 0}//调用父类中的onCancelIdempotent处理Queue中的Send对象(SendElement or SendSelect)super.onCancelIdempotent(wasClosed)//如果在回调onUndeliveredElement过程中发生了异常。就抛出。undeliveredElementException?.let { throw it } // throw UndeliveredElementException at the end if there was one
}
总结:cancel就是close的升级版,它除了做了close做的事情,还把Channel中存在未被接收到数据都给移除了(如果有)。移除这些未被接收到数据就会有相应的onUndeliveredElement回调和与之对应协程的恢复操作。
结语:没想到写这篇文章断断续续的写了好长时间,虽然Channel的源码没有什么难度,也没有用到什么高深的技术,但是逻辑还是有点复杂,所以想要彻底搞懂还得自己结合源码对照本文仔细揣摩。限于篇幅有限,ArrayChannel,ConflatedChannel,LinkedListChannel这三个的源码以稍后再以单独的文章发不出来。感兴趣的可以留个关注,或者关注本人微信公众号新文章发出来的时候第一时间收到通知。等不及的朋友们也可以自己研究研究。如果本文你看懂了,那么这三个的源码对你来说就是小菜一碟。
虽然ArrayChannel,ConflatedChannel,LinkedListChannel这三个的源码分析文章还没出来,不过我为大家准备了有关四个Channel的完整发送,接收代码执行流程图,但是由于图片太大了,没法看清楚,我把xmind源文件上传到了下面地址:Kotlin Channel 发送接收代码执行Xmind版流程图