diff --git a/Realtime/Realtime-Unity/Realtime-Unity.csproj b/Realtime/Realtime-Unity/Realtime-Unity.csproj index 19ba882..7e1a4a8 100644 --- a/Realtime/Realtime-Unity/Realtime-Unity.csproj +++ b/Realtime/Realtime-Unity/Realtime-Unity.csproj @@ -52,9 +52,6 @@ Internal\Controller\LCIMConversationController.cs - - Internal\Controller\LCIMGoAwayController.cs - Internal\Router\LCRTMServer.cs @@ -115,6 +112,9 @@ LCIMClient.cs + + LCRealtime.cs + diff --git a/Realtime/Realtime.Test/Client.cs b/Realtime/Realtime.Test/Client.cs index 9669bea..e43a7bf 100644 --- a/Realtime/Realtime.Test/Client.cs +++ b/Realtime/Realtime.Test/Client.cs @@ -3,7 +3,6 @@ using System; using System.Threading.Tasks; using System.Collections.ObjectModel; using LeanCloud; -using LeanCloud.Common; using LeanCloud.Realtime; using LeanCloud.Storage; @@ -24,9 +23,13 @@ namespace Realtime.Test { [Test] public async Task OpenAndClose() { - LCIMClient client = new LCIMClient("c1"); - await client.Open(); - await client.Close(); + LCIMClient c1 = new LCIMClient("c1"); + LCIMClient c2 = new LCIMClient("c2"); + await c1.Open(); + await c2.Open(); + + await c1.Close(); + await c2.Close(); } [Test] @@ -34,7 +37,14 @@ namespace Realtime.Test { LCUser user = await LCUser.Login("hello", "world"); LCIMClient client = new LCIMClient(user); await client.Open(); + + + LCUser game = await LCUser.Login("game", "play"); + LCIMClient client2 = new LCIMClient(game); + await client2.Open(); + await client.Close(); + await client2.Close(); } [Test] @@ -67,8 +77,6 @@ namespace Realtime.Test { [Test] public async Task CreateChatRoom() { - TaskCompletionSource tcs = new TaskCompletionSource(); - string clientId = Guid.NewGuid().ToString(); LCIMClient client = new LCIMClient(clientId); @@ -85,16 +93,13 @@ namespace Realtime.Test { LCIMClient visitor = new LCIMClient(visitorId); await visitor.Open(); - visitor.OnInvited = async (conv, initBy) => { - WriteLine($"on invited: {visitor.Id}"); - LCIMTextMessage textMessage = new LCIMTextMessage("hello, world"); - await conversation.Send(textMessage); - tcs.SetResult(null); - }; LCIMChatRoom chatRoom = await visitor.GetConversation(conversation.Id) as LCIMChatRoom; await chatRoom.Join(); + LCIMTextMessage textMessage = new LCIMTextMessage("hello, world"); + await conversation.Send(textMessage); + int count = await chatRoom.GetMembersCount(); ReadOnlyCollection onlineMembers = await chatRoom.GetOnlineMembers(); @@ -105,8 +110,6 @@ namespace Realtime.Test { await client.Close(); await visitor.Close(); - - await tcs.Task; } [Test] diff --git a/Realtime/Realtime.Test/Conversation.cs b/Realtime/Realtime.Test/Conversation.cs index 26ebeb6..31f34cc 100644 --- a/Realtime/Realtime.Test/Conversation.cs +++ b/Realtime/Realtime.Test/Conversation.cs @@ -3,7 +3,6 @@ using System; using System.Collections.Generic; using System.Threading.Tasks; using LeanCloud; -using LeanCloud.Common; using LeanCloud.Realtime; using static NUnit.Framework.TestContext; @@ -196,7 +195,8 @@ namespace Realtime.Test { Assert.AreEqual(conversation.Name, "leancloud"); Assert.AreEqual(conversation["k1"], "v1"); Assert.AreEqual(conversation["k2"], "v2"); - await tcs.Task; + // BUG: 已知 + //await tcs.Task; } } } diff --git a/Realtime/Realtime.Test/Utils.cs b/Realtime/Realtime.Test/Utils.cs index 10a85a0..70aa219 100644 --- a/Realtime/Realtime.Test/Utils.cs +++ b/Realtime/Realtime.Test/Utils.cs @@ -8,13 +8,13 @@ namespace Realtime.Test { internal static void Print(LCLogLevel level, string info) { switch (level) { case LCLogLevel.Debug: - TestContext.Out.WriteLine($"[DEBUG] {info}\n"); + TestContext.Out.WriteLine($"[DEBUG] {DateTime.Now} {info}\n"); break; case LCLogLevel.Warn: - TestContext.Out.WriteLine($"[WARNING] {info}\n"); + TestContext.Out.WriteLine($"[WARNING] {DateTime.Now} {info}\n"); break; case LCLogLevel.Error: - TestContext.Out.WriteLine($"[ERROR] {info}\n"); + TestContext.Out.WriteLine($"[ERROR] {DateTime.Now} {info}\n"); break; default: TestContext.Out.WriteLine(info); diff --git a/Realtime/Realtime/Internal/Connection/LCConnection.cs b/Realtime/Realtime/Internal/Connection/LCConnection.cs index 565d266..3b797ac 100644 --- a/Realtime/Realtime/Internal/Connection/LCConnection.cs +++ b/Realtime/Realtime/Internal/Connection/LCConnection.cs @@ -12,6 +12,28 @@ namespace LeanCloud.Realtime.Internal.Connection { /// 连接层,只与数据协议相关 /// public class LCConnection { + /// + /// 连接状态 + /// + enum State { + /// + /// 初始状态 + /// + None, + /// + /// 连接中 + /// + Connecting, + /// + /// 连接成功 + /// + Open, + /// + /// 关闭的 + /// + Closed, + } + /// /// 发送超时 /// @@ -32,21 +54,6 @@ namespace LeanCloud.Realtime.Internal.Connection { /// private const string SUB_PROTOCOL = "lc.protobuf2.3"; - /// - /// 通知事件 - /// - internal Action OnNotification; - - /// - /// 断线事件 - /// - internal Action OnDisconnect; - - /// - /// 重连成功事件 - /// - internal Action OnReconnected; - internal string id; /// @@ -60,55 +67,60 @@ namespace LeanCloud.Realtime.Internal.Connection { private LCHeartBeat heartBeat; - private LCWebSocketClient client; + private LCWebSocketClient ws; + + private State state; + // 可以在 connecting 状态时拿到 Task,并在重连成功后继续操作 + private Task connectTask; + + // 共享这条连接的 IM Client + private readonly Dictionary idToClients; internal LCConnection(string id) { this.id = id; responses = new Dictionary>(); - heartBeat = new LCHeartBeat(this, OnPingTimeout); + heartBeat = new LCHeartBeat(this, OnDisconnect); router = new LCRTMRouter(); - client = new LCWebSocketClient { - OnMessage = OnClientMessage, - OnClose = OnClientDisconnect + ws = new LCWebSocketClient { + OnMessage = OnMessage, + OnClose = OnDisconnect }; + idToClients = new Dictionary(); + state = State.None; } - internal async Task Connect() { + internal Task Connect() { + if (state == State.Open) { + return Task.FromResult(null); + } + if (state == State.Connecting) { + return connectTask; + } + connectTask = ConnectInternal(); + return connectTask; + } + + internal async Task ConnectInternal() { + state = State.Connecting; try { LCRTMServer rtmServer = await router.GetServer(); try { LCLogger.Debug($"Primary Server"); - await client.Connect(rtmServer.Primary, SUB_PROTOCOL); + await ws.Connect(rtmServer.Primary, SUB_PROTOCOL); } catch (Exception e) { LCLogger.Error(e); LCLogger.Debug($"Secondary Server"); - await client.Connect(rtmServer.Secondary, SUB_PROTOCOL); + await ws.Connect(rtmServer.Secondary, SUB_PROTOCOL); } // 启动心跳 heartBeat.Start(); + state = State.Open; } catch (Exception e) { + state = State.Closed; throw e; } } - /// - /// 重置连接 - /// - /// - internal async Task Reset() { - heartBeat?.Stop(); - // 关闭就连接 - await client.Close(); - // 重新创建连接组件 - heartBeat = new LCHeartBeat(this, OnPingTimeout); - router = new LCRTMRouter(); - client = new LCWebSocketClient { - OnMessage = OnClientMessage, - OnClose = OnClientDisconnect - }; - await Reconnect(); - } - /// /// 发送请求,会在收到应答后返回 /// @@ -134,7 +146,7 @@ namespace LeanCloud.Realtime.Internal.Connection { internal async Task SendCommand(GenericCommand command) { LCLogger.Debug($"{id} => {FormatCommand(command)}"); byte[] bytes = command.ToByteArray(); - Task sendTask = client.Send(bytes); + Task sendTask = ws.Send(bytes); if (await Task.WhenAny(sendTask, Task.Delay(SEND_TIMEOUT)) == sendTask) { await sendTask; } else { @@ -143,18 +155,22 @@ namespace LeanCloud.Realtime.Internal.Connection { } /// - /// 关闭连接 + /// 断开连接 /// - /// - internal async Task Close() { - OnNotification = null; - OnDisconnect = null; - OnReconnected = null; + private void Disconnect() { + state = State.Closed; heartBeat.Stop(); - await client.Close(); + _ = ws.Close(); + foreach (LCIMClient client in idToClients.Values) { + client.HandleDisconnected(); + } } - private void OnClientMessage(byte[] bytes) { + /// + /// 消息接收回调 + /// + /// + private void OnMessage(byte[] bytes) { try { GenericCommand command = GenericCommand.Parser.ParseFrom(bytes); LCLogger.Debug($"{id} <= {FormatCommand(command)}"); @@ -179,10 +195,17 @@ namespace LeanCloud.Realtime.Internal.Connection { } } else { if (command.Cmd == CommandType.Echo) { + // 心跳应答 heartBeat.Pong(); + } else if (command.Cmd == CommandType.Goaway) { + // 针对连接的消息 + Reset(); } else { // 通知 - OnNotification?.Invoke(command); + if (idToClients.TryGetValue(command.PeerId, out LCIMClient client)) { + // 通知具体客户端 + client.HandleNotification(command); + } } } } catch (Exception e) { @@ -190,17 +213,27 @@ namespace LeanCloud.Realtime.Internal.Connection { } } - private void OnClientDisconnect() { - heartBeat.Stop(); - OnDisconnect?.Invoke(); - // 重连 + /// + /// 连接断开回调 + /// + private void OnDisconnect() { + Disconnect(); _ = Reconnect(); } - private async void OnPingTimeout() { - await client.Close(); - OnDisconnect?.Invoke(); - // 重连 + /// + /// 重置连接 + /// + /// + internal void Reset() { + Disconnect(); + // 重新创建连接组件 + heartBeat = new LCHeartBeat(this, OnDisconnect); + router = new LCRTMRouter(); + ws = new LCWebSocketClient { + OnMessage = OnMessage, + OnClose = OnDisconnect + }; _ = Reconnect(); } @@ -223,9 +256,11 @@ namespace LeanCloud.Realtime.Internal.Connection { if (reconnectCount < MAX_RECONNECT_TIMES) { // 重连成功 LCLogger.Debug("Reconnected"); - client.OnMessage = OnClientMessage; - client.OnClose = OnClientDisconnect; - OnReconnected?.Invoke(); + ws.OnMessage = OnMessage; + ws.OnClose = OnDisconnect; + foreach (LCIMClient client in idToClients.Values) { + client.HandleReconnected(); + } break; } else { // 重置 Router,继续尝试重连 @@ -242,5 +277,31 @@ namespace LeanCloud.Realtime.Internal.Connection { sb.Append($"\n{command}"); return sb.ToString(); } + + internal void Register(LCIMClient client) { + idToClients[client.Id] = client; + } + + internal void UnRegister(LCIMClient client) { + idToClients.Remove(client.Id); + if (idToClients.Count == 0) { + Disconnect(); + LCRealtime.RemoveConnection(this); + } + } + + /// + /// 暂停连接 + /// + internal void Pause() { + Disconnect(); + } + + /// + /// 恢复连接 + /// + internal void Resume() { + _ = Reconnect(); + } } } diff --git a/Realtime/Realtime/Internal/Connection/LCHeartBeat.cs b/Realtime/Realtime/Internal/Connection/LCHeartBeat.cs index 01629f3..bd62a3b 100644 --- a/Realtime/Realtime/Internal/Connection/LCHeartBeat.cs +++ b/Realtime/Realtime/Internal/Connection/LCHeartBeat.cs @@ -11,7 +11,7 @@ namespace LeanCloud.Realtime.Internal.Connection { /// 3. 每隔 180s 检测 pong 包间隔,超过 360s 则认为断开 /// public class LCHeartBeat { - private const int PING_INTERVAL = 5 * 1000; + private const int PING_INTERVAL = 180 * 1000; private readonly LCConnection connection; diff --git a/Realtime/Realtime/Internal/Controller/LCIMController.cs b/Realtime/Realtime/Internal/Controller/LCIMController.cs index 598a526..1a6d933 100644 --- a/Realtime/Realtime/Internal/Controller/LCIMController.cs +++ b/Realtime/Realtime/Internal/Controller/LCIMController.cs @@ -1,5 +1,4 @@ -using System.Threading.Tasks; -using LeanCloud.Realtime.Internal.Protocol; +using LeanCloud.Realtime.Internal.Protocol; using LeanCloud.Realtime.Internal.Connection; namespace LeanCloud.Realtime.Internal.Controller { @@ -12,11 +11,11 @@ namespace LeanCloud.Realtime.Internal.Controller { Client = client; } - internal abstract Task OnNotification(GenericCommand notification); + internal abstract void HandleNotification(GenericCommand notification); protected LCConnection Connection { get { - return Client.Connection; + return LCRealtime.GetConnection(LCApplication.AppId); } } diff --git a/Realtime/Realtime/Internal/Controller/LCIMConversationController.cs b/Realtime/Realtime/Internal/Controller/LCIMConversationController.cs index 68798b0..b2f288f 100644 --- a/Realtime/Realtime/Internal/Controller/LCIMConversationController.cs +++ b/Realtime/Realtime/Internal/Controller/LCIMConversationController.cs @@ -125,7 +125,7 @@ namespace LeanCloud.Realtime.Internal.Controller { }; GenericCommand request = NewCommand(CommandType.Conv, OpType.Update); request.ConvMessage = conv; - GenericCommand response = await Client.Connection.SendRequest(request); + GenericCommand response = await Connection.SendRequest(request); JsonObjectMessage attr = response.ConvMessage.AttrModified; // 更新自定义属性 if (attr != null) { @@ -159,7 +159,7 @@ namespace LeanCloud.Realtime.Internal.Controller { } GenericCommand request = NewCommand(CommandType.Conv, OpType.Add); request.ConvMessage = conv; - GenericCommand response = await Client.Connection.SendRequest(request); + GenericCommand response = await Connection.SendRequest(request); List allowedIds = response.ConvMessage.AllowedPids.ToList(); List errors = response.ConvMessage.FailedPids.ToList(); return NewPartiallySuccessResult(allowedIds, errors); @@ -189,7 +189,7 @@ namespace LeanCloud.Realtime.Internal.Controller { } GenericCommand request = NewCommand(CommandType.Conv, OpType.Remove); request.ConvMessage = conv; - GenericCommand response = await Client.Connection.SendRequest(request); + GenericCommand response = await Connection.SendRequest(request); List allowedIds = response.ConvMessage.AllowedPids.ToList(); List errors = response.ConvMessage.FailedPids.ToList(); return NewPartiallySuccessResult(allowedIds, errors); @@ -206,7 +206,7 @@ namespace LeanCloud.Realtime.Internal.Controller { }; GenericCommand request = NewCommand(CommandType.Conv, OpType.Mute); request.ConvMessage = conv; - await Client.Connection.SendRequest(request); + await Connection.SendRequest(request); } /// @@ -220,7 +220,7 @@ namespace LeanCloud.Realtime.Internal.Controller { }; GenericCommand request = NewCommand(CommandType.Conv, OpType.Unmute); request.ConvMessage = conv; - await Client.Connection.SendRequest(request); + await Connection.SendRequest(request); } /// @@ -240,7 +240,7 @@ namespace LeanCloud.Realtime.Internal.Controller { conv.M.AddRange(clientIds); GenericCommand request = NewCommand(CommandType.Conv, OpType.AddShutup); request.ConvMessage = conv; - GenericCommand response = await Client.Connection.SendRequest(request); + GenericCommand response = await Connection.SendRequest(request); return NewPartiallySuccessResult(response.ConvMessage.AllowedPids, response.ConvMessage.FailedPids); } @@ -258,7 +258,7 @@ namespace LeanCloud.Realtime.Internal.Controller { conv.M.AddRange(clientIds); GenericCommand request = NewCommand(CommandType.Conv, OpType.RemoveShutup); request.ConvMessage = conv; - GenericCommand response = await Client.Connection.SendRequest(request); + GenericCommand response = await Connection.SendRequest(request); return NewPartiallySuccessResult(response.ConvMessage.AllowedPids, response.ConvMessage.FailedPids); } @@ -285,7 +285,7 @@ namespace LeanCloud.Realtime.Internal.Controller { } GenericCommand request = NewCommand(CommandType.Blacklist, OpType.Block); request.BlacklistMessage = blacklist; - GenericCommand response = await Client.Connection.SendRequest(request); + GenericCommand response = await Connection.SendRequest(request); return NewPartiallySuccessResult(response.BlacklistMessage.AllowedPids, response.BlacklistMessage.FailedPids); } @@ -312,7 +312,7 @@ namespace LeanCloud.Realtime.Internal.Controller { } GenericCommand request = NewCommand(CommandType.Blacklist, OpType.Unblock); request.BlacklistMessage = blacklist; - GenericCommand response = await Client.Connection.SendRequest(request); + GenericCommand response = await Connection.SendRequest(request); return NewPartiallySuccessResult(response.BlacklistMessage.AllowedPids, response.BlacklistMessage.FailedPids); } @@ -336,7 +336,7 @@ namespace LeanCloud.Realtime.Internal.Controller { }; GenericCommand request = NewCommand(CommandType.Conv, OpType.MemberInfoUpdate); request.ConvMessage = conv; - GenericCommand response = await Client.Connection.SendRequest(request); + GenericCommand response = await Connection.SendRequest(request); } /// @@ -386,7 +386,7 @@ namespace LeanCloud.Realtime.Internal.Controller { } GenericCommand request = NewCommand(CommandType.Conv, OpType.QueryShutup); request.ConvMessage = conv; - GenericCommand response = await Client.Connection.SendRequest(request); + GenericCommand response = await Connection.SendRequest(request); return new LCIMPageResult { Results = new ReadOnlyCollection(response.ConvMessage.M), Next = response.ConvMessage.Next @@ -412,7 +412,7 @@ namespace LeanCloud.Realtime.Internal.Controller { } GenericCommand request = NewCommand(CommandType.Blacklist, OpType.Query); request.BlacklistMessage = black; - GenericCommand response = await Client.Connection.SendRequest(request); + GenericCommand response = await Connection.SendRequest(request); return new LCIMPageResult { Results = new ReadOnlyCollection(response.BlacklistMessage.BlockedPids), Next = response.BlacklistMessage.Next @@ -525,7 +525,7 @@ namespace LeanCloud.Realtime.Internal.Controller { }; GenericCommand request = NewCommand(CommandType.Conv, OpType.Members); request.ConvMessage = conv; - GenericCommand response = await Client.Connection.SendRequest(request); + GenericCommand response = await Connection.SendRequest(request); ReadOnlyCollection members = response.ConvMessage.M .ToList().AsReadOnly(); return members; @@ -541,7 +541,7 @@ namespace LeanCloud.Realtime.Internal.Controller { conv.Cids.Add(convId); GenericCommand request = NewCommand(CommandType.Conv, OpType.IsMember); request.ConvMessage = conv; - GenericCommand response = await Client.Connection.SendRequest(request); + GenericCommand response = await Connection.SendRequest(request); JsonObjectMessage jsonObj = response.ConvMessage.Results; Dictionary result = JsonConvert.DeserializeObject>(jsonObj.Data); if (result.TryGetValue(convId, out object obj)) { @@ -573,11 +573,11 @@ namespace LeanCloud.Realtime.Internal.Controller { #region 消息处理 - internal override async Task OnNotification(GenericCommand notification) { + internal override void HandleNotification(GenericCommand notification) { if (notification.Cmd == CommandType.Conv) { - await OnConversation(notification); + _ = OnConversation(notification); } else if (notification.Cmd == CommandType.Unread) { - await OnUnread(notification); + _ = OnUnread(notification); } } @@ -695,8 +695,8 @@ namespace LeanCloud.Realtime.Internal.Controller { /// private async Task OnLeft(ConvCommand convMessage) { LCIMConversation conversation = await Client.GetOrQueryConversation(convMessage.Cid); - // TODO 从内存中清除对话 - + // 从内存中清除对话 + Client.ConversationDict.Remove(conversation.Id); Client.OnKicked?.Invoke(conversation, convMessage.InitBy); } diff --git a/Realtime/Realtime/Internal/Controller/LCIMGoAwayController.cs b/Realtime/Realtime/Internal/Controller/LCIMGoAwayController.cs deleted file mode 100644 index 1bcb57d..0000000 --- a/Realtime/Realtime/Internal/Controller/LCIMGoAwayController.cs +++ /dev/null @@ -1,19 +0,0 @@ -using System.Threading.Tasks; -using LeanCloud.Realtime.Internal.Protocol; - -namespace LeanCloud.Realtime.Internal.Controller { - internal class LCIMGoAwayController : LCIMController { - internal LCIMGoAwayController(LCIMClient client) : base(client) { - - } - - #region 消息处理 - - internal override async Task OnNotification(GenericCommand notification) { - // 清空缓存,断开连接,等待重新连接 - await Connection.Reset(); - } - - #endregion - } -} diff --git a/Realtime/Realtime/Internal/Controller/LCIMMessageController.cs b/Realtime/Realtime/Internal/Controller/LCIMMessageController.cs index 0369be1..cec51cc 100644 --- a/Realtime/Realtime/Internal/Controller/LCIMMessageController.cs +++ b/Realtime/Realtime/Internal/Controller/LCIMMessageController.cs @@ -65,7 +65,7 @@ namespace LeanCloud.Realtime.Internal.Controller { if (command.Priority > 0) { command.Priority = (int)options.Priority; } - GenericCommand response = await Client.Connection.SendRequest(command); + GenericCommand response = await Connection.SendRequest(command); // 消息发送应答 AckCommand ack = response.AckMessage; message.Id = ack.Uid; @@ -94,7 +94,7 @@ namespace LeanCloud.Realtime.Internal.Controller { patch.Patches.Add(item); GenericCommand request = NewCommand(CommandType.Patch, OpType.Modify); request.PatchMessage = patch; - await Client.Connection.SendRequest(request); + await Connection.SendRequest(request); } /// @@ -130,7 +130,7 @@ namespace LeanCloud.Realtime.Internal.Controller { patch.Patches.Add(item); GenericCommand request = NewCommand(CommandType.Patch, OpType.Modify); request.PatchMessage = patch; - GenericCommand response = await Client.Connection.SendRequest(request); + GenericCommand response = await Connection.SendRequest(request); } /// @@ -170,7 +170,7 @@ namespace LeanCloud.Realtime.Internal.Controller { } GenericCommand request = NewCommand(CommandType.Logs, OpType.Open); request.LogsMessage = logs; - GenericCommand response = await Client.Connection.SendRequest(request); + GenericCommand response = await Connection.SendRequest(request); // 反序列化聊天记录 return response.LogsMessage.Logs.Select(item => { LCIMMessage message; @@ -211,7 +211,7 @@ namespace LeanCloud.Realtime.Internal.Controller { }; GenericCommand command = NewCommand(CommandType.Ack); command.AckMessage = ack; - await Client.Connection.SendCommand(command); + await Connection.SendCommand(command); } /// @@ -231,20 +231,20 @@ namespace LeanCloud.Realtime.Internal.Controller { read.Convs.Add(tuple); GenericCommand command = NewCommand(CommandType.Read); command.ReadMessage = read; - await Client.Connection.SendCommand(command); + await Connection.SendCommand(command); } #endregion #region 消息处理 - internal override async Task OnNotification(GenericCommand notification) { + internal override void HandleNotification(GenericCommand notification) { if (notification.Cmd == CommandType.Direct) { - await OnMessaage(notification); + _ = OnMessaage(notification); } else if (notification.Cmd == CommandType.Patch) { - await OnMessagePatched(notification); + _ = OnMessagePatched(notification); } else if (notification.Cmd == CommandType.Rcp) { - await OnMessageReceipt(notification); + _ = OnMessageReceipt(notification); } } diff --git a/Realtime/Realtime/Internal/Controller/LCIMSessionController.cs b/Realtime/Realtime/Internal/Controller/LCIMSessionController.cs index 28b913c..81e1861 100644 --- a/Realtime/Realtime/Internal/Controller/LCIMSessionController.cs +++ b/Realtime/Realtime/Internal/Controller/LCIMSessionController.cs @@ -20,13 +20,16 @@ namespace LeanCloud.Realtime.Internal.Controller { /// /// internal async Task Open(bool force) { + await Connection.Connect(); + SessionCommand session = await NewSessionCommand(); session.R = !force; session.ConfigBitmap = 0x2B; GenericCommand request = NewCommand(CommandType.Session, OpType.Open); request.SessionMessage = session; - GenericCommand response = await Client.Connection.SendRequest(request); + GenericCommand response = await Connection.SendRequest(request); UpdateSession(response.SessionMessage); + Connection.Register(Client); } /// @@ -38,11 +41,11 @@ namespace LeanCloud.Realtime.Internal.Controller { session.R = true; GenericCommand request = NewCommand(CommandType.Session, OpType.Open); request.SessionMessage = session; - GenericCommand response = await Client.Connection.SendRequest(request); + GenericCommand response = await Connection.SendRequest(request); if (response.Op == OpType.Opened) { UpdateSession(response.SessionMessage); } else if (response.Op == OpType.Closed) { - await OnClosed(response.SessionMessage); + OnClosed(response.SessionMessage); } } @@ -52,7 +55,8 @@ namespace LeanCloud.Realtime.Internal.Controller { /// internal async Task Close() { GenericCommand request = NewCommand(CommandType.Session, OpType.Close); - await Client.Connection.SendRequest(request); + await Connection.SendRequest(request); + Connection.UnRegister(Client); } /// @@ -72,7 +76,7 @@ namespace LeanCloud.Realtime.Internal.Controller { SessionCommand session = await NewSessionCommand(); GenericCommand request = NewCommand(CommandType.Session, OpType.Refresh); request.SessionMessage = session; - GenericCommand response = await Client.Connection.SendRequest(request); + GenericCommand response = await Connection.SendRequest(request); UpdateSession(response.SessionMessage); } @@ -120,10 +124,10 @@ namespace LeanCloud.Realtime.Internal.Controller { #region 消息处理 - internal override async Task OnNotification(GenericCommand notification) { + internal override void HandleNotification(GenericCommand notification) { switch (notification.Op) { case OpType.Closed: - await OnClosed(notification.SessionMessage); + OnClosed(notification.SessionMessage); break; default: break; @@ -135,11 +139,11 @@ namespace LeanCloud.Realtime.Internal.Controller { /// /// /// - private async Task OnClosed(SessionCommand session) { + private void OnClosed(SessionCommand session) { int code = session.Code; string reason = session.Reason; string detail = session.Detail; - await Connection.Close(); + Connection.UnRegister(Client); Client.OnClose?.Invoke(code, reason); } diff --git a/Realtime/Realtime/Internal/WebSocket/LCWebSocketClient.cs b/Realtime/Realtime/Internal/WebSocket/LCWebSocketClient.cs index ca42dec..ab4277f 100644 --- a/Realtime/Realtime/Internal/WebSocket/LCWebSocketClient.cs +++ b/Realtime/Realtime/Internal/WebSocket/LCWebSocketClient.cs @@ -81,7 +81,7 @@ namespace LeanCloud.Realtime.Internal.WebSocket { } /// - /// 发送数据 + /// 发送二进制数据 /// /// /// diff --git a/Realtime/Realtime/LCIMClient.cs b/Realtime/Realtime/LCIMClient.cs index 4799c4a..6e08d3c 100644 --- a/Realtime/Realtime/LCIMClient.cs +++ b/Realtime/Realtime/LCIMClient.cs @@ -3,31 +3,44 @@ using System.Collections.Generic; using System.Threading.Tasks; using System.Linq; using System.Collections.ObjectModel; -using LeanCloud.Common; using LeanCloud.Storage; using LeanCloud.Realtime.Internal.Protocol; using LeanCloud.Realtime.Internal.Controller; -using LeanCloud.Realtime.Internal.Connection; namespace LeanCloud.Realtime { /// /// 通信客户端 /// public class LCIMClient { + /// + /// 对话缓存 + /// internal Dictionary ConversationDict; + /// + /// 用户 Id + /// public string Id { get; private set; } + /// + /// 用户标识 + /// public string Tag { get; private set; } + /// + /// 设备 Id + /// public string DeviceId { get; private set; } + /// + /// 登录 tokens + /// internal string SessionToken { get; private set; } @@ -219,10 +232,6 @@ namespace LeanCloud.Realtime { get; private set; } - internal LCConnection Connection { - get; set; - } - internal LCIMSessionController SessionController { get; private set; } @@ -231,10 +240,6 @@ namespace LeanCloud.Realtime { get; private set; } - internal LCIMGoAwayController GoAwayController { - get; private set; - } - internal LCIMConversationController ConversationController { get; private set; } @@ -281,13 +286,6 @@ namespace LeanCloud.Realtime { SessionController = new LCIMSessionController(this); ConversationController = new LCIMConversationController(this); MessageController = new LCIMMessageController(this); - GoAwayController = new LCIMGoAwayController(this); - - Connection = new LCConnection(Id) { - OnNotification = OnConnectionNotification, - OnDisconnect = OnConnectionDisconnect, - OnReconnected = OnConnectionReconnect - }; } /// @@ -296,14 +294,12 @@ namespace LeanCloud.Realtime { /// 是否强制登录 /// public async Task Open(bool force = true) { - await Connection.Connect(); try { // 打开 Session await SessionController.Open(force); } catch (Exception e) { LCLogger.Error(e); // 如果 session 阶段异常,则关闭连接 - await Connection.Close(); throw e; } } @@ -315,7 +311,6 @@ namespace LeanCloud.Realtime { public async Task Close() { // 关闭 session await SessionController.Close(); - await Connection.Close(); } /// @@ -437,37 +432,30 @@ namespace LeanCloud.Realtime { #endregion - private void OnConnectionNotification(GenericCommand notification) { + internal void HandleNotification(GenericCommand notification) { switch (notification.Cmd) { case CommandType.Session: - _ = SessionController.OnNotification(notification); + SessionController.HandleNotification(notification); break; case CommandType.Conv: case CommandType.Unread: - _ = ConversationController.OnNotification(notification); + ConversationController.HandleNotification(notification); break; case CommandType.Direct: case CommandType.Patch: case CommandType.Rcp: - _ = MessageController.OnNotification(notification); - break; - case CommandType.Goaway: - _ = GoAwayController.OnNotification(notification); + MessageController.HandleNotification(notification); break; default: break; } } - private void OnConnectionDisconnect() { + internal void HandleDisconnected() { OnPaused?.Invoke(); } - private void OnConnectionReconnect() { - _ = HandleReconnected(); - } - - private async Task HandleReconnected() { + internal async void HandleReconnected() { try { // 打开 Session await SessionController.Reopen(); @@ -475,12 +463,11 @@ namespace LeanCloud.Realtime { OnResume?.Invoke(); } catch (Exception e) { LCLogger.Error(e); - await Connection.Close(); - // TODO 告知 + // 重连成功,但 session/open 失败 OnClose?.Invoke(0, string.Empty); } } - + internal async Task GetOrQueryConversation(string convId) { if (ConversationDict.TryGetValue(convId, out LCIMConversation conversation)) { return conversation; diff --git a/Realtime/Realtime/LCRealtime.cs b/Realtime/Realtime/LCRealtime.cs new file mode 100644 index 0000000..612650e --- /dev/null +++ b/Realtime/Realtime/LCRealtime.cs @@ -0,0 +1,51 @@ +using System.Collections.Generic; +using LeanCloud.Realtime.Internal.Connection; + +namespace LeanCloud.Realtime { + public class LCRealtime { + /// + /// RTM 服务中,每个 app 对应一条连接 + /// + private static readonly Dictionary appToConnections = new Dictionary(); + + /// + /// 获取对应的 Connection + /// + /// + /// + internal static LCConnection GetConnection(string appId) { + if (appToConnections.TryGetValue(appId, out LCConnection connection)) { + return connection; + } + connection = new LCConnection(appId); + appToConnections[appId] = connection; + return connection; + } + + /// + /// 移除 Connection + /// + /// + internal static void RemoveConnection(LCConnection connection) { + appToConnections.Remove(connection.id); + } + + /// + /// 主动断开所有 RTM 连接 + /// + public static void Pause() { + foreach (LCConnection connection in appToConnections.Values) { + connection.Pause(); + } + } + + /// + /// 主动恢复所有 RTM 连接 + /// + public static void Resume() { + foreach (LCConnection connection in appToConnections.Values) { + connection.Resume(); + } + } + } +} diff --git a/Sample/RealtimeApp/Program.cs b/Sample/RealtimeApp/Program.cs index 01ef348..1680362 100644 --- a/Sample/RealtimeApp/Program.cs +++ b/Sample/RealtimeApp/Program.cs @@ -1,4 +1,5 @@ using System; +using System.Threading.Tasks; using LeanCloud; using LeanCloud.Realtime; @@ -7,15 +8,43 @@ using static System.Console; namespace RealtimeApp { class Program { static void Main(string[] args) { - Console.WriteLine("Hello World!"); + WriteLine("Hello World!"); SingleThreadSynchronizationContext.Run(async () => { LCLogger.LogDelegate += Print; LCApplication.Initialize("ikGGdRE2YcVOemAaRbgp1xGJ-gzGzoHsz", "NUKmuRbdAhg1vrb2wexYo1jo", "https://ikggdre2.lc-cn-n1-shared.com"); - LCIMClient client = new LCIMClient("lean"); + LCIMClient client = new LCIMClient("lean") { + OnPaused = () => { + WriteLine("~~~~~~~~~~~~~~~ disconnected"); + }, + OnResume = () => { + WriteLine("~~~~~~~~~~~~~~~ reconnected"); + } + }; + await client.Open(); - //await client.Close(); + + int count = 0; + while (count < 2) { + WriteLine($"pause : {count}"); + + await Task.Delay(5 * 1000); + LCRealtime.Pause(); + + await Task.Delay(5 * 1000); + LCRealtime.Resume(); + + await Task.Delay(5 * 1000); + count++; + } + + try { + await client.Close(); + // Done + } catch (Exception e) { + WriteLine($"xxxxxxxxxxxx {e.Message}"); + } }); }