import logging import types import threading import time import queue as q import core.telemetry as telemetry def test_telemetry_queue_backpressure_and_single_worker(monkeypatch, caplog): # Directly attach caplog's handler to the telemetry logger so that # earlier tests calling logging.basicConfig() can't steal the records # via a root handler before caplog sees them. tel_logger = logging.getLogger("unity-mcp-telemetry") tel_logger.addHandler(caplog.handler) try: caplog.set_level("DEBUG", logger="unity-mcp-telemetry") collector = telemetry.TelemetryCollector() # Force-enable telemetry regardless of env settings from conftest collector.config.enabled = True # Wake existing worker once so it observes the new queue on the next loop collector.record(telemetry.RecordType.TOOL_EXECUTION, {"i": -1}) # Replace queue with tiny one to trigger backpressure quickly small_q = q.Queue(maxsize=2) collector._queue = small_q # Give the worker time to finish processing the seeded item and # re-enter _queue.get() on the new small queue time.sleep(0.2) # Make sends slow to build backlog and exercise worker def slow_send(self, rec): time.sleep(0.05) collector._send_telemetry = types.MethodType(slow_send, collector) # Fire many events quickly; record() should not block even when queue fills start = time.perf_counter() for i in range(50): collector.record(telemetry.RecordType.TOOL_EXECUTION, {"i": i}) elapsed_ms = (time.perf_counter() - start) * 1000.0 # Should be fast despite backpressure (non-blocking enqueue or drop) # Threshold set high (500ms) to accommodate CI environments with variable load. # The key assertion is that 50 record() calls don't block on a full queue; # even under heavy CI load, non-blocking calls should complete well under 500ms. assert elapsed_ms < 500.0, f"Took {elapsed_ms:.1f}ms (expected <500ms for non-blocking calls)" # Allow worker to process some time.sleep(0.3) # Verify drops were logged (queue full backpressure) dropped_logs = [ m for m in caplog.messages if "Telemetry queue full; dropping" in m] assert len(dropped_logs) >= 1 # Ensure only one worker thread exists and is alive assert collector._worker.is_alive() worker_threads = [ t for t in threading.enumerate() if t is collector._worker] assert len(worker_threads) == 1 finally: if caplog.handler in tel_logger.handlers: tel_logger.removeHandler(caplog.handler)