IAsyncReadOnlyReactiveProperty -> IReadOnlyAsyncReactiveProperty, .Dipose retrurns MoveNext -> false

master
neuecc 2020-05-19 01:20:20 +09:00
parent 7a306118f5
commit fb1152d8f4
2 changed files with 25 additions and 17 deletions

View File

@ -4,13 +4,13 @@ using System.Threading;
namespace Cysharp.Threading.Tasks namespace Cysharp.Threading.Tasks
{ {
public interface IAsyncReadOnlyReactiveProperty<T> : IUniTaskAsyncEnumerable<T> public interface IReadOnlyAsyncReactiveProperty<T> : IUniTaskAsyncEnumerable<T>
{ {
T Value { get; } T Value { get; }
IUniTaskAsyncEnumerable<T> WithoutCurrent(); IUniTaskAsyncEnumerable<T> WithoutCurrent();
} }
public interface IAsyncReactiveProperty<T> : IAsyncReadOnlyReactiveProperty<T> public interface IAsyncReactiveProperty<T> : IReadOnlyAsyncReactiveProperty<T>
{ {
new T Value { get; set; } new T Value { get; set; }
} }
@ -56,7 +56,7 @@ namespace Cysharp.Threading.Tasks
public void Dispose() public void Dispose()
{ {
triggerEvent.SetCanceled(CancellationToken.None); triggerEvent.SetCompleted();
} }
class WithoutCurrentEnumerable : IUniTaskAsyncEnumerable<T> class WithoutCurrentEnumerable : IUniTaskAsyncEnumerable<T>
@ -74,7 +74,7 @@ namespace Cysharp.Threading.Tasks
} }
} }
sealed class Enumerator : MoveNextSource, IUniTaskAsyncEnumerator<T>, IResolveCancelPromise<T> sealed class Enumerator : MoveNextSource, IUniTaskAsyncEnumerator<T>, ITriggerHandler<T>
{ {
static Action<object> cancellationCallback = CancellationCallback; static Action<object> cancellationCallback = CancellationCallback;
@ -128,17 +128,20 @@ namespace Cysharp.Threading.Tasks
return default; return default;
} }
public bool TrySetResult(T value) public void OnNext(T value)
{ {
this.value = value; this.value = value;
completionSource.TrySetResult(true); completionSource.TrySetResult(true);
return true;
} }
public bool TrySetCanceled(CancellationToken cancellationToken = default) public void OnCanceled(CancellationToken cancellationToken)
{ {
DisposeAsync().Forget(); DisposeAsync().Forget();
return true; }
public void OnCompleted()
{
completionSource.TrySetResult(false);
} }
static void CancellationCallback(object state) static void CancellationCallback(object state)

View File

@ -194,23 +194,28 @@ public class SandboxMain : MonoBehaviour
Debug.Log("Done"); Debug.Log("Done");
} }
void Start() async void Start()
{ {
var channel = Channel.CreateSingleConsumerUnbounded<int>(); //var rp = new AsyncReactiveProperty<int>(10);
var reader = channel.Reader; //Running(rp).Forget();
WaitForChannelAsync(reader, this.GetCancellationTokenOnDestroy()).Forget(); //await UniTaskAsyncEnumerable.EveryUpdate().Take(10).ForEachAsync((x, i) => rp.Value = i);
var writer = channel.Writer; //rp.Dispose();
writer.TryWrite(1); await this.GetAsyncUpdateTrigger().ForEachAsync(x => Debug.Log("yeah"));
writer.TryWrite(2); Debug.Log("DONE");
writer.TryWrite(3);
writer.Complete(new Exception());
} }
async UniTaskVoid Running(IReadOnlyAsyncReactiveProperty<int> rp)
{
Debug.Log("BEGIN");
await rp.ForEachAsync(x => Debug.Log("AR:" + x));
Debug.Log("DONE");
}
async UniTaskVoid WaitForChannelAsync(ChannelReader<int> reader, CancellationToken token) async UniTaskVoid WaitForChannelAsync(ChannelReader<int> reader, CancellationToken token)
{ {
try try