Add IUniTaskAsyncEnumerable.Queue

master
neuecc 2020-05-18 11:30:04 +09:00
parent 49ba57f20a
commit ec0a8f5a8b
3 changed files with 168 additions and 0 deletions

View File

@ -0,0 +1,29 @@
using Cysharp.Threading.Tasks;
using Cysharp.Threading.Tasks.Linq;
using FluentAssertions;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Xunit;
namespace NetCoreTests.Linq
{
public class QueueTest
{
[Fact]
public async Task Q()
{
var rp = new AsyncReactiveProperty<int>(100);
var l = new List<int>();
await rp.Take(10).Queue().ForEachAsync(x =>
{
rp.Value += 10;
l.Add(x);
});
l.Should().BeEquivalentTo(100, 110, 120, 130, 140, 150, 160, 170, 180, 190);
}
}
}

View File

@ -0,0 +1,44 @@
using Cysharp.Threading.Tasks;
using Cysharp.Threading.Tasks.Linq;
using FluentAssertions;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Xunit;
namespace NetCoreTests.Linq
{
public class TakeInfinityTest
{
[Fact]
public async Task Take()
{
var rp = new AsyncReactiveProperty<int>(1);
var xs = rp.Take(5).ToArrayAsync();
rp.Value = 2;
rp.Value = 3;
rp.Value = 4;
rp.Value = 5;
(await xs).Should().BeEquivalentTo(1, 2, 3, 4, 5);
}
[Fact]
public async Task TakeWhile()
{
var rp = new AsyncReactiveProperty<int>(1);
var xs = rp.TakeWhile(x => x != 5).ToArrayAsync();
rp.Value = 2;
rp.Value = 3;
rp.Value = 4;
rp.Value = 5;
(await xs).Should().BeEquivalentTo(1, 2, 3, 4);
}
}
}

View File

@ -0,0 +1,95 @@
using System;
using System.Threading;
namespace Cysharp.Threading.Tasks.Linq
{
public static partial class UniTaskAsyncEnumerable
{
public static IUniTaskAsyncEnumerable<TSource> Queue<TSource>(this IUniTaskAsyncEnumerable<TSource> source)
{
return new QueueOperator<TSource>(source);
}
}
internal sealed class QueueOperator<TSource> : IUniTaskAsyncEnumerable<TSource>
{
readonly IUniTaskAsyncEnumerable<TSource> source;
public QueueOperator(IUniTaskAsyncEnumerable<TSource> source)
{
this.source = source;
}
public IUniTaskAsyncEnumerator<TSource> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
return new _Queue(source, cancellationToken);
}
sealed class _Queue : IUniTaskAsyncEnumerator<TSource>
{
readonly IUniTaskAsyncEnumerable<TSource> source;
CancellationToken cancellationToken;
Channel<TSource> channel;
IUniTaskAsyncEnumerator<TSource> channelEnumerator;
IUniTaskAsyncEnumerator<TSource> sourceEnumerator;
public _Queue(IUniTaskAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
{
this.source = source;
this.cancellationToken = cancellationToken;
}
public TSource Current => channelEnumerator.Current;
public UniTask<bool> MoveNextAsync()
{
cancellationToken.ThrowIfCancellationRequested();
if (sourceEnumerator == null)
{
sourceEnumerator = source.GetAsyncEnumerator(cancellationToken);
channel = Channel.CreateSingleConsumerUnbounded<TSource>();
channelEnumerator = channel.Reader.ReadAllAsync().GetAsyncEnumerator(cancellationToken);
ConsumeAll(sourceEnumerator, channel).Forget();
}
return channelEnumerator.MoveNextAsync();
}
static async UniTaskVoid ConsumeAll(IUniTaskAsyncEnumerator<TSource> enumerator, ChannelWriter<TSource> writer)
{
try
{
while (await enumerator.MoveNextAsync())
{
writer.TryWrite(enumerator.Current);
}
writer.TryComplete();
}
catch (Exception ex)
{
writer.TryComplete(ex);
}
finally
{
await enumerator.DisposeAsync();
}
}
public async UniTask DisposeAsync()
{
if (sourceEnumerator != null)
{
await sourceEnumerator.DisposeAsync();
}
if (channelEnumerator != null)
{
await channelEnumerator.DisposeAsync();
}
}
}
}
}