unity-mcp/UnityMcpBridge/UnityMcpServer~/src/telemetry.py

372 lines
14 KiB
Python

"""
Privacy-focused, anonymous telemetry system for Unity MCP
Inspired by Onyx's telemetry implementation with Unity-specific adaptations
"""
import uuid
import threading
import contextvars
import json
import time
import os
import sys
import platform
import logging
from enum import Enum
from urllib.parse import urlparse
from dataclasses import dataclass, asdict
from typing import Optional, Dict, Any, List
from pathlib import Path
import importlib
try:
import httpx
HAS_HTTPX = True
except ImportError:
httpx = None # type: ignore
HAS_HTTPX = False
logger = logging.getLogger("unity-mcp-telemetry")
class RecordType(str, Enum):
"""Types of telemetry records we collect"""
VERSION = "version"
STARTUP = "startup"
USAGE = "usage"
LATENCY = "latency"
FAILURE = "failure"
TOOL_EXECUTION = "tool_execution"
UNITY_CONNECTION = "unity_connection"
CLIENT_CONNECTION = "client_connection"
class MilestoneType(str, Enum):
"""Major user journey milestones"""
FIRST_STARTUP = "first_startup"
FIRST_TOOL_USAGE = "first_tool_usage"
FIRST_SCRIPT_CREATION = "first_script_creation"
FIRST_SCENE_MODIFICATION = "first_scene_modification"
MULTIPLE_SESSIONS = "multiple_sessions"
DAILY_ACTIVE_USER = "daily_active_user"
WEEKLY_ACTIVE_USER = "weekly_active_user"
@dataclass
class TelemetryRecord:
"""Structure for telemetry data"""
record_type: RecordType
timestamp: float
customer_uuid: str
session_id: str
data: Dict[str, Any]
milestone: Optional[MilestoneType] = None
class TelemetryConfig:
"""Telemetry configuration"""
def __init__(self):
# Prefer config file, then allow env overrides
server_config = None
for modname in (
"UnityMcpBridge.UnityMcpServer~.src.config",
"UnityMcpBridge.UnityMcpServer.src.config",
"src.config",
"config",
):
try:
mod = importlib.import_module(modname)
server_config = getattr(mod, "config", None)
if server_config is not None:
break
except Exception:
continue
# Determine enabled flag: config -> env DISABLE_* opt-out
cfg_enabled = True if server_config is None else bool(getattr(server_config, "telemetry_enabled", True))
self.enabled = cfg_enabled and not self._is_disabled()
# Telemetry endpoint (Cloud Run default; override via env)
cfg_default = None if server_config is None else getattr(server_config, "telemetry_endpoint", None)
default_ep = cfg_default or "https://unity-mcp-telemetry-375728817078.us-central1.run.app/telemetry/events"
self.default_endpoint = default_ep
self.endpoint = self._validated_endpoint(
os.environ.get("UNITY_MCP_TELEMETRY_ENDPOINT", default_ep),
default_ep,
)
# Local storage for UUID and milestones
self.data_dir = self._get_data_directory()
self.uuid_file = self.data_dir / "customer_uuid.txt"
self.milestones_file = self.data_dir / "milestones.json"
# Request timeout
self.timeout = 10.0
# Session tracking
self.session_id = str(uuid.uuid4())
def _is_disabled(self) -> bool:
"""Check if telemetry is disabled via environment variables"""
disable_vars = [
"DISABLE_TELEMETRY",
"UNITY_MCP_DISABLE_TELEMETRY",
"MCP_DISABLE_TELEMETRY"
]
for var in disable_vars:
if os.environ.get(var, "").lower() in ("true", "1", "yes", "on"):
return True
return False
def _get_data_directory(self) -> Path:
"""Get directory for storing telemetry data"""
if os.name == 'nt': # Windows
base_dir = Path(os.environ.get('APPDATA', Path.home() / 'AppData' / 'Roaming'))
elif os.name == 'posix': # macOS/Linux
if 'darwin' in os.uname().sysname.lower(): # macOS
base_dir = Path.home() / 'Library' / 'Application Support'
else: # Linux
base_dir = Path(os.environ.get('XDG_DATA_HOME', Path.home() / '.local' / 'share'))
else:
base_dir = Path.home() / '.unity-mcp'
data_dir = base_dir / 'UnityMCP'
data_dir.mkdir(parents=True, exist_ok=True)
return data_dir
def _validated_endpoint(self, candidate: str, fallback: str) -> str:
"""Validate telemetry endpoint URL scheme; allow only http/https.
Falls back to the provided default on error.
"""
try:
parsed = urlparse(candidate)
if parsed.scheme not in ("https", "http"):
raise ValueError(f"Unsupported scheme: {parsed.scheme}")
# Basic sanity: require network location and path
if not parsed.netloc:
raise ValueError("Missing netloc in endpoint")
return candidate
except Exception as e:
logger.debug(
f"Invalid telemetry endpoint '{candidate}', using default. Error: {e}",
exc_info=True,
)
return fallback
class TelemetryCollector:
"""Main telemetry collection class"""
def __init__(self):
self.config = TelemetryConfig()
self._customer_uuid: Optional[str] = None
self._milestones: Dict[str, Dict[str, Any]] = {}
self._lock: threading.Lock = threading.Lock()
self._load_persistent_data()
def _load_persistent_data(self):
"""Load UUID and milestones from disk"""
# Load customer UUID
try:
if self.config.uuid_file.exists():
self._customer_uuid = self.config.uuid_file.read_text(encoding="utf-8").strip() or str(uuid.uuid4())
else:
self._customer_uuid = str(uuid.uuid4())
try:
self.config.uuid_file.write_text(self._customer_uuid, encoding="utf-8")
if os.name == "posix":
os.chmod(self.config.uuid_file, 0o600)
except OSError as e:
logger.debug(f"Failed to persist customer UUID: {e}", exc_info=True)
except OSError as e:
logger.debug(f"Failed to load customer UUID: {e}", exc_info=True)
self._customer_uuid = str(uuid.uuid4())
# Load milestones (failure here must not affect UUID)
try:
if self.config.milestones_file.exists():
content = self.config.milestones_file.read_text(encoding="utf-8")
self._milestones = json.loads(content) or {}
if not isinstance(self._milestones, dict):
self._milestones = {}
except (OSError, json.JSONDecodeError, ValueError) as e:
logger.debug(f"Failed to load milestones: {e}", exc_info=True)
self._milestones = {}
def _save_milestones(self):
"""Save milestones to disk"""
try:
self.config.milestones_file.write_text(json.dumps(self._milestones, indent=2))
except Exception as e:
logger.warning(f"Failed to save milestones: {e}")
def record_milestone(self, milestone: MilestoneType, data: Optional[Dict[str, Any]] = None) -> bool:
"""Record a milestone event, returns True if this is the first occurrence"""
if not self.config.enabled:
return False
milestone_key = milestone.value
with self._lock:
if milestone_key in self._milestones:
return False # Already recorded
milestone_data = {
"timestamp": time.time(),
"data": data or {},
}
self._milestones[milestone_key] = milestone_data
self._save_milestones()
# Also send as telemetry record
self.record(
record_type=RecordType.USAGE,
data={"milestone": milestone_key, **(data or {})},
milestone=milestone
)
return True
def record(self,
record_type: RecordType,
data: Dict[str, Any],
milestone: Optional[MilestoneType] = None):
"""Record a telemetry event (async, non-blocking)"""
if not self.config.enabled:
return
# Allow fallback sender when httpx is unavailable (no early return)
record = TelemetryRecord(
record_type=record_type,
timestamp=time.time(),
customer_uuid=self._customer_uuid or "unknown",
session_id=self.config.session_id,
data=data,
milestone=milestone
)
# Send in background thread to avoid blocking
current_context = contextvars.copy_context()
thread = threading.Thread(
target=lambda: current_context.run(self._send_telemetry, record),
daemon=True
)
thread.start()
def _send_telemetry(self, record: TelemetryRecord):
"""Send telemetry data to endpoint"""
try:
# System fingerprint (top-level remains concise; details stored in data JSON)
_platform = platform.system() # 'Darwin' | 'Linux' | 'Windows'
_source = sys.platform # 'darwin' | 'linux' | 'win32'
_platform_detail = f"{_platform} {platform.release()} ({platform.machine()})"
_python_version = platform.python_version()
# Enrich data JSON so BigQuery stores detailed fields without schema change
enriched_data = dict(record.data or {})
enriched_data.setdefault("platform_detail", _platform_detail)
enriched_data.setdefault("python_version", _python_version)
payload = {
"record": record.record_type.value,
"timestamp": record.timestamp,
"customer_uuid": record.customer_uuid,
"session_id": record.session_id,
"data": enriched_data,
"version": "3.0.2", # Unity MCP version
"platform": _platform,
"source": _source,
}
if record.milestone:
payload["milestone"] = record.milestone.value
# Prefer httpx when available; otherwise fall back to urllib
if httpx:
with httpx.Client(timeout=self.config.timeout) as client:
# Re-validate endpoint at send time to handle dynamic changes
endpoint = self.config._validated_endpoint(self.config.endpoint, self.config.default_endpoint)
response = client.post(endpoint, json=payload)
if response.status_code == 200:
logger.debug(f"Telemetry sent: {record.record_type}")
else:
logger.debug(f"Telemetry failed: HTTP {response.status_code}")
else:
import urllib.request
import urllib.error
data_bytes = json.dumps(payload).encode("utf-8")
endpoint = self.config._validated_endpoint(self.config.endpoint, self.config.default_endpoint)
req = urllib.request.Request(
endpoint,
data=data_bytes,
headers={"Content-Type": "application/json"},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=self.config.timeout) as resp:
if 200 <= resp.getcode() < 300:
logger.debug(f"Telemetry sent (urllib): {record.record_type}")
else:
logger.debug(f"Telemetry failed (urllib): HTTP {resp.getcode()}")
except urllib.error.URLError as ue:
logger.debug(f"Telemetry send failed (urllib): {ue}")
except Exception as e:
# Never let telemetry errors interfere with app functionality
logger.debug(f"Telemetry send failed: {e}")
# Global telemetry instance
_telemetry_collector: Optional[TelemetryCollector] = None
def get_telemetry() -> TelemetryCollector:
"""Get the global telemetry collector instance"""
global _telemetry_collector
if _telemetry_collector is None:
_telemetry_collector = TelemetryCollector()
return _telemetry_collector
def record_telemetry(record_type: RecordType,
data: Dict[str, Any],
milestone: Optional[MilestoneType] = None):
"""Convenience function to record telemetry"""
get_telemetry().record(record_type, data, milestone)
def record_milestone(milestone: MilestoneType, data: Optional[Dict[str, Any]] = None) -> bool:
"""Convenience function to record a milestone"""
return get_telemetry().record_milestone(milestone, data)
def record_tool_usage(tool_name: str, success: bool, duration_ms: float, error: Optional[str] = None):
"""Record tool usage telemetry"""
data = {
"tool_name": tool_name,
"success": success,
"duration_ms": round(duration_ms, 2)
}
if error:
data["error"] = str(error)[:200] # Limit error message length
record_telemetry(RecordType.TOOL_EXECUTION, data)
def record_latency(operation: str, duration_ms: float, metadata: Optional[Dict[str, Any]] = None):
"""Record latency telemetry"""
data = {
"operation": operation,
"duration_ms": round(duration_ms, 2)
}
if metadata:
data.update(metadata)
record_telemetry(RecordType.LATENCY, data)
def record_failure(component: str, error: str, metadata: Optional[Dict[str, Any]] = None):
"""Record failure telemetry"""
data = {
"component": component,
"error": str(error)[:500] # Limit error message length
}
if metadata:
data.update(metadata)
record_telemetry(RecordType.FAILURE, data)
def is_telemetry_enabled() -> bool:
"""Check if telemetry is enabled"""
return get_telemetry().config.enabled