* LCIMClient.cs:

* LCRTMRouter.cs:
* LCIMConversation.cs:
* LCHeartBeat.cs:
* LCConnection.cs:
* LCWebSocketClient.cs:
* LCIMMessageController.cs:
* LCIMSessionController.cs:

* LCIMMessage.cs: chore: 完善连接模块
oneRain 2020-04-16 11:38:22 +08:00
parent 8f81bf245a
commit 95acf35e65
9 changed files with 219 additions and 102 deletions

View File

@ -4,22 +4,36 @@ using System.Threading.Tasks;
using System.Linq;
using System.Collections.ObjectModel;
using LeanCloud.Storage;
using LeanCloud.Storage.Internal.Codec;
namespace LeanCloud.Realtime {
/// <summary>
/// 普通对话
/// </summary>
public class LCIMConversation {
/// <summary>
/// 对话 Id
/// </summary>
public string Id {
get; internal set;
}
/// <summary>
/// 是否唯一
/// </summary>
public bool Unique {
get; internal set;
}
/// <summary>
/// 唯一 Id
/// </summary>
public string UniqueId {
get; internal set;
}
/// <summary>
/// 对话名称
/// </summary>
public string Name {
get {
return this["name"] as string;
@ -29,42 +43,69 @@ namespace LeanCloud.Realtime {
}
}
/// <summary>
/// 创建者 Id
/// </summary>
public string CreatorId {
get; set;
}
/// <summary>
/// 成员 Id
/// </summary>
public ReadOnlyCollection<string> MemberIds {
get {
return new ReadOnlyCollection<string>(ids.ToList());
}
}
/// <summary>
/// 静音成员 Id
/// </summary>
public ReadOnlyCollection<string> MutedMemberIds {
get {
return new ReadOnlyCollection<string>(mutedIds.ToList());
}
}
/// <summary>
/// 未读消息数量
/// </summary>
public int Unread {
get; internal set;
}
/// <summary>
/// 最新的一条消息
/// </summary>
public LCIMMessage LastMessage {
get; internal set;
}
/// <summary>
/// 创建时间
/// </summary>
public DateTime CreatedAt {
get; internal set;
}
/// <summary>
/// 更新时间
/// </summary>
public DateTime UpdatedAt {
get; internal set;
}
/// <summary>
/// 最新送达消息时间戳
/// </summary>
public long LastDeliveredTimestamp {
get; internal set;
}
/// <summary>
/// 最新送达消息时间
/// </summary>
public DateTime LastDeliveredAt {
get {
DateTimeOffset dateTimeOffset = DateTimeOffset.FromUnixTimeMilliseconds(LastDeliveredTimestamp);
@ -72,10 +113,16 @@ namespace LeanCloud.Realtime {
}
}
/// <summary>
/// 最新已读消息时间戳
/// </summary>
public long LastReadTimestamp {
get; internal set;
}
/// <summary>
/// 最新已读消息时间
/// </summary>
public DateTime LastReadAt {
get {
DateTimeOffset dateTimeOffset = DateTimeOffset.FromUnixTimeMilliseconds(LastReadTimestamp);
@ -83,6 +130,11 @@ namespace LeanCloud.Realtime {
}
}
/// <summary>
/// 设置/获取对话属性
/// </summary>
/// <param name="key"></param>
/// <returns></returns>
public object this[string key] {
get {
return customProperties[key];
@ -92,6 +144,9 @@ namespace LeanCloud.Realtime {
}
}
/// <summary>
/// 是否已静音
/// </summary>
public bool IsMute {
get; private set;
}
@ -100,7 +155,7 @@ namespace LeanCloud.Realtime {
get; private set;
}
private Dictionary<string, object> customProperties;
private readonly Dictionary<string, object> customProperties;
internal HashSet<string> ids;
@ -128,7 +183,7 @@ namespace LeanCloud.Realtime {
if (LastMessage == null) {
return;
}
await Client.ConversationController.Read(Id, LastMessage);
await Client.MessageController.Read(Id, LastMessage);
}
/// <summary>
@ -374,7 +429,7 @@ namespace LeanCloud.Realtime {
/// <param name="limit">限制</param>
/// <param name="messageType">消息类型</param>
/// <returns></returns>
public async Task<List<LCIMMessage>> QueryMessages(LCIMMessageQueryEndpoint start = null,
public async Task<ReadOnlyCollection<LCIMMessage>> QueryMessages(LCIMMessageQueryEndpoint start = null,
LCIMMessageQueryEndpoint end = null,
LCIMMessageQueryDirection direction = LCIMMessageQueryDirection.NewToOld,
int limit = 20,

View File

@ -34,16 +34,31 @@ namespace LeanCloud.Realtime.Internal.Connection {
/// </summary>
private const int HEART_BEAT_INTERVAL = 5000;
/// <summary>
/// 通知事件
/// </summary>
internal Action<GenericCommand> OnNotification;
/// <summary>
/// 断线事件
/// </summary>
internal Action OnDisconnect;
/// <summary>
/// 开始重连事件
/// </summary>
internal Action OnReconnecting;
/// <summary>
/// 重连成功事件
/// </summary>
internal Action OnReconnected;
internal string id;
/// <summary>
/// 请求回调缓存
/// </summary>
private readonly Dictionary<int, TaskCompletionSource<GenericCommand>> responses;
private int requestI = 1;
@ -61,7 +76,7 @@ namespace LeanCloud.Realtime.Internal.Connection {
router = new LCRTMRouter();
client = new LCWebSocketClient(router, heartBeat) {
OnMessage = OnClientMessage,
OnDisconnect = OnClientDisconnect
OnClose = OnClientDisconnect
};
}
@ -69,41 +84,63 @@ namespace LeanCloud.Realtime.Internal.Connection {
await client.Connect();
}
/// <summary>
/// 重置连接
/// </summary>
/// <returns></returns>
internal async Task Reset() {
router.Reset();
// 关闭就连接
await client.Close();
// 重新创建连接组件
heartBeat = new LCHeartBeat(this, HEART_BEAT_INTERVAL, HEART_BEAT_INTERVAL);
router = new LCRTMRouter();
client = new LCWebSocketClient(router, heartBeat) {
OnMessage = OnClientMessage,
OnDisconnect = OnClientDisconnect
OnClose = OnClientDisconnect
};
await Reconnect();
}
/// <summary>
/// 发送请求,会在收到应答后返回
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
internal async Task<GenericCommand> SendRequest(GenericCommand request) {
TaskCompletionSource<GenericCommand> tcs = new TaskCompletionSource<GenericCommand>();
request.I = requestI++;
responses.Add(request.I, tcs);
LCLogger.Debug($"{id} => {FormatCommand(request)}");
byte[] bytes = request.ToByteArray();
Task sendTask = client.Send(bytes);
if (await Task.WhenAny(sendTask, Task.Delay(SEND_TIMEOUT)) == sendTask) {
try {
await sendTask;
await SendCommand(request);
} catch (Exception e) {
tcs.TrySetException(e);
}
} else {
tcs.TrySetException(new TimeoutException("Send request"));
}
return await tcs.Task;
}
/// <summary>
/// 发送命令
/// </summary>
/// <param name="command"></param>
/// <returns></returns>
internal async Task SendCommand(GenericCommand command) {
LCLogger.Debug($"{id} => {FormatCommand(command)}");
byte[] bytes = command.ToByteArray();
Task sendTask = client.Send(bytes);
if (await Task.WhenAny(sendTask, Task.Delay(SEND_TIMEOUT)) == sendTask) {
await sendTask;
} else {
throw new TimeoutException("Send request");
}
}
/// <summary>
/// 关闭连接
/// </summary>
/// <returns></returns>
internal async Task Close() {
OnNotification = null;
OnDisconnect = null;
heartBeat.Stop();
await client.Close();
}
@ -154,8 +191,6 @@ namespace LeanCloud.Realtime.Internal.Connection {
try {
LCLogger.Debug($"Reconnecting... {reconnectCount}");
await client.Connect();
client.OnMessage = OnClientMessage;
client.OnDisconnect = OnClientDisconnect;
break;
} catch (Exception e) {
reconnectCount++;
@ -167,11 +202,13 @@ namespace LeanCloud.Realtime.Internal.Connection {
if (reconnectCount < MAX_RECONNECT_TIMES) {
// 重连成功
LCLogger.Debug("Reconnected");
client.OnMessage = OnClientMessage;
client.OnClose = OnClientDisconnect;
OnReconnected?.Invoke();
break;
} else {
// 重置 Router继续尝试重连
router.Reset();
router = new LCRTMRouter();
}
}
}

View File

@ -6,7 +6,7 @@ using LeanCloud.Realtime.Protocol;
namespace LeanCloud.Realtime.Internal.Connection {
/// <summary>
/// 心跳控制器
/// 心跳控制器,由于 .Net Standard 2.0 不支持发送 ping frame所以需要发送逻辑心跳
/// 1. 每次接收到消息后开始监听,如果在 pingInterval 时间内没有再次接收到消息,则发送 ping 请求;
/// 2. 发送后等待 pongInterval 时间,如果在此时间内接收到了任何消息,则取消并重新开始监听 1
/// 3. 如果没收到消息,则认为超时并回调,连接层接收回调后放弃当前连接,以断线逻辑处理

View File

@ -223,6 +223,7 @@ namespace LeanCloud.Realtime.Internal.Controller {
message.IsTransient = direct.Transient;
// 通知服务端已接收
if (!message.IsTransient) {
// 只有非暂态消息才需要发送 ack
_ = Ack(message.ConversationId, message.Id);
}
// 获取对话

View File

@ -18,9 +18,9 @@ namespace LeanCloud.Realtime.Internal.Controller {
/// 打开会话
/// </summary>
/// <returns></returns>
internal async Task Open(bool reconnect) {
internal async Task Open(bool force) {
SessionCommand session = NewSessionCommand();
session.R = reconnect;
session.R = !force;
GenericCommand request = NewCommand(CommandType.Session, OpType.Open);
request.SessionMessage = session;
GenericCommand response = await Client.Connection.SendRequest(request);
@ -78,7 +78,9 @@ namespace LeanCloud.Realtime.Internal.Controller {
SessionCommand session = new SessionCommand();
if (Client.Tag != null) {
session.Tag = Client.Tag;
session.DeviceId = Guid.NewGuid().ToString();
}
if (Client.DeviceId != null) {
session.DeviceId = Client.DeviceId;
}
if (Client.SignatureFactory != null) {
LCIMSignature signature = Client.SignatureFactory.CreateConnectSignature(Client.Id);

View File

@ -28,13 +28,6 @@ namespace LeanCloud.Realtime.Internal.Router {
return rtmServer;
}
/// <summary>
/// 重置服务器地址缓存
/// </summary>
internal void Reset() {
rtmServer = null;
}
async Task<LCRTMServer> Fetch() {
string server = await LCApplication.AppRouter.GetRealtimeServer();
string url = $"{server}/v1/route?appId={LCApplication.AppId}&secure=1";

View File

@ -7,21 +7,31 @@ using LeanCloud.Realtime.Internal.Connection;
namespace LeanCloud.Realtime.Internal.WebSocket {
/// <summary>
/// WebSocket 客户端,只与通信协议相关
/// WebSocket 客户端,负责底层连接和事件,只与通信协议相关
/// </summary>
internal class LCWebSocketClient {
// .net standard 2.0 好像在拼合 Frame 时有 bug所以将这个值调整大一些
private const int RECV_BUFFER_SIZE = 1024 * 5;
/// <summary>
/// 关闭超时
/// </summary>
private const int CLOSE_TIMEOUT = 5000;
/// <summary>
/// 连接超时
/// </summary>
private const int CONNECT_TIMEOUT = 10000;
/// <summary>
/// 消息事件
/// </summary>
internal Action<byte[]> OnMessage;
internal Action OnDisconnect;
internal Action OnReconnect;
/// <summary>
/// 连接关闭
/// </summary>
internal Action OnClose;
private ClientWebSocket ws;
@ -34,6 +44,10 @@ namespace LeanCloud.Realtime.Internal.WebSocket {
this.heartBeat = heartBeat;
}
/// <summary>
/// 连接
/// </summary>
/// <returns></returns>
internal async Task Connect() {
try {
LCRTMServer rtmServer = await router.GetServer();
@ -53,6 +67,11 @@ namespace LeanCloud.Realtime.Internal.WebSocket {
_ = StartReceive();
}
/// <summary>
/// 连接指定 ws 服务器
/// </summary>
/// <param name="server"></param>
/// <returns></returns>
private async Task Connect(string server) {
LCLogger.Debug($"Connecting WebSocket: {server}");
Task timeoutTask = Task.Delay(CONNECT_TIMEOUT);
@ -66,11 +85,14 @@ namespace LeanCloud.Realtime.Internal.WebSocket {
}
}
/// <summary>
/// 主动关闭连接
/// </summary>
/// <returns></returns>
internal async Task Close() {
LCLogger.Debug("Closing WebSocket");
OnMessage = null;
OnDisconnect = null;
OnReconnect = null;
OnClose = null;
heartBeat.Stop();
try {
// 发送关闭帧可能会很久,所以增加超时
@ -87,6 +109,11 @@ namespace LeanCloud.Realtime.Internal.WebSocket {
}
}
/// <summary>
/// 发送数据
/// </summary>
/// <param name="data"></param>
/// <returns></returns>
internal async Task Send(byte[] data) {
ArraySegment<byte> bytes = new ArraySegment<byte>(data);
if (ws.State == WebSocketState.Open) {
@ -103,28 +130,30 @@ namespace LeanCloud.Realtime.Internal.WebSocket {
}
}
/// <summary>
/// 接收数据
/// </summary>
/// <returns></returns>
private async Task StartReceive() {
byte[] buffer = new byte[RECV_BUFFER_SIZE];
try {
while (ws.State == WebSocketState.Open) {
WebSocketReceiveResult result = await ws.ReceiveAsync(new ArraySegment<byte>(buffer), default);
if (result.MessageType == WebSocketMessageType.Close) {
// 由服务端发起关闭
LCLogger.Debug($"Receive Closed: {result.CloseStatus}");
LCLogger.Debug($"ws state: {ws.State}");
// 这里有可能是客户端主动关闭,也有可能是服务端主动关闭
if (ws.State == WebSocketState.CloseReceived) {
// 如果是服务端主动关闭,则挥手关闭,并认为是断线
try {
await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, default);
Task closeTask = ws.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, default);
await Task.WhenAny(closeTask, Task.Delay(CLOSE_TIMEOUT));
} catch (Exception e) {
LCLogger.Error(e);
} finally {
OnDisconnect?.Invoke();
HandleExceptionClose();
}
}
} else if (result.MessageType == WebSocketMessageType.Binary) {
_ = heartBeat.Update(HandleClose);
_ = heartBeat.Update(HandleExceptionClose);
// 拼合 WebSocket Message
int length = result.Count;
byte[] data = new byte[length];
@ -137,17 +166,19 @@ namespace LeanCloud.Realtime.Internal.WebSocket {
} catch (Exception e) {
// 客户端网络异常
LCLogger.Error(e);
OnDisconnect?.Invoke();
OnClose?.Invoke();
}
}
private void HandleClose() {
private void HandleExceptionClose() {
try {
heartBeat.Stop();
ws.Abort();
ws.Dispose();
} catch (Exception e) {
LCLogger.Error(e);
} finally {
OnClose?.Invoke();
}
}
}

View File

@ -23,8 +23,39 @@ namespace LeanCloud.Realtime {
get; private set;
}
public string DeviceId {
get; private set;
}
#region 事件
#region 连接状态事件
/// <summary>
/// 客户端连接断开
/// </summary>
public Action OnPaused {
get; set;
}
/// <summary>
/// 客户端连接恢复正常
/// </summary>
public Action OnResume {
get; set;
}
/// <summary>
/// 当前客户端被服务端强行下线
/// </summary>
public Action<int, string> OnClose {
get; set;
}
#endregion
#region 对话事件
/// <summary>
/// 当前用户被加入某个对话的黑名单
/// </summary>
@ -49,55 +80,6 @@ namespace LeanCloud.Realtime {
/// </summary>
public Action<LCIMConversation, string> OnUnmuted;
/// <summary>
/// 客户端连接断开
/// </summary>
public Action OnPaused {
get; set;
}
/// <summary>
/// 客户端连接恢复正常
/// </summary>
public Action OnResume {
get; set;
}
/// <summary>
/// 当前客户端被服务端强行下线
/// </summary>
public Action<int, string> OnClose {
get; set;
}
/// <summary>
/// 客户端连接断开
/// </summary>
public Action OnDisconnect {
get; set;
}
/// <summary>
/// 客户端正在重连
/// </summary>
public Action OnReconnecting {
get; set;
}
/// <summary>
/// 客户端重连成功
/// </summary>
public Action OnReconnected {
get; set;
}
/// <summary>
/// 用户在其他客户端登录,当前客户端被服务端强行下线
/// </summary>
public Action<string> OnConflict {
get; set;
}
/// <summary>
/// 该对话信息被更新
/// </summary>
@ -164,6 +146,10 @@ namespace LeanCloud.Realtime {
/// </summary>
public Action<LCIMConversation, string, string, string> OnMemberInfoUpdated;
#endregion
#region 消息事件
/// <summary>
/// 当前用户收到消息
/// </summary>
@ -207,18 +193,23 @@ namespace LeanCloud.Realtime {
}
/// <summary>
///
/// 最近分发消息更新
/// </summary>
public Action OnLastDeliveredAtUpdated {
get; set;
}
/// <summary>
/// 最近已读消息更新
/// </summary>
public Action OnLastReadAtUpdated {
get; set;
}
#endregion
#endregion
internal ILCIMSignatureFactory SignatureFactory {
get; private set;
}
@ -255,9 +246,11 @@ namespace LeanCloud.Realtime {
public LCIMClient(string clientId,
string tag = null,
string deviceId = null,
ILCIMSignatureFactory signatureFactory = null) {
Id = clientId;
Tag = tag;
DeviceId = deviceId;
SignatureFactory = signatureFactory;
ConversationDict = new Dictionary<string, LCIMConversation>();
@ -278,13 +271,14 @@ namespace LeanCloud.Realtime {
}
/// <summary>
/// 连接
/// 登录
/// </summary>
/// <param name="force">是否强制登录</param>
/// <returns></returns>
public async Task Open(bool reconnect = false) {
public async Task Open(bool force = true) {
await Connection.Connect();
// 打开 Session
await SessionController.Open(reconnect);
await SessionController.Open(force);
}
/// <summary>
@ -442,7 +436,7 @@ namespace LeanCloud.Realtime {
}
private void OnConnectionDisconnect() {
OnDisconnect?.Invoke();
OnPaused?.Invoke();
}
private void OnConnectionReconnect() {
@ -454,7 +448,7 @@ namespace LeanCloud.Realtime {
// 打开 Session
await SessionController.Reopen();
// 回调用户
OnReconnected?.Invoke();
OnResume?.Invoke();
} catch (Exception e) {
LCLogger.Error(e);
await Connection.Close();

View File

@ -78,6 +78,10 @@ namespace LeanCloud.Realtime {
get; internal set;
}
public bool IsTransient {
get; internal set;
}
internal LCIMMessage() {
}
}