From 2e4fe90956992bcc91c61ca327a988d78bcb2e09 Mon Sep 17 00:00:00 2001 From: neuecc Date: Mon, 18 May 2020 23:33:13 +0900 Subject: [PATCH] Fix ChannelReader.Completion throws UnobservedException when not touched --- .../Assets/Plugins/UniTask/Runtime/Channel.cs | 56 ++++- .../Runtime/UniTaskCompletionSource.cs | 9 +- src/UniTask/Assets/Scenes/SandboxMain.cs | 214 +++++++----------- 3 files changed, 134 insertions(+), 145 deletions(-) diff --git a/src/UniTask/Assets/Plugins/UniTask/Runtime/Channel.cs b/src/UniTask/Assets/Plugins/UniTask/Runtime/Channel.cs index bbed075..7bfefbf 100644 --- a/src/UniTask/Assets/Plugins/UniTask/Runtime/Channel.cs +++ b/src/UniTask/Assets/Plugins/UniTask/Runtime/Channel.cs @@ -91,7 +91,8 @@ namespace Cysharp.Threading.Tasks { readonly Queue items; readonly SingleConsumerUnboundedChannelReader readerSource; - readonly UniTaskCompletionSource completedTask; + UniTaskCompletionSource completedTaskSource; + UniTask completedTask; Exception completionError; bool closed; @@ -99,7 +100,6 @@ namespace Cysharp.Threading.Tasks public SingleConsumerUnboundedChannel() { items = new Queue(); - completedTask = new UniTaskCompletionSource(); Writer = new SingleConsumerUnboundedChannelWriter(this); readerSource = new SingleConsumerUnboundedChannelReader(this); Reader = readerSource; @@ -146,11 +146,25 @@ namespace Cysharp.Threading.Tasks { if (error == null) { - parent.completedTask.TrySetResult(); + if (parent.completedTaskSource != null) + { + parent.completedTaskSource.TrySetResult(); + } + else + { + parent.completedTask = UniTask.CompletedTask; + } } else { - parent.completedTask.TrySetException(error); + if (parent.completedTaskSource != null) + { + parent.completedTaskSource.TrySetException(error); + } + else + { + parent.completedTask = UniTask.FromException(error); + } } if (waiting) @@ -181,7 +195,21 @@ namespace Cysharp.Threading.Tasks this.parent = parent; } - public override UniTask Completion => parent.completedTask.Task; + public override UniTask Completion + { + get + { + if (parent.completedTaskSource != null) return parent.completedTaskSource.Task; + + if (parent.closed) + { + return parent.completedTask; + } + + parent.completedTaskSource = new UniTaskCompletionSource(); + return parent.completedTaskSource.Task; + } + } public override bool TryRead(out T item) { @@ -196,11 +224,25 @@ namespace Cysharp.Threading.Tasks { if (parent.completionError != null) { - parent.completedTask.TrySetException(parent.completionError); + if (parent.completedTaskSource != null) + { + parent.completedTaskSource.TrySetException(parent.completionError); + } + else + { + parent.completedTask = UniTask.FromException(parent.completionError); + } } else { - parent.completedTask.TrySetResult(); + if (parent.completedTaskSource != null) + { + parent.completedTaskSource.TrySetResult(); + } + else + { + parent.completedTask = UniTask.CompletedTask; + } } } } diff --git a/src/UniTask/Assets/Plugins/UniTask/Runtime/UniTaskCompletionSource.cs b/src/UniTask/Assets/Plugins/UniTask/Runtime/UniTaskCompletionSource.cs index cefc851..19b811c 100644 --- a/src/UniTask/Assets/Plugins/UniTask/Runtime/UniTaskCompletionSource.cs +++ b/src/UniTask/Assets/Plugins/UniTask/Runtime/UniTaskCompletionSource.cs @@ -97,6 +97,11 @@ namespace Cysharp.Threading.Tasks } } + internal void MarkHandled() + { + hasUnhandledError = false; + } + /// Completes with a successful result. /// The result. [DebuggerHidden] @@ -293,12 +298,12 @@ namespace Cysharp.Threading.Tasks } [DebuggerHidden] - [Conditional("UNITY_EDITOR")] internal void MarkHandled() { if (!handled) { handled = true; + core.MarkHandled(); TaskTracker.RemoveTracking(this); } } @@ -504,12 +509,12 @@ namespace Cysharp.Threading.Tasks } [DebuggerHidden] - [Conditional("UNITY_EDITOR")] internal void MarkHandled() { if (!handled) { handled = true; + core.MarkHandled(); TaskTracker.RemoveTracking(this); } } diff --git a/src/UniTask/Assets/Scenes/SandboxMain.cs b/src/UniTask/Assets/Scenes/SandboxMain.cs index 2d94ef3..40a8583 100644 --- a/src/UniTask/Assets/Scenes/SandboxMain.cs +++ b/src/UniTask/Assets/Scenes/SandboxMain.cs @@ -50,7 +50,62 @@ public static partial class UnityUIComponentExtensions +public class AsyncMessageBroker : IDisposable +{ + Channel channel; + List> asyncEvents; + public AsyncMessageBroker() + { + channel = Channel.CreateSingleConsumerUnbounded(); + asyncEvents = new List>(); + } + + async UniTaskVoid PublishAll() + { + await channel.Reader.ReadAllAsync().ForEachAwaitAsync(async x => + { + foreach (var item in asyncEvents) + { + await item.Invoke(x); + } + }); + } + + public void Publish(T value) + { + channel.Writer.TryWrite(value); + } + + public Subscription Subscribe(Func func) + { + asyncEvents.Add(func); + return new Subscription(this, func); + } + + public void Dispose() + { + channel.Writer.TryComplete(); + asyncEvents.Clear(); + } + + public readonly struct Subscription : IDisposable + { + readonly AsyncMessageBroker broker; + readonly Func func; + + public Subscription(AsyncMessageBroker broker, Func func) + { + this.broker = broker; + this.func = func; + } + + public void Dispose() + { + broker.asyncEvents.Remove(func); + } + } +} public class SandboxMain : MonoBehaviour @@ -141,150 +196,37 @@ public class SandboxMain : MonoBehaviour void Start() { - Application.SetStackTraceLogType(LogType.Error, StackTraceLogType.Full); - Application.SetStackTraceLogType(LogType.Exception, StackTraceLogType.Full); + var channel = Channel.CreateSingleConsumerUnbounded(); - var playerLoop = UnityEngine.LowLevel.PlayerLoop.GetCurrentPlayerLoop(); - //ShowPlayerLoop.DumpPlayerLoop("Current", playerLoop); + var reader = channel.Reader; + WaitForChannelAsync(reader, this.GetCancellationTokenOnDestroy()).Forget(); - RP1 = new AsyncReactiveProperty(999); + var writer = channel.Writer; - HogeAsync().Forget(); + writer.TryWrite(1); + writer.TryWrite(2); + writer.TryWrite(3); + writer.Complete(new Exception()); - RP1.Select(x => x * x).BindTo(text); - - - //Update2().Forget(); - - //RunStandardDelayAsync().Forget(); - - //for (int i = 0; i < 14; i++) - //{ - // TimingDump((PlayerLoopTiming)i).Forget(); - //} - - //StartCoroutine(CoroutineDump("yield WaitForEndOfFrame", new WaitForEndOfFrame())); - //StartCoroutine(CoroutineDump("yield WaitForFixedUpdate", new WaitForFixedUpdate())); - //StartCoroutine(CoroutineDump("yield null", null)); - - // ----- - - // RunJobAsync().Forget(); - - //ClickOnce().Forget(); - //ClickForever().Forget(); - - //var cor = UniTask.ToCoroutine(async () => - // { - // var job = new MyJob() { loopCount = 999, inOut = new NativeArray(1, Allocator.TempJob) }; - // JobHandle.ScheduleBatchedJobs(); - // await job.Schedule().WaitAsync(PlayerLoopTiming.Update); - // job.inOut.Dispose(); - // }); - - //StartCoroutine(cor); - - // UniTaskAsyncEnumerable.EveryUpdate(PlayerLoopTiming.FixedUpdate) - - - //await UniTask.Yield(PlayerLoopTiming.Update); - //Debug.Log("Start:" + Time.frameCount); - - //await UniTaskAsyncEnumerable.TimerFrame(3, 5, PlayerLoopTiming.PostLateUpdate) - // .Select(x => x) - // .Do(x => Debug.Log("DODODO")) - // .ForEachAsync(_ => - // { - // Debug.Log("Call:" + Time.frameCount); - // }, cancellationToken: this.GetCancellationTokenOnDestroy()); - - //try - //{ - // await this.GetAsyncUpdateTrigger().ForEachAsync(_ => - // { - // UnityEngine.Debug.Log("EveryUpdate:" + Time.frameCount); - // }); - //} - //catch (OperationCanceledException ex) - //{ - // UnityEngine.Debug.Log("END"); - //} - - - - CancellationTokenSource cts = new CancellationTokenSource(); - - //var trigger = this.GetAsyncUpdateTrigger(); - //Go(trigger, 1, cts.Token).Forget(); - //Go(trigger, 2, cts.Token).Forget(); - //Go(trigger, 3, cts.Token).Forget(); - //Go(trigger, 4, cts.Token).Forget(); - //Go(trigger, 5, cts.Token).Forget(); - - - Application.logMessageReceived += Application_logMessageReceived; - - - - // foo.Status.IsCanceled - - - // 5回クリックされるまで待つ、とか。 - //Debug.Log("Await start."); - - - - //await okButton.GetAsyncClickEventHandler().DisableAutoClose() - // .Select((_, clickCount) => clickCount + 1) - // .FirstAsync(x => x == 5); - - //Debug.Log("Click 5 times."); - - - - // await this.GetAsyncUpdateTrigger().UpdateAsAsyncEnumerable() - - - - - - - - - - - - - - - //ucs = new UniTaskCompletionSource(); - - //okButton.onClick.AddListener(async () => - //{ - // await InnerAsync(false); - //}); - - okButton.onClick.AddListener(() => - { - // FooAsync().Forget(); - - RP1.Value += 3; - - }); - - cancelButton.onClick.AddListener(() => - { - text.text = ""; - - // ucs.TrySetResult(); - - cts.Cancel(); - }); } - static void Foo(UniTask t) + async UniTaskVoid WaitForChannelAsync(ChannelReader reader, CancellationToken token) { + try + { + //var result1 = await reader.ReadAsync(token); + //Debug.Log(result1); + + await reader.ReadAllAsync().ForEachAsync(x => Debug.Log(x)/*, token*/); + + Debug.Log("done"); + } + catch (Exception ex) + { + Debug.Log("here"); + Debug.LogException(ex); + } } async UniTaskVoid Go(AsyncUpdateTrigger trigger, int i, CancellationToken ct)