From fef8f0c18c4e06c09739e50391d55da3d44e8326 Mon Sep 17 00:00:00 2001 From: oneRain Date: Thu, 11 Jun 2020 11:33:11 +0800 Subject: [PATCH] * LCLiveQueryHeartBeat.cs: * LCLiveQueryConnection.cs: * LCConnection.cs: MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * LCHeartBeat.cs: chore: 重构 ping/pong 机制 --- .../Internal/LCLiveQueryConnection.cs | 22 ++-- .../Internal/LCLiveQueryHeartBeat.cs | 44 ++----- .../Internal/Connection/LCConnection.cs | 30 ++--- .../Internal/Connection/LCHeartBeat.cs | 109 +++++++++++------- 4 files changed, 105 insertions(+), 100 deletions(-) diff --git a/LiveQuery/LiveQuery/Internal/LCLiveQueryConnection.cs b/LiveQuery/LiveQuery/Internal/LCLiveQueryConnection.cs index 7203eac..cfeeb34 100644 --- a/LiveQuery/LiveQuery/Internal/LCLiveQueryConnection.cs +++ b/LiveQuery/LiveQuery/Internal/LCLiveQueryConnection.cs @@ -6,7 +6,7 @@ using Newtonsoft.Json; using LeanCloud.Realtime.Internal.Router; using LeanCloud.Realtime.Internal.WebSocket; using LeanCloud.Common; -using LeanCloud.Storage; +using LeanCloud.Realtime.Internal.Connection; namespace LeanCloud.LiveQuery.Internal { public class LCLiveQueryConnection { @@ -63,7 +63,7 @@ namespace LeanCloud.LiveQuery.Internal { public LCLiveQueryConnection(string id) { this.id = id; responses = new Dictionary>>(); - heartBeat = new LCLiveQueryHeartBeat(this); + heartBeat = new LCLiveQueryHeartBeat(this, OnPingTimeout); router = new LCRTMRouter(); client = new LCWebSocketClient { OnMessage = OnClientMessage, @@ -82,6 +82,8 @@ namespace LeanCloud.LiveQuery.Internal { LCLogger.Debug($"Secondary Server"); await client.Connect(rtmServer.Secondary, SUB_PROTOCOL); } + // 启动心跳 + heartBeat.Start(); } catch (Exception e) { throw e; } @@ -92,10 +94,11 @@ namespace LeanCloud.LiveQuery.Internal { /// /// internal async Task Reset() { + heartBeat?.Stop(); // 关闭就连接 await client.Close(); // 重新创建连接组件 - heartBeat = new LCLiveQueryHeartBeat(this); + heartBeat = new LCLiveQueryHeartBeat(this, OnPingTimeout); router = new LCRTMRouter(); client = new LCWebSocketClient { OnMessage = OnClientMessage, @@ -151,7 +154,6 @@ namespace LeanCloud.LiveQuery.Internal { } private void OnClientMessage(byte[] bytes) { - _ = heartBeat.Refresh(OnPingTimeout); try { string json = Encoding.UTF8.GetString(bytes); Dictionary msg = JsonConvert.DeserializeObject>(json, @@ -177,8 +179,12 @@ namespace LeanCloud.LiveQuery.Internal { LCLogger.Error($"No request for {requestIndex}"); } } else { - // 通知 - OnNotification?.Invoke(msg); + if (json == "{}") { + heartBeat.Pong(); + } else { + // 通知 + OnNotification?.Invoke(msg); + } } } catch (Exception e) { LCLogger.Error(e); @@ -194,7 +200,9 @@ namespace LeanCloud.LiveQuery.Internal { private async void OnPingTimeout() { await client.Close(); - OnClientDisconnect(); + OnDisconnect?.Invoke(); + // 重连 + _ = Reconnect(); } private async Task Reconnect() { diff --git a/LiveQuery/LiveQuery/Internal/LCLiveQueryHeartBeat.cs b/LiveQuery/LiveQuery/Internal/LCLiveQueryHeartBeat.cs index a0fec18..6dd5262 100644 --- a/LiveQuery/LiveQuery/Internal/LCLiveQueryHeartBeat.cs +++ b/LiveQuery/LiveQuery/Internal/LCLiveQueryHeartBeat.cs @@ -1,55 +1,27 @@ using System; using System.Threading; using System.Threading.Tasks; +using LeanCloud.Realtime.Internal.Connection; using System.Collections.Generic; namespace LeanCloud.LiveQuery.Internal { /// /// LiveQuery 心跳控制器 /// - internal class LCLiveQueryHeartBeat { - private const int PING_INTERVAL = 5000; - private const int PONG_INTERVAL = 5000; - + internal class LCLiveQueryHeartBeat : LCHeartBeat { private readonly LCLiveQueryConnection connection; - private CancellationTokenSource pingCTS; - private CancellationTokenSource pongCTS; - - internal LCLiveQueryHeartBeat(LCLiveQueryConnection connection) { + internal LCLiveQueryHeartBeat(LCLiveQueryConnection connection, + Action onTimeout) : base(onTimeout) { this.connection = connection; } - internal async Task Refresh(Action onTimeout) { - LCLogger.Debug("LiveQuery HeartBeat refresh"); - Stop(); + protected override void SendPing() { + try { + _ = connection.SendText("{}"); + } catch (Exception) { - pingCTS = new CancellationTokenSource(); - Task delayTask = Task.Delay(PING_INTERVAL, pingCTS.Token); - await delayTask; - if (delayTask.IsCanceled) { - return; } - // 发送 Ping 包 - LCLogger.Debug("Ping ~~~"); - _ = connection.SendText("{}"); - - // 等待 Pong - pongCTS = new CancellationTokenSource(); - Task timeoutTask = Task.Delay(PONG_INTERVAL, pongCTS.Token); - await timeoutTask; - if (timeoutTask.IsCanceled) { - return; - } - - // 超时 - LCLogger.Debug("Ping timeout"); - onTimeout?.Invoke(); - } - - internal void Stop() { - pingCTS?.Cancel(); - pongCTS?.Cancel(); } } } diff --git a/Realtime/Realtime/Internal/Connection/LCConnection.cs b/Realtime/Realtime/Internal/Connection/LCConnection.cs index c52a2d9..565d266 100644 --- a/Realtime/Realtime/Internal/Connection/LCConnection.cs +++ b/Realtime/Realtime/Internal/Connection/LCConnection.cs @@ -6,13 +6,12 @@ using Google.Protobuf; using LeanCloud.Realtime.Internal.Router; using LeanCloud.Realtime.Internal.WebSocket; using LeanCloud.Realtime.Internal.Protocol; -using LeanCloud.Storage; namespace LeanCloud.Realtime.Internal.Connection { /// /// 连接层,只与数据协议相关 /// - internal class LCConnection { + public class LCConnection { /// /// 发送超时 /// @@ -28,11 +27,6 @@ namespace LeanCloud.Realtime.Internal.Connection { /// private const int RECONNECT_INTERVAL = 10000; - /// - /// 心跳间隔 - /// - private const int HEART_BEAT_INTERVAL = 30000; - /// /// 子协议 /// @@ -71,7 +65,7 @@ namespace LeanCloud.Realtime.Internal.Connection { internal LCConnection(string id) { this.id = id; responses = new Dictionary>(); - heartBeat = new LCHeartBeat(this, HEART_BEAT_INTERVAL, HEART_BEAT_INTERVAL); + heartBeat = new LCHeartBeat(this, OnPingTimeout); router = new LCRTMRouter(); client = new LCWebSocketClient { OnMessage = OnClientMessage, @@ -90,6 +84,8 @@ namespace LeanCloud.Realtime.Internal.Connection { LCLogger.Debug($"Secondary Server"); await client.Connect(rtmServer.Secondary, SUB_PROTOCOL); } + // 启动心跳 + heartBeat.Start(); } catch (Exception e) { throw e; } @@ -100,17 +96,18 @@ namespace LeanCloud.Realtime.Internal.Connection { /// /// internal async Task Reset() { + heartBeat?.Stop(); // 关闭就连接 await client.Close(); // 重新创建连接组件 - heartBeat = new LCHeartBeat(this, HEART_BEAT_INTERVAL, HEART_BEAT_INTERVAL); + heartBeat = new LCHeartBeat(this, OnPingTimeout); router = new LCRTMRouter(); client = new LCWebSocketClient { OnMessage = OnClientMessage, OnClose = OnClientDisconnect }; await Reconnect(); - } + } /// /// 发送请求,会在收到应答后返回 @@ -158,7 +155,6 @@ namespace LeanCloud.Realtime.Internal.Connection { } private void OnClientMessage(byte[] bytes) { - _ = heartBeat.Refresh(OnPingTimeout); try { GenericCommand command = GenericCommand.Parser.ParseFrom(bytes); LCLogger.Debug($"{id} <= {FormatCommand(command)}"); @@ -182,8 +178,12 @@ namespace LeanCloud.Realtime.Internal.Connection { LCLogger.Error($"No request for {requestIndex}"); } } else { - // 通知 - OnNotification?.Invoke(command); + if (command.Cmd == CommandType.Echo) { + heartBeat.Pong(); + } else { + // 通知 + OnNotification?.Invoke(command); + } } } catch (Exception e) { LCLogger.Error(e); @@ -199,7 +199,9 @@ namespace LeanCloud.Realtime.Internal.Connection { private async void OnPingTimeout() { await client.Close(); - OnClientDisconnect(); + OnDisconnect?.Invoke(); + // 重连 + _ = Reconnect(); } private async Task Reconnect() { diff --git a/Realtime/Realtime/Internal/Connection/LCHeartBeat.cs b/Realtime/Realtime/Internal/Connection/LCHeartBeat.cs index 6e7853c..01629f3 100644 --- a/Realtime/Realtime/Internal/Connection/LCHeartBeat.cs +++ b/Realtime/Realtime/Internal/Connection/LCHeartBeat.cs @@ -6,75 +6,98 @@ using LeanCloud.Realtime.Internal.Protocol; namespace LeanCloud.Realtime.Internal.Connection { /// /// 心跳控制器,由于 .Net Standard 2.0 不支持发送 ping frame,所以需要发送逻辑心跳 - /// 1. 每次接收到消息后开始监听,如果在 pingInterval 时间内没有再次接收到消息,则发送 ping 请求; - /// 2. 发送后等待 pongInterval 时间,如果在此时间内接收到了任何消息,则取消并重新开始监听 1; - /// 3. 如果没收到消息,则认为超时并回调,连接层接收回调后放弃当前连接,以断线逻辑处理 + /// 1. 每隔 180s 发送 ping 包 + /// 2. 接收到 pong 包刷新上次 pong 时间 + /// 3. 每隔 180s 检测 pong 包间隔,超过 360s 则认为断开 /// - internal class LCHeartBeat { + public class LCHeartBeat { + private const int PING_INTERVAL = 5 * 1000; + private readonly LCConnection connection; - /// - /// ping 间隔 - /// - private readonly int pingInterval; + private readonly Action onTimeout; - /// - /// pong 间隔 - /// - private readonly int pongInterval; + private CancellationTokenSource heartBeatCTS; - private CancellationTokenSource pingCTS; - private CancellationTokenSource pongCTS; + private bool running = false; + + private DateTimeOffset lastPongTime; + + public LCHeartBeat(Action onTimeout) { + this.onTimeout = onTimeout; + } internal LCHeartBeat(LCConnection connection, - int pingInterval, - int pongInterval) { + Action onTimeout) : this(onTimeout) { this.connection = connection; - this.pingInterval = pingInterval; - this.pongInterval = pongInterval; } /// - /// 更新心跳监听 + /// 启动心跳 /// - /// - internal async Task Refresh(Action onTimeout) { - LCLogger.Debug("HeartBeat refresh"); - pingCTS?.Cancel(); - pongCTS?.Cancel(); + public void Start() { + running = true; + heartBeatCTS = new CancellationTokenSource(); + StartPing(); + StartPong(); + } - // 计时准备 ping - pingCTS = new CancellationTokenSource(); - Task delayTask = Task.Delay(pingInterval, pingCTS.Token); - await delayTask; - if (delayTask.IsCanceled) { - return; + private async void StartPing() { + while (running) { + try { + await Task.Delay(PING_INTERVAL, heartBeatCTS.Token); + } catch (TaskCanceledException) { + return; + } + LCLogger.Debug("Ping ~~~"); + SendPing(); } + } + + protected virtual void SendPing() { // 发送 ping 包 - LCLogger.Debug("Ping ~~~"); GenericCommand command = new GenericCommand { Cmd = CommandType.Echo, AppId = LCApplication.AppId, PeerId = connection.id }; - _ = connection.SendRequest(command); - pongCTS = new CancellationTokenSource(); - Task timeoutTask = Task.Delay(pongInterval, pongCTS.Token); - await timeoutTask; - if (timeoutTask.IsCanceled) { - return; + try { + _ = connection.SendCommand(command); + } catch (Exception e) { + LCLogger.Error(e.Message); } - // timeout - LCLogger.Error("Ping timeout"); - onTimeout.Invoke(); + } + + private async void StartPong() { + lastPongTime = DateTimeOffset.Now; + while (running) { + try { + await Task.Delay(PING_INTERVAL / 2, heartBeatCTS.Token); + } catch (TaskCanceledException) { + return; + } + // 检查 pong 包时间 + TimeSpan interval = DateTimeOffset.Now - lastPongTime; + if (interval.TotalMilliseconds > PING_INTERVAL * 2) { + // 断线 + Stop(); + onTimeout.Invoke(); + } + } + } + + public void Pong() { + LCLogger.Debug("Pong ~~~"); + // 刷新最近 pong 时间戳 + lastPongTime = DateTimeOffset.Now; } /// /// 停止心跳监听 /// - internal void Stop() { - pingCTS?.Cancel(); - pongCTS?.Cancel(); + public void Stop() { + running = false; + heartBeatCTS.Cancel(); } } }