diff --git a/src/UniTask/Assets/Plugins/UniTask/Runtime/CancellationTokenExtensions.cs b/src/UniTask/Assets/Plugins/UniTask/Runtime/CancellationTokenExtensions.cs index c139acc..3f3a532 100644 --- a/src/UniTask/Assets/Plugins/UniTask/Runtime/CancellationTokenExtensions.cs +++ b/src/UniTask/Assets/Plugins/UniTask/Runtime/CancellationTokenExtensions.cs @@ -30,10 +30,10 @@ namespace Cysharp.Threading.Tasks return ToCancellationToken(task); } - var cts = new CancellationTokenSource(); + var cts = CancellationTokenSource.CreateLinkedTokenSource(linkToken); ToCancellationTokenCore(task, cts).Forget(); - return CancellationTokenSource.CreateLinkedTokenSource(linkToken).Token; + return cts.Token; } public static CancellationToken ToCancellationToken(this UniTask task) diff --git a/src/UniTask/Assets/Plugins/UniTask/Runtime/External/TextMeshPro/TextMeshProAsyncExtensions.cs b/src/UniTask/Assets/Plugins/UniTask/Runtime/External/TextMeshPro/TextMeshProAsyncExtensions.cs index b833624..362aa83 100644 --- a/src/UniTask/Assets/Plugins/UniTask/Runtime/External/TextMeshPro/TextMeshProAsyncExtensions.cs +++ b/src/UniTask/Assets/Plugins/UniTask/Runtime/External/TextMeshPro/TextMeshProAsyncExtensions.cs @@ -9,6 +9,7 @@ namespace Cysharp.Threading.Tasks { public static partial class TextMeshProAsyncExtensions { + // -> Text public static void BindTo(this IUniTaskAsyncEnumerable source, TMP_Text text, bool rebindOnError = true) { BindToCore(source, text, text.GetCancellationTokenOnDestroy(), rebindOnError).Forget(); @@ -62,6 +63,67 @@ namespace Cysharp.Threading.Tasks } } } + + // -> Text + + public static void BindTo(this IUniTaskAsyncEnumerable source, TMP_Text text, bool rebindOnError = true) + { + BindToCore(source, text, text.GetCancellationTokenOnDestroy(), rebindOnError).Forget(); + } + + public static void BindTo(this IUniTaskAsyncEnumerable source, TMP_Text text, CancellationToken cancellationToken, bool rebindOnError = true) + { + BindToCore(source, text, cancellationToken, rebindOnError).Forget(); + } + + public static void BindTo(this AsyncReactiveProperty source, TMP_Text text, bool rebindOnError = true) + { + BindToCore(source, text, text.GetCancellationTokenOnDestroy(), rebindOnError).Forget(); + } + + static async UniTaskVoid BindToCore(IUniTaskAsyncEnumerable source, TMP_Text text, CancellationToken cancellationToken, bool rebindOnError) + { + var repeat = false; + BIND_AGAIN: + var e = source.GetAsyncEnumerator(cancellationToken); + try + { + while (true) + { + bool moveNext; + try + { + moveNext = await e.MoveNextAsync(); + repeat = false; + } + catch (Exception ex) + { + if (ex is OperationCanceledException) return; + + if (rebindOnError && !repeat) + { + repeat = true; + goto BIND_AGAIN; + } + else + { + throw; + } + } + + if (!moveNext) return; + + text.text = e.Current.ToString(); + } + } + finally + { + if (e != null) + { + await e.DisposeAsync(); + } + } + } } } diff --git a/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Subscribe.cs b/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Subscribe.cs index 9643b06..0785bc2 100644 --- a/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Subscribe.cs +++ b/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Subscribe.cs @@ -63,6 +63,42 @@ namespace Cysharp.Threading.Tasks.Linq Subscribes.SubscribeCore(source, action, Subscribes.NopError, Subscribes.NopCompleted, cancellationToken).Forget(); } + public static IDisposable SubscribeAwait(this IUniTaskAsyncEnumerable source, Func onNext) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(onNext, nameof(onNext)); + + var cts = new CancellationTokenDisposable(); + Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, Subscribes.NopCompleted, cts.Token).Forget(); + return cts; + } + + public static void SubscribeAwait(this IUniTaskAsyncEnumerable source, Func onNext, CancellationToken cancellationToken) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(onNext, nameof(onNext)); + + Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, Subscribes.NopCompleted, cancellationToken).Forget(); + } + + public static IDisposable SubscribeAwait(this IUniTaskAsyncEnumerable source, Func onNext) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(onNext, nameof(onNext)); + + var cts = new CancellationTokenDisposable(); + Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, Subscribes.NopCompleted, cts.Token).Forget(); + return cts; + } + + public static void SubscribeAwait(this IUniTaskAsyncEnumerable source, Func onNext, CancellationToken cancellationToken) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(onNext, nameof(onNext)); + + Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, Subscribes.NopCompleted, cancellationToken).Forget(); + } + // OnNext, OnError public static IDisposable Subscribe(this IUniTaskAsyncEnumerable source, Action onNext, Action onError) @@ -105,6 +141,46 @@ namespace Cysharp.Threading.Tasks.Linq Subscribes.SubscribeCore(source, onNext, onError, Subscribes.NopCompleted, cancellationToken).Forget(); } + public static IDisposable SubscribeAwait(this IUniTaskAsyncEnumerable source, Func onNext, Action onError) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(onNext, nameof(onNext)); + Error.ThrowArgumentNullException(onError, nameof(onError)); + + var cts = new CancellationTokenDisposable(); + Subscribes.SubscribeAwaitCore(source, onNext, onError, Subscribes.NopCompleted, cts.Token).Forget(); + return cts; + } + + public static void SubscribeAwait(this IUniTaskAsyncEnumerable source, Func onNext, Action onError, CancellationToken cancellationToken) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(onNext, nameof(onNext)); + Error.ThrowArgumentNullException(onError, nameof(onError)); + + Subscribes.SubscribeAwaitCore(source, onNext, onError, Subscribes.NopCompleted, cancellationToken).Forget(); + } + + public static IDisposable SubscribeAwait(this IUniTaskAsyncEnumerable source, Func onNext, Action onError) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(onNext, nameof(onNext)); + Error.ThrowArgumentNullException(onError, nameof(onError)); + + var cts = new CancellationTokenDisposable(); + Subscribes.SubscribeAwaitCore(source, onNext, onError, Subscribes.NopCompleted, cts.Token).Forget(); + return cts; + } + + public static void SubscribeAwait(this IUniTaskAsyncEnumerable source, Func onNext, Action onError, CancellationToken cancellationToken) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(onNext, nameof(onNext)); + Error.ThrowArgumentNullException(onError, nameof(onError)); + + Subscribes.SubscribeAwaitCore(source, onNext, onError, Subscribes.NopCompleted, cancellationToken).Forget(); + } + // OnNext, OnCompleted public static IDisposable Subscribe(this IUniTaskAsyncEnumerable source, Action onNext, Action onCompleted) @@ -147,6 +223,46 @@ namespace Cysharp.Threading.Tasks.Linq Subscribes.SubscribeCore(source, onNext, Subscribes.NopError, onCompleted, cancellationToken).Forget(); } + public static IDisposable SubscribeAwait(this IUniTaskAsyncEnumerable source, Func onNext, Action onCompleted) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(onNext, nameof(onNext)); + Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted)); + + var cts = new CancellationTokenDisposable(); + Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, onCompleted, cts.Token).Forget(); + return cts; + } + + public static void SubscribeAwait(this IUniTaskAsyncEnumerable source, Func onNext, Action onCompleted, CancellationToken cancellationToken) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(onNext, nameof(onNext)); + Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted)); + + Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, onCompleted, cancellationToken).Forget(); + } + + public static IDisposable SubscribeAwait(this IUniTaskAsyncEnumerable source, Func onNext, Action onCompleted) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(onNext, nameof(onNext)); + Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted)); + + var cts = new CancellationTokenDisposable(); + Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, onCompleted, cts.Token).Forget(); + return cts; + } + + public static void SubscribeAwait(this IUniTaskAsyncEnumerable source, Func onNext, Action onCompleted, CancellationToken cancellationToken) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(onNext, nameof(onNext)); + Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted)); + + Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, onCompleted, cancellationToken).Forget(); + } + // IObserver public static IDisposable Subscribe(this IUniTaskAsyncEnumerable source, IObserver observer) @@ -195,7 +311,14 @@ namespace Cysharp.Threading.Tasks.Linq { while (await e.MoveNextAsync()) { - onNext(e.Current); + try + { + onNext(e.Current); + } + catch (Exception ex) + { + UniTaskScheduler.PublishUnobservedTaskException(ex); + } } onCompleted(); } @@ -227,7 +350,14 @@ namespace Cysharp.Threading.Tasks.Linq { while (await e.MoveNextAsync()) { - onNext(e.Current).Forget(); + try + { + onNext(e.Current).Forget(); + } + catch (Exception ex) + { + UniTaskScheduler.PublishUnobservedTaskException(ex); + } } onCompleted(); } @@ -259,7 +389,14 @@ namespace Cysharp.Threading.Tasks.Linq { while (await e.MoveNextAsync()) { - onNext(e.Current, cancellationToken).Forget(); + try + { + onNext(e.Current, cancellationToken).Forget(); + } + catch (Exception ex) + { + UniTaskScheduler.PublishUnobservedTaskException(ex); + } } onCompleted(); } @@ -291,7 +428,14 @@ namespace Cysharp.Threading.Tasks.Linq { while (await e.MoveNextAsync()) { - observer.OnNext(e.Current); + try + { + observer.OnNext(e.Current); + } + catch (Exception ex) + { + UniTaskScheduler.PublishUnobservedTaskException(ex); + } } observer.OnCompleted(); } @@ -309,5 +453,84 @@ namespace Cysharp.Threading.Tasks.Linq } } } + + public static async UniTaskVoid SubscribeAwaitCore(IUniTaskAsyncEnumerable source, Func onNext, Action onError, Action onCompleted, CancellationToken cancellationToken) + { + var e = source.GetAsyncEnumerator(cancellationToken); + try + { + while (await e.MoveNextAsync()) + { + try + { + await onNext(e.Current); + } + catch (Exception ex) + { + UniTaskScheduler.PublishUnobservedTaskException(ex); + } + } + onCompleted(); + } + catch (Exception ex) + { + if (onError == NopError) + { + UniTaskScheduler.PublishUnobservedTaskException(ex); + return; + } + + if (ex is OperationCanceledException) return; + + onError(ex); + } + finally + { + if (e != null) + { + await e.DisposeAsync(); + } + } + } + + public static async UniTaskVoid SubscribeAwaitCore(IUniTaskAsyncEnumerable source, Func onNext, Action onError, Action onCompleted, CancellationToken cancellationToken) + { + var e = source.GetAsyncEnumerator(cancellationToken); + try + { + while (await e.MoveNextAsync()) + { + try + { + await onNext(e.Current, cancellationToken); + } + catch (Exception ex) + { + UniTaskScheduler.PublishUnobservedTaskException(ex); + } + } + onCompleted(); + } + catch (Exception ex) + { + if (onError == NopError) + { + UniTaskScheduler.PublishUnobservedTaskException(ex); + return; + } + + if (ex is OperationCanceledException) return; + + onError(ex); + } + finally + { + if (e != null) + { + await e.DisposeAsync(); + } + } + } + } } \ No newline at end of file