from telemetry import record_telemetry, record_milestone, RecordType, MilestoneType from fastmcp import FastMCP import logging from logging.handlers import RotatingFileHandler import os from contextlib import asynccontextmanager import sys import signal import threading from typing import AsyncIterator, Dict, Any from config import config from tools import register_all_tools from resources import register_all_resources from unity_connection import get_unity_connection, UnityConnection import time # Configure logging using settings from config logging.basicConfig( level=getattr(logging, config.log_level), format=config.log_format, stream=None, # None -> defaults to sys.stderr; avoid stdout used by MCP stdio force=True # Ensure our handler replaces any prior stdout handlers ) logger = logging.getLogger("mcp-for-unity-server") # Also write logs to a rotating file so logs are available when launched via stdio try: import os as _os _log_dir = _os.path.join(_os.path.expanduser( "~/Library/Application Support/UnityMCP"), "Logs") _os.makedirs(_log_dir, exist_ok=True) _file_path = _os.path.join(_log_dir, "unity_mcp_server.log") _fh = RotatingFileHandler( _file_path, maxBytes=512*1024, backupCount=2, encoding="utf-8") _fh.setFormatter(logging.Formatter(config.log_format)) _fh.setLevel(getattr(logging, config.log_level)) logger.addHandler(_fh) # Also route telemetry logger to the same rotating file and normal level try: tlog = logging.getLogger("unity-mcp-telemetry") tlog.setLevel(getattr(logging, config.log_level)) tlog.addHandler(_fh) except Exception: # Never let logging setup break startup pass except Exception: # Never let logging setup break startup pass # Quieten noisy third-party loggers to avoid clutter during stdio handshake for noisy in ("httpx", "urllib3"): try: logging.getLogger(noisy).setLevel( max(logging.WARNING, getattr(logging, config.log_level))) except Exception: pass # Import telemetry only after logging is configured to ensure its logs use stderr and proper levels # Ensure a slightly higher telemetry timeout unless explicitly overridden by env try: # Ensure generous timeout unless explicitly overridden by env if not os.environ.get("UNITY_MCP_TELEMETRY_TIMEOUT"): os.environ["UNITY_MCP_TELEMETRY_TIMEOUT"] = "5.0" except Exception: pass # Global connection state _unity_connection: UnityConnection = None # Global shutdown coordination _shutdown_flag = threading.Event() _exit_timer_scheduled = threading.Event() @asynccontextmanager async def server_lifespan(server: FastMCP) -> AsyncIterator[Dict[str, Any]]: """Handle server startup and shutdown.""" global _unity_connection logger.info("MCP for Unity Server starting up") # Record server startup telemetry start_time = time.time() start_clk = time.perf_counter() try: from pathlib import Path ver_path = Path(__file__).parent / "server_version.txt" server_version = ver_path.read_text(encoding="utf-8").strip() except Exception: server_version = "unknown" # Defer initial telemetry by 1s to avoid stdio handshake interference import threading def _emit_startup(): try: record_telemetry(RecordType.STARTUP, { "server_version": server_version, "startup_time": start_time, }) record_milestone(MilestoneType.FIRST_STARTUP) except Exception: logger.debug("Deferred startup telemetry failed", exc_info=True) threading.Timer(1.0, _emit_startup).start() try: skip_connect = os.environ.get( "UNITY_MCP_SKIP_STARTUP_CONNECT", "").lower() in ("1", "true", "yes", "on") if skip_connect: logger.info( "Skipping Unity connection on startup (UNITY_MCP_SKIP_STARTUP_CONNECT=1)") else: _unity_connection = get_unity_connection() logger.info("Connected to Unity on startup") # Record successful Unity connection (deferred) import threading as _t _t.Timer(1.0, lambda: record_telemetry( RecordType.UNITY_CONNECTION, { "status": "connected", "connection_time_ms": (time.perf_counter() - start_clk) * 1000, } )).start() except ConnectionError as e: logger.warning("Could not connect to Unity on startup: %s", e) _unity_connection = None # Record connection failure (deferred) import threading as _t _err_msg = str(e)[:200] _t.Timer(1.0, lambda: record_telemetry( RecordType.UNITY_CONNECTION, { "status": "failed", "error": _err_msg, "connection_time_ms": (time.perf_counter() - start_clk) * 1000, } )).start() except Exception as e: logger.warning( "Unexpected error connecting to Unity on startup: %s", e) _unity_connection = None import threading as _t _err_msg = str(e)[:200] _t.Timer(1.0, lambda: record_telemetry( RecordType.UNITY_CONNECTION, { "status": "failed", "error": _err_msg, "connection_time_ms": (time.perf_counter() - start_clk) * 1000, } )).start() try: # Yield the connection object so it can be attached to the context # The key 'bridge' matches how tools like read_console expect to access it (ctx.bridge) yield {"bridge": _unity_connection} finally: if _unity_connection: _unity_connection.disconnect() _unity_connection = None logger.info("MCP for Unity Server shut down") # Initialize MCP server mcp = FastMCP( name="mcp-for-unity-server", lifespan=server_lifespan, instructions=""" This server provides tools to interact with the Unity Game Engine Editor.\n\n Available tools:\n - `manage_editor`: Controls editor state and queries info.\n - `execute_menu_item`: Executes, lists and checks for the existence of Unity Editor menu items.\n - `read_console`: Reads or clears Unity console messages, with filtering options.\n - `manage_scene`: Manages scenes.\n - `manage_gameobject`: Manages GameObjects in the scene.\n - `manage_script`: Manages C# script files.\n - `manage_asset`: Manages prefabs and assets.\n - `manage_shader`: Manages shaders.\n\n - Tips:\n - Create prefabs for reusable GameObjects.\n - Always include a camera and main light in your scenes.\n - Unless specified otherwise, paths are relative to the project's `Assets/` folder.\n - After creating or modifying scripts with `manage_script`, allow Unity to recompile; use `read_console` to check for compile errors.\n - Use `execute_menu_item` for interacting with Unity systems and third party tools like a user would.\n """ ) # Register all tools register_all_tools(mcp) # Register all resources register_all_resources(mcp) def _force_exit(code: int = 0): """Force process exit, bypassing any background threads that might linger.""" os._exit(code) def _signal_handler(signum, frame): logger.info(f"Received signal {signum}, initiating shutdown...") _shutdown_flag.set() if not _exit_timer_scheduled.is_set(): _exit_timer_scheduled.set() threading.Timer(1.0, _force_exit, args=(0,)).start() def _monitor_stdin(): """Background thread to detect stdio detach (stdin EOF) or parent exit.""" try: parent_pid = os.getppid() if hasattr(os, "getppid") else None while not _shutdown_flag.is_set(): if _shutdown_flag.wait(0.5): break if parent_pid is not None: try: os.kill(parent_pid, 0) except ValueError: # Signal 0 unsupported on this platform (e.g., Windows); disable parent probing parent_pid = None except (ProcessLookupError, OSError): logger.info(f"Parent process {parent_pid} no longer exists; shutting down") break try: if sys.stdin.closed: logger.info("stdin.closed is True; client disconnected") break fd = sys.stdin.fileno() if fd < 0: logger.info("stdin fd invalid; client disconnected") break except (ValueError, OSError, AttributeError): # Closed pipe or unavailable stdin break except Exception: # Ignore transient errors logger.debug("Transient error checking stdin", exc_info=True) if not _shutdown_flag.is_set(): logger.info("Client disconnected (stdin or parent), initiating shutdown...") _shutdown_flag.set() if not _exit_timer_scheduled.is_set(): _exit_timer_scheduled.set() threading.Timer(0.5, _force_exit, args=(0,)).start() except Exception: # Never let monitor thread crash the process logger.debug("Monitor thread error", exc_info=True) def main(): """Entry point for uvx and console scripts.""" try: signal.signal(signal.SIGTERM, _signal_handler) signal.signal(signal.SIGINT, _signal_handler) if hasattr(signal, "SIGPIPE"): signal.signal(signal.SIGPIPE, signal.SIG_IGN) if hasattr(signal, "SIGBREAK"): signal.signal(signal.SIGBREAK, _signal_handler) except Exception: # Signals can fail in some environments pass t = threading.Thread(target=_monitor_stdin, daemon=True) t.start() try: mcp.run(transport='stdio') logger.info("FastMCP run() returned (stdin EOF or disconnect)") except (KeyboardInterrupt, SystemExit): logger.info("Server interrupted; shutting down") _shutdown_flag.set() except BrokenPipeError: logger.info("Broken pipe; shutting down") _shutdown_flag.set() except Exception as e: logger.error(f"Server error: {e}", exc_info=True) _shutdown_flag.set() _force_exit(1) finally: _shutdown_flag.set() logger.info("Server main loop exited") if not _exit_timer_scheduled.is_set(): _exit_timer_scheduled.set() threading.Timer(0.5, _force_exit, args=(0,)).start() # Run the server if __name__ == "__main__": main()