mcp-unity: telemetry fire-and-forget; safer sender reg; defer startup/conn telemetry; writer IO logs; manage_scene tolerant params; test worker wake

main
David Sarno 2025-09-10 09:24:09 -07:00
parent 89714d022c
commit 9f7308b4c2
4 changed files with 52 additions and 30 deletions

View File

@ -1,5 +1,6 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading;
using UnityEngine; using UnityEngine;
namespace MCPForUnity.Editor.Helpers namespace MCPForUnity.Editor.Helpers
@ -124,7 +125,12 @@ namespace MCPForUnity.Editor.Helpers
/// </summary> /// </summary>
public static void RegisterTelemetrySender(Action<Dictionary<string, object>> sender) public static void RegisterTelemetrySender(Action<Dictionary<string, object>> sender)
{ {
s_sender = sender; Interlocked.Exchange(ref s_sender, sender);
}
public static void UnregisterTelemetrySender()
{
Interlocked.Exchange(ref s_sender, null);
} }
/// <summary> /// <summary>
@ -179,7 +185,7 @@ namespace MCPForUnity.Editor.Helpers
private static void SendTelemetryToPythonServer(Dictionary<string, object> telemetryData) private static void SendTelemetryToPythonServer(Dictionary<string, object> telemetryData)
{ {
var sender = s_sender; var sender = Volatile.Read(ref s_sender);
if (sender != null) if (sender != null)
{ {
try try

View File

@ -60,16 +60,18 @@ async def server_lifespan(server: FastMCP) -> AsyncIterator[Dict[str, Any]]:
server_version = ver_path.read_text(encoding="utf-8").strip() server_version = ver_path.read_text(encoding="utf-8").strip()
except Exception: except Exception:
server_version = "unknown" server_version = "unknown"
# Defer telemetry for first second to avoid interfering with stdio handshake # Defer initial telemetry by 1s to avoid stdio handshake interference
if (time.perf_counter() - start_clk) > 1.0: import threading
record_telemetry(RecordType.STARTUP, { def _emit_startup():
"server_version": server_version, try:
"startup_time": start_time record_telemetry(RecordType.STARTUP, {
}) "server_version": server_version,
"startup_time": start_time,
# Record first startup milestone })
if (time.perf_counter() - start_clk) > 1.0: record_milestone(MilestoneType.FIRST_STARTUP)
record_milestone(MilestoneType.FIRST_STARTUP) except Exception:
logger.debug("Deferred startup telemetry failed", exc_info=True)
threading.Timer(1.0, _emit_startup).start()
try: try:
skip_connect = os.environ.get("UNITY_MCP_SKIP_STARTUP_CONNECT", "").lower() in ("1", "true", "yes", "on") skip_connect = os.environ.get("UNITY_MCP_SKIP_STARTUP_CONNECT", "").lower() in ("1", "true", "yes", "on")
@ -79,33 +81,42 @@ async def server_lifespan(server: FastMCP) -> AsyncIterator[Dict[str, Any]]:
_unity_connection = get_unity_connection() _unity_connection = get_unity_connection()
logger.info("Connected to Unity on startup") logger.info("Connected to Unity on startup")
# Record successful Unity connection # Record successful Unity connection (deferred)
if (time.perf_counter() - start_clk) > 1.0: import threading as _t
record_telemetry(RecordType.UNITY_CONNECTION, { _t.Timer(1.0, lambda: record_telemetry(
RecordType.UNITY_CONNECTION,
{
"status": "connected", "status": "connected",
"connection_time_ms": (time.time() - start_time) * 1000 "connection_time_ms": (time.perf_counter() - start_clk) * 1000,
}) }
)).start()
except ConnectionError as e: except ConnectionError as e:
logger.warning("Could not connect to Unity on startup: %s", e) logger.warning("Could not connect to Unity on startup: %s", e)
_unity_connection = None _unity_connection = None
# Record connection failure # Record connection failure (deferred)
if (time.perf_counter() - start_clk) > 1.0: import threading as _t
record_telemetry(RecordType.UNITY_CONNECTION, { _t.Timer(1.0, lambda: record_telemetry(
RecordType.UNITY_CONNECTION,
{
"status": "failed", "status": "failed",
"error": str(e)[:200], "error": str(e)[:200],
"connection_time_ms": (time.perf_counter() - start_clk) * 1000 "connection_time_ms": (time.perf_counter() - start_clk) * 1000,
}) }
)).start()
except Exception as e: except Exception as e:
logger.warning("Unexpected error connecting to Unity on startup: %s", e) logger.warning("Unexpected error connecting to Unity on startup: %s", e)
_unity_connection = None _unity_connection = None
if (time.perf_counter() - start_clk) > 1.0: import threading as _t
record_telemetry(RecordType.UNITY_CONNECTION, { _t.Timer(1.0, lambda: record_telemetry(
RecordType.UNITY_CONNECTION,
{
"status": "failed", "status": "failed",
"error": str(e)[:200], "error": str(e)[:200],
"connection_time_ms": (time.perf_counter() - start_clk) * 1000 "connection_time_ms": (time.perf_counter() - start_clk) * 1000,
}) }
)).start()
try: try:
# Yield the connection object so it can be attached to the context # Yield the connection object so it can be attached to the context

View File

@ -169,9 +169,10 @@ class TelemetryCollector:
self._lock: threading.Lock = threading.Lock() self._lock: threading.Lock = threading.Lock()
# Bounded queue with single background worker (records only; no context propagation) # Bounded queue with single background worker (records only; no context propagation)
self._queue: "queue.Queue[TelemetryRecord]" = queue.Queue(maxsize=1000) self._queue: "queue.Queue[TelemetryRecord]" = queue.Queue(maxsize=1000)
# Load persistent data before starting worker so first events have UUID
self._load_persistent_data()
self._worker: threading.Thread = threading.Thread(target=self._worker_loop, daemon=True) self._worker: threading.Thread = threading.Thread(target=self._worker_loop, daemon=True)
self._worker.start() self._worker.start()
self._load_persistent_data()
def _load_persistent_data(self): def _load_persistent_data(self):
"""Load UUID and milestones from disk""" """Load UUID and milestones from disk"""
@ -307,8 +308,8 @@ class TelemetryCollector:
# Re-validate endpoint at send time to handle dynamic changes # Re-validate endpoint at send time to handle dynamic changes
endpoint = self.config._validated_endpoint(self.config.endpoint, self.config.default_endpoint) endpoint = self.config._validated_endpoint(self.config.endpoint, self.config.default_endpoint)
response = client.post(endpoint, json=payload) response = client.post(endpoint, json=payload)
if response.status_code == 200: if 200 <= response.status_code < 300:
logger.info(f"Telemetry sent: {record.record_type}") logger.debug(f"Telemetry sent: {record.record_type}")
else: else:
logger.warning(f"Telemetry failed: HTTP {response.status_code}") logger.warning(f"Telemetry failed: HTTP {response.status_code}")
else: else:
@ -325,7 +326,7 @@ class TelemetryCollector:
try: try:
with urllib.request.urlopen(req, timeout=self.config.timeout) as resp: with urllib.request.urlopen(req, timeout=self.config.timeout) as resp:
if 200 <= resp.getcode() < 300: if 200 <= resp.getcode() < 300:
logger.info(f"Telemetry sent (urllib): {record.record_type}") logger.debug(f"Telemetry sent (urllib): {record.record_type}")
else: else:
logger.warning(f"Telemetry failed (urllib): HTTP {resp.getcode()}") logger.warning(f"Telemetry failed (urllib): HTTP {resp.getcode()}")
except urllib.error.URLError as ue: except urllib.error.URLError as ue:

View File

@ -45,9 +45,13 @@ def test_telemetry_queue_backpressure_and_single_worker(monkeypatch, caplog):
# Force-enable telemetry regardless of env settings from conftest # Force-enable telemetry regardless of env settings from conftest
collector.config.enabled = True collector.config.enabled = True
# Wake existing worker once so it observes the new queue on the next loop
collector.record(telemetry.RecordType.TOOL_EXECUTION, {"i": -1})
# Replace queue with tiny one to trigger backpressure quickly # Replace queue with tiny one to trigger backpressure quickly
small_q = q.Queue(maxsize=2) small_q = q.Queue(maxsize=2)
collector._queue = small_q collector._queue = small_q
# Give the worker a moment to switch queues
time.sleep(0.02)
# Make sends slow to build backlog and exercise worker # Make sends slow to build backlog and exercise worker
def slow_send(self, rec): def slow_send(self, rec):