From 88f2b64eba0884c26eaa72bc82923c9c5d8496c9 Mon Sep 17 00:00:00 2001 From: oneRain Date: Mon, 13 Apr 2020 17:29:55 +0800 Subject: [PATCH] * LCLogger.cs: * LCIMClient.cs: * LCAppRouter.cs: * LCRTMRouter.cs: * LCHeartBeat.cs: * LCConnection.cs: * LCWebSocketClient.cs: * LCIMGoAwayController.cs: * LCIMConversationController.cs: MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * LCHttpClient.cs: chore: 支持 goaway --- Common/AppRouter/LCAppRouter.cs | 2 +- Common/Log/LCLogger.cs | 11 +++ Realtime/Internal/Connection/LCConnection.cs | 68 +++++++++++------- Realtime/Internal/Connection/LCHeartBeat.cs | 9 +-- .../Controller/LCIMConversationController.cs | 2 +- .../Controller/LCIMGoAwayController.cs | 3 +- Realtime/Internal/Router/LCRTMRouter.cs | 21 +++++- .../Internal/WebSocket/LCWebSocketClient.cs | 69 +++++++++++-------- Realtime/LCIMClient.cs | 2 +- Storage/Internal/Http/LCHttpClient.cs | 2 +- 10 files changed, 122 insertions(+), 67 deletions(-) diff --git a/Common/AppRouter/LCAppRouter.cs b/Common/AppRouter/LCAppRouter.cs index 523b465..d7a0699 100644 --- a/Common/AppRouter/LCAppRouter.cs +++ b/Common/AppRouter/LCAppRouter.cs @@ -63,7 +63,7 @@ namespace LeanCloud.Common { Dictionary data = JsonConvert.DeserializeObject>(resultString); appServer = new LCAppServer(data); } catch (Exception e) { - LCLogger.Error(e.Message); + LCLogger.Error(e); // 拉取服务地址失败后,使用国际节点的默认服务地址 appServer = LCAppServer.GetInternalFallbackAppServer(appId); } diff --git a/Common/Log/LCLogger.cs b/Common/Log/LCLogger.cs index 63b917e..d54ec23 100644 --- a/Common/Log/LCLogger.cs +++ b/Common/Log/LCLogger.cs @@ -1,4 +1,5 @@ using System; +using System.Text; namespace LeanCloud.Common { /// @@ -36,5 +37,15 @@ namespace LeanCloud.Common { public static void Error(string format, params object[] args) { LogDelegate?.Invoke(LCLogLevel.Error, string.Format(format, args)); } + + public static void Error(Exception e) { + StringBuilder sb = new StringBuilder(); + sb.Append(e.GetType()); + sb.Append("\n"); + sb.Append(e.Message); + sb.Append("\n"); + sb.Append(e.StackTrace); + Error(sb.ToString()); + } } } diff --git a/Realtime/Internal/Connection/LCConnection.cs b/Realtime/Internal/Connection/LCConnection.cs index 6311b42..601d304 100644 --- a/Realtime/Internal/Connection/LCConnection.cs +++ b/Realtime/Internal/Connection/LCConnection.cs @@ -3,6 +3,7 @@ using System.Text; using System.Collections.Generic; using System.Threading.Tasks; using Google.Protobuf; +using LeanCloud.Realtime.Internal.Router; using LeanCloud.Realtime.Internal.WebSocket; using LeanCloud.Realtime.Protocol; using LeanCloud.Common; @@ -15,7 +16,11 @@ namespace LeanCloud.Realtime.Internal.Connection { internal class LCConnection { private const int SEND_TIMEOUT = 10000; - private const int MAX_RECONNECT_TIMES = 10; + private const int MAX_RECONNECT_TIMES = 3; + + private const int RECONNECT_INTERVAL = 5000; + + private const int HEART_BEAT_INTERVAL = 5000; internal Action OnNotification; @@ -25,24 +30,25 @@ namespace LeanCloud.Realtime.Internal.Connection { internal Action OnReconnected; - private LCHeartBeat heartBeat; - internal string id; private readonly Dictionary> responses; private int requestI = 1; + private LCRTMRouter router; + + private LCHeartBeat heartBeat; + private LCWebSocketClient client; internal LCConnection(string id) { this.id = id; responses = new Dictionary>(); - heartBeat = new LCHeartBeat(this, 10000, 10000, () => { - - }); - client = new LCWebSocketClient { - OnMessage = OnMessage, + heartBeat = new LCHeartBeat(this, HEART_BEAT_INTERVAL, HEART_BEAT_INTERVAL); + router = new LCRTMRouter(); + client = new LCWebSocketClient(router, heartBeat) { + OnMessage = OnClientMessage, OnDisconnect = OnClientDisconnect }; } @@ -51,6 +57,18 @@ namespace LeanCloud.Realtime.Internal.Connection { await client.Connect(); } + internal async Task Reset() { + router.Reset(); + await client.Close(); + heartBeat = new LCHeartBeat(this, HEART_BEAT_INTERVAL, HEART_BEAT_INTERVAL); + router = new LCRTMRouter(); + client = new LCWebSocketClient(router, heartBeat) { + OnMessage = OnClientMessage, + OnDisconnect = OnClientDisconnect + }; + await Reconnect(); + } + internal async Task SendRequest(GenericCommand request) { TaskCompletionSource tcs = new TaskCompletionSource(); request.I = requestI++; @@ -58,14 +76,14 @@ namespace LeanCloud.Realtime.Internal.Connection { LCLogger.Debug($"{id} => {FormatCommand(request)}"); byte[] bytes = request.ToByteArray(); Task sendTask = client.Send(bytes); - Task timeoutTask = Task.Delay(SEND_TIMEOUT); - try { - Task doneTask = await Task.WhenAny(sendTask, timeoutTask); - if (timeoutTask == doneTask) { - tcs.TrySetException(new TimeoutException("Send request")); + if (await Task.WhenAny(sendTask, Task.Delay(SEND_TIMEOUT)) == sendTask) { + try { + await sendTask; + } catch (Exception e) { + tcs.TrySetException(e); } - } catch (Exception e) { - tcs.TrySetException(e); + } else { + tcs.TrySetException(new TimeoutException("Send request")); } return await tcs.Task; } @@ -77,8 +95,7 @@ namespace LeanCloud.Realtime.Internal.Connection { await client.Close(); } - private void OnMessage(byte[] bytes) { - _ = heartBeat.Update(); + private void OnClientMessage(byte[] bytes) { try { GenericCommand command = GenericCommand.Parser.ParseFrom(bytes); LCLogger.Debug($"{id} <= {FormatCommand(command)}"); @@ -106,14 +123,14 @@ namespace LeanCloud.Realtime.Internal.Connection { OnNotification?.Invoke(command); } } catch (Exception e) { - LCLogger.Error(e.Message); + LCLogger.Error(e); } } private void OnClientDisconnect() { OnDisconnect?.Invoke(); OnReconnecting?.Invoke(); - // TODO 重连 + // 重连 _ = Reconnect(); } @@ -125,13 +142,14 @@ namespace LeanCloud.Realtime.Internal.Connection { try { LCLogger.Debug($"Reconnecting... {reconnectCount}"); await client.Connect(); + client.OnMessage = OnClientMessage; + client.OnDisconnect = OnClientDisconnect; break; } catch (Exception e) { reconnectCount++; - LCLogger.Error(e.Message); - int delay = 10; - LCLogger.Debug($"Reconnect after {delay}s"); - await Task.Delay(1000 * delay); + LCLogger.Error(e); + LCLogger.Debug($"Reconnect after {RECONNECT_INTERVAL}ms"); + await Task.Delay(RECONNECT_INTERVAL); } } if (reconnectCount < MAX_RECONNECT_TIMES) { @@ -140,8 +158,8 @@ namespace LeanCloud.Realtime.Internal.Connection { OnReconnected?.Invoke(); break; } else { - // TODO 重置连接 - client = new LCWebSocketClient(); + // 重置 Router,继续尝试重连 + router.Reset(); } } } diff --git a/Realtime/Internal/Connection/LCHeartBeat.cs b/Realtime/Internal/Connection/LCHeartBeat.cs index d2a3d0e..be26f5f 100644 --- a/Realtime/Internal/Connection/LCHeartBeat.cs +++ b/Realtime/Internal/Connection/LCHeartBeat.cs @@ -24,26 +24,22 @@ namespace LeanCloud.Realtime.Internal.Connection { /// private readonly int pongInterval; - private Action onTimeout; - private CancellationTokenSource pingCTS; private CancellationTokenSource pongCTS; internal LCHeartBeat(LCConnection connection, int pingInterval, - int pongInterval, - Action onTimeout) { + int pongInterval) { this.connection = connection; this.pingInterval = pingInterval; this.pongInterval = pongInterval; - this.onTimeout = onTimeout; } /// /// 更新心跳监听 /// /// - internal async Task Update() { + internal async Task Update(Action onTimeout) { LCLogger.Debug("HeartBeat update"); pingCTS?.Cancel(); pongCTS?.Cancel(); @@ -78,7 +74,6 @@ namespace LeanCloud.Realtime.Internal.Connection { /// 停止心跳监听 /// internal void Stop() { - onTimeout = null; pingCTS?.Cancel(); pongCTS?.Cancel(); } diff --git a/Realtime/Internal/Controller/LCIMConversationController.cs b/Realtime/Internal/Controller/LCIMConversationController.cs index 122bb53..cd3a355 100644 --- a/Realtime/Internal/Controller/LCIMConversationController.cs +++ b/Realtime/Internal/Controller/LCIMConversationController.cs @@ -451,7 +451,7 @@ namespace LeanCloud.Realtime.Internal.Controller { Data = where }; } catch (Exception e) { - LCLogger.Error(e.Message); + LCLogger.Error(e); } } command.ConvMessage = convMessage; diff --git a/Realtime/Internal/Controller/LCIMGoAwayController.cs b/Realtime/Internal/Controller/LCIMGoAwayController.cs index 3f6b669..81616f5 100644 --- a/Realtime/Internal/Controller/LCIMGoAwayController.cs +++ b/Realtime/Internal/Controller/LCIMGoAwayController.cs @@ -11,8 +11,7 @@ namespace LeanCloud.Realtime.Internal.Controller { internal override async Task OnNotification(GenericCommand notification) { // 清空缓存,断开连接,等待重新连接 - //Connection.Router.Reset(); - await Connection.Close(); + await Connection.Reset(); } #endregion diff --git a/Realtime/Internal/Router/LCRTMRouter.cs b/Realtime/Internal/Router/LCRTMRouter.cs index 3c0eebe..ad7d9f5 100644 --- a/Realtime/Internal/Router/LCRTMRouter.cs +++ b/Realtime/Internal/Router/LCRTMRouter.cs @@ -6,12 +6,21 @@ using LeanCloud.Common; using Newtonsoft.Json; namespace LeanCloud.Realtime.Internal.Router { + /// + /// RTM Router + /// internal class LCRTMRouter { + private const int REQUEST_TIMEOUT = 10000; + private LCRTMServer rtmServer; internal LCRTMRouter() { } + /// + /// 获取服务器地址 + /// + /// internal async Task GetServer() { if (rtmServer == null || !rtmServer.IsValid) { await Fetch(); @@ -19,6 +28,9 @@ namespace LeanCloud.Realtime.Internal.Router { return rtmServer; } + /// + /// 重置服务器地址缓存 + /// internal void Reset() { rtmServer = null; } @@ -33,9 +45,14 @@ namespace LeanCloud.Realtime.Internal.Router { }; HttpClient client = new HttpClient(); LCHttpUtils.PrintRequest(client, request); - HttpResponseMessage response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead); - request.Dispose(); + Task requestTask = client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead); + if (await Task.WhenAny(requestTask, Task.Delay(REQUEST_TIMEOUT)) != requestTask) { + throw new TimeoutException("Request timeout."); + } + + HttpResponseMessage response = await requestTask; + request.Dispose(); string resultString = await response.Content.ReadAsStringAsync(); response.Dispose(); LCHttpUtils.PrintResponse(response, resultString); diff --git a/Realtime/Internal/WebSocket/LCWebSocketClient.cs b/Realtime/Internal/WebSocket/LCWebSocketClient.cs index c8f2055..c12bbe8 100644 --- a/Realtime/Internal/WebSocket/LCWebSocketClient.cs +++ b/Realtime/Internal/WebSocket/LCWebSocketClient.cs @@ -3,6 +3,7 @@ using System.Threading.Tasks; using System.Net.WebSockets; using LeanCloud.Common; using LeanCloud.Realtime.Internal.Router; +using LeanCloud.Realtime.Internal.Connection; namespace LeanCloud.Realtime.Internal.WebSocket { /// @@ -14,6 +15,8 @@ namespace LeanCloud.Realtime.Internal.WebSocket { private const int CLOSE_TIMEOUT = 5000; + private const int CONNECT_TIMEOUT = 10000; + internal Action OnMessage; internal Action OnDisconnect; @@ -24,28 +27,35 @@ namespace LeanCloud.Realtime.Internal.WebSocket { private readonly LCRTMRouter router; - internal LCWebSocketClient() { - router = new LCRTMRouter(); + private readonly LCHeartBeat heartBeat; + + internal LCWebSocketClient(LCRTMRouter router, LCHeartBeat heartBeat) { + this.router = router; + this.heartBeat = heartBeat; } internal async Task Connect() { - LCRTMServer rtmServer = await router.GetServer(); try { - LCLogger.Debug($"Primary Server"); - await Connect(rtmServer.Primary); + LCRTMServer rtmServer = await router.GetServer(); + try { + LCLogger.Debug($"Primary Server"); + await Connect(rtmServer.Primary); + } catch (Exception e) { + LCLogger.Error(e); + LCLogger.Debug($"Secondary Server"); + await Connect(rtmServer.Secondary); + } } catch (Exception e) { - LCLogger.Error(e.Message); - LCLogger.Debug($"Secondary Server"); - await Connect(rtmServer.Secondary); + throw e; } - + // 接收 _ = StartReceive(); } private async Task Connect(string server) { LCLogger.Debug($"Connecting WebSocket: {server}"); - Task timeoutTask = Task.Delay(5000); + Task timeoutTask = Task.Delay(CONNECT_TIMEOUT); ws = new ClientWebSocket(); ws.Options.AddSubProtocol("lc.protobuf2.3"); Task connectTask = ws.ConnectAsync(new Uri(server), default); @@ -61,14 +71,15 @@ namespace LeanCloud.Realtime.Internal.WebSocket { OnMessage = null; OnDisconnect = null; OnReconnect = null; + heartBeat.Stop(); try { // 发送关闭帧可能会很久,所以增加超时 // 主动挥手关闭,不会再收到 Close Frame - Task closeTask = ws.CloseAsync(WebSocketCloseStatus.NormalClosure, "", default); + Task closeTask = ws.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, default); Task delayTask = Task.Delay(CLOSE_TIMEOUT); await Task.WhenAny(closeTask, delayTask); } catch (Exception e) { - LCLogger.Error(e.Message); + LCLogger.Error(e); } finally { ws.Abort(); ws.Dispose(); @@ -82,7 +93,7 @@ namespace LeanCloud.Realtime.Internal.WebSocket { try { await ws.SendAsync(bytes, WebSocketMessageType.Binary, true, default); } catch (Exception e) { - LCLogger.Error(e.Message); + LCLogger.Error(e); throw e; } } else { @@ -99,16 +110,22 @@ namespace LeanCloud.Realtime.Internal.WebSocket { WebSocketReceiveResult result = await ws.ReceiveAsync(new ArraySegment(buffer), default); if (result.MessageType == WebSocketMessageType.Close) { // 由服务端发起关闭 - LCLogger.Debug($"Receive Closed: {result.CloseStatusDescription}"); - try { - // 挥手关闭 - await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, default); - } catch (Exception ex) { - LCLogger.Error(ex.Message); - } finally { - HandleClose(); + LCLogger.Debug($"Receive Closed: {result.CloseStatus}"); + LCLogger.Debug($"ws state: {ws.State}"); + // 这里有可能是客户端主动关闭,也有可能是服务端主动关闭 + if (ws.State == WebSocketState.CloseReceived) { + // 如果是服务端主动关闭,则挥手关闭,并认为是断线 + try { + await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, default); + } catch (Exception e) { + LCLogger.Error(e); + } finally { + // TODO 正常关闭不知道能否完成??? + OnDisconnect?.Invoke(); + } } } else if (result.MessageType == WebSocketMessageType.Binary) { + _ = heartBeat.Update(HandleClose); // 拼合 WebSocket Message int length = result.Count; byte[] data = new byte[length]; @@ -120,20 +137,18 @@ namespace LeanCloud.Realtime.Internal.WebSocket { } } catch (Exception e) { // 客户端网络异常 - LCLogger.Error(e.Message); - LCLogger.Debug($"WebSocket State: {ws.State}"); - HandleClose(); + LCLogger.Error(e); + OnDisconnect?.Invoke(); } } private void HandleClose() { try { + heartBeat.Stop(); ws.Abort(); ws.Dispose(); } catch (Exception e) { - LCLogger.Error(e.Message); - } finally { - OnDisconnect?.Invoke(); + LCLogger.Error(e); } } } diff --git a/Realtime/LCIMClient.cs b/Realtime/LCIMClient.cs index 177b3ca..24c4056 100644 --- a/Realtime/LCIMClient.cs +++ b/Realtime/LCIMClient.cs @@ -455,7 +455,7 @@ namespace LeanCloud.Realtime { // 回调用户 OnReconnected?.Invoke(); } catch (Exception e) { - LCLogger.Error(e.Message); + LCLogger.Error(e); await Connection.Close(); OnReconnectError?.Invoke(); } diff --git a/Storage/Internal/Http/LCHttpClient.cs b/Storage/Internal/Http/LCHttpClient.cs index 9351f0d..e8bb135 100644 --- a/Storage/Internal/Http/LCHttpClient.cs +++ b/Storage/Internal/Http/LCHttpClient.cs @@ -165,7 +165,7 @@ namespace LeanCloud.Storage.Internal.Http { code = (int)error["code"]; message = error["error"].ToString(); } catch (Exception e) { - LCLogger.Error(e.Message); + LCLogger.Error(e); } return new LCException(code, message); }