oneRain 2020-05-13 17:04:23 +08:00
parent 04e8229f89
commit 51fe69d7c0
2 changed files with 495 additions and 0 deletions

View File

@ -0,0 +1,230 @@
using System;
using System.Text;
using System.Collections.Generic;
using System.Threading.Tasks;
using Newtonsoft.Json;
using LeanCloud.Realtime.Internal.Router;
using LeanCloud.Realtime.Internal.WebSocket;
using LeanCloud.Common;
using LeanCloud.Storage;
namespace LeanCloud.LiveQuery.Internal {
public class LCLiveQueryConnection {
/// <summary>
/// 发送超时
/// </summary>
private const int SEND_TIMEOUT = 10000;
/// <summary>
/// 最大重连次数,超过后重置 Router 缓存后再次尝试重连
/// </summary>
private const int MAX_RECONNECT_TIMES = 10;
/// <summary>
/// 重连间隔
/// </summary>
private const int RECONNECT_INTERVAL = 10000;
/// <summary>
/// 子协议
/// </summary>
private const string SUB_PROTOCOL = "lc.json.3";
/// <summary>
/// 通知事件
/// </summary>
internal Action<Dictionary<string, object>> OnNotification;
/// <summary>
/// 断线事件
/// </summary>
internal Action OnDisconnect;
/// <summary>
/// 重连成功事件
/// </summary>
internal Action OnReconnected;
internal string id;
/// <summary>
/// 请求回调缓存
/// </summary>
private readonly Dictionary<int, TaskCompletionSource<Dictionary<string, object>>> responses;
private int requestI = 1;
private LCRTMRouter router;
private LCLiveQueryHeartBeat heartBeat;
private LCWebSocketClient client;
public LCLiveQueryConnection(string id) {
this.id = id;
responses = new Dictionary<int, TaskCompletionSource<Dictionary<string, object>>>();
heartBeat = new LCLiveQueryHeartBeat(this);
router = new LCRTMRouter();
client = new LCWebSocketClient {
OnMessage = OnClientMessage,
OnClose = OnClientDisconnect
};
}
public async Task Connect() {
try {
LCRTMServer rtmServer = await router.GetServer();
try {
LCLogger.Debug($"Primary Server");
await client.Connect(rtmServer.Primary, SUB_PROTOCOL);
} catch (Exception e) {
LCLogger.Error(e);
LCLogger.Debug($"Secondary Server");
await client.Connect(rtmServer.Secondary, SUB_PROTOCOL);
}
} catch (Exception e) {
throw e;
}
}
/// <summary>
/// 重置连接
/// </summary>
/// <returns></returns>
internal async Task Reset() {
// 关闭就连接
await client.Close();
// 重新创建连接组件
heartBeat = new LCLiveQueryHeartBeat(this);
router = new LCRTMRouter();
client = new LCWebSocketClient {
OnMessage = OnClientMessage,
OnClose = OnClientDisconnect
};
await Reconnect();
}
/// <summary>
/// 发送请求,会在收到应答后返回
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
internal async Task<Dictionary<string, object>> SendRequest(Dictionary<string, object> request) {
TaskCompletionSource<Dictionary<string, object>> tcs = new TaskCompletionSource<Dictionary<string, object>>();
int requestIndex = requestI++;
request["i"] = requestIndex;
responses.Add(requestIndex, tcs);
try {
string json = JsonConvert.SerializeObject(request);
await SendText(json);
} catch (Exception e) {
tcs.TrySetException(e);
}
return await tcs.Task;
}
/// <summary>
/// 发送文本消息
/// </summary>
/// <param name="text"></param>
/// <returns></returns>
internal async Task SendText(string text) {
LCLogger.Debug($"{id} => {text}");
Task sendTask = client.Send(text);
if (await Task.WhenAny(sendTask, Task.Delay(SEND_TIMEOUT)) == sendTask) {
await sendTask;
} else {
throw new TimeoutException("Send request time out");
}
}
/// <summary>
/// 关闭连接
/// </summary>
/// <returns></returns>
internal async Task Close() {
OnNotification = null;
OnDisconnect = null;
OnReconnected = null;
heartBeat.Stop();
await client.Close();
}
private void OnClientMessage(byte[] bytes) {
_ = heartBeat.Refresh(OnPingTimeout);
try {
string json = Encoding.UTF8.GetString(bytes);
Dictionary<string, object> msg = JsonConvert.DeserializeObject<Dictionary<string, object>>(json,
LCJsonConverter.Default);
LCLogger.Debug($"{id} <= {json}");
if (msg.TryGetValue("i", out object i)) {
int requestIndex = Convert.ToInt32(i);
if (responses.TryGetValue(requestIndex, out TaskCompletionSource<Dictionary<string, object>> tcs)) {
if (msg.TryGetValue("error", out object error)) {
// 错误
if (error is Dictionary<string, object> dict) {
int code = Convert.ToInt32(dict["code"]);
string detail = dict["detail"] as string;
tcs.SetException(new LCException(code, detail));
} else {
tcs.SetException(new Exception(error as string));
}
} else {
tcs.SetResult(msg);
}
responses.Remove(requestIndex);
} else {
LCLogger.Error($"No request for {requestIndex}");
}
} else {
// 通知
OnNotification?.Invoke(msg);
}
} catch (Exception e) {
LCLogger.Error(e);
}
}
private void OnClientDisconnect() {
heartBeat.Stop();
OnDisconnect?.Invoke();
// 重连
_ = Reconnect();
}
private async void OnPingTimeout() {
await client.Close();
OnClientDisconnect();
}
private async Task Reconnect() {
while (true) {
int reconnectCount = 0;
// 重连策略
while (reconnectCount < MAX_RECONNECT_TIMES) {
try {
LCLogger.Debug($"Reconnecting... {reconnectCount}");
await Connect();
break;
} catch (Exception e) {
reconnectCount++;
LCLogger.Error(e);
LCLogger.Debug($"Reconnect after {RECONNECT_INTERVAL}ms");
await Task.Delay(RECONNECT_INTERVAL);
}
}
if (reconnectCount < MAX_RECONNECT_TIMES) {
// 重连成功
LCLogger.Debug("Reconnected");
client.OnMessage = OnClientMessage;
client.OnClose = OnClientDisconnect;
OnReconnected?.Invoke();
break;
} else {
// 重置 Router继续尝试重连
router = new LCRTMRouter();
}
}
}
}
}

View File

@ -0,0 +1,265 @@
using System;
using System.Linq;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Threading.Tasks;
using LeanCloud.Storage;
using LeanCloud.Storage.Internal.Object;
using LeanCloud.LiveQuery.Internal;
namespace LeanCloud.LiveQuery {
/// <summary>
/// LiveQuery
/// </summary>
public class LCLiveQuery {
/// <summary>
/// 新对象创建事件
/// </summary>
public Action<LCObject> OnCreate;
/// <summary>
/// 对象更新事件
/// </summary>
public Action<LCObject, ReadOnlyCollection<string>> OnUpdate;
/// <summary>
/// 对象被删除
/// </summary>
public Action<string> OnDelete;
/// <summary>
/// 有新的满足条件的对象产生
/// </summary>
public Action<LCObject, ReadOnlyCollection<string>> OnEnter;
/// <summary>
/// 不再满足条件
/// </summary>
public Action<LCObject, ReadOnlyCollection<string>> OnLeave;
/// <summary>
/// 当一个用户登录成功
/// </summary>
public Action<LCUser> OnLogin;
public string Id {
get; private set;
}
public LCQuery Query {
get; internal set;
}
private static LCLiveQueryConnection connection;
private static Dictionary<string, WeakReference<LCLiveQuery>> liveQueries = new Dictionary<string, WeakReference<LCLiveQuery>>();
internal LCLiveQuery() {
}
private static readonly string DeviceId = Guid.NewGuid().ToString();
/// <summary>
/// 订阅
/// </summary>
/// <returns></returns>
public async Task Subscribe() {
// TODO 判断当前连接情况
if (connection == null) {
connection = new LCLiveQueryConnection(DeviceId) {
OnReconnected = OnReconnected,
OnNotification = OnNotification
};
await connection.Connect();
await Login();
}
Dictionary<string, object> queryData = new Dictionary<string, object> {
{ "className", Query.ClassName },
{ "where", Query.Condition.Encode() }
};
Dictionary<string, object> data = new Dictionary<string, object> {
{ "query", queryData },
{ "id", DeviceId },
{ "clientTimestamp", DateTimeOffset.Now.ToUnixTimeMilliseconds() }
};
LCUser user = await LCUser.GetCurrent();
if (user != null && !string.IsNullOrEmpty(user.SessionToken)) {
data.Add("sessionToken", user.SessionToken);
}
string path = "LiveQuery/subscribe";
Dictionary<string, object> result = await LCApplication.HttpClient.Post<Dictionary<string, object>>(path,
data: data);
if (result.TryGetValue("query_id", out object id)) {
Id = id as string;
WeakReference<LCLiveQuery> weakRef = new WeakReference<LCLiveQuery>(this);
liveQueries[Id] = weakRef;
}
}
/// <summary>
/// 取消订阅
/// </summary>
/// <returns></returns>
public async Task Unsubscribe() {
Dictionary<string, object> data = new Dictionary<string, object> {
{ "id", DeviceId },
{ "query_id", Id }
};
string path = "LiveQuery/unsubscribe";
await LCApplication.HttpClient.Post<Dictionary<string, object>>(path,
data: data);
// 移除
liveQueries.Remove(Id);
}
private static async Task Login() {
Dictionary<string, object> data = new Dictionary<string, object> {
{ "cmd", "login" },
{ "appId", LCApplication.AppId },
{ "installationId", DeviceId },
{ "clientTs", DateTimeOffset.Now.ToUnixTimeMilliseconds() },
{ "service", 1 }
};
await connection.SendRequest(data);
}
private static async void OnReconnected() {
await Login();
Dictionary<string, WeakReference<LCLiveQuery>> oldLiveQueries = liveQueries;
liveQueries = new Dictionary<string, WeakReference<LCLiveQuery>>();
foreach (WeakReference<LCLiveQuery> weakRef in oldLiveQueries.Values) {
if (weakRef.TryGetTarget(out LCLiveQuery liveQuery)) {
await liveQuery.Subscribe();
}
}
}
private static void OnNotification(Dictionary<string, object> notification) {
if (!notification.TryGetValue("cmd", out object cmd) ||
!"data".Equals(cmd)) {
return;
}
if (!notification.TryGetValue("msg", out object msg) ||
!(msg is IEnumerable<object> list)) {
return;
}
foreach (object item in list) {
if (item is Dictionary<string, object> dict) {
if (!dict.TryGetValue("op", out object op)) {
continue;
}
switch (op as string) {
case "create":
OnCreateNotification(dict);
break;
case "update":
OnUpdateNotification(dict);
break;
case "enter":
OnEnterNotification(dict);
break;
case "leave":
OnLeaveNotification(dict);
break;
case "delete":
OnDeleteNotification(dict);
break;
case "login":
OnLoginNotification(dict);
break;
default:
LCLogger.Debug($"Not support: {op}");
break;
}
}
}
}
private static void OnCreateNotification(Dictionary<string, object> data) {
if (TryGetLiveQuery(data, out LCLiveQuery liveQuery) &&
TryGetObject(data, out LCObject obj)) {
liveQuery.OnCreate?.Invoke(obj);
}
}
private static void OnUpdateNotification(Dictionary<string, object> data) {
if (TryGetLiveQuery(data, out LCLiveQuery liveQuery) &&
TryGetObject(data, out LCObject obj) &&
TryGetUpdatedKeys(data, out ReadOnlyCollection<string> keys)) {
liveQuery.OnUpdate?.Invoke(obj, keys);
}
}
private static void OnEnterNotification(Dictionary<string, object> data) {
if (TryGetLiveQuery(data, out LCLiveQuery liveQuery) &&
TryGetObject(data, out LCObject obj) &&
TryGetUpdatedKeys(data, out ReadOnlyCollection<string> keys)) {
liveQuery.OnEnter?.Invoke(obj, keys);
}
}
private static void OnLeaveNotification(Dictionary<string, object> data) {
if (TryGetLiveQuery(data, out LCLiveQuery liveQuery) &&
TryGetObject(data, out LCObject obj) &&
TryGetUpdatedKeys(data, out ReadOnlyCollection<string> keys)) {
liveQuery.OnLeave?.Invoke(obj, keys);
}
}
private static void OnDeleteNotification(Dictionary<string, object> data) {
if (TryGetLiveQuery(data, out LCLiveQuery liveQuery) &&
TryGetObject(data, out LCObject obj)) {
liveQuery.OnDelete?.Invoke(obj.ObjectId);
}
}
private static void OnLoginNotification(Dictionary<string, object> data) {
if (TryGetLiveQuery(data, out LCLiveQuery liveQuery) &&
data.TryGetValue("object", out object obj) &&
obj is Dictionary<string, object> dict) {
LCObjectData objectData = LCObjectData.Decode(dict);
LCUser user = new LCUser(objectData);
liveQuery.OnLogin?.Invoke(user);
}
}
private static bool TryGetLiveQuery(Dictionary<string, object> data, out LCLiveQuery liveQuery) {
if (!data.TryGetValue("query_id", out object i) ||
!(i is string id)) {
liveQuery = null;
return false;
}
if (!liveQueries.TryGetValue(id, out WeakReference<LCLiveQuery> weakRef) ||
!weakRef.TryGetTarget(out LCLiveQuery lq)) {
liveQuery = null;
return false;
}
liveQuery = lq;
return true;
}
private static bool TryGetObject(Dictionary<string, object> data, out LCObject obj) {
if (!data.TryGetValue("object", out object o) ||
!(o is Dictionary<string, object> dict)) {
obj = null;
return false;
}
LCObjectData objectData = LCObjectData.Decode(dict);
obj = LCObject.Create(dict["className"] as string);
obj.Merge(objectData);
return true;
}
private static bool TryGetUpdatedKeys(Dictionary<string, object> data, out ReadOnlyCollection<string> keys) {
if (!data.TryGetValue("updatedKeys", out object uks) ||
!(uks is List<object> list)) {
keys = null;
return false;
}
keys = list.Cast<string>().ToList()
.AsReadOnly();
return true;
}
}
}