From 51fe69d7c0ff1da7f753ecaf062d5238569b9df2 Mon Sep 17 00:00:00 2001 From: oneRain Date: Wed, 13 May 2020 17:04:23 +0800 Subject: [PATCH] chore --- .../Internal/LCLiveQueryConnection.cs | 230 +++++++++++++++ LiveQuery/LiveQuery/LCLiveQuery.cs | 265 ++++++++++++++++++ 2 files changed, 495 insertions(+) create mode 100644 LiveQuery/LiveQuery/Internal/LCLiveQueryConnection.cs create mode 100644 LiveQuery/LiveQuery/LCLiveQuery.cs diff --git a/LiveQuery/LiveQuery/Internal/LCLiveQueryConnection.cs b/LiveQuery/LiveQuery/Internal/LCLiveQueryConnection.cs new file mode 100644 index 0000000..7203eac --- /dev/null +++ b/LiveQuery/LiveQuery/Internal/LCLiveQueryConnection.cs @@ -0,0 +1,230 @@ +using System; +using System.Text; +using System.Collections.Generic; +using System.Threading.Tasks; +using Newtonsoft.Json; +using LeanCloud.Realtime.Internal.Router; +using LeanCloud.Realtime.Internal.WebSocket; +using LeanCloud.Common; +using LeanCloud.Storage; + +namespace LeanCloud.LiveQuery.Internal { + public class LCLiveQueryConnection { + /// + /// 发送超时 + /// + private const int SEND_TIMEOUT = 10000; + + /// + /// 最大重连次数,超过后重置 Router 缓存后再次尝试重连 + /// + private const int MAX_RECONNECT_TIMES = 10; + + /// + /// 重连间隔 + /// + private const int RECONNECT_INTERVAL = 10000; + + /// + /// 子协议 + /// + private const string SUB_PROTOCOL = "lc.json.3"; + + /// + /// 通知事件 + /// + internal Action> OnNotification; + + /// + /// 断线事件 + /// + internal Action OnDisconnect; + + /// + /// 重连成功事件 + /// + internal Action OnReconnected; + + internal string id; + + /// + /// 请求回调缓存 + /// + private readonly Dictionary>> responses; + + private int requestI = 1; + + private LCRTMRouter router; + + private LCLiveQueryHeartBeat heartBeat; + + private LCWebSocketClient client; + + public LCLiveQueryConnection(string id) { + this.id = id; + responses = new Dictionary>>(); + heartBeat = new LCLiveQueryHeartBeat(this); + router = new LCRTMRouter(); + client = new LCWebSocketClient { + OnMessage = OnClientMessage, + OnClose = OnClientDisconnect + }; + } + + public async Task Connect() { + try { + LCRTMServer rtmServer = await router.GetServer(); + try { + LCLogger.Debug($"Primary Server"); + await client.Connect(rtmServer.Primary, SUB_PROTOCOL); + } catch (Exception e) { + LCLogger.Error(e); + LCLogger.Debug($"Secondary Server"); + await client.Connect(rtmServer.Secondary, SUB_PROTOCOL); + } + } catch (Exception e) { + throw e; + } + } + + /// + /// 重置连接 + /// + /// + internal async Task Reset() { + // 关闭就连接 + await client.Close(); + // 重新创建连接组件 + heartBeat = new LCLiveQueryHeartBeat(this); + router = new LCRTMRouter(); + client = new LCWebSocketClient { + OnMessage = OnClientMessage, + OnClose = OnClientDisconnect + }; + await Reconnect(); + } + + /// + /// 发送请求,会在收到应答后返回 + /// + /// + /// + internal async Task> SendRequest(Dictionary request) { + TaskCompletionSource> tcs = new TaskCompletionSource>(); + int requestIndex = requestI++; + request["i"] = requestIndex; + responses.Add(requestIndex, tcs); + try { + string json = JsonConvert.SerializeObject(request); + await SendText(json); + } catch (Exception e) { + tcs.TrySetException(e); + } + return await tcs.Task; + } + + /// + /// 发送文本消息 + /// + /// + /// + internal async Task SendText(string text) { + LCLogger.Debug($"{id} => {text}"); + Task sendTask = client.Send(text); + if (await Task.WhenAny(sendTask, Task.Delay(SEND_TIMEOUT)) == sendTask) { + await sendTask; + } else { + throw new TimeoutException("Send request time out"); + } + } + + /// + /// 关闭连接 + /// + /// + internal async Task Close() { + OnNotification = null; + OnDisconnect = null; + OnReconnected = null; + heartBeat.Stop(); + await client.Close(); + } + + private void OnClientMessage(byte[] bytes) { + _ = heartBeat.Refresh(OnPingTimeout); + try { + string json = Encoding.UTF8.GetString(bytes); + Dictionary msg = JsonConvert.DeserializeObject>(json, + LCJsonConverter.Default); + LCLogger.Debug($"{id} <= {json}"); + if (msg.TryGetValue("i", out object i)) { + int requestIndex = Convert.ToInt32(i); + if (responses.TryGetValue(requestIndex, out TaskCompletionSource> tcs)) { + if (msg.TryGetValue("error", out object error)) { + // 错误 + if (error is Dictionary dict) { + int code = Convert.ToInt32(dict["code"]); + string detail = dict["detail"] as string; + tcs.SetException(new LCException(code, detail)); + } else { + tcs.SetException(new Exception(error as string)); + } + } else { + tcs.SetResult(msg); + } + responses.Remove(requestIndex); + } else { + LCLogger.Error($"No request for {requestIndex}"); + } + } else { + // 通知 + OnNotification?.Invoke(msg); + } + } catch (Exception e) { + LCLogger.Error(e); + } + } + + private void OnClientDisconnect() { + heartBeat.Stop(); + OnDisconnect?.Invoke(); + // 重连 + _ = Reconnect(); + } + + private async void OnPingTimeout() { + await client.Close(); + OnClientDisconnect(); + } + + private async Task Reconnect() { + while (true) { + int reconnectCount = 0; + // 重连策略 + while (reconnectCount < MAX_RECONNECT_TIMES) { + try { + LCLogger.Debug($"Reconnecting... {reconnectCount}"); + await Connect(); + break; + } catch (Exception e) { + reconnectCount++; + LCLogger.Error(e); + LCLogger.Debug($"Reconnect after {RECONNECT_INTERVAL}ms"); + await Task.Delay(RECONNECT_INTERVAL); + } + } + if (reconnectCount < MAX_RECONNECT_TIMES) { + // 重连成功 + LCLogger.Debug("Reconnected"); + client.OnMessage = OnClientMessage; + client.OnClose = OnClientDisconnect; + OnReconnected?.Invoke(); + break; + } else { + // 重置 Router,继续尝试重连 + router = new LCRTMRouter(); + } + } + } + } +} diff --git a/LiveQuery/LiveQuery/LCLiveQuery.cs b/LiveQuery/LiveQuery/LCLiveQuery.cs new file mode 100644 index 0000000..c6f96a6 --- /dev/null +++ b/LiveQuery/LiveQuery/LCLiveQuery.cs @@ -0,0 +1,265 @@ +using System; +using System.Linq; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Threading.Tasks; +using LeanCloud.Storage; +using LeanCloud.Storage.Internal.Object; +using LeanCloud.LiveQuery.Internal; + +namespace LeanCloud.LiveQuery { + /// + /// LiveQuery + /// + public class LCLiveQuery { + /// + /// 新对象创建事件 + /// + public Action OnCreate; + /// + /// 对象更新事件 + /// + public Action> OnUpdate; + /// + /// 对象被删除 + /// + public Action OnDelete; + /// + /// 有新的满足条件的对象产生 + /// + public Action> OnEnter; + /// + /// 不再满足条件 + /// + public Action> OnLeave; + /// + /// 当一个用户登录成功 + /// + public Action OnLogin; + + public string Id { + get; private set; + } + + public LCQuery Query { + get; internal set; + } + + private static LCLiveQueryConnection connection; + + private static Dictionary> liveQueries = new Dictionary>(); + + internal LCLiveQuery() { + + } + + private static readonly string DeviceId = Guid.NewGuid().ToString(); + + /// + /// 订阅 + /// + /// + public async Task Subscribe() { + // TODO 判断当前连接情况 + if (connection == null) { + connection = new LCLiveQueryConnection(DeviceId) { + OnReconnected = OnReconnected, + OnNotification = OnNotification + }; + await connection.Connect(); + await Login(); + } + Dictionary queryData = new Dictionary { + { "className", Query.ClassName }, + { "where", Query.Condition.Encode() } + }; + Dictionary data = new Dictionary { + { "query", queryData }, + { "id", DeviceId }, + { "clientTimestamp", DateTimeOffset.Now.ToUnixTimeMilliseconds() } + }; + LCUser user = await LCUser.GetCurrent(); + if (user != null && !string.IsNullOrEmpty(user.SessionToken)) { + data.Add("sessionToken", user.SessionToken); + } + string path = "LiveQuery/subscribe"; + Dictionary result = await LCApplication.HttpClient.Post>(path, + data: data); + if (result.TryGetValue("query_id", out object id)) { + Id = id as string; + WeakReference weakRef = new WeakReference(this); + liveQueries[Id] = weakRef; + } + } + + /// + /// 取消订阅 + /// + /// + public async Task Unsubscribe() { + Dictionary data = new Dictionary { + { "id", DeviceId }, + { "query_id", Id } + }; + string path = "LiveQuery/unsubscribe"; + await LCApplication.HttpClient.Post>(path, + data: data); + // 移除 + liveQueries.Remove(Id); + } + + private static async Task Login() { + Dictionary data = new Dictionary { + { "cmd", "login" }, + { "appId", LCApplication.AppId }, + { "installationId", DeviceId }, + { "clientTs", DateTimeOffset.Now.ToUnixTimeMilliseconds() }, + { "service", 1 } + }; + await connection.SendRequest(data); + } + + private static async void OnReconnected() { + await Login(); + Dictionary> oldLiveQueries = liveQueries; + liveQueries = new Dictionary>(); + foreach (WeakReference weakRef in oldLiveQueries.Values) { + if (weakRef.TryGetTarget(out LCLiveQuery liveQuery)) { + await liveQuery.Subscribe(); + } + } + } + + private static void OnNotification(Dictionary notification) { + if (!notification.TryGetValue("cmd", out object cmd) || + !"data".Equals(cmd)) { + return; + } + if (!notification.TryGetValue("msg", out object msg) || + !(msg is IEnumerable list)) { + return; + } + + foreach (object item in list) { + if (item is Dictionary dict) { + if (!dict.TryGetValue("op", out object op)) { + continue; + } + switch (op as string) { + case "create": + OnCreateNotification(dict); + break; + case "update": + OnUpdateNotification(dict); + break; + case "enter": + OnEnterNotification(dict); + break; + case "leave": + OnLeaveNotification(dict); + break; + case "delete": + OnDeleteNotification(dict); + break; + case "login": + OnLoginNotification(dict); + break; + default: + LCLogger.Debug($"Not support: {op}"); + break; + } + } + } + } + + private static void OnCreateNotification(Dictionary data) { + if (TryGetLiveQuery(data, out LCLiveQuery liveQuery) && + TryGetObject(data, out LCObject obj)) { + liveQuery.OnCreate?.Invoke(obj); + } + } + + private static void OnUpdateNotification(Dictionary data) { + if (TryGetLiveQuery(data, out LCLiveQuery liveQuery) && + TryGetObject(data, out LCObject obj) && + TryGetUpdatedKeys(data, out ReadOnlyCollection keys)) { + liveQuery.OnUpdate?.Invoke(obj, keys); + } + } + + private static void OnEnterNotification(Dictionary data) { + if (TryGetLiveQuery(data, out LCLiveQuery liveQuery) && + TryGetObject(data, out LCObject obj) && + TryGetUpdatedKeys(data, out ReadOnlyCollection keys)) { + liveQuery.OnEnter?.Invoke(obj, keys); + } + } + + private static void OnLeaveNotification(Dictionary data) { + if (TryGetLiveQuery(data, out LCLiveQuery liveQuery) && + TryGetObject(data, out LCObject obj) && + TryGetUpdatedKeys(data, out ReadOnlyCollection keys)) { + liveQuery.OnLeave?.Invoke(obj, keys); + } + } + + private static void OnDeleteNotification(Dictionary data) { + if (TryGetLiveQuery(data, out LCLiveQuery liveQuery) && + TryGetObject(data, out LCObject obj)) { + liveQuery.OnDelete?.Invoke(obj.ObjectId); + } + } + + private static void OnLoginNotification(Dictionary data) { + if (TryGetLiveQuery(data, out LCLiveQuery liveQuery) && + data.TryGetValue("object", out object obj) && + obj is Dictionary dict) { + LCObjectData objectData = LCObjectData.Decode(dict); + LCUser user = new LCUser(objectData); + liveQuery.OnLogin?.Invoke(user); + } + } + + private static bool TryGetLiveQuery(Dictionary data, out LCLiveQuery liveQuery) { + if (!data.TryGetValue("query_id", out object i) || + !(i is string id)) { + liveQuery = null; + return false; + } + + if (!liveQueries.TryGetValue(id, out WeakReference weakRef) || + !weakRef.TryGetTarget(out LCLiveQuery lq)) { + liveQuery = null; + return false; + } + + liveQuery = lq; + return true; + } + + private static bool TryGetObject(Dictionary data, out LCObject obj) { + if (!data.TryGetValue("object", out object o) || + !(o is Dictionary dict)) { + obj = null; + return false; + } + + LCObjectData objectData = LCObjectData.Decode(dict); + obj = LCObject.Create(dict["className"] as string); + obj.Merge(objectData); + return true; + } + + private static bool TryGetUpdatedKeys(Dictionary data, out ReadOnlyCollection keys) { + if (!data.TryGetValue("updatedKeys", out object uks) || + !(uks is List list)) { + keys = null; + return false; + } + + keys = list.Cast().ToList() + .AsReadOnly(); + return true; + } + } +}