from __future__ import annotations

from typing import Any, Callable, Iterable, TYPE_CHECKING

import torch

if TYPE_CHECKING:
    from torch import Tensor

from .base import MmprojModel, ModelBase, gguf, logger
from .llama import LlamaModel
from .mamba import Mamba2Model


@ModelBase.register("GraniteForCausalLM", "GraniteSpeechForConditionalGeneration")
class GraniteModel(LlamaModel):
    """Conversion for IBM's GraniteForCausalLM"""
    model_arch = gguf.MODEL_ARCH.GRANITE

    def set_gguf_parameters(self):
        """Granite uses standard llama parameters with the following differences:

        - No head_dim support
        - New multiplier params:
            - attention_scale
            - embedding_scale
            - residual_scale
            - logits_scaling
        """
        if head_dim := self.hparams.pop("head_dim", None):
            logger.warning("Ignoring head_dim (%s) from config for Granite", head_dim)
        super().set_gguf_parameters()
        # NOTE: Convert _multiplier params to _scale params for naming
        # consistency
        if attention_scale := self.hparams.get("attention_multiplier"):
            self.gguf_writer.add_attention_scale(attention_scale)
            logger.info("gguf: (granite) attention_scale = %s", attention_scale)
        if embedding_scale := self.hparams.get("embedding_multiplier"):
            self.gguf_writer.add_embedding_scale(embedding_scale)
            logger.info("gguf: (granite) embedding_scale = %s", embedding_scale)
        if residual_scale := self.hparams.get("residual_multiplier"):
            self.gguf_writer.add_residual_scale(residual_scale)
            logger.info("gguf: (granite) residual_scale = %s", residual_scale)
        if logits_scale := self.hparams.get("logits_scaling"):
            self.gguf_writer.add_logit_scale(logits_scale)
            logger.info("gguf: (granite) logits_scale = %s", logits_scale)

    @classmethod
    def filter_tensors(cls, item: tuple[str, Callable[[], Tensor]]) -> tuple[str, Callable[[], Tensor]] | None:
        name, gen = item
        if name.startswith("encoder."):
            return None
        return super().filter_tensors(item)


@ModelBase.register("GraniteMoeForCausalLM", "GraniteMoeSharedForCausalLM")
class GraniteMoeModel(GraniteModel):
    """Conversion for IBM's GraniteMoeForCausalLM"""
    model_arch = gguf.MODEL_ARCH.GRANITE_MOE

    def set_gguf_parameters(self):
        """GraniteMoeShared uses GraniteMoe parameters plus the following:
        - shared_intermediate_size
        """
        super().set_gguf_parameters()
        if shared_feed_forward_length := self.hparams.get("shared_intermediate_size"):
            self.gguf_writer.add_expert_shared_feed_forward_length(shared_feed_forward_length)
            logger.info("gguf: (granitemoeshared) shared_feed_forward_length = %s", shared_feed_forward_length)

    def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
        """In modeling_granitemoe, the JetMoe implementation of parallel experts
        is used. This essentially merges w1 and w3 into a single tensor with 2x
        the hidden size that is then split during forward. To keep compatibility
        with existing mixtral support, we pull them apart here.
        """
        if name.endswith("block_sparse_moe.input_linear.weight"):
            ffn_dim = self.hparams["intermediate_size"]
            assert data_torch.shape[-2] == 2 * ffn_dim, "Merged FFN tensor size must be 2 * intermediate_size"
            gate, up = data_torch.split(ffn_dim, dim=-2)
            yield from ModelBase.modify_tensors(self, gate, self.format_tensor_name(gguf.MODEL_TENSOR.FFN_GATE_EXP, bid), bid)
            yield from ModelBase.modify_tensors(self, up, self.format_tensor_name(gguf.MODEL_TENSOR.FFN_UP_EXP, bid), bid)
            return

        has_experts = bool(self.hparams.get('num_local_experts'))

        if name.endswith("shared_mlp.input_linear.weight"):
            ffn_dim = self.hparams["shared_intermediate_size"]
            assert data_torch.shape[-2] == 2 * ffn_dim, "Merged FFN tensor size must be 2 * shared_intermediate_size"
            gate, up = data_torch.split(ffn_dim, dim=-2)
            if has_experts:
                yield from ModelBase.modify_tensors(self, gate, self.format_tensor_name(gguf.MODEL_TENSOR.FFN_GATE_SHEXP, bid), bid)
                yield from ModelBase.modify_tensors(self, up, self.format_tensor_name(gguf.MODEL_TENSOR.FFN_UP_SHEXP, bid), bid)
                return
            yield from ModelBase.modify_tensors(self, gate, self.format_tensor_name(gguf.MODEL_TENSOR.FFN_GATE, bid), bid)
            yield from ModelBase.modify_tensors(self, up, self.format_tensor_name(gguf.MODEL_TENSOR.FFN_UP, bid), bid)
            return

        if not has_experts and name.endswith("shared_mlp.output_linear.weight"):
            yield from ModelBase.modify_tensors(self, data_torch, self.format_tensor_name(gguf.MODEL_TENSOR.FFN_DOWN, bid), bid)
            return

        yield from super().modify_tensors(data_torch, name, bid)


@ModelBase.register("GraniteMoeHybridForCausalLM", "BambaForCausalLM")
class GraniteHybridModel(Mamba2Model, GraniteMoeModel):
    """GraniteHybrid is a hybrid SSM + Attention model that uses Mamba2 SSM
    layers and optionally uses MoE w/ a shared expert"""
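    # NOTE: With this MRO (Mamba2Model listed first), inherited behavior
    #   defaults to the Mamba2 implementation; the attention/MoE paths are
    #   reached through the explicit GraniteMoeModel.* calls below.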
    model_arch = gguf.MODEL_ARCH.GRANITE_HYBRID
    undo_permute = True

    def __init__(self, *args, **kwargs):
        # Hybrid mamba models use a prefix for the mamba-specific params.
        # TODO: Extend this if the prefix(es) need to be configurable
        self.hparam_prefixes = ["mamba"]

        super().__init__(*args, **kwargs)

        # Lists of which layers use ssm vs attention
        self._attn_layers = self.get_attn_layers()
        self._ssm_layers = [
            i for i in range(self.block_count)
            if i not in self._attn_layers
        ]

        # There are some models in this family that are non-hybrid, but keep the
        # same parent class by setting all layers to "attention." If this is the
        # case, the model architecture needs to be updated to a standard
        # "granite" or "granitemoe" model
        if not self._ssm_layers:
            has_experts = self.find_hparam(["num_experts_per_tok", "num_experts_per_token"], optional=True)
            new_arch = (
                gguf.MODEL_ARCH.GRANITE_MOE
                if has_experts else
                gguf.MODEL_ARCH.GRANITE
            )
            self.model_arch = new_arch
            self.gguf_writer.arch = gguf.MODEL_ARCH_NAMES[new_arch]
            self.gguf_writer.add_architecture()

        # n_group and d_inner are used during reshape_tensors for mamba2
        # NOTE: Explicitly include the hparam prefix for d_model to
        #   disambiguate with the top-level head_dim
        # NOTE 2: If needed for future models, this can be isolated in a method
        #   to separate the prefix setting and the keys used
        self.d_model = self.find_hparam([f"{self.hparam_prefixes[0]}_head_dim", "hidden_size", "d_model"])
        self.n_group = self.find_hparam(["n_groups", "num_groups"])
        self.d_inner = self.find_hparam(["expand", "num_heads"]) * self.d_model

    def get_attn_layers(self):
        # Explicit list of layer type names
        if layer_types := self.hparams.get("layer_types"):
            return [
                i for i, typ in enumerate(layer_types)
                if typ == "attention"
            ]

        # Layer types indicated by index or period
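        # e.g. (hypothetical config values) attn_layer_period=6 with
        # attn_layer_offset=5 marks layers 5, 11, 17, ... as attention layers;
        # all remaining layers are treated as SSM layers.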
        attn_layers = self.hparams.get("attn_layer_indices", [])
        if not attn_layers:
            attn_period = self.hparams.get("attn_layer_period")
            assert attn_period, "Didn't find attn_layer_indices or attn_layer_period"
            attn_offset = self.hparams.get("attn_layer_offset")
            assert attn_offset is not None, "No attention layer offset set with attn_layer_period"
            attn_layers = [
                i for i in range(self.block_count)
                if i % attn_period == attn_offset
            ]
        return attn_layers

    def find_hparam(self, keys: Iterable[str], *args, **kwargs) -> Any:
        prefixed = []
        for pfx in self.hparam_prefixes:
            prefixed.extend(
                "_".join([pfx, k])
                for k in keys
            )
        keys = list(keys) + prefixed
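        # e.g. with the "mamba" prefix, ["n_groups", "num_groups"] expands to
        # ["n_groups", "num_groups", "mamba_n_groups", "mamba_num_groups"], so
        # prefixed config keys act as fallbacks for the bare names.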
        return Mamba2Model.find_hparam(self, keys, *args, **kwargs)

    def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
        if (
            name.endswith("block_sparse_moe.input_linear.weight")
            or "shared_mlp" in name
        ):
            yield from GraniteMoeModel.modify_tensors(self, data_torch, name, bid)
            return

        # Determine whether this is a mamba layer or an attention layer
        if bid in self._ssm_layers:
            yield from Mamba2Model.modify_tensors(self, data_torch, name, bid)
            return
        elif bid in self._attn_layers:
            yield from GraniteMoeModel.modify_tensors(self, data_torch, name, bid)
            return
        yield from ModelBase.modify_tensors(self, data_torch, name, bid)

    def set_gguf_parameters(self):
        """This method merges params from both parents and some that are
        specific to this model. The result is some duplication of how the params
        get set. The following warnings are expected during conversion:

        WARNING:Duplicated key name 'granitehybrid.attention.head_count_kv'
        WARNING:Duplicated key name 'granitehybrid.context_length'
        """
        GraniteMoeModel.set_gguf_parameters(self)

        ## Mamba mixer params ##
        self.gguf_writer.add_ssm_conv_kernel(self.find_hparam(["conv_kernel", "d_conv"]))
        self.gguf_writer.add_ssm_state_size(self.find_hparam(["state_size", "d_state", "state_dim", "ssm_state_size"]))
        self.gguf_writer.add_ssm_group_count(self.n_group)
        self.gguf_writer.add_ssm_inner_size(self.d_inner)
        # NOTE: The mamba_dt_rank is _not_ the right field for how this is used
        #   in llama.cpp
        self.gguf_writer.add_ssm_time_step_rank(self.find_hparam(["n_heads", "num_heads"]))

        ## Attention params ##
        head_count_kv = self.find_hparam(["num_key_value_heads", "n_head_kv"])
        head_count_kv_vec = [
            head_count_kv if i in self._attn_layers else 0 for i in range(self.block_count)
        ]
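        # A per-layer KV head count of 0 marks the SSM layers; as we understand
        # it, this is how the llama.cpp side tells attention blocks apart from
        # recurrent blocks in hybrid models.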
        if rope_dim := self.hparams.get("attn_rotary_emb"):
            self.gguf_writer.add_rope_dimension_count(rope_dim)
        self.gguf_writer.add_head_count_kv(head_count_kv_vec)

        ## If Bamba or non-hybrid, use rope, otherwise don't
        use_rope = (
            "BambaForCausalLM" in self.hparams["architectures"]
            or not self._ssm_layers
        )
        self.gguf_writer.add_rope_scaling_finetuned(use_rope)
        if not use_rope:
            self.gguf_writer.add_context_length(2**20)

        ## Validation ##
        d_head = self.find_hparam(["d_head"], optional=True) or 64
        assert self.hparams.get("hidden_act") in [None, "silu"], "Only SILU activation supported"
        assert self.d_inner % d_head == 0, f"SSM inner size {self.d_inner} not a multiple of head dim {d_head}"

    def set_vocab(self):
        self.hparams["pad_vocab_size_multiple"] = 8
        Mamba2Model.set_vocab(self)


@ModelBase.register("GraniteSpeechForConditionalGeneration")
class GraniteSpeechMmprojModel(MmprojModel):
    has_vision_encoder = False
    has_audio_encoder = True

    _batch_norm_tensors: list[dict[str, Tensor]] | None = None

    def get_audio_config(self) -> dict[str, Any] | None:
        return self.global_config.get("encoder_config")

    def set_gguf_parameters(self):
        assert self.hparams_audio is not None
        a = self.hparams_audio
        a["hidden_size"] = a["hidden_dim"]
        a["intermediate_size"] = a["hidden_dim"] * a["feedforward_mult"]
        a["num_attention_heads"] = a["num_heads"]
        a["num_hidden_layers"] = a["num_layers"]

        super().set_gguf_parameters()

        self.gguf_writer.add_clip_projector_type(gguf.VisionProjectorType.GRANITE_SPEECH)
        self.gguf_writer.add_audio_num_mel_bins(a["input_dim"])
        self.gguf_writer.add_audio_attention_layernorm_eps(1e-5)
        self.gguf_writer.add_audio_chunk_size(a["context_size"])
        self.gguf_writer.add_audio_conv_kernel_size(a["conv_kernel_size"])
        self.gguf_writer.add_audio_max_pos_emb(a["max_pos_emb"])

        p = self.global_config
        self.gguf_writer.add_audio_projector_window_size(p["window_size"])
        self.gguf_writer.add_audio_projector_downsample_rate(p["downsample_rate"])
        self.gguf_writer.add_audio_projector_head_count(p["projector_config"]["num_attention_heads"])

    def tensor_force_quant(self, name, new_name, bid, n_dims):
        if "encoder" in name or "projector" in name:
            if ".conv" in name and ".weight" in name:
                return gguf.GGMLQuantizationType.F32
        return super().tensor_force_quant(name, new_name, bid, n_dims)

    @classmethod
    def filter_tensors(cls, item: tuple[str, Callable[[], Tensor]]) -> tuple[str, Callable[[], Tensor]] | None:
        name, gen = item
        if "attention_dists" in name or "num_batches_tracked" in name:
            return None
        return super().filter_tensors(item)

    def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
        # fold running_mean, running_var and eps into weight and bias for batch_norm
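        # (informal sketch) At inference BatchNorm computes
        #   y = (x - running_mean) / sqrt(running_var + eps) * weight + bias
        # i.e. the affine map y = a * x + b with a and b as computed below, so
        # only the folded weight/bias pair needs to be written to the GGUF.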
        if "batch_norm" in name and "encoder.layers." in name:
            if self._batch_norm_tensors is None:
                self._batch_norm_tensors = [{} for _ in range(self.block_count)]
            assert bid is not None
            self._batch_norm_tensors[bid][name] = data_torch
            if len(self._batch_norm_tensors[bid]) < 4:
                return
            prefix = f"encoder.layers.{bid}.conv.batch_norm"
            weight = self._batch_norm_tensors[bid][f"{prefix}.weight"]
            bias = self._batch_norm_tensors[bid][f"{prefix}.bias"]
            running_mean = self._batch_norm_tensors[bid][f"{prefix}.running_mean"]
            running_var = self._batch_norm_tensors[bid][f"{prefix}.running_var"]
            eps = 1e-5
            a = weight / torch.sqrt(running_var + eps)
            b = bias - running_mean * a
            yield from super().modify_tensors(a, f"encoder.layers.{bid}.conv.batch_norm.weight", bid)
            yield from super().modify_tensors(b, f"encoder.layers.{bid}.conv.batch_norm.bias", bid)
            return

        if ".attn.to_kv.weight" in name:
            k_weight, v_weight = data_torch.chunk(2, dim=0)
            yield from super().modify_tensors(k_weight, name.replace("to_kv", "to_k"), bid)
            yield from super().modify_tensors(v_weight, name.replace("to_kv", "to_v"), bid)
            return

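        # The conv tensors below carry a singleton dimension (kernel size 1 for
        # up/down_conv, one input channel per group for the depthwise conv, as
        # far as we can tell); squeeze it away so they are stored as 2D weights.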
if ("up_conv" in name or "down_conv" in name) and name.endswith(".weight"):
|
|
if data_torch.ndim == 3 and data_torch.shape[2] == 1:
|
|
data_torch = data_torch.squeeze(2)
|
|
|
|
if "depth_conv" in name and name.endswith(".weight"):
|
|
if data_torch.ndim == 3 and data_torch.shape[1] == 1:
|
|
data_torch = data_torch.squeeze(1)
|
|
|
|
yield from super().modify_tensors(data_torch, name, bid)
|