Add IUniTaskAsyncEnumerable.Publish
parent
cc165a6897
commit
e31c87b8a8
|
@ -0,0 +1,78 @@
|
|||
using Cysharp.Threading.Tasks;
|
||||
using Cysharp.Threading.Tasks.Linq;
|
||||
using FluentAssertions;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Xunit;
|
||||
|
||||
namespace NetCoreTests.Linq
|
||||
{
|
||||
public class PublishTest
|
||||
{
|
||||
[Fact]
|
||||
public async Task Normal()
|
||||
{
|
||||
var rp = new AsyncReactiveProperty<int>(1);
|
||||
|
||||
var multicast = rp.Publish();
|
||||
|
||||
var a = multicast.ToArrayAsync();
|
||||
var b = multicast.Take(2).ToArrayAsync();
|
||||
|
||||
var disp = multicast.Connect();
|
||||
|
||||
rp.Value = 2;
|
||||
|
||||
(await b).Should().BeEquivalentTo(1, 2);
|
||||
|
||||
var c = multicast.ToArrayAsync();
|
||||
|
||||
rp.Value = 3;
|
||||
rp.Value = 4;
|
||||
rp.Value = 5;
|
||||
|
||||
rp.Dispose();
|
||||
|
||||
(await a).Should().BeEquivalentTo(1, 2, 3, 4, 5);
|
||||
(await c).Should().BeEquivalentTo(3, 4, 5);
|
||||
|
||||
disp.Dispose();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Cancel()
|
||||
{
|
||||
var rp = new AsyncReactiveProperty<int>(1);
|
||||
|
||||
var multicast = rp.Publish();
|
||||
|
||||
var a = multicast.ToArrayAsync();
|
||||
var b = multicast.Take(2).ToArrayAsync();
|
||||
|
||||
var disp = multicast.Connect();
|
||||
|
||||
rp.Value = 2;
|
||||
|
||||
(await b).Should().BeEquivalentTo(1, 2);
|
||||
|
||||
var c = multicast.ToArrayAsync();
|
||||
|
||||
rp.Value = 3;
|
||||
|
||||
disp.Dispose();
|
||||
|
||||
rp.Value = 4;
|
||||
rp.Value = 5;
|
||||
|
||||
rp.Dispose();
|
||||
|
||||
await Assert.ThrowsAsync<OperationCanceledException>(async () => await a);
|
||||
await Assert.ThrowsAsync<OperationCanceledException>(async () => await c);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
|
@ -144,6 +144,11 @@ namespace Cysharp.Threading.Tasks
|
|||
completionSource.TrySetResult(false);
|
||||
}
|
||||
|
||||
public void OnError(Exception ex)
|
||||
{
|
||||
completionSource.TrySetException(ex);
|
||||
}
|
||||
|
||||
static void CancellationCallback(object state)
|
||||
{
|
||||
var self = (Enumerator)state;
|
||||
|
|
|
@ -28,6 +28,12 @@ namespace Cysharp.Threading.Tasks
|
|||
IUniTaskOrderedAsyncEnumerable<TElement> CreateOrderedEnumerable<TKey>(Func<TElement, CancellationToken, UniTask<TKey>> keySelector, IComparer<TKey> comparer, bool descending);
|
||||
}
|
||||
|
||||
public interface IConnectableUniTaskAsyncEnumerable<out T> : IUniTaskAsyncEnumerable<T>
|
||||
{
|
||||
IDisposable Connect();
|
||||
}
|
||||
|
||||
// don't use AsyncGrouping.
|
||||
//public interface IUniTaskAsyncGrouping<out TKey, out TElement> : IUniTaskAsyncEnumerable<TElement>
|
||||
//{
|
||||
// TKey Key { get; }
|
||||
|
|
|
@ -0,0 +1,171 @@
|
|||
using Cysharp.Threading.Tasks.Internal;
|
||||
using System;
|
||||
using System.Threading;
|
||||
|
||||
namespace Cysharp.Threading.Tasks.Linq
|
||||
{
|
||||
public static partial class UniTaskAsyncEnumerable
|
||||
{
|
||||
public static IConnectableUniTaskAsyncEnumerable<TSource> Publish<TSource>(this IUniTaskAsyncEnumerable<TSource> source)
|
||||
{
|
||||
Error.ThrowArgumentNullException(source, nameof(source));
|
||||
|
||||
return new Publish<TSource>(source);
|
||||
}
|
||||
}
|
||||
|
||||
internal sealed class Publish<TSource> : IConnectableUniTaskAsyncEnumerable<TSource>
|
||||
{
|
||||
readonly IUniTaskAsyncEnumerable<TSource> source;
|
||||
readonly CancellationTokenSource cancellationTokenSource;
|
||||
|
||||
TriggerEvent<TSource> trigger;
|
||||
IUniTaskAsyncEnumerator<TSource> enumerator;
|
||||
IDisposable connectedDisposable;
|
||||
bool isCompleted;
|
||||
|
||||
public Publish(IUniTaskAsyncEnumerable<TSource> source)
|
||||
{
|
||||
this.source = source;
|
||||
this.cancellationTokenSource = new CancellationTokenSource();
|
||||
}
|
||||
|
||||
public IDisposable Connect()
|
||||
{
|
||||
if (connectedDisposable != null) return connectedDisposable;
|
||||
|
||||
if (enumerator == null)
|
||||
{
|
||||
enumerator = source.GetAsyncEnumerator(cancellationTokenSource.Token);
|
||||
}
|
||||
|
||||
ConsumeEnumerator().Forget();
|
||||
|
||||
connectedDisposable = new ConnectDisposable(cancellationTokenSource);
|
||||
return connectedDisposable;
|
||||
}
|
||||
|
||||
async UniTaskVoid ConsumeEnumerator()
|
||||
{
|
||||
try
|
||||
{
|
||||
try
|
||||
{
|
||||
while (await enumerator.MoveNextAsync())
|
||||
{
|
||||
trigger.SetResult(enumerator.Current);
|
||||
}
|
||||
trigger.SetCompleted();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
trigger.SetError(ex);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
isCompleted = true;
|
||||
await enumerator.DisposeAsync();
|
||||
}
|
||||
}
|
||||
|
||||
public IUniTaskAsyncEnumerator<TSource> GetAsyncEnumerator(CancellationToken cancellationToken = default)
|
||||
{
|
||||
return new _Publish(this, cancellationToken);
|
||||
}
|
||||
|
||||
sealed class ConnectDisposable : IDisposable
|
||||
{
|
||||
readonly CancellationTokenSource cancellationTokenSource;
|
||||
|
||||
public ConnectDisposable(CancellationTokenSource cancellationTokenSource)
|
||||
{
|
||||
this.cancellationTokenSource = cancellationTokenSource;
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
this.cancellationTokenSource.Cancel();
|
||||
}
|
||||
}
|
||||
|
||||
sealed class _Publish : MoveNextSource, IUniTaskAsyncEnumerator<TSource>, ITriggerHandler<TSource>
|
||||
{
|
||||
static readonly Action<object> CancelDelegate = OnCanceled;
|
||||
|
||||
readonly Publish<TSource> parent;
|
||||
CancellationToken cancellationToken;
|
||||
CancellationTokenRegistration cancellationTokenRegistration;
|
||||
bool isDisposed;
|
||||
|
||||
public _Publish(Publish<TSource> parent, CancellationToken cancellationToken)
|
||||
{
|
||||
if (cancellationToken.IsCancellationRequested) return;
|
||||
|
||||
this.parent = parent;
|
||||
this.cancellationToken = cancellationToken;
|
||||
|
||||
if (cancellationToken.CanBeCanceled)
|
||||
{
|
||||
this.cancellationTokenRegistration = cancellationToken.RegisterWithoutCaptureExecutionContext(CancelDelegate, this);
|
||||
}
|
||||
|
||||
parent.trigger.Add(this);
|
||||
TaskTracker.TrackActiveTask(this, 3);
|
||||
}
|
||||
|
||||
public TSource Current { get; private set; }
|
||||
|
||||
public UniTask<bool> MoveNextAsync()
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
if (parent.isCompleted) return CompletedTasks.False;
|
||||
|
||||
completionSource.Reset();
|
||||
return new UniTask<bool>(this, completionSource.Version);
|
||||
}
|
||||
|
||||
static void OnCanceled(object state)
|
||||
{
|
||||
var self = (_Publish)state;
|
||||
self.completionSource.TrySetCanceled(self.cancellationToken);
|
||||
self.DisposeAsync().Forget();
|
||||
}
|
||||
|
||||
public UniTask DisposeAsync()
|
||||
{
|
||||
if (!isDisposed)
|
||||
{
|
||||
isDisposed = true;
|
||||
TaskTracker.RemoveTracking(this);
|
||||
cancellationTokenRegistration.Dispose();
|
||||
parent.trigger.Remove(this);
|
||||
}
|
||||
|
||||
return default;
|
||||
}
|
||||
|
||||
public void OnNext(TSource value)
|
||||
{
|
||||
Current = value;
|
||||
completionSource.TrySetResult(true);
|
||||
}
|
||||
|
||||
public void OnCanceled(CancellationToken cancellationToken)
|
||||
{
|
||||
completionSource.TrySetCanceled(cancellationToken);
|
||||
}
|
||||
|
||||
public void OnCompleted()
|
||||
{
|
||||
completionSource.TrySetResult(false);
|
||||
}
|
||||
|
||||
public void OnError(Exception ex)
|
||||
{
|
||||
completionSource.TrySetException(ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,11 @@
|
|||
fileFormatVersion: 2
|
||||
guid: 93c684d1e88c09d4e89b79437d97b810
|
||||
MonoImporter:
|
||||
externalObjects: {}
|
||||
serializedVersion: 2
|
||||
defaultReferences: []
|
||||
executionOrder: 0
|
||||
icon: {instanceID: 0}
|
||||
userData:
|
||||
assetBundleName:
|
||||
assetBundleVariant:
|
|
@ -7,8 +7,9 @@ namespace Cysharp.Threading.Tasks
|
|||
public interface ITriggerHandler<T>
|
||||
{
|
||||
void OnNext(T value);
|
||||
void OnCanceled(CancellationToken cancellationToken);
|
||||
void OnError(Exception ex);
|
||||
void OnCompleted();
|
||||
void OnCanceled(CancellationToken cancellationToken);
|
||||
}
|
||||
|
||||
// be careful to use, itself is struct.
|
||||
|
@ -207,6 +208,67 @@ namespace Cysharp.Threading.Tasks
|
|||
}
|
||||
}
|
||||
|
||||
public void SetError(Exception exception)
|
||||
{
|
||||
isRunning = true;
|
||||
|
||||
if (singleHandler != null)
|
||||
{
|
||||
try
|
||||
{
|
||||
singleHandler.OnError(exception);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
#if UNITY_2018_3_OR_NEWER
|
||||
UnityEngine.Debug.LogException(ex);
|
||||
#else
|
||||
Console.WriteLine(ex);
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
if (handlers != null)
|
||||
{
|
||||
for (int i = 0; i < handlers.Length; i++)
|
||||
{
|
||||
if (handlers[i] != null)
|
||||
{
|
||||
try
|
||||
{
|
||||
handlers[i].OnError(exception);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
handlers[i] = null;
|
||||
#if UNITY_2018_3_OR_NEWER
|
||||
UnityEngine.Debug.LogException(ex);
|
||||
#else
|
||||
Console.WriteLine(ex);
|
||||
#endif
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
isRunning = false;
|
||||
|
||||
if (waitHandler != null)
|
||||
{
|
||||
var h = waitHandler;
|
||||
waitHandler = null;
|
||||
Add(h);
|
||||
}
|
||||
|
||||
if (waitQueue != null)
|
||||
{
|
||||
while (waitQueue.Count != 0)
|
||||
{
|
||||
Add(waitQueue.Dequeue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void Add(ITriggerHandler<T> handler)
|
||||
{
|
||||
if (isRunning)
|
||||
|
|
|
@ -89,6 +89,11 @@ namespace Cysharp.Threading.Tasks.Triggers
|
|||
completionSource.TrySetResult(false);
|
||||
}
|
||||
|
||||
public void OnError(Exception ex)
|
||||
{
|
||||
completionSource.TrySetException(ex);
|
||||
}
|
||||
|
||||
static void CancellationCallback(object state)
|
||||
{
|
||||
var self = (AsyncTriggerEnumerator)state;
|
||||
|
@ -273,6 +278,11 @@ namespace Cysharp.Threading.Tasks.Triggers
|
|||
core.TrySetCanceled(CancellationToken.None);
|
||||
}
|
||||
|
||||
void ITriggerHandler<T>.OnError(Exception ex)
|
||||
{
|
||||
core.TrySetException(ex);
|
||||
}
|
||||
|
||||
void IUniTaskSource.GetResult(short token)
|
||||
{
|
||||
((IUniTaskSource<T>)this).GetResult(token);
|
||||
|
|
|
@ -49,23 +49,15 @@ public static partial class UnityUIComponentExtensions
|
|||
public class AsyncMessageBroker<T> : IDisposable
|
||||
{
|
||||
Channel<T> channel;
|
||||
List<Func<T, UniTask>> asyncEvents;
|
||||
|
||||
IConnectableUniTaskAsyncEnumerable<T> multicastSource;
|
||||
IDisposable connection;
|
||||
|
||||
public AsyncMessageBroker()
|
||||
{
|
||||
channel = Channel.CreateSingleConsumerUnbounded<T>();
|
||||
asyncEvents = new List<Func<T, UniTask>>();
|
||||
}
|
||||
|
||||
async UniTaskVoid PublishAll()
|
||||
{
|
||||
await channel.Reader.ReadAllAsync().ForEachAwaitAsync(async x =>
|
||||
{
|
||||
foreach (var item in asyncEvents)
|
||||
{
|
||||
await item.Invoke(x);
|
||||
}
|
||||
});
|
||||
multicastSource = channel.Reader.ReadAllAsync().Publish();
|
||||
connection = multicastSource.Connect();
|
||||
}
|
||||
|
||||
public void Publish(T value)
|
||||
|
@ -73,33 +65,15 @@ public class AsyncMessageBroker<T> : IDisposable
|
|||
channel.Writer.TryWrite(value);
|
||||
}
|
||||
|
||||
public Subscription Subscribe(Func<T, UniTask> func)
|
||||
public IUniTaskAsyncEnumerable<T> Subscribe()
|
||||
{
|
||||
asyncEvents.Add(func);
|
||||
return new Subscription(this, func);
|
||||
return multicastSource;
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
channel.Writer.TryComplete();
|
||||
asyncEvents.Clear();
|
||||
}
|
||||
|
||||
public readonly struct Subscription : IDisposable
|
||||
{
|
||||
readonly AsyncMessageBroker<T> broker;
|
||||
readonly Func<T, UniTask> func;
|
||||
|
||||
public Subscription(AsyncMessageBroker<T> broker, Func<T, UniTask> func)
|
||||
{
|
||||
this.broker = broker;
|
||||
this.func = func;
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
broker.asyncEvents.Remove(func);
|
||||
}
|
||||
connection.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -205,9 +179,22 @@ public class SandboxMain : MonoBehaviour
|
|||
//await channel.Reader.ReadAllAsync(this.GetCancellationTokenOnDestroy()).ForEachAsync(_ => { });
|
||||
|
||||
|
||||
var rp = new AsyncReactiveProperty<int>(10);
|
||||
var pubsub = new AsyncMessageBroker<int>();
|
||||
|
||||
pubsub.Subscribe().ForEachAsync(x => Debug.Log("A:" + x)).Forget();
|
||||
pubsub.Subscribe().ForEachAsync(x => Debug.Log("B:" + x)).Forget();
|
||||
|
||||
|
||||
int i = 0;
|
||||
okButton.OnClickAsAsyncEnumerable().ForEachAsync(_ =>
|
||||
{
|
||||
|
||||
Debug.Log("foo");
|
||||
pubsub.Publish(i++);
|
||||
|
||||
|
||||
}).Forget();
|
||||
|
||||
rp.Append(10).Select(x => x * 100).Take(30).Prepend(99).SkipLast(9).Where(x => x % 2 == 0).ForEachAsync(_ => { }).Forget();
|
||||
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue