From a11da59ec512b4edfd2543e3ceffe559169166e4 Mon Sep 17 00:00:00 2001 From: oneRain Date: Thu, 2 Apr 2020 18:15:16 +0800 Subject: [PATCH] * LCRTMRouter.cs: * LCRTMServer.cs: * LCWebSocketConnection.cs: MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * LCIMClient.cs: chore: 基础断线重连功能 --- Realtime/Internal/Router/LCRTMRouter.cs | 4 +- Realtime/Internal/Router/LCRTMServer.cs | 2 +- .../WebSocket/LCWebSocketConnection.cs | 69 ++++++++++++++++--- Realtime/LCIMClient.cs | 9 ++- 4 files changed, 69 insertions(+), 15 deletions(-) diff --git a/Realtime/Internal/Router/LCRTMRouter.cs b/Realtime/Internal/Router/LCRTMRouter.cs index 35155d9..3c0eebe 100644 --- a/Realtime/Internal/Router/LCRTMRouter.cs +++ b/Realtime/Internal/Router/LCRTMRouter.cs @@ -12,11 +12,11 @@ namespace LeanCloud.Realtime.Internal.Router { internal LCRTMRouter() { } - internal async Task GetServer() { + internal async Task GetServer() { if (rtmServer == null || !rtmServer.IsValid) { await Fetch(); } - return rtmServer.Server; + return rtmServer; } internal void Reset() { diff --git a/Realtime/Internal/Router/LCRTMServer.cs b/Realtime/Internal/Router/LCRTMServer.cs index 1787ff4..45a5176 100644 --- a/Realtime/Internal/Router/LCRTMServer.cs +++ b/Realtime/Internal/Router/LCRTMServer.cs @@ -14,7 +14,7 @@ namespace LeanCloud.Realtime.Internal.Router { } [JsonProperty("server")] - internal string Server { + internal string Primary { get; set; } diff --git a/Realtime/Internal/WebSocket/LCWebSocketConnection.cs b/Realtime/Internal/WebSocket/LCWebSocketConnection.cs index 80ab1a9..b827c78 100644 --- a/Realtime/Internal/WebSocket/LCWebSocketConnection.cs +++ b/Realtime/Internal/WebSocket/LCWebSocketConnection.cs @@ -10,7 +10,7 @@ using Google.Protobuf; namespace LeanCloud.Realtime.Internal.WebSocket { internal class LCWebSocketConnection { - private const int KEEP_ALIVE_INTERVAL = 10; + private const int KEEP_ALIVE_INTERVAL = 1; // .net standard 2.0 好像在拼合 Frame 时有 bug,所以将这个值调整大一些 private const int RECV_BUFFER_SIZE = 1024 * 5; @@ -32,7 +32,11 @@ namespace LeanCloud.Realtime.Internal.WebSocket { get; set; } - internal Action OnClose { + internal Action OnDisconnect { + get; set; + } + + internal Func OnReconnect { get; set; } @@ -44,13 +48,42 @@ namespace LeanCloud.Realtime.Internal.WebSocket { } internal async Task Connect() { - string rtmServer = await Router.GetServer(); + // TODO 可完善策略 + LCRTMServer rtmServer = await Router.GetServer(); + try { + LCLogger.Debug($"Connect Primary Server: {rtmServer.Primary}"); + await Connect(rtmServer.Primary); + LCLogger.Debug("Connected Primary Server"); + } catch (Exception e) { + LCLogger.Error(e.Message); + LCLogger.Debug($"Connect Secondary Server: {rtmServer.Secondary}"); + await Connect(rtmServer.Secondary); + LCLogger.Debug($"Connected Secondary Server"); + } + + // 接收 + _ = StartReceive(); + } + private async Task Connect(string server) { ws = new ClientWebSocket(); ws.Options.AddSubProtocol("lc.protobuf2.3"); ws.Options.KeepAliveInterval = TimeSpan.FromSeconds(KEEP_ALIVE_INTERVAL); - await ws.ConnectAsync(new Uri(rtmServer), default); - _ = StartReceive(); + await ws.ConnectAsync(new Uri(server), default); + } + + private async Task Reconnect() { + // TODO 重连策略 + while (true) { + try { + await Connect(); + break; + } catch (Exception e) { + LCLogger.Error(e.Message); + await Task.Delay(1000 * 10); + } + } + OnReconnect?.Invoke(); } internal Task SendRequest(GenericCommand request) { @@ -61,9 +94,9 @@ namespace LeanCloud.Realtime.Internal.WebSocket { ArraySegment bytes = new ArraySegment(request.ToByteArray()); try { ws.SendAsync(bytes, WebSocketMessageType.Binary, true, default); - } catch (Exception) { + } catch (Exception e) { // TODO 发送消息异常 - + LCLogger.Error(e.Message); } return tcs.Task; } @@ -81,7 +114,13 @@ namespace LeanCloud.Realtime.Internal.WebSocket { do { result = await ws.ReceiveAsync(new ArraySegment(buffer), default); if (result.MessageType == WebSocketMessageType.Close) { - OnClose?.Invoke(-1, null); + LCLogger.Debug($"Receive Closed: {result.CloseStatusDescription}"); + try { + await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, "", default); + } catch (Exception ex) { + LCLogger.Error(ex.Message); + } + OnDisconnect?.Invoke(-1, null); return; } // 拼合 WebSocket Frame @@ -99,10 +138,18 @@ namespace LeanCloud.Realtime.Internal.WebSocket { LCLogger.Error(e.Message); } } - } catch (Exception e) { - // 连接断开 + } catch (WebSocketException e) { LCLogger.Error(e.Message); - await ws.CloseAsync(WebSocketCloseStatus.EndpointUnavailable, "read error", default); + LCLogger.Debug($"WebSocket State: {ws.State}"); + try { + ws.Abort(); + ws.Dispose(); + } catch (Exception ex) { + LCLogger.Error(ex.Message); + } finally { + // 触发重连 + await Reconnect(); + } } } diff --git a/Realtime/LCIMClient.cs b/Realtime/LCIMClient.cs index 3c394a7..8403fa5 100644 --- a/Realtime/LCIMClient.cs +++ b/Realtime/LCIMClient.cs @@ -224,7 +224,9 @@ namespace LeanCloud.Realtime { GoAwayController = new LCIMGoAwayController(this); Connection = new LCWebSocketConnection(Id) { - OnNotification = OnNotification + OnNotification = OnNotification, + OnDisconnect = OnDisconnect, + OnReconnect = OnReconnect }; } @@ -387,6 +389,11 @@ namespace LeanCloud.Realtime { } } + private async Task OnReconnect() { + // 打开 Session + await SessionController.Open(); + } + internal async Task GetOrQueryConversation(string convId) { if (ConversationDict.TryGetValue(convId, out LCIMConversation conversation)) { return conversation;