Add (I/ReadOnly)AsyncReactiveProperty.WaitAsync
parent
5d8e0e61ad
commit
4898e4c7bf
|
@ -112,6 +112,85 @@ namespace NetCoreTests
|
||||||
state.Value.Should().Be(20);
|
state.Value.Should().Be(20);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task WaitAsyncTest()
|
||||||
|
{
|
||||||
|
var rp = new AsyncReactiveProperty<int>(128);
|
||||||
|
|
||||||
|
var f = await rp.FirstAsync();
|
||||||
|
f.Should().Be(128);
|
||||||
|
|
||||||
|
{
|
||||||
|
var t = rp.WaitAsync();
|
||||||
|
rp.Value = 99;
|
||||||
|
rp.Value = 100;
|
||||||
|
var v = await t;
|
||||||
|
|
||||||
|
v.Should().Be(99);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
var t = rp.WaitAsync();
|
||||||
|
rp.Value = 99;
|
||||||
|
rp.Value = 100;
|
||||||
|
var v = await t;
|
||||||
|
|
||||||
|
v.Should().Be(99);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task WaitAsyncCancellationTest()
|
||||||
|
{
|
||||||
|
var cts = new CancellationTokenSource();
|
||||||
|
|
||||||
|
var rp = new AsyncReactiveProperty<int>(128);
|
||||||
|
|
||||||
|
var t = rp.WaitAsync(cts.Token);
|
||||||
|
|
||||||
|
cts.Cancel();
|
||||||
|
|
||||||
|
rp.Value = 99;
|
||||||
|
rp.Value = 100;
|
||||||
|
|
||||||
|
await Assert.ThrowsAsync<OperationCanceledException>(async () => { await t; });
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadOnlyWaitAsyncTest()
|
||||||
|
{
|
||||||
|
var rp = new AsyncReactiveProperty<int>(128);
|
||||||
|
var rrp = rp.ToReadOnlyAsyncReactiveProperty(CancellationToken.None);
|
||||||
|
|
||||||
|
var t = rrp.WaitAsync();
|
||||||
|
rp.Value = 99;
|
||||||
|
rp.Value = 100;
|
||||||
|
var v = await t;
|
||||||
|
|
||||||
|
v.Should().Be(99);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadOnlyWaitAsyncCancellationTest()
|
||||||
|
{
|
||||||
|
var cts = new CancellationTokenSource();
|
||||||
|
|
||||||
|
var rp = new AsyncReactiveProperty<int>(128);
|
||||||
|
var rrp = rp.ToReadOnlyAsyncReactiveProperty(CancellationToken.None);
|
||||||
|
|
||||||
|
var t = rrp.WaitAsync(cts.Token);
|
||||||
|
|
||||||
|
cts.Cancel();
|
||||||
|
|
||||||
|
rp.Value = 99;
|
||||||
|
rp.Value = 100;
|
||||||
|
|
||||||
|
await Assert.ThrowsAsync<OperationCanceledException>(async () => { await t; });
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -7,6 +7,7 @@ namespace Cysharp.Threading.Tasks
|
||||||
{
|
{
|
||||||
T Value { get; }
|
T Value { get; }
|
||||||
IUniTaskAsyncEnumerable<T> WithoutCurrent();
|
IUniTaskAsyncEnumerable<T> WithoutCurrent();
|
||||||
|
UniTask<T> WaitAsync(CancellationToken cancellationToken = default);
|
||||||
}
|
}
|
||||||
|
|
||||||
public interface IAsyncReactiveProperty<T> : IReadOnlyAsyncReactiveProperty<T>
|
public interface IAsyncReactiveProperty<T> : IReadOnlyAsyncReactiveProperty<T>
|
||||||
|
@ -69,6 +70,11 @@ namespace Cysharp.Threading.Tasks
|
||||||
return latestValue?.ToString();
|
return latestValue?.ToString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public UniTask<T> WaitAsync(CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
return new UniTask<T>(WaitAsyncSource.Create(this, cancellationToken, out var token), token);
|
||||||
|
}
|
||||||
|
|
||||||
static bool isValueType;
|
static bool isValueType;
|
||||||
|
|
||||||
static AsyncReactiveProperty()
|
static AsyncReactiveProperty()
|
||||||
|
@ -76,7 +82,143 @@ namespace Cysharp.Threading.Tasks
|
||||||
isValueType = typeof(T).IsValueType;
|
isValueType = typeof(T).IsValueType;
|
||||||
}
|
}
|
||||||
|
|
||||||
class WithoutCurrentEnumerable : IUniTaskAsyncEnumerable<T>
|
sealed class WaitAsyncSource : IUniTaskSource<T>, ITriggerHandler<T>, ITaskPoolNode<WaitAsyncSource>
|
||||||
|
{
|
||||||
|
static Action<object> cancellationCallback = CancellationCallback;
|
||||||
|
|
||||||
|
static TaskPool<WaitAsyncSource> pool;
|
||||||
|
WaitAsyncSource ITaskPoolNode<WaitAsyncSource>.NextNode { get; set; }
|
||||||
|
|
||||||
|
static WaitAsyncSource()
|
||||||
|
{
|
||||||
|
TaskPool.RegisterSizeGetter(typeof(WaitAsyncSource), () => pool.Size);
|
||||||
|
}
|
||||||
|
|
||||||
|
AsyncReactiveProperty<T> parent;
|
||||||
|
CancellationToken cancellationToken;
|
||||||
|
CancellationTokenRegistration cancellationTokenRegistration;
|
||||||
|
UniTaskCompletionSourceCore<T> core;
|
||||||
|
|
||||||
|
WaitAsyncSource()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
public static IUniTaskSource<T> Create(AsyncReactiveProperty<T> parent, CancellationToken cancellationToken, out short token)
|
||||||
|
{
|
||||||
|
if (cancellationToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
return AutoResetUniTaskCompletionSource<T>.CreateFromCanceled(cancellationToken, out token);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!pool.TryPop(out var result))
|
||||||
|
{
|
||||||
|
result = new WaitAsyncSource();
|
||||||
|
}
|
||||||
|
|
||||||
|
result.parent = parent;
|
||||||
|
result.cancellationToken = cancellationToken;
|
||||||
|
|
||||||
|
if (cancellationToken.CanBeCanceled)
|
||||||
|
{
|
||||||
|
result.cancellationTokenRegistration = cancellationToken.RegisterWithoutCaptureExecutionContext(cancellationCallback, result);
|
||||||
|
}
|
||||||
|
|
||||||
|
result.parent.triggerEvent.Add(result);
|
||||||
|
|
||||||
|
TaskTracker.TrackActiveTask(result, 3);
|
||||||
|
|
||||||
|
token = result.core.Version;
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool TryReturn()
|
||||||
|
{
|
||||||
|
TaskTracker.RemoveTracking(this);
|
||||||
|
core.Reset();
|
||||||
|
cancellationTokenRegistration.Dispose();
|
||||||
|
cancellationTokenRegistration = default;
|
||||||
|
parent.triggerEvent.Remove(this);
|
||||||
|
parent = null;
|
||||||
|
cancellationToken = default;
|
||||||
|
return pool.TryPush(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
~WaitAsyncSource()
|
||||||
|
{
|
||||||
|
if (TryReturn())
|
||||||
|
{
|
||||||
|
GC.ReRegisterForFinalize(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void CancellationCallback(object state)
|
||||||
|
{
|
||||||
|
var self = (WaitAsyncSource)state;
|
||||||
|
self.OnCanceled(self.cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
// IUniTaskSource
|
||||||
|
|
||||||
|
public T GetResult(short token)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
return core.GetResult(token);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
TryReturn();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void IUniTaskSource.GetResult(short token)
|
||||||
|
{
|
||||||
|
GetResult(token);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void OnCompleted(Action<object> continuation, object state, short token)
|
||||||
|
{
|
||||||
|
core.OnCompleted(continuation, state, token);
|
||||||
|
}
|
||||||
|
|
||||||
|
public UniTaskStatus GetStatus(short token)
|
||||||
|
{
|
||||||
|
return core.GetStatus(token);
|
||||||
|
}
|
||||||
|
|
||||||
|
public UniTaskStatus UnsafeGetStatus()
|
||||||
|
{
|
||||||
|
return core.UnsafeGetStatus();
|
||||||
|
}
|
||||||
|
|
||||||
|
// ITriggerHandler
|
||||||
|
|
||||||
|
ITriggerHandler<T> ITriggerHandler<T>.Prev { get; set; }
|
||||||
|
ITriggerHandler<T> ITriggerHandler<T>.Next { get; set; }
|
||||||
|
|
||||||
|
public void OnCanceled(CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
core.TrySetCanceled(cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void OnCompleted()
|
||||||
|
{
|
||||||
|
// Complete as Cancel.
|
||||||
|
core.TrySetCanceled(CancellationToken.None);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void OnError(Exception ex)
|
||||||
|
{
|
||||||
|
core.TrySetException(ex);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void OnNext(T value)
|
||||||
|
{
|
||||||
|
core.TrySetResult(value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sealed class WithoutCurrentEnumerable : IUniTaskAsyncEnumerable<T>
|
||||||
{
|
{
|
||||||
readonly AsyncReactiveProperty<T> parent;
|
readonly AsyncReactiveProperty<T> parent;
|
||||||
|
|
||||||
|
@ -253,6 +395,11 @@ namespace Cysharp.Threading.Tasks
|
||||||
return latestValue?.ToString();
|
return latestValue?.ToString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public UniTask<T> WaitAsync(CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
return new UniTask<T>(WaitAsyncSource.Create(this, cancellationToken, out var token), token);
|
||||||
|
}
|
||||||
|
|
||||||
static bool isValueType;
|
static bool isValueType;
|
||||||
|
|
||||||
static ReadOnlyAsyncReactiveProperty()
|
static ReadOnlyAsyncReactiveProperty()
|
||||||
|
@ -260,7 +407,143 @@ namespace Cysharp.Threading.Tasks
|
||||||
isValueType = typeof(T).IsValueType;
|
isValueType = typeof(T).IsValueType;
|
||||||
}
|
}
|
||||||
|
|
||||||
class WithoutCurrentEnumerable : IUniTaskAsyncEnumerable<T>
|
sealed class WaitAsyncSource : IUniTaskSource<T>, ITriggerHandler<T>, ITaskPoolNode<WaitAsyncSource>
|
||||||
|
{
|
||||||
|
static Action<object> cancellationCallback = CancellationCallback;
|
||||||
|
|
||||||
|
static TaskPool<WaitAsyncSource> pool;
|
||||||
|
WaitAsyncSource ITaskPoolNode<WaitAsyncSource>.NextNode { get; set; }
|
||||||
|
|
||||||
|
static WaitAsyncSource()
|
||||||
|
{
|
||||||
|
TaskPool.RegisterSizeGetter(typeof(WaitAsyncSource), () => pool.Size);
|
||||||
|
}
|
||||||
|
|
||||||
|
ReadOnlyAsyncReactiveProperty<T> parent;
|
||||||
|
CancellationToken cancellationToken;
|
||||||
|
CancellationTokenRegistration cancellationTokenRegistration;
|
||||||
|
UniTaskCompletionSourceCore<T> core;
|
||||||
|
|
||||||
|
WaitAsyncSource()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
public static IUniTaskSource<T> Create(ReadOnlyAsyncReactiveProperty<T> parent, CancellationToken cancellationToken, out short token)
|
||||||
|
{
|
||||||
|
if (cancellationToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
return AutoResetUniTaskCompletionSource<T>.CreateFromCanceled(cancellationToken, out token);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!pool.TryPop(out var result))
|
||||||
|
{
|
||||||
|
result = new WaitAsyncSource();
|
||||||
|
}
|
||||||
|
|
||||||
|
result.parent = parent;
|
||||||
|
result.cancellationToken = cancellationToken;
|
||||||
|
|
||||||
|
if (cancellationToken.CanBeCanceled)
|
||||||
|
{
|
||||||
|
result.cancellationTokenRegistration = cancellationToken.RegisterWithoutCaptureExecutionContext(cancellationCallback, result);
|
||||||
|
}
|
||||||
|
|
||||||
|
result.parent.triggerEvent.Add(result);
|
||||||
|
|
||||||
|
TaskTracker.TrackActiveTask(result, 3);
|
||||||
|
|
||||||
|
token = result.core.Version;
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool TryReturn()
|
||||||
|
{
|
||||||
|
TaskTracker.RemoveTracking(this);
|
||||||
|
core.Reset();
|
||||||
|
cancellationTokenRegistration.Dispose();
|
||||||
|
cancellationTokenRegistration = default;
|
||||||
|
parent.triggerEvent.Remove(this);
|
||||||
|
parent = null;
|
||||||
|
cancellationToken = default;
|
||||||
|
return pool.TryPush(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
~WaitAsyncSource()
|
||||||
|
{
|
||||||
|
if (TryReturn())
|
||||||
|
{
|
||||||
|
GC.ReRegisterForFinalize(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void CancellationCallback(object state)
|
||||||
|
{
|
||||||
|
var self = (WaitAsyncSource)state;
|
||||||
|
self.OnCanceled(self.cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
// IUniTaskSource
|
||||||
|
|
||||||
|
public T GetResult(short token)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
return core.GetResult(token);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
TryReturn();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void IUniTaskSource.GetResult(short token)
|
||||||
|
{
|
||||||
|
GetResult(token);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void OnCompleted(Action<object> continuation, object state, short token)
|
||||||
|
{
|
||||||
|
core.OnCompleted(continuation, state, token);
|
||||||
|
}
|
||||||
|
|
||||||
|
public UniTaskStatus GetStatus(short token)
|
||||||
|
{
|
||||||
|
return core.GetStatus(token);
|
||||||
|
}
|
||||||
|
|
||||||
|
public UniTaskStatus UnsafeGetStatus()
|
||||||
|
{
|
||||||
|
return core.UnsafeGetStatus();
|
||||||
|
}
|
||||||
|
|
||||||
|
// ITriggerHandler
|
||||||
|
|
||||||
|
ITriggerHandler<T> ITriggerHandler<T>.Prev { get; set; }
|
||||||
|
ITriggerHandler<T> ITriggerHandler<T>.Next { get; set; }
|
||||||
|
|
||||||
|
public void OnCanceled(CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
core.TrySetCanceled(cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void OnCompleted()
|
||||||
|
{
|
||||||
|
// Complete as Cancel.
|
||||||
|
core.TrySetCanceled(CancellationToken.None);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void OnError(Exception ex)
|
||||||
|
{
|
||||||
|
core.TrySetException(ex);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void OnNext(T value)
|
||||||
|
{
|
||||||
|
core.TrySetResult(value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sealed class WithoutCurrentEnumerable : IUniTaskAsyncEnumerable<T>
|
||||||
{
|
{
|
||||||
readonly ReadOnlyAsyncReactiveProperty<T> parent;
|
readonly ReadOnlyAsyncReactiveProperty<T> parent;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue