feat(bridge): embed Python server into package and remove Git-based installer

- Switch ServerInstaller to embedded copy-only (no network)
- Simplify Editor UI server status to 'Installed (Embedded)'
- Vendor UnityMcpServer/src into UnityMcpBridge/UnityMcpServer/src for UPM distribution
- Keep bridge recompile robustness (heartbeat + sticky port)
main
David Sarno 2025-08-08 08:08:30 -07:00
parent a0fd9199bb
commit a65f10383a
44 changed files with 1985 additions and 260 deletions

View File

@ -1,8 +1,7 @@
using System; using System;
using System.IO; using System.IO;
using System.Linq;
using System.Net;
using System.Runtime.InteropServices; using System.Runtime.InteropServices;
using UnityEditor;
using UnityEngine; using UnityEngine;
namespace UnityMcpBridge.Editor.Helpers namespace UnityMcpBridge.Editor.Helpers
@ -11,37 +10,34 @@ namespace UnityMcpBridge.Editor.Helpers
{ {
private const string RootFolder = "UnityMCP"; private const string RootFolder = "UnityMCP";
private const string ServerFolder = "UnityMcpServer"; private const string ServerFolder = "UnityMcpServer";
private const string BranchName = "master";
private const string GitUrl = "https://github.com/justinpbarnett/unity-mcp.git";
private const string PyprojectUrl =
"https://raw.githubusercontent.com/justinpbarnett/unity-mcp/refs/heads/"
+ BranchName
+ "/UnityMcpServer/src/pyproject.toml";
/// <summary> /// <summary>
/// Ensures the unity-mcp-server is installed and up to date. /// Ensures the unity-mcp-server is installed locally by copying from the embedded package source.
/// No network calls or Git operations are performed.
/// </summary> /// </summary>
public static void EnsureServerInstalled() public static void EnsureServerInstalled()
{ {
try try
{ {
string saveLocation = GetSaveLocation(); string saveLocation = GetSaveLocation();
string destRoot = Path.Combine(saveLocation, ServerFolder);
string destSrc = Path.Combine(destRoot, "src");
if (!IsServerInstalled(saveLocation)) if (File.Exists(Path.Combine(destSrc, "server.py")))
{ {
InstallServer(saveLocation); return; // Already installed
} }
else
{
string installedVersion = GetInstalledVersion();
string latestVersion = GetLatestVersion();
if (IsNewerVersion(latestVersion, installedVersion)) if (!TryGetEmbeddedServerSource(out string embeddedSrc))
{ {
UpdateServer(saveLocation); throw new Exception("Could not find embedded UnityMcpServer/src in the package.");
}
else { }
} }
// Ensure destination exists
Directory.CreateDirectory(destRoot);
// Copy the entire UnityMcpServer folder (parent of src)
string embeddedRoot = Path.GetDirectoryName(embeddedSrc) ?? embeddedSrc; // go up from src to UnityMcpServer
CopyDirectoryRecursive(embeddedRoot, destRoot);
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -111,139 +107,110 @@ namespace UnityMcpBridge.Editor.Helpers
private static bool IsServerInstalled(string location) private static bool IsServerInstalled(string location)
{ {
return Directory.Exists(location) return Directory.Exists(location)
&& File.Exists(Path.Combine(location, ServerFolder, "src", "pyproject.toml")); && File.Exists(Path.Combine(location, ServerFolder, "src", "server.py"));
} }
/// <summary> /// <summary>
/// Installs the server by cloning only the UnityMcpServer folder from the repository and setting up dependencies. /// Attempts to locate the embedded UnityMcpServer/src directory inside the installed package
/// or common development locations.
/// </summary> /// </summary>
private static void InstallServer(string location) private static bool TryGetEmbeddedServerSource(out string srcPath)
{ {
// Create the src directory where the server code will reside // 1) Development mode: common repo layouts
Directory.CreateDirectory(location); try
// Initialize git repo in the src directory
RunCommand("git", $"init", workingDirectory: location);
// Add remote
RunCommand("git", $"remote add origin {GitUrl}", workingDirectory: location);
// Configure sparse checkout
RunCommand("git", "config core.sparseCheckout true", workingDirectory: location);
// Set sparse checkout path to only include UnityMcpServer folder
string sparseCheckoutPath = Path.Combine(location, ".git", "info", "sparse-checkout");
File.WriteAllText(sparseCheckoutPath, $"{ServerFolder}/");
// Fetch and checkout the branch
RunCommand("git", $"fetch --depth=1 origin {BranchName}", workingDirectory: location);
RunCommand("git", $"checkout {BranchName}", workingDirectory: location);
}
/// <summary>
/// Fetches the currently installed version from the local pyproject.toml file.
/// </summary>
public static string GetInstalledVersion()
{
string pyprojectPath = Path.Combine(
GetSaveLocation(),
ServerFolder,
"src",
"pyproject.toml"
);
return ParseVersionFromPyproject(File.ReadAllText(pyprojectPath));
}
/// <summary>
/// Fetches the latest version from the GitHub pyproject.toml file.
/// </summary>
public static string GetLatestVersion()
{
using WebClient webClient = new();
string pyprojectContent = webClient.DownloadString(PyprojectUrl);
return ParseVersionFromPyproject(pyprojectContent);
}
/// <summary>
/// Updates the server by pulling the latest changes for the UnityMcpServer folder only.
/// </summary>
private static void UpdateServer(string location)
{
RunCommand("git", $"pull origin {BranchName}", workingDirectory: location);
}
/// <summary>
/// Parses the version number from pyproject.toml content.
/// </summary>
private static string ParseVersionFromPyproject(string content)
{
foreach (string line in content.Split('\n'))
{ {
if (line.Trim().StartsWith("version =")) string projectRoot = Path.GetDirectoryName(Application.dataPath);
string[] devCandidates =
{ {
string[] parts = line.Split('='); Path.Combine(projectRoot ?? string.Empty, "unity-mcp", "UnityMcpServer", "src"),
if (parts.Length == 2) Path.Combine(projectRoot ?? string.Empty, "..", "unity-mcp", "UnityMcpServer", "src"),
};
foreach (string candidate in devCandidates)
{
string full = Path.GetFullPath(candidate);
if (Directory.Exists(full) && File.Exists(Path.Combine(full, "server.py")))
{ {
return parts[1].Trim().Trim('"'); srcPath = full;
return true;
} }
} }
} }
throw new Exception("Version not found in pyproject.toml"); catch { /* ignore */ }
}
/// <summary> // 2) Installed package: resolve via Package Manager
/// Compares two version strings to determine if the latest is newer. try
/// </summary>
public static bool IsNewerVersion(string latest, string installed)
{
int[] latestParts = latest.Split('.').Select(int.Parse).ToArray();
int[] installedParts = installed.Split('.').Select(int.Parse).ToArray();
for (int i = 0; i < Math.Min(latestParts.Length, installedParts.Length); i++)
{ {
if (latestParts[i] > installedParts[i]) var list = UnityEditor.PackageManager.Client.List();
while (!list.IsCompleted) { }
if (list.Status == UnityEditor.PackageManager.StatusCode.Success)
{ {
return true; foreach (var pkg in list.Result)
} {
if (pkg.name == "com.justinpbarnett.unity-mcp")
{
string packagePath = pkg.resolvedPath; // e.g., Library/PackageCache/... or local path
if (latestParts[i] < installedParts[i]) // Preferred: UnityMcpServer embedded alongside Editor/Runtime within the package
{ string embedded = Path.Combine(packagePath, "UnityMcpServer", "src");
return false; if (Directory.Exists(embedded) && File.Exists(Path.Combine(embedded, "server.py")))
{
srcPath = embedded;
return true;
}
// Legacy: sibling of the package folder (dev-linked). Only valid when present on disk.
string sibling = Path.Combine(Path.GetDirectoryName(packagePath) ?? string.Empty, "UnityMcpServer", "src");
if (Directory.Exists(sibling) && File.Exists(Path.Combine(sibling, "server.py")))
{
srcPath = sibling;
return true;
}
}
}
} }
} }
return latestParts.Length > installedParts.Length; catch { /* ignore */ }
// 3) Fallback to previous common install locations
try
{
string home = Environment.GetFolderPath(Environment.SpecialFolder.UserProfile);
string[] candidates =
{
Path.Combine(home, "unity-mcp", "UnityMcpServer", "src"),
Path.Combine(home, "Applications", "UnityMCP", "UnityMcpServer", "src"),
};
foreach (string candidate in candidates)
{
if (Directory.Exists(candidate) && File.Exists(Path.Combine(candidate, "server.py")))
{
srcPath = candidate;
return true;
}
}
}
catch { /* ignore */ }
srcPath = null;
return false;
} }
/// <summary> private static void CopyDirectoryRecursive(string sourceDir, string destinationDir)
/// Runs a command-line process and handles output/errors.
/// </summary>
private static void RunCommand(
string command,
string arguments,
string workingDirectory = null
)
{ {
System.Diagnostics.Process process = new() Directory.CreateDirectory(destinationDir);
foreach (string filePath in Directory.GetFiles(sourceDir))
{ {
StartInfo = new System.Diagnostics.ProcessStartInfo string fileName = Path.GetFileName(filePath);
{ string destFile = Path.Combine(destinationDir, fileName);
FileName = command, File.Copy(filePath, destFile, overwrite: true);
Arguments = arguments, }
RedirectStandardOutput = true,
RedirectStandardError = true, foreach (string dirPath in Directory.GetDirectories(sourceDir))
UseShellExecute = false,
CreateNoWindow = true,
WorkingDirectory = workingDirectory ?? string.Empty,
},
};
process.Start();
string output = process.StandardOutput.ReadToEnd();
string error = process.StandardError.ReadToEnd();
process.WaitForExit();
if (process.ExitCode != 0)
{ {
throw new Exception( string dirName = Path.GetFileName(dirPath);
$"Command failed: {command} {arguments}\nOutput: {output}\nError: {error}" string destSubDir = Path.Combine(destinationDir, dirName);
); CopyDirectoryRecursive(dirPath, destSubDir);
} }
} }
} }

View File

@ -4,6 +4,7 @@ using System.IO;
using System.Linq; using System.Linq;
using System.Net; using System.Net;
using System.Net.Sockets; using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Newtonsoft.Json; using Newtonsoft.Json;
using Newtonsoft.Json.Linq; using Newtonsoft.Json.Linq;
@ -23,6 +24,7 @@ namespace UnityMcpBridge.Editor
private static readonly object lockObj = new(); private static readonly object lockObj = new();
private static readonly object startStopLock = new(); private static readonly object startStopLock = new();
private static bool initScheduled = false; private static bool initScheduled = false;
private static double nextHeartbeatAt = 0.0f;
private static Dictionary< private static Dictionary<
string, string,
(string commandJson, TaskCompletionSource<string> tcs) (string commandJson, TaskCompletionSource<string> tcs)
@ -83,20 +85,11 @@ namespace UnityMcpBridge.Editor
static UnityMcpBridge() static UnityMcpBridge()
{ {
// Use delayed initialization to avoid repeated restarts during compilation // Immediate start for minimal downtime, plus quit hook
EditorApplication.delayCall += InitializeAfterCompilation; Start();
EditorApplication.quitting += Stop; EditorApplication.quitting += Stop;
AssemblyReloadEvents.beforeAssemblyReload += Stop; // ensure listener releases before domain reload AssemblyReloadEvents.beforeAssemblyReload += OnBeforeAssemblyReload;
AssemblyReloadEvents.afterAssemblyReload += OnAfterAssemblyReload;
// Robust re-init hooks
UnityEditor.Compilation.CompilationPipeline.compilationFinished += _ => ScheduleInitRetry();
EditorApplication.playModeStateChanged += state =>
{
if (state == PlayModeStateChange.EnteredEditMode || state == PlayModeStateChange.EnteredPlayMode)
{
ScheduleInitRetry();
}
};
} }
/// <summary> /// <summary>
@ -148,58 +141,77 @@ namespace UnityMcpBridge.Editor
Stop(); Stop();
// Removed automatic server installer; assume server exists inside the package (UPM). // Attempt fast bind with same-port preference
try try
{ {
// Try to reuse the current port if it's still available, otherwise get a new one currentUnityPort = currentUnityPort > 0 && PortManager.IsPortAvailable(currentUnityPort)
if (currentUnityPort > 0 && PortManager.IsPortAvailable(currentUnityPort)) ? currentUnityPort
: PortManager.GetPortWithFallback();
const int maxImmediateRetries = 3;
const int retrySleepMs = 75;
int attempt = 0;
for (;;)
{ {
Debug.Log($"Reusing current port {currentUnityPort}"); try
{
listener = new TcpListener(IPAddress.Loopback, currentUnityPort);
listener.Server.SetSocketOption(
SocketOptionLevel.Socket,
SocketOptionName.ReuseAddress,
true
);
// Minimize TIME_WAIT by sending RST on close
try
{
listener.Server.LingerState = new LingerOption(true, 0);
}
catch (Exception)
{
// Ignore if not supported on platform
}
listener.Start();
break;
}
catch (SocketException se) when (se.SocketErrorCode == SocketError.AddressAlreadyInUse && attempt < maxImmediateRetries)
{
attempt++;
Thread.Sleep(retrySleepMs);
continue;
}
catch (SocketException se) when (se.SocketErrorCode == SocketError.AddressAlreadyInUse && attempt >= maxImmediateRetries)
{
currentUnityPort = PortManager.GetPortWithFallback();
listener = new TcpListener(IPAddress.Loopback, currentUnityPort);
listener.Server.SetSocketOption(
SocketOptionLevel.Socket,
SocketOptionName.ReuseAddress,
true
);
try
{
listener.Server.LingerState = new LingerOption(true, 0);
}
catch (Exception)
{
}
listener.Start();
break;
}
} }
else
{
// Use PortManager to get available port with automatic fallback
currentUnityPort = PortManager.GetPortWithFallback();
}
listener = new TcpListener(IPAddress.Loopback, currentUnityPort);
listener.Start();
isRunning = true; isRunning = true;
isAutoConnectMode = false; // Normal startup mode isAutoConnectMode = false;
Debug.Log($"UnityMcpBridge started on port {currentUnityPort}."); Debug.Log($"UnityMcpBridge started on port {currentUnityPort}.");
// Assuming ListenerLoop and ProcessCommands are defined elsewhere
Task.Run(ListenerLoop); Task.Run(ListenerLoop);
EditorApplication.update += ProcessCommands; EditorApplication.update += ProcessCommands;
// Write initial heartbeat immediately
WriteHeartbeat(false);
nextHeartbeatAt = EditorApplication.timeSinceStartup + 0.5f;
} }
catch (SocketException ex) catch (SocketException ex)
{ {
if (ex.SocketErrorCode == SocketError.AddressAlreadyInUse) Debug.LogError($"Failed to start TCP listener: {ex.Message}");
{
Debug.LogError(
$"Port {currentUnityPort} is already in use. Trying to find alternative..."
);
// Try once more with a fresh port discovery
try
{
currentUnityPort = PortManager.DiscoverNewPort();
listener = new TcpListener(IPAddress.Loopback, currentUnityPort);
listener.Start();
isRunning = true;
Debug.Log($"UnityMcpBridge started on fallback port {currentUnityPort}.");
Task.Run(ListenerLoop);
EditorApplication.update += ProcessCommands;
}
catch (Exception fallbackEx)
{
Debug.LogError($"Failed to start on fallback port: {fallbackEx.Message}");
}
}
else
{
Debug.LogError($"Failed to start TCP listener: {ex.Message}");
}
} }
} }
} }
@ -215,6 +227,8 @@ namespace UnityMcpBridge.Editor
try try
{ {
// Mark heartbeat one last time before stopping
WriteHeartbeat(false);
listener?.Stop(); listener?.Stop();
listener = null; listener = null;
isRunning = false; isRunning = false;
@ -317,6 +331,14 @@ namespace UnityMcpBridge.Editor
List<string> processedIds = new(); List<string> processedIds = new();
lock (lockObj) lock (lockObj)
{ {
// Periodic heartbeat while editor is idle/processing
double now = EditorApplication.timeSinceStartup;
if (now >= nextHeartbeatAt)
{
WriteHeartbeat(false);
nextHeartbeatAt = now + 0.5f;
}
foreach ( foreach (
KeyValuePair< KeyValuePair<
string, string,
@ -544,5 +566,59 @@ namespace UnityMcpBridge.Editor
return "Could not summarize parameters"; return "Could not summarize parameters";
} }
} }
// Heartbeat/status helpers
private static void OnBeforeAssemblyReload()
{
WriteHeartbeat(true);
}
private static void OnAfterAssemblyReload()
{
// Will be overwritten by Start(), but mark as alive quickly
WriteHeartbeat(false);
}
private static void WriteHeartbeat(bool reloading)
{
try
{
string dir = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.UserProfile), ".unity-mcp");
Directory.CreateDirectory(dir);
string filePath = Path.Combine(dir, $"unity-mcp-status-{ComputeProjectHash(Application.dataPath)}.json");
var payload = new
{
unity_port = currentUnityPort,
reloading,
project_path = Application.dataPath,
last_heartbeat = DateTime.UtcNow.ToString("O")
};
File.WriteAllText(filePath, JsonConvert.SerializeObject(payload));
}
catch (Exception)
{
// Best-effort only
}
}
private static string ComputeProjectHash(string input)
{
try
{
using var sha1 = System.Security.Cryptography.SHA1.Create();
byte[] bytes = System.Text.Encoding.UTF8.GetBytes(input ?? string.Empty);
byte[] hashBytes = sha1.ComputeHash(bytes);
var sb = new System.Text.StringBuilder();
foreach (byte b in hashBytes)
{
sb.Append(b.ToString("x2"));
}
return sb.ToString()[..8];
}
catch
{
return "default";
}
}
} }
} }

View File

@ -104,19 +104,8 @@ namespace UnityMcpBridge.Editor.Windows
if (File.Exists(Path.Combine(serverPath, "server.py"))) if (File.Exists(Path.Combine(serverPath, "server.py")))
{ {
string installedVersion = ServerInstaller.GetInstalledVersion(); pythonServerInstallationStatus = "Installed (Embedded)";
string latestVersion = ServerInstaller.GetLatestVersion(); pythonServerInstallationStatusColor = Color.green;
if (ServerInstaller.IsNewerVersion(latestVersion, installedVersion))
{
pythonServerInstallationStatus = "Newer Version Available";
pythonServerInstallationStatusColor = Color.yellow;
}
else
{
pythonServerInstallationStatus = "Up to Date";
pythonServerInstallationStatusColor = Color.green;
}
} }
else else
{ {

View File

@ -0,0 +1,8 @@
fileFormatVersion: 2
guid: 661ad50b20643440fbed55a237c6db95
folderAsset: yes
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

View File

@ -0,0 +1 @@
3.12

View File

@ -0,0 +1,27 @@
FROM python:3.12-slim
# Install required system dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
git \
&& rm -rf /var/lib/apt/lists/*
# Set working directory
WORKDIR /app
# Install uv package manager
RUN pip install uv
# Copy required files
COPY config.py /app/
COPY server.py /app/
COPY unity_connection.py /app/
COPY pyproject.toml /app/
COPY __init__.py /app/
COPY tools/ /app/tools/
# Install dependencies using uv
RUN uv pip install --system -e .
# Command to run the server
CMD ["uv", "run", "server.py"]

View File

@ -0,0 +1,7 @@
fileFormatVersion: 2
guid: 6fa88615288954da09edbaa8118d833d
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

View File

@ -0,0 +1,8 @@
fileFormatVersion: 2
guid: e90a4cfea1025423da33a86d17d4fbd3
folderAsset: yes
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

View File

@ -0,0 +1,3 @@
"""
Unity MCP Server package.
"""

View File

@ -0,0 +1,7 @@
fileFormatVersion: 2
guid: 59ba898760fd24167997d22d2705b8a4
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

View File

@ -0,0 +1,30 @@
"""
Configuration settings for the Unity MCP Server.
This file contains all configurable parameters for the server.
"""
from dataclasses import dataclass
@dataclass
class ServerConfig:
"""Main configuration class for the MCP server."""
# Network settings
unity_host: str = "localhost"
unity_port: int = 6400
mcp_port: int = 6500
# Connection settings
connection_timeout: float = 60.0 # default steady-state timeout; retries use shorter timeouts
buffer_size: int = 16 * 1024 * 1024 # 16MB buffer
# Logging settings
log_level: str = "INFO"
log_format: str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
# Server settings
max_retries: int = 8
retry_delay: float = 0.5
# Create a global config instance
config = ServerConfig()

View File

@ -0,0 +1,7 @@
fileFormatVersion: 2
guid: 5516f911d79504c71976757e67ca228b
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

View File

@ -0,0 +1,155 @@
"""
Port discovery utility for Unity MCP Server.
What changed and why:
- Unity now writes a per-project port file named like
`~/.unity-mcp/unity-mcp-port-<hash>.json` to avoid projects overwriting
each other's saved port. The legacy file `unity-mcp-port.json` may still
exist.
- This module now scans for both patterns, prefers the most recently
modified file, and verifies that the port is actually a Unity MCP listener
(quick socket connect + ping) before choosing it.
"""
import json
import os
import logging
from pathlib import Path
from typing import Optional, List
import glob
import socket
logger = logging.getLogger("unity-mcp-server")
class PortDiscovery:
"""Handles port discovery from Unity Bridge registry"""
REGISTRY_FILE = "unity-mcp-port.json" # legacy single-project file
DEFAULT_PORT = 6400
CONNECT_TIMEOUT = 0.3 # seconds, keep this snappy during discovery
@staticmethod
def get_registry_path() -> Path:
"""Get the path to the port registry file"""
return Path.home() / ".unity-mcp" / PortDiscovery.REGISTRY_FILE
@staticmethod
def get_registry_dir() -> Path:
return Path.home() / ".unity-mcp"
@staticmethod
def list_candidate_files() -> List[Path]:
"""Return candidate registry files, newest first.
Includes hashed per-project files and the legacy file (if present).
"""
base = PortDiscovery.get_registry_dir()
hashed = sorted(
(Path(p) for p in glob.glob(str(base / "unity-mcp-port-*.json"))),
key=lambda p: p.stat().st_mtime,
reverse=True,
)
legacy = PortDiscovery.get_registry_path()
if legacy.exists():
# Put legacy at the end so hashed, per-project files win
hashed.append(legacy)
return hashed
@staticmethod
def _try_probe_unity_mcp(port: int) -> bool:
"""Quickly check if a Unity MCP listener is on this port.
Tries a short TCP connect, sends 'ping', expects a JSON 'pong'.
"""
try:
with socket.create_connection(("127.0.0.1", port), PortDiscovery.CONNECT_TIMEOUT) as s:
s.settimeout(PortDiscovery.CONNECT_TIMEOUT)
try:
s.sendall(b"ping")
data = s.recv(512)
# Minimal validation: look for a success pong response
if data and b'"message":"pong"' in data:
return True
except Exception:
return False
except Exception:
return False
return False
@staticmethod
def _read_latest_status() -> Optional[dict]:
try:
base = PortDiscovery.get_registry_dir()
status_files = sorted(
(Path(p) for p in glob.glob(str(base / "unity-mcp-status-*.json"))),
key=lambda p: p.stat().st_mtime,
reverse=True,
)
if not status_files:
return None
with status_files[0].open('r') as f:
return json.load(f)
except Exception:
return None
@staticmethod
def discover_unity_port() -> int:
"""
Discover Unity port by scanning per-project and legacy registry files.
Prefer the newest file whose port responds; fall back to first parsed
value; finally default to 6400.
Returns:
Port number to connect to
"""
# Prefer the latest heartbeat status if it points to a responsive port
status = PortDiscovery._read_latest_status()
if status:
port = status.get('unity_port')
if isinstance(port, int) and PortDiscovery._try_probe_unity_mcp(port):
logger.info(f"Using Unity port from status: {port}")
return port
candidates = PortDiscovery.list_candidate_files()
first_seen_port: Optional[int] = None
for path in candidates:
try:
with open(path, 'r') as f:
cfg = json.load(f)
unity_port = cfg.get('unity_port')
if isinstance(unity_port, int):
if first_seen_port is None:
first_seen_port = unity_port
if PortDiscovery._try_probe_unity_mcp(unity_port):
logger.info(f"Using Unity port from {path.name}: {unity_port}")
return unity_port
except Exception as e:
logger.warning(f"Could not read port registry {path}: {e}")
if first_seen_port is not None:
logger.info(f"No responsive port found; using first seen value {first_seen_port}")
return first_seen_port
# Fallback to default port
logger.info(f"No port registry found; using default port {PortDiscovery.DEFAULT_PORT}")
return PortDiscovery.DEFAULT_PORT
@staticmethod
def get_port_config() -> Optional[dict]:
"""
Get the most relevant port configuration from registry.
Returns the most recent hashed file's config if present,
otherwise the legacy file's config. Returns None if nothing exists.
Returns:
Port configuration dict or None if not found
"""
candidates = PortDiscovery.list_candidate_files()
if not candidates:
return None
for path in candidates:
try:
with open(path, 'r') as f:
return json.load(f)
except Exception as e:
logger.warning(f"Could not read port configuration {path}: {e}")
return None

View File

@ -0,0 +1,7 @@
fileFormatVersion: 2
guid: 8d315755217ea4c36b221ac0461032ab
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

View File

@ -0,0 +1,15 @@
[project]
name = "UnityMcpServer"
version = "2.0.0"
description = "Unity MCP Server: A Unity package for Unity Editor integration via the Model Context Protocol (MCP)."
readme = "README.md"
requires-python = ">=3.12"
dependencies = ["httpx>=0.27.2", "mcp[cli]>=1.4.1"]
[build-system]
requires = ["setuptools>=64.0.0", "wheel"]
build-backend = "setuptools.build_meta"
[tool.setuptools]
py-modules = ["config", "server", "unity_connection"]
packages = ["tools"]

View File

@ -0,0 +1,7 @@
fileFormatVersion: 2
guid: 66fbd8ab4fd094540ba73299b6a2424a
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

View File

@ -0,0 +1,73 @@
from mcp.server.fastmcp import FastMCP, Context, Image
import logging
from dataclasses import dataclass
from contextlib import asynccontextmanager
from typing import AsyncIterator, Dict, Any, List
from config import config
from tools import register_all_tools
from unity_connection import get_unity_connection, UnityConnection
# Configure logging using settings from config
logging.basicConfig(
level=getattr(logging, config.log_level),
format=config.log_format
)
logger = logging.getLogger("unity-mcp-server")
# Global connection state
_unity_connection: UnityConnection = None
@asynccontextmanager
async def server_lifespan(server: FastMCP) -> AsyncIterator[Dict[str, Any]]:
"""Handle server startup and shutdown."""
global _unity_connection
logger.info("Unity MCP Server starting up")
try:
_unity_connection = get_unity_connection()
logger.info("Connected to Unity on startup")
except Exception as e:
logger.warning(f"Could not connect to Unity on startup: {str(e)}")
_unity_connection = None
try:
# Yield the connection object so it can be attached to the context
# The key 'bridge' matches how tools like read_console expect to access it (ctx.bridge)
yield {"bridge": _unity_connection}
finally:
if _unity_connection:
_unity_connection.disconnect()
_unity_connection = None
logger.info("Unity MCP Server shut down")
# Initialize MCP server
mcp = FastMCP(
"unity-mcp-server",
description="Unity Editor integration via Model Context Protocol",
lifespan=server_lifespan
)
# Register all tools
register_all_tools(mcp)
# Asset Creation Strategy
@mcp.prompt()
def asset_creation_strategy() -> str:
"""Guide for discovering and using Unity MCP tools effectively."""
return (
"Available Unity MCP Server Tools:\\n\\n"
"- `manage_editor`: Controls editor state and queries info.\\n"
"- `execute_menu_item`: Executes Unity Editor menu items by path.\\n"
"- `read_console`: Reads or clears Unity console messages, with filtering options.\\n"
"- `manage_scene`: Manages scenes.\\n"
"- `manage_gameobject`: Manages GameObjects in the scene.\\n"
"- `manage_script`: Manages C# script files.\\n"
"- `manage_asset`: Manages prefabs and assets.\\n"
"- `manage_shader`: Manages shaders.\\n\\n"
"Tips:\\n"
"- Create prefabs for reusable GameObjects.\\n"
"- Always include a camera and main light in your scenes.\\n"
)
# Run the server
if __name__ == "__main__":
mcp.run(transport='stdio')

View File

@ -0,0 +1,7 @@
fileFormatVersion: 2
guid: 8ef892978afc74491b6cf65f40514e74
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

View File

@ -0,0 +1,8 @@
fileFormatVersion: 2
guid: 205ac300b2209414f8b246354e853777
folderAsset: yes
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

View File

@ -0,0 +1,21 @@
from .manage_script import register_manage_script_tools
from .manage_scene import register_manage_scene_tools
from .manage_editor import register_manage_editor_tools
from .manage_gameobject import register_manage_gameobject_tools
from .manage_asset import register_manage_asset_tools
from .manage_shader import register_manage_shader_tools
from .read_console import register_read_console_tools
from .execute_menu_item import register_execute_menu_item_tools
def register_all_tools(mcp):
"""Register all refactored tools with the MCP server."""
print("Registering Unity MCP Server refactored tools...")
register_manage_script_tools(mcp)
register_manage_scene_tools(mcp)
register_manage_editor_tools(mcp)
register_manage_gameobject_tools(mcp)
register_manage_asset_tools(mcp)
register_manage_shader_tools(mcp)
register_read_console_tools(mcp)
register_execute_menu_item_tools(mcp)
print("Unity MCP Server tool registration complete.")

View File

@ -0,0 +1,7 @@
fileFormatVersion: 2
guid: 85da958dba57e47b9a2fa32a8abd61ef
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

View File

@ -0,0 +1,51 @@
"""
Defines the execute_menu_item tool for running Unity Editor menu commands.
"""
from typing import Dict, Any
from mcp.server.fastmcp import FastMCP, Context
from unity_connection import get_unity_connection # Import unity_connection module
def register_execute_menu_item_tools(mcp: FastMCP):
"""Registers the execute_menu_item tool with the MCP server."""
@mcp.tool()
async def execute_menu_item(
ctx: Context,
menu_path: str,
action: str = 'execute',
parameters: Dict[str, Any] = None,
) -> Dict[str, Any]:
"""Executes a Unity Editor menu item via its path (e.g., "File/Save Project").
Args:
ctx: The MCP context.
menu_path: The full path of the menu item to execute.
action: The operation to perform (default: 'execute').
parameters: Optional parameters for the menu item (rarely used).
Returns:
A dictionary indicating success or failure, with optional message/error.
"""
action = action.lower() if action else 'execute'
# Prepare parameters for the C# handler
params_dict = {
"action": action,
"menuPath": menu_path,
"parameters": parameters if parameters else {},
}
# Remove None values
params_dict = {k: v for k, v in params_dict.items() if v is not None}
if "parameters" not in params_dict:
params_dict["parameters"] = {} # Ensure parameters dict exists
# Get Unity connection and send the command
# We use the unity_connection module to communicate with Unity
unity_conn = get_unity_connection()
# Send command to the ExecuteMenuItem C# handler
# The command type should match what the Unity side expects
return unity_conn.send_command("execute_menu_item", params_dict)

View File

@ -0,0 +1,7 @@
fileFormatVersion: 2
guid: 50ba0cffcdba2452a89ac372d67b4787
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

View File

@ -0,0 +1,83 @@
"""
Defines the manage_asset tool for interacting with Unity assets.
"""
import asyncio # Added: Import asyncio for running sync code in async
from typing import Dict, Any
from mcp.server.fastmcp import FastMCP, Context
# from ..unity_connection import get_unity_connection # Original line that caused error
from unity_connection import get_unity_connection # Use absolute import relative to Python dir
def register_manage_asset_tools(mcp: FastMCP):
"""Registers the manage_asset tool with the MCP server."""
@mcp.tool()
async def manage_asset(
ctx: Context,
action: str,
path: str,
asset_type: str = None,
properties: Dict[str, Any] = None,
destination: str = None,
generate_preview: bool = False,
search_pattern: str = None,
filter_type: str = None,
filter_date_after: str = None,
page_size: int = None,
page_number: int = None
) -> Dict[str, Any]:
"""Performs asset operations (import, create, modify, delete, etc.) in Unity.
Args:
ctx: The MCP context.
action: Operation to perform (e.g., 'import', 'create', 'modify', 'delete', 'duplicate', 'move', 'rename', 'search', 'get_info', 'create_folder', 'get_components').
path: Asset path (e.g., "Materials/MyMaterial.mat") or search scope.
asset_type: Asset type (e.g., 'Material', 'Folder') - required for 'create'.
properties: Dictionary of properties for 'create'/'modify'.
example properties for Material: {"color": [1, 0, 0, 1], "shader": "Standard"}.
example properties for Texture: {"width": 1024, "height": 1024, "format": "RGBA32"}.
example properties for PhysicsMaterial: {"bounciness": 1.0, "staticFriction": 0.5, "dynamicFriction": 0.5}.
destination: Target path for 'duplicate'/'move'.
search_pattern: Search pattern (e.g., '*.prefab').
filter_*: Filters for search (type, date).
page_*: Pagination for search.
Returns:
A dictionary with operation results ('success', 'data', 'error').
"""
# Ensure properties is a dict if None
if properties is None:
properties = {}
# Prepare parameters for the C# handler
params_dict = {
"action": action.lower(),
"path": path,
"assetType": asset_type,
"properties": properties,
"destination": destination,
"generatePreview": generate_preview,
"searchPattern": search_pattern,
"filterType": filter_type,
"filterDateAfter": filter_date_after,
"pageSize": page_size,
"pageNumber": page_number
}
# Remove None values to avoid sending unnecessary nulls
params_dict = {k: v for k, v in params_dict.items() if v is not None}
# Get the current asyncio event loop
loop = asyncio.get_running_loop()
# Get the Unity connection instance
connection = get_unity_connection()
# Run the synchronous send_command in the default executor (thread pool)
# This prevents blocking the main async event loop.
result = await loop.run_in_executor(
None, # Use default executor
connection.send_command, # The function to call
"manage_asset", # First argument for send_command
params_dict # Second argument for send_command
)
# Return the result obtained from Unity
return result

View File

@ -0,0 +1,7 @@
fileFormatVersion: 2
guid: 7c0cfde2907ef4306b8a46c4b190f96a
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

View File

@ -0,0 +1,53 @@
from mcp.server.fastmcp import FastMCP, Context
from typing import Dict, Any
from unity_connection import get_unity_connection
def register_manage_editor_tools(mcp: FastMCP):
"""Register all editor management tools with the MCP server."""
@mcp.tool()
def manage_editor(
ctx: Context,
action: str,
wait_for_completion: bool = None,
# --- Parameters for specific actions ---
tool_name: str = None,
tag_name: str = None,
layer_name: str = None,
) -> Dict[str, Any]:
"""Controls and queries the Unity editor's state and settings.
Args:
action: Operation (e.g., 'play', 'pause', 'get_state', 'set_active_tool', 'add_tag').
wait_for_completion: Optional. If True, waits for certain actions.
Action-specific arguments (e.g., tool_name, tag_name, layer_name).
Returns:
Dictionary with operation results ('success', 'message', 'data').
"""
try:
# Prepare parameters, removing None values
params = {
"action": action,
"waitForCompletion": wait_for_completion,
"toolName": tool_name, # Corrected parameter name to match C#
"tagName": tag_name, # Pass tag name
"layerName": layer_name, # Pass layer name
# Add other parameters based on the action being performed
# "width": width,
# "height": height,
# etc.
}
params = {k: v for k, v in params.items() if v is not None}
# Send command to Unity
response = get_unity_connection().send_command("manage_editor", params)
# Process response
if response.get("success"):
return {"success": True, "message": response.get("message", "Editor operation successful."), "data": response.get("data")}
else:
return {"success": False, "message": response.get("error", "An unknown error occurred during editor management.")}
except Exception as e:
return {"success": False, "message": f"Python error managing editor: {str(e)}"}

View File

@ -0,0 +1,7 @@
fileFormatVersion: 2
guid: 54f6646d00435410fb67cc17d095c977
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

View File

@ -0,0 +1,138 @@
from mcp.server.fastmcp import FastMCP, Context
from typing import Dict, Any, List
from unity_connection import get_unity_connection
def register_manage_gameobject_tools(mcp: FastMCP):
"""Register all GameObject management tools with the MCP server."""
@mcp.tool()
def manage_gameobject(
ctx: Context,
action: str,
target: str = None, # GameObject identifier by name or path
search_method: str = None,
# --- Combined Parameters for Create/Modify ---
name: str = None, # Used for both 'create' (new object name) and 'modify' (rename)
tag: str = None, # Used for both 'create' (initial tag) and 'modify' (change tag)
parent: str = None, # Used for both 'create' (initial parent) and 'modify' (change parent)
position: List[float] = None,
rotation: List[float] = None,
scale: List[float] = None,
components_to_add: List[str] = None, # List of component names to add
primitive_type: str = None,
save_as_prefab: bool = False,
prefab_path: str = None,
prefab_folder: str = "Assets/Prefabs",
# --- Parameters for 'modify' ---
set_active: bool = None,
layer: str = None, # Layer name
components_to_remove: List[str] = None,
component_properties: Dict[str, Dict[str, Any]] = None,
# --- Parameters for 'find' ---
search_term: str = None,
find_all: bool = False,
search_in_children: bool = False,
search_inactive: bool = False,
# -- Component Management Arguments --
component_name: str = None,
includeNonPublicSerialized: bool = None, # Controls serialization of private [SerializeField] fields
) -> Dict[str, Any]:
"""Manages GameObjects: create, modify, delete, find, and component operations.
Args:
action: Operation (e.g., 'create', 'modify', 'find', 'add_component', 'remove_component', 'set_component_property', 'get_components').
target: GameObject identifier (name or path string) for modify/delete/component actions.
search_method: How to find objects ('by_name', 'by_id', 'by_path', etc.). Used with 'find' and some 'target' lookups.
name: GameObject name - used for both 'create' (initial name) and 'modify' (rename).
tag: Tag name - used for both 'create' (initial tag) and 'modify' (change tag).
parent: Parent GameObject reference - used for both 'create' (initial parent) and 'modify' (change parent).
layer: Layer name - used for both 'create' (initial layer) and 'modify' (change layer).
component_properties: Dict mapping Component names to their properties to set.
Example: {"Rigidbody": {"mass": 10.0, "useGravity": True}},
To set references:
- Use asset path string for Prefabs/Materials, e.g., {"MeshRenderer": {"material": "Assets/Materials/MyMat.mat"}}
- Use a dict for scene objects/components, e.g.:
{"MyScript": {"otherObject": {"find": "Player", "method": "by_name"}}} (assigns GameObject)
{"MyScript": {"playerHealth": {"find": "Player", "component": "HealthComponent"}}} (assigns Component)
Example set nested property:
- Access shared material: {"MeshRenderer": {"sharedMaterial.color": [1, 0, 0, 1]}}
components_to_add: List of component names to add.
Action-specific arguments (e.g., position, rotation, scale for create/modify;
component_name for component actions;
search_term, find_all for 'find').
includeNonPublicSerialized: If True, includes private fields marked [SerializeField] in component data.
Action-specific details:
- For 'get_components':
Required: target, search_method
Optional: includeNonPublicSerialized (defaults to True)
Returns all components on the target GameObject with their serialized data.
The search_method parameter determines how to find the target ('by_name', 'by_id', 'by_path').
Returns:
Dictionary with operation results ('success', 'message', 'data').
For 'get_components', the 'data' field contains a dictionary of component names and their serialized properties.
"""
try:
# --- Early check for attempting to modify a prefab asset ---
# ----------------------------------------------------------
# Prepare parameters, removing None values
params = {
"action": action,
"target": target,
"searchMethod": search_method,
"name": name,
"tag": tag,
"parent": parent,
"position": position,
"rotation": rotation,
"scale": scale,
"componentsToAdd": components_to_add,
"primitiveType": primitive_type,
"saveAsPrefab": save_as_prefab,
"prefabPath": prefab_path,
"prefabFolder": prefab_folder,
"setActive": set_active,
"layer": layer,
"componentsToRemove": components_to_remove,
"componentProperties": component_properties,
"searchTerm": search_term,
"findAll": find_all,
"searchInChildren": search_in_children,
"searchInactive": search_inactive,
"componentName": component_name,
"includeNonPublicSerialized": includeNonPublicSerialized
}
params = {k: v for k, v in params.items() if v is not None}
# --- Handle Prefab Path Logic ---
if action == "create" and params.get("saveAsPrefab"): # Check if 'saveAsPrefab' is explicitly True in params
if "prefabPath" not in params:
if "name" not in params or not params["name"]:
return {"success": False, "message": "Cannot create default prefab path: 'name' parameter is missing."}
# Use the provided prefab_folder (which has a default) and the name to construct the path
constructed_path = f"{prefab_folder}/{params['name']}.prefab"
# Ensure clean path separators (Unity prefers '/')
params["prefabPath"] = constructed_path.replace("\\", "/")
elif not params["prefabPath"].lower().endswith(".prefab"):
return {"success": False, "message": f"Invalid prefab_path: '{params['prefabPath']}' must end with .prefab"}
# Ensure prefab_folder itself isn't sent if prefabPath was constructed or provided
# The C# side only needs the final prefabPath
params.pop("prefab_folder", None)
# --------------------------------
# Send the command to Unity via the established connection
# Use the get_unity_connection function to retrieve the active connection instance
# Changed "MANAGE_GAMEOBJECT" to "manage_gameobject" to potentially match Unity expectation
response = get_unity_connection().send_command("manage_gameobject", params)
# Check if the response indicates success
# If the response is not successful, raise an exception with the error message
if response.get("success"):
return {"success": True, "message": response.get("message", "GameObject operation successful."), "data": response.get("data")}
else:
return {"success": False, "message": response.get("error", "An unknown error occurred during GameObject management.")}
except Exception as e:
return {"success": False, "message": f"Python error managing GameObject: {str(e)}"}

View File

@ -0,0 +1,7 @@
fileFormatVersion: 2
guid: 393f17281b99c428dbe73ba8652b60f5
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

View File

@ -0,0 +1,47 @@
from mcp.server.fastmcp import FastMCP, Context
from typing import Dict, Any
from unity_connection import get_unity_connection
def register_manage_scene_tools(mcp: FastMCP):
"""Register all scene management tools with the MCP server."""
@mcp.tool()
def manage_scene(
ctx: Context,
action: str,
name: str,
path: str,
build_index: int,
) -> Dict[str, Any]:
"""Manages Unity scenes (load, save, create, get hierarchy, etc.).
Args:
action: Operation (e.g., 'load', 'save', 'create', 'get_hierarchy').
name: Scene name (no extension) for create/load/save.
path: Asset path for scene operations (default: "Assets/").
build_index: Build index for load/build settings actions.
# Add other action-specific args as needed (e.g., for hierarchy depth)
Returns:
Dictionary with results ('success', 'message', 'data').
"""
try:
params = {
"action": action,
"name": name,
"path": path,
"buildIndex": build_index
}
params = {k: v for k, v in params.items() if v is not None}
# Send command to Unity
response = get_unity_connection().send_command("manage_scene", params)
# Process response
if response.get("success"):
return {"success": True, "message": response.get("message", "Scene operation successful."), "data": response.get("data")}
else:
return {"success": False, "message": response.get("error", "An unknown error occurred during scene management.")}
except Exception as e:
return {"success": False, "message": f"Python error managing scene: {str(e)}"}

View File

@ -0,0 +1,7 @@
fileFormatVersion: 2
guid: a744081d28b1e4ace9bfe8d6c4309640
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

View File

@ -0,0 +1,74 @@
from mcp.server.fastmcp import FastMCP, Context
from typing import Dict, Any
from unity_connection import get_unity_connection
import os
import base64
def register_manage_script_tools(mcp: FastMCP):
"""Register all script management tools with the MCP server."""
@mcp.tool()
def manage_script(
ctx: Context,
action: str,
name: str,
path: str,
contents: str,
script_type: str,
namespace: str
) -> Dict[str, Any]:
"""Manages C# scripts in Unity (create, read, update, delete).
Make reference variables public for easier access in the Unity Editor.
Args:
action: Operation ('create', 'read', 'update', 'delete').
name: Script name (no .cs extension).
path: Asset path (default: "Assets/").
contents: C# code for 'create'/'update'.
script_type: Type hint (e.g., 'MonoBehaviour').
namespace: Script namespace.
Returns:
Dictionary with results ('success', 'message', 'data').
"""
try:
# Prepare parameters for Unity
params = {
"action": action,
"name": name,
"path": path,
"namespace": namespace,
"scriptType": script_type
}
# Base64 encode the contents if they exist to avoid JSON escaping issues
if contents is not None:
if action in ['create', 'update']:
# Encode content for safer transmission
params["encodedContents"] = base64.b64encode(contents.encode('utf-8')).decode('utf-8')
params["contentsEncoded"] = True
else:
params["contents"] = contents
# Remove None values so they don't get sent as null
params = {k: v for k, v in params.items() if v is not None}
# Send command to Unity
response = get_unity_connection().send_command("manage_script", params)
# Process response from Unity
if response.get("success"):
# If the response contains base64 encoded content, decode it
if response.get("data", {}).get("contentsEncoded"):
decoded_contents = base64.b64decode(response["data"]["encodedContents"]).decode('utf-8')
response["data"]["contents"] = decoded_contents
del response["data"]["encodedContents"]
del response["data"]["contentsEncoded"]
return {"success": True, "message": response.get("message", "Operation successful."), "data": response.get("data")}
else:
return {"success": False, "message": response.get("error", "An unknown error occurred.")}
except Exception as e:
# Handle Python-side errors (e.g., connection issues)
return {"success": False, "message": f"Python error managing script: {str(e)}"}

View File

@ -0,0 +1,7 @@
fileFormatVersion: 2
guid: 5f5d55725198d4d53afcd4565f402b9e
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

View File

@ -0,0 +1,67 @@
from mcp.server.fastmcp import FastMCP, Context
from typing import Dict, Any
from unity_connection import get_unity_connection
import os
import base64
def register_manage_shader_tools(mcp: FastMCP):
"""Register all shader script management tools with the MCP server."""
@mcp.tool()
def manage_shader(
ctx: Context,
action: str,
name: str,
path: str,
contents: str,
) -> Dict[str, Any]:
"""Manages shader scripts in Unity (create, read, update, delete).
Args:
action: Operation ('create', 'read', 'update', 'delete').
name: Shader name (no .cs extension).
path: Asset path (default: "Assets/").
contents: Shader code for 'create'/'update'.
Returns:
Dictionary with results ('success', 'message', 'data').
"""
try:
# Prepare parameters for Unity
params = {
"action": action,
"name": name,
"path": path,
}
# Base64 encode the contents if they exist to avoid JSON escaping issues
if contents is not None:
if action in ['create', 'update']:
# Encode content for safer transmission
params["encodedContents"] = base64.b64encode(contents.encode('utf-8')).decode('utf-8')
params["contentsEncoded"] = True
else:
params["contents"] = contents
# Remove None values so they don't get sent as null
params = {k: v for k, v in params.items() if v is not None}
# Send command to Unity
response = get_unity_connection().send_command("manage_shader", params)
# Process response from Unity
if response.get("success"):
# If the response contains base64 encoded content, decode it
if response.get("data", {}).get("contentsEncoded"):
decoded_contents = base64.b64decode(response["data"]["encodedContents"]).decode('utf-8')
response["data"]["contents"] = decoded_contents
del response["data"]["encodedContents"]
del response["data"]["contentsEncoded"]
return {"success": True, "message": response.get("message", "Operation successful."), "data": response.get("data")}
else:
return {"success": False, "message": response.get("error", "An unknown error occurred.")}
except Exception as e:
# Handle Python-side errors (e.g., connection issues)
return {"success": False, "message": f"Python error managing shader: {str(e)}"}

View File

@ -0,0 +1,7 @@
fileFormatVersion: 2
guid: 52a3e6faa53234aa08edf8163159c9af
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

View File

@ -0,0 +1,70 @@
"""
Defines the read_console tool for accessing Unity Editor console messages.
"""
from typing import List, Dict, Any
from mcp.server.fastmcp import FastMCP, Context
from unity_connection import get_unity_connection
def register_read_console_tools(mcp: FastMCP):
"""Registers the read_console tool with the MCP server."""
@mcp.tool()
def read_console(
ctx: Context,
action: str = None,
types: List[str] = None,
count: int = None,
filter_text: str = None,
since_timestamp: str = None,
format: str = None,
include_stacktrace: bool = None
) -> Dict[str, Any]:
"""Gets messages from or clears the Unity Editor console.
Args:
ctx: The MCP context.
action: Operation ('get' or 'clear').
types: Message types to get ('error', 'warning', 'log', 'all').
count: Max messages to return.
filter_text: Text filter for messages.
since_timestamp: Get messages after this timestamp (ISO 8601).
format: Output format ('plain', 'detailed', 'json').
include_stacktrace: Include stack traces in output.
Returns:
Dictionary with results. For 'get', includes 'data' (messages).
"""
# Get the connection instance
bridge = get_unity_connection()
# Set defaults if values are None
action = action if action is not None else 'get'
types = types if types is not None else ['error', 'warning', 'log']
format = format if format is not None else 'detailed'
include_stacktrace = include_stacktrace if include_stacktrace is not None else True
# Normalize action if it's a string
if isinstance(action, str):
action = action.lower()
# Prepare parameters for the C# handler
params_dict = {
"action": action,
"types": types,
"count": count,
"filterText": filter_text,
"sinceTimestamp": since_timestamp,
"format": format.lower() if isinstance(format, str) else format,
"includeStacktrace": include_stacktrace
}
# Remove None values unless it's 'count' (as None might mean 'all')
params_dict = {k: v for k, v in params_dict.items() if v is not None or k == 'count'}
# Add count back if it was None, explicitly sending null might be important for C# logic
if 'count' not in params_dict:
params_dict['count'] = None
# Forward the command using the bridge's send_command method
return bridge.send_command("read_console", params_dict)

View File

@ -0,0 +1,7 @@
fileFormatVersion: 2
guid: a73ff5df6153548878e2656315e5db69
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

View File

@ -0,0 +1,239 @@
import socket
import json
import logging
from dataclasses import dataclass
from pathlib import Path
import time
from typing import Dict, Any
from config import config
from port_discovery import PortDiscovery
# Configure logging using settings from config
logging.basicConfig(
level=getattr(logging, config.log_level),
format=config.log_format
)
logger = logging.getLogger("unity-mcp-server")
@dataclass
class UnityConnection:
"""Manages the socket connection to the Unity Editor."""
host: str = config.unity_host
port: int = None # Will be set dynamically
sock: socket.socket = None # Socket for Unity communication
def __post_init__(self):
"""Set port from discovery if not explicitly provided"""
if self.port is None:
self.port = PortDiscovery.discover_unity_port()
def connect(self) -> bool:
"""Establish a connection to the Unity Editor."""
if self.sock:
return True
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.host, self.port))
logger.info(f"Connected to Unity at {self.host}:{self.port}")
return True
except Exception as e:
logger.error(f"Failed to connect to Unity: {str(e)}")
self.sock = None
return False
def disconnect(self):
"""Close the connection to the Unity Editor."""
if self.sock:
try:
self.sock.close()
except Exception as e:
logger.error(f"Error disconnecting from Unity: {str(e)}")
finally:
self.sock = None
def receive_full_response(self, sock, buffer_size=config.buffer_size) -> bytes:
"""Receive a complete response from Unity, handling chunked data."""
chunks = []
sock.settimeout(config.connection_timeout) # Use timeout from config
try:
while True:
chunk = sock.recv(buffer_size)
if not chunk:
if not chunks:
raise Exception("Connection closed before receiving data")
break
chunks.append(chunk)
# Process the data received so far
data = b''.join(chunks)
decoded_data = data.decode('utf-8')
# Check if we've received a complete response
try:
# Special case for ping-pong
if decoded_data.strip().startswith('{"status":"success","result":{"message":"pong"'):
logger.debug("Received ping response")
return data
# Handle escaped quotes in the content
if '"content":' in decoded_data:
# Find the content field and its value
content_start = decoded_data.find('"content":') + 9
content_end = decoded_data.rfind('"', content_start)
if content_end > content_start:
# Replace escaped quotes in content with regular quotes
content = decoded_data[content_start:content_end]
content = content.replace('\\"', '"')
decoded_data = decoded_data[:content_start] + content + decoded_data[content_end:]
# Validate JSON format
json.loads(decoded_data)
# If we get here, we have valid JSON
logger.info(f"Received complete response ({len(data)} bytes)")
return data
except json.JSONDecodeError:
# We haven't received a complete valid JSON response yet
continue
except Exception as e:
logger.warning(f"Error processing response chunk: {str(e)}")
# Continue reading more chunks as this might not be the complete response
continue
except socket.timeout:
logger.warning("Socket timeout during receive")
raise Exception("Timeout receiving Unity response")
except Exception as e:
logger.error(f"Error during receive: {str(e)}")
raise
def send_command(self, command_type: str, params: Dict[str, Any] = None) -> Dict[str, Any]:
"""Send a command with retry/backoff and port rediscovery. Pings only when requested."""
attempts = max(config.max_retries, 5)
base_backoff = max(0.5, config.retry_delay)
def read_status_file() -> dict | None:
try:
status_files = sorted(Path.home().joinpath('.unity-mcp').glob('unity-mcp-status-*.json'), key=lambda p: p.stat().st_mtime, reverse=True)
if not status_files:
return None
latest = status_files[0]
with latest.open('r') as f:
return json.load(f)
except Exception:
return None
last_short_timeout = None
for attempt in range(attempts + 1):
try:
# Ensure connected
if not self.sock:
# During retries use short connect timeout
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(1.0)
self.sock.connect((self.host, self.port))
# restore steady-state timeout for receive
self.sock.settimeout(config.connection_timeout)
logger.info(f"Connected to Unity at {self.host}:{self.port}")
# Build payload
if command_type == 'ping':
payload = b'ping'
else:
command = {"type": command_type, "params": params or {}}
payload = json.dumps(command, ensure_ascii=False).encode('utf-8')
# Send
self.sock.sendall(payload)
# During retry bursts use a short receive timeout
if attempt > 0 and last_short_timeout is None:
last_short_timeout = self.sock.gettimeout()
self.sock.settimeout(1.0)
response_data = self.receive_full_response(self.sock)
# restore steady-state timeout if changed
if last_short_timeout is not None:
self.sock.settimeout(config.connection_timeout)
last_short_timeout = None
# Parse
if command_type == 'ping':
resp = json.loads(response_data.decode('utf-8'))
if resp.get('status') == 'success' and resp.get('result', {}).get('message') == 'pong':
return {"message": "pong"}
raise Exception("Ping unsuccessful")
resp = json.loads(response_data.decode('utf-8'))
if resp.get('status') == 'error':
err = resp.get('error') or resp.get('message', 'Unknown Unity error')
raise Exception(err)
return resp.get('result', {})
except Exception as e:
logger.warning(f"Unity communication attempt {attempt+1} failed: {e}")
try:
if self.sock:
self.sock.close()
finally:
self.sock = None
# Re-discover port each time
try:
new_port = PortDiscovery.discover_unity_port()
if new_port != self.port:
logger.info(f"Unity port changed {self.port} -> {new_port}")
self.port = new_port
except Exception as de:
logger.debug(f"Port discovery failed: {de}")
if attempt < attempts:
# If heartbeat indicates reload, keep retries snappy without spamming
status = read_status_file()
backoff = base_backoff * (2 ** attempt)
sleep_s = min(backoff, 3.0)
if status and (status.get('reloading') or status.get('unity_port') == self.port):
sleep_s = min(sleep_s, 0.8)
time.sleep(sleep_s)
continue
raise
# Global Unity connection
_unity_connection = None
def get_unity_connection() -> UnityConnection:
"""Retrieve or establish a persistent Unity connection."""
global _unity_connection
if _unity_connection is not None:
try:
# Try to ping with a short timeout to verify connection
result = _unity_connection.send_command("ping")
# If we get here, the connection is still valid
logger.debug("Reusing existing Unity connection")
return _unity_connection
except Exception as e:
logger.warning(f"Existing connection failed: {str(e)}")
try:
_unity_connection.disconnect()
except:
pass
_unity_connection = None
# Create a new connection
logger.info("Creating new Unity connection")
_unity_connection = UnityConnection()
if not _unity_connection.connect():
_unity_connection = None
raise ConnectionError("Could not connect to Unity. Ensure the Unity Editor and MCP Bridge are running.")
try:
# Verify the new connection works
_unity_connection.send_command("ping")
logger.info("Successfully established new Unity connection")
return _unity_connection
except Exception as e:
logger.error(f"Could not verify new connection: {str(e)}")
try:
_unity_connection.disconnect()
except:
pass
_unity_connection = None
raise ConnectionError(f"Could not establish valid Unity connection: {str(e)}")

View File

@ -0,0 +1,7 @@
fileFormatVersion: 2
guid: 2bba70ed632654291acae6c529d6ec79
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

View File

@ -0,0 +1,349 @@
version = 1
revision = 1
requires-python = ">=3.12"
[[package]]
name = "annotated-types"
version = "0.7.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/ee/67/531ea369ba64dcff5ec9c3402f9f51bf748cec26dde048a2f973a4eea7f5/annotated_types-0.7.0.tar.gz", hash = "sha256:aff07c09a53a08bc8cfccb9c85b05f1aa9a2a6f23728d790723543408344ce89", size = 16081 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/78/b6/6307fbef88d9b5ee7421e68d78a9f162e0da4900bc5f5793f6d3d0e34fb8/annotated_types-0.7.0-py3-none-any.whl", hash = "sha256:1f02e8b43a8fbbc3f3e0d4f0f4bfc8131bcb4eebe8849b8e5c773f3a1c582a53", size = 13643 },
]
[[package]]
name = "anyio"
version = "4.9.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "idna" },
{ name = "sniffio" },
{ name = "typing-extensions", marker = "python_full_version < '3.13'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/95/7d/4c1bd541d4dffa1b52bd83fb8527089e097a106fc90b467a7313b105f840/anyio-4.9.0.tar.gz", hash = "sha256:673c0c244e15788651a4ff38710fea9675823028a6f08a5eda409e0c9840a028", size = 190949 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/a1/ee/48ca1a7c89ffec8b6a0c5d02b89c305671d5ffd8d3c94acf8b8c408575bb/anyio-4.9.0-py3-none-any.whl", hash = "sha256:9f76d541cad6e36af7beb62e978876f3b41e3e04f2c1fbf0884604c0a9c4d93c", size = 100916 },
]
[[package]]
name = "certifi"
version = "2025.1.31"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/1c/ab/c9f1e32b7b1bf505bf26f0ef697775960db7932abeb7b516de930ba2705f/certifi-2025.1.31.tar.gz", hash = "sha256:3d5da6925056f6f18f119200434a4780a94263f10d1c21d032a6f6b2baa20651", size = 167577 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/38/fc/bce832fd4fd99766c04d1ee0eead6b0ec6486fb100ae5e74c1d91292b982/certifi-2025.1.31-py3-none-any.whl", hash = "sha256:ca78db4565a652026a4db2bcdf68f2fb589ea80d0be70e03929ed730746b84fe", size = 166393 },
]
[[package]]
name = "click"
version = "8.1.8"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "colorama", marker = "sys_platform == 'win32'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/b9/2e/0090cbf739cee7d23781ad4b89a9894a41538e4fcf4c31dcdd705b78eb8b/click-8.1.8.tar.gz", hash = "sha256:ed53c9d8990d83c2a27deae68e4ee337473f6330c040a31d4225c9574d16096a", size = 226593 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/7e/d4/7ebdbd03970677812aac39c869717059dbb71a4cfc033ca6e5221787892c/click-8.1.8-py3-none-any.whl", hash = "sha256:63c132bbbed01578a06712a2d1f497bb62d9c1c0d329b7903a866228027263b2", size = 98188 },
]
[[package]]
name = "colorama"
version = "0.4.6"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/d8/53/6f443c9a4a8358a93a6792e2acffb9d9d5cb0a5cfd8802644b7b1c9a02e4/colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44", size = 27697 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335 },
]
[[package]]
name = "h11"
version = "0.14.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/f5/38/3af3d3633a34a3316095b39c8e8fb4853a28a536e55d347bd8d8e9a14b03/h11-0.14.0.tar.gz", hash = "sha256:8f19fbbe99e72420ff35c00b27a34cb9937e902a8b810e2c88300c6f0a3b699d", size = 100418 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/95/04/ff642e65ad6b90db43e668d70ffb6736436c7ce41fcc549f4e9472234127/h11-0.14.0-py3-none-any.whl", hash = "sha256:e3fe4ac4b851c468cc8363d500db52c2ead036020723024a109d37346efaa761", size = 58259 },
]
[[package]]
name = "httpcore"
version = "1.0.7"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "certifi" },
{ name = "h11" },
]
sdist = { url = "https://files.pythonhosted.org/packages/6a/41/d7d0a89eb493922c37d343b607bc1b5da7f5be7e383740b4753ad8943e90/httpcore-1.0.7.tar.gz", hash = "sha256:8551cb62a169ec7162ac7be8d4817d561f60e08eaa485234898414bb5a8a0b4c", size = 85196 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/87/f5/72347bc88306acb359581ac4d52f23c0ef445b57157adedb9aee0cd689d2/httpcore-1.0.7-py3-none-any.whl", hash = "sha256:a3fff8f43dc260d5bd363d9f9cf1830fa3a458b332856f34282de498ed420edd", size = 78551 },
]
[[package]]
name = "httpx"
version = "0.28.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "anyio" },
{ name = "certifi" },
{ name = "httpcore" },
{ name = "idna" },
]
sdist = { url = "https://files.pythonhosted.org/packages/b1/df/48c586a5fe32a0f01324ee087459e112ebb7224f646c0b5023f5e79e9956/httpx-0.28.1.tar.gz", hash = "sha256:75e98c5f16b0f35b567856f597f06ff2270a374470a5c2392242528e3e3e42fc", size = 141406 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/2a/39/e50c7c3a983047577ee07d2a9e53faf5a69493943ec3f6a384bdc792deb2/httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad", size = 73517 },
]
[[package]]
name = "httpx-sse"
version = "0.4.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/4c/60/8f4281fa9bbf3c8034fd54c0e7412e66edbab6bc74c4996bd616f8d0406e/httpx-sse-0.4.0.tar.gz", hash = "sha256:1e81a3a3070ce322add1d3529ed42eb5f70817f45ed6ec915ab753f961139721", size = 12624 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/e1/9b/a181f281f65d776426002f330c31849b86b31fc9d848db62e16f03ff739f/httpx_sse-0.4.0-py3-none-any.whl", hash = "sha256:f329af6eae57eaa2bdfd962b42524764af68075ea87370a2de920af5341e318f", size = 7819 },
]
[[package]]
name = "idna"
version = "3.10"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/f1/70/7703c29685631f5a7590aa73f1f1d3fa9a380e654b86af429e0934a32f7d/idna-3.10.tar.gz", hash = "sha256:12f65c9b470abda6dc35cf8e63cc574b1c52b11df2c86030af0ac09b01b13ea9", size = 190490 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/76/c6/c88e154df9c4e1a2a66ccf0005a88dfb2650c1dffb6f5ce603dfbd452ce3/idna-3.10-py3-none-any.whl", hash = "sha256:946d195a0d259cbba61165e88e65941f16e9b36ea6ddb97f00452bae8b1287d3", size = 70442 },
]
[[package]]
name = "markdown-it-py"
version = "3.0.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "mdurl" },
]
sdist = { url = "https://files.pythonhosted.org/packages/38/71/3b932df36c1a044d397a1f92d1cf91ee0a503d91e470cbd670aa66b07ed0/markdown-it-py-3.0.0.tar.gz", hash = "sha256:e3f60a94fa066dc52ec76661e37c851cb232d92f9886b15cb560aaada2df8feb", size = 74596 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/42/d7/1ec15b46af6af88f19b8e5ffea08fa375d433c998b8a7639e76935c14f1f/markdown_it_py-3.0.0-py3-none-any.whl", hash = "sha256:355216845c60bd96232cd8d8c40e8f9765cc86f46880e43a8fd22dc1a1a8cab1", size = 87528 },
]
[[package]]
name = "mcp"
version = "1.4.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "anyio" },
{ name = "httpx" },
{ name = "httpx-sse" },
{ name = "pydantic" },
{ name = "pydantic-settings" },
{ name = "sse-starlette" },
{ name = "starlette" },
{ name = "uvicorn" },
]
sdist = { url = "https://files.pythonhosted.org/packages/50/cc/5c5bb19f1a0f8f89a95e25cb608b0b07009e81fd4b031e519335404e1422/mcp-1.4.1.tar.gz", hash = "sha256:b9655d2de6313f9d55a7d1df62b3c3fe27a530100cc85bf23729145b0dba4c7a", size = 154942 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/e8/0e/885f156ade60108e67bf044fada5269da68e29d758a10b0c513f4d85dd76/mcp-1.4.1-py3-none-any.whl", hash = "sha256:a7716b1ec1c054e76f49806f7d96113b99fc1166fc9244c2c6f19867cb75b593", size = 72448 },
]
[package.optional-dependencies]
cli = [
{ name = "python-dotenv" },
{ name = "typer" },
]
[[package]]
name = "mdurl"
version = "0.1.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/d6/54/cfe61301667036ec958cb99bd3efefba235e65cdeb9c84d24a8293ba1d90/mdurl-0.1.2.tar.gz", hash = "sha256:bb413d29f5eea38f31dd4754dd7377d4465116fb207585f97bf925588687c1ba", size = 8729 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/b3/38/89ba8ad64ae25be8de66a6d463314cf1eb366222074cfda9ee839c56a4b4/mdurl-0.1.2-py3-none-any.whl", hash = "sha256:84008a41e51615a49fc9966191ff91509e3c40b939176e643fd50a5c2196b8f8", size = 9979 },
]
[[package]]
name = "pydantic"
version = "2.10.6"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "annotated-types" },
{ name = "pydantic-core" },
{ name = "typing-extensions" },
]
sdist = { url = "https://files.pythonhosted.org/packages/b7/ae/d5220c5c52b158b1de7ca89fc5edb72f304a70a4c540c84c8844bf4008de/pydantic-2.10.6.tar.gz", hash = "sha256:ca5daa827cce33de7a42be142548b0096bf05a7e7b365aebfa5f8eeec7128236", size = 761681 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/f4/3c/8cc1cc84deffa6e25d2d0c688ebb80635dfdbf1dbea3e30c541c8cf4d860/pydantic-2.10.6-py3-none-any.whl", hash = "sha256:427d664bf0b8a2b34ff5dd0f5a18df00591adcee7198fbd71981054cef37b584", size = 431696 },
]
[[package]]
name = "pydantic-core"
version = "2.27.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "typing-extensions" },
]
sdist = { url = "https://files.pythonhosted.org/packages/fc/01/f3e5ac5e7c25833db5eb555f7b7ab24cd6f8c322d3a3ad2d67a952dc0abc/pydantic_core-2.27.2.tar.gz", hash = "sha256:eb026e5a4c1fee05726072337ff51d1efb6f59090b7da90d30ea58625b1ffb39", size = 413443 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/d6/74/51c8a5482ca447871c93e142d9d4a92ead74de6c8dc5e66733e22c9bba89/pydantic_core-2.27.2-cp312-cp312-macosx_10_12_x86_64.whl", hash = "sha256:9e0c8cfefa0ef83b4da9588448b6d8d2a2bf1a53c3f1ae5fca39eb3061e2f0b0", size = 1893127 },
{ url = "https://files.pythonhosted.org/packages/d3/f3/c97e80721735868313c58b89d2de85fa80fe8dfeeed84dc51598b92a135e/pydantic_core-2.27.2-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:83097677b8e3bd7eaa6775720ec8e0405f1575015a463285a92bfdfe254529ef", size = 1811340 },
{ url = "https://files.pythonhosted.org/packages/9e/91/840ec1375e686dbae1bd80a9e46c26a1e0083e1186abc610efa3d9a36180/pydantic_core-2.27.2-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:172fce187655fece0c90d90a678424b013f8fbb0ca8b036ac266749c09438cb7", size = 1822900 },
{ url = "https://files.pythonhosted.org/packages/f6/31/4240bc96025035500c18adc149aa6ffdf1a0062a4b525c932065ceb4d868/pydantic_core-2.27.2-cp312-cp312-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:519f29f5213271eeeeb3093f662ba2fd512b91c5f188f3bb7b27bc5973816934", size = 1869177 },
{ url = "https://files.pythonhosted.org/packages/fa/20/02fbaadb7808be578317015c462655c317a77a7c8f0ef274bc016a784c54/pydantic_core-2.27.2-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:05e3a55d124407fffba0dd6b0c0cd056d10e983ceb4e5dbd10dda135c31071d6", size = 2038046 },
{ url = "https://files.pythonhosted.org/packages/06/86/7f306b904e6c9eccf0668248b3f272090e49c275bc488a7b88b0823444a4/pydantic_core-2.27.2-cp312-cp312-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:9c3ed807c7b91de05e63930188f19e921d1fe90de6b4f5cd43ee7fcc3525cb8c", size = 2685386 },
{ url = "https://files.pythonhosted.org/packages/8d/f0/49129b27c43396581a635d8710dae54a791b17dfc50c70164866bbf865e3/pydantic_core-2.27.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6fb4aadc0b9a0c063206846d603b92030eb6f03069151a625667f982887153e2", size = 1997060 },
{ url = "https://files.pythonhosted.org/packages/0d/0f/943b4af7cd416c477fd40b187036c4f89b416a33d3cc0ab7b82708a667aa/pydantic_core-2.27.2-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:28ccb213807e037460326424ceb8b5245acb88f32f3d2777427476e1b32c48c4", size = 2004870 },
{ url = "https://files.pythonhosted.org/packages/35/40/aea70b5b1a63911c53a4c8117c0a828d6790483f858041f47bab0b779f44/pydantic_core-2.27.2-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:de3cd1899e2c279b140adde9357c4495ed9d47131b4a4eaff9052f23398076b3", size = 1999822 },
{ url = "https://files.pythonhosted.org/packages/f2/b3/807b94fd337d58effc5498fd1a7a4d9d59af4133e83e32ae39a96fddec9d/pydantic_core-2.27.2-cp312-cp312-musllinux_1_1_armv7l.whl", hash = "sha256:220f892729375e2d736b97d0e51466252ad84c51857d4d15f5e9692f9ef12be4", size = 2130364 },
{ url = "https://files.pythonhosted.org/packages/fc/df/791c827cd4ee6efd59248dca9369fb35e80a9484462c33c6649a8d02b565/pydantic_core-2.27.2-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:a0fcd29cd6b4e74fe8ddd2c90330fd8edf2e30cb52acda47f06dd615ae72da57", size = 2158303 },
{ url = "https://files.pythonhosted.org/packages/9b/67/4e197c300976af185b7cef4c02203e175fb127e414125916bf1128b639a9/pydantic_core-2.27.2-cp312-cp312-win32.whl", hash = "sha256:1e2cb691ed9834cd6a8be61228471d0a503731abfb42f82458ff27be7b2186fc", size = 1834064 },
{ url = "https://files.pythonhosted.org/packages/1f/ea/cd7209a889163b8dcca139fe32b9687dd05249161a3edda62860430457a5/pydantic_core-2.27.2-cp312-cp312-win_amd64.whl", hash = "sha256:cc3f1a99a4f4f9dd1de4fe0312c114e740b5ddead65bb4102884b384c15d8bc9", size = 1989046 },
{ url = "https://files.pythonhosted.org/packages/bc/49/c54baab2f4658c26ac633d798dab66b4c3a9bbf47cff5284e9c182f4137a/pydantic_core-2.27.2-cp312-cp312-win_arm64.whl", hash = "sha256:3911ac9284cd8a1792d3cb26a2da18f3ca26c6908cc434a18f730dc0db7bfa3b", size = 1885092 },
{ url = "https://files.pythonhosted.org/packages/41/b1/9bc383f48f8002f99104e3acff6cba1231b29ef76cfa45d1506a5cad1f84/pydantic_core-2.27.2-cp313-cp313-macosx_10_12_x86_64.whl", hash = "sha256:7d14bd329640e63852364c306f4d23eb744e0f8193148d4044dd3dacdaacbd8b", size = 1892709 },
{ url = "https://files.pythonhosted.org/packages/10/6c/e62b8657b834f3eb2961b49ec8e301eb99946245e70bf42c8817350cbefc/pydantic_core-2.27.2-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:82f91663004eb8ed30ff478d77c4d1179b3563df6cdb15c0817cd1cdaf34d154", size = 1811273 },
{ url = "https://files.pythonhosted.org/packages/ba/15/52cfe49c8c986e081b863b102d6b859d9defc63446b642ccbbb3742bf371/pydantic_core-2.27.2-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:71b24c7d61131bb83df10cc7e687433609963a944ccf45190cfc21e0887b08c9", size = 1823027 },
{ url = "https://files.pythonhosted.org/packages/b1/1c/b6f402cfc18ec0024120602bdbcebc7bdd5b856528c013bd4d13865ca473/pydantic_core-2.27.2-cp313-cp313-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:fa8e459d4954f608fa26116118bb67f56b93b209c39b008277ace29937453dc9", size = 1868888 },
{ url = "https://files.pythonhosted.org/packages/bd/7b/8cb75b66ac37bc2975a3b7de99f3c6f355fcc4d89820b61dffa8f1e81677/pydantic_core-2.27.2-cp313-cp313-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:ce8918cbebc8da707ba805b7fd0b382816858728ae7fe19a942080c24e5b7cd1", size = 2037738 },
{ url = "https://files.pythonhosted.org/packages/c8/f1/786d8fe78970a06f61df22cba58e365ce304bf9b9f46cc71c8c424e0c334/pydantic_core-2.27.2-cp313-cp313-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:eda3f5c2a021bbc5d976107bb302e0131351c2ba54343f8a496dc8783d3d3a6a", size = 2685138 },
{ url = "https://files.pythonhosted.org/packages/a6/74/d12b2cd841d8724dc8ffb13fc5cef86566a53ed358103150209ecd5d1999/pydantic_core-2.27.2-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bd8086fa684c4775c27f03f062cbb9eaa6e17f064307e86b21b9e0abc9c0f02e", size = 1997025 },
{ url = "https://files.pythonhosted.org/packages/a0/6e/940bcd631bc4d9a06c9539b51f070b66e8f370ed0933f392db6ff350d873/pydantic_core-2.27.2-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:8d9b3388db186ba0c099a6d20f0604a44eabdeef1777ddd94786cdae158729e4", size = 2004633 },
{ url = "https://files.pythonhosted.org/packages/50/cc/a46b34f1708d82498c227d5d80ce615b2dd502ddcfd8376fc14a36655af1/pydantic_core-2.27.2-cp313-cp313-musllinux_1_1_aarch64.whl", hash = "sha256:7a66efda2387de898c8f38c0cf7f14fca0b51a8ef0b24bfea5849f1b3c95af27", size = 1999404 },
{ url = "https://files.pythonhosted.org/packages/ca/2d/c365cfa930ed23bc58c41463bae347d1005537dc8db79e998af8ba28d35e/pydantic_core-2.27.2-cp313-cp313-musllinux_1_1_armv7l.whl", hash = "sha256:18a101c168e4e092ab40dbc2503bdc0f62010e95d292b27827871dc85450d7ee", size = 2130130 },
{ url = "https://files.pythonhosted.org/packages/f4/d7/eb64d015c350b7cdb371145b54d96c919d4db516817f31cd1c650cae3b21/pydantic_core-2.27.2-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:ba5dd002f88b78a4215ed2f8ddbdf85e8513382820ba15ad5ad8955ce0ca19a1", size = 2157946 },
{ url = "https://files.pythonhosted.org/packages/a4/99/bddde3ddde76c03b65dfd5a66ab436c4e58ffc42927d4ff1198ffbf96f5f/pydantic_core-2.27.2-cp313-cp313-win32.whl", hash = "sha256:1ebaf1d0481914d004a573394f4be3a7616334be70261007e47c2a6fe7e50130", size = 1834387 },
{ url = "https://files.pythonhosted.org/packages/71/47/82b5e846e01b26ac6f1893d3c5f9f3a2eb6ba79be26eef0b759b4fe72946/pydantic_core-2.27.2-cp313-cp313-win_amd64.whl", hash = "sha256:953101387ecf2f5652883208769a79e48db18c6df442568a0b5ccd8c2723abee", size = 1990453 },
{ url = "https://files.pythonhosted.org/packages/51/b2/b2b50d5ecf21acf870190ae5d093602d95f66c9c31f9d5de6062eb329ad1/pydantic_core-2.27.2-cp313-cp313-win_arm64.whl", hash = "sha256:ac4dbfd1691affb8f48c2c13241a2e3b60ff23247cbcf981759c768b6633cf8b", size = 1885186 },
]
[[package]]
name = "pydantic-settings"
version = "2.8.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "pydantic" },
{ name = "python-dotenv" },
]
sdist = { url = "https://files.pythonhosted.org/packages/88/82/c79424d7d8c29b994fb01d277da57b0a9b09cc03c3ff875f9bd8a86b2145/pydantic_settings-2.8.1.tar.gz", hash = "sha256:d5c663dfbe9db9d5e1c646b2e161da12f0d734d422ee56f567d0ea2cee4e8585", size = 83550 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/0b/53/a64f03044927dc47aafe029c42a5b7aabc38dfb813475e0e1bf71c4a59d0/pydantic_settings-2.8.1-py3-none-any.whl", hash = "sha256:81942d5ac3d905f7f3ee1a70df5dfb62d5569c12f51a5a647defc1c3d9ee2e9c", size = 30839 },
]
[[package]]
name = "pygments"
version = "2.19.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/7c/2d/c3338d48ea6cc0feb8446d8e6937e1408088a72a39937982cc6111d17f84/pygments-2.19.1.tar.gz", hash = "sha256:61c16d2a8576dc0649d9f39e089b5f02bcd27fba10d8fb4dcc28173f7a45151f", size = 4968581 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/8a/0b/9fcc47d19c48b59121088dd6da2488a49d5f72dacf8262e2790a1d2c7d15/pygments-2.19.1-py3-none-any.whl", hash = "sha256:9ea1544ad55cecf4b8242fab6dd35a93bbce657034b0611ee383099054ab6d8c", size = 1225293 },
]
[[package]]
name = "python-dotenv"
version = "1.0.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/bc/57/e84d88dfe0aec03b7a2d4327012c1627ab5f03652216c63d49846d7a6c58/python-dotenv-1.0.1.tar.gz", hash = "sha256:e324ee90a023d808f1959c46bcbc04446a10ced277783dc6ee09987c37ec10ca", size = 39115 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/6a/3e/b68c118422ec867fa7ab88444e1274aa40681c606d59ac27de5a5588f082/python_dotenv-1.0.1-py3-none-any.whl", hash = "sha256:f7b63ef50f1b690dddf550d03497b66d609393b40b564ed0d674909a68ebf16a", size = 19863 },
]
[[package]]
name = "rich"
version = "13.9.4"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "markdown-it-py" },
{ name = "pygments" },
]
sdist = { url = "https://files.pythonhosted.org/packages/ab/3a/0316b28d0761c6734d6bc14e770d85506c986c85ffb239e688eeaab2c2bc/rich-13.9.4.tar.gz", hash = "sha256:439594978a49a09530cff7ebc4b5c7103ef57baf48d5ea3184f21d9a2befa098", size = 223149 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/19/71/39c7c0d87f8d4e6c020a393182060eaefeeae6c01dab6a84ec346f2567df/rich-13.9.4-py3-none-any.whl", hash = "sha256:6049d5e6ec054bf2779ab3358186963bac2ea89175919d699e378b99738c2a90", size = 242424 },
]
[[package]]
name = "shellingham"
version = "1.5.4"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/58/15/8b3609fd3830ef7b27b655beb4b4e9c62313a4e8da8c676e142cc210d58e/shellingham-1.5.4.tar.gz", hash = "sha256:8dbca0739d487e5bd35ab3ca4b36e11c4078f3a234bfce294b0a0291363404de", size = 10310 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/e0/f9/0595336914c5619e5f28a1fb793285925a8cd4b432c9da0a987836c7f822/shellingham-1.5.4-py2.py3-none-any.whl", hash = "sha256:7ecfff8f2fd72616f7481040475a65b2bf8af90a56c89140852d1120324e8686", size = 9755 },
]
[[package]]
name = "sniffio"
version = "1.3.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/a2/87/a6771e1546d97e7e041b6ae58d80074f81b7d5121207425c964ddf5cfdbd/sniffio-1.3.1.tar.gz", hash = "sha256:f4324edc670a0f49750a81b895f35c3adb843cca46f0530f79fc1babb23789dc", size = 20372 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/e9/44/75a9c9421471a6c4805dbf2356f7c181a29c1879239abab1ea2cc8f38b40/sniffio-1.3.1-py3-none-any.whl", hash = "sha256:2f6da418d1f1e0fddd844478f41680e794e6051915791a034ff65e5f100525a2", size = 10235 },
]
[[package]]
name = "sse-starlette"
version = "2.2.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "anyio" },
{ name = "starlette" },
]
sdist = { url = "https://files.pythonhosted.org/packages/71/a4/80d2a11af59fe75b48230846989e93979c892d3a20016b42bb44edb9e398/sse_starlette-2.2.1.tar.gz", hash = "sha256:54470d5f19274aeed6b2d473430b08b4b379ea851d953b11d7f1c4a2c118b419", size = 17376 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/d9/e0/5b8bd393f27f4a62461c5cf2479c75a2cc2ffa330976f9f00f5f6e4f50eb/sse_starlette-2.2.1-py3-none-any.whl", hash = "sha256:6410a3d3ba0c89e7675d4c273a301d64649c03a5ef1ca101f10b47f895fd0e99", size = 10120 },
]
[[package]]
name = "starlette"
version = "0.46.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "anyio" },
]
sdist = { url = "https://files.pythonhosted.org/packages/04/1b/52b27f2e13ceedc79a908e29eac426a63465a1a01248e5f24aa36a62aeb3/starlette-0.46.1.tar.gz", hash = "sha256:3c88d58ee4bd1bb807c0d1acb381838afc7752f9ddaec81bbe4383611d833230", size = 2580102 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/a0/4b/528ccf7a982216885a1ff4908e886b8fb5f19862d1962f56a3fce2435a70/starlette-0.46.1-py3-none-any.whl", hash = "sha256:77c74ed9d2720138b25875133f3a2dae6d854af2ec37dceb56aef370c1d8a227", size = 71995 },
]
[[package]]
name = "typer"
version = "0.15.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "click" },
{ name = "rich" },
{ name = "shellingham" },
{ name = "typing-extensions" },
]
sdist = { url = "https://files.pythonhosted.org/packages/8b/6f/3991f0f1c7fcb2df31aef28e0594d8d54b05393a0e4e34c65e475c2a5d41/typer-0.15.2.tar.gz", hash = "sha256:ab2fab47533a813c49fe1f16b1a370fd5819099c00b119e0633df65f22144ba5", size = 100711 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/7f/fc/5b29fea8cee020515ca82cc68e3b8e1e34bb19a3535ad854cac9257b414c/typer-0.15.2-py3-none-any.whl", hash = "sha256:46a499c6107d645a9c13f7ee46c5d5096cae6f5fc57dd11eccbbb9ae3e44ddfc", size = 45061 },
]
[[package]]
name = "typing-extensions"
version = "4.12.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/df/db/f35a00659bc03fec321ba8bce9420de607a1d37f8342eee1863174c69557/typing_extensions-4.12.2.tar.gz", hash = "sha256:1a7ead55c7e559dd4dee8856e3a88b41225abfe1ce8df57b7c13915fe121ffb8", size = 85321 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/26/9f/ad63fc0248c5379346306f8668cda6e2e2e9c95e01216d2b8ffd9ff037d0/typing_extensions-4.12.2-py3-none-any.whl", hash = "sha256:04e5ca0351e0f3f85c6853954072df659d0d13fac324d0072316b67d7794700d", size = 37438 },
]
[[package]]
name = "unitymcpserver"
version = "2.0.0"
source = { editable = "." }
dependencies = [
{ name = "httpx" },
{ name = "mcp", extra = ["cli"] },
]
[package.metadata]
requires-dist = [
{ name = "httpx", specifier = ">=0.27.2" },
{ name = "mcp", extras = ["cli"], specifier = ">=1.4.1" },
]
[[package]]
name = "uvicorn"
version = "0.34.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "click" },
{ name = "h11" },
]
sdist = { url = "https://files.pythonhosted.org/packages/4b/4d/938bd85e5bf2edeec766267a5015ad969730bb91e31b44021dfe8b22df6c/uvicorn-0.34.0.tar.gz", hash = "sha256:404051050cd7e905de2c9a7e61790943440b3416f49cb409f965d9dcd0fa73e9", size = 76568 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/61/14/33a3a1352cfa71812a3a21e8c9bfb83f60b0011f5e36f2b1399d51928209/uvicorn-0.34.0-py3-none-any.whl", hash = "sha256:023dc038422502fa28a09c7a30bf2b6991512da7dcdb8fd35fe57cfc154126f4", size = 62315 },
]

View File

@ -0,0 +1,7 @@
fileFormatVersion: 2
guid: 9c116a2a729ac40348fb4c81c93ea030
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

View File

@ -15,7 +15,7 @@ class ServerConfig:
mcp_port: int = 6500 mcp_port: int = 6500
# Connection settings # Connection settings
connection_timeout: float = 600.0 # 10 minutes timeout connection_timeout: float = 60.0 # default steady-state timeout; retries use shorter timeouts
buffer_size: int = 16 * 1024 * 1024 # 16MB buffer buffer_size: int = 16 * 1024 * 1024 # 16MB buffer
# Logging settings # Logging settings
@ -23,8 +23,8 @@ class ServerConfig:
log_format: str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" log_format: str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
# Server settings # Server settings
max_retries: int = 3 max_retries: int = 8
retry_delay: float = 1.0 retry_delay: float = 0.5
# Create a global config instance # Create a global config instance
config = ServerConfig() config = ServerConfig()

View File

@ -68,12 +68,26 @@ class PortDiscovery:
if data and b'"message":"pong"' in data: if data and b'"message":"pong"' in data:
return True return True
except Exception: except Exception:
# Even if the ping fails, a successful TCP connect is a strong signal. return False
# Fall back to treating the port as viable if connect succeeded.
return True
except Exception: except Exception:
return False return False
return False return False
@staticmethod
def _read_latest_status() -> Optional[dict]:
try:
base = PortDiscovery.get_registry_dir()
status_files = sorted(
(Path(p) for p in glob.glob(str(base / "unity-mcp-status-*.json"))),
key=lambda p: p.stat().st_mtime,
reverse=True,
)
if not status_files:
return None
with status_files[0].open('r') as f:
return json.load(f)
except Exception:
return None
@staticmethod @staticmethod
def discover_unity_port() -> int: def discover_unity_port() -> int:
@ -85,6 +99,14 @@ class PortDiscovery:
Returns: Returns:
Port number to connect to Port number to connect to
""" """
# Prefer the latest heartbeat status if it points to a responsive port
status = PortDiscovery._read_latest_status()
if status:
port = status.get('unity_port')
if isinstance(port, int) and PortDiscovery._try_probe_unity_mcp(port):
logger.info(f"Using Unity port from status: {port}")
return port
candidates = PortDiscovery.list_candidate_files() candidates = PortDiscovery.list_candidate_files()
first_seen_port: Optional[int] = None first_seen_port: Optional[int] = None

View File

@ -2,6 +2,8 @@ import socket
import json import json
import logging import logging
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path
import time
from typing import Dict, Any from typing import Dict, Any
from config import config from config import config
from port_discovery import PortDiscovery from port_discovery import PortDiscovery
@ -105,64 +107,94 @@ class UnityConnection:
raise raise
def send_command(self, command_type: str, params: Dict[str, Any] = None) -> Dict[str, Any]: def send_command(self, command_type: str, params: Dict[str, Any] = None) -> Dict[str, Any]:
"""Send a command to Unity and return its response.""" """Send a command with retry/backoff and port rediscovery. Pings only when requested."""
if not self.sock and not self.connect(): attempts = max(config.max_retries, 5)
raise ConnectionError("Not connected to Unity") base_backoff = max(0.5, config.retry_delay)
# Special handling for ping command def read_status_file() -> dict | None:
if command_type == "ping":
try: try:
logger.debug("Sending ping to verify connection") status_files = sorted(Path.home().joinpath('.unity-mcp').glob('unity-mcp-status-*.json'), key=lambda p: p.stat().st_mtime, reverse=True)
self.sock.sendall(b"ping") if not status_files:
return None
latest = status_files[0]
with latest.open('r') as f:
return json.load(f)
except Exception:
return None
last_short_timeout = None
for attempt in range(attempts + 1):
try:
# Ensure connected
if not self.sock:
# During retries use short connect timeout
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(1.0)
self.sock.connect((self.host, self.port))
# restore steady-state timeout for receive
self.sock.settimeout(config.connection_timeout)
logger.info(f"Connected to Unity at {self.host}:{self.port}")
# Build payload
if command_type == 'ping':
payload = b'ping'
else:
command = {"type": command_type, "params": params or {}}
payload = json.dumps(command, ensure_ascii=False).encode('utf-8')
# Send
self.sock.sendall(payload)
# During retry bursts use a short receive timeout
if attempt > 0 and last_short_timeout is None:
last_short_timeout = self.sock.gettimeout()
self.sock.settimeout(1.0)
response_data = self.receive_full_response(self.sock) response_data = self.receive_full_response(self.sock)
response = json.loads(response_data.decode('utf-8')) # restore steady-state timeout if changed
if last_short_timeout is not None:
if response.get("status") != "success": self.sock.settimeout(config.connection_timeout)
logger.warning("Ping response was not successful") last_short_timeout = None
self.sock = None
raise ConnectionError("Connection verification failed") # Parse
if command_type == 'ping':
return {"message": "pong"} resp = json.loads(response_data.decode('utf-8'))
if resp.get('status') == 'success' and resp.get('result', {}).get('message') == 'pong':
return {"message": "pong"}
raise Exception("Ping unsuccessful")
resp = json.loads(response_data.decode('utf-8'))
if resp.get('status') == 'error':
err = resp.get('error') or resp.get('message', 'Unknown Unity error')
raise Exception(err)
return resp.get('result', {})
except Exception as e: except Exception as e:
logger.error(f"Ping error: {str(e)}") logger.warning(f"Unity communication attempt {attempt+1} failed: {e}")
self.sock = None try:
raise ConnectionError(f"Connection verification failed: {str(e)}") if self.sock:
self.sock.close()
# Normal command handling finally:
command = {"type": command_type, "params": params or {}} self.sock = None
try:
# Check for very large content that might cause JSON issues # Re-discover port each time
command_size = len(json.dumps(command)) try:
new_port = PortDiscovery.discover_unity_port()
if command_size > config.buffer_size / 2: if new_port != self.port:
logger.warning(f"Large command detected ({command_size} bytes). This might cause issues.") logger.info(f"Unity port changed {self.port} -> {new_port}")
self.port = new_port
logger.info(f"Sending command: {command_type} with params size: {command_size} bytes") except Exception as de:
logger.debug(f"Port discovery failed: {de}")
# Ensure we have a valid JSON string before sending
command_json = json.dumps(command, ensure_ascii=False) if attempt < attempts:
self.sock.sendall(command_json.encode('utf-8')) # If heartbeat indicates reload, keep retries snappy without spamming
status = read_status_file()
response_data = self.receive_full_response(self.sock) backoff = base_backoff * (2 ** attempt)
try: sleep_s = min(backoff, 3.0)
response = json.loads(response_data.decode('utf-8')) if status and (status.get('reloading') or status.get('unity_port') == self.port):
except json.JSONDecodeError as je: sleep_s = min(sleep_s, 0.8)
logger.error(f"JSON decode error: {str(je)}") time.sleep(sleep_s)
# Log partial response for debugging continue
partial_response = response_data.decode('utf-8')[:500] + "..." if len(response_data) > 500 else response_data.decode('utf-8') raise
logger.error(f"Partial response: {partial_response}")
raise Exception(f"Invalid JSON response from Unity: {str(je)}")
if response.get("status") == "error":
error_message = response.get("error") or response.get("message", "Unknown Unity error")
logger.error(f"Unity error: {error_message}")
raise Exception(error_message)
return response.get("result", {})
except Exception as e:
logger.error(f"Communication error with Unity: {str(e)}")
self.sock = None
raise Exception(f"Failed to communicate with Unity: {str(e)}")
# Global Unity connection # Global Unity connection
_unity_connection = None _unity_connection = None