* LCIMClient.cs:

* LCIMConversation.cs:
* LCHeartBeat.cs:
* LCConnection.cs:
* LCIMController.cs:
* LCWebSocketClient.cs:
* LCWebSocketConnection.cs:
* LCIMGoAwayController.cs:
* LCIMUnreadController.cs:

* Realtime.csproj: chore: 完善连接层,WebSocket 客户端,心跳监听器
oneRain 2020-04-10 16:32:33 +08:00
parent a11da59ec5
commit 72d65b4fb5
10 changed files with 454 additions and 224 deletions

View File

@ -3,7 +3,6 @@ using System.Collections.Generic;
using System.Threading.Tasks;
using System.Linq;
using System.Collections.ObjectModel;
using LeanCloud.Realtime.Protocol;
using LeanCloud.Storage;
using LeanCloud.Storage.Internal.Codec;

View File

@ -0,0 +1,155 @@
using System;
using System.Text;
using System.Collections.Generic;
using System.Threading.Tasks;
using Google.Protobuf;
using LeanCloud.Realtime.Internal.WebSocket;
using LeanCloud.Realtime.Protocol;
using LeanCloud.Common;
using LeanCloud.Storage;
namespace LeanCloud.Realtime.Internal.Connection {
internal class LCConnection {
private const int SEND_TIMEOUT = 10000;
private const int MAX_RECONNECT_TIMES = 10;
internal Action<GenericCommand> OnNotification;
internal Action OnDisconnect;
internal Action OnReconnecting;
internal Action OnReconnected;
private LCHeartBeat heartBeat;
internal string id;
private readonly Dictionary<int, TaskCompletionSource<GenericCommand>> responses;
private int requestI = 1;
private LCWebSocketClient client;
internal LCConnection(string id) {
this.id = id;
responses = new Dictionary<int, TaskCompletionSource<GenericCommand>>();
heartBeat = new LCHeartBeat(this, 10000, 10000, () => {
});
client = new LCWebSocketClient {
OnMessage = OnMessage,
OnDisconnect = OnClientDisconnect
};
}
internal async Task Connect() {
await client.Connect();
}
internal async Task<GenericCommand> SendRequest(GenericCommand request) {
TaskCompletionSource<GenericCommand> tcs = new TaskCompletionSource<GenericCommand>();
request.I = requestI++;
responses.Add(request.I, tcs);
LCLogger.Debug($"{id} => {FormatCommand(request)}");
byte[] bytes = request.ToByteArray();
Task sendTask = client.Send(bytes);
Task timeoutTask = Task.Delay(SEND_TIMEOUT);
try {
Task doneTask = await Task.WhenAny(sendTask, timeoutTask);
if (timeoutTask == doneTask) {
tcs.TrySetException(new TimeoutException("Send request"));
}
} catch (Exception e) {
tcs.TrySetException(e);
}
return await tcs.Task;
}
internal async Task Close() {
OnNotification = null;
OnDisconnect = null;
heartBeat.Stop();
await client.Close();
}
private void OnMessage(byte[] bytes) {
_ = heartBeat.Update();
try {
GenericCommand command = GenericCommand.Parser.ParseFrom(bytes);
LCLogger.Debug($"{id} <= {FormatCommand(command)}");
if (command.HasI) {
// 应答
int requestIndex = command.I;
if (responses.TryGetValue(requestIndex, out TaskCompletionSource<GenericCommand> tcs)) {
if (command.HasErrorMessage) {
// 错误
ErrorCommand error = command.ErrorMessage;
int code = error.Code;
string detail = error.Detail;
// 包装成异常抛出
LCException exception = new LCException(code, detail);
tcs.TrySetException(exception);
} else {
tcs.TrySetResult(command);
}
responses.Remove(requestIndex);
} else {
LCLogger.Error($"No request for {requestIndex}");
}
} else {
// 通知
OnNotification?.Invoke(command);
}
} catch (Exception e) {
LCLogger.Error(e.Message);
}
}
private void OnClientDisconnect() {
OnDisconnect?.Invoke();
OnReconnecting?.Invoke();
// TODO 重连
_ = Reconnect();
}
private async Task Reconnect() {
while (true) {
int reconnectCount = 0;
// 重连策略
while (reconnectCount < MAX_RECONNECT_TIMES) {
try {
LCLogger.Debug($"Reconnecting... {reconnectCount}");
await client.Connect();
break;
} catch (Exception e) {
reconnectCount++;
LCLogger.Error(e.Message);
int delay = 10;
LCLogger.Debug($"Reconnect after {delay}s");
await Task.Delay(1000 * delay);
}
}
if (reconnectCount < MAX_RECONNECT_TIMES) {
// 重连成功
LCLogger.Debug("Reconnected");
OnReconnected?.Invoke();
break;
} else {
// TODO 重置连接
client = new LCWebSocketClient();
}
}
}
private static string FormatCommand(GenericCommand command) {
StringBuilder sb = new StringBuilder($"{command.Cmd}");
if (command.HasOp) {
sb.Append($"/{command.Op}");
}
sb.Append($"\n{command}");
return sb.ToString();
}
}
}

View File

@ -0,0 +1,86 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using LeanCloud.Common;
using LeanCloud.Realtime.Protocol;
namespace LeanCloud.Realtime.Internal.Connection {
/// <summary>
/// 心跳控制器
/// 1. 每次接收到消息后开始监听,如果在 pingInterval 时间内没有再次接收到消息,则发送 ping 请求;
/// 2. 发送后等待 pongInterval 时间,如果在此时间内接收到了任何消息,则取消并重新开始监听 1
/// 3. 如果没收到消息,则认为超时并回调,连接层接收回调后放弃当前连接,以断线逻辑处理
/// </summary>
internal class LCHeartBeat {
private readonly LCConnection connection;
/// <summary>
/// ping 间隔
/// </summary>
private readonly int pingInterval;
/// <summary>
/// pong 间隔
/// </summary>
private readonly int pongInterval;
private Action onTimeout;
private CancellationTokenSource pingCTS;
private CancellationTokenSource pongCTS;
internal LCHeartBeat(LCConnection connection,
int pingInterval,
int pongInterval,
Action onTimeout) {
this.connection = connection;
this.pingInterval = pingInterval;
this.pongInterval = pongInterval;
this.onTimeout = onTimeout;
}
/// <summary>
/// 更新心跳监听
/// </summary>
/// <returns></returns>
internal async Task Update() {
LCLogger.Debug("HeartBeat update");
pingCTS?.Cancel();
pongCTS?.Cancel();
// 计时准备 ping
pingCTS = new CancellationTokenSource();
Task delayTask = Task.Delay(pingInterval, pingCTS.Token);
await delayTask;
if (delayTask.IsCanceled) {
return;
}
// 发送 ping 包
LCLogger.Debug("Ping ~~~");
GenericCommand command = new GenericCommand {
Cmd = CommandType.Echo,
AppId = LCApplication.AppId,
PeerId = connection.id
};
_ = connection.SendRequest(command);
pongCTS = new CancellationTokenSource();
Task timeoutTask = Task.Delay(pongInterval, pongCTS.Token);
await timeoutTask;
if (timeoutTask.IsCanceled) {
return;
}
// timeout
LCLogger.Error("Ping timeout");
onTimeout.Invoke();
}
/// <summary>
/// 停止心跳监听
/// </summary>
internal void Stop() {
onTimeout = null;
pingCTS?.Cancel();
pongCTS?.Cancel();
}
}
}

View File

@ -1,6 +1,6 @@
using System.Threading.Tasks;
using LeanCloud.Realtime.Protocol;
using LeanCloud.Realtime.Internal.WebSocket;
using LeanCloud.Realtime.Internal.Connection;
namespace LeanCloud.Realtime.Internal.Controller {
internal abstract class LCIMController {
@ -14,7 +14,7 @@ namespace LeanCloud.Realtime.Internal.Controller {
internal abstract Task OnNotification(GenericCommand notification);
protected LCWebSocketConnection Connection {
protected LCConnection Connection {
get {
return Client.Connection;
}

View File

@ -11,7 +11,7 @@ namespace LeanCloud.Realtime.Internal.Controller {
internal override async Task OnNotification(GenericCommand notification) {
// 清空缓存,断开连接,等待重新连接
Connection.Router.Reset();
//Connection.Router.Reset();
await Connection.Close();
}

View File

@ -23,22 +23,24 @@ namespace LeanCloud.Realtime.Internal.Controller {
// 设置对话中的未读数据
LCIMConversation conversation = conversationDict[conv.Cid];
conversation.Unread = conv.Unread;
LCIMMessage message = null;
if (conv.HasBinaryMsg) {
// 二进制消息
byte[] bytes = conv.BinaryMsg.ToByteArray();
message = LCIMBinaryMessage.Deserialize(bytes);
} else {
// 类型消息
message = LCIMTypedMessage.Deserialize(conv.Data);
if (conv.HasData || conv.HasBinaryMsg) {
// 如果有消息,则反序列化
LCIMMessage message = null;
if (conv.HasBinaryMsg) {
// 二进制消息
byte[] bytes = conv.BinaryMsg.ToByteArray();
message = LCIMBinaryMessage.Deserialize(bytes);
} else {
// 类型消息
message = LCIMTypedMessage.Deserialize(conv.Data);
}
// 填充消息数据
message.ConversationId = conv.Cid;
message.Id = conv.Mid;
message.FromClientId = conv.From;
message.SentTimestamp = conv.Timestamp;
conversation.LastMessage = message;
}
// 填充消息数据
message.ConversationId = conv.Cid;
message.Id = conv.Mid;
message.FromClientId = conv.From;
message.SentTimestamp = conv.Timestamp;
conversation.LastMessage = message;
return conversation;
}).ToList().AsReadOnly();
Client.OnUnreadMessagesCountUpdated?.Invoke(conversations);

View File

@ -0,0 +1,134 @@
using System;
using System.Threading.Tasks;
using System.Net.WebSockets;
using LeanCloud.Common;
using LeanCloud.Realtime.Internal.Router;
namespace LeanCloud.Realtime.Internal.WebSocket {
internal class LCWebSocketClient {
// .net standard 2.0 好像在拼合 Frame 时有 bug所以将这个值调整大一些
private const int RECV_BUFFER_SIZE = 1024 * 5;
private const int CLOSE_TIMEOUT = 5000;
internal Action<byte[]> OnMessage;
internal Action OnDisconnect;
internal Action OnReconnect;
private ClientWebSocket ws;
private readonly LCRTMRouter router;
internal LCWebSocketClient() {
router = new LCRTMRouter();
}
internal async Task Connect() {
LCRTMServer rtmServer = await router.GetServer();
try {
LCLogger.Debug($"Connect Primary Server");
await Connect(rtmServer.Primary);
} catch (Exception e) {
LCLogger.Error(e.Message);
LCLogger.Debug($"Connect Secondary");
await Connect(rtmServer.Secondary);
}
// 接收
_ = StartReceive();
}
private async Task Connect(string server) {
LCLogger.Debug($"Connect WebSocket: {server}");
Task timeoutTask = Task.Delay(5000);
ws = new ClientWebSocket();
ws.Options.AddSubProtocol("lc.protobuf2.3");
Task connectTask = ws.ConnectAsync(new Uri(server), default);
if (await Task.WhenAny(connectTask, timeoutTask) == connectTask) {
LCLogger.Debug($"Connected WebSocket: {server}");
} else {
throw new TimeoutException("Connect timeout");
}
}
internal async Task Close() {
OnMessage = null;
OnDisconnect = null;
OnReconnect = null;
try {
// 发送关闭帧可能会很久,所以增加超时
// 主动挥手关闭,不会再收到 Close Frame
Task closeTask = ws.CloseAsync(WebSocketCloseStatus.NormalClosure, "", default);
Task delayTask = Task.Delay(CLOSE_TIMEOUT);
await Task.WhenAny(closeTask, delayTask);
} catch (Exception e) {
LCLogger.Error(e.Message);
} finally {
ws.Abort();
ws.Dispose();
LCLogger.Debug("Closed WebSocket.");
}
}
internal async Task Send(byte[] data) {
ArraySegment<byte> bytes = new ArraySegment<byte>(data);
if (ws.State == WebSocketState.Open) {
try {
await ws.SendAsync(bytes, WebSocketMessageType.Binary, true, default);
} catch (Exception e) {
LCLogger.Error(e.Message);
throw e;
}
} else {
throw new Exception($"Error Websocket state: {ws.State}");
}
}
private async Task StartReceive() {
byte[] buffer = new byte[RECV_BUFFER_SIZE];
try {
while (ws.State == WebSocketState.Open) {
WebSocketReceiveResult result = await ws.ReceiveAsync(new ArraySegment<byte>(buffer), default);
if (result.MessageType == WebSocketMessageType.Close) {
// 由服务端发起关闭
LCLogger.Debug($"Receive Closed: {result.CloseStatusDescription}");
try {
// 挥手关闭
await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, default);
} catch (Exception ex) {
LCLogger.Error(ex.Message);
} finally {
HandleClose();
}
} else if (result.MessageType == WebSocketMessageType.Binary) {
// 拼合 WebSocket Message
int length = result.Count;
byte[] data = new byte[length];
Array.Copy(buffer, data, length);
OnMessage?.Invoke(data);
} else {
LCLogger.Error($"Error message type: {result.MessageType}");
}
}
} catch (Exception e) {
// 客户端网络异常
LCLogger.Error(e.Message);
LCLogger.Debug($"WebSocket State: {ws.State}");
HandleClose();
}
}
private void HandleClose() {
try {
ws.Abort();
ws.Dispose();
} catch (Exception e) {
LCLogger.Error(e.Message);
} finally {
OnDisconnect?.Invoke();
}
}
}
}

View File

@ -1,190 +0,0 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Net.WebSockets;
using LeanCloud.Realtime.Protocol;
using LeanCloud.Storage;
using LeanCloud.Realtime.Internal.Router;
using LeanCloud.Common;
using Google.Protobuf;
namespace LeanCloud.Realtime.Internal.WebSocket {
internal class LCWebSocketConnection {
private const int KEEP_ALIVE_INTERVAL = 1;
// .net standard 2.0 好像在拼合 Frame 时有 bug所以将这个值调整大一些
private const int RECV_BUFFER_SIZE = 1024 * 5;
private ClientWebSocket ws;
private volatile int requestI = 1;
private readonly object requestILock = new object();
private readonly Dictionary<int, TaskCompletionSource<GenericCommand>> responses;
private readonly string id;
internal LCRTMRouter Router {
get; private set;
}
internal Func<GenericCommand, Task> OnNotification {
get; set;
}
internal Action<int, string> OnDisconnect {
get; set;
}
internal Func<Task> OnReconnect {
get; set;
}
internal LCWebSocketConnection(string id) {
Router = new LCRTMRouter();
this.id = id;
responses = new Dictionary<int, TaskCompletionSource<GenericCommand>>();
}
internal async Task Connect() {
// TODO 可完善策略
LCRTMServer rtmServer = await Router.GetServer();
try {
LCLogger.Debug($"Connect Primary Server: {rtmServer.Primary}");
await Connect(rtmServer.Primary);
LCLogger.Debug("Connected Primary Server");
} catch (Exception e) {
LCLogger.Error(e.Message);
LCLogger.Debug($"Connect Secondary Server: {rtmServer.Secondary}");
await Connect(rtmServer.Secondary);
LCLogger.Debug($"Connected Secondary Server");
}
// 接收
_ = StartReceive();
}
private async Task Connect(string server) {
ws = new ClientWebSocket();
ws.Options.AddSubProtocol("lc.protobuf2.3");
ws.Options.KeepAliveInterval = TimeSpan.FromSeconds(KEEP_ALIVE_INTERVAL);
await ws.ConnectAsync(new Uri(server), default);
}
private async Task Reconnect() {
// TODO 重连策略
while (true) {
try {
await Connect();
break;
} catch (Exception e) {
LCLogger.Error(e.Message);
await Task.Delay(1000 * 10);
}
}
OnReconnect?.Invoke();
}
internal Task<GenericCommand> SendRequest(GenericCommand request) {
TaskCompletionSource<GenericCommand> tcs = new TaskCompletionSource<GenericCommand>();
request.I = RequestI;
responses.Add(request.I, tcs);
LCLogger.Debug($"{id} => {request.Cmd}/{request.Op}: {request}");
ArraySegment<byte> bytes = new ArraySegment<byte>(request.ToByteArray());
try {
ws.SendAsync(bytes, WebSocketMessageType.Binary, true, default);
} catch (Exception e) {
// TODO 发送消息异常
LCLogger.Error(e.Message);
}
return tcs.Task;
}
internal async Task Close() {
await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, "1", default);
}
private async Task StartReceive() {
byte[] buffer = new byte[RECV_BUFFER_SIZE];
try {
while (ws.State == WebSocketState.Open) {
byte[] data = new byte[0];
WebSocketReceiveResult result;
do {
result = await ws.ReceiveAsync(new ArraySegment<byte>(buffer), default);
if (result.MessageType == WebSocketMessageType.Close) {
LCLogger.Debug($"Receive Closed: {result.CloseStatusDescription}");
try {
await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, "", default);
} catch (Exception ex) {
LCLogger.Error(ex.Message);
}
OnDisconnect?.Invoke(-1, null);
return;
}
// 拼合 WebSocket Frame
byte[] oldData = data;
data = new byte[oldData.Length + result.Count];
Array.Copy(oldData, data, oldData.Length);
Array.Copy(buffer, 0, data, oldData.Length, result.Count);
} while (!result.EndOfMessage);
try {
GenericCommand command = GenericCommand.Parser.ParseFrom(data);
LCLogger.Debug($"{id} <= {command.Cmd}/{command.Op}: {command}");
_ = HandleCommand(command);
} catch (Exception e) {
// 解析消息错误
LCLogger.Error(e.Message);
}
}
} catch (WebSocketException e) {
LCLogger.Error(e.Message);
LCLogger.Debug($"WebSocket State: {ws.State}");
try {
ws.Abort();
ws.Dispose();
} catch (Exception ex) {
LCLogger.Error(ex.Message);
} finally {
// 触发重连
await Reconnect();
}
}
}
private async Task HandleCommand(GenericCommand command) {
try {
if (command.HasI) {
// 应答
if (responses.TryGetValue(command.I, out TaskCompletionSource<GenericCommand> tcs)) {
if (command.HasErrorMessage) {
// 错误
ErrorCommand error = command.ErrorMessage;
int code = error.Code;
string detail = error.Detail;
// 包装成异常抛出
LCException exception = new LCException(code, detail);
tcs.SetException(exception);
} else {
tcs.SetResult(command);
}
}
} else {
// 通知
await OnNotification?.Invoke(command);
}
} catch (Exception e) {
LCLogger.Error(e.Message);
}
}
private int RequestI {
get {
lock (requestILock) {
return requestI++;
};
}
}
}
}

View File

@ -3,9 +3,10 @@ using System.Collections.Generic;
using System.Threading.Tasks;
using System.Linq;
using System.Collections.ObjectModel;
using LeanCloud.Realtime.Internal.WebSocket;
using LeanCloud.Common;
using LeanCloud.Realtime.Protocol;
using LeanCloud.Realtime.Internal.Controller;
using LeanCloud.Realtime.Internal.Connection;
namespace LeanCloud.Realtime {
public class LCIMClient {
@ -69,6 +70,27 @@ namespace LeanCloud.Realtime {
get; set;
}
/// <summary>
/// 客户端正在重连
/// </summary>
public Action OnReconnecting {
get; set;
}
/// <summary>
/// 客户端重连成功
/// </summary>
public Action OnReconnected {
get; set;
}
/// <summary>
/// 客户端重连失败,连接成功,登录失败
/// </summary>
public Action OnReconnectError {
get; set;
}
/// <summary>
/// 用户在其他客户端登录,当前客户端被服务端强行下线
/// </summary>
@ -187,7 +209,7 @@ namespace LeanCloud.Realtime {
get; private set;
}
internal LCWebSocketConnection Connection {
internal LCConnection Connection {
get; set;
}
@ -211,6 +233,8 @@ namespace LeanCloud.Realtime {
get; private set;
}
#region 接口
public LCIMClient(string clientId,
ILCIMSignatureFactory signatureFactory = null) {
Id = clientId;
@ -223,10 +247,10 @@ namespace LeanCloud.Realtime {
UnreadController = new LCIMUnreadController(this);
GoAwayController = new LCIMGoAwayController(this);
Connection = new LCWebSocketConnection(Id) {
OnNotification = OnNotification,
OnDisconnect = OnDisconnect,
OnReconnect = OnReconnect
Connection = new LCConnection(Id) {
OnNotification = OnConnectionNotification,
OnDisconnect = OnConnectionDisconnect,
OnReconnected = OnConnectionReconnect
};
}
@ -367,31 +391,49 @@ namespace LeanCloud.Realtime {
return new LCIMConversationQuery(this);
}
private async Task OnNotification(GenericCommand notification) {
#endregion
private void OnConnectionNotification(GenericCommand notification) {
switch (notification.Cmd) {
case CommandType.Session:
await SessionController.OnNotification(notification);
_ = SessionController.OnNotification(notification);
break;
case CommandType.Conv:
await ConversationController.OnNotification(notification);
_ = ConversationController.OnNotification(notification);
break;
case CommandType.Direct:
await MessageController.OnNotification(notification);
_ = MessageController.OnNotification(notification);
break;
case CommandType.Unread:
await UnreadController.OnNotification(notification);
_ = UnreadController.OnNotification(notification);
break;
case CommandType.Goaway:
await GoAwayController.OnNotification(notification);
_ = GoAwayController.OnNotification(notification);
break;
default:
break;
}
}
private async Task OnReconnect() {
// 打开 Session
await SessionController.Open();
private void OnConnectionDisconnect() {
OnDisconnect?.Invoke();
}
private void OnConnectionReconnect() {
_ = HandleReconnected();
}
private async Task HandleReconnected() {
try {
// 打开 Session
await SessionController.Open();
// 回调用户
OnReconnected?.Invoke();
} catch (Exception e) {
LCLogger.Error(e.Message);
await Connection.Close();
OnReconnectError?.Invoke();
}
}
internal async Task<LCIMConversation> GetOrQueryConversation(string convId) {

View File

@ -4,6 +4,7 @@
<TargetFramework>netstandard2.0</TargetFramework>
<ReleaseVersion>0.1.0</ReleaseVersion>
<RootNamespace>LeanCloud.Realtime</RootNamespace>
<CopyLocalLockFileAssemblies>true</CopyLocalLockFileAssemblies>
</PropertyGroup>
<ItemGroup>
@ -21,5 +22,6 @@
<Folder Include="Internal\WebSocket\" />
<Folder Include="Signature\" />
<Folder Include="Internal\Controller\" />
<Folder Include="Internal\Connection\" />
</ItemGroup>
</Project>