chore: 支持 command 节流

oneRain 2020-06-24 17:45:27 +08:00
parent 5ffe1de095
commit 005752e748
1 changed files with 32 additions and 0 deletions

View File

@ -60,6 +60,7 @@ namespace LeanCloud.Realtime.Internal.Connection {
/// 请求回调缓存 /// 请求回调缓存
/// </summary> /// </summary>
private readonly Dictionary<int, TaskCompletionSource<GenericCommand>> responses; private readonly Dictionary<int, TaskCompletionSource<GenericCommand>> responses;
private readonly List<GenericCommand> sendingRequests;
private int requestI = 1; private int requestI = 1;
@ -79,6 +80,7 @@ namespace LeanCloud.Realtime.Internal.Connection {
internal LCConnection(string id) { internal LCConnection(string id) {
this.id = id; this.id = id;
responses = new Dictionary<int, TaskCompletionSource<GenericCommand>>(); responses = new Dictionary<int, TaskCompletionSource<GenericCommand>>();
sendingRequests = new List<GenericCommand>();
heartBeat = new LCHeartBeat(this, OnDisconnect); heartBeat = new LCHeartBeat(this, OnDisconnect);
router = new LCRTMRouter(); router = new LCRTMRouter();
ws = new LCWebSocketClient { ws = new LCWebSocketClient {
@ -127,6 +129,22 @@ namespace LeanCloud.Realtime.Internal.Connection {
/// <param name="request"></param> /// <param name="request"></param>
/// <returns></returns> /// <returns></returns>
internal async Task<GenericCommand> SendRequest(GenericCommand request) { internal async Task<GenericCommand> SendRequest(GenericCommand request) {
if (IsIdempotentCommand(request)) {
GenericCommand sendingReq = sendingRequests.Find(item => {
// TRICK 除了 I 其他字段相等
request.I = item.I;
return Equals(request, item);
});
if (sendingReq == null) {
sendingRequests.Add(request);
} else {
LCLogger.Warn("duplicated request");
if (responses.TryGetValue(sendingReq.I, out TaskCompletionSource<GenericCommand> waitingTcs)) {
return await waitingTcs.Task;
}
}
}
TaskCompletionSource<GenericCommand> tcs = new TaskCompletionSource<GenericCommand>(); TaskCompletionSource<GenericCommand> tcs = new TaskCompletionSource<GenericCommand>();
request.I = requestI++; request.I = requestI++;
responses.Add(request.I, tcs); responses.Add(request.I, tcs);
@ -187,6 +205,9 @@ namespace LeanCloud.Realtime.Internal.Connection {
LCException exception = new LCException(code, detail); LCException exception = new LCException(code, detail);
tcs.TrySetException(exception); tcs.TrySetException(exception);
} else { } else {
sendingRequests.RemoveAll(item => {
return item.I == command.I;
});
tcs.TrySetResult(command); tcs.TrySetResult(command);
} }
responses.Remove(requestIndex); responses.Remove(requestIndex);
@ -303,5 +324,16 @@ namespace LeanCloud.Realtime.Internal.Connection {
internal void Resume() { internal void Resume() {
_ = Reconnect(); _ = Reconnect();
} }
private static bool IsIdempotentCommand(GenericCommand command) {
return !(
command.Cmd == CommandType.Direct ||
(command.Cmd == CommandType.Session && command.Op == OpType.Open) ||
(command.Cmd == CommandType.Conv &&
(command.Op == OpType.Start ||
command.Op == OpType.Update ||
command.Op == OpType.Members))
);
}
} }
} }