* LCLiveQueryHeartBeat.cs:

* LCLiveQueryConnection.cs:
* LCConnection.cs:

* LCHeartBeat.cs: chore: 重构 ping/pong 机制
oneRain 2020-06-11 11:33:11 +08:00
parent 6b40e4bfd5
commit fef8f0c18c
4 changed files with 105 additions and 100 deletions

View File

@ -6,7 +6,7 @@ using Newtonsoft.Json;
using LeanCloud.Realtime.Internal.Router; using LeanCloud.Realtime.Internal.Router;
using LeanCloud.Realtime.Internal.WebSocket; using LeanCloud.Realtime.Internal.WebSocket;
using LeanCloud.Common; using LeanCloud.Common;
using LeanCloud.Storage; using LeanCloud.Realtime.Internal.Connection;
namespace LeanCloud.LiveQuery.Internal { namespace LeanCloud.LiveQuery.Internal {
public class LCLiveQueryConnection { public class LCLiveQueryConnection {
@ -63,7 +63,7 @@ namespace LeanCloud.LiveQuery.Internal {
public LCLiveQueryConnection(string id) { public LCLiveQueryConnection(string id) {
this.id = id; this.id = id;
responses = new Dictionary<int, TaskCompletionSource<Dictionary<string, object>>>(); responses = new Dictionary<int, TaskCompletionSource<Dictionary<string, object>>>();
heartBeat = new LCLiveQueryHeartBeat(this); heartBeat = new LCLiveQueryHeartBeat(this, OnPingTimeout);
router = new LCRTMRouter(); router = new LCRTMRouter();
client = new LCWebSocketClient { client = new LCWebSocketClient {
OnMessage = OnClientMessage, OnMessage = OnClientMessage,
@ -82,6 +82,8 @@ namespace LeanCloud.LiveQuery.Internal {
LCLogger.Debug($"Secondary Server"); LCLogger.Debug($"Secondary Server");
await client.Connect(rtmServer.Secondary, SUB_PROTOCOL); await client.Connect(rtmServer.Secondary, SUB_PROTOCOL);
} }
// 启动心跳
heartBeat.Start();
} catch (Exception e) { } catch (Exception e) {
throw e; throw e;
} }
@ -92,10 +94,11 @@ namespace LeanCloud.LiveQuery.Internal {
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
internal async Task Reset() { internal async Task Reset() {
heartBeat?.Stop();
// 关闭就连接 // 关闭就连接
await client.Close(); await client.Close();
// 重新创建连接组件 // 重新创建连接组件
heartBeat = new LCLiveQueryHeartBeat(this); heartBeat = new LCLiveQueryHeartBeat(this, OnPingTimeout);
router = new LCRTMRouter(); router = new LCRTMRouter();
client = new LCWebSocketClient { client = new LCWebSocketClient {
OnMessage = OnClientMessage, OnMessage = OnClientMessage,
@ -151,7 +154,6 @@ namespace LeanCloud.LiveQuery.Internal {
} }
private void OnClientMessage(byte[] bytes) { private void OnClientMessage(byte[] bytes) {
_ = heartBeat.Refresh(OnPingTimeout);
try { try {
string json = Encoding.UTF8.GetString(bytes); string json = Encoding.UTF8.GetString(bytes);
Dictionary<string, object> msg = JsonConvert.DeserializeObject<Dictionary<string, object>>(json, Dictionary<string, object> msg = JsonConvert.DeserializeObject<Dictionary<string, object>>(json,
@ -177,8 +179,12 @@ namespace LeanCloud.LiveQuery.Internal {
LCLogger.Error($"No request for {requestIndex}"); LCLogger.Error($"No request for {requestIndex}");
} }
} else { } else {
// 通知 if (json == "{}") {
OnNotification?.Invoke(msg); heartBeat.Pong();
} else {
// 通知
OnNotification?.Invoke(msg);
}
} }
} catch (Exception e) { } catch (Exception e) {
LCLogger.Error(e); LCLogger.Error(e);
@ -194,7 +200,9 @@ namespace LeanCloud.LiveQuery.Internal {
private async void OnPingTimeout() { private async void OnPingTimeout() {
await client.Close(); await client.Close();
OnClientDisconnect(); OnDisconnect?.Invoke();
// 重连
_ = Reconnect();
} }
private async Task Reconnect() { private async Task Reconnect() {

View File

@ -1,55 +1,27 @@
using System; using System;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using LeanCloud.Realtime.Internal.Connection;
using System.Collections.Generic; using System.Collections.Generic;
namespace LeanCloud.LiveQuery.Internal { namespace LeanCloud.LiveQuery.Internal {
/// <summary> /// <summary>
/// LiveQuery 心跳控制器 /// LiveQuery 心跳控制器
/// </summary> /// </summary>
internal class LCLiveQueryHeartBeat { internal class LCLiveQueryHeartBeat : LCHeartBeat {
private const int PING_INTERVAL = 5000;
private const int PONG_INTERVAL = 5000;
private readonly LCLiveQueryConnection connection; private readonly LCLiveQueryConnection connection;
private CancellationTokenSource pingCTS; internal LCLiveQueryHeartBeat(LCLiveQueryConnection connection,
private CancellationTokenSource pongCTS; Action onTimeout) : base(onTimeout) {
internal LCLiveQueryHeartBeat(LCLiveQueryConnection connection) {
this.connection = connection; this.connection = connection;
} }
internal async Task Refresh(Action onTimeout) { protected override void SendPing() {
LCLogger.Debug("LiveQuery HeartBeat refresh"); try {
Stop(); _ = connection.SendText("{}");
} catch (Exception) {
pingCTS = new CancellationTokenSource();
Task delayTask = Task.Delay(PING_INTERVAL, pingCTS.Token);
await delayTask;
if (delayTask.IsCanceled) {
return;
} }
// 发送 Ping 包
LCLogger.Debug("Ping ~~~");
_ = connection.SendText("{}");
// 等待 Pong
pongCTS = new CancellationTokenSource();
Task timeoutTask = Task.Delay(PONG_INTERVAL, pongCTS.Token);
await timeoutTask;
if (timeoutTask.IsCanceled) {
return;
}
// 超时
LCLogger.Debug("Ping timeout");
onTimeout?.Invoke();
}
internal void Stop() {
pingCTS?.Cancel();
pongCTS?.Cancel();
} }
} }
} }

View File

@ -6,13 +6,12 @@ using Google.Protobuf;
using LeanCloud.Realtime.Internal.Router; using LeanCloud.Realtime.Internal.Router;
using LeanCloud.Realtime.Internal.WebSocket; using LeanCloud.Realtime.Internal.WebSocket;
using LeanCloud.Realtime.Internal.Protocol; using LeanCloud.Realtime.Internal.Protocol;
using LeanCloud.Storage;
namespace LeanCloud.Realtime.Internal.Connection { namespace LeanCloud.Realtime.Internal.Connection {
/// <summary> /// <summary>
/// 连接层,只与数据协议相关 /// 连接层,只与数据协议相关
/// </summary> /// </summary>
internal class LCConnection { public class LCConnection {
/// <summary> /// <summary>
/// 发送超时 /// 发送超时
/// </summary> /// </summary>
@ -28,11 +27,6 @@ namespace LeanCloud.Realtime.Internal.Connection {
/// </summary> /// </summary>
private const int RECONNECT_INTERVAL = 10000; private const int RECONNECT_INTERVAL = 10000;
/// <summary>
/// 心跳间隔
/// </summary>
private const int HEART_BEAT_INTERVAL = 30000;
/// <summary> /// <summary>
/// 子协议 /// 子协议
/// </summary> /// </summary>
@ -71,7 +65,7 @@ namespace LeanCloud.Realtime.Internal.Connection {
internal LCConnection(string id) { internal LCConnection(string id) {
this.id = id; this.id = id;
responses = new Dictionary<int, TaskCompletionSource<GenericCommand>>(); responses = new Dictionary<int, TaskCompletionSource<GenericCommand>>();
heartBeat = new LCHeartBeat(this, HEART_BEAT_INTERVAL, HEART_BEAT_INTERVAL); heartBeat = new LCHeartBeat(this, OnPingTimeout);
router = new LCRTMRouter(); router = new LCRTMRouter();
client = new LCWebSocketClient { client = new LCWebSocketClient {
OnMessage = OnClientMessage, OnMessage = OnClientMessage,
@ -90,6 +84,8 @@ namespace LeanCloud.Realtime.Internal.Connection {
LCLogger.Debug($"Secondary Server"); LCLogger.Debug($"Secondary Server");
await client.Connect(rtmServer.Secondary, SUB_PROTOCOL); await client.Connect(rtmServer.Secondary, SUB_PROTOCOL);
} }
// 启动心跳
heartBeat.Start();
} catch (Exception e) { } catch (Exception e) {
throw e; throw e;
} }
@ -100,17 +96,18 @@ namespace LeanCloud.Realtime.Internal.Connection {
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
internal async Task Reset() { internal async Task Reset() {
heartBeat?.Stop();
// 关闭就连接 // 关闭就连接
await client.Close(); await client.Close();
// 重新创建连接组件 // 重新创建连接组件
heartBeat = new LCHeartBeat(this, HEART_BEAT_INTERVAL, HEART_BEAT_INTERVAL); heartBeat = new LCHeartBeat(this, OnPingTimeout);
router = new LCRTMRouter(); router = new LCRTMRouter();
client = new LCWebSocketClient { client = new LCWebSocketClient {
OnMessage = OnClientMessage, OnMessage = OnClientMessage,
OnClose = OnClientDisconnect OnClose = OnClientDisconnect
}; };
await Reconnect(); await Reconnect();
} }
/// <summary> /// <summary>
/// 发送请求,会在收到应答后返回 /// 发送请求,会在收到应答后返回
@ -158,7 +155,6 @@ namespace LeanCloud.Realtime.Internal.Connection {
} }
private void OnClientMessage(byte[] bytes) { private void OnClientMessage(byte[] bytes) {
_ = heartBeat.Refresh(OnPingTimeout);
try { try {
GenericCommand command = GenericCommand.Parser.ParseFrom(bytes); GenericCommand command = GenericCommand.Parser.ParseFrom(bytes);
LCLogger.Debug($"{id} <= {FormatCommand(command)}"); LCLogger.Debug($"{id} <= {FormatCommand(command)}");
@ -182,8 +178,12 @@ namespace LeanCloud.Realtime.Internal.Connection {
LCLogger.Error($"No request for {requestIndex}"); LCLogger.Error($"No request for {requestIndex}");
} }
} else { } else {
// 通知 if (command.Cmd == CommandType.Echo) {
OnNotification?.Invoke(command); heartBeat.Pong();
} else {
// 通知
OnNotification?.Invoke(command);
}
} }
} catch (Exception e) { } catch (Exception e) {
LCLogger.Error(e); LCLogger.Error(e);
@ -199,7 +199,9 @@ namespace LeanCloud.Realtime.Internal.Connection {
private async void OnPingTimeout() { private async void OnPingTimeout() {
await client.Close(); await client.Close();
OnClientDisconnect(); OnDisconnect?.Invoke();
// 重连
_ = Reconnect();
} }
private async Task Reconnect() { private async Task Reconnect() {

View File

@ -6,75 +6,98 @@ using LeanCloud.Realtime.Internal.Protocol;
namespace LeanCloud.Realtime.Internal.Connection { namespace LeanCloud.Realtime.Internal.Connection {
/// <summary> /// <summary>
/// 心跳控制器,由于 .Net Standard 2.0 不支持发送 ping frame所以需要发送逻辑心跳 /// 心跳控制器,由于 .Net Standard 2.0 不支持发送 ping frame所以需要发送逻辑心跳
/// 1. 每次接收到消息后开始监听,如果在 pingInterval 时间内没有再次接收到消息,则发送 ping 请求; /// 1. 每隔 180s 发送 ping 包
/// 2. 发送后等待 pongInterval 时间,如果在此时间内接收到了任何消息,则取消并重新开始监听 1 /// 2. 接收到 pong 包刷新上次 pong 时间
/// 3. 如果没收到消息,则认为超时并回调,连接层接收回调后放弃当前连接,以断线逻辑处理 /// 3. 每隔 180s 检测 pong 包间隔,超过 360s 则认为断开
/// </summary> /// </summary>
internal class LCHeartBeat { public class LCHeartBeat {
private const int PING_INTERVAL = 5 * 1000;
private readonly LCConnection connection; private readonly LCConnection connection;
/// <summary> private readonly Action onTimeout;
/// ping 间隔
/// </summary>
private readonly int pingInterval;
/// <summary> private CancellationTokenSource heartBeatCTS;
/// pong 间隔
/// </summary>
private readonly int pongInterval;
private CancellationTokenSource pingCTS; private bool running = false;
private CancellationTokenSource pongCTS;
private DateTimeOffset lastPongTime;
public LCHeartBeat(Action onTimeout) {
this.onTimeout = onTimeout;
}
internal LCHeartBeat(LCConnection connection, internal LCHeartBeat(LCConnection connection,
int pingInterval, Action onTimeout) : this(onTimeout) {
int pongInterval) {
this.connection = connection; this.connection = connection;
this.pingInterval = pingInterval;
this.pongInterval = pongInterval;
} }
/// <summary> /// <summary>
/// 更新心跳监听 /// 启动心跳
/// </summary> /// </summary>
/// <returns></returns> public void Start() {
internal async Task Refresh(Action onTimeout) { running = true;
LCLogger.Debug("HeartBeat refresh"); heartBeatCTS = new CancellationTokenSource();
pingCTS?.Cancel(); StartPing();
pongCTS?.Cancel(); StartPong();
}
// 计时准备 ping private async void StartPing() {
pingCTS = new CancellationTokenSource(); while (running) {
Task delayTask = Task.Delay(pingInterval, pingCTS.Token); try {
await delayTask; await Task.Delay(PING_INTERVAL, heartBeatCTS.Token);
if (delayTask.IsCanceled) { } catch (TaskCanceledException) {
return; return;
}
LCLogger.Debug("Ping ~~~");
SendPing();
} }
}
protected virtual void SendPing() {
// 发送 ping 包 // 发送 ping 包
LCLogger.Debug("Ping ~~~");
GenericCommand command = new GenericCommand { GenericCommand command = new GenericCommand {
Cmd = CommandType.Echo, Cmd = CommandType.Echo,
AppId = LCApplication.AppId, AppId = LCApplication.AppId,
PeerId = connection.id PeerId = connection.id
}; };
_ = connection.SendRequest(command); try {
pongCTS = new CancellationTokenSource(); _ = connection.SendCommand(command);
Task timeoutTask = Task.Delay(pongInterval, pongCTS.Token); } catch (Exception e) {
await timeoutTask; LCLogger.Error(e.Message);
if (timeoutTask.IsCanceled) {
return;
} }
// timeout }
LCLogger.Error("Ping timeout");
onTimeout.Invoke(); private async void StartPong() {
lastPongTime = DateTimeOffset.Now;
while (running) {
try {
await Task.Delay(PING_INTERVAL / 2, heartBeatCTS.Token);
} catch (TaskCanceledException) {
return;
}
// 检查 pong 包时间
TimeSpan interval = DateTimeOffset.Now - lastPongTime;
if (interval.TotalMilliseconds > PING_INTERVAL * 2) {
// 断线
Stop();
onTimeout.Invoke();
}
}
}
public void Pong() {
LCLogger.Debug("Pong ~~~");
// 刷新最近 pong 时间戳
lastPongTime = DateTimeOffset.Now;
} }
/// <summary> /// <summary>
/// 停止心跳监听 /// 停止心跳监听
/// </summary> /// </summary>
internal void Stop() { public void Stop() {
pingCTS?.Cancel(); running = false;
pongCTS?.Cancel(); heartBeatCTS.Cancel();
} }
} }
} }