diff --git a/Realtime/Realtime.Test/Throttle.cs b/Realtime/Realtime.Test/Throttle.cs new file mode 100644 index 0000000..b0f5efa --- /dev/null +++ b/Realtime/Realtime.Test/Throttle.cs @@ -0,0 +1,71 @@ +using NUnit.Framework; +using System; +using System.Threading.Tasks; +using LeanCloud; +using LeanCloud.Realtime; +using LeanCloud.Realtime.Internal.Protocol; + +namespace Realtime.Test { + public class Throttle { + private LCIMClient c1; + private LCIMClient c2; + + private LCIMConversation conversation; + + [SetUp] + public async Task SetUp() { + LCLogger.LogDelegate += Utils.Print; + LCApplication.Initialize("ikGGdRE2YcVOemAaRbgp1xGJ-gzGzoHsz", "NUKmuRbdAhg1vrb2wexYo1jo", "https://ikggdre2.lc-cn-n1-shared.com"); + c1 = new LCIMClient(Guid.NewGuid().ToString()); + c2 = new LCIMClient(Guid.NewGuid().ToString()); + await c1.Open(); + await c2.Open(); + + conversation = await c1.CreateConversation(new string[] { Guid.NewGuid().ToString() }); + } + + [TearDown] + public async Task TearDown() { + await c1.Close(); + await c2.Close(); + LCLogger.LogDelegate -= Utils.Print; + } + + [Test] + public void Equality() { + GenericCommand cmd1 = new GenericCommand { + Cmd = CommandType.Session, + Op = OpType.Open, + PeerId = "hello", + I = 1, + SessionMessage = new SessionCommand { + Code = 123 + } + }; + GenericCommand cmd2 = new GenericCommand { + Cmd = CommandType.Session, + Op = OpType.Open, + PeerId = "hello", + I = 2, + SessionMessage = new SessionCommand { + Code = 123 + } + }; + Assert.IsFalse(Equals(cmd1, cmd2)); + cmd2.I = cmd1.I; + Assert.IsTrue(Equals(cmd1, cmd2)); + } + + [Test] + public async Task RemoveMemberTwice() { + Task t1 = conversation.RemoveMembers(new string[] { c2.Id }).ContinueWith(t => { + Assert.IsTrue(t.IsCompleted && !t.IsFaulted); + }); + Task t2 = conversation.RemoveMembers(new string[] { c2.Id }).ContinueWith(t => { + Assert.IsTrue(t.IsCompleted && !t.IsFaulted); + }); + + await Task.WhenAll(t1, t2); + } + } +} diff --git a/Realtime/Realtime/Internal/Connection/LCConnection.cs b/Realtime/Realtime/Internal/Connection/LCConnection.cs index 3b797ac..1e7732a 100644 --- a/Realtime/Realtime/Internal/Connection/LCConnection.cs +++ b/Realtime/Realtime/Internal/Connection/LCConnection.cs @@ -1,4 +1,5 @@ using System; +using System.Linq; using System.Text; using System.Collections.Generic; using System.Threading.Tasks; @@ -12,6 +13,17 @@ namespace LeanCloud.Realtime.Internal.Connection { /// 连接层,只与数据协议相关 /// public class LCConnection { + // 请求/应答比对,即 I 相等 + class RequestAndResponseComparer : IEqualityComparer { + public bool Equals(GenericCommand x, GenericCommand y) { + return true; + } + + public int GetHashCode(GenericCommand obj) { + return obj.I; + } + } + /// /// 连接状态 /// @@ -59,7 +71,7 @@ namespace LeanCloud.Realtime.Internal.Connection { /// /// 请求回调缓存 /// - private readonly Dictionary> responses; + private readonly Dictionary> requestToResponses; private int requestI = 1; @@ -78,7 +90,8 @@ namespace LeanCloud.Realtime.Internal.Connection { internal LCConnection(string id) { this.id = id; - responses = new Dictionary>(); + requestToResponses = new Dictionary>(new RequestAndResponseComparer()); + heartBeat = new LCHeartBeat(this, OnDisconnect); router = new LCRTMRouter(); ws = new LCWebSocketClient { @@ -127,9 +140,24 @@ namespace LeanCloud.Realtime.Internal.Connection { /// /// internal async Task SendRequest(GenericCommand request) { + if (IsIdempotentCommand(request)) { + GenericCommand sendingReq = requestToResponses.Keys.FirstOrDefault(item => { + // TRICK 除了 I 其他字段相等 + request.I = item.I; + return Equals(request, item); + }); + if (sendingReq != null) { + LCLogger.Warn("duplicated request"); + if (requestToResponses.TryGetValue(sendingReq, out TaskCompletionSource waitingTcs)) { + return await waitingTcs.Task; + } + LCLogger.Error($"error request: {request}"); + } + } + TaskCompletionSource tcs = new TaskCompletionSource(); request.I = requestI++; - responses.Add(request.I, tcs); + requestToResponses.Add(request, tcs); try { await SendCommand(request); } catch (Exception e) { @@ -177,7 +205,7 @@ namespace LeanCloud.Realtime.Internal.Connection { if (command.HasI) { // 应答 int requestIndex = command.I; - if (responses.TryGetValue(requestIndex, out TaskCompletionSource tcs)) { + if (requestToResponses.TryGetValue(command, out TaskCompletionSource tcs)) { if (command.HasErrorMessage) { // 错误 ErrorCommand error = command.ErrorMessage; @@ -189,7 +217,7 @@ namespace LeanCloud.Realtime.Internal.Connection { } else { tcs.TrySetResult(command); } - responses.Remove(requestIndex); + requestToResponses.Remove(command); } else { LCLogger.Error($"No request for {requestIndex}"); } @@ -303,5 +331,16 @@ namespace LeanCloud.Realtime.Internal.Connection { internal void Resume() { _ = Reconnect(); } + + private static bool IsIdempotentCommand(GenericCommand command) { + return !( + command.Cmd == CommandType.Direct || + (command.Cmd == CommandType.Session && command.Op == OpType.Open) || + (command.Cmd == CommandType.Conv && + (command.Op == OpType.Start || + command.Op == OpType.Update || + command.Op == OpType.Members)) + ); + } } }