diff --git a/app/cache.py b/app/cache.py index eefe9ec..4c16c00 100644 --- a/app/cache.py +++ b/app/cache.py @@ -22,7 +22,7 @@ class Cache: self.init_delete_expired_values() def get_connection(self): - return sqlite3.connect(self.params.file) + return sqlite3.connect(self.params.file, timeout=30000) def init_pybase_migration(self): os.environ["PYWAY_TYPE"] = "sqlite" diff --git a/app/cuda.py b/app/cuda.py index d09ed4b..dc1ff84 100644 --- a/app/cuda.py +++ b/app/cuda.py @@ -34,3 +34,7 @@ def get_device_with_gpu_num(options: dict) -> str: return "cuda:{0}".format(options["cuda_device_index"]) else: return "cpu" + + +def gpu_count() -> int: + return torch.cuda.device_count() \ No newline at end of file diff --git a/app/file_processor_html.py b/app/file_processor_html.py index c3827dd..386ef8d 100644 --- a/app/file_processor_html.py +++ b/app/file_processor_html.py @@ -2,9 +2,8 @@ from typing import Iterator from bs4 import BeautifulSoup, PageElement, Tag -from app import file_processor +from app import file_processor, parallel_process, dto from app.app_core import AppCore -from app.dto import ProcessingFileDirReq class FileProcessorHtml: @@ -43,13 +42,16 @@ class FileProcessorHtml: original_element.append(additional_tag_element) return original_element - def process(self, req: ProcessingFileDirReq, soup: BeautifulSoup, body_tag: str = None) -> None: + def process_html(self, req: dto.ProcessingFileDirReq, soup: BeautifulSoup, body_tag, parallel: bool, + gpu_count_for_parallel = None) -> None: translate_only_first_paragraphs: int = self.options.get("translate_only_first_paragraphs", 0) children: Iterator[PageElement] = soup.find(body_tag).descendants if body_tag else soup.descendants translated_paragraphs = 0 all_original_text_items: list[str] = [] + translate_params: list[dto.TranslateCommonRequest] = list() + for child in children: if (child and child.text and child.parent and child.parent.get(self.attribute_source) is None and child.parent.get(self.attribute_translate) is None): @@ -68,24 +70,39 @@ class FileProcessorHtml: all_original_text_items.append(original_text) translate_req = req.translate_req(text=original_text, context=context) - translate_txt = self.core.translate(translate_req).result - translated_paragraphs = translated_paragraphs + 1 - if 0 < translate_only_first_paragraphs <= translated_paragraphs: - break - if child_tag in self.text_tags: - translate_element = self.get_translate_element(soup, child, translate_txt) - if req.preserve_original_text: - child.parent.insert_after(translate_element) - original_element = self.get_original_element(soup, child, original_text) - if original_element: - child.replaceWith(original_element) - else: - child.replaceWith(translate_element) + # if parallel - only fill params list, after that will be start async translate + if parallel: + translate_params.append(translate_req) + else: + translate_txt = self.core.translate(translate_req).result + translated_paragraphs = translated_paragraphs + 1 + if 0 < translate_only_first_paragraphs <= translated_paragraphs: + break - elif child_tag in self.header_tags: - if req.preserve_original_text: - child.parent.string = f'{original_text}{self.header_delimiter}{translate_txt}' - else: - child.parent.string = translate_txt + if child_tag in self.text_tags: + translate_element = self.get_translate_element(soup, child, translate_txt) + if req.preserve_original_text: + child.parent.insert_after(translate_element) + original_element = self.get_original_element(soup, child, original_text) + if original_element: + child.replaceWith(original_element) + else: + child.replaceWith(translate_element) + elif child_tag in self.header_tags: + if req.preserve_original_text: + child.parent.string = f'{original_text}{self.header_delimiter}{translate_txt}' + else: + child.parent.string = translate_txt + + if parallel: + parallel_process.start_parallel_processing(gpu_count_for_parallel, self.core, translate_params) + + def process(self, req: dto.ProcessingFileDirReq, soup: BeautifulSoup, body_tag: str = None) -> None: + gpu_count_for_parallel = parallel_process.translate_plugin_support_parallel_gpu_count(self.core, req.translator_plugin) + if gpu_count_for_parallel is not None: + # First pre-pass - translate without any actions, for fill cache. Next pass get translated text from cache. + self.process_html(req, soup, body_tag, True, gpu_count_for_parallel) + + self.process_html(req, soup, body_tag, False) diff --git a/app/parallel_process.py b/app/parallel_process.py new file mode 100644 index 0000000..0cd233d --- /dev/null +++ b/app/parallel_process.py @@ -0,0 +1,71 @@ +import threading +from concurrent.futures import ThreadPoolExecutor +from dataclasses import dataclass + +from app import dto, log +from app.app_core import AppCore + +logger = log.logger() + +model_name_parallel_postfix = "--parallel-gpu#" +executor_translate_prefix = "executor_translate_thread" +executor_file_processing_prefix = "executor_file_processing_thread" + + +@dataclass +class AsyncResult: + content: str + model: str + part_num: int + + +def get_model_name_by_gpu_id(model: str, gpu_id: int) -> str: + return f'{model}{model_name_parallel_postfix}{gpu_id}' + + +def translate_plugin_support_parallel_gpu_count(core: AppCore, custom_translator_plugin: str) -> int | None: + """ + If translate plugin support parallel translate with few GPU and parallel processing enabled - return GPU count. + :param core: core + :param custom_translator_plugin: translate plugin from request, may be empty + :return: GPU count if parallel processing enabled or None otherwise. + """ + translator_plugin = core.get_translator_plugin(custom_translator_plugin) + + if "lm_studio" == translator_plugin: + plugin_info = core.initialized_translator_engines[translator_plugin] + options = core.plugin_options(plugin_info.plugin_name) + if options["use_library"]["enabled"] and options["use_library"]["model"] != "" and options['parallel_processing']["enabled"]: + enabled_gpu_numbers: list[int] = options['parallel_processing']["enabled_gpu_numbers"] + return len(enabled_gpu_numbers) + + return None + + +def thread_num() -> int | None: + thread_name: str = threading.current_thread().name + if thread_name.startswith(executor_translate_prefix): + return int(thread_name.replace(executor_translate_prefix + "_", "")) + elif thread_name.startswith(executor_file_processing_prefix): + return int(thread_name.replace(executor_file_processing_prefix + "_", "")) + else: + return None + + +def is_main_thread() -> bool: + return 'MainThread' == threading.current_thread().name + + +def start_parallel_processing(gpu_count_for_parallel: int, core: AppCore, + translate_params: list[dto.TranslateCommonRequest]) -> list[dto.TranslateResp]: + with ThreadPoolExecutor(max_workers=gpu_count_for_parallel, + thread_name_prefix=executor_file_processing_prefix) as executor: + async_results: list[dto.TranslateResp] = list(executor.map(core.translate, translate_params)) + logger.info("Finish preprocess parallel task. Requests: " + str(len(async_results))) + + return async_results + + + + + diff --git a/doc/ru/plugins-translate/lm_studio.md b/doc/ru/plugins-translate/lm_studio.md index 85065bb..c30e7f9 100644 --- a/doc/ru/plugins-translate/lm_studio.md +++ b/doc/ru/plugins-translate/lm_studio.md @@ -21,6 +21,18 @@ Формат `"имя_модели_в_нижнем_регистре": "Специальная инструкция для перевода"`. Имя модели выводится в логах при старте плагина в таком формате: `Success init translation plugin: 'lm_studio'. Model: nemo_12b_gguf`. Имя модели - `nemo_12b_gguf`, обязательно в нижнем регистре. +* группа параметров **use_library** - параметры, включающую интеграцию с LM Studio через библиотеку, а не через REST - дает больше возможностей. +* * enabled - включен или выключен режим работы с использованием библиотеки +* * model - если здесь указана модель, и в LM studio в момент старта приложения не будет эта модель загружена - приложение загрузит модель с указанными ниже параметрами. +Имя модели можно узнать в LM studio, вкладка моделей, столбец `LLM`. +* * model_context_length - длина контекста при загрузке модели. Большой контекст позволяет работать с текстами большего размера, но расходует память. + +* группа параметров **parallel_processing** - параметры, позволяющие выполнять параллельный перевод на нескольких видеокартах +* * enabled - включена или выключена параллельная обработка. Более подробно - в этом документе, в разделе _Как работает параллельная обработка на нескольких видеокартах_. +Кроме этого параметра должен быть включен параметр `use_library.enabled` и указана модель в `use_library.model`. +* * enabled_gpu_numbers - номера видеокарт в системе, на которых будет происходить обработка. +Номер и имя видеокарты указывается при старте приложения в логе, вида `INFO GPU #0: NVIDIA GeForce RTX 4090`. `0` - указываемый в параметре номер. + * опциональная группа параметров **text_processing_params** * опциональная группа параметров **text_split_params** @@ -38,3 +50,16 @@ Про опциональные группы более подробно - [здесь](../options.md), в разделах **Настройки core** и **Переопределение групп параметров в настройках плагинов переводов**. + +### Как работает параллельная обработка на нескольких видеокартах + +**Имеет смысл включать только при наличии в системе более одной видеокарты!** + +Кроме этого параметра должен быть включен параметр `use_library.enabled` и указана модель в `use_library.model`. + +Предположим, что указаны параметры модели `"use_library"."model": "model_name"` и устройства в `enabled_gpu_numbers: [0, 1]`. +Тогда приложение при старте проверит, загружены ли в LM Studio модели `model_name--parallel-gpu#0` и `model_name--parallel-gpu#1`, +если не загружены - попытается загрузить. Параметры загрузки указаны так, чтобы каждая модель приоритетна была загружена на соответсвующую видеокарту. + +Очень желательно, чтобы модели и их контекст полностью помещались в памяти видеокарты. + diff --git a/integrations/renpy/_llm-translate-integration.rpy b/integrations/renpy/_llm-translate-integration.rpy index 401978e..96874c7 100644 --- a/integrations/renpy/_llm-translate-integration.rpy +++ b/integrations/renpy/_llm-translate-integration.rpy @@ -1,12 +1,13 @@ init 99 python: import sys + import requests #-------------------------------------------------------------------------- # Configuration variables #-------------------------------------------------------------------------- llm_translate__translate_path = "http://127.0.0.1:4990/translate" llm_translate__preserve_original_text = True - llm_translate__translate_text_all = False # not recommended + llm_translate__translate_text_all = False # recommended False llm_translate__translate_font_name = "DejaVuSans.ttf" llm_translate__translate_font_size = 22 llm_translate__translate_font_format_tag = "i" @@ -35,7 +36,6 @@ init 99 python: return s[0] - # def llm_translate__preprocess_text(src): """ Preprocess text - remove tags, fill variables values @@ -50,7 +50,7 @@ init 99 python: s = s.replace("{p}", "\n") s = llm_translate__fill_variables_values(s) - s = renpy.translation.dialogue.notags_filter(s) #remove tags {} + s = renpy.translation.dialogue.notags_filter(s) # remove tags {} return s @@ -78,7 +78,6 @@ init 99 python: :param s: preprocessed text :return: translate result """ - import requests req = { "text": s, "llm_translate__from_lang": llm_translate__from_lang, @@ -102,7 +101,7 @@ init 99 python: else: return resp["error"] - # + def llm_translate__translate_text(src): """ Main function to translate @@ -119,20 +118,22 @@ init 99 python: if s is None or s == "": return src - dict_translated_value = llm_translate__translated_text_dict.get(s, None) - if dict_translated_value is None: + translate_result = llm_translate__translated_text_dict.get(s, None) + if translate_result is None: translate_result = llm_translate__request_python_v3(src, s) - llm_translate__translated_text_dict[s] = translate_result - else: - return dict_translated_value - # enable or disable translate + return translate_result + + def llm_translate__toggle_translate(): + """ + Toggle translate button listener + """ global llm_translate__translate_toggle_value - value = not llm_translate__translate_toggle_value - llm_translate__translate_toggle_value = value - if not value: #clear cache + toggle_value = not llm_translate__translate_toggle_value + llm_translate__translate_toggle_value = toggle_value + if not toggle_value: #clear cache llm_translate__translated_text_dict.clear() # apply replace text @@ -142,11 +143,6 @@ init 99 python: config.say_menu_text_filter = llm_translate__translate_text - # translate request with module requests - for python v3 - def llm_translate__request_python_v2(src, s): - return src - - # button to enable or disable translate screen toggle_tr_button(): hbox: diff --git a/plugins/plugin_file_srt.py b/plugins/plugin_file_srt.py index 71af7e0..748e884 100644 --- a/plugins/plugin_file_srt.py +++ b/plugins/plugin_file_srt.py @@ -2,9 +2,8 @@ import os import pysrt -from app import file_processor, params +from app import file_processor, params, parallel_process, dto from app.app_core import AppCore -from app.dto import ProcessingFileDirReq, ProcessingFileResp, FileProcessingPluginInitInfo, ProcessingFileStruct plugin_name = os.path.basename(__file__)[:-3] # calculating modname @@ -45,15 +44,31 @@ def start_with_options(core: AppCore, manifest: dict): pass -def init(core: AppCore) -> FileProcessingPluginInitInfo: - return FileProcessingPluginInitInfo(plugin_name=plugin_name, supported_extensions={"srt"}) +def init(core: AppCore) -> dto.FileProcessingPluginInitInfo: + return dto.FileProcessingPluginInitInfo(plugin_name=plugin_name, supported_extensions={"srt"}) -def file_processing(core: AppCore, file_struct: ProcessingFileStruct, req: ProcessingFileDirReq) -> ProcessingFileResp: +def parallel_preprocessing(subs, core: AppCore, req: dto.ProcessingFileDirReq) -> None: + gpu_count_for_parallel = parallel_process.translate_plugin_support_parallel_gpu_count(core, req.translator_plugin) + if gpu_count_for_parallel is None: + return None + + # First pre-pass - translate without any actions, for fill cache. Next pass get translated text from cache. + translate_params: list[dto.TranslateCommonRequest] = list() + for sub in subs: + translate_req = req.translate_req(sub.text, "") + translate_params.append(translate_req) + + parallel_process.start_parallel_processing(gpu_count_for_parallel, core, translate_params) + + +def file_processing(core: AppCore, file_struct: dto.ProcessingFileStruct, req: dto.ProcessingFileDirReq) -> dto.ProcessingFileResp: options = core.plugin_options(plugin_name) text_format = params.read_plugin_file_processing_text_format(options) subs = pysrt.open(file_struct.path_file_in()) + parallel_preprocessing(subs, core, req) + for sub in subs: text = sub.text translate_req = req.translate_req(text, "") @@ -71,7 +86,7 @@ def file_processing(core: AppCore, file_struct: ProcessingFileStruct, req: Proce return file_processor.get_processing_file_resp_ok(file_struct=file_struct, file_out=out_file_name) -def processed_file_name(core: AppCore, file_struct: ProcessingFileStruct, req: ProcessingFileDirReq) -> str: +def processed_file_name(core: AppCore, file_struct: dto.ProcessingFileStruct, req: dto.ProcessingFileDirReq) -> str: options = core.plugin_options(plugin_name) src_postfix = options["remove_src_filename_postfix"] diff --git a/plugins/plugin_file_txt.py b/plugins/plugin_file_txt.py index a11757c..13ae465 100644 --- a/plugins/plugin_file_txt.py +++ b/plugins/plugin_file_txt.py @@ -1,8 +1,7 @@ import os -from app import file_processor, params +from app import file_processor, params, parallel_process, dto from app.app_core import AppCore -from app.dto import ProcessingFileDirReq, ProcessingFileResp, FileProcessingPluginInitInfo, ProcessingFileStruct plugin_name = os.path.basename(__file__)[:-3] # calculating modname @@ -43,11 +42,30 @@ def start_with_options(core: AppCore, manifest: dict): pass -def init(core: AppCore) -> FileProcessingPluginInitInfo: - return FileProcessingPluginInitInfo(plugin_name=plugin_name, supported_extensions={"txt"}) +def init(core: AppCore) -> dto.FileProcessingPluginInitInfo: + return dto.FileProcessingPluginInitInfo(plugin_name=plugin_name, supported_extensions={"txt"}) -def file_processing(core: AppCore, file_struct: ProcessingFileStruct, req: ProcessingFileDirReq) -> ProcessingFileResp: +def parallel_preprocessing(lines: list[str], core: AppCore, req: dto.ProcessingFileDirReq) -> None: + gpu_count_for_parallel = parallel_process.translate_plugin_support_parallel_gpu_count(core, req.translator_plugin) + if gpu_count_for_parallel is None: + return None + + # First pre-pass - translate without any actions, for fill cache. Next pass get translated text from cache. + translate_params: list[dto.TranslateCommonRequest] = list() + all_original_text_items: list[str] = [] + for line in lines: + if line != '': + context = file_processor.get_context(items_to_context=all_original_text_items, + params=core.file_processing_params.context_params, translate_text=line) + translate_req = req.translate_req(line, context) + all_original_text_items.append(line) + translate_params.append(translate_req) + + parallel_process.start_parallel_processing(gpu_count_for_parallel, core, translate_params) + + +def file_processing(core: AppCore, file_struct: dto.ProcessingFileStruct, req: dto.ProcessingFileDirReq) -> dto.ProcessingFileResp: options = core.plugin_options(plugin_name) markdown_output: bool = options["markdown_output"] new_line_delimiter: str = options["new_line_delimiter"] @@ -58,6 +76,9 @@ def file_processing(core: AppCore, file_struct: ProcessingFileStruct, req: Proce all_original_text_items: list[str] = [] file_content = file_processor.read_file_with_fix_encoding(file_struct.path_file_in()) lines: list[str] = file_content.splitlines() + + parallel_preprocessing(lines, core, req) + for line in lines: if line == '': result_lines.append(new_line_delimiter) @@ -82,7 +103,7 @@ def file_processing(core: AppCore, file_struct: ProcessingFileStruct, req: Proce return file_processor.get_processing_file_resp_ok(file_struct=file_struct, file_out=out_file_name) -def processed_file_name(core: AppCore, file_struct: ProcessingFileStruct, req: ProcessingFileDirReq) -> str: +def processed_file_name(core: AppCore, file_struct: dto.ProcessingFileStruct, req: dto.ProcessingFileDirReq) -> str: options = core.plugin_options(plugin_name) ext = "md" if options["markdown_output"] else None diff --git a/plugins/plugin_lm_studio.py b/plugins/plugin_lm_studio.py index fd7770f..eb9f6e9 100644 --- a/plugins/plugin_lm_studio.py +++ b/plugins/plugin_lm_studio.py @@ -1,17 +1,22 @@ import os +from concurrent.futures import ThreadPoolExecutor import lmstudio -from lmstudio import LLM, LlmPredictionConfig +from lmstudio import LlmPredictionConfig, LlmLoadModelConfig +from lmstudio._sdk_models import GpuSetting from tqdm import tqdm -from app import params, translate_func +from app import params, translate_func, cuda, parallel_process, log from app.app_core import AppCore from app.dto import TranslatePluginInitInfo, TranslateStruct from app.lang_dict import get_lang_by_2_chars_code plugin_name = os.path.basename(__file__)[:-3] # calculating modname -llm_model: LLM | None = None +llm_model_list_names: list[str] = [] model_name: str = "" +logger = log.logger() + +executor: ThreadPoolExecutor def start(core: AppCore): @@ -24,7 +29,15 @@ def start(core: AppCore): "prompt": "You are a professional translator. Your task is to translate a text (or word) provided below from %%from_lang%% to %%to_lang%%.\n%%context_prompt%%\nINSTRUCTION:Carefully analyze the context. Pay special attention to Terminology, Style, Consistency. Provide only the translation. Do not include any additional information, explanations, notes, or comments in your response. The output should be the pure translated text only.\nTEXT TO TRANSLATE:", "prompt_postfix": "", "prompt_no_think_postfix": False, - "use_library_for_request": True, + "use_library": { + "enabled": True, + "model": "", + "model_context_length": 8192 + }, + "parallel_processing": { + "enabled": False, + "enabled_gpu_numbers": [0, 1] + }, "special_prompt_for_model": { "my_model_name": "special prompt" }, @@ -43,29 +56,72 @@ def start_with_options(core: AppCore, manifest: dict): pass +def init_parallel_processing(options: dict) -> None: + model_name_param = options['use_library']['model'] + gpu_numbers_for_processing: list[int] = options['parallel_processing']["enabled_gpu_numbers"] + loaded_models = list(map(lambda item: item.identifier, lmstudio.list_loaded_models("llm"))) + client = lmstudio.get_default_client() + gpu_count = cuda.gpu_count() + + for gpu_number in gpu_numbers_for_processing: + model_name_parallel = parallel_process.get_model_name_by_gpu_id(model_name_param, gpu_number) + # Check, maybe model already loaded. If not - try to load. + if model_name_parallel not in loaded_models: + # disable all other gpu load, exclude gpu_number + disabled_gpus: list[int] = list(filter(lambda item: item != gpu_number, list(range(gpu_count)))) + config = LlmLoadModelConfig( + gpu=GpuSetting(main_gpu=gpu_number, split_strategy="favorMainGpu", disabled_gpus=disabled_gpus), + context_length=options["use_library"]["model_context_length"]) + logger.info("LM Studio load model: " + model_name_parallel) + client.llm.load_new_instance(model_name_param, model_name_parallel, config=config, ttl=None) + + # llm_model_list.append(lmstudio.llm(model_name_parallel)) + llm_model_list_names.append(model_name_parallel) + + logger.info("LM Studio load models: " + str(llm_model_list_names)) + + global executor + executor = ThreadPoolExecutor(max_workers=len(llm_model_list_names), + thread_name_prefix=parallel_process.executor_translate_prefix) + + global model_name + model_name = model_name_param.lower() + + def init(core: AppCore) -> TranslatePluginInitInfo: options = core.plugin_options(plugin_name) custom_url: str = options['custom_url'] - use_library_for_request = options["use_library_for_request"] + use_library_for_request = options["use_library"]["enabled"] global model_name if use_library_for_request: lmstudio.configure_default_client(custom_url.replace("http://", "")) - loaded_models = lmstudio.list_loaded_models("llm") - if len(loaded_models) > 0: - model_name = loaded_models[0].identifier.lower() - global llm_model - llm_model = lmstudio.llm(model_name) + if options['parallel_processing']["enabled"]: + # if enabled parallel_processing, check loaded models, try to load, if needed model doesn't exist + init_parallel_processing(options) else: - raise ValueError('List loaded models is empty. Please load model before init this plugin') + # if disabled parallel_processing, check loaded models and get name, if found + loaded_models = lmstudio.list_loaded_models("llm") + if len(loaded_models) > 0: # found loaded model - use it + llm_model_name = loaded_models[0].identifier + llm_model_list_names.append(llm_model_name) + model_name = llm_model_name.lower() + elif options['use_library']['model'] != "": # loaded model not found - try to load + model_name = options['use_library']['model'] + client = lmstudio.get_default_client() + config = LlmLoadModelConfig(context_length=options["use_library"]["model_context_length"]) + logger.info("LM Studio load model: " + model_name) + client.llm.load_new_instance(model_name, model_name, config=config, ttl=None) + else: # loaded model not found - and not model to load - error + raise ValueError('List loaded models is empty. Please load model before init this plugin') else: postfix = translate_func.get_prompt_postfix(options["prompt_postfix"], options['prompt_no_think_postfix']) prompt = "You are assistant. " + postfix req = translate_func.get_open_ai_request(prompt, "init") resp = translate_func.post_request(req, options['custom_url'] + "/v1/chat/completions") - model_name = model_name=resp["model"].lower() + model_name = model_name = resp["model"].lower() return TranslatePluginInitInfo(plugin_name=plugin_name, model_name=model_name) @@ -83,13 +139,36 @@ def translate(core: AppCore, ts: TranslateStruct) -> TranslateStruct: to_lang_name=to_lang_name, postfix_param=options["prompt_postfix"], prompt_no_think_postfix_param=options['prompt_no_think_postfix'], context=ts.req.context, ) - use_library_for_request = options["use_library_for_request"] + use_library_for_request = options["use_library"]["enabled"] + # check params and not already parallel work in file processing task + parallel_process_enabled: bool = (use_library_for_request and options['parallel_processing']["enabled"] + and parallel_process.is_main_thread()) - for part in tqdm(ts.parts, unit=params.tp.unit, ascii=params.tp.ascii, desc=params.tp.desc): - if part.need_to_translate(): - content: str + if parallel_process_enabled: + # first pass - prepare lists of params + params_prompt: list[str] = list() + params_text: list[str] = list() + params_part_num: list[int] = list() + for part_num, part in enumerate(ts.parts): + if part.need_to_translate(): + params_prompt.append(prompt) + params_text.append(part.text) + params_part_num.append(part_num) + + # second pass - async execute and get list of results + async_results: list[parallel_process.AsyncResult] = list(tqdm(executor.map( + library_request, params_prompt, params_text, params_part_num), total=len(ts.parts), + unit=params.tp.unit, ascii=params.tp.ascii, desc=params.tp.desc)) + + # third pass - set translate to part by part_num + for async_result in async_results: + ts.parts[async_result.part_num].translate = async_result.content + else: + for part in tqdm(ts.parts, unit=params.tp.unit, ascii=params.tp.ascii, desc=params.tp.desc): + if part.need_to_translate(): + content: str if use_library_for_request: - content = library_request(llm_model, prompt, part.text) + content = library_request(prompt, part.text).content else: req = translate_func.get_open_ai_request(prompt, part.text) resp = translate_func.post_request(req, options['custom_url'] + "/v1/chat/completions") @@ -100,9 +179,17 @@ def translate(core: AppCore, ts: TranslateStruct) -> TranslateStruct: return ts -def library_request(model: LLM, prompt: str, text: str) -> str: +def library_request(prompt: str, text: str, part_num: int = 0) -> parallel_process.AsyncResult: + # print(f"pid {os.getpid()} ({multiprocessing.current_process().name}) thread: {threading.current_thread().name}") + + thread_num = parallel_process.thread_num() + if thread_num is None: + model = lmstudio.llm(model_name) + else: + model = lmstudio.llm(llm_model_list_names[thread_num]) + chat = lmstudio.Chat(prompt) chat.add_user_message(text) result = model.respond(chat, config=LlmPredictionConfig(temperature=0.0)) - return result.content + return parallel_process.AsyncResult(content=result.content, model=model.identifier, part_num=part_num) diff --git a/tests/test_async.py b/tests/test_async.py new file mode 100644 index 0000000..3d7f244 --- /dev/null +++ b/tests/test_async.py @@ -0,0 +1,37 @@ +import multiprocessing +import os +import threading +import time +from unittest import TestCase + +# gpu - 0, 3. workers - 1, 2. + +#[00:07<00:00, 1.09part/s] +#[00:07<00:00, 1.08part/s] + +def print_name(param): + process_id = os.getpid() + # Получаем имя процесса + process_name = multiprocessing.current_process().name + print(f"Обрабатываем {param} в процессе {os.getpid()} ({multiprocessing.current_process().name}) thread: {threading.current_thread().name}") + + +def calculate_parameter(param): + print_name(param) + + time.sleep(3) # Имитация I/O операции + return param ** 2 + + +class StructTest(TestCase): + def test_n1(self): + print_name("main") + num_processes = 3 + worker_names = [f"CustomWorker-{i}" for i in range(num_processes)] + + pool: multiprocessing.Pool = multiprocessing.Pool(processes=num_processes, initargs=(worker_names[0],)) + results = pool.map(calculate_parameter, range(1,9)) + print(results) + + +