from __future__ import annotations

from typing import Iterable, TYPE_CHECKING

import torch

if TYPE_CHECKING:
    from torch import Tensor

from .base import ModelBase, TextModel, gguf, logger
from .deepseek import DeepseekV2Model


@ModelBase.register("Glm4ForCausalLM", "Glm4vForConditionalGeneration")
|
|
class Glm4Model(TextModel):
|
|
model_arch = gguf.MODEL_ARCH.GLM4
|
|
use_mrope = False
|
|
partial_rotary_factor = 0.5
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.partial_rotary_factor = self.rope_parameters.get("partial_rotary_factor", 0.5)
|
|
if "mrope_section" in self.rope_parameters:
|
|
self.use_mrope = True
|
|
logger.info("Q/K weight will need to be permuted for M-RoPE")
|
|
|
|
    def set_vocab(self):
        from transformers import AutoTokenizer
        tokenizer = AutoTokenizer.from_pretrained(self.dir_model, trust_remote_code=True)
        special_vocab = gguf.SpecialVocab(self.dir_model, load_merges=True)
        tokens, toktypes, tokpre = self.get_vocab_base()
        self.gguf_writer.add_tokenizer_model("gpt2")
        self.gguf_writer.add_tokenizer_pre(tokpre)
        self.gguf_writer.add_token_list(tokens)
        self.gguf_writer.add_token_types(toktypes)
        special_vocab._set_special_token("eos", tokenizer.get_added_vocab()["<|endoftext|>"])  # ty: ignore[unresolved-attribute]
        special_vocab._set_special_token("eot", tokenizer.get_added_vocab()["<|user|>"])  # ty: ignore[unresolved-attribute]
        special_vocab._set_special_token("unk", tokenizer.get_added_vocab()["<|endoftext|>"])  # ty: ignore[unresolved-attribute]
        special_vocab._set_special_token("bos", tokenizer.get_added_vocab()["<|endoftext|>"])  # ty: ignore[unresolved-attribute]
        special_vocab.add_to_gguf(self.gguf_writer)

    def set_gguf_parameters(self):
        super().set_gguf_parameters()
        if (rope_dim := self.hparams.get("head_dim")) is None:
            rope_dim = self.hparams["hidden_size"] // self.hparams["num_attention_heads"]
        self.gguf_writer.add_rope_dimension_count(int(rope_dim * self.partial_rotary_factor))

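    # Worked example (hypothetical values, not from a specific config): with
    # head_dim = 128 and partial_rotary_factor = 0.5, int(128 * 0.5) = 64
    # dimensions per head are written as the RoPE dimension count; the
    # remaining 64 are left unrotated.
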
    @staticmethod
    def normal_to_neox(weights: Tensor, n_head: int, n_head_kv: int, head_dim: int, partial_rotary_factor: float) -> Tensor:
        orig_shape = weights.shape
        if len(orig_shape) == 1:
            weights = weights.unsqueeze(1)  # [out_dim, 1]
        if len(weights.shape) != 2:
            raise ValueError("Only 1D and 2D tensors are supported.")
        n_effective_heads = weights.shape[0] // head_dim
        if n_head_kv is not None and n_effective_heads != n_head:
            if n_effective_heads != n_head_kv:
                raise AssertionError(f"Mismatch in effective heads: computed {n_effective_heads}, expected {n_head} or {n_head_kv}")
        rotary_dim = int(head_dim * partial_rotary_factor)
        if rotary_dim % 2 != 0:
            raise ValueError("rotary_dim must be even.")
        reshaped = weights.reshape(n_effective_heads, head_dim, -1)
        rot_part = reshaped[:, :rotary_dim, :]
        non_rot_part = reshaped[:, rotary_dim:, :]
        permuted_rot = torch.cat((rot_part[:, ::2, :], rot_part[:, 1::2, :]), dim=1)
        combined = torch.cat((permuted_rot, non_rot_part), dim=1)
        result = combined.reshape(weights.shape)
        return result if len(orig_shape) != 1 else result.squeeze(1)

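    # Illustration (generic, not tied to a specific checkpoint): with
    # rotary_dim = 4, normal_to_neox reorders the per-head rows
    # [r0, r1, r2, r3] -> [r0, r2, r1, r3]. Interleaved ("normal") RoPE rotates
    # the pairs (r0, r1) and (r2, r3); after the permutation the same pairs sit
    # at positions (0, 2) and (1, 3), i.e. first half vs. second half, which is
    # the Neox layout the llama.cpp M-RoPE kernel expects.
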
    def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
        if self.use_mrope:
            n_head = self.hparams["num_attention_heads"]
            n_kv_head = self.hparams["num_key_value_heads"]
            n_embd = self.hparams["hidden_size"]
            head_dim = self.hparams.get("head_dim", n_embd // n_head)
            # because the llama.cpp M-RoPE kernel only supports Neox ordering, we have to permute the weights here
            if name.endswith(("q_proj.weight", "q_proj.bias")):
                data_torch = Glm4Model.normal_to_neox(data_torch, n_head, n_head, head_dim, self.partial_rotary_factor)
            if name.endswith(("k_proj.weight", "k_proj.bias")):
                data_torch = Glm4Model.normal_to_neox(data_torch, n_head, n_kv_head, head_dim, self.partial_rotary_factor)
        yield from super().modify_tensors(data_torch, name, bid)


@ModelBase.register("GlmOcrForConditionalGeneration")
|
|
class GlmOCRModel(Glm4Model):
|
|
model_arch = gguf.MODEL_ARCH.GLM4
|
|
use_mrope = False
|
|
partial_rotary_factor = 0.5
|
|
|
|
# Note: GLM-OCR is the same as GLM4, but with an extra NextN/MTP prediction layer
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
# GLM-OCR has num_hidden_layers + 1 actual layers (including NextN layer)
|
|
self.block_count = self.hparams["num_hidden_layers"] + self.hparams.get("num_nextn_predict_layers", 0)
|
|
self.tensor_map = gguf.get_tensor_name_map(self.model_arch, self.block_count)
|
|
|
|
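    # Worked example (hypothetical config): num_hidden_layers = 24 with
    # num_nextn_predict_layers = 1 gives block_count = 25, so the tensor name
    # map also covers the trailing NextN/MTP block.
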
    def set_gguf_parameters(self):
        super().set_gguf_parameters()
        # NextN/MTP prediction layers
        if (num_nextn_predict_layers := self.hparams.get("num_nextn_predict_layers")) is not None:
            self.gguf_writer.add_nextn_predict_layers(num_nextn_predict_layers)


@ModelBase.register("Glm4MoeForCausalLM", "Glm4vMoeForConditionalGeneration")
|
|
class Glm4MoeModel(TextModel):
|
|
model_arch = gguf.MODEL_ARCH.GLM4_MOE
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
# GLM4_MOE has num_hidden_layers + 1 actual layers (including NextN layer)
|
|
self.block_count = self.hparams["num_hidden_layers"] + self.hparams.get("num_nextn_predict_layers", 0)
|
|
self.tensor_map = gguf.get_tensor_name_map(self.model_arch, self.block_count)
|
|
|
|
def set_vocab(self):
|
|
return self._set_vocab_glm()
|
|
|
|
    def set_gguf_parameters(self):
        super().set_gguf_parameters()
        if (rope_dim := self.hparams.get("head_dim")) is None:
            rope_dim = (
                self.hparams["hidden_size"] // self.hparams["num_attention_heads"]
            )
        self.gguf_writer.add_rope_dimension_count(
            int(rope_dim * self.hparams.get("partial_rotary_factor", 0.5))
        )

        # MoE parameters - use only the routed expert count (shared experts handled separately)
        if (n_routed_experts := self.hparams.get("n_routed_experts")) is not None:
            self.gguf_writer.add_expert_count(n_routed_experts)
        if (moe_intermediate_size := self.hparams.get("moe_intermediate_size")) is not None:
            self.gguf_writer.add_expert_feed_forward_length(moe_intermediate_size)
        if (n_shared_experts := self.hparams.get("n_shared_experts")) is not None:
            self.gguf_writer.add_expert_shared_count(n_shared_experts)
        if (first_k_dense_replace := self.hparams.get("first_k_dense_replace")) is not None:
            self.gguf_writer.add_leading_dense_block_count(first_k_dense_replace)

        # Expert gating function (sigmoid for GLM4_MOE)
        self.gguf_writer.add_expert_gating_func(gguf.ExpertGatingFuncType.SIGMOID)

        # Routed scaling factor
        if (routed_scaling_factor := self.hparams.get("routed_scaling_factor")) is not None:
            self.gguf_writer.add_expert_weights_scale(routed_scaling_factor)

        # Normalise topk probabilities
        if (norm_topk_prob := self.hparams.get("norm_topk_prob")) is not None:
            self.gguf_writer.add_expert_weights_norm(norm_topk_prob)

        # NextN/MTP prediction layers
        if (num_nextn_predict_layers := self.hparams.get("num_nextn_predict_layers")) is not None:
            self.gguf_writer.add_nextn_predict_layers(num_nextn_predict_layers)

    _experts: list[dict[str, Tensor]] | None = None

    # note: unlike GLM4V non-MoE, we don't need to permute Q/K here since GLM4V_MOE uses Neox ordering already
    def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
        # Handle main token embedding (but not layer-specific NextN embeddings)
        if name == "model.embed_tokens.weight" and ".layers." not in name:
            yield from super().modify_tensors(data_torch, "token_embd.weight", bid)
            return

        # Handle routed experts
        if name.find("mlp.experts") != -1:
            n_experts = self.hparams["n_routed_experts"]
            assert bid is not None

            if self._experts is None:
                self._experts = [{} for _ in range(self.block_count)]

            self._experts[bid][name] = data_torch

            if len(self._experts[bid]) >= n_experts * 3:
                # merge the experts into a single 3d tensor
                for w_name in ["down_proj", "gate_proj", "up_proj"]:
                    datas: list[Tensor] = []

                    for xid in range(n_experts):
                        ename = f"model.layers.{bid}.mlp.experts.{xid}.{w_name}.weight"
                        datas.append(self._experts[bid][ename])
                        del self._experts[bid][ename]

                    data_torch = torch.stack(datas, dim=0)

                    merged_name = f"model.layers.{bid}.mlp.experts.{w_name}.weight"

                    yield from super().modify_tensors(data_torch, merged_name, bid)
                return
            else:
                return

        yield from super().modify_tensors(data_torch, name, bid)

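    # Shape note (illustrative, not taken from a specific checkpoint): every
    # per-expert down_proj/gate_proj/up_proj matrix in a block has the same 2D
    # shape, so the torch.stack(datas, dim=0) in modify_tensors yields one 3D
    # tensor of shape (n_experts, out_features, in_features) per projection.
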
    def prepare_tensors(self):
        super().prepare_tensors()
        if self._experts is not None:
            # flatten `list[dict[str, Tensor]]` into `list[str]`
            experts = [k for d in self._experts for k in d.keys()]
            if len(experts) > 0:
                raise ValueError(f"Unprocessed experts: {experts}")


@ModelBase.register("Glm4MoeLiteForCausalLM")
|
|
class Glm4MoeLiteModel(DeepseekV2Model):
|
|
model_arch = gguf.MODEL_ARCH.DEEPSEEK2
|
|
|
|
def set_vocab(self):
|
|
return self._set_vocab_glm()
|
|
|
|
|
|
@ModelBase.register("GlmMoeDsaForCausalLM")
|
|
class GlmMoeDsaModel(DeepseekV2Model):
|
|
model_arch = gguf.MODEL_ARCH.GLM_DSA
|
|
skip_mtp = False
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.block_count = self.hparams["num_hidden_layers"] + self.hparams.get("num_nextn_predict_layers", 0)
|
|
self.tensor_map = gguf.get_tensor_name_map(self.model_arch, self.block_count)
|
|
|
|
def set_vocab(self):
|
|
return self._set_vocab_glm()
|
|
|
|
def set_gguf_parameters(self):
|
|
super().set_gguf_parameters()
|
|
|
|
rope_dim = self.hparams["qk_rope_head_dim"]
|
|
partial_rotary_factor = self.hparams.get("partial_rotary_factor", 1.0)
|
|
self.gguf_writer.add_rope_dimension_count(int(rope_dim * partial_rotary_factor))
|
|
|
|
# NextN/MTP prediction layers
|
|
if (num_nextn_predict_layers := self.hparams.get("num_nextn_predict_layers")) is not None:
|
|
self.gguf_writer.add_nextn_predict_layers(num_nextn_predict_layers)
|
|
|
|
# DSA indexer parameters
|
|
self.gguf_writer.add_indexer_head_count(self.hparams["index_n_heads"])
|
|
self.gguf_writer.add_indexer_key_length(self.hparams["index_head_dim"])
|
|
self.gguf_writer.add_indexer_top_k(self.hparams["index_topk"])
|
|
|
|
|
|
@ModelBase.register("SolarOpenForCausalLM")
|
|
class SolarOpenModel(Glm4MoeModel):
|
|
model_arch = gguf.MODEL_ARCH.GLM4_MOE
|
|
|
|
def set_vocab(self):
|
|
from transformers import AutoTokenizer
|
|
tokenizer = AutoTokenizer.from_pretrained(self.dir_model)
|
|
special_vocab = gguf.SpecialVocab(self.dir_model, load_merges=True)
|
|
tokens, toktypes, tokpre = self.get_vocab_base()
|
|
self.gguf_writer.add_tokenizer_model("gpt2")
|
|
self.gguf_writer.add_tokenizer_pre(tokpre)
|
|
self.gguf_writer.add_token_list(tokens)
|
|
self.gguf_writer.add_token_types(toktypes)
|
|
special_vocab._set_special_token("eos", tokenizer.get_added_vocab()["<|endoftext|>"]) # ty: ignore[unresolved-attribute]
|
|
special_vocab._set_special_token("eot", tokenizer.get_added_vocab()["<|endoftext|>"]) # ty: ignore[unresolved-attribute]
|
|
special_vocab._set_special_token("unk", tokenizer.get_added_vocab()["<unk>"]) # ty: ignore[unresolved-attribute]
|
|
special_vocab._set_special_token("bos", tokenizer.get_added_vocab()["<|startoftext|>"]) # ty: ignore[unresolved-attribute]
|
|
special_vocab.add_to_gguf(self.gguf_writer)
|