diff --git a/Realtime/Conversation/LCIMConversation.cs b/Realtime/Conversation/LCIMConversation.cs index 8f19470..95ceb64 100644 --- a/Realtime/Conversation/LCIMConversation.cs +++ b/Realtime/Conversation/LCIMConversation.cs @@ -4,22 +4,36 @@ using System.Threading.Tasks; using System.Linq; using System.Collections.ObjectModel; using LeanCloud.Storage; -using LeanCloud.Storage.Internal.Codec; namespace LeanCloud.Realtime { + /// + /// 普通对话 + /// public class LCIMConversation { + /// + /// 对话 Id + /// public string Id { get; internal set; } + /// + /// 是否唯一 + /// public bool Unique { get; internal set; } + /// + /// 唯一 Id + /// public string UniqueId { get; internal set; } + /// + /// 对话名称 + /// public string Name { get { return this["name"] as string; @@ -29,42 +43,69 @@ namespace LeanCloud.Realtime { } } + /// + /// 创建者 Id + /// public string CreatorId { get; set; } + /// + /// 成员 Id + /// public ReadOnlyCollection MemberIds { get { return new ReadOnlyCollection(ids.ToList()); } } + /// + /// 静音成员 Id + /// public ReadOnlyCollection MutedMemberIds { get { return new ReadOnlyCollection(mutedIds.ToList()); } } + /// + /// 未读消息数量 + /// public int Unread { get; internal set; } + /// + /// 最新的一条消息 + /// public LCIMMessage LastMessage { get; internal set; } + /// + /// 创建时间 + /// public DateTime CreatedAt { get; internal set; } + /// + /// 更新时间 + /// public DateTime UpdatedAt { get; internal set; } + /// + /// 最新送达消息时间戳 + /// public long LastDeliveredTimestamp { get; internal set; } + /// + /// 最新送达消息时间 + /// public DateTime LastDeliveredAt { get { DateTimeOffset dateTimeOffset = DateTimeOffset.FromUnixTimeMilliseconds(LastDeliveredTimestamp); @@ -72,10 +113,16 @@ namespace LeanCloud.Realtime { } } + /// + /// 最新已读消息时间戳 + /// public long LastReadTimestamp { get; internal set; } + /// + /// 最新已读消息时间 + /// public DateTime LastReadAt { get { DateTimeOffset dateTimeOffset = DateTimeOffset.FromUnixTimeMilliseconds(LastReadTimestamp); @@ -83,6 +130,11 @@ namespace LeanCloud.Realtime { } } + /// + /// 设置/获取对话属性 + /// + /// + /// public object this[string key] { get { return customProperties[key]; @@ -92,6 +144,9 @@ namespace LeanCloud.Realtime { } } + /// + /// 是否已静音 + /// public bool IsMute { get; private set; } @@ -100,7 +155,7 @@ namespace LeanCloud.Realtime { get; private set; } - private Dictionary customProperties; + private readonly Dictionary customProperties; internal HashSet ids; @@ -128,7 +183,7 @@ namespace LeanCloud.Realtime { if (LastMessage == null) { return; } - await Client.ConversationController.Read(Id, LastMessage); + await Client.MessageController.Read(Id, LastMessage); } /// @@ -374,7 +429,7 @@ namespace LeanCloud.Realtime { /// 限制 /// 消息类型 /// - public async Task> QueryMessages(LCIMMessageQueryEndpoint start = null, + public async Task> QueryMessages(LCIMMessageQueryEndpoint start = null, LCIMMessageQueryEndpoint end = null, LCIMMessageQueryDirection direction = LCIMMessageQueryDirection.NewToOld, int limit = 20, diff --git a/Realtime/Internal/Connection/LCConnection.cs b/Realtime/Internal/Connection/LCConnection.cs index 7204feb..ab98eba 100644 --- a/Realtime/Internal/Connection/LCConnection.cs +++ b/Realtime/Internal/Connection/LCConnection.cs @@ -34,16 +34,31 @@ namespace LeanCloud.Realtime.Internal.Connection { /// private const int HEART_BEAT_INTERVAL = 5000; + /// + /// 通知事件 + /// internal Action OnNotification; + /// + /// 断线事件 + /// internal Action OnDisconnect; + /// + /// 开始重连事件 + /// internal Action OnReconnecting; + /// + /// 重连成功事件 + /// internal Action OnReconnected; internal string id; + /// + /// 请求回调缓存 + /// private readonly Dictionary> responses; private int requestI = 1; @@ -61,7 +76,7 @@ namespace LeanCloud.Realtime.Internal.Connection { router = new LCRTMRouter(); client = new LCWebSocketClient(router, heartBeat) { OnMessage = OnClientMessage, - OnDisconnect = OnClientDisconnect + OnClose = OnClientDisconnect }; } @@ -69,41 +84,63 @@ namespace LeanCloud.Realtime.Internal.Connection { await client.Connect(); } + /// + /// 重置连接 + /// + /// internal async Task Reset() { - router.Reset(); + // 关闭就连接 await client.Close(); + // 重新创建连接组件 heartBeat = new LCHeartBeat(this, HEART_BEAT_INTERVAL, HEART_BEAT_INTERVAL); router = new LCRTMRouter(); client = new LCWebSocketClient(router, heartBeat) { OnMessage = OnClientMessage, - OnDisconnect = OnClientDisconnect + OnClose = OnClientDisconnect }; await Reconnect(); } + /// + /// 发送请求,会在收到应答后返回 + /// + /// + /// internal async Task SendRequest(GenericCommand request) { TaskCompletionSource tcs = new TaskCompletionSource(); request.I = requestI++; responses.Add(request.I, tcs); - LCLogger.Debug($"{id} => {FormatCommand(request)}"); - byte[] bytes = request.ToByteArray(); - Task sendTask = client.Send(bytes); - if (await Task.WhenAny(sendTask, Task.Delay(SEND_TIMEOUT)) == sendTask) { - try { - await sendTask; - } catch (Exception e) { - tcs.TrySetException(e); - } - } else { - tcs.TrySetException(new TimeoutException("Send request")); + try { + await SendCommand(request); + } catch (Exception e) { + tcs.TrySetException(e); } return await tcs.Task; } + /// + /// 发送命令 + /// + /// + /// + internal async Task SendCommand(GenericCommand command) { + LCLogger.Debug($"{id} => {FormatCommand(command)}"); + byte[] bytes = command.ToByteArray(); + Task sendTask = client.Send(bytes); + if (await Task.WhenAny(sendTask, Task.Delay(SEND_TIMEOUT)) == sendTask) { + await sendTask; + } else { + throw new TimeoutException("Send request"); + } + } + + /// + /// 关闭连接 + /// + /// internal async Task Close() { OnNotification = null; OnDisconnect = null; - heartBeat.Stop(); await client.Close(); } @@ -154,8 +191,6 @@ namespace LeanCloud.Realtime.Internal.Connection { try { LCLogger.Debug($"Reconnecting... {reconnectCount}"); await client.Connect(); - client.OnMessage = OnClientMessage; - client.OnDisconnect = OnClientDisconnect; break; } catch (Exception e) { reconnectCount++; @@ -167,11 +202,13 @@ namespace LeanCloud.Realtime.Internal.Connection { if (reconnectCount < MAX_RECONNECT_TIMES) { // 重连成功 LCLogger.Debug("Reconnected"); + client.OnMessage = OnClientMessage; + client.OnClose = OnClientDisconnect; OnReconnected?.Invoke(); break; } else { // 重置 Router,继续尝试重连 - router.Reset(); + router = new LCRTMRouter(); } } } diff --git a/Realtime/Internal/Connection/LCHeartBeat.cs b/Realtime/Internal/Connection/LCHeartBeat.cs index be26f5f..fa343d5 100644 --- a/Realtime/Internal/Connection/LCHeartBeat.cs +++ b/Realtime/Internal/Connection/LCHeartBeat.cs @@ -6,7 +6,7 @@ using LeanCloud.Realtime.Protocol; namespace LeanCloud.Realtime.Internal.Connection { /// - /// 心跳控制器 + /// 心跳控制器,由于 .Net Standard 2.0 不支持发送 ping frame,所以需要发送逻辑心跳 /// 1. 每次接收到消息后开始监听,如果在 pingInterval 时间内没有再次接收到消息,则发送 ping 请求; /// 2. 发送后等待 pongInterval 时间,如果在此时间内接收到了任何消息,则取消并重新开始监听 1; /// 3. 如果没收到消息,则认为超时并回调,连接层接收回调后放弃当前连接,以断线逻辑处理 diff --git a/Realtime/Internal/Controller/LCIMMessageController.cs b/Realtime/Internal/Controller/LCIMMessageController.cs index d041b21..83deb91 100644 --- a/Realtime/Internal/Controller/LCIMMessageController.cs +++ b/Realtime/Internal/Controller/LCIMMessageController.cs @@ -223,6 +223,7 @@ namespace LeanCloud.Realtime.Internal.Controller { message.IsTransient = direct.Transient; // 通知服务端已接收 if (!message.IsTransient) { + // 只有非暂态消息才需要发送 ack _ = Ack(message.ConversationId, message.Id); } // 获取对话 diff --git a/Realtime/Internal/Controller/LCIMSessionController.cs b/Realtime/Internal/Controller/LCIMSessionController.cs index 894502e..336dbb9 100644 --- a/Realtime/Internal/Controller/LCIMSessionController.cs +++ b/Realtime/Internal/Controller/LCIMSessionController.cs @@ -18,9 +18,9 @@ namespace LeanCloud.Realtime.Internal.Controller { /// 打开会话 /// /// - internal async Task Open(bool reconnect) { + internal async Task Open(bool force) { SessionCommand session = NewSessionCommand(); - session.R = reconnect; + session.R = !force; GenericCommand request = NewCommand(CommandType.Session, OpType.Open); request.SessionMessage = session; GenericCommand response = await Client.Connection.SendRequest(request); @@ -78,7 +78,9 @@ namespace LeanCloud.Realtime.Internal.Controller { SessionCommand session = new SessionCommand(); if (Client.Tag != null) { session.Tag = Client.Tag; - session.DeviceId = Guid.NewGuid().ToString(); + } + if (Client.DeviceId != null) { + session.DeviceId = Client.DeviceId; } if (Client.SignatureFactory != null) { LCIMSignature signature = Client.SignatureFactory.CreateConnectSignature(Client.Id); diff --git a/Realtime/Internal/Router/LCRTMRouter.cs b/Realtime/Internal/Router/LCRTMRouter.cs index ad7d9f5..d663b46 100644 --- a/Realtime/Internal/Router/LCRTMRouter.cs +++ b/Realtime/Internal/Router/LCRTMRouter.cs @@ -28,13 +28,6 @@ namespace LeanCloud.Realtime.Internal.Router { return rtmServer; } - /// - /// 重置服务器地址缓存 - /// - internal void Reset() { - rtmServer = null; - } - async Task Fetch() { string server = await LCApplication.AppRouter.GetRealtimeServer(); string url = $"{server}/v1/route?appId={LCApplication.AppId}&secure=1"; diff --git a/Realtime/Internal/WebSocket/LCWebSocketClient.cs b/Realtime/Internal/WebSocket/LCWebSocketClient.cs index 5d80ccc..75af743 100644 --- a/Realtime/Internal/WebSocket/LCWebSocketClient.cs +++ b/Realtime/Internal/WebSocket/LCWebSocketClient.cs @@ -7,21 +7,31 @@ using LeanCloud.Realtime.Internal.Connection; namespace LeanCloud.Realtime.Internal.WebSocket { /// - /// WebSocket 客户端,只与通信协议相关 + /// WebSocket 客户端,负责底层连接和事件,只与通信协议相关 /// internal class LCWebSocketClient { // .net standard 2.0 好像在拼合 Frame 时有 bug,所以将这个值调整大一些 private const int RECV_BUFFER_SIZE = 1024 * 5; + /// + /// 关闭超时 + /// private const int CLOSE_TIMEOUT = 5000; + /// + /// 连接超时 + /// private const int CONNECT_TIMEOUT = 10000; + /// + /// 消息事件 + /// internal Action OnMessage; - internal Action OnDisconnect; - - internal Action OnReconnect; + /// + /// 连接关闭 + /// + internal Action OnClose; private ClientWebSocket ws; @@ -34,6 +44,10 @@ namespace LeanCloud.Realtime.Internal.WebSocket { this.heartBeat = heartBeat; } + /// + /// 连接 + /// + /// internal async Task Connect() { try { LCRTMServer rtmServer = await router.GetServer(); @@ -53,6 +67,11 @@ namespace LeanCloud.Realtime.Internal.WebSocket { _ = StartReceive(); } + /// + /// 连接指定 ws 服务器 + /// + /// + /// private async Task Connect(string server) { LCLogger.Debug($"Connecting WebSocket: {server}"); Task timeoutTask = Task.Delay(CONNECT_TIMEOUT); @@ -66,11 +85,14 @@ namespace LeanCloud.Realtime.Internal.WebSocket { } } + /// + /// 主动关闭连接 + /// + /// internal async Task Close() { LCLogger.Debug("Closing WebSocket"); OnMessage = null; - OnDisconnect = null; - OnReconnect = null; + OnClose = null; heartBeat.Stop(); try { // 发送关闭帧可能会很久,所以增加超时 @@ -87,6 +109,11 @@ namespace LeanCloud.Realtime.Internal.WebSocket { } } + /// + /// 发送数据 + /// + /// + /// internal async Task Send(byte[] data) { ArraySegment bytes = new ArraySegment(data); if (ws.State == WebSocketState.Open) { @@ -103,28 +130,30 @@ namespace LeanCloud.Realtime.Internal.WebSocket { } } + /// + /// 接收数据 + /// + /// private async Task StartReceive() { byte[] buffer = new byte[RECV_BUFFER_SIZE]; try { while (ws.State == WebSocketState.Open) { WebSocketReceiveResult result = await ws.ReceiveAsync(new ArraySegment(buffer), default); if (result.MessageType == WebSocketMessageType.Close) { - // 由服务端发起关闭 LCLogger.Debug($"Receive Closed: {result.CloseStatus}"); - LCLogger.Debug($"ws state: {ws.State}"); - // 这里有可能是客户端主动关闭,也有可能是服务端主动关闭 if (ws.State == WebSocketState.CloseReceived) { // 如果是服务端主动关闭,则挥手关闭,并认为是断线 try { - await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, default); + Task closeTask = ws.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, default); + await Task.WhenAny(closeTask, Task.Delay(CLOSE_TIMEOUT)); } catch (Exception e) { LCLogger.Error(e); } finally { - OnDisconnect?.Invoke(); + HandleExceptionClose(); } } } else if (result.MessageType == WebSocketMessageType.Binary) { - _ = heartBeat.Update(HandleClose); + _ = heartBeat.Update(HandleExceptionClose); // 拼合 WebSocket Message int length = result.Count; byte[] data = new byte[length]; @@ -137,17 +166,19 @@ namespace LeanCloud.Realtime.Internal.WebSocket { } catch (Exception e) { // 客户端网络异常 LCLogger.Error(e); - OnDisconnect?.Invoke(); + OnClose?.Invoke(); } } - private void HandleClose() { + private void HandleExceptionClose() { try { heartBeat.Stop(); ws.Abort(); ws.Dispose(); } catch (Exception e) { LCLogger.Error(e); + } finally { + OnClose?.Invoke(); } } } diff --git a/Realtime/LCIMClient.cs b/Realtime/LCIMClient.cs index ded9bce..be282b3 100644 --- a/Realtime/LCIMClient.cs +++ b/Realtime/LCIMClient.cs @@ -23,8 +23,39 @@ namespace LeanCloud.Realtime { get; private set; } + public string DeviceId { + get; private set; + } + #region 事件 + #region 连接状态事件 + + /// + /// 客户端连接断开 + /// + public Action OnPaused { + get; set; + } + + /// + /// 客户端连接恢复正常 + /// + public Action OnResume { + get; set; + } + + /// + /// 当前客户端被服务端强行下线 + /// + public Action OnClose { + get; set; + } + + #endregion + + #region 对话事件 + /// /// 当前用户被加入某个对话的黑名单 /// @@ -49,55 +80,6 @@ namespace LeanCloud.Realtime { /// public Action OnUnmuted; - /// - /// 客户端连接断开 - /// - public Action OnPaused { - get; set; - } - - /// - /// 客户端连接恢复正常 - /// - public Action OnResume { - get; set; - } - - /// - /// 当前客户端被服务端强行下线 - /// - public Action OnClose { - get; set; - } - - /// - /// 客户端连接断开 - /// - public Action OnDisconnect { - get; set; - } - - /// - /// 客户端正在重连 - /// - public Action OnReconnecting { - get; set; - } - - /// - /// 客户端重连成功 - /// - public Action OnReconnected { - get; set; - } - - /// - /// 用户在其他客户端登录,当前客户端被服务端强行下线 - /// - public Action OnConflict { - get; set; - } - /// /// 该对话信息被更新 /// @@ -164,6 +146,10 @@ namespace LeanCloud.Realtime { /// public Action OnMemberInfoUpdated; + #endregion + + #region 消息事件 + /// /// 当前用户收到消息 /// @@ -207,18 +193,23 @@ namespace LeanCloud.Realtime { } /// - /// + /// 最近分发消息更新 /// public Action OnLastDeliveredAtUpdated { get; set; } + /// + /// 最近已读消息更新 + /// public Action OnLastReadAtUpdated { get; set; } #endregion + #endregion + internal ILCIMSignatureFactory SignatureFactory { get; private set; } @@ -255,9 +246,11 @@ namespace LeanCloud.Realtime { public LCIMClient(string clientId, string tag = null, + string deviceId = null, ILCIMSignatureFactory signatureFactory = null) { Id = clientId; Tag = tag; + DeviceId = deviceId; SignatureFactory = signatureFactory; ConversationDict = new Dictionary(); @@ -278,13 +271,14 @@ namespace LeanCloud.Realtime { } /// - /// 连接 + /// 登录 /// + /// 是否强制登录 /// - public async Task Open(bool reconnect = false) { + public async Task Open(bool force = true) { await Connection.Connect(); // 打开 Session - await SessionController.Open(reconnect); + await SessionController.Open(force); } /// @@ -442,7 +436,7 @@ namespace LeanCloud.Realtime { } private void OnConnectionDisconnect() { - OnDisconnect?.Invoke(); + OnPaused?.Invoke(); } private void OnConnectionReconnect() { @@ -454,7 +448,7 @@ namespace LeanCloud.Realtime { // 打开 Session await SessionController.Reopen(); // 回调用户 - OnReconnected?.Invoke(); + OnResume?.Invoke(); } catch (Exception e) { LCLogger.Error(e); await Connection.Close(); diff --git a/Realtime/Message/LCIMMessage.cs b/Realtime/Message/LCIMMessage.cs index 2c6e70a..786c229 100644 --- a/Realtime/Message/LCIMMessage.cs +++ b/Realtime/Message/LCIMMessage.cs @@ -78,6 +78,10 @@ namespace LeanCloud.Realtime { get; internal set; } + public bool IsTransient { + get; internal set; + } + internal LCIMMessage() { } }