consolidate imports

This commit is contained in:
Concedo 2025-06-09 17:48:54 +08:00
parent deece4be69
commit 8780b33c64

View file

@ -28,10 +28,13 @@ import threading
import html import html
import random import random
import hashlib import hashlib
import urllib.parse as urlparse import urllib.parse
import urllib.request
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Tuple from typing import Tuple
import shutil
import subprocess
# constants # constants
sampler_order_max = 7 sampler_order_max = 7
@ -640,7 +643,6 @@ def is_incomplete_utf8_sequence(byte_seq): #note, this will only flag INCOMPLETE
return False #invalid sequence, but not incomplete return False #invalid sequence, but not incomplete
def unpack_to_dir(destpath = ""): def unpack_to_dir(destpath = ""):
import shutil
srcpath = os.path.abspath(os.path.dirname(__file__)) srcpath = os.path.abspath(os.path.dirname(__file__))
cliunpack = False if destpath == "" else True cliunpack = False if destpath == "" else True
print("Attempt to unpack KoboldCpp into directory...") print("Attempt to unpack KoboldCpp into directory...")
@ -1036,8 +1038,6 @@ def autoset_gpu_layers(ctxsize, sdquanted, bbs, qkv_level): #shitty algo to dete
return 0 return 0
def fetch_gpu_properties(testCL,testCU,testVK): def fetch_gpu_properties(testCL,testCU,testVK):
import subprocess
gpumem_ignore_limit_min = 1024*1024*600 #600 mb min gpumem_ignore_limit_min = 1024*1024*600 #600 mb min
gpumem_ignore_limit_max = 1024*1024*1024*300 #300 gb max gpumem_ignore_limit_max = 1024*1024*1024*300 #300 gb max
@ -1812,11 +1812,8 @@ def websearch(query):
if query==websearch_lastquery: if query==websearch_lastquery:
print("Returning cached websearch...") print("Returning cached websearch...")
return websearch_lastresponse return websearch_lastresponse
import urllib.parse
import urllib.request
import difflib import difflib
from html.parser import HTMLParser from html.parser import HTMLParser
from concurrent.futures import ThreadPoolExecutor
num_results = 3 num_results = 3
searchresults = [] searchresults = []
utfprint("Performing new websearch...",1) utfprint("Performing new websearch...",1)
@ -1977,7 +1974,6 @@ def websearch(query):
def is_port_in_use(portNum): def is_port_in_use(portNum):
try: try:
import socket
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
return s.connect_ex(('localhost', portNum)) == 0 return s.connect_ex(('localhost', portNum)) == 0
except Exception: except Exception:
@ -2426,7 +2422,6 @@ ws ::= | " " | "\n" [ \t]{0,20}
def LaunchWebbrowser(target_url, failedmsg): def LaunchWebbrowser(target_url, failedmsg):
try: try:
if os.name == "posix" and "DISPLAY" in os.environ: # UNIX-like systems if os.name == "posix" and "DISPLAY" in os.environ: # UNIX-like systems
import subprocess
clean_env = os.environ.copy() clean_env = os.environ.copy()
clean_env.pop("LD_LIBRARY_PATH", None) clean_env.pop("LD_LIBRARY_PATH", None)
clean_env["PATH"] = "/usr/bin:/bin" clean_env["PATH"] = "/usr/bin:/bin"
@ -2744,8 +2739,8 @@ class KcppServerRequestHandler(http.server.SimpleHTTPRequestHandler):
def noscript_webui(self): def noscript_webui(self):
global modelbusy, sslvalid global modelbusy, sslvalid
parsed_url = urlparse.urlparse(self.path) parsed_url = urllib.parse.urlparse(self.path)
parsed_dict = urlparse.parse_qs(parsed_url.query) parsed_dict = urllib.parse.parse_qs(parsed_url.query)
reply = "" reply = ""
status = str(parsed_dict['status'][0]) if 'status' in parsed_dict else "Ready To Generate" status = str(parsed_dict['status'][0]) if 'status' in parsed_dict else "Ready To Generate"
prompt = str(parsed_dict['prompt'][0]) if 'prompt' in parsed_dict else "" prompt = str(parsed_dict['prompt'][0]) if 'prompt' in parsed_dict else ""
@ -2816,7 +2811,7 @@ class KcppServerRequestHandler(http.server.SimpleHTTPRequestHandler):
parsed_dict["status"] = status parsed_dict["status"] = status
parsed_dict["chatmode"] = ("1" if chatmode else "0") parsed_dict["chatmode"] = ("1" if chatmode else "0")
parsed_dict["imgmode"] = ("1" if imgmode else "0") parsed_dict["imgmode"] = ("1" if imgmode else "0")
updated_query_string = urlparse.urlencode(parsed_dict, doseq=True) updated_query_string = urllib.parse.urlencode(parsed_dict, doseq=True)
updated_path = parsed_url._replace(query=updated_query_string).geturl() updated_path = parsed_url._replace(query=updated_query_string).geturl()
self.path = updated_path self.path = updated_path
time.sleep(0.5) #short delay time.sleep(0.5) #short delay
@ -3902,8 +3897,6 @@ def RunServerMultiThreaded(addr, port, server_handler):
# Based on https://github.com/mathgeniuszach/xdialog/blob/main/xdialog/zenity_dialogs.py - MIT license | - Expanded version by Henk717 # Based on https://github.com/mathgeniuszach/xdialog/blob/main/xdialog/zenity_dialogs.py - MIT license | - Expanded version by Henk717
def zenity(filetypes=None, initialdir="", initialfile="", **kwargs) -> Tuple[int, str]: def zenity(filetypes=None, initialdir="", initialfile="", **kwargs) -> Tuple[int, str]:
import shutil
import subprocess
global zenity_recent_dir, zenity_permitted global zenity_recent_dir, zenity_permitted
if not zenity_permitted: if not zenity_permitted:
@ -4445,8 +4438,8 @@ def show_gui():
searchbase = model_search.get() searchbase = model_search.get()
if searchbase.strip()=="": if searchbase.strip()=="":
return return
urlcode = urlparse.urlencode({"search":( "GGUF " + searchbase),"limit":10}, doseq=True) urlcode = urllib.parse.urlencode({"search":( "GGUF " + searchbase),"limit":10}, doseq=True)
urlcode2 = urlparse.urlencode({"search":searchbase,"limit":6}, doseq=True) urlcode2 = urllib.parse.urlencode({"search":searchbase,"limit":6}, doseq=True)
resp = make_url_request(f"https://huggingface.co/api/models?{urlcode}",None,'GET',{},10) resp = make_url_request(f"https://huggingface.co/api/models?{urlcode}",None,'GET',{},10)
for m in resp: for m in resp:
searchedmodels.append(m["id"]) searchedmodels.append(m["id"])
@ -5541,7 +5534,6 @@ def print_with_time(txt):
print(f"{datetime.now().strftime('[%H:%M:%S]')} " + txt, flush=True) print(f"{datetime.now().strftime('[%H:%M:%S]')} " + txt, flush=True)
def make_url_request(url, data, method='POST', headers={}, timeout=300): def make_url_request(url, data, method='POST', headers={}, timeout=300):
import urllib.request
global nocertify global nocertify
try: try:
request = None request = None
@ -5572,7 +5564,6 @@ def make_url_request(url, data, method='POST', headers={}, timeout=300):
#A very simple and stripped down embedded horde worker with no dependencies #A very simple and stripped down embedded horde worker with no dependencies
def run_horde_worker(args, api_key, worker_name): def run_horde_worker(args, api_key, worker_name):
import random
global friendlymodelname, maxhordectx, maxhordelen, exitcounter, punishcounter, modelbusy, session_starttime, sslvalid global friendlymodelname, maxhordectx, maxhordelen, exitcounter, punishcounter, modelbusy, session_starttime, sslvalid
httpsaffix = ("https" if sslvalid else "http") httpsaffix = ("https" if sslvalid else "http")
epurl = f"{httpsaffix}://localhost:{args.port}" epurl = f"{httpsaffix}://localhost:{args.port}"
@ -5769,8 +5760,6 @@ def setuptunnel(global_memory, has_sd):
# This script will help setup a cloudflared tunnel for accessing KoboldCpp over the internet # This script will help setup a cloudflared tunnel for accessing KoboldCpp over the internet
# It should work out of the box on both linux and windows # It should work out of the box on both linux and windows
try: try:
import subprocess
import re
global sslvalid global sslvalid
httpsaffix = ("https" if sslvalid else "http") httpsaffix = ("https" if sslvalid else "http")
ssladd = (" --no-tls-verify" if sslvalid else "") ssladd = (" --no-tls-verify" if sslvalid else "")
@ -5935,9 +5924,6 @@ def delete_old_pyinstaller():
if not base_path: if not base_path:
return return
import time
import os
import shutil
selfdirpath = os.path.abspath(base_path) selfdirpath = os.path.abspath(base_path)
temp_parentdir_path = os.path.abspath(os.path.join(base_path, '..')) temp_parentdir_path = os.path.abspath(os.path.join(base_path, '..'))
for dirname in os.listdir(temp_parentdir_path): for dirname in os.listdir(temp_parentdir_path):
@ -5959,15 +5945,10 @@ def delete_old_pyinstaller():
def sanitize_string(input_string): def sanitize_string(input_string):
# alphanumeric characters, dots, dashes, and underscores # alphanumeric characters, dots, dashes, and underscores
import re
sanitized_string = re.sub( r'[^\w\d\.\-_]', '', input_string) sanitized_string = re.sub( r'[^\w\d\.\-_]', '', input_string)
return sanitized_string return sanitized_string
def downloader_internal(input_url, output_filename, capture_output, min_file_size=64): # 64 bytes required by default def downloader_internal(input_url, output_filename, capture_output, min_file_size=64): # 64 bytes required by default
import shutil
import subprocess
import os
if "https://huggingface.co/" in input_url and "/blob/main/" in input_url: if "https://huggingface.co/" in input_url and "/blob/main/" in input_url:
input_url = input_url.replace("/blob/main/", "/resolve/main/") input_url = input_url.replace("/blob/main/", "/resolve/main/")
if output_filename == "auto": if output_filename == "auto":
@ -6901,7 +6882,6 @@ def kcpp_main_process(launch_args, g_memory=None, gui_launcher=False):
#if post-ready script specified, execute it #if post-ready script specified, execute it
if args.onready: if args.onready:
def onready_subprocess(): def onready_subprocess():
import subprocess
print("Starting Post-Load subprocess...") print("Starting Post-Load subprocess...")
subprocess.run(args.onready[0], shell=True) subprocess.run(args.onready[0], shell=True)
timer_thread = threading.Timer(1, onready_subprocess) #1 second delay timer_thread = threading.Timer(1, onready_subprocess) #1 second delay
@ -7026,7 +7006,6 @@ def kcpp_main_process(launch_args, g_memory=None, gui_launcher=False):
print("Server was not started, main function complete. Idling.", flush=True) print("Server was not started, main function complete. Idling.", flush=True)
if __name__ == '__main__': if __name__ == '__main__':
import multiprocessing
multiprocessing.freeze_support() multiprocessing.freeze_support()
def check_range(value_type, min_value, max_value): def check_range(value_type, min_value, max_value):