Impl AsyncReactiveProperty.ToString, Add State

master
neuecc 2020-05-21 02:22:24 +09:00
parent 36d53a3bcb
commit 6d7e6ec871
2 changed files with 307 additions and 1 deletions

View File

@ -51,6 +51,67 @@ namespace NetCoreTests
ar.Should().BeEquivalentTo(new[] { 100, 100, 100, 131, 191 });
}
[Fact]
public async Task StateIteration()
{
var rp = new State<int>(99);
var setter = rp.GetSetter();
var f = await rp.FirstAsync();
f.Should().Be(99);
var array = rp.Take(5).ToArrayAsync();
setter(100);
setter(100);
setter(100);
setter(131);
var ar = await array;
ar.Should().BeEquivalentTo(new[] { 99, 100, 100, 100, 131 });
}
[Fact]
public async Task StateWithoutCurrent()
{
var rp = new State<int>(99);
var setter = rp.GetSetter();
var array = rp.WithoutCurrent().Take(5).ToArrayAsync();
setter(100);
setter(100);
setter(100);
setter(131);
setter(191);
var ar = await array;
ar.Should().BeEquivalentTo(new[] { 100, 100, 100, 131, 191 });
}
[Fact]
public void StateFromEnumeration()
{
var rp = new AsyncReactiveProperty<int>(10);
var state = rp.ToState(CancellationToken.None);
rp.Value = 10;
state.Value.Should().Be(10);
rp.Value = 20;
state.Value.Should().Be(20);
state.Dispose();
rp.Value = 30;
state.Value.Should().Be(20);
}
}

View File

@ -59,6 +59,24 @@ namespace Cysharp.Threading.Tasks
triggerEvent.SetCompleted();
}
public static implicit operator T(AsyncReactiveProperty<T> value)
{
return value.Value;
}
public override string ToString()
{
if (isValueType) return latestValue.ToString();
return latestValue?.ToString();
}
static bool isValueType;
static AsyncReactiveProperty()
{
isValueType = typeof(T).IsValueType;
}
class WithoutCurrentEnumerable : IUniTaskAsyncEnumerable<T>
{
readonly AsyncReactiveProperty<T> parent;
@ -156,4 +174,231 @@ namespace Cysharp.Threading.Tasks
}
}
}
}
public class State<T> : IReadOnlyAsyncReactiveProperty<T>, IDisposable
{
TriggerEvent<T> triggerEvent;
T latestValue;
Action<T> setter;
IUniTaskAsyncEnumerator<T> enumerator;
public T Value
{
get
{
return latestValue;
}
}
public State(T value)
{
this.latestValue = value;
this.triggerEvent = default;
}
public State(T initialValue, IUniTaskAsyncEnumerable<T> source, CancellationToken cancellationToken)
{
latestValue = initialValue;
ConsumeEnumerator(source, cancellationToken).Forget();
}
public State(IUniTaskAsyncEnumerable<T> source, CancellationToken cancellationToken)
{
ConsumeEnumerator(source, cancellationToken).Forget();
}
async UniTaskVoid ConsumeEnumerator(IUniTaskAsyncEnumerable<T> source, CancellationToken cancellationToken)
{
enumerator = source.GetAsyncEnumerator(cancellationToken);
try
{
while (await enumerator.MoveNextAsync())
{
SetValue(enumerator.Current);
}
}
finally
{
await enumerator.DisposeAsync();
enumerator = null;
}
}
public Action<T> GetSetter()
{
if (enumerator != null)
{
throw new InvalidOperationException("Can not get setter when create from IUniTaskAsyncEnumerable source.");
}
if (setter != null)
{
throw new InvalidOperationException("GetSetter can only call once.");
}
setter = SetValue;
return setter;
}
void SetValue(T value)
{
this.latestValue = value;
triggerEvent.SetResult(value);
}
public IUniTaskAsyncEnumerable<T> WithoutCurrent()
{
return new WithoutCurrentEnumerable(this);
}
public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken)
{
return new Enumerator(this, cancellationToken, true);
}
public void Dispose()
{
if (enumerator != null)
{
enumerator.DisposeAsync().Forget();
}
triggerEvent.SetCompleted();
}
public static implicit operator State<T>(T value)
{
return new State<T>(value);
}
public static implicit operator T(State<T> value)
{
return value.Value;
}
public override string ToString()
{
if (isValueType) return latestValue.ToString();
return latestValue?.ToString();
}
static bool isValueType;
static State()
{
isValueType = typeof(T).IsValueType;
}
class WithoutCurrentEnumerable : IUniTaskAsyncEnumerable<T>
{
readonly State<T> parent;
public WithoutCurrentEnumerable(State<T> parent)
{
this.parent = parent;
}
public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
return new Enumerator(parent, cancellationToken, false);
}
}
sealed class Enumerator : MoveNextSource, IUniTaskAsyncEnumerator<T>, ITriggerHandler<T>
{
static Action<object> cancellationCallback = CancellationCallback;
readonly State<T> parent;
readonly CancellationToken cancellationToken;
readonly CancellationTokenRegistration cancellationTokenRegistration;
T value;
bool isDisposed;
bool firstCall;
public Enumerator(State<T> parent, CancellationToken cancellationToken, bool publishCurrentValue)
{
this.parent = parent;
this.cancellationToken = cancellationToken;
this.firstCall = publishCurrentValue;
parent.triggerEvent.Add(this);
TaskTracker.TrackActiveTask(this, 3);
if (cancellationToken.CanBeCanceled)
{
cancellationTokenRegistration = cancellationToken.RegisterWithoutCaptureExecutionContext(cancellationCallback, this);
}
}
public T Current => value;
public UniTask<bool> MoveNextAsync()
{
// raise latest value on first call.
if (firstCall)
{
firstCall = false;
value = parent.Value;
return CompletedTasks.True;
}
completionSource.Reset();
return new UniTask<bool>(this, completionSource.Version);
}
public UniTask DisposeAsync()
{
if (!isDisposed)
{
isDisposed = true;
TaskTracker.RemoveTracking(this);
completionSource.TrySetCanceled(cancellationToken);
parent.triggerEvent.Remove(this);
}
return default;
}
public void OnNext(T value)
{
this.value = value;
completionSource.TrySetResult(true);
}
public void OnCanceled(CancellationToken cancellationToken)
{
DisposeAsync().Forget();
}
public void OnCompleted()
{
completionSource.TrySetResult(false);
}
public void OnError(Exception ex)
{
completionSource.TrySetException(ex);
}
static void CancellationCallback(object state)
{
var self = (Enumerator)state;
self.DisposeAsync().Forget();
}
}
}
public static class StateExtensions
{
public static State<T> ToState<T>(this IUniTaskAsyncEnumerable<T> source, CancellationToken cancellationToken)
{
return new State<T>(source, cancellationToken);
}
public static State<T> ToState<T>(this IUniTaskAsyncEnumerable<T> source, T initialValue, CancellationToken cancellationToken)
{
return new State<T>(initialValue, source, cancellationToken);
}
}
}