Fix: HTTP/Stdio transport routing and middleware session persistence (#422)

* Fix: HTTP/Stdio transport routing and middleware session persistence

- Ensure session persistence for stdio transport by using stable client_id/global fallback.

- Fix Nagle algorithm latency issues by setting TCP_NODELAY.

- Cleanup duplicate logging and imports.

- Resolve C# NullReferenceException in EditorWindow health check.

* Fix: thread-safe middleware singleton and re-enable repo stats schedule
main
dsarno 2025-12-03 17:15:42 -08:00 committed by GitHub
parent eb9327c06a
commit d93e437014
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 151 additions and 45 deletions

View File

@ -214,11 +214,21 @@ namespace MCPForUnity.Editor.Windows
{ {
EditorApplication.delayCall += async () => EditorApplication.delayCall += async () =>
{ {
// Ensure window and components are still valid before execution
if (this == null || connectionSection == null) if (this == null || connectionSection == null)
{ {
return; return;
} }
await connectionSection.VerifyBridgeConnectionAsync();
try
{
await connectionSection.VerifyBridgeConnectionAsync();
}
catch (Exception ex)
{
// Log but don't crash if verification fails during cleanup
McpLog.Warn($"Health check verification failed: {ex.Message}");
}
}; };
} }
} }

View File

@ -22,7 +22,10 @@ from services.resources import register_all_resources
from core.telemetry import record_milestone, record_telemetry, MilestoneType, RecordType, get_package_version from core.telemetry import record_milestone, record_telemetry, MilestoneType, RecordType, get_package_version
from services.tools import register_all_tools from services.tools import register_all_tools
from transport.legacy.unity_connection import get_unity_connection_pool, UnityConnectionPool from transport.legacy.unity_connection import get_unity_connection_pool, UnityConnectionPool
from transport.unity_instance_middleware import UnityInstanceMiddleware, set_unity_instance_middleware from transport.unity_instance_middleware import (
UnityInstanceMiddleware,
get_unity_instance_middleware
)
# Configure logging using settings from config # Configure logging using settings from config
logging.basicConfig( logging.basicConfig(
@ -44,22 +47,25 @@ try:
_fh.setFormatter(logging.Formatter(config.log_format)) _fh.setFormatter(logging.Formatter(config.log_format))
_fh.setLevel(getattr(logging, config.log_level)) _fh.setLevel(getattr(logging, config.log_level))
logger.addHandler(_fh) logger.addHandler(_fh)
logger.propagate = False # Prevent double logging to root logger
# Also route telemetry logger to the same rotating file and normal level # Also route telemetry logger to the same rotating file and normal level
try: try:
tlog = logging.getLogger("unity-mcp-telemetry") tlog = logging.getLogger("unity-mcp-telemetry")
tlog.setLevel(getattr(logging, config.log_level)) tlog.setLevel(getattr(logging, config.log_level))
tlog.addHandler(_fh) tlog.addHandler(_fh)
except Exception: tlog.propagate = False # Prevent double logging for telemetry too
except Exception as exc:
# Never let logging setup break startup # Never let logging setup break startup
pass logger.debug("Failed to configure telemetry logger", exc_info=exc)
except Exception: except Exception as exc:
# Never let logging setup break startup # Never let logging setup break startup
pass logger.debug("Failed to configure main logger file handler", exc_info=exc)
# Quieten noisy third-party loggers to avoid clutter during stdio handshake # Quieten noisy third-party loggers to avoid clutter during stdio handshake
for noisy in ("httpx", "urllib3"): for noisy in ("httpx", "urllib3", "mcp.server.lowlevel.server"):
try: try:
logging.getLogger(noisy).setLevel( logging.getLogger(noisy).setLevel(
max(logging.WARNING, getattr(logging, config.log_level))) max(logging.WARNING, getattr(logging, config.log_level)))
logging.getLogger(noisy).propagate = False
except Exception: except Exception:
pass pass
@ -258,8 +264,8 @@ async def plugin_sessions_route(_: Request) -> JSONResponse:
# Initialize and register middleware for session-based Unity instance routing # Initialize and register middleware for session-based Unity instance routing
unity_middleware = UnityInstanceMiddleware() # Using the singleton getter ensures we use the same instance everywhere
set_unity_instance_middleware(unity_middleware) unity_middleware = get_unity_instance_middleware()
mcp.add_middleware(unity_middleware) mcp.add_middleware(unity_middleware)
logger.info("Registered Unity instance middleware for session-based routing") logger.info("Registered Unity instance middleware for session-based routing")

View File

@ -7,7 +7,7 @@ from fastmcp import Context
from services.registry import mcp_for_unity_resource from services.registry import mcp_for_unity_resource
from transport.legacy.unity_connection import get_unity_connection_pool from transport.legacy.unity_connection import get_unity_connection_pool
from transport.plugin_hub import PluginHub from transport.plugin_hub import PluginHub
from transport.unity_transport import _is_http_transport from transport.unity_transport import _current_transport
@mcp_for_unity_resource( @mcp_for_unity_resource(
@ -36,7 +36,8 @@ async def unity_instances(ctx: Context) -> dict[str, Any]:
await ctx.info("Listing Unity instances") await ctx.info("Listing Unity instances")
try: try:
if _is_http_transport(): transport = _current_transport()
if transport == "http":
# HTTP/WebSocket transport: query PluginHub # HTTP/WebSocket transport: query PluginHub
sessions_data = await PluginHub.get_sessions() sessions_data = await PluginHub.get_sessions()
sessions = sessions_data.sessions sessions = sessions_data.sessions
@ -71,7 +72,7 @@ async def unity_instances(ctx: Context) -> dict[str, Any]:
result = { result = {
"success": True, "success": True,
"transport": "http", "transport": transport,
"instance_count": len(instances), "instance_count": len(instances),
"instances": instances, "instances": instances,
} }
@ -98,7 +99,7 @@ async def unity_instances(ctx: Context) -> dict[str, Any]:
result = { result = {
"success": True, "success": True,
"transport": "stdio", "transport": transport,
"instance_count": len(instances), "instance_count": len(instances),
"instances": [inst.to_dict() for inst in instances], "instances": [inst.to_dict() for inst in instances],
} }

View File

@ -3,6 +3,7 @@ from typing import Any
from fastmcp import Context from fastmcp import Context
from services.registry import mcp_for_unity_tool from services.registry import mcp_for_unity_tool
from transport.unity_instance_middleware import get_unity_instance_middleware from transport.unity_instance_middleware import get_unity_instance_middleware
from transport.plugin_hub import PluginHub
@mcp_for_unity_tool( @mcp_for_unity_tool(
@ -35,8 +36,15 @@ def debug_request_context(ctx: Context) -> dict[str, Any]:
# Get session state info via middleware # Get session state info via middleware
middleware = get_unity_instance_middleware() middleware = get_unity_instance_middleware()
derived_key = middleware._get_session_key(ctx) derived_key = middleware.get_session_key(ctx)
active_instance = middleware.get_active_instance(ctx) active_instance = middleware.get_active_instance(ctx)
# Debugging middleware internals
with middleware._lock:
all_keys = list(middleware._active_by_key.keys())
# Debugging PluginHub state
plugin_hub_configured = PluginHub.is_configured()
return { return {
"success": True, "success": True,
@ -53,6 +61,9 @@ def debug_request_context(ctx: Context) -> dict[str, Any]:
"session_state": { "session_state": {
"derived_key": derived_key, "derived_key": derived_key,
"active_instance": active_instance, "active_instance": active_instance,
"all_keys_in_store": all_keys,
"plugin_hub_configured": plugin_hub_configured,
"middleware_id": id(middleware),
}, },
"available_attributes": ctx_attrs, "available_attributes": ctx_attrs,
}, },

View File

@ -6,7 +6,7 @@ from services.registry import mcp_for_unity_tool
from transport.legacy.unity_connection import get_unity_connection_pool from transport.legacy.unity_connection import get_unity_connection_pool
from transport.unity_instance_middleware import get_unity_instance_middleware from transport.unity_instance_middleware import get_unity_instance_middleware
from transport.plugin_hub import PluginHub from transport.plugin_hub import PluginHub
from transport.unity_transport import _is_http_transport from transport.unity_transport import _current_transport
@mcp_for_unity_tool( @mcp_for_unity_tool(
@ -16,8 +16,10 @@ async def set_active_instance(
ctx: Context, ctx: Context,
instance: Annotated[str, "Target instance (Name@hash or hash prefix)"] instance: Annotated[str, "Target instance (Name@hash or hash prefix)"]
) -> dict[str, Any]: ) -> dict[str, Any]:
transport = _current_transport()
# Discover running instances based on transport # Discover running instances based on transport
if _is_http_transport(): if transport == "http":
sessions_data = await PluginHub.get_sessions() sessions_data = await PluginHub.get_sessions()
sessions = sessions_data.sessions sessions = sessions_data.sessions
instances = [] instances = []
@ -42,26 +44,69 @@ async def set_active_instance(
"success": False, "success": False,
"error": "No Unity instances are currently connected. Start Unity and press 'Start Session'." "error": "No Unity instances are currently connected. Start Unity and press 'Start Session'."
} }
ids = {inst.id: inst for inst in instances} ids = {inst.id: inst for inst in instances if getattr(inst, "id", None)}
hashes = {}
for inst in instances:
# exact hash and prefix map; last write wins but we'll detect ambiguity
hashes.setdefault(inst.hash, inst)
# Disallow anything except the explicit Name@hash id to ensure determinism
value = (instance or "").strip() value = (instance or "").strip()
if not value or "@" not in value: if not value:
return { return {
"success": False, "success": False,
"error": "Instance identifier must be Name@hash. " "error": "Instance identifier is required. "
"Use unity://instances to copy the exact id (e.g., MyProject@abcd1234)." "Use unity://instances to copy a Name@hash or provide a hash prefix."
} }
resolved = None resolved = None
resolved = ids.get(value) if "@" in value:
resolved = ids.get(value)
if resolved is None:
return {
"success": False,
"error": f"Instance '{value}' not found. "
"Use unity://instances to copy an exact Name@hash."
}
else:
lookup = value.lower()
matches = []
for inst in instances:
if not getattr(inst, "id", None):
continue
inst_hash = getattr(inst, "hash", "")
if inst_hash and inst_hash.lower().startswith(lookup):
matches.append(inst)
if not matches:
return {
"success": False,
"error": f"Instance hash '{value}' does not match any running Unity editors. "
"Use unity://instances to confirm the available hashes."
}
if len(matches) > 1:
matching_ids = ", ".join(
inst.id for inst in matches if getattr(inst, "id", None)
) or "multiple instances"
return {
"success": False,
"error": f"Instance hash '{value}' is ambiguous ({matching_ids}). "
"Provide the full Name@hash from unity://instances."
}
resolved = matches[0]
if resolved is None: if resolved is None:
return {"success": False, "error": f"Instance '{value}' not found. Use unity://instances to choose a valid Name@hash."} # Should be unreachable due to logic above, but satisfies static analysis
return {
"success": False,
"error": "Internal error: Instance resolution failed."
}
# Store selection in middleware (session-scoped) # Store selection in middleware (session-scoped)
middleware = get_unity_instance_middleware() middleware = get_unity_instance_middleware()
# We use middleware.set_active_instance to persist the selection.
# The session key is an internal detail but useful for debugging response.
middleware.set_active_instance(ctx, resolved.id) middleware.set_active_instance(ctx, resolved.id)
return {"success": True, "message": f"Active instance set to {resolved.id}", "data": {"instance": resolved.id}} session_key = middleware.get_session_key(ctx)
return {
"success": True,
"message": f"Active instance set to {resolved.id}",
"data": {
"instance": resolved.id,
"session_key": session_key,
},
}

View File

@ -60,6 +60,8 @@ class UnityConnection:
# Bounded connect to avoid indefinite blocking # Bounded connect to avoid indefinite blocking
connect_timeout = float( connect_timeout = float(
getattr(config, "connection_timeout", 1.0)) getattr(config, "connection_timeout", 1.0))
# We trust config.unity_host (default 127.0.0.1) but future improvements
# could dynamically prefer 'localhost' depending on OS resolver behavior.
self.sock = socket.create_connection( self.sock = socket.create_connection(
(self.host, self.port), connect_timeout) (self.host, self.port), connect_timeout)
self._prepare_socket(self.sock) self._prepare_socket(self.sock)

View File

@ -5,21 +5,29 @@ This middleware intercepts all tool calls and injects the active Unity instance
into the request-scoped state, allowing tools to access it via ctx.get_state("unity_instance"). into the request-scoped state, allowing tools to access it via ctx.get_state("unity_instance").
""" """
from threading import RLock from threading import RLock
import logging
from fastmcp.server.middleware import Middleware, MiddlewareContext from fastmcp.server.middleware import Middleware, MiddlewareContext
from transport.plugin_hub import PluginHub from transport.plugin_hub import PluginHub
logger = logging.getLogger("mcp-for-unity-server")
# Store a global reference to the middleware instance so tools can interact # Store a global reference to the middleware instance so tools can interact
# with it to set or clear the active unity instance. # with it to set or clear the active unity instance.
_unity_instance_middleware = None _unity_instance_middleware = None
_middleware_lock = RLock()
def get_unity_instance_middleware() -> 'UnityInstanceMiddleware': def get_unity_instance_middleware() -> 'UnityInstanceMiddleware':
"""Get the global Unity instance middleware.""" """Get the global Unity instance middleware."""
global _unity_instance_middleware
if _unity_instance_middleware is None: if _unity_instance_middleware is None:
raise RuntimeError( with _middleware_lock:
"UnityInstanceMiddleware not initialized. Call set_unity_instance_middleware first.") if _unity_instance_middleware is None:
# Auto-initialize if not set (lazy singleton) to handle import order or test cases
_unity_instance_middleware = UnityInstanceMiddleware()
return _unity_instance_middleware return _unity_instance_middleware
@ -42,37 +50,36 @@ class UnityInstanceMiddleware(Middleware):
self._active_by_key: dict[str, str] = {} self._active_by_key: dict[str, str] = {}
self._lock = RLock() self._lock = RLock()
def _get_session_key(self, ctx) -> str: def get_session_key(self, ctx) -> str:
""" """
Derive a stable key for the calling session. Derive a stable key for the calling session.
Uses ctx.session_id if available, falls back to 'global'. Prioritizes client_id for stability.
If client_id is missing, falls back to 'global' (assuming single-user local mode),
ignoring session_id which can be unstable in some transports/clients.
""" """
session_id = getattr(ctx, "session_id", None)
if isinstance(session_id, str) and session_id:
return session_id
client_id = getattr(ctx, "client_id", None) client_id = getattr(ctx, "client_id", None)
if isinstance(client_id, str) and client_id: if isinstance(client_id, str) and client_id:
return client_id return client_id
# Fallback to global for local dev stability
return "global" return "global"
def set_active_instance(self, ctx, instance_id: str) -> None: def set_active_instance(self, ctx, instance_id: str) -> None:
"""Store the active instance for this session.""" """Store the active instance for this session."""
key = self._get_session_key(ctx) key = self.get_session_key(ctx)
with self._lock: with self._lock:
self._active_by_key[key] = instance_id self._active_by_key[key] = instance_id
def get_active_instance(self, ctx) -> str | None: def get_active_instance(self, ctx) -> str | None:
"""Retrieve the active instance for this session.""" """Retrieve the active instance for this session."""
key = self._get_session_key(ctx) key = self.get_session_key(ctx)
with self._lock: with self._lock:
return self._active_by_key.get(key) return self._active_by_key.get(key)
def clear_active_instance(self, ctx) -> None: def clear_active_instance(self, ctx) -> None:
"""Clear the stored instance for this session.""" """Clear the stored instance for this session."""
key = self._get_session_key(ctx) key = self.get_session_key(ctx)
with self._lock: with self._lock:
self._active_by_key.pop(key, None) self._active_by_key.pop(key, None)
@ -82,13 +89,32 @@ class UnityInstanceMiddleware(Middleware):
active_instance = self.get_active_instance(ctx) active_instance = self.get_active_instance(ctx)
if active_instance: if active_instance:
# If using HTTP transport (PluginHub configured), validate session
# But for stdio transport (no PluginHub needed or maybe partially configured),
# we should be careful not to clear instance just because PluginHub can't resolve it.
# The 'active_instance' (Name@hash) might be valid for stdio even if PluginHub fails.
session_id: str | None = None session_id: str | None = None
# Only validate via PluginHub if we are actually using HTTP transport
# OR if we want to support hybrid mode. For now, let's be permissive.
if PluginHub.is_configured(): if PluginHub.is_configured():
try: try:
# resolving session_id might fail if the plugin disconnected
# We only need session_id for HTTP transport routing.
# For stdio, we just need the instance ID.
session_id = await PluginHub._resolve_session_id(active_instance) session_id = await PluginHub._resolve_session_id(active_instance)
except Exception: except Exception as exc:
self.clear_active_instance(ctx) # If resolution fails, it means the Unity instance is not reachable via HTTP/WS.
return await call_next(context) # If we are in stdio mode, this might still be fine if the user is just setting state?
# But usually if PluginHub is configured, we expect it to work.
# Let's LOG the error but NOT clear the instance immediately to avoid flickering,
# or at least debug why it's failing.
logger.debug(
"PluginHub session resolution failed for %s: %s; leaving active_instance unchanged",
active_instance,
exc,
exc_info=True,
)
ctx.set_state("unity_instance", active_instance) ctx.set_state("unity_instance", active_instance)
if session_id is not None: if session_id is not None:

View File

@ -19,6 +19,11 @@ def _is_http_transport() -> bool:
return os.environ.get("UNITY_MCP_TRANSPORT", "stdio").lower() == "http" return os.environ.get("UNITY_MCP_TRANSPORT", "stdio").lower() == "http"
def _current_transport() -> str:
"""Expose the active transport mode as a simple string identifier."""
return "http" if _is_http_transport() else "stdio"
def with_unity_instance( def with_unity_instance(
log: str | Callable[[Context, tuple, dict, str | None], str] | None = None, log: str | Callable[[Context, tuple, dict, str | None], str] | None = None,
*, *,

View File

@ -1,5 +1,5 @@
version = 1 version = 1
revision = 2 revision = 3
requires-python = ">=3.10" requires-python = ">=3.10"
[[package]] [[package]]
@ -694,7 +694,7 @@ wheels = [
[[package]] [[package]]
name = "mcpforunityserver" name = "mcpforunityserver"
version = "7.0.0" version = "8.1.4"
source = { editable = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "fastapi" }, { name = "fastapi" },
@ -715,7 +715,7 @@ dev = [
[package.metadata] [package.metadata]
requires-dist = [ requires-dist = [
{ name = "fastapi", specifier = ">=0.104.0" }, { name = "fastapi", specifier = ">=0.104.0" },
{ name = "fastmcp", specifier = ">=2.13.0" }, { name = "fastmcp", specifier = ">=2.13.0,<2.13.2" },
{ name = "httpx", specifier = ">=0.27.2" }, { name = "httpx", specifier = ">=0.27.2" },
{ name = "mcp", specifier = ">=1.16.0" }, { name = "mcp", specifier = ">=1.16.0" },
{ name = "pydantic", specifier = ">=2.12.0" }, { name = "pydantic", specifier = ">=2.12.0" },