From 1e003748d8557c4b276e7cc894c9e826d49aa18a Mon Sep 17 00:00:00 2001 From: David Sarno Date: Tue, 9 Sep 2025 18:45:09 -0700 Subject: [PATCH] telemetry: bounded queue + single worker; INFO-level send logs; endpoint to Cloud Run; add unit test for backpressure --- .../UnityMcpServer~/src/telemetry.py | 10 +-- tests/test_telemetry_queue_worker.py | 79 +++++++++++++++++++ 2 files changed, 84 insertions(+), 5 deletions(-) create mode 100644 tests/test_telemetry_queue_worker.py diff --git a/UnityMcpBridge/UnityMcpServer~/src/telemetry.py b/UnityMcpBridge/UnityMcpServer~/src/telemetry.py index 8aed4ce..8d6be46 100644 --- a/UnityMcpBridge/UnityMcpServer~/src/telemetry.py +++ b/UnityMcpBridge/UnityMcpServer~/src/telemetry.py @@ -302,9 +302,9 @@ class TelemetryCollector: endpoint = self.config._validated_endpoint(self.config.endpoint, self.config.default_endpoint) response = client.post(endpoint, json=payload) if response.status_code == 200: - logger.debug(f"Telemetry sent: {record.record_type}") + logger.info(f"Telemetry sent: {record.record_type}") else: - logger.debug(f"Telemetry failed: HTTP {response.status_code}") + logger.warning(f"Telemetry failed: HTTP {response.status_code}") else: import urllib.request import urllib.error @@ -319,11 +319,11 @@ class TelemetryCollector: try: with urllib.request.urlopen(req, timeout=self.config.timeout) as resp: if 200 <= resp.getcode() < 300: - logger.debug(f"Telemetry sent (urllib): {record.record_type}") + logger.info(f"Telemetry sent (urllib): {record.record_type}") else: - logger.debug(f"Telemetry failed (urllib): HTTP {resp.getcode()}") + logger.warning(f"Telemetry failed (urllib): HTTP {resp.getcode()}") except urllib.error.URLError as ue: - logger.debug(f"Telemetry send failed (urllib): {ue}") + logger.warning(f"Telemetry send failed (urllib): {ue}") except Exception as e: # Never let telemetry errors interfere with app functionality diff --git a/tests/test_telemetry_queue_worker.py b/tests/test_telemetry_queue_worker.py new file mode 100644 index 0000000..c3e3722 --- /dev/null +++ b/tests/test_telemetry_queue_worker.py @@ -0,0 +1,79 @@ +import sys +import pathlib +import importlib.util +import types +import threading +import time +import queue as q + + +ROOT = pathlib.Path(__file__).resolve().parents[1] +SRC = ROOT / "UnityMcpBridge" / "UnityMcpServer~" / "src" +sys.path.insert(0, str(SRC)) + +# Stub mcp.server.fastmcp to satisfy imports without the full dependency +mcp_pkg = types.ModuleType("mcp") +server_pkg = types.ModuleType("mcp.server") +fastmcp_pkg = types.ModuleType("mcp.server.fastmcp") + +class _Dummy: + pass + +fastmcp_pkg.FastMCP = _Dummy +fastmcp_pkg.Context = _Dummy +server_pkg.fastmcp = fastmcp_pkg +mcp_pkg.server = server_pkg +sys.modules.setdefault("mcp", mcp_pkg) +sys.modules.setdefault("mcp.server", server_pkg) +sys.modules.setdefault("mcp.server.fastmcp", fastmcp_pkg) + + +def _load_module(path: pathlib.Path, name: str): + spec = importlib.util.spec_from_file_location(name, path) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + return mod + + +telemetry = _load_module(SRC / "telemetry.py", "telemetry_mod") + + +def test_telemetry_queue_backpressure_and_single_worker(monkeypatch, caplog): + caplog.set_level("DEBUG") + + collector = telemetry.TelemetryCollector() + # Force-enable telemetry regardless of env settings from conftest + collector.config.enabled = True + + # Replace queue with tiny one to trigger backpressure quickly + small_q = q.Queue(maxsize=2) + collector._queue = small_q + + # Make sends slow to build backlog and exercise worker + def slow_send(self, rec): + time.sleep(0.05) + + collector._send_telemetry = types.MethodType(slow_send, collector) + + # Fire many events quickly; record() should not block even when queue fills + start = time.perf_counter() + for i in range(50): + collector.record(telemetry.RecordType.TOOL_EXECUTION, {"i": i}) + elapsed_ms = (time.perf_counter() - start) * 1000.0 + + # Should be fast despite backpressure (non-blocking enqueue or drop) + assert elapsed_ms < 80.0 + + # Allow worker to process some + time.sleep(0.3) + + # Verify drops were logged (queue full backpressure) + dropped_logs = [m for m in caplog.messages if "Telemetry queue full; dropping" in m] + assert len(dropped_logs) >= 1 + + # Ensure only one worker thread exists and is alive + assert collector._worker.is_alive() + worker_threads = [t for t in threading.enumerate() if t is collector._worker] + assert len(worker_threads) == 1 + +