diff --git a/LiveQuery/LiveQuery/Internal/LCLiveQueryConnection.cs b/LiveQuery/LiveQuery/Internal/LCLiveQueryConnection.cs index d2e4466..d11a33e 100644 --- a/LiveQuery/LiveQuery/Internal/LCLiveQueryConnection.cs +++ b/LiveQuery/LiveQuery/Internal/LCLiveQueryConnection.cs @@ -136,9 +136,9 @@ namespace LeanCloud.LiveQuery.Internal { await client.Close(); } - private void OnClientMessage(byte[] bytes) { + private void OnClientMessage(byte[] bytes, int length) { try { - string json = Encoding.UTF8.GetString(bytes); + string json = Encoding.UTF8.GetString(bytes, 0, length); Dictionary msg = JsonConvert.DeserializeObject>(json, LCJsonConverter.Default); LCLogger.Debug($"{id} <= {json}"); diff --git a/Realtime/Realtime/Internal/Connection/LCConnection.cs b/Realtime/Realtime/Internal/Connection/LCConnection.cs index 8e3d1d5..5716a74 100644 --- a/Realtime/Realtime/Internal/Connection/LCConnection.cs +++ b/Realtime/Realtime/Internal/Connection/LCConnection.cs @@ -160,9 +160,9 @@ namespace LeanCloud.Realtime.Internal.Connection { } } - private void OnMessage(byte[] bytes) { + private void OnMessage(byte[] bytes, int length) { try { - GenericCommand command = GenericCommand.Parser.ParseFrom(bytes); + GenericCommand command = GenericCommand.Parser.ParseFrom(bytes, 0, length); LCLogger.Debug($"{id} <= {FormatCommand(command)}"); if (command.HasI) { // 应答 diff --git a/Realtime/Realtime/Internal/WebSocket/LCWebSocketClient.cs b/Realtime/Realtime/Internal/WebSocket/LCWebSocketClient.cs index 1a6e5e4..6c6456f 100644 --- a/Realtime/Realtime/Internal/WebSocket/LCWebSocketClient.cs +++ b/Realtime/Realtime/Internal/WebSocket/LCWebSocketClient.cs @@ -6,13 +6,15 @@ using System.Text; namespace LeanCloud.Realtime.Internal.WebSocket { public class LCWebSocketClient { // .net standard 2.0 好像在拼合 Frame 时有 bug,所以将这个值调整大一些 - private const int RECV_BUFFER_SIZE = 1024 * 5; + private const int SEND_BUFFER_SIZE = 1024 * 5; + private const int RECV_BUFFER_SIZE = 1024 * 8; + private const int MSG_BUFFER_SIZE = 1024 * 10; private const int CLOSE_TIMEOUT = 5000; private const int CONNECT_TIMEOUT = 10000; - public Action OnMessage; + public Action OnMessage; public Action OnClose; @@ -23,6 +25,7 @@ namespace LeanCloud.Realtime.Internal.WebSocket { LCLogger.Debug($"Connecting WebSocket: {server}"); Task timeoutTask = Task.Delay(CONNECT_TIMEOUT); ws = new ClientWebSocket(); + ws.Options.SetBuffer(RECV_BUFFER_SIZE, SEND_BUFFER_SIZE); if (!string.IsNullOrEmpty(subProtocol)) { ws.Options.AddSubProtocol(subProtocol); } @@ -78,10 +81,12 @@ namespace LeanCloud.Realtime.Internal.WebSocket { } private async Task StartReceive() { - byte[] buffer = new byte[RECV_BUFFER_SIZE]; + byte[] recvBuffer = new byte[RECV_BUFFER_SIZE]; + byte[] msgBuffer = new byte[MSG_BUFFER_SIZE]; + int offset = 0; try { while (ws.State == WebSocketState.Open) { - WebSocketReceiveResult result = await ws.ReceiveAsync(new ArraySegment(buffer), default); + WebSocketReceiveResult result = await ws.ReceiveAsync(new ArraySegment(recvBuffer), default); if (result.MessageType == WebSocketMessageType.Close) { LCLogger.Debug($"Receive Closed: {result.CloseStatus}"); if (ws.State == WebSocketState.CloseReceived) { @@ -98,9 +103,18 @@ namespace LeanCloud.Realtime.Internal.WebSocket { } else { // 拼合 WebSocket Message int length = result.Count; - byte[] data = new byte[length]; - Array.Copy(buffer, data, length); - OnMessage?.Invoke(data); + if (offset + length > msgBuffer.Length) { + // 反序列化数组大小不够,则以 2x 扩充 + byte[] newBuffer = new byte[msgBuffer.Length * 2]; + Array.Copy(msgBuffer, newBuffer, msgBuffer.Length); + msgBuffer = newBuffer; + } + Array.Copy(recvBuffer, 0, msgBuffer, offset, length); + offset += length; + if (result.EndOfMessage) { + OnMessage?.Invoke(msgBuffer, offset); + offset = 0; + } } } } catch (Exception e) {