Merge pull request #79 from onerain88/throttle

Throttle
oneRain 2020-06-28 14:59:36 +08:00 committed by GitHub
commit 6db2980d39
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 115 additions and 5 deletions

View File

@ -0,0 +1,71 @@
using NUnit.Framework;
using System;
using System.Threading.Tasks;
using LeanCloud;
using LeanCloud.Realtime;
using LeanCloud.Realtime.Internal.Protocol;
namespace Realtime.Test {
public class Throttle {
private LCIMClient c1;
private LCIMClient c2;
private LCIMConversation conversation;
[SetUp]
public async Task SetUp() {
LCLogger.LogDelegate += Utils.Print;
LCApplication.Initialize("ikGGdRE2YcVOemAaRbgp1xGJ-gzGzoHsz", "NUKmuRbdAhg1vrb2wexYo1jo", "https://ikggdre2.lc-cn-n1-shared.com");
c1 = new LCIMClient(Guid.NewGuid().ToString());
c2 = new LCIMClient(Guid.NewGuid().ToString());
await c1.Open();
await c2.Open();
conversation = await c1.CreateConversation(new string[] { Guid.NewGuid().ToString() });
}
[TearDown]
public async Task TearDown() {
await c1.Close();
await c2.Close();
LCLogger.LogDelegate -= Utils.Print;
}
[Test]
public void Equality() {
GenericCommand cmd1 = new GenericCommand {
Cmd = CommandType.Session,
Op = OpType.Open,
PeerId = "hello",
I = 1,
SessionMessage = new SessionCommand {
Code = 123
}
};
GenericCommand cmd2 = new GenericCommand {
Cmd = CommandType.Session,
Op = OpType.Open,
PeerId = "hello",
I = 2,
SessionMessage = new SessionCommand {
Code = 123
}
};
Assert.IsFalse(Equals(cmd1, cmd2));
cmd2.I = cmd1.I;
Assert.IsTrue(Equals(cmd1, cmd2));
}
[Test]
public async Task RemoveMemberTwice() {
Task t1 = conversation.RemoveMembers(new string[] { c2.Id }).ContinueWith(t => {
Assert.IsTrue(t.IsCompleted && !t.IsFaulted);
});
Task t2 = conversation.RemoveMembers(new string[] { c2.Id }).ContinueWith(t => {
Assert.IsTrue(t.IsCompleted && !t.IsFaulted);
});
await Task.WhenAll(t1, t2);
}
}
}

View File

@ -1,4 +1,5 @@
using System; using System;
using System.Linq;
using System.Text; using System.Text;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading.Tasks; using System.Threading.Tasks;
@ -12,6 +13,17 @@ namespace LeanCloud.Realtime.Internal.Connection {
/// 连接层,只与数据协议相关 /// 连接层,只与数据协议相关
/// </summary> /// </summary>
public class LCConnection { public class LCConnection {
// 请求/应答比对,即 I 相等
class RequestAndResponseComparer : IEqualityComparer<GenericCommand> {
public bool Equals(GenericCommand x, GenericCommand y) {
return true;
}
public int GetHashCode(GenericCommand obj) {
return obj.I;
}
}
/// <summary> /// <summary>
/// 连接状态 /// 连接状态
/// </summary> /// </summary>
@ -59,7 +71,7 @@ namespace LeanCloud.Realtime.Internal.Connection {
/// <summary> /// <summary>
/// 请求回调缓存 /// 请求回调缓存
/// </summary> /// </summary>
private readonly Dictionary<int, TaskCompletionSource<GenericCommand>> responses; private readonly Dictionary<GenericCommand, TaskCompletionSource<GenericCommand>> requestToResponses;
private int requestI = 1; private int requestI = 1;
@ -78,7 +90,8 @@ namespace LeanCloud.Realtime.Internal.Connection {
internal LCConnection(string id) { internal LCConnection(string id) {
this.id = id; this.id = id;
responses = new Dictionary<int, TaskCompletionSource<GenericCommand>>(); requestToResponses = new Dictionary<GenericCommand, TaskCompletionSource<GenericCommand>>(new RequestAndResponseComparer());
heartBeat = new LCHeartBeat(this, OnDisconnect); heartBeat = new LCHeartBeat(this, OnDisconnect);
router = new LCRTMRouter(); router = new LCRTMRouter();
ws = new LCWebSocketClient { ws = new LCWebSocketClient {
@ -127,9 +140,24 @@ namespace LeanCloud.Realtime.Internal.Connection {
/// <param name="request"></param> /// <param name="request"></param>
/// <returns></returns> /// <returns></returns>
internal async Task<GenericCommand> SendRequest(GenericCommand request) { internal async Task<GenericCommand> SendRequest(GenericCommand request) {
if (IsIdempotentCommand(request)) {
GenericCommand sendingReq = requestToResponses.Keys.FirstOrDefault(item => {
// TRICK 除了 I 其他字段相等
request.I = item.I;
return Equals(request, item);
});
if (sendingReq != null) {
LCLogger.Warn("duplicated request");
if (requestToResponses.TryGetValue(sendingReq, out TaskCompletionSource<GenericCommand> waitingTcs)) {
return await waitingTcs.Task;
}
LCLogger.Error($"error request: {request}");
}
}
TaskCompletionSource<GenericCommand> tcs = new TaskCompletionSource<GenericCommand>(); TaskCompletionSource<GenericCommand> tcs = new TaskCompletionSource<GenericCommand>();
request.I = requestI++; request.I = requestI++;
responses.Add(request.I, tcs); requestToResponses.Add(request, tcs);
try { try {
await SendCommand(request); await SendCommand(request);
} catch (Exception e) { } catch (Exception e) {
@ -177,7 +205,7 @@ namespace LeanCloud.Realtime.Internal.Connection {
if (command.HasI) { if (command.HasI) {
// 应答 // 应答
int requestIndex = command.I; int requestIndex = command.I;
if (responses.TryGetValue(requestIndex, out TaskCompletionSource<GenericCommand> tcs)) { if (requestToResponses.TryGetValue(command, out TaskCompletionSource<GenericCommand> tcs)) {
if (command.HasErrorMessage) { if (command.HasErrorMessage) {
// 错误 // 错误
ErrorCommand error = command.ErrorMessage; ErrorCommand error = command.ErrorMessage;
@ -189,7 +217,7 @@ namespace LeanCloud.Realtime.Internal.Connection {
} else { } else {
tcs.TrySetResult(command); tcs.TrySetResult(command);
} }
responses.Remove(requestIndex); requestToResponses.Remove(command);
} else { } else {
LCLogger.Error($"No request for {requestIndex}"); LCLogger.Error($"No request for {requestIndex}");
} }
@ -303,5 +331,16 @@ namespace LeanCloud.Realtime.Internal.Connection {
internal void Resume() { internal void Resume() {
_ = Reconnect(); _ = Reconnect();
} }
private static bool IsIdempotentCommand(GenericCommand command) {
return !(
command.Cmd == CommandType.Direct ||
(command.Cmd == CommandType.Session && command.Op == OpType.Open) ||
(command.Cmd == CommandType.Conv &&
(command.Op == OpType.Start ||
command.Op == OpType.Update ||
command.Op == OpType.Members))
);
}
} }
} }