Merge pull request #139 from Cysharp/feature/SubscribeAwait

Add UniTaskAsyncEnumerable.SubscribeAwait
master
Yoshifumi Kawai 2020-08-25 07:12:00 +09:00 committed by GitHub
commit a2cbbd82d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 195 additions and 0 deletions

View File

@ -63,6 +63,42 @@ namespace Cysharp.Threading.Tasks.Linq
Subscribes.SubscribeCore(source, action, Subscribes.NopError, Subscribes.NopCompleted, cancellationToken).Forget(); Subscribes.SubscribeCore(source, action, Subscribes.NopError, Subscribes.NopCompleted, cancellationToken).Forget();
} }
public static IDisposable SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> onNext)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
var cts = new CancellationTokenDisposable();
Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, Subscribes.NopCompleted, cts.Token).Forget();
return cts;
}
public static void SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> onNext, CancellationToken cancellationToken)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, Subscribes.NopCompleted, cancellationToken).Forget();
}
public static IDisposable SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> onNext)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
var cts = new CancellationTokenDisposable();
Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, Subscribes.NopCompleted, cts.Token).Forget();
return cts;
}
public static void SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> onNext, CancellationToken cancellationToken)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, Subscribes.NopCompleted, cancellationToken).Forget();
}
// OnNext, OnError // OnNext, OnError
public static IDisposable Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Action<TSource> onNext, Action<Exception> onError) public static IDisposable Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Action<TSource> onNext, Action<Exception> onError)
@ -105,6 +141,46 @@ namespace Cysharp.Threading.Tasks.Linq
Subscribes.SubscribeCore(source, onNext, onError, Subscribes.NopCompleted, cancellationToken).Forget(); Subscribes.SubscribeCore(source, onNext, onError, Subscribes.NopCompleted, cancellationToken).Forget();
} }
public static IDisposable SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> onNext, Action<Exception> onError)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
Error.ThrowArgumentNullException(onError, nameof(onError));
var cts = new CancellationTokenDisposable();
Subscribes.SubscribeAwaitCore(source, onNext, onError, Subscribes.NopCompleted, cts.Token).Forget();
return cts;
}
public static void SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> onNext, Action<Exception> onError, CancellationToken cancellationToken)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
Error.ThrowArgumentNullException(onError, nameof(onError));
Subscribes.SubscribeAwaitCore(source, onNext, onError, Subscribes.NopCompleted, cancellationToken).Forget();
}
public static IDisposable SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> onNext, Action<Exception> onError)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
Error.ThrowArgumentNullException(onError, nameof(onError));
var cts = new CancellationTokenDisposable();
Subscribes.SubscribeAwaitCore(source, onNext, onError, Subscribes.NopCompleted, cts.Token).Forget();
return cts;
}
public static void SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> onNext, Action<Exception> onError, CancellationToken cancellationToken)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
Error.ThrowArgumentNullException(onError, nameof(onError));
Subscribes.SubscribeAwaitCore(source, onNext, onError, Subscribes.NopCompleted, cancellationToken).Forget();
}
// OnNext, OnCompleted // OnNext, OnCompleted
public static IDisposable Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Action<TSource> onNext, Action onCompleted) public static IDisposable Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Action<TSource> onNext, Action onCompleted)
@ -147,6 +223,46 @@ namespace Cysharp.Threading.Tasks.Linq
Subscribes.SubscribeCore(source, onNext, Subscribes.NopError, onCompleted, cancellationToken).Forget(); Subscribes.SubscribeCore(source, onNext, Subscribes.NopError, onCompleted, cancellationToken).Forget();
} }
public static IDisposable SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> onNext, Action onCompleted)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted));
var cts = new CancellationTokenDisposable();
Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, onCompleted, cts.Token).Forget();
return cts;
}
public static void SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> onNext, Action onCompleted, CancellationToken cancellationToken)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted));
Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, onCompleted, cancellationToken).Forget();
}
public static IDisposable SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> onNext, Action onCompleted)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted));
var cts = new CancellationTokenDisposable();
Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, onCompleted, cts.Token).Forget();
return cts;
}
public static void SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> onNext, Action onCompleted, CancellationToken cancellationToken)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(onNext, nameof(onNext));
Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted));
Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, onCompleted, cancellationToken).Forget();
}
// IObserver // IObserver
public static IDisposable Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, IObserver<TSource> observer) public static IDisposable Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, IObserver<TSource> observer)
@ -337,5 +453,84 @@ namespace Cysharp.Threading.Tasks.Linq
} }
} }
} }
public static async UniTaskVoid SubscribeAwaitCore<TSource>(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> onNext, Action<Exception> onError, Action onCompleted, CancellationToken cancellationToken)
{
var e = source.GetAsyncEnumerator(cancellationToken);
try
{
while (await e.MoveNextAsync())
{
try
{
await onNext(e.Current);
}
catch (Exception ex)
{
UniTaskScheduler.PublishUnobservedTaskException(ex);
}
}
onCompleted();
}
catch (Exception ex)
{
if (onError == NopError)
{
UniTaskScheduler.PublishUnobservedTaskException(ex);
return;
}
if (ex is OperationCanceledException) return;
onError(ex);
}
finally
{
if (e != null)
{
await e.DisposeAsync();
}
}
}
public static async UniTaskVoid SubscribeAwaitCore<TSource>(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> onNext, Action<Exception> onError, Action onCompleted, CancellationToken cancellationToken)
{
var e = source.GetAsyncEnumerator(cancellationToken);
try
{
while (await e.MoveNextAsync())
{
try
{
await onNext(e.Current, cancellationToken);
}
catch (Exception ex)
{
UniTaskScheduler.PublishUnobservedTaskException(ex);
}
}
onCompleted();
}
catch (Exception ex)
{
if (onError == NopError)
{
UniTaskScheduler.PublishUnobservedTaskException(ex);
return;
}
if (ex is OperationCanceledException) return;
onError(ex);
}
finally
{
if (e != null)
{
await e.DisposeAsync();
}
}
}
} }
} }