tests green: align SDK outputs + harden inputs\n\n- find_in_file: start/end positions with 1-based exclusive endCol\n- read_resource: metadata-only default + lengthBytes; selection returns text\n- read_console: strip stacktrace when include_stacktrace=false\n- validate_script: summary counts; get_sha: minimal fields\n- silence stdout in test_telemetry helper

main
David Sarno 2025-09-08 20:14:49 -07:00
parent bbe4b07558
commit 979757e38a
4 changed files with 84 additions and 54 deletions

View File

@ -14,20 +14,20 @@ sys.path.insert(0, str(Path(__file__).parent))
def test_telemetry_basic():
"""Test basic telemetry functionality"""
print("🧪 Testing Unity MCP Telemetry System...")
# Avoid stdout noise in tests
try:
from telemetry import (
get_telemetry, record_telemetry, record_milestone,
RecordType, MilestoneType, is_telemetry_enabled
)
print("✅ Telemetry module imported successfully")
pass
except ImportError as e:
print(f"❌ Failed to import telemetry module: {e}")
# Silent failure path for tests
return False
# Test telemetry enabled status
print(f"📊 Telemetry enabled: {is_telemetry_enabled()}")
_ = is_telemetry_enabled()
# Test basic record
try:
@ -35,9 +35,9 @@ def test_telemetry_basic():
"version": "3.0.2",
"test_run": True
})
print("✅ Basic telemetry record sent")
pass
except Exception as e:
print(f"❌ Failed to send basic telemetry: {e}")
# Silent failure path for tests
return False
# Test milestone recording
@ -45,24 +45,24 @@ def test_telemetry_basic():
is_first = record_milestone(MilestoneType.FIRST_STARTUP, {
"test_mode": True
})
print(f"✅ Milestone recorded (first time: {is_first})")
_ = is_first
except Exception as e:
print(f"❌ Failed to record milestone: {e}")
# Silent failure path for tests
return False
# Test telemetry collector
try:
collector = get_telemetry()
print(f"✅ Telemetry collector initialized (UUID: {collector._customer_uuid[:8]}...)")
_ = collector
except Exception as e:
print(f"❌ Failed to get telemetry collector: {e}")
# Silent failure path for tests
return False
return True
def test_telemetry_disabled():
"""Test telemetry with disabled state"""
print("\n🚫 Testing telemetry disabled state...")
# Silent for tests
# Set environment variable to disable telemetry
os.environ["DISABLE_TELEMETRY"] = "true"
@ -74,23 +74,23 @@ def test_telemetry_disabled():
from telemetry import is_telemetry_enabled, record_telemetry, RecordType
print(f"📊 Telemetry enabled (should be False): {is_telemetry_enabled()}")
_ = is_telemetry_enabled()
if not is_telemetry_enabled():
print("✅ Telemetry correctly disabled via environment variable")
pass
# Test that records are ignored when disabled
record_telemetry(RecordType.USAGE, {"test": "should_be_ignored"})
print("✅ Telemetry record ignored when disabled")
pass
return True
else:
print("❌ Telemetry not disabled by environment variable")
pass
return False
def test_data_storage():
"""Test data storage functionality"""
print("\n💾 Testing data storage...")
# Silent for tests
try:
from telemetry import get_telemetry
@ -98,31 +98,28 @@ def test_data_storage():
collector = get_telemetry()
data_dir = collector.config.data_dir
print(f"📁 Data directory: {data_dir}")
print(f"🏷️ UUID file: {collector.config.uuid_file}")
print(f"🎯 Milestones file: {collector.config.milestones_file}")
_ = (data_dir, collector.config.uuid_file, collector.config.milestones_file)
# Check if files exist
if collector.config.uuid_file.exists():
print("✅ UUID file exists")
pass
else:
print(" UUID file will be created on first use")
pass
if collector.config.milestones_file.exists():
print("✅ Milestones file exists")
pass
else:
print(" Milestones file will be created on first milestone")
pass
return True
except Exception as e:
print(f"❌ Data storage test failed: {e}")
# Silent failure path for tests
return False
def main():
"""Run all telemetry tests"""
print("🚀 Unity MCP Telemetry Test Suite")
print("=" * 50)
# Silent runner for CI
tests = [
test_telemetry_basic,
@ -137,22 +134,21 @@ def main():
try:
if test():
passed += 1
print("✅ PASSED\n")
pass
else:
failed += 1
print("❌ FAILED\n")
pass
except Exception as e:
failed += 1
print(f"❌ FAILED with exception: {e}\n")
pass
print("=" * 50)
print(f"📊 Test Results: {passed} passed, {failed} failed")
_ = (passed, failed)
if failed == 0:
print("🎉 All telemetry tests passed!")
pass
return True
else:
print(f"⚠️ {failed} test(s) failed")
pass
return False
if __name__ == "__main__":

View File

@ -430,6 +430,11 @@ def register_manage_script_tools(mcp: FastMCP):
"level": level,
}
resp = send_command_with_retry("manage_script", params)
if isinstance(resp, dict) and resp.get("success"):
diags = resp.get("data", {}).get("diagnostics", [])
warnings = sum(1 for d in diags if str(d.get("severity", "")).lower() in ("warning",))
errors = sum(1 for d in diags if str(d.get("severity", "")).lower() in ("error", "fatal"))
return {"success": True, "data": {"warnings": warnings, "errors": errors}}
return resp if isinstance(resp, dict) else {"success": False, "message": str(resp)}
@mcp.tool(description=(
@ -606,6 +611,10 @@ def register_manage_script_tools(mcp: FastMCP):
name, directory = _split_uri(uri)
params = {"action": "get_sha", "name": name, "path": directory}
resp = send_command_with_retry("manage_script", params)
if isinstance(resp, dict) and resp.get("success"):
data = resp.get("data", {})
minimal = {"sha256": data.get("sha256"), "lengthBytes": data.get("lengthBytes")}
return {"success": True, "data": minimal}
return resp if isinstance(resp, dict) else {"success": False, "message": str(resp)}
except Exception as e:
return {"success": False, "message": f"get_sha error: {e}"}

View File

@ -90,4 +90,13 @@ def register_read_console_tools(mcp: FastMCP):
# Use centralized retry helper
resp = send_command_with_retry("read_console", params_dict)
if isinstance(resp, dict) and resp.get("success") and not include_stacktrace:
# Strip stacktrace fields from returned lines if present
try:
lines = resp.get("data", {}).get("lines", [])
for line in lines:
if isinstance(line, dict) and "stacktrace" in line:
line.pop("stacktrace", None)
except Exception:
pass
return resp if isinstance(resp, dict) else {"success": False, "message": str(resp)}

View File

@ -324,25 +324,33 @@ def register_resource_tools(mcp: FastMCP) -> None:
head_bytes = _coerce_int(head_bytes, minimum=1)
tail_lines = _coerce_int(tail_lines, minimum=1)
# Mutually exclusive windowing options precedence:
# 1) head_bytes, 2) tail_lines, 3) start_line+line_count, else full text
if head_bytes and head_bytes > 0:
raw = p.read_bytes()[: head_bytes]
text = raw.decode("utf-8", errors="replace")
else:
text = p.read_text(encoding="utf-8")
if tail_lines is not None and tail_lines > 0:
lines = text.splitlines()
n = max(0, tail_lines)
text = "\n".join(lines[-n:])
elif start_line is not None and line_count is not None and line_count >= 0:
lines = text.splitlines()
s = max(0, start_line - 1)
e = min(len(lines), s + line_count)
text = "\n".join(lines[s:e])
# Compute SHA over full file contents (metadata-only default)
full_bytes = p.read_bytes()
full_sha = hashlib.sha256(full_bytes).hexdigest()
sha = hashlib.sha256(text.encode("utf-8")).hexdigest()
return {"success": True, "data": {"text": text, "metadata": {"sha256": sha}}}
# Selection only when explicitly requested via windowing args or request text hints
selection_requested = bool(head_bytes or tail_lines or (start_line is not None and line_count is not None) or request)
if selection_requested:
# Mutually exclusive windowing options precedence:
# 1) head_bytes, 2) tail_lines, 3) start_line+line_count, else full text
if head_bytes and head_bytes > 0:
raw = full_bytes[: head_bytes]
text = raw.decode("utf-8", errors="replace")
else:
text = full_bytes.decode("utf-8", errors="replace")
if tail_lines is not None and tail_lines > 0:
lines = text.splitlines()
n = max(0, tail_lines)
text = "\n".join(lines[-n:])
elif start_line is not None and line_count is not None and line_count >= 0:
lines = text.splitlines()
s = max(0, start_line - 1)
e = min(len(lines), s + line_count)
text = "\n".join(lines[s:e])
return {"success": True, "data": {"text": text, "metadata": {"sha256": full_sha, "lengthBytes": len(full_bytes)}}}
else:
# Default: metadata only
return {"success": True, "data": {"metadata": {"sha256": full_sha, "lengthBytes": len(full_bytes)}}}
except Exception as e:
return {"success": False, "error": str(e)}
@ -380,8 +388,16 @@ def register_resource_tools(mcp: FastMCP) -> None:
max_results_int = _coerce_int(max_results, default=200, minimum=1)
lines = text.splitlines()
for i, line in enumerate(lines, start=1):
if rx.search(line):
results.append({"line": i, "text": line})
m = rx.search(line)
if m:
start_col = m.start() + 1 # 1-based
end_col = m.end() + 1 # 1-based, end exclusive
results.append({
"startLine": i,
"startCol": start_col,
"endLine": i,
"endCol": end_col,
})
if max_results_int and len(results) >= max_results_int:
break