* LCLogger.cs:

* LCIMClient.cs:
* LCAppRouter.cs:
* LCRTMRouter.cs:
* LCHeartBeat.cs:
* LCConnection.cs:
* LCWebSocketClient.cs:
* LCIMGoAwayController.cs:
* LCIMConversationController.cs:

* LCHttpClient.cs: chore: 支持 goaway
oneRain 2020-04-13 17:29:55 +08:00
parent 35c66d65cf
commit 88f2b64eba
10 changed files with 122 additions and 67 deletions

View File

@ -63,7 +63,7 @@ namespace LeanCloud.Common {
Dictionary<string, object> data = JsonConvert.DeserializeObject<Dictionary<string, object>>(resultString);
appServer = new LCAppServer(data);
} catch (Exception e) {
LCLogger.Error(e.Message);
LCLogger.Error(e);
// 拉取服务地址失败后,使用国际节点的默认服务地址
appServer = LCAppServer.GetInternalFallbackAppServer(appId);
}

View File

@ -1,4 +1,5 @@
using System;
using System.Text;
namespace LeanCloud.Common {
/// <summary>
@ -36,5 +37,15 @@ namespace LeanCloud.Common {
public static void Error(string format, params object[] args) {
LogDelegate?.Invoke(LCLogLevel.Error, string.Format(format, args));
}
public static void Error(Exception e) {
StringBuilder sb = new StringBuilder();
sb.Append(e.GetType());
sb.Append("\n");
sb.Append(e.Message);
sb.Append("\n");
sb.Append(e.StackTrace);
Error(sb.ToString());
}
}
}

View File

@ -3,6 +3,7 @@ using System.Text;
using System.Collections.Generic;
using System.Threading.Tasks;
using Google.Protobuf;
using LeanCloud.Realtime.Internal.Router;
using LeanCloud.Realtime.Internal.WebSocket;
using LeanCloud.Realtime.Protocol;
using LeanCloud.Common;
@ -15,7 +16,11 @@ namespace LeanCloud.Realtime.Internal.Connection {
internal class LCConnection {
private const int SEND_TIMEOUT = 10000;
private const int MAX_RECONNECT_TIMES = 10;
private const int MAX_RECONNECT_TIMES = 3;
private const int RECONNECT_INTERVAL = 5000;
private const int HEART_BEAT_INTERVAL = 5000;
internal Action<GenericCommand> OnNotification;
@ -25,24 +30,25 @@ namespace LeanCloud.Realtime.Internal.Connection {
internal Action OnReconnected;
private LCHeartBeat heartBeat;
internal string id;
private readonly Dictionary<int, TaskCompletionSource<GenericCommand>> responses;
private int requestI = 1;
private LCRTMRouter router;
private LCHeartBeat heartBeat;
private LCWebSocketClient client;
internal LCConnection(string id) {
this.id = id;
responses = new Dictionary<int, TaskCompletionSource<GenericCommand>>();
heartBeat = new LCHeartBeat(this, 10000, 10000, () => {
});
client = new LCWebSocketClient {
OnMessage = OnMessage,
heartBeat = new LCHeartBeat(this, HEART_BEAT_INTERVAL, HEART_BEAT_INTERVAL);
router = new LCRTMRouter();
client = new LCWebSocketClient(router, heartBeat) {
OnMessage = OnClientMessage,
OnDisconnect = OnClientDisconnect
};
}
@ -51,6 +57,18 @@ namespace LeanCloud.Realtime.Internal.Connection {
await client.Connect();
}
internal async Task Reset() {
router.Reset();
await client.Close();
heartBeat = new LCHeartBeat(this, HEART_BEAT_INTERVAL, HEART_BEAT_INTERVAL);
router = new LCRTMRouter();
client = new LCWebSocketClient(router, heartBeat) {
OnMessage = OnClientMessage,
OnDisconnect = OnClientDisconnect
};
await Reconnect();
}
internal async Task<GenericCommand> SendRequest(GenericCommand request) {
TaskCompletionSource<GenericCommand> tcs = new TaskCompletionSource<GenericCommand>();
request.I = requestI++;
@ -58,14 +76,14 @@ namespace LeanCloud.Realtime.Internal.Connection {
LCLogger.Debug($"{id} => {FormatCommand(request)}");
byte[] bytes = request.ToByteArray();
Task sendTask = client.Send(bytes);
Task timeoutTask = Task.Delay(SEND_TIMEOUT);
try {
Task doneTask = await Task.WhenAny(sendTask, timeoutTask);
if (timeoutTask == doneTask) {
tcs.TrySetException(new TimeoutException("Send request"));
if (await Task.WhenAny(sendTask, Task.Delay(SEND_TIMEOUT)) == sendTask) {
try {
await sendTask;
} catch (Exception e) {
tcs.TrySetException(e);
}
} catch (Exception e) {
tcs.TrySetException(e);
} else {
tcs.TrySetException(new TimeoutException("Send request"));
}
return await tcs.Task;
}
@ -77,8 +95,7 @@ namespace LeanCloud.Realtime.Internal.Connection {
await client.Close();
}
private void OnMessage(byte[] bytes) {
_ = heartBeat.Update();
private void OnClientMessage(byte[] bytes) {
try {
GenericCommand command = GenericCommand.Parser.ParseFrom(bytes);
LCLogger.Debug($"{id} <= {FormatCommand(command)}");
@ -106,14 +123,14 @@ namespace LeanCloud.Realtime.Internal.Connection {
OnNotification?.Invoke(command);
}
} catch (Exception e) {
LCLogger.Error(e.Message);
LCLogger.Error(e);
}
}
private void OnClientDisconnect() {
OnDisconnect?.Invoke();
OnReconnecting?.Invoke();
// TODO 重连
// 重连
_ = Reconnect();
}
@ -125,13 +142,14 @@ namespace LeanCloud.Realtime.Internal.Connection {
try {
LCLogger.Debug($"Reconnecting... {reconnectCount}");
await client.Connect();
client.OnMessage = OnClientMessage;
client.OnDisconnect = OnClientDisconnect;
break;
} catch (Exception e) {
reconnectCount++;
LCLogger.Error(e.Message);
int delay = 10;
LCLogger.Debug($"Reconnect after {delay}s");
await Task.Delay(1000 * delay);
LCLogger.Error(e);
LCLogger.Debug($"Reconnect after {RECONNECT_INTERVAL}ms");
await Task.Delay(RECONNECT_INTERVAL);
}
}
if (reconnectCount < MAX_RECONNECT_TIMES) {
@ -140,8 +158,8 @@ namespace LeanCloud.Realtime.Internal.Connection {
OnReconnected?.Invoke();
break;
} else {
// TODO 重置连接
client = new LCWebSocketClient();
// 重置 Router继续尝试重连
router.Reset();
}
}
}

View File

@ -24,26 +24,22 @@ namespace LeanCloud.Realtime.Internal.Connection {
/// </summary>
private readonly int pongInterval;
private Action onTimeout;
private CancellationTokenSource pingCTS;
private CancellationTokenSource pongCTS;
internal LCHeartBeat(LCConnection connection,
int pingInterval,
int pongInterval,
Action onTimeout) {
int pongInterval) {
this.connection = connection;
this.pingInterval = pingInterval;
this.pongInterval = pongInterval;
this.onTimeout = onTimeout;
}
/// <summary>
/// 更新心跳监听
/// </summary>
/// <returns></returns>
internal async Task Update() {
internal async Task Update(Action onTimeout) {
LCLogger.Debug("HeartBeat update");
pingCTS?.Cancel();
pongCTS?.Cancel();
@ -78,7 +74,6 @@ namespace LeanCloud.Realtime.Internal.Connection {
/// 停止心跳监听
/// </summary>
internal void Stop() {
onTimeout = null;
pingCTS?.Cancel();
pongCTS?.Cancel();
}

View File

@ -451,7 +451,7 @@ namespace LeanCloud.Realtime.Internal.Controller {
Data = where
};
} catch (Exception e) {
LCLogger.Error(e.Message);
LCLogger.Error(e);
}
}
command.ConvMessage = convMessage;

View File

@ -11,8 +11,7 @@ namespace LeanCloud.Realtime.Internal.Controller {
internal override async Task OnNotification(GenericCommand notification) {
// 清空缓存,断开连接,等待重新连接
//Connection.Router.Reset();
await Connection.Close();
await Connection.Reset();
}
#endregion

View File

@ -6,12 +6,21 @@ using LeanCloud.Common;
using Newtonsoft.Json;
namespace LeanCloud.Realtime.Internal.Router {
/// <summary>
/// RTM Router
/// </summary>
internal class LCRTMRouter {
private const int REQUEST_TIMEOUT = 10000;
private LCRTMServer rtmServer;
internal LCRTMRouter() {
}
/// <summary>
/// 获取服务器地址
/// </summary>
/// <returns></returns>
internal async Task<LCRTMServer> GetServer() {
if (rtmServer == null || !rtmServer.IsValid) {
await Fetch();
@ -19,6 +28,9 @@ namespace LeanCloud.Realtime.Internal.Router {
return rtmServer;
}
/// <summary>
/// 重置服务器地址缓存
/// </summary>
internal void Reset() {
rtmServer = null;
}
@ -33,9 +45,14 @@ namespace LeanCloud.Realtime.Internal.Router {
};
HttpClient client = new HttpClient();
LCHttpUtils.PrintRequest(client, request);
HttpResponseMessage response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);
request.Dispose();
Task<HttpResponseMessage> requestTask = client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);
if (await Task.WhenAny(requestTask, Task.Delay(REQUEST_TIMEOUT)) != requestTask) {
throw new TimeoutException("Request timeout.");
}
HttpResponseMessage response = await requestTask;
request.Dispose();
string resultString = await response.Content.ReadAsStringAsync();
response.Dispose();
LCHttpUtils.PrintResponse(response, resultString);

View File

@ -3,6 +3,7 @@ using System.Threading.Tasks;
using System.Net.WebSockets;
using LeanCloud.Common;
using LeanCloud.Realtime.Internal.Router;
using LeanCloud.Realtime.Internal.Connection;
namespace LeanCloud.Realtime.Internal.WebSocket {
/// <summary>
@ -14,6 +15,8 @@ namespace LeanCloud.Realtime.Internal.WebSocket {
private const int CLOSE_TIMEOUT = 5000;
private const int CONNECT_TIMEOUT = 10000;
internal Action<byte[]> OnMessage;
internal Action OnDisconnect;
@ -24,28 +27,35 @@ namespace LeanCloud.Realtime.Internal.WebSocket {
private readonly LCRTMRouter router;
internal LCWebSocketClient() {
router = new LCRTMRouter();
private readonly LCHeartBeat heartBeat;
internal LCWebSocketClient(LCRTMRouter router, LCHeartBeat heartBeat) {
this.router = router;
this.heartBeat = heartBeat;
}
internal async Task Connect() {
LCRTMServer rtmServer = await router.GetServer();
try {
LCLogger.Debug($"Primary Server");
await Connect(rtmServer.Primary);
LCRTMServer rtmServer = await router.GetServer();
try {
LCLogger.Debug($"Primary Server");
await Connect(rtmServer.Primary);
} catch (Exception e) {
LCLogger.Error(e);
LCLogger.Debug($"Secondary Server");
await Connect(rtmServer.Secondary);
}
} catch (Exception e) {
LCLogger.Error(e.Message);
LCLogger.Debug($"Secondary Server");
await Connect(rtmServer.Secondary);
throw e;
}
// 接收
_ = StartReceive();
}
private async Task Connect(string server) {
LCLogger.Debug($"Connecting WebSocket: {server}");
Task timeoutTask = Task.Delay(5000);
Task timeoutTask = Task.Delay(CONNECT_TIMEOUT);
ws = new ClientWebSocket();
ws.Options.AddSubProtocol("lc.protobuf2.3");
Task connectTask = ws.ConnectAsync(new Uri(server), default);
@ -61,14 +71,15 @@ namespace LeanCloud.Realtime.Internal.WebSocket {
OnMessage = null;
OnDisconnect = null;
OnReconnect = null;
heartBeat.Stop();
try {
// 发送关闭帧可能会很久,所以增加超时
// 主动挥手关闭,不会再收到 Close Frame
Task closeTask = ws.CloseAsync(WebSocketCloseStatus.NormalClosure, "", default);
Task closeTask = ws.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, default);
Task delayTask = Task.Delay(CLOSE_TIMEOUT);
await Task.WhenAny(closeTask, delayTask);
} catch (Exception e) {
LCLogger.Error(e.Message);
LCLogger.Error(e);
} finally {
ws.Abort();
ws.Dispose();
@ -82,7 +93,7 @@ namespace LeanCloud.Realtime.Internal.WebSocket {
try {
await ws.SendAsync(bytes, WebSocketMessageType.Binary, true, default);
} catch (Exception e) {
LCLogger.Error(e.Message);
LCLogger.Error(e);
throw e;
}
} else {
@ -99,16 +110,22 @@ namespace LeanCloud.Realtime.Internal.WebSocket {
WebSocketReceiveResult result = await ws.ReceiveAsync(new ArraySegment<byte>(buffer), default);
if (result.MessageType == WebSocketMessageType.Close) {
// 由服务端发起关闭
LCLogger.Debug($"Receive Closed: {result.CloseStatusDescription}");
try {
// 挥手关闭
await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, default);
} catch (Exception ex) {
LCLogger.Error(ex.Message);
} finally {
HandleClose();
LCLogger.Debug($"Receive Closed: {result.CloseStatus}");
LCLogger.Debug($"ws state: {ws.State}");
// 这里有可能是客户端主动关闭,也有可能是服务端主动关闭
if (ws.State == WebSocketState.CloseReceived) {
// 如果是服务端主动关闭,则挥手关闭,并认为是断线
try {
await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, default);
} catch (Exception e) {
LCLogger.Error(e);
} finally {
// TODO 正常关闭不知道能否完成???
OnDisconnect?.Invoke();
}
}
} else if (result.MessageType == WebSocketMessageType.Binary) {
_ = heartBeat.Update(HandleClose);
// 拼合 WebSocket Message
int length = result.Count;
byte[] data = new byte[length];
@ -120,20 +137,18 @@ namespace LeanCloud.Realtime.Internal.WebSocket {
}
} catch (Exception e) {
// 客户端网络异常
LCLogger.Error(e.Message);
LCLogger.Debug($"WebSocket State: {ws.State}");
HandleClose();
LCLogger.Error(e);
OnDisconnect?.Invoke();
}
}
private void HandleClose() {
try {
heartBeat.Stop();
ws.Abort();
ws.Dispose();
} catch (Exception e) {
LCLogger.Error(e.Message);
} finally {
OnDisconnect?.Invoke();
LCLogger.Error(e);
}
}
}

View File

@ -455,7 +455,7 @@ namespace LeanCloud.Realtime {
// 回调用户
OnReconnected?.Invoke();
} catch (Exception e) {
LCLogger.Error(e.Message);
LCLogger.Error(e);
await Connection.Close();
OnReconnectError?.Invoke();
}

View File

@ -165,7 +165,7 @@ namespace LeanCloud.Storage.Internal.Http {
code = (int)error["code"];
message = error["error"].ToString();
} catch (Exception e) {
LCLogger.Error(e.Message);
LCLogger.Error(e);
}
return new LCException(code, message);
}