来看下面代码:
class ChatManager {private val messages = mutableListOf<Message>()/*** 当收到消息时回调*/fun onMessageReceived(message: Message) {messages.add(message)}/*** 当删除消息时回调*/fun onMessageDeleted(message: Message) {messages.remove(message)}/*** 当消息成功发送到服务器时回调*/fun onMessageDeliveryStateChanged(messageId: String, state: DeliveryState) {val messageIndex = messages.indexOfFirst { it.id == messageId }if (messageIndex >= 0) {val message = messages[messageIndex]messages[messageIndex] = message.copy(deliveryState = state)}}
}data class Message(val id: String,val content: String,val senderId: String,val receiverId: String,val deliveryState: DeliveryState
)
enum class DeliveryState { UNDELIVERED, SENT, DELIVERED }
上面代码中 ChatManager
持有一个 mutableListOf
类型的属性成员 messages
,ChatManager
主要负责在接收消息、删除消息、发送消息时对消息状态进行管理。
我们思考一下,这个代码有什么问题呢?
如果你只是在单线程/主线程中调用这个ChatManager
类的相关方法,那么不会有任何问题,但是假如你在多个线程中调用这个类,比如在线程池中跑,那就不一定了。
想必你大概已经猜到了,出现问题的原因就是多个线程的情况下,不同的线程调用不同的方法对 messages
进行操作可能导致资源竞争,因此这里有潜在的并发安全问题。
举个例子,假如在多线程环境下我们有以下代码:
val chatManager = ChatManager()
...
chatManager.onMessageDeleted(message) // Thread1 正在访问这一行
...
// 同时,Thread2 正在访问这一行
chatManager.onMessageDeliveryStateChanged("abc", DeliveryState.DELIVERED)
这时会有什么问题呢?
假设程序按照上图标注的顺序执行, messages
集合列表中此时共有 [A, B, C, D, E]
5个消息对象,那么线程 2 首先查询到 index = 3
的消息(也就是D
),此时线程 1 同时在执行 onMessageDeleted
方法,删除了消息 D
,这之后,线程 2 开始进入 if
代码块执行,此时线程 2 并不知道有其他人修改了 messages
集合,那么它会按照 index = 3
来取出消息并修改它的状态,但是由于消息列表中的 D
被线程 1 删除了,列表变成 [A, B, C, E]
,因此这时线程 2 取到的index = 3
的消息会是 E
,那么结果就是本应该修改 D
的状态却阴差阳错地修改了 E
的状态!这就很要命了!
还没有完,假如 messages
集合列表只有 [A, B, C, D]
4个消息,同样按照上面的逻辑分析你会发现线程 2 这时取不到 index = 3
的消息了,因为被线程 1 删除了一个,消息列表不够 4 个了,这种情况下,你的应用可能会得到某种类似于 IndexOutOfBoundsException
的异常信息,如果你没有捕获处理异常,那么恭喜你,你的应用此时崩溃了!
所以,如果你没有意识到集合类可能在多线程下导致的并发安全问题,一旦产生这样的bug或异常,就会很棘手,很难发现问题的原因。
有人可能会想到,既然 MutableList
有问题,那么我用不可变的 List
不就可以了(严格说是只读的),于是代码可能会修改成下面这样:
class ChatManagerFixed {private var messages = listOf<Message>()/*** 当收到消息时回调*/fun onMessageReceived(message: Message) {messages += message}/*** 当删除消息时回调*/fun onMessageDeleted(message: Message) {messages -= message}/*** 当消息成功发送到服务器时回调*/fun onMessageDeliveryStateChanged(messageId: String, state: DeliveryState) {messages = messages.map { message ->if (message.id == messageId) {message.copy(deliveryState = state)} else message}}
}
注意,messages += message
、messages -= message
这样的操作每次都会产生一个新的 List
对象,就像 Java 的 String
类那样,每次操作都会产生一个新的不可变String
对象,这样应该没有问题了吧?
但实际上这个代码仍然存在并发安全隐患,问题就在于 messages += message
,它其实等价于下面代码:
messages = messages + message
很明显,这不是一个原子操作,涉及到 messages
变量的一次读操作和 messages
变量的一次写操作。假设有多个线程同时执行这段代码,依然会存在同步问题:
fun onMessageReceived(message: Message) {// List is initially []// Thread 1 adds "Message 1"// Thread 2 adds "Message 2"// Expected: ["Message 1", "Message 2"]// If thread 1 finishes first, the list will be ["Message 1"]// If thread 2 finishes first, the list will be ["Message 2"]messages = messages + message
}
如上面代码注释所示,如果 List
初始为空,有 2 个线程同时往里面添加消息,那么可能结果不会按照我们的预期那样。
一旦理解了问题所在,解决办法就很简单了,从 Java 过来的我们肯定有着解决并发问题的丰富经验,比如最简单的就是使用 Kotlin 提供的同步工具 synchronized
函数:
class ChatManagerFixed {private val lock = Any()private var messages = listOf<Message>()/*** 当收到消息时回调*/fun onMessageReceived(message: Message) {synchronized(lock) {messages += message}}/*** 当删除消息时回调*/fun onMessageDeleted(message: Message) {synchronized(lock) {messages -= message}}/*** 当消息成功发送到服务器时回调*/fun onMessageDeliveryStateChanged(messageId: String, state: DeliveryState) {synchronized(lock) {messages = messages.map { message ->if (message.id == messageId) {message.copy(deliveryState = state)} else message}}}
}
当然,如果你喜欢用 MutableList
,也是一样的解决方式:
class ChatManagerFixed {private val lock = Any()private val messages = mutableListOf<Message>()/*** 当收到消息时回调*/fun onMessageReceived(message: Message) {synchronized(lock) {messages.add(message)}}/*** 当删除消息时回调*/fun onMessageDeleted(message: Message) {synchronized(lock) {messages.remove(message)}}/*** 当消息成功发送到服务器时回调*/fun onMessageDeliveryStateChanged(messageId: String, state: DeliveryState) {synchronized(lock) {val messageIndex = messages.indexOfFirst { it.id == messageId }if (messageIndex >= 0) {val message = messages[messageIndex]messages[messageIndex] = message.copy(deliveryState = state)}}}
}
可以看到这个问题的解决并非难事,非常简单,困难的是如何发现这种问题,如果没有并发安全的意识,可能只能对着应用抛出的异常日志发呆而无从下手。
如果你使用 Kotlin 协程,在 Kotlin 协程中也提供了一些相应的并发工具,如
Mutex
、Semaphore
等,感兴趣的可以参考我的另一篇文章:【深入理解Kotlin协程】协程中的Channel和Flow & 协程中的线程安全问题