chore: 简化缓存的请求/应答

oneRain 2020-06-28 12:30:29 +08:00
parent 4a6cf1ea09
commit 5976f5dbb3
1 changed files with 20 additions and 14 deletions

View File

@ -1,4 +1,5 @@
using System;
using System.Linq;
using System.Text;
using System.Collections.Generic;
using System.Threading.Tasks;
@ -12,6 +13,17 @@ namespace LeanCloud.Realtime.Internal.Connection {
/// 连接层,只与数据协议相关
/// </summary>
public class LCConnection {
// 请求/应答比对,即 I 相等
class RequestAndResponseComparer : IEqualityComparer<GenericCommand> {
public bool Equals(GenericCommand x, GenericCommand y) {
return true;
}
public int GetHashCode(GenericCommand obj) {
return obj.I;
}
}
/// <summary>
/// 连接状态
/// </summary>
@ -59,8 +71,7 @@ namespace LeanCloud.Realtime.Internal.Connection {
/// <summary>
/// 请求回调缓存
/// </summary>
private readonly Dictionary<int, TaskCompletionSource<GenericCommand>> responses;
private readonly List<GenericCommand> sendingRequests;
private readonly Dictionary<GenericCommand, TaskCompletionSource<GenericCommand>> requestToResponses;
private int requestI = 1;
@ -79,8 +90,8 @@ namespace LeanCloud.Realtime.Internal.Connection {
internal LCConnection(string id) {
this.id = id;
responses = new Dictionary<int, TaskCompletionSource<GenericCommand>>();
sendingRequests = new List<GenericCommand>();
requestToResponses = new Dictionary<GenericCommand, TaskCompletionSource<GenericCommand>>(new RequestAndResponseComparer());
heartBeat = new LCHeartBeat(this, OnDisconnect);
router = new LCRTMRouter();
ws = new LCWebSocketClient {
@ -130,25 +141,23 @@ namespace LeanCloud.Realtime.Internal.Connection {
/// <returns></returns>
internal async Task<GenericCommand> SendRequest(GenericCommand request) {
if (IsIdempotentCommand(request)) {
GenericCommand sendingReq = sendingRequests.Find(item => {
GenericCommand sendingReq = requestToResponses.Keys.FirstOrDefault(item => {
// TRICK 除了 I 其他字段相等
request.I = item.I;
return Equals(request, item);
});
if (sendingReq != null) {
LCLogger.Warn("duplicated request");
if (responses.TryGetValue(sendingReq.I, out TaskCompletionSource<GenericCommand> waitingTcs)) {
if (requestToResponses.TryGetValue(sendingReq, out TaskCompletionSource<GenericCommand> waitingTcs)) {
return await waitingTcs.Task;
}
LCLogger.Error($"error request: {request}");
} else {
sendingRequests.Add(request);
}
}
TaskCompletionSource<GenericCommand> tcs = new TaskCompletionSource<GenericCommand>();
request.I = requestI++;
responses.Add(request.I, tcs);
requestToResponses.Add(request, tcs);
try {
await SendCommand(request);
} catch (Exception e) {
@ -196,7 +205,7 @@ namespace LeanCloud.Realtime.Internal.Connection {
if (command.HasI) {
// 应答
int requestIndex = command.I;
if (responses.TryGetValue(requestIndex, out TaskCompletionSource<GenericCommand> tcs)) {
if (requestToResponses.TryGetValue(command, out TaskCompletionSource<GenericCommand> tcs)) {
if (command.HasErrorMessage) {
// 错误
ErrorCommand error = command.ErrorMessage;
@ -206,12 +215,9 @@ namespace LeanCloud.Realtime.Internal.Connection {
LCException exception = new LCException(code, detail);
tcs.TrySetException(exception);
} else {
sendingRequests.RemoveAll(item => {
return item.I == command.I;
});
tcs.TrySetResult(command);
}
responses.Remove(requestIndex);
requestToResponses.Remove(command);
} else {
LCLogger.Error($"No request for {requestIndex}");
}