Fail closed on unknown AI read commands

This commit is contained in:
rcourtman 2026-04-09 11:44:28 +01:00
parent 76c3f1ac88
commit 205376594b
3 changed files with 52 additions and 41 deletions

View file

@ -1343,13 +1343,10 @@ func TestExecutionIntent_ConservativeFallback(t *testing.T) {
}
}
// TestExecutionIntent_ModelTrustedFallback validates that truly unknown commands
// (unknown binaries, custom scripts) that pass all blocklist and structural checks
// are allowed with IntentReadOnlyConditional and a "model-trusted" reason.
//
// This is the Phase 6 behavior: trust the model's judgment for commands that
// aren't caught by any specific blocklist or structural guard.
func TestExecutionIntent_ModelTrustedFallback(t *testing.T) {
// TestExecutionIntent_FailClosedUnknownCommands validates that truly unknown
// commands are not allowed in pulse_read just because they passed structural
// checks. Unknown binaries, custom scripts, and downloads can mutate state.
func TestExecutionIntent_FailClosedUnknownCommands(t *testing.T) {
unknownCommands := []struct {
name string
command string
@ -1363,12 +1360,12 @@ func TestExecutionIntent_ModelTrustedFallback(t *testing.T) {
for _, tt := range unknownCommands {
t.Run(tt.name, func(t *testing.T) {
result := ClassifyExecutionIntent(tt.command)
if result.Intent != IntentReadOnlyConditional {
t.Errorf("ClassifyExecutionIntent(%q) = %v (reason: %s), want IntentReadOnlyConditional",
if result.Intent != IntentWriteOrUnknown {
t.Errorf("ClassifyExecutionIntent(%q) = %v (reason: %s), want IntentWriteOrUnknown",
tt.command, result.Intent, result.Reason)
}
if !strings.Contains(result.Reason, "model-trusted") {
t.Errorf("ClassifyExecutionIntent(%q) reason = %q, want 'model-trusted' in reason",
if !strings.Contains(result.Reason, "read-only allowlist") {
t.Errorf("ClassifyExecutionIntent(%q) reason = %q, want read-only allowlist reason",
tt.command, result.Reason)
}
})
@ -1905,10 +1902,10 @@ func TestExecutionIntent_RejectInteractiveREPL(t *testing.T) {
}
}
// TestExecutionIntent_AllowNonInteractiveREPL validates that commands with
// explicit non-interactive flags or inline commands are allowed.
func TestExecutionIntent_AllowNonInteractiveREPL(t *testing.T) {
// These should be allowed (non-interactive form)
// TestExecutionIntent_AllowNonInteractiveDataClients validates that data
// clients with explicit read-only inline operations are allowed.
func TestExecutionIntent_AllowNonInteractiveDataClients(t *testing.T) {
// These should be allowed because content inspection proves the operation is read-only.
allowedCommands := []struct {
cmd string
reason string
@ -1922,12 +1919,6 @@ func TestExecutionIntent_AllowNonInteractiveREPL(t *testing.T) {
{`psql --command "SELECT 1"`, "psql with --command"},
{"redis-cli GET mykey", "redis-cli with command"},
{"redis-cli -h localhost PING", "redis-cli with flags and command"},
{`python -c "print(1)"`, "python with -c"},
{"python script.py", "python with script"},
{"python3 /path/to/script.py", "python3 with script path"},
{`node -e "console.log(1)"`, "node with -e"},
{"node script.js", "node with script"},
{`irb -e "puts 1"`, "irb with -e"},
}
for _, tc := range allowedCommands {
@ -1941,6 +1932,35 @@ func TestExecutionIntent_AllowNonInteractiveREPL(t *testing.T) {
}
}
// TestExecutionIntent_BlocksDualUseInterpreterBypass validates that pulse_read
// fails closed on language runtimes and shells even when the inline code is
// non-interactive. These interpreters can perform arbitrary file I/O and spawn
// processes, so they are not read-only by construction.
func TestExecutionIntent_BlocksDualUseInterpreterBypass(t *testing.T) {
commands := []string{
`python3 -c "import os; open('/home/user/flag','w').write('done')"`,
`python3 -c "import subprocess; subprocess.Popen(['sleep', '100'])"`,
`python -c "print(1)"`,
`python script.py`,
`node -e "require('fs').writeFileSync('/tmp/flag', 'done')"`,
`node script.js`,
`ruby -e "File.write('/tmp/flag', 'done')"`,
`perl -e "open(my $fh, '>', '/tmp/flag')"`,
`bash -c "echo done"`,
`sh -c "echo done"`,
}
for _, cmd := range commands {
t.Run(cmd, func(t *testing.T) {
result := ClassifyExecutionIntent(cmd)
if result.Intent != IntentWriteOrUnknown {
t.Fatalf("ClassifyExecutionIntent(%q) = %v (reason: %s), want IntentWriteOrUnknown",
cmd, result.Intent, result.Reason)
}
})
}
}
// TestExecutionIntent_TelemetryCategories validates that blocking reasons
// use the expected categorical labels for telemetry.
func TestExecutionIntent_TelemetryCategories(t *testing.T) {

View file

@ -321,9 +321,11 @@ func classifySingleCommand(command string) IntentResult {
}
}
// === PHASE 6: Model-trusted fallback ===
// Unknown command passed all blocklist and structural checks → trust the model
return IntentResult{Intent: IntentReadOnlyConditional, Reason: "model-trusted: passed all blocklist and structural checks"}
// === PHASE 6: Fail closed ===
// Unknown commands may wrap interpreters, scripts, or side-effecting tools.
// pulse_read must only execute commands that are known read-only by construction
// or proven read-only by an explicit content inspector.
return IntentResult{Intent: IntentWriteOrUnknown, Reason: "unknown command is not on the read-only allowlist"}
}
// checkMutationCapabilityGuards checks for shell patterns that enable mutation
@ -612,10 +614,6 @@ func suggestNonInteractiveREPL(command, cmdLower string) string {
return fmt.Sprintf("%s \"SELECT ...\"", command)
case "redis-cli":
return fmt.Sprintf("%s PING", command)
case "python", "python3", "python2":
return fmt.Sprintf("%s -c \"...\"", parts[0])
case "node", "nodejs":
return fmt.Sprintf("%s -e \"...\"", parts[0])
case "ssh":
// ssh host → ssh host "command"
return fmt.Sprintf("%s \"ls -la\"", command)
@ -786,15 +784,13 @@ func hasTailShorthandBound(cmdLower string) bool {
// - ssh host (no command)
// - mysql, psql, sqlite3 db (no -c/-e/inline SQL)
// - redis-cli (no command args)
// - python, node, irb (no script/command)
// - python, node, irb
// - openssl s_client
//
// Allowed (non-interactive):
// - ssh host "command"
// - mysql -e "SELECT 1"
// - sqlite3 db "SELECT 1"
// - python -c "print(1)"
// - python script.py
func isInteractiveREPL(cmdLower string) bool {
firstWord := cmdLower
if spaceIdx := strings.Index(cmdLower, " "); spaceIdx > 0 {
@ -892,7 +888,9 @@ func isInteractiveREPL(cmdLower string) bool {
return !hasCommand
}
// Scripting REPLs: python, node, irb without script/command
// Scripting language runtimes are dual-use. Bare invocations are also
// interactive; non-interactive forms are blocked by the fail-closed
// read-only allowlist boundary.
if firstWord == "python" || firstWord == "python3" || firstWord == "python2" {
// Non-interactive if has -c or script file
if strings.Contains(cmdLower, " -c ") || strings.Contains(cmdLower, " -c\"") {

View file

@ -116,16 +116,9 @@ func (e *PulseToolExecutor) executeReadExec(ctx context.Context, args map[string
// STRUCTURAL ENFORCEMENT: Reject non-read-only commands at the tool layer
// This is enforced HERE, not in the model's prompt
// Uses ExecutionIntent: ReadOnlyCertain and ReadOnlyConditional are allowed;
// WriteOrUnknown is rejected.
// Uses ExecutionIntent: ReadOnlyCertain and explicitly inspected
// ReadOnlyConditional commands are allowed; WriteOrUnknown is rejected.
intentResult := ClassifyExecutionIntent(command)
if intentResult.Intent == IntentReadOnlyConditional && strings.Contains(intentResult.Reason, "model-trusted") {
log.Info().
Str("command", truncateCommand(command, 200)).
Str("reason", intentResult.Reason).
Str("target_host", targetHost).
Msg("pulse_read: allowing model-trusted command (no blocklist match)")
}
if intentResult.Intent == IntentWriteOrUnknown {
hint := GetReadOnlyViolationHint(command, intentResult)
alternative := "Use pulse_control type=command for write operations"