From 3bac16229fb1a83eb82104947e56a570937b3e28 Mon Sep 17 00:00:00 2001 From: hadashiA Date: Sun, 10 Sep 2023 23:33:29 +0900 Subject: [PATCH] Reduce lock --- .../Plugins/UniTask/Runtime/Linq/Merge.cs | 49 ++++++++----------- 1 file changed, 20 insertions(+), 29 deletions(-) diff --git a/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Merge.cs b/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Merge.cs index 226b9fb..86240a6 100644 --- a/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Merge.cs +++ b/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Merge.cs @@ -65,7 +65,7 @@ namespace Cysharp.Threading.Tasks.Linq readonly int length; readonly IUniTaskAsyncEnumerator[] enumerators; - readonly MergeSourceState[] states; + readonly int[] states; readonly Queue<(T, Exception)> queuedResult = new Queue<(T, Exception)>(); readonly CancellationToken cancellationToken; @@ -75,12 +75,12 @@ namespace Cysharp.Threading.Tasks.Linq { this.cancellationToken = cancellationToken; length = sources.Length; - states = ArrayPool.Shared.Rent(length); + states = ArrayPool.Shared.Rent(length); enumerators = ArrayPool>.Shared.Rent(length); for (var i = 0; i < length; i++) { enumerators[i] = sources[i].GetAsyncEnumerator(cancellationToken); - states[i] = MergeSourceState.Pending; + states[i] = (int)MergeSourceState.Pending;; } } @@ -112,23 +112,17 @@ namespace Cysharp.Threading.Tasks.Linq for (var i = 0; i < length; i++) { - lock (states) + if (Interlocked.CompareExchange(ref states[i], (int)MergeSourceState.Running, (int)MergeSourceState.Pending) == (int)MergeSourceState.Pending) { - if (states[i] != MergeSourceState.Pending) + var awaiter = enumerators[i].MoveNextAsync().GetAwaiter(); + if (awaiter.IsCompleted) { - continue; + GetResultAt(i, awaiter); + } + else + { + awaiter.SourceOnCompleted(GetResultAtAction, StateTuple.Create(this, i, awaiter)); } - states[i] = MergeSourceState.Running; - } - - var awaiter = enumerators[i].MoveNextAsync().GetAwaiter(); - if (awaiter.IsCompleted) - { - GetResultAt(i, awaiter); - } - else - { - awaiter.SourceOnCompleted(GetResultAtAction, StateTuple.Create(this, i, awaiter)); } } return new UniTask(this, completionSource.Version); @@ -141,7 +135,7 @@ namespace Cysharp.Threading.Tasks.Linq await enumerators[i].DisposeAsync(); } - ArrayPool.Shared.Return(states, true); + ArrayPool.Shared.Return(states, true); ArrayPool>.Shared.Return(enumerators, true); } @@ -159,10 +153,7 @@ namespace Cysharp.Threading.Tasks.Linq try { hasNext = awaiter.GetResult(); - lock (states) - { - states[index] = hasNext ? MergeSourceState.Pending : MergeSourceState.Completed; - } + Interlocked.Exchange(ref states[index], (int)(hasNext ? MergeSourceState.Pending : MergeSourceState.Completed)); } catch (Exception ex) { @@ -196,16 +187,16 @@ namespace Cysharp.Threading.Tasks.Linq bool IsCompletedAll() { - lock (states) + for (var i = 0; i < length; i++) { - for (var i = 0; i < length; i++) + if (states[i] != (int)MergeSourceState.Completed) { - if (states[i] != MergeSourceState.Completed) - { - return false; - } + return false; } - return true; + } + lock (queuedResult) + { + return queuedResult.Count <= 0; } } }