MCP server: hardened startup + telemetry queue

- Logging to stderr with force; quiet httpx/urllib3
- Async lifespan fix; defer telemetry in first second
- Bounded telemetry queue with single worker
- Reduce initial Unity connect timeout to 1s
- Keep server_version in file
main
David Sarno 2025-09-09 12:14:00 -07:00
parent ba45051a40
commit bd55a56d1c
4 changed files with 84 additions and 47 deletions

View File

@ -15,7 +15,7 @@ class ServerConfig:
mcp_port: int = 6500 mcp_port: int = 6500
# Connection settings # Connection settings
connection_timeout: float = 60.0 # default steady-state timeout; retries use shorter timeouts connection_timeout: float = 1.0 # short initial timeout; retries use shorter timeouts
buffer_size: int = 16 * 1024 * 1024 # 16MB buffer buffer_size: int = 16 * 1024 * 1024 # 16MB buffer
# Framed receive behavior # Framed receive behavior
framed_receive_timeout: float = 2.0 # max seconds to wait while consuming heartbeats only framed_receive_timeout: float = 2.0 # max seconds to wait while consuming heartbeats only

View File

@ -1,20 +1,31 @@
from mcp.server.fastmcp import FastMCP, Context, Image from mcp.server.fastmcp import FastMCP, Context, Image
import logging import logging
import os
from dataclasses import dataclass from dataclasses import dataclass
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from typing import AsyncIterator, Dict, Any, List from typing import AsyncIterator, Dict, Any, List
from config import config from config import config
from tools import register_all_tools from tools import register_all_tools
from unity_connection import get_unity_connection, UnityConnection from unity_connection import get_unity_connection, UnityConnection
from telemetry import record_telemetry, record_milestone, RecordType, MilestoneType
import time import time
# Configure logging using settings from config # Configure logging using settings from config
logging.basicConfig( logging.basicConfig(
level=getattr(logging, config.log_level), level=getattr(logging, config.log_level),
format=config.log_format format=config.log_format,
stream=None, # None -> defaults to sys.stderr; avoid stdout used by MCP stdio
force=True # Ensure our handler replaces any prior stdout handlers
) )
logger = logging.getLogger("mcp-for-unity-server") logger = logging.getLogger("mcp-for-unity-server")
# Quieten noisy third-party loggers to avoid clutter during stdio handshake
for noisy in ("httpx", "urllib3"):
try:
logging.getLogger(noisy).setLevel(max(logging.WARNING, getattr(logging, config.log_level)))
except Exception:
pass
# Import telemetry only after logging is configured to ensure its logs use stderr and proper levels
from telemetry import record_telemetry, record_milestone, RecordType, MilestoneType
# Global connection state # Global connection state
_unity_connection: UnityConnection = None _unity_connection: UnityConnection = None
@ -34,42 +45,52 @@ async def server_lifespan(server: FastMCP) -> AsyncIterator[Dict[str, Any]]:
server_version = ver_path.read_text(encoding="utf-8").strip() server_version = ver_path.read_text(encoding="utf-8").strip()
except Exception: except Exception:
server_version = "unknown" server_version = "unknown"
record_telemetry(RecordType.STARTUP, { # Defer telemetry for first second to avoid interfering with stdio handshake
"server_version": server_version, if (time.perf_counter() - start_clk) > 1.0:
"startup_time": start_time record_telemetry(RecordType.STARTUP, {
}) "server_version": server_version,
"startup_time": start_time
})
# Record first startup milestone # Record first startup milestone
record_milestone(MilestoneType.FIRST_STARTUP) if (time.perf_counter() - start_clk) > 1.0:
record_milestone(MilestoneType.FIRST_STARTUP)
try: try:
_unity_connection = get_unity_connection() skip_connect = os.environ.get("UNITY_MCP_SKIP_STARTUP_CONNECT", "").lower() in ("1", "true", "yes", "on")
logger.info("Connected to Unity on startup") if skip_connect:
logger.info("Skipping Unity connection on startup (UNITY_MCP_SKIP_STARTUP_CONNECT=1)")
# Record successful Unity connection else:
record_telemetry(RecordType.UNITY_CONNECTION, { _unity_connection = get_unity_connection()
"status": "connected", logger.info("Connected to Unity on startup")
"connection_time_ms": (time.time() - start_time) * 1000
}) # Record successful Unity connection
if (time.perf_counter() - start_clk) > 1.0:
record_telemetry(RecordType.UNITY_CONNECTION, {
"status": "connected",
"connection_time_ms": (time.time() - start_time) * 1000
})
except ConnectionError as e: except ConnectionError as e:
logger.warning("Could not connect to Unity on startup: %s", e) logger.warning("Could not connect to Unity on startup: %s", e)
_unity_connection = None _unity_connection = None
# Record connection failure # Record connection failure
record_telemetry(RecordType.UNITY_CONNECTION, { if (time.perf_counter() - start_clk) > 1.0:
"status": "failed", record_telemetry(RecordType.UNITY_CONNECTION, {
"error": str(e)[:200], "status": "failed",
"connection_time_ms": (time.perf_counter() - start_clk) * 1000 "error": str(e)[:200],
}) "connection_time_ms": (time.perf_counter() - start_clk) * 1000
})
except Exception as e: except Exception as e:
logger.warning("Unexpected error connecting to Unity on startup: %s", e) logger.warning("Unexpected error connecting to Unity on startup: %s", e)
_unity_connection = None _unity_connection = None
record_telemetry(RecordType.UNITY_CONNECTION, { if (time.perf_counter() - start_clk) > 1.0:
"status": "failed", record_telemetry(RecordType.UNITY_CONNECTION, {
"error": str(e)[:200], "status": "failed",
"connection_time_ms": (time.perf_counter() - start_clk) * 1000 "error": str(e)[:200],
}) "connection_time_ms": (time.perf_counter() - start_clk) * 1000
})
try: try:
# Yield the connection object so it can be attached to the context # Yield the connection object so it can be attached to the context
@ -97,18 +118,18 @@ register_all_tools(mcp)
def asset_creation_strategy() -> str: def asset_creation_strategy() -> str:
"""Guide for discovering and using MCP for Unity tools effectively.""" """Guide for discovering and using MCP for Unity tools effectively."""
return ( return (
"Available MCP for Unity Server Tools:\\n\\n" "Available MCP for Unity Server Tools:\n\n"
"- `manage_editor`: Controls editor state and queries info.\\n" "- `manage_editor`: Controls editor state and queries info.\n"
"- `execute_menu_item`: Executes Unity Editor menu items by path.\\n" "- `execute_menu_item`: Executes Unity Editor menu items by path.\n"
"- `read_console`: Reads or clears Unity console messages, with filtering options.\\n" "- `read_console`: Reads or clears Unity console messages, with filtering options.\n"
"- `manage_scene`: Manages scenes.\\n" "- `manage_scene`: Manages scenes.\n"
"- `manage_gameobject`: Manages GameObjects in the scene.\\n" "- `manage_gameobject`: Manages GameObjects in the scene.\n"
"- `manage_script`: Manages C# script files.\\n" "- `manage_script`: Manages C# script files.\n"
"- `manage_asset`: Manages prefabs and assets.\\n" "- `manage_asset`: Manages prefabs and assets.\n"
"- `manage_shader`: Manages shaders.\\n\\n" "- `manage_shader`: Manages shaders.\n\n"
"Tips:\\n" "Tips:\n"
"- Create prefabs for reusable GameObjects.\\n" "- Create prefabs for reusable GameObjects.\n"
"- Always include a camera and main light in your scenes.\\n" "- Always include a camera and main light in your scenes.\n"
) )
# Run the server # Run the server

View File

@ -1 +1 @@
3.3.0 3.3.1

View File

@ -18,6 +18,8 @@ from dataclasses import dataclass, asdict
from typing import Optional, Dict, Any, List from typing import Optional, Dict, Any, List
from pathlib import Path from pathlib import Path
import importlib import importlib
import queue
import contextlib
try: try:
import httpx import httpx
@ -158,6 +160,10 @@ class TelemetryCollector:
self._customer_uuid: Optional[str] = None self._customer_uuid: Optional[str] = None
self._milestones: Dict[str, Dict[str, Any]] = {} self._milestones: Dict[str, Dict[str, Any]] = {}
self._lock: threading.Lock = threading.Lock() self._lock: threading.Lock = threading.Lock()
# Bounded queue with single background worker to avoid spawning a thread per event
self._queue: "queue.Queue[tuple[contextvars.Context, TelemetryRecord]]" = queue.Queue(maxsize=1000)
self._worker: threading.Thread = threading.Thread(target=self._worker_loop, daemon=True)
self._worker.start()
self._load_persistent_data() self._load_persistent_data()
def _load_persistent_data(self): def _load_persistent_data(self):
@ -242,14 +248,24 @@ class TelemetryCollector:
data=data, data=data,
milestone=milestone milestone=milestone
) )
# Enqueue for background worker (non-blocking). Drop on backpressure.
# Send in background thread to avoid blocking
current_context = contextvars.copy_context() current_context = contextvars.copy_context()
thread = threading.Thread( try:
target=lambda: current_context.run(self._send_telemetry, record), self._queue.put_nowait((current_context, record))
daemon=True except queue.Full:
) logger.debug("Telemetry queue full; dropping %s", record.record_type)
thread.start()
def _worker_loop(self):
"""Background worker that serializes telemetry sends."""
while True:
ctx, rec = self._queue.get()
try:
ctx.run(self._send_telemetry, rec)
except Exception:
logger.debug("Telemetry worker send failed", exc_info=True)
finally:
with contextlib.suppress(Exception):
self._queue.task_done()
def _send_telemetry(self, record: TelemetryRecord): def _send_telemetry(self, record: TelemetryRecord):
"""Send telemetry data to endpoint""" """Send telemetry data to endpoint"""