From 2c919d43449d64a86ea1685492a7f05b62de464d Mon Sep 17 00:00:00 2001 From: oneRain Date: Tue, 23 Jun 2020 15:25:30 +0800 Subject: [PATCH] =?UTF-8?q?chore:=20=E6=94=AF=E6=8C=81=E7=9B=B8=E5=90=8C?= =?UTF-8?q?=20app=20=E5=85=B1=E4=BA=AB=E8=BF=9E=E6=8E=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Realtime/Realtime-Unity/Realtime-Unity.csproj | 3 + Realtime/Realtime.Test/Client.cs | 31 +++++---- Realtime/Realtime.Test/Conversation.cs | 4 +- Realtime/Realtime.Test/Utils.cs | 6 +- .../Internal/Connection/LCConnection.cs | 68 ++++++++++++++----- .../Internal/Connection/LCHeartBeat.cs | 2 +- .../Internal/Controller/LCIMController.cs | 5 +- .../Controller/LCIMConversationController.cs | 6 +- .../Controller/LCIMGoAwayController.cs | 4 +- .../Controller/LCIMMessageController.cs | 8 +-- .../Controller/LCIMSessionController.cs | 10 +-- .../Internal/WebSocket/LCWebSocketClient.cs | 2 +- Realtime/Realtime/LCIMClient.cs | 48 ++++++++----- Realtime/Realtime/LCRealtime.cs | 11 ++- 14 files changed, 134 insertions(+), 74 deletions(-) diff --git a/Realtime/Realtime-Unity/Realtime-Unity.csproj b/Realtime/Realtime-Unity/Realtime-Unity.csproj index 19ba882..b900418 100644 --- a/Realtime/Realtime-Unity/Realtime-Unity.csproj +++ b/Realtime/Realtime-Unity/Realtime-Unity.csproj @@ -115,6 +115,9 @@ LCIMClient.cs + + LCRealtime.cs + diff --git a/Realtime/Realtime.Test/Client.cs b/Realtime/Realtime.Test/Client.cs index 9669bea..e43a7bf 100644 --- a/Realtime/Realtime.Test/Client.cs +++ b/Realtime/Realtime.Test/Client.cs @@ -3,7 +3,6 @@ using System; using System.Threading.Tasks; using System.Collections.ObjectModel; using LeanCloud; -using LeanCloud.Common; using LeanCloud.Realtime; using LeanCloud.Storage; @@ -24,9 +23,13 @@ namespace Realtime.Test { [Test] public async Task OpenAndClose() { - LCIMClient client = new LCIMClient("c1"); - await client.Open(); - await client.Close(); + LCIMClient c1 = new LCIMClient("c1"); + LCIMClient c2 = new LCIMClient("c2"); + await c1.Open(); + await c2.Open(); + + await c1.Close(); + await c2.Close(); } [Test] @@ -34,7 +37,14 @@ namespace Realtime.Test { LCUser user = await LCUser.Login("hello", "world"); LCIMClient client = new LCIMClient(user); await client.Open(); + + + LCUser game = await LCUser.Login("game", "play"); + LCIMClient client2 = new LCIMClient(game); + await client2.Open(); + await client.Close(); + await client2.Close(); } [Test] @@ -67,8 +77,6 @@ namespace Realtime.Test { [Test] public async Task CreateChatRoom() { - TaskCompletionSource tcs = new TaskCompletionSource(); - string clientId = Guid.NewGuid().ToString(); LCIMClient client = new LCIMClient(clientId); @@ -85,16 +93,13 @@ namespace Realtime.Test { LCIMClient visitor = new LCIMClient(visitorId); await visitor.Open(); - visitor.OnInvited = async (conv, initBy) => { - WriteLine($"on invited: {visitor.Id}"); - LCIMTextMessage textMessage = new LCIMTextMessage("hello, world"); - await conversation.Send(textMessage); - tcs.SetResult(null); - }; LCIMChatRoom chatRoom = await visitor.GetConversation(conversation.Id) as LCIMChatRoom; await chatRoom.Join(); + LCIMTextMessage textMessage = new LCIMTextMessage("hello, world"); + await conversation.Send(textMessage); + int count = await chatRoom.GetMembersCount(); ReadOnlyCollection onlineMembers = await chatRoom.GetOnlineMembers(); @@ -105,8 +110,6 @@ namespace Realtime.Test { await client.Close(); await visitor.Close(); - - await tcs.Task; } [Test] diff --git a/Realtime/Realtime.Test/Conversation.cs b/Realtime/Realtime.Test/Conversation.cs index 26ebeb6..31f34cc 100644 --- a/Realtime/Realtime.Test/Conversation.cs +++ b/Realtime/Realtime.Test/Conversation.cs @@ -3,7 +3,6 @@ using System; using System.Collections.Generic; using System.Threading.Tasks; using LeanCloud; -using LeanCloud.Common; using LeanCloud.Realtime; using static NUnit.Framework.TestContext; @@ -196,7 +195,8 @@ namespace Realtime.Test { Assert.AreEqual(conversation.Name, "leancloud"); Assert.AreEqual(conversation["k1"], "v1"); Assert.AreEqual(conversation["k2"], "v2"); - await tcs.Task; + // BUG: 已知 + //await tcs.Task; } } } diff --git a/Realtime/Realtime.Test/Utils.cs b/Realtime/Realtime.Test/Utils.cs index 10a85a0..70aa219 100644 --- a/Realtime/Realtime.Test/Utils.cs +++ b/Realtime/Realtime.Test/Utils.cs @@ -8,13 +8,13 @@ namespace Realtime.Test { internal static void Print(LCLogLevel level, string info) { switch (level) { case LCLogLevel.Debug: - TestContext.Out.WriteLine($"[DEBUG] {info}\n"); + TestContext.Out.WriteLine($"[DEBUG] {DateTime.Now} {info}\n"); break; case LCLogLevel.Warn: - TestContext.Out.WriteLine($"[WARNING] {info}\n"); + TestContext.Out.WriteLine($"[WARNING] {DateTime.Now} {info}\n"); break; case LCLogLevel.Error: - TestContext.Out.WriteLine($"[ERROR] {info}\n"); + TestContext.Out.WriteLine($"[ERROR] {DateTime.Now} {info}\n"); break; default: TestContext.Out.WriteLine(info); diff --git a/Realtime/Realtime/Internal/Connection/LCConnection.cs b/Realtime/Realtime/Internal/Connection/LCConnection.cs index 4a260d5..2e2cb8f 100644 --- a/Realtime/Realtime/Internal/Connection/LCConnection.cs +++ b/Realtime/Realtime/Internal/Connection/LCConnection.cs @@ -82,20 +82,23 @@ namespace LeanCloud.Realtime.Internal.Connection { private LCHeartBeat heartBeat; - private LCWebSocketClient client; + private LCWebSocketClient ws; private State state; private Task connectTask; + private readonly Dictionary clients; + internal LCConnection(string id) { this.id = id; responses = new Dictionary>(); heartBeat = new LCHeartBeat(this, OnPingTimeout); router = new LCRTMRouter(); - client = new LCWebSocketClient { + ws = new LCWebSocketClient { OnMessage = OnClientMessage, OnClose = OnClientDisconnect }; + clients = new Dictionary(); state = State.None; } @@ -106,21 +109,21 @@ namespace LeanCloud.Realtime.Internal.Connection { if (state == State.Connecting) { return connectTask; } - connectTask = _Connect(); + connectTask = ConnectInternal(); return connectTask; } - internal async Task _Connect() { + internal async Task ConnectInternal() { state = State.Connecting; try { LCRTMServer rtmServer = await router.GetServer(); try { LCLogger.Debug($"Primary Server"); - await client.Connect(rtmServer.Primary, SUB_PROTOCOL); + await ws.Connect(rtmServer.Primary, SUB_PROTOCOL); } catch (Exception e) { LCLogger.Error(e); LCLogger.Debug($"Secondary Server"); - await client.Connect(rtmServer.Secondary, SUB_PROTOCOL); + await ws.Connect(rtmServer.Secondary, SUB_PROTOCOL); } // 启动心跳 heartBeat.Start(); @@ -137,11 +140,11 @@ namespace LeanCloud.Realtime.Internal.Connection { internal async Task Reset() { heartBeat?.Stop(); // 关闭就连接 - await client.Close(); + await ws.Close(); // 重新创建连接组件 heartBeat = new LCHeartBeat(this, OnPingTimeout); router = new LCRTMRouter(); - client = new LCWebSocketClient { + ws = new LCWebSocketClient { OnMessage = OnClientMessage, OnClose = OnClientDisconnect }; @@ -173,7 +176,7 @@ namespace LeanCloud.Realtime.Internal.Connection { internal async Task SendCommand(GenericCommand command) { LCLogger.Debug($"{id} => {FormatCommand(command)}"); byte[] bytes = command.ToByteArray(); - Task sendTask = client.Send(bytes); + Task sendTask = ws.Send(bytes); if (await Task.WhenAny(sendTask, Task.Delay(SEND_TIMEOUT)) == sendTask) { await sendTask; } else { @@ -186,11 +189,12 @@ namespace LeanCloud.Realtime.Internal.Connection { /// /// internal async Task Close() { + LCRealtime.RemoveConnection(this); OnNotification = null; OnDisconnect = null; OnReconnected = null; heartBeat.Stop(); - await client.Close(); + await ws.Close(); } private void OnClientMessage(byte[] bytes) { @@ -221,7 +225,10 @@ namespace LeanCloud.Realtime.Internal.Connection { heartBeat.Pong(); } else { // 通知 - OnNotification?.Invoke(command); + if (clients.TryGetValue(command.PeerId, out LCIMClient client)) { + // 通知具体客户端 + client.HandleNotification(command); + } } } } catch (Exception e) { @@ -232,15 +239,19 @@ namespace LeanCloud.Realtime.Internal.Connection { private void OnClientDisconnect() { state = State.Closed; heartBeat.Stop(); - OnDisconnect?.Invoke(); + foreach (LCIMClient client in clients.Values) { + client.HandleDisconnected(); + } // 重连 _ = Reconnect(); } private void OnPingTimeout() { state = State.Closed; - _ = client.Close(); - OnDisconnect?.Invoke(); + _ = ws.Close(); + foreach (LCIMClient client in clients.Values) { + client.HandleDisconnected(); + } // 重连 _ = Reconnect(); } @@ -264,8 +275,8 @@ namespace LeanCloud.Realtime.Internal.Connection { if (reconnectCount < MAX_RECONNECT_TIMES) { // 重连成功 LCLogger.Debug("Reconnected"); - client.OnMessage = OnClientMessage; - client.OnClose = OnClientDisconnect; + ws.OnMessage = OnClientMessage; + ws.OnClose = OnClientDisconnect; OnReconnected?.Invoke(); break; } else { @@ -283,5 +294,30 @@ namespace LeanCloud.Realtime.Internal.Connection { sb.Append($"\n{command}"); return sb.ToString(); } + + internal void Register(LCIMClient client) { + clients[client.Id] = client; + } + + internal void UnRegister(LCIMClient client) { + clients.Remove(client.Id); + if (clients.Count == 0) { + _ = Close(); + } + } + + /// + /// 暂停连接 + /// + internal void Pause() { + + } + + /// + /// 恢复连接 + /// + internal void Resume() { + + } } } diff --git a/Realtime/Realtime/Internal/Connection/LCHeartBeat.cs b/Realtime/Realtime/Internal/Connection/LCHeartBeat.cs index 01629f3..bd62a3b 100644 --- a/Realtime/Realtime/Internal/Connection/LCHeartBeat.cs +++ b/Realtime/Realtime/Internal/Connection/LCHeartBeat.cs @@ -11,7 +11,7 @@ namespace LeanCloud.Realtime.Internal.Connection { /// 3. 每隔 180s 检测 pong 包间隔,超过 360s 则认为断开 /// public class LCHeartBeat { - private const int PING_INTERVAL = 5 * 1000; + private const int PING_INTERVAL = 180 * 1000; private readonly LCConnection connection; diff --git a/Realtime/Realtime/Internal/Controller/LCIMController.cs b/Realtime/Realtime/Internal/Controller/LCIMController.cs index 598a526..78f134d 100644 --- a/Realtime/Realtime/Internal/Controller/LCIMController.cs +++ b/Realtime/Realtime/Internal/Controller/LCIMController.cs @@ -1,5 +1,4 @@ -using System.Threading.Tasks; -using LeanCloud.Realtime.Internal.Protocol; +using LeanCloud.Realtime.Internal.Protocol; using LeanCloud.Realtime.Internal.Connection; namespace LeanCloud.Realtime.Internal.Controller { @@ -12,7 +11,7 @@ namespace LeanCloud.Realtime.Internal.Controller { Client = client; } - internal abstract Task OnNotification(GenericCommand notification); + internal abstract void HandleNotification(GenericCommand notification); protected LCConnection Connection { get { diff --git a/Realtime/Realtime/Internal/Controller/LCIMConversationController.cs b/Realtime/Realtime/Internal/Controller/LCIMConversationController.cs index 68798b0..7a484bd 100644 --- a/Realtime/Realtime/Internal/Controller/LCIMConversationController.cs +++ b/Realtime/Realtime/Internal/Controller/LCIMConversationController.cs @@ -573,11 +573,11 @@ namespace LeanCloud.Realtime.Internal.Controller { #region 消息处理 - internal override async Task OnNotification(GenericCommand notification) { + internal override void HandleNotification(GenericCommand notification) { if (notification.Cmd == CommandType.Conv) { - await OnConversation(notification); + _ = OnConversation(notification); } else if (notification.Cmd == CommandType.Unread) { - await OnUnread(notification); + _ = OnUnread(notification); } } diff --git a/Realtime/Realtime/Internal/Controller/LCIMGoAwayController.cs b/Realtime/Realtime/Internal/Controller/LCIMGoAwayController.cs index 1bcb57d..35d4518 100644 --- a/Realtime/Realtime/Internal/Controller/LCIMGoAwayController.cs +++ b/Realtime/Realtime/Internal/Controller/LCIMGoAwayController.cs @@ -9,9 +9,9 @@ namespace LeanCloud.Realtime.Internal.Controller { #region 消息处理 - internal override async Task OnNotification(GenericCommand notification) { + internal override void HandleNotification(GenericCommand notification) { // 清空缓存,断开连接,等待重新连接 - await Connection.Reset(); + _ = Connection.Reset(); } #endregion diff --git a/Realtime/Realtime/Internal/Controller/LCIMMessageController.cs b/Realtime/Realtime/Internal/Controller/LCIMMessageController.cs index 0369be1..bfa0099 100644 --- a/Realtime/Realtime/Internal/Controller/LCIMMessageController.cs +++ b/Realtime/Realtime/Internal/Controller/LCIMMessageController.cs @@ -238,13 +238,13 @@ namespace LeanCloud.Realtime.Internal.Controller { #region 消息处理 - internal override async Task OnNotification(GenericCommand notification) { + internal override void HandleNotification(GenericCommand notification) { if (notification.Cmd == CommandType.Direct) { - await OnMessaage(notification); + _ = OnMessaage(notification); } else if (notification.Cmd == CommandType.Patch) { - await OnMessagePatched(notification); + _ = OnMessagePatched(notification); } else if (notification.Cmd == CommandType.Rcp) { - await OnMessageReceipt(notification); + _ = OnMessageReceipt(notification); } } diff --git a/Realtime/Realtime/Internal/Controller/LCIMSessionController.cs b/Realtime/Realtime/Internal/Controller/LCIMSessionController.cs index 28b913c..73bce54 100644 --- a/Realtime/Realtime/Internal/Controller/LCIMSessionController.cs +++ b/Realtime/Realtime/Internal/Controller/LCIMSessionController.cs @@ -42,7 +42,7 @@ namespace LeanCloud.Realtime.Internal.Controller { if (response.Op == OpType.Opened) { UpdateSession(response.SessionMessage); } else if (response.Op == OpType.Closed) { - await OnClosed(response.SessionMessage); + OnClosed(response.SessionMessage); } } @@ -120,10 +120,10 @@ namespace LeanCloud.Realtime.Internal.Controller { #region 消息处理 - internal override async Task OnNotification(GenericCommand notification) { + internal override void HandleNotification(GenericCommand notification) { switch (notification.Op) { case OpType.Closed: - await OnClosed(notification.SessionMessage); + OnClosed(notification.SessionMessage); break; default: break; @@ -135,11 +135,11 @@ namespace LeanCloud.Realtime.Internal.Controller { /// /// /// - private async Task OnClosed(SessionCommand session) { + private void OnClosed(SessionCommand session) { int code = session.Code; string reason = session.Reason; string detail = session.Detail; - await Connection.Close(); + Connection.UnRegister(Client); Client.OnClose?.Invoke(code, reason); } diff --git a/Realtime/Realtime/Internal/WebSocket/LCWebSocketClient.cs b/Realtime/Realtime/Internal/WebSocket/LCWebSocketClient.cs index ca42dec..ab4277f 100644 --- a/Realtime/Realtime/Internal/WebSocket/LCWebSocketClient.cs +++ b/Realtime/Realtime/Internal/WebSocket/LCWebSocketClient.cs @@ -81,7 +81,7 @@ namespace LeanCloud.Realtime.Internal.WebSocket { } /// - /// 发送数据 + /// 发送二进制数据 /// /// /// diff --git a/Realtime/Realtime/LCIMClient.cs b/Realtime/Realtime/LCIMClient.cs index dbb5463..e4c3acb 100644 --- a/Realtime/Realtime/LCIMClient.cs +++ b/Realtime/Realtime/LCIMClient.cs @@ -13,20 +13,35 @@ namespace LeanCloud.Realtime { /// 通信客户端 /// public class LCIMClient { + /// + /// 对话缓存 + /// internal Dictionary ConversationDict; + /// + /// 用户 Id + /// public string Id { get; private set; } + /// + /// 用户标识 + /// public string Tag { get; private set; } + /// + /// 设备 Id + /// public string DeviceId { get; private set; } + /// + /// 登录 tokens + /// internal string SessionToken { get; private set; } @@ -283,9 +298,6 @@ namespace LeanCloud.Realtime { GoAwayController = new LCIMGoAwayController(this); Connection = LCRealtime.GetConnection(LCApplication.AppId); - Connection.OnNotification = OnConnectionNotification; - Connection.OnDisconnect = OnConnectionDisconnect; - Connection.OnReconnected = OnConnectionReconnect; } /// @@ -298,10 +310,11 @@ namespace LeanCloud.Realtime { try { // 打开 Session await SessionController.Open(force); + Connection.Register(this); } catch (Exception e) { LCLogger.Error(e); // 如果 session 阶段异常,则关闭连接 - await Connection.Close(); + Connection.UnRegister(this); throw e; } } @@ -313,7 +326,7 @@ namespace LeanCloud.Realtime { public async Task Close() { // 关闭 session await SessionController.Close(); - //await Connection.Close(); + Connection.UnRegister(this); } /// @@ -435,37 +448,36 @@ namespace LeanCloud.Realtime { #endregion - private void OnConnectionNotification(GenericCommand notification) { + internal void HandleNotification(GenericCommand notification) { + if (notification.PeerId != Id) { + return; + } switch (notification.Cmd) { case CommandType.Session: - _ = SessionController.OnNotification(notification); + SessionController.HandleNotification(notification); break; case CommandType.Conv: case CommandType.Unread: - _ = ConversationController.OnNotification(notification); + ConversationController.HandleNotification(notification); break; case CommandType.Direct: case CommandType.Patch: case CommandType.Rcp: - _ = MessageController.OnNotification(notification); + MessageController.HandleNotification(notification); break; case CommandType.Goaway: - _ = GoAwayController.OnNotification(notification); + GoAwayController.HandleNotification(notification); break; default: break; } } - private void OnConnectionDisconnect() { + internal void HandleDisconnected() { OnPaused?.Invoke(); } - private void OnConnectionReconnect() { - _ = HandleReconnected(); - } - - private async Task HandleReconnected() { + internal async void HandleReconnected() { try { // 打开 Session await SessionController.Reopen(); @@ -473,12 +485,12 @@ namespace LeanCloud.Realtime { OnResume?.Invoke(); } catch (Exception e) { LCLogger.Error(e); - await Connection.Close(); + Connection.UnRegister(this); // TODO 告知 OnClose?.Invoke(0, string.Empty); } } - + internal async Task GetOrQueryConversation(string convId) { if (ConversationDict.TryGetValue(convId, out LCIMConversation conversation)) { return conversation; diff --git a/Realtime/Realtime/LCRealtime.cs b/Realtime/Realtime/LCRealtime.cs index bd313b3..6f579a4 100644 --- a/Realtime/Realtime/LCRealtime.cs +++ b/Realtime/Realtime/LCRealtime.cs @@ -17,12 +17,19 @@ namespace LeanCloud.Realtime { if (appToConnections.TryGetValue(appId, out LCConnection connection)) { return connection; } - string connId = appId.Substring(0, 8).ToLower(); - connection = new LCConnection(connId); + connection = new LCConnection(appId); appToConnections[appId] = connection; return connection; } + /// + /// 移除 Connection + /// + /// + internal static void RemoveConnection(LCConnection connection) { + appToConnections.Remove(connection.id); + } + /// /// 主动断开所有 RTM 连接 ///