free-claude-code/tests/cli/test_cli.py
Alishahryar1 f3a7528d49
Some checks are pending
CI / checks (push) Waiting to run
Major refactor: API, providers, messaging, and Anthropic protocol
Consolidates the incremental refactor work into a single change set: modular web tools (api/web_tools), native Anthropic request building and SSE block policy, OpenAI conversion and error handling, provider transports and rate limiting, messaging handler and tree queue, safe logging, smoke tests, and broad test coverage.
2026-04-26 03:01:14 -07:00

652 lines
22 KiB
Python

"""Tests for cli/ module."""
import asyncio
import json
import os
from typing import cast
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from messaging.event_parser import parse_cli_event
# --- Existing Parser Tests ---
class TestCLIParser:
"""Test CLI event parsing."""
def test_parse_text_content(self):
"""Test parsing text content from assistant message."""
event = {
"type": "assistant",
"message": {"content": [{"type": "text", "text": "Hello, world!"}]},
}
result = parse_cli_event(event)
assert len(result) == 1
assert result[0]["type"] == "text_chunk"
assert result[0]["text"] == "Hello, world!"
def test_parse_thinking_content(self):
"""Test parsing thinking content."""
event = {
"type": "assistant",
"message": {
"content": [{"type": "thinking", "thinking": "Let me think..."}]
},
}
result = parse_cli_event(event)
assert len(result) == 1
assert result[0]["type"] == "thinking_chunk"
assert (
result[0]["text"] == "Let me think...\n"
or result[0]["text"] == "Let me think..."
)
def test_parse_multiple_content(self):
"""Test parsing mixed content (thinking + tools)."""
event = {
"type": "assistant",
"message": {
"content": [
{"type": "thinking", "thinking": "Thinking..."},
{"type": "tool_use", "name": "ls", "input": {}},
]
},
}
result = parse_cli_event(event)
assert len(result) == 2
assert result[0]["type"] == "thinking_chunk"
assert result[0]["text"] == "Thinking..."
assert result[1]["type"] == "tool_use"
def test_parse_tool_use(self):
"""Test parsing tool use content."""
event = {
"type": "assistant",
"message": {
"content": [
{
"type": "tool_use",
"name": "read_file",
"input": {"path": "/test"},
}
]
},
}
result = parse_cli_event(event)
assert len(result) == 1
assert result[0]["type"] == "tool_use"
assert result[0]["name"] == "read_file"
assert result[0]["input"] == {"path": "/test"}
def test_parse_text_delta(self):
"""Test parsing streaming text delta."""
event = {
"type": "content_block_delta",
"index": 0,
"delta": {"type": "text_delta", "text": "streaming text"},
}
result = parse_cli_event(event)
assert len(result) == 1
assert result[0]["type"] == "text_delta"
assert result[0]["text"] == "streaming text"
def test_parse_thinking_delta(self):
"""Test parsing streaming thinking delta."""
event = {
"type": "content_block_delta",
"index": 1,
"delta": {"type": "thinking_delta", "thinking": "thinking..."},
}
result = parse_cli_event(event)
assert len(result) == 1
assert result[0]["type"] == "thinking_delta"
assert result[0]["text"] == "thinking..."
def test_parse_error(self):
"""Test parsing error event."""
event = {"type": "error", "error": {"message": "Something went wrong"}}
result = parse_cli_event(event)
assert result[0]["type"] == "error"
assert result[0]["message"] == "Something went wrong"
def test_parse_exit_success(self):
"""Test parsing exit event with success."""
event = {"type": "exit", "code": 0}
result = parse_cli_event(event)
assert result[0]["type"] == "complete"
assert result[0]["status"] == "success"
def test_parse_exit_failure(self):
"""Test parsing exit event with failure returns error then complete."""
event = {"type": "exit", "code": 1}
result = parse_cli_event(event)
# Non-zero exit now returns error first, then complete
assert len(result) == 2
assert result[0]["type"] == "error"
assert (
"exit" in result[0]["message"].lower()
or "code" in result[0]["message"].lower()
)
assert result[1]["type"] == "complete"
assert result[1]["status"] == "failed"
def test_parse_invalid_event(self):
"""Test parsing returns empty list for unrecognized event."""
result = parse_cli_event({"type": "unknown"})
assert result == []
def test_parse_non_dict(self):
"""Test parsing returns empty list for non-dict input."""
result = parse_cli_event("not a dict")
assert result == []
# --- CLI Session Tests ---
class TestCLISession:
"""Test CLISession."""
def test_session_init(self):
"""Test CLISession initialization."""
from cli.session import CLISession
session = CLISession(
workspace_path="/tmp/test",
api_url="http://localhost:8082/v1",
allowed_dirs=["/home/user/projects"],
)
assert session.workspace == os.path.normpath(os.path.abspath("/tmp/test"))
assert session.api_url == "http://localhost:8082/v1"
assert not session.is_busy
def test_session_extract_session_id(self):
"""Test session ID extraction from various event formats."""
from cli.session import CLISession
session = CLISession("/tmp", "http://localhost:8082/v1")
# Direct session_id field
assert session._extract_session_id({"session_id": "abc123"}) == "abc123"
assert session._extract_session_id({"sessionId": "abc123"}) == "abc123"
# Nested in init
assert (
session._extract_session_id({"init": {"session_id": "nested123"}})
== "nested123"
)
# Nested in result
assert (
session._extract_session_id({"result": {"session_id": "res123"}})
== "res123"
)
# Conversation id
assert (
session._extract_session_id({"conversation": {"id": "conv123"}})
== "conv123"
)
# No session ID
assert session._extract_session_id({"type": "message"}) is None
assert session._extract_session_id("not a dict") is None
@pytest.mark.asyncio
async def test_start_task_basic_flow(self):
"""Test start_task running a basic command flow."""
from cli.session import CLISession
session = CLISession("/tmp", "http://localhost:8082/v1")
# Mock subprocess
mock_process = AsyncMock()
mock_process.stdout.read.side_effect = [
b'{"type": "message", "content": "Hello"}\n',
b'{"session_id": "sess_1"}\n',
b"", # EOF
]
mock_process.stderr.read.return_value = b"" # No error
mock_process.wait.return_value = 0
mock_process.returncode = 0
with patch(
"asyncio.create_subprocess_exec", new_callable=AsyncMock
) as mock_exec:
mock_exec.return_value = mock_process
events = [e async for e in session.start_task("Hello")]
# Verify command construction
# Arg 1 is subprocess command
args = mock_exec.call_args[0]
assert args[0] == "claude"
assert "-p" in args
assert "Hello" in args
# Verify events
assert (
len(events) == 4
) # message, session_id, session_info (synthesized), exit
assert events[0] == {"type": "message", "content": "Hello"}
assert events[1] == {"type": "session_info", "session_id": "sess_1"}
# The session_info event is yielded by _handle_line_gen right after extracting ID
assert events[2] == {"session_id": "sess_1"} # The original event
assert events[3] == {"type": "exit", "code": 0, "stderr": None}
assert session.current_session_id == "sess_1"
@pytest.mark.asyncio
async def test_start_task_with_session_resume(self):
"""Test resuming an existing session."""
from cli.session import CLISession
session = CLISession("/tmp", "http://localhost:8082/v1")
mock_process = AsyncMock()
mock_process.stdout.read.side_effect = [
b"",
] # Immediate EOF
mock_process.stderr.read.return_value = b""
mock_process.wait.return_value = 0
with patch(
"asyncio.create_subprocess_exec", new_callable=AsyncMock
) as mock_exec:
mock_exec.return_value = mock_process
async for _ in session.start_task("Hello", session_id="sess_abc"):
pass
args = mock_exec.call_args[0]
assert "--resume" in args
assert "sess_abc" in args
assert "--fork-session" not in args
@pytest.mark.asyncio
async def test_start_task_with_session_resume_and_fork(self):
"""Test resuming an existing session and forking."""
from cli.session import CLISession
session = CLISession("/tmp", "http://localhost:8082/v1")
mock_process = AsyncMock()
mock_process.stdout.read.side_effect = [b""] # Immediate EOF
mock_process.stderr.read.return_value = b""
mock_process.wait.return_value = 0
with patch(
"asyncio.create_subprocess_exec", new_callable=AsyncMock
) as mock_exec:
mock_exec.return_value = mock_process
async for _ in session.start_task(
"Hello", session_id="sess_abc", fork_session=True
):
pass
args = mock_exec.call_args[0]
assert "--resume" in args
assert "sess_abc" in args
assert "--fork-session" in args
@pytest.mark.asyncio
async def test_start_task_process_failure_with_stderr(self):
"""Test process exit with error code and stderr output."""
from cli.session import CLISession
session = CLISession("/tmp", "http://localhost:8082/v1")
mock_process = AsyncMock()
mock_process.stdout.read.side_effect = [b""] # No stdout
mock_process.stderr.read.side_effect = [b"Fatal error", b""]
mock_process.wait.return_value = 1
with patch(
"asyncio.create_subprocess_exec", new_callable=AsyncMock
) as mock_exec:
mock_exec.return_value = mock_process
events = [e async for e in session.start_task("Hello")]
# Should have error event from stderr, then exit event
assert len(events) == 2
assert events[0]["type"] == "error"
assert events[0]["error"]["message"] == "Fatal error"
assert events[1]["type"] == "exit"
assert events[1]["code"] == 1
assert events[1]["stderr"] == "Fatal error"
@pytest.mark.asyncio
async def test_start_task_stderr_while_stdout_streams(self):
"""Stderr is drained concurrently so stdout streaming is not blocked."""
from cli.session import CLISession
session = CLISession("/tmp", "http://localhost:8082/v1")
mock_process = AsyncMock()
mock_process.stdout.read.side_effect = [
b'{"type": "message", "content": "Hi"}\n',
b"",
]
mock_process.stderr.read.side_effect = [b"warning on stderr\n", b""]
mock_process.wait.return_value = 0
with patch(
"asyncio.create_subprocess_exec", new_callable=AsyncMock
) as mock_exec:
mock_exec.return_value = mock_process
events = [e async for e in session.start_task("Hello")]
assert mock_process.stderr.read.await_count >= 2
err_events = [e for e in events if e.get("type") == "error"]
assert len(err_events) == 1
assert "warning on stderr" in err_events[0]["error"]["message"]
assert events[-1]["type"] == "exit"
assert events[-1]["code"] == 0
@pytest.mark.asyncio
async def test_drain_stderr_bounded_retains_cap_but_drains_to_eof(self):
"""Oversized stderr is fully drained so the pipe cannot deadlock; capture is bounded."""
from cli.session import _MAX_STDERR_CAPTURE_BYTES, CLISession
total_len = _MAX_STDERR_CAPTURE_BYTES + 100_000
remaining: dict[str, int] = {"n": total_len}
class _FakeStderr:
async def read(self, n: int = 65536) -> bytes:
left = remaining["n"]
if left <= 0:
return b""
take = min(n, left)
remaining["n"] = left - take
return b"y" * take
class _FakeProcess:
stderr = _FakeStderr()
out = await CLISession._drain_stderr_bounded(
cast(asyncio.subprocess.Process, _FakeProcess())
)
assert len(out) == _MAX_STDERR_CAPTURE_BYTES
assert out == b"y" * _MAX_STDERR_CAPTURE_BYTES
assert remaining["n"] == 0
@pytest.mark.asyncio
async def test_stop_session(self):
"""Test stopping the session process."""
from cli.session import CLISession
session = CLISession("/tmp", "http://localhost:8082/v1")
mock_process = MagicMock()
mock_process.returncode = None # Running
# Mock wait to simulate async finish
mock_process.wait = AsyncMock(return_value=0)
session.process = mock_process
stopped = await session.stop()
assert stopped is True
mock_process.terminate.assert_called_once()
mock_process.wait.assert_called()
@pytest.mark.asyncio
async def test_stop_session_timeout_force_kill(self):
"""Test force kill if terminate times out."""
from cli.session import CLISession
session = CLISession("/tmp", "http://localhost:8082/v1")
mock_process = MagicMock()
mock_process.returncode = None
# First wait times out
async def wait_side_effect():
if not mock_process.kill.called:
await asyncio.sleep(6) # Should be > 5.0 timeout
return 0
# We can simulate timeout by raising TimeoutError directly on first call
mock_process.wait = AsyncMock(side_effect=[asyncio.TimeoutError, 0])
session.process = mock_process
stopped = await session.stop()
assert stopped is True
mock_process.terminate.assert_called()
mock_process.kill.assert_called()
@pytest.mark.asyncio
async def test_start_task_split_buffer(self):
"""Test handling of JSON split across chunks."""
from cli.session import CLISession
session = CLISession("/tmp", "http://localhost:8082/v1")
mock_process = AsyncMock()
# Split json: {"type": "mess... age"}
mock_process.stdout.read.side_effect = [
b'{"type": "mess',
b'age", "content": "Split"}\n',
b"",
]
mock_process.stderr.read.return_value = b""
mock_process.wait.return_value = 0
with patch(
"asyncio.create_subprocess_exec", new_callable=AsyncMock
) as mock_exec:
mock_exec.return_value = mock_process
events = [
e async for e in session.start_task("test") if e["type"] == "message"
]
assert len(events) == 1
assert events[0]["content"] == "Split"
@pytest.mark.asyncio
async def test_start_task_remnant_buffer(self):
"""Test handling of buffer remnant at EOF (no newline at end)."""
from cli.session import CLISession
session = CLISession("/tmp", "http://localhost:8082/v1")
mock_process = AsyncMock()
mock_process.stdout.read.side_effect = [
b'{"type": "message", "content": "Remnant"}', # No newline
b"",
]
mock_process.stderr.read.return_value = b""
mock_process.wait.return_value = 0
with patch(
"asyncio.create_subprocess_exec", new_callable=AsyncMock
) as mock_exec:
mock_exec.return_value = mock_process
events = [
e async for e in session.start_task("test") if e["type"] == "message"
]
assert len(events) == 1
assert events[0]["content"] == "Remnant"
@pytest.mark.asyncio
async def test_start_task_non_v1_url(self):
"""Test start_task with a non-v1 URL."""
from cli.session import CLISession
# URL not ending in /v1
session = CLISession("/tmp", "http://localhost:8082")
mock_process = AsyncMock()
mock_process.stdout.read.side_effect = [b""]
mock_process.stderr.read.return_value = b""
mock_process.wait.return_value = 0
with patch(
"asyncio.create_subprocess_exec", new_callable=AsyncMock
) as mock_exec:
mock_exec.return_value = mock_process
async for _ in session.start_task("test"):
pass
# Check env var
kwargs = mock_exec.call_args[1]
env = kwargs["env"]
assert env["ANTHROPIC_BASE_URL"] == "http://localhost:8082"
@pytest.mark.asyncio
async def test_start_task_allowed_dirs(self):
"""Test start_task includes allowed dirs in command."""
from cli.session import CLISession
session = CLISession(
"/tmp", "http://localhost:8082/v1", allowed_dirs=["/dir1", "/dir2"]
)
mock_process = AsyncMock()
mock_process.stdout.read.side_effect = [b""]
mock_process.stderr.read.return_value = b""
mock_process.wait.return_value = 0
with patch(
"asyncio.create_subprocess_exec", new_callable=AsyncMock
) as mock_exec:
mock_exec.return_value = mock_process
async for _ in session.start_task("test"):
pass
cmd = mock_exec.call_args[0]
assert "--add-dir" in cmd
assert os.path.normpath("/dir1") in cmd
assert os.path.normpath("/dir2") in cmd
@pytest.mark.asyncio
async def test_start_task_plans_directory(self):
"""Test start_task includes --settings plansDirectory when plans_directory set."""
from cli.session import CLISession
session = CLISession(
"/tmp",
"http://localhost:8082/v1",
plans_directory="./agent_workspace/plans",
)
mock_process = AsyncMock()
mock_process.stdout.read.side_effect = [b""]
mock_process.stderr.read.return_value = b""
mock_process.wait.return_value = 0
with patch(
"asyncio.create_subprocess_exec", new_callable=AsyncMock
) as mock_exec:
mock_exec.return_value = mock_process
async for _ in session.start_task("test"):
pass
cmd = mock_exec.call_args[0]
assert "--settings" in cmd
settings_idx = cmd.index("--settings")
assert settings_idx + 1 < len(cmd)
settings = json.loads(cmd[settings_idx + 1])
assert settings["plansDirectory"] == "./agent_workspace/plans"
@pytest.mark.asyncio
async def test_start_task_json_error(self):
"""Test handling of non-JSON output from CLI."""
from cli.session import CLISession
session = CLISession("/tmp", "http://localhost:8082/v1")
mock_process = AsyncMock()
mock_process.stdout.read.side_effect = [b"Not valid json\n", b""]
mock_process.stderr.read.return_value = b""
mock_process.wait.return_value = 0
with patch(
"asyncio.create_subprocess_exec", new_callable=AsyncMock
) as mock_exec:
mock_exec.return_value = mock_process
events = [e async for e in session.start_task("test") if e["type"] == "raw"]
assert len(events) == 1
assert events[0]["content"] == "Not valid json"
@pytest.mark.asyncio
async def test_stop_exception(self):
"""Test exception handling during stop."""
from cli.session import CLISession
session = CLISession("/tmp", "http://localhost:8082/v1")
mock_process = MagicMock()
mock_process.returncode = None
# Raise exception on terminate
mock_process.terminate.side_effect = RuntimeError("Permission denied")
session.process = mock_process
stopped = await session.stop()
assert stopped is False
class TestCLISessionManager:
"""Test CLISessionManager."""
@pytest.mark.asyncio
async def test_manager_create_session(self):
"""Test creating a new session."""
from cli.manager import CLISessionManager
manager = CLISessionManager(
workspace_path="/tmp/test",
api_url="http://localhost:8082/v1",
)
session, sid, is_new = await manager.get_or_create_session()
assert session is not None
assert sid.startswith("pending_")
assert is_new is True
@pytest.mark.asyncio
async def test_manager_reuse_session(self):
"""Test reusing an existing session."""
from cli.manager import CLISessionManager
manager = CLISessionManager(
workspace_path="/tmp/test",
api_url="http://localhost:8082/v1",
)
# Create first session
s1, sid1, _is_new1 = await manager.get_or_create_session()
# Request same session
s2, _sid2, is_new2 = await manager.get_or_create_session(session_id=sid1)
assert s1 is s2
assert is_new2 is False
@pytest.mark.asyncio
async def test_manager_stats(self):
"""Test manager stats."""
from cli.manager import CLISessionManager
manager = CLISessionManager(
workspace_path="/tmp/test",
api_url="http://localhost:8082/v1",
)
stats = manager.get_stats()
assert stats["active_sessions"] == 0
assert stats["pending_sessions"] == 0