using System;
using System.IO;
using System.Threading.Tasks;
using System.Net.WebSockets;
using LeanCloud.Common;
using LeanCloud.Realtime.Internal.Router;
using LeanCloud.Realtime.Internal.Connection;

namespace LeanCloud.Realtime.Internal.WebSocket {
    /// <summary>
    /// WebSocket client. Owns the low-level connection and its events;
    /// it only deals with the transport, not with higher-level protocol logic.
    /// </summary>
    internal class LCWebSocketClient {
        // Size of the buffer handed to each ReceiveAsync call. The original author
        // noted that frame assembly seemed buggy on .NET Standard 2.0 and therefore
        // enlarged this value; messages larger than the buffer are still re-assembled
        // in StartReceive using EndOfMessage.
        private const int RECV_BUFFER_SIZE = 1024 * 5;

        /// <summary>
        /// Close handshake timeout, in milliseconds.
        /// </summary>
        private const int CLOSE_TIMEOUT = 5000;

        /// <summary>
        /// Connect timeout, in milliseconds.
        /// </summary>
        private const int CONNECT_TIMEOUT = 10000;

        /// <summary>
        /// Invoked with the payload of every complete binary message received.
        /// </summary>
        internal Action<byte[]> OnMessage;

        /// <summary>
        /// Invoked when the connection is lost or closed by the remote side.
        /// Not invoked for a close initiated through <see cref="Close"/>,
        /// which clears this callback first.
        /// </summary>
        internal Action OnClose;

        private ClientWebSocket ws;

        private readonly LCRTMRouter router;

        private readonly LCHeartBeat heartBeat;

        internal LCWebSocketClient(LCRTMRouter router, LCHeartBeat heartBeat) {
            this.router = router;
            this.heartBeat = heartBeat;
        }

        /// <summary>
        /// Connects to the server returned by the router, trying the primary
        /// address first and falling back to the secondary one if that fails.
        /// Once connected, the receive loop is started in the background.
        /// </summary>
        /// <returns>A task that completes when the socket is connected.</returns>
        /// <exception cref="TimeoutException">The secondary attempt timed out.</exception>
        internal async Task Connect() {
            LCRTMServer rtmServer = await router.GetServer();
            try {
                LCLogger.Debug($"Primary Server");
                await Connect(rtmServer.Primary);
            } catch (Exception e) {
                LCLogger.Error(e);
                LCLogger.Debug($"Secondary Server");
                await Connect(rtmServer.Secondary);
            }

            // Start receiving; the loop reports its own failures through OnClose.
            _ = StartReceive();
        }

        /// <summary>
        /// Connects to the given WebSocket server.
        /// </summary>
        /// <param name="server">WebSocket URL of the server.</param>
        /// <returns>A task that completes when the socket is connected.</returns>
        /// <exception cref="TimeoutException">
        /// The connection was not established within <c>CONNECT_TIMEOUT</c> milliseconds.
        /// </exception>
        private async Task Connect(string server) {
            LCLogger.Debug($"Connecting WebSocket: {server}");
            ClientWebSocket socket = new ClientWebSocket();
            socket.Options.AddSubProtocol("lc.protobuf2.3");
            ws = socket;

            Task connectTask = null;
            try {
                connectTask = socket.ConnectAsync(new Uri(server), default);
                Task finished = await Task.WhenAny(connectTask, Task.Delay(CONNECT_TIMEOUT));
                if (finished != connectTask) {
                    throw new TimeoutException("Connect timeout");
                }
                // WhenAny also returns for a faulted task: await it so that a failed
                // connection surfaces as an exception instead of being reported as success.
                await connectTask;
            } catch {
                if (connectTask != null) {
                    ObserveFault(connectTask);
                }
                // Release the socket of the failed attempt; Abort also cancels a pending connect.
                socket.Abort();
                socket.Dispose();
                throw;
            }

            LCLogger.Debug($"Connected WebSocket: {server}");
        }

        /// <summary>
        /// Actively closes the connection. Callbacks are cleared first, so neither
        /// <see cref="OnMessage"/> nor <see cref="OnClose"/> fires for this close.
        /// </summary>
        /// <returns>A task that completes when the socket has been released.</returns>
        internal async Task Close() {
            LCLogger.Debug("Closing WebSocket");
            OnMessage = null;
            OnClose = null;
            heartBeat.Stop();
            try {
                // Sending the close frame may take a long time, hence the timeout.
                // Since we initiate the handshake, no Close frame is expected back.
                await CloseWithTimeout();
            } catch (Exception e) {
                LCLogger.Error(e);
            } finally {
                ws.Abort();
                ws.Dispose();
                LCLogger.Debug("Closed WebSocket");
            }
        }

        /// <summary>
        /// Sends a binary message.
        /// </summary>
        /// <param name="data">Payload to send as a single binary message.</param>
        /// <returns>A task that completes when the data has been sent.</returns>
        /// <exception cref="Exception">The socket is not in the Open state.</exception>
        internal async Task Send(byte[] data) {
            if (ws.State != WebSocketState.Open) {
                string message = $"Error Websocket state: {ws.State}";
                LCLogger.Error(message);
                throw new Exception(message);
            }

            ArraySegment<byte> bytes = new ArraySegment<byte>(data);
            try {
                await ws.SendAsync(bytes, WebSocketMessageType.Binary, true, default);
            } catch (Exception e) {
                LCLogger.Error(e);
                // Rethrow without resetting the stack trace.
                throw;
            }
        }

        /// <summary>
        /// Receive loop: reads from the socket while it is open, dispatches complete
        /// binary messages to <see cref="OnMessage"/> and handles a server-initiated close.
        /// Any exception (including one thrown by the message handler) is logged and
        /// reported through <see cref="OnClose"/>.
        /// </summary>
        /// <returns>A task that completes when the loop ends.</returns>
        private async Task StartReceive() {
            byte[] buffer = new byte[RECV_BUFFER_SIZE];
            try {
                // Accumulates the fragments of a message that does not fit in one read.
                using (MemoryStream pending = new MemoryStream()) {
                    while (ws.State == WebSocketState.Open) {
                        WebSocketReceiveResult result = await ws.ReceiveAsync(new ArraySegment<byte>(buffer), default);
                        if (result.MessageType == WebSocketMessageType.Close) {
                            LCLogger.Debug($"Receive Closed: {result.CloseStatus}");
                            if (ws.State == WebSocketState.CloseReceived) {
                                // The server initiated the close: complete the handshake
                                // and treat it as a disconnection.
                                try {
                                    await CloseWithTimeout();
                                } catch (Exception e) {
                                    LCLogger.Error(e);
                                } finally {
                                    HandleExceptionClose();
                                }
                            }
                        } else if (result.MessageType == WebSocketMessageType.Binary) {
                            _ = heartBeat.Update(HandleExceptionClose);
                            // Assemble the WebSocket message: only deliver it once the
                            // final fragment has arrived.
                            pending.Write(buffer, 0, result.Count);
                            if (result.EndOfMessage) {
                                byte[] data = pending.ToArray();
                                pending.SetLength(0);
                                OnMessage?.Invoke(data);
                            }
                        } else {
                            LCLogger.Error($"Error message type: {result.MessageType}");
                        }
                    }
                }
            } catch (Exception e) {
                // Client-side network failure.
                LCLogger.Error(e);
                OnClose?.Invoke();
            }
        }

        /// <summary>
        /// Performs the close handshake, giving up after <c>CLOSE_TIMEOUT</c> milliseconds.
        /// A handshake that fails within the timeout throws; a timeout does not.
        /// </summary>
        private async Task CloseWithTimeout() {
            Task closeTask = ws.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, default);
            Task finished = await Task.WhenAny(closeTask, Task.Delay(CLOSE_TIMEOUT));
            if (finished == closeTask) {
                // Surface a failed handshake to the caller, which logs it.
                await closeTask;
            } else {
                // The caller aborts the socket next, which will fault this task later.
                ObserveFault(closeTask);
            }
        }

        /// <summary>
        /// Marks the exception of an abandoned task as observed so that it does not
        /// end up as an unobserved task exception.
        /// </summary>
        private static void ObserveFault(Task task) {
            _ = task.ContinueWith(t => { _ = t.Exception; }, TaskContinuationOptions.OnlyOnFaulted);
        }

        /// <summary>
        /// Tears down the connection after an abnormal or remote close: stops the
        /// heartbeat, releases the socket and always notifies <see cref="OnClose"/>.
        /// </summary>
        private void HandleExceptionClose() {
            try {
                heartBeat.Stop();
                ws.Abort();
                ws.Dispose();
            } catch (Exception e) {
                LCLogger.Error(e);
            } finally {
                OnClose?.Invoke();
            }
        }
    }
}