* AVCorePlugins.cs: chore: 简化 AVObject 相关请求

* AVObject.cs:
* AVObject2.cs:
* AVCommandRunner.cs:
* QiniuUploader.cs:
* QCloudUploader.cs:
* AVObjectController.cs:
oneRain 2019-11-26 16:43:48 +08:00
parent 5d88f601b3
commit 3bdf46e5a5
9 changed files with 92 additions and 219 deletions

View File

@ -1,12 +0,0 @@
using NUnit.Framework;
using System.Threading.Tasks;
using LeanCloud.Storage.Internal;
namespace LeanCloud.Test {
public class AppRouterTest {
[Test]
public async Task GetServers() {
}
}
}

View File

@ -1,4 +1,5 @@

using LeanCloud.Common;
namespace LeanCloud.Storage.Internal {
public class AVPlugins {
private static readonly object instanceMutex = new object();
@ -52,7 +53,8 @@ namespace LeanCloud.Storage.Internal {
public AppRouterController AppRouterController {
get {
lock (mutex) {
appRouterController = appRouterController ?? new AppRouterController();
var conf = AVClient.CurrentConfiguration;
appRouterController = appRouterController ?? new AppRouterController(conf.ApplicationId, conf.ApiServer);
return appRouterController;
}
}

View File

@ -5,9 +5,9 @@ using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading;
using System.Threading.Tasks;
using System.Text;
using System.Linq;
using Newtonsoft.Json;
using LeanCloud.Common;
namespace LeanCloud.Storage.Internal {
/// <summary>
@ -44,6 +44,7 @@ namespace LeanCloud.Storage.Internal {
httpClient.DefaultRequestHeaders.Add("X-LC-Prod", AVClient.UseProduction ? USE_PRODUCTION : USE_DEVELOPMENT);
}
/// <summary>
///
/// </summary>
@ -71,14 +72,14 @@ namespace LeanCloud.Storage.Internal {
!string.IsNullOrEmpty(AVUser.CurrentUser.SessionToken)) {
request.Headers.Add("X-LC-Session", AVUser.CurrentUser.SessionToken);
}
PrintRequest(httpClient, request, content);
HttpUtils.PrintRequest(httpClient, request, content);
var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken);
request.Dispose();
var resultString = await response.Content.ReadAsStringAsync();
response.Dispose();
PrintResponse(response, resultString);
HttpUtils.PrintResponse(response, resultString);
var ret = new Tuple<HttpStatusCode, string>(response.StatusCode, resultString);
@ -115,33 +116,65 @@ namespace LeanCloud.Storage.Internal {
return new Tuple<HttpStatusCode, T>(responseCode, default);
}
static void PrintRequest(HttpClient client, HttpRequestMessage request, string content) {
StringBuilder sb = new StringBuilder();
sb.AppendLine("=== HTTP Request Start ===");
sb.AppendLine($"URL: {request.RequestUri}");
sb.AppendLine($"Method: {request.Method}");
sb.AppendLine($"Headers: ");
foreach (var header in client.DefaultRequestHeaders) {
sb.AppendLine($"\t{header.Key}: {string.Join(",", header.Value.ToArray())}");
// TODO (hallucinogen): move this out to a class to be used by Analytics
private const int MaximumBatchSize = 50;
internal async Task<IList<IDictionary<string, object>>> ExecuteBatchRequests(IList<AVCommand> requests,
CancellationToken cancellationToken) {
var results = new List<IDictionary<string, object>>();
int batchSize = requests.Count;
IEnumerable<AVCommand> remaining = requests;
while (batchSize > MaximumBatchSize) {
var process = remaining.Take(MaximumBatchSize).ToList();
remaining = remaining.Skip(MaximumBatchSize);
results.AddRange(await ExecuteBatchRequest(process, cancellationToken));
batchSize = remaining.Count();
}
foreach (var header in request.Headers) {
sb.AppendLine($"\t{header.Key}: {string.Join(",", header.Value.ToArray())}");
}
foreach (var header in request.Content.Headers) {
sb.AppendLine($"\t{header.Key}: {string.Join(",", header.Value.ToArray())}");
}
sb.AppendLine($"Content: {content}");
sb.AppendLine("=== HTTP Request End ===");
AVClient.PrintLog(sb.ToString());
results.AddRange(await ExecuteBatchRequest(remaining.ToList(), cancellationToken));
return results;
}
static void PrintResponse(HttpResponseMessage response, string content) {
StringBuilder sb = new StringBuilder();
sb.AppendLine("=== HTTP Response Start ===");
sb.AppendLine($"URL: {response.RequestMessage.RequestUri}");
sb.AppendLine($"Content: {content}");
sb.AppendLine("=== HTTP Response End ===");
AVClient.PrintLog(sb.ToString());
internal async Task<IList<IDictionary<string, object>>> ExecuteBatchRequest(IList<AVCommand> requests, CancellationToken cancellationToken) {
var tasks = new List<Task<IDictionary<string, object>>>();
int batchSize = requests.Count;
var tcss = new List<TaskCompletionSource<IDictionary<string, object>>>();
for (int i = 0; i < batchSize; ++i) {
var tcs = new TaskCompletionSource<IDictionary<string, object>>();
tcss.Add(tcs);
tasks.Add(tcs.Task);
}
var encodedRequests = requests.Select(r => {
var results = new Dictionary<string, object> {
{ "method", r.Method.Method },
{ "path", $"/{AVClient.APIVersion}/{r.Path}" },
};
if (r.Content != null) {
results["body"] = r.Content;
}
return results;
}).Cast<object>().ToList();
var command = new AVCommand {
Path = "batch",
Method = HttpMethod.Post,
Content = new Dictionary<string, object> {
{ "requests", encodedRequests }
}
};
try {
List<IDictionary<string, object>> result = new List<IDictionary<string, object>>();
var response = await AVPlugins.Instance.CommandRunner.RunCommandAsync<IList<object>>(command, cancellationToken);
return response.Item2.Cast<IDictionary<string, object>>().ToList();
} catch (Exception e) {
throw e;
}
}
}
}

View File

@ -7,6 +7,7 @@ using System.Net.Http.Headers;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using LeanCloud.Common;
namespace LeanCloud.Storage.Internal {
internal class QCloudUploader {

View File

@ -7,6 +7,7 @@ using System.Threading;
using System.Threading.Tasks;
using System.Net.Http;
using System.Net.Http.Headers;
using LeanCloud.Common;
namespace LeanCloud.Storage.Internal {
internal enum CommonSize : long {

View File

@ -51,10 +51,9 @@ namespace LeanCloud.Storage.Internal {
return serverState;
}
public IList<Task<IObjectState>> SaveAllAsync(IList<IObjectState> states,
public async Task<IList<IObjectState>> SaveAllAsync(IList<IObjectState> states,
IList<IDictionary<string, IAVFieldOperation>> operationsList,
CancellationToken cancellationToken) {
var requests = states
.Zip(operationsList, (item, ops) => new AVCommand {
Path = item.ObjectId == null ? $"classes/{Uri.EscapeDataString(item.ClassName)}" : $"classes/{Uri.EscapeDataString(item.ClassName)}/{Uri.EscapeDataString(item.ObjectId)}",
@ -62,16 +61,15 @@ namespace LeanCloud.Storage.Internal {
Content = AVObject.ToJSONObjectForSaving(ops)
})
.ToList();
var batchTasks = ExecuteBatchRequests(requests, cancellationToken);
var stateTasks = new List<Task<IObjectState>>();
foreach (var task in batchTasks) {
stateTasks.Add(task.OnSuccess(t => {
return AVObjectCoder.Instance.Decode(t.Result, AVDecoder.Instance);
}));
IList<IObjectState> list = new List<IObjectState>();
var result = await AVPlugins.Instance.CommandRunner.ExecuteBatchRequests(requests, cancellationToken);
foreach (var data in result) {
if (data.TryGetValue("success", out object val)) {
IObjectState obj = AVObjectCoder.Instance.Decode(val as IDictionary<string, object>, AVDecoder.Instance);
list.Add(obj);
}
return stateTasks;
}
return list;
}
public async Task DeleteAsync(IObjectState state, AVQuery<AVObject> query, CancellationToken cancellationToken) {
@ -88,7 +86,7 @@ namespace LeanCloud.Storage.Internal {
await AVPlugins.Instance.CommandRunner.RunCommandAsync<IDictionary<string, object>>(command, cancellationToken);
}
public IList<Task> DeleteAllAsync(IList<IObjectState> states,
public async Task DeleteAllAsync(IList<IObjectState> states,
CancellationToken cancellationToken) {
var requests = states
.Where(item => item.ObjectId != null)
@ -97,92 +95,9 @@ namespace LeanCloud.Storage.Internal {
Method = HttpMethod.Delete
})
.ToList();
return ExecuteBatchRequests(requests, cancellationToken).Cast<Task>().ToList();
}
await AVPlugins.Instance.CommandRunner.ExecuteBatchRequests(requests, cancellationToken);
// TODO 判断是否全部失败或者网络错误
// TODO (hallucinogen): move this out to a class to be used by Analytics
private const int MaximumBatchSize = 50;
internal IList<Task<IDictionary<string, object>>> ExecuteBatchRequests(IList<AVCommand> requests,
CancellationToken cancellationToken) {
var tasks = new List<Task<IDictionary<string, object>>>();
int batchSize = requests.Count;
IEnumerable<AVCommand> remaining = requests;
while (batchSize > MaximumBatchSize) {
var process = remaining.Take(MaximumBatchSize).ToList();
remaining = remaining.Skip(MaximumBatchSize);
tasks.AddRange(ExecuteBatchRequest(process, cancellationToken));
batchSize = remaining.Count();
}
tasks.AddRange(ExecuteBatchRequest(remaining.ToList(), cancellationToken));
return tasks;
}
private async Task<IList<Task<IDictionary<string, object>>>> ExecuteBatchRequest(IList<AVCommand> requests,
CancellationToken cancellationToken) {
var tasks = new List<Task<IDictionary<string, object>>>();
int batchSize = requests.Count;
var tcss = new List<TaskCompletionSource<IDictionary<string, object>>>();
for (int i = 0; i < batchSize; ++i) {
var tcs = new TaskCompletionSource<IDictionary<string, object>>();
tcss.Add(tcs);
tasks.Add(tcs.Task);
}
var encodedRequests = requests.Select(r => {
var results = new Dictionary<string, object> {
{ "method", r.Method.Method },
{ "path", $"/{AVClient.APIVersion}/{r.Path}" },
};
if (r.Content != null) {
results["body"] = r.Content;
}
return results;
}).Cast<object>().ToList();
var command = new AVCommand {
Path = "batch",
Method = HttpMethod.Post,
Content = new Dictionary<string, object> {
{ "requests", encodedRequests }
}
};
try {
var response = await AVPlugins.Instance.CommandRunner.RunCommandAsync<IList<object>>(command, cancellationToken);
var resultsArray = response.Item2;
int resultLength = resultsArray.Count;
if (resultLength != batchSize) {
foreach (var tcs in tcss) {
tcs.TrySetException(new InvalidOperationException(
"Batch command result count expected: " + batchSize + " but was: " + resultLength + "."));
}
}
for (int i = 0; i < batchSize; ++i) {
var result = resultsArray[i] as Dictionary<string, object>;
var tcs = tcss[i];
if (result.ContainsKey("success")) {
tcs.TrySetResult(result["success"] as IDictionary<string, object>);
} else if (result.ContainsKey("error")) {
var error = result["error"] as IDictionary<string, object>;
long errorCode = long.Parse(error["code"].ToString());
tcs.TrySetException(new AVException((AVException.ErrorCode)errorCode, error["error"] as string));
} else {
tcs.TrySetException(new InvalidOperationException(
"Invalid batch command response."));
}
}
} catch (Exception e) {
foreach (var tcs in tcss) {
tcs.TrySetException(e);
}
}
}
}
}

View File

@ -610,12 +610,11 @@ string propertyName
var operationsList = (from item in current
select item.StartSave()).ToList();
var saveTasks = ObjectController.SaveAllAsync(states,
var serverStates = await ObjectController.SaveAllAsync(states,
operationsList,
cancellationToken);
try {
var serverStates = await Task.WhenAll(saveTasks);
foreach (var pair in current.Zip(serverStates, (item, state) => new { item, state })) {
pair.item.HandleSave(pair.state);
}
@ -802,25 +801,16 @@ string propertyName
/// Deletes each object in the provided list.
/// </summary>
/// <param name="objects">The objects to delete.</param>
public static Task DeleteAllAsync<T>(IEnumerable<T> objects, CancellationToken cancellationToken = default)
public static async Task DeleteAllAsync<T>(IEnumerable<T> objects, CancellationToken cancellationToken = default)
where T : AVObject {
var uniqueObjects = new HashSet<AVObject>(objects.OfType<AVObject>().ToList(),
new IdentityEqualityComparer<AVObject>());
return EnqueueForAll<object>(uniqueObjects, toAwait => {
var states = uniqueObjects.Select(t => t.state).ToList();
return toAwait.OnSuccess(_ => {
var deleteTasks = ObjectController.DeleteAllAsync(states, cancellationToken);
return Task.WhenAll(deleteTasks);
}).Unwrap().OnSuccess(t => {
// Dirty all objects in memory.
await ObjectController.DeleteAllAsync(states, cancellationToken);
foreach (var obj in uniqueObjects) {
obj.IsDirty = true;
}
return (object)null;
});
}, cancellationToken);
}
#endregion

View File

@ -1,64 +0,0 @@
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Collections;
namespace Storage.Public {
public class AVObject2 : IDictionary<string, object> {
ConcurrentDictionary<string, object> data;
public object this[string key] { get => ((IDictionary<string, object>)data)[key]; set => ((IDictionary<string, object>)data)[key] = value; }
public ICollection<string> Keys => ((IDictionary<string, object>)data).Keys;
public ICollection<object> Values => ((IDictionary<string, object>)data).Values;
public int Count => ((IDictionary<string, object>)data).Count;
public bool IsReadOnly => ((IDictionary<string, object>)data).IsReadOnly;
public void Add(string key, object value) {
((IDictionary<string, object>)data).Add(key, value);
}
public void Add(KeyValuePair<string, object> item) {
((IDictionary<string, object>)data).Add(item);
}
public void Clear() {
((IDictionary<string, object>)data).Clear();
}
public bool Contains(KeyValuePair<string, object> item) {
return ((IDictionary<string, object>)data).Contains(item);
}
public bool ContainsKey(string key) {
return ((IDictionary<string, object>)data).ContainsKey(key);
}
public void CopyTo(KeyValuePair<string, object>[] array, int arrayIndex) {
((IDictionary<string, object>)data).CopyTo(array, arrayIndex);
}
public IEnumerator<KeyValuePair<string, object>> GetEnumerator() {
return ((IDictionary<string, object>)data).GetEnumerator();
}
public bool Remove(string key) {
return ((IDictionary<string, object>)data).Remove(key);
}
public bool Remove(KeyValuePair<string, object> item) {
return ((IDictionary<string, object>)data).Remove(item);
}
public bool TryGetValue(string key, out object value) {
return ((IDictionary<string, object>)data).TryGetValue(key, out value);
}
IEnumerator IEnumerable.GetEnumerator() {
return ((IDictionary<string, object>)data).GetEnumerator();
}
}
}

7
Test/Common.Test/Test.cs Normal file
View File

@ -0,0 +1,7 @@
using System;
namespace Common.Test {
public class Test {
public Test() {
}
}
}