chore: 简化保存逻辑

oneRain 2019-12-09 16:18:59 +08:00
parent 538552c13d
commit 0bd113c4d6
5 changed files with 77 additions and 253 deletions

View File

@ -27,7 +27,7 @@ namespace LeanCloud.Test {
{ "hello", 1 }, { "hello", 1 },
{ "world", 2 } { "world", 2 }
}; };
await obj.Save(); await obj.SaveAsync();
Assert.NotNull(obj.ObjectId); Assert.NotNull(obj.ObjectId);
Assert.NotNull(obj.CreatedAt); Assert.NotNull(obj.CreatedAt);
Assert.NotNull(obj.UpdatedAt); Assert.NotNull(obj.UpdatedAt);
@ -80,7 +80,7 @@ namespace LeanCloud.Test {
AVObject c2 = new AVObject("C2"); AVObject c2 = new AVObject("C2");
parent["c1"] = c1; parent["c1"] = c1;
parent["c2"] = c2; parent["c2"] = c2;
await parent.Save(); await parent.SaveAsync();
} }
[Test] [Test]
@ -239,7 +239,7 @@ namespace LeanCloud.Test {
a["b"] = b; a["b"] = b;
b["a"] = a; b["a"] = a;
Assert.ThrowsAsync<AVException>(async () => await a.Save()); Assert.ThrowsAsync<AVException>(async () => await a.SaveAsync());
} }
[Test] [Test]
@ -251,7 +251,7 @@ namespace LeanCloud.Test {
b["c"] = c; b["c"] = c;
c["a"] = a; c["a"] = a;
Assert.ThrowsAsync<AVException>(async () => await a.Save()); Assert.ThrowsAsync<AVException>(async () => await a.SaveAsync());
} }
[Test] [Test]
@ -263,7 +263,7 @@ namespace LeanCloud.Test {
{ "c", a } { "c", a }
}; };
Assert.ThrowsAsync<AVException>(async () => await a.Save()); Assert.ThrowsAsync<AVException>(async () => await a.SaveAsync());
} }
[Test] [Test]
@ -278,7 +278,7 @@ namespace LeanCloud.Test {
{ "c", a } { "c", a }
}; };
Assert.ThrowsAsync<AVException>(async () => await a.Save()); Assert.ThrowsAsync<AVException>(async () => await a.SaveAsync());
} }
[Test] [Test]
@ -291,7 +291,7 @@ namespace LeanCloud.Test {
{ "c1", c1 }, { "c1", c1 },
{ "c2", c2 } { "c2", c2 }
}; };
await p.Save(); await p.SaveAsync();
} }
[Test] [Test]
@ -301,7 +301,7 @@ namespace LeanCloud.Test {
AVObject c2 = new AVObject("C2"); AVObject c2 = new AVObject("C2");
p["c"] = c1; p["c"] = c1;
c1["c"] = c2; c1["c"] = c2;
await p.Save(); await p.SaveAsync();
} }
} }
} }

View File

@ -78,7 +78,7 @@ namespace LeanCloud.Storage.Internal {
AVCommand command = new AVCommand { AVCommand command = new AVCommand {
Path = avObj.ObjectId == null ? $"classes/{Uri.EscapeDataString(avObj.ClassName)}" : $"classes/{Uri.EscapeDataString(avObj.ClassName)}/{Uri.EscapeDataString(avObj.ObjectId)}", Path = avObj.ObjectId == null ? $"classes/{Uri.EscapeDataString(avObj.ClassName)}" : $"classes/{Uri.EscapeDataString(avObj.ClassName)}/{Uri.EscapeDataString(avObj.ObjectId)}",
Method = avObj.ObjectId == null ? HttpMethod.Post : HttpMethod.Put, Method = avObj.ObjectId == null ? HttpMethod.Post : HttpMethod.Put,
Content = AVObject.ToJSONObjectForSaving(avObj.StartSave()) Content = AVObject.ToJSONObjectForSaving(avObj.operationDict)
}; };
commandList.Add(command); commandList.Add(command);
} }

View File

@ -37,7 +37,7 @@ namespace LeanCloud.Storage.Internal
public static IDictionary<string, IAVFieldOperation> GetCurrentOperations(this AVObject obj) public static IDictionary<string, IAVFieldOperation> GetCurrentOperations(this AVObject obj)
{ {
return obj.CurrentOperations; return obj.operationDict;
} }
public static IDictionary<string, object> Encode(this AVObject obj) public static IDictionary<string, object> Encode(this AVObject obj)

View File

@ -47,10 +47,7 @@ namespace LeanCloud {
internal readonly object mutex = new object(); internal readonly object mutex = new object();
private readonly LinkedList<IDictionary<string, IAVFieldOperation>> operationSetQueue = internal readonly ConcurrentDictionary<string, IAVFieldOperation> operationDict = new ConcurrentDictionary<string, IAVFieldOperation>();
new LinkedList<IDictionary<string, IAVFieldOperation>>();
private readonly ConcurrentDictionary<string, IAVFieldOperation> operationDict = new ConcurrentDictionary<string, IAVFieldOperation>();
private readonly ConcurrentDictionary<string, object> estimatedData = new ConcurrentDictionary<string, object>(); private readonly ConcurrentDictionary<string, object> estimatedData = new ConcurrentDictionary<string, object>();
private static readonly ThreadLocal<bool> isCreatingPointer = new ThreadLocal<bool>(() => false); private static readonly ThreadLocal<bool> isCreatingPointer = new ThreadLocal<bool>(() => false);
@ -102,7 +99,7 @@ namespace LeanCloud {
/// <summary> /// <summary>
/// Constructs a new AVObject with no data in it. A AVObject constructed in this way will /// Constructs a new AVObject with no data in it. A AVObject constructed in this way will
/// not have an ObjectId and will not persist to the database until <see cref="SaveAsync()"/> /// not have an ObjectId and will not persist to the database until <see cref="SaveAsync(bool, AVQuery{AVObject}, CancellationToken)"/>
/// is called. /// is called.
/// </summary> /// </summary>
/// <remarks> /// <remarks>
@ -133,11 +130,9 @@ namespace LeanCloud {
ClassName = className ClassName = className
}; };
operationSetQueue.AddLast(new Dictionary<string, IAVFieldOperation>());
if (!isPointer) { if (!isPointer) {
hasBeenFetched = true; hasBeenFetched = true;
IsDirty = true; IsDirty = true;
SetDefaultValues();
} else { } else {
IsDirty = false; IsDirty = false;
hasBeenFetched = false; hasBeenFetched = false;
@ -240,7 +235,7 @@ namespace LeanCloud {
this[GetFieldForPropertyName(ClassName, propertyName)] = value; this[GetFieldForPropertyName(ClassName, propertyName)] = value;
} }
/// <summary> /// <summary>
/// Gets a relation for a property based upon its associated AVFieldName attribute. /// Gets a relation for a property based upon its associated AVFieldName attribute.
/// </summary> /// </summary>
/// <returns>The AVRelation for the property.</returns> /// <returns>The AVRelation for the property.</returns>
@ -313,13 +308,12 @@ string propertyName
} }
/// <summary> /// <summary>
/// Clears any changes to this object made since the last call to <see cref="SaveAsync()"/>. /// Clears any changes to this object made since the last call to <see cref="SaveAsync(bool, AVQuery{AVObject}, CancellationToken)"/>.
/// </summary> /// </summary>
public void Revert() { public void Revert() {
lock (mutex) { lock (mutex) {
bool wasDirty = CurrentOperations.Count > 0; if (operationDict.Any()) {
if (wasDirty) { operationDict.Clear();
CurrentOperations.Clear();
RebuildEstimatedData(); RebuildEstimatedData();
} }
} }
@ -331,38 +325,9 @@ string propertyName
} }
} }
internal void HandleFailedSave(IDictionary<string, IAVFieldOperation> operationsBeforeSave) {
lock (mutex) {
var opNode = operationSetQueue.Find(operationsBeforeSave);
var nextOperations = opNode.Next.Value;
bool wasDirty = nextOperations.Count > 0;
operationSetQueue.Remove(opNode);
// Merge the data from the failed save into the next save.
foreach (var pair in operationsBeforeSave) {
var operation1 = pair.Value;
IAVFieldOperation operation2 = null;
nextOperations.TryGetValue(pair.Key, out operation2);
if (operation2 != null) {
operation2 = operation2.MergeWithPrevious(operation1);
} else {
operation2 = operation1;
}
nextOperations[pair.Key] = operation2;
}
}
}
internal virtual void HandleSave(IObjectState serverState) { internal virtual void HandleSave(IObjectState serverState) {
lock (mutex) { lock (mutex) {
var operationsBeforeSave = operationSetQueue.First.Value; state = state.MutatedClone((objectState) => objectState.Apply(operationDict));
operationSetQueue.RemoveFirst();
// Merge the data from the save and the data from the server into serverData.
//MutateState(mutableClone =>
//{
// mutableClone.Apply(operationsBeforeSave);
//});
state = state.MutatedClone((objectState) => objectState.Apply(operationsBeforeSave));
MergeFromServer(serverState); MergeFromServer(serverState);
} }
} }
@ -406,20 +371,14 @@ string propertyName
internal void MergeFromObject(AVObject other) { internal void MergeFromObject(AVObject other) {
lock (mutex) { lock (mutex) {
// If they point to the same instance, we don't need to merge
if (this == other) { if (this == other) {
return; return;
} }
} }
// Clear out any changes on this object. operationDict.Clear();
if (operationSetQueue.Count != 1) { foreach (KeyValuePair<string, IAVFieldOperation> entry in other.operationDict) {
throw new InvalidOperationException("Attempt to MergeFromObject during save."); operationDict.AddOrUpdate(entry.Key, entry.Value, (key, value) => value);
}
operationSetQueue.Clear();
foreach (var operationSet in other.operationSetQueue) {
operationSetQueue.AddLast(operationSet.ToDictionary(entry => entry.Key,
entry => entry.Value));
} }
lock (mutex) { lock (mutex) {
@ -498,8 +457,7 @@ string propertyName
.ToDictionary(group => group.Key, group => group.Last()); .ToDictionary(group => group.Key, group => group.Last());
} }
public static IDictionary<string, object> ToJSONObjectForSaving( public static IDictionary<string, object> ToJSONObjectForSaving(IDictionary<string, IAVFieldOperation> operations) {
IDictionary<string, IAVFieldOperation> operations) {
var result = new Dictionary<string, object>(); var result = new Dictionary<string, object>();
foreach (var pair in operations) { foreach (var pair in operations) {
// AVRPCSerialize the data // AVRPCSerialize the data
@ -528,43 +486,46 @@ string propertyName
as IDictionary<string, object>; as IDictionary<string, object>;
} }
#region Save Object(s) #region Save Object()
/// <summary>
/// Pushes new operations onto the queue and returns the current set of operations.
/// </summary>
public IDictionary<string, IAVFieldOperation> StartSave() {
lock (mutex) {
var currentOperations = CurrentOperations;
operationSetQueue.AddLast(new Dictionary<string, IAVFieldOperation>());
return currentOperations;
}
}
public virtual async Task SaveAsync(bool fetchWhenSave = false, AVQuery<AVObject> query = null, CancellationToken cancellationToken = default) { public virtual async Task SaveAsync(bool fetchWhenSave = false, AVQuery<AVObject> query = null, CancellationToken cancellationToken = default) {
IDictionary<string, IAVFieldOperation> currentOperations = null; if (HasCircleReference(this, new HashSet<AVObject>())) {
if (!IsDirty) { throw new AVException(AVException.ErrorCode.CircleReference, "Found a circle dependency when save");
return;
}
Task deepSaveTask;
lock (mutex) {
// Get the JSON representation of the object.
currentOperations = StartSave();
deepSaveTask = DeepSaveAsync(estimatedData, cancellationToken);
} }
Stack<Batch> batches = BatchObjects(new List<AVObject> { this });
await SaveBatches(batches, cancellationToken);
// TODO query
try { }
await deepSaveTask;
IObjectState objState = await ObjectController.SaveAsync(state, /// <summary>
currentOperations, /// Saves each object in the provided list.
FetchWhenSave || fetchWhenSave, /// </summary>
query, /// <param name="objects">The objects to save.</param>
cancellationToken); public static async Task SaveAllAsync<T>(IEnumerable<T> objects, CancellationToken cancellationToken = default)
HandleSave(objState); where T : AVObject {
} catch (Exception e) { foreach (T obj in objects) {
HandleFailedSave(currentOperations); if (HasCircleReference(obj, new HashSet<AVObject>())) {
throw e; throw new AVException(AVException.ErrorCode.CircleReference, "Found a circle dependency when save");
}
}
Stack<Batch> batches = BatchObjects(objects);
await SaveBatches(batches, cancellationToken);
}
static async Task SaveBatches(Stack<Batch> batches, CancellationToken cancellationToken = default) {
while (batches.Any()) {
Batch batch = batches.Pop();
IList<AVObject> dirtyObjects = batch.Objects.Where(o => o.IsDirty).ToList();
var serverStates = await ObjectController.SaveAllAsync(dirtyObjects, cancellationToken);
try {
foreach (var pair in dirtyObjects.Zip(serverStates, (item, state) => new { item, state })) {
pair.item.HandleSave(pair.state);
}
} catch (Exception e) {
throw e;
}
} }
} }
@ -587,76 +548,6 @@ string propertyName
}); });
} }
private static async Task DeepSaveAsync(object obj, CancellationToken cancellationToken) {
var objects = new List<AVObject>();
CollectDirtyChildren(obj, objects);
var uniqueObjects = new HashSet<AVObject>(objects, new IdentityEqualityComparer<AVObject>());
// 先保存文件对象(后面可以考虑将 AVFile 作为 AVObject 的子类型进行保存)
//var saveDirtyFileTasks = DeepTraversal(obj, true)
// .OfType<AVFile>()
// .Where(f => f.IsDirty)
// .Select(f => f.SaveAsync(cancellationToken: cancellationToken)).ToList();
//await Task.WhenAll(saveDirtyFileTasks);
IEnumerable<AVObject> remaining = new List<AVObject>(uniqueObjects);
while (remaining.Any()) {
// Partition the objects into two sets: those that can be saved immediately,
// and those that rely on other objects to be created first.
var current = (from item in remaining
where item.CanBeSerialized
select item).ToList();
remaining = remaining.Except(current).ToList();
if (current.Count == 0) {
// We do cycle-detection when building the list of objects passed to this
// function, so this should never get called. But we should check for it
// anyway, so that we get an exception instead of an infinite loop.
throw new InvalidOperationException(
"Unable to save a AVObject with a relation to a cycle.");
}
var states = (from item in current
select item.state).ToList();
var operationsList = (from item in current
select item.StartSave()).ToList();
var serverStates = await ObjectController.SaveAllAsync(states,
operationsList,
cancellationToken);
try {
foreach (var pair in current.Zip(serverStates, (item, state) => new { item, state })) {
pair.item.HandleSave(pair.state);
}
} catch (Exception e) {
foreach (var pair in current.Zip(operationsList, (item, ops) => new { item, ops })) {
pair.item.HandleFailedSave(pair.ops);
}
throw e;
}
}
}
/// <summary>
/// Saves each object in the provided list.
/// </summary>
/// <param name="objects">The objects to save.</param>
public static Task SaveAllAsync<T>(IEnumerable<T> objects) where T : AVObject {
return SaveAllAsync(objects, CancellationToken.None);
}
/// <summary>
/// Saves each object in the provided list.
/// </summary>
/// <param name="objects">The objects to save.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public static Task SaveAllAsync<T>(
IEnumerable<T> objects, CancellationToken cancellationToken) where T : AVObject {
return DeepSaveAsync(objects.ToList(), cancellationToken);
}
#endregion #endregion
#region Fetch Object(s) #region Fetch Object(s)
@ -951,9 +842,7 @@ string propertyName
} }
private IEnumerable<string> ApplyOperations(IDictionary<string, private IEnumerable<string> ApplyOperations(IDictionary<string, IAVFieldOperation> operations, IDictionary<string, object> map) {
IAVFieldOperation> operations,
IDictionary<string, object> map) {
List<string> appliedKeys = new List<string>(); List<string> appliedKeys = new List<string>();
lock (mutex) { lock (mutex) {
foreach (var pair in operations) { foreach (var pair in operations) {
@ -974,31 +863,8 @@ string propertyName
/// Regenerates the estimatedData map from the serverData and operations. /// Regenerates the estimatedData map from the serverData and operations.
/// </summary> /// </summary>
internal void RebuildEstimatedData() { internal void RebuildEstimatedData() {
IEnumerable<string> changedKeys = null; estimatedData.Clear();
ApplyOperations(operationDict, estimatedData);
lock (mutex) {
//estimatedData.Clear();
List<string> converdKeys = new List<string>();
foreach (var item in state) {
var key = item.Key;
var value = item.Value;
if (!estimatedData.ContainsKey(key)) {
converdKeys.Add(key);
} else {
var oldValue = estimatedData[key];
if (oldValue != value) {
converdKeys.Add(key);
}
estimatedData.TryRemove(key, out _);
}
estimatedData.TryAdd(item.Key, item.Value);
}
changedKeys = converdKeys;
foreach (var operations in operationSetQueue) {
var appliedKeys = ApplyOperations(operations, estimatedData);
changedKeys = converdKeys.Concat(appliedKeys);
}
}
} }
/// <summary> /// <summary>
@ -1007,9 +873,7 @@ string propertyName
/// </summary> /// </summary>
internal void PerformOperation(string key, IAVFieldOperation operation) { internal void PerformOperation(string key, IAVFieldOperation operation) {
lock (mutex) { lock (mutex) {
var ifDirtyBeforePerform = this.IsDirty; estimatedData.TryGetValue(key, out object oldValue);
object oldValue;
estimatedData.TryGetValue(key, out oldValue);
object newValue = operation.Apply(oldValue, key); object newValue = operation.Apply(oldValue, key);
if (newValue != AVDeleteOperation.DeleteToken) { if (newValue != AVDeleteOperation.DeleteToken) {
estimatedData[key] = newValue; estimatedData[key] = newValue;
@ -1017,11 +881,10 @@ string propertyName
estimatedData.TryRemove(key, out _); estimatedData.TryRemove(key, out _);
} }
IAVFieldOperation oldOperation; if (operationDict.TryGetValue(key, out IAVFieldOperation oldOperation)) {
bool wasDirty = CurrentOperations.Count > 0; operation = operation.MergeWithPrevious(oldOperation);
CurrentOperations.TryGetValue(key, out oldOperation); }
var newOperation = operation.MergeWithPrevious(oldOperation); operationDict[key] = operation;
CurrentOperations[key] = newOperation;
} }
} }
@ -1049,7 +912,9 @@ string propertyName
lock (mutex) { lock (mutex) {
CheckGetAccess(key); CheckGetAccess(key);
var value = estimatedData[key]; if (!estimatedData.TryGetValue(key, out object value)) {
value = state[key];
}
if (value is AVRelationBase) { if (value is AVRelationBase) {
var relation = value as AVRelationBase; var relation = value as AVRelationBase;
@ -1321,14 +1186,6 @@ string propertyName
} }
} }
internal IDictionary<string, IAVFieldOperation> CurrentOperations {
get {
lock (mutex) {
return operationSetQueue.Last.Value;
}
}
}
/// <summary> /// <summary>
/// Gets a set view of the keys contained in this object. This does not include createdAt, /// Gets a set view of the keys contained in this object. This does not include createdAt,
/// updatedAt, or objectId. It does include things like username and ACL. /// updatedAt, or objectId. It does include things like username and ACL.
@ -1376,8 +1233,8 @@ string propertyName
/// <summary> /// <summary>
/// Gets the last time this object was updated as the server sees it, so that if you make changes /// Gets the last time this object was updated as the server sees it, so that if you make changes
/// to a AVObject, then wait a while, and then call <see cref="SaveAsync()"/>, the updated time /// to a AVObject, then wait a while, and then call <see cref="SaveAsync(bool, AVQuery{AVObject}, CancellationToken)"/>, the updated time
/// will be the time of the <see cref="SaveAsync()"/> call rather than the time the object was /// will be the time of the <see cref="SaveAsync(bool, AVQuery{AVObject}, CancellationToken)"/> call rather than the time the object was
/// changed locally. /// changed locally.
/// </summary> /// </summary>
[AVFieldName("updatedAt")] [AVFieldName("updatedAt")]
@ -1389,8 +1246,8 @@ string propertyName
/// <summary> /// <summary>
/// Gets the first time this object was saved as the server sees it, so that if you create a /// Gets the first time this object was saved as the server sees it, so that if you create a
/// AVObject, then wait a while, and then call <see cref="SaveAsync()"/>, the /// AVObject, then wait a while, and then call <see cref="SaveAsync(bool, AVQuery{AVObject}, CancellationToken)"/>, the
/// creation time will be the time of the first <see cref="SaveAsync()"/> call rather than /// creation time will be the time of the first <see cref="SaveAsync(bool, AVQuery{AVObject}, CancellationToken)"/> call rather than
/// the time the object was created locally. /// the time the object was created locally.
/// </summary> /// </summary>
[AVFieldName("createdAt")] [AVFieldName("createdAt")]
@ -1426,13 +1283,13 @@ string propertyName
/// <c>false</c>.</returns> /// <c>false</c>.</returns>
public bool IsKeyDirty(string key) { public bool IsKeyDirty(string key) {
lock (mutex) { lock (mutex) {
return CurrentOperations.ContainsKey(key); return operationDict.ContainsKey(key);
} }
} }
private bool CheckIsDirty(bool considerChildren) { private bool CheckIsDirty(bool considerChildren) {
lock (mutex) { lock (mutex) {
return (dirty || CurrentOperations.Count > 0 || (considerChildren && HasDirtyChildren)); return (dirty || operationDict.Count > 0 || (considerChildren && HasDirtyChildren));
} }
} }
@ -1600,34 +1457,6 @@ string propertyName
return batches; return batches;
} }
public virtual async Task Save() {
if (HasCircleReference(this, new HashSet<AVObject>())) {
throw new AVException(AVException.ErrorCode.CircleReference, "Found a circle dependency when save");
}
Stack<Batch> batches = BatchObjects(new List<AVObject> { this });
while (batches.Any()) {
Batch batch = batches.Pop();
IList<AVObject> dirtyObjects = batch.Objects.Where(o => o.IsDirty).ToList();
//IList<IObjectState> states = (from item in dirtyObjects
// select item.state).ToList();
//IList<IDictionary<string, IAVFieldOperation>> operationList = (from item in dirtyObjects
// select item.StartSave()).ToList();
var serverStates = await ObjectController.SaveAllAsync(dirtyObjects, CancellationToken.None);
//var serverStates = await ObjectController.SaveAllAsync(states, operationList, CancellationToken.None);
try {
foreach (var pair in dirtyObjects.Zip(serverStates, (item, state) => new { item, state })) {
pair.item.HandleSave(pair.state);
}
} catch (Exception e) {
//foreach (var pair in dirtyObjects.Zip(operationList, (item, ops) => new { item, ops })) {
// pair.item.HandleFailedSave(pair.ops);
//}
throw e;
}
}
}
#endregion #endregion
} }
} }

View File

@ -192,14 +192,9 @@ namespace LeanCloud {
throw new InvalidOperationException("Cannot sign up a user that already exists."); throw new InvalidOperationException("Cannot sign up a user that already exists.");
} }
IDictionary<string, IAVFieldOperation> currentOperations = StartSave(); var serverState = await UserController.SignUpAsync(State, operationDict);
try { HandleSave(serverState);
var serverState = await UserController.SignUpAsync(State, currentOperations); CurrentUser = this;
HandleSave(serverState);
CurrentUser = this;
} catch (Exception) {
HandleFailedSave(currentOperations);
}
} }
/// <summary> /// <summary>