from __future__ import annotations

import json
import math

from typing import Callable, Iterable, TYPE_CHECKING

import torch

if TYPE_CHECKING:
    from torch import Tensor

from .base import ModelBase, TextModel, gguf


@ModelBase.register(
    "LLaMAForCausalLM",
    "LlamaForCausalLM",
    "MistralForCausalLM",
    "MixtralForCausalLM",
    "VLlama3ForCausalLM",
    "LlavaForConditionalGeneration",
    "VoxtralForConditionalGeneration",
    "IQuestCoderForCausalLM",
    "LlamaModel")
class LlamaModel(TextModel):
    model_arch = gguf.MODEL_ARCH.LLAMA
    undo_permute = True

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # fix for SmolVLM2, missing `num_attention_heads` in config.json
        if self.hf_arch == "VLlama3ForCausalLM":
            self.hparams["num_attention_heads"] = self.hparams.get("num_attention_heads", 32)
        # Mistral consolidated format has no config.json; origin_hf_arch is HF-only.
        if self.is_mistral_format:
            self.origin_hf_arch = None
        else:
            hparams = ModelBase.load_hparams(self.dir_model, is_mistral_format=False)
            self.origin_hf_arch = hparams.get('architectures', [None])[0]

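    # Vocab selection falls through in order: the GLM-ASR vocab for GlmasrModel checkpoints,
    # the native Mistral vocab (consolidated format, or tekken.json without tokenizer.json),
    # then SentencePiece, the HF "slow" Llama tokenizer, and finally GPT-2 style BPE
    # (the Llama 3 case).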
    def set_vocab(self):
        if self.origin_hf_arch == "GlmasrModel":
            return self._set_vocab_glmedge()

        if self.is_mistral_format:
            return self._set_vocab_mistral()

        path_tekken_json = self.dir_model / "tekken.json"
        path_tokenizer_json = self.dir_model / "tokenizer.json"
        if path_tekken_json.is_file() and not path_tokenizer_json.is_file():
            return self._set_vocab_mistral()

        try:
            self._set_vocab_sentencepiece()
        except FileNotFoundError:
            try:
                self._set_vocab_llama_hf()
            except (FileNotFoundError, TypeError):
                # Llama 3
                self._set_vocab_gpt2()

        # Apply to CodeLlama only (and ignore for Llama 3 with a vocab size of 128256)
        if self.hparams.get("vocab_size", 32000) == 32016:
            special_vocab = gguf.SpecialVocab(
                self.dir_model, load_merges=False,
                special_token_types = ['prefix', 'suffix', 'middle', 'eot']
            )
            special_vocab._set_special_token("prefix", 32007)
            special_vocab._set_special_token("suffix", 32008)
            special_vocab._set_special_token("middle", 32009)
            special_vocab._set_special_token("eot", 32010)
            special_vocab.add_to_gguf(self.gguf_writer)

        tokenizer_config_file = self.dir_model / 'tokenizer_config.json'
        if tokenizer_config_file.is_file():
            with open(tokenizer_config_file, "r", encoding="utf-8") as f:
                tokenizer_config_json = json.load(f)
                if "add_prefix_space" in tokenizer_config_json:
                    self.gguf_writer.add_add_space_prefix(tokenizer_config_json["add_prefix_space"])

        # Apply to granite small models only
        if self.hparams.get("vocab_size", 32000) == 49152:
            self.gguf_writer.add_add_bos_token(False)

    def set_gguf_parameters(self):
        super().set_gguf_parameters()
        hparams = self.hparams

        if not self.is_mistral_format:
            self.gguf_writer.add_vocab_size(hparams["vocab_size"])

        if (rope_dim := hparams.get("head_dim")) is None:
            rope_dim = hparams["hidden_size"] // hparams["num_attention_heads"]
        self.gguf_writer.add_rope_dimension_count(rope_dim)

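    # The HF export of Llama permutes the rows of the Q/K projections relative to the
    # original checkpoint layout; `permute` undoes that so the GGUF tensors match the
    # ordering llama.cpp's RoPE expects. It views each head's rows as a
    # (2, head_dim // 2) block, swaps the two axes and flattens back, re-interleaving
    # the rotary pairs (for head_dim = 4, rows [0, 1, 2, 3] within a head become
    # [0, 2, 1, 3]). Subclasses whose checkpoints already use the expected layout set
    # `undo_permute = False`.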
    @staticmethod
    def permute(weights: Tensor, n_head: int, n_head_kv: int | None):
        if n_head_kv is not None and n_head != n_head_kv:
            n_head = n_head_kv
        return (weights.reshape(n_head, 2, weights.shape[0] // n_head // 2, *weights.shape[1:])
                .swapaxes(1, 2)
                .reshape(weights.shape))

    def _repack_nvfp4(self, name: str, weight: Tensor, scale: Tensor, scale2: Tensor, input_scale: Tensor):
        # Mirror the BF16 Q/K RoPE permutation site in modify_tensors; the NVFP4 path bypasses it.
        if self.undo_permute:
            n_head = self.find_hparam(["n_heads", "num_attention_heads"], optional=True)
            n_kv_head = self.find_hparam(["n_kv_heads", "num_key_value_heads"], optional=True)
            if n_head is not None:
                if name.endswith("q_proj.weight"):
                    weight = LlamaModel.permute(weight, n_head, n_head)
                    scale = LlamaModel.permute(scale, n_head, n_head)
                elif name.endswith("k_proj.weight"):
                    weight = LlamaModel.permute(weight, n_head, n_kv_head)
                    scale = LlamaModel.permute(scale, n_head, n_kv_head)
        super()._repack_nvfp4(name, weight, scale, scale2, input_scale)

    _experts: list[dict[str, Tensor]] | None = None

    @classmethod
    def filter_tensors(cls, item: tuple[str, Callable[[], Tensor]]) -> tuple[str, Callable[[], Tensor]] | None:
        name, gen = item

        if "text_model." in name:
            name = name.replace("text_model.", "")  # for SmolVLM

        return super().filter_tensors((name, gen))

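    # modify_tensors applies the Q/K un-permutation and handles Mixtral-style MoE weights:
    # the per-expert block_sparse_moe tensors (w1/w2/w3) are buffered per layer until all
    # n_experts * 3 of them have arrived, then each weight id is stacked into a single 3D
    # [n_expert, ...] tensor named layers.{bid}.feed_forward.experts.{wid}.weight.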
    def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
        n_head = self.find_hparam(["n_heads", "num_attention_heads"])
        n_kv_head = self.find_hparam(["n_kv_heads", "num_key_value_heads"])

        if self.hf_arch == "LlamaModel":
            name = "model." + name

        if self.undo_permute:
            if name.endswith(("q_proj.weight", "q_proj.bias")):
                data_torch = LlamaModel.permute(data_torch, n_head, n_head)
            if name.endswith(("k_proj.weight", "k_proj.bias")):
                data_torch = LlamaModel.permute(data_torch, n_head, n_kv_head)

        # process the experts separately
        if name.find("block_sparse_moe.experts") != -1:
            n_experts = self.hparams["num_local_experts"]

            assert bid is not None

            if self._experts is None:
                self._experts = [{} for _ in range(self.block_count)]

            self._experts[bid][name] = data_torch

            if len(self._experts[bid]) >= n_experts * 3:
                # merge the experts into a single 3d tensor
                for wid in ["w1", "w2", "w3"]:
                    datas: list[Tensor] = []

                    for xid in range(n_experts):
                        ename = f"model.layers.{bid}.block_sparse_moe.experts.{xid}.{wid}.weight"
                        datas.append(self._experts[bid][ename])
                        del self._experts[bid][ename]

                    data_torch = torch.stack(datas, dim=0)

                    merged_name = f"layers.{bid}.feed_forward.experts.{wid}.weight"

                    yield from super().modify_tensors(data_torch, merged_name, bid)
                return
            else:
                return

        yield from super().modify_tensors(data_torch, name, bid)

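    # For checkpoints using "rope_type": "llama3" scaling, emit a rope_freqs tensor of
    # per-dimension frequency factors. Dimensions whose RoPE wavelength is below
    # high_freq_wavelen keep a factor of 1, those above low_freq_wavelen are scaled by
    # `factor`, and the band in between is smoothly interpolated:
    #
    #   smooth   = (old_context_len / wavelen - low_freq_factor) / (high_freq_factor - low_freq_factor)
    #   factor_i = 1 / ((1 - smooth) / factor + smooth)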
    def generate_extra_tensors(self) -> Iterable[tuple[str, Tensor]]:
        if rope_params := self.rope_parameters.get("full_attention", self.rope_parameters):
            if rope_params.get("rope_type", '').lower() == "llama3":
                base = rope_params.get("rope_theta", 10000.0)
                if (dim := self.hparams.get("head_dim")) is None:
                    dim = self.hparams["hidden_size"] // self.hparams["num_attention_heads"]
                freqs = 1.0 / (base ** (torch.arange(0, dim, 2, dtype=torch.float32) / dim))

                factor = rope_params.get("factor", 8.0)
                low_freq_factor = rope_params.get("low_freq_factor", 1.0)
                high_freq_factor = rope_params.get("high_freq_factor", 4.0)
                old_context_len = self.hparams.get("original_max_position_embeddings", 8192)

                low_freq_wavelen = old_context_len / low_freq_factor
                high_freq_wavelen = old_context_len / high_freq_factor
                # assert low_freq_wavelen != high_freq_wavelen  # Errors for Llama4

                rope_factors = []
                for freq in freqs:
                    wavelen = 2 * math.pi / freq
                    if wavelen < high_freq_wavelen:
                        rope_factors.append(1)
                    elif wavelen > low_freq_wavelen:
                        rope_factors.append(factor)
                    else:
                        smooth = (old_context_len / wavelen - low_freq_factor) / (high_freq_factor - low_freq_factor)
                        rope_factors.append(1 / ((1 - smooth) / factor + smooth))

                yield (self.format_tensor_name(gguf.MODEL_TENSOR.ROPE_FREQS), torch.tensor(rope_factors, dtype=torch.float32))

    def prepare_tensors(self):
        super().prepare_tensors()

        if self._experts is not None:
            # flatten `list[dict[str, Tensor]]` into `list[str]`
            experts = [k for d in self._experts for k in d.keys()]
            if len(experts) > 0:
                raise ValueError(f"Unprocessed experts: {experts}")


@ModelBase.register("ArceeForCausalLM")
|
|
class ArceeModel(LlamaModel):
|
|
model_arch = gguf.MODEL_ARCH.ARCEE
|
|
|
|
def set_gguf_parameters(self):
|
|
super().set_gguf_parameters()
|
|
self._try_set_pooling_type()
|
|
|
|
|
|
@ModelBase.register(
    "Llama4ForConditionalGeneration",
    "Llama4ForCausalLM",
)
class Llama4Model(LlamaModel):
    model_arch = gguf.MODEL_ARCH.LLAMA4
    undo_permute = False

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # IMPORTANT: in the HF config the MoE size takes the plain "intermediate_size" name and the
        # dense MLP size is stored as "intermediate_size_mlp"; swap them back so the base class sees
        # the dense size under "intermediate_size".
        self.hparams["intermediate_size_moe"] = self.hparams["intermediate_size"]
        self.hparams["intermediate_size"] = self.hparams["intermediate_size_mlp"]

    def set_vocab(self):
        self._set_vocab_gpt2()

    def set_gguf_parameters(self):
        super().set_gguf_parameters()
        self.gguf_writer.add_interleave_moe_layer_step(self.hparams["interleave_moe_layer_step"])
        self.gguf_writer.add_expert_feed_forward_length(self.hparams["intermediate_size_moe"])
        if "layer_types" in self.hparams:
            if all(lt == "full_attention" for lt in self.hparams["layer_types"]):
                # all layers are full attention (for MobileLLM), disable swa
                self.gguf_writer.add_sliding_window(0)

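    # Llama 4 routed-expert weights are stored fused and transposed in the HF checkpoint:
    # gate_up_proj holds the gate and up projections concatenated along its last dimension,
    # and down_proj lacks a ".weight" suffix. Split and transpose them back into the separate
    # gate_proj/up_proj/down_proj weights that the base class tensor mapping expects.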
    def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None):
        # split the gate_up into gate and up
        if "gate_up_proj" in name:
            name_up = name.replace("gate_up_proj", "up_proj.weight")
            name_gate = name.replace("gate_up_proj", "gate_proj.weight")
            dim_half = data_torch.shape[-1] // 2
            gate_proj_weight, up_proj_weight = data_torch.transpose(-1, -2).split(dim_half, dim=-2)
            yield from super().modify_tensors(gate_proj_weight, name_gate, bid)
            yield from super().modify_tensors(up_proj_weight, name_up, bid)
            return

        if name.endswith("down_proj"):
            name += ".weight"
            data_torch = data_torch.transpose(-1, -2)

        yield from super().modify_tensors(data_torch, name, bid)


@ModelBase.register("LlamaBidirectionalModel")
|
|
class LlamaEmbedNemotronModel(LlamaModel):
|
|
model_arch = gguf.MODEL_ARCH.LLAMA_EMBED
|
|
|
|
|
|
@ModelBase.register("SmolLM3ForCausalLM")
|
|
class SmolLM3Model(LlamaModel):
|
|
model_arch = gguf.MODEL_ARCH.SMOLLM3
|
|
|
|
|
|
@ModelBase.register("ApertusForCausalLM")
|
|
class ApertusModel(LlamaModel):
|
|
model_arch = gguf.MODEL_ARCH.APERTUS
|
|
undo_permute = False
|
|
|
|
_alpha_n = {}
|
|
_alpha_p = {}
|
|
_beta = {}
|
|
_eps = {}
|
|
|
|
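    # Apertus stores its xIELU activation parameters (alpha_n, alpha_p, beta, eps) as one
    # scalar tensor per layer. Instead of emitting those tensors, collect them per block id
    # and, once every layer has been seen, write each set as a per-layer array via the
    # gguf_writer metadata calls below.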
    def modify_tensors(self, data_torch, name, bid):
        # Handle xIELU activation parameters
        n_layers = self.hparams["num_hidden_layers"]
        if name.endswith(".act_fn.alpha_n"):
            self._alpha_n[bid] = data_torch.to("cpu").float().item()
            if (len(self._alpha_n) == n_layers):
                self.gguf_writer.add_xielu_alpha_n([self._alpha_n[k] for k in sorted(self._alpha_n)])
            return
        if name.endswith(".act_fn.alpha_p"):
            self._alpha_p[bid] = data_torch.to("cpu").float().item()
            if (len(self._alpha_p) == n_layers):
                self.gguf_writer.add_xielu_alpha_p([self._alpha_p[k] for k in sorted(self._alpha_p)])
            return
        if name.endswith(".act_fn.beta"):
            self._beta[bid] = data_torch.to("cpu").float().item()
            if (len(self._beta) == n_layers):
                self.gguf_writer.add_xielu_beta([self._beta[k] for k in sorted(self._beta)])
            return
        if name.endswith(".act_fn.eps"):
            self._eps[bid] = data_torch.to("cpu").float().item()
            if (len(self._eps) == n_layers):
                self.gguf_writer.add_xielu_eps([self._eps[k] for k in sorted(self._eps)])
            return

        yield from super().modify_tensors(data_torch, name, bid)