* LCIMConversation.cs: chore: 未读消息等逻辑

* LCIMClient.cs:
* Program.cs:
* LCIMFileMessage.cs:
* LCIMAudioMessage.cs:
* LCIMImageMessage.cs:
* LCIMTypedMessage.cs:
* LCIMVideoMessage.cs:
* LCIMConversationQuery.cs:
* LCIMMessageQueryOptions.cs:
* LCIMServiceConversation.cs:
* LCWebSocketConnection.cs:
oneRain 2020-03-24 17:42:04 +08:00
parent 5dcdc56f56
commit cd453ce134
12 changed files with 441 additions and 57 deletions

View File

@ -38,6 +38,14 @@ namespace LeanCloud.Realtime {
get; internal set;
}
public int Unread {
get; internal set;
}
public LCIMMessage LastMessage {
get; internal set;
}
public DateTime CreatedAt {
get; internal set;
}
@ -86,6 +94,28 @@ namespace LeanCloud.Realtime {
return response.ConvMessage.Count;
}
/// <summary>
/// 将该会话标记为已读
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public async Task<LCIMConversation> Read() {
if (LastMessage == null) {
return this;
}
ReadCommand read = new ReadCommand();
ReadTuple tuple = new ReadTuple {
Cid = Id,
Mid = LastMessage.Id,
Timestamp = LastMessage.SentTimestamp
};
read.Convs.Add(tuple);
GenericCommand request = client.NewCommand(CommandType.Read, OpType.Open);
request.ReadMessage = read;
await client.connection.SendRequest(request);
return this;
}
public async Task<LCIMConversation> Save() {
ConvCommand conv = new ConvCommand {
Cid = Id,
@ -479,6 +509,38 @@ namespace LeanCloud.Realtime {
};
}
public async Task<List<LCIMMessage>> QueryMessage(LCIMMessageQueryEndpoint start = null,
LCIMMessageQueryEndpoint end = null,
LCIMMessageQueryDirection direction = LCIMMessageQueryDirection.NewToOld,
int limit = 20,
int messageType = 0) {
LogsCommand logs = new LogsCommand {
Cid = Id
};
if (start != null) {
logs.T = start.SentTimestamp;
logs.Mid = start.MessageId;
logs.TIncluded = start.IsClosed;
}
if (end != null) {
logs.Tt = end.SentTimestamp;
logs.Tmid = end.MessageId;
logs.TtIncluded = end.IsClosed;
}
logs.Direction = direction == LCIMMessageQueryDirection.NewToOld ?
LogsCommand.Types.QueryDirection.Old : LogsCommand.Types.QueryDirection.New;
logs.Limit = limit;
if (messageType != 0) {
logs.Lctype = messageType;
}
GenericCommand request = client.NewCommand(CommandType.Logs, OpType.Open);
request.LogsMessage = logs;
GenericCommand response = await client.connection.SendRequest(request);
// TODO 反序列化聊天记录
return null;
}
private LCIMPartiallySuccessResult NewPartiallySuccessResult(IEnumerable<string> succesfulIds, IEnumerable<ErrorCommand> errors) {
LCIMPartiallySuccessResult result = new LCIMPartiallySuccessResult {
SuccessfulClientIdList = succesfulIds.ToList()

View File

@ -256,7 +256,10 @@ namespace LeanCloud.Realtime {
foreach (object c in convs) {
Dictionary<string, object> cd = c as Dictionary<string, object>;
string convId = cd["objectId"] as string;
LCIMConversation conversation = client.GetOrCreateConversation(convId);
if (!client.conversationDict.TryGetValue(convId, out LCIMConversation conversation)) {
conversation = new LCIMConversation(client);
client.conversationDict[convId] = conversation;
}
conversation.MergeFrom(cd);
convList.Add(conversation);
}

View File

@ -0,0 +1,26 @@
using System;
namespace LeanCloud.Realtime {
public class LCIMMessageQueryEndpoint {
public string MessageId {
get; set;
}
public long SentTimestamp {
get; set;
}
public bool IsClosed {
get; set;
}
public LCIMMessageQueryEndpoint() {
}
}
public enum LCIMMessageQueryDirection {
NewToOld,
OldToNew
}
}

View File

@ -1,4 +1,5 @@
using System;
using System.Threading.Tasks;
namespace LeanCloud.Realtime {
public class LCIMServiceConversation : LCIMConversation {

View File

@ -23,7 +23,7 @@ namespace LeanCloud.Realtime.Internal.WebSocket {
private string id;
internal Action<GenericCommand> OnNotification {
internal Func<GenericCommand, Task> OnNotification {
get; set;
}
@ -77,14 +77,14 @@ namespace LeanCloud.Realtime.Internal.WebSocket {
}
// 拼合 WebSocket Frame
byte[] oldData = data;
data = new byte[data.Length + result.Count];
data = new byte[oldData.Length + result.Count];
Array.Copy(oldData, data, oldData.Length);
Array.Copy(buffer, 0, data, oldData.Length, result.Count);
} while (!result.EndOfMessage);
try {
GenericCommand command = GenericCommand.Parser.ParseFrom(data);
LCLogger.Debug($"{id} <= {command.Cmd}/{command.Op}: {command.ToString()}");
HandleCommand(command);
_ = HandleCommand(command);
} catch (Exception e) {
// 解析消息错误
LCLogger.Error(e.Message);
@ -96,25 +96,29 @@ namespace LeanCloud.Realtime.Internal.WebSocket {
}
}
private void HandleCommand(GenericCommand command) {
if (command.HasI) {
// 应答
if (responses.TryGetValue(command.I, out TaskCompletionSource<GenericCommand> tcs)) {
if (command.HasErrorMessage) {
// 错误
ErrorCommand error = command.ErrorMessage;
int code = error.Code;
string detail = error.Detail;
// TODO 包装成异常抛出
LCException exception = new LCException(code, detail);
tcs.SetException(exception);
} else {
tcs.SetResult(command);
private async Task HandleCommand(GenericCommand command) {
try {
if (command.HasI) {
// 应答
if (responses.TryGetValue(command.I, out TaskCompletionSource<GenericCommand> tcs)) {
if (command.HasErrorMessage) {
// 错误
ErrorCommand error = command.ErrorMessage;
int code = error.Code;
string detail = error.Detail;
// TODO 包装成异常抛出
LCException exception = new LCException(code, detail);
tcs.SetException(exception);
} else {
tcs.SetResult(command);
}
}
} else {
// 通知
await OnNotification?.Invoke(command);
}
} else {
// 通知
OnNotification?.Invoke(command);
} catch (Exception e) {
LCLogger.Error(e.Message);
}
}

View File

@ -5,13 +5,14 @@ using System.Linq;
using LeanCloud.Realtime.Internal.WebSocket;
using LeanCloud.Realtime.Protocol;
using LeanCloud.Storage.Internal.Codec;
using LeanCloud.Storage.Internal;
using Newtonsoft.Json;
namespace LeanCloud.Realtime {
public class LCIMClient {
internal LCWebSocketConnection connection;
private Dictionary<string, LCIMConversation> conversationDict;
internal Dictionary<string, LCIMConversation> conversationDict;
public string ClientId {
get; private set;
@ -29,6 +30,11 @@ namespace LeanCloud.Realtime {
get; set;
}
/// <summary>
/// 当前客户端在某个对话中被禁言
/// </summary>
public Action<LCIMConversation, string> OnMuted;
/// <summary>
/// 客户端连接断开
/// </summary>
@ -50,6 +56,25 @@ namespace LeanCloud.Realtime {
get; set;
}
/// <summary>
/// 客户端连接断开
/// </summary>
public Action OnDisconnect {
get; set;
}
/// <summary>
/// 用户在其他客户端登录,当前客户端被服务端强行下线
/// </summary>
public Action<string> OnConflict {
get; set;
}
/// <summary>
/// 该对话信息被更新
/// </summary>
public Action<LCIMConversation, Dictionary<string, object>, string> OnConversationInfoUpdated;
/// <summary>
/// 当前用户被添加至某个对话
/// </summary>
@ -71,14 +96,74 @@ namespace LeanCloud.Realtime {
get; set;
}
/// <summary>
/// 有成员被从某个对话中移除
/// </summary>
public Action<LCIMConversation, List<string>, string> OnMembersLeft {
get; set;
}
/// <summary>
/// 有成员被加入某个对话的黑名单
/// </summary>
public Action<LCIMConversation, List<string>, string> OnMembersBlocked {
get; set;
}
/// <summary>
/// 有成员被移出某个对话的黑名单
/// </summary>
public Action<LCIMConversation, List<string>, string> OnMembersUnblocked {
get; set;
}
/// <summary>
/// 有成员在某个对话中被禁言
/// </summary>
public Action<LCIMConversation, List<string>, string> OnMembersMuted {
get; set;
}
/// <summary>
/// 有成员被移出某个对话的黑名单
/// </summary>
public Action<LCIMConversation, List<string>, string> OnMembersUnmuted {
get; set;
}
/// <summary>
/// 有成员的对话信息被更新
/// </summary>
public Action<LCIMConversation, string, Dictionary<string, object>, string> OnMemberInfoUpdated;
/// <summary>
/// 当前用户收到消息
/// </summary>
public Action<LCIMConversation, LCIMMessage> OnMessageReceived {
get; set;
}
/// <summary>
/// 消息被撤回
/// </summary>
public Action<LCIMConversation, LCIMMessage> OnMessageRecall {
get; set;
}
/// <summary>
/// 消息被修改
/// </summary>
public Action<LCIMConversation, LCIMMessage> OnMessageUpdate {
get; set;
}
/// <summary>
/// 未读消息数目更新
/// </summary>
public Action<List<LCIMConversation>> OnUnreadMessagesCountUpdated {
get; set;
}
internal ILCIMSignatureFactory SignatureFactory {
get; private set;
}
@ -117,6 +202,8 @@ namespace LeanCloud.Realtime {
/// </summary>
/// <returns></returns>
public async Task Close() {
GenericCommand request = NewCommand(CommandType.Session, OpType.Close);
await connection.SendRequest(request);
await connection.Close();
}
@ -209,8 +296,8 @@ namespace LeanCloud.Realtime {
if (string.IsNullOrEmpty(id)) {
throw new ArgumentNullException(nameof(id));
}
LCIMConversationQuery query = GetQuery();
query.WhereEqualTo("objectId", id)
LCIMConversationQuery query = GetQuery()
.WhereEqualTo("objectId", id)
.Limit(1);
List<LCIMConversation> results = await query.Find();
if (results == null || results.Count < 1) {
@ -244,19 +331,44 @@ namespace LeanCloud.Realtime {
return new LCIMConversationQuery(this);
}
private void OnNotification(GenericCommand notification) {
private async Task OnNotification(GenericCommand notification) {
switch (notification.Cmd) {
case CommandType.Session:
await OnSessionNotification(notification);
break;
case CommandType.Conv:
OnConversationNotification(notification);
break;
case CommandType.Direct:
OnDirectNotification(notification.DirectMessage);
await OnDirectNotification(notification.DirectMessage);
break;
case CommandType.Unread:
await OnUnreadNotification(notification.UnreadMessage);
break;
default:
break;
}
}
private async Task OnSessionNotification(GenericCommand notification) {
switch (notification.Op) {
case OpType.Closed:
await OnSessionClosed(notification.SessionMessage);
break;
default:
break;
}
}
private async Task OnSessionClosed(SessionCommand session) {
int code = session.Code;
string reason = session.Reason;
string detail = session.Detail;
await connection.Close();
// TODO 关闭连接后回调给开发者
}
private void OnConversationNotification(GenericCommand notification) {
ConvCommand conv = notification.ConvMessage;
switch (notification.Op) {
@ -273,21 +385,24 @@ namespace LeanCloud.Realtime {
OnConversationMemberLeft(conv);
break;
case OpType.Updated:
OnPropertiesUpdated(conv);
OnConversationPropertiesUpdated(conv);
break;
case OpType.MemberInfoChanged:
OnConversationMemberInfoChanged(conv);
break;
default:
break;
}
}
private void OnConversationJoined(ConvCommand conv) {
LCIMConversation conversation = GetOrCreateConversation(conv.Cid);
private async void OnConversationJoined(ConvCommand conv) {
LCIMConversation conversation = await GetOrQueryConversation(conv.Cid);
conversation.MergeFrom(conv);
OnInvited?.Invoke(conversation, conv.InitBy);
}
private void OnConversationMembersJoined(ConvCommand conv) {
LCIMConversation conversation = GetOrCreateConversation(conv.Cid);
private async void OnConversationMembersJoined(ConvCommand conv) {
LCIMConversation conversation = await GetOrQueryConversation(conv.Cid);
conversation.MergeFrom(conv);
OnMembersJoined?.Invoke(conversation, conv.M.ToList(), conv.InitBy);
}
@ -305,14 +420,19 @@ namespace LeanCloud.Realtime {
}
}
private void OnPropertiesUpdated(ConvCommand conv) {
private void OnConversationPropertiesUpdated(ConvCommand conv) {
if (conversationDict.TryGetValue(conv.Cid, out LCIMConversation conversation)) {
// TODO
// TODO 修改对话属性,并回调给开发者
OnConversationInfoUpdated?.Invoke(conversation, null, conv.InitBy);
}
}
private void OnDirectNotification(DirectCommand direct) {
private void OnConversationMemberInfoChanged(ConvCommand conv) {
}
private async Task OnDirectNotification(DirectCommand direct) {
LCIMMessage message = null;
if (direct.HasBinaryMsg) {
// 二进制消息
@ -321,7 +441,8 @@ namespace LeanCloud.Realtime {
} else {
// 文本消息
string messageData = direct.Msg;
Dictionary<string, object> msg = JsonConvert.DeserializeObject<Dictionary<string, object>>(messageData);
Dictionary<string, object> msg = JsonConvert.DeserializeObject<Dictionary<string, object>>(messageData,
new LCJsonConverter());
int msgType = (int)(long)msg["_lctype"];
switch (msgType) {
case -1:
@ -347,16 +468,30 @@ namespace LeanCloud.Realtime {
}
message.Decode(direct);
}
// TODO 获取对话
OnMessageReceived?.Invoke(null, message);
// 获取对话
LCIMConversation conversation = await GetOrQueryConversation(direct.Cid);
OnMessageReceived?.Invoke(conversation, message);
}
internal LCIMConversation GetOrCreateConversation(string convId) {
if (!conversationDict.TryGetValue(convId, out LCIMConversation conversation)) {
conversation = new LCIMConversation(this);
conversationDict.Add(convId, conversation);
private async Task OnUnreadNotification(UnreadCommand unread) {
List<LCIMConversation> conversationList = new List<LCIMConversation>();
foreach (UnreadTuple conv in unread.Convs) {
// 查询对话
LCIMConversation conversation = await GetOrQueryConversation(conv.Cid);
conversation.Unread = conv.Unread;
// TODO 反序列化对话
// 最后一条消息
JsonConvert.DeserializeObject<Dictionary<string, object>>(conv.Data);
conversationList.Add(conversation);
}
OnUnreadMessagesCountUpdated?.Invoke(conversationList);
}
internal async Task<LCIMConversation> GetOrQueryConversation(string convId) {
if (conversationDict.TryGetValue(convId, out LCIMConversation conversation)) {
return conversation;
}
conversation = await GetConversation(convId);
return conversation;
}

View File

@ -23,7 +23,9 @@ namespace LeanCloud.Realtime {
Dictionary<string, object> data = base.Encode();
Dictionary<string, object> fileData = data["_lcfile"] as Dictionary<string, object>;
Dictionary<string, object> metaData = fileData["metaData"] as Dictionary<string, object>;
metaData["duration"] = File.MetaData["duration"];
if (File.MetaData.TryGetValue("duration", out object duration)) {
metaData["duration"] = duration;
}
return data;
}

View File

@ -46,10 +46,13 @@ namespace LeanCloud.Realtime {
{ "url", File.Url },
{ "metaData", new Dictionary<string, object> {
{ "name", File.Name },
{ "format", File.MimeType },
{ "size", File.MetaData["size"] }
{ "format", File.MimeType }
} }
};
if (File.MetaData.TryGetValue("size", out object size)) {
Dictionary<string, object> metaData = fileData["metaData"] as Dictionary<string, object>;
metaData["size"] = size;
}
Dictionary<string, object> data = base.Encode();
data["_lcfile"] = fileData;
return data;

View File

@ -32,8 +32,12 @@ namespace LeanCloud.Realtime {
Dictionary<string, object> data = base.Encode();
Dictionary<string, object> fileData = data["_lcfile"] as Dictionary<string, object>;
Dictionary<string, object> metaData = fileData["metaData"] as Dictionary<string, object>;
metaData["width"] = File.MetaData["width"];
metaData["height"] = File.MetaData["height"];
if (File.MetaData.TryGetValue("width", out object width)) {
metaData["width"] = width;
}
if (File.MetaData.TryGetValue("height", out object height)) {
metaData["height"] = height;
}
return data;
}

View File

@ -52,5 +52,34 @@ namespace LeanCloud.Realtime {
customProperties = LCDecoder.Decode(attrObj) as Dictionary<string, object>;
}
}
internal static LCIMTypedMessage Deserialize(Dictionary<string, object> messageData) {
LCIMTypedMessage message = null;
int msgType = (int)(long)messageData["_lctype"];
switch (msgType) {
case -1:
message = new LCIMTextMessage();
break;
case -2:
message = new LCIMImageMessage();
break;
case -3:
message = new LCIMAudioMessage();
break;
case -4:
message = new LCIMVideoMessage();
break;
case -5:
message = new LCIMLocationMessage();
break;
case -6:
message = new LCIMFileMessage();
break;
default:
break;
}
//message.Decode(direct);
return message;
}
}
}

View File

@ -23,9 +23,15 @@ namespace LeanCloud.Realtime {
Dictionary<string, object> data = base.Encode();
Dictionary<string, object> fileData = data["_lcfile"] as Dictionary<string, object>;
Dictionary<string, object> metaData = fileData["metaData"] as Dictionary<string, object>;
metaData["width"] = File.MetaData["width"];
metaData["height"] = File.MetaData["height"];
metaData["duration"] = File.MetaData["duration"];
if (File.MetaData.TryGetValue("width", out object width)) {
metaData["width"] = width;
}
if (File.MetaData.TryGetValue("height", out object height)) {
metaData["height"] = height;
}
if (File.MetaData.TryGetValue("duration", out object duration)) {
metaData["duration"] = duration;
}
return data;
}

View File

@ -15,13 +15,13 @@ namespace RealtimeConsole {
LCLogger.LogDelegate += (level, info) => {
switch (level) {
case LCLogLevel.Debug:
Console.WriteLine($"[DEBUG] {info}");
Console.WriteLine($"[DEBUG]\n{info}");
break;
case LCLogLevel.Warn:
Console.WriteLine($"[WARNING] {info}");
Console.WriteLine($"[WARNING]\n{info}");
break;
case LCLogLevel.Error:
Console.WriteLine($"[ERROR] {info}");
Console.WriteLine($"[ERROR]\n{info}");
break;
default:
Console.WriteLine(info);
@ -30,22 +30,131 @@ namespace RealtimeConsole {
};
LCApplication.Initialize("ikGGdRE2YcVOemAaRbgp1xGJ-gzGzoHsz", "NUKmuRbdAhg1vrb2wexYo1jo", "https://ikggdre2.lc-cn-n1-shared.com");
//_ = Start();
//Conversation().Wait();
//_ = ChatRoom();
_ = TemporaryConversation();
//_ = TemporaryConversation();
//_ = Signature();
//_ = Block();
//_ = Mute();
//QueryConversation().Wait();
//_ = OpenAndClose();
//SendMessage().Wait();
_ = Unread();
Console.ReadKey(true);
}
static async Task Run(int s) {
for (int i = 0; i < s; i++) {
Console.WriteLine($"run {i}");
await Task.Delay(1000);
Console.WriteLine($"run {i} done");
}
}
static async Task Unread() {
LCIMClient u2 = new LCIMClient("u2");
await u2.Open();
u2.OnUnreadMessagesCountUpdated = conversationList => {
foreach (LCIMConversation conv in conversationList) {
Console.WriteLine($"unread: {conv.Unread}");
}
};
}
static async Task SendMessage() {
try {
LCIMClient u1 = new LCIMClient("u1");
await u1.Open();
LCIMConversation conversation = await u1.CreateConversation(new string[] { "u2" });
LCIMTextMessage textMessage = new LCIMTextMessage("hello, text message");
await conversation.Send(textMessage);
//LCFile file = new LCFile("avatar", "../../../Storage.Test/assets/hello.png");
//await file.Save();
//LCIMImageMessage imageMessage = new LCIMImageMessage(file);
//await conversation.Send(imageMessage);
} catch (Exception e) {
Console.WriteLine(e.ToString());
}
}
static async Task OpenAndClose() {
LCIMClient o1 = new LCIMClient("o1");
await o1.Open();
await o1.Close();
}
static async Task QueryConversation() {
LCIMClient m2 = new LCIMClient("m2");
await m2.Open();
LCIMConversation conv = (await m2.GetQuery()
.WhereEqualTo("objectId", "5e7863bf90aef5aa849be75a")
.Find())[0];
LCIMTextMessage textMessage = new LCIMTextMessage("hello, world");
await conv.Send(textMessage);
}
static async Task Mute() {
LCIMClient m1 = new LCIMClient("m0");
await m1.Open();
LCIMClient m2 = new LCIMClient("m2");
await m2.Open();
LCIMConversation conversation = await m1.CreateConversation(new string[] { "m2", "m3" });
await conversation.MuteMembers(new string[] { "m2" });
LCIMConversation conv = (await m2.GetQuery()
.WhereEqualTo("objectId", conversation.Id)
.Find())[0];
LCIMTextMessage textMessage = new LCIMTextMessage("hello, world");
await conv.Send(textMessage);
}
static async Task Block() {
LocalSignatureFactory signatureFactory = new LocalSignatureFactory();
LCIMClient c1 = new LCIMClient("c0");
await c1.Open();
LCIMConversation conversation = await c1.CreateConversation(new string[] { "c2", "c3", "c4", "c5" });
LCIMTextMessage textMessage = new LCIMTextMessage("hello");
await conversation.Send(textMessage);
await conversation.BlockMembers(new string[] { "c5" });
LCIMClient c5 = new LCIMClient("c5");
await c5.Open();
await conversation.Add(new string[] { "c5" });
}
static async Task Signature() {
LocalSignatureFactory signatureFactory = new LocalSignatureFactory();
LCIMClient hello = new LCIMClient("hello111", signatureFactory);
await hello.Open();
}
static async Task ChatRoom() {
LCIMClient hello = new LCIMClient("hello");
LocalSignatureFactory signatureFactory = new LocalSignatureFactory();
LCIMClient hello = new LCIMClient("hello", signatureFactory);
await hello.Open();
string name = Guid.NewGuid().ToString();
LCIMChatRoom chatRoom = await hello.CreateChatRoom(name);
Console.WriteLine(chatRoom.Name);
await chatRoom.Add(new string[] { "world" });
await chatRoom.Remove(new string[] { "world" });
}
static async Task TemporaryConversation() {
@ -61,7 +170,7 @@ namespace RealtimeConsole {
Console.WriteLine(temporaryConversation.Id);
}
static async Task Start() {
static async Task Conversation() {
LCIMClient hello = new LCIMClient("hello");
await hello.Open();