From b7cd210cd28914a260d9f679b99ffa0b9595fc4f Mon Sep 17 00:00:00 2001
From: Concedo <39025047+LostRuins@users.noreply.github.com>
Date: Sun, 1 Dec 2024 00:56:39 +0800
Subject: [PATCH] more linting with Ruff (+1 squashed commits)

Squashed commits:

[43802cfe2] Applied default Ruff linting
---
 koboldcpp.py | 214 +++++++++++++++++++++++++++------------------------
 1 file changed, 114 insertions(+), 100 deletions(-)

diff --git a/koboldcpp.py b/koboldcpp.py
index 4acd3e0a5..9e0b08b04 100644
--- a/koboldcpp.py
+++ b/koboldcpp.py
@@ -9,11 +9,20 @@
 # scenarios and everything Kobold and KoboldAI Lite have to offer.

 import ctypes
-import os, math, re
+import os
+import math
+import re
 import argparse
 import platform
 import base64
-import json, sys, http.server, time, asyncio, socket, threading
+import struct
+import json
+import sys
+import http.server
+import time
+import asyncio
+import socket
+import threading
 from concurrent.futures import ThreadPoolExecutor
 from datetime import datetime, timezone
@@ -298,7 +307,7 @@ def restore_stdout():

 def get_default_threads():
     physical_core_limit = 1
-    if os.cpu_count()!=None and os.cpu_count()>1:
+    if os.cpu_count() is not None and os.cpu_count()>1:
         physical_core_limit = os.cpu_count() // 2
     default_threads = (physical_core_limit if physical_core_limit<=3 else max(3,physical_core_limit-1))
     processor = platform.processor()
@@ -521,8 +530,8 @@ def set_backend_props(inputs):
     if args.usevulkan: #is an empty array if using vulkan without defined gpu
         s = ""
-        for l in range(0,len(args.usevulkan)):
-            s += str(args.usevulkan[l])
+        for it in range(0,len(args.usevulkan)):
+            s += str(args.usevulkan[it])
         inputs.vulkan_info = s.encode("UTF-8")
     else:
         inputs.vulkan_info = "".encode("UTF-8")
@@ -593,7 +602,7 @@ def unpack_to_dir(destpath = ""):
             messagebox.showerror("Error", f"An error occurred while unpacking: {e}")
     else:
         if cliunpack:
-            print(f"The target folder is not empty or invalid. Please select an empty folder.")
+            print("The target folder is not empty or invalid. Please select an empty folder.")
         else:
             messagebox.showwarning("Invalid Selection", "The target folder is not empty or invalid. Please select an empty folder.")
@@ -647,8 +656,6 @@ def string_contains_or_overlaps_sequence_substring(inputstr, sequences):
             return True
     return False

-import struct
-
 def read_gguf_metadata(file_path):
     chunk_size = 8192 # read only first 8kb of file
     try:
@@ -681,7 +688,7 @@ def read_gguf_metadata(file_path):
             key_length = read_gguf_key(b'.attention.key_length',data,8192)
             val_length = read_gguf_key(b'.attention.value_length',data,8192)
             return [layercount,head_count_kv, max(key_length,val_length)]
-    except Exception as ex:
+    except Exception:
         return None

 def extract_modelfile_params(filepath,sdfilepath,whisperfilepath,mmprojfilepath,draftmodelpath):
@@ -705,7 +712,7 @@ def extract_modelfile_params(filepath,sdfilepath,whisperfilepath,mmprojfilepath,
         if fsize>10000000: #dont bother with models < 10mb as they are probably bad
             ggufmeta = read_gguf_metadata(filepath)
         modelfile_extracted_meta = [ggufmeta,fsize,sdfsize,whisperfsize,mmprojsize,draftmodelsize] #extract done. note that meta may be null
-    except Exception as ex:
+    except Exception:
         modelfile_extracted_meta = None

 def autoset_gpu_layers(ctxsize,sdquanted,bbs): #shitty algo to determine how many layers to use
@@ -757,7 +764,7 @@ def autoset_gpu_layers(ctxsize,sdquanted,bbs): #shitty algo to determine how man
             layerlimit = min(int(ratio*layers), (layers + 3))
         layerlimit = (0 if layerlimit<=2 else layerlimit)
         return layerlimit
-    except Exception as ex:
+    except Exception:
         return 0

 def fetch_gpu_properties(testCL,testCU,testVK):
@@ -773,7 +780,7 @@ def fetch_gpu_properties(testCL,testCU,testVK):
         FetchedCUdevices = [line.split(",")[0].strip() for line in output.splitlines()]
         FetchedCUdeviceMem = [line.split(",")[1].strip().split(" ")[0].strip() for line in output.splitlines()]
         FetchedCUfreeMem = [line.split(",")[2].strip().split(" ")[0].strip() for line in output.splitlines()]
-    except Exception as e:
+    except Exception:
         pass
     if len(FetchedCUdevices)==0:
         try: # Get AMD ROCm GPU names
@@ -781,16 +788,18 @@ def fetch_gpu_properties(testCL,testCU,testVK):
             device_name = None
             for line in output.splitlines(): # read through the output line by line
                 line = line.strip()
-                if line.startswith("Marketing Name:"): device_name = line.split(":", 1)[1].strip() # if we find a named device, temporarily save the name
+                if line.startswith("Marketing Name:"):
+                    device_name = line.split(":", 1)[1].strip() # if we find a named device, temporarily save the name
                 elif line.startswith("Device Type:") and "GPU" in line and device_name is not None: # if the following Device Type is a GPU (not a CPU) then add it to devices list
                     FetchedCUdevices.append(device_name)
                     AMDgpu = True
-                elif line.startswith("Device Type:") and "GPU" not in line: device_name = None
+                elif line.startswith("Device Type:") and "GPU" not in line:
+                    device_name = None
             if FetchedCUdevices:
                 getamdvram = subprocess.run(['rocm-smi', '--showmeminfo', 'vram', '--csv'], capture_output=True, text=True, check=True, encoding='utf-8').stdout # fetch VRAM of devices
                 if getamdvram:
                     FetchedCUdeviceMem = [line.split(",")[1].strip() for line in getamdvram.splitlines()[1:] if line.strip()]
-        except Exception as e:
+        except Exception:
             pass
     lowestcumem = 0
     lowestfreecumem = 0
@@ -823,7 +832,7 @@ def fetch_gpu_properties(testCL,testCU,testVK):
         if idx
[...]
-    if tools_array and len(tools_array) > 0 and genparams.get('tool_choice',None) != None:
+    if tools_array and len(tools_array) > 0 and genparams.get('tool_choice',None) is not None:
         response_array = [{"id": "insert an id for the response", "type": "function", "function": {"name": "insert the name of the function you want to call", "arguments": {"first property key": "first property value", "second property key": "second property value"}}}]
         json_formatting_instruction = " Use this style of JSON object formatting to give your answer if you think the user is asking you to perform an action: " + json.dumps(response_array, indent=0)
         tools_string = json.dumps(tools_array, indent=0)
@@ -1461,7 +1470,7 @@ def transform_genparams(genparams, api_format):
             try:
                 specified_function = genparams.get('tool_choice').get('function').get('name')
                 json_formatting_instruction = f"The user is asking you to use the style of this JSON object formatting to complete the parameters for the specific function named {specified_function} in the following format: " + json.dumps([{"id": "insert an id for the response", "type": "function", "function": {"name": f"{specified_function}", "arguments": {"first property key": "first property value", "second property key": "second property value"}}}], indent=0)
-            except Exception as e:
+            except Exception:
                 # In case of any issues, just revert back to no specified function
                 pass
         messages_string += json_formatting_instruction
@@ -1671,7 +1680,7 @@ class ServerRequestHandler(http.server.SimpleHTTPRequestHandler):
         self.wfile.flush()

     async def send_kai_sse_event(self, data):
-        self.wfile.write(f'event: message\n'.encode())
+        self.wfile.write('event: message\n'.encode())
         self.wfile.write(f'data: {data}\n\n'.encode())
         self.wfile.flush()
@@ -1803,11 +1812,11 @@ class ServerRequestHandler(http.server.SimpleHTTPRequestHandler):
             auth_header = self.headers['Authorization']
         elif 'authorization' in self.headers:
             auth_header = self.headers['authorization']
-        if auth_header != None and auth_header.startswith('Bearer '):
+        if auth_header is not None and auth_header.startswith('Bearer '):
             token = auth_header[len('Bearer '):].strip()
             if token==password:
                 auth_ok = True
-        if auth_ok==False:
+        if auth_ok is False:
             self.send_response(401)
             self.end_headers(content_type='application/json')
             self.wfile.write(json.dumps({"detail": {
@@ -1847,7 +1856,7 @@ class ServerRequestHandler(http.server.SimpleHTTPRequestHandler):
                     epurl = f"{httpsaffix}://localhost:{args.port}"
                     if args.host!="":
                         epurl = f"{httpsaffix}://{args.host}:{args.port}"
-                    gen_payload = {"prompt": prompt,"max_length": max_length,"temperature": temperature,"prompt": prompt,"top_k": top_k,"top_p": top_p,"rep_pen": rep_pen,"ban_eos_token":ban_eos_token}
+                    gen_payload = {"prompt": prompt,"max_length": max_length,"temperature": temperature,"top_k": top_k,"top_p": top_p,"rep_pen": rep_pen,"ban_eos_token":ban_eos_token}
                     respjson = make_url_request(f'{epurl}/api/v1/generate', gen_payload)
                     reply = html.escape(respjson["results"][0]["text"])
                     status = "Generation Completed"
@@ -1928,7 +1937,7 @@ Enter Prompt:
             auth_header = self.headers['Authorization']
         elif 'authorization' in self.headers:
             auth_header = self.headers['authorization']
-        if auth_header != None and auth_header.startswith('Bearer '):
+        if auth_header is not None and auth_header.startswith('Bearer '):
             token = auth_header[len('Bearer '):].strip()
             if token==password:
                 auth_ok = True
@@ -2048,20 +2057,20 @@ Enter Prompt:
         elif self.path=="/api" or self.path=="/docs" or self.path.startswith(('/api/?json=','/api?json=','/docs/?json=','/docs?json=')):
             content_type = 'text/html'
             if embedded_kcpp_docs is None:
-                response_body = (f"KoboldCpp API is running!\n\nAPI usage reference can be found at the wiki: https://github.com/LostRuins/koboldcpp/wiki").encode()
+                response_body = ("KoboldCpp API is running!\n\nAPI usage reference can be found at the wiki: https://github.com/LostRuins/koboldcpp/wiki").encode()
             else:
                 response_body = embedded_kcpp_docs

         elif self.path.startswith(("/sdui")):
             content_type = 'text/html'
             if embedded_kcpp_sdui is None:
-                response_body = (f"KoboldCpp API is running, but KCPP SDUI is not loaded").encode()
+                response_body = ("KoboldCpp API is running, but KCPP SDUI is not loaded").encode()
             else:
                 response_body = embedded_kcpp_sdui

         elif self.path=="/v1":
             content_type = 'text/html'
-            response_body = (f"KoboldCpp OpenAI compatible endpoint is running!\n\nFor usage reference, see https://platform.openai.com/docs/api-reference").encode()
+            response_body = ("KoboldCpp OpenAI compatible endpoint is running!\n\nFor usage reference, see https://platform.openai.com/docs/api-reference").encode()

         elif self.path=="/api/extra/preloadstory":
             if preloaded_story is None:
@@ -2128,7 +2137,7 @@ Enter Prompt:
                     self.rfile.readline()
                     if chunk_length == 0:
                         break
-            except Exception as e:
+            except Exception:
                 self.send_response(500)
                 self.end_headers(content_type='application/json')
                 self.wfile.write(json.dumps({"detail": {
@@ -2177,7 +2186,7 @@ Enter Prompt:
                     tempbody = json.loads(body)
                     if isinstance(tempbody, dict):
                         multiuserkey = tempbody.get('genkey', "")
-                except Exception as e:
+                except Exception:
                     multiuserkey = ""
                     pass
             if (multiuserkey=="" and requestsinqueue==0) or (multiuserkey!="" and multiuserkey==currentusergenkey):
@@ -2200,7 +2209,7 @@ Enter Prompt:
                     tempbody = json.loads(body)
                     if isinstance(tempbody, dict):
                         multiuserkey = tempbody.get('genkey', "")
-                except Exception as e:
+                except Exception:
                     multiuserkey = ""

                 if totalgens>0:
@@ -2218,7 +2227,7 @@ Enter Prompt:
                     tempbody = json.loads(body)
                     if isinstance(tempbody, dict):
                         multiuserkey = tempbody.get('genkey', "")
-                except Exception as e:
+                except Exception:
                     multiuserkey = ""

                 if totalgens>0:
@@ -2240,7 +2249,7 @@ Enter Prompt:
                     if isinstance(tempbody, dict):
                         sender = tempbody.get('sender', "")
                         senderbusy = tempbody.get('senderbusy', False)
-                except Exception as e:
+                except Exception:
                     pass
                 if sender!="" and senderbusy:
                     multiplayer_lastactive[sender] = int(time.time())
@@ -2380,7 +2389,7 @@ Enter Prompt:
             genparams = None
             try:
                 genparams = json.loads(body)
-            except Exception as e:
+            except Exception:
                 genparams = None

             if is_transcribe: #fallback handling of file uploads
                 b64wav = self.extract_b64string_from_file_upload(body)
@@ -2399,7 +2408,7 @@ Enter Prompt:
             is_quiet = args.quiet
             if (args.debugmode != -1 and not is_quiet) or args.debugmode >= 1:
-                utfprint(f"\nInput: " + json.dumps(genparams))
+                utfprint("\nInput: " + json.dumps(genparams))

             if args.foreground:
                 bring_terminal_to_foreground()
@@ -2497,7 +2506,7 @@ def is_port_in_use(portNum):
         import socket
         with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
             return s.connect_ex(('localhost', portNum)) == 0
-    except Exception as ex:
+    except Exception:
         return True

 def is_ipv6_supported():
@@ -2508,7 +2517,7 @@ def is_ipv6_supported():
         sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
         sock.close()
         return True
-    except Exception as ex:
+    except Exception:
         return False

 def RunServerMultiThreaded(addr, port):
@@ -2542,7 +2551,7 @@ def RunServerMultiThreaded(addr, port):
         try:
             ipv6_sock.bind((addr, port))
             ipv6_sock.listen(numThreads)
-        except Exception as ex:
+        except Exception:
             ipv6_sock = None
             print("IPv6 Socket Failed to Bind. IPv6 will be unavailable.")
@@ -2619,7 +2628,7 @@ def show_gui():
         import darkdetect as darkdt
         darkdt.isDark()
         pass
-    except Exception as e:
+    except Exception:
         pass

     import customtkinter as ctk
@@ -2727,7 +2736,7 @@ def show_gui():
     blasbatchsize_values = ["-1", "32", "64", "128", "256", "512", "1024", "2048"]
     blasbatchsize_text = ["Don't Batch BLAS","32","64","128","256","512","1024","2048"]
     contextsize_text = ["256", "512", "1024", "2048", "3072", "4096", "6144", "8192", "12288", "16384", "24576", "32768", "49152", "65536", "98304", "131072"]
-    antirunopts = [opt.replace("Use ", "") for lib, opt in lib_option_pairs if not (opt in runopts)]
+    antirunopts = [opt.replace("Use ", "") for lib, opt in lib_option_pairs if opt not in runopts]
    quantkv_text = ["F16 (Off)","8-Bit","4-Bit"]

     if not any(runopts):
@@ -2942,8 +2951,8 @@ def show_gui():
     def setup_backend_tooltip(parent):
         # backend count label with the tooltip function
         nl = '\n'
-        tooltxt = f"Number of backends you have built and available." + (f"\n\nMissing Backends: \n\n{nl.join(antirunopts)}" if len(runopts) < 8 else "")
-        num_backends_built = makelabel(parent, str(len(runopts)) + f"/8", 5, 2,tooltxt)
+        tooltxt = "Number of backends you have built and available." + (f"\n\nMissing Backends: \n\n{nl.join(antirunopts)}" if len(runopts) < 8 else "")
+        num_backends_built = makelabel(parent, str(len(runopts)) + "/8", 5, 2,tooltxt)
         num_backends_built.grid(row=1, column=1, padx=195, pady=0)
         num_backends_built.configure(text_color="#00ff00")
@@ -2967,17 +2976,17 @@ def show_gui():
             layercounter_label.grid(row=6, column=1, padx=75, sticky="W")
             quick_layercounter_label.grid(row=6, column=1, padx=75, sticky="W")
             if sys.platform=="darwin" and gpulayers_var.get()=="-1":
-                quick_layercounter_label.configure(text=f"(Auto: All Layers)")
-                layercounter_label.configure(text=f"(Auto: All Layers)")
+                quick_layercounter_label.configure(text="(Auto: All Layers)")
+                layercounter_label.configure(text="(Auto: All Layers)")
             elif gpu_be and gpulayers_var.get()=="-1" and predicted_gpu_layers>0:
                 quick_layercounter_label.configure(text=f"(Auto: {predicted_gpu_layers}{max_gpu_layers} Layers)")
                 layercounter_label.configure(text=f"(Auto: {predicted_gpu_layers}{max_gpu_layers} Layers)")
             elif gpu_be and gpulayers_var.get()=="-1" and predicted_gpu_layers<=0 and (modelfile_extracted_meta and modelfile_extracted_meta[1]):
-                quick_layercounter_label.configure(text=f"(Auto: No Offload)")
-                layercounter_label.configure(text=f"(Auto: No Offload)")
+                quick_layercounter_label.configure(text="(Auto: No Offload)")
+                layercounter_label.configure(text="(Auto: No Offload)")
             elif gpu_be and gpulayers_var.get()=="":
-                quick_layercounter_label.configure(text=f"(Set -1 for Auto)")
-                layercounter_label.configure(text=f"(Set -1 for Auto)")
+                quick_layercounter_label.configure(text="(Set -1 for Auto)")
+                layercounter_label.configure(text="(Set -1 for Auto)")
             else:
                 layercounter_label.grid_remove()
                 quick_layercounter_label.grid_remove()
@@ -3000,7 +3009,7 @@ def show_gui():
                 else:
                     quick_gpuname_label.configure(text=CUDevicesNames[s])
                     gpuname_label.configure(text=CUDevicesNames[s])
-            except Exception as ex:
+            except Exception:
                 pass
         else:
             quick_gpuname_label.configure(text="")
@@ -3395,7 +3404,7 @@ def show_gui():
         savdict["tensor_split"] = None
         savdict["config"] = None
         filename = asksaveasfile(filetypes=file_type, defaultextension=file_type)
-        if filename == None:
+        if filename is None:
             return
         file = open(str(filename.name), 'a')
         file.write(json.dumps(savdict))
@@ -3501,10 +3510,10 @@ def show_gui():
         args.chatcompletionsadapter = None if chatcompletionsadapter_var.get() == "" else chatcompletionsadapter_var.get()
         try:
             if kcpp_exporting_template and isinstance(args.chatcompletionsadapter, str) and args.chatcompletionsadapter!="" and os.path.exists(args.chatcompletionsadapter):
-                print(f"Embedding chat completions adapter...") # parse and save embedded preload story
+                print("Embedding chat completions adapter...") # parse and save embedded preload story
                 with open(args.chatcompletionsadapter, 'r') as f:
                     args.chatcompletionsadapter = json.load(f)
-        except Exception as ex2:
+        except Exception:
             pass

         args.model_param = None if model_var.get() == "" else model_var.get()
@@ -3512,10 +3521,10 @@ def show_gui():
         args.preloadstory = None if preloadstory_var.get() == "" else preloadstory_var.get()
         try:
             if kcpp_exporting_template and isinstance(args.preloadstory, str) and args.preloadstory!="" and os.path.exists(args.preloadstory):
-                print(f"Embedding preload story...") # parse and save embedded preload story
+                print("Embedding preload story...") # parse and save embedded preload story
                 with open(args.preloadstory, 'r') as f:
                     args.preloadstory = json.load(f)
-        except Exception as ex2:
+        except Exception:
             pass
         args.mmproj = None if mmproj_var.get() == "" else mmproj_var.get()
         args.draftmodel = None if draftmodel_var.get() == "" else draftmodel_var.get()
@@ -3732,7 +3741,8 @@ def show_gui():
         savdict = json.loads(json.dumps(args.__dict__))
         file_type = [("KoboldCpp Settings", "*.kcpps")]
         filename = asksaveasfile(filetypes=file_type, defaultextension=file_type)
-        if filename == None: return
+        if filename is None:
+            return
         file = open(str(filename.name), 'a')
         file.write(json.dumps(savdict))
         file.close()
@@ -3754,19 +3764,19 @@ def show_gui():
         try:
             import webbrowser as wb
             wb.open("https://github.com/LostRuins/koboldcpp/wiki")
-        except:
+        except Exception:
             print("Cannot launch help in browser.")
     def display_help_models():
         try:
             import webbrowser as wb
             wb.open("https://github.com/LostRuins/koboldcpp/wiki#what-models-does-koboldcpp-support-what-architectures-are-supported")
-        except:
+        except Exception:
             print("Cannot launch help in browser.")
     def display_updates():
         try:
             import webbrowser as wb
             wb.open("https://github.com/LostRuins/koboldcpp/releases/latest")
-        except:
+        except Exception:
             print("Cannot launch updates in browser.")

     ctk.CTkButton(tabs , text = "Launch", fg_color="#2f8d3c", hover_color="#2faa3c", command = guilaunch, width=80, height = 35 ).grid(row=1,column=1, stick="se", padx= 25, pady=5)
@@ -3820,7 +3830,7 @@ def show_gui_msgbox(title,message):
         messagebox.showerror(title=title, message=message)
         root.withdraw()
         root.quit()
-    except Exception as ex2:
+    except Exception:
         pass

 def show_gui_yesnobox(title,message):
@@ -3834,7 +3844,7 @@ def show_gui_yesnobox(title,message):
         root.withdraw()
         root.quit()
         return result
-    except Exception as ex2:
+    except Exception:
         return False
         pass

@@ -3842,7 +3852,8 @@ def print_with_time(txt):
     print(f"{datetime.now().strftime('[%H:%M:%S]')} " + txt, flush=True)

 def make_url_request(url, data, method='POST', headers={}):
-    import urllib.request, ssl
+    import urllib.request
+    import ssl
     global nocertify
     try:
         request = None
@@ -3889,7 +3900,7 @@ def run_horde_worker(args, api_key, worker_name):
         reply = make_url_request_horde(url, submit_dict)
         if not reply:
             punishcounter += 1
-            print_with_time(f"Error, Job submit failed.")
+            print_with_time("Error, Job submit failed.")
         else:
             reward = reply["reward"]
             session_kudos_earned += reward
@@ -3925,7 +3936,7 @@ def run_horde_worker(args, api_key, worker_name):
     sleepy_counter = 0 #if this exceeds a value, worker becomes sleepy (slower)
     exitcounter = 0
     print(f"===\nEmbedded Horde Worker '{worker_name}' Starting...\n(To use your own Horde Bridge/Scribe worker instead, don't set your API key)\n")
-    BRIDGE_AGENT = f"KoboldCppEmbedWorker:2:https://github.com/LostRuins/koboldcpp"
+    BRIDGE_AGENT = "KoboldCppEmbedWorker:2:https://github.com/LostRuins/koboldcpp"
     cluster = "https://aihorde.net"
     while exitcounter < 10:
         time.sleep(3)
@@ -3944,10 +3955,10 @@ def run_horde_worker(args, api_key, worker_name):
             if exitcounter < 10:
                 penaltytime = (2 ** exitcounter)
                 print_with_time(f"Horde Worker Paused for {penaltytime} min - Too many errors. It will resume automatically, but you should restart it.")
-                print_with_time(f"Caution: Too many failed jobs may lead to entering maintenance mode.")
+                print_with_time("Caution: Too many failed jobs may lead to entering maintenance mode.")
                 time.sleep(60 * penaltytime)
             else:
-                print_with_time(f"Horde Worker Exit limit reached, too many errors.")
+                print_with_time("Horde Worker Exit limit reached, too many errors.")

         global last_non_horde_req_time
         sec_since_non_horde = time.time() - last_non_horde_req_time
@@ -3983,13 +3994,13 @@ def run_horde_worker(args, api_key, worker_name):
                 time.sleep(slp)
                 sleepy_counter += 1
                 if sleepy_counter==20:
-                    print_with_time(f"No recent jobs, entering low power mode...")
+                    print_with_time("No recent jobs, entering low power mode...")
             continue

         sleepy_counter = 0
         current_id = pop['id']
         current_payload = pop['payload']
-        print(f"") #empty newline
+        print("") #empty newline
         print_with_time(f"Job received from {cluster} for {current_payload.get('max_length',80)} tokens and {current_payload.get('max_context_length',1024)} max context. Starting generation...")

         #do gen
@@ -4005,11 +4016,11 @@ def run_horde_worker(args, api_key, worker_name):
             if currentjob_attempts>5:
                 break

-            print_with_time(f"Server Busy - Not ready to generate...")
+            print_with_time("Server Busy - Not ready to generate...")
             time.sleep(5)

         #submit reply
-        print(f"") #empty newline
+        print("") #empty newline
         if current_generation:
             submit_dict = {
                 "id": current_id,
@@ -4020,15 +4031,15 @@ def run_horde_worker(args, api_key, worker_name):
             submit_thread = threading.Thread(target=submit_completed_generation, args=(submiturl, current_id, session_starttime, submit_dict))
             submit_thread.start() #submit job in new thread so nothing is waiting
         else:
-            print_with_time(f"Error, Abandoned current job due to errors. Getting new job.")
+            print_with_time("Error, Abandoned current job due to errors. Getting new job.")
         current_id = None
         current_payload = None
         time.sleep(0.1)

     if exitcounter<100:
-        print_with_time(f"Horde Worker Shutdown - Too many errors.")
+        print_with_time("Horde Worker Shutdown - Too many errors.")
     else:
-        print_with_time(f"Horde Worker Shutdown - Server Closing.")
+        print_with_time("Horde Worker Shutdown - Server Closing.")
     exitcounter = 999
     time.sleep(3)
     sys.exit(2)
@@ -4071,7 +4082,7 @@ def check_deprecation_warning():
     # but i am not going to troubleshoot or provide support for deprecated flags.
     global using_outdated_flags
     if using_outdated_flags:
-        print(f"\n=== !!! IMPORTANT WARNING !!! ===")
+        print("\n=== !!! IMPORTANT WARNING !!! ===")
         print("You are using one or more OUTDATED config files or launch flags!")
         print("The flags --hordeconfig and --sdconfig have been DEPRECATED, and MAY be REMOVED in future!")
         print("They will still work for now, but you SHOULD switch to the updated flags instead, to avoid future issues!")
@@ -4086,7 +4097,8 @@ def setuptunnel(has_sd):
     # This script will help setup a cloudflared tunnel for accessing KoboldCpp over the internet
     # It should work out of the box on both linux and windows
     try:
-        import subprocess, re
+        import subprocess
+        import re
         global sslvalid
         httpsaffix = ("https" if sslvalid else "http")
         def run_tunnel():
@@ -4253,7 +4265,9 @@ def delete_old_pyinstaller():
     if not base_path:
         return

-    import time, os, shutil
+    import time
+    import os
+    import shutil
     selfdirpath = os.path.abspath(base_path)
     temp_parentdir_path = os.path.abspath(os.path.join(base_path, '..'))
     for dirname in os.listdir(temp_parentdir_path):
@@ -4369,7 +4383,7 @@ def main(launch_args,start_server=True):
                 ermsg = "Reason: " + str(ex) + "\nFile selection GUI unsupported.\ncustomtkinter python module required!\nPlease check command line: script.py --help"
                 show_gui_msgbox("Warning, GUI failed to start",ermsg)
                 if args.skiplauncher:
-                    print(f"Note: In order to use --skiplauncher, you need to specify a model with --model")
+                    print("Note: In order to use --skiplauncher, you need to specify a model with --model")
                 time.sleep(3)
                 sys.exit(2)
@@ -4383,7 +4397,7 @@ def main(launch_args,start_server=True):
                 preloaded_story = f.read()
                 canload = True
         elif isinstance(args.preloadstory, str):
-            print(f"Preloading saved story as JSON into server...")
+            print("Preloading saved story as JSON into server...")
             try:
                 import ast
                 parsed = ast.literal_eval(args.preloadstory)
@@ -4400,7 +4414,7 @@ def main(launch_args,start_server=True):
         if canload:
             print("Saved story preloaded.")
         else:
-            print(f"Warning: Saved story file invalid or not found. No story will be preloaded into server.")
+            print("Warning: Saved story file invalid or not found. No story will be preloaded into server.")

     # try to read chat completions adapter
     if args.chatcompletionsadapter:
@@ -4439,9 +4453,9 @@ def main(launch_args,start_server=True):
             except Exception as ex:
                 print(ex)
         if canload:
-            print(f"Chat Completions Adapter Loaded")
+            print("Chat Completions Adapter Loaded")
         else:
-            print(f"Warning: Chat Completions Adapter invalid or not found.")
+            print("Warning: Chat Completions Adapter invalid or not found.")

     # handle model downloads if needed
     if args.model_param and args.model_param!="":
@@ -4544,7 +4558,7 @@ def main(launch_args,start_server=True):
             print("WARNING: GPU layers is set, but a GPU backend was not selected! GPU will not be used!")
             args.gpulayers = 0
         elif args.gpulayers==-1 and sys.platform=="darwin" and args.model_param and os.path.exists(args.model_param):
-            print(f"MacOS detected: Auto GPU layers set to maximum")
+            print("MacOS detected: Auto GPU layers set to maximum")
             args.gpulayers = 200
         elif not shouldavoidgpu and args.model_param and os.path.exists(args.model_param):
             if (args.usecublas is None) and (args.usevulkan is None) and (args.useclblast is None):
@@ -4560,7 +4574,7 @@ def main(launch_args,start_server=True):
                     print(f"Auto Recommended GPU Layers: {layeramt}")
                     args.gpulayers = layeramt
             else:
-                print(f"No GPU backend found, or could not automatically determine GPU layers. Please set it manually.")
+                print("No GPU backend found, or could not automatically determine GPU layers. Please set it manually.")
                 args.gpulayers = 0

     if args.threads == -1:
@@ -4654,27 +4668,27 @@ def main(launch_args,start_server=True):
             if os.path.exists(args.sdlora):
                 imglora = os.path.abspath(args.sdlora)
             else:
-                print(f"Missing SD LORA model file...")
+                print("Missing SD LORA model file...")
         if args.sdvae:
             if os.path.exists(args.sdvae):
                 imgvae = os.path.abspath(args.sdvae)
             else:
-                print(f"Missing SD VAE model file...")
+                print("Missing SD VAE model file...")
         if args.sdt5xxl:
             if os.path.exists(args.sdt5xxl):
                 imgt5xxl = os.path.abspath(args.sdt5xxl)
             else:
-                print(f"Missing SD T5-XXL model file...")
+                print("Missing SD T5-XXL model file...")
         if args.sdclipl:
             if os.path.exists(args.sdclipl):
                 imgclipl = os.path.abspath(args.sdclipl)
             else:
-                print(f"Missing SD Clip-L model file...")
+                print("Missing SD Clip-L model file...")
         if args.sdclipg:
             if os.path.exists(args.sdclipg):
                 imgclipg = os.path.abspath(args.sdclipg)
             else:
-                print(f"Missing SD Clip-G model file...")
+                print("Missing SD Clip-G model file...")

         imgmodel = os.path.abspath(imgmodel)
         fullsdmodelpath = imgmodel
@@ -4719,7 +4733,7 @@ def main(launch_args,start_server=True):
                 embedded_kailite = embedded_kailite.replace(origStr, patchedStr)
             embedded_kailite = embedded_kailite.encode()
             print("Embedded KoboldAI Lite loaded.")
-    except Exception as e:
+    except Exception:
         print("Could not find KoboldAI Lite. Embedded KoboldAI Lite will not be available.")

     try:
@@ -4727,7 +4741,7 @@ def main(launch_args,start_server=True):
         with open(os.path.join(basepath, "kcpp_docs.embd"), mode='rb') as f:
             embedded_kcpp_docs = f.read()
         print("Embedded API docs loaded.")
-    except Exception as e:
+    except Exception:
         print("Could not find Embedded KoboldCpp API docs.")

     try:
@@ -4736,7 +4750,7 @@ def main(launch_args,start_server=True):
             embedded_kcpp_sdui = f.read()
         if args.sdmodel:
             print("Embedded SDUI loaded.")
-    except Exception as e:
+    except Exception:
         print("Could not find Embedded SDUI.")

     if args.port_param!=defaultport:
@@ -4765,7 +4779,7 @@ def main(launch_args,start_server=True):
         try:
             import webbrowser as wb
             wb.open(epurl)
-        except:
+        except Exception:
             print("--launch was set, but could not launch web browser automatically.")

     if args.hordekey and args.hordekey!="":
@@ -4805,12 +4819,12 @@ def main(launch_args,start_server=True):
     benchbaneos = False
     if args.benchmark:
         if os.path.exists(args.benchmark) and os.path.getsize(args.benchmark) > 1000000:
-            print(f"\nWarning: The benchmark CSV output file you selected exceeds 1MB. This is probably not what you want, did you select the wrong CSV file?\nFor safety, benchmark output will not be saved.")
+            print("\nWarning: The benchmark CSV output file you selected exceeds 1MB. This is probably not what you want, did you select the wrong CSV file?\nFor safety, benchmark output will not be saved.")
             save_to_file = False
         if save_to_file:
             print(f"\nRunning benchmark (Save to File: {args.benchmark})...")
         else:
-            print(f"\nRunning benchmark (Not Saved)...")
+            print("\nRunning benchmark (Not Saved)...")
         if benchprompt=="":
             benchprompt = " 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1"
             for i in range(0,14): #generate massive prompt
@@ -4856,7 +4870,7 @@ def main(launch_args,start_server=True):
                 with open(args.benchmark, "a") as file:
                     file.seek(0, 2)
                     if file.tell() == 0: #empty file
-                        file.write(f"Timestamp,Backend,Layers,Model,MaxCtx,GenAmount,ProcessingTime,ProcessingSpeed,GenerationTime,GenerationSpeed,TotalTime,Output,Flags")
+                        file.write("Timestamp,Backend,Layers,Model,MaxCtx,GenAmount,ProcessingTime,ProcessingSpeed,GenerationTime,GenerationSpeed,TotalTime,Output,Flags")
                     file.write(f"\n{datetimestamp},{libname},{args.gpulayers},{benchmodel},{benchmaxctx},{benchlen},{t_pp:.2f},{s_pp:.2f},{t_gen:.2f},{s_gen:.2f},{(t_pp+t_gen):.2f},{result},{benchflagstr}")
             except Exception as e:
                 print(f"Error writing benchmark to file: {e}")
@@ -4877,7 +4891,7 @@ def main(launch_args,start_server=True):
     else:
         # Flush stdout for previous win32 issue so the client can see output.
         if not args.prompt or args.benchmark:
-            print(f"Server was not started, main function complete. Idling.", flush=True)
+            print("Server was not started, main function complete. Idling.", flush=True)

 def run_in_queue(launch_args, input_queue, output_queue):
     main(launch_args, start_server=False)
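--

Reviewer note: the changes above are a mechanical application of Ruff's default rule
set (the pycodestyle E4/E7/E9 groups plus the pyflakes F rules), presumably produced
with something like `ruff check koboldcpp.py --fix` and with the non-autofixable
findings (the bare `except:` handlers flagged as E722, the ambiguous loop variable
`l` flagged as E741) cleaned up by hand. The lint pass also surfaced one real bug:
the duplicated "prompt" key in gen_payload. A minimal before/after sketch of the
recurring patterns; the names below are hypothetical and not taken from koboldcpp.py:

    # Before: the findings this patch fixes throughout
    import os, math                # E401: multiple imports on one line

    def lookup(value):
        if value != None:          # E711: None comparison should use "is not None"
            print(f"found it")     # F541: f-string without any placeholders
        try:
            return 1 / value
        except Exception as e:     # F841: "e" is bound but never used
            return None

    # After: the same code under the default rules
    import os
    import math

    def lookup(value):
        if value is not None:
            print("found it")
        try:
            return 1 / value
        except Exception:
            return None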