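"""Conversion of LFM2-family checkpoints to GGUF: text, ColBERT embedding, MoE, vision-language, and audio variants."""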
from __future__ import annotations

from typing import Any, Callable, Iterable, TYPE_CHECKING

import torch

if TYPE_CHECKING:
    from torch import Tensor

from .base import MmprojModel, ModelBase, TextModel, gguf
from .gemma import ConformerAudioModel
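

# Text backbone. LFM2 interleaves short-convolution and attention blocks;
# hparams["layer_types"] tells them apart, and conv layers carry no KV heads.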
@ModelBase.register("Lfm2ForCausalLM", "LFM2ForCausalLM")
class LFM2Model(TextModel):
    model_arch = gguf.MODEL_ARCH.LFM2
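
    # block_auto_adjust_ff_dim applies LLaMA-style SwiGLU sizing: scale the
    # nominal FFN width by 2/3, apply the optional multiplier, then round up
    # to a multiple of block_multiple_of.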
    def _add_feed_forward_length(self):
        ff_dim = self.find_hparam(["block_ff_dim", "intermediate_size"])

        auto_adjust_ff_dim = self.hparams["block_auto_adjust_ff_dim"]
        ffn_dim_multiplier = self.hparams["block_ffn_dim_multiplier"]
        multiple_of = self.hparams["block_multiple_of"]

        if auto_adjust_ff_dim:
            ff_dim = int(2 * ff_dim / 3)
            # custom dim factor multiplier
            if ffn_dim_multiplier is not None:
                ff_dim = int(ffn_dim_multiplier * ff_dim)
            ff_dim = multiple_of * ((ff_dim + multiple_of - 1) // multiple_of)

        self.gguf_writer.add_feed_forward_length(ff_dim)

    def set_gguf_parameters(self):
        # set num_key_value_heads only for attention layers; conv layers get 0
        self.hparams["num_key_value_heads"] = [
            self.hparams["num_key_value_heads"] if layer_type != "conv" else 0
            for layer_type in self.hparams["layer_types"]
        ]

        super().set_gguf_parameters()
        self.gguf_writer.add_vocab_size(self.hparams["vocab_size"])
        self.gguf_writer.add_shortconv_l_cache(self.hparams["conv_L_cache"])
        self.gguf_writer.add_layer_norm_rms_eps(self.hparams["norm_eps"])
        self._add_feed_forward_length()

    @classmethod
    def filter_tensors(cls, item: tuple[str, Callable[[], Tensor]]) -> tuple[str, Callable[[], Tensor]] | None:
        name, gen = item

        if ConformerAudioModel.is_audio_tensor(name):
            # skip multimodal tensors
            return None

        name = name.replace("lfm.", "model.")  # audio checkpoints prefix the LM with "lfm."

        return super().filter_tensors((name, gen))

    def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
        # the conv op requires a 2d tensor
        if "conv.conv" in name:
            data_torch = data_torch.squeeze(1)

        yield from super().modify_tensors(data_torch, name, bid)
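

# LFM2 backbone reused as a ColBERT-style embedding model. The final dense
# projection lives in a separate 1_Dense/model.safetensors file and is pulled
# in via generate_extra_tensors().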
@ModelBase.register("Lfm2Model")
class LFM2ColBertModel(LFM2Model):
    model_arch = gguf.MODEL_ARCH.LFM2
    dense_tensor_name = "dense_2"

    def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
        if not name.startswith(self.dense_tensor_name):
            name = "model." + name

        yield from super().modify_tensors(data_torch, name, bid)

    def generate_extra_tensors(self) -> Iterable[tuple[str, Tensor]]:
        # dense tensor is stored in a separate safetensors file
        from safetensors.torch import load_file
        tensors_file = self.dir_model / "1_Dense" / "model.safetensors"
        assert tensors_file.is_file()
        tensor = load_file(tensors_file)["linear.weight"]
        self.gguf_writer.add_embedding_length_out(tensor.shape[0])
        yield f"{self.dense_tensor_name}.weight", tensor.clone()
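

# Sparse-MoE variant: a run of leading dense blocks followed by sigmoid-gated
# expert layers.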
@ModelBase.register("Lfm2MoeForCausalLM")
class LFM2MoeModel(TextModel):
    model_arch = gguf.MODEL_ARCH.LFM2MOE

    def set_gguf_parameters(self):
        # set num_key_value_heads only for attention layers; other layers get 0
        self.hparams["num_key_value_heads"] = [
            self.hparams["num_key_value_heads"] if layer_type == "full_attention" else 0
            for layer_type in self.hparams["layer_types"]
        ]

        super().set_gguf_parameters()

        self.gguf_writer.add_expert_feed_forward_length(self.hparams["moe_intermediate_size"])
        self.gguf_writer.add_leading_dense_block_count(self.hparams["num_dense_layers"])
        self.gguf_writer.add_expert_gating_func(gguf.ExpertGatingFuncType.SIGMOID)

        self.gguf_writer.add_vocab_size(self.hparams["vocab_size"])
        self.gguf_writer.add_shortconv_l_cache(self.hparams["conv_L_cache"])

    # cache of expert weights, merged per layer once all experts have arrived
    _experts_cache: dict[int, dict[str, Tensor]] = {}

    @classmethod
    def filter_tensors(cls, item: tuple[str, Callable[[], Tensor]]) -> tuple[str, Callable[[], Tensor]] | None:
        name, gen = item

        if name.endswith(".expert_bias"):
            name = name.replace(".expert_bias", ".expert_bias.bias")

        return super().filter_tensors((name, gen))
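
    # Expert weights arrive one (layer, expert, projection) tensor at a time;
    # buffer them per layer and emit one stacked [n_experts, ...] tensor per
    # projection (w1/w2/w3) once a layer is complete.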
    def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
        # the conv op requires a 2d tensor
        if "conv.conv" in name:
            data_torch = data_torch.squeeze(1)

        # merge expert weights
        if "experts" in name:
            n_experts = self.find_hparam(["num_local_experts", "num_experts"])
            assert bid is not None

            expert_cache = self._experts_cache.setdefault(bid, {})
            expert_cache[name] = data_torch
            expert_weights = ["w1", "w2", "w3"]

            # not enough expert weights to merge yet
            if len(expert_cache) < n_experts * len(expert_weights):
                return

            for w_name in expert_weights:
                datas: list[Tensor] = []

                for xid in range(n_experts):
                    ename = f"model.layers.{bid}.feed_forward.experts.{xid}.{w_name}.weight"
                    datas.append(expert_cache[ename])
                    del expert_cache[ename]

                data_torch = torch.stack(datas, dim=0)
                merged_name = f"layers.{bid}.feed_forward.experts.{w_name}.weight"

                yield from super().modify_tensors(data_torch, merged_name, bid)

            del self._experts_cache[bid]
            return

        yield from super().modify_tensors(data_torch, name, bid)

    def prepare_tensors(self):
        super().prepare_tensors()
        # every cached expert weight must have been merged and emitted
        assert not self._experts_cache
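

# Multimodal projector (mmproj) conversion for the vision-language variant:
# only the vision tower and projector tensors are exported.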
@ModelBase.register("Lfm2VlForConditionalGeneration")
class LFM2VLModel(MmprojModel):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        assert self.hparams_vision is not None
        # TODO(tarek): image_size is not specified for dynamic resolution; set it here for compatibility
        self.hparams_vision["image_size"] = 256
    def set_gguf_parameters(self):
        super().set_gguf_parameters()
        self.gguf_writer.add_clip_projector_type(gguf.VisionProjectorType.LFM2)
        self.gguf_writer.add_vision_attention_layernorm_eps(self.find_vparam(["layer_norm_eps"]))
        self.gguf_writer.add_vision_projector_scale_factor(self.global_config.get("downsample_factor", 2))
        self.gguf_writer.add_vision_use_gelu(True)
        # python indexing, e.g. for vision_feature_layer == -1 we keep the last layer -> vision_feature_layers_to_drop = 0
        vision_feature_layers_to_drop = -(self.global_config.get("vision_feature_layer", -1) + 1)
        self.gguf_writer.add_vision_block_count(self.find_vparam(self.n_block_keys) - vision_feature_layers_to_drop)

    @classmethod
    def filter_tensors(cls, item: tuple[str, Callable[[], Tensor]]) -> tuple[str, Callable[[], Tensor]] | None:
        name, gen = item

        name = name.replace("model.vision_tower.", "vision_tower.")
        name = name.replace("model.multi_modal_projector.", "multi_modal_projector.")

        return super().filter_tensors((name, gen))
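
    # The checkpoint stores the patch embedding as a flattened linear weight;
    # reshape it to conv2d layout [out_channels, 3, 16, 16] for GGUF.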
    def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
        if "patch_embedding.weight" in name:
            data_torch = data_torch.view(data_torch.shape[0], 16, 16, 3).permute(0, 3, 1, 2)

        yield from super().modify_tensors(data_torch, name, bid)
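

# Audio variant: converts the conformer audio encoder only; language-model and
# audio-generation tensors are filtered out below.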
@ModelBase.register("Lfm2AudioForConditionalGeneration")
class LFM2AudioModel(ConformerAudioModel):
    has_vision_encoder = False
    has_audio_encoder = True
    model_name = "Lfm2AudioEncoder"

    def get_audio_config(self) -> dict[str, Any] | None:
        return self.global_config.get("encoder")

    def set_gguf_parameters(self):
        assert self.hparams_audio is not None
        # map the conformer hparam names onto the common keys the base class expects
        self.hparams_audio["hidden_size"] = self.hparams_audio["d_model"]
        self.hparams_audio["intermediate_size"] = self.hparams_audio["d_model"]
        self.hparams_audio["num_attention_heads"] = self.hparams_audio["n_heads"]
        super().set_gguf_parameters()
        self.gguf_writer.add_clip_projector_type(gguf.VisionProjectorType.LFM2A)
        self.gguf_writer.add_audio_num_mel_bins(self.hparams_audio["feat_in"])
        self.gguf_writer.add_audio_attention_layernorm_eps(1e-5)

    @classmethod
    def filter_tensors(cls, item: tuple[str, Callable[[], Tensor]]) -> tuple[str, Callable[[], Tensor]] | None:
        name, gen = item

        # skip language model tensors
        if name.startswith("lfm."):
            return None

        # used for training only
        if any(p in name for p in ["audio_loss_weight"]):
            return None

        # used only for audio output
        if any(p in name for p in ["codebook_offsets", "depth_embeddings", "depth_linear", "depthformer"]):
            return None

        return super().filter_tensors(item)
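

# Audio tokenizer network built on the LFM2 text architecture: no vocabulary,
# and the output projection is exported as dense_2_out. The precomputed ISTFT
# window and codebook embeddings are dropped during conversion.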
@ModelBase.register("Lfm25AudioTokenizer")
class LFM25AudioTokenizer(LFM2Model):
    model_arch = gguf.MODEL_ARCH.LFM2

    def set_vocab(self):
        self._set_vocab_none()

    def set_gguf_parameters(self):
        super().set_gguf_parameters()
        self.gguf_writer.add_sliding_window(self.hparams["sliding_window"])
        self.gguf_writer.add_embedding_length_out(self.hparams["output_size"])

    @classmethod
    def filter_tensors(cls, item: tuple[str, Callable[[], Tensor]]) -> tuple[str, Callable[[], Tensor]] | None:
        name, gen = item

        # skip the precomputed ISTFT window and the codebook embeddings
        if name == "istft.window" or name.startswith("emb.emb"):
            return None

        if name.startswith("lin"):
            name = name.replace("lin", "dense_2_out")

        return super().filter_tensors((name, gen))