refactor Where
parent
239bf749b6
commit
e1d5359d73
|
@ -246,6 +246,20 @@ namespace NetCoreSandbox
|
||||||
Console.WriteLine("FooBarAsync End");
|
Console.WriteLine("FooBarAsync End");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static async UniTask WhereSelect()
|
||||||
|
{
|
||||||
|
await foreach (var item in UniTaskAsyncEnumerable.Range(1, 10)
|
||||||
|
.SelectAwait(async x =>
|
||||||
|
{
|
||||||
|
await UniTask.Yield();
|
||||||
|
return x;
|
||||||
|
})
|
||||||
|
.Where(x => x % 2 == 0))
|
||||||
|
{
|
||||||
|
Console.WriteLine(item);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static async Task Main(string[] args)
|
static async Task Main(string[] args)
|
||||||
{
|
{
|
||||||
|
@ -264,6 +278,7 @@ namespace NetCoreSandbox
|
||||||
// await new AllocationCheck().ViaUniTaskVoid();
|
// await new AllocationCheck().ViaUniTaskVoid();
|
||||||
|
|
||||||
// AsyncTest().Forget();
|
// AsyncTest().Forget();
|
||||||
|
await WhereSelect();
|
||||||
|
|
||||||
SynchronizationContext.SetSynchronizationContext(new MySyncContext());
|
SynchronizationContext.SetSynchronizationContext(new MySyncContext());
|
||||||
|
|
||||||
|
|
|
@ -3,7 +3,9 @@ using System.Threading;
|
||||||
|
|
||||||
namespace Cysharp.Threading.Tasks.Linq
|
namespace Cysharp.Threading.Tasks.Linq
|
||||||
{
|
{
|
||||||
public abstract class AsyncEnumeratorBase<TSource, TResult> : MoveNextSource, IUniTaskAsyncEnumerator<TResult>
|
// note: refactor all inherit class and should remove this.
|
||||||
|
// see Select and Where.
|
||||||
|
internal abstract class AsyncEnumeratorBase<TSource, TResult> : MoveNextSource, IUniTaskAsyncEnumerator<TResult>
|
||||||
{
|
{
|
||||||
static readonly Action<object> moveNextCallbackDelegate = MoveNextCallBack;
|
static readonly Action<object> moveNextCallbackDelegate = MoveNextCallBack;
|
||||||
|
|
||||||
|
@ -129,7 +131,7 @@ namespace Cysharp.Threading.Tasks.Linq
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public abstract class AsyncEnumeratorAwaitSelectorBase<TSource, TResult, TAwait> : MoveNextSource, IUniTaskAsyncEnumerator<TResult>
|
internal abstract class AsyncEnumeratorAwaitSelectorBase<TSource, TResult, TAwait> : MoveNextSource, IUniTaskAsyncEnumerator<TResult>
|
||||||
{
|
{
|
||||||
static readonly Action<object> moveNextCallbackDelegate = MoveNextCallBack;
|
static readonly Action<object> moveNextCallbackDelegate = MoveNextCallBack;
|
||||||
static readonly Action<object> setCurrentCallbackDelegate = SetCurrentCallBack;
|
static readonly Action<object> setCurrentCallbackDelegate = SetCurrentCallBack;
|
||||||
|
@ -351,5 +353,4 @@ namespace Cysharp.Threading.Tasks.Linq
|
||||||
return default;
|
return default;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -71,36 +71,101 @@ namespace Cysharp.Threading.Tasks.Linq
|
||||||
return new _Where(source, predicate, cancellationToken);
|
return new _Where(source, predicate, cancellationToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
class _Where : AsyncEnumeratorBase<TSource, TSource>
|
sealed class _Where : MoveNextSource, IUniTaskAsyncEnumerator<TSource>
|
||||||
{
|
{
|
||||||
|
readonly IUniTaskAsyncEnumerable<TSource> source;
|
||||||
readonly Func<TSource, bool> predicate;
|
readonly Func<TSource, bool> predicate;
|
||||||
|
readonly CancellationToken cancellationToken;
|
||||||
|
|
||||||
|
int state = -1;
|
||||||
|
IUniTaskAsyncEnumerator<TSource> enumerator;
|
||||||
|
UniTask<bool>.Awaiter awaiter;
|
||||||
|
Action moveNextAction;
|
||||||
|
|
||||||
public _Where(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
|
public _Where(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
|
||||||
|
|
||||||
: base(source, cancellationToken)
|
|
||||||
{
|
{
|
||||||
|
this.source = source;
|
||||||
this.predicate = predicate;
|
this.predicate = predicate;
|
||||||
|
this.cancellationToken = cancellationToken;
|
||||||
|
this.moveNextAction = MoveNext;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected override bool TryMoveNextCore(bool sourceHasCurrent, out bool result)
|
public TSource Current { get; private set; }
|
||||||
|
|
||||||
|
public UniTask<bool> MoveNextAsync()
|
||||||
{
|
{
|
||||||
if (sourceHasCurrent)
|
if (state == -2) return default;
|
||||||
|
|
||||||
|
completionSource.Reset();
|
||||||
|
MoveNext();
|
||||||
|
return new UniTask<bool>(this, completionSource.Version);
|
||||||
|
}
|
||||||
|
|
||||||
|
void MoveNext()
|
||||||
|
{
|
||||||
|
REPEAT:
|
||||||
|
try
|
||||||
{
|
{
|
||||||
if (predicate(SourceCurrent))
|
switch (state)
|
||||||
{
|
{
|
||||||
Current = SourceCurrent;
|
case -1: // init
|
||||||
result = true;
|
enumerator = source.GetAsyncEnumerator(cancellationToken);
|
||||||
return true;
|
goto case 0;
|
||||||
}
|
case 0:
|
||||||
else
|
awaiter = enumerator.MoveNextAsync().GetAwaiter();
|
||||||
{
|
if (awaiter.IsCompleted)
|
||||||
result = default;
|
{
|
||||||
return false;
|
goto case 1;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
state = 1;
|
||||||
|
awaiter.UnsafeOnCompleted(moveNextAction);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
case 1:
|
||||||
|
if (awaiter.GetResult())
|
||||||
|
{
|
||||||
|
Current = enumerator.Current;
|
||||||
|
if (predicate(Current))
|
||||||
|
{
|
||||||
|
goto CONTINUE;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
state = 0;
|
||||||
|
goto REPEAT;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
goto DONE;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
goto DONE;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
state = -2;
|
||||||
|
completionSource.TrySetException(ex);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
result = false;
|
DONE:
|
||||||
return true;
|
state = -2;
|
||||||
|
completionSource.TrySetResult(false);
|
||||||
|
return;
|
||||||
|
|
||||||
|
CONTINUE:
|
||||||
|
state = 0;
|
||||||
|
completionSource.TrySetResult(true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
public UniTask DisposeAsync()
|
||||||
|
{
|
||||||
|
return enumerator.DisposeAsync();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -118,40 +183,105 @@ namespace Cysharp.Threading.Tasks.Linq
|
||||||
|
|
||||||
public IUniTaskAsyncEnumerator<TSource> GetAsyncEnumerator(CancellationToken cancellationToken = default)
|
public IUniTaskAsyncEnumerator<TSource> GetAsyncEnumerator(CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
return new _WhereInt(source, predicate, cancellationToken);
|
return new _Where(source, predicate, cancellationToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
class _WhereInt : AsyncEnumeratorBase<TSource, TSource>
|
sealed class _Where : MoveNextSource, IUniTaskAsyncEnumerator<TSource>
|
||||||
{
|
{
|
||||||
|
readonly IUniTaskAsyncEnumerable<TSource> source;
|
||||||
readonly Func<TSource, int, bool> predicate;
|
readonly Func<TSource, int, bool> predicate;
|
||||||
|
readonly CancellationToken cancellationToken;
|
||||||
|
|
||||||
|
int state = -1;
|
||||||
|
IUniTaskAsyncEnumerator<TSource> enumerator;
|
||||||
|
UniTask<bool>.Awaiter awaiter;
|
||||||
|
Action moveNextAction;
|
||||||
int index;
|
int index;
|
||||||
|
|
||||||
public _WhereInt(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, int, bool> predicate, CancellationToken cancellationToken)
|
public _Where(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, int, bool> predicate, CancellationToken cancellationToken)
|
||||||
|
|
||||||
: base(source, cancellationToken)
|
|
||||||
{
|
{
|
||||||
|
this.source = source;
|
||||||
this.predicate = predicate;
|
this.predicate = predicate;
|
||||||
|
this.cancellationToken = cancellationToken;
|
||||||
|
this.moveNextAction = MoveNext;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected override bool TryMoveNextCore(bool sourceHasCurrent, out bool result)
|
public TSource Current { get; private set; }
|
||||||
|
|
||||||
|
public UniTask<bool> MoveNextAsync()
|
||||||
{
|
{
|
||||||
if (sourceHasCurrent)
|
if (state == -2) return default;
|
||||||
|
|
||||||
|
completionSource.Reset();
|
||||||
|
MoveNext();
|
||||||
|
return new UniTask<bool>(this, completionSource.Version);
|
||||||
|
}
|
||||||
|
|
||||||
|
void MoveNext()
|
||||||
|
{
|
||||||
|
REPEAT:
|
||||||
|
try
|
||||||
{
|
{
|
||||||
if (predicate(SourceCurrent, checked(index++)))
|
switch (state)
|
||||||
{
|
{
|
||||||
Current = SourceCurrent;
|
case -1: // init
|
||||||
result = true;
|
enumerator = source.GetAsyncEnumerator(cancellationToken);
|
||||||
return true;
|
goto case 0;
|
||||||
}
|
case 0:
|
||||||
else
|
awaiter = enumerator.MoveNextAsync().GetAwaiter();
|
||||||
{
|
if (awaiter.IsCompleted)
|
||||||
result = default;
|
{
|
||||||
return false;
|
goto case 1;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
state = 1;
|
||||||
|
awaiter.UnsafeOnCompleted(moveNextAction);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
case 1:
|
||||||
|
if (awaiter.GetResult())
|
||||||
|
{
|
||||||
|
Current = enumerator.Current;
|
||||||
|
if (predicate(Current, checked(index++)))
|
||||||
|
{
|
||||||
|
goto CONTINUE;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
state = 0;
|
||||||
|
goto REPEAT;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
goto DONE;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
goto DONE;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
state = -2;
|
||||||
|
completionSource.TrySetException(ex);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
result = false;
|
DONE:
|
||||||
return true;
|
state = -2;
|
||||||
|
completionSource.TrySetResult(false);
|
||||||
|
return;
|
||||||
|
|
||||||
|
CONTINUE:
|
||||||
|
state = 0;
|
||||||
|
completionSource.TrySetResult(true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
public UniTask DisposeAsync()
|
||||||
|
{
|
||||||
|
return enumerator.DisposeAsync();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -172,34 +302,115 @@ namespace Cysharp.Threading.Tasks.Linq
|
||||||
return new _WhereAwait(source, predicate, cancellationToken);
|
return new _WhereAwait(source, predicate, cancellationToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
class _WhereAwait : AsyncEnumeratorAwaitSelectorBase<TSource, TSource, bool>
|
sealed class _WhereAwait : MoveNextSource, IUniTaskAsyncEnumerator<TSource>
|
||||||
{
|
{
|
||||||
|
readonly IUniTaskAsyncEnumerable<TSource> source;
|
||||||
readonly Func<TSource, UniTask<bool>> predicate;
|
readonly Func<TSource, UniTask<bool>> predicate;
|
||||||
|
readonly CancellationToken cancellationToken;
|
||||||
|
|
||||||
|
int state = -1;
|
||||||
|
IUniTaskAsyncEnumerator<TSource> enumerator;
|
||||||
|
UniTask<bool>.Awaiter awaiter;
|
||||||
|
UniTask<bool>.Awaiter awaiter2;
|
||||||
|
Action moveNextAction;
|
||||||
|
|
||||||
public _WhereAwait(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask<bool>> predicate, CancellationToken cancellationToken)
|
public _WhereAwait(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask<bool>> predicate, CancellationToken cancellationToken)
|
||||||
|
|
||||||
: base(source, cancellationToken)
|
|
||||||
{
|
{
|
||||||
|
this.source = source;
|
||||||
this.predicate = predicate;
|
this.predicate = predicate;
|
||||||
|
this.cancellationToken = cancellationToken;
|
||||||
|
this.moveNextAction = MoveNext;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected override UniTask<bool> TransformAsync(TSource sourceCurrent)
|
public TSource Current { get; private set; }
|
||||||
|
|
||||||
|
public UniTask<bool> MoveNextAsync()
|
||||||
{
|
{
|
||||||
return predicate(sourceCurrent);
|
if (state == -2) return default;
|
||||||
|
|
||||||
|
completionSource.Reset();
|
||||||
|
MoveNext();
|
||||||
|
return new UniTask<bool>(this, completionSource.Version);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected override bool TrySetCurrentCore(bool awaitResult, out bool terminateIteration)
|
void MoveNext()
|
||||||
{
|
{
|
||||||
terminateIteration = false;
|
REPEAT:
|
||||||
if (awaitResult)
|
try
|
||||||
{
|
{
|
||||||
Current = SourceCurrent;
|
switch (state)
|
||||||
return true;
|
{
|
||||||
|
case -1: // init
|
||||||
|
enumerator = source.GetAsyncEnumerator(cancellationToken);
|
||||||
|
goto case 0;
|
||||||
|
case 0:
|
||||||
|
awaiter = enumerator.MoveNextAsync().GetAwaiter();
|
||||||
|
if (awaiter.IsCompleted)
|
||||||
|
{
|
||||||
|
goto case 1;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
state = 1;
|
||||||
|
awaiter.UnsafeOnCompleted(moveNextAction);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
case 1:
|
||||||
|
if (awaiter.GetResult())
|
||||||
|
{
|
||||||
|
Current = enumerator.Current;
|
||||||
|
|
||||||
|
awaiter2 = predicate(Current).GetAwaiter();
|
||||||
|
if (awaiter2.IsCompleted)
|
||||||
|
{
|
||||||
|
goto case 2;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
state = 2;
|
||||||
|
awaiter2.UnsafeOnCompleted(moveNextAction);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
goto DONE;
|
||||||
|
}
|
||||||
|
case 2:
|
||||||
|
if (awaiter2.GetResult())
|
||||||
|
{
|
||||||
|
goto CONTINUE;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
state = 0;
|
||||||
|
goto REPEAT;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
goto DONE;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
return false;
|
state = -2;
|
||||||
|
completionSource.TrySetException(ex);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
DONE:
|
||||||
|
state = -2;
|
||||||
|
completionSource.TrySetResult(false);
|
||||||
|
return;
|
||||||
|
|
||||||
|
CONTINUE:
|
||||||
|
state = 0;
|
||||||
|
completionSource.TrySetResult(true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
public UniTask DisposeAsync()
|
||||||
|
{
|
||||||
|
return enumerator.DisposeAsync();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -217,44 +428,123 @@ namespace Cysharp.Threading.Tasks.Linq
|
||||||
|
|
||||||
public IUniTaskAsyncEnumerator<TSource> GetAsyncEnumerator(CancellationToken cancellationToken = default)
|
public IUniTaskAsyncEnumerator<TSource> GetAsyncEnumerator(CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
return new _WhereIntAwait(source, predicate, cancellationToken);
|
return new _WhereAwait(source, predicate, cancellationToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
class _WhereIntAwait : AsyncEnumeratorAwaitSelectorBase<TSource, TSource, bool>
|
sealed class _WhereAwait : MoveNextSource, IUniTaskAsyncEnumerator<TSource>
|
||||||
{
|
{
|
||||||
|
readonly IUniTaskAsyncEnumerable<TSource> source;
|
||||||
readonly Func<TSource, int, UniTask<bool>> predicate;
|
readonly Func<TSource, int, UniTask<bool>> predicate;
|
||||||
|
readonly CancellationToken cancellationToken;
|
||||||
|
|
||||||
|
int state = -1;
|
||||||
|
IUniTaskAsyncEnumerator<TSource> enumerator;
|
||||||
|
UniTask<bool>.Awaiter awaiter;
|
||||||
|
UniTask<bool>.Awaiter awaiter2;
|
||||||
|
Action moveNextAction;
|
||||||
int index;
|
int index;
|
||||||
|
|
||||||
public _WhereIntAwait(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, int, UniTask<bool>> predicate, CancellationToken cancellationToken)
|
public _WhereAwait(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, int, UniTask<bool>> predicate, CancellationToken cancellationToken)
|
||||||
|
|
||||||
: base(source, cancellationToken)
|
|
||||||
{
|
{
|
||||||
|
this.source = source;
|
||||||
this.predicate = predicate;
|
this.predicate = predicate;
|
||||||
|
this.cancellationToken = cancellationToken;
|
||||||
|
this.moveNextAction = MoveNext;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected override UniTask<bool> TransformAsync(TSource sourceCurrent)
|
public TSource Current { get; private set; }
|
||||||
|
|
||||||
|
public UniTask<bool> MoveNextAsync()
|
||||||
{
|
{
|
||||||
return predicate(sourceCurrent, checked(index++));
|
if (state == -2) return default;
|
||||||
|
|
||||||
|
completionSource.Reset();
|
||||||
|
MoveNext();
|
||||||
|
return new UniTask<bool>(this, completionSource.Version);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected override bool TrySetCurrentCore(bool awaitResult, out bool terminateIteration)
|
void MoveNext()
|
||||||
{
|
{
|
||||||
terminateIteration = false;
|
REPEAT:
|
||||||
if (awaitResult)
|
try
|
||||||
{
|
{
|
||||||
Current = SourceCurrent;
|
switch (state)
|
||||||
return true;
|
{
|
||||||
|
case -1: // init
|
||||||
|
enumerator = source.GetAsyncEnumerator(cancellationToken);
|
||||||
|
goto case 0;
|
||||||
|
case 0:
|
||||||
|
awaiter = enumerator.MoveNextAsync().GetAwaiter();
|
||||||
|
if (awaiter.IsCompleted)
|
||||||
|
{
|
||||||
|
goto case 1;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
state = 1;
|
||||||
|
awaiter.UnsafeOnCompleted(moveNextAction);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
case 1:
|
||||||
|
if (awaiter.GetResult())
|
||||||
|
{
|
||||||
|
Current = enumerator.Current;
|
||||||
|
|
||||||
|
awaiter2 = predicate(Current, checked(index++)).GetAwaiter();
|
||||||
|
if (awaiter2.IsCompleted)
|
||||||
|
{
|
||||||
|
goto case 2;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
state = 2;
|
||||||
|
awaiter2.UnsafeOnCompleted(moveNextAction);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
goto DONE;
|
||||||
|
}
|
||||||
|
case 2:
|
||||||
|
if (awaiter2.GetResult())
|
||||||
|
{
|
||||||
|
goto CONTINUE;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
state = 0;
|
||||||
|
goto REPEAT;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
goto DONE;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
return false;
|
state = -2;
|
||||||
|
completionSource.TrySetException(ex);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
DONE:
|
||||||
|
state = -2;
|
||||||
|
completionSource.TrySetResult(false);
|
||||||
|
return;
|
||||||
|
|
||||||
|
CONTINUE:
|
||||||
|
state = 0;
|
||||||
|
completionSource.TrySetResult(true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
public UniTask DisposeAsync()
|
||||||
|
{
|
||||||
|
return enumerator.DisposeAsync();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
internal sealed class WhereAwaitWithCancellation<TSource> : IUniTaskAsyncEnumerable<TSource>
|
internal sealed class WhereAwaitWithCancellation<TSource> : IUniTaskAsyncEnumerable<TSource>
|
||||||
{
|
{
|
||||||
readonly IUniTaskAsyncEnumerable<TSource> source;
|
readonly IUniTaskAsyncEnumerable<TSource> source;
|
||||||
|
@ -271,34 +561,115 @@ namespace Cysharp.Threading.Tasks.Linq
|
||||||
return new _WhereAwaitWithCancellation(source, predicate, cancellationToken);
|
return new _WhereAwaitWithCancellation(source, predicate, cancellationToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
class _WhereAwaitWithCancellation : AsyncEnumeratorAwaitSelectorBase<TSource, TSource, bool>
|
sealed class _WhereAwaitWithCancellation : MoveNextSource, IUniTaskAsyncEnumerator<TSource>
|
||||||
{
|
{
|
||||||
|
readonly IUniTaskAsyncEnumerable<TSource> source;
|
||||||
readonly Func<TSource, CancellationToken, UniTask<bool>> predicate;
|
readonly Func<TSource, CancellationToken, UniTask<bool>> predicate;
|
||||||
|
readonly CancellationToken cancellationToken;
|
||||||
|
|
||||||
|
int state = -1;
|
||||||
|
IUniTaskAsyncEnumerator<TSource> enumerator;
|
||||||
|
UniTask<bool>.Awaiter awaiter;
|
||||||
|
UniTask<bool>.Awaiter awaiter2;
|
||||||
|
Action moveNextAction;
|
||||||
|
|
||||||
public _WhereAwaitWithCancellation(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask<bool>> predicate, CancellationToken cancellationToken)
|
public _WhereAwaitWithCancellation(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask<bool>> predicate, CancellationToken cancellationToken)
|
||||||
|
|
||||||
: base(source, cancellationToken)
|
|
||||||
{
|
{
|
||||||
|
this.source = source;
|
||||||
this.predicate = predicate;
|
this.predicate = predicate;
|
||||||
|
this.cancellationToken = cancellationToken;
|
||||||
|
this.moveNextAction = MoveNext;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected override UniTask<bool> TransformAsync(TSource sourceCurrent)
|
public TSource Current { get; private set; }
|
||||||
|
|
||||||
|
public UniTask<bool> MoveNextAsync()
|
||||||
{
|
{
|
||||||
return predicate(sourceCurrent, cancellationToken);
|
if (state == -2) return default;
|
||||||
|
|
||||||
|
completionSource.Reset();
|
||||||
|
MoveNext();
|
||||||
|
return new UniTask<bool>(this, completionSource.Version);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected override bool TrySetCurrentCore(bool awaitResult, out bool terminateIteration)
|
void MoveNext()
|
||||||
{
|
{
|
||||||
terminateIteration = false;
|
REPEAT:
|
||||||
if (awaitResult)
|
try
|
||||||
{
|
{
|
||||||
Current = SourceCurrent;
|
switch (state)
|
||||||
return true;
|
{
|
||||||
|
case -1: // init
|
||||||
|
enumerator = source.GetAsyncEnumerator(cancellationToken);
|
||||||
|
goto case 0;
|
||||||
|
case 0:
|
||||||
|
awaiter = enumerator.MoveNextAsync().GetAwaiter();
|
||||||
|
if (awaiter.IsCompleted)
|
||||||
|
{
|
||||||
|
goto case 1;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
state = 1;
|
||||||
|
awaiter.UnsafeOnCompleted(moveNextAction);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
case 1:
|
||||||
|
if (awaiter.GetResult())
|
||||||
|
{
|
||||||
|
Current = enumerator.Current;
|
||||||
|
|
||||||
|
awaiter2 = predicate(Current, cancellationToken).GetAwaiter();
|
||||||
|
if (awaiter2.IsCompleted)
|
||||||
|
{
|
||||||
|
goto case 2;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
state = 2;
|
||||||
|
awaiter2.UnsafeOnCompleted(moveNextAction);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
goto DONE;
|
||||||
|
}
|
||||||
|
case 2:
|
||||||
|
if (awaiter2.GetResult())
|
||||||
|
{
|
||||||
|
goto CONTINUE;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
state = 0;
|
||||||
|
goto REPEAT;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
goto DONE;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
return false;
|
state = -2;
|
||||||
|
completionSource.TrySetException(ex);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
DONE:
|
||||||
|
state = -2;
|
||||||
|
completionSource.TrySetResult(false);
|
||||||
|
return;
|
||||||
|
|
||||||
|
CONTINUE:
|
||||||
|
state = 0;
|
||||||
|
completionSource.TrySetResult(true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
public UniTask DisposeAsync()
|
||||||
|
{
|
||||||
|
return enumerator.DisposeAsync();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -316,40 +687,120 @@ namespace Cysharp.Threading.Tasks.Linq
|
||||||
|
|
||||||
public IUniTaskAsyncEnumerator<TSource> GetAsyncEnumerator(CancellationToken cancellationToken = default)
|
public IUniTaskAsyncEnumerator<TSource> GetAsyncEnumerator(CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
return new _WhereIntAwaitWithCancellation(source, predicate, cancellationToken);
|
return new _WhereAwaitWithCancellation(source, predicate, cancellationToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
class _WhereIntAwaitWithCancellation : AsyncEnumeratorAwaitSelectorBase<TSource, TSource, bool>
|
sealed class _WhereAwaitWithCancellation : MoveNextSource, IUniTaskAsyncEnumerator<TSource>
|
||||||
{
|
{
|
||||||
|
readonly IUniTaskAsyncEnumerable<TSource> source;
|
||||||
readonly Func<TSource, int, CancellationToken, UniTask<bool>> predicate;
|
readonly Func<TSource, int, CancellationToken, UniTask<bool>> predicate;
|
||||||
|
readonly CancellationToken cancellationToken;
|
||||||
|
|
||||||
|
int state = -1;
|
||||||
|
IUniTaskAsyncEnumerator<TSource> enumerator;
|
||||||
|
UniTask<bool>.Awaiter awaiter;
|
||||||
|
UniTask<bool>.Awaiter awaiter2;
|
||||||
|
Action moveNextAction;
|
||||||
int index;
|
int index;
|
||||||
|
|
||||||
public _WhereIntAwaitWithCancellation(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, UniTask<bool>> predicate, CancellationToken cancellationToken)
|
public _WhereAwaitWithCancellation(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, UniTask<bool>> predicate, CancellationToken cancellationToken)
|
||||||
|
|
||||||
: base(source, cancellationToken)
|
|
||||||
{
|
{
|
||||||
|
this.source = source;
|
||||||
this.predicate = predicate;
|
this.predicate = predicate;
|
||||||
|
this.cancellationToken = cancellationToken;
|
||||||
|
this.moveNextAction = MoveNext;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected override UniTask<bool> TransformAsync(TSource sourceCurrent)
|
public TSource Current { get; private set; }
|
||||||
|
|
||||||
|
public UniTask<bool> MoveNextAsync()
|
||||||
{
|
{
|
||||||
return predicate(sourceCurrent, checked(index++), cancellationToken);
|
if (state == -2) return default;
|
||||||
|
|
||||||
|
completionSource.Reset();
|
||||||
|
MoveNext();
|
||||||
|
return new UniTask<bool>(this, completionSource.Version);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected override bool TrySetCurrentCore(bool awaitResult, out bool terminateIteration)
|
void MoveNext()
|
||||||
{
|
{
|
||||||
terminateIteration = false;
|
REPEAT:
|
||||||
if (awaitResult)
|
try
|
||||||
{
|
{
|
||||||
Current = SourceCurrent;
|
switch (state)
|
||||||
return true;
|
{
|
||||||
|
case -1: // init
|
||||||
|
enumerator = source.GetAsyncEnumerator(cancellationToken);
|
||||||
|
goto case 0;
|
||||||
|
case 0:
|
||||||
|
awaiter = enumerator.MoveNextAsync().GetAwaiter();
|
||||||
|
if (awaiter.IsCompleted)
|
||||||
|
{
|
||||||
|
goto case 1;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
state = 1;
|
||||||
|
awaiter.UnsafeOnCompleted(moveNextAction);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
case 1:
|
||||||
|
if (awaiter.GetResult())
|
||||||
|
{
|
||||||
|
Current = enumerator.Current;
|
||||||
|
|
||||||
|
awaiter2 = predicate(Current, checked(index++), cancellationToken).GetAwaiter();
|
||||||
|
if (awaiter2.IsCompleted)
|
||||||
|
{
|
||||||
|
goto case 2;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
state = 2;
|
||||||
|
awaiter2.UnsafeOnCompleted(moveNextAction);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
goto DONE;
|
||||||
|
}
|
||||||
|
case 2:
|
||||||
|
if (awaiter2.GetResult())
|
||||||
|
{
|
||||||
|
goto CONTINUE;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
state = 0;
|
||||||
|
goto REPEAT;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
goto DONE;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
return false;
|
state = -2;
|
||||||
|
completionSource.TrySetException(ex);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
DONE:
|
||||||
|
state = -2;
|
||||||
|
completionSource.TrySetResult(false);
|
||||||
|
return;
|
||||||
|
|
||||||
|
CONTINUE:
|
||||||
|
state = 0;
|
||||||
|
completionSource.TrySetResult(true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
public UniTask DisposeAsync()
|
||||||
|
{
|
||||||
|
return enumerator.DisposeAsync();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
Loading…
Reference in New Issue