Merge pull request #264 from CoplayDev/feat/telemetry

Added optional telemetry
main
dsarno 2025-09-10 13:00:01 -07:00 committed by GitHub
commit fe962114b9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 1995 additions and 171 deletions

View File

@ -292,6 +292,18 @@ Help make MCP for Unity better!
--- ---
## 📊 Telemetry & Privacy
Unity MCP includes **privacy-focused, anonymous telemetry** to help us improve the product. We collect usage analytics and performance data, but **never** your code, project names, or personal information.
- **🔒 Anonymous**: Random UUIDs only, no personal data
- **🚫 Easy opt-out**: Set `DISABLE_TELEMETRY=true` environment variable
- **📖 Transparent**: See [TELEMETRY.md](TELEMETRY.md) for full details
Your privacy matters to us. All telemetry is optional and designed to respect your workflow.
---
## Troubleshooting ❓ ## Troubleshooting ❓
<details> <details>

178
TELEMETRY.md Normal file
View File

@ -0,0 +1,178 @@
# Unity MCP Telemetry
Unity MCP includes privacy-focused, anonymous telemetry to help us improve the product. This document explains what data is collected, how to opt out, and our privacy practices.
## 🔒 Privacy First
- **Anonymous**: We use randomly generated UUIDs - no personal information
- **Non-blocking**: Telemetry never interferes with your Unity workflow
- **Easy opt-out**: Simple environment variable or Unity Editor setting
- **Transparent**: All collected data types are documented here
## 📊 What We Collect
### Usage Analytics
- **Tool Usage**: Which MCP tools you use (manage_script, manage_scene, etc.)
- **Performance**: Execution times and success/failure rates
- **System Info**: Unity version, platform (Windows/Mac/Linux), MCP version
- **Milestones**: First-time usage events (first script creation, first tool use, etc.)
### Technical Diagnostics
- **Connection Events**: Bridge startup/connection success/failures
- **Error Reports**: Anonymized error messages (truncated to 200 chars)
- **Server Health**: Startup time, connection latency
### What We **DON'T** Collect
- ❌ Your code or script contents
- ❌ Project names, file names, or paths
- ❌ Personal information or identifiers
- ❌ Sensitive project data
- ❌ IP addresses (beyond what's needed for HTTP requests)
## 🚫 How to Opt Out
### Method 1: Environment Variable (Recommended)
Set any of these environment variables to `true`:
```bash
# Disable all telemetry
export DISABLE_TELEMETRY=true
# Unity MCP specific
export UNITY_MCP_DISABLE_TELEMETRY=true
# MCP protocol wide
export MCP_DISABLE_TELEMETRY=true
```
### Method 2: Unity Editor (Coming Soon)
In Unity Editor: `Window > MCP for Unity > Settings > Disable Telemetry`
### Method 3: Manual Config
Add to your MCP client config:
```json
{
"env": {
"DISABLE_TELEMETRY": "true"
}
}
```
## 🔧 Technical Implementation
### Architecture
- **Python Server**: Core telemetry collection and transmission
- **Unity Bridge**: Local event collection from Unity Editor
- **Anonymous UUIDs**: Generated per-installation for aggregate analytics
- **Thread-safe**: Non-blocking background transmission
- **Fail-safe**: Errors never interrupt your workflow
### Data Storage
Telemetry data is stored locally in:
- **Windows**: `%APPDATA%\UnityMCP\`
- **macOS**: `~/Library/Application Support/UnityMCP/`
- **Linux**: `~/.local/share/UnityMCP/`
Files created:
- `customer_uuid.txt`: Anonymous identifier
- `milestones.json`: One-time events tracker
### Data Transmission
- **Endpoint**: `https://api-prod.coplay.dev/telemetry/events`
- **Method**: HTTPS POST with JSON payload
- **Retry**: Background thread with graceful failure
- **Timeout**: 10 second timeout, no retries on failure
## 📈 How We Use This Data
### Product Improvement
- **Feature Usage**: Understand which tools are most/least used
- **Performance**: Identify slow operations to optimize
- **Reliability**: Track error rates and connection issues
- **Compatibility**: Ensure Unity version compatibility
### Development Priorities
- **Roadmap**: Focus development on most-used features
- **Bug Fixes**: Prioritize fixes based on error frequency
- **Platform Support**: Allocate resources based on platform usage
- **Documentation**: Improve docs for commonly problematic areas
### What We Don't Do
- ❌ Sell data to third parties
- ❌ Use data for advertising/marketing
- ❌ Track individual developers
- ❌ Store sensitive project information
## 🛠️ For Developers
### Testing Telemetry
```bash
cd UnityMcpBridge/UnityMcpServer~/src
python test_telemetry.py
```
### Custom Telemetry Events
```python
from telemetry import record_telemetry, RecordType
record_telemetry(RecordType.USAGE, {
"custom_event": "my_feature_used",
"metadata": "optional_data"
})
```
### Telemetry Status Check
```python
from telemetry import is_telemetry_enabled
if is_telemetry_enabled():
print("Telemetry is active")
else:
print("Telemetry is disabled")
```
## 📋 Data Retention Policy
- **Aggregated Data**: Retained indefinitely for product insights
- **Raw Events**: Automatically purged after 90 days
- **Personal Data**: None collected, so none to purge
- **Opt-out**: Immediate - no data sent after opting out
## 🤝 Contact & Transparency
- **Questions**: [Discord Community](https://discord.gg/y4p8KfzrN4)
- **Issues**: [GitHub Issues](https://github.com/CoplayDev/unity-mcp/issues)
- **Privacy Concerns**: Create a GitHub issue with "Privacy" label
- **Source Code**: All telemetry code is open source in this repository
## 📊 Example Telemetry Event
Here's what a typical telemetry event looks like:
```json
{
"record": "tool_execution",
"timestamp": 1704067200,
"customer_uuid": "550e8400-e29b-41d4-a716-446655440000",
"session_id": "abc123-def456-ghi789",
"version": "3.0.2",
"platform": "posix",
"data": {
"tool_name": "manage_script",
"success": true,
"duration_ms": 42.5
}
}
```
Notice:
- ✅ Anonymous UUID (randomly generated)
- ✅ Tool performance metrics
- ✅ Success/failure tracking
- ❌ No code content
- ❌ No project information
- ❌ No personal data
---
*Unity MCP Telemetry is designed to respect your privacy while helping us build a better tool. Thank you for helping improve Unity MCP!*

View File

@ -0,0 +1,224 @@
using System;
using System.Collections.Generic;
using System.Threading;
using UnityEngine;
namespace MCPForUnity.Editor.Helpers
{
/// <summary>
/// Unity Bridge telemetry helper for collecting usage analytics
/// Following privacy-first approach with easy opt-out mechanisms
/// </summary>
public static class TelemetryHelper
{
private const string TELEMETRY_DISABLED_KEY = "MCPForUnity.TelemetryDisabled";
private const string CUSTOMER_UUID_KEY = "MCPForUnity.CustomerUUID";
private static Action<Dictionary<string, object>> s_sender;
/// <summary>
/// Check if telemetry is enabled (can be disabled via Environment Variable or EditorPrefs)
/// </summary>
public static bool IsEnabled
{
get
{
// Check environment variables first
var envDisable = Environment.GetEnvironmentVariable("DISABLE_TELEMETRY");
if (!string.IsNullOrEmpty(envDisable) &&
(envDisable.ToLower() == "true" || envDisable == "1"))
{
return false;
}
var unityMcpDisable = Environment.GetEnvironmentVariable("UNITY_MCP_DISABLE_TELEMETRY");
if (!string.IsNullOrEmpty(unityMcpDisable) &&
(unityMcpDisable.ToLower() == "true" || unityMcpDisable == "1"))
{
return false;
}
// Honor protocol-wide opt-out as well
var mcpDisable = Environment.GetEnvironmentVariable("MCP_DISABLE_TELEMETRY");
if (!string.IsNullOrEmpty(mcpDisable) &&
(mcpDisable.Equals("true", StringComparison.OrdinalIgnoreCase) || mcpDisable == "1"))
{
return false;
}
// Check EditorPrefs
return !UnityEditor.EditorPrefs.GetBool(TELEMETRY_DISABLED_KEY, false);
}
}
/// <summary>
/// Get or generate customer UUID for anonymous tracking
/// </summary>
public static string GetCustomerUUID()
{
var uuid = UnityEditor.EditorPrefs.GetString(CUSTOMER_UUID_KEY, "");
if (string.IsNullOrEmpty(uuid))
{
uuid = System.Guid.NewGuid().ToString();
UnityEditor.EditorPrefs.SetString(CUSTOMER_UUID_KEY, uuid);
}
return uuid;
}
/// <summary>
/// Disable telemetry (stored in EditorPrefs)
/// </summary>
public static void DisableTelemetry()
{
UnityEditor.EditorPrefs.SetBool(TELEMETRY_DISABLED_KEY, true);
}
/// <summary>
/// Enable telemetry (stored in EditorPrefs)
/// </summary>
public static void EnableTelemetry()
{
UnityEditor.EditorPrefs.SetBool(TELEMETRY_DISABLED_KEY, false);
}
/// <summary>
/// Send telemetry data to Python server for processing
/// This is a lightweight bridge - the actual telemetry logic is in Python
/// </summary>
public static void RecordEvent(string eventType, Dictionary<string, object> data = null)
{
if (!IsEnabled)
return;
try
{
var telemetryData = new Dictionary<string, object>
{
["event_type"] = eventType,
["timestamp"] = DateTimeOffset.UtcNow.ToUnixTimeSeconds(),
["customer_uuid"] = GetCustomerUUID(),
["unity_version"] = Application.unityVersion,
["platform"] = Application.platform.ToString(),
["source"] = "unity_bridge"
};
if (data != null)
{
telemetryData["data"] = data;
}
// Send to Python server via existing bridge communication
// The Python server will handle actual telemetry transmission
SendTelemetryToPythonServer(telemetryData);
}
catch (Exception e)
{
// Never let telemetry errors interfere with functionality
if (IsDebugEnabled())
{
Debug.LogWarning($"Telemetry error (non-blocking): {e.Message}");
}
}
}
/// <summary>
/// Allows the bridge to register a concrete sender for telemetry payloads.
/// </summary>
public static void RegisterTelemetrySender(Action<Dictionary<string, object>> sender)
{
Interlocked.Exchange(ref s_sender, sender);
}
public static void UnregisterTelemetrySender()
{
Interlocked.Exchange(ref s_sender, null);
}
/// <summary>
/// Record bridge startup event
/// </summary>
public static void RecordBridgeStartup()
{
RecordEvent("bridge_startup", new Dictionary<string, object>
{
["bridge_version"] = "3.0.2",
["auto_connect"] = MCPForUnityBridge.IsAutoConnectMode()
});
}
/// <summary>
/// Record bridge connection event
/// </summary>
public static void RecordBridgeConnection(bool success, string error = null)
{
var data = new Dictionary<string, object>
{
["success"] = success
};
if (!string.IsNullOrEmpty(error))
{
data["error"] = error.Substring(0, Math.Min(200, error.Length));
}
RecordEvent("bridge_connection", data);
}
/// <summary>
/// Record tool execution from Unity side
/// </summary>
public static void RecordToolExecution(string toolName, bool success, float durationMs, string error = null)
{
var data = new Dictionary<string, object>
{
["tool_name"] = toolName,
["success"] = success,
["duration_ms"] = Math.Round(durationMs, 2)
};
if (!string.IsNullOrEmpty(error))
{
data["error"] = error.Substring(0, Math.Min(200, error.Length));
}
RecordEvent("tool_execution_unity", data);
}
private static void SendTelemetryToPythonServer(Dictionary<string, object> telemetryData)
{
var sender = Volatile.Read(ref s_sender);
if (sender != null)
{
try
{
sender(telemetryData);
return;
}
catch (Exception e)
{
if (IsDebugEnabled())
{
Debug.LogWarning($"Telemetry sender error (non-blocking): {e.Message}");
}
}
}
// Fallback: log when debug is enabled
if (IsDebugEnabled())
{
Debug.Log($"<b><color=#2EA3FF>MCP-TELEMETRY</color></b>: {telemetryData["event_type"]}");
}
}
private static bool IsDebugEnabled()
{
try
{
return UnityEditor.EditorPrefs.GetBool("MCPForUnity.DebugLogs", false);
}
catch
{
return false;
}
}
}
}

View File

@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: b8f3c2d1e7a94f6c8a9b5e3d2c1a0f9e
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@ -1,5 +1,6 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Collections.Concurrent;
using System.IO; using System.IO;
using System.Linq; using System.Linq;
using System.Net; using System.Net;
@ -25,6 +26,14 @@ namespace MCPForUnity.Editor
private static readonly object startStopLock = new(); private static readonly object startStopLock = new();
private static readonly object clientsLock = new(); private static readonly object clientsLock = new();
private static readonly System.Collections.Generic.HashSet<TcpClient> activeClients = new(); private static readonly System.Collections.Generic.HashSet<TcpClient> activeClients = new();
// Single-writer outbox for framed responses
private class Outbound
{
public byte[] Payload;
public string Tag;
public int? ReqId;
}
private static readonly BlockingCollection<Outbound> _outbox = new(new ConcurrentQueue<Outbound>());
private static CancellationTokenSource cts; private static CancellationTokenSource cts;
private static Task listenerTask; private static Task listenerTask;
private static int processingCommands = 0; private static int processingCommands = 0;
@ -38,11 +47,16 @@ namespace MCPForUnity.Editor
string, string,
(string commandJson, TaskCompletionSource<string> tcs) (string commandJson, TaskCompletionSource<string> tcs)
> commandQueue = new(); > commandQueue = new();
private static int mainThreadId;
private static int currentUnityPort = 6400; // Dynamic port, starts with default private static int currentUnityPort = 6400; // Dynamic port, starts with default
private static bool isAutoConnectMode = false; private static bool isAutoConnectMode = false;
private const ulong MaxFrameBytes = 64UL * 1024 * 1024; // 64 MiB hard cap for framed payloads private const ulong MaxFrameBytes = 64UL * 1024 * 1024; // 64 MiB hard cap for framed payloads
private const int FrameIOTimeoutMs = 30000; // Per-read timeout to avoid stalled clients private const int FrameIOTimeoutMs = 30000; // Per-read timeout to avoid stalled clients
// IO diagnostics
private static long _ioSeq = 0;
private static void IoInfo(string s) { McpLog.Info(s, always: false); }
// Debug helpers // Debug helpers
private static bool IsDebugEnabled() private static bool IsDebugEnabled()
{ {
@ -74,10 +88,16 @@ namespace MCPForUnity.Editor
currentUnityPort = PortManager.GetPortWithFallback(); currentUnityPort = PortManager.GetPortWithFallback();
Start(); Start();
isAutoConnectMode = true; isAutoConnectMode = true;
// Record telemetry for bridge startup
TelemetryHelper.RecordBridgeStartup();
} }
catch (Exception ex) catch (Exception ex)
{ {
Debug.LogError($"Auto-connect failed: {ex.Message}"); Debug.LogError($"Auto-connect failed: {ex.Message}");
// Record telemetry for connection failure
TelemetryHelper.RecordBridgeConnection(false, ex.Message);
throw; throw;
} }
} }
@ -103,6 +123,37 @@ namespace MCPForUnity.Editor
static MCPForUnityBridge() static MCPForUnityBridge()
{ {
// Record the main thread ID for safe thread checks
try { mainThreadId = Thread.CurrentThread.ManagedThreadId; } catch { mainThreadId = 0; }
// Start single writer thread for framed responses
try
{
var writerThread = new Thread(() =>
{
foreach (var item in _outbox.GetConsumingEnumerable())
{
try
{
long seq = Interlocked.Increment(ref _ioSeq);
IoInfo($"[IO] ➜ write start seq={seq} tag={item.Tag} len={(item.Payload?.Length ?? 0)} reqId={(item.ReqId?.ToString() ?? "?")}");
var sw = System.Diagnostics.Stopwatch.StartNew();
// Note: We currently have a per-connection 'stream' in the client handler. For simplicity,
// writes are performed inline there. This outbox provides single-writer semantics; if a shared
// stream is introduced, redirect here accordingly.
// No-op: actual write happens in client loop using WriteFrameAsync
sw.Stop();
IoInfo($"[IO] ✓ write end tag={item.Tag} len={(item.Payload?.Length ?? 0)} reqId={(item.ReqId?.ToString() ?? "?")} durMs={sw.Elapsed.TotalMilliseconds:F1}");
}
catch (Exception ex)
{
IoInfo($"[IO] ✗ write FAIL tag={item.Tag} reqId={(item.ReqId?.ToString() ?? "?")} {ex.GetType().Name}: {ex.Message}");
}
}
}) { IsBackground = true, Name = "MCP-Writer" };
writerThread.Start();
}
catch { }
// Skip bridge in headless/batch environments (CI/builds) unless explicitly allowed via env // Skip bridge in headless/batch environments (CI/builds) unless explicitly allowed via env
// CI override: set UNITY_MCP_ALLOW_BATCH=1 to allow the bridge in batch mode // CI override: set UNITY_MCP_ALLOW_BATCH=1 to allow the bridge in batch mode
if (Application.isBatchMode && string.IsNullOrWhiteSpace(Environment.GetEnvironmentVariable("UNITY_MCP_ALLOW_BATCH"))) if (Application.isBatchMode && string.IsNullOrWhiteSpace(Environment.GetEnvironmentVariable("UNITY_MCP_ALLOW_BATCH")))
@ -533,9 +584,69 @@ namespace MCPForUnity.Editor
commandQueue[commandId] = (commandText, tcs); commandQueue[commandId] = (commandText, tcs);
} }
string response = await tcs.Task.ConfigureAwait(false); // Wait for the handler to produce a response, but do not block indefinitely
byte[] responseBytes = System.Text.Encoding.UTF8.GetBytes(response); string response;
await WriteFrameAsync(stream, responseBytes); try
{
using var respCts = new CancellationTokenSource(FrameIOTimeoutMs);
var completed = await Task.WhenAny(tcs.Task, Task.Delay(FrameIOTimeoutMs, respCts.Token)).ConfigureAwait(false);
if (completed == tcs.Task)
{
// Got a result from the handler
respCts.Cancel();
response = tcs.Task.Result;
}
else
{
// Timeout: return a structured error so the client can recover
var timeoutResponse = new
{
status = "error",
error = $"Command processing timed out after {FrameIOTimeoutMs} ms",
};
response = JsonConvert.SerializeObject(timeoutResponse);
}
}
catch (Exception ex)
{
var errorResponse = new
{
status = "error",
error = ex.Message,
};
response = JsonConvert.SerializeObject(errorResponse);
}
if (IsDebugEnabled())
{
try { MCPForUnity.Editor.Helpers.McpLog.Info("[MCP] sending framed response", always: false); } catch { }
}
// Crash-proof and self-reporting writer logs (direct write to this client's stream)
long seq = System.Threading.Interlocked.Increment(ref _ioSeq);
byte[] responseBytes;
try
{
responseBytes = System.Text.Encoding.UTF8.GetBytes(response);
IoInfo($"[IO] ➜ write start seq={seq} tag=response len={responseBytes.Length} reqId=?");
}
catch (Exception ex)
{
IoInfo($"[IO] ✗ serialize FAIL tag=response reqId=? {ex.GetType().Name}: {ex.Message}");
throw;
}
var swDirect = System.Diagnostics.Stopwatch.StartNew();
try
{
await WriteFrameAsync(stream, responseBytes);
swDirect.Stop();
IoInfo($"[IO] ✓ write end tag=response len={responseBytes.Length} reqId=? durMs={swDirect.Elapsed.TotalMilliseconds:F1}");
}
catch (Exception ex)
{
IoInfo($"[IO] ✗ write FAIL tag=response reqId=? {ex.GetType().Name}: {ex.Message}");
throw;
}
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -810,6 +921,66 @@ namespace MCPForUnity.Editor
} }
} }
// Invoke the given function on the Unity main thread and wait up to timeoutMs for the result.
// Returns null on timeout or error; caller should provide a fallback error response.
private static object InvokeOnMainThreadWithTimeout(Func<object> func, int timeoutMs)
{
if (func == null) return null;
try
{
// If mainThreadId is unknown, assume we're on main thread to avoid blocking the editor.
if (mainThreadId == 0)
{
try { return func(); }
catch (Exception ex) { throw new InvalidOperationException($"Main thread handler error: {ex.Message}", ex); }
}
// If we are already on the main thread, execute directly to avoid deadlocks
try
{
if (Thread.CurrentThread.ManagedThreadId == mainThreadId)
{
return func();
}
}
catch { }
object result = null;
Exception captured = null;
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
EditorApplication.delayCall += () =>
{
try
{
result = func();
}
catch (Exception ex)
{
captured = ex;
}
finally
{
try { tcs.TrySetResult(true); } catch { }
}
};
// Wait for completion with timeout (Editor thread will pump delayCall)
bool completed = tcs.Task.Wait(timeoutMs);
if (!completed)
{
return null; // timeout
}
if (captured != null)
{
throw new InvalidOperationException($"Main thread handler error: {captured.Message}", captured);
}
return result;
}
catch (Exception ex)
{
throw new InvalidOperationException($"Failed to invoke on main thread: {ex.Message}", ex);
}
}
// Helper method to check if a string is valid JSON // Helper method to check if a string is valid JSON
private static bool IsValidJson(string text) private static bool IsValidJson(string text)
{ {
@ -874,7 +1045,9 @@ namespace MCPForUnity.Editor
// Maps the command type (tool name) to the corresponding handler's static HandleCommand method // Maps the command type (tool name) to the corresponding handler's static HandleCommand method
// Assumes each handler class has a static method named 'HandleCommand' that takes JObject parameters // Assumes each handler class has a static method named 'HandleCommand' that takes JObject parameters
"manage_script" => ManageScript.HandleCommand(paramsObject), "manage_script" => ManageScript.HandleCommand(paramsObject),
"manage_scene" => ManageScene.HandleCommand(paramsObject), // Run scene operations on the main thread to avoid deadlocks/hangs (with diagnostics under debug flag)
"manage_scene" => HandleManageScene(paramsObject)
?? throw new TimeoutException($"manage_scene timed out after {FrameIOTimeoutMs} ms on main thread"),
"manage_editor" => ManageEditor.HandleCommand(paramsObject), "manage_editor" => ManageEditor.HandleCommand(paramsObject),
"manage_gameobject" => ManageGameObject.HandleCommand(paramsObject), "manage_gameobject" => ManageGameObject.HandleCommand(paramsObject),
"manage_asset" => ManageAsset.HandleCommand(paramsObject), "manage_asset" => ManageAsset.HandleCommand(paramsObject),
@ -912,6 +1085,23 @@ namespace MCPForUnity.Editor
} }
} }
private static object HandleManageScene(JObject paramsObject)
{
try
{
if (IsDebugEnabled()) Debug.Log("[MCP] manage_scene: dispatching to main thread");
var sw = System.Diagnostics.Stopwatch.StartNew();
var r = InvokeOnMainThreadWithTimeout(() => ManageScene.HandleCommand(paramsObject), FrameIOTimeoutMs);
sw.Stop();
if (IsDebugEnabled()) Debug.Log($"[MCP] manage_scene: completed in {sw.ElapsedMilliseconds} ms");
return r ?? Response.Error("manage_scene returned null (timeout or error)");
}
catch (Exception ex)
{
return Response.Error($"manage_scene dispatch error: {ex.Message}");
}
}
// Helper method to get a summary of parameters for error reporting // Helper method to get a summary of parameters for error reporting
private static string GetParamsSummary(JObject @params) private static string GetParamsSummary(JObject @params)
{ {

View File

@ -16,15 +16,46 @@ namespace MCPForUnity.Editor.Tools
/// </summary> /// </summary>
public static class ManageScene public static class ManageScene
{ {
private sealed class SceneCommand
{
public string action { get; set; } = string.Empty;
public string name { get; set; } = string.Empty;
public string path { get; set; } = string.Empty;
public int? buildIndex { get; set; }
}
private static SceneCommand ToSceneCommand(JObject p)
{
if (p == null) return new SceneCommand();
int? BI(JToken t)
{
if (t == null || t.Type == JTokenType.Null) return null;
var s = t.ToString().Trim();
if (s.Length == 0) return null;
if (int.TryParse(s, out var i)) return i;
if (double.TryParse(s, out var d)) return (int)d;
return t.Type == JTokenType.Integer ? t.Value<int>() : (int?)null;
}
return new SceneCommand
{
action = (p["action"]?.ToString() ?? string.Empty).Trim().ToLowerInvariant(),
name = p["name"]?.ToString() ?? string.Empty,
path = p["path"]?.ToString() ?? string.Empty,
buildIndex = BI(p["buildIndex"] ?? p["build_index"])
};
}
/// <summary> /// <summary>
/// Main handler for scene management actions. /// Main handler for scene management actions.
/// </summary> /// </summary>
public static object HandleCommand(JObject @params) public static object HandleCommand(JObject @params)
{ {
string action = @params["action"]?.ToString().ToLower(); try { McpLog.Info("[ManageScene] HandleCommand: start", always: false); } catch { }
string name = @params["name"]?.ToString(); var cmd = ToSceneCommand(@params);
string path = @params["path"]?.ToString(); // Relative to Assets/ string action = cmd.action;
int? buildIndex = @params["buildIndex"]?.ToObject<int?>(); string name = string.IsNullOrEmpty(cmd.name) ? null : cmd.name;
string path = string.IsNullOrEmpty(cmd.path) ? null : cmd.path; // Relative to Assets/
int? buildIndex = cmd.buildIndex;
// bool loadAdditive = @params["loadAdditive"]?.ToObject<bool>() ?? false; // Example for future extension // bool loadAdditive = @params["loadAdditive"]?.ToObject<bool>() ?? false; // Example for future extension
// Ensure path is relative to Assets/, removing any leading "Assets/" // Ensure path is relative to Assets/, removing any leading "Assets/"
@ -76,6 +107,7 @@ namespace MCPForUnity.Editor.Tools
} }
// Route action // Route action
try { McpLog.Info($"[ManageScene] Route action='{action}' name='{name}' path='{path}' buildIndex={(buildIndex.HasValue ? buildIndex.Value.ToString() : "null")}", always: false); } catch { }
switch (action) switch (action)
{ {
case "create": case "create":
@ -98,9 +130,15 @@ namespace MCPForUnity.Editor.Tools
// Save current scene, optionally to a new path // Save current scene, optionally to a new path
return SaveScene(fullPath, relativePath); return SaveScene(fullPath, relativePath);
case "get_hierarchy": case "get_hierarchy":
return GetSceneHierarchy(); try { McpLog.Info("[ManageScene] get_hierarchy: entering", always: false); } catch { }
var gh = GetSceneHierarchy();
try { McpLog.Info("[ManageScene] get_hierarchy: exiting", always: false); } catch { }
return gh;
case "get_active": case "get_active":
return GetActiveSceneInfo(); try { McpLog.Info("[ManageScene] get_active: entering", always: false); } catch { }
var ga = GetActiveSceneInfo();
try { McpLog.Info("[ManageScene] get_active: exiting", always: false); } catch { }
return ga;
case "get_build_settings": case "get_build_settings":
return GetBuildSettingsScenes(); return GetBuildSettingsScenes();
// Add cases for modifying build settings, additive loading, unloading etc. // Add cases for modifying build settings, additive loading, unloading etc.
@ -294,7 +332,9 @@ namespace MCPForUnity.Editor.Tools
{ {
try try
{ {
try { McpLog.Info("[ManageScene] get_active: querying EditorSceneManager.GetActiveScene", always: false); } catch { }
Scene activeScene = EditorSceneManager.GetActiveScene(); Scene activeScene = EditorSceneManager.GetActiveScene();
try { McpLog.Info($"[ManageScene] get_active: got scene valid={activeScene.IsValid()} loaded={activeScene.isLoaded} name='{activeScene.name}'", always: false); } catch { }
if (!activeScene.IsValid()) if (!activeScene.IsValid())
{ {
return Response.Error("No active scene found."); return Response.Error("No active scene found.");
@ -314,6 +354,7 @@ namespace MCPForUnity.Editor.Tools
} }
catch (Exception e) catch (Exception e)
{ {
try { McpLog.Error($"[ManageScene] get_active: exception {e.Message}"); } catch { }
return Response.Error($"Error getting active scene info: {e.Message}"); return Response.Error($"Error getting active scene info: {e.Message}");
} }
} }
@ -348,7 +389,9 @@ namespace MCPForUnity.Editor.Tools
{ {
try try
{ {
try { McpLog.Info("[ManageScene] get_hierarchy: querying EditorSceneManager.GetActiveScene", always: false); } catch { }
Scene activeScene = EditorSceneManager.GetActiveScene(); Scene activeScene = EditorSceneManager.GetActiveScene();
try { McpLog.Info($"[ManageScene] get_hierarchy: got scene valid={activeScene.IsValid()} loaded={activeScene.isLoaded} name='{activeScene.name}'", always: false); } catch { }
if (!activeScene.IsValid() || !activeScene.isLoaded) if (!activeScene.IsValid() || !activeScene.isLoaded)
{ {
return Response.Error( return Response.Error(
@ -356,16 +399,21 @@ namespace MCPForUnity.Editor.Tools
); );
} }
try { McpLog.Info("[ManageScene] get_hierarchy: fetching root objects", always: false); } catch { }
GameObject[] rootObjects = activeScene.GetRootGameObjects(); GameObject[] rootObjects = activeScene.GetRootGameObjects();
try { McpLog.Info($"[ManageScene] get_hierarchy: rootCount={rootObjects?.Length ?? 0}", always: false); } catch { }
var hierarchy = rootObjects.Select(go => GetGameObjectDataRecursive(go)).ToList(); var hierarchy = rootObjects.Select(go => GetGameObjectDataRecursive(go)).ToList();
return Response.Success( var resp = Response.Success(
$"Retrieved hierarchy for scene '{activeScene.name}'.", $"Retrieved hierarchy for scene '{activeScene.name}'.",
hierarchy hierarchy
); );
try { McpLog.Info("[ManageScene] get_hierarchy: success", always: false); } catch { }
return resp;
} }
catch (Exception e) catch (Exception e)
{ {
try { McpLog.Error($"[ManageScene] get_hierarchy: exception {e.Message}"); } catch { }
return Response.Error($"Error getting scene hierarchy: {e.Message}"); return Response.Error($"Error getting scene hierarchy: {e.Message}");
} }
} }

View File

@ -15,7 +15,7 @@ class ServerConfig:
mcp_port: int = 6500 mcp_port: int = 6500
# Connection settings # Connection settings
connection_timeout: float = 60.0 # default steady-state timeout; retries use shorter timeouts connection_timeout: float = 1.0 # short initial timeout; retries use shorter timeouts
buffer_size: int = 16 * 1024 * 1024 # 16MB buffer buffer_size: int = 16 * 1024 * 1024 # 16MB buffer
# Framed receive behavior # Framed receive behavior
framed_receive_timeout: float = 2.0 # max seconds to wait while consuming heartbeats only framed_receive_timeout: float = 2.0 # max seconds to wait while consuming heartbeats only
@ -33,6 +33,11 @@ class ServerConfig:
# Number of polite retries when Unity reports reloading # Number of polite retries when Unity reports reloading
# 40 × 250ms ≈ 10s default window # 40 × 250ms ≈ 10s default window
reload_max_retries: int = 40 reload_max_retries: int = 40
# Telemetry settings
telemetry_enabled: bool = True
# Align with telemetry.py default Cloud Run endpoint
telemetry_endpoint: str = "https://unity-mcp-telemetry-375728817078.us-central1.run.app/telemetry/events"
# Create a global config instance # Create a global config instance
config = ServerConfig() config = ServerConfig()

View File

@ -0,0 +1 @@
3.3.0

View File

@ -1,19 +1,64 @@
from mcp.server.fastmcp import FastMCP, Context, Image from mcp.server.fastmcp import FastMCP, Context, Image
import logging import logging
from logging.handlers import RotatingFileHandler
import os
from dataclasses import dataclass from dataclasses import dataclass
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from typing import AsyncIterator, Dict, Any, List from typing import AsyncIterator, Dict, Any, List
from config import config from config import config
from tools import register_all_tools from tools import register_all_tools
from unity_connection import get_unity_connection, UnityConnection from unity_connection import get_unity_connection, UnityConnection
import time
# Configure logging using settings from config # Configure logging using settings from config
logging.basicConfig( logging.basicConfig(
level=getattr(logging, config.log_level), level=getattr(logging, config.log_level),
format=config.log_format format=config.log_format,
stream=None, # None -> defaults to sys.stderr; avoid stdout used by MCP stdio
force=True # Ensure our handler replaces any prior stdout handlers
) )
logger = logging.getLogger("mcp-for-unity-server") logger = logging.getLogger("mcp-for-unity-server")
# Also write logs to a rotating file so logs are available when launched via stdio
try:
import os as _os
_log_dir = _os.path.join(_os.path.expanduser("~/Library/Application Support/UnityMCP"), "Logs")
_os.makedirs(_log_dir, exist_ok=True)
_file_path = _os.path.join(_log_dir, "unity_mcp_server.log")
_fh = RotatingFileHandler(_file_path, maxBytes=512*1024, backupCount=2, encoding="utf-8")
_fh.setFormatter(logging.Formatter(config.log_format))
_fh.setLevel(getattr(logging, config.log_level))
logger.addHandler(_fh)
# Also route telemetry logger to the same rotating file and normal level
try:
tlog = logging.getLogger("unity-mcp-telemetry")
tlog.setLevel(getattr(logging, config.log_level))
tlog.addHandler(_fh)
except Exception:
# Never let logging setup break startup
pass
except Exception:
# Never let logging setup break startup
pass
# Quieten noisy third-party loggers to avoid clutter during stdio handshake
for noisy in ("httpx", "urllib3"):
try:
logging.getLogger(noisy).setLevel(max(logging.WARNING, getattr(logging, config.log_level)))
except Exception:
pass
# Import telemetry only after logging is configured to ensure its logs use stderr and proper levels
# Ensure a slightly higher telemetry timeout unless explicitly overridden by env
try:
# Ensure generous timeout unless explicitly overridden by env
if not os.environ.get("UNITY_MCP_TELEMETRY_TIMEOUT"):
os.environ["UNITY_MCP_TELEMETRY_TIMEOUT"] = "5.0"
except Exception:
pass
from telemetry import record_telemetry, record_milestone, RecordType, MilestoneType
# Global connection state # Global connection state
_unity_connection: UnityConnection = None _unity_connection: UnityConnection = None
@ -22,12 +67,76 @@ async def server_lifespan(server: FastMCP) -> AsyncIterator[Dict[str, Any]]:
"""Handle server startup and shutdown.""" """Handle server startup and shutdown."""
global _unity_connection global _unity_connection
logger.info("MCP for Unity Server starting up") logger.info("MCP for Unity Server starting up")
# Record server startup telemetry
start_time = time.time()
start_clk = time.perf_counter()
try: try:
_unity_connection = get_unity_connection() from pathlib import Path
logger.info("Connected to Unity on startup") ver_path = Path(__file__).parent / "server-version.txt"
except Exception as e: server_version = ver_path.read_text(encoding="utf-8").strip()
logger.warning(f"Could not connect to Unity on startup: {str(e)}") except Exception:
server_version = "unknown"
# Defer initial telemetry by 1s to avoid stdio handshake interference
import threading
def _emit_startup():
try:
record_telemetry(RecordType.STARTUP, {
"server_version": server_version,
"startup_time": start_time,
})
record_milestone(MilestoneType.FIRST_STARTUP)
except Exception:
logger.debug("Deferred startup telemetry failed", exc_info=True)
threading.Timer(1.0, _emit_startup).start()
try:
skip_connect = os.environ.get("UNITY_MCP_SKIP_STARTUP_CONNECT", "").lower() in ("1", "true", "yes", "on")
if skip_connect:
logger.info("Skipping Unity connection on startup (UNITY_MCP_SKIP_STARTUP_CONNECT=1)")
else:
_unity_connection = get_unity_connection()
logger.info("Connected to Unity on startup")
# Record successful Unity connection (deferred)
import threading as _t
_t.Timer(1.0, lambda: record_telemetry(
RecordType.UNITY_CONNECTION,
{
"status": "connected",
"connection_time_ms": (time.perf_counter() - start_clk) * 1000,
}
)).start()
except ConnectionError as e:
logger.warning("Could not connect to Unity on startup: %s", e)
_unity_connection = None _unity_connection = None
# Record connection failure (deferred)
import threading as _t
_err_msg = str(e)[:200]
_t.Timer(1.0, lambda: record_telemetry(
RecordType.UNITY_CONNECTION,
{
"status": "failed",
"error": _err_msg,
"connection_time_ms": (time.perf_counter() - start_clk) * 1000,
}
)).start()
except Exception as e:
logger.warning("Unexpected error connecting to Unity on startup: %s", e)
_unity_connection = None
import threading as _t
_err_msg = str(e)[:200]
_t.Timer(1.0, lambda: record_telemetry(
RecordType.UNITY_CONNECTION,
{
"status": "failed",
"error": _err_msg,
"connection_time_ms": (time.perf_counter() - start_clk) * 1000,
}
)).start()
try: try:
# Yield the connection object so it can be attached to the context # Yield the connection object so it can be attached to the context
# The key 'bridge' matches how tools like read_console expect to access it (ctx.bridge) # The key 'bridge' matches how tools like read_console expect to access it (ctx.bridge)
@ -54,18 +163,18 @@ register_all_tools(mcp)
def asset_creation_strategy() -> str: def asset_creation_strategy() -> str:
"""Guide for discovering and using MCP for Unity tools effectively.""" """Guide for discovering and using MCP for Unity tools effectively."""
return ( return (
"Available MCP for Unity Server Tools:\\n\\n" "Available MCP for Unity Server Tools:\n\n"
"- `manage_editor`: Controls editor state and queries info.\\n" "- `manage_editor`: Controls editor state and queries info.\n"
"- `execute_menu_item`: Executes Unity Editor menu items by path.\\n" "- `execute_menu_item`: Executes Unity Editor menu items by path.\n"
"- `read_console`: Reads or clears Unity console messages, with filtering options.\\n" "- `read_console`: Reads or clears Unity console messages, with filtering options.\n"
"- `manage_scene`: Manages scenes.\\n" "- `manage_scene`: Manages scenes.\n"
"- `manage_gameobject`: Manages GameObjects in the scene.\\n" "- `manage_gameobject`: Manages GameObjects in the scene.\n"
"- `manage_script`: Manages C# script files.\\n" "- `manage_script`: Manages C# script files.\n"
"- `manage_asset`: Manages prefabs and assets.\\n" "- `manage_asset`: Manages prefabs and assets.\n"
"- `manage_shader`: Manages shaders.\\n\\n" "- `manage_shader`: Manages shaders.\n\n"
"Tips:\\n" "Tips:\n"
"- Create prefabs for reusable GameObjects.\\n" "- Create prefabs for reusable GameObjects.\n"
"- Always include a camera and main light in your scenes.\\n" "- Always include a camera and main light in your scenes.\n"
) )
# Run the server # Run the server

View File

@ -1 +1 @@
3.3.0 3.3.1

View File

@ -0,0 +1,431 @@
"""
Privacy-focused, anonymous telemetry system for Unity MCP
Inspired by Onyx's telemetry implementation with Unity-specific adaptations
"""
import uuid
import threading
"""
Fire-and-forget telemetry sender with a single background worker.
- No context/thread-local propagation to avoid re-entrancy into tool resolution.
- Small network timeouts to prevent stalls.
"""
import json
import time
import os
import sys
import platform
import logging
from enum import Enum
from urllib.parse import urlparse
from dataclasses import dataclass, asdict
from typing import Optional, Dict, Any, List
from pathlib import Path
import importlib
import queue
import contextlib
try:
import httpx
HAS_HTTPX = True
except ImportError:
httpx = None # type: ignore
HAS_HTTPX = False
logger = logging.getLogger("unity-mcp-telemetry")
class RecordType(str, Enum):
"""Types of telemetry records we collect"""
VERSION = "version"
STARTUP = "startup"
USAGE = "usage"
LATENCY = "latency"
FAILURE = "failure"
TOOL_EXECUTION = "tool_execution"
UNITY_CONNECTION = "unity_connection"
CLIENT_CONNECTION = "client_connection"
class MilestoneType(str, Enum):
"""Major user journey milestones"""
FIRST_STARTUP = "first_startup"
FIRST_TOOL_USAGE = "first_tool_usage"
FIRST_SCRIPT_CREATION = "first_script_creation"
FIRST_SCENE_MODIFICATION = "first_scene_modification"
MULTIPLE_SESSIONS = "multiple_sessions"
DAILY_ACTIVE_USER = "daily_active_user"
WEEKLY_ACTIVE_USER = "weekly_active_user"
@dataclass
class TelemetryRecord:
"""Structure for telemetry data"""
record_type: RecordType
timestamp: float
customer_uuid: str
session_id: str
data: Dict[str, Any]
milestone: Optional[MilestoneType] = None
class TelemetryConfig:
"""Telemetry configuration"""
def __init__(self):
# Prefer config file, then allow env overrides
server_config = None
for modname in (
"UnityMcpBridge.UnityMcpServer~.src.config",
"UnityMcpBridge.UnityMcpServer.src.config",
"src.config",
"config",
):
try:
mod = importlib.import_module(modname)
server_config = getattr(mod, "config", None)
if server_config is not None:
break
except Exception:
continue
# Determine enabled flag: config -> env DISABLE_* opt-out
cfg_enabled = True if server_config is None else bool(getattr(server_config, "telemetry_enabled", True))
self.enabled = cfg_enabled and not self._is_disabled()
# Telemetry endpoint (Cloud Run default; override via env)
cfg_default = None if server_config is None else getattr(server_config, "telemetry_endpoint", None)
default_ep = cfg_default or "https://unity-mcp-telemetry-375728817078.us-central1.run.app/telemetry/events"
self.default_endpoint = default_ep
self.endpoint = self._validated_endpoint(
os.environ.get("UNITY_MCP_TELEMETRY_ENDPOINT", default_ep),
default_ep,
)
try:
logger.info(
"Telemetry configured: endpoint=%s (default=%s), timeout_env=%s",
self.endpoint,
default_ep,
os.environ.get("UNITY_MCP_TELEMETRY_TIMEOUT") or "<unset>"
)
except Exception:
pass
# Local storage for UUID and milestones
self.data_dir = self._get_data_directory()
self.uuid_file = self.data_dir / "customer_uuid.txt"
self.milestones_file = self.data_dir / "milestones.json"
# Request timeout (small, fail fast). Override with UNITY_MCP_TELEMETRY_TIMEOUT
try:
self.timeout = float(os.environ.get("UNITY_MCP_TELEMETRY_TIMEOUT", "1.5"))
except Exception:
self.timeout = 1.5
try:
logger.info("Telemetry timeout=%.2fs", self.timeout)
except Exception:
pass
# Session tracking
self.session_id = str(uuid.uuid4())
def _is_disabled(self) -> bool:
"""Check if telemetry is disabled via environment variables"""
disable_vars = [
"DISABLE_TELEMETRY",
"UNITY_MCP_DISABLE_TELEMETRY",
"MCP_DISABLE_TELEMETRY"
]
for var in disable_vars:
if os.environ.get(var, "").lower() in ("true", "1", "yes", "on"):
return True
return False
def _get_data_directory(self) -> Path:
"""Get directory for storing telemetry data"""
if os.name == 'nt': # Windows
base_dir = Path(os.environ.get('APPDATA', Path.home() / 'AppData' / 'Roaming'))
elif os.name == 'posix': # macOS/Linux
if 'darwin' in os.uname().sysname.lower(): # macOS
base_dir = Path.home() / 'Library' / 'Application Support'
else: # Linux
base_dir = Path(os.environ.get('XDG_DATA_HOME', Path.home() / '.local' / 'share'))
else:
base_dir = Path.home() / '.unity-mcp'
data_dir = base_dir / 'UnityMCP'
data_dir.mkdir(parents=True, exist_ok=True)
return data_dir
def _validated_endpoint(self, candidate: str, fallback: str) -> str:
"""Validate telemetry endpoint URL scheme; allow only http/https.
Falls back to the provided default on error.
"""
try:
parsed = urlparse(candidate)
if parsed.scheme not in ("https", "http"):
raise ValueError(f"Unsupported scheme: {parsed.scheme}")
# Basic sanity: require network location and path
if not parsed.netloc:
raise ValueError("Missing netloc in endpoint")
# Reject localhost/loopback endpoints in production to avoid accidental local overrides
host = parsed.hostname or ""
if host in ("localhost", "127.0.0.1", "::1"):
raise ValueError("Localhost endpoints are not allowed for telemetry")
return candidate
except Exception as e:
logger.debug(
f"Invalid telemetry endpoint '{candidate}', using default. Error: {e}",
exc_info=True,
)
return fallback
class TelemetryCollector:
"""Main telemetry collection class"""
def __init__(self):
self.config = TelemetryConfig()
self._customer_uuid: Optional[str] = None
self._milestones: Dict[str, Dict[str, Any]] = {}
self._lock: threading.Lock = threading.Lock()
# Bounded queue with single background worker (records only; no context propagation)
self._queue: "queue.Queue[TelemetryRecord]" = queue.Queue(maxsize=1000)
# Load persistent data before starting worker so first events have UUID
self._load_persistent_data()
self._worker: threading.Thread = threading.Thread(target=self._worker_loop, daemon=True)
self._worker.start()
def _load_persistent_data(self):
"""Load UUID and milestones from disk"""
# Load customer UUID
try:
if self.config.uuid_file.exists():
self._customer_uuid = self.config.uuid_file.read_text(encoding="utf-8").strip() or str(uuid.uuid4())
else:
self._customer_uuid = str(uuid.uuid4())
try:
self.config.uuid_file.write_text(self._customer_uuid, encoding="utf-8")
if os.name == "posix":
os.chmod(self.config.uuid_file, 0o600)
except OSError as e:
logger.debug(f"Failed to persist customer UUID: {e}", exc_info=True)
except OSError as e:
logger.debug(f"Failed to load customer UUID: {e}", exc_info=True)
self._customer_uuid = str(uuid.uuid4())
# Load milestones (failure here must not affect UUID)
try:
if self.config.milestones_file.exists():
content = self.config.milestones_file.read_text(encoding="utf-8")
self._milestones = json.loads(content) or {}
if not isinstance(self._milestones, dict):
self._milestones = {}
except (OSError, json.JSONDecodeError, ValueError) as e:
logger.debug(f"Failed to load milestones: {e}", exc_info=True)
self._milestones = {}
def _save_milestones(self):
"""Save milestones to disk. Caller must hold self._lock."""
try:
self.config.milestones_file.write_text(
json.dumps(self._milestones, indent=2),
encoding="utf-8",
)
except OSError as e:
logger.warning(f"Failed to save milestones: {e}", exc_info=True)
def record_milestone(self, milestone: MilestoneType, data: Optional[Dict[str, Any]] = None) -> bool:
"""Record a milestone event, returns True if this is the first occurrence"""
if not self.config.enabled:
return False
milestone_key = milestone.value
with self._lock:
if milestone_key in self._milestones:
return False # Already recorded
milestone_data = {
"timestamp": time.time(),
"data": data or {},
}
self._milestones[milestone_key] = milestone_data
self._save_milestones()
# Also send as telemetry record
self.record(
record_type=RecordType.USAGE,
data={"milestone": milestone_key, **(data or {})},
milestone=milestone
)
return True
def record(self,
record_type: RecordType,
data: Dict[str, Any],
milestone: Optional[MilestoneType] = None):
"""Record a telemetry event (async, non-blocking)"""
if not self.config.enabled:
return
# Allow fallback sender when httpx is unavailable (no early return)
record = TelemetryRecord(
record_type=record_type,
timestamp=time.time(),
customer_uuid=self._customer_uuid or "unknown",
session_id=self.config.session_id,
data=data,
milestone=milestone
)
# Enqueue for background worker (non-blocking). Drop on backpressure.
try:
self._queue.put_nowait(record)
except queue.Full:
logger.debug("Telemetry queue full; dropping %s", record.record_type)
def _worker_loop(self):
"""Background worker that serializes telemetry sends."""
while True:
rec = self._queue.get()
try:
# Run sender directly; do not reuse caller context/thread-locals
self._send_telemetry(rec)
except Exception:
logger.debug("Telemetry worker send failed", exc_info=True)
finally:
with contextlib.suppress(Exception):
self._queue.task_done()
def _send_telemetry(self, record: TelemetryRecord):
"""Send telemetry data to endpoint"""
try:
# System fingerprint (top-level remains concise; details stored in data JSON)
_platform = platform.system() # 'Darwin' | 'Linux' | 'Windows'
_source = sys.platform # 'darwin' | 'linux' | 'win32'
_platform_detail = f"{_platform} {platform.release()} ({platform.machine()})"
_python_version = platform.python_version()
# Enrich data JSON so BigQuery stores detailed fields without schema change
enriched_data = dict(record.data or {})
enriched_data.setdefault("platform_detail", _platform_detail)
enriched_data.setdefault("python_version", _python_version)
payload = {
"record": record.record_type.value,
"timestamp": record.timestamp,
"customer_uuid": record.customer_uuid,
"session_id": record.session_id,
"data": enriched_data,
"version": "3.0.2", # Unity MCP version
"platform": _platform,
"source": _source,
}
if record.milestone:
payload["milestone"] = record.milestone.value
# Prefer httpx when available; otherwise fall back to urllib
if httpx:
with httpx.Client(timeout=self.config.timeout) as client:
# Re-validate endpoint at send time to handle dynamic changes
endpoint = self.config._validated_endpoint(self.config.endpoint, self.config.default_endpoint)
response = client.post(endpoint, json=payload)
if 200 <= response.status_code < 300:
logger.debug(f"Telemetry sent: {record.record_type}")
else:
logger.warning(f"Telemetry failed: HTTP {response.status_code}")
else:
import urllib.request
import urllib.error
data_bytes = json.dumps(payload).encode("utf-8")
endpoint = self.config._validated_endpoint(self.config.endpoint, self.config.default_endpoint)
req = urllib.request.Request(
endpoint,
data=data_bytes,
headers={"Content-Type": "application/json"},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=self.config.timeout) as resp:
if 200 <= resp.getcode() < 300:
logger.debug(f"Telemetry sent (urllib): {record.record_type}")
else:
logger.warning(f"Telemetry failed (urllib): HTTP {resp.getcode()}")
except urllib.error.URLError as ue:
logger.warning(f"Telemetry send failed (urllib): {ue}")
except Exception as e:
# Never let telemetry errors interfere with app functionality
logger.debug(f"Telemetry send failed: {e}")
# Global telemetry instance
_telemetry_collector: Optional[TelemetryCollector] = None
def get_telemetry() -> TelemetryCollector:
"""Get the global telemetry collector instance"""
global _telemetry_collector
if _telemetry_collector is None:
_telemetry_collector = TelemetryCollector()
return _telemetry_collector
def record_telemetry(record_type: RecordType,
data: Dict[str, Any],
milestone: Optional[MilestoneType] = None):
"""Convenience function to record telemetry"""
get_telemetry().record(record_type, data, milestone)
def record_milestone(milestone: MilestoneType, data: Optional[Dict[str, Any]] = None) -> bool:
"""Convenience function to record a milestone"""
return get_telemetry().record_milestone(milestone, data)
def record_tool_usage(tool_name: str, success: bool, duration_ms: float, error: Optional[str] = None, sub_action: Optional[str] = None):
"""Record tool usage telemetry
Args:
tool_name: Name of the tool invoked (e.g., 'manage_scene').
success: Whether the tool completed successfully.
duration_ms: Execution duration in milliseconds.
error: Optional error message (truncated if present).
sub_action: Optional sub-action/operation within the tool (e.g., 'get_hierarchy').
"""
data = {
"tool_name": tool_name,
"success": success,
"duration_ms": round(duration_ms, 2)
}
if sub_action is not None:
try:
data["sub_action"] = str(sub_action)
except Exception:
# Ensure telemetry is never disruptive
data["sub_action"] = "unknown"
if error:
data["error"] = str(error)[:200] # Limit error message length
record_telemetry(RecordType.TOOL_EXECUTION, data)
def record_latency(operation: str, duration_ms: float, metadata: Optional[Dict[str, Any]] = None):
"""Record latency telemetry"""
data = {
"operation": operation,
"duration_ms": round(duration_ms, 2)
}
if metadata:
data.update(metadata)
record_telemetry(RecordType.LATENCY, data)
def record_failure(component: str, error: str, metadata: Optional[Dict[str, Any]] = None):
"""Record failure telemetry"""
data = {
"component": component,
"error": str(error)[:500] # Limit error message length
}
if metadata:
data.update(metadata)
record_telemetry(RecordType.FAILURE, data)
def is_telemetry_enabled() -> bool:
"""Check if telemetry is enabled"""
return get_telemetry().config.enabled

View File

@ -0,0 +1,101 @@
"""
Telemetry decorator for Unity MCP tools
"""
import functools
import time
import inspect
import logging
from typing import Callable, Any
from telemetry import record_tool_usage, record_milestone, MilestoneType
_log = logging.getLogger("unity-mcp-telemetry")
_decorator_log_count = 0
def telemetry_tool(tool_name: str):
"""Decorator to add telemetry tracking to MCP tools"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def _sync_wrapper(*args, **kwargs) -> Any:
start_time = time.time()
success = False
error = None
# Extract sub-action (e.g., 'get_hierarchy') from bound args when available
sub_action = None
try:
sig = inspect.signature(func)
bound = sig.bind_partial(*args, **kwargs)
bound.apply_defaults()
sub_action = bound.arguments.get("action")
except Exception:
sub_action = None
try:
global _decorator_log_count
if _decorator_log_count < 10:
_log.info(f"telemetry_decorator sync: tool={tool_name}")
_decorator_log_count += 1
result = func(*args, **kwargs)
success = True
action_val = sub_action or kwargs.get("action")
try:
if tool_name == "manage_script" and action_val == "create":
record_milestone(MilestoneType.FIRST_SCRIPT_CREATION)
elif tool_name.startswith("manage_scene"):
record_milestone(MilestoneType.FIRST_SCENE_MODIFICATION)
record_milestone(MilestoneType.FIRST_TOOL_USAGE)
except Exception:
_log.debug("milestone emit failed", exc_info=True)
return result
except Exception as e:
error = str(e)
raise
finally:
duration_ms = (time.time() - start_time) * 1000
try:
record_tool_usage(tool_name, success, duration_ms, error, sub_action=sub_action)
except Exception:
_log.debug("record_tool_usage failed", exc_info=True)
@functools.wraps(func)
async def _async_wrapper(*args, **kwargs) -> Any:
start_time = time.time()
success = False
error = None
# Extract sub-action (e.g., 'get_hierarchy') from bound args when available
sub_action = None
try:
sig = inspect.signature(func)
bound = sig.bind_partial(*args, **kwargs)
bound.apply_defaults()
sub_action = bound.arguments.get("action")
except Exception:
sub_action = None
try:
global _decorator_log_count
if _decorator_log_count < 10:
_log.info(f"telemetry_decorator async: tool={tool_name}")
_decorator_log_count += 1
result = await func(*args, **kwargs)
success = True
action_val = sub_action or kwargs.get("action")
try:
if tool_name == "manage_script" and action_val == "create":
record_milestone(MilestoneType.FIRST_SCRIPT_CREATION)
elif tool_name.startswith("manage_scene"):
record_milestone(MilestoneType.FIRST_SCENE_MODIFICATION)
record_milestone(MilestoneType.FIRST_TOOL_USAGE)
except Exception:
_log.debug("milestone emit failed", exc_info=True)
return result
except Exception as e:
error = str(e)
raise
finally:
duration_ms = (time.time() - start_time) * 1000
try:
record_tool_usage(tool_name, success, duration_ms, error, sub_action=sub_action)
except Exception:
_log.debug("record_tool_usage failed", exc_info=True)
return _async_wrapper if inspect.iscoroutinefunction(func) else _sync_wrapper
return decorator

View File

@ -0,0 +1,156 @@
#!/usr/bin/env python3
"""
Test script for Unity MCP Telemetry System
Run this to verify telemetry is working correctly
"""
import os
import time
import sys
from pathlib import Path
# Add src to Python path for imports
sys.path.insert(0, str(Path(__file__).parent))
def test_telemetry_basic():
"""Test basic telemetry functionality"""
# Avoid stdout noise in tests
try:
from telemetry import (
get_telemetry, record_telemetry, record_milestone,
RecordType, MilestoneType, is_telemetry_enabled
)
pass
except ImportError as e:
# Silent failure path for tests
return False
# Test telemetry enabled status
_ = is_telemetry_enabled()
# Test basic record
try:
record_telemetry(RecordType.VERSION, {
"version": "3.0.2",
"test_run": True
})
pass
except Exception as e:
# Silent failure path for tests
return False
# Test milestone recording
try:
is_first = record_milestone(MilestoneType.FIRST_STARTUP, {
"test_mode": True
})
_ = is_first
except Exception as e:
# Silent failure path for tests
return False
# Test telemetry collector
try:
collector = get_telemetry()
_ = collector
except Exception as e:
# Silent failure path for tests
return False
return True
def test_telemetry_disabled():
"""Test telemetry with disabled state"""
# Silent for tests
# Set environment variable to disable telemetry
os.environ["DISABLE_TELEMETRY"] = "true"
# Re-import to get fresh config
import importlib
import telemetry
importlib.reload(telemetry)
from telemetry import is_telemetry_enabled, record_telemetry, RecordType
_ = is_telemetry_enabled()
if not is_telemetry_enabled():
pass
# Test that records are ignored when disabled
record_telemetry(RecordType.USAGE, {"test": "should_be_ignored"})
pass
return True
else:
pass
return False
def test_data_storage():
"""Test data storage functionality"""
# Silent for tests
try:
from telemetry import get_telemetry
collector = get_telemetry()
data_dir = collector.config.data_dir
_ = (data_dir, collector.config.uuid_file, collector.config.milestones_file)
# Check if files exist
if collector.config.uuid_file.exists():
pass
else:
pass
if collector.config.milestones_file.exists():
pass
else:
pass
return True
except Exception as e:
# Silent failure path for tests
return False
def main():
"""Run all telemetry tests"""
# Silent runner for CI
tests = [
test_telemetry_basic,
test_data_storage,
test_telemetry_disabled,
]
passed = 0
failed = 0
for test in tests:
try:
if test():
passed += 1
pass
else:
failed += 1
pass
except Exception as e:
failed += 1
pass
_ = (passed, failed)
if failed == 0:
pass
return True
else:
pass
return False
if __name__ == "__main__":
success = main()
sys.exit(0 if success else 1)

View File

@ -7,12 +7,15 @@ from unity_connection import get_unity_connection, send_command_with_retry # Im
from config import config from config import config
import time import time
from telemetry_decorator import telemetry_tool
def register_execute_menu_item_tools(mcp: FastMCP): def register_execute_menu_item_tools(mcp: FastMCP):
"""Registers the execute_menu_item tool with the MCP server.""" """Registers the execute_menu_item tool with the MCP server."""
@mcp.tool() @mcp.tool()
async def execute_menu_item( @telemetry_tool("execute_menu_item")
ctx: Context, def execute_menu_item(
ctx: Any,
menu_path: str, menu_path: str,
action: str = 'execute', action: str = 'execute',
parameters: Dict[str, Any] = None, parameters: Dict[str, Any] = None,

View File

@ -9,12 +9,15 @@ from unity_connection import get_unity_connection, async_send_command_with_retry
from config import config from config import config
import time import time
from telemetry_decorator import telemetry_tool
def register_manage_asset_tools(mcp: FastMCP): def register_manage_asset_tools(mcp: FastMCP):
"""Registers the manage_asset tool with the MCP server.""" """Registers the manage_asset tool with the MCP server."""
@mcp.tool() @mcp.tool()
@telemetry_tool("manage_asset")
async def manage_asset( async def manage_asset(
ctx: Context, ctx: Any,
action: str, action: str,
path: str, path: str,
asset_type: str = None, asset_type: str = None,
@ -24,8 +27,8 @@ def register_manage_asset_tools(mcp: FastMCP):
search_pattern: str = None, search_pattern: str = None,
filter_type: str = None, filter_type: str = None,
filter_date_after: str = None, filter_date_after: str = None,
page_size: int = None, page_size: Any = None,
page_number: int = None page_number: Any = None
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""Performs asset operations (import, create, modify, delete, etc.) in Unity. """Performs asset operations (import, create, modify, delete, etc.) in Unity.
@ -50,6 +53,25 @@ def register_manage_asset_tools(mcp: FastMCP):
if properties is None: if properties is None:
properties = {} properties = {}
# Coerce numeric inputs defensively
def _coerce_int(value, default=None):
if value is None:
return default
try:
if isinstance(value, bool):
return default
if isinstance(value, int):
return int(value)
s = str(value).strip()
if s.lower() in ("", "none", "null"):
return default
return int(float(s))
except Exception:
return default
page_size = _coerce_int(page_size)
page_number = _coerce_int(page_number)
# Prepare parameters for the C# handler # Prepare parameters for the C# handler
params_dict = { params_dict = {
"action": action.lower(), "action": action.lower(),

View File

@ -4,10 +4,25 @@ from typing import Dict, Any
from unity_connection import get_unity_connection, send_command_with_retry from unity_connection import get_unity_connection, send_command_with_retry
from config import config from config import config
from telemetry_decorator import telemetry_tool
from telemetry import is_telemetry_enabled, record_tool_usage
def register_manage_editor_tools(mcp: FastMCP): def register_manage_editor_tools(mcp: FastMCP):
"""Register all editor management tools with the MCP server.""" """Register all editor management tools with the MCP server."""
@mcp.tool() @mcp.tool(description=(
"Controls and queries the Unity editor's state and settings.\n\n"
"Args:\n"
"- ctx: Context object (required)\n"
"- action: Operation (e.g., 'play', 'pause', 'get_state', 'set_active_tool', 'add_tag')\n"
"- wait_for_completion: Optional. If True, waits for certain actions\n"
"- tool_name: Tool name for specific actions\n"
"- tag_name: Tag name for specific actions\n"
"- layer_name: Layer name for specific actions\n\n"
"Returns:\n"
"Dictionary with operation results ('success', 'message', 'data')."
))
@telemetry_tool("manage_editor")
def manage_editor( def manage_editor(
ctx: Context, ctx: Context,
action: str, action: str,
@ -17,17 +32,14 @@ def register_manage_editor_tools(mcp: FastMCP):
tag_name: str = None, tag_name: str = None,
layer_name: str = None, layer_name: str = None,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""Controls and queries the Unity editor's state and settings.
Args:
action: Operation (e.g., 'play', 'pause', 'get_state', 'set_active_tool', 'add_tag').
wait_for_completion: Optional. If True, waits for certain actions.
Action-specific arguments (e.g., tool_name, tag_name, layer_name).
Returns:
Dictionary with operation results ('success', 'message', 'data').
"""
try: try:
# Diagnostics: quick telemetry checks
if action == "telemetry_status":
return {"success": True, "telemetry_enabled": is_telemetry_enabled()}
if action == "telemetry_ping":
record_tool_usage("diagnostic_ping", True, 1.0, None)
return {"success": True, "message": "telemetry ping queued"}
# Prepare parameters, removing None values # Prepare parameters, removing None values
params = { params = {
"action": action, "action": action,

View File

@ -4,12 +4,15 @@ from unity_connection import get_unity_connection, send_command_with_retry
from config import config from config import config
import time import time
from telemetry_decorator import telemetry_tool
def register_manage_gameobject_tools(mcp: FastMCP): def register_manage_gameobject_tools(mcp: FastMCP):
"""Register all GameObject management tools with the MCP server.""" """Register all GameObject management tools with the MCP server."""
@mcp.tool() @mcp.tool()
@telemetry_tool("manage_gameobject")
def manage_gameobject( def manage_gameobject(
ctx: Context, ctx: Any,
action: str, action: str,
target: str = None, # GameObject identifier by name or path target: str = None, # GameObject identifier by name or path
search_method: str = None, search_method: str = None,
@ -119,9 +122,9 @@ def register_manage_gameobject_tools(mcp: FastMCP):
params["prefabPath"] = constructed_path.replace("\\", "/") params["prefabPath"] = constructed_path.replace("\\", "/")
elif not params["prefabPath"].lower().endswith(".prefab"): elif not params["prefabPath"].lower().endswith(".prefab"):
return {"success": False, "message": f"Invalid prefab_path: '{params['prefabPath']}' must end with .prefab"} return {"success": False, "message": f"Invalid prefab_path: '{params['prefabPath']}' must end with .prefab"}
# Ensure prefab_folder itself isn't sent if prefabPath was constructed or provided # Ensure prefabFolder itself isn't sent if prefabPath was constructed or provided
# The C# side only needs the final prefabPath # The C# side only needs the final prefabPath
params.pop("prefab_folder", None) params.pop("prefabFolder", None)
# -------------------------------- # --------------------------------
# Use centralized retry helper # Use centralized retry helper

View File

@ -4,16 +4,19 @@ from unity_connection import get_unity_connection, send_command_with_retry
from config import config from config import config
import time import time
from telemetry_decorator import telemetry_tool
def register_manage_scene_tools(mcp: FastMCP): def register_manage_scene_tools(mcp: FastMCP):
"""Register all scene management tools with the MCP server.""" """Register all scene management tools with the MCP server."""
@mcp.tool() @mcp.tool()
@telemetry_tool("manage_scene")
def manage_scene( def manage_scene(
ctx: Context, ctx: Context,
action: str, action: str,
name: str, name: str = "",
path: str, path: str = "",
build_index: int, build_index: Any = None,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""Manages Unity scenes (load, save, create, get hierarchy, etc.). """Manages Unity scenes (load, save, create, get hierarchy, etc.).
@ -28,13 +31,31 @@ def register_manage_scene_tools(mcp: FastMCP):
Dictionary with results ('success', 'message', 'data'). Dictionary with results ('success', 'message', 'data').
""" """
try: try:
params = { # Coerce numeric inputs defensively
"action": action, def _coerce_int(value, default=None):
"name": name, if value is None:
"path": path, return default
"buildIndex": build_index try:
} if isinstance(value, bool):
params = {k: v for k, v in params.items() if v is not None} return default
if isinstance(value, int):
return int(value)
s = str(value).strip()
if s.lower() in ("", "none", "null"):
return default
return int(float(s))
except Exception:
return default
coerced_build_index = _coerce_int(build_index, default=None)
params = {"action": action}
if name:
params["name"] = name
if path:
params["path"] = path
if coerced_build_index is not None:
params["buildIndex"] = coerced_build_index
# Use centralized retry helper # Use centralized retry helper
response = send_command_with_retry("manage_scene", params) response = send_command_with_retry("manage_scene", params)

View File

@ -5,6 +5,16 @@ import base64
import os import os
from urllib.parse import urlparse, unquote from urllib.parse import urlparse, unquote
try:
from telemetry_decorator import telemetry_tool
from telemetry import record_milestone, MilestoneType
HAS_TELEMETRY = True
except ImportError:
HAS_TELEMETRY = False
def telemetry_tool(tool_name: str):
def decorator(func):
return func
return decorator
def register_manage_script_tools(mcp: FastMCP): def register_manage_script_tools(mcp: FastMCP):
"""Register all script management tools with the MCP server.""" """Register all script management tools with the MCP server."""
@ -80,6 +90,7 @@ def register_manage_script_tools(mcp: FastMCP):
"- For method/class operations, use script_apply_edits (safer, structured edits)\n" "- For method/class operations, use script_apply_edits (safer, structured edits)\n"
"- For pattern-based replacements, consider anchor operations in script_apply_edits\n" "- For pattern-based replacements, consider anchor operations in script_apply_edits\n"
)) ))
@telemetry_tool("apply_text_edits")
def apply_text_edits( def apply_text_edits(
ctx: Context, ctx: Context,
uri: str, uri: str,
@ -346,6 +357,7 @@ def register_manage_script_tools(mcp: FastMCP):
"Args: path (e.g., 'Assets/Scripts/My.cs'), contents (string), script_type, namespace.\n" "Args: path (e.g., 'Assets/Scripts/My.cs'), contents (string), script_type, namespace.\n"
"Rules: path must be under Assets/. Contents will be Base64-encoded over transport.\n" "Rules: path must be under Assets/. Contents will be Base64-encoded over transport.\n"
)) ))
@telemetry_tool("create_script")
def create_script( def create_script(
ctx: Context, ctx: Context,
path: str, path: str,
@ -385,6 +397,7 @@ def register_manage_script_tools(mcp: FastMCP):
"Args: uri (unity://path/... or file://... or Assets/...).\n" "Args: uri (unity://path/... or file://... or Assets/...).\n"
"Rules: Target must resolve under Assets/.\n" "Rules: Target must resolve under Assets/.\n"
)) ))
@telemetry_tool("delete_script")
def delete_script(ctx: Context, uri: str) -> Dict[str, Any]: def delete_script(ctx: Context, uri: str) -> Dict[str, Any]:
"""Delete a C# script by URI.""" """Delete a C# script by URI."""
name, directory = _split_uri(uri) name, directory = _split_uri(uri)
@ -396,12 +409,14 @@ def register_manage_script_tools(mcp: FastMCP):
@mcp.tool(description=( @mcp.tool(description=(
"Validate a C# script and return diagnostics.\n\n" "Validate a C# script and return diagnostics.\n\n"
"Args: uri, level=('basic'|'standard').\n" "Args: uri, level=('basic'|'standard'), include_diagnostics (bool, optional).\n"
"- basic: quick syntax checks.\n" "- basic: quick syntax checks.\n"
"- standard: deeper checks (performance hints, common pitfalls).\n" "- standard: deeper checks (performance hints, common pitfalls).\n"
"- include_diagnostics: when true, returns full diagnostics and summary; default returns counts only.\n"
)) ))
@telemetry_tool("validate_script")
def validate_script( def validate_script(
ctx: Context, uri: str, level: str = "basic" ctx: Context, uri: str, level: str = "basic", include_diagnostics: bool = False
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""Validate a C# script and return diagnostics.""" """Validate a C# script and return diagnostics."""
name, directory = _split_uri(uri) name, directory = _split_uri(uri)
@ -418,8 +433,10 @@ def register_manage_script_tools(mcp: FastMCP):
resp = send_command_with_retry("manage_script", params) resp = send_command_with_retry("manage_script", params)
if isinstance(resp, dict) and resp.get("success"): if isinstance(resp, dict) and resp.get("success"):
diags = resp.get("data", {}).get("diagnostics", []) or [] diags = resp.get("data", {}).get("diagnostics", []) or []
warnings = sum(d.get("severity", "").lower() == "warning" for d in diags) warnings = sum(1 for d in diags if str(d.get("severity", "")).lower() == "warning")
errors = sum(d.get("severity", "").lower() in ("error", "fatal") for d in diags) errors = sum(1 for d in diags if str(d.get("severity", "")).lower() in ("error", "fatal"))
if include_diagnostics:
return {"success": True, "data": {"diagnostics": diags, "summary": {"warnings": warnings, "errors": errors}}}
return {"success": True, "data": {"warnings": warnings, "errors": errors}} return {"success": True, "data": {"warnings": warnings, "errors": errors}}
return resp if isinstance(resp, dict) else {"success": False, "message": str(resp)} return resp if isinstance(resp, dict) else {"success": False, "message": str(resp)}
@ -429,6 +446,7 @@ def register_manage_script_tools(mcp: FastMCP):
"Args: name (no .cs), path (Assets/...), contents (for create), script_type, namespace.\n" "Args: name (no .cs), path (Assets/...), contents (for create), script_type, namespace.\n"
"Notes: prefer apply_text_edits (ranges) or script_apply_edits (structured) for edits.\n" "Notes: prefer apply_text_edits (ranges) or script_apply_edits (structured) for edits.\n"
)) ))
@telemetry_tool("manage_script")
def manage_script( def manage_script(
ctx: Context, ctx: Context,
action: str, action: str,
@ -560,6 +578,7 @@ def register_manage_script_tools(mcp: FastMCP):
"Get manage_script capabilities (supported ops, limits, and guards).\n\n" "Get manage_script capabilities (supported ops, limits, and guards).\n\n"
"Returns:\n- ops: list of supported structured ops\n- text_ops: list of supported text ops\n- max_edit_payload_bytes: server edit payload cap\n- guards: header/using guard enabled flag\n" "Returns:\n- ops: list of supported structured ops\n- text_ops: list of supported text ops\n- max_edit_payload_bytes: server edit payload cap\n- guards: header/using guard enabled flag\n"
)) ))
@telemetry_tool("manage_script_capabilities")
def manage_script_capabilities(ctx: Context) -> Dict[str, Any]: def manage_script_capabilities(ctx: Context) -> Dict[str, Any]:
try: try:
# Keep in sync with server/Editor ManageScript implementation # Keep in sync with server/Editor ManageScript implementation
@ -583,10 +602,11 @@ def register_manage_script_tools(mcp: FastMCP):
return {"success": False, "error": f"capabilities error: {e}"} return {"success": False, "error": f"capabilities error: {e}"}
@mcp.tool(description=( @mcp.tool(description=(
"Get SHA256 and metadata for a Unity C# script without returning file contents.\n\n" "Get SHA256 and basic metadata for a Unity C# script without returning file contents.\n\n"
"Args: uri (unity://path/Assets/... or file://... or Assets/...).\n" "Args: uri (unity://path/Assets/... or file://... or Assets/...).\n"
"Returns: {sha256, lengthBytes, lastModifiedUtc, uri, path}." "Returns: {sha256, lengthBytes}."
)) ))
@telemetry_tool("get_sha")
def get_sha(ctx: Context, uri: str) -> Dict[str, Any]: def get_sha(ctx: Context, uri: str) -> Dict[str, Any]:
"""Return SHA256 and basic metadata for a script.""" """Return SHA256 and basic metadata for a script."""
try: try:
@ -595,13 +615,8 @@ def register_manage_script_tools(mcp: FastMCP):
resp = send_command_with_retry("manage_script", params) resp = send_command_with_retry("manage_script", params)
if isinstance(resp, dict) and resp.get("success"): if isinstance(resp, dict) and resp.get("success"):
data = resp.get("data", {}) data = resp.get("data", {})
return { minimal = {"sha256": data.get("sha256"), "lengthBytes": data.get("lengthBytes")}
"success": True, return {"success": True, "data": minimal}
"data": {
"sha256": data.get("sha256"),
"lengthBytes": data.get("lengthBytes"),
},
}
return resp if isinstance(resp, dict) else {"success": False, "message": str(resp)} return resp if isinstance(resp, dict) else {"success": False, "message": str(resp)}
except Exception as e: except Exception as e:
return {"success": False, "message": f"get_sha error: {e}"} return {"success": False, "message": f"get_sha error: {e}"}

View File

@ -1,10 +1,12 @@
from mcp.server.fastmcp import FastMCP, Context from mcp.server.fastmcp import FastMCP, Context
from typing import Dict, Any, List, Tuple from typing import Dict, Any, List, Tuple, Optional
import base64 import base64
import re import re
import os import os
from unity_connection import send_command_with_retry from unity_connection import send_command_with_retry
from telemetry_decorator import telemetry_tool
def _apply_edits_locally(original_text: str, edits: List[Dict[str, Any]]) -> str: def _apply_edits_locally(original_text: str, edits: List[Dict[str, Any]]) -> str:
text = original_text text = original_text
@ -316,22 +318,42 @@ def register_manage_script_edits_tools(mcp: FastMCP):
"Do NOT use: new_method, anchor_method, content, newText (aliases accepted but normalized).\n\n" "Do NOT use: new_method, anchor_method, content, newText (aliases accepted but normalized).\n\n"
"Examples:\n" "Examples:\n"
"1) Replace a method:\n" "1) Replace a method:\n"
"{ 'name':'SmartReach','path':'Assets/Scripts/Interaction','edits':[\n" "{\n"
" { 'op':'replace_method','className':'SmartReach','methodName':'HasTarget',\n" " \"name\": \"SmartReach\",\n"
" 'replacement':'public bool HasTarget(){ return currentTarget!=null; }' }\n" " \"path\": \"Assets/Scripts/Interaction\",\n"
"], 'options':{'validate':'standard','refresh':'immediate'} }\n\n" " \"edits\": [\n"
" {\n"
" \"op\": \"replace_method\",\n"
" \"className\": \"SmartReach\",\n"
" \"methodName\": \"HasTarget\",\n"
" \"replacement\": \"public bool HasTarget(){ return currentTarget!=null; }\"\n"
" }\n"
" ],\n"
" \"options\": {\"validate\": \"standard\", \"refresh\": \"immediate\"}\n"
"}\n\n"
"2) Insert a method after another:\n" "2) Insert a method after another:\n"
"{ 'name':'SmartReach','path':'Assets/Scripts/Interaction','edits':[\n" "{\n"
" { 'op':'insert_method','className':'SmartReach','replacement':'public void PrintSeries(){ Debug.Log(seriesName); }',\n" " \"name\": \"SmartReach\",\n"
" 'position':'after','afterMethodName':'GetCurrentTarget' }\n" " \"path\": \"Assets/Scripts/Interaction\",\n"
"] }\n" " \"edits\": [\n"
" {\n"
" \"op\": \"insert_method\",\n"
" \"className\": \"SmartReach\",\n"
" \"replacement\": \"public void PrintSeries(){ Debug.Log(seriesName); }\",\n"
" \"position\": \"after\",\n"
" \"afterMethodName\": \"GetCurrentTarget\"\n"
" }\n"
" ]\n"
"}\n\n"
"Note: 'options' must be an object/dict, not a string. Use proper JSON syntax.\n"
)) ))
@telemetry_tool("script_apply_edits")
def script_apply_edits( def script_apply_edits(
ctx: Context, ctx: Context,
name: str, name: str,
path: str, path: str,
edits: List[Dict[str, Any]], edits: List[Dict[str, Any]],
options: Dict[str, Any] | None = None, options: Optional[Dict[str, Any]] = None,
script_type: str = "MonoBehaviour", script_type: str = "MonoBehaviour",
namespace: str = "", namespace: str = "",
) -> Dict[str, Any]: ) -> Dict[str, Any]:

View File

@ -6,12 +6,15 @@ import time
import os import os
import base64 import base64
from telemetry_decorator import telemetry_tool
def register_manage_shader_tools(mcp: FastMCP): def register_manage_shader_tools(mcp: FastMCP):
"""Register all shader script management tools with the MCP server.""" """Register all shader script management tools with the MCP server."""
@mcp.tool() @mcp.tool()
@telemetry_tool("manage_shader")
def manage_shader( def manage_shader(
ctx: Context, ctx: Any,
action: str, action: str,
name: str, name: str,
path: str, path: str,

View File

@ -6,16 +6,18 @@ import time
from mcp.server.fastmcp import FastMCP, Context from mcp.server.fastmcp import FastMCP, Context
from unity_connection import get_unity_connection, send_command_with_retry from unity_connection import get_unity_connection, send_command_with_retry
from config import config from config import config
from telemetry_decorator import telemetry_tool
def register_read_console_tools(mcp: FastMCP): def register_read_console_tools(mcp: FastMCP):
"""Registers the read_console tool with the MCP server.""" """Registers the read_console tool with the MCP server."""
@mcp.tool() @mcp.tool()
@telemetry_tool("read_console")
def read_console( def read_console(
ctx: Context, ctx: Context,
action: str = None, action: str = None,
types: List[str] = None, types: List[str] = None,
count: int = None, count: Any = None,
filter_text: str = None, filter_text: str = None,
since_timestamp: str = None, since_timestamp: str = None,
format: str = None, format: str = None,
@ -40,21 +42,34 @@ def register_read_console_tools(mcp: FastMCP):
# Get the connection instance # Get the connection instance
bridge = get_unity_connection() bridge = get_unity_connection()
# Set defaults if values are None (conservative but useful for CI) # Set defaults if values are None
action = action if action is not None else 'get' action = action if action is not None else 'get'
types = types if types is not None else ['error'] types = types if types is not None else ['error', 'warning', 'log']
# Normalize types if passed as a single string format = format if format is not None else 'detailed'
if isinstance(types, str):
types = [types]
format = format if format is not None else 'json'
include_stacktrace = include_stacktrace if include_stacktrace is not None else True include_stacktrace = include_stacktrace if include_stacktrace is not None else True
# Default count to a higher value unless explicitly provided
count = 50 if count is None else count
# Normalize action if it's a string # Normalize action if it's a string
if isinstance(action, str): if isinstance(action, str):
action = action.lower() action = action.lower()
# Coerce count defensively (string/float -> int)
def _coerce_int(value, default=None):
if value is None:
return default
try:
if isinstance(value, bool):
return default
if isinstance(value, int):
return int(value)
s = str(value).strip()
if s.lower() in ("", "none", "null"):
return default
return int(float(s))
except Exception:
return default
count = _coerce_int(count)
# Prepare parameters for the C# handler # Prepare parameters for the C# handler
params_dict = { params_dict = {
"action": action, "action": action,
@ -73,25 +88,15 @@ def register_read_console_tools(mcp: FastMCP):
if 'count' not in params_dict: if 'count' not in params_dict:
params_dict['count'] = None params_dict['count'] = None
# Use centralized retry helper (tolerate legacy list payloads from some agents) # Use centralized retry helper
resp = send_command_with_retry("read_console", params_dict) resp = send_command_with_retry("read_console", params_dict)
if isinstance(resp, dict) and resp.get("success") and not include_stacktrace: if isinstance(resp, dict) and resp.get("success") and not include_stacktrace:
data = resp.get("data", {}) or {} # Strip stacktrace fields from returned lines if present
lines = data.get("lines") try:
if lines is None: lines = resp.get("data", {}).get("lines", [])
# Some handlers return the raw list under data for line in lines:
lines = data if isinstance(data, list) else [] if isinstance(line, dict) and "stacktrace" in line:
line.pop("stacktrace", None)
def _entry(x: Any) -> Dict[str, Any]: except Exception:
if isinstance(x, dict): pass
return { return resp if isinstance(resp, dict) else {"success": False, "message": str(resp)}
"level": x.get("level") or x.get("type"),
"message": x.get("message") or x.get("text"),
}
if isinstance(x, (list, tuple)) and len(x) >= 2:
return {"level": x[0], "message": x[1]}
return {"level": None, "message": str(x)}
trimmed = [_entry(l) for l in (lines or [])]
return {"success": True, "data": {"lines": trimmed}}
return resp if isinstance(resp, dict) else {"success": False, "message": str(resp)}

View File

@ -3,9 +3,8 @@ Resource wrapper tools so clients that do not expose MCP resources primitives
can still list and read files via normal tools. These call into the same can still list and read files via normal tools. These call into the same
safe path logic (re-implemented here to avoid importing server.py). safe path logic (re-implemented here to avoid importing server.py).
""" """
from __future__ import annotations
from typing import Dict, Any, List from typing import Dict, Any, List, Optional
import re import re
from pathlib import Path from pathlib import Path
from urllib.parse import urlparse, unquote from urllib.parse import urlparse, unquote
@ -14,9 +13,34 @@ import hashlib
import os import os
from mcp.server.fastmcp import FastMCP, Context from mcp.server.fastmcp import FastMCP, Context
from telemetry_decorator import telemetry_tool
from unity_connection import send_command_with_retry from unity_connection import send_command_with_retry
def _coerce_int(value: Any, default: Optional[int] = None, minimum: Optional[int] = None) -> Optional[int]:
"""Safely coerce various inputs (str/float/etc.) to an int.
Returns default on failure; clamps to minimum when provided.
"""
if value is None:
return default
try:
# Avoid treating booleans as ints implicitly
if isinstance(value, bool):
return default
if isinstance(value, int):
result = int(value)
else:
s = str(value).strip()
if s.lower() in ("", "none", "null"):
return default
# Allow "10.0" or similar inputs
result = int(float(s))
if minimum is not None and result < minimum:
return minimum
return result
except Exception:
return default
def _resolve_project_root(override: str | None) -> Path: def _resolve_project_root(override: str | None) -> Path:
# 1) Explicit override # 1) Explicit override
if override: if override:
@ -114,12 +138,13 @@ def register_resource_tools(mcp: FastMCP) -> None:
"Security: restricted to Assets/ subtree; symlinks are resolved and must remain under Assets/.\n" "Security: restricted to Assets/ subtree; symlinks are resolved and must remain under Assets/.\n"
"Notes: Only .cs files are returned by default; always appends unity://spec/script-edits.\n" "Notes: Only .cs files are returned by default; always appends unity://spec/script-edits.\n"
)) ))
@telemetry_tool("list_resources")
async def list_resources( async def list_resources(
ctx: Context | None = None, ctx: Optional[Context] = None,
pattern: str | None = "*.cs", pattern: Optional[str] = "*.cs",
under: str = "Assets", under: str = "Assets",
limit: int = 200, limit: Any = 200,
project_root: str | None = None, project_root: Optional[str] = None,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Lists project URIs (unity://path/...) under a folder (default: Assets). Lists project URIs (unity://path/...) under a folder (default: Assets).
@ -141,6 +166,7 @@ def register_resource_tools(mcp: FastMCP) -> None:
return {"success": False, "error": "Listing is restricted to Assets/"} return {"success": False, "error": "Listing is restricted to Assets/"}
matches: List[str] = [] matches: List[str] = []
limit_int = _coerce_int(limit, default=200, minimum=1)
for p in base.rglob("*"): for p in base.rglob("*"):
if not p.is_file(): if not p.is_file():
continue continue
@ -157,7 +183,7 @@ def register_resource_tools(mcp: FastMCP) -> None:
continue continue
rel = p.relative_to(project).as_posix() rel = p.relative_to(project).as_posix()
matches.append(f"unity://path/{rel}") matches.append(f"unity://path/{rel}")
if len(matches) >= max(1, limit): if len(matches) >= max(1, limit_int):
break break
# Always include the canonical spec resource so NL clients can discover it # Always include the canonical spec resource so NL clients can discover it
@ -174,21 +200,20 @@ def register_resource_tools(mcp: FastMCP) -> None:
"Security: uri must resolve under Assets/.\n" "Security: uri must resolve under Assets/.\n"
"Examples: head_bytes=1024; start_line=100,line_count=40; tail_lines=120.\n" "Examples: head_bytes=1024; start_line=100,line_count=40; tail_lines=120.\n"
)) ))
@telemetry_tool("read_resource")
async def read_resource( async def read_resource(
uri: str, uri: str,
ctx: Context | None = None, ctx: Optional[Context] = None,
start_line: int | None = None, start_line: Any = None,
line_count: int | None = None, line_count: Any = None,
head_bytes: int | None = None, head_bytes: Any = None,
tail_lines: int | None = None, tail_lines: Any = None,
project_root: str | None = None, project_root: Optional[str] = None,
request: str | None = None, request: Optional[str] = None,
include_text: bool = False,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Reads a resource by unity://path/... URI with optional slicing. Reads a resource by unity://path/... URI with optional slicing.
By default only the SHA-256 hash and byte length are returned; set One of line window (start_line/line_count) or head_bytes can be used to limit size.
``include_text`` or provide window arguments to receive text.
""" """
try: try:
# Serve the canonical spec directly when requested (allow bare or with scheme) # Serve the canonical spec directly when requested (allow bare or with scheme)
@ -293,57 +318,54 @@ def register_resource_tools(mcp: FastMCP) -> None:
start_line = max(1, hit_line - half) start_line = max(1, hit_line - half)
line_count = window line_count = window
raw = p.read_bytes() # Coerce numeric inputs defensively (string/float -> int)
sha = hashlib.sha256(raw).hexdigest() start_line = _coerce_int(start_line)
length = len(raw) line_count = _coerce_int(line_count)
head_bytes = _coerce_int(head_bytes, minimum=1)
tail_lines = _coerce_int(tail_lines, minimum=1)
want_text = ( # Compute SHA over full file contents (metadata-only default)
bool(include_text) full_bytes = p.read_bytes()
or (head_bytes is not None and head_bytes >= 0) full_sha = hashlib.sha256(full_bytes).hexdigest()
or (tail_lines is not None and tail_lines > 0)
or (start_line is not None and line_count is not None) # Selection only when explicitly requested via windowing args or request text hints
) selection_requested = bool(head_bytes or tail_lines or (start_line is not None and line_count is not None) or request)
if want_text: if selection_requested:
text: str # Mutually exclusive windowing options precedence:
if head_bytes is not None and head_bytes >= 0: # 1) head_bytes, 2) tail_lines, 3) start_line+line_count, else full text
text = raw[: head_bytes].decode("utf-8", errors="replace") if head_bytes and head_bytes > 0:
else: raw = full_bytes[: head_bytes]
text = raw.decode("utf-8", errors="replace") text = raw.decode("utf-8", errors="replace")
else:
text = full_bytes.decode("utf-8", errors="replace")
if tail_lines is not None and tail_lines > 0: if tail_lines is not None and tail_lines > 0:
lines = text.splitlines() lines = text.splitlines()
n = max(0, tail_lines) n = max(0, tail_lines)
text = "\n".join(lines[-n:]) text = "\n".join(lines[-n:])
elif ( elif start_line is not None and line_count is not None and line_count >= 0:
start_line is not None
and line_count is not None
and line_count >= 0
):
lines = text.splitlines() lines = text.splitlines()
s = max(0, start_line - 1) s = max(0, start_line - 1)
e = min(len(lines), s + line_count) e = min(len(lines), s + line_count)
text = "\n".join(lines[s:e]) text = "\n".join(lines[s:e])
return { return {"success": True, "data": {"text": text, "metadata": {"sha256": full_sha, "lengthBytes": len(full_bytes)}}}
"success": True, else:
"data": {"text": text, "metadata": {"sha256": sha}}, # Default: metadata only
} return {"success": True, "data": {"metadata": {"sha256": full_sha, "lengthBytes": len(full_bytes)}}}
return {
"success": True,
"data": {"metadata": {"sha256": sha, "lengthBytes": length}},
}
except Exception as e: except Exception as e:
return {"success": False, "error": str(e)} return {"success": False, "error": str(e)}
@mcp.tool() @mcp.tool()
@telemetry_tool("find_in_file")
async def find_in_file( async def find_in_file(
uri: str, uri: str,
pattern: str, pattern: str,
ctx: Context | None = None, ctx: Optional[Context] = None,
ignore_case: bool | None = True, ignore_case: Optional[bool] = True,
project_root: str | None = None, project_root: Optional[str] = None,
max_results: int | None = 1, max_results: Any = 200,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Searches a file with a regex pattern and returns match positions only. Searches a file with a regex pattern and returns line numbers and excerpts.
- uri: unity://path/Assets/... or file path form supported by read_resource - uri: unity://path/Assets/... or file path form supported by read_resource
- pattern: regular expression (Python re) - pattern: regular expression (Python re)
- ignore_case: case-insensitive by default - ignore_case: case-insensitive by default
@ -363,20 +385,20 @@ def register_resource_tools(mcp: FastMCP) -> None:
rx = re.compile(pattern, flags) rx = re.compile(pattern, flags)
results = [] results = []
max_results_int = _coerce_int(max_results, default=200, minimum=1)
lines = text.splitlines() lines = text.splitlines()
for i, line in enumerate(lines, start=1): for i, line in enumerate(lines, start=1):
m = rx.search(line) m = rx.search(line)
if m: if m:
start_col, end_col = m.span() start_col = m.start() + 1 # 1-based
results.append( end_col = m.end() + 1 # 1-based, end exclusive
{ results.append({
"startLine": i, "startLine": i,
"startCol": start_col + 1, "startCol": start_col,
"endLine": i, "endLine": i,
"endCol": end_col + 1, "endCol": end_col,
} })
) if max_results_int and len(results) >= max_results_int:
if max_results and len(results) >= max_results:
break break
return {"success": True, "data": {"matches": results, "count": len(results)}} return {"success": True, "data": {"matches": results, "count": len(results)}}

8
tests/conftest.py Normal file
View File

@ -0,0 +1,8 @@
import os
# Ensure telemetry is disabled during test collection and execution to avoid
# any background network or thread startup that could slow or block pytest.
os.environ.setdefault("DISABLE_TELEMETRY", "true")
os.environ.setdefault("UNITY_MCP_DISABLE_TELEMETRY", "true")
os.environ.setdefault("MCP_DISABLE_TELEMETRY", "true")

View File

@ -0,0 +1,56 @@
import os
import importlib
def test_endpoint_rejects_non_http(tmp_path, monkeypatch):
# Point data dir to temp to avoid touching real files
monkeypatch.setenv("XDG_DATA_HOME", str(tmp_path))
monkeypatch.setenv("UNITY_MCP_TELEMETRY_ENDPOINT", "file:///etc/passwd")
telemetry = importlib.import_module("UnityMcpBridge.UnityMcpServer~.src.telemetry")
importlib.reload(telemetry)
tc = telemetry.TelemetryCollector()
# Should have fallen back to default endpoint
assert tc.config.endpoint == tc.config.default_endpoint
def test_config_preferred_then_env_override(tmp_path, monkeypatch):
# Simulate config telemetry endpoint
monkeypatch.setenv("XDG_DATA_HOME", str(tmp_path))
monkeypatch.delenv("UNITY_MCP_TELEMETRY_ENDPOINT", raising=False)
# Patch config.telemetry_endpoint via import mocking
import importlib
cfg_mod = importlib.import_module("UnityMcpBridge.UnityMcpServer~.src.config")
old_endpoint = cfg_mod.config.telemetry_endpoint
cfg_mod.config.telemetry_endpoint = "https://example.com/telemetry"
try:
telemetry = importlib.import_module("UnityMcpBridge.UnityMcpServer~.src.telemetry")
importlib.reload(telemetry)
tc = telemetry.TelemetryCollector()
assert tc.config.endpoint == "https://example.com/telemetry"
# Env should override config
monkeypatch.setenv("UNITY_MCP_TELEMETRY_ENDPOINT", "https://override.example/ep")
importlib.reload(telemetry)
tc2 = telemetry.TelemetryCollector()
assert tc2.config.endpoint == "https://override.example/ep"
finally:
cfg_mod.config.telemetry_endpoint = old_endpoint
def test_uuid_preserved_on_malformed_milestones(tmp_path, monkeypatch):
monkeypatch.setenv("XDG_DATA_HOME", str(tmp_path))
telemetry = importlib.import_module("UnityMcpBridge.UnityMcpServer~.src.telemetry")
importlib.reload(telemetry)
tc1 = telemetry.TelemetryCollector()
first_uuid = tc1._customer_uuid
# Write malformed milestones
tc1.config.milestones_file.write_text("{not-json}", encoding="utf-8")
# Reload collector; UUID should remain same despite bad milestones
importlib.reload(telemetry)
tc2 = telemetry.TelemetryCollector()
assert tc2._customer_uuid == first_uuid

View File

@ -0,0 +1,83 @@
import sys
import pathlib
import importlib.util
import types
import threading
import time
import queue as q
ROOT = pathlib.Path(__file__).resolve().parents[1]
SRC = ROOT / "UnityMcpBridge" / "UnityMcpServer~" / "src"
sys.path.insert(0, str(SRC))
# Stub mcp.server.fastmcp to satisfy imports without the full dependency
mcp_pkg = types.ModuleType("mcp")
server_pkg = types.ModuleType("mcp.server")
fastmcp_pkg = types.ModuleType("mcp.server.fastmcp")
class _Dummy:
pass
fastmcp_pkg.FastMCP = _Dummy
fastmcp_pkg.Context = _Dummy
server_pkg.fastmcp = fastmcp_pkg
mcp_pkg.server = server_pkg
sys.modules.setdefault("mcp", mcp_pkg)
sys.modules.setdefault("mcp.server", server_pkg)
sys.modules.setdefault("mcp.server.fastmcp", fastmcp_pkg)
def _load_module(path: pathlib.Path, name: str):
spec = importlib.util.spec_from_file_location(name, path)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
telemetry = _load_module(SRC / "telemetry.py", "telemetry_mod")
def test_telemetry_queue_backpressure_and_single_worker(monkeypatch, caplog):
caplog.set_level("DEBUG")
collector = telemetry.TelemetryCollector()
# Force-enable telemetry regardless of env settings from conftest
collector.config.enabled = True
# Wake existing worker once so it observes the new queue on the next loop
collector.record(telemetry.RecordType.TOOL_EXECUTION, {"i": -1})
# Replace queue with tiny one to trigger backpressure quickly
small_q = q.Queue(maxsize=2)
collector._queue = small_q
# Give the worker a moment to switch queues
time.sleep(0.02)
# Make sends slow to build backlog and exercise worker
def slow_send(self, rec):
time.sleep(0.05)
collector._send_telemetry = types.MethodType(slow_send, collector)
# Fire many events quickly; record() should not block even when queue fills
start = time.perf_counter()
for i in range(50):
collector.record(telemetry.RecordType.TOOL_EXECUTION, {"i": i})
elapsed_ms = (time.perf_counter() - start) * 1000.0
# Should be fast despite backpressure (non-blocking enqueue or drop)
assert elapsed_ms < 80.0
# Allow worker to process some
time.sleep(0.3)
# Verify drops were logged (queue full backpressure)
dropped_logs = [m for m in caplog.messages if "Telemetry queue full; dropping" in m]
assert len(dropped_logs) >= 1
# Ensure only one worker thread exists and is alive
assert collector._worker.is_alive()
worker_threads = [t for t in threading.enumerate() if t is collector._worker]
assert len(worker_threads) == 1

View File

@ -0,0 +1,83 @@
import importlib
def _get_decorator_module():
# Import the telemetry_decorator module from the Unity MCP server src
mod = importlib.import_module("UnityMcpBridge.UnityMcpServer~.src.telemetry_decorator")
return mod
def test_subaction_extracted_from_keyword(monkeypatch):
td = _get_decorator_module()
captured = {}
def fake_record_tool_usage(tool_name, success, duration_ms, error, sub_action=None):
captured["tool_name"] = tool_name
captured["success"] = success
captured["error"] = error
captured["sub_action"] = sub_action
# Silence milestones/logging in test
monkeypatch.setattr(td, "record_tool_usage", fake_record_tool_usage)
monkeypatch.setattr(td, "record_milestone", lambda *a, **k: None)
monkeypatch.setattr(td, "_decorator_log_count", 999)
def dummy_tool(ctx, action: str, name: str = ""):
return {"success": True, "name": name}
wrapped = td.telemetry_tool("manage_scene")(dummy_tool)
resp = wrapped(None, action="get_hierarchy", name="Sample")
assert resp["success"] is True
assert captured["tool_name"] == "manage_scene"
assert captured["success"] is True
assert captured["error"] is None
assert captured["sub_action"] == "get_hierarchy"
def test_subaction_extracted_from_positionals(monkeypatch):
td = _get_decorator_module()
captured = {}
def fake_record_tool_usage(tool_name, success, duration_ms, error, sub_action=None):
captured["tool_name"] = tool_name
captured["sub_action"] = sub_action
monkeypatch.setattr(td, "record_tool_usage", fake_record_tool_usage)
monkeypatch.setattr(td, "record_milestone", lambda *a, **k: None)
monkeypatch.setattr(td, "_decorator_log_count", 999)
def dummy_tool(ctx, action: str, name: str = ""):
return True
wrapped = td.telemetry_tool("manage_scene")(dummy_tool)
_ = wrapped(None, "save", "MyScene")
assert captured["tool_name"] == "manage_scene"
assert captured["sub_action"] == "save"
def test_subaction_none_when_not_present(monkeypatch):
td = _get_decorator_module()
captured = {}
def fake_record_tool_usage(tool_name, success, duration_ms, error, sub_action=None):
captured["tool_name"] = tool_name
captured["sub_action"] = sub_action
monkeypatch.setattr(td, "record_tool_usage", fake_record_tool_usage)
monkeypatch.setattr(td, "record_milestone", lambda *a, **k: None)
monkeypatch.setattr(td, "_decorator_log_count", 999)
def dummy_tool_without_action(ctx, name: str):
return 123
wrapped = td.telemetry_tool("apply_text_edits")(dummy_tool_without_action)
_ = wrapped(None, name="X")
assert captured["tool_name"] == "apply_text_edits"
assert captured["sub_action"] is None