from __future__ import annotations

import math

from pathlib import Path
from typing import Iterable, TYPE_CHECKING

import torch

if TYPE_CHECKING:
    from torch import Tensor

from .base import ModelBase, TextModel, gguf


@ModelBase.register("ExaoneForCausalLM")
|
|
class ExaoneModel(TextModel):
|
|
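    # Converter for `ExaoneForCausalLM` checkpoints: writes EXAONE architecture
    # metadata and, when the config requests llama3 rope scaling, precomputes
    # per-frequency rope factors as an extra tensor (see generate_extra_tensors)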
    model_arch = gguf.MODEL_ARCH.EXAONE

    def set_gguf_parameters(self):
        super().set_gguf_parameters()
        hparams = self.hparams

        assert (hparams["activation_function"] == "silu")

        rotary_factor = self.find_hparam(["partial_rotary_factor", "rope_pct"], optional=True)
        rotary_factor = rotary_factor if rotary_factor is not None else 1.0
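        # only a `rotary_factor` fraction of each attention head's dimensions is
        # rotated; GGUF stores this as an absolute rotated-dimension count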
        self.gguf_writer.add_rope_dimension_count(int(rotary_factor * (hparams["hidden_size"] // hparams["num_attention_heads"])))

    def generate_extra_tensors(self) -> Iterable[tuple[str, Tensor]]:
        if rope_params := self.rope_parameters.get("full_attention", self.rope_parameters):
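            # prefer the "full_attention" sub-dict when present, otherwise fall
            # back to the flat rope_parameters dict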
            if rope_params.get("rope_type", '').lower() == "llama3":
                base = self.rope_parameters.get("rope_theta", 10000.0)
                if (dim := self.hparams.get("head_dim")) is None:
                    dim = self.hparams["hidden_size"] // self.hparams["num_attention_heads"]
                freqs = 1.0 / (base ** (torch.arange(0, dim, 2, dtype=torch.float32) / dim))

                factor = rope_params.get("factor", 8.0)
                low_freq_factor = rope_params.get("low_freq_factor", 1.0)
                high_freq_factor = rope_params.get("high_freq_factor", 4.0)
                old_context_len = self.hparams.get("original_max_position_embeddings", 8192)

                low_freq_wavelen = old_context_len / low_freq_factor
                high_freq_wavelen = old_context_len / high_freq_factor
                assert low_freq_wavelen != high_freq_wavelen

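                # llama3 rope scaling computes one factor per frequency: 1 for
                # wavelengths shorter than high_freq_wavelen, `factor` for
                # wavelengths longer than low_freq_wavelen, and a smooth
                # interpolation between the two in the middle band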
                rope_factors = []
                for freq in freqs:
                    wavelen = 2 * math.pi / freq
                    if wavelen < high_freq_wavelen:
                        rope_factors.append(1)
                    elif wavelen > low_freq_wavelen:
                        rope_factors.append(factor)
                    else:
                        smooth = (old_context_len / wavelen - low_freq_factor) / (high_freq_factor - low_freq_factor)
                        rope_factors.append(1 / ((1 - smooth) / factor + smooth))

                yield (self.format_tensor_name(gguf.MODEL_TENSOR.ROPE_FREQS), torch.tensor(rope_factors, dtype=torch.float32))


@ModelBase.register("Exaone4ForCausalLM")
|
|
class Exaone4Model(TextModel):
|
|
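    # Converter for `Exaone4ForCausalLM` checkpoints: GPT-2 style BPE vocab,
    # optional sliding-window attention metadata, and the same llama3
    # rope-factor precomputation as ExaoneModel (default factor 16.0 here)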
    model_arch = gguf.MODEL_ARCH.EXAONE4

    def set_vocab(self):
        tokens, toktypes, tokpre = self.get_vocab_base()
        self.gguf_writer.add_tokenizer_model("gpt2")
        self.gguf_writer.add_tokenizer_pre(tokpre)
        self.gguf_writer.add_token_list(tokens)
        self.gguf_writer.add_token_types(toktypes)

        special_vocab = gguf.SpecialVocab(self.dir_model, load_merges=True)
        special_vocab.add_to_gguf(self.gguf_writer)

    def set_gguf_parameters(self):
        super().set_gguf_parameters()
        hparams = self.hparams
        self.gguf_writer.add_vocab_size(hparams["vocab_size"])

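        # sliding-window layout can be specified three ways: an explicit
        # per-layer "layer_types" list, a cyclic pattern string such as "LLLG"
        # (L = local/sliding, G = global), or an integer period N meaning every
        # Nth layer is global; e.g. "LLLG" or 4 over 8 layers both yield
        # [True, True, True, False, True, True, True, False]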
        if hparams.get("sliding_window") is not None:
            self.gguf_writer.add_sliding_window(hparams["sliding_window"])
            if "layer_types" in hparams:
                self.gguf_writer.add_sliding_window_pattern([t == "sliding_attention" for t in hparams["layer_types"]])
            elif "sliding_window_pattern" in hparams:
                sliding_window_pattern = []
                if isinstance(hparams["sliding_window_pattern"], str):  # e.g. LLLG
                    for i in range(hparams["num_hidden_layers"]):
                        sliding_window_pattern.append(hparams["sliding_window_pattern"][i % len(hparams["sliding_window_pattern"])] == "L")
                if isinstance(hparams["sliding_window_pattern"], int):  # e.g. 4
                    for i in range(hparams["num_hidden_layers"]):
                        sliding_window_pattern.append((i + 1) % hparams["sliding_window_pattern"] != 0)
                if len(sliding_window_pattern) == hparams["num_hidden_layers"]:
                    self.gguf_writer.add_sliding_window_pattern(sliding_window_pattern)

    def generate_extra_tensors(self) -> Iterable[tuple[str, Tensor]]:
        if rope_params := self.rope_parameters.get("full_attention", self.rope_parameters):
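            # same llama3 rope-factor computation as ExaoneModel above, with a
            # default scaling factor of 16.0 and no wavelength-equality assert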
            if rope_params.get("rope_type", '').lower() == "llama3":
                base = rope_params.get("rope_theta", 10_000.0)
                if (dim := self.hparams.get("head_dim")) is None:
                    dim = self.hparams["hidden_size"] // self.hparams["num_attention_heads"]
                freqs = 1.0 / (base ** (torch.arange(0, dim, 2, dtype=torch.float32) / dim))

                factor = rope_params.get("factor", 16.0)
                low_freq_factor = rope_params.get("low_freq_factor", 1.0)
                high_freq_factor = rope_params.get("high_freq_factor", 4.0)
                old_context_len = self.hparams.get("original_max_position_embeddings", 8192)

                low_freq_wavelen = old_context_len / low_freq_factor
                high_freq_wavelen = old_context_len / high_freq_factor

                rope_factors = []
                for freq in freqs:
                    wavelen = 2 * math.pi / freq
                    if wavelen < high_freq_wavelen:
                        rope_factors.append(1)
                    elif wavelen > low_freq_wavelen:
                        rope_factors.append(factor)
                    else:
                        smooth = (old_context_len / wavelen - low_freq_factor) / (high_freq_factor - low_freq_factor)
                        rope_factors.append(1 / ((1 - smooth) / factor + smooth))

                yield (self.format_tensor_name(gguf.MODEL_TENSOR.ROPE_FREQS), torch.tensor(rope_factors, dtype=torch.float32))


@ModelBase.register("ExaoneMoEForCausalLM")
|
|
class ExaoneMoEModel(Exaone4Model):
|
|
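    # Converter for the MoE variant. NextN/MTP (multi-token prediction) layers
    # are appended after the regular hidden layers, so block_count becomes
    # num_hidden_layers + num_nextn_predict_layers and the tensor name map is
    # rebuilt to cover the extra blocks.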
    model_arch = gguf.MODEL_ARCH.EXAONE_MOE

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.block_count = self.hparams["num_hidden_layers"] + self.hparams.get("num_nextn_predict_layers", 0)
        self.tensor_map = gguf.get_tensor_name_map(self.model_arch, self.block_count)

    def set_gguf_parameters(self):
        super().set_gguf_parameters()
        moe_intermediate_size = self.hparams["moe_intermediate_size"]
        num_shared_experts = self.hparams["num_shared_experts"]
        self.gguf_writer.add_expert_feed_forward_length(moe_intermediate_size)
        self.gguf_writer.add_expert_shared_count(num_shared_experts)
        self.gguf_writer.add_expert_shared_feed_forward_length(moe_intermediate_size * num_shared_experts)
        self.gguf_writer.add_expert_weights_scale(self.hparams["routed_scaling_factor"])
        self.gguf_writer.add_expert_weights_norm(self.hparams["norm_topk_prob"])
        n_dense_layer = self.hparams.get("first_k_dense_replace", self.hparams.get("first_last_k_dense_replace", 0))
        self.gguf_writer.add_leading_dense_block_count(n_dense_layer)
        self.gguf_writer.add_nextn_predict_layers(self.hparams.get("num_nextn_predict_layers", 0))

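        # explicitly mark rope scaling as disabled, overriding whatever the
        # parent class may have derived from the config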
        self.gguf_writer.add_rope_scaling_type(gguf.RopeScalingType.NONE)

    _experts: list[dict[str, Tensor]] | None = None

    def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
        if name.startswith("mtp."):
            if name.find("layers.") != -1:
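                # per-layer MTP weights: shift the layer index past the regular
                # hidden layers so they land in the appended NextN blocks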
                # `mtp.layers.0.[module_name]` format
                name = name.replace(f"mtp.layers.{bid}", f"model.layers.{bid + self.hparams['num_hidden_layers']}")
            else:
                # mtp fc/norm weights
                remapper = {
                    "mtp.fc": "model.layers.{bid}.eh_proj",
                    "mtp.pre_fc_norm_embedding": "model.layers.{bid}.enorm",
                    "mtp.pre_fc_norm_hidden": "model.layers.{bid}.hnorm",
                    "mtp.norm": "model.layers.{bid}.shared_head.norm",
                }
                _n = Path(name)
                new_name = remapper[_n.stem] + _n.suffix

                # set shared weights for all NextN/MTP layers
                for bid in range(self.hparams['num_hidden_layers'], self.block_count):
                    yield from super().modify_tensors(data_torch, new_name.format(bid=bid), bid)
                return

if name.find("mlp.experts") != -1:
|
|
n_experts = self.find_hparam(["num_local_experts", "num_experts"])
|
|
assert bid is not None
|
|
|
|
if self._experts is None:
|
|
self._experts = [{} for _ in range(self.block_count)]
|
|
|
|
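            # buffer expert tensors; they are merged and emitted only once all
            # n_experts * 3 projections (down/gate/up) for this block have arrived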
            self._experts[bid][name] = data_torch

            if len(self._experts[bid]) >= n_experts * 3:
                # merge the experts into a single 3d tensor
                for w_name in ["down_proj", "gate_proj", "up_proj"]:
                    datas: list[Tensor] = []

                    for xid in range(n_experts):
                        ename = f"model.layers.{bid}.mlp.experts.{xid}.{w_name}.weight"
                        datas.append(self._experts[bid][ename])
                        del self._experts[bid][ename]

                    data_torch = torch.stack(datas, dim=0)

                    merged_name = f"model.layers.{bid}.mlp.experts.{w_name}.weight"

                    new_name = self.map_tensor_name(merged_name)

                    yield from super().modify_tensors(data_torch, new_name, bid)
                return
            else:
                return

        yield from super().modify_tensors(data_torch, name, bid)

    def prepare_tensors(self):
        super().prepare_tensors()
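        # sanity check: every buffered expert tensor must have been merged and
        # yielded by modify_tensors by now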
        if self._experts is not None:
            # flatten `list[dict[str, Tensor]]` into `list[str]`
            experts = [k for d in self._experts for k in d.keys()]
            if len(experts) > 0:
                raise ValueError(f"Unprocessed experts: {experts}")