mirror of
https://github.com/agent0ai/agent-zero.git
synced 2026-04-28 11:40:47 +00:00
- Add WsHandler base class, WsManager (connection tracking / event routing / buffering), WsResult - Extract network.py (is_loopback_address) and context_utils.py (use_context) to eliminate duplication - Migrate three handlers to api/ following the ws_* py naming convention - Simplify run_ui.py WebSocket init from ~170 lines to ~10 - Update import paths in api.py, plugins.py, state_monitor.py
31 lines
969 B
Python
31 lines
969 B
Python
import socket
|
|
import struct
|
|
|
|
|
|
def is_loopback_address(address: str) -> bool:
    """Return True when *address* denotes (or resolves only to) loopback.

    *address* may be an IPv4 literal, an IPv6 literal, or a host name.

    * IPv4 literals are loopback when they fall inside ``127.0.0.0/8``.
    * IPv6 literals are loopback when they equal ``::1`` in *any* textual
      spelling (e.g. ``0:0:0:0:0:0:0:1``), or when they are an IPv4-mapped
      address (``::ffff:a.b.c.d``) whose embedded IPv4 address is in
      ``127.0.0.0/8``.
    * Anything else is treated as a host name and resolved with
      ``socket.getaddrinfo`` for both AF_INET and AF_INET6. The result is
      True only if every returned address is loopback. If resolution fails
      for either family, or the name cannot be encoded for lookup, the
      result is False.

    Args:
        address: IP literal or host name to classify.

    Returns:
        True if the address is loopback according to the rules above,
        otherwise False.
    """
    # Packed (network byte order) forms used for comparison, so that the
    # textual spelling of an IPv6 address does not matter.
    v6_loopback = b"\x00" * 15 + b"\x01"          # ::1
    v4_mapped_prefix = b"\x00" * 10 + b"\xff\xff"  # ::ffff:0:0/96

    def _pack(family: int, text: str):
        """Return the packed form of *text* for *family*, or None if invalid."""
        try:
            return socket.inet_pton(family, text)
        except (OSError, ValueError):
            # OSError: not a literal of this family.
            # ValueError: text cannot be passed to the C API at all.
            return None

    def _packed_is_loopback(packed: bytes) -> bool:
        """Classify a 4-byte (IPv4) or 16-byte (IPv6) packed address."""
        if len(packed) == 4:
            return packed[0] == 127
        if packed == v6_loopback:
            return True
        # IPv4-mapped IPv6: the last four bytes hold the IPv4 address.
        return packed.startswith(v4_mapped_prefix) and packed[12] == 127

    # Literal addresses are classified directly, without any name lookup.
    for family in (socket.AF_INET6, socket.AF_INET):
        packed = _pack(family, address)
        if packed is not None:
            return _packed_is_loopback(packed)

    # Host name: every address of both families must be loopback.
    for family in (socket.AF_INET, socket.AF_INET6):
        try:
            results = socket.getaddrinfo(
                address, None, family, socket.SOCK_STREAM
            )
        except (socket.gaierror, ValueError):
            # gaierror: the name did not resolve for this family.
            # ValueError (incl. UnicodeError): the name is not encodable.
            return False
        for fam, _, _, _, sockaddr in results:
            # Drop a "%scope" suffix if the resolver attached one.
            host = sockaddr[0].partition("%")[0]
            packed = _pack(fam, host)
            if packed is None or not _packed_is_loopback(packed):
                return False
    return True
|