161 lines
5.7 KiB
Python
161 lines
5.7 KiB
Python
"""
Port discovery utility for MCP for Unity Server.

What changed and why:
- Unity now writes a per-project port file named like
  `~/.unity-mcp/unity-mcp-port-<hash>.json` to avoid projects overwriting
  each other's saved port. The legacy file `unity-mcp-port.json` may still
  exist.
- This module now scans for both patterns, prefers the most recently
  modified file, and verifies that the port is actually an MCP for Unity
  listener (quick socket connect + ping) before choosing it.
"""
|
|
|
|
import glob
|
|
import json
|
|
import logging
|
|
from pathlib import Path
|
|
import socket
|
|
from typing import Optional, List
|
|
|
|
# Module-level logger. It uses the fixed name "mcp-for-unity-server" instead of
# __name__, so records emitted here are attributed to that logger
# (presumably shared with the rest of the server -- confirm against callers).
logger = logging.getLogger("mcp-for-unity-server")
|
|
|
|
|
|
class PortDiscovery:
    """Find the TCP port an MCP for Unity bridge is listening on.

    The bridge records its port in JSON files under ``~/.unity-mcp``:

    * ``unity-mcp-port-<hash>.json`` -- one file per Unity project,
    * ``unity-mcp-port.json`` -- the legacy single-project file,
    * ``unity-mcp-status-<hash>.json`` -- heartbeat/status files.

    Every method is a ``@staticmethod``; the class only acts as a namespace.
    All file and socket access is best-effort: unreadable, malformed or
    vanished files and unreachable ports are skipped, never raised.
    """

    REGISTRY_FILE = "unity-mcp-port.json"  # legacy single-project file
    DEFAULT_PORT = 6400
    CONNECT_TIMEOUT = 0.3  # seconds, keep this snappy during discovery

    @staticmethod
    def get_registry_path() -> Path:
        """Return the path of the legacy (single-project) port registry file."""
        return PortDiscovery.get_registry_dir() / PortDiscovery.REGISTRY_FILE

    @staticmethod
    def get_registry_dir() -> Path:
        """Return the directory holding all registry and status files."""
        return Path.home() / ".unity-mcp"

    @staticmethod
    def _mtime_or_zero(path: Path) -> float:
        """Return *path*'s modification time, or 0.0 if it cannot be stat'ed.

        A file can disappear between being listed and being stat'ed (Unity
        rewrites these files); such a file simply sorts as the oldest.
        """
        try:
            return path.stat().st_mtime
        except OSError:
            return 0.0

    @staticmethod
    def _newest_first(directory: Path, pattern: str) -> List[Path]:
        """Return files in *directory* matching *pattern*, newest first.

        ``Path.glob`` applies the pattern to file names only, so glob
        metacharacters inside the directory path itself are taken literally.
        A missing or unreadable directory yields an empty list.
        """
        try:
            matches = list(directory.glob(pattern))
        except OSError:
            return []
        return sorted(matches, key=PortDiscovery._mtime_or_zero, reverse=True)

    @staticmethod
    def _is_valid_port(value: object) -> bool:
        """Return True if *value* is an int usable as a TCP port (1-65535).

        ``bool`` is rejected explicitly: it is a subclass of ``int``, so a
        JSON ``true`` would otherwise be treated as port 1.
        """
        return (
            isinstance(value, int)
            and not isinstance(value, bool)
            and 0 < value <= 65535
        )

    @staticmethod
    def _load_json_object(path: Path) -> dict:
        """Parse *path* as JSON and return it, requiring a top-level object.

        Raises:
            OSError: the file cannot be opened or read.
            ValueError: the content is not valid UTF-8/JSON, or the top-level
                value is not an object.
        """
        # "utf-8-sig" reads plain UTF-8 and also strips a leading BOM, which
        # json.load would otherwise reject.
        with open(path, "r", encoding="utf-8-sig") as handle:
            payload = json.load(handle)
        if not isinstance(payload, dict):
            raise ValueError(
                f"expected a JSON object, got {type(payload).__name__}")
        return payload

    @staticmethod
    def list_candidate_files() -> List[Path]:
        """Return candidate registry files, newest first.

        Includes hashed per-project files and the legacy file (if present).
        The legacy file is always last so per-project files win.
        """
        candidates = PortDiscovery._newest_first(
            PortDiscovery.get_registry_dir(), "unity-mcp-port-*.json")
        legacy = PortDiscovery.get_registry_path()
        if legacy.exists():
            candidates.append(legacy)
        return candidates

    @staticmethod
    def _try_probe_unity_mcp(port: int) -> bool:
        """Quickly check if an MCP for Unity listener is on this port.

        Opens a short-timeout TCP connection to 127.0.0.1, sends ``ping`` and
        reads up to 512 bytes. The port is accepted when the reply contains
        the bridge welcome banner or a ``pong`` message.

        Returns:
            True if the expected reply was seen; False for an invalid port,
            a connection/timeout error, or any other reply.
        """
        if not PortDiscovery._is_valid_port(port):
            return False
        timeout = PortDiscovery.CONNECT_TIMEOUT
        try:
            with socket.create_connection(("127.0.0.1", port), timeout) as conn:
                conn.settimeout(timeout)
                conn.sendall(b"ping")
                reply = conn.recv(512)
        except OSError:
            # Refused, reset and timed-out connections all mean "not ours".
            return False
        return b"WELCOME UNITY-MCP" in reply or b'"message":"pong"' in reply

    @staticmethod
    def _read_latest_status() -> Optional[dict]:
        """Return the parsed content of the newest status file, if usable.

        Only the most recently modified ``unity-mcp-status-*.json`` is
        considered. Returns None when there is no status file or when the
        newest one is unreadable, malformed, or not a JSON object.
        """
        status_files = PortDiscovery._newest_first(
            PortDiscovery.get_registry_dir(), "unity-mcp-status-*.json")
        if not status_files:
            return None
        try:
            return PortDiscovery._load_json_object(status_files[0])
        except (OSError, ValueError):
            return None

    @staticmethod
    def discover_unity_port() -> int:
        """Discover the Unity port from status and registry files.

        Order of preference:

        1. the port in the newest status file, if it responds to a probe;
        2. the first registry file (newest first, legacy last) whose port
           responds to a probe;
        3. the first valid port seen in any registry file;
        4. ``DEFAULT_PORT``.

        Returns:
            Port number to connect to.
        """
        # Several files may name the same port; probe each port only once so
        # an unresponsive listener does not cost a timeout per file.
        probe_results = {}

        def responds(port: int) -> bool:
            if port not in probe_results:
                probe_results[port] = PortDiscovery._try_probe_unity_mcp(port)
            return probe_results[port]

        # Prefer the latest heartbeat status if it points to a responsive port
        status = PortDiscovery._read_latest_status()
        if status:
            port = status.get("unity_port")
            if PortDiscovery._is_valid_port(port) and responds(port):
                logger.info("Using Unity port from status: %s", port)
                return port

        first_seen_port: Optional[int] = None
        for path in PortDiscovery.list_candidate_files():
            try:
                cfg = PortDiscovery._load_json_object(path)
            except (OSError, ValueError) as exc:
                logger.warning("Could not read port registry %s: %s", path, exc)
                continue

            unity_port = cfg.get("unity_port")
            if not PortDiscovery._is_valid_port(unity_port):
                continue
            if first_seen_port is None:
                first_seen_port = unity_port
            if responds(unity_port):
                logger.info(
                    "Using Unity port from %s: %s", path.name, unity_port)
                return unity_port

        if first_seen_port is not None:
            logger.info(
                "No responsive port found; using first seen value %s",
                first_seen_port)
            return first_seen_port

        # Fallback to default port
        logger.info(
            "No port registry found; using default port %s",
            PortDiscovery.DEFAULT_PORT)
        return PortDiscovery.DEFAULT_PORT

    @staticmethod
    def get_port_config() -> Optional[dict]:
        """Get the most relevant port configuration from the registry.

        Walks the candidate files (newest per-project file first, legacy file
        last) and returns the first one that parses as a JSON object. Files
        that cannot be read or parsed are logged and skipped.

        Returns:
            Port configuration dict, or None if no usable file exists.
        """
        for path in PortDiscovery.list_candidate_files():
            try:
                return PortDiscovery._load_json_object(path)
            except (OSError, ValueError) as exc:
                logger.warning(
                    "Could not read port configuration %s: %s", path, exc)
        return None