support 1 level deep admindir

This commit is contained in:
Concedo 2026-03-02 16:23:34 +08:00
parent d904b51b0f
commit d7fb3df10a

View file

@ -4010,11 +4010,21 @@ Change Mode<br>
caps = get_capabilities()
response_body = (json.dumps(caps).encode())
elif clean_path.endswith(('/api/admin/list_options')): #used by admin to get info about a kcpp instance
elif clean_path.endswith(('/api/admin/list_options')): # used by admin to get info about a kcpp instance
opts = []
if args.admin and args.admindir and os.path.exists(args.admindir) and self.check_header_password(args.adminpassword):
dirpath = os.path.abspath(args.admindir)
opts = [f for f in sorted(os.listdir(dirpath)) if (f.endswith(".kcpps") or f.endswith(".kcppt") or f.endswith(".gguf")) and os.path.isfile(os.path.join(dirpath, f))]
valid_exts = (".kcpps", ".kcppt", ".gguf")
for entry in sorted(os.listdir(dirpath)): # Scan top-level directory
full_path = os.path.join(dirpath, entry)
if os.path.isfile(full_path) and entry.endswith(valid_exts): # If toplevel file
opts.append(entry)
elif os.path.isdir(full_path): #if dir, scan up to 1 level deep
for subentry in sorted(os.listdir(full_path)):
sub_full_path = os.path.join(full_path, subentry)
if os.path.isfile(sub_full_path) and subentry.endswith(valid_exts):
rel_path = os.path.join(entry, subentry)
opts.append(rel_path)
opts.append("unload_model")
response_body = (json.dumps(opts).encode())
@ -4621,13 +4631,26 @@ Change Mode<br>
resp = {"success": True}
else:
dirpath = os.path.abspath(args.admindir)
targetfilepath = os.path.join(dirpath, targetfile)
opts = [f for f in os.listdir(dirpath) if (f.lower().endswith(".kcpps") or f.lower().endswith(".kcppt") or f.lower().endswith(".gguf")) and os.path.isfile(os.path.join(dirpath, f))]
if targetfile in opts and os.path.exists(targetfilepath):
global_memory["restart_override_config_target"] = ""
valid_exts = (".kcpps", ".kcppt", ".gguf")
allowed_files = []
with os.scandir(dirpath) as entries: # Scan top-level and 1-level deep
for entry in entries:
if entry.is_file() and entry.name.lower().endswith(valid_exts):
allowed_files.append(entry.name)
elif entry.is_dir():
subdir_name = entry.name
with os.scandir(entry.path) as subentries:
for subentry in subentries:
if subentry.is_file() and subentry.name.lower().endswith(valid_exts):
allowed_files.append(os.path.join(subdir_name, subentry.name))
# Normalize requested target path
targetfilepath = os.path.abspath(os.path.join(dirpath, targetfile))
if (targetfile in allowed_files and os.path.commonpath([dirpath, targetfilepath]) == dirpath and os.path.exists(targetfilepath)):
global_memory["restart_override_config_target"] = "" # Jail enforcement
if targetfile.lower().endswith(".gguf") and overrideconfig:
overrideconfigfilepath = os.path.join(dirpath, overrideconfig)
if overrideconfig and overrideconfig in opts and os.path.exists(overrideconfigfilepath):
overrideconfigfilepath = os.path.abspath(os.path.join(dirpath, overrideconfig))
if (overrideconfig in allowed_files and os.path.commonpath([dirpath, overrideconfigfilepath]) == dirpath and os.path.exists(overrideconfigfilepath)):
print(f"Admin: Override config set to {overrideconfig}")
global_memory["restart_override_config_target"] = overrideconfig
print(f"Admin: Received request to reload config to {targetfile}")
@ -8255,8 +8278,14 @@ def main(launch_args, default_args):
time.sleep(0.5) #sleep for 0.5s then restart
if args.admin and args.admindir:
dirpath = os.path.abspath(args.admindir)
targetfilepath = os.path.join(dirpath, restart_target)
targetfilepath2 = os.path.join(dirpath, restart_override_config_target)
targetfilepath = os.path.abspath(os.path.join(dirpath, restart_target))
targetfilepath2 = os.path.abspath(os.path.join(dirpath, restart_override_config_target)) if restart_override_config_target else ""
if os.path.commonpath([dirpath, targetfilepath]) != dirpath: # Enforce admindir jail
print("Security: Invalid restart target path.")
continue
if targetfilepath2 and os.path.commonpath([dirpath, targetfilepath2]) != dirpath:
print("Security: Invalid override config path.")
continue
defaultargs = vars(default_args)
if (os.path.exists(targetfilepath) or restart_target=="unload_model") and (restart_override_config_target=="" or os.path.exists(targetfilepath2)):
print("Terminating old process...")