Improve pooling mechanism

master
neuecc 2020-05-29 01:22:46 +09:00
parent 2290b14532
commit a8455af16d
7 changed files with 294 additions and 76 deletions

View File

@ -29,7 +29,12 @@ public class AllocationCheck
{ {
for (int i = 0; i < InnerOps; i++) for (int i = 0; i < InnerOps; i++)
{ {
await Core(); var a = Core();
var b = Core();
var c = Core();
await a;
await b;
await c;
} }
static async UniTask Core() static async UniTask Core()
@ -46,7 +51,12 @@ public class AllocationCheck
var sum = 0; var sum = 0;
for (int i = 0; i < InnerOps; i++) for (int i = 0; i < InnerOps; i++)
{ {
sum += await Core(); var a = Core();
var b = Core();
var c = Core();
sum += await a;
sum += await b;
sum += await c;
} }
return sum; return sum;
@ -59,14 +69,16 @@ public class AllocationCheck
} }
} }
[Benchmark(OperationsPerInvoke = InnerOps)] //[Benchmark(OperationsPerInvoke = InnerOps)]
public Task ViaUniTaskVoid() //[Benchmark]
public void ViaUniTaskVoid()
{ {
for (int i = 0; i < InnerOps; i++) for (int i = 0; i < InnerOps; i++)
{ {
Core().Forget(); Core().Forget();
Core().Forget();
Core().Forget();
} }
return Task.CompletedTask;
static async UniTaskVoid Core() static async UniTaskVoid Core()
{ {
@ -75,6 +87,46 @@ public class AllocationCheck
await new TestAwaiter(false, UniTaskStatus.Succeeded); await new TestAwaiter(false, UniTaskStatus.Succeeded);
} }
} }
struct Foo : IAsyncStateMachine
{
public AsyncUniTaskVoidMethodBuilder builder;
public TestAwaiter awaiter;
public TestAwaiter awaiterawaiter;
public int state;
public void MoveNext()
{
switch (state)
{
case -1:
awaiterawaiter = awaiter.GetAwaiter();
if (awaiterawaiter.IsCompleted)
{
goto case 0;
}
else
{
state = 0;
builder.AwaitUnsafeOnCompleted(ref awaiterawaiter, ref this);
return;
}
case 0:
default:
goto END;
}
END:
builder.SetResult();
}
public void SetStateMachine(IAsyncStateMachine stateMachine)
{
}
}
} }
public class TaskTestException : Exception public class TaskTestException : Exception
@ -170,7 +222,15 @@ public struct TestAwaiter<T> : ICriticalNotifyCompletion
public sealed class ThreadPoolWorkItem : IThreadPoolWorkItem public sealed class ThreadPoolWorkItem : IThreadPoolWorkItem
{ {
static readonly ConcurrentQueue<ThreadPoolWorkItem> pool = new ConcurrentQueue<ThreadPoolWorkItem>(); public static readonly ConcurrentQueue<ThreadPoolWorkItem> pool = new ConcurrentQueue<ThreadPoolWorkItem>();
public static void CreatePoolItems(int count)
{
for (int i = 0; i < count; i++)
{
pool.Enqueue(new ThreadPoolWorkItem());
}
}
Action continuation; Action continuation;

View File

@ -37,7 +37,7 @@ public class BenchmarkConfig : ManualConfig
public BenchmarkConfig() public BenchmarkConfig()
{ {
AddDiagnoser(MemoryDiagnoser.Default); AddDiagnoser(MemoryDiagnoser.Default);
AddJob(Job.ShortRun.WithLaunchCount(1).WithIterationCount(1).WithWarmupCount(1)); AddJob(Job.ShortRun.WithLaunchCount(1).WithIterationCount(1).WithWarmupCount(1)/*.RunOncePerIteration()*/);
} }
} }

View File

@ -198,12 +198,15 @@ namespace NetCoreSandbox
static async Task Main(string[] args) static async Task Main(string[] args)
{ {
#if !DEBUG #if !DEBUG
//await new AllocationCheck().ViaUniTaskVoid();
//Console.ReadLine();
BenchmarkDotNet.Running.BenchmarkSwitcher.FromAssembly(typeof(Program).Assembly).Run(args); BenchmarkDotNet.Running.BenchmarkSwitcher.FromAssembly(typeof(Program).Assembly).Run(args);
//await new ComparisonBenchmarks().ViaUniTaskT(); //await new ComparisonBenchmarks().ViaUniTaskT();
return; return;
#endif #endif
// await new AllocationCheck().ViaUniTaskVoid();
// AsyncTest().Forget(); // AsyncTest().Forget();
@ -212,24 +215,43 @@ namespace NetCoreSandbox
// AsyncTest().Forget(); // AsyncTest().Forget();
ThreadPool.SetMinThreads(100, 100);
List<UniTask<int>> list = new List<UniTask<int>>();
for (int i = 0; i < 321; i++)
{
list.Add(AsyncTest());
}
//await UniTask.WhenAll(list);
Console.WriteLine("TOGO");
var a = await AsyncTest();
var b = AsyncTest();
var c = AsyncTest();
await b;
await c;
foreach (var item in Cysharp.Threading.Tasks.Internal.StackNodeMonitor.GetCacheSizeInfo())
{
Console.WriteLine(item);
}
await UniTask.Yield();
Console.ReadLine(); Console.ReadLine();
} }
#pragma warning disable CS1998 #pragma warning disable CS1998
static async UniTaskVoid AsyncTest() static async UniTask<int> AsyncTest()
{ {
// empty // empty
await new TestAwaiter(false, UniTaskStatus.Succeeded); await new TestAwaiter(false, UniTaskStatus.Succeeded);
await new TestAwaiter(true, UniTaskStatus.Succeeded); await new TestAwaiter(true, UniTaskStatus.Succeeded);
await new TestAwaiter(false, UniTaskStatus.Succeeded); await new TestAwaiter(false, UniTaskStatus.Succeeded);
return 10;
Console.WriteLine("foo");
//return 10;
} }

View File

@ -68,7 +68,7 @@ namespace Cysharp.Threading.Tasks.CompilerServices
MoveNextRunner<TStateMachine>.SetStateMachine(ref this, ref stateMachine); MoveNextRunner<TStateMachine>.SetStateMachine(ref this, ref stateMachine);
} }
awaiter.OnCompleted(runner.CallMoveNext); awaiter.OnCompleted(runner.MoveNext);
} }
// 6. AwaitUnsafeOnCompleted // 6. AwaitUnsafeOnCompleted
@ -83,7 +83,7 @@ namespace Cysharp.Threading.Tasks.CompilerServices
MoveNextRunner<TStateMachine>.SetStateMachine(ref this, ref stateMachine); MoveNextRunner<TStateMachine>.SetStateMachine(ref this, ref stateMachine);
} }
awaiter.UnsafeOnCompleted(runner.CallMoveNext); awaiter.UnsafeOnCompleted(runner.MoveNext);
} }
// 7. Start // 7. Start

View File

@ -1,17 +1,15 @@
 #pragma warning disable CS1591 // Missing XML comment for publicly visible type or member
#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member
using Cysharp.Threading.Tasks.Internal;
using System; using System;
using System.Diagnostics; using System.Diagnostics;
using System.Runtime.CompilerServices; using System.Runtime.CompilerServices;
using System.Threading;
using Cysharp.Threading.Tasks.Internal;
namespace Cysharp.Threading.Tasks.CompilerServices namespace Cysharp.Threading.Tasks.CompilerServices
{ {
internal interface IMoveNextRunner public interface IMoveNextRunner
{ {
Action CallMoveNext { get; } Action MoveNext { get; }
void Return(); void Return();
} }
@ -31,25 +29,21 @@ namespace Cysharp.Threading.Tasks.CompilerServices
void SetException(Exception exception); void SetException(Exception exception);
} }
internal sealed class MoveNextRunner<TStateMachine> : IMoveNextRunner, IPromisePoolItem internal sealed class MoveNextRunner<TStateMachine> : IMoveNextRunner, IStackNode<MoveNextRunner<TStateMachine>>
where TStateMachine : IAsyncStateMachine where TStateMachine : IAsyncStateMachine
{ {
static PromisePool<MoveNextRunner<TStateMachine>> pool = new PromisePool<MoveNextRunner<TStateMachine>>();
TStateMachine stateMachine; TStateMachine stateMachine;
readonly Action callMoveNext;
public Action CallMoveNext => callMoveNext; public Action MoveNext { get; }
MoveNextRunner() public MoveNextRunner()
{ {
callMoveNext = Run; MoveNext = Run;
} }
public static void SetStateMachine(ref AsyncUniTaskVoidMethodBuilder builder, ref TStateMachine stateMachine) public static void SetStateMachine(ref AsyncUniTaskVoidMethodBuilder builder, ref TStateMachine stateMachine)
{ {
var result = pool.TryRent(); if (!StackNodeHelper.TryPop(ref cacheLock, ref size, ref nodeRoot, out var result))
if (result == null)
{ {
result = new MoveNextRunner<TStateMachine>(); result = new MoveNextRunner<TStateMachine>();
} }
@ -58,29 +52,34 @@ namespace Cysharp.Threading.Tasks.CompilerServices
result.stateMachine = stateMachine; // copy struct StateMachine(in release build). result.stateMachine = stateMachine; // copy struct StateMachine(in release build).
} }
static int cacheLock;
static int size;
static MoveNextRunner<TStateMachine> nodeRoot;
static MoveNextRunner()
{
StackNodeMonitor.RegisterSizeGettter(typeof(MoveNextRunner<TStateMachine>), () => size);
}
public MoveNextRunner<TStateMachine> NextNode { get; set; }
public void Return()
{
stateMachine = default;
StackNodeHelper.TryPush(ref cacheLock, ref size, ref nodeRoot, this);
}
[DebuggerHidden] [DebuggerHidden]
[MethodImpl(MethodImplOptions.AggressiveInlining)] [MethodImpl(MethodImplOptions.AggressiveInlining)]
void Run() void Run()
{ {
stateMachine.MoveNext(); stateMachine.MoveNext();
} }
public void Return()
{
pool.TryReturn(this);
} }
void IPromisePoolItem.Reset() internal class MoveNextRunnerPromise<TStateMachine> : IMoveNextRunnerPromise, IUniTaskSource, IStackNode<MoveNextRunnerPromise<TStateMachine>>
{
stateMachine = default;
}
}
internal class MoveNextRunnerPromise<TStateMachine> : IMoveNextRunnerPromise, IUniTaskSource, IPromisePoolItem
where TStateMachine : IAsyncStateMachine where TStateMachine : IAsyncStateMachine
{ {
static readonly PromisePool<MoveNextRunnerPromise<TStateMachine>> pool = new PromisePool<MoveNextRunnerPromise<TStateMachine>>();
TStateMachine stateMachine; TStateMachine stateMachine;
public Action MoveNext { get; } public Action MoveNext { get; }
@ -94,18 +93,35 @@ namespace Cysharp.Threading.Tasks.CompilerServices
public static void SetStateMachine(ref AsyncUniTaskMethodBuilder builder, ref TStateMachine stateMachine) public static void SetStateMachine(ref AsyncUniTaskMethodBuilder builder, ref TStateMachine stateMachine)
{ {
var result = pool.TryRent(); if (!StackNodeHelper.TryPop(ref cacheLock, ref size, ref nodeRoot, out var result))
if (result == null)
{ {
result = new MoveNextRunnerPromise<TStateMachine>(); result = new MoveNextRunnerPromise<TStateMachine>();
} }
TaskTracker.TrackActiveTask(result, 3);
TaskTracker.TrackActiveTask(result, 2);
builder.runnerPromise = result; // set runner before copied. builder.runnerPromise = result; // set runner before copied.
result.stateMachine = stateMachine; // copy struct StateMachine(in release build). result.stateMachine = stateMachine; // copy struct StateMachine(in release build).
} }
static int cacheLock;
static int size;
static MoveNextRunnerPromise<TStateMachine> nodeRoot;
public MoveNextRunnerPromise<TStateMachine> NextNode { get; set; }
static MoveNextRunnerPromise()
{
StackNodeMonitor.RegisterSizeGettter(typeof(MoveNextRunnerPromise<TStateMachine>), () => size);
}
bool TryReturn()
{
TaskTracker.RemoveTracking(this);
core.Reset();
stateMachine = default;
return StackNodeHelper.TryPush(ref cacheLock, ref size, ref nodeRoot, this);
}
[DebuggerHidden] [DebuggerHidden]
[MethodImpl(MethodImplOptions.AggressiveInlining)] [MethodImpl(MethodImplOptions.AggressiveInlining)]
void Run() void Run()
@ -139,12 +155,11 @@ namespace Cysharp.Threading.Tasks.CompilerServices
{ {
try try
{ {
TaskTracker.RemoveTracking(this);
core.GetResult(token); core.GetResult(token);
} }
finally finally
{ {
pool.TryReturn(this); TryReturn();
} }
} }
@ -166,27 +181,18 @@ namespace Cysharp.Threading.Tasks.CompilerServices
core.OnCompleted(continuation, state, token); core.OnCompleted(continuation, state, token);
} }
[DebuggerHidden]
void IPromisePoolItem.Reset()
{
stateMachine = default;
core.Reset();
}
~MoveNextRunnerPromise() ~MoveNextRunnerPromise()
{ {
if (pool.TryReturn(this)) if (TryReturn())
{ {
GC.ReRegisterForFinalize(this); GC.ReRegisterForFinalize(this);
} }
} }
} }
internal class MoveNextRunnerPromise<TStateMachine, T> : IMoveNextRunnerPromise<T>, IUniTaskSource<T>, IPromisePoolItem internal class MoveNextRunnerPromise<TStateMachine, T> : IMoveNextRunnerPromise<T>, IUniTaskSource<T>, IStackNode<MoveNextRunnerPromise<TStateMachine, T>>
where TStateMachine : IAsyncStateMachine where TStateMachine : IAsyncStateMachine
{ {
static readonly PromisePool<MoveNextRunnerPromise<TStateMachine, T>> pool = new PromisePool<MoveNextRunnerPromise<TStateMachine, T>>();
TStateMachine stateMachine; TStateMachine stateMachine;
public Action MoveNext { get; } public Action MoveNext { get; }
@ -200,18 +206,35 @@ namespace Cysharp.Threading.Tasks.CompilerServices
public static void SetStateMachine(ref AsyncUniTaskMethodBuilder<T> builder, ref TStateMachine stateMachine) public static void SetStateMachine(ref AsyncUniTaskMethodBuilder<T> builder, ref TStateMachine stateMachine)
{ {
var result = pool.TryRent(); if (!StackNodeHelper.TryPop(ref cacheLock, ref size, ref nodeRoot, out var result))
if (result == null)
{ {
result = new MoveNextRunnerPromise<TStateMachine, T>(); result = new MoveNextRunnerPromise<TStateMachine, T>();
} }
TaskTracker.TrackActiveTask(result, 3);
TaskTracker.TrackActiveTask(result, 2);
builder.runnerPromise = result; // set runner before copied. builder.runnerPromise = result; // set runner before copied.
result.stateMachine = stateMachine; // copy struct StateMachine(in release build). result.stateMachine = stateMachine; // copy struct StateMachine(in release build).
} }
static int cacheLock;
static int size;
static MoveNextRunnerPromise<TStateMachine, T> nodeRoot;
public MoveNextRunnerPromise<TStateMachine, T> NextNode { get; set; }
static MoveNextRunnerPromise()
{
StackNodeMonitor.RegisterSizeGettter(typeof(MoveNextRunnerPromise<TStateMachine, T>), () => size);
}
bool TryReturn()
{
TaskTracker.RemoveTracking(this);
core.Reset();
stateMachine = default;
return StackNodeHelper.TryPush(ref cacheLock, ref size, ref nodeRoot, this);
}
[DebuggerHidden] [DebuggerHidden]
[MethodImpl(MethodImplOptions.AggressiveInlining)] [MethodImpl(MethodImplOptions.AggressiveInlining)]
void Run() void Run()
@ -245,12 +268,11 @@ namespace Cysharp.Threading.Tasks.CompilerServices
{ {
try try
{ {
TaskTracker.RemoveTracking(this);
return core.GetResult(token); return core.GetResult(token);
} }
finally finally
{ {
pool.TryReturn(this); TryReturn();
} }
} }
@ -278,16 +300,9 @@ namespace Cysharp.Threading.Tasks.CompilerServices
core.OnCompleted(continuation, state, token); core.OnCompleted(continuation, state, token);
} }
[DebuggerHidden]
void IPromisePoolItem.Reset()
{
stateMachine = default;
core.Reset();
}
~MoveNextRunnerPromise() ~MoveNextRunnerPromise()
{ {
if (pool.TryReturn(this)) if (TryReturn())
{ {
GC.ReRegisterForFinalize(this); GC.ReRegisterForFinalize(this);
} }

View File

@ -0,0 +1,110 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
namespace Cysharp.Threading.Tasks.Internal
{
// internaly used but public, allow to user create custom operator with pooling.
public interface IStackNode<T>
{
T NextNode { get; set; }
}
public static class StackNodeHelper
{
static int MaxPoolSize;
static StackNodeHelper()
{
try
{
var value = Environment.GetEnvironmentVariable("UNITASK_MAX_POOLSIZE");
if (value != null)
{
if (int.TryParse(value, out var size))
{
MaxPoolSize = size;
return;
}
}
}
catch { }
MaxPoolSize = int.MaxValue;
}
public static void SetMaxPoolSize(int maxPoolSize)
{
MaxPoolSize = maxPoolSize;
}
// Strictness as a Stack is not required.
// If there is a conflict, it will go through as is.
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static bool TryPop<T>(ref int stackLock, ref int size, ref T root, out T result)
where T : class, IStackNode<T>
{
if (Interlocked.CompareExchange(ref stackLock, 1, 0) == 0)
{
var v = root;
if (!(v is null))
{
root = v.NextNode;
v.NextNode = null;
size--;
result = v;
Volatile.Write(ref stackLock, 0);
return true;
}
Volatile.Write(ref stackLock, 0);
}
result = default;
return false;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static bool TryPush<T>(ref int stackLock, ref int size, ref T root, T item)
where T : class, IStackNode<T>
{
if (Interlocked.CompareExchange(ref stackLock, 1, 0) == 0)
{
if (size < MaxPoolSize)
{
item.NextNode = root;
root = item;
size++;
Volatile.Write(ref stackLock, 0);
return true;
}
else
{
Volatile.Write(ref stackLock, 0);
}
}
return false;
}
}
public static class StackNodeMonitor
{
static ConcurrentDictionary<Type, Func<int>> sizes = new ConcurrentDictionary<Type, Func<int>>();
public static IEnumerable<(Type, int)> GetCacheSizeInfo()
{
foreach (var item in sizes)
{
yield return (item.Key, item.Value());
}
}
public static void RegisterSizeGettter(Type type, Func<int> getSize)
{
sizes[type] = getSize;
}
}
}

View File

@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 19f4e6575150765449cc99f25f06f25f
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant: