Add TaskTracker to Channel

master
neuecc 2020-05-19 03:43:06 +09:00
parent f07527cd06
commit ec7064083a
2 changed files with 11 additions and 11 deletions

View File

@ -193,6 +193,8 @@ namespace Cysharp.Threading.Tasks
public SingleConsumerUnboundedChannelReader(SingleConsumerUnboundedChannel<T> parent) public SingleConsumerUnboundedChannelReader(SingleConsumerUnboundedChannel<T> parent)
{ {
this.parent = parent; this.parent = parent;
TaskTracker.TrackActiveTask(this, 4);
} }
public override UniTask Completion public override UniTask Completion
@ -304,6 +306,7 @@ namespace Cysharp.Threading.Tasks
public void SingalCancellation(CancellationToken cancellationToken) public void SingalCancellation(CancellationToken cancellationToken)
{ {
TaskTracker.RemoveTracking(this);
core.TrySetCanceled(cancellationToken); core.TrySetCanceled(cancellationToken);
} }
@ -311,10 +314,12 @@ namespace Cysharp.Threading.Tasks
{ {
if (error != null) if (error != null)
{ {
TaskTracker.RemoveTracking(this);
core.TrySetException(error); core.TrySetException(error);
} }
else else
{ {
TaskTracker.RemoveTracking(this);
core.TrySetResult(false); core.TrySetResult(false);
} }
} }

View File

@ -1,20 +1,16 @@
using System; using Cysharp.Threading.Tasks;
using Cysharp.Threading.Tasks.Linq; using Cysharp.Threading.Tasks.Linq;
using Cysharp.Threading.Tasks.Triggers; using Cysharp.Threading.Tasks.Triggers;
using System;
using System.Collections; using System.Collections;
using System.Collections.Generic; using System.Collections.Generic;
using System.IO;
using System.Text; using System.Text;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Cysharp.Threading.Tasks;
using Unity.Collections; using Unity.Collections;
using Unity.Jobs; using Unity.Jobs;
using UnityEngine; using UnityEngine;
using UnityEngine.Networking;
using UnityEngine.UI; using UnityEngine.UI;
using TMPro;
public struct MyJob : IJob public struct MyJob : IJob
{ {
@ -194,7 +190,7 @@ public class SandboxMain : MonoBehaviour
Debug.Log("Done"); Debug.Log("Done");
} }
async void Start() async UniTaskVoid Start()
{ {
//var rp = new AsyncReactiveProperty<int>(10); //var rp = new AsyncReactiveProperty<int>(10);
@ -203,13 +199,12 @@ public class SandboxMain : MonoBehaviour
//await UniTaskAsyncEnumerable.EveryUpdate().Take(10).ForEachAsync((x, i) => rp.Value = i); //await UniTaskAsyncEnumerable.EveryUpdate().Take(10).ForEachAsync((x, i) => rp.Value = i);
//rp.Dispose(); //rp.Dispose();
var cts = new CancellationTokenSource();
Running(cts.Token).Forget();
var channel = Channel.CreateSingleConsumerUnbounded<int>();
Debug.Log("wait channel");
await channel.Reader.ReadAllAsync(this.GetCancellationTokenOnDestroy()).ForEachAsync(_ => { });
cts.CancelAfterSlim(TimeSpan.FromSeconds(3));
} }
async UniTaskVoid Running(CancellationToken ct) async UniTaskVoid Running(CancellationToken ct)