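# GGUF conversion classes for the ERNIE 4.5 family (dense and MoE) and the
# PaddleOCR-VL vision-language model.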
from __future__ import annotations

import json
import math
import re

from typing import Callable, Iterable, TYPE_CHECKING

import torch

if TYPE_CHECKING:
    from torch import Tensor

from .base import MmprojModel, ModelBase, TextModel, gguf


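# Dense ERNIE 4.5 text model; the MoE and PaddleOCR-VL classes below build on it.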
@ModelBase.register("Ernie4_5_ForCausalLM", "Ernie4_5ForCausalLM")
class Ernie4_5Model(TextModel):
    model_arch = gguf.MODEL_ARCH.ERNIE4_5

    def set_vocab(self):
        self._set_vocab_sentencepiece()

        tokenizer_config_file = self.dir_model / 'tokenizer_config.json'
        if tokenizer_config_file.is_file():
            with open(tokenizer_config_file, "r", encoding="utf-8") as f:
                tokenizer_config_json = json.load(f)
                if "add_prefix_space" in tokenizer_config_json:
                    self.gguf_writer.add_add_space_prefix(tokenizer_config_json["add_prefix_space"])

    def set_gguf_parameters(self):
        super().set_gguf_parameters()

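    # Some checkpoints prefix tensor names with "ernie." instead of "model.";
    # normalize before the base class maps the name.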
    @classmethod
    def filter_tensors(cls, item: tuple[str, Callable[[], Tensor]]) -> tuple[str, Callable[[], Tensor]] | None:
        name, gen = item

        if "ernie." in name:
            name = name.replace("ernie.", "model.")

        return super().filter_tensors((name, gen))

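    # The checkpoint fuses attention into a single qkv_proj and the FFN into a
    # single up_gate_proj; GGUF wants separate tensors, so both are split along
    # dim 0. Illustrative numbers (not from any real ERNIE config): with
    # num_heads=16, num_kv_heads=2, head_dim=128, the qkv rows split as
    # 2048 (Q) + 256 (K) + 256 (V).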
    def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
        num_heads = self.hparams["num_attention_heads"]
        num_kv_heads = self.hparams["num_key_value_heads"]
        if (head_dim := self.hparams.get("head_dim")) is None:
            head_dim = self.hparams["hidden_size"] // num_heads

        # split the qkv weights
        # qkv_proj shape: [(num_heads + 2 * num_kv_heads) * head_dim, hidden_size]
        if "qkv_proj" in name:
            name_q = name.replace("qkv_proj.weight", "q_proj.weight")
            name_k = name.replace("qkv_proj.weight", "k_proj.weight")
            name_v = name.replace("qkv_proj.weight", "v_proj.weight")
            total_q_dim = num_heads * head_dim
            total_k_dim = num_kv_heads * head_dim
            total_v_dim = num_kv_heads * head_dim
            q_proj_weight, k_proj_weight, v_proj_weight = data_torch.split([total_q_dim, total_k_dim, total_v_dim], dim=0)
            yield from super().modify_tensors(q_proj_weight, name_q, bid)
            yield from super().modify_tensors(k_proj_weight, name_k, bid)
            yield from super().modify_tensors(v_proj_weight, name_v, bid)
        # split the up_gate_proj into gate and up
        # up_gate_proj shape: [2 * intermediate_size, hidden_size]
        elif "up_gate_proj" in name:
            name_up = name.replace("up_gate_proj.weight", "up_proj.weight")
            name_gate = name.replace("up_gate_proj.weight", "gate_proj.weight")
            dim_half = data_torch.shape[0] // 2
            gate_proj_weight, up_proj_weight = data_torch.split(dim_half, dim=0)
            yield from super().modify_tensors(gate_proj_weight, name_gate, bid)
            yield from super().modify_tensors(up_proj_weight, name_up, bid)
        else:
            yield from super().modify_tensors(data_torch, name, bid)


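# The MoE variant adds expert-tensor merging and skips the checkpoint's
# Multi-Token Prediction (MTP) tensors.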
@ModelBase.register("Ernie4_5_MoeForCausalLM")
class Ernie4_5MoeModel(Ernie4_5Model):
    model_arch = gguf.MODEL_ARCH.ERNIE4_5_MOE
    _experts: list[dict[str, Tensor]] | None = None

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._experts = [{} for _ in range(self.block_count)]

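    # MoE metadata comes straight from the HF config: moe_num_experts/moe_k are
    # the expert and top-k counts, moe_layer_start_index the number of leading
    # dense blocks, and moe_layer_interval the MoE interleave step.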
    def set_gguf_parameters(self):
        super().set_gguf_parameters()
        self.gguf_writer.add_expert_count(self.hparams["moe_num_experts"])
        self.gguf_writer.add_expert_used_count(self.hparams["moe_k"])
        self.gguf_writer.add_interleave_moe_layer_step(self.hparams["moe_layer_interval"])
        self.gguf_writer.add_leading_dense_block_count(self.hparams["moe_layer_start_index"])
        if (moe_intermediate_size := self.hparams.get("moe_intermediate_size")) is not None:
            self.gguf_writer.add_expert_feed_forward_length(moe_intermediate_size)
        if (shared_expert_count := self.hparams.get('moe_num_shared_experts')) is not None:
            self.gguf_writer.add_expert_shared_count(shared_expert_count)
            if shared_expert_count > 0 and (shared_expert_intermediate_size := self.hparams.get('intermediate_size')) is not None and (num_key_value_heads := self.hparams.get('num_key_value_heads')) is not None:
                self.gguf_writer.add_expert_shared_feed_forward_length(shared_expert_intermediate_size // num_key_value_heads)

    @classmethod
    def filter_tensors(cls, item: tuple[str, Callable[[], Tensor]]) -> tuple[str, Callable[[], Tensor]] | None:
        name, gen = item

        # skip Multi-Token Prediction (MTP) layers, same as DeepseekV2
        match = re.match(r"model.mtp_block.(\d+)", name)
        if match:
            return None

        # skip all other MTP tensors for now
        match = re.match(r"model.mtp_emb_norm.(\d+)", name)
        if match:
            return None

        match = re.match(r"model.mtp_hidden_norm.(\d+)", name)
        if match:
            return None

        match = re.match(r"model.mtp_linear_proj.(\d+)", name)
        if match:
            return None

        return super().filter_tensors(item)

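    # Per-expert gate/up/down tensors are buffered per layer; once all
    # n_experts * 3 tensors for a layer have arrived they are stacked into a
    # single 3D tensor per projection, as the merged GGUF layout expects.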
    def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
        # process the experts separately
        if name.find("mlp.experts") != -1:
            n_experts = self.hparams["moe_num_experts"]
            assert bid is not None

            if self._experts is None:
                self._experts = [{} for _ in range(self.block_count)]

            self._experts[bid][name] = data_torch

            if len(self._experts[bid]) >= n_experts * 3:
                # merge the experts into a single 3d tensor
                for w_name in ["gate_proj", "up_proj", "down_proj"]:
                    datas: list[Tensor] = []

                    for xid in range(n_experts):
                        ename_to_retrieve = f"model.layers.{bid}.mlp.experts.{xid}.{w_name}.weight"
                        datas.append(self._experts[bid][ename_to_retrieve])
                        del self._experts[bid][ename_to_retrieve]

                    data_torch = torch.stack(datas, dim=0)
                    merged_name = f"model.layers.{bid}.mlp.experts.{w_name}.weight"
                    yield from super().modify_tensors(data_torch, merged_name, bid)
        else:
            yield from ModelBase.modify_tensors(self, data_torch, name, bid)

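    # Sanity check: every buffered expert tensor must have been merged by now.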
    def prepare_tensors(self):
        super().prepare_tensors()

        if self._experts is not None:
            # flatten `list[dict[str, Tensor]]` into `list[str]`
            experts = [k for d in self._experts for k in d.keys()]
            if len(experts) > 0:
                raise ValueError(f"Unprocessed experts: {experts}")


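# PaddleOCR-VL's language model is ERNIE 4.5 based, so the text side reuses the
# Ernie4_5Model conversion unchanged.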
@ModelBase.register("PaddleOCRVLForConditionalGeneration")
class PaddleOCRModel(Ernie4_5Model):
    model_arch = gguf.MODEL_ARCH.PADDLEOCR


@ModelBase.register("PaddleOCRVisionModel")
class PaddleOCRVisionModel(MmprojModel):
    # PaddleOCR-VL uses a modified version of Siglip
    min_pixels: int = 0
    max_pixels: int = 0

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        assert self.hparams_vision is not None
        self.min_pixels = self.preprocessor_config["min_pixels"]
        self.max_pixels = self.preprocessor_config["max_pixels"]
        # derive the image_size metadata from max_pixels: the side length of a
        # square image covering that many pixels
        self.hparams_vision["image_size"] = int(math.sqrt(self.max_pixels))

    def set_gguf_parameters(self):
        super().set_gguf_parameters()
        assert self.hparams_vision is not None
        hparams = self.hparams_vision
        self.gguf_writer.add_clip_projector_type(gguf.VisionProjectorType.PADDLEOCR)
        self.gguf_writer.add_vision_max_pixels(self.max_pixels)
        self.gguf_writer.add_vision_min_pixels(self.min_pixels)
        self.gguf_writer.add_vision_use_gelu(True)
        self.gguf_writer.add_vision_attention_layernorm_eps(hparams.get("rms_norm_eps", 1e-6))

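    # Keep only the vision tower and the mlp_AR projector tensors; the rest of
    # the checkpoint belongs to the language model and is handled by the
    # text-side conversion.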
    @classmethod
    def filter_tensors(cls, item: tuple[str, Callable[[], Tensor]]) -> tuple[str, Callable[[], Tensor]] | None:
        name, gen = item

        if "vision_model" not in name and "mlp_AR" not in name:
            return None
        name = name.replace("visual.", "model.")
        if "packing_position_embedding" in name:
            # unused
            return None
        if "vision_model.head" in name:
            # we don't yet support image embeddings for this model
            return None

        return super().filter_tensors((name, gen))