diff --git a/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Subscribe.cs b/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Subscribe.cs index f92378b..0785bc2 100644 --- a/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Subscribe.cs +++ b/src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Subscribe.cs @@ -63,6 +63,42 @@ namespace Cysharp.Threading.Tasks.Linq Subscribes.SubscribeCore(source, action, Subscribes.NopError, Subscribes.NopCompleted, cancellationToken).Forget(); } + public static IDisposable SubscribeAwait(this IUniTaskAsyncEnumerable source, Func onNext) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(onNext, nameof(onNext)); + + var cts = new CancellationTokenDisposable(); + Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, Subscribes.NopCompleted, cts.Token).Forget(); + return cts; + } + + public static void SubscribeAwait(this IUniTaskAsyncEnumerable source, Func onNext, CancellationToken cancellationToken) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(onNext, nameof(onNext)); + + Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, Subscribes.NopCompleted, cancellationToken).Forget(); + } + + public static IDisposable SubscribeAwait(this IUniTaskAsyncEnumerable source, Func onNext) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(onNext, nameof(onNext)); + + var cts = new CancellationTokenDisposable(); + Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, Subscribes.NopCompleted, cts.Token).Forget(); + return cts; + } + + public static void SubscribeAwait(this IUniTaskAsyncEnumerable source, Func onNext, CancellationToken cancellationToken) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(onNext, nameof(onNext)); + + Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, Subscribes.NopCompleted, cancellationToken).Forget(); + } + // OnNext, OnError public static IDisposable Subscribe(this IUniTaskAsyncEnumerable source, Action onNext, Action onError) @@ -105,6 +141,46 @@ namespace Cysharp.Threading.Tasks.Linq Subscribes.SubscribeCore(source, onNext, onError, Subscribes.NopCompleted, cancellationToken).Forget(); } + public static IDisposable SubscribeAwait(this IUniTaskAsyncEnumerable source, Func onNext, Action onError) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(onNext, nameof(onNext)); + Error.ThrowArgumentNullException(onError, nameof(onError)); + + var cts = new CancellationTokenDisposable(); + Subscribes.SubscribeAwaitCore(source, onNext, onError, Subscribes.NopCompleted, cts.Token).Forget(); + return cts; + } + + public static void SubscribeAwait(this IUniTaskAsyncEnumerable source, Func onNext, Action onError, CancellationToken cancellationToken) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(onNext, nameof(onNext)); + Error.ThrowArgumentNullException(onError, nameof(onError)); + + Subscribes.SubscribeAwaitCore(source, onNext, onError, Subscribes.NopCompleted, cancellationToken).Forget(); + } + + public static IDisposable SubscribeAwait(this IUniTaskAsyncEnumerable source, Func onNext, Action onError) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(onNext, nameof(onNext)); + Error.ThrowArgumentNullException(onError, nameof(onError)); + + var cts = new CancellationTokenDisposable(); + Subscribes.SubscribeAwaitCore(source, onNext, onError, Subscribes.NopCompleted, cts.Token).Forget(); + return cts; + } + + public static void SubscribeAwait(this IUniTaskAsyncEnumerable source, Func onNext, Action onError, CancellationToken cancellationToken) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(onNext, nameof(onNext)); + Error.ThrowArgumentNullException(onError, nameof(onError)); + + Subscribes.SubscribeAwaitCore(source, onNext, onError, Subscribes.NopCompleted, cancellationToken).Forget(); + } + // OnNext, OnCompleted public static IDisposable Subscribe(this IUniTaskAsyncEnumerable source, Action onNext, Action onCompleted) @@ -147,6 +223,46 @@ namespace Cysharp.Threading.Tasks.Linq Subscribes.SubscribeCore(source, onNext, Subscribes.NopError, onCompleted, cancellationToken).Forget(); } + public static IDisposable SubscribeAwait(this IUniTaskAsyncEnumerable source, Func onNext, Action onCompleted) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(onNext, nameof(onNext)); + Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted)); + + var cts = new CancellationTokenDisposable(); + Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, onCompleted, cts.Token).Forget(); + return cts; + } + + public static void SubscribeAwait(this IUniTaskAsyncEnumerable source, Func onNext, Action onCompleted, CancellationToken cancellationToken) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(onNext, nameof(onNext)); + Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted)); + + Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, onCompleted, cancellationToken).Forget(); + } + + public static IDisposable SubscribeAwait(this IUniTaskAsyncEnumerable source, Func onNext, Action onCompleted) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(onNext, nameof(onNext)); + Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted)); + + var cts = new CancellationTokenDisposable(); + Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, onCompleted, cts.Token).Forget(); + return cts; + } + + public static void SubscribeAwait(this IUniTaskAsyncEnumerable source, Func onNext, Action onCompleted, CancellationToken cancellationToken) + { + Error.ThrowArgumentNullException(source, nameof(source)); + Error.ThrowArgumentNullException(onNext, nameof(onNext)); + Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted)); + + Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, onCompleted, cancellationToken).Forget(); + } + // IObserver public static IDisposable Subscribe(this IUniTaskAsyncEnumerable source, IObserver observer) @@ -337,5 +453,84 @@ namespace Cysharp.Threading.Tasks.Linq } } } + + public static async UniTaskVoid SubscribeAwaitCore(IUniTaskAsyncEnumerable source, Func onNext, Action onError, Action onCompleted, CancellationToken cancellationToken) + { + var e = source.GetAsyncEnumerator(cancellationToken); + try + { + while (await e.MoveNextAsync()) + { + try + { + await onNext(e.Current); + } + catch (Exception ex) + { + UniTaskScheduler.PublishUnobservedTaskException(ex); + } + } + onCompleted(); + } + catch (Exception ex) + { + if (onError == NopError) + { + UniTaskScheduler.PublishUnobservedTaskException(ex); + return; + } + + if (ex is OperationCanceledException) return; + + onError(ex); + } + finally + { + if (e != null) + { + await e.DisposeAsync(); + } + } + } + + public static async UniTaskVoid SubscribeAwaitCore(IUniTaskAsyncEnumerable source, Func onNext, Action onError, Action onCompleted, CancellationToken cancellationToken) + { + var e = source.GetAsyncEnumerator(cancellationToken); + try + { + while (await e.MoveNextAsync()) + { + try + { + await onNext(e.Current, cancellationToken); + } + catch (Exception ex) + { + UniTaskScheduler.PublishUnobservedTaskException(ex); + } + } + onCompleted(); + } + catch (Exception ex) + { + if (onError == NopError) + { + UniTaskScheduler.PublishUnobservedTaskException(ex); + return; + } + + if (ex is OperationCanceledException) return; + + onError(ex); + } + finally + { + if (e != null) + { + await e.DisposeAsync(); + } + } + } + } } \ No newline at end of file