telemetry: fire-and-forget queue; remove context propagation; reduce timeouts; fix milestone lock reentrancy

main
David Sarno 2025-09-10 07:50:05 -07:00
parent 46df7250b5
commit 397ba32a99
1 changed files with 21 additions and 15 deletions

View File

@ -5,7 +5,11 @@ Inspired by Onyx's telemetry implementation with Unity-specific adaptations
import uuid import uuid
import threading import threading
import contextvars """
Fire-and-forget telemetry sender with a single background worker.
- No context/thread-local propagation to avoid re-entrancy into tool resolution.
- Small network timeouts to prevent stalls.
"""
import json import json
import time import time
import os import os
@ -98,8 +102,11 @@ class TelemetryConfig:
self.uuid_file = self.data_dir / "customer_uuid.txt" self.uuid_file = self.data_dir / "customer_uuid.txt"
self.milestones_file = self.data_dir / "milestones.json" self.milestones_file = self.data_dir / "milestones.json"
# Request timeout # Request timeout (small, fail fast). Override with UNITY_MCP_TELEMETRY_TIMEOUT
self.timeout = 10.0 try:
self.timeout = float(os.environ.get("UNITY_MCP_TELEMETRY_TIMEOUT", "1.5"))
except Exception:
self.timeout = 1.5
# Session tracking # Session tracking
self.session_id = str(uuid.uuid4()) self.session_id = str(uuid.uuid4())
@ -160,8 +167,8 @@ class TelemetryCollector:
self._customer_uuid: Optional[str] = None self._customer_uuid: Optional[str] = None
self._milestones: Dict[str, Dict[str, Any]] = {} self._milestones: Dict[str, Dict[str, Any]] = {}
self._lock: threading.Lock = threading.Lock() self._lock: threading.Lock = threading.Lock()
# Bounded queue with single background worker to avoid spawning a thread per event # Bounded queue with single background worker (records only; no context propagation)
self._queue: "queue.Queue[tuple[contextvars.Context, TelemetryRecord]]" = queue.Queue(maxsize=1000) self._queue: "queue.Queue[TelemetryRecord]" = queue.Queue(maxsize=1000)
self._worker: threading.Thread = threading.Thread(target=self._worker_loop, daemon=True) self._worker: threading.Thread = threading.Thread(target=self._worker_loop, daemon=True)
self._worker.start() self._worker.start()
self._load_persistent_data() self._load_persistent_data()
@ -196,9 +203,8 @@ class TelemetryCollector:
self._milestones = {} self._milestones = {}
def _save_milestones(self): def _save_milestones(self):
"""Save milestones to disk""" """Save milestones to disk. Caller must hold self._lock."""
try: try:
with self._lock:
self.config.milestones_file.write_text( self.config.milestones_file.write_text(
json.dumps(self._milestones, indent=2), json.dumps(self._milestones, indent=2),
encoding="utf-8", encoding="utf-8",
@ -249,18 +255,18 @@ class TelemetryCollector:
milestone=milestone milestone=milestone
) )
# Enqueue for background worker (non-blocking). Drop on backpressure. # Enqueue for background worker (non-blocking). Drop on backpressure.
current_context = contextvars.copy_context()
try: try:
self._queue.put_nowait((current_context, record)) self._queue.put_nowait(record)
except queue.Full: except queue.Full:
logger.debug("Telemetry queue full; dropping %s", record.record_type) logger.debug("Telemetry queue full; dropping %s", record.record_type)
def _worker_loop(self): def _worker_loop(self):
"""Background worker that serializes telemetry sends.""" """Background worker that serializes telemetry sends."""
while True: while True:
ctx, rec = self._queue.get() rec = self._queue.get()
try: try:
ctx.run(self._send_telemetry, rec) # Run sender directly; do not reuse caller context/thread-locals
self._send_telemetry(rec)
except Exception: except Exception:
logger.debug("Telemetry worker send failed", exc_info=True) logger.debug("Telemetry worker send failed", exc_info=True)
finally: finally: