diff --git a/plugins/text_editor/default_config.yaml b/plugins/text_editor/default_config.yaml new file mode 100644 index 000000000..c3064d4eb --- /dev/null +++ b/plugins/text_editor/default_config.yaml @@ -0,0 +1,3 @@ +max_line_tokens: 500 +default_line_count: 100 +max_total_read_tokens: 4000 diff --git a/plugins/text_editor/extensions/.gitkeep b/plugins/text_editor/extensions/.gitkeep new file mode 100644 index 000000000..e69de29bb diff --git a/plugins/text_editor/extensions/python/system_prompt/_15_text_editor_prompt.py b/plugins/text_editor/extensions/python/system_prompt/_15_text_editor_prompt.py new file mode 100644 index 000000000..96ea55582 --- /dev/null +++ b/plugins/text_editor/extensions/python/system_prompt/_15_text_editor_prompt.py @@ -0,0 +1,20 @@ +from python.helpers.extension import Extension +from python.helpers import plugins +from agent import Agent, LoopData + + +class TextEditorPrompt(Extension): + + async def execute( + self, + system_prompt: list[str] = [], + loop_data: LoopData = LoopData(), + **kwargs, + ): + config = plugins.get_plugin_config("text_editor", agent=self.agent) or {} + default_line_count = config.get("default_line_count", 100) + prompt = self.agent.read_prompt( + "agent.system.tool.text_editor.md", + default_line_count=default_line_count, + ) + system_prompt.append(prompt) diff --git a/plugins/text_editor/helpers/__init__.py b/plugins/text_editor/helpers/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/plugins/text_editor/helpers/file_ops.py b/plugins/text_editor/helpers/file_ops.py new file mode 100644 index 000000000..c71b3221e --- /dev/null +++ b/plugins/text_editor/helpers/file_ops.py @@ -0,0 +1,321 @@ +""" +Pure file operations for the text_editor plugin. + +No agent/tool dependencies — only stdlib + tokens helper. +""" + +import os +import shutil +import tempfile +from dataclasses import dataclass, field +from typing import TYPE_CHECKING + +from python.helpers import plugins, tokens + +if TYPE_CHECKING: + from agent import Agent + +_BINARY_PEEK = 8192 + + +# ------------------------------------------------------------------ +# Config +# ------------------------------------------------------------------ + +def get_config(agent: "Agent") -> dict: + config = plugins.get_plugin_config("text_editor", agent=agent) or {} + return { + "max_line_tokens": int(config.get("max_line_tokens", 500)), + "default_line_count": int(config.get("default_line_count", 100)), + "max_total_read_tokens": int(config.get("max_total_read_tokens", 4000)), + } + + +# ------------------------------------------------------------------ +# Binary detection +# ------------------------------------------------------------------ + +def is_binary(path: str) -> bool: + """Detect binary file by checking for null bytes.""" + try: + with open(path, "rb") as f: + chunk = f.read(_BINARY_PEEK) + return b"\x00" in chunk + except OSError: + return False + + +# ------------------------------------------------------------------ +# Read +# ------------------------------------------------------------------ + +@dataclass +class ReadResult: + content: str = "" + total_lines: int = 0 + warnings: str = "" + error: str = "" + + +def read_file( + path: str, + line_from: int = 0, + line_to: int = 0, + max_line_tokens: int = 500, + default_line_count: int = 100, + max_total_read_tokens: int = 4000, +) -> ReadResult: + """Read a text file and return numbered lines with token budgeting.""" + path = os.path.expanduser(path) + + if not os.path.isfile(path): + return ReadResult(error="file not found") + + if is_binary(path): + return ReadResult(error="file appears binary, use terminal instead") + + try: + with open(path, "r", encoding="utf-8", errors="replace") as f: + all_lines = f.readlines() + except OSError as exc: + return ReadResult(error=str(exc)) + + total_lines = len(all_lines) + line_from = max(line_from, 0) + if not line_to: + line_to = min(line_from + default_line_count, total_lines) + line_to = min(line_to, total_lines) + + selected = all_lines[line_from:line_to] + + warn_parts: list[str] = [] + cropped_lines: list[int] = [] + output_lines: list[str] = [] + running_tokens = 0 + trimmed_by_total = False + + for i, raw_line in enumerate(selected): + line_no = line_from + i + stripped = raw_line.rstrip("\n").rstrip("\r") + line_tok = tokens.count_tokens(stripped) + + if line_tok > max_line_tokens: + chars_per_tok = max(len(stripped) / line_tok, 1) + keep_chars = int(max_line_tokens * chars_per_tok * tokens.TRIM_BUFFER) + stripped = stripped[:keep_chars] + "..." + cropped_lines.append(line_no) + line_tok = max_line_tokens + + if running_tokens + line_tok > max_total_read_tokens: + trimmed_by_total = True + break + + running_tokens += line_tok + output_lines.append(f"{line_no} {stripped}") + + if cropped_lines: + nums = " ".join(str(n) for n in cropped_lines) + warn_parts.append( + f"long lines {nums} cropped - use terminal for precise manipulation" + ) + if trimmed_by_total: + actual_end = line_from + len(output_lines) + warn_parts.append( + f"output trimmed at line {actual_end} due to token limit" + " - use line_from/line_to for remaining" + ) + + warn_str = "" + if warn_parts: + warn_str = "\nwarning: " + "; ".join(warn_parts) + + return ReadResult( + content="\n".join(output_lines), + total_lines=total_lines, + warnings=warn_str, + ) + + +# ------------------------------------------------------------------ +# Write +# ------------------------------------------------------------------ + +@dataclass +class WriteResult: + total_lines: int = 0 + error: str = "" + + +def write_file(path: str, content: str) -> WriteResult: + """Create or overwrite a file.""" + path = os.path.expanduser(path) + try: + os.makedirs(os.path.dirname(path) or ".", exist_ok=True) + with open(path, "w", encoding="utf-8") as f: + f.write(content) + except OSError as exc: + return WriteResult(error=str(exc)) + + total = content.count("\n") + ( + 1 if content and not content.endswith("\n") else 0 + ) + return WriteResult(total_lines=total) + + +# ------------------------------------------------------------------ +# Patch +# ------------------------------------------------------------------ + +@dataclass +class PatchResult: + total_lines: int = 0 + edit_count: int = 0 + error: str = "" + + +def validate_edits(edits: list) -> tuple[list[dict], str]: + """ + Normalise and validate an edits array. + + Semantics (to is inclusive): + {from:2, to:2, content:"x\\n"} - replace line 2 + {from:1, to:3, content:"x\\n"} - replace lines 1-3 + {from:2, to:2} - delete line 2 + {from:5} or {from:5, to:-1} - insert before line 5 (no deletion) + + Returns (parsed_edits, error_string). error_string is empty on success. + """ + if not edits or not isinstance(edits, list): + return [], "edits array is required" + + parsed: list[dict] = [] + for e in edits: + if not isinstance(e, dict): + return [], f"invalid edit entry: {e}" + frm = int(e.get("from", -1)) + if frm < 0: + return [], f"edit missing from: {e}" + # to == -1 or absent means pure insert (no lines removed) + to = int(e.get("to", -1)) + is_insert = to < 0 or to < frm + if is_insert: + to = frm - 1 # normalise: marks zero-width range + parsed.append({ + "from": frm, + "to": to, + "content": e.get("content", ""), + "insert": is_insert, + }) + + parsed.sort(key=lambda x: (x["from"], 0 if x["insert"] else 1)) + for i in range(1, len(parsed)): + prev, cur = parsed[i - 1], parsed[i] + # Inserts at the same line don't overlap with each other or + # with a replace that starts at the same line. + if prev["insert"]: + continue + # prev is a replace/delete: its range is [from..to] inclusive + if cur["from"] <= prev["to"]: + return [], ( + f"overlapping edits: edit at {prev['from']}" + f" (to {prev['to']}) and {cur['from']}" + f" (to {cur['to']})" + ) + + return parsed, "" + + +def apply_patch(path: str, edits: list[dict]) -> int: + """ + Apply sorted, validated edits by streaming to a temp file. + + Edits use inclusive 'to'. Inserts have 'insert': True. + Returns total line count after patching. + """ + dir_name = os.path.dirname(path) or "." + fd, tmp_path = tempfile.mkstemp(dir=dir_name, suffix=".tmp") + try: + with ( + open(path, "r", encoding="utf-8", errors="replace") as src, + os.fdopen(fd, "w", encoding="utf-8") as dst, + ): + edit_idx = 0 + line_no = 0 + total_written = 0 + + for raw_line in src: + # Process all inserts targeting this line first + while ( + edit_idx < len(edits) + and edits[edit_idx]["insert"] + and edits[edit_idx]["from"] == line_no + ): + edit = edits[edit_idx] + if edit["content"]: + dst.write(edit["content"]) + total_written += _count_content_lines(edit["content"]) + edit_idx += 1 + + # Check if current line falls in a replace/delete range + if edit_idx < len(edits) and not edits[edit_idx]["insert"]: + edit = edits[edit_idx] + if edit["from"] <= line_no <= edit["to"]: + # Write replacement content once at range start + if line_no == edit["from"] and edit["content"]: + dst.write(edit["content"]) + total_written += _count_content_lines( + edit["content"] + ) + # Skip original line; advance edit at range end + if line_no == edit["to"]: + edit_idx += 1 + line_no += 1 + continue + + dst.write(raw_line) + total_written += 1 + line_no += 1 + + # Remaining edits past end of file + while edit_idx < len(edits): + edit = edits[edit_idx] + if edit["content"]: + dst.write(edit["content"]) + total_written += _count_content_lines(edit["content"]) + edit_idx += 1 + + shutil.move(tmp_path, path) + return total_written + except Exception: + if os.path.exists(tmp_path): + os.unlink(tmp_path) + raise + + +def patch_file(path: str, edits: list) -> PatchResult: + """Validate and apply edits to a file.""" + path = os.path.expanduser(path) + if not os.path.isfile(path): + return PatchResult(error="file not found") + + parsed, err = validate_edits(edits) + if err: + return PatchResult(error=err) + + try: + total = apply_patch(path, parsed) + except Exception as exc: + return PatchResult(error=str(exc)) + + return PatchResult(total_lines=total, edit_count=len(parsed)) + + +# ------------------------------------------------------------------ +# Internal +# ------------------------------------------------------------------ + +def _count_content_lines(content: str) -> int: + return content.count("\n") + ( + 1 if content and not content.endswith("\n") else 0 + ) + \ No newline at end of file diff --git a/plugins/text_editor/plugin.yaml b/plugins/text_editor/plugin.yaml new file mode 100644 index 000000000..da84ddb77 --- /dev/null +++ b/plugins/text_editor/plugin.yaml @@ -0,0 +1,7 @@ +name: Text Editor +description: Native text file read, write and patch tools with line numbers. +version: 1.0.0 +settings_sections: + - agent +per_project_config: false +per_agent_config: false diff --git a/plugins/text_editor/prompts/agent.system.tool.text_editor.md b/plugins/text_editor/prompts/agent.system.tool.text_editor.md new file mode 100644 index 000000000..290df6d79 --- /dev/null +++ b/plugins/text_editor/prompts/agent.system.tool.text_editor.md @@ -0,0 +1,70 @@ +### text_editor +native file read write patch with line numbers +no code execution creating viewing editing text files +terminal (grep find sed) search advanced replacements + +#### text_editor:read +read file numbered lines +args path line_from (inclusive) line_to (inclusive) both optional +defaults first {{default_line_count}} lines if no range +usage: +~~~json +{ + "thoughts": [ + "..." + ], + "headline": "...", + "tool_name": "text_editor:read", + "tool_args": { + "path": "/path/file.py", + "line_from": 1, + "line_to": 50 + } +} +~~~ + +#### text_editor:write +create overwrite entire file +args path content +usage: +~~~json +{ + "thoughts": [ + "..." + ], + "headline": "...", + "tool_name": "text_editor:write", + "tool_args": { + "path": "/path/file.py", + "content": "import os\nprint('hello')\n" + } +} +~~~ + +#### text_editor:patch +apply line edits existing file +args path edits (array of {from, to, content}) +from and to are inclusive line numbers +{from:2, to:2, content:"x\n"} replace line 2 +{from:1, to:3, content:"x\n"} replace lines 1-3 +{from:2, to:2} delete line 2 (no content = delete) +{from:2, content:"x\n"} insert before line 2 (omit to = insert) +always original line numbers from read output dont adjust shifts +edits must not overlap +usage: +~~~json +{ + "thoughts": [ + "..." + ], + "headline": "...", + "tool_name": "text_editor:patch", + "tool_args": { + "path": "/path/file.py", + "edits": [ + {"from": 1, "content": "import sys\n"}, + {"from": 5, "to": 5, "content": " if x == 2:\n"} + ] + } +} +~~~ diff --git a/plugins/text_editor/prompts/fw.text_editor.patch_error.md b/plugins/text_editor/prompts/fw.text_editor.patch_error.md new file mode 100644 index 000000000..061afc03c --- /dev/null +++ b/plugins/text_editor/prompts/fw.text_editor.patch_error.md @@ -0,0 +1 @@ +error patching {{path}}: {{error}} diff --git a/plugins/text_editor/prompts/fw.text_editor.patch_ok.md b/plugins/text_editor/prompts/fw.text_editor.patch_ok.md new file mode 100644 index 000000000..8d54e3d97 --- /dev/null +++ b/plugins/text_editor/prompts/fw.text_editor.patch_ok.md @@ -0,0 +1 @@ +{{path}} patched {{edit_count}} edits applied, {{total_lines}} lines now diff --git a/plugins/text_editor/prompts/fw.text_editor.read_error.md b/plugins/text_editor/prompts/fw.text_editor.read_error.md new file mode 100644 index 000000000..14f26397c --- /dev/null +++ b/plugins/text_editor/prompts/fw.text_editor.read_error.md @@ -0,0 +1 @@ +error reading {{path}}: {{error}} diff --git a/plugins/text_editor/prompts/fw.text_editor.read_ok.md b/plugins/text_editor/prompts/fw.text_editor.read_ok.md new file mode 100644 index 000000000..a4659ae8a --- /dev/null +++ b/plugins/text_editor/prompts/fw.text_editor.read_ok.md @@ -0,0 +1,4 @@ +{{path}} {{total_lines}} lines{{warnings}} +>>> +{{content}} +<<< diff --git a/plugins/text_editor/prompts/fw.text_editor.write_error.md b/plugins/text_editor/prompts/fw.text_editor.write_error.md new file mode 100644 index 000000000..abf972235 --- /dev/null +++ b/plugins/text_editor/prompts/fw.text_editor.write_error.md @@ -0,0 +1 @@ +error writing {{path}}: {{error}} diff --git a/plugins/text_editor/prompts/fw.text_editor.write_ok.md b/plugins/text_editor/prompts/fw.text_editor.write_ok.md new file mode 100644 index 000000000..cb493f7f2 --- /dev/null +++ b/plugins/text_editor/prompts/fw.text_editor.write_ok.md @@ -0,0 +1 @@ +{{path}} written {{total_lines}} lines diff --git a/plugins/text_editor/tools/text_editor.py b/plugins/text_editor/tools/text_editor.py new file mode 100644 index 000000000..1f3707265 --- /dev/null +++ b/plugins/text_editor/tools/text_editor.py @@ -0,0 +1,145 @@ +import os + +from python.helpers.tool import Tool, Response +from python.helpers.extension import call_extensions +from plugins.text_editor.helpers.file_ops import ( + get_config, + read_file, + write_file, + validate_edits, + apply_patch, +) + + +class TextEditor(Tool): + + async def execute(self, **kwargs): + if self.method == "read": + return await self._read(**kwargs) + elif self.method == "write": + return await self._write(**kwargs) + elif self.method == "patch": + return await self._patch(**kwargs) + return Response( + message=f"unknown method '{self.name}:{self.method}'", + break_loop=False, + ) + + # ------------------------------------------------------------------ + # READ + # ------------------------------------------------------------------ + async def _read(self, path: str = "", **kwargs) -> Response: + if not path: + return self._error("read", path, "path is required") + + cfg = get_config(self.agent) + line_from = int(kwargs.get("line_from", 0)) + line_to = int(kwargs.get("line_to", 0)) + + result = read_file( + path, + line_from=line_from, + line_to=line_to, + max_line_tokens=cfg["max_line_tokens"], + default_line_count=cfg["default_line_count"], + max_total_read_tokens=cfg["max_total_read_tokens"], + ) + + if result.error: + return self._error("read", path, result.error) + + # Extension point + ext_data = {"content": result.content, "warnings": result.warnings} + await call_extensions( + "text_editor_read_after", agent=self.agent, data=ext_data + ) + + msg = self.agent.read_prompt( + "fw.text_editor.read_ok.md", + path=os.path.expanduser(path), + total_lines=str(result.total_lines), + warnings=ext_data["warnings"], + content=ext_data["content"], + ) + return Response(message=msg, break_loop=False) + + # ------------------------------------------------------------------ + # WRITE + # ------------------------------------------------------------------ + async def _write(self, path: str = "", content: str = "", **kwargs) -> Response: + if not path: + return self._error("write", path, "path is required") + + # Extension point + ext_data = {"path": path, "content": content} + await call_extensions( + "text_editor_write_before", agent=self.agent, data=ext_data + ) + + result = write_file(ext_data["path"], ext_data["content"]) + + if result.error: + return self._error("write", path, result.error) + + # Extension point + await call_extensions( + "text_editor_write_after", agent=self.agent, + data={"path": path, "total_lines": result.total_lines}, + ) + + msg = self.agent.read_prompt( + "fw.text_editor.write_ok.md", + path=os.path.expanduser(path), + total_lines=str(result.total_lines), + ) + return Response(message=msg, break_loop=False) + + # ------------------------------------------------------------------ + # PATCH + # ------------------------------------------------------------------ + async def _patch(self, path: str = "", edits=None, **kwargs) -> Response: + if not path: + return self._error("patch", path, "path is required") + + expanded = os.path.expanduser(path) + if not os.path.isfile(expanded): + return self._error("patch", path, "file not found") + + parsed, err = validate_edits(edits) + if err: + return self._error("patch", path, err) + + # Extension point + ext_data = {"path": expanded, "edits": parsed} + await call_extensions( + "text_editor_patch_before", agent=self.agent, data=ext_data + ) + + try: + total_lines = apply_patch(ext_data["path"], ext_data["edits"]) + except Exception as exc: + return self._error("patch", path, str(exc)) + + # Extension point + await call_extensions( + "text_editor_patch_after", agent=self.agent, + data={"path": expanded, "total_lines": total_lines}, + ) + + msg = self.agent.read_prompt( + "fw.text_editor.patch_ok.md", + path=expanded, + edit_count=str(len(ext_data["edits"])), + total_lines=str(total_lines), + ) + return Response(message=msg, break_loop=False) + + # ------------------------------------------------------------------ + # Shared error helper + # ------------------------------------------------------------------ + def _error(self, action: str, path: str, error: str) -> Response: + msg = self.agent.read_prompt( + f"fw.text_editor.{action}_error.md", path=path, error=error + ) + return Response(message=msg, break_loop=False) + \ No newline at end of file diff --git a/plugins/text_editor/webui/config.html b/plugins/text_editor/webui/config.html new file mode 100644 index 000000000..0ee726298 --- /dev/null +++ b/plugins/text_editor/webui/config.html @@ -0,0 +1,58 @@ + + + Text Editor + + + +
+ +
+ + +