mirror of
https://github.com/LostRuins/koboldcpp.git
synced 2026-05-18 23:49:46 +00:00
360 lines
16 KiB
Python
360 lines
16 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
|
|
from typing import Any, Callable, Iterable, TYPE_CHECKING
|
|
|
|
if TYPE_CHECKING:
|
|
from torch import Tensor
|
|
|
|
from .base import MmprojModel, ModelBase, gguf, logger
|
|
|
|
from .qwen import Qwen3Model, Qwen3MoeModel
|
|
from .qwenvl import Qwen25AudioModel
|
|
|
|
|
|
@ModelBase.register("Qwen3VLForConditionalGeneration", "Qwen3VLMoeForConditionalGeneration", "Qwen3_5ForConditionalGeneration", "Qwen3_5MoeForConditionalGeneration")
class Qwen3VLVisionModel(MmprojModel):
    """Multimodal-projector converter for the vision tower of Qwen3-VL style checkpoints.

    Only ``visual.*`` tensors are kept. The deepstack mergers, the final
    merger and the patch embedding are renamed (and, for the patch embedding
    weight, split) into their GGUF tensor names; everything else is delegated
    to the parent class.
    """

    def __init__(self, *args, **kwargs):
        """Normalise the vision hyper-parameters and build the deepstack layer mask.

        All arguments are forwarded unchanged to the parent constructor.
        """
        super().__init__(*args, **kwargs)

        # Define the attribute up front: set_gguf_parameters() reads it
        # unconditionally, so it has to exist even when we bail out below.
        self.is_deepstack_layers: list[bool] = []

        if self.hparams_vision is None:
            logger.info("No vision config found, skipping vision tensor processing")
            return

        # Compute image_size if not present
        if "image_size" not in self.hparams_vision:
            # For Qwen3VL/Qwen3VLMoe, compute from num_position_embeddings:
            #   num_position_embeddings = (image_size / patch_size) ** 2
            #   => image_size = sqrt(num_position_embeddings) * patch_size
            num_pos = self.hparams_vision.get("num_position_embeddings", 2304)
            patch_size = self.hparams_vision.get("patch_size", 16)
            self.hparams_vision["image_size"] = int(num_pos**0.5 * patch_size)

        # Rename config values for compatibility
        self.hparams_vision["num_attention_heads"] = self.hparams_vision.get("num_heads")
        self.hparams_vision["num_hidden_layers"] = self.hparams_vision.get("depth")

        # One flag per encoder layer; True where a deepstack merger is attached.
        self.is_deepstack_layers = [False] * int(self.hparams_vision["num_hidden_layers"] or 0)
        for idx in self.hparams_vision.get("deepstack_visual_indexes", []):
            self.is_deepstack_layers[idx] = True

    def set_gguf_parameters(self):
        """Write the vision-specific GGUF metadata after the parent's parameters."""
        super().set_gguf_parameters()
        # in case mixed modalities, the arch will be handled by subclass
        if not self.has_audio_encoder:
            self.gguf_writer.add_clip_projector_type(gguf.VisionProjectorType.QWEN3VL)
        self.gguf_writer.add_vision_use_gelu(True)

        if self.hparams_vision is not None:
            merge_size = self.hparams_vision.get("spatial_merge_size")
            if merge_size is not None:
                self.gguf_writer.add_vision_spatial_merge_size(int(merge_size))

        # Use text config's rms_norm_eps for vision attention layernorm eps
        rms_norm_eps = self.global_config.get("text_config", {}).get("rms_norm_eps", 1e-6)
        self.gguf_writer.add_vision_attention_layernorm_eps(rms_norm_eps)

        # Empty when there is no vision config or the model has no layers.
        if self.is_deepstack_layers:
            self.gguf_writer.add_vision_is_deepstack_layers(self.is_deepstack_layers)

    @classmethod
    def filter_tensors(cls, item: tuple[str, Callable[[], Tensor]]) -> tuple[str, Callable[[], Tensor]] | None:
        """Keep only vision tensors, normalised to the ``visual.`` prefix.

        Returns ``None`` for tensors that must be dropped (``lm_head.*``,
        ``mtp.*`` and anything that is not under ``visual.``); otherwise the
        result of the parent's ``filter_tensors`` for the renamed item.
        """
        name, gen = item

        # Skip text model tensors and MTP tensors
        if name.startswith(("lm_head.", "mtp.")):
            return None

        if name.startswith("model.visual."):
            name = name.replace("model.visual.", "visual.", 1)

        if not name.startswith("visual."):
            return None

        return super().filter_tensors((name, gen))

    def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
        """Map one ``visual.*`` tensor to its GGUF name(s).

        Yields ``(gguf_name, tensor)`` pairs. Raises ``ValueError`` for an
        unrecognised deepstack/merger tensor or a patch embedding whose
        temporal kernel size is not 2.
        """
        assert self.hparams_vision is not None

        if name.startswith("visual.deepstack_merger_list."):
            # visual.deepstack_merger_list.<n>.<rest>
            list_pos, rest = name.split(".", maxsplit=3)[2:]
            # <n> indexes the merger list; convert to absolute clip layer index!
            layer_idx = self.hparams_vision.get("deepstack_visual_indexes", [])[int(list_pos)]

            deepstack_types = {
                "norm": gguf.MODEL_TENSOR.V_DS_NORM,
                "linear_fc1": gguf.MODEL_TENSOR.V_DS_FC1,
                "linear_fc2": gguf.MODEL_TENSOR.V_DS_FC2,
            }
            module, dot, suffix = rest.partition(".")
            if not dot or module not in deepstack_types:
                raise ValueError(f"Unexpected deepstack tensor: {name}")

            new_name = self.format_tensor_name(deepstack_types[module], layer_idx, suffix=f".{suffix}")
            yield from super().modify_tensors(data_torch, new_name, bid)
            return

        if name.startswith("visual.merger."):
            suffix = name.split(".", 2)[2]
            if suffix.startswith("linear_fc"):
                fc_idx_str, tail = suffix.split(".", 1)
                fc_num = int(fc_idx_str.replace("linear_fc", ""))
                # Qwen3VL has linear_fc1 and linear_fc2
                # Map to indices 0 and 2 (matching Qwen2VL which uses indices 0 and 2)
                fc_idx = {1: 0, 2: 2}.get(fc_num)
                if fc_idx is None:
                    raise ValueError(f"unexpected fc index {fc_num} in {name}")
                new_name = self.format_tensor_name(gguf.MODEL_TENSOR.V_MMPROJ, fc_idx, suffix=f".{tail}")
            elif suffix.startswith("norm."):
                new_name = self.format_tensor_name(gguf.MODEL_TENSOR.V_POST_NORM, suffix=f".{suffix.split('.', 1)[1]}")
            else:
                raise ValueError(f"Unexpected merger tensor: {name}")
            yield (new_name, data_torch)
            return

        if name == "visual.patch_embed.proj.weight":
            # split Conv3D into Conv2Ds along temporal dimension
            # (the unpacking also enforces a 5-D weight)
            _, _, kt, _, _ = data_torch.shape
            if kt != 2:
                raise ValueError("Current implementation only supports temporal_patch_size of 2")
            patch_name = gguf.TENSOR_NAMES[gguf.MODEL_TENSOR.V_ENC_EMBD_PATCH]
            yield (patch_name + ".weight", data_torch[:, :, 0, ...])
            yield (patch_name + ".weight.1", data_torch[:, :, 1, ...])
            return

        if name == "visual.patch_embed.proj.bias":
            # Include the bias - it's used by the C++ code
            yield (gguf.TENSOR_NAMES[gguf.MODEL_TENSOR.V_ENC_EMBD_PATCH] + ".bias", data_torch)
            return

        yield from MmprojModel.modify_tensors(self, data_torch, name, bid)
|
|
|
|
|
|
@ModelBase.register("Qwen3OmniMoeForConditionalGeneration")
class Qwen3OmniMmprojModel(Qwen3VLVisionModel, Qwen25AudioModel):
    """Projector converter for Qwen3-Omni checkpoints (vision and audio encoders).

    Vision tensors are renamed to the Qwen3-VL layout and handled by
    ``Qwen3VLVisionModel``; audio tensors are handled by ``Qwen25AudioModel``.
    Both encoder configs are read from ``thinker_config``.
    """

    has_audio_encoder = True
    has_vision_encoder = True

    def get_vision_config(self) -> dict[str, Any] | None:
        """Return ``thinker_config["vision_config"]``, or ``None`` without a vision encoder."""
        if not self.has_vision_encoder:
            return None
        return self.global_config["thinker_config"].get("vision_config")

    def get_audio_config(self) -> dict[str, Any] | None:
        """Return ``thinker_config["audio_config"]``, or ``None`` without an audio encoder."""
        if not self.has_audio_encoder:
            return None
        return self.global_config["thinker_config"].get("audio_config")

    def set_gguf_parameters(self):
        """Write the parameters of each enabled encoder, then its projector type."""
        if self.has_vision_encoder:
            Qwen3VLVisionModel.set_gguf_parameters(self)
            self.gguf_writer.add_clip_vision_projector_type(gguf.VisionProjectorType.QWEN3VL)
        if self.has_audio_encoder:
            Qwen25AudioModel.set_gguf_parameters(self)
            self.gguf_writer.add_clip_audio_projector_type(gguf.VisionProjectorType.QWEN3A)

    @classmethod
    def filter_tensors(cls, item: tuple[str, Callable[[], Tensor]]) -> tuple[str, Callable[[], Tensor]] | None:
        """Keep only vision and audio-tower tensors.

        Returns ``None`` for ``lm_head.*``, ``mtp.*`` and any name containing
        neither ``visual.`` nor ``audio_tower.``; otherwise the result of
        ``MmprojModel.filter_tensors`` for the (possibly renamed) item.
        """
        name, gen = item

        # Skip text model tensors and MTP tensors
        if name.startswith(("lm_head.", "mtp.")):
            return None

        if name.startswith("model.visual."):
            name = name.replace("model.visual.", "visual.", 1)

        if name.startswith("thinker.audio_tower."):
            name = name.replace("thinker.audio_tower.", "audio_tower.", 1)

        if "visual." not in name and "audio_tower." not in name:
            return None

        return MmprojModel.filter_tensors((name, gen))

    def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
        """Dispatch a tensor to the vision or the audio conversion logic.

        Raises ``ValueError`` when a tensor belongs to an encoder this model
        is declared not to have. Names matching neither encoder yield nothing.
        """
        if "visual." in name:
            if not self.has_vision_encoder:
                raise ValueError(f"Model does not have vision encoder, but found tensor {name}")
            # Qwen3VLVisionModel.modify_tensors() only recognises names that
            # start with "visual.", so strip the checkpoint prefix completely
            # instead of rewriting it to "model.visual." (which would bypass
            # the deepstack / merger / patch-embed handling).
            for prefix in ("thinker.visual.", "model.visual."):
                if name.startswith(prefix):
                    name = "visual." + name.removeprefix(prefix)
                    break
            # Translate the merger naming so the Qwen3-VL logic can be reused.
            if ".merger_list." in name or ".merger." in name:
                name = name.replace(".merger_list.", ".deepstack_merger_list.")
                name = name.replace(".ln_q", ".norm")
                name = name.replace(".mlp.0", ".linear_fc1")
                name = name.replace(".mlp.2", ".linear_fc2")
            yield from Qwen3VLVisionModel.modify_tensors(self, data_torch, name, bid)
        elif "audio_tower." in name:
            if not self.has_audio_encoder:
                raise ValueError(f"Model does not have audio encoder, but found tensor {name}")
            if "conv2d" in name and name.endswith(".bias"):
                # transform conv2d bias [n_embd] --> [1, 1, n_embd]
                # (two trailing unit axes are appended on the torch side)
                data_torch = data_torch.unsqueeze(-1).unsqueeze(-1)
            yield from Qwen25AudioModel.modify_tensors(self, data_torch, name, bid)
|
|
|
|
|
|
@ModelBase.register("Qwen3ASRForConditionalGeneration")
class Qwen3ASRMmprojModel(Qwen3OmniMmprojModel):
    """Audio-only variant of the Qwen3-Omni projector converter."""

    # Same conversion logic as the parent, with the vision encoder disabled.
    has_vision_encoder = False
    has_audio_encoder = True
|
|
|
|
|
|
@ModelBase.register("Glm4vForConditionalGeneration", "Glm4vMoeForConditionalGeneration", "GlmOcrForConditionalGeneration")
class Glm4VVisionModel(Qwen3VLVisionModel):
    """Vision projector converter for GLM-4V style checkpoints.

    Reuses the Qwen3-VL tensor handling, but writes its own GGUF parameters
    and sends ``visual.merger.*`` tensors straight to ``ModelBase``.
    """

    def set_gguf_parameters(self):
        """Write the GLM4V projector type, activation flag and layernorm epsilon."""
        # Call MmprojModel directly: the Qwen3VLVisionModel parameters are
        # deliberately skipped.
        MmprojModel.set_gguf_parameters(self)
        vision_cfg = self.hparams_vision
        assert vision_cfg is not None
        self.gguf_writer.add_clip_projector_type(gguf.VisionProjectorType.GLM4V)

        # Only "gelu" and "silu" set a flag; any other value writes nothing.
        activation = str(vision_cfg.get("hidden_act", "")).lower()
        if activation == "gelu":
            self.gguf_writer.add_vision_use_gelu(True)
        elif activation == "silu":
            self.gguf_writer.add_vision_use_silu(True)

        self.gguf_writer.add_vision_attention_layernorm_eps(vision_cfg.get("rms_norm_eps", 1e-5))

    def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
        """Yield converted tensors; merger tensors skip the Qwen3-VL renaming."""
        if name.startswith("visual.merger."):
            converted = ModelBase.modify_tensors(self, data_torch, name, bid)
        else:
            converted = super().modify_tensors(data_torch, name, bid)
        yield from converted
|
|
|
|
|
|
@ModelBase.register("Qwen3VLForConditionalGeneration")
class Qwen3VLTextModel(Qwen3Model):
    """Text-model converter for Qwen3-VL checkpoints."""

    model_arch = gguf.MODEL_ARCH.QWEN3VL

    def set_gguf_parameters(self):
        """Write the parent's parameters plus the number of deepstack layers."""
        super().set_gguf_parameters()
        # The vision config is read from "thinker_config" when that key is
        # present, otherwise from the top level of the hparams.
        config_root = self.hparams["thinker_config"] if "thinker_config" in self.hparams else self.hparams
        vision_cfg = config_root.get("vision_config", {})
        self.gguf_writer.add_num_deepstack_layers(len(vision_cfg.get("deepstack_visual_indexes", [])))

    @classmethod
    def filter_tensors(cls, item: tuple[str, Callable[[], Tensor]]) -> tuple[str, Callable[[], Tensor]] | None:
        """Remove every ``thinker.`` from the tensor name, then defer to the parent filter."""
        raw_name, loader = item
        return super().filter_tensors((raw_name.replace("thinker.", ""), loader))
|
|
|
|
|
|
@ModelBase.register("Qwen3VLMoeForConditionalGeneration")
class Qwen3VLMoeTextModel(Qwen3MoeModel):
    """Text-model converter for Qwen3-VL-MoE checkpoints."""

    model_arch = gguf.MODEL_ARCH.QWEN3VLMOE

    def set_gguf_parameters(self):
        """Write the parent's parameters plus the number of deepstack layers."""
        super().set_gguf_parameters()
        deepstack_indexes = self.hparams.get("vision_config", {}).get("deepstack_visual_indexes", [])
        self.gguf_writer.add_num_deepstack_layers(len(deepstack_indexes))

    @classmethod
    def filter_tensors(cls, item: tuple[str, Callable[[], Tensor]]) -> tuple[str, Callable[[], Tensor]] | None:
        """Remove every ``thinker.`` from the tensor name, then defer to the parent filter."""
        raw_name, loader = item
        return super().filter_tensors((raw_name.replace("thinker.", ""), loader))

    def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
        """Convert packed expert weights; pass every other tensor to the parent.

        Qwen3VL has transposed packed tensors, so they are treated differently
        from general Qwen2MoE packed tensors. Raises ``ValueError`` when a
        fused ``gate_up_proj`` has fewer than three dimensions or an odd last
        dimension.
        """
        if name.endswith(("mlp.experts.down_proj", "mlp.experts.down_proj.weight")):
            target_name = name if name.endswith(".weight") else f"{name}.weight"
            transposed = data_torch.permute(0, 2, 1).contiguous()
            yield from ModelBase.modify_tensors(self, transposed, target_name, bid)
            return

        if name.endswith(("mlp.experts.gate_up_proj", "mlp.experts.gate_up_proj.weight")):
            if data_torch.ndim < 3 or data_torch.shape[-1] % 2 != 0:
                raise ValueError(f"Unexpected gate_up_proj shape for {name}: {tuple(data_torch.shape)}")

            # The fused tensor holds gate in the first half of the last axis
            # and up in the second half.
            half = data_torch.shape[-1] // 2
            gate_part = data_torch[..., :half].contiguous()
            up_part = data_torch[..., half:].contiguous()

            # Example shapes from the original author:
            #   Input gate/up: (n_expert=128, n_embd=2048, n_ff_exp=768)
            #   Want GGML ne: {n_embd, n_ff_exp, n_expert} = {2048, 768, 128}
            #   Need PyTorch: (128, 768, 2048) [reversed of GGML]
            #   So: permute(0, 2, 1): (128, 2048, 768) -> (128, 768, 2048)
            gate_t = gate_part.permute(0, 2, 1).contiguous()
            up_t = up_part.permute(0, 2, 1).contiguous()

            # "<...>.experts.gate_up_proj[.weight]" -> "<...>.experts"
            experts_path = name.removesuffix(".weight").rsplit('.', 1)[0]
            yield from ModelBase.modify_tensors(self, gate_t, f"{experts_path}.gate_proj.weight", bid)
            yield from ModelBase.modify_tensors(self, up_t, f"{experts_path}.up_proj.weight", bid)
            return

        yield from super().modify_tensors(data_torch, name, bid)
|
|
|
|
|
|
@ModelBase.register("Qwen3OmniMoeForConditionalGeneration")
class Qwen3OmniMoeTextModel(Qwen3VLMoeTextModel):
    """Text-model converter for Qwen3-Omni-MoE checkpoints."""

    model_arch = gguf.MODEL_ARCH.QWEN3VLMOE

    def set_vocab(self):
        """Set the vocab via the parent, then point BOS and EOS at ``<|im_end|>``.

        The token id is looked up in ``added_tokens_decoder`` of
        ``tokenizer_config.json``; nothing is written when no entry matches.
        """
        super().set_vocab()
        # correct BOS/EOS tokens
        with open(self.dir_model / "tokenizer_config.json", encoding="utf-8") as cfg_file:
            added_tokens = json.load(cfg_file).get("added_tokens_decoder", {})

        # First entry whose content is the end-of-turn marker, if any.
        im_end_id = next(
            (int(token_id) for token_id, info in added_tokens.items() if info.get("content") == "<|im_end|>"),
            None,
        )
        if im_end_id is not None:
            self.gguf_writer.add_bos_token_id(im_end_id)
            self.gguf_writer.add_eos_token_id(im_end_id)

    def set_gguf_parameters(self):
        """Write the parent's parameters, then set the deepstack layer count to 0."""
        super().set_gguf_parameters()
        self.gguf_writer.add_num_deepstack_layers(0)
|
|
|
|
|
|
@ModelBase.register("Qwen3ASRForConditionalGeneration")
class Qwen3ASRTextModel(Qwen3VLTextModel):
    """Text-model converter for Qwen3-ASR checkpoints."""

    model_arch = gguf.MODEL_ARCH.QWEN3VL

    def set_gguf_parameters(self):
        """Write the parent's parameters, then set the deepstack layer count to 0."""
        super().set_gguf_parameters()
        self.gguf_writer.add_num_deepstack_layers(0)

    def set_vocab(self):
        """Set the vocab via the parent, then write a ChatML template and BOS/EOS ids.

        BOS and EOS are both set to the id of ``<|im_end|>`` as found in
        ``added_tokens_decoder`` of ``tokenizer_config.json``; they are left
        alone when no entry matches.
        """
        super().set_vocab()

        # fix chat template, use correct chatml format
        chatml_template = (
            "{% for message in messages %}"
            "{{'<|im_start|>' + message['role'] + '\\n' + message['content'] + '<|im_end|>' + '\\n'}}"
            "{% endfor %}"
            "{% if add_generation_prompt %}{{ '<|im_start|>assistant\\n' }}{% endif %}"
        )
        self.gguf_writer.add_chat_template(chatml_template)

        # correct BOS/EOS tokens
        with open(self.dir_model / "tokenizer_config.json", encoding="utf-8") as cfg_file:
            added_tokens = json.load(cfg_file).get("added_tokens_decoder", {})

        # First entry whose content is the end-of-turn marker, if any.
        im_end_id = next(
            (int(token_id) for token_id, info in added_tokens.items() if info.get("content") == "<|im_end|>"),
            None,
        )
        if im_end_id is not None:
            self.gguf_writer.add_bos_token_id(im_end_id)
            self.gguf_writer.add_eos_token_id(im_end_id)
|