using System; using System.Threading; using System.Threading.Tasks; using System.Collections; using System.Collections.Generic; using LeanCloud.Storage.Internal; using LeanCloud.Realtime; using LeanCloud.Realtime.Internal; using System.Linq; using System.Linq.Expressions; namespace LeanCloud.LiveQuery { /// /// AVLiveQuery 类 /// public static class AVLiveQuery { /// /// LiveQuery 传输数据的 AVRealtime 实例 /// public static AVRealtime Channel { get; set; } internal static long ClientTs { get; set; } internal static bool Inited { get; set; } internal static string InstallationId { get; set; } } /// /// AVLiveQuery 对象 /// /// public class AVLiveQuery where T : AVObject { internal static Dictionary>> liveQueryDict = new Dictionary>>(); /// /// 当前 AVLiveQuery 对象的 Id /// public string Id { get; set; } /// /// 根据 AVQuery 创建 AVLiveQuery 对象 /// /// public AVLiveQuery(AVQuery query) { this.Query = query; } /// /// AVLiveQuery 对应的 AVQuery 对象 /// public AVQuery Query { get; set; } /// /// 数据推送的触发的事件通知 /// public event EventHandler> OnLiveQueryReceived; /// /// 推送抵达时触发事件通知 /// /// 产生这条推送的原因。 /// /// create:符合查询条件的对象创建; /// update:符合查询条件的对象属性修改。 /// enter:对象修改事件,从不符合查询条件变成符合。 /// leave:对象修改时间,从符合查询条件变成不符合。 /// delete:对象删除 /// login:只对 _User 对象有效,表示用户登录。 /// /// /// public void On(string scope, Action onRecevived) { this.OnLiveQueryReceived += (sender, e) => { if (e.Scope == scope) { onRecevived.Invoke(e.Payload); } }; } /// /// 订阅操作 /// /// /// public async Task> SubscribeAsync(CancellationToken cancellationToken = default(CancellationToken)) { if (Query == null) { throw new Exception("Query can not be null when subcribe."); } if (!AVLiveQuery.Inited) { await Login(); AVLiveQuery.Channel.OnReconnected += OnChannelReconnected; AVLiveQuery.Channel.NoticeReceived += OnChannelNoticeReceived; AVLiveQuery.Inited = true; } await InternalSubscribe(); var liveQueryRef = new WeakReference>(this); liveQueryDict.Add(Id, liveQueryRef); return this; } static async void OnChannelReconnected(object sender, AVIMReconnectedEventArgs e) { await Login(); lock (liveQueryDict) { foreach (var kv in liveQueryDict) { if (kv.Value.TryGetTarget(out var liveQuery)) { liveQuery.InternalSubscribe().ContinueWith(_ => { }); } } } } static async Task Login() { var installation = await AVPlugins.Instance.InstallationIdController.GetAsync(); AVLiveQuery.InstallationId = installation.ToString(); AVLiveQuery.Channel.ToggleNotification(true); await AVLiveQuery.Channel.OpenAsync(); AVLiveQuery.ClientTs = (long) DateTime.UtcNow.Subtract(new DateTime(1970, 1, 1)).TotalSeconds; var liveQueryLogInCmd = new AVIMCommand().Command("login") .Argument("installationId", AVLiveQuery.InstallationId) .Argument("clientTs", AVLiveQuery.ClientTs) .Argument("service", 1).AppId(AVClient.CurrentConfiguration.ApplicationId); // open the session for LiveQuery. try { await AVLiveQuery.Channel.AVIMCommandRunner.RunCommandAsync(liveQueryLogInCmd); } catch (Exception e) { AVRealtime.PrintLog(e.Message); } } static void OnChannelNoticeReceived(object sender, AVIMNotice e) { if (e.CommandName == "data") { var ids = AVDecoder.Instance.DecodeList(e.RawData["ids"]); if (e.RawData["msg"] is IEnumerable msg) { var receivedPayloads = from item in msg select item as Dictionary; if (receivedPayloads != null) { foreach (var payload in receivedPayloads) { var liveQueryId = payload["query_id"] as string; if (liveQueryDict.TryGetValue(liveQueryId, out var liveQueryRef) && liveQueryRef.TryGetTarget(out var liveQuery)) { var scope = payload["op"] as string; var objectPayload = payload["object"] as Dictionary; string[] keys = null; if (payload.TryGetValue("updatedKeys", out object updatedKeys)) { // enter, leave, update keys = (updatedKeys as List).Select(x => x.ToString()).ToArray(); } liveQuery.Emit(scope, objectPayload, keys); } } } } } } async Task InternalSubscribe() { var queryMap = new Dictionary { { "where", Query.Condition}, { "className", Query.GetClassName()} }; Dictionary data = new Dictionary { { "query", queryMap }, { "id", AVLiveQuery.InstallationId }, { "clientTimestamp", AVLiveQuery.ClientTs } }; string sessionToken = AVUser.CurrentUser?.SessionToken; if (!string.IsNullOrEmpty(sessionToken)) { data.Add("sessionToken", sessionToken); } var command = new AVCommand("LiveQuery/subscribe", "POST", sessionToken, data: data); var res = await AVPlugins.Instance.CommandRunner.RunCommandAsync(command); Id = res.Item2["query_id"] as string; } /// /// 取消对当前 LiveQuery 对象的订阅 /// /// public async Task UnsubscribeAsync() { Dictionary strs = new Dictionary { { "id", AVLiveQuery.InstallationId }, { "query_id", Id }, }; string sessionToken = AVUser.CurrentUser?.SessionToken; var command = new AVCommand("LiveQuery/unsubscribe", "POST", sessionToken, data: strs); await AVPlugins.Instance.CommandRunner.RunCommandAsync(command); lock (liveQueryDict) { liveQueryDict.Remove(Id); } } void Emit(string scope, IDictionary payloadMap, string[] keys) { var objectState = AVObjectCoder.Instance.Decode(payloadMap, AVDecoder.Instance); var payloadObject = AVObject.FromState(objectState, Query.GetClassName()); var args = new AVLiveQueryEventArgs { Scope = scope, Keys = keys, Payload = payloadObject }; OnLiveQueryReceived?.Invoke(this, args); } } }