rename StackNode to TaskPool

master
neuecc 2020-05-29 02:08:11 +09:00
parent a8455af16d
commit d5db96b913
6 changed files with 90 additions and 63 deletions

View File

@ -234,7 +234,7 @@ namespace NetCoreSandbox
await c; await c;
foreach (var item in Cysharp.Threading.Tasks.Internal.StackNodeMonitor.GetCacheSizeInfo()) foreach (var item in Cysharp.Threading.Tasks.Internal.TaskPoolMonitor.GetCacheSizeInfo())
{ {
Console.WriteLine(item); Console.WriteLine(item);
} }

View File

@ -29,9 +29,11 @@ namespace Cysharp.Threading.Tasks.CompilerServices
void SetException(Exception exception); void SetException(Exception exception);
} }
internal sealed class MoveNextRunner<TStateMachine> : IMoveNextRunner, IStackNode<MoveNextRunner<TStateMachine>> internal sealed class MoveNextRunner<TStateMachine> : IMoveNextRunner, ITaskPoolNode<MoveNextRunner<TStateMachine>>
where TStateMachine : IAsyncStateMachine where TStateMachine : IAsyncStateMachine
{ {
static TaskPool<MoveNextRunner<TStateMachine>> pool;
TStateMachine stateMachine; TStateMachine stateMachine;
public Action MoveNext { get; } public Action MoveNext { get; }
@ -43,7 +45,7 @@ namespace Cysharp.Threading.Tasks.CompilerServices
public static void SetStateMachine(ref AsyncUniTaskVoidMethodBuilder builder, ref TStateMachine stateMachine) public static void SetStateMachine(ref AsyncUniTaskVoidMethodBuilder builder, ref TStateMachine stateMachine)
{ {
if (!StackNodeHelper.TryPop(ref cacheLock, ref size, ref nodeRoot, out var result)) if (!pool.TryPop(out var result))
{ {
result = new MoveNextRunner<TStateMachine>(); result = new MoveNextRunner<TStateMachine>();
} }
@ -52,13 +54,9 @@ namespace Cysharp.Threading.Tasks.CompilerServices
result.stateMachine = stateMachine; // copy struct StateMachine(in release build). result.stateMachine = stateMachine; // copy struct StateMachine(in release build).
} }
static int cacheLock;
static int size;
static MoveNextRunner<TStateMachine> nodeRoot;
static MoveNextRunner() static MoveNextRunner()
{ {
StackNodeMonitor.RegisterSizeGettter(typeof(MoveNextRunner<TStateMachine>), () => size); TaskPoolMonitor.RegisterSizeGettter(typeof(MoveNextRunner<TStateMachine>), () => pool.Size);
} }
public MoveNextRunner<TStateMachine> NextNode { get; set; } public MoveNextRunner<TStateMachine> NextNode { get; set; }
@ -66,7 +64,7 @@ namespace Cysharp.Threading.Tasks.CompilerServices
public void Return() public void Return()
{ {
stateMachine = default; stateMachine = default;
StackNodeHelper.TryPush(ref cacheLock, ref size, ref nodeRoot, this); pool.TryPush(this);
} }
[DebuggerHidden] [DebuggerHidden]
@ -77,9 +75,11 @@ namespace Cysharp.Threading.Tasks.CompilerServices
} }
} }
internal class MoveNextRunnerPromise<TStateMachine> : IMoveNextRunnerPromise, IUniTaskSource, IStackNode<MoveNextRunnerPromise<TStateMachine>> internal class MoveNextRunnerPromise<TStateMachine> : IMoveNextRunnerPromise, IUniTaskSource, ITaskPoolNode<MoveNextRunnerPromise<TStateMachine>>
where TStateMachine : IAsyncStateMachine where TStateMachine : IAsyncStateMachine
{ {
static TaskPool<MoveNextRunnerPromise<TStateMachine>> pool;
TStateMachine stateMachine; TStateMachine stateMachine;
public Action MoveNext { get; } public Action MoveNext { get; }
@ -93,7 +93,7 @@ namespace Cysharp.Threading.Tasks.CompilerServices
public static void SetStateMachine(ref AsyncUniTaskMethodBuilder builder, ref TStateMachine stateMachine) public static void SetStateMachine(ref AsyncUniTaskMethodBuilder builder, ref TStateMachine stateMachine)
{ {
if (!StackNodeHelper.TryPop(ref cacheLock, ref size, ref nodeRoot, out var result)) if (!pool.TryPop(out var result))
{ {
result = new MoveNextRunnerPromise<TStateMachine>(); result = new MoveNextRunnerPromise<TStateMachine>();
} }
@ -103,15 +103,11 @@ namespace Cysharp.Threading.Tasks.CompilerServices
result.stateMachine = stateMachine; // copy struct StateMachine(in release build). result.stateMachine = stateMachine; // copy struct StateMachine(in release build).
} }
static int cacheLock;
static int size;
static MoveNextRunnerPromise<TStateMachine> nodeRoot;
public MoveNextRunnerPromise<TStateMachine> NextNode { get; set; } public MoveNextRunnerPromise<TStateMachine> NextNode { get; set; }
static MoveNextRunnerPromise() static MoveNextRunnerPromise()
{ {
StackNodeMonitor.RegisterSizeGettter(typeof(MoveNextRunnerPromise<TStateMachine>), () => size); TaskPoolMonitor.RegisterSizeGettter(typeof(MoveNextRunnerPromise<TStateMachine>), () => pool.Size);
} }
bool TryReturn() bool TryReturn()
@ -119,7 +115,7 @@ namespace Cysharp.Threading.Tasks.CompilerServices
TaskTracker.RemoveTracking(this); TaskTracker.RemoveTracking(this);
core.Reset(); core.Reset();
stateMachine = default; stateMachine = default;
return StackNodeHelper.TryPush(ref cacheLock, ref size, ref nodeRoot, this); return pool.TryPush(this);
} }
[DebuggerHidden] [DebuggerHidden]
@ -190,9 +186,11 @@ namespace Cysharp.Threading.Tasks.CompilerServices
} }
} }
internal class MoveNextRunnerPromise<TStateMachine, T> : IMoveNextRunnerPromise<T>, IUniTaskSource<T>, IStackNode<MoveNextRunnerPromise<TStateMachine, T>> internal class MoveNextRunnerPromise<TStateMachine, T> : IMoveNextRunnerPromise<T>, IUniTaskSource<T>, ITaskPoolNode<MoveNextRunnerPromise<TStateMachine, T>>
where TStateMachine : IAsyncStateMachine where TStateMachine : IAsyncStateMachine
{ {
static TaskPool<MoveNextRunnerPromise<TStateMachine, T>> pool;
TStateMachine stateMachine; TStateMachine stateMachine;
public Action MoveNext { get; } public Action MoveNext { get; }
@ -206,7 +204,7 @@ namespace Cysharp.Threading.Tasks.CompilerServices
public static void SetStateMachine(ref AsyncUniTaskMethodBuilder<T> builder, ref TStateMachine stateMachine) public static void SetStateMachine(ref AsyncUniTaskMethodBuilder<T> builder, ref TStateMachine stateMachine)
{ {
if (!StackNodeHelper.TryPop(ref cacheLock, ref size, ref nodeRoot, out var result)) if (!pool.TryPop(out var result))
{ {
result = new MoveNextRunnerPromise<TStateMachine, T>(); result = new MoveNextRunnerPromise<TStateMachine, T>();
} }
@ -216,15 +214,11 @@ namespace Cysharp.Threading.Tasks.CompilerServices
result.stateMachine = stateMachine; // copy struct StateMachine(in release build). result.stateMachine = stateMachine; // copy struct StateMachine(in release build).
} }
static int cacheLock;
static int size;
static MoveNextRunnerPromise<TStateMachine, T> nodeRoot;
public MoveNextRunnerPromise<TStateMachine, T> NextNode { get; set; } public MoveNextRunnerPromise<TStateMachine, T> NextNode { get; set; }
static MoveNextRunnerPromise() static MoveNextRunnerPromise()
{ {
StackNodeMonitor.RegisterSizeGettter(typeof(MoveNextRunnerPromise<TStateMachine, T>), () => size); TaskPoolMonitor.RegisterSizeGettter(typeof(MoveNextRunnerPromise<TStateMachine, T>), () => pool.Size);
} }
bool TryReturn() bool TryReturn()
@ -232,7 +226,7 @@ namespace Cysharp.Threading.Tasks.CompilerServices
TaskTracker.RemoveTracking(this); TaskTracker.RemoveTracking(this);
core.Reset(); core.Reset();
stateMachine = default; stateMachine = default;
return StackNodeHelper.TryPush(ref cacheLock, ref size, ref nodeRoot, this); return pool.TryPush(this);
} }
[DebuggerHidden] [DebuggerHidden]

View File

@ -30,9 +30,34 @@ namespace Cysharp.Threading.Tasks
return new UniTask(EnumeratorPromise.Create(enumerator, timing, cancellationToken, out var token), token); return new UniTask(EnumeratorPromise.Create(enumerator, timing, cancellationToken, out var token), token);
} }
class EnumeratorPromise : IUniTaskSource, IPlayerLoopItem, IPromisePoolItem class EnumeratorPromise : IUniTaskSource, IPlayerLoopItem, ITaskPoolNode<EnumeratorPromise>
{ {
static readonly PromisePool<EnumeratorPromise> pool = new PromisePool<EnumeratorPromise>(); static TaskPool<EnumeratorPromise> pool;
public EnumeratorPromise NextNode { get; set; }
static EnumeratorPromise()
{
TaskPoolMonitor.RegisterSizeGettter(typeof(EnumeratorPromise), () => pool.Size);
}
static EnumeratorPromise Create()
{
if (!pool.TryPop(out var result))
{
result = new EnumeratorPromise();
}
TaskTracker.TrackActiveTask(result, 4);
return result;
}
bool TryReturn()
{
TaskTracker.RemoveTracking(this);
core.Reset();
innerEnumerator = default;
cancellationToken = default;
return pool.TryPush(this);
}
IEnumerator innerEnumerator; IEnumerator innerEnumerator;
CancellationToken cancellationToken; CancellationToken cancellationToken;
@ -50,13 +75,11 @@ namespace Cysharp.Threading.Tasks
return AutoResetUniTaskCompletionSource.CreateFromCanceled(cancellationToken, out token); return AutoResetUniTaskCompletionSource.CreateFromCanceled(cancellationToken, out token);
} }
var result = pool.TryRent() ?? new EnumeratorPromise(); var result = Create();
result.innerEnumerator = ConsumeEnumerator(innerEnumerator); result.innerEnumerator = ConsumeEnumerator(innerEnumerator);
result.cancellationToken = cancellationToken; result.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(result, 3);
PlayerLoopHelper.AddAction(timing, result); PlayerLoopHelper.AddAction(timing, result);
token = result.core.Version; token = result.core.Version;
@ -67,12 +90,11 @@ namespace Cysharp.Threading.Tasks
{ {
try try
{ {
TaskTracker.RemoveTracking(this);
core.GetResult(token); core.GetResult(token);
} }
finally finally
{ {
pool.TryReturn(this); TryReturn();
} }
} }
@ -125,7 +147,7 @@ namespace Cysharp.Threading.Tasks
~EnumeratorPromise() ~EnumeratorPromise()
{ {
if (pool.TryReturn(this)) if (TryReturn())
{ {
GC.ReRegisterForFinalize(this); GC.ReRegisterForFinalize(this);
} }

View File

@ -1,18 +1,22 @@
using System; using System;
using System.Collections.Concurrent;
using System.Runtime.CompilerServices; using System.Runtime.CompilerServices;
namespace Cysharp.Threading.Tasks.Internal namespace Cysharp.Threading.Tasks.Internal
{ {
internal sealed class PooledDelegate<T> internal sealed class PooledDelegate<T> : ITaskPoolNode<PooledDelegate<T>>
{ {
static readonly ConcurrentQueue<PooledDelegate<T>> pool = new ConcurrentQueue<PooledDelegate<T>>(); static TaskPool<PooledDelegate<T>> pool;
public PooledDelegate<T> NextNode { get; set; }
static PooledDelegate()
{
TaskPoolMonitor.RegisterSizeGettter(typeof(PooledDelegate<T>), () => pool.Size);
}
readonly Action<T> runDelegate; readonly Action<T> runDelegate;
Action continuation; Action continuation;
PooledDelegate() PooledDelegate()
{ {
runDelegate = Run; runDelegate = Run;
@ -21,7 +25,7 @@ namespace Cysharp.Threading.Tasks.Internal
[MethodImpl(MethodImplOptions.AggressiveInlining)] [MethodImpl(MethodImplOptions.AggressiveInlining)]
public static Action<T> Create(Action continuation) public static Action<T> Create(Action continuation)
{ {
if (!pool.TryDequeue(out var item)) if (!pool.TryPop(out var item))
{ {
item = new PooledDelegate<T>(); item = new PooledDelegate<T>();
} }
@ -37,7 +41,7 @@ namespace Cysharp.Threading.Tasks.Internal
continuation = null; continuation = null;
if (call != null) if (call != null)
{ {
pool.Enqueue(this); pool.TryPush(this);
call.Invoke(); call.Invoke();
} }
} }

View File

@ -8,16 +8,11 @@ namespace Cysharp.Threading.Tasks.Internal
{ {
// internaly used but public, allow to user create custom operator with pooling. // internaly used but public, allow to user create custom operator with pooling.
public interface IStackNode<T> public static class TaskPool
{ {
T NextNode { get; set; } internal static int MaxPoolSize;
}
public static class StackNodeHelper static TaskPool()
{
static int MaxPoolSize;
static StackNodeHelper()
{ {
try try
{ {
@ -40,15 +35,28 @@ namespace Cysharp.Threading.Tasks.Internal
{ {
MaxPoolSize = maxPoolSize; MaxPoolSize = maxPoolSize;
} }
}
// Strictness as a Stack is not required.
// If there is a conflict, it will go through as is. public interface ITaskPoolNode<T>
{
T NextNode { get; set; }
}
// mutable struct, don't mark readonly.
public struct TaskPool<T>
where T : class, ITaskPoolNode<T>
{
int gate;
int size;
T root;
public int Size => size;
[MethodImpl(MethodImplOptions.AggressiveInlining)] [MethodImpl(MethodImplOptions.AggressiveInlining)]
public static bool TryPop<T>(ref int stackLock, ref int size, ref T root, out T result) public bool TryPop(out T result)
where T : class, IStackNode<T>
{ {
if (Interlocked.CompareExchange(ref stackLock, 1, 0) == 0) if (Interlocked.CompareExchange(ref gate, 1, 0) == 0)
{ {
var v = root; var v = root;
if (!(v is null)) if (!(v is null))
@ -57,40 +65,39 @@ namespace Cysharp.Threading.Tasks.Internal
v.NextNode = null; v.NextNode = null;
size--; size--;
result = v; result = v;
Volatile.Write(ref stackLock, 0); Volatile.Write(ref gate, 0);
return true; return true;
} }
Volatile.Write(ref stackLock, 0); Volatile.Write(ref gate, 0);
} }
result = default; result = default;
return false; return false;
} }
[MethodImpl(MethodImplOptions.AggressiveInlining)] [MethodImpl(MethodImplOptions.AggressiveInlining)]
public static bool TryPush<T>(ref int stackLock, ref int size, ref T root, T item) public bool TryPush(T item)
where T : class, IStackNode<T>
{ {
if (Interlocked.CompareExchange(ref stackLock, 1, 0) == 0) if (Interlocked.CompareExchange(ref gate, 1, 0) == 0)
{ {
if (size < MaxPoolSize) if (size < TaskPool.MaxPoolSize)
{ {
item.NextNode = root; item.NextNode = root;
root = item; root = item;
size++; size++;
Volatile.Write(ref stackLock, 0); Volatile.Write(ref gate, 0);
return true; return true;
} }
else else
{ {
Volatile.Write(ref stackLock, 0); Volatile.Write(ref gate, 0);
} }
} }
return false; return false;
} }
} }
public static class StackNodeMonitor public static class TaskPoolMonitor
{ {
static ConcurrentDictionary<Type, Func<int>> sizes = new ConcurrentDictionary<Type, Func<int>>(); static ConcurrentDictionary<Type, Func<int>> sizes = new ConcurrentDictionary<Type, Func<int>>();