From 0e25122ee2b789a8e846235460f98cef44728318 Mon Sep 17 00:00:00 2001 From: neuecc Date: Fri, 22 May 2020 03:19:54 +0900 Subject: [PATCH] Add Pairwise --- src/UniTask.NetCoreTests/Linq/Projection.cs | 33 +++++ .../UniTask/Runtime/Linq/CombineLatest.cs | 28 ++++ .../UniTask/Runtime/Linq/CombineLatest.tt | 2 + .../Plugins/UniTask/Runtime/Linq/Pairwise.cs | 128 ++++++++++++++++++ .../UniTask/Runtime/Linq/Pairwise.cs.meta | 11 ++ 5 files changed, 202 insertions(+) create mode 100644 src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Pairwise.cs create mode 100644 src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Pairwise.cs.meta diff --git a/src/UniTask.NetCoreTests/Linq/Projection.cs b/src/UniTask.NetCoreTests/Linq/Projection.cs index f2488ea..6bfbc60 100644 --- a/src/UniTask.NetCoreTests/Linq/Projection.cs +++ b/src/UniTask.NetCoreTests/Linq/Projection.cs @@ -396,6 +396,39 @@ namespace NetCoreTests.Linq await Assert.ThrowsAsync(async () => await complete); } + + + [Fact] + public async Task PariwiseImmediate() + { + var xs = await UniTaskAsyncEnumerable.Range(1, 5).Pairwise().ToArrayAsync(); + xs.Should().BeEquivalentTo((1, 2), (2, 3), (3, 4), (4, 5)); + } + + [Fact] + public async Task Pariwise() + { + var a = new AsyncReactiveProperty(0); + + var list = new List<(int, int)>(); + var complete = a.WithoutCurrent().Pairwise().ForEachAsync(x => list.Add(x)); + + list.Count.Should().Be(0); + a.Value = 10; + list.Count.Should().Be(0); + a.Value = 20; + list.Count.Should().Be(1); + a.Value = 30; + a.Value = 40; + a.Value = 50; + + a.Dispose(); + + await complete; + + list.Should().BeEquivalentTo((10, 20), (20, 30), (30, 40), (40, 50)); + } + class MyException : Exception { diff --git a/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/CombineLatest.cs b/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/CombineLatest.cs index 0df71b8..972799c 100644 --- a/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/CombineLatest.cs +++ b/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/CombineLatest.cs @@ -280,6 +280,7 @@ namespace Cysharp.Threading.Tasks.Linq this.resultSelector = resultSelector; this.cancellationToken = cancellationToken; + TaskTracker.TrackActiveTask(this, 3); } public TResult Current => result; @@ -457,6 +458,7 @@ namespace Cysharp.Threading.Tasks.Linq public async UniTask DisposeAsync() { + TaskTracker.RemoveTracking(this); if (enumerator1 != null) { await enumerator1.DisposeAsync(); @@ -535,6 +537,7 @@ namespace Cysharp.Threading.Tasks.Linq this.resultSelector = resultSelector; this.cancellationToken = cancellationToken; + TaskTracker.TrackActiveTask(this, 3); } public TResult Current => result; @@ -779,6 +782,7 @@ namespace Cysharp.Threading.Tasks.Linq public async UniTask DisposeAsync() { + TaskTracker.RemoveTracking(this); if (enumerator1 != null) { await enumerator1.DisposeAsync(); @@ -872,6 +876,7 @@ namespace Cysharp.Threading.Tasks.Linq this.resultSelector = resultSelector; this.cancellationToken = cancellationToken; + TaskTracker.TrackActiveTask(this, 3); } public TResult Current => result; @@ -1183,6 +1188,7 @@ namespace Cysharp.Threading.Tasks.Linq public async UniTask DisposeAsync() { + TaskTracker.RemoveTracking(this); if (enumerator1 != null) { await enumerator1.DisposeAsync(); @@ -1291,6 +1297,7 @@ namespace Cysharp.Threading.Tasks.Linq this.resultSelector = resultSelector; this.cancellationToken = cancellationToken; + TaskTracker.TrackActiveTask(this, 3); } public TResult Current => result; @@ -1669,6 +1676,7 @@ namespace Cysharp.Threading.Tasks.Linq public async UniTask DisposeAsync() { + TaskTracker.RemoveTracking(this); if (enumerator1 != null) { await enumerator1.DisposeAsync(); @@ -1792,6 +1800,7 @@ namespace Cysharp.Threading.Tasks.Linq this.resultSelector = resultSelector; this.cancellationToken = cancellationToken; + TaskTracker.TrackActiveTask(this, 3); } public TResult Current => result; @@ -2237,6 +2246,7 @@ namespace Cysharp.Threading.Tasks.Linq public async UniTask DisposeAsync() { + TaskTracker.RemoveTracking(this); if (enumerator1 != null) { await enumerator1.DisposeAsync(); @@ -2375,6 +2385,7 @@ namespace Cysharp.Threading.Tasks.Linq this.resultSelector = resultSelector; this.cancellationToken = cancellationToken; + TaskTracker.TrackActiveTask(this, 3); } public TResult Current => result; @@ -2887,6 +2898,7 @@ namespace Cysharp.Threading.Tasks.Linq public async UniTask DisposeAsync() { + TaskTracker.RemoveTracking(this); if (enumerator1 != null) { await enumerator1.DisposeAsync(); @@ -3040,6 +3052,7 @@ namespace Cysharp.Threading.Tasks.Linq this.resultSelector = resultSelector; this.cancellationToken = cancellationToken; + TaskTracker.TrackActiveTask(this, 3); } public TResult Current => result; @@ -3619,6 +3632,7 @@ namespace Cysharp.Threading.Tasks.Linq public async UniTask DisposeAsync() { + TaskTracker.RemoveTracking(this); if (enumerator1 != null) { await enumerator1.DisposeAsync(); @@ -3787,6 +3801,7 @@ namespace Cysharp.Threading.Tasks.Linq this.resultSelector = resultSelector; this.cancellationToken = cancellationToken; + TaskTracker.TrackActiveTask(this, 3); } public TResult Current => result; @@ -4433,6 +4448,7 @@ namespace Cysharp.Threading.Tasks.Linq public async UniTask DisposeAsync() { + TaskTracker.RemoveTracking(this); if (enumerator1 != null) { await enumerator1.DisposeAsync(); @@ -4616,6 +4632,7 @@ namespace Cysharp.Threading.Tasks.Linq this.resultSelector = resultSelector; this.cancellationToken = cancellationToken; + TaskTracker.TrackActiveTask(this, 3); } public TResult Current => result; @@ -5329,6 +5346,7 @@ namespace Cysharp.Threading.Tasks.Linq public async UniTask DisposeAsync() { + TaskTracker.RemoveTracking(this); if (enumerator1 != null) { await enumerator1.DisposeAsync(); @@ -5527,6 +5545,7 @@ namespace Cysharp.Threading.Tasks.Linq this.resultSelector = resultSelector; this.cancellationToken = cancellationToken; + TaskTracker.TrackActiveTask(this, 3); } public TResult Current => result; @@ -6307,6 +6326,7 @@ namespace Cysharp.Threading.Tasks.Linq public async UniTask DisposeAsync() { + TaskTracker.RemoveTracking(this); if (enumerator1 != null) { await enumerator1.DisposeAsync(); @@ -6520,6 +6540,7 @@ namespace Cysharp.Threading.Tasks.Linq this.resultSelector = resultSelector; this.cancellationToken = cancellationToken; + TaskTracker.TrackActiveTask(this, 3); } public TResult Current => result; @@ -7367,6 +7388,7 @@ namespace Cysharp.Threading.Tasks.Linq public async UniTask DisposeAsync() { + TaskTracker.RemoveTracking(this); if (enumerator1 != null) { await enumerator1.DisposeAsync(); @@ -7595,6 +7617,7 @@ namespace Cysharp.Threading.Tasks.Linq this.resultSelector = resultSelector; this.cancellationToken = cancellationToken; + TaskTracker.TrackActiveTask(this, 3); } public TResult Current => result; @@ -8509,6 +8532,7 @@ namespace Cysharp.Threading.Tasks.Linq public async UniTask DisposeAsync() { + TaskTracker.RemoveTracking(this); if (enumerator1 != null) { await enumerator1.DisposeAsync(); @@ -8752,6 +8776,7 @@ namespace Cysharp.Threading.Tasks.Linq this.resultSelector = resultSelector; this.cancellationToken = cancellationToken; + TaskTracker.TrackActiveTask(this, 3); } public TResult Current => result; @@ -9733,6 +9758,7 @@ namespace Cysharp.Threading.Tasks.Linq public async UniTask DisposeAsync() { + TaskTracker.RemoveTracking(this); if (enumerator1 != null) { await enumerator1.DisposeAsync(); @@ -9991,6 +10017,7 @@ namespace Cysharp.Threading.Tasks.Linq this.resultSelector = resultSelector; this.cancellationToken = cancellationToken; + TaskTracker.TrackActiveTask(this, 3); } public TResult Current => result; @@ -11039,6 +11066,7 @@ namespace Cysharp.Threading.Tasks.Linq public async UniTask DisposeAsync() { + TaskTracker.RemoveTracking(this); if (enumerator1 != null) { await enumerator1.DisposeAsync(); diff --git a/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/CombineLatest.tt b/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/CombineLatest.tt index 7995780..9d39724 100644 --- a/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/CombineLatest.tt +++ b/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/CombineLatest.tt @@ -87,6 +87,7 @@ namespace Cysharp.Threading.Tasks.Linq <# } #> this.resultSelector = resultSelector; this.cancellationToken = cancellationToken; + TaskTracker.TrackActiveTask(this, 3); } public TResult Current => result; @@ -203,6 +204,7 @@ namespace Cysharp.Threading.Tasks.Linq public async UniTask DisposeAsync() { + TaskTracker.RemoveTracking(this); <# for(var j = 1; j <= i; j++) { #> if (enumerator<#= j #> != null) { diff --git a/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Pairwise.cs b/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Pairwise.cs new file mode 100644 index 0000000..5d44a9e --- /dev/null +++ b/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Pairwise.cs @@ -0,0 +1,128 @@ +using Cysharp.Threading.Tasks.Internal; +using System; +using System.Threading; + +namespace Cysharp.Threading.Tasks.Linq +{ + public static partial class UniTaskAsyncEnumerable + { + public static IUniTaskAsyncEnumerable<(TSource, TSource)> Pairwise(this IUniTaskAsyncEnumerable source) + { + Error.ThrowArgumentNullException(source, nameof(source)); + + return new Pairwise(source); + } + } + + internal sealed class Pairwise : IUniTaskAsyncEnumerable<(TSource, TSource)> + { + readonly IUniTaskAsyncEnumerable source; + + public Pairwise(IUniTaskAsyncEnumerable source) + { + this.source = source; + } + + public IUniTaskAsyncEnumerator<(TSource, TSource)> GetAsyncEnumerator(CancellationToken cancellationToken = default) + { + return new _Pairwise(source, cancellationToken); + } + + sealed class _Pairwise : MoveNextSource, IUniTaskAsyncEnumerator<(TSource, TSource)> + { + static readonly Action MoveNextCoreDelegate = MoveNextCore; + + readonly IUniTaskAsyncEnumerable source; + CancellationToken cancellationToken; + + IUniTaskAsyncEnumerator enumerator; + UniTask.Awaiter awaiter; + + TSource prev; + bool isFirst; + + public _Pairwise(IUniTaskAsyncEnumerable source, CancellationToken cancellationToken) + { + this.source = source; + this.cancellationToken = cancellationToken; + TaskTracker.TrackActiveTask(this, 3); + } + + public (TSource, TSource) Current { get; private set; } + + public UniTask MoveNextAsync() + { + cancellationToken.ThrowIfCancellationRequested(); + + if (enumerator == null) + { + isFirst = true; + enumerator = source.GetAsyncEnumerator(cancellationToken); + } + + completionSource.Reset(); + SourceMoveNext(); + return new UniTask(this, completionSource.Version); + } + + void SourceMoveNext() + { + try + { + awaiter = enumerator.MoveNextAsync().GetAwaiter(); + if (awaiter.IsCompleted) + { + MoveNextCore(this); + } + else + { + awaiter.SourceOnCompleted(MoveNextCoreDelegate, this); + } + } + catch (Exception ex) + { + completionSource.TrySetException(ex); + } + } + + static void MoveNextCore(object state) + { + var self = (_Pairwise)state; + + if (self.TryGetResult(self.awaiter, out var result)) + { + if (result) + { + if (self.isFirst) + { + self.isFirst = false; + self.prev = self.enumerator.Current; + self.SourceMoveNext(); // run again. okay to use recursive(only one more). + } + else + { + var p = self.prev; + self.prev = self.enumerator.Current; + self.Current = (p, self.prev); + self.completionSource.TrySetResult(true); + } + } + else + { + self.completionSource.TrySetResult(false); + } + } + } + + public UniTask DisposeAsync() + { + TaskTracker.RemoveTracking(this); + if (enumerator != null) + { + return enumerator.DisposeAsync(); + } + return default; + } + } + } +} \ No newline at end of file diff --git a/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Pairwise.cs.meta b/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Pairwise.cs.meta new file mode 100644 index 0000000..727b8cf --- /dev/null +++ b/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Pairwise.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: cddbf051d2a88f549986c468b23214af +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: