From 1447070c8f111fd7049ac4d76731068bbbcb1b08 Mon Sep 17 00:00:00 2001 From: oneRain Date: Wed, 29 Apr 2020 17:57:30 +0800 Subject: [PATCH] * LCHeartBeat.cs: * LCConnection.cs: MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * LCWebSocketClient.cs: chore: 重构连接相关结构 --- Realtime/Internal/Connection/LCConnection.cs | 35 ++++++++++----- Realtime/Internal/Connection/LCHeartBeat.cs | 2 +- .../Internal/WebSocket/LCWebSocketClient.cs | 44 +++---------------- 3 files changed, 29 insertions(+), 52 deletions(-) diff --git a/Realtime/Internal/Connection/LCConnection.cs b/Realtime/Internal/Connection/LCConnection.cs index 60dd8a0..8b11292 100644 --- a/Realtime/Internal/Connection/LCConnection.cs +++ b/Realtime/Internal/Connection/LCConnection.cs @@ -43,11 +43,6 @@ namespace LeanCloud.Realtime.Internal.Connection { /// internal Action OnDisconnect; - /// - /// 开始重连事件 - /// - internal Action OnReconnecting; - /// /// 重连成功事件 /// @@ -73,14 +68,26 @@ namespace LeanCloud.Realtime.Internal.Connection { responses = new Dictionary>(); heartBeat = new LCHeartBeat(this, HEART_BEAT_INTERVAL, HEART_BEAT_INTERVAL); router = new LCRTMRouter(); - client = new LCWebSocketClient(router, heartBeat) { + client = new LCWebSocketClient { OnMessage = OnClientMessage, OnClose = OnClientDisconnect }; } internal async Task Connect() { - await client.Connect(); + try { + LCRTMServer rtmServer = await router.GetServer(); + try { + LCLogger.Debug($"Primary Server"); + await client.Connect(rtmServer.Primary); + } catch (Exception e) { + LCLogger.Error(e); + LCLogger.Debug($"Secondary Server"); + await client.Connect(rtmServer.Secondary); + } + } catch (Exception e) { + throw e; + } } /// @@ -93,7 +100,7 @@ namespace LeanCloud.Realtime.Internal.Connection { // 重新创建连接组件 heartBeat = new LCHeartBeat(this, HEART_BEAT_INTERVAL, HEART_BEAT_INTERVAL); router = new LCRTMRouter(); - client = new LCWebSocketClient(router, heartBeat) { + client = new LCWebSocketClient { OnMessage = OnClientMessage, OnClose = OnClientDisconnect }; @@ -140,12 +147,12 @@ namespace LeanCloud.Realtime.Internal.Connection { internal async Task Close() { OnNotification = null; OnDisconnect = null; - OnReconnecting = null; OnReconnected = null; await client.Close(); } private void OnClientMessage(byte[] bytes) { + _ = heartBeat.Refresh(OnPingTimeout); try { GenericCommand command = GenericCommand.Parser.ParseFrom(bytes); LCLogger.Debug($"{id} <= {FormatCommand(command)}"); @@ -178,21 +185,25 @@ namespace LeanCloud.Realtime.Internal.Connection { } private void OnClientDisconnect() { + heartBeat.Stop(); OnDisconnect?.Invoke(); - OnReconnecting?.Invoke(); // 重连 _ = Reconnect(); } + private async void OnPingTimeout() { + await client.Close(); + OnClientDisconnect(); + } + private async Task Reconnect() { - OnReconnecting?.Invoke(); while (true) { int reconnectCount = 0; // 重连策略 while (reconnectCount < MAX_RECONNECT_TIMES) { try { LCLogger.Debug($"Reconnecting... {reconnectCount}"); - await client.Connect(); + await Connect(); break; } catch (Exception e) { reconnectCount++; diff --git a/Realtime/Internal/Connection/LCHeartBeat.cs b/Realtime/Internal/Connection/LCHeartBeat.cs index dde94ba..8de4230 100644 --- a/Realtime/Internal/Connection/LCHeartBeat.cs +++ b/Realtime/Internal/Connection/LCHeartBeat.cs @@ -38,7 +38,7 @@ namespace LeanCloud.Realtime.Internal.Connection { /// 更新心跳监听 /// /// - internal async Task Update(Action onTimeout) { + internal async Task Refresh(Action onTimeout) { LCLogger.Debug("HeartBeat update"); pingCTS?.Cancel(); pongCTS?.Cancel(); diff --git a/Realtime/Internal/WebSocket/LCWebSocketClient.cs b/Realtime/Internal/WebSocket/LCWebSocketClient.cs index f0b5548..09d599e 100644 --- a/Realtime/Internal/WebSocket/LCWebSocketClient.cs +++ b/Realtime/Internal/WebSocket/LCWebSocketClient.cs @@ -1,8 +1,6 @@ using System; using System.Threading.Tasks; using System.Net.WebSockets; -using LeanCloud.Realtime.Internal.Router; -using LeanCloud.Realtime.Internal.Connection; namespace LeanCloud.Realtime.Internal.WebSocket { /// @@ -34,44 +32,12 @@ namespace LeanCloud.Realtime.Internal.WebSocket { private ClientWebSocket ws; - private readonly LCRTMRouter router; - - private readonly LCHeartBeat heartBeat; - - internal LCWebSocketClient(LCRTMRouter router, LCHeartBeat heartBeat) { - this.router = router; - this.heartBeat = heartBeat; - } - - /// - /// 连接 - /// - /// - internal async Task Connect() { - try { - LCRTMServer rtmServer = await router.GetServer(); - try { - LCLogger.Debug($"Primary Server"); - await Connect(rtmServer.Primary); - } catch (Exception e) { - LCLogger.Error(e); - LCLogger.Debug($"Secondary Server"); - await Connect(rtmServer.Secondary); - } - } catch (Exception e) { - throw e; - } - - // 接收 - _ = StartReceive(); - } - /// /// 连接指定 ws 服务器 /// /// /// - private async Task Connect(string server) { + internal async Task Connect(string server) { LCLogger.Debug($"Connecting WebSocket: {server}"); Task timeoutTask = Task.Delay(CONNECT_TIMEOUT); ws = new ClientWebSocket(); @@ -79,6 +45,9 @@ namespace LeanCloud.Realtime.Internal.WebSocket { Task connectTask = ws.ConnectAsync(new Uri(server), default); if (await Task.WhenAny(connectTask, timeoutTask) == connectTask) { LCLogger.Debug($"Connected WebSocket: {server}"); + await connectTask; + // 接收 + _ = StartReceive(); } else { throw new TimeoutException("Connect timeout"); } @@ -92,7 +61,6 @@ namespace LeanCloud.Realtime.Internal.WebSocket { LCLogger.Debug("Closing WebSocket"); OnMessage = null; OnClose = null; - heartBeat.Stop(); try { // 发送关闭帧可能会很久,所以增加超时 // 主动挥手关闭,不会再收到 Close Frame @@ -152,7 +120,6 @@ namespace LeanCloud.Realtime.Internal.WebSocket { } } } else if (result.MessageType == WebSocketMessageType.Binary) { - _ = heartBeat.Update(HandleExceptionClose); // 拼合 WebSocket Message int length = result.Count; byte[] data = new byte[length]; @@ -165,13 +132,12 @@ namespace LeanCloud.Realtime.Internal.WebSocket { } catch (Exception e) { // 客户端网络异常 LCLogger.Error(e); - OnClose?.Invoke(); + HandleExceptionClose(); } } private void HandleExceptionClose() { try { - heartBeat.Stop(); ws.Abort(); ws.Dispose(); } catch (Exception e) {