Add ReturnToMainThread, ReturnToSynchronizationContext, ReturnToCurrentSynchronizationContext

master
neuecc 2020-05-31 04:30:03 +09:00
parent 3001996298
commit a9baa52309
2 changed files with 319 additions and 36 deletions

View File

@ -17,7 +17,31 @@ namespace Cysharp.Threading.Tasks
/// </summary> /// </summary>
public static SwitchToMainThreadAwaitable SwitchToMainThread() public static SwitchToMainThreadAwaitable SwitchToMainThread()
{ {
return new SwitchToMainThreadAwaitable(); return new SwitchToMainThreadAwaitable(PlayerLoopTiming.Update);
}
/// <summary>
/// If running on mainthread, do nothing. Otherwise, same as UniTask.Yield(timing).
/// </summary>
public static SwitchToMainThreadAwaitable SwitchToMainThread(PlayerLoopTiming timing)
{
return new SwitchToMainThreadAwaitable(timing);
}
/// <summary>
/// Return to mainthread(same as await SwitchToMainThread) after using scope is closed.
/// </summary>
public static ReturnToMainThread ReturnToMainThread()
{
return new ReturnToMainThread(PlayerLoopTiming.Update);
}
/// <summary>
/// Return to mainthread(same as await SwitchToMainThread) after using scope is closed.
/// </summary>
public static ReturnToMainThread ReturnToMainThread(PlayerLoopTiming timing)
{
return new ReturnToMainThread(timing);
} }
#endif #endif
@ -27,15 +51,28 @@ namespace Cysharp.Threading.Tasks
return new SwitchToThreadPoolAwaitable(); return new SwitchToThreadPoolAwaitable();
} }
/// <summary>
/// Note: use SwitchToThreadPool is recommended.
/// </summary>
public static SwitchToTaskPoolAwaitable SwitchToTaskPool() public static SwitchToTaskPoolAwaitable SwitchToTaskPool()
{ {
return new SwitchToTaskPoolAwaitable(); return new SwitchToTaskPoolAwaitable();
} }
public static SwitchToSynchronizationContextAwaitable SwitchToSynchronizationContext(SynchronizationContext syncContext) public static SwitchToSynchronizationContextAwaitable SwitchToSynchronizationContext(SynchronizationContext synchronizationContext)
{ {
Error.ThrowArgumentNullException(syncContext, nameof(syncContext)); Error.ThrowArgumentNullException(synchronizationContext, nameof(synchronizationContext));
return new SwitchToSynchronizationContextAwaitable(syncContext); return new SwitchToSynchronizationContextAwaitable(synchronizationContext);
}
public static ReturnToSynchronizationContext ReturnToSynchronizationContext(SynchronizationContext synchronizationContext)
{
return new ReturnToSynchronizationContext(synchronizationContext);
}
public static ReturnToSynchronizationContext ReturnToCurrentSynchronizationContext()
{
return new ReturnToSynchronizationContext(SynchronizationContext.Current);
} }
} }
@ -43,10 +80,24 @@ namespace Cysharp.Threading.Tasks
public struct SwitchToMainThreadAwaitable public struct SwitchToMainThreadAwaitable
{ {
public Awaiter GetAwaiter() => new Awaiter(); readonly PlayerLoopTiming playerLoopTiming;
public SwitchToMainThreadAwaitable(PlayerLoopTiming playerLoopTiming)
{
this.playerLoopTiming = playerLoopTiming;
}
public Awaiter GetAwaiter() => new Awaiter(playerLoopTiming);
public struct Awaiter : ICriticalNotifyCompletion public struct Awaiter : ICriticalNotifyCompletion
{ {
readonly PlayerLoopTiming playerLoopTiming;
public Awaiter(PlayerLoopTiming playerLoopTiming)
{
this.playerLoopTiming = playerLoopTiming;
}
public bool IsCompleted public bool IsCompleted
{ {
get get
@ -67,12 +118,53 @@ namespace Cysharp.Threading.Tasks
public void OnCompleted(Action continuation) public void OnCompleted(Action continuation)
{ {
PlayerLoopHelper.AddContinuation(PlayerLoopTiming.Update, continuation); PlayerLoopHelper.AddContinuation(playerLoopTiming, continuation);
} }
public void UnsafeOnCompleted(Action continuation) public void UnsafeOnCompleted(Action continuation)
{ {
PlayerLoopHelper.AddContinuation(PlayerLoopTiming.Update, continuation); PlayerLoopHelper.AddContinuation(playerLoopTiming, continuation);
}
}
}
public struct ReturnToMainThread
{
readonly PlayerLoopTiming playerLoopTiming;
public ReturnToMainThread(PlayerLoopTiming playerLoopTiming)
{
this.playerLoopTiming = playerLoopTiming;
}
public Awaiter DisposeAsync()
{
return new Awaiter(playerLoopTiming); // run immediate.
}
public readonly struct Awaiter : ICriticalNotifyCompletion
{
readonly PlayerLoopTiming timing;
public Awaiter(PlayerLoopTiming timing)
{
this.timing = timing;
}
public Awaiter GetAwaiter() => this;
public bool IsCompleted => PlayerLoopHelper.MainThreadId == System.Threading.Thread.CurrentThread.ManagedThreadId;
public void GetResult() { }
public void OnCompleted(Action continuation)
{
PlayerLoopHelper.AddContinuation(timing, continuation);
}
public void UnsafeOnCompleted(Action continuation)
{
PlayerLoopHelper.AddContinuation(timing, continuation);
} }
} }
} }
@ -92,12 +184,16 @@ namespace Cysharp.Threading.Tasks
public void OnCompleted(Action continuation) public void OnCompleted(Action continuation)
{ {
ThreadPool.UnsafeQueueUserWorkItem(switchToCallback, continuation); ThreadPool.QueueUserWorkItem(switchToCallback, continuation);
} }
public void UnsafeOnCompleted(Action continuation) public void UnsafeOnCompleted(Action continuation)
{ {
#if NETCOREAPP3_1
ThreadPool.UnsafeQueueUserWorkItem(ThreadPoolWorkItem.Create(continuation), false);
#else
ThreadPool.UnsafeQueueUserWorkItem(switchToCallback, continuation); ThreadPool.UnsafeQueueUserWorkItem(switchToCallback, continuation);
#endif
} }
static void Callback(object state) static void Callback(object state)
@ -106,6 +202,47 @@ namespace Cysharp.Threading.Tasks
continuation(); continuation();
} }
} }
#if NETCOREAPP3_1
sealed class ThreadPoolWorkItem : IThreadPoolWorkItem, ITaskPoolNode<ThreadPoolWorkItem>
{
static TaskPool<ThreadPoolWorkItem> pool;
public ThreadPoolWorkItem NextNode { get; set; }
static ThreadPoolWorkItem()
{
TaskPool.RegisterSizeGetter(typeof(ThreadPoolWorkItem), () => pool.Size);
}
Action continuation;
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static ThreadPoolWorkItem Create(Action continuation)
{
if (!pool.TryPop(out var item))
{
item = new ThreadPoolWorkItem();
}
item.continuation = continuation;
return item;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Execute()
{
var call = continuation;
continuation = null;
if (call != null)
{
pool.TryPush(this);
call.Invoke();
}
}
}
#endif
} }
public struct SwitchToTaskPoolAwaitable public struct SwitchToTaskPoolAwaitable
@ -178,5 +315,19 @@ namespace Cysharp.Threading.Tasks
} }
} }
} }
public struct ReturnToSynchronizationContext
{
readonly SynchronizationContext syncContext;
public ReturnToSynchronizationContext(SynchronizationContext syncContext)
{
this.syncContext = syncContext;
} }
public SwitchToSynchronizationContextAwaitable DisposeAsync()
{
return UniTask.SwitchToSynchronizationContext(syncContext);
}
}
}

View File

@ -1,4 +1,5 @@
using Cysharp.Threading.Tasks; using Cysharp.Threading.Tasks;
using System.Linq;
using Cysharp.Threading.Tasks.Linq; using Cysharp.Threading.Tasks.Linq;
using Cysharp.Threading.Tasks.Triggers; using Cysharp.Threading.Tasks.Triggers;
using System; using System;
@ -10,6 +11,7 @@ using System.Threading.Tasks;
using Unity.Collections; using Unity.Collections;
using Unity.Jobs; using Unity.Jobs;
using UnityEngine; using UnityEngine;
using UnityEngine.LowLevel;
using UnityEngine.Networking; using UnityEngine.Networking;
using UnityEngine.UI; using UnityEngine.UI;
@ -320,43 +322,132 @@ public class SandboxMain : MonoBehaviour
//Debug.Log("AGAIN END MOVE"); //Debug.Log("AGAIN END MOVE");
//// DOTween.To(
//var cts = new CancellationTokenSource();
////var tween = okButton.GetComponent<RectTransform>().DOLocalMoveX(100, 5.0f);
//cancelButton.OnClickAsAsyncEnumerable().ForEachAsync(_ =>
//{
// cts.Cancel();
//}).Forget();
//// await tween.ToUniTask(TweenCancelBehaviour.KillAndCancelAwait, cts.Token);
////tween.SetRecyclable(true);
//Debug.Log("END");
//// tween.Play();
//// DOTween.
//// DOVirtual.Float(0, 1, 1, x => { }).ToUniTask();
//await foreach (var _ in UniTaskAsyncEnumerable.EveryUpdate())
//{
// Debug.Log("Update() " + Time.frameCount);
//}
//await okButton.OnClickAsAsyncEnumerable().Where((x, i) => i % 2 == 0).ForEachAsync(_ =>
//{
//});
//okButton.OnClickAsAsyncEnumerable().ForEachAsync(_ =>
//{
//}).Forget();
//CloseAsync(this.GetCancellationTokenOnDestroy()).Forget();
//okButton.onClick.AddListener(UniTask.UnityAction(async () => await UniTask.Yield()));
PlayerLoopInfo.Inject();
//UpdateUniTask().Forget();
//StartCoroutine(Coroutine());
//await UniTask.Delay(TimeSpan.FromSeconds(1));
_ = ReturnToMainThreadTest();
//GameObject.Destroy(this.gameObject);
SynchronizationContext.Current.Post(_ =>
{
//UnityEngine.Debug.Log("Post:" + PlayerLoopInfo.CurrentLoopType);
}, null);
}
async UniTaskVoid UpdateUniTask()
{
while (true)
{
await UniTask.Yield(); await UniTask.Yield();
// DOTween.To( UnityEngine.Debug.Log("UniTaskYield:" + PlayerLoopInfo.CurrentLoopType);
}
}
var cts = new CancellationTokenSource();
//var tween = okButton.GetComponent<RectTransform>().DOLocalMoveX(100, 5.0f); async UniTaskVoid ReturnToMainThreadTest()
cancelButton.OnClickAsAsyncEnumerable().ForEachAsync(_ =>
{ {
cts.Cancel(); var d = UniTask.ReturnToCurrentSynchronizationContext();
}).Forget(); try
// await tween.ToUniTask(TweenCancelBehaviour.KillAndCancelAwait, cts.Token);
//tween.SetRecyclable(true);
Debug.Log("END");
// tween.Play();
// DOTween.
// DOVirtual.Float(0, 1, 1, x => { }).ToUniTask();
okButton.OnClickAsAsyncEnumerable().ForEachAsync(_ =>
{ {
UnityEngine.Debug.Log("In MainThread?" + Thread.CurrentThread.ManagedThreadId);
UnityEngine.Debug.Log("SyncContext is null?" + (SynchronizationContext.Current == null));
await UniTask.SwitchToThreadPool();
UnityEngine.Debug.Log("In ThreadPool?" + Thread.CurrentThread.ManagedThreadId);
UnityEngine.Debug.Log("SyncContext is null?" + (SynchronizationContext.Current == null));
}
finally
{
await d.DisposeAsync();
}
UnityEngine.Debug.Log("In ThreadPool?" + Thread.CurrentThread.ManagedThreadId);
UnityEngine.Debug.Log("SyncContext is null2" + (SynchronizationContext.Current == null));
}
private void Update()
{
// UnityEngine.Debug.Log("Update:" + PlayerLoopInfo.CurrentLoopType);
}
IEnumerator Coroutine()
{
}).Forget(); try
{
CloseAsync(this.GetCancellationTokenOnDestroy()).Forget(); while (true)
{
okButton.onClick.AddListener(UniTask.UnityAction(async () => await UniTask.Yield())); yield return null;
//UnityEngine.Debug.Log("Coroutine null:" + PlayerLoopInfo.CurrentLoopType);
}
}
finally
{
UnityEngine.Debug.Log("Coroutine Finally");
}
} }
async UniTaskVoid CloseAsync(CancellationToken cancellationToken = default) async UniTaskVoid CloseAsync(CancellationToken cancellationToken = default)
@ -597,7 +688,7 @@ public class SandboxMain : MonoBehaviour
} }
} }
public class ShowPlayerLoop public class PlayerLoopInfo
{ {
// [RuntimeInitializeOnLoadMethod(RuntimeInitializeLoadType.SubsystemRegistration)] // [RuntimeInitializeOnLoadMethod(RuntimeInitializeLoadType.SubsystemRegistration)]
static void Init() static void Init()
@ -628,4 +719,45 @@ public class ShowPlayerLoop
UnityEngine.Debug.Log(sb.ToString()); UnityEngine.Debug.Log(sb.ToString());
} }
public static Type CurrentLoopType { get; private set; }
public static void Inject()
{
var system = PlayerLoop.GetCurrentPlayerLoop();
for (int i = 0; i < system.subSystemList.Length; i++)
{
var loop = system.subSystemList[i].subSystemList.SelectMany(x =>
{
var t = typeof(WrapLoop<>).MakeGenericType(x.type);
var instance = (ILoopRunner)Activator.CreateInstance(t, x.type);
return new[] { new PlayerLoopSystem { type = t, updateDelegate = instance.Run }, x };
}).ToArray();
system.subSystemList[i].subSystemList = loop;
}
PlayerLoop.SetPlayerLoop(system);
}
interface ILoopRunner
{
void Run();
}
class WrapLoop<T> : ILoopRunner
{
readonly Type type;
public WrapLoop(Type type)
{
this.type = type;
}
public void Run()
{
CurrentLoopType = type;
}
}
} }