Add UniTaskAsyncEnumerable.Create

master
neuecc 2020-07-04 06:29:33 +09:00
parent c65ae8d18e
commit 551128e64c
6 changed files with 456 additions and 100 deletions

View File

@ -295,71 +295,25 @@ namespace NetCoreSandbox
//await new ComparisonBenchmarks().ViaUniTaskT(); //await new ComparisonBenchmarks().ViaUniTaskT();
return; return;
#endif #endif
// await new AllocationCheck().ViaUniTaskVoid();
var buttonTest = new AsyncReactiveProperty<AsyncUnit>(AsyncUnit.Default); var e = UniTaskAsyncEnumerable.Create<int>(async (writer, token) =>
buttonTest
.Subscribe(async _ =>
{ {
try for (int i = 0; i < 5; i++)
{ {
await new Foo().MethodFooAsync(); Console.WriteLine($"Start {i}");
} await writer.YieldAsync(i);
catch (Exception e) Console.WriteLine($"End {i}");
{
Console.WriteLine(e.StackTrace);
} }
}); });
buttonTest.Value = AsyncUnit.Default; var ee = e.GetAsyncEnumerator();
while (await ee.MoveNextAsync())
// AsyncTest().Forge
Console.WriteLine("A?");
var a = await new ZeroAllocAsyncAwaitInDotNetCore().NanikaAsync(1, 2);
Console.WriteLine("RET:" + a);
await WhereSelect();
SynchronizationContext.SetSynchronizationContext(new MySyncContext());
await Aaa();
//AsyncTest().Forget();
// AsyncTest().Forget();
ThreadPool.SetMinThreads(100, 100);
//List<UniTask<int>> list = new List<UniTask<int>>();
for (int i = 0; i < short.MaxValue; i++)
{ {
//// list.Add(AsyncTest()); Console.WriteLine("ForEach " + ee.Current);
await YieldCore();
} }
//await UniTask.WhenAll(list);
//Console.WriteLine("TOGO");
//var a = await AsyncTest();
//var b = AsyncTest();
//var c = AsyncTest();
await YieldCore();
//await b;
//await c;
//foreach (var item in Cysharp.Threading.Tasks.Internal.TaskPool.GetCacheSizeInfo())
//{
// Console.WriteLine(item);
//}
Console.ReadLine();
} }
static async UniTask YieldCore() static async UniTask YieldCore()

View File

@ -0,0 +1,170 @@
#pragma warning disable CS1998
#pragma warning disable CS0162
using Cysharp.Threading.Tasks;
using Cysharp.Threading.Tasks.Linq;
using FluentAssertions;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Xunit;
namespace NetCoreTests.Linq
{
public class CreateTest
{
[Fact]
public async Task SyncCreation()
{
var from = 10;
var count = 100;
var xs = await UniTaskAsyncEnumerable.Create<int>(async (writer, token) =>
{
for (int i = 0; i < count; i++)
{
await writer.YieldAsync(from + i);
}
}).ToArrayAsync();
var ys = await Range(from, count).AsUniTaskAsyncEnumerable().ToArrayAsync();
xs.Should().BeEquivalentTo(ys);
}
[Fact]
public async Task SyncManually()
{
var list = new List<int>();
var xs = UniTaskAsyncEnumerable.Create<int>(async (writer, token) =>
{
list.Add(100);
await writer.YieldAsync(10);
list.Add(200);
await writer.YieldAsync(20);
list.Add(300);
await writer.YieldAsync(30);
list.Add(400);
});
list.Should().BeEmpty();
var e = xs.GetAsyncEnumerator();
list.Should().BeEmpty();
await e.MoveNextAsync();
list.Should().BeEquivalentTo(100);
e.Current.Should().Be(10);
await e.MoveNextAsync();
list.Should().BeEquivalentTo(100, 200);
e.Current.Should().Be(20);
await e.MoveNextAsync();
list.Should().BeEquivalentTo(100, 200, 300);
e.Current.Should().Be(30);
(await e.MoveNextAsync()).Should().BeFalse();
list.Should().BeEquivalentTo(100, 200, 300, 400);
}
[Fact]
public async Task SyncExceptionFirst()
{
var from = 10;
var count = 100;
var xs = UniTaskAsyncEnumerable.Create<int>(async (writer, token) =>
{
for (int i = 0; i < count; i++)
{
throw new UniTaskTestException();
await writer.YieldAsync(from + i);
}
});
await Assert.ThrowsAsync<UniTaskTestException>(async () => await xs.ToArrayAsync());
}
[Fact]
public async Task SyncException()
{
var from = 10;
var count = 100;
var xs = UniTaskAsyncEnumerable.Create<int>(async (writer, token) =>
{
for (int i = 0; i < count; i++)
{
await writer.YieldAsync(from + i);
if (i == 15)
{
throw new UniTaskTestException();
}
}
});
await Assert.ThrowsAsync<UniTaskTestException>(async () => await xs.ToArrayAsync());
}
[Fact]
public async Task ASyncManually()
{
var list = new List<int>();
var xs = UniTaskAsyncEnumerable.Create<int>(async (writer, token) =>
{
await UniTask.Yield();
list.Add(100);
await writer.YieldAsync(10);
await UniTask.Yield();
list.Add(200);
await writer.YieldAsync(20);
await UniTask.Yield();
list.Add(300);
await UniTask.Yield();
await writer.YieldAsync(30);
await UniTask.Yield();
list.Add(400);
});
list.Should().BeEmpty();
var e = xs.GetAsyncEnumerator();
list.Should().BeEmpty();
await e.MoveNextAsync();
list.Should().BeEquivalentTo(100);
e.Current.Should().Be(10);
await e.MoveNextAsync();
list.Should().BeEquivalentTo(100, 200);
e.Current.Should().Be(20);
await e.MoveNextAsync();
list.Should().BeEquivalentTo(100, 200, 300);
e.Current.Should().Be(30);
(await e.MoveNextAsync()).Should().BeFalse();
list.Should().BeEquivalentTo(100, 200, 300, 400);
}
async IAsyncEnumerable<int> Range(int from, int count)
{
for (int i = 0; i < count; i++)
{
yield return from + i;
}
}
}
}

View File

@ -0,0 +1,192 @@
using Cysharp.Threading.Tasks.Internal;
using System;
using System.Threading;
namespace Cysharp.Threading.Tasks.Linq
{
public static partial class UniTaskAsyncEnumerable
{
public static IUniTaskAsyncEnumerable<T> Create<T>(Func<IAsyncWriter<T>, CancellationToken, UniTask> create)
{
Error.ThrowArgumentNullException(create, nameof(create));
return new Create<T>(create);
}
}
public interface IAsyncWriter<T>
{
UniTask YieldAsync(T value);
}
internal sealed class Create<T> : IUniTaskAsyncEnumerable<T>
{
readonly Func<IAsyncWriter<T>, CancellationToken, UniTask> create;
public Create(Func<IAsyncWriter<T>, CancellationToken, UniTask> create)
{
this.create = create;
}
public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
return new _Create(create, cancellationToken);
}
sealed class _Create : MoveNextSource, IUniTaskAsyncEnumerator<T>
{
readonly Func<IAsyncWriter<T>, CancellationToken, UniTask> create;
readonly CancellationToken cancellationToken;
int state = -1;
AsyncWriter writer;
public _Create(Func<IAsyncWriter<T>, CancellationToken, UniTask> create, CancellationToken cancellationToken)
{
this.create = create;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public T Current { get; private set; }
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
return default;
}
public UniTask<bool> MoveNextAsync()
{
if (state == -2) return default;
completionSource.Reset();
MoveNext();
return new UniTask<bool>(this, completionSource.Version);
}
void MoveNext()
{
try
{
switch (state)
{
case -1: // init
{
writer = new AsyncWriter(this);
RunWriterTask(create(writer, cancellationToken)).Forget();
if (Volatile.Read(ref state) == -2)
{
return; // complete synchronously
}
state = 0; // wait YieldAsync, it set TrySetResult(true)
return;
}
case 0:
writer.SignalWriter();
return;
default:
goto DONE;
}
}
catch (Exception ex)
{
state = -2;
completionSource.TrySetException(ex);
return;
}
DONE:
state = -2;
completionSource.TrySetResult(false);
return;
}
async UniTaskVoid RunWriterTask(UniTask task)
{
try
{
await task;
goto DONE;
}
catch (Exception ex)
{
Volatile.Write(ref state, -2);
completionSource.TrySetException(ex);
return;
}
DONE:
Volatile.Write(ref state, -2);
completionSource.TrySetResult(false);
}
public bool TrySetCanceled(CancellationToken cancellationToken)
{
state = -2;
return completionSource.TrySetCanceled(cancellationToken);
}
public bool TrySetComplete()
{
state = -2;
return completionSource.TrySetResult(false);
}
public bool TrySetException(Exception error)
{
state = -2;
return completionSource.TrySetException(error);
}
public void SetResult(T value)
{
Current = value;
completionSource.TrySetResult(true);
}
}
sealed class AsyncWriter : IUniTaskSource, IAsyncWriter<T>
{
readonly _Create enumerator;
UniTaskCompletionSourceCore<AsyncUnit> core;
public AsyncWriter(_Create enumerator)
{
this.enumerator = enumerator;
}
public void GetResult(short token)
{
core.GetResult(token);
}
public UniTaskStatus GetStatus(short token)
{
return core.GetStatus(token);
}
public UniTaskStatus UnsafeGetStatus()
{
return core.UnsafeGetStatus();
}
public void OnCompleted(Action<object> continuation, object state, short token)
{
core.OnCompleted(continuation, state, token);
}
public UniTask YieldAsync(T value)
{
core.Reset();
enumerator.SetResult(value);
return new UniTask(this, core.Version);
}
public void SignalWriter()
{
core.TrySetResult(AsyncUnit.Default);
}
}
}
}

View File

@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 0202f723469f93945afa063bfb440d15
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@ -1,46 +1,52 @@
using System; using Cysharp.Threading.Tasks;
using System.Threading.Tasks; using System;
using Cysharp.Threading.Tasks; using System.Collections.Generic;
using Cysharp.Threading.Tasks.Linq;
using UnityEngine; using UnityEngine;
using UnityEngine.Networking;
using UnityEngine.UI;
/*UNniTastWhenAnyTester*/
[ExecuteInEditMode]
public class ExceptionExamples : MonoBehaviour public class ExceptionExamples : MonoBehaviour
{ {
public Button ButtonTest; public bool apply = false;
void Start() private async UniTaskVoid Update()
{ {
ButtonTest.OnClickAsAsyncEnumerable() if (apply)
.Subscribe(async _ => {
apply = false;
await LaunchTasksAndDetectWhenAnyDone(5);
}
}
private async UniTask LaunchTasksAndDetectWhenAnyDone(int nbTasks)
{
List<UniTask<int>> sleeptasks = new List<UniTask<int>>();
for (int i = 0; i < nbTasks; i++)
{
sleeptasks.Add(SleepAndReturnTrue(i).ToAsyncLazy().Task);
}
while (sleeptasks.Count > 0)
{
Debug.Log(DateTime.Now.ToString() + " waiting for " + sleeptasks.Count + " tasks...");
try
{ {
try (int index, int taskID) = await UniTask.WhenAny(sleeptasks);
{ Debug.Log(DateTime.Now.ToString() + " Sleep task " + taskID + " done");
await new Foo().MethodFooAsync(); sleeptasks.RemoveAt(index);
} }
catch (Exception e) catch
{ {
Debug.Log(e.StackTrace); throw;
} //Debug.Log("Error: " + e.Message);
}, this.GetCancellationTokenOnDestroy()); //return;
} }
} }
}
class Foo
{ private async UniTask<int> SleepAndReturnTrue(int taskIndex)
public async UniTask MethodFooAsync() {
{ await UniTask.Delay(100);
await MethodBarAsync(); return taskIndex;
}
private async UniTask MethodBarAsync()
{
Throw();
}
private void Throw()
{
throw new Exception();
} }
} }

View File

@ -459,18 +459,41 @@ public class SandboxMain : MonoBehaviour
PrepareCamera(); PrepareCamera();
} }
public IUniTaskAsyncEnumerable<int> MyEveryUpdate()
{
return UniTaskAsyncEnumerable.Create<int>(async (writer, token) =>
{
var frameCount = 0;
await UniTask.Yield();
while (!token.IsCancellationRequested)
{
await writer.YieldAsync(frameCount++); // instead of `yield return`
await UniTask.Yield();
}
});
}
async UniTaskVoid Start() async UniTaskVoid Start()
{ {
var cts = new CancellationTokenSource();
okButton.onClick.AddListener(() => okButton.onClick.AddListener(() =>
{ {
ShootAsync().Forget(); cts.Cancel();
}); });
// Nanika(); //// Nanika();
//await UniTask.Yield();
await MyEveryUpdate().Select(x => x).Where(x => x % 2 == 0).ForEachAsync(x =>
{
UnityEngine.Debug.Log(x + ":" + Time.frameCount);
}, cts.Token);
await UniTask.Yield();
// this.GetCancellationTokenOnDestroy() // this.GetCancellationTokenOnDestroy()
//PlayerLoopInfo.Inject(); //PlayerLoopInfo.Inject();
@ -992,17 +1015,17 @@ public class SandboxMain : MonoBehaviour
void PrepareCamera() void PrepareCamera()
{ {
Debug.Log("Support AsyncGPUReadback:" + SystemInfo.supportsAsyncGPUReadback); //Debug.Log("Support AsyncGPUReadback:" + SystemInfo.supportsAsyncGPUReadback);
var width = 480; //var width = 480;
var height = 240; //var height = 240;
var depth = 24; //var depth = 24;
mycamera.targetTexture = new RenderTexture(width, height, depth, RenderTextureFormat.ARGB32, RenderTextureReadWrite.Default) //mycamera.targetTexture = new RenderTexture(width, height, depth, RenderTextureFormat.ARGB32, RenderTextureReadWrite.Default)
{ //{
antiAliasing = 8 // antiAliasing = 8
}; //};
mycamera.enabled = true; //mycamera.enabled = true;
//myRenderTexture = new RenderTexture(width, height, depth, RenderTextureFormat.ARGB32, RenderTextureReadWrite.Default) //myRenderTexture = new RenderTexture(width, height, depth, RenderTextureFormat.ARGB32, RenderTextureReadWrite.Default)
//{ //{