Files processing - books

* book translate

* files processing

* files processing

* files processing

* files processing

---------

Co-authored-by: APodoinikov <APodoynikov@detmir.ru>
This commit is contained in:
illian64 2025-09-04 11:09:29 +07:00 committed by GitHub
parent e9e0e647f7
commit 0a70da3b98
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
39 changed files with 1737 additions and 493 deletions

3
.gitignore vendored
View file

@ -9,4 +9,5 @@ test/_trial_temp
/.idea/
/options/
/models/
venv
venv
__pycache__

40
app.py
View file

@ -1,17 +1,16 @@
from contextlib import asynccontextmanager
from fastapi import FastAPI
import uvicorn
import logging
from contextlib import asynccontextmanager
import uvicorn
from fastapi import FastAPI
from starlette.staticfiles import StaticFiles
from app.app_core import AppCore
from app.cuda import cuda_info
from app.dto import TranslateReq
from app.struct import Request
from app.dto import TranslateReq, TranslateCommonRequest, TranslateResp, ProcessingFileDirReq, ProcessingFileDirResp, \
ProcessingFileDirListResp
from app.properties import Properties
core: AppCore
logger = logging.getLogger('uvicorn')
@ -22,7 +21,7 @@ async def lifespan(fast_api: FastAPI):
logger.info("Starting llm-translate")
global core
core = AppCore()
core.init_with_plugins()
core.init_with_translate_plugins()
yield
logger.info("Stopping llm-translate")
@ -33,9 +32,10 @@ properties = Properties()
@app.get("/translate")
async def translate_get(text: str, from_lang: str = "", to_lang: str = "", translator_plugin: str = ""):
async def translate_get(text: str, from_lang: str = "", to_lang: str = "",
translator_plugin: str = "") -> TranslateResp:
"""
Return translation
Translate text.
:param str text: text to translate
@ -48,26 +48,34 @@ async def translate_get(text: str, from_lang: str = "", to_lang: str = "", trans
:param str translator_plugin: to use. If blank, default will be used.
If not initialized (not in "default_translate_plugin" and not in "init_on_start" from options - throw error)
:param str api_key: api key for access (if service setup in security mode with api keys)
:return: dict (result: text)
"""
"""
request = Request(text, from_lang, to_lang, translator_plugin)
request = TranslateCommonRequest(text, from_lang, to_lang, translator_plugin)
return core.translate(request)
@app.post("/translate")
async def translate_post(req: TranslateReq):
request = Request(req.text, req.from_lang, req.to_lang, req.translator_plugin)
async def translate_post(req: TranslateReq) -> TranslateResp:
request = TranslateCommonRequest(req.text, req.from_lang, req.to_lang, req.translator_plugin)
return core.translate(request)
@app.get("/process-files-list")
async def process_files_list(recursive_sub_dirs: bool) -> ProcessingFileDirListResp:
return core.process_files_list(recursive_sub_dirs)
@app.post("/process-files")
async def process_files(req: ProcessingFileDirReq) -> ProcessingFileDirResp:
return core.process_files(req)
if __name__ == "__main__":
log_config = uvicorn.config.LOGGING_CONFIG
log_config["formatters"]["access"]["fmt"] = "%(asctime)s %(levelname)s %(message)s"
log_config["formatters"]["default"]["fmt"] = "%(asctime)s %(levelname)s %(message)s"
app.mount('/', StaticFiles(directory='static', html=True), name='static')
uvicorn.run(app, host="127.0.0.1", port=properties.port, log_level="info", log_config=log_config, use_colors=False)
uvicorn.run(app, host="127.0.0.1", port=properties.port, log_level="info", log_config=log_config, use_colors=False)

View file

@ -1,11 +1,16 @@
import logging
import os
import time
import traceback
from os import walk
from app import text_splitter
from app import text_splitter, file_processor
from app.cache import Cache
from app.dto import TranslateResp
from app.struct import TranslateStruct, TranslationParams, TextSplitParams, TextProcessParams, Request, Part, \
CacheParams
from app.dto import TranslateResp, ProcessingFileDirReq, \
ProcessingFileDirResp, TranslatePluginInitInfo, Part, TranslateStruct, FileProcessingPluginInitInfo, \
TranslateCommonRequest, ProcessingFileResp, ProcessingFileStruct, ProcessingFileStatus, ProcessingFileDirListResp, \
ProcessingFileDirListItemIn, ProcessingFileDirListItemOut
from app.params import TranslationParams, TextSplitParams, TextProcessParams, CacheParams, FileProcessingParams
from app.text_processor import pre_process
from jaa import JaaCore
@ -21,22 +26,39 @@ class AppCore(JaaCore):
self.init_on_start = ""
self.translation_params = TranslationParams("", "")
self.text_split_params: TextSplitParams = None
self.text_process_params: TextProcessParams = None
self.cache_params: CacheParams = None
self.text_split_params: TextSplitParams | None = None
self.text_process_params: TextProcessParams | None = None
self.cache_params: CacheParams | None = None
self.file_processing_params: FileProcessingParams | None = None
self.translators: dict = {}
self.initialized_translator_engines = dict()
self.cache: Cache = None
self.initialized_translator_engines: dict[str, TranslatePluginInitInfo] = dict()
self.cache: Cache | None = None
self.files_ext_to_processors: dict[str, list[FileProcessingPluginInitInfo]] = dict()
self.sleep_after_translate: float = 0.0
def process_plugin_manifest(self, modname, manifest):
if "translate" in manifest: # process commands
if "translate" in manifest: # collect translate plugins
for cmd in manifest["translate"].keys():
self.translators[cmd] = manifest["translate"][cmd]
if "file_processing" in manifest and manifest["options"]["enabled"]: # collect file processing plugins
for cmd in manifest["file_processing"].keys():
init_info: FileProcessingPluginInitInfo = manifest["file_processing"][cmd][0](self) # init call
init_info.name = cmd
init_info.processing_function = manifest["file_processing"][cmd][1]
init_info.processed_file_name_function = manifest["file_processing"][cmd][2]
logger.info("Init file processing plugin '%s' for next file extensions: %s",
init_info.name, init_info.supported_extensions)
for ext in init_info.supported_extensions:
ext_list = self.files_ext_to_processors.get(ext, list())
ext_list.append(init_info)
self.files_ext_to_processors[ext] = ext_list
return manifest
def init_with_plugins(self):
def init_with_translate_plugins(self) -> None:
self.init_plugins(["core"])
self.cache = Cache(self.cache_params)
@ -44,68 +66,81 @@ class AppCore(JaaCore):
self.init_translator_engine(self.default_translate_plugin)
init_on_start_list = self.init_on_start.replace(" ", "").split(",")
init_on_start_list = self.init_on_start.replace(" ", "").split(",") # TODO to array
for translator in init_on_start_list:
if translator != "":
self.init_translator_engine(translator)
logger.info("Found translation engines: %s", ", ".join(str(key) for key in self.translators.keys()))
def init_translator_engine(self, translator_engine: str):
def init_translator_engine(self, translator_engine: str) -> None:
if translator_engine in self.initialized_translator_engines:
# already inited
return
try:
logger.info("Try to init translation plugin '%s'...", translator_engine)
modname = self.translators[translator_engine][0](self)
self.initialized_translator_engines[translator_engine] = modname
model_init_info: TranslatePluginInitInfo = self.translators[translator_engine][0](self)
self.initialized_translator_engines[translator_engine] = model_init_info
logger.info("Success init translation plugin: '%s'.", translator_engine)
except Exception as e:
logger.error("Error init translation plugin '%s'...", translator_engine, e)
def get_plugin_options(self, translator_engine: str):
modname = self.initialized_translator_engines[translator_engine]
return self.plugin_options(modname)
def get_translation_params(self, translator_engine: str):
options = self.get_plugin_options(translator_engine)
if options['translation_params_struct']:
return options['translation_params_struct']
def get_translation_params(self, plugin_name: str) -> TranslationParams:
options = self.plugin_options(plugin_name)
if options and options.get('translation_params_struct'):
return options.get('translation_params_struct')
else:
return self.translation_params
def get_text_split_params(self, translator_engine: str):
options = self.get_plugin_options(translator_engine)
if options['text_split_params_struct']:
return options['text_split_params_struct']
def get_text_split_params(self, plugin_name: str) -> TextSplitParams:
options = self.plugin_options(plugin_name)
if options and options.get('text_split_params_struct'):
return options.get('text_split_params_struct')
else:
return self.text_split_params
def get_text_process_params(self, translator_engine: str):
options = self.get_plugin_options(translator_engine)
if options['text_process_params_struct']:
return options['text_process_params_struct']
def get_text_process_params(self, plugin_name: str) -> TextProcessParams:
options = self.plugin_options(plugin_name)
if options and options.get('text_process_params_struct'):
return options.get('text_process_params_struct')
else:
return self.text_process_params
def translate(self, req: Request):
def get_translator_plugin(self, req_plugin: str) -> str:
    """
    Resolve the translator plugin name that must serve a request.

    :param str req_plugin: plugin name from the request. A falsy value (``None`` or blank)
        selects ``self.default_translate_plugin``.
    :return: resolved plugin name
    :raises ValueError: if the resolved plugin is not in ``self.initialized_translator_engines``
    """
    # None and "" are both falsy, so a single truthiness check covers both cases.
    chosen_plugin: str = req_plugin if req_plugin else self.default_translate_plugin

    if chosen_plugin in self.initialized_translator_engines:
        return chosen_plugin

    raise ValueError("This translate_plugin not in initialized: " + chosen_plugin)
def get_from_language(self, req_lang: str, plugin_name: str) -> str:
    """
    Resolve the source language of a request.

    :param str req_lang: source language from the request; ``""`` and ``"--"`` mean "not set"
    :param str plugin_name: plugin whose translation params supply the default language
    :return: ``req_lang`` if set, otherwise ``default_from_lang`` of the plugin translation params
    """
    if req_lang in ("", "--"):
        return self.get_translation_params(plugin_name).default_from_lang
    return req_lang
def get_to_language(self, req_lang: str, plugin_name: str) -> str:
    """
    Resolve the target language of a request.

    :param str req_lang: target language from the request; ``""`` and ``"--"`` mean "not set"
    :param str plugin_name: plugin whose translation params supply the default language
    :return: ``req_lang`` if set, otherwise ``default_to_lang`` of the plugin translation params
    """
    if req_lang in ("", "--"):
        return self.get_translation_params(plugin_name).default_to_lang
    return req_lang
def translate(self, req: TranslateCommonRequest) -> TranslateResp:
if req.text == '':
return TranslateResp(result='', parts=[], error=None)
try:
if not req.translator_plugin or req.translator_plugin == "":
req.translator_plugin = self.default_translate_plugin
if req.translator_plugin not in self.initialized_translator_engines:
raise ValueError("This translate_plugin not in initialized: " + req.translator_plugin)
if req.from_lang == "":
req.from_lang = self.get_translation_params(req.translator_plugin).default_from_lang
if req.to_lang == "":
req.to_lang = self.get_translation_params(req.translator_plugin).default_to_lang
req.translator_plugin = self.get_translator_plugin(req.translator_plugin)
plugin_info = self.initialized_translator_engines[req.translator_plugin]
req.from_lang = self.get_from_language(req.from_lang, plugin_info.plugin_name)
req.to_lang = self.get_to_language(req.to_lang, plugin_info.plugin_name)
processed_text: str
if self.get_text_process_params(req.translator_plugin).apply_for_request:
@ -116,12 +151,18 @@ class AppCore(JaaCore):
text_parts: list[Part] = text_splitter.split_text(processed_text,
self.get_text_split_params(req.translator_plugin),
req.from_lang)
self.cache_read(req, text_parts)
for text_part in text_parts:
if not text_part.need_to_translate():
text_part.translate = text_part.text
self.cache.cache_read(req, text_parts, self.cache_params, plugin_info.model_name)
translate_struct = TranslateStruct(req=req, processed_text=processed_text, parts=text_parts)
translate_struct: TranslateStruct = self.translators[req.translator_plugin][1](self, translate_struct)
self.cache_write(req, translate_struct.parts)
if translate_struct.need_to_translate():
translate_struct: TranslateStruct = self.translators[req.translator_plugin][1](self, translate_struct)
self.cache.cache_write(req, translate_struct.parts, self.cache_params, plugin_info.model_name)
if self.sleep_after_translate > 0:
time.sleep(self.sleep_after_translate)
(translate_text, translate_parts) = text_splitter.join_text(translate_struct.parts)
@ -137,19 +178,143 @@ class AppCore(JaaCore):
traceback.print_tb(e.__traceback__, limit=10)
return TranslateResp(result=None, parts=None, error=getattr(e, 'message', repr(e)))
def cache_read(self, req: Request, parts: list[Part]):
if self.cache_params.enabled and req.translator_plugin not in self.cache_params.disable_for_plugins:
for part in parts:
if part.need_to_translate():
cached_translate = self.cache.get(req, part.text)
if cached_translate:
part.cache_found = True
part.translate = cached_translate
else:
part.cache_found = False
def process_files_list(self, recursive_sub_dirs: bool) -> ProcessingFileDirListResp:
files_in: list[ProcessingFileDirListItemIn] = []
for root, dirs, file_names in os.walk(self.file_processing_params.directory_in):
for file_name in file_names:
name, extension = os.path.splitext(file_name)
extension = extension.lower().replace(".", "")
processor_name = None
file_processor_error = None
try:
processor = self.get_file_processor(extension, None)
if processor:
processor_name = processor.name
except ValueError as ve:
file_processor_error = "error: " + ve.args[0]
def cache_write(self, req: Request, parts: list[Part]):
if self.cache_params.enabled and req.translator_plugin not in self.cache_params.disable_for_plugins:
for part in parts:
if part.need_to_translate() and not part.cache_found:
self.cache.put(req, part.text, part.translate)
files_in.append(ProcessingFileDirListItemIn(
file_with_path=file_processor.get_file_with_path_for_list(
self.file_processing_params.directory_in, root.replace(os.sep, "/"), file_name),
file_processor=processor_name, file_processor_error=file_processor_error))
if not recursive_sub_dirs:
break
# output directory files list
files_out: list[ProcessingFileDirListItemOut] = []
for root, dirs, file_names in walk(self.file_processing_params.directory_out):
for file_name in file_names:
files_out.append(ProcessingFileDirListItemOut(
file_with_path=file_processor.get_file_with_path_for_list(self.file_processing_params.directory_out,
root.replace(os.sep, "/"), file_name)))
if not recursive_sub_dirs:
break
return ProcessingFileDirListResp(files_in=files_in, files_out=files_out,
directory_in=self.file_processing_params.directory_in,
directory_out=self.file_processing_params.directory_out,
error=None)
def process_files(self, req: ProcessingFileDirReq) -> ProcessingFileDirResp:
    """
    Process every file found in the input directory of the request.

    Blank / ``None`` request fields (plugin, languages, directories, flags) are replaced in place
    by the defaults from the plugin translation params and ``self.file_processing_params``.

    :param req: file processing request; mutated with the resolved defaults
    :return: response with one entry per visited file. On failure the file list is empty
        and ``error`` carries the message; exceptions are not propagated to the caller.
    """
    try:
        req.translator_plugin = self.get_translator_plugin(req.translator_plugin)
        engine_plugin_name = self.initialized_translator_engines[req.translator_plugin].plugin_name
        req.from_lang = self.get_from_language(req.from_lang, engine_plugin_name)
        req.to_lang = self.get_to_language(req.to_lang, engine_plugin_name)

        # fall back to configured defaults for everything the request left unset
        if not req.directory_in:
            req.directory_in = self.file_processing_params.directory_in
        if not req.directory_out:
            req.directory_out = self.file_processing_params.directory_out
        if req.preserve_original_text is None:
            req.preserve_original_text = self.file_processing_params.preserve_original_text
        if req.overwrite_processed_files is None:
            req.overwrite_processed_files = self.file_processing_params.overwrite_processed_files

        results: list[ProcessingFileResp] = []
        for current_dir, _sub_dirs, names in walk(req.directory_in):
            results.extend(self.process_file(req, current_dir, name) for name in names)
            if not req.recursive_sub_dirs:
                break  # the first walk() entry is the top directory itself

        return ProcessingFileDirResp(results, "")
    except ValueError as ve:
        return ProcessingFileDirResp(files=list(), error=ve.args[0])
    except Exception as e:
        traceback.print_tb(e.__traceback__, limit=10)
        return ProcessingFileDirResp(files=list(), error=getattr(e, 'message', repr(e)))
def process_file(self, req: ProcessingFileDirReq, root: str, file_name: str) -> ProcessingFileResp:
    """
    Process a single file with the file processing plugin matching its extension.

    :param req: file processing request (directories, flags, optional per-extension processors)
    :param str root: directory that contains the file
    :param str file_name: file name with extension
    :return: processing result. Files without a matching processor get status TYPE_NOT_SUPPORT,
        already processed files (when overwrite is off) get TRANSLATE_ALREADY_EXISTS,
        any exception is converted into an error response.
    """
    try:
        stem, raw_extension = os.path.splitext(file_name)

        # try to find processor; extensions are matched lower-cased and without the dot
        extension = raw_extension.lower().replace(".", "")
        requested_processor = req.file_processors.get(extension) if req.file_processors else None
        processor = self.get_file_processor(extension, requested_processor)
        if processor is None:
            return ProcessingFileResp(file_in=file_name, file_out="",
                                      path_file_in=f'{root}/{file_name}'.replace(os.sep, "/"),
                                      path_file_out=None, status=ProcessingFileStatus.TYPE_NOT_SUPPORT,
                                      file_processor="", message=None)

        # mirror the input directory structure under the output directory
        out_dir = root.replace(req.directory_in, req.directory_out)
        file_struct = ProcessingFileStruct(
            path_in=root, path_out=out_dir, file_name=stem,
            file_ext=extension, file_name_ext=file_name, file_processor=processor.name)

        out_file_name = processor.processed_file_name_function(self, file_struct, req)
        already_processed = (not req.overwrite_processed_files
                             and os.path.isfile(f'{out_dir}/{out_file_name}'))
        if already_processed:
            return file_processor.get_processing_file_resp(file_struct=file_struct, file_out=out_file_name,
                                                           file_processor=processor.name,
                                                           status=ProcessingFileStatus.TRANSLATE_ALREADY_EXISTS)

        logger.info("Start processing file %s/%s", root.replace(os.sep, "/"), file_name)
        os.makedirs(file_struct.path_out, exist_ok=True)  # make output directory structure
        return processor.processing_function(self, file_struct, req)
    except ValueError as ve:
        return file_processor.get_processing_file_resp_error(file_in=file_name, path_in=root, error_msg=ve.args[0])
    except Exception as e:
        traceback.print_tb(e.__traceback__, limit=10)
        return file_processor.get_processing_file_resp_error(file_in=file_name, path_in=root, error_msg=repr(e))
def get_file_processor(self, extension: str, req_processor: str | None) -> FileProcessingPluginInitInfo | None:
    """
    Select the file processing plugin for a file extension.

    Resolution order:
    1. processor named in the request (error if that name is not registered for the extension);
    2. the only processor registered for the extension;
    3. the only processor whose plugin options set ``default_extension_processor``.

    :param str extension: lower-cased file extension without the dot; blank means "no extension"
    :param req_processor: processor name from the request, or ``None`` to auto-select
    :return: selected processor, or ``None`` if the extension is blank or has no processors
    :raises ValueError: if the requested processor is unknown for the extension, or the choice
        between several processors is ambiguous
    """
    if not extension:  # skip files without extension
        return None

    processors: list[FileProcessingPluginInitInfo] = self.files_ext_to_processors.get(extension) or []
    if not processors:
        return None

    if req_processor:  # processor name from request wins; it must exist for this extension
        for processor in processors:
            if processor.name == req_processor:
                return processor
        raise ValueError(f'Not found processor with name from request: {req_processor} for extension {extension}')

    if len(processors) == 1:  # only one processor found - ok, return it
        return processors[0]

    # several candidates: look for the ones flagged as default in their plugin options
    default_processors: list[FileProcessingPluginInitInfo] = []
    for processor in processors:
        options = self.plugin_options(processor.plugin_name)
        if options and options.get('default_extension_processor'):
            default_processors.append(processor)

    if len(default_processors) == 1:  # only one default processor found - return it
        return default_processors[0]

    # Names are materialized into a list: interpolating a map object would print
    # "<map object at 0x...>" instead of the processor names.
    if default_processors:  # more than one default processor - error
        processor_names = [p.name for p in default_processors]
        raise ValueError(f'Found more than one default processor {processor_names} for extension: {extension}')

    processor_names = [p.name for p in processors]  # several processors, none is default - error
    raise ValueError(f'Found more than one not default processors {processor_names} for extension: {extension}')

View file

@ -0,0 +1,54 @@
import logging
import ebooklib
from app.struct import TranslateBook, Request
from bs4 import BeautifulSoup
from ebooklib import epub
from tqdm import tqdm
from app.app_core import AppCore
from app.dto import TranslateBookItemStatus
from app.params import tp
logger = logging.getLogger('uvicorn')
tag_headers = ['h1', 'h2', 'h3', 'h4', 'h5', 'h6']
tag_text = ['p']
class BookEpubTranslate:
    """
    Draft translator for EPUB books: walks the book documents and replaces / extends the text
    of paragraph (``tag_text``) and header (``tag_headers``) tags with its translation.

    NOTE(review): this class references names that are not defined in the visible code
    (``core``, ``file``) - see the notes inside ``translate_book``.
    """

    def translate_book(self, translate_func, req: TranslateBook, output_file_name: str) -> TranslateBookItemStatus:
        """
        Translate the EPUB file referenced by ``req.file`` and write the result as a new EPUB.

        :param translate_func: not used in the body - NOTE(review): presumably meant to replace
            the undefined ``core`` below; confirm
        :param req: book translate request (``file``, ``preserve_original_text`` are read here)
        :param output_file_name: not used in the body; the output name is derived from ``file``
        :return: NOTE(review): annotated as TranslateBookItemStatus, but the body has no return
            statement, so ``None`` is returned
        """
        book = epub.read_epub(req.file)
        for item in book.get_items():
            logger.info("Translate item with id %s", item.get_id())
            # NOTE(review): only the document with id "item_1" is processed - looks like a debug filter
            if item.get_type() == ebooklib.ITEM_DOCUMENT and item.get_id() == "item_1":
                content = BeautifulSoup(item.get_content(), features="xml")
                for child in tqdm(content.descendants, unit=tp.unit, ascii=tp.ascii, desc=tp.desc):
                    if child and child.text and child.parent:
                        # handle only nodes whose parent is a paragraph / header tag with plain string content
                        if child.parent.name and child.parent.string and (child.parent.name in tag_text or child.parent.name in tag_headers):
                            text = child.parent.string
                            # NOTE(review): `core` is neither a parameter nor a module-level name here
                            translated_text = self.translate_text(core, req, text)
                            if child.parent.name in tag_text:
                                if req.preserve_original_text:
                                    # keep the original node and insert the translation after it, in a tag of the same kind
                                    translate_tag = content.new_tag(child.parent.name)
                                    translate_tag.string = translated_text
                                    child.insert_after(translate_tag)
                                else:
                                    child.parent.string = translated_text
                            if child.parent.name in tag_headers:
                                if req.preserve_original_text:
                                    # headers keep both variants in one line: "original / translation"
                                    child.parent.string = f'{child.parent.string} / {translated_text}'
                                else:
                                    child.parent.string = translated_text
                item.set_content(content.encode())
        # NOTE(review): `file` is not defined in this scope (req.file is what was read above)
        epub.write_epub(file[:len(file) - 4] + "__translate.epub", book, {})

    def translate_text(self, core: AppCore, req: TranslateBook, text: str) -> str:
        """
        Translate one text fragment with the languages and plugin taken from the book request.

        :param core: application core whose ``translate`` method is called
        :param req: book translate request (``from_lang``, ``to_lang``, ``translator_plugin``)
        :param text: text to translate
        :return: ``result`` field of the translate response
        """
        translate_result = core.translate(Request(text=text, from_lang=req.from_lang, to_lang=req.to_lang,
                                                  translator_plugin=req.translator_plugin))
        return translate_result.result

50
app/books_translate.py Normal file
View file

@ -0,0 +1,50 @@
import logging
import os
from os import walk
from app.dto import TranslateBookDirReq, TranslateBookDirResp, TranslateBookItem, TranslateBookItemStatus
logger = logging.getLogger('uvicorn')
class BookDirectoryTranslate:
    """
    Translate all supported book files located directly in a directory (no recursion).
    """
    supported_extensions = ['epub']  # lower case, without the leading dot
    overwrite_exists_translated_books = True

    def __init__(self, translate_func):
        """
        :param translate_func: translate callable; only stored here, not called in this class
        """
        self.translate_func = translate_func

    def translate(self, req: TranslateBookDirReq) -> TranslateBookDirResp:
        """
        Process every file of ``req.directory_in`` (top level only).

        :param req: directory translate request
        :return: response with one item per file; empty list if the directory has no files
        """
        # the first walk() entry describes the top directory itself; sub directories are ignored
        _dir_path, _dir_names, filenames = next(walk(req.directory_in), (None, [], []))
        if not filenames:
            return TranslateBookDirResp([], "")

        books: list[TranslateBookItem] = [self.process_file(req, filename) for filename in filenames]
        return TranslateBookDirResp(books, "")

    def process_file(self, req: TranslateBookDirReq, filename: str) -> TranslateBookItem:
        """
        Process one file of the input directory.

        :param req: directory translate request
        :param str filename: file name with extension
        :return: item with the processing status. NOTE: translation of supported types is not
            implemented yet (see TODO), ``None`` is returned in that case.
        """
        name, extension = os.path.splitext(filename)
        # splitext() keeps the leading dot (".epub"); supported_extensions are stored without it
        extension = extension.lower().replace(".", "")
        if extension not in self.supported_extensions:
            return TranslateBookItem(f'{req.directory_in}/{filename}', "", TranslateBookItemStatus.type_not_support)

        translate_book_file_name = self.get_translate_book_file_name(req, name, extension)
        if (not self.overwrite_exists_translated_books
                and os.path.isfile(f'{req.directory_out}/{translate_book_file_name}')):
            return TranslateBookItem(f'{req.directory_in}/{filename}', "",
                                     TranslateBookItemStatus.translate_already_exists)

        if extension == 'epub':
            pass  # TODO fix

    def get_translate_book_file_name(self, req: TranslateBookDirReq, name: str, extension: str) -> str:
        """
        Build the file name of the translated book.

        :param req: directory translate request (languages, ``preserve_original_text``)
        :param str name: source file name without extension
        :param str extension: file extension without the leading dot
        :return: ``{name}___{from_lang}_{to_lang}.{extension}`` when the original text is preserved,
            otherwise ``{name}___{to_lang}.{extension}``
        """
        from_lang_part = "_" + req.from_lang if req.preserve_original_text else ""
        return f'{name}__{from_lang_part}_{req.to_lang}.{extension}'

View file

@ -1,7 +1,8 @@
import logging
import sqlite3
from app.struct import CacheParams, Request
from app.dto import TranslateCommonRequest, Part
from app.params import CacheParams
logger = logging.getLogger('uvicorn')
@ -9,11 +10,9 @@ logger = logging.getLogger('uvicorn')
class Cache:
cache_table_name = "cache_translate"
params: CacheParams
connection: sqlite3.Connection
def __init__(self, params: CacheParams):
self.params = params
self.connection = self.get_connection()
self.init()
def get_connection(self):
@ -23,53 +22,73 @@ class Cache:
if not self.params.enabled:
return None
cursor = self.connection.cursor()
connection = self.get_connection()
cursor = connection.cursor()
cursor.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='{0}'".format(self.cache_table_name))
table_exists = cursor.fetchall()
cursor.connection.commit()
if len(table_exists) == 0:
logger.info("Init cache table: %s, file db: %s", self.cache_table_name, self.params.file)
create_table = """
CREATE TABLE IF NOT EXISTS {0}
(key TEXT NOT NULL, created TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
from_lang TEXT NOT NULL, to_lang TEXT NOT NULL, plugin TEXT NOT NULL, value TEXT NOT NULL)
from_lang TEXT NOT NULL, to_lang TEXT NOT NULL, plugin TEXT NOT NULL,
model TEXT NOT NULL, value TEXT NOT NULL)
""".format(self.cache_table_name)
create_idx_translate_cols = ('CREATE UNIQUE INDEX IF NOT EXISTS idx_translate_cols '
'ON {0} (key, from_lang, to_lang, plugin)').format(self.cache_table_name)
'ON {0} (key, from_lang, to_lang, plugin, model)').format(self.cache_table_name)
create_idx_created = ('CREATE INDEX IF NOT EXISTS idx_created '
'ON {0} (created)').format(self.cache_table_name)
with cursor:
cursor.execute(create_table)
cursor.execute(create_idx_translate_cols)
cursor.execute(create_idx_created)
cursor.execute(create_table)
cursor.execute(create_idx_translate_cols)
cursor.execute(create_idx_created)
else:
if (self.params.expire_days > 0):
delete_expired_values = "DELETE FROM {0} WHERE created < date('now', '-{1} day')".format(
self.cache_table_name, self.params.expire_days)
cursor.execute(delete_expired_values)
self.connection.commit()
connection.commit()
def get(self, req: Request, text: str):
select = "SELECT value FROM {0} WHERE key = ? AND from_lang = ? AND to_lang = ? AND plugin = ?".format(
def get(self, req: TranslateCommonRequest, text: str, model_name: str):
select = ("SELECT value FROM {0} "
"WHERE key = ? AND from_lang = ? AND to_lang = ? AND plugin = ? AND model = ?").format(
self.cache_table_name)
cursor = self.connection.cursor()
cursor.execute(select, (text, req.from_lang, req.to_lang, req.translator_plugin))
cursor = self.get_connection().cursor()
cursor.execute(select, (text, req.from_lang, req.to_lang, req.translator_plugin, model_name))
value = cursor.fetchone()
if value:
return value[0]
else:
return None
def put(self, req: Request, text: str, value: str):
def put(self, req: TranslateCommonRequest, text: str, value: str, model_name: str):
try:
insert_connection = self.get_connection()
cursor = insert_connection.cursor()
cursor.execute('INSERT INTO {0} (KEY, from_lang, to_lang, plugin, VALUE) VALUES (?, ?, ?, ?, ?)'.format(
self.cache_table_name),(text, req.from_lang, req.to_lang, req.translator_plugin, value))
insert = 'INSERT INTO {0} (KEY, from_lang, to_lang, plugin, model, VALUE) VALUES (?, ?, ?, ?, ?, ?)'.format(self.cache_table_name)
cursor.execute(insert,(text, req.from_lang, req.to_lang, req.translator_plugin, model_name, value))
insert_connection.commit()
insert_connection.close()
except Exception as e:
logger.error("Error save cache entry, text = %s, req = %s, error=%s", text, req, e)
def cache_read(self, req: TranslateCommonRequest, parts: list[Part], params: CacheParams, model_name: str):
    """
    Fill text parts with translations found in the cache.

    Does nothing when the cache is disabled or switched off for the request plugin.
    For every part that needs translation ``cache_found`` is set; on a hit ``translate``
    receives the cached value.

    :param req: translate request (languages and plugin are part of the cache key)
    :param parts: text parts, updated in place
    :param params: cache params (``enabled``, ``disable_for_plugins``)
    :param str model_name: model name, part of the cache key
    """
    if not params.enabled or req.translator_plugin in params.disable_for_plugins:
        return

    for part in parts:
        if not part.need_to_translate():
            continue
        cached_value = self.get(req, part.text, model_name)
        part.cache_found = bool(cached_value)
        if cached_value:
            part.translate = cached_value
def cache_write(self, req: TranslateCommonRequest, parts: list[Part], params: CacheParams, model_name: str):
    """
    Store translations of text parts in the cache.

    Does nothing when the cache is disabled or switched off for the request plugin.
    Only parts that need translation and were not read from the cache are stored.

    :param req: translate request (languages and plugin are part of the cache key)
    :param parts: translated text parts
    :param params: cache params (``enabled``, ``disable_for_plugins``)
    :param str model_name: model name, part of the cache key
    """
    if not params.enabled:
        return
    if req.translator_plugin in params.disable_for_plugins:
        return

    for part in parts:
        store_part = part.need_to_translate() and not part.cache_found
        if store_part:
            self.put(req, part.text, part.translate, model_name)

View file

@ -5,7 +5,7 @@ import torch
logger = logging.getLogger('uvicorn')
def cuda_info():
def cuda_info() -> None:
cuda_is_available = torch.cuda.is_available()
device_count = torch.cuda.device_count()
current_device = torch.cuda.current_device()
@ -17,14 +17,15 @@ def cuda_info():
logger.info("GPU #%d: %s", i, torch.cuda.get_device_name(i))
def get_device(options: dict):
def get_device(options: dict) -> str:
cuda_opt = options["cuda"]
if cuda_opt:
return "cuda"
else:
return "cpu"
def get_device_with_gpu_num(options: dict):
def get_device_with_gpu_num(options: dict) -> str:
cuda_opt = options["cuda"]
if cuda_opt:
return "cuda:{0}".format(options["cuda_device_index"])

View file

@ -1,4 +1,7 @@
import enum
import os
from dataclasses import dataclass
from typing import Callable, Any
from pydantic import BaseModel
@ -10,6 +13,46 @@ class TranslateReq(BaseModel):
translator_plugin: str | None = ""
@dataclass
class TranslateCommonRequest:
    """
    Internal translate request built from the GET parameters, the POST body or a file
    processing request, and passed to the application core.
    """
    text: str  # text to translate
    from_lang: str | None  # source language; blank value is replaced by the plugin default
    to_lang: str | None  # target language; blank value is replaced by the plugin default
    translator_plugin: str | None  # translator plugin name; blank value selects the default plugin
class ProcessingFileDirReq(BaseModel):
    """
    Request body for processing (translating) files of a directory.

    Every field that is blank / ``None`` is replaced by the configured default
    before files are processed.
    """
    from_lang: str | None = ""
    to_lang: str | None = ""
    translator_plugin: str | None = ""
    # None means "use the configured default". An explicit default is required here:
    # without it pydantic v2 treats an `X | None` field as mandatory.
    preserve_original_text: bool | None = None
    directory_in: str | None = None
    directory_out: str | None = None
    file_processors: dict[str, str] | None = None  # file extension -> file processor name
    overwrite_processed_files: bool | None = None
    recursive_sub_dirs: bool

    def translate_req(self, text: str) -> TranslateCommonRequest:
        """
        Build a translate request for one text fragment with the languages and plugin of this request.

        :param str text: text to translate
        :return: translate request for the application core
        """
        return TranslateCommonRequest(text=text, from_lang=self.from_lang, to_lang=self.to_lang,
                                      translator_plugin=self.translator_plugin)
@dataclass
class ProcessingFileStruct:
    """
    Description of one file being processed: where it is read from, where the result goes,
    and which file processor handles it.
    """
    path_in: str  # directory of the source file
    path_out: str  # directory for the processed file
    file_name: str  # source file name without extension
    file_ext: str  # extension, lower case, without the dot
    file_name_ext: str  # source file name with extension
    file_processor: str  # name of the selected file processor

    def path_file_in(self) -> str:
        """Return the source file path: input directory + OS separator + file name with extension."""
        return "{}{}{}".format(self.path_in, os.sep, self.file_name_ext)

    def path_file_out(self, out_file_name_ext: str) -> str:
        """
        Return the path of the processed file.

        :param str out_file_name_ext: output file name with extension
        :return: output directory + OS separator + output file name
        """
        return "{}{}{}".format(self.path_out, os.sep, out_file_name_ext)
@dataclass
class TranslatePart:
text: str
@ -23,3 +66,107 @@ class TranslateResp:
parts: list[TranslatePart] | None
error: str | None
class ProcessingFileStatus(enum.Enum):
    """Result status of processing a single file."""
    OK = "OK"  # file was processed
    ERROR = "ERROR"  # processing failed, see the response message
    TRANSLATE_ALREADY_EXISTS = "TRANSLATE_ALREADY_EXISTS"  # output file exists and overwrite is off
    TYPE_NOT_SUPPORT = "TYPE_NOT_SUPPORT"  # no file processor for this file
@dataclass
class ProcessingFileResp:
    """Processing result of a single file."""
    file_in: str  # source file name
    file_out: str | None  # processed file name, None on error
    path_file_in: str  # source file name with path
    path_file_out: str | None  # processed file name with path, if any
    status: ProcessingFileStatus
    file_processor: str | None  # name of the file processor that handled the file, if any
    message: str | None  # additional message, e.g. the error text
@dataclass
class ProcessingFileDirResp:
    """Response of directory processing: per-file results or a request-level error."""
    files: list[ProcessingFileResp] | None  # one entry per visited file
    error: str | None  # error message when the whole request failed
@dataclass
class ProcessingFileDirListItemIn:
    """Entry of the input directory listing."""
    file_with_path: str  # file name with path, relative to the input directory
    file_processor: str | None  # name of the processor selected for the file, if any
    file_processor_error: str | None  # error text when processor selection failed
@dataclass
class ProcessingFileDirListItemOut:
    """Entry of the output directory listing."""
    file_with_path: str  # file name with path, relative to the output directory
@dataclass
class ProcessingFileDirListResp:
    """Listing of the configured input and output directories."""
    files_in: list[ProcessingFileDirListItemIn]  # files of the input directory
    files_out: list[ProcessingFileDirListItemOut]  # files of the output directory
    directory_in: str  # configured input directory
    directory_out: str  # configured output directory
    error: str | None
@dataclass
class TranslatePluginInitInfo:
    """Information returned by the init call of a translate plugin."""
    plugin_name: str  # name used to look up the plugin options
    model_name: str  # model name; used as a part of the translation cache key
    # todo translate_function: Callable[[...], ...]
@dataclass
class FileProcessingPluginInitInfo:
    """
    Information about an initialized file processing plugin.

    The custom ``__init__`` sets only ``plugin_name`` and ``supported_extensions``;
    the remaining fields are expected to be assigned afterwards by the code that
    registers the plugin.
    """
    name: str  # file processor name
    plugin_name: str  # name used to look up the plugin options
    processing_function: Callable[[Any, ProcessingFileStruct, ProcessingFileDirReq], ProcessingFileResp]
    processed_file_name_function: Callable[[Any, ProcessingFileStruct, ProcessingFileDirReq], str]
    supported_extensions: set[str]  # lower case

    def __init__(self, plugin_name: str, supported_extensions: set[str]):
        """
        :param str plugin_name: name used to look up the plugin options
        :param supported_extensions: file extensions (lower case) handled by the plugin
        """
        self.plugin_name = plugin_name
        self.supported_extensions = supported_extensions
@dataclass
class Part:
    """
    One part of a split text together with its translation state.
    """
    text: str  # source text of the part
    translate: str  # translation, empty until translated or read from cache
    paragraph_end: bool  # flag passed in by the code that creates the part
    cache_found: bool  # True when the translation was taken from the cache

    def __init__(self, text: str, paragraph_end: bool):
        """
        :param str text: source text of the part
        :param bool paragraph_end: paragraph-end flag for this part
        """
        self.text = text
        self.translate = ""
        self.paragraph_end = paragraph_end
        self.cache_found = False

    def is_contains_alpha(self) -> bool:
        """Return True if the text contains at least one alphabetic character."""
        return any(letter.isalpha() for letter in self.text)

    def need_to_translate(self) -> bool:
        """
        Return True if the part still has to be sent to a translator: it was not found
        in the cache, is not empty and contains at least one letter.
        """
        # bool() keeps the result a real boolean: a bare `and` chain would return
        # the text itself (e.g. '') when the text is empty
        return not self.cache_found and bool(self.text) and self.is_contains_alpha()
@dataclass
class TranslateStruct:
    """
    Data passed to a translate plugin: the request, the pre-processed text and its parts.
    """
    req: TranslateCommonRequest
    processed_text: str
    parts: list[Part]

    def need_to_translate(self) -> bool:
        """Return True if at least one part still needs translation."""
        return any(part.need_to_translate() for part in self.parts)

75
app/file_processor.py Normal file
View file

@ -0,0 +1,75 @@
import logging
import os
import chardet
from app.dto import ProcessingFileStruct, ProcessingFileDirReq, ProcessingFileResp, ProcessingFileStatus
logger = logging.getLogger('uvicorn')
def processed_file_name_def(file_struct: ProcessingFileStruct, req: ProcessingFileDirReq) -> str:
    """
    Build the default name of a processed file.

    :param file_struct: struct with file info (``file_name``, ``file_ext`` are used)
    :param req: file process request (languages, ``preserve_original_text``)
    :return: ``{name}___{from_lang}_{to_lang}.{ext}`` when the original text is preserved,
        otherwise ``{name}___{to_lang}.{ext}``
    """
    if req.preserve_original_text:
        source_lang_suffix = "_" + req.from_lang
    else:
        source_lang_suffix = ""

    stem = f'{file_struct.file_name}__{source_lang_suffix}_{req.to_lang}'
    return f'{stem}.{file_struct.file_ext}'
def file_name_from_template(file_struct: ProcessingFileStruct, req: ProcessingFileDirReq, options: dict) -> str:
    """
    Generate the output file name from a template stored in the plugin options.

    ``options["output_file_name_template"]`` holds two templates: ``preserve_original`` and
    ``without_original``; the one matching ``req.preserve_original_text`` is used.
    Special parameters in template:
    %source% - original file name (without extension)
    %from_lang% - source language
    %to_lang% - target language

    :param file_struct: struct with file info
    :param req: file process request
    :param options: plugin options with the ``output_file_name_template`` dict
    :return: output file name: filled template + "." + file extension
    """
    templates = options["output_file_name_template"]
    if req.preserve_original_text:
        template = templates["preserve_original"]
    else:
        template = templates["without_original"]

    out_name = template.replace("%source%", file_struct.file_name)
    out_name = out_name.replace("%from_lang%", req.from_lang)
    out_name = out_name.replace("%to_lang%", req.to_lang)
    return out_name + "." + file_struct.file_ext
def get_file_with_path_for_list(init_dir: str, root: str, file_name: str) -> str:
    """
    Return the path of a file relative to ``init_dir``, using "/" as separator.

    The relative directory is computed with ``os.path.relpath`` instead of ``str.replace`` + ``[1:]``:
    ``replace`` removed *every* occurrence of ``init_dir`` inside ``root`` (not only the leading one),
    and the blind ``[1:]`` cut a real character when ``init_dir`` ended with a separator.

    :param init_dir: directory the listing started from
    :param root: directory that contains the file (``init_dir`` itself or one of its subdirectories)
    :param file_name: file name with extension
    :return: ``file_name`` for files located directly in ``init_dir``, otherwise ``sub/dir/file_name``
    """
    relative_dir = os.path.relpath(root, init_dir)
    if relative_dir == os.curdir:
        return file_name
    return relative_dir.replace(os.sep, "/") + "/" + file_name
def get_processing_file_resp(file_struct: ProcessingFileStruct, file_out: str, file_processor: str,
                             status: ProcessingFileStatus, message: str | None = None) -> ProcessingFileResp:
    """
    Build a response describing the result of processing one file.

    Input and output paths are taken from ``file_struct`` and reported with "/" separators.

    :param file_struct: struct with file info
    :param file_out: output file name
    :param file_processor: name of the processor that handled the file
    :param status: processing status
    :param message: optional message
    :return: filled response object
    """
    normalized_in = file_struct.path_file_in().replace(os.sep, "/")
    normalized_out = file_struct.path_file_out(file_out).replace(os.sep, "/")
    return ProcessingFileResp(
        file_in=file_struct.file_name,
        file_out=file_out,
        path_file_in=normalized_in,
        path_file_out=normalized_out,
        status=status,
        file_processor=file_processor,
        message=message,
    )
def get_processing_file_resp_ok(file_struct: ProcessingFileStruct, file_out: str) -> ProcessingFileResp:
    """
    Build a response with status OK for a processed file.

    The processor name is taken from ``file_struct.file_processor``.

    :param file_struct: struct with file info
    :param file_out: output file name
    :return: response with status ProcessingFileStatus.OK
    """
    processor_name = file_struct.file_processor
    return get_processing_file_resp(
        file_struct=file_struct,
        file_out=file_out,
        file_processor=processor_name,
        status=ProcessingFileStatus.OK,
    )
def get_processing_file_resp_error(file_in: str, path_in: str, error_msg: str) -> ProcessingFileResp:
    """
    Build a response with status ERROR for a file that could not be processed.

    The input path is reported with "/" separators, the same way get_processing_file_resp
    reports paths, so responses have one path format on every platform.

    :param file_in: input file name
    :param path_in: directory of the input file
    :param error_msg: error description, placed into the response message
    :return: response with status ProcessingFileStatus.ERROR and no output file / processor
    """
    path_file_in = f'{path_in}{os.sep}{file_in}'.replace(os.sep, "/")
    return ProcessingFileResp(
        file_in=file_in, path_file_in=path_file_in, file_out=None, path_file_out=None,
        file_processor=None, status=ProcessingFileStatus.ERROR, message=error_msg
    )
def read_file_with_fix_encoding(path_file: str) -> str:
    """
    Read a file as text, detecting its encoding with chardet.

    :param path_file: path to the file
    :return: decoded file content
    """
    with open(path_file, "rb") as file:
        content_raw = file.read()

    encoding = chardet.detect(content_raw)['encoding']
    if encoding is None:
        # chardet reports no encoding for empty or undetectable content;
        # calling .lower() on None used to raise AttributeError here
        logger.info("Charset encoding in file %s is not detected, decoding as utf-8", path_file)
        return content_raw.decode(encoding="utf-8", errors='ignore')

    if encoding.lower() != "utf-8":
        logger.info("Charset encoding in file %s: %s", path_file, encoding)
        # undecodable bytes are dropped instead of failing the whole file
        return content_raw.decode(encoding=encoding, errors='ignore')

    return content_raw.decode(encoding="utf-8")

113
app/file_processor_html.py Normal file
View file

@ -0,0 +1,113 @@
from typing import Iterator
from bs4 import BeautifulSoup, PageElement, Tag, NavigableString
from app.app_core import AppCore
from app.dto import ProcessingFileDirReq
class FileProcessorHtml:
    """
    Translate paragraph and header elements of a BeautifulSoup document in place.

    Which tags are treated as text or headers, and how the translated / original text is wrapped,
    is taken from the plugin options passed to the constructor.
    """
    # attribute set on an element whose text has already been sent to translation
    attribute_source = "data-src"
    # attribute set on elements created to hold a translation
    attribute_translate = "data-tr"

    def __init__(self, core: AppCore, options: dict):
        """
        :param core: application core, used to translate text
        :param options: plugin options; "header_tags", "text_tags" and "text_format" are required
        """
        self.core = core
        self.options = options
        self.header_tags = options["header_tags"]
        self.text_tags = options["text_tags"]
        self.original_tag: str = options["text_format"]["original_tag"]
        self.translate_tag: str = options["text_format"]["translate_tag"]
        self.header_delimiter: str = options["text_format"]["header_delimiter"]

    def get_translate_element(self, soup: BeautifulSoup, child: PageElement, translate_txt: str) -> Tag:
        """
        Create an element with the same tag name as the parent of ``child`` holding the translation.

        The element is marked with ``attribute_translate``. When ``translate_tag`` is not empty the
        text is wrapped into one more tag with that name.
        """
        translate_element = soup.new_tag(child.parent.name)
        translate_element[self.attribute_translate] = "t"
        if self.translate_tag == "":
            translate_element.string = translate_txt
        else:
            additional_tag_element = soup.new_tag(self.translate_tag)
            additional_tag_element.string = translate_txt
            translate_element.append(additional_tag_element)

        return translate_element

    def get_original_element(self, soup: BeautifulSoup, child: PageElement, original_text: str) -> None | Tag:
        """
        Create an element with the original text wrapped into ``original_tag``.

        :return: the new element, or None when ``original_tag`` is empty (original is left untouched)
        """
        if self.original_tag == "":
            return None

        original_element = soup.new_tag(child.parent.name)
        additional_tag_element = soup.new_tag(self.original_tag)
        additional_tag_element.string = original_text
        original_element.append(additional_tag_element)

        return original_element

    def _insert_translation(self, req: ProcessingFileDirReq, soup: BeautifulSoup, child: PageElement,
                            child_tag: str, original_text: str, translate_txt: str) -> None:
        """Put the translated text into the tree next to / instead of the original text."""
        if child_tag in self.text_tags:
            translate_element = self.get_translate_element(soup, child, translate_txt)
            if req.preserve_original_text:
                child.parent.insert_after(translate_element)
                original_element = self.get_original_element(soup, child, original_text)
                if original_element:
                    child.replace_with(original_element)
            else:
                child.replace_with(translate_element)
        elif child_tag in self.header_tags:
            if req.preserve_original_text:
                child.parent.string = f'{original_text}{self.header_delimiter}{translate_txt}'
            else:
                child.parent.string = translate_txt

    def process(self, req: ProcessingFileDirReq, soup: BeautifulSoup, body_tag: str | None = None) -> None:
        """
        Translate text and header elements of ``soup`` in place.

        An element is handled once: its parent is marked with ``attribute_source`` and elements
        created for translations carry ``attribute_translate``; marked elements are skipped.

        :param req: file process request (source of translate requests and preserve_original_text)
        :param soup: parsed document, modified in place
        :param body_tag: when set, only descendants of the first tag with this name are processed
        """
        translate_only_first_paragraphs: int = self.options.get("translate_only_first_paragraphs", 0)
        # NOTE(review): the tree is modified while its descendants are being iterated -
        # confirm that the installed bs4 version keeps the iteration valid after replace_with().
        children: Iterator[PageElement] = soup.find(body_tag).descendants if body_tag else soup.descendants
        translated_paragraphs = 0
        for child in children:
            if (child and child.text and child.parent and child.parent.get(self.attribute_source) is None
                    and child.parent.get(self.attribute_translate) is None):
                child_tag = child.parent.name
                if child_tag and child.parent.text and (child_tag in self.text_tags or child_tag in self.header_tags):
                    # The limit is checked BEFORE translating: the old check ran after the translate
                    # call but before the insertion, so the last translation was requested and then
                    # thrown away (limit N produced N-1 translated paragraphs).
                    if 0 < translate_only_first_paragraphs <= translated_paragraphs:
                        break

                    # get contents - for example <p><b>1</b>2<i>3</i><p> - 3 items. 1, 3 - tags, 2 - simple string
                    # contents = child.parent.contents - for translate with save format within paragraph
                    child.parent[self.attribute_source] = "1"
                    original_text = child.parent.text
                    translate_req = req.translate_req(original_text)
                    translate_txt = self.core.translate(translate_req).result
                    translated_paragraphs = translated_paragraphs + 1

                    self._insert_translation(req, soup, child, child_tag, original_text, translate_txt)

    def process1(self, req: ProcessingFileDirReq, soup: BeautifulSoup, body_tag: str | None = None) -> None:
        """
        Variant of process() that handles only elements whose whole content is a single string.

        :param req: file process request (source of translate requests and preserve_original_text)
        :param soup: parsed document, modified in place
        :param body_tag: when set, only descendants of the first tag with this name are processed
        """
        translate_only_first_paragraphs: int = self.options.get("translate_only_first_paragraphs", 0)
        children: Iterator[PageElement] = soup.find(body_tag).descendants if body_tag else soup.descendants
        translated_paragraphs = 0
        for child in children:
            # attribute_translate is the marker put on generated elements; the attribute name used
            # here before (self.translated_attribute) does not exist and raised AttributeError
            if child and child.text and child.parent and child.parent.get(self.attribute_translate) is None:
                child_tag = child.parent.name
                is_simple_string = isinstance(child, NavigableString)
                if (is_simple_string and child_tag and child.parent.string
                        and (child_tag in self.text_tags or child_tag in self.header_tags)):
                    # same limit handling as in process(): stop before requesting an unused translation
                    if 0 < translate_only_first_paragraphs <= translated_paragraphs:
                        break

                    original_text = child.parent.string
                    translate_req = req.translate_req(original_text)
                    translate_txt = self.core.translate(translate_req).result
                    translated_paragraphs = translated_paragraphs + 1

                    self._insert_translation(req, soup, child, child_tag, original_text, translate_txt)

11
app/log.py Normal file
View file

@ -0,0 +1,11 @@
import logging
import traceback
def logger():
    """Return the 'uvicorn' logger."""
    uvicorn_logger = logging.getLogger('uvicorn')
    return uvicorn_logger
def log_exception(message: str, e: Exception) -> None:
    """
    Print the traceback of an exception and log an error message.

    The message is written to the 'uvicorn' logger (the logger returned by logger() in this
    module) instead of the root logger, so it goes through the same handlers as other messages.

    :param message: log message; the exception text is passed as its single %-style argument
    :param e: exception to report
    """
    traceback.print_tb(e.__traceback__, limit=10)
    logging.getLogger('uvicorn').error(message, str(e))

View file

@ -1,50 +1,4 @@
from dataclasses import dataclass, field
# dict_field: dict = field(default_factory=lambda: {})
@dataclass
class Request:
    # Parameters of a single translation request.
    text: str  # text to translate
    from_lang: str | None  # source language; may be None
    to_lang: str | None  # target language; may be None
    translator_plugin: str | None  # translator plugin to use; may be None
@dataclass
class Sentence:
    # Wrapper around a single string of text.
    text: str
@dataclass
class Part:
    """
    One fragment of source text together with its translation state.

    The explicit ``__init__`` below is kept by ``@dataclass``, so a new part always starts with
    an empty translation and ``cache_found`` False.
    """
    text: str  # source fragment
    translate: str  # translated text, empty until it is filled in
    paragraph_end: bool  # value passed to the constructor
    cache_found: bool  # when True, need_to_translate() reports False

    def __init__(self, text: str, paragraph_end: bool):
        self.text = text
        self.translate = ""
        self.paragraph_end = paragraph_end
        self.cache_found = False

    def is_numeric_or_empty(self) -> bool:
        """Return True when, after removing spaces, commas and dots, the text is numeric or empty."""
        processed_text = (self.text
                          .replace(" ", "")
                          .replace(",", "")
                          .replace(".", ""))
        return processed_text.isnumeric() or len(processed_text) == 0

    def need_to_translate(self) -> bool:
        """
        Return True when the part has no cached translation and holds non-numeric, non-empty text.

        The result is always a real ``bool``: previously an empty ``text`` leaked out of the
        ``and`` chain and the method returned ``""`` instead of ``False``.
        """
        return bool(not self.cache_found and self.text and not self.is_numeric_or_empty())
@dataclass
class TranslateStruct:
    # Translation request bundled with its processed text and the parts it was split into.
    req: Request  # the request being translated
    processed_text: str  # presumably the request text after pre-processing - TODO confirm
    parts: list[Part]  # fragments of the text, each holding its own translation
from dataclasses import dataclass
@dataclass
@ -65,7 +19,7 @@ class TextSplitParams:
# pysbd (default) / blingfire
sentence_splitter: str
def split_enabled(self):
def split_enabled(self) -> bool:
return (self.split_by_paragraphs_only or self.split_by_paragraphs_and_length
or self.split_by_sentences_and_length or self.split_by_sentences_only)
@ -96,6 +50,14 @@ class CacheParams:
expire_days: int
@dataclass
class FileProcessingParams:
    # Values of the "file_processing_params" options section (filled by read_file_processing_params).
    directory_in: str  # option "directory_in"
    directory_out: str  # option "directory_out"
    preserve_original_text: bool  # option "preserve_original_text"
    overwrite_processed_files: bool  # option "overwrite_processed_files"
@dataclass
class TranslateProgress:
unit: str
@ -103,16 +65,31 @@ class TranslateProgress:
desc: str
tp: TranslateProgress = TranslateProgress(unit="part", ascii=True, desc="translate parts: ")
@dataclass
class FileProcessingTextFormat:
    """Prefixes and postfixes used to decorate original and translated text."""
    original_prefix: str
    original_postfix: str
    translate_prefix: str
    translate_postfix: str

    def original_text(self, text: str) -> str:
        """Return ``text`` surrounded by the original prefix and postfix."""
        pieces = (self.original_prefix, text, self.original_postfix)
        return "".join(pieces)

    def translate_text(self, text: str) -> str:
        """Return ``text`` surrounded by the translate prefix and postfix."""
        pieces = (self.translate_prefix, text, self.translate_postfix)
        return "".join(pieces)
def read_plugin_params(manifest: dict):
def read_plugin_translate_params(manifest: dict):
manifest["options"]["translation_params_struct"] = read_translation_params(manifest)
manifest["options"]["text_split_params_struct"] = read_text_split_params(manifest)
manifest["options"]["text_process_params_struct"] = read_text_process_params(manifest)
def read_translation_params(manifest: dict):
def read_plugin_file_processing_params(manifest: dict):
manifest["options"]["translation_params_struct"] = read_translation_params(manifest)
def read_translation_params(manifest: dict) -> TranslationParams | None:
options = manifest["options"]
if "translation_params" not in options:
return None
@ -123,7 +100,7 @@ def read_translation_params(manifest: dict):
)
def read_text_split_params(manifest: dict):
def read_text_split_params(manifest: dict) -> TextSplitParams | None:
options = manifest["options"]
if "text_split_params" not in options:
@ -141,7 +118,7 @@ def read_text_split_params(manifest: dict):
)
def read_text_process_params(manifest: dict):
def read_text_process_params(manifest: dict) -> TextProcessParams | None:
options = manifest["options"]
if "text_processing_params" not in options:
@ -165,7 +142,7 @@ def read_text_process_params(manifest: dict):
)
def read_cache_params(manifest: dict):
def read_cache_params(manifest: dict) -> CacheParams:
options = manifest["options"]
return CacheParams(
@ -174,3 +151,28 @@ def read_cache_params(manifest: dict):
disable_for_plugins=options["cache_params"]["disable_for_plugins"],
expire_days=options["cache_params"]["expire_days"],
)
def read_file_processing_params(manifest: dict) -> FileProcessingParams | None:
    """
    Read the "file_processing_params" section of the manifest options.

    :param manifest: plugin manifest with an "options" dict
    :return: filled FileProcessingParams, or None when the section is absent
    """
    options = manifest["options"]
    if "file_processing_params" not in options:
        return None

    section = options["file_processing_params"]
    return FileProcessingParams(
        directory_in=section["directory_in"],
        directory_out=section["directory_out"],
        preserve_original_text=section["preserve_original_text"],
        overwrite_processed_files=section["overwrite_processed_files"],
    )
def read_plugin_file_processing_text_format(options: dict):
    """
    Read the "text_format" section of plugin options into a FileProcessingTextFormat.

    :param options: plugin options with a "text_format" dict
    :return: FileProcessingTextFormat with the four prefix / postfix values
    """
    text_format = options["text_format"]
    field_names = ("original_prefix", "original_postfix", "translate_prefix", "translate_postfix")
    return FileProcessingTextFormat(**{name: text_format[name] for name in field_names})
tp: TranslateProgress = TranslateProgress(unit="part", ascii=True, desc="translate parts: ")

View file

@ -1,13 +1,12 @@
import logging
import re
from app.struct import TextProcessParams
from app.params import TextProcessParams
logger = logging.getLogger('uvicorn')
def pre_process(params: TextProcessParams, original_text: str):
def pre_process(params: TextProcessParams, original_text: str) -> str:
processed_text = replace_text_from_to(original_text, params.replace_text_from_to)
if params.replace_non_standard_new_lines_chars:
@ -28,7 +27,7 @@ def pre_process(params: TextProcessParams, original_text: str):
return processed_text
def replace_not_text_chars(text: str, allowed_chars_ignoring_replace: set, replace_not_text_target_char: str):
def replace_not_text_chars(text: str, allowed_chars_ignoring_replace: set, replace_not_text_target_char: str) -> str:
result = ""
replaced_chars = []
for char in text:
@ -45,7 +44,7 @@ def replace_not_text_chars(text: str, allowed_chars_ignoring_replace: set, repla
return result
def replace_non_standard_new_lines_chars(text: str):
def replace_non_standard_new_lines_chars(text: str) -> str:
return text.replace("\r\n", "\n").replace("\n\r", "\n").replace("\r", "\n")
@ -57,14 +56,14 @@ def remove_identical_characters(text: str,
return re.sub(regexp, r'\1' * remove_identical_characters_max_repeats, text)
def remove_multiple_spaces(text: str):
def remove_multiple_spaces(text: str) -> str:
while ' ' in text:
text = text.replace(' ', ' ')
return text
def replace_text_from_to(text: str, from_to: dict | None):
def replace_text_from_to(text: str, from_to: dict | None) -> str:
if from_to and len(from_to) > 0:
for key, value in from_to.items():
text = text.replace(key, value)

View file

@ -1,8 +1,8 @@
import pysbd
from blingfire import text_to_sentences
from app.dto import TranslatePart
from app.struct import TextSplitParams, Part
from app.dto import TranslatePart, Part
from app.params import TextSplitParams
def is_arr_fin(arr: list, i):

View file

View file

6
jaa.py
View file

@ -40,8 +40,8 @@ main.init_plugins()
Python 3.5+ (due to dict mix in final_options calc), can be relaxed
"""
import os
import json
import os
# here we trying to use termcolor to highlight plugin info and errors during load
try:
@ -195,8 +195,8 @@ class JaaCore:
return self.plugin_manifests[pluginname]
return {}
def plugin_options(self, pluginname):
manifest = self.plugin_manifest(pluginname)
def plugin_options(self, plugin_name):
manifest = self.plugin_manifest(plugin_name)
if "options" in manifest:
return manifest["options"]
return None

View file

@ -1,65 +1,72 @@
from app import params
from app.app_core import AppCore
from app.struct import TranslationParams, read_text_split_params, \
read_text_process_params, read_translation_params, read_cache_params
manifest = {
"name": "Core plugin",
"version": "1.0",
# this is DEFAULT options
# ACTUAL options is in options/<plugin_name>.json after first run
"default_options": {
"default_translate_plugin": "lm_studio", # default translation engine. Will be auto inited on start
"init_on_start": "", # additional list of engines, that must be init on start, separated by ","
"sleep_after_translate": 0, # delay after translate (in seconds, may be decimal, for example 0.1 for 100 ms), if you GPU too hot
"translation_params": {
"default_from_lang": "en", # default from language
"default_to_lang": "ru", # default to language
},
"text_split_params": {
"split_by_paragraphs_and_length": True,
"split_by_sentences_and_length": False,
"split_expected_length": 1000,
"split_by_paragraphs_only": False,
"split_by_sentences_only": False,
"sentence_splitter": "default"
},
"text_processing_params": {
"apply_for_request": True, # apply processing params for text to translate
"apply_for_response": True, # apply processing params for result text
"replace_non_standard_new_lines_chars": True,
"replace_not_text_chars": False,
# some models has issues with special chars (for example { or }) in text. this option replace all non-digit / non text / non-allowed (allowed_chars_for_replace) chars
"allowed_chars_ignoring_replace": " .,<>:;\"'-–…?!#@№$%+/\\^&[]=*()«»—\r\t\n",
# allowed chars for replace with replace_not_text_chars
"replace_not_text_target_char": " ", # replace not allowed char to this char
# replace more than N char consecutive, for example: aaaa -> aaa, bbbbbbb -> bbb
"remove_identical_characters": True,
"remove_identical_characters_extra_chars": "",
"remove_identical_characters_max_repeats": 3,
"remove_multiple_spaces": True, # replace two or more space to one
"replace_text_from_to": { # additional replace variants, from : to
},
},
"cache_params": {
"enabled": True, # enable/disable translate cache
"file": "cache.db", # path to cache file
"disable_for_plugins": ["no_translate"], # list of plugin names without cache
"expire_days": 0, # 0 - without expire
},
"file_processing_params": {
"directory_in": "files_processing/in",
"directory_out": "files_processing/out",
"preserve_original_text": True,
"overwrite_processed_files": False
},
},
}
def start(core: AppCore):
manifest = {
"name": "Core plugin",
"version": "1.0",
# this is DEFAULT options
# ACTUAL options is in options/<plugin_name>.json after first run
"default_options": {
"default_translate_plugin": "lm_studio", # default translation engine. Will be auto inited on start
"init_on_start": "", # additional list of engines, that must be init on start, separated by ","
"translation_params": {
"default_from_lang": "en", # default from language
"default_to_lang": "ru", # default to language
},
"text_split_params": {
"split_by_paragraphs_and_length": True,
"split_by_sentences_and_length": False,
"split_expected_length": 1000,
"split_by_paragraphs_only": False,
"split_by_sentences_only": False,
"sentence_splitter": "default"
},
"text_processing_params": {
"apply_for_request": True, # apply processing params for text to translate
"apply_for_response": True, # apply processing params for result text
"replace_non_standard_new_lines_chars": True,
"replace_not_text_chars": False,
# some models has issues with special chars (for example { or }) in text. this option replace all non-digit / non text / non-allowed (allowed_chars_for_replace) chars
"allowed_chars_ignoring_replace": " .,<>:;\"'-–…?!#@№$%+/\\^&[]=*()«»—\r\t\n",
# allowed chars for replace with replace_not_text_chars
"replace_not_text_target_char": " ", # replace not allowed char to this char
# replace more than N char consecutive, for example: aaaa -> aaa, bbbbbbb -> bbb
"remove_identical_characters": True,
"remove_identical_characters_extra_chars": "",
"remove_identical_characters_max_repeats": 3,
"remove_multiple_spaces": True, # replace two or more space to one
"replace_text_from_to": { # additional replace variants, from : to
},
},
"cache_params": {
"enabled": True, # enable/disable translate cache
"file": "cache.db", # path to cache file
"disable_for_plugins": ["no_translate"], # list of plugin names without cache
"expire_days": 0, # 0 - without expire
}
},
}
return manifest
@ -68,10 +75,13 @@ def start_with_options(core: AppCore, manifest: dict):
core.default_translate_plugin = options["default_translate_plugin"]
core.init_on_start = options["init_on_start"]
core.sleep_after_translate = options["sleep_after_translate"]
core.translation_params = params.read_translation_params(manifest)
core.text_split_params = params.read_text_split_params(manifest)
core.text_process_params = params.read_text_process_params(manifest)
core.cache_params = params.read_cache_params(manifest)
core.file_processing_params = params.read_file_processing_params(manifest)
core.translation_params = read_translation_params(manifest)
core.text_split_params = read_text_split_params(manifest)
core.text_process_params = read_text_process_params(manifest)
core.cache_params = read_cache_params(manifest)
return manifest

106
plugins/plugin_file_epub.py Normal file
View file

@ -0,0 +1,106 @@
import logging
import os
import traceback
import ebooklib
from bs4 import BeautifulSoup
from ebooklib import epub
from natsort import os_sorted
from app import file_processor
from app.app_core import AppCore
from app.dto import ProcessingFileDirReq, ProcessingFileResp, FileProcessingPluginInitInfo, ProcessingFileStruct
from app.file_processor_html import FileProcessorHtml
plugin_name = os.path.basename(__file__)[:-3] # calculating modname
logger = logging.getLogger('uvicorn')
def start(core: AppCore):
    """
    Return the manifest of the epub translation plugin: name, version, default options and the
    file processing entry points.
    """
    manifest = {  # plugin settings
        "name": "Translator for epub books",  # name
        "version": "1.0",  # version

        "default_options": {
            "enabled": True,
            "text_format": {
                "original_tag": "",
                "translate_tag": "i",
                "header_delimiter": " / "
            },
            "header_tags": ['h1', 'h2', 'h3', 'h4', 'h5', 'h6'],
            "text_tags": ['p'],
            "output_file_name_template": {
                "preserve_original": "%source%__%from_lang%_%to_lang%",
                "without_original": "%source%__%to_lang%",
            },
            # key name must match the one read in file_processing(); it was spelled
            # "translate_other_first_chapters_amount" here, which made the lookup fail with KeyError
            "translate_only_first_chapters_amount": 0,
            "default_extension_processor": {
                "epub": True
            },
        },

        "file_processing": {
            "file_epub_translate": (init, file_processing, processed_file_name)
        }
    }

    return manifest
def start_with_options(core: AppCore, manifest: dict):
    """Plugin hook that receives the core and the manifest; this plugin does nothing here."""
    return None
def init(core: AppCore) -> FileProcessingPluginInitInfo:
    """Return the plugin name together with the file extensions it supports ("epub")."""
    extensions = {"epub"}
    return FileProcessingPluginInitInfo(plugin_name=plugin_name, supported_extensions=extensions)
def file_processing(core: AppCore, file_struct: ProcessingFileStruct, req: ProcessingFileDirReq) -> ProcessingFileResp:
    """
    Translate an epub book and write the result to the output directory.

    Every document item of the book (or only the first N, see option
    "translate_only_first_chapters_amount") is parsed, translated in place and stored back.

    :param core: application core
    :param file_struct: struct with file info
    :param req: file process request
    :return: OK response with the output file name, or ERROR response with the exception text
    """
    options = core.plugin_options(plugin_name)
    # a missing option means "no limit" instead of a KeyError
    translate_only_first_chapters_amount: int = options.get("translate_only_first_chapters_amount", 0)
    html_processor = FileProcessorHtml(core=core, options=options)

    try:
        book = epub.read_epub(file_struct.path_file_in())
        book_documents_ids: list[str] = [
            item.id for item in book.get_items() if item.get_type() == ebooklib.ITEM_DOCUMENT
        ]
        docs_count = len(book_documents_ids)

        book_documents_ids_set: set[str]
        if translate_only_first_chapters_amount > 0:
            # "first" chapters are chosen by natural sort order of the item ids
            book_documents_ids = os_sorted(book_documents_ids)
            book_documents_ids_set = set(book_documents_ids[:translate_only_first_chapters_amount])
        else:
            book_documents_ids_set = set(book_documents_ids)

        processed_count = 0
        log_limit_info = (f"(limit: {translate_only_first_chapters_amount})"
                          if translate_only_first_chapters_amount > 0 else "")
        for item in book.get_items():
            if item.get_type() == ebooklib.ITEM_DOCUMENT and item.id in book_documents_ids_set:
                processed_count = processed_count + 1
                logger.info("Translate file %s, item with id %s, item %s / %s %s",
                            file_struct.file_name_ext, item.get_id(), processed_count, docs_count, log_limit_info)
                soup = BeautifulSoup(item.get_content(), features="xml")
                html_processor.process(req=req, soup=soup)
                item.set_content(soup.encode())

        out_file_name = processed_file_name(core=core, file_struct=file_struct, req=req)
        epub.write_epub(file_struct.path_file_out(out_file_name), book, {})

        return file_processor.get_processing_file_resp_ok(file_struct=file_struct, file_out=out_file_name)
    except Exception as e:
        # plugin boundary: any failure is reported in the response instead of being raised
        traceback.print_tb(e.__traceback__, limit=10)
        # module logger ('uvicorn') instead of the root logger, same as the info messages above
        logger.error("Error with processing file %s: %s", file_struct.file_name_ext, str(e))
        return file_processor.get_processing_file_resp_error(
            file_in=file_struct.file_name_ext, path_in=file_struct.path_in, error_msg=str(e))
def processed_file_name(core: AppCore, file_struct: ProcessingFileStruct, req: ProcessingFileDirReq) -> str:
    """Return the output file name built from this plugin's "output_file_name_template" option."""
    plugin_options = core.plugin_options(plugin_name)
    output_name = file_processor.file_name_from_template(file_struct=file_struct, req=req, options=plugin_options)
    return output_name

View file

@ -0,0 +1,82 @@
import logging
import os
import traceback
from bs4 import BeautifulSoup
from app import file_processor
from app.app_core import AppCore
from app.dto import ProcessingFileDirReq, ProcessingFileResp, FileProcessingPluginInitInfo, ProcessingFileStruct
from app.file_processor_html import FileProcessorHtml
plugin_name = os.path.basename(__file__)[:-3] # calculating modname
logger = logging.getLogger('uvicorn')
def start(core: AppCore):
    """
    Return the manifest of the fb2 translation plugin: name, version, default options and the
    file processing entry points.
    """
    manifest = {  # plugin settings
        # name and processor key were copied from the epub plugin; both now identify fb2,
        # so the two plugins no longer share the same "file_processing" key
        "name": "Translator for fb2 books",  # name
        "version": "1.0",  # version

        "default_options": {
            "enabled": True,
            "text_format": {
                "original_tag": "",
                "translate_tag": "emphasis",
                "header_delimiter": " / ",
            },
            "header_tags": [],
            "text_tags": ['p'],
            "output_file_name_template": {
                "preserve_original": "%source%__%from_lang%_%to_lang%",
                "without_original": "%source%__%to_lang%",
            },
            "translate_only_first_paragraphs": 0,
            "default_extension_processor": {
                "fb2": True
            },
        },

        "file_processing": {
            "file_fb2_translate": (init, file_processing, processed_file_name)
        }
    }

    return manifest
def start_with_options(core: AppCore, manifest: dict):
    """Plugin hook that receives the core and the manifest; this plugin does nothing here."""
    return None
def init(core: AppCore) -> FileProcessingPluginInitInfo:
    """Return the plugin name together with the file extensions it supports ("fb2")."""
    extensions = {"fb2"}
    return FileProcessingPluginInitInfo(plugin_name=plugin_name, supported_extensions=extensions)
def file_processing(core: AppCore, file_struct: ProcessingFileStruct, req: ProcessingFileDirReq) -> ProcessingFileResp:
    """
    Translate an fb2 book and write the result (UTF-8) to the output directory.

    :param core: application core
    :param file_struct: struct with file info
    :param req: file process request
    :return: OK response with the output file name, or ERROR response with the exception text
    """
    options = core.plugin_options(plugin_name)
    html_processor = FileProcessorHtml(core=core, options=options)

    try:
        fb2_content = file_processor.read_file_with_fix_encoding(file_struct.path_file_in())
        soup = BeautifulSoup(fb2_content, features="xml")
        # only the content of the "body" tag is translated
        html_processor.process(req, soup, "body")

        out_file_name = processed_file_name(core=core, file_struct=file_struct, req=req)
        # the file is only written, never read back, so plain "w" is enough
        with open(file_struct.path_file_out(out_file_name), 'w', encoding='utf-8') as fb2_put_file:
            fb2_put_file.write(soup.decode())

        return file_processor.get_processing_file_resp_ok(file_struct=file_struct, file_out=out_file_name)
    except Exception as e:
        # plugin boundary: any failure is reported in the response instead of being raised
        traceback.print_tb(e.__traceback__, limit=10)
        # module logger ('uvicorn') instead of the root logger
        logger.error("Error with processing file %s: %s", file_struct.file_name_ext, str(e))
        return file_processor.get_processing_file_resp_error(
            file_in=file_struct.file_name_ext, path_in=file_struct.path_in, error_msg=str(e))
def processed_file_name(core: AppCore, file_struct: ProcessingFileStruct, req: ProcessingFileDirReq) -> str:
    """Return the output file name built from this plugin's "output_file_name_template" option."""
    plugin_options = core.plugin_options(plugin_name)
    output_name = file_processor.file_name_from_template(file_struct=file_struct, req=req, options=plugin_options)
    return output_name

View file

@ -0,0 +1,88 @@
import os
from app import file_processor, params
from app.app_core import AppCore
from app.dto import ProcessingFileDirReq, ProcessingFileResp, FileProcessingPluginInitInfo, ProcessingFileStruct
plugin_name = os.path.basename(__file__)[:-3] # calculating modname
def start(core: AppCore):
    """
    Return the manifest of the txt translation plugin: name, version, default options and the
    file processing entry points.
    """
    manifest = {  # plugin settings
        "name": "Translator for txt files",  # name
        "version": "1.0",  # version

        "default_options": {
            "enabled": True,
            "markdown_output": False,
            "text_format": {
                "original_prefix": "",
                "original_postfix": "",
                "translate_prefix": "*",
                "translate_postfix": "*",
            },
            "new_line_delimiter": "\n",
            # encoding of the written file; file_processing() reads this option,
            # but it was missing from the defaults
            "encoding_output": "utf-8",
            "output_file_name_template": {
                "preserve_original": "%source%__%from_lang%_%to_lang%",
                "without_original": "%source%__%to_lang%",
            },
            "default_extension_processor": {
                "txt": True
            },
        },

        "file_processing": {
            "file_txt_translate": (init, file_processing, processed_file_name)
        }
    }

    return manifest
def start_with_options(core: AppCore, manifest: dict):
    """Plugin hook that receives the core and the manifest; this plugin does nothing here."""
    return None
def init(core: AppCore) -> FileProcessingPluginInitInfo:
    """Return the plugin name together with the file extensions it supports ("txt")."""
    extensions = {"txt"}
    return FileProcessingPluginInitInfo(plugin_name=plugin_name, supported_extensions=extensions)
def file_processing(core: AppCore, file_struct: ProcessingFileStruct, req: ProcessingFileDirReq) -> ProcessingFileResp:
    """
    Translate a text file line by line and write the result to the output directory.

    Empty lines are kept as a single delimiter. Every other line is translated; when
    req.preserve_original_text is set the original line is written before its translation.
    With "markdown_output" each written line is followed by two delimiters instead of one.

    :param core: application core
    :param file_struct: struct with file info
    :param req: file process request
    :return: OK response with the output file name
    """
    options = core.plugin_options(plugin_name)
    markdown_output: bool = options["markdown_output"]
    new_line_delimiter: str = options["new_line_delimiter"]
    text_format = params.read_plugin_file_processing_text_format(options)
    line_end = new_line_delimiter * (2 if markdown_output else 1)

    result_lines: list[str] = []
    file_content = file_processor.read_file_with_fix_encoding(file_struct.path_file_in())
    for line in file_content.splitlines():
        if line == '':
            result_lines.append(new_line_delimiter)
            continue

        if req.preserve_original_text:
            result_lines.append(text_format.original_text(line) + line_end)

        translate_req = req.translate_req(line)
        translate_txt = core.translate(translate_req).result
        result_lines.append(text_format.translate_text(translate_txt) + line_end)

    out_file_name = processed_file_name(core=core, file_struct=file_struct, req=req)
    # a missing "encoding_output" option falls back to UTF-8 instead of raising KeyError
    encoding_output = options.get("encoding_output", "utf-8")
    with open(file_struct.path_file_out(out_file_name), "w", encoding=encoding_output) as f:
        f.write(''.join(result_lines))

    return file_processor.get_processing_file_resp_ok(file_struct=file_struct, file_out=out_file_name)
def processed_file_name(core: AppCore, file_struct: ProcessingFileStruct, req: ProcessingFileDirReq) -> str:
    """
    Return the output file name built from this plugin's "output_file_name_template" option.

    With "markdown_output" enabled the extension is replaced with "md". The extension is split
    off with os.path.splitext instead of cutting a fixed 3 characters, so the result is correct
    for an extension of any length (identical to the old result for "txt").
    """
    options = core.plugin_options(plugin_name)
    file_name = file_processor.file_name_from_template(file_struct=file_struct, req=req, options=options)
    if options["markdown_output"]:
        file_name = os.path.splitext(file_name)[0] + ".md"

    return file_name

View file

@ -1,15 +1,17 @@
import os
from app import struct
from app.app_core import AppCore
from app.lang_dict import get_lang_by_2_chars_code
from app.struct import TranslateStruct
import requests
from tqdm import tqdm
from app import params
from app.app_core import AppCore
from app.dto import TranslatePluginInitInfo, TranslateStruct
from app.lang_dict import get_lang_by_2_chars_code
plugin_name = os.path.basename(__file__)[:-3] # calculating modname
modname = os.path.basename(__file__)[:-3] # calculating modname
# start function
def start(core: AppCore):
manifest = { # plugin settings
"name": "KoboldCpp Translator", # name
@ -28,16 +30,22 @@ def start(core: AppCore):
def start_with_options(core: AppCore, manifest: dict):
struct.read_plugin_params(manifest)
params.read_plugin_translate_params(manifest)
pass
def init(core: AppCore):
return modname
def init(core: AppCore) -> TranslatePluginInitInfo:
    """
    Ask the server at option "custom_url" for its model and return the plugin init info.

    :raises ValueError: when the model request does not return HTTP 200
    """
    options = core.plugin_options(plugin_name)
    url = options['custom_url'] + "/api/v1/model"

    response = requests.get(url)
    if response.status_code == 200:
        model_name = response.json()["result"]
        return TranslatePluginInitInfo(plugin_name=plugin_name, model_name=model_name)

    raise ValueError(f'Response status {response.status_code} for request by url {url}')
def translate(core: AppCore, ts: TranslateStruct):
options = core.plugin_options(modname)
options = core.plugin_options(plugin_name)
from_lang_name = get_lang_by_2_chars_code(ts.req.from_lang)
to_lang_name = get_lang_by_2_chars_code(ts.req.to_lang)
@ -45,7 +53,7 @@ def translate(core: AppCore, ts: TranslateStruct):
# prompt = options["prompt"].format(from_lang_name, to_lang_name)
url = options['custom_url'] + "/api/v1/generate"
for part in ts.parts:
for part in tqdm(ts.parts, unit=params.tp.unit, ascii=params.tp.ascii, desc=params.tp.desc):
if part.need_to_translate():
prompt = options["prompt"].format(from_lang_name, to_lang_name, part.text)
length: int
@ -91,7 +99,7 @@ def translate(core: AppCore, ts: TranslateStruct):
response = requests.post(url, json=req)
if response.status_code != 200:
raise ValueError("Response status {0} for request by url {1}".format(response.status_code, url))
raise ValueError(f'Response status {response.status_code} for request by url {url}')
content: str = response.json()["results"][0]['text']
part.translate = content.strip()

View file

@ -1,17 +1,18 @@
import os
import lmstudio
import requests
from lmstudio import LLM, LlmPredictionConfig
from tqdm import tqdm
from app import struct
from app import params
from app.app_core import AppCore
from app.dto import TranslatePluginInitInfo, TranslateStruct
from app.lang_dict import get_lang_by_2_chars_code
from app.struct import TranslateStruct, tp
modname = os.path.basename(__file__)[:-3] # calculating modname
plugin_name = os.path.basename(__file__)[:-3] # calculating modname
# start function
def start(core: AppCore):
manifest = {
"name": "LM-Studio Translator", # name
@ -20,7 +21,8 @@ def start(core: AppCore):
"default_options": {
"custom_url": "http://localhost:1234", #
"prompt": "You are professional translator. Translate text from {0} to {1}. Don't add any notes or any additional info in your answer, write only translate. Text: ",
"prompt_postfix": ""
"prompt_postfix": "",
"use_library_for_request": True,
},
"translate": {
@ -32,39 +34,77 @@ def start(core: AppCore):
def start_with_options(core: AppCore, manifest: dict):
struct.read_plugin_params(manifest)
params.read_plugin_translate_params(manifest)
pass
def init(core: AppCore):
return modname
def init(core: AppCore) -> TranslatePluginInitInfo:
    """
    Detect the model served at option "custom_url" and return the plugin init info.

    With "use_library_for_request" the lmstudio client is configured and the first loaded
    model is reported; otherwise a chat completion request is sent and the model name is
    taken from its response.

    :raises ValueError: when the library is used and no model is loaded
    """
    options = core.plugin_options(plugin_name)
    custom_url: str = options['custom_url']

    if not options["use_library_for_request"]:
        prompt = "You are assistant. " + options["prompt_postfix"]
        model = http_request(custom_url, prompt, "init")["model"]
        return TranslatePluginInitInfo(plugin_name=plugin_name, model_name=model)

    # the library client is configured with the url without the "http://" part
    lmstudio.configure_default_client(custom_url.replace("http://", ""))
    loaded_models = lmstudio.list_loaded_models("llm")
    if len(loaded_models) > 0:
        return TranslatePluginInitInfo(plugin_name=plugin_name, model_name=loaded_models[0].identifier)

    raise ValueError('List loaded models is empty. Please load model before init this plugin')
def translate(core: AppCore, ts: TranslateStruct):
options = core.plugin_options(modname)
def translate(core: AppCore, ts: TranslateStruct) -> TranslateStruct:
options = core.plugin_options(plugin_name)
from_lang_name = get_lang_by_2_chars_code(ts.req.from_lang)
to_lang_name = get_lang_by_2_chars_code(ts.req.to_lang)
prompt = options["prompt"].format(from_lang_name, to_lang_name)
url = options['custom_url'] + "/v1/chat/completions"
prompt = options["prompt"].format(from_lang_name, to_lang_name) + options["prompt_postfix"]
use_library_for_request = options["use_library_for_request"]
for part in tqdm(ts.parts, unit=tp.unit, ascii=tp.ascii, desc=tp.desc):
model: LLM
if use_library_for_request:
model = lmstudio.llm()
for part in tqdm(ts.parts, unit=params.tp.unit, ascii=params.tp.ascii, desc=params.tp.desc):
if part.need_to_translate():
req = {
"messages": [
{"role": "system", "content": prompt + options["prompt_postfix"]},
{"role": "user", "content": part.text}
],
"temperature": 0.0
}
content: str
if use_library_for_request:
content = library_request(model, prompt, part.text)
else:
content = http_request_content(options['custom_url'], prompt, part.text)
response = requests.post(url, json=req)
if response.status_code != 200:
raise ValueError("Response status {0} for request by url {1}".format(response.status_code, url))
content: str = response.json()["choices"][0]['message']['content']
part.translate = content.replace("<think>\n\n</think>\n\n", "").strip()
return ts
def library_request(model: LLM, prompt: str, text: str) -> str:
    """Translate ``text`` through the ``lmstudio`` library and return the reply text.

    A chat is created from ``prompt`` (presumably used as the system prompt -
    confirm against the lmstudio ``Chat`` API), ``text`` is appended as the user
    message, and the model is asked to respond with temperature 0.0.

    :param model: LLM handle obtained from ``lmstudio``.
    :param prompt: instruction text the chat is created with.
    :param text: text to send as the user message.
    :return: the ``content`` attribute of the model's response.
    """
    conversation = lmstudio.Chat(prompt)
    conversation.add_user_message(text)
    prediction_config = LlmPredictionConfig(temperature=0.0)
    return model.respond(conversation, config=prediction_config).content
# API request
def http_request(base_url: str, prompt: str, text: str) -> dict:
    """POST a chat completion request and return the decoded JSON response.

    The request carries ``prompt`` as the system message and ``text`` as the
    user message, with temperature 0.0.

    :param base_url: server base URL, e.g. ``http://localhost:1234``; a trailing
        slash is tolerated.
    :param prompt: content of the system message.
    :param text: content of the user message.
    :return: the response body parsed from JSON.
    :raises ValueError: if the server answers with a status other than 200.
    """
    # Build the endpoint once so the request and the error message agree on the URL,
    # and so a base URL ending in "/" does not produce "//v1/...".
    url = base_url.rstrip("/") + "/v1/chat/completions"
    req = {
        "messages": [
            {"role": "system", "content": prompt},
            {"role": "user", "content": text}
        ],
        "temperature": 0.0
    }

    response = requests.post(url, json=req)
    if response.status_code != 200:
        raise ValueError("Response status {0} for request by url {1}".format(response.status_code, url))

    return response.json()
def http_request_content(url: str, prompt: str, text: str) -> str:
    """Send a chat completion request and return only the reply text.

    :param url: server base URL, forwarded to ``http_request``.
    :param prompt: content of the system message.
    :param text: content of the user message.
    :return: ``choices[0].message.content`` of the JSON response.
    """
    payload = http_request(url, prompt, text)
    first_choice = payload["choices"][0]
    return first_choice['message']['content']

View file

@ -6,11 +6,11 @@ from ctranslate2 import Translator
from tqdm import tqdm
from transformers import PreTrainedTokenizerBase
from app import cuda, struct
from app import cuda, params
from app.app_core import AppCore
from app.struct import TranslateStruct, tp
from app.dto import TranslatePluginInitInfo, TranslateStruct
modname = os.path.basename(__file__)[:-3]
plugin_name = os.path.basename(__file__)[:-3]
model: Translator
tokenizer: PreTrainedTokenizerBase
@ -43,26 +43,26 @@ def start(core: AppCore):
def start_with_options(core: AppCore, manifest:dict):
struct.read_plugin_params(manifest)
params.read_plugin_translate_params(manifest)
return manifest
def init(core:AppCore):
options = core.plugin_options(modname)
def init(core:AppCore) -> TranslatePluginInitInfo:
options = core.plugin_options(plugin_name)
global model
global tokenizer
model = ctranslate2.Translator(options["model"],
model = ctranslate2.Translator(options["model"], compute_type=options["compute_type"],
device=cuda.get_device(options), device_index=options["cuda_device_index"])
tokenizer = transformers.AutoTokenizer.from_pretrained(options["tokenizer"])
return modname
return TranslatePluginInitInfo(plugin_name=plugin_name, model_name=f'{options["model"]}__{options["compute_type"]}')
def translate(core: AppCore, ts: TranslateStruct):
options = core.plugin_options(modname)
options = core.plugin_options(plugin_name)
# # implementation 1: one part - one batch
# for part in ts.parts:
@ -78,7 +78,7 @@ def translate(core: AppCore, ts: TranslateStruct):
# implementation 2: all parts - one batch. It's faster, but depends on amount of batches.
tokens_list = []
for part in tqdm(ts.parts, unit=tp.unit, ascii=tp.ascii, desc=tp.desc):
for part in tqdm(ts.parts, unit=params.tp.unit, ascii=params.tp.ascii, desc=params.tp.desc):
if part.need_to_translate():
input_text = "<2" + ts.req.to_lang + ">" + part.text
tokens = tokenizer.convert_ids_to_tokens(tokenizer.encode(input_text))

View file

@ -7,12 +7,12 @@ import os
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
from app import struct, cuda
from app import cuda, params
from app.app_core import AppCore
from app.dto import TranslatePluginInitInfo, TranslateStruct
from app.lang_dict import lang_2_chars_to_nllb_lang
from app.struct import TranslateStruct, tp
modname = os.path.basename(__file__)[:-3] # calculating modname
plugin_name = os.path.basename(__file__)[:-3] # calculating modname
model = None
tokenizers:dict = {}
@ -41,22 +41,22 @@ def start(core: AppCore):
def start_with_options(core: AppCore, manifest: dict):
struct.read_plugin_params(manifest)
params.read_plugin_translate_params(manifest)
return manifest
def init(core: AppCore):
options = core.plugin_options(modname)
def init(core: AppCore) -> TranslatePluginInitInfo:
options = core.plugin_options(plugin_name)
global model
model = AutoModelForSeq2SeqLM.from_pretrained(options["model"]).to(cuda.get_device_with_gpu_num(options))
return modname
return TranslatePluginInitInfo(plugin_name=plugin_name, model_name=options["model"])
def translate(core: AppCore, ts: TranslateStruct):
options = core.plugin_options(modname)
options = core.plugin_options(plugin_name)
from_lang = lang_2_chars_to_nllb_lang[ts.req.from_lang]
to_lang = lang_2_chars_to_nllb_lang[ts.req.to_lang]
@ -66,7 +66,7 @@ def translate(core: AppCore, ts: TranslateStruct):
tokenizers[from_lang] = AutoTokenizer.from_pretrained(options["model"], src_lang=from_lang)
tokenizer = tokenizers[from_lang]
for part in tqdm(ts.parts, unit=tp.unit, ascii=tp.ascii, desc=tp.desc):
for part in tqdm(ts.parts, unit=params.tp.unit, ascii=params.tp.ascii, desc=params.tp.desc):
if part.need_to_translate():
inputs = tokenizer(part.text, return_tensors="pt").to(cuda_device)

View file

@ -5,12 +5,12 @@ from ctranslate2 import Translator
from tqdm import tqdm
from transformers import AutoTokenizer
from app import cuda, struct
from app import cuda, params
from app.app_core import AppCore
from app.dto import TranslatePluginInitInfo, TranslateStruct
from app.lang_dict import lang_2_chars_to_nllb_lang
from app.struct import TranslateStruct, tp
modname = os.path.basename(__file__)[:-3]
plugin_name = os.path.basename(__file__)[:-3]
model: Translator
tokenizers:dict = {}
@ -42,24 +42,24 @@ def start(core: AppCore):
def start_with_options(core: AppCore, manifest:dict):
struct.read_plugin_params(manifest)
params.read_plugin_translate_params(manifest)
return manifest
def init(core:AppCore):
options = core.plugin_options(modname)
def init(core:AppCore) -> TranslatePluginInitInfo:
options = core.plugin_options(plugin_name)
global model
model = ctranslate2.Translator(options["model"],
model = ctranslate2.Translator(options["model"], compute_type=options["compute_type"],
device=cuda.get_device(options), device_index=options["cuda_device_index"])
return modname
return TranslatePluginInitInfo(plugin_name=plugin_name, model_name=f'{options["model"]}__{options["compute_type"]}')
def translate(core: AppCore, ts: TranslateStruct):
options = core.plugin_options(modname)
options = core.plugin_options(plugin_name)
from_lang = lang_2_chars_to_nllb_lang[ts.req.from_lang]
to_lang = lang_2_chars_to_nllb_lang[ts.req.to_lang]
@ -68,7 +68,7 @@ def translate(core: AppCore, ts: TranslateStruct):
tokenizer = tokenizers[from_lang]
# translate_batch not optimal, but there are problems with try to implement batch processing like madlab_ctranslate2
for part in tqdm(ts.parts, unit=tp.unit, ascii=tp.ascii, desc=tp.desc):
for part in tqdm(ts.parts, unit=params.tp.unit, ascii=params.tp.ascii, desc=params.tp.desc):
if part.need_to_translate():
input_text = part.text
tokens = tokenizer.convert_ids_to_tokens(tokenizer.encode(input_text))

View file

@ -1,15 +1,17 @@
# No Translate dummy plugin
# No Translate dummy plugin, for test / debug
# author: Vladislav Janvarev
import os
from tqdm import tqdm
from app import params
from app.app_core import AppCore
from app.struct import TranslateStruct
from app.dto import TranslatePluginInitInfo, TranslateStruct
modname = os.path.basename(__file__)[:-3] # calculating modname
plugin_name = os.path.basename(__file__)[:-3] # calculating modname
# start function
def start(core: AppCore):
manifest = { # plugin settings
"name": "No Translate dummy plugin", # name
@ -23,12 +25,12 @@ def start(core: AppCore):
return manifest
def init(core: AppCore):
return modname
def init(core: AppCore) -> TranslatePluginInitInfo:
    """Return init info for the dummy plugin; no model is loaded, so the model name is empty.

    :param core: application core (unused here).
    :return: init info with this plugin's name and an empty model name.
    """
    init_info = TranslatePluginInitInfo(plugin_name=plugin_name, model_name="")
    return init_info
def translate(core: AppCore, ts: TranslateStruct):
for part in ts.parts:
for part in tqdm(ts.parts, unit=params.tp.unit, ascii=params.tp.ascii, desc=params.tp.desc):
part.translate = part.text
return ts

View file

@ -5,4 +5,4 @@ This is project for offline translate with LLM (Large Language Model) or more sp
---
Это проект для оффлайн перевода с использованием LLM (Large Language Model) или более специфичных моделей дял перевода, такие как nllb или madlab.
Это проект для оффлайн перевода с использованием LLM (Large Language Model) или более специфичных моделей, таких как nllb или madlab.

View file

@ -2,6 +2,8 @@ uvicorn
uvicorn[standard]
fastapi
termcolor
natsort
chardet
transformers
ctranslate2
@ -9,3 +11,7 @@ ctranslate2
blingfire
pysbd
lmstudio
ebooklib
beautifulsoup4

151
static/common.js Normal file
View file

@ -0,0 +1,151 @@
/**
 * Fill the "from" and "to" language <select> elements with one option per
 * entry of langDict (value = language code, label = language name) and
 * select the empty-code entry in both.
 */
function fill_language_select_elements() {
    const selects = [
        document.getElementById('from_lang_select'),
        document.getElementById('to_lang_select'),
    ];

    for (const select of selects) {
        // Build all options off-document and attach them in one step instead of
        // re-parsing the select's whole innerHTML for every language.
        const options = document.createDocumentFragment();
        for (const [code, name] of Object.entries(langDict)) {
            options.appendChild(new Option(name, code));
        }
        select.appendChild(options);
        select.value = '';
    }
}
// Language code -> display name shown in the language <select> elements.
// The empty code is the "use the server-side default" choice.
// Codes are sent to the backend as-is, so only display names may be corrected here.
const langDict = {
    '': 'default from settings',
    'ab': 'abkhazian',
    'aa': 'afar',
    'af': 'afrikaans',
    'sq': 'albanian',
    'am': 'amharic',
    'ar': 'arabic',
    'hy': 'armenian',
    'as': 'assamese',
    'ay': 'aymara',
    'az': 'azerbaijani',
    'ba': 'bashkir',
    'eu': 'basque',
    'bn': 'bengali',
    'dz': 'bhutani',
    'bh': 'bihari',
    'bi': 'bislama',
    'br': 'breton',
    'bg': 'bulgarian',
    'my': 'burmese',
    'be': 'byelorussian',
    'km': 'cambodian',
    'ca': 'catalan',
    'zh': 'chinese',
    'co': 'corsican',
    'hr': 'croatian',
    'cs': 'czech',
    'da': 'danish',
    'nl': 'dutch',
    'en': 'english',
    'eo': 'esperanto',
    'et': 'estonian',
    'fo': 'faeroese',
    'fj': 'fiji',
    'fi': 'finnish',
    'fr': 'french',
    'fy': 'frisian',
    'gd': 'gaelic',
    'gl': 'galician',
    'ka': 'georgian',
    'de': 'german',
    'el': 'greek',
    'kl': 'greenlandic',
    'gn': 'guarani',
    'gu': 'gujarati',
    'ha': 'hausa',
    'iw': 'hebrew',
    'hi': 'hindi',
    'hu': 'hungarian',
    'is': 'icelandic',
    'in': 'indonesian',
    'ia': 'interlingua',
    'ie': 'interlingue',
    'ik': 'inupiak',
    'ga': 'irish',
    'it': 'italian',
    'ja': 'japanese',
    'jw': 'javanese',
    'kn': 'kannada',
    'ks': 'kashmiri',
    'kk': 'kazakh',
    'rw': 'kinyarwanda',
    'ky': 'kirghiz',
    'rn': 'kirundi',
    'ko': 'korean',
    'ku': 'kurdish',
    'lo': 'laothian',
    'la': 'latin',
    'lv': 'latvian',
    'ln': 'lingala',
    'lt': 'lithuanian',
    'mk': 'macedonian',
    'mg': 'malagasy',
    'ms': 'malay',
    'ml': 'malayalam',
    'mt': 'maltese',
    'mi': 'maori',
    'mr': 'marathi',
    'mo': 'moldavian',
    'mn': 'mongolian',
    'na': 'nauru',
    'ne': 'nepali',
    'no': 'norwegian',
    'oc': 'occitan',
    'or': 'oriya',
    'om': 'oromo',
    'ps': 'pashto',
    'fa': 'persian',
    'pl': 'polish',
    'pt': 'portuguese',
    'pa': 'punjabi',
    'qu': 'quechua',
    'rm': 'rhaeto-romance',
    'ro': 'romanian',
    'ru': 'russian',
    'sm': 'samoan',
    'sg': 'sango',
    'sa': 'sanskrit',
    'sr': 'serbian',
    'sh': 'serbo-croatian',
    'st': 'sesotho',
    'tn': 'setswana',
    'sn': 'shona',
    'sd': 'sindhi',
    'si': 'singhalese',
    'ss': 'siswati',
    'sk': 'slovak',
    'sl': 'slovenian',
    'so': 'somali',
    'es': 'spanish',
    'su': 'sundanese',
    'sw': 'swahili',
    'sv': 'swedish',
    'tl': 'tagalog',
    'tg': 'tajik',
    'ta': 'tamil',
    'tt': 'tatar',
    'te': 'telugu',
    'th': 'thai',
    'bo': 'tibetan',
    'ti': 'tigrinya',
    'to': 'tonga',
    'ts': 'tsonga',
    'tr': 'turkish',
    'tk': 'turkmen',
    'tw': 'twi',
    'uk': 'ukrainian',
    'ur': 'urdu',
    'uz': 'uzbek',
    'vi': 'vietnamese',
    'vo': 'volapuk',
    'cy': 'welsh',
    'wo': 'wolof',
    'xh': 'xhosa',
    'ji': 'yiddish',
    'yo': 'yoruba',
    'zu': 'zulu',
};

21
static/ext.css Normal file
View file

@ -0,0 +1,21 @@
/* Small circular spinner: a light grey ring whose blue top edge rotates. */
.loader {
    width: 16px;
    height: 16px;
    border: 4px solid #f3f3f3; /* Light grey ring */
    border-top: 4px solid #2a82b6; /* Blue moving segment; must follow `border` to override its top edge */
    border-radius: 50%;
    animation: spin 2s linear infinite;
}

/* One full clockwise turn per animation cycle. */
@keyframes spin {
    from {
        transform: rotate(0deg);
    }
    to {
        transform: rotate(360deg);
    }
}

/* Utility class for emphasised table text. */
.text-bold {
    font-weight: 700;
}

View file

@ -0,0 +1,97 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8"/>
<meta name="viewport" content="width=device-width, initial-scale=1.0"/>
<title>LLM translate - file processing</title>
<script type="application/javascript" src="file_processing.js"></script>
<script type="application/javascript" src="common.js"></script>
<link rel="stylesheet" href="chota.min.css">
<link rel="stylesheet" href="ext.css">
</head>
<body>
<div id="top" class="container" role="document">
<div class="row">
<div class="col">
<h5><a href="index.html">LLM Translate</a> &nbsp;&nbsp; File processing</h5>
</div>
<div class="col">
<span id="errorText" class="text-error"></span>
</div>
</div>
<div class="row">
<div class="col">
<div class="row">
<div class="col">
<select name="from_lang" id="from_lang_select"></select>
</div>
<div class="col">
<button id="submit" class="button primary icon" type="submit">
&nbsp; Process &nbsp;<div id="progress" class="loader" style="display: none;"></div>
</button>
</div>
</div>
</div>
<div class="col">
<div class="row">
<div class="col">
<select name="to_lang" id="to_lang_select"></select>
</div>
<div class="col">
<input id="plugin" value="" placeholder="Use translator plugin (optional)"/>
</div>
</div>
</div>
</div>
<div class="row">
<div class="col">
<label for="preserve_original_text">
<input id="preserve_original_text" type="checkbox" checked="checked"> Preserve original text
</label>
<label for="overwrite_processed_files">
<input id="overwrite_processed_files" type="checkbox" checked="checked"> Overwrite processed files
</label>
<label for="recursive_sub_dirs">
<input id="recursive_sub_dirs" type="checkbox" checked="checked"> Recursive Subdirectories
</label>
</div>
<div class="col">
</div>
</div>
<div class="row">
<div class="col">
<table>
<caption><h5>Input directory content: <span id="directory_in"></span></h5></caption>
<thead>
<tr><th>File name</th><th>File processor</th></tr>
</thead>
<tbody id="process_files_table_in">
</tbody>
</table>
</div>
<div class="col">
<table>
<caption><h5>Output directory content: <span id="directory_out"></span></h5></caption>
<thead>
<tr><th>File name</th></tr>
</thead>
<tbody id="process_files_table_out">
</tbody>
</table>
</div>
</div>
<div class="row">
<div class="col">
<table>
<caption>Processing results</caption>
<thead>
<tr><th>Source File</th><th>Result File</th><th>Status</th></tr>
</thead>
<tbody id="process_files_table_result">
</tbody>
</table>
</div>
</div>
</div>
</body>

134
static/file_processing.js Normal file
View file

@ -0,0 +1,134 @@
/**
 * Fetch the input/output directory listings from /process-files-list and
 * render them into the two file tables and the directory name labels.
 *
 * The "recursive_sub_dirs" checkbox state is sent as a query parameter.
 * If the response carries an `error` field, or the request itself fails,
 * the message is shown in #errorText and the tables are left untouched.
 *
 * @returns {Promise<string>} always resolves to an empty string.
 */
async function load_file_list() {
    const recursiveSubDirs = document.getElementById('recursive_sub_dirs');
    const errorText = document.getElementById('errorText');
    const params = new URLSearchParams({recursive_sub_dirs: recursiveSubDirs.checked});

    // Append a <td><span class="...">text</span></td> cell. textContent is used so
    // that file names containing markup characters are displayed literally.
    const appendSpanCell = (row, text, className) => {
        const span = document.createElement('span');
        if (className) {
            span.className = className;
        }
        span.textContent = text;
        row.insertCell().appendChild(span);
    };

    try {
        const response = await fetch(`/process-files-list?${params}`);
        const data = await response.json();

        if (data.error) {
            errorText.textContent = data.error;
            return "";
        }

        const processFilesTableIn = document.getElementById('process_files_table_in');
        const processFilesTableOut = document.getElementById('process_files_table_out');
        processFilesTableIn.replaceChildren();
        processFilesTableOut.replaceChildren();

        for (const fileItem of data.files_in) {
            let fileClass = "";
            if (fileItem.file_error) {
                fileClass = "text-error";
            } else if (fileItem.file_processor) {
                fileClass = "text-primary text-bold";
            }

            const row = processFilesTableIn.insertRow();
            appendSpanCell(row, fileItem.file_with_path, fileClass);
            row.insertCell().textContent = fileItem.file_processor ? fileItem.file_processor : "Not found";
        }

        for (const fileItem of data.files_out) {
            appendSpanCell(processFilesTableOut.insertRow(), fileItem.file_with_path, "");
        }

        document.getElementById('directory_in').textContent = data.directory_in;
        document.getElementById('directory_out').textContent = data.directory_out;
    } catch (error) {
        // Network or JSON failures used to surface only as an unhandled rejection.
        errorText.textContent = error.message;
        console.error(error.message);
    }

    return "";
}
/**
 * Send the form settings to POST /process-files and render the per-file
 * results into the "Processing results" table.
 *
 * While the request is running the submit button is disabled and the
 * spinner is shown; both are restored in every outcome. A response with an
 * `error` field, or a failed/timed-out request (10 minute limit), is
 * reported in #errorText.
 *
 * @returns {Promise<string|undefined>} "" after a handled response,
 *     undefined when the request threw.
 */
async function process_files() {
    const elProgress = document.getElementById('progress');
    const submit = document.getElementById('submit');
    const errorText = document.getElementById('errorText');

    submit.disabled = true;
    elProgress.style.display = 'inline';

    const preserve_original_text = document.getElementById('preserve_original_text').checked;
    const overwrite_processed_files = document.getElementById('overwrite_processed_files').checked;
    const recursiveSubDirs = document.getElementById('recursive_sub_dirs').checked;
    const fromLang = document.getElementById('from_lang_select').value;
    const toLang = document.getElementById('to_lang_select').value;
    const plugin = document.getElementById('plugin').value;

    const reqBody = JSON.stringify({
        from_lang: fromLang, to_lang: toLang, translator_plugin: plugin,
        preserve_original_text: preserve_original_text, overwrite_processed_files: overwrite_processed_files,
        recursive_sub_dirs: recursiveSubDirs, file_processors: null
    });
    const reqParam = {
        method: 'POST',
        body: reqBody,
        signal: AbortSignal.timeout(600000),
        headers: {
            'Accept': 'application/json',
            'Content-Type': 'application/json',
        }
    };

    // CSS classes for the source-file cell, keyed by result status.
    const statusClasses = new Map([
        ['ERROR', 'text-error'],
        ['OK', 'text-primary text-bold'],
    ]);
    // Human-readable labels; statuses not listed here are shown unchanged.
    const statusLabels = new Map([
        ['ERROR', 'Error'],
        ['TYPE_NOT_SUPPORT', 'Type not support'],
        ['TRANSLATE_ALREADY_EXISTS', 'Translate already exists'],
    ]);

    try {
        const response = await fetch(`/process-files`, reqParam);
        const data = await response.json();

        if (data.error) {
            errorText.textContent = data.error;
            return "";
        }

        const processFilesTableResult = document.getElementById('process_files_table_result');
        processFilesTableResult.replaceChildren();

        for (const fileItem of data.files) {
            const row = processFilesTableResult.insertRow();

            // textContent keeps paths containing markup characters from being parsed as HTML.
            const source = document.createElement('span');
            const fileClass = statusClasses.get(fileItem.status);
            if (fileClass) {
                source.className = fileClass;
            }
            source.textContent = fileItem.path_file_in;
            row.insertCell().appendChild(source);

            row.insertCell().textContent = fileItem.path_file_out ? fileItem.path_file_out : "";
            row.insertCell().textContent = statusLabels.has(fileItem.status)
                ? statusLabels.get(fileItem.status)
                : fileItem.status;
        }

        // A successful run clears any message left over from an earlier failure.
        errorText.textContent = "";
        return "";
    } catch (error) {
        errorText.textContent = error.message;
        console.error(error.message);
    } finally {
        elProgress.style.display = 'none';
        submit.disabled = false;
    }
}
// Page start-up: wire the controls, fill the language selects and show the
// current directory listings.
window.onload = () => {
    const recursiveSubDirs = document.getElementById('recursive_sub_dirs');
    recursiveSubDirs.onchange = () => {
        load_file_list();
    };

    const submit = document.getElementById('submit');
    submit.onmouseup = async () => {
        // Refresh the listings only after processing has finished; starting both
        // calls at once listed the output directory before any file was written.
        await process_files();
        await load_file_list();
    };

    fill_language_select_elements();
    load_file_list();
};

View file

@ -5,33 +5,21 @@
<meta name="viewport" content="width=device-width, initial-scale=1.0"/>
<title>LLM translate</title>
<script type="application/javascript" src="index.js"></script>
<script type="application/javascript" src="common.js"></script>
<link rel="stylesheet" href="chota.min.css">
<style>
.loader {
border: 4px solid #f3f3f3; /* Light grey */
border-top: 4px solid #2a82b6; /* Blue */
border-radius: 50%;
width: 16px;
height: 16px;
animation: spin 2s linear infinite;
}
@keyframes spin {
0% { transform: rotate(0deg); }
100% { transform: rotate(360deg); }
}
</style>
<link rel="stylesheet" href="ext.css">
</head>
<body>
<div id="top" class="container" role="document">
<div class="row">
<div class="col">
<h5>LLM Translate</h5>
<h5>LLM Translate &nbsp;&nbsp; <a href="file_processing.html">File processing</a></h5>
</div>
<div class="col">
<span id="errorText" class="text-error"></span>
</div>
</div>
<div class="row">
<div class="col">
<div class="row">
@ -39,8 +27,8 @@
<select name="from_lang" id="from_lang_select"></select>
</div>
<div class="col">
<button id="trigger" class="button primary icon" type="submit">
&nbspTranslate&nbsp<div id="progress" class="loader" style="display: none;"></div>
<button id="submit" class="button primary icon" type="submit">
&nbsp; Translate &nbsp;<div id="progress" class="loader" style="display: none;"></div>
</button>
</div>
</div>
@ -56,12 +44,13 @@
</div>
</div>
</div>
<div class="row">
<div class="col">
<textarea id="text" rows="20"></textarea>
<label for="text"></label><textarea id="text" rows="20"></textarea>
</div>
<div class="col" aria-busy="true">
<textarea id="text_result" rows="20" ></textarea>
<label for="text_result"></label><textarea id="text_result" rows="20" ></textarea>
</div>
</div>
</div>

View file

@ -1,29 +1,30 @@
async function translateText() {
const elProgress = document.getElementById('progress');
const trigger = document.getElementById('trigger');
const elResult = document.getElementById('text_result');
const submit = document.getElementById('submit');
const errorText = document.getElementById('errorText');
submit.disabled = true;
elProgress.style.display = 'inline';
elResult.value = '';
const text = document.getElementById('text').value;
const fromLang = document.getElementById('from_lang_select').value;
const toLang = document.getElementById('to_lang_select').value;
const plugin = document.getElementById('plugin').value;
trigger.disabled = true;
elProgress.style.display = 'inline';
const reqBody = JSON.stringify({
text: text, from_lang: fromLang, to_lang: toLang,
translator_plugin: plugin
});
const reqParam = {
method: 'POST',
body: reqBody,
headers: {
'Accept': 'application/json',
'Content-Type': 'application/json',
}
}
try {
const reqBody = JSON.stringify({
text: text, from_lang: fromLang, to_lang: toLang,
translator_plugin: plugin
});
const reqParam = {
method: 'POST',
body: reqBody,
headers: {
'Accept': 'application/json',
'Content-Type': 'application/json',
}
}
const response = await fetch(`/translate`, reqParam);
const data = await response.json();
if (data.error) {
@ -31,7 +32,7 @@ async function translateText() {
return "";
} else {
const translation = data.result;
document.getElementById('text_result').value = translation;
elResult.value = translation;
errorText.innerHTML = ""
return translation;
@ -41,163 +42,16 @@ async function translateText() {
console.error(error.message);
} finally {
elProgress.style.display = 'none';
trigger.disabled = false;
submit.disabled = false;
}
}
window.onload = () => {
const trigger = document.getElementById('trigger');
trigger.onmouseup = () => {
const submit = document.getElementById('submit');
submit.onmouseup = () => {
translateText();
};
const langDict = {
'en': 'english',
'ru': 'russian',
'ab': 'abkhazian',
'aa': 'afar',
'af': 'afrikaans',
'sq': 'albanian',
'am': 'amharic',
'ar': 'arabic',
'hy': 'armenian',
'as': 'assamese',
'ay': 'aymara',
'az': 'azerbaijani',
'ba': 'bashkir',
'eu': 'basque',
'bn': 'bengali',
'dz': 'bhutani',
'bh': 'bihari',
'bi': 'bislama',
'br': 'breton',
'bg': 'bulgarian',
'my': 'burmese',
'be': 'byelorussian',
'km': 'cambodian',
'ca': 'catalan',
'zh': 'chinese',
'co': 'corsican',
'hr': 'croatian',
'cs': 'czech',
'da': 'danish',
'nl': 'dutch',
'eo': 'esperanto',
'et': 'estonian',
'fo': 'faeroese',
'fj': 'fiji',
'fi': 'finnish',
'fr': 'french',
'fy': 'frisian',
'gd': 'gaelic',
'gl': 'galician',
'ka': 'georgian',
'de': 'german',
'el': 'greek',
'kl': 'greenlandic',
'gn': 'guarani',
'gu': 'gujarati',
'ha': 'hausa',
'iw': 'hebrew',
'hi': 'hindi',
'hu': 'hungarian',
'is': 'icelandic',
'in': 'indonesian',
'ia': 'interlingua',
'ie': 'interlingue',
'ik': 'inupiak',
'ga': 'irish',
'it': 'italian',
'ja': 'japanese',
'jw': 'javanese',
'kn': 'kannada',
'ks': 'kashmiri',
'kk': 'kazakh',
'rw': 'kinyarwanda',
'ky': 'kirghiz',
'rn': 'kirundi',
'ko': 'korean',
'ku': 'kurdish',
'lo': 'laothian',
'la': 'latin',
'lv': 'latvian',
'ln': 'lingala',
'lt': 'lithuanian',
'mk': 'macedonian',
'mg': 'malagasy',
'ms': 'malay',
'ml': 'malayalam',
'mt': 'maltese',
'mi': 'maori',
'mr': 'marathi',
'mo': 'moldavian',
'mn': 'mongolian',
'na': 'nauru',
'ne': 'nepali',
'no': 'norwegian',
'oc': 'occitan',
'or': 'oriya',
'om': 'oromo',
'ps': 'pashto',
'fa': 'persian',
'pl': 'polish',
'pt': 'portuguese',
'pa': 'punjabi',
'qu': 'quechua',
'rm': 'rhaeto-romance',
'ro': 'romanian',
'sm': 'samoan',
'sg': 'sangro',
'sa': 'sanskrit',
'sr': 'serbian',
'sh': 'serbo-croatian',
'st': 'sesotho',
'tn': 'setswana',
'sn': 'shona',
'sd': 'sindhi',
'si': 'singhalese',
'ss': 'siswati',
'sk': 'slovak',
'sl': 'slovenian',
'so': 'somali',
'es': 'spanish',
'su': 'sudanese',
'sw': 'swahili',
'sv': 'swedish',
'tl': 'tagalog',
'tg': 'tajik',
'ta': 'tamil',
'tt': 'tatar',
'te': 'tegulu',
'th': 'thai',
'bo': 'tibetan',
'ti': 'tigrinya',
'to': 'tonga',
'ts': 'tsonga',
'tr': 'turkish',
'tk': 'turkmen',
'tw': 'twi',
'uk': 'ukrainian',
'ur': 'urdu',
'uz': 'uzbek',
'vi': 'vietnamese',
'vo': 'volapuk',
'cy': 'welsh',
'wo': 'wolof',
'xh': 'xhosa',
'ji': 'yiddish',
'yo': 'yoruba',
'zu': 'zulu',
};
const fromLangSelect = document.getElementById('from_lang_select');
const toLangSelect = document.getElementById('to_lang_select');
for (const [key, value] of Object.entries(langDict)) {
fromLangSelect.innerHTML += "<option value='" + key + "'>" + value + "</option>";
toLangSelect.innerHTML += "<option value='" + key + "'>" + value + "</option>";
}
fromLangSelect.value = 'en';
toLangSelect.value = 'ru';
fill_language_select_elements();
}

View file

@ -0,0 +1 @@
Hello, World. Привет, Мир. Ёё.

View file

@ -0,0 +1,9 @@
from unittest import TestCase
from app import file_processor
class FileProcessTest(TestCase):
    """Tests for the file-reading helper in ``app.file_processor``."""

    def test_encoding(self):
        """The fixture file is read back as the expected mixed Latin/Cyrillic string."""
        # NOTE(review): the fixture path is relative to the current working
        # directory, so this test only passes when run from a directory that is a
        # sibling of "files" - confirm how the test runner sets the cwd.
        # The "ansi" in the fixture name presumably means a legacy single-byte
        # encoding - confirm.
        expected = "Hello, World. Привет, Мир. Ёё."
        actual = file_processor.read_file_with_fix_encoding("../files/test_encoding_ansi.txt")
        self.assertEqual(expected, actual)

View file

@ -1,6 +1,6 @@
from unittest import TestCase
from app.struct import Part
from app.dto import Part
class StructTest(TestCase):

View file

@ -1,7 +1,8 @@
import unittest
from app import text_splitter
from app.struct import TextSplitParams, Part
from app.dto import Part
from app.params import TextSplitParams
s1 = "Text one."
s2 = "Text two."