refactor: extract duplicated JSON parsing helpers in kamatera and cherry providers (#299)

Kamatera: Extract _kamatera_queue_field and _extract_kamatera_wan_ip helpers
to deduplicate inline Python blocks in wait_for_command (49->33 lines) and
get_kamatera_server_ip (49->26 lines).

Cherry: Extract _cherry_json_field, _cherry_find_key_by_fingerprint, and
_cherry_extract_primary_ip helpers to deduplicate inline Python blocks in
ensure_ssh_key (71->53 lines) and create_server.

Agent: complexity-hunter

Co-authored-by: A <6723574+louisgv@users.noreply.github.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
A 2026-02-10 16:14:30 -08:00 committed by GitHub
parent f4f02825a3
commit 1be86da79f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 94 additions and 84 deletions

View file

@ -18,6 +18,57 @@ CHERRY_DEFAULT_PLAN="${CHERRY_DEFAULT_PLAN:-cloud_vps_1}"
CHERRY_DEFAULT_REGION="${CHERRY_DEFAULT_REGION:-eu_nord_1}"
CHERRY_DEFAULT_IMAGE="${CHERRY_DEFAULT_IMAGE:-Ubuntu 24.04 64bit}"
# ============================================================
# JSON Helpers
# ============================================================
# Extract a field from a JSON object via stdin
# Usage: echo '{"id": 123}' | _cherry_json_field "id"
_cherry_json_field() {
local field="$1"
python3 -c "
import sys, json
try:
data = json.load(sys.stdin)
print(data.get('$field', ''))
except:
pass
" 2>&1
}
# Find an SSH key ID by fingerprint from a JSON array of keys
# Usage: echo '[{"fingerprint":"...","id":1}]' | _cherry_find_key_by_fingerprint "aa:bb:..."
_cherry_find_key_by_fingerprint() {
local fingerprint="$1"
python3 -c "
import sys, json
try:
keys = json.load(sys.stdin)
for key in keys:
if key.get('fingerprint', '') == '$fingerprint':
print(key.get('id', ''))
break
except:
pass
" 2>&1
}
# Extract the primary IP address from a Cherry server info response
# Usage: echo '{"ip_addresses":[...]}' | _cherry_extract_primary_ip
_cherry_extract_primary_ip() {
python3 -c "
import sys, json
try:
data = json.load(sys.stdin)
for addr in data.get('ip_addresses', []):
if addr.get('type') == 'primary-ip':
print(addr.get('address', ''))
break
except:
pass
" 2>&1
}
# ============================================================
# Authentication
# ============================================================
@ -68,18 +119,7 @@ ensure_ssh_key() {
# Check if our key is already registered
local key_id
key_id=$(printf '%s' "$existing_keys" | python3 -c "
import sys, json
try:
keys = json.load(sys.stdin)
fingerprint = '${key_fingerprint}'
for key in keys:
if key.get('fingerprint', '') == fingerprint:
print(key.get('id', ''))
break
except:
pass
" 2>&1)
key_id=$(printf '%s' "$existing_keys" | _cherry_find_key_by_fingerprint "$key_fingerprint")
if [[ -n "$key_id" ]]; then
log_info "SSH key already registered (ID: $key_id)"
@ -99,14 +139,7 @@ except:
-d "{\"label\": \"$label\", \"key\": \"$ssh_pub_key\"}" \
"${CHERRY_API_BASE}/ssh-keys" 2>&1)
key_id=$(printf '%s' "$response" | python3 -c "
import sys, json
try:
data = json.load(sys.stdin)
print(data.get('id', ''))
except:
pass
" 2>&1)
key_id=$(printf '%s' "$response" | _cherry_json_field "id")
if [[ -z "$key_id" ]]; then
log_error "Failed to register SSH key"
@ -140,8 +173,7 @@ try:
data = json.load(sys.stdin)
if isinstance(data, list) and len(data) > 0:
print(data[0].get('id', ''))
except:
pass
except: pass
" 2>&1)
if [[ -z "$project_id" ]]; then
@ -201,14 +233,7 @@ print(json.dumps(data))
"${CHERRY_API_BASE}/projects/${project_id}/servers" 2>&1)
local server_id
server_id=$(printf '%s' "$response" | python3 -c "
import sys, json
try:
data = json.load(sys.stdin)
print(data.get('id', ''))
except:
pass
" 2>&1)
server_id=$(printf '%s' "$response" | _cherry_json_field "id")
if [[ -z "$server_id" ]]; then
log_error "Failed to create server"
@ -235,18 +260,7 @@ except:
-H "Content-Type: application/json" \
"${CHERRY_API_BASE}/servers/${server_id}" 2>&1)
ip_address=$(printf '%s' "$server_info" | python3 -c "
import sys, json
try:
data = json.load(sys.stdin)
addresses = data.get('ip_addresses', [])
for addr in addresses:
if addr.get('type') == 'primary-ip':
print(addr.get('address', ''))
break
except:
pass
" 2>&1)
ip_address=$(printf '%s' "$server_info" | _cherry_extract_primary_ip)
attempts=$((attempts + 1))
done

View file

@ -134,6 +134,22 @@ get_server_name() {
echo "$server_name"
}
# Extract a field from a Kamatera queue response (handles both list and dict responses)
# Usage: _kamatera_queue_field JSON_DATA FIELD_NAME [DEFAULT]
_kamatera_queue_field() {
local json_data="$1" field="$2" default="${3:-}"
python3 -c "
import json, sys
data = json.loads(sys.stdin.read())
if isinstance(data, list) and len(data) > 0:
print(data[0].get('$field', '$default'))
elif isinstance(data, dict):
print(data.get('$field', '$default'))
else:
print('$default')
" <<< "$json_data" 2>/dev/null
}
# Wait for an async Kamatera command to complete
# Kamatera API returns command IDs for long-running operations
# We poll the queue endpoint until the command completes
@ -150,16 +166,7 @@ wait_for_command() {
queue_response=$(kamatera_api GET "/service/queue?id=${command_ids}")
local status
status=$(python3 -c "
import json, sys
data = json.loads(sys.stdin.read())
if isinstance(data, list) and len(data) > 0:
print(data[0].get('status', ''))
elif isinstance(data, dict):
print(data.get('status', ''))
else:
print('')
" <<< "$queue_response" 2>/dev/null)
status=$(_kamatera_queue_field "$queue_response" "status")
if [[ "$status" == "complete" ]]; then
log_info "Command completed successfully"
@ -167,14 +174,7 @@ else:
return 0
elif [[ "$status" == "error" ]]; then
local error_log
error_log=$(python3 -c "
import json, sys
data = json.loads(sys.stdin.read())
if isinstance(data, list) and len(data) > 0:
print(data[0].get('log', 'Unknown error'))
elif isinstance(data, dict):
print(data.get('log', 'Unknown error'))
" <<< "$queue_response" 2>/dev/null)
error_log=$(_kamatera_queue_field "$queue_response" "log" "Unknown error")
log_error "Command failed: $error_log"
return 1
fi
@ -219,6 +219,25 @@ else:
" <<< "$response" 2>/dev/null
}
# Extract WAN IP from a Kamatera server info response
# Falls back to any IP if the server is powered on
# Usage: _extract_kamatera_wan_ip JSON_DATA
_extract_kamatera_wan_ip() {
python3 -c "
import json, sys
data = json.loads(sys.stdin.read())
server = data[0] if isinstance(data, list) and len(data) > 0 else data
networks = server.get('networks', [])
for net in networks:
if net.get('network', '').startswith('wan') and net.get('ips'):
print(net['ips'][0]); sys.exit(0)
if server.get('power') == 'on':
for net in networks:
if net.get('ips'):
print(net['ips'][0]); sys.exit(0)
" <<< "$1" 2>/dev/null
}
# Poll Kamatera server info until a WAN IP address is available
# Sets: KAMATERA_SERVER_IP
# Usage: get_kamatera_server_ip SERVER_NAME [MAX_ATTEMPTS]
@ -232,30 +251,7 @@ get_kamatera_server_ip() {
local info_response
info_response=$(kamatera_api POST "/service/server/info" "{\"name\":\"$name\"}")
KAMATERA_SERVER_IP=$(python3 -c "
import json, sys
data = json.loads(sys.stdin.read())
if isinstance(data, list) and len(data) > 0:
server = data[0]
else:
server = data
networks = server.get('networks', [])
for net in networks:
net_name = net.get('network', '')
if net_name.startswith('wan'):
ips = net.get('ips', [])
if ips:
print(ips[0])
sys.exit(0)
# Fallback: try power_on field or any IP
power = server.get('power', '')
if power == 'on':
for net in networks:
ips = net.get('ips', [])
if ips:
print(ips[0])
sys.exit(0)
" <<< "$info_response" 2>/dev/null)
KAMATERA_SERVER_IP=$(_extract_kamatera_wan_ip "$info_response")
if [[ -n "$KAMATERA_SERVER_IP" ]]; then
export KAMATERA_SERVER_IP