From ec0a8f5a8bff453afce71d5c6909aeae4f7afd30 Mon Sep 17 00:00:00 2001 From: neuecc Date: Mon, 18 May 2020 11:30:04 +0900 Subject: [PATCH] Add IUniTaskAsyncEnumerable.Queue --- src/UniTask.NetCoreTests/Linq/QueueTest.cs | 29 ++++++ .../Linq/TakeInfinityTest.cs | 44 +++++++++ .../Plugins/UniTask/Runtime/Linq/Queue.cs | 95 +++++++++++++++++++ 3 files changed, 168 insertions(+) create mode 100644 src/UniTask.NetCoreTests/Linq/QueueTest.cs create mode 100644 src/UniTask.NetCoreTests/Linq/TakeInfinityTest.cs create mode 100644 src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Queue.cs diff --git a/src/UniTask.NetCoreTests/Linq/QueueTest.cs b/src/UniTask.NetCoreTests/Linq/QueueTest.cs new file mode 100644 index 0000000..d2f5fc5 --- /dev/null +++ b/src/UniTask.NetCoreTests/Linq/QueueTest.cs @@ -0,0 +1,29 @@ +using Cysharp.Threading.Tasks; +using Cysharp.Threading.Tasks.Linq; +using FluentAssertions; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using Xunit; + +namespace NetCoreTests.Linq +{ + public class QueueTest + { + [Fact] + public async Task Q() + { + var rp = new AsyncReactiveProperty(100); + + var l = new List(); + await rp.Take(10).Queue().ForEachAsync(x => + { + rp.Value += 10; + l.Add(x); + }); + + l.Should().BeEquivalentTo(100, 110, 120, 130, 140, 150, 160, 170, 180, 190); + } + } +} diff --git a/src/UniTask.NetCoreTests/Linq/TakeInfinityTest.cs b/src/UniTask.NetCoreTests/Linq/TakeInfinityTest.cs new file mode 100644 index 0000000..1c1ea48 --- /dev/null +++ b/src/UniTask.NetCoreTests/Linq/TakeInfinityTest.cs @@ -0,0 +1,44 @@ +using Cysharp.Threading.Tasks; +using Cysharp.Threading.Tasks.Linq; +using FluentAssertions; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using Xunit; + +namespace NetCoreTests.Linq +{ + public class TakeInfinityTest + { + [Fact] + public async Task Take() + { + var rp = new AsyncReactiveProperty(1); + + var xs = rp.Take(5).ToArrayAsync(); + + rp.Value = 2; + rp.Value = 3; + rp.Value = 4; + rp.Value = 5; + + (await xs).Should().BeEquivalentTo(1, 2, 3, 4, 5); + } + + [Fact] + public async Task TakeWhile() + { + var rp = new AsyncReactiveProperty(1); + + var xs = rp.TakeWhile(x => x != 5).ToArrayAsync(); + + rp.Value = 2; + rp.Value = 3; + rp.Value = 4; + rp.Value = 5; + + (await xs).Should().BeEquivalentTo(1, 2, 3, 4); + } + } +} diff --git a/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Queue.cs b/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Queue.cs new file mode 100644 index 0000000..032c3ce --- /dev/null +++ b/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Queue.cs @@ -0,0 +1,95 @@ +using System; +using System.Threading; + +namespace Cysharp.Threading.Tasks.Linq +{ + public static partial class UniTaskAsyncEnumerable + { + public static IUniTaskAsyncEnumerable Queue(this IUniTaskAsyncEnumerable source) + { + return new QueueOperator(source); + } + } + + internal sealed class QueueOperator : IUniTaskAsyncEnumerable + { + readonly IUniTaskAsyncEnumerable source; + + public QueueOperator(IUniTaskAsyncEnumerable source) + { + this.source = source; + } + + public IUniTaskAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) + { + return new _Queue(source, cancellationToken); + } + + sealed class _Queue : IUniTaskAsyncEnumerator + { + readonly IUniTaskAsyncEnumerable source; + CancellationToken cancellationToken; + + Channel channel; + IUniTaskAsyncEnumerator channelEnumerator; + IUniTaskAsyncEnumerator sourceEnumerator; + + public _Queue(IUniTaskAsyncEnumerable source, CancellationToken cancellationToken) + { + this.source = source; + this.cancellationToken = cancellationToken; + } + + public TSource Current => channelEnumerator.Current; + + public UniTask MoveNextAsync() + { + cancellationToken.ThrowIfCancellationRequested(); + + if (sourceEnumerator == null) + { + sourceEnumerator = source.GetAsyncEnumerator(cancellationToken); + channel = Channel.CreateSingleConsumerUnbounded(); + + channelEnumerator = channel.Reader.ReadAllAsync().GetAsyncEnumerator(cancellationToken); + + ConsumeAll(sourceEnumerator, channel).Forget(); + } + + return channelEnumerator.MoveNextAsync(); + } + + static async UniTaskVoid ConsumeAll(IUniTaskAsyncEnumerator enumerator, ChannelWriter writer) + { + try + { + while (await enumerator.MoveNextAsync()) + { + writer.TryWrite(enumerator.Current); + } + writer.TryComplete(); + } + catch (Exception ex) + { + writer.TryComplete(ex); + } + finally + { + await enumerator.DisposeAsync(); + } + } + + public async UniTask DisposeAsync() + { + if (sourceEnumerator != null) + { + await sourceEnumerator.DisposeAsync(); + } + if (channelEnumerator != null) + { + await channelEnumerator.DisposeAsync(); + } + } + } + } +} \ No newline at end of file