第13章IM服务——13.5 消息的有序性保证

13.5 消息的有序性保证

即时通信就相当于现实生活中人与人之间的聊天,用户之间收发消息的有序性必然非常重要,因为如果出现不合乎逻辑的聊天内容,则会影响用户体验。本节我们将介绍在消息发送与消息接收的过程中哪些环节可能会发生乱序情况,以及针对各种乱序情况的处理思路。

13.5.1 消息乱序

我们先来分析哪些情况可能会造成消息乱序。

  • 客户端发送消息:如果客户端与服务端之间采用了短连接,即客户端每发送一条消息都与服务端建立一次连接,那么受限于公网环境的不确定性,用户发送3条消息A、B、C,最终到达服务端的消息顺序可能是C、B、A,从发送者的角度来说 ,消息发生了乱序。
  • 服务端存储消息:即使用户发送的消息A、B、C按顺序到达服务端,但是由于这3条消息可能是由不同的IM服务实例处理的,每个服务实例处理消息的延迟不同,且与数据库网络通信的延迟不同,最终消息也可能会被按照C、B、A的顺序存储,于是消息接收者在拉取消息时发生了乱序。
  • 服务端推送消息:即使按照发送者的本意消息被顺序存储起来,但是由于消息推送环节仍然要借助推送系统与目标用户客户端的网络连接,如果网络异常造成消息A丢失,只有消息B、C到达客户端,那么对于消息接收者来说,消息也仍然发生了乱序。

针对每个可能造成消息乱序的环节,下面我们分别提出对应的解决方案。

13.5.2 客户端发送消息

首先,从消息发送者的视角来说,消息发送者在每个会话中连续发送消息时,都应该保证这些消息被有序地投递到服务端。所以,在客户端可以按照会话与服务端建立长连接,每个会话独占一个长连接,用户在某会话中发送消息时会通过此会话与服务端的长连接推送请求,这样就可以保证发送者发送消息的请求有序到达后台机房网关。

其次,为了保证从机房网关传输消息到IM服务的有序性,机房网关应该将消息请求发送到固定的IM服务实例上。比如负载均衡策略采用消息会话ID进行哈希计算:将发送到同一个会话的消息请求都转发到同一个IM服务实例上,这样就可以保证消息按照发送顺序有序到达IM服务内部。

13.5.3 服务端存储消息

IM 服务与存储系统交互时,也要保证所接收到的用户消息被有序存储到会话消息链中,这就意味着对于同一个会话的消息来说,必须使用单个线程来串行存储数据。能够较为简单地实现这个目的的架构是采用消息队列(如Kafka),并以会话ID作为消息Key,IM服务一旦收到新消息就投递到Kafka中,使得同一个会话产生的消息都被串行存储到同一个分区(Partition)中;然后构建消费者,从Kafka中依次获取用户消息并写入会话消息链中。所以,IM服务被拆分为如下两个子服务。

  • IMAPI:负责接收客户端发送的用户消息,为用户消息生成唯一消息ID,再将消息投递到Kafka主题(Topic)conversation_im_topic中,并为所投递的消息设置消息Key为会话ID,然后直接响应给客户端。
  • 会话消息服务:作为conversation_im_topic的消费者,获取最新的用户消息并写入会话消息链中,然后根据与会话相关的用户列表决定将消息投递给哪些用户。

会话消息服务在确定消息投递的目标用户后,还要保证同一个目标用户的用户消息链中的消息也是有序的,于是我们以用户ID为消息Key再度引入消息队列。

  • 会话消息服务在获取到与会话相关的N个用户后,分别发送N条消息到Kafka主题user_im_topic中,并为每条消息设置消息Key为用户ID。
  • 创建用户消息服务,作为user_im_topic的消费者,将用户应该收到的新消息依次存储到用户消息链中,同时将新消息交给推送系统下发给这些用户。

conversation_im_topic和user_im_topic主题分别将会话ID和目标用户ID作为消息队列存储消息的Key,可以保证用户消息在会话消息链和用户消息链中的有序性。至此,IM服务被拆分为3个子服务:IM API、会话消息服务和用户消息服务,如图13-6所示OIM API 仅负责创建消息必要的基础信息,以及按照会话ID维度将消息投递到消息队列中;会话消息服务负责在存储系统中存储有序的会话消息链,以及进一步按照用户ID维度将消息 投递到消息队列中;用户消息服务负责在存储系统中构建有序的用户消息链,以及推送消息。

image-20250503222324446

13.5.4 服务端推送消息与客户端补偿

在将消息推送到客户端时可能会造成消息的丢失,进而导致客户端收到乱序消息。比如小王在线教不熟悉互联网的父亲老王怎么使用微信分享好友名片,小王顺序发送了3条消息。

  • 消息1:打开聊天框,点击加号打开功能面板。
  • 消息2:左滑找到并点击“名片”。
  • 消息3:上滑找到要分享的好友,点击“发送”按钮。

IM服务依次推送这3条消息,但是在推送过程中消息2丢失了,导致老王的客户端只收到消息1和消息3。由于客户端采用推拉结合模式获取消息,客户端之后在拉取消息时收到了消息2,最后老王收到的消息顺序是消息1、消息3、消息2。此顺序形成的消息上下文使得老王无法准确按照小王的指示学会分享好友名片,这就是消息推送可能造成的消息乱序情况。

推拉结合模式的消息推送与消息拉取没有统一的消息排序规则,这是造成消息触达客户端时可能发生乱序的根本原因。

  • 如果消息存储采用读扩散模式,那么消息推送和消息拉取都会基于会话消息链;

  • 如果消息存储采用写扩散模式,那么消息推送和消息拉取都会基于用户消息链。

为了准确描述消息排序规则,需要在会话消息链和用户消息链中为每条消息显式标记消息正确的顺序。我们在会话消息链和用户消息链的数据表中分别增加一个Seq属性来表示消息在链中的顺序,每当有一条新消息被插入会话消息链和用户消息链中时,都要保证新消息的Seq值一定比上一条消息的Seq值大,即Seq值要满足严格递增的特性。这样一来,一条消息的Seq就可以表示其在对应消息链中的准确顺序。

那么,如何保证Seq值严格递增呢?这件事情由将新消息写入会话消息链的会话消息服务和将新消息写入用户消息链的用户消息服务来保证。

  • 对于会话消息链来说,会话消息服务在本地内存中为每个会话ID都保存最近一条消息的Seq(初始值为0),每当收到一条新消息时,就将会话ID对应的Seq值加1并作为此消息的Seq值插入会话消息链中,这样可以保证按照消息的顺序递增Seq值。不过,如果会话消息服务有服务发版、扩缩容等导致服务实例重启的变更行为发生,那么本地内存中的Seq值会丢失。所以,我们要设计一个兜底策略:如果某会话ID对应的本地内存中的Seq值为0,则先从会话消息链中获取最后一条消息的Seq值 ,作为本地内存中的Seq值。
  • 用户消息链与会话消息链的原理非常相似,只不过在本地内存中换成了为每个用户ID保存最近一条消息的Seq值。

如此一来,将消息插入会话消息链和用户消息链中的流程改进如图13-7所示。

image-20250503222639179

从图13-7中可以看到,小王给老王发送的3条消息在两者的会话消息链中依次被赋予了100、101、102的 Seq值,而在老王的用户消息链中,这 3条消息依次被赋予了10000、 10001、10002的 Seq值。最后,服务端在处理消息触达目标用户客户端的逻辑时:

  • 在读扩散模式下,在推送消息时,推送的数据除了消息本身,还包括消息在会话消息链中的Seq值;
  • 在读扩散模式下,在拉取消息时,既拉取了消息本身,又拉取了消息在会话消息链中的Seq值;
  • 在写扩散模式下,在推送消息时,推送的数据除了消息本身,还包括消息在用户消息链中的Seq值;
  • 在写扩散模式下,在拉取消息时,既拉取了消息本身,又拉取了消息在用户消息链中的Seq值。

无论是读扩散模式还是写扩散模式,在拉取消息和推送消息时都会附带消息在对应消息链中的Seq值,而客户端在本地会以消息接收队列的形式记录已收到的消息Seq值,对应于服务端存储的会话消息链和用户消息链,如图13-8所示。

image-20250503222821942

如果IM服务采用读扩散模式,那么客户端会为每个会话都维护一个消息接收队列;如果IM服务采用写扩散模式,那么客户端只维护一个消息接收队列来表示用户消息链中已触达的消息。无论采用哪种模式,消息接收队列最重要的都是要记录当前已经收到的Seq值连续递增、无空洞的最后一条消息的Seq local_seq,这样就可以很容易地发现推送的消息是否有乱序情况发生。假设此时客户端收到一条消息,此消息的seq为current_seq那么:

  • 如果current_seq = local_seq+1,则说明所收到的消息是连续的,客户端可以展示给用户;
  • 如果current_seq < local_seq,则说明所收到的消息是重复的,客户端直接忽略即可;
  • 如果current_seq > local_seq + 1,则说明所收到的消息是乱序的,消息接收队列出现local_seq + 1至current_seq - 1的消息空洞,所以客户端只暂存此消息,而不展示给用户。此时客户端主动发起补偿:拉取Seq local_seq + 1至current_seq - 1的消息列表。

如上所述,local_seq控制哪些消息可以被展示给用户。如果客户端收到的消息Seq与local_seq并不是连续递增的,则说明必有消息丢失,于是客户端主动拉取丢失的消息。由此可知,客户端在实现推拉结合模式时,不仅要周期性地拉取消息,还要在有消息空洞时补偿拉取消息;客户端在拉取到消息后,更新local_seq为最新的连续消息的最大Seq值。

下面还是以小王给老王连续发送消息的例子来说明客户端接收消息的流程。假设IM服务采用的是写扩散模式,也就是客户端基于用户消息链的消息接收队列来进行消息排序。

  1. 小王发送消息1、消息2、消息3,在将这3条消息插入老王的用户消息链中时依次赋予其Seq值为10000、10001、10002,服务端推送这3条消息给老王。
  2. 老王收到消息1,客户端将local_seq更新为10000。
  3. 消息2丢失,老王的客户端感知不到消息2的到来。
  4. 消息3到来,其Seq值为10002,在老王的客户端此值不等于local_seq + 1,于是先不展示消息3,而是从服务端拉取Seq值为10002的消息。
  5. 客户端获取到消息2,最终3条消息连续,客户端展示消息2和消息3,并更新local_seq为10003。

最后,小王发送的3条消息被有序展示在老王的客户端。