diff --git a/LiveQuery/LiveQuery/Internal/LCLiveQueryConnection.cs b/LiveQuery/LiveQuery/Internal/LCLiveQueryConnection.cs
new file mode 100644
index 0000000..7203eac
--- /dev/null
+++ b/LiveQuery/LiveQuery/Internal/LCLiveQueryConnection.cs
@@ -0,0 +1,230 @@
+using System;
+using System.Text;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using Newtonsoft.Json;
+using LeanCloud.Realtime.Internal.Router;
+using LeanCloud.Realtime.Internal.WebSocket;
+using LeanCloud.Common;
+using LeanCloud.Storage;
+
+namespace LeanCloud.LiveQuery.Internal {
+ public class LCLiveQueryConnection {
+ ///
+ /// 发送超时
+ ///
+ private const int SEND_TIMEOUT = 10000;
+
+ ///
+ /// 最大重连次数,超过后重置 Router 缓存后再次尝试重连
+ ///
+ private const int MAX_RECONNECT_TIMES = 10;
+
+ ///
+ /// 重连间隔
+ ///
+ private const int RECONNECT_INTERVAL = 10000;
+
+ ///
+ /// 子协议
+ ///
+ private const string SUB_PROTOCOL = "lc.json.3";
+
+ ///
+ /// 通知事件
+ ///
+ internal Action> OnNotification;
+
+ ///
+ /// 断线事件
+ ///
+ internal Action OnDisconnect;
+
+ ///
+ /// 重连成功事件
+ ///
+ internal Action OnReconnected;
+
+ internal string id;
+
+ ///
+ /// 请求回调缓存
+ ///
+ private readonly Dictionary>> responses;
+
+ private int requestI = 1;
+
+ private LCRTMRouter router;
+
+ private LCLiveQueryHeartBeat heartBeat;
+
+ private LCWebSocketClient client;
+
+ public LCLiveQueryConnection(string id) {
+ this.id = id;
+ responses = new Dictionary>>();
+ heartBeat = new LCLiveQueryHeartBeat(this);
+ router = new LCRTMRouter();
+ client = new LCWebSocketClient {
+ OnMessage = OnClientMessage,
+ OnClose = OnClientDisconnect
+ };
+ }
+
+ public async Task Connect() {
+ try {
+ LCRTMServer rtmServer = await router.GetServer();
+ try {
+ LCLogger.Debug($"Primary Server");
+ await client.Connect(rtmServer.Primary, SUB_PROTOCOL);
+ } catch (Exception e) {
+ LCLogger.Error(e);
+ LCLogger.Debug($"Secondary Server");
+ await client.Connect(rtmServer.Secondary, SUB_PROTOCOL);
+ }
+ } catch (Exception e) {
+ throw e;
+ }
+ }
+
+ ///
+ /// 重置连接
+ ///
+ ///
+ internal async Task Reset() {
+ // 关闭就连接
+ await client.Close();
+ // 重新创建连接组件
+ heartBeat = new LCLiveQueryHeartBeat(this);
+ router = new LCRTMRouter();
+ client = new LCWebSocketClient {
+ OnMessage = OnClientMessage,
+ OnClose = OnClientDisconnect
+ };
+ await Reconnect();
+ }
+
+ ///
+ /// 发送请求,会在收到应答后返回
+ ///
+ ///
+ ///
+ internal async Task> SendRequest(Dictionary request) {
+ TaskCompletionSource> tcs = new TaskCompletionSource>();
+ int requestIndex = requestI++;
+ request["i"] = requestIndex;
+ responses.Add(requestIndex, tcs);
+ try {
+ string json = JsonConvert.SerializeObject(request);
+ await SendText(json);
+ } catch (Exception e) {
+ tcs.TrySetException(e);
+ }
+ return await tcs.Task;
+ }
+
+ ///
+ /// 发送文本消息
+ ///
+ ///
+ ///
+ internal async Task SendText(string text) {
+ LCLogger.Debug($"{id} => {text}");
+ Task sendTask = client.Send(text);
+ if (await Task.WhenAny(sendTask, Task.Delay(SEND_TIMEOUT)) == sendTask) {
+ await sendTask;
+ } else {
+ throw new TimeoutException("Send request time out");
+ }
+ }
+
+ ///
+ /// 关闭连接
+ ///
+ ///
+ internal async Task Close() {
+ OnNotification = null;
+ OnDisconnect = null;
+ OnReconnected = null;
+ heartBeat.Stop();
+ await client.Close();
+ }
+
+ private void OnClientMessage(byte[] bytes) {
+ _ = heartBeat.Refresh(OnPingTimeout);
+ try {
+ string json = Encoding.UTF8.GetString(bytes);
+ Dictionary msg = JsonConvert.DeserializeObject>(json,
+ LCJsonConverter.Default);
+ LCLogger.Debug($"{id} <= {json}");
+ if (msg.TryGetValue("i", out object i)) {
+ int requestIndex = Convert.ToInt32(i);
+ if (responses.TryGetValue(requestIndex, out TaskCompletionSource> tcs)) {
+ if (msg.TryGetValue("error", out object error)) {
+ // 错误
+ if (error is Dictionary dict) {
+ int code = Convert.ToInt32(dict["code"]);
+ string detail = dict["detail"] as string;
+ tcs.SetException(new LCException(code, detail));
+ } else {
+ tcs.SetException(new Exception(error as string));
+ }
+ } else {
+ tcs.SetResult(msg);
+ }
+ responses.Remove(requestIndex);
+ } else {
+ LCLogger.Error($"No request for {requestIndex}");
+ }
+ } else {
+ // 通知
+ OnNotification?.Invoke(msg);
+ }
+ } catch (Exception e) {
+ LCLogger.Error(e);
+ }
+ }
+
+ private void OnClientDisconnect() {
+ heartBeat.Stop();
+ OnDisconnect?.Invoke();
+ // 重连
+ _ = Reconnect();
+ }
+
+ private async void OnPingTimeout() {
+ await client.Close();
+ OnClientDisconnect();
+ }
+
+ private async Task Reconnect() {
+ while (true) {
+ int reconnectCount = 0;
+ // 重连策略
+ while (reconnectCount < MAX_RECONNECT_TIMES) {
+ try {
+ LCLogger.Debug($"Reconnecting... {reconnectCount}");
+ await Connect();
+ break;
+ } catch (Exception e) {
+ reconnectCount++;
+ LCLogger.Error(e);
+ LCLogger.Debug($"Reconnect after {RECONNECT_INTERVAL}ms");
+ await Task.Delay(RECONNECT_INTERVAL);
+ }
+ }
+ if (reconnectCount < MAX_RECONNECT_TIMES) {
+ // 重连成功
+ LCLogger.Debug("Reconnected");
+ client.OnMessage = OnClientMessage;
+ client.OnClose = OnClientDisconnect;
+ OnReconnected?.Invoke();
+ break;
+ } else {
+ // 重置 Router,继续尝试重连
+ router = new LCRTMRouter();
+ }
+ }
+ }
+ }
+}
diff --git a/LiveQuery/LiveQuery/LCLiveQuery.cs b/LiveQuery/LiveQuery/LCLiveQuery.cs
new file mode 100644
index 0000000..c6f96a6
--- /dev/null
+++ b/LiveQuery/LiveQuery/LCLiveQuery.cs
@@ -0,0 +1,265 @@
+using System;
+using System.Linq;
+using System.Collections.Generic;
+using System.Collections.ObjectModel;
+using System.Threading.Tasks;
+using LeanCloud.Storage;
+using LeanCloud.Storage.Internal.Object;
+using LeanCloud.LiveQuery.Internal;
+
+namespace LeanCloud.LiveQuery {
+ ///
+ /// LiveQuery
+ ///
+ public class LCLiveQuery {
+ ///
+ /// 新对象创建事件
+ ///
+ public Action OnCreate;
+ ///
+ /// 对象更新事件
+ ///
+ public Action> OnUpdate;
+ ///
+ /// 对象被删除
+ ///
+ public Action OnDelete;
+ ///
+ /// 有新的满足条件的对象产生
+ ///
+ public Action> OnEnter;
+ ///
+ /// 不再满足条件
+ ///
+ public Action> OnLeave;
+ ///
+ /// 当一个用户登录成功
+ ///
+ public Action OnLogin;
+
+ public string Id {
+ get; private set;
+ }
+
+ public LCQuery Query {
+ get; internal set;
+ }
+
+ private static LCLiveQueryConnection connection;
+
+ private static Dictionary> liveQueries = new Dictionary>();
+
+ internal LCLiveQuery() {
+
+ }
+
+ private static readonly string DeviceId = Guid.NewGuid().ToString();
+
+ ///
+ /// 订阅
+ ///
+ ///
+ public async Task Subscribe() {
+ // TODO 判断当前连接情况
+ if (connection == null) {
+ connection = new LCLiveQueryConnection(DeviceId) {
+ OnReconnected = OnReconnected,
+ OnNotification = OnNotification
+ };
+ await connection.Connect();
+ await Login();
+ }
+ Dictionary queryData = new Dictionary {
+ { "className", Query.ClassName },
+ { "where", Query.Condition.Encode() }
+ };
+ Dictionary data = new Dictionary {
+ { "query", queryData },
+ { "id", DeviceId },
+ { "clientTimestamp", DateTimeOffset.Now.ToUnixTimeMilliseconds() }
+ };
+ LCUser user = await LCUser.GetCurrent();
+ if (user != null && !string.IsNullOrEmpty(user.SessionToken)) {
+ data.Add("sessionToken", user.SessionToken);
+ }
+ string path = "LiveQuery/subscribe";
+ Dictionary result = await LCApplication.HttpClient.Post>(path,
+ data: data);
+ if (result.TryGetValue("query_id", out object id)) {
+ Id = id as string;
+ WeakReference weakRef = new WeakReference(this);
+ liveQueries[Id] = weakRef;
+ }
+ }
+
+ ///
+ /// 取消订阅
+ ///
+ ///
+ public async Task Unsubscribe() {
+ Dictionary data = new Dictionary {
+ { "id", DeviceId },
+ { "query_id", Id }
+ };
+ string path = "LiveQuery/unsubscribe";
+ await LCApplication.HttpClient.Post>(path,
+ data: data);
+ // 移除
+ liveQueries.Remove(Id);
+ }
+
+ private static async Task Login() {
+ Dictionary data = new Dictionary {
+ { "cmd", "login" },
+ { "appId", LCApplication.AppId },
+ { "installationId", DeviceId },
+ { "clientTs", DateTimeOffset.Now.ToUnixTimeMilliseconds() },
+ { "service", 1 }
+ };
+ await connection.SendRequest(data);
+ }
+
+ private static async void OnReconnected() {
+ await Login();
+ Dictionary> oldLiveQueries = liveQueries;
+ liveQueries = new Dictionary>();
+ foreach (WeakReference weakRef in oldLiveQueries.Values) {
+ if (weakRef.TryGetTarget(out LCLiveQuery liveQuery)) {
+ await liveQuery.Subscribe();
+ }
+ }
+ }
+
+ private static void OnNotification(Dictionary notification) {
+ if (!notification.TryGetValue("cmd", out object cmd) ||
+ !"data".Equals(cmd)) {
+ return;
+ }
+ if (!notification.TryGetValue("msg", out object msg) ||
+ !(msg is IEnumerable