refactor: applied coderabbit refactor suggestion

This commit is contained in:
Muhamad Aji Wibisono 2025-06-11 00:02:54 +07:00
parent 5a39920e40
commit 3022e44803

View file

@ -1,5 +1,27 @@
import os import os
def _parse_bool(value):
"""Parse boolean value from string."""
return value.lower() == "true" if value else False
def _parse_int(value, var_name):
"""Parse integer value with error handling."""
try:
return int(value)
except ValueError:
raise ValueError(f"Invalid integer value for {var_name}: {value}")
def _parse_headers(value):
"""Parse headers from comma-separated string."""
try:
return [
tuple(h.split(":", 1))
for h in value.split(",")
if ":" in h
]
except Exception:
raise ValueError(f"Invalid headers format: {value}")
def load_uvicorn_config(args=None): def load_uvicorn_config(args=None):
""" """
@ -15,78 +37,46 @@ def load_uvicorn_config(args=None):
reload_dirs=["app"] if (args and args.reload) else None, reload_dirs=["app"] if (args and args.reload) else None,
) )
# Only add advanced args if set in env # Configuration mapping for advanced options
if os.getenv("UVICORN_PROXY_HEADERS"): config_mapping = {
config_kwargs["proxy_headers"] = ( "UVICORN_PROXY_HEADERS": ("proxy_headers", _parse_bool),
os.getenv("UVICORN_PROXY_HEADERS").lower() == "true" "UVICORN_FORWARDED_ALLOW_IPS": ("forwarded_allow_ips", str),
) "UVICORN_WORKERS": ("workers", lambda x: _parse_int(x, "UVICORN_WORKERS")),
if os.getenv("UVICORN_FORWARDED_ALLOW_IPS"): "UVICORN_ACCESS_LOG": ("access_log", _parse_bool),
config_kwargs["forwarded_allow_ips"] = os.getenv("UVICORN_FORWARDED_ALLOW_IPS") "UVICORN_LOOP": ("loop", str),
if os.getenv("UVICORN_WORKERS"): "UVICORN_HTTP": ("http", str),
config_kwargs["workers"] = int(os.getenv("UVICORN_WORKERS")) "UVICORN_WS": ("ws", str),
if os.getenv("UVICORN_ACCESS_LOG"): "UVICORN_LIFESPAN": ("lifespan", str),
config_kwargs["access_log"] = os.getenv("UVICORN_ACCESS_LOG").lower() == "true" "UVICORN_ENV_FILE": ("env_file", str),
if os.getenv("UVICORN_LOOP"): "UVICORN_LOG_CONFIG": ("log_config", str),
config_kwargs["loop"] = os.getenv("UVICORN_LOOP") "UVICORN_SERVER_HEADER": ("server_header", _parse_bool),
if os.getenv("UVICORN_HTTP"): "UVICORN_DATE_HEADER": ("date_header", _parse_bool),
config_kwargs["http"] = os.getenv("UVICORN_HTTP") "UVICORN_LIMIT_CONCURRENCY": ("limit_concurrency", lambda x: _parse_int(x, "UVICORN_LIMIT_CONCURRENCY")),
if os.getenv("UVICORN_WS"): "UVICORN_LIMIT_MAX_REQUESTS": ("limit_max_requests", lambda x: _parse_int(x, "UVICORN_LIMIT_MAX_REQUESTS")),
config_kwargs["ws"] = os.getenv("UVICORN_WS") "UVICORN_TIMEOUT_KEEP_ALIVE": ("timeout_keep_alive", lambda x: _parse_int(x, "UVICORN_TIMEOUT_KEEP_ALIVE")),
if os.getenv("UVICORN_LIFESPAN"): "UVICORN_TIMEOUT_NOTIFY": ("timeout_notify", lambda x: _parse_int(x, "UVICORN_TIMEOUT_NOTIFY")),
config_kwargs["lifespan"] = os.getenv("UVICORN_LIFESPAN") "UVICORN_SSL_KEYFILE": ("ssl_keyfile", str),
if os.getenv("UVICORN_ENV_FILE"): "UVICORN_SSL_CERTFILE": ("ssl_certfile", str),
config_kwargs["env_file"] = os.getenv("UVICORN_ENV_FILE") "UVICORN_SSL_KEYFILE_PASSWORD": ("ssl_keyfile_password", str),
if os.getenv("UVICORN_LOG_CONFIG"): "UVICORN_SSL_VERSION": ("ssl_version", lambda x: _parse_int(x, "UVICORN_SSL_VERSION")),
config_kwargs["log_config"] = os.getenv("UVICORN_LOG_CONFIG") "UVICORN_SSL_CERT_REQS": ("ssl_cert_reqs", lambda x: _parse_int(x, "UVICORN_SSL_CERT_REQS")),
if os.getenv("UVICORN_SERVER_HEADER"): "UVICORN_SSL_CA_CERTS": ("ssl_ca_certs", str),
config_kwargs["server_header"] = ( "UVICORN_SSL_CIPHERS": ("ssl_ciphers", str),
os.getenv("UVICORN_SERVER_HEADER").lower() == "true" "UVICORN_HEADERS": ("headers", _parse_headers),
) "UVICORN_USE_COLORS": ("use_colors", _parse_bool),
if os.getenv("UVICORN_DATE_HEADER"): "UVICORN_UDS": ("uds", str),
config_kwargs["date_header"] = ( "UVICORN_FD": ("fd", lambda x: _parse_int(x, "UVICORN_FD")),
os.getenv("UVICORN_DATE_HEADER").lower() == "true" "UVICORN_ROOT_PATH": ("root_path", str),
) }
if os.getenv("UVICORN_LIMIT_CONCURRENCY"):
config_kwargs["limit_concurrency"] = int(os.getenv("UVICORN_LIMIT_CONCURRENCY")) # Process advanced configuration options
if os.getenv("UVICORN_LIMIT_MAX_REQUESTS"): for env_var, (config_key, parser) in config_mapping.items():
config_kwargs["limit_max_requests"] = int( value = os.getenv(env_var)
os.getenv("UVICORN_LIMIT_MAX_REQUESTS") if value:
) try:
if os.getenv("UVICORN_TIMEOUT_KEEP_ALIVE"): config_kwargs[config_key] = parser(value)
config_kwargs["timeout_keep_alive"] = int( except ValueError as e:
os.getenv("UVICORN_TIMEOUT_KEEP_ALIVE") raise ValueError(f"Configuration error for {env_var}: {e}")
)
if os.getenv("UVICORN_TIMEOUT_NOTIFY"):
config_kwargs["timeout_notify"] = int(os.getenv("UVICORN_TIMEOUT_NOTIFY"))
if os.getenv("UVICORN_SSL_KEYFILE"):
config_kwargs["ssl_keyfile"] = os.getenv("UVICORN_SSL_KEYFILE")
if os.getenv("UVICORN_SSL_CERTFILE"):
config_kwargs["ssl_certfile"] = os.getenv("UVICORN_SSL_CERTFILE")
if os.getenv("UVICORN_SSL_KEYFILE_PASSWORD"):
config_kwargs["ssl_keyfile_password"] = os.getenv(
"UVICORN_SSL_KEYFILE_PASSWORD"
)
if os.getenv("UVICORN_SSL_VERSION"):
config_kwargs["ssl_version"] = int(os.getenv("UVICORN_SSL_VERSION"))
if os.getenv("UVICORN_SSL_CERT_REQS"):
config_kwargs["ssl_cert_reqs"] = int(os.getenv("UVICORN_SSL_CERT_REQS"))
if os.getenv("UVICORN_SSL_CA_CERTS"):
config_kwargs["ssl_ca_certs"] = os.getenv("UVICORN_SSL_CA_CERTS")
if os.getenv("UVICORN_SSL_CIPHERS"):
config_kwargs["ssl_ciphers"] = os.getenv("UVICORN_SSL_CIPHERS")
if os.getenv("UVICORN_HEADERS"):
config_kwargs["headers"] = [
tuple(h.split(":", 1))
for h in os.getenv("UVICORN_HEADERS").split(",")
if ":" in h
]
if os.getenv("UVICORN_USE_COLORS"):
config_kwargs["use_colors"] = os.getenv("UVICORN_USE_COLORS").lower() == "true"
if os.getenv("UVICORN_UDS"):
config_kwargs["uds"] = os.getenv("UVICORN_UDS")
if os.getenv("UVICORN_FD"):
config_kwargs["fd"] = int(os.getenv("UVICORN_FD"))
if os.getenv("UVICORN_ROOT_PATH"):
config_kwargs["root_path"] = os.getenv("UVICORN_ROOT_PATH")
return config_kwargs return config_kwargs