IM系统基础-IM系统结构

IM系统基础-IM系统结构

从一个 IM 系统开发者的角度看,聊天系统大概由这几大部分组成:客户端、接入服务、业务处理服务、存储服务和外部接口服务。

im1.png

**客户端。**客户端一般是用户用于收发消息的终端设备,内置的客户端程序和服务端进行网络通信,用来承载用户的互动请求和消息接收功能。

**接入服务。**接入服务可以认为是服务端的门户,为客户端提供消息收发的出入口。发送的消息先由客户端通过网络给到接入服务,然后再由接入服务递交到业务层进行处理。

接入服务主要有四块功能:

  • 连接保持

    当服务端有消息需要推送给客户端时,也是将经过业务层处理的消息先递交给接入层,再由接入层通过网络发送到客户端。

  • 协议解析

    在很多基于私有通信协议的 IM 系统实现中,接入服务还提供协议的编解码工作,编解码实际主要是为了节省网络流量,系统会针对传输的内容进行紧凑的编码(比如 Protobuf),为了让业务处理时不需要关心这些业务无关的编解码工作,一般由接入层来处理。

  • Session 维护

    session 的作用是标识“哪个用户在哪个 TCP 连接”,用于后续的消息推送能够知道,如何找到接收人对应的连接来发送。

  • 消息推送

    接入服务还负责最终消息的推送执行,也就是通过网络连接把最终的消息从服务器传输送达到用户的设备上。

**业务处理服务。**业务处理服务是真正的消息业务逻辑处理层,比如消息的存储、未读数变更、更新最近联系人等,这些内容都是业务处理的范畴。

我们可以想象得到,业务处理服务是整个 IM 系统的中枢大脑,负责各种复杂业务逻辑的处理。

就好比你的信到达分拨中心后,分拨中心可能需要给接收人发条短信告知一下,或者分拨中心发现接收人告知过要拒绝接收这个发送者的任何信件,因此会在这里直接把信件退回给发信人。

**存储服务。**这个比较好理解,账号信息、关系链,以及消息本身,都需要进行持久化存储。

另外一般还会有一些用户消息相关的设置,也会进行服务端存储,比如:用户可以设置不接收某些人的消息。我们可以把它理解成辖区内所有人的通信地址簿,以及储存信件的仓库。

**外部接口服务。**由于手机操作系统的限制,以及资源优化的考虑,大部分 App 在进程关闭,或者长时间后台运行时,App 和 IM 服务端的连接会被手机操作系统断开。这样当有新的消息产生时,就没法通过 IM 服务再触达用户,因而会影响用户体验。

为了让用户在 App 未打开时,或者在后台运行时,也能接收到新消息,我们会将消息给到第三方外部接口服务,来通过手机操作系统自身的公共连接服务来进行操作系统级的“消息推送”,通过这种方式下发的消息一般会在手机的“通知栏”对用户进行提醒和展示。

这种最常用的第三方系统推送服务有苹果手机自带的 APNs(Apple Push Notification service)服务、安卓手机内置的谷歌公司的 GCM(Google Cloud Messaging)服务等。

但 GCM 服务在国内无法使用,为此很多国内手机厂商在各自手机系统中,也提供类似的公共系统推送服务,如小米、华为、OPPO、vivo 等手机厂商都有相应的 SDK 提供支持。

消息队列(二)-消息幂等

消息队列(二)-消息幂等

什么是幂等

幂等是一个数学上的概念,它的含义是多次执行同一个操作和执行一次操作,最终得到的结果是相同的。

如果我们消费一条消息的时候,要给现有的库存数量减 1,那么如果消费两条相同的消息就会给库存数量减 2,这就不是幂等的。而如果消费一条消息后,处理逻辑是将库存的数量设置为 0,或者是如果当前库存数量是 10 时则减 1,这样在消费多条消息时,所得到的结果就是相同的,这就是幂等的。

**说白了,你可以这么理解“幂等”:**一件事儿无论做多少次都和做一次产生的结果是一样的,那么这件事儿就具有幂等性。

在生产、消费过程中增加消息幂等性的保证

消息在生产和消费的过程中都可能会产生重复,所以你要做的是,在生产过程和消费过程中增加消息幂等性的保证,这样就可以认为从最终结果上来看,消息实际上是只被消费了一次的。

**在消息生产过程中,**在 Kafka0.11 版本和 Pulsar 中都支持“producer idempotency”的特性,翻译过来就是生产过程的幂等性,这种特性保证消息虽然可能在生产端产生重复,但是最终在消息队列存储时只会存储一份。

它的做法是给每一个生产者一个唯一的 ID,并且为生产的每一条消息赋予一个唯一 ID,消息队列的服务端会存储 < 生产者 ID,最后一条消息 ID> 的映射。当某一个生产者产生新的消息时,消息队列服务端会比对消息 ID 是否与存储的最后一条 ID 一致,如果一致,就认为是重复的消息,服务端会自动丢弃。

loss4.jpg

而在消费端,幂等性的保证会稍微复杂一些,你可以从通用层和业务层两个层面来考虑。

在通用层面,你可以在消息被生产的时候,使用发号器给它生成一个全局唯一的消息 ID,消息被处理之后,把这个 ID 存储在数据库中,在处理下一条消息之前,先从数据库里面查询这个全局 ID 是否被消费过,如果被消费过就放弃消费。

你可以看到,无论是生产端的幂等性保证方式,还是消费端通用的幂等性保证方式,它们的共同特点都是为每一个消息生成一个唯一的 ID,然后在使用这个消息的时候,先比对这个 ID 是否已经存在,如果存在,则认为消息已经被使用过。所以这种方式是一种标准的实现幂等的方式,**你在项目之中可以拿来直接使用,**它在逻辑上的伪代码就像下面这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
boolean isIDExisted = selectByID(ID); // 判断 ID 是否存在

if(isIDExisted) {

return; // 存在则直接返回

} else {

process(message); // 不存在,则处理消息

saveID(ID); // 存储 ID

}

**不过这样会有一个问题:**如果消息在处理之后,还没有来得及写入数据库,消费者宕机了重启之后发现数据库中并没有这条消息,还是会重复执行两次消费逻辑,这时你就需要引入事务机制,保证消息处理和写入数据库必须同时成功或者同时失败,但是这样消息处理的成本就更高了,所以,如果对于消息重复没有特别严格的要求,可以直接使用这种通用的方案,而不考虑引入事务。

**在业务层面怎么处理呢?**这里有很多种处理方式,其中有一种是增加乐观锁的方式。比如,你的消息处理程序需要给一个人的账号加钱,那么你可以通过乐观锁的方式来解决。

**具体的操作方式是这样的:**你给每个人的账号数据中增加一个版本号的字段,在生产消息时先查询这个账户的版本号,并且将版本号连同消息一起发送给消息队列。消费端在拿到消息和版本号后,在执行更新账户金额 SQL 的时候带上版本号,类似于执行:

1
update user set amount = amount + 20, version=version+1 where userId=1 and version=1;

你看,我们在更新数据时给数据加了乐观锁,这样在消费第一条消息时,version 值为 1,SQL 可以执行成功,并且同时把 version 值改为了 2;在执行第二条相同的消息时,由于 version 值不再是 1,所以这条 SQL 不能执行成功,也就保证了消息的幂等性。

消息队列(一)-如何解决消息丢失

消息队列(一)-如何解决消息丢失

消息会丢失的环节

消息从被写入到消息队列,到被消费者消费完成,这个链路上会有哪些地方存在丢失消息的可能呢?其实,主要存在三个场景:

  • 消息从生产者写入到消息队列的过程。

  • 消息在消息队列中的存储场景。

  • 消息被消费者消费的过程。

lost1.jpg

1. 在消息生产的过程中丢失消息

消息的生产者一般是我们的业务服务器,消息队列是独立部署在单独的服务器上的。两者之间的网络虽然是内网,但是也会存在抖动的可能,而一旦发生抖动,消息就有可能因为网络的错误而丢失。

**针对这种情况,我建议你采用的方案是消息重传:**也就是当你发现发送超时后你就将消息重新发一次,但是你也不能无限制地重传消息。一般来说,如果不是消息队列发生故障,或者是到消息队列的网络断开了,重试 2~3 次就可以了。

不过,这种方案可能会造成消息的重复,从而导致在消费的时候会重复消费同样的消息。比方说,消息生产时由于消息队列处理慢或者网络的抖动,导致虽然最终写入消息队列成功,但在生产端却超时了,生产者重传这条消息就会形成重复的消息。

2. 在消息队列中丢失消息

拿 Kafka 举例,消息在 Kafka 中是存储在本地磁盘上的,而为了减少消息存储时对磁盘的随机 I/O,我们一般会将消息先写入到操作系统的 Page Cache 中,然后再找合适的时机刷新到磁盘上。

比如,Kafka 可以配置当达到某一时间间隔,或者累积一定的消息数量的时候再刷盘,也就是所说的异步刷盘。

不过,如果发生机器掉电或者机器异常重启,那么 Page Cache 中还没有来得及刷盘的消息就会丢失了。那么怎么解决呢?

你可能会把刷盘的间隔设置很短,或者设置累积一条消息就就刷盘,但这样频繁刷盘会对性能有比较大的影响,而且从经验来看,出现机器宕机或者掉电的几率也不高,所以我不建议你这样做。

loss2.jpg

如果你的系统对消息丢失的容忍度很低,那么你可以考虑以集群方式部署 Kafka 服务,通过部署多个副本备份数据,保证消息尽量不丢失。

那么它是怎么实现的呢?

Kafka 集群中有一个 Leader 负责消息的写入和消费,可以有多个 Follower 负责数据的备份。Follower 中有一个特殊的集合叫做 ISR(in-sync replicas),当 Leader 故障时,新选举出来的 Leader 会从 ISR 中选择,默认 Leader 的数据会异步地复制给 Follower,这样在 Leader 发生掉电或者宕机时,Kafka 会从 Follower 中消费消息,减少消息丢失的可能。

由于默认消息是异步地从 Leader 复制到 Follower 的,所以一旦 Leader 宕机,那些还没有来得及复制到 Follower 的消息还是会丢失。为了解决这个问题,Kafka 为生产者提供一个选项叫做“acks”,当这个选项被设置为“all”时,生产者发送的每一条消息除了发给 Leader 外还会发给所有的 ISR,并且必须得到 Leader 和所有 ISR 的确认后才被认为发送成功。这样,只有 Leader 和所有的 ISR 都挂了,消息才会丢失。

loss3.jpg

从上面这张图来看,当设置“acks=all”时,需要同步执行 1,3,4 三个步骤,对于消息生产的性能来说也是有比较大的影响的,所以你在实际应用中需要仔细地权衡考量。我给你的建议是:

  1. 如果你需要确保消息一条都不能丢失,那么建议不要开启消息队列的同步刷盘,而是需要使用集群的方式来解决,可以配置当所有 ISR Follower 都接收到消息才返回成功。

  2. 如果对消息的丢失有一定的容忍度,那么建议不部署集群,即使以集群方式部署,也建议配置只发送给一个 Follower 就可以返回成功了。

  3. 我们的业务系统一般对于消息的丢失有一定的容忍度,比如说以上面的红包系统为例,如果红包消息丢失了,我们只要后续给没有发送红包的用户补发红包就好了。

3. 在消费的过程中存在消息丢失的可能

我还是以 Kafka 为例来说明。一个消费者消费消息的进度是记录在消息队列集群中的,而消费的过程分为三步:接收消息、处理消息、更新消费进度。

这里面接收消息和处理消息的过程都可能会发生异常或者失败,比如说,消息接收时网络发生抖动,导致消息并没有被正确的接收到;处理消息时可能发生一些业务的异常导致处理流程未执行完成,这时如果更新消费进度,那么这条失败的消息就永远不会被处理了,也可以认为是丢失了。

所以,在这里你需要注意的是,一定要等到消息接收和处理完成后才能更新消费进度,但是这也会造成消息重复的问题,比方说某一条消息在处理之后,消费者恰好宕机了,那么因为没有更新消费进度,所以当这个消费者重启之后,还会重复地消费这条消息。

4. 最佳实践

  1. 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。
  2. 设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
  3. 设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
  4. 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。
  5. 设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
  6. 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
  7. 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。
  8. 确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。