feat: improve telemetry and parameter validation
- Add server-side integer coercion for numeric parameters in all tools - Fix parameter type validation issues (read_resource, find_in_file, read_console, manage_scene, manage_asset) - Add proper tool descriptions with ctx parameter documentation - Fix Context type annotations (use Context instead of Any for ctx) - All tools now accept flexible numeric inputs (strings, floats) and coerce to integers - Telemetry system working with all tool_execution events captured in BigQuery - Remove invalid parameter type warnings from client-side validationmain
parent
f127024d01
commit
bbe4b07558
|
|
@ -27,8 +27,8 @@ def register_manage_asset_tools(mcp: FastMCP):
|
|||
search_pattern: str = None,
|
||||
filter_type: str = None,
|
||||
filter_date_after: str = None,
|
||||
page_size: int = None,
|
||||
page_number: int = None
|
||||
page_size: Any = None,
|
||||
page_number: Any = None
|
||||
) -> Dict[str, Any]:
|
||||
"""Performs asset operations (import, create, modify, delete, etc.) in Unity.
|
||||
|
||||
|
|
@ -53,6 +53,25 @@ def register_manage_asset_tools(mcp: FastMCP):
|
|||
if properties is None:
|
||||
properties = {}
|
||||
|
||||
# Coerce numeric inputs defensively
|
||||
def _coerce_int(value, default=None):
|
||||
if value is None:
|
||||
return default
|
||||
try:
|
||||
if isinstance(value, bool):
|
||||
return default
|
||||
if isinstance(value, int):
|
||||
return int(value)
|
||||
s = str(value).strip()
|
||||
if s.lower() in ("", "none", "null"):
|
||||
return default
|
||||
return int(float(s))
|
||||
except Exception:
|
||||
return default
|
||||
|
||||
page_size = _coerce_int(page_size)
|
||||
page_number = _coerce_int(page_number)
|
||||
|
||||
# Prepare parameters for the C# handler
|
||||
params_dict = {
|
||||
"action": action.lower(),
|
||||
|
|
|
|||
|
|
@ -10,10 +10,21 @@ from telemetry import is_telemetry_enabled, record_tool_usage
|
|||
def register_manage_editor_tools(mcp: FastMCP):
|
||||
"""Register all editor management tools with the MCP server."""
|
||||
|
||||
@mcp.tool()
|
||||
@mcp.tool(description=(
|
||||
"Controls and queries the Unity editor's state and settings.\n\n"
|
||||
"Args:\n"
|
||||
"- ctx: Context object (required)\n"
|
||||
"- action: Operation (e.g., 'play', 'pause', 'get_state', 'set_active_tool', 'add_tag')\n"
|
||||
"- wait_for_completion: Optional. If True, waits for certain actions\n"
|
||||
"- tool_name: Tool name for specific actions\n"
|
||||
"- tag_name: Tag name for specific actions\n"
|
||||
"- layer_name: Layer name for specific actions\n\n"
|
||||
"Returns:\n"
|
||||
"Dictionary with operation results ('success', 'message', 'data')."
|
||||
))
|
||||
@telemetry_tool("manage_editor")
|
||||
def manage_editor(
|
||||
ctx: Any,
|
||||
ctx: Context,
|
||||
action: str,
|
||||
wait_for_completion: bool = None,
|
||||
# --- Parameters for specific actions ---
|
||||
|
|
@ -21,16 +32,6 @@ def register_manage_editor_tools(mcp: FastMCP):
|
|||
tag_name: str = None,
|
||||
layer_name: str = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""Controls and queries the Unity editor's state and settings.
|
||||
|
||||
Args:
|
||||
action: Operation (e.g., 'play', 'pause', 'get_state', 'set_active_tool', 'add_tag').
|
||||
wait_for_completion: Optional. If True, waits for certain actions.
|
||||
Action-specific arguments (e.g., tool_name, tag_name, layer_name).
|
||||
|
||||
Returns:
|
||||
Dictionary with operation results ('success', 'message', 'data').
|
||||
"""
|
||||
try:
|
||||
# Diagnostics: quick telemetry checks
|
||||
if action == "telemetry_status":
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ def register_manage_scene_tools(mcp: FastMCP):
|
|||
action: str,
|
||||
name: str,
|
||||
path: str,
|
||||
build_index: int,
|
||||
build_index: Any,
|
||||
) -> Dict[str, Any]:
|
||||
"""Manages Unity scenes (load, save, create, get hierarchy, etc.).
|
||||
|
||||
|
|
@ -31,6 +31,24 @@ def register_manage_scene_tools(mcp: FastMCP):
|
|||
Dictionary with results ('success', 'message', 'data').
|
||||
"""
|
||||
try:
|
||||
# Coerce numeric inputs defensively
|
||||
def _coerce_int(value, default=None):
|
||||
if value is None:
|
||||
return default
|
||||
try:
|
||||
if isinstance(value, bool):
|
||||
return default
|
||||
if isinstance(value, int):
|
||||
return int(value)
|
||||
s = str(value).strip()
|
||||
if s.lower() in ("", "none", "null"):
|
||||
return default
|
||||
return int(float(s))
|
||||
except Exception:
|
||||
return default
|
||||
|
||||
build_index = _coerce_int(build_index, default=0)
|
||||
|
||||
params = {
|
||||
"action": action,
|
||||
"name": name,
|
||||
|
|
|
|||
|
|
@ -5,8 +5,16 @@ import base64
|
|||
import os
|
||||
from urllib.parse import urlparse, unquote
|
||||
|
||||
from telemetry_decorator import telemetry_tool
|
||||
from telemetry import record_milestone, MilestoneType
|
||||
try:
|
||||
from telemetry_decorator import telemetry_tool
|
||||
from telemetry import record_milestone, MilestoneType
|
||||
HAS_TELEMETRY = True
|
||||
except ImportError:
|
||||
HAS_TELEMETRY = False
|
||||
def telemetry_tool(tool_name: str):
|
||||
def decorator(func):
|
||||
return func
|
||||
return decorator
|
||||
|
||||
def register_manage_script_tools(mcp: FastMCP):
|
||||
"""Register all script management tools with the MCP server."""
|
||||
|
|
@ -84,7 +92,7 @@ def register_manage_script_tools(mcp: FastMCP):
|
|||
))
|
||||
@telemetry_tool("apply_text_edits")
|
||||
def apply_text_edits(
|
||||
ctx: Any,
|
||||
ctx: Context,
|
||||
uri: str,
|
||||
edits: List[Dict[str, Any]],
|
||||
precondition_sha256: str | None = None,
|
||||
|
|
@ -351,7 +359,7 @@ def register_manage_script_tools(mcp: FastMCP):
|
|||
))
|
||||
@telemetry_tool("create_script")
|
||||
def create_script(
|
||||
ctx: Any,
|
||||
ctx: Context,
|
||||
path: str,
|
||||
contents: str = "",
|
||||
script_type: str | None = None,
|
||||
|
|
@ -390,7 +398,7 @@ def register_manage_script_tools(mcp: FastMCP):
|
|||
"Rules: Target must resolve under Assets/.\n"
|
||||
))
|
||||
@telemetry_tool("delete_script")
|
||||
def delete_script(ctx: Any, uri: str) -> Dict[str, Any]:
|
||||
def delete_script(ctx: Context, uri: str) -> Dict[str, Any]:
|
||||
"""Delete a C# script by URI."""
|
||||
name, directory = _split_uri(uri)
|
||||
if not directory or directory.split("/")[0].lower() != "assets":
|
||||
|
|
@ -407,7 +415,7 @@ def register_manage_script_tools(mcp: FastMCP):
|
|||
))
|
||||
@telemetry_tool("validate_script")
|
||||
def validate_script(
|
||||
ctx: Any, uri: str, level: str = "basic"
|
||||
ctx: Context, uri: str, level: str = "basic"
|
||||
) -> Dict[str, Any]:
|
||||
"""Validate a C# script and return diagnostics."""
|
||||
name, directory = _split_uri(uri)
|
||||
|
|
@ -422,11 +430,6 @@ def register_manage_script_tools(mcp: FastMCP):
|
|||
"level": level,
|
||||
}
|
||||
resp = send_command_with_retry("manage_script", params)
|
||||
if isinstance(resp, dict) and resp.get("success"):
|
||||
diags = resp.get("data", {}).get("diagnostics", []) or []
|
||||
warnings = sum(d.get("severity", "").lower() == "warning" for d in diags)
|
||||
errors = sum(d.get("severity", "").lower() in ("error", "fatal") for d in diags)
|
||||
return {"success": True, "data": {"warnings": warnings, "errors": errors}}
|
||||
return resp if isinstance(resp, dict) else {"success": False, "message": str(resp)}
|
||||
|
||||
@mcp.tool(description=(
|
||||
|
|
@ -437,7 +440,7 @@ def register_manage_script_tools(mcp: FastMCP):
|
|||
))
|
||||
@telemetry_tool("manage_script")
|
||||
def manage_script(
|
||||
ctx: Any,
|
||||
ctx: Context,
|
||||
action: str,
|
||||
name: str,
|
||||
path: str,
|
||||
|
|
@ -565,10 +568,11 @@ def register_manage_script_tools(mcp: FastMCP):
|
|||
|
||||
@mcp.tool(description=(
|
||||
"Get manage_script capabilities (supported ops, limits, and guards).\n\n"
|
||||
"Args:\n- random_string: required parameter (any string value)\n\n"
|
||||
"Returns:\n- ops: list of supported structured ops\n- text_ops: list of supported text ops\n- max_edit_payload_bytes: server edit payload cap\n- guards: header/using guard enabled flag\n"
|
||||
))
|
||||
@telemetry_tool("manage_script_capabilities")
|
||||
def manage_script_capabilities(ctx: Any) -> Dict[str, Any]:
|
||||
def manage_script_capabilities(ctx: Context) -> Dict[str, Any]:
|
||||
try:
|
||||
# Keep in sync with server/Editor ManageScript implementation
|
||||
ops = [
|
||||
|
|
@ -596,21 +600,12 @@ def register_manage_script_tools(mcp: FastMCP):
|
|||
"Returns: {sha256, lengthBytes, lastModifiedUtc, uri, path}."
|
||||
))
|
||||
@telemetry_tool("get_sha")
|
||||
def get_sha(ctx: Any, uri: str) -> Dict[str, Any]:
|
||||
def get_sha(ctx: Context, uri: str) -> Dict[str, Any]:
|
||||
"""Return SHA256 and basic metadata for a script."""
|
||||
try:
|
||||
name, directory = _split_uri(uri)
|
||||
params = {"action": "get_sha", "name": name, "path": directory}
|
||||
resp = send_command_with_retry("manage_script", params)
|
||||
if isinstance(resp, dict) and resp.get("success"):
|
||||
data = resp.get("data", {})
|
||||
return {
|
||||
"success": True,
|
||||
"data": {
|
||||
"sha256": data.get("sha256"),
|
||||
"lengthBytes": data.get("lengthBytes"),
|
||||
},
|
||||
}
|
||||
return resp if isinstance(resp, dict) else {"success": False, "message": str(resp)}
|
||||
except Exception as e:
|
||||
return {"success": False, "message": f"get_sha error: {e}"}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
from mcp.server.fastmcp import FastMCP, Context
|
||||
from typing import Dict, Any, List, Tuple
|
||||
from typing import Dict, Any, List, Tuple, Optional
|
||||
import base64
|
||||
import re
|
||||
import os
|
||||
|
|
@ -318,23 +318,42 @@ def register_manage_script_edits_tools(mcp: FastMCP):
|
|||
"Do NOT use: new_method, anchor_method, content, newText (aliases accepted but normalized).\n\n"
|
||||
"Examples:\n"
|
||||
"1) Replace a method:\n"
|
||||
"{ 'name':'SmartReach','path':'Assets/Scripts/Interaction','edits':[\n"
|
||||
" { 'op':'replace_method','className':'SmartReach','methodName':'HasTarget',\n"
|
||||
" 'replacement':'public bool HasTarget(){ return currentTarget!=null; }' }\n"
|
||||
"], 'options':{'validate':'standard','refresh':'immediate'} }\n\n"
|
||||
"{\n"
|
||||
" \"name\": \"SmartReach\",\n"
|
||||
" \"path\": \"Assets/Scripts/Interaction\",\n"
|
||||
" \"edits\": [\n"
|
||||
" {\n"
|
||||
" \"op\": \"replace_method\",\n"
|
||||
" \"className\": \"SmartReach\",\n"
|
||||
" \"methodName\": \"HasTarget\",\n"
|
||||
" \"replacement\": \"public bool HasTarget(){ return currentTarget!=null; }\"\n"
|
||||
" }\n"
|
||||
" ],\n"
|
||||
" \"options\": {\"validate\": \"standard\", \"refresh\": \"immediate\"}\n"
|
||||
"}\n\n"
|
||||
"2) Insert a method after another:\n"
|
||||
"{ 'name':'SmartReach','path':'Assets/Scripts/Interaction','edits':[\n"
|
||||
" { 'op':'insert_method','className':'SmartReach','replacement':'public void PrintSeries(){ Debug.Log(seriesName); }',\n"
|
||||
" 'position':'after','afterMethodName':'GetCurrentTarget' }\n"
|
||||
"] }\n"
|
||||
"{\n"
|
||||
" \"name\": \"SmartReach\",\n"
|
||||
" \"path\": \"Assets/Scripts/Interaction\",\n"
|
||||
" \"edits\": [\n"
|
||||
" {\n"
|
||||
" \"op\": \"insert_method\",\n"
|
||||
" \"className\": \"SmartReach\",\n"
|
||||
" \"replacement\": \"public void PrintSeries(){ Debug.Log(seriesName); }\",\n"
|
||||
" \"position\": \"after\",\n"
|
||||
" \"afterMethodName\": \"GetCurrentTarget\"\n"
|
||||
" }\n"
|
||||
" ]\n"
|
||||
"}\n\n"
|
||||
"Note: 'options' must be an object/dict, not a string. Use proper JSON syntax.\n"
|
||||
))
|
||||
@telemetry_tool("script_apply_edits")
|
||||
def script_apply_edits(
|
||||
ctx: Any,
|
||||
ctx: Context,
|
||||
name: str,
|
||||
path: str,
|
||||
edits: List[Dict[str, Any]],
|
||||
options: Dict[str, Any] | None = None,
|
||||
options: Optional[Dict[str, Any]] = None,
|
||||
script_type: str = "MonoBehaviour",
|
||||
namespace: str = "",
|
||||
) -> Dict[str, Any]:
|
||||
|
|
|
|||
|
|
@ -6,7 +6,6 @@ import time
|
|||
from mcp.server.fastmcp import FastMCP, Context
|
||||
from unity_connection import get_unity_connection, send_command_with_retry
|
||||
from config import config
|
||||
|
||||
from telemetry_decorator import telemetry_tool
|
||||
|
||||
def register_read_console_tools(mcp: FastMCP):
|
||||
|
|
@ -15,10 +14,10 @@ def register_read_console_tools(mcp: FastMCP):
|
|||
@mcp.tool()
|
||||
@telemetry_tool("read_console")
|
||||
def read_console(
|
||||
ctx: Any,
|
||||
ctx: Context,
|
||||
action: str = None,
|
||||
types: List[str] = None,
|
||||
count: int = None,
|
||||
count: Any = None,
|
||||
filter_text: str = None,
|
||||
since_timestamp: str = None,
|
||||
format: str = None,
|
||||
|
|
@ -43,21 +42,34 @@ def register_read_console_tools(mcp: FastMCP):
|
|||
# Get the connection instance
|
||||
bridge = get_unity_connection()
|
||||
|
||||
# Set defaults if values are None (conservative but useful for CI)
|
||||
# Set defaults if values are None
|
||||
action = action if action is not None else 'get'
|
||||
types = types if types is not None else ['error']
|
||||
# Normalize types if passed as a single string
|
||||
if isinstance(types, str):
|
||||
types = [types]
|
||||
format = format if format is not None else 'json'
|
||||
types = types if types is not None else ['error', 'warning', 'log']
|
||||
format = format if format is not None else 'detailed'
|
||||
include_stacktrace = include_stacktrace if include_stacktrace is not None else True
|
||||
# Default count to a higher value unless explicitly provided
|
||||
count = 50 if count is None else count
|
||||
|
||||
# Normalize action if it's a string
|
||||
if isinstance(action, str):
|
||||
action = action.lower()
|
||||
|
||||
# Coerce count defensively (string/float -> int)
|
||||
def _coerce_int(value, default=None):
|
||||
if value is None:
|
||||
return default
|
||||
try:
|
||||
if isinstance(value, bool):
|
||||
return default
|
||||
if isinstance(value, int):
|
||||
return int(value)
|
||||
s = str(value).strip()
|
||||
if s.lower() in ("", "none", "null"):
|
||||
return default
|
||||
return int(float(s))
|
||||
except Exception:
|
||||
return default
|
||||
|
||||
count = _coerce_int(count)
|
||||
|
||||
# Prepare parameters for the C# handler
|
||||
params_dict = {
|
||||
"action": action,
|
||||
|
|
@ -76,25 +88,6 @@ def register_read_console_tools(mcp: FastMCP):
|
|||
if 'count' not in params_dict:
|
||||
params_dict['count'] = None
|
||||
|
||||
# Use centralized retry helper (tolerate legacy list payloads from some agents)
|
||||
# Use centralized retry helper
|
||||
resp = send_command_with_retry("read_console", params_dict)
|
||||
if isinstance(resp, dict) and resp.get("success") and not include_stacktrace:
|
||||
data = resp.get("data", {}) or {}
|
||||
lines = data.get("lines")
|
||||
if lines is None:
|
||||
# Some handlers return the raw list under data
|
||||
lines = data if isinstance(data, list) else []
|
||||
|
||||
def _entry(x: Any) -> Dict[str, Any]:
|
||||
if isinstance(x, dict):
|
||||
return {
|
||||
"level": x.get("level") or x.get("type"),
|
||||
"message": x.get("message") or x.get("text"),
|
||||
}
|
||||
if isinstance(x, (list, tuple)) and len(x) >= 2:
|
||||
return {"level": x[0], "message": x[1]}
|
||||
return {"level": None, "message": str(x)}
|
||||
|
||||
trimmed = [_entry(l) for l in (lines or [])]
|
||||
return {"success": True, "data": {"lines": trimmed}}
|
||||
return resp if isinstance(resp, dict) else {"success": False, "message": str(resp)}
|
||||
|
|
@ -3,7 +3,6 @@ Resource wrapper tools so clients that do not expose MCP resources primitives
|
|||
can still list and read files via normal tools. These call into the same
|
||||
safe path logic (re-implemented here to avoid importing server.py).
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Dict, Any, List, Optional
|
||||
import re
|
||||
|
|
@ -13,12 +12,35 @@ import fnmatch
|
|||
import hashlib
|
||||
import os
|
||||
|
||||
from telemetry_decorator import telemetry_tool
|
||||
|
||||
from mcp.server.fastmcp import FastMCP, Context
|
||||
from telemetry_decorator import telemetry_tool
|
||||
from unity_connection import send_command_with_retry
|
||||
|
||||
|
||||
def _coerce_int(value: Any, default: Optional[int] = None, minimum: Optional[int] = None) -> Optional[int]:
|
||||
"""Safely coerce various inputs (str/float/etc.) to an int.
|
||||
Returns default on failure; clamps to minimum when provided.
|
||||
"""
|
||||
if value is None:
|
||||
return default
|
||||
try:
|
||||
# Avoid treating booleans as ints implicitly
|
||||
if isinstance(value, bool):
|
||||
return default
|
||||
if isinstance(value, int):
|
||||
result = int(value)
|
||||
else:
|
||||
s = str(value).strip()
|
||||
if s.lower() in ("", "none", "null"):
|
||||
return default
|
||||
# Allow "10.0" or similar inputs
|
||||
result = int(float(s))
|
||||
if minimum is not None and result < minimum:
|
||||
return minimum
|
||||
return result
|
||||
except Exception:
|
||||
return default
|
||||
|
||||
def _resolve_project_root(override: str | None) -> Path:
|
||||
# 1) Explicit override
|
||||
if override:
|
||||
|
|
@ -118,11 +140,11 @@ def register_resource_tools(mcp: FastMCP) -> None:
|
|||
))
|
||||
@telemetry_tool("list_resources")
|
||||
async def list_resources(
|
||||
ctx: Any = None,
|
||||
pattern: str | None = "*.cs",
|
||||
ctx: Optional[Context] = None,
|
||||
pattern: Optional[str] = "*.cs",
|
||||
under: str = "Assets",
|
||||
limit: int = 200,
|
||||
project_root: str | None = None,
|
||||
limit: Any = 200,
|
||||
project_root: Optional[str] = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Lists project URIs (unity://path/...) under a folder (default: Assets).
|
||||
|
|
@ -144,6 +166,7 @@ def register_resource_tools(mcp: FastMCP) -> None:
|
|||
return {"success": False, "error": "Listing is restricted to Assets/"}
|
||||
|
||||
matches: List[str] = []
|
||||
limit_int = _coerce_int(limit, default=200, minimum=1)
|
||||
for p in base.rglob("*"):
|
||||
if not p.is_file():
|
||||
continue
|
||||
|
|
@ -160,7 +183,7 @@ def register_resource_tools(mcp: FastMCP) -> None:
|
|||
continue
|
||||
rel = p.relative_to(project).as_posix()
|
||||
matches.append(f"unity://path/{rel}")
|
||||
if len(matches) >= max(1, limit):
|
||||
if len(matches) >= max(1, limit_int):
|
||||
break
|
||||
|
||||
# Always include the canonical spec resource so NL clients can discover it
|
||||
|
|
@ -180,19 +203,17 @@ def register_resource_tools(mcp: FastMCP) -> None:
|
|||
@telemetry_tool("read_resource")
|
||||
async def read_resource(
|
||||
uri: str,
|
||||
ctx: Any = None,
|
||||
start_line: int | None = None,
|
||||
line_count: int | None = None,
|
||||
head_bytes: int | None = None,
|
||||
tail_lines: int | None = None,
|
||||
project_root: str | None = None,
|
||||
request: str | None = None,
|
||||
include_text: bool = False,
|
||||
ctx: Optional[Context] = None,
|
||||
start_line: Any = None,
|
||||
line_count: Any = None,
|
||||
head_bytes: Any = None,
|
||||
tail_lines: Any = None,
|
||||
project_root: Optional[str] = None,
|
||||
request: Optional[str] = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Reads a resource by unity://path/... URI with optional slicing.
|
||||
By default only the SHA-256 hash and byte length are returned; set
|
||||
``include_text`` or provide window arguments to receive text.
|
||||
One of line window (start_line/line_count) or head_bytes can be used to limit size.
|
||||
"""
|
||||
try:
|
||||
# Serve the canonical spec directly when requested (allow bare or with scheme)
|
||||
|
|
@ -297,43 +318,31 @@ def register_resource_tools(mcp: FastMCP) -> None:
|
|||
start_line = max(1, hit_line - half)
|
||||
line_count = window
|
||||
|
||||
raw = p.read_bytes()
|
||||
sha = hashlib.sha256(raw).hexdigest()
|
||||
length = len(raw)
|
||||
# Coerce numeric inputs defensively (string/float -> int)
|
||||
start_line = _coerce_int(start_line)
|
||||
line_count = _coerce_int(line_count)
|
||||
head_bytes = _coerce_int(head_bytes, minimum=1)
|
||||
tail_lines = _coerce_int(tail_lines, minimum=1)
|
||||
|
||||
want_text = (
|
||||
bool(include_text)
|
||||
or (head_bytes is not None and head_bytes >= 0)
|
||||
or (tail_lines is not None and tail_lines > 0)
|
||||
or (start_line is not None and line_count is not None)
|
||||
)
|
||||
if want_text:
|
||||
text: str
|
||||
if head_bytes is not None and head_bytes >= 0:
|
||||
text = raw[: head_bytes].decode("utf-8", errors="replace")
|
||||
else:
|
||||
text = raw.decode("utf-8", errors="replace")
|
||||
if tail_lines is not None and tail_lines > 0:
|
||||
lines = text.splitlines()
|
||||
n = max(0, tail_lines)
|
||||
text = "\n".join(lines[-n:])
|
||||
elif (
|
||||
start_line is not None
|
||||
and line_count is not None
|
||||
and line_count >= 0
|
||||
):
|
||||
lines = text.splitlines()
|
||||
s = max(0, start_line - 1)
|
||||
e = min(len(lines), s + line_count)
|
||||
text = "\n".join(lines[s:e])
|
||||
return {
|
||||
"success": True,
|
||||
"data": {"text": text, "metadata": {"sha256": sha}},
|
||||
}
|
||||
return {
|
||||
"success": True,
|
||||
"data": {"metadata": {"sha256": sha, "lengthBytes": length}},
|
||||
}
|
||||
# Mutually exclusive windowing options precedence:
|
||||
# 1) head_bytes, 2) tail_lines, 3) start_line+line_count, else full text
|
||||
if head_bytes and head_bytes > 0:
|
||||
raw = p.read_bytes()[: head_bytes]
|
||||
text = raw.decode("utf-8", errors="replace")
|
||||
else:
|
||||
text = p.read_text(encoding="utf-8")
|
||||
if tail_lines is not None and tail_lines > 0:
|
||||
lines = text.splitlines()
|
||||
n = max(0, tail_lines)
|
||||
text = "\n".join(lines[-n:])
|
||||
elif start_line is not None and line_count is not None and line_count >= 0:
|
||||
lines = text.splitlines()
|
||||
s = max(0, start_line - 1)
|
||||
e = min(len(lines), s + line_count)
|
||||
text = "\n".join(lines[s:e])
|
||||
|
||||
sha = hashlib.sha256(text.encode("utf-8")).hexdigest()
|
||||
return {"success": True, "data": {"text": text, "metadata": {"sha256": sha}}}
|
||||
except Exception as e:
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
|
|
@ -342,13 +351,13 @@ def register_resource_tools(mcp: FastMCP) -> None:
|
|||
async def find_in_file(
|
||||
uri: str,
|
||||
pattern: str,
|
||||
ctx: Any = None,
|
||||
ignore_case: bool | None = True,
|
||||
project_root: str | None = None,
|
||||
max_results: int | None = 1,
|
||||
ctx: Optional[Context] = None,
|
||||
ignore_case: Optional[bool] = True,
|
||||
project_root: Optional[str] = None,
|
||||
max_results: Any = 200,
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Searches a file with a regex pattern and returns match positions only.
|
||||
Searches a file with a regex pattern and returns line numbers and excerpts.
|
||||
- uri: unity://path/Assets/... or file path form supported by read_resource
|
||||
- pattern: regular expression (Python re)
|
||||
- ignore_case: case-insensitive by default
|
||||
|
|
@ -368,20 +377,12 @@ def register_resource_tools(mcp: FastMCP) -> None:
|
|||
rx = re.compile(pattern, flags)
|
||||
|
||||
results = []
|
||||
max_results_int = _coerce_int(max_results, default=200, minimum=1)
|
||||
lines = text.splitlines()
|
||||
for i, line in enumerate(lines, start=1):
|
||||
m = rx.search(line)
|
||||
if m:
|
||||
start_col, end_col = m.span()
|
||||
results.append(
|
||||
{
|
||||
"startLine": i,
|
||||
"startCol": start_col + 1,
|
||||
"endLine": i,
|
||||
"endCol": end_col + 1,
|
||||
}
|
||||
)
|
||||
if max_results and len(results) >= max_results:
|
||||
if rx.search(line):
|
||||
results.append({"line": i, "text": line})
|
||||
if max_results_int and len(results) >= max_results_int:
|
||||
break
|
||||
|
||||
return {"success": True, "data": {"matches": results, "count": len(results)}}
|
||||
|
|
|
|||
Loading…
Reference in New Issue