重构即时IM项目10: 上行消息可靠性实现
新闻动态
发布日期:2026-02-06 10:58 点击次数:53
上一节中我们分析了消息可靠性应该怎么做,这次我们进行具体实现。
上行消息可靠回顾要保证客户端发送消息可靠的话,我们就必须在客户端与服务端交互的 protocal.proto 文件中添加几种新的消息类型,首先是 MessageUp ,这是上行消息,其中包含客户端生成的唯一 ID,用于去重。还需要增加 MessageACK ,这是上行消息的回执,其中包含客户端消息 ID,告诉客户端哪条消息成功了,客户端就可以把消息从 Pending Queue 中移除。
我们现在讨论上行消息 ID 需要有序吗?首先如果 TCP 连接稳定的话,那么客户端向服务端发送的每条消息其实都是按照用户期待的顺序放入 TCP 缓冲区中的,此时 TCP 会尽力将数据送达对端。此时服务端如果收到消息的话,由于 TCP 字节流有序,所以服务端处理的顺序也是一样的,此时不需要递增的序列号额外保证有序性。
但是如果 TCP 连接不稳定,客户端处于弱网环境的话,我每次发送消息都直接调用 Write 把消息写入 FD 缓冲区中,那么当用户切换网络的时候 #后端,此时运营商分配给用户的 IP 地址可能变化,所以 TCP 连接也会发生变化,此时之前调用 Write 写入缓冲区的消息全部丢失了,用户需要重新输入之前发送的消息,造成不好的体验。所以我们需要一个 Pending Queue 存储客户端发送后等待 ACK 的消息,用户每次发送消息就往这个 Pending Queue 中 Append 一条消息,此时消息的有序性就体现在 Pending Queue 中了,重新建立新的 TCP 连接也没事,把 Pending Queue 中的消息按照顺序写入 TCP 缓冲区然后发送出去就好了。
此时 Pending Queue 还需要维护重传定时器,理由和上面弱网 TCP 连接一样,如果连接断开又重建,之前写入到 TCP 缓冲区中的消息全部丢失,就算重传之后服务端回复 ACK 对应连接也收不到,因为现在客户端使用的是新 TCP 连接的缓冲区。所以 Pending Queue 需要超时重传,定时器触发之后就重传 Pending Queue 对应消息,这样就算新建了连接,也是重新写入新的 TCP 缓冲区,由新的 TCP 协议确保消息送达对端。
那么是否需要序列号呢,这需要看 Pending Queue 的发送机制。如果是停止等待协议的话就不需要序列号,只有当 Pending Queue 头部的消息 A 明确收到服务端的成功 ACK 后,才允许将下一条消息 B 写入 TCP 缓冲区,如果 A 超时且未收到回执,客户端将超时重试 A,不会跳过 A 去发送 B。此时很好理解,因为这种模式下服务端收到的消息顺序永远是 Pending Queue 中的顺序,即时发生网络重连也不会产生影响,此时只需要去重就可以了。
第二种是并发发送,这种方式允许客户端一口气全部把 Pending Queue 中的消息全部写入 TCP 缓冲区,客户端不等待 ACK,只要 Pending Queue 有消息就直接发送,此时假如服务端的处理是收到一个消息就交给一个协程处理,那么此时就需要序列号机制,因为可能 A 消息处理慢,B 消息处理快导致服务端认为 B 消息在 A 消息前,所以需要序列号机制帮助服务端确认消息时序。
虽然客户端并发发送,但在我们的服务端架构中,Gateway 使用了 EPOLLONESHOT 机制,保证了对同一个 TCP 连接的事件处理是严格串行的,所以目前架构中我们不需要引入递增的序列号。
上行消息设计总结协议设计部分,新增 MessageACK 消息类型,必须包含 uuid 字段,用于服务端向客户端确认消息已成功落库,MessageUp 必须携带客户端生成的唯一 uuid (UUIDv4),用于服务端去重。
客户端设计部分,维护一个持久化的 Pending Queue,所有用户发送的消息先入队,再尝试发送。Queue 需要维护超时重传定时器,对队列中已发送但未收到 ACK 的消息(如超过 5s)触发重传,当 Pending Queue 有消息时就尝试发送。
服务端设计部分,redis 或者本地内存表记录 uuid,防止因重传导致的重复处理,去重通过之后,执行消息落库+转发逻辑,执行成功之后就给客户端发送 ACK 防止重传。
上行消息代码实现首先新增消息类型,MessageUP 和 MessageACK :
// MessageUp 上行聊天消息
message MessageUp {
string uuid = 1; // 消息唯一ID
int32 type = 2; // 消息类型 (文本/图片等)
string content = 3; // 消息内容
string receiver_id = 4; // 接收者ID (群聊则是群ID)
}
// MessageAck 回执 enum MessageAckStatus { ACK_OK = 0; ACK_RETRY = 1; ACK_FAIL = 2; }
message MessageAck { string uuid = 1; MessageAckStatus status = 2; }
新增协议之后我们来看客户端的处理:
type pendingItem struct {
uuid string
payloadBytes []byte
lastSendAt time.Time
retryCount int
}
type pendingQueue struct { items []pendingItem inflight *pendingItem mu sync.Mutex }
这就是前面分析的客户端维护的 Pending Queue,里面维护了所有用户待发送的消息,用户每次点击发送都是在往 items 中 append 一条新消息。注意 inflight 表示正在等待收到 ACK 的消息,这里只有一个,是因为我当前实现做的是停止等待协议,后续会讨论怎么优化为选择重传协议。
func (q *pendingQueue) markAck(uuid string) {
q.mu.Lock
defer q.mu.Unlock
if q.inflight != nil && q.inflight.uuid == uuid {
q.items = q.items[1:]
q.inflight = nil
return
}
}
func (q *pendingQueue) markRetry(uuid string) { q.mu.Lock defer q.mu.Unlock if q.inflight != nil && q.inflight.uuid == uuid { q.inflight.retryCount++ q.inflight.lastSendAt = time.Time{} return } }
这两个方法没什么好说的,就是停止等待协议的实现,看一下就好。
客户端点击发送消息时,消息被 append 到 Pending Queue 中,然后 Pending Queue 会发送队头消息,并且阻塞在这里等待收到服务端 ACK,客户端消息被包装成 MessageUP ,然后被装入 Pending Queue 对应的 item 的 payloadBytes 字段,之后 payloadBytes 字段再被包装为 protocal.Command 对象,然后就可以发送了,和之前的包装逻辑差不多,只不过多了一层 Pending Queue 中 item 的包装。
下面是调用链路:
func enqueueAndTrySend(c *websocket.Conn, content string) {
msgPayload := &protocol.MessageUp{
Uuid: uuid.NewString,
Type: 1,
Content: content,
ReceiverId: "1002", // 测试用
}
data, _ := proto.Marshal(msgPayload)
q.append(pendingItem{uuid: msgPayload.Uuid, payloadBytes: data})
sendHead(c)
}
func sendHead(c *websocket.Conn) { item := q.head if item == nil { return } cmd := &protocol.Command{ Type: protocol.CommandType_MESSAGEUP, Data: item.payloadBytes, } item.lastSendAt = time.Now sendProto(c, cmd) }
func sendProto(c *websocket.Conn, cmd *protocol.Command) { bytes, err := proto.Marshal(cmd) if err != nil { log.Println("marshal error:", err) return }
err = c.WriteMessage(websocket.BinaryMessage, bytes) if err != nil { log.Println("write error:", err) } else { log.Printf("sent command: type=%v", cmd.Type) } }
之后是超时重传方法,这里也只针对队头在途消息:
func tryResendIfTimeout(c *websocket.Conn, timeout time.Duration) {
item := q.head
if item == nil {
return
}
if time.Since(item.lastSendAt) >= timeout {
item.retryCount++
cmd := &protocol.Command{ Type: protocol.CommandType_MESSAGEUP, Data: item.payloadBytes, } sendProto(c, cmd) } }
有上面这些基础设施之后,很容易得出客户端处理 ACK 的代码:
case protocol.CommandType_MESSAGEACK:
if len(cmd.Data) > 0 {
var ack protocol.MessageAck
if err := proto.Unmarshal(cmd.Data, &ack); err == nil {
if ack.Uuid != "" {
switch ack.Status {
case protocol.MessageAckStatus_ACK_OK:
q.markAck(ack.Uuid)
log.Printf("recv ACK_OK: uuid=%s", ack.Uuid)
sendHead(c)
case protocol.MessageAckStatus_ACK_RETRY:
q.markRetry(ack.Uuid)
log.Printf("recv ACK_RETRY: uuid=%s", ack.Uuid)
sendHead(c)
case protocol.MessageAckStatus_ACK_FAIL:
// 可添加错误处理相关逻辑
log.Printf("recv ACK_FAIL: uuid=%s", ack.Uuid)
}
}
break
}
}
下面来看 StateServer 是怎么处理上行消息的。收到 MessageUp 之后,先反序列化得到对应消息结构,然后用 Redis 的 SETNX 作为幂等的快速判定,若 Redis 异常即返回 ACK_RETRY ,其余情况统一返回 ACK_OK (包括重复的 uuid),如果重传或重复提交同一个 uuid,SETNX 都会返回 false,服务端据此判断为重复,不再重复处理或落库。处理完去重和落库之后就返回 MessageACK 给客户端。
func (s *Service) handleMessageUp(ctx context.Context, cmd *protocol.Command, req *pb.ReceiveMessageRequest) (*pb.ReceiveMessageResponse, error) {
var msgCmd protocol.MessageUp
if err := proto.Unmarshal(cmd.Data, &msgCmd); err != nil {
return nil, err
}
log.Printf("[StateServer] Handle Message: From=%s, To=%s, Content=%s", req.Uid, msgCmd.ReceiverId, msgCmd.Content) ttl := 1 *
status := protocol.MessageAckStatus_ACK_OK if s.rdb != nil { if s.rdb.Ping(ctx).Err == nil { key := fmt.Sprintf("MsgUUID:%s:%s", req.Uid, msgCmd.Uuid) ok, err := s.rdb.SetNX(ctx, key, "1", ttl).Result if err != nil { status = protocol.MessageAckStatus_ACK_RETRY } else { if ok { // TODO: 落库处理 status = protocol.MessageAckStatus_ACK_OK } else { // 重传幂等情况 status = protocol.MessageAckStatus_ACK_OK } } } }
ack := &protocol.MessageAck{ Uuid: msgCmd.Uuid, Status: status, } ackBytes, _ := proto.Marshal(ack) respCmd := &protocol.Command{ Type: protocol.CommandType_MESSAGEACK, Code: 0, Data: ackBytes, } respBytes, _ := proto.Marshal(respCmd)
return &pb.ReceiveMessageResponse{ ResponsePayload: respBytes, }, nil }
注意这里 SETNX 操作的幂等 Key 需要设置 TTL,这样可以形成一个去重窗口,在窗口内的重试与重传都能被拦住。窗口外的极端迟到消息会重新进入最终幂等层(DB UNIQUE(uuid))兜底,SETNX 更像快速层的幂等判定,负责高效挡住绝大多数重复;数据库唯一约束负责最终层的幂等保证,避免 Redis 故障或 TTL 过期导致的漏判。需要注意的是,TTL 要覆盖客户端的最大重试周期,Redis 异常时不要误回 ACK_OK,改用 ACK_RETRY 引导客户端退避,以及控制键数量与过期策略,避免无界内存增长。
上行消息实现滑动窗口的拓展分析在上行消息的客户端实现中,我们选择了停止等待协议实现消息可靠性,这套协议的思想十分简单,客户端为每条消息生成一个全局唯一的 uuid,将其 append 到待发送队列,发送出去并等待服务端回执。服务端在处理之后回复 ACK,客户端收到 ACK 之后,决定是出队并继续发送下一条,还是立刻重试,或记录失败并推进。这种逐条消息的串行发送与网关的 EPOLLONESHOT 串行处理形成合力,在不引入额外序号与乱序处理的前提下满足会话内顺序与幂等的双重约束。
这种方案的最大优点就是实现简单,状态机只有一个在途消息,逻辑容易验证,而且只有一个在途消息,顺序自然成立,不需要维护复杂的窗口和做乱序重排,幂等也只需要直接对客户端发送过来的 Key 去 Redis 做 SETNX 就好了,因为客户端每次只会发送一条消息然后等待重试。总体而言,这种设计十分简单,非常适合当前阶段设计的初步实现。
缺点显而易见,一次只能在途一条,RTT 增大或链路抖动时,整体延迟会线性拉长,而且还有队头阻塞问题,一旦某条消息临时失败或需要反复重试,会卡住后续全部消息。
此时我们可以实现选择重传协议,它允许并行在途 N 条,ACK 可以乱序到达,窗口按已确认集合滑动,只重传未确认的缺口,这样可以极大提高吞吐量,解决队头阻塞和反复重传问题,但是这种协议设计的复杂度也很高。客户端要维护 inflight 集合与独立的重试调度(最小堆或时间轮),ACK 乱序到达时只移除已确认项并滑动窗口,对未确认的缺口选择性重传,重连后从持久化队列恢复 inflight,按窗口大小批量重放,幂等靠 uuid 保证不重复处理。
客户端的超时与重试也必须自适应,避免重发风暴。在上面的实现中超时是固定的5s,但是我们应该采样 RTT 计算 RTO,结合重试退避算法,比如说指数退避,还可以在重试时间中加随机抖动,避免大量客户端在同一时间重试。这样可以避免重试风暴。
由于客户端可能一次性发送许多消息过来,服务端需要维护客户端消息的顺序性,考虑如下情况:客户端一次发送 AB 两个消息之后,A 消息处理时可能与 Redis、数据库或者 grpc 交互失败,所以返回 Message_ACK_Retry 提示客户端重试,客户端重传 A 消息给服务端,但是 B 消息已经收到并且落库,在服务端看来 A 是比 B 晚的消息,所以落库顺序是 BA,但是客户端期望的顺序是 AB,这里就发生了错误。
对于上述问题服务端有几种解决策略,最暴力的解法就是让消息 ID 可以体现消息的顺序,客户端给每条消息分配 ID 的时候都是使用 SessionID + SeqID,其中 SeqID 是在会话内递增的 ID,表示消息在会话内有序。此时客户端连续发送 AB 两条消息就有 ID(A)= ID(B)- 1,这样的话服务端记录上一次处理收到的消息,比如说3,那么之后 A 的 ID 就是4,B 的 ID 就是5。服务端收到 A 消息之后发现消息 ID 是4,恰好等于3+1,此时可以进行处理,但是 A 消息落库失败了,此时服务端给客户端返回重传 ACK 提示客户端重传。然后服务端收到消息 B,发现5 != 3 + 1,表示中间有消息漏洞,直接丢弃这条消息,等到 A 消息再次到达,如果落库成功的话就把服务端上次处理的 ID 变为4,等到消息 B 到达之后再落库顺序就对了,服务端落库的顺序也是 AB,所以消息顺序一致性实现。
上面这种方法在网络情况良好时允许的很好,因为客户端通过 TCP 连接将窗口内的消息逐个发送出去,此时服务端解析 TCP 缓冲区,收到的消息顺序恰好是客户期待的顺序,所以会满足 ID = prevID + 1,可以直接处理,处理完之后再更新 prevID,然后再检测发现依旧符合条件,所以可以继续处理。
但它的缺点也很明显,一旦进入真实环境,严格 ID=prevID+1的接收门槛会把接收端变成单会话串行队列,弱网抖动、服务端短暂失败都会导致吞吐量大大降低。而且这么做通信复杂度也会变高,因为 StateServer 是无状态的,需要把对应会话的 PrevID 存储在 Redis 中做共享,每次进行处理都需要与 Redis 进行通信,而且一旦 Redis 宕机,又要处理一致性的问题,实现复杂度大大升高。
我们来尝试优化一下上面这个策略,其实主要就是收到不是 PrevID 的下一个消息时不要丢弃,而是存储到缓冲区中,一旦缓冲区中的所有消息都形成连续递增区间,比如2->3->4......这种,就一次性按顺序弹出并推送。为了避免拖慢和占用过多资源,给缓冲区设定两个硬约束:窗口上限(如几十条)和等待超时(如几十到几百毫秒)。超出时间限制还是有缺口的话也有取舍,服务端可以选择直接发送缓冲区中已有的,然后给用户返回 ACK,让用户知道有一条消息没发出去,自己重传,还可以直接全部丢弃,提示用户网络不好。
这样的话,PrevID 只用来推进已知连续段,不会把接收端变成单会话串行队列,也不需要把 PrevID 做成跨实例强一致的共享状态;配合用户粘滞路由,PrevID 可以仅在本实例内存维护,周期性做轻量 checkpoint,故障切换时回退到并发接收+逻辑顺序重排的安全模式即可。
这部分是补充客户端持久化PendingQueue的代码,现在主要就是直接写文件,每次修改了Pending Queue结构的地方都调用save方法,后续可以采用这几种方式优化。
异步写:前台只更新内存并把"保存任务"投递到一个有缓冲的队列;后台单线程消费者统一序列化与落盘。
合并多个事件:在窗口期(如 100–300ms)收集多次变更只生成一次写任务,同时设置阈值触发(累计 N 次变更或新增 M 条目即触发)。
定时刷盘:用定时器每 T 毫秒触发写入。
func (q *pendingQueue) initStorage {
dir := filepath.Join("test", "message")
_ = os.MkdirAll(dir, 0755)
q.filePath = filepath.Join(dir, "pending.json")
}
func (q *pendingQueue) load { if q.filePath == "" { return } b, err := os.ReadFile(q.filePath) if err != nil