from __future__ import annotations

import json
import math

from typing import Callable, Iterable, TYPE_CHECKING

import torch

if TYPE_CHECKING:
    from torch import Tensor

from .base import MmprojModel, ModelBase, SentencePieceTokenTypes, TextModel, gguf, logger


@ModelBase.register("PhiForCausalLM")
|
|
class Phi2Model(TextModel):
|
|
model_arch = gguf.MODEL_ARCH.PHI2
|
|
|
|
def set_gguf_parameters(self):
|
|
rot_pct = self.find_hparam(["partial_rotary_factor"])
|
|
n_embd = self.find_hparam(["hidden_size", "n_embd"])
|
|
n_head = self.find_hparam(["num_attention_heads", "n_head"])
|
|
|
|
self.gguf_writer.add_context_length(self.find_hparam(["n_positions", "max_position_embeddings"]))
|
|
|
|
self.gguf_writer.add_embedding_length(n_embd)
|
|
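        # Phi-2 configs carry no explicit FFN width; the MLP uses the conventional 4x hidden-size expansion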
        self.gguf_writer.add_feed_forward_length(4 * n_embd)
        self.gguf_writer.add_block_count(self.block_count)
        self.gguf_writer.add_head_count(n_head)
        self.gguf_writer.add_head_count_kv(n_head)
        self.gguf_writer.add_layer_norm_eps(self.find_hparam(["layer_norm_epsilon", "layer_norm_eps"]))
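        # RoPE is applied to only a fraction of each head's dimensions, per partial_rotary_factor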
        self.gguf_writer.add_rope_dimension_count(int(rot_pct * n_embd) // n_head)
        self.gguf_writer.add_file_type(self.ftype)
        self.gguf_writer.add_add_bos_token(False)


@ModelBase.register("Phi3ForCausalLM", "Phi4ForCausalLMV")
|
|
class Phi3MiniModel(TextModel):
|
|
model_arch = gguf.MODEL_ARCH.PHI3
|
|
|
|
def set_vocab(self):
|
|
# Phi-4 model uses GPT2Tokenizer
|
|
tokenizer_config_file = self.dir_model / 'tokenizer_config.json'
|
|
if tokenizer_config_file.is_file():
|
|
with open(tokenizer_config_file, "r", encoding="utf-8") as f:
|
|
tokenizer_config_json = json.load(f)
|
|
tokenizer_class = tokenizer_config_json['tokenizer_class']
|
|
if tokenizer_class == 'GPT2Tokenizer':
|
|
return self._set_vocab_gpt2()
|
|
|
|
        from sentencepiece import SentencePieceProcessor

        tokenizer_path = self.dir_model / 'tokenizer.model'

        if not tokenizer_path.is_file():
            raise ValueError(f'Error: Missing {tokenizer_path}')

        tokenizer = SentencePieceProcessor()
        tokenizer.LoadFromFile(str(tokenizer_path))

        vocab_size = self.hparams.get('vocab_size', tokenizer.vocab_size())

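        # pre-fill every slot with a placeholder: hparams vocab_size may exceed the tokenizer's
        # count, and ids never touched by the loops below stay [PAD{i}]/UNUSED entries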
        tokens: list[bytes] = [f"[PAD{i}]".encode("utf-8") for i in range(vocab_size)]
        scores: list[float] = [-10000.0] * vocab_size
        toktypes: list[int] = [SentencePieceTokenTypes.UNUSED] * vocab_size

        for token_id in range(tokenizer.vocab_size()):
            piece = tokenizer.IdToPiece(token_id)
            text = piece.encode("utf-8")
            score = tokenizer.GetScore(token_id)

            toktype = SentencePieceTokenTypes.NORMAL
            if tokenizer.IsUnknown(token_id):
                toktype = SentencePieceTokenTypes.UNKNOWN
            elif tokenizer.IsControl(token_id):
                toktype = SentencePieceTokenTypes.CONTROL
            elif tokenizer.IsUnused(token_id):
                toktype = SentencePieceTokenTypes.UNUSED
            elif tokenizer.IsByte(token_id):
                toktype = SentencePieceTokenTypes.BYTE

            tokens[token_id] = text
            scores[token_id] = score
            toktypes[token_id] = toktype

        added_tokens_file = self.dir_model / 'added_tokens.json'
        if added_tokens_file.is_file():
            with open(added_tokens_file, "r", encoding="utf-8") as f:
                added_tokens_json = json.load(f)

                for key in added_tokens_json:
                    token_id = added_tokens_json[key]
                    if token_id >= vocab_size:
                        logger.debug(f'ignore token {token_id}: id is out of range, max={vocab_size - 1}')
                        continue

                    tokens[token_id] = key.encode("utf-8")
                    scores[token_id] = -1000.0
                    toktypes[token_id] = SentencePieceTokenTypes.USER_DEFINED

        tokenizer_config_file = self.dir_model / 'tokenizer_config.json'
        if tokenizer_config_file.is_file():
            with open(tokenizer_config_file, "r", encoding="utf-8") as f:
                tokenizer_config_json = json.load(f)
                added_tokens_decoder = tokenizer_config_json.get("added_tokens_decoder", {})
                for token_id, token_data in added_tokens_decoder.items():
                    token_id = int(token_id)
                    token = token_data["content"].encode("utf-8")
                    if toktypes[token_id] != SentencePieceTokenTypes.UNUSED:
                        if tokens[token_id] != token:
                            logger.warning(f'replacing token {token_id}: {tokens[token_id].decode("utf-8")!r} -> {token.decode("utf-8")!r}')
                    tokens[token_id] = token
                    scores[token_id] = -1000.0
                    toktypes[token_id] = SentencePieceTokenTypes.USER_DEFINED
                    if token_data.get("special"):
                        toktypes[token_id] = SentencePieceTokenTypes.CONTROL

        tokenizer_file = self.dir_model / 'tokenizer.json'
        if tokenizer_file.is_file():
            with open(tokenizer_file, "r", encoding="utf-8") as f:
                tokenizer_json = json.load(f)
                added_tokens = tokenizer_json.get("added_tokens", [])
                for token_data in added_tokens:
                    token_id = int(token_data["id"])
                    token = token_data["content"].encode("utf-8")
                    if toktypes[token_id] != SentencePieceTokenTypes.UNUSED:
                        if tokens[token_id] != token:
                            logger.warning(f'replacing token {token_id}: {tokens[token_id].decode("utf-8")!r} -> {token.decode("utf-8")!r}')
                    tokens[token_id] = token
                    scores[token_id] = -1000.0
                    toktypes[token_id] = SentencePieceTokenTypes.USER_DEFINED
                    if token_data.get("special"):
                        toktypes[token_id] = SentencePieceTokenTypes.CONTROL

self.gguf_writer.add_tokenizer_model("llama")
|
|
self.gguf_writer.add_tokenizer_pre("default")
|
|
self.gguf_writer.add_token_list(tokens)
|
|
self.gguf_writer.add_token_scores(scores)
|
|
self.gguf_writer.add_token_types(toktypes)
|
|
|
|
special_vocab = gguf.SpecialVocab(self.dir_model, n_vocab=len(tokens))
|
|
special_vocab.add_to_gguf(self.gguf_writer)
|
|
|
|
    def set_gguf_parameters(self):
        n_embd = self.find_hparam(["hidden_size", "n_embd"])
        n_head = self.find_hparam(["num_attention_heads", "n_head"])
        n_head_kv = self.find_hparam(["num_key_value_heads", "n_head_kv"])
        rms_eps = self.find_hparam(["rms_norm_eps"])
        max_pos_embds = self.find_hparam(["n_positions", "max_position_embeddings"])
        orig_max_pos_embds = self.find_hparam(["original_max_position_embeddings"])
        rot_pct = self.hparams.get("partial_rotary_factor", 1.0)
        rope_dims = int(rot_pct * n_embd) // n_head

        self.gguf_writer.add_context_length(max_pos_embds)
        self.gguf_writer.add_rope_scaling_orig_ctx_len(orig_max_pos_embds)
        self.gguf_writer.add_embedding_length(n_embd)
        self.gguf_writer.add_feed_forward_length(self.find_hparam(["intermediate_size"]))
        self.gguf_writer.add_block_count(self.block_count)
        self.gguf_writer.add_head_count(n_head)
        self.gguf_writer.add_head_count_kv(n_head_kv)
        self.gguf_writer.add_layer_norm_rms_eps(rms_eps)
        self.gguf_writer.add_rope_dimension_count(rope_dims)
        self.gguf_writer.add_rope_freq_base(self.rope_parameters.get("full_attention", self.rope_parameters)["rope_theta"])
        self.gguf_writer.add_file_type(self.ftype)
        sliding_window = self.hparams.get("sliding_window")
        # use a sliding_window value of zero to distinguish Phi-4 from other Phi-3 models
        if sliding_window is None:
            sliding_window = 0
        self.gguf_writer.add_sliding_window(sliding_window)

    def generate_extra_tensors(self) -> Iterable[tuple[str, Tensor]]:
        n_embd = self.find_hparam(["hidden_size", "n_embd"])
        n_head = self.find_hparam(["num_attention_heads", "n_head"])
        max_pos_embds = self.find_hparam(["n_positions", "max_position_embeddings"])
        orig_max_pos_embds = self.find_hparam(["original_max_position_embeddings"])
        rot_pct = self.hparams.get("partial_rotary_factor", 1.0)
        rope_dims = int(rot_pct * n_embd) // n_head

        # write rope scaling for long context (128k) model
        rope_scaling = self.find_hparam(['rope_scaling'], True)
        if rope_scaling is None:
            return

        scale = max_pos_embds / orig_max_pos_embds

        rope_scaling_type = rope_scaling.get('rope_type', rope_scaling.get('type', '')).lower()
        if len(rope_scaling_type) == 0:
            raise KeyError('Missing the required key rope_scaling.type')

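        # attention scaling factor compensating for the extended context:
        # 'su'/'longrope' uses sqrt(1 + ln(scale) / ln(orig_ctx)), 'yarn' uses 0.1 * ln(scale) + 1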
        if rope_scaling_type == 'su' or rope_scaling_type == 'longrope':
            attn_factor = math.sqrt(1 + math.log(scale) / math.log(orig_max_pos_embds)) if scale > 1.0 else 1.0
        elif rope_scaling_type == 'yarn':
            attn_factor = 0.1 * math.log(scale) + 1.0 if scale > 1.0 else 1.0
        else:
            raise NotImplementedError(f'The rope scaling type {rope_scaling_type} is not supported yet')

        self.gguf_writer.add_rope_scaling_attn_factors(attn_factor)

        long_factors = rope_scaling.get('long_factor', None)
        short_factors = rope_scaling.get('short_factor', None)

        if long_factors is None or short_factors is None:
            raise KeyError('Missing the required key rope_scaling.long_factor or rope_scaling.short_factor')

        if len(long_factors) != len(short_factors) or len(long_factors) != rope_dims / 2:
            raise ValueError(f'The length of rope long and short factors must be {rope_dims / 2}. long_factors = {len(long_factors)}, short_factors = {len(short_factors)}.')

        yield (self.format_tensor_name(gguf.MODEL_TENSOR.ROPE_FACTORS_LONG), torch.tensor(long_factors, dtype=torch.float32))
        yield (self.format_tensor_name(gguf.MODEL_TENSOR.ROPE_FACTORS_SHORT), torch.tensor(short_factors, dtype=torch.float32))


@ModelBase.register("Phi4ForCausalLMV")
|
|
class Phi4VisionMmprojModel(MmprojModel):
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
assert self.hparams_vision is not None
|
|
|
|
self.vision_total_layers = int(self.find_vparam(self.n_block_keys))
|
|
if self.vision_total_layers < 2:
|
|
raise ValueError(
|
|
f"Phi-4 vision mmproj conversion requires at least 2 vision layers, got {self.vision_total_layers}"
|
|
)
|
|
|
|
        # Phi-4 uses SigLIP2 hidden_states[-2], so export one fewer encoder block and
        # drop post-layernorm/head weights. This makes the GGUF runtime output match
        # the feature map consumed by the patched siglip.cpp Phi-4 projector path.
        self.vision_export_layers = self.vision_total_layers - 1
        self.vision_last_layer_idx = self.vision_total_layers - 1

        for key in self.n_block_keys:
            if key in self.hparams_vision:
                self.hparams_vision[key] = self.vision_export_layers
                break

        self.block_count = self.vision_export_layers
        self.tensor_map = gguf.get_tensor_name_map(gguf.MODEL_ARCH.MMPROJ, self.block_count)

patch_size = self.preprocessor_config.get("patch_size")
|
|
if patch_size is None:
|
|
raise KeyError("Phi-4 vision mmproj conversion requires patch_size in preprocessor_config.json")
|
|
|
|
self.hparams_vision["patch_size"] = patch_size
|
|
|
|
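        # the base image size is not stored directly; recover it from the position embedding
        # table, whose row count should be a perfect square (the patch grid side, squared)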
        pos_emb_name = next(
            (
                name for name in self.model_tensors
                if name.endswith("vision_model.embeddings.position_embedding.weight")
            ),
            None,
        )
        if pos_emb_name is None:
            raise KeyError("Phi-4 vision mmproj conversion could not find position_embedding.weight")

        pos_emb_shape = self.model_tensors[pos_emb_name]().shape
        base_grid_tokens = int(pos_emb_shape[0])
        grid_side = math.isqrt(base_grid_tokens)
        if grid_side * grid_side != base_grid_tokens:
            raise ValueError(f"Unexpected Phi-4 position embedding shape: {tuple(pos_emb_shape)}")

        self.hparams_vision["image_size"] = grid_side * patch_size

        min_num_patches = self.preprocessor_config.get("min_num_patches", self.global_config.get("min_num_patches"))
        max_num_patches = self.preprocessor_config.get("max_num_patches", self.global_config.get("max_num_patches"))
        if min_num_patches is None or max_num_patches is None:
            raise KeyError("Phi-4 vision mmproj conversion requires min_num_patches and max_num_patches")

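        # each patch covers patch_size x patch_size pixels, so patch-count limits translate directly into pixel limits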
        self.min_pixels = int(min_num_patches) * patch_size * patch_size
        self.max_pixels = int(max_num_patches) * patch_size * patch_size

    def set_gguf_parameters(self):
        super().set_gguf_parameters()
        assert self.hparams_vision is not None

        self.gguf_writer.add_clip_projector_type(gguf.VisionProjectorType.PHI4)
        self.gguf_writer.add_vision_min_pixels(self.min_pixels)
        self.gguf_writer.add_vision_max_pixels(self.max_pixels)
        self.gguf_writer.add_vision_use_gelu(True)
        self.gguf_writer.add_vision_attention_layernorm_eps(self.hparams_vision.get("layer_norm_eps", 1e-6))

    @classmethod
    def filter_tensors(cls, item: tuple[str, Callable[[], Tensor]]) -> tuple[str, Callable[[], Tensor]] | None:
        name, gen = item

        name = name.replace("model.vision_tower.vision_tower.", "vision_tower.")

        if not name.startswith(("vision_tower.", "model.mm_projector.", "mm_projector.")):
            return None

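        # the pooling head and final post-layernorm are dropped: neither is used when the
        # runtime consumes hidden_states[-2] (see the layer-trimming note in __init__)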
if ".vision_model.head." in name:
|
|
return None
|
|
|
|
if ".vision_model.post_layernorm." in name:
|
|
return None
|
|
|
|
return super().filter_tensors((name, gen))
|
|
|
|
    def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
        if name.startswith("vision_tower."):
            if bid is not None and bid == self.vision_last_layer_idx:
                return

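            # the checkpoint stores the patch embedding as a flattened linear projection;
            # restore the conv2d-style [out_ch, in_ch, patch_h, patch_w] layout expected downstream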
if name.endswith("vision_model.embeddings.patch_embedding.weight"):
|
|
assert self.hparams_vision is not None
|
|
if data_torch.ndim != 2:
|
|
raise ValueError(f"Unexpected Phi-4 patch embedding shape: {tuple(data_torch.shape)}")
|
|
|
|
patch_area = self.hparams_vision["patch_size"] ** 2
|
|
in_features = data_torch.shape[1]
|
|
if in_features % patch_area != 0:
|
|
raise ValueError(
|
|
f"Phi-4 patch embedding input dim {in_features} is not divisible by patch area {patch_area}"
|
|
)
|
|
|
|
num_channels = in_features // patch_area
|
|
patch_size = self.hparams_vision["patch_size"]
|
|
data_torch = data_torch.view(data_torch.shape[0], patch_size, patch_size, num_channels)
|
|
data_torch = data_torch.permute(0, 3, 1, 2)
|
|
|
|
yield from super().modify_tensors(data_torch, name, bid)
|
|
return
|
|
|
|
if name.startswith(("model.mm_projector.", "mm_projector.")):
|
|
local_name = name
|
|
local_name = local_name.replace("model.mm_projector.", "")
|
|
local_name = local_name.replace("mm_projector.", "")
|
|
|
|
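            # only Sequential indices 0 and 2 (the two projector linears) carry weights;
            # index 1 is presumably the GELU activation (cf. add_vision_use_gelu) and has no parameters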
if not (local_name.startswith("0.") or local_name.startswith("2.")):
|
|
return
|
|
|
|
suffix = ".bias" if local_name.endswith(".bias") else ".weight"
|
|
mm_idx = int(local_name.split(".", maxsplit=1)[0])
|
|
yield (self.format_tensor_name(gguf.MODEL_TENSOR.V_MMPROJ, mm_idx, suffix=suffix), data_torch)
|
|
return
|
|
|
|
return
|
|
|
|
|
|
@ModelBase.register("PhiMoEForCausalLM")
|
|
class PhiMoeModel(Phi3MiniModel):
|
|
model_arch = gguf.MODEL_ARCH.PHIMOE
|
|
|
|
_experts: list[dict[str, Tensor]] | None = None
|
|
|
|
def set_gguf_parameters(self):
|
|
super().set_gguf_parameters()
|
|
self.gguf_writer.add_expert_used_count(self.find_hparam(["num_experts_per_tok", "num_experts_per_token"]))
|
|
self.gguf_writer.add_expert_count(self.find_hparam(["num_local_experts", "num_experts"]))
|
|
|
|
    def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
        # process the experts separately
        if name.find("block_sparse_moe.experts") != -1:
            n_experts = self.find_hparam(["num_local_experts", "num_experts"])
            assert bid is not None

            if self._experts is None:
                self._experts = [{} for _ in range(self.block_count)]

            self._experts[bid][name] = data_torch

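            # once all w1/w2/w3 tensors for every expert in this block have been buffered,
            # stack them per weight name into a single 3D [n_experts, ...] tensor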
            if len(self._experts[bid]) >= n_experts * 3:
                # merge the experts into a single 3d tensor
                for w_name in ["w1", "w2", "w3"]:
                    datas: list[Tensor] = []

                    for xid in range(n_experts):
                        ename = f"model.layers.{bid}.block_sparse_moe.experts.{xid}.{w_name}.weight"
                        datas.append(self._experts[bid][ename])
                        del self._experts[bid][ename]

                    data_torch = torch.stack(datas, dim=0)

                    merged_name = f"model.layers.{bid}.block_sparse_moe.experts.{w_name}.weight"

                    yield from super().modify_tensors(data_torch, merged_name, bid)
                return
            else:
                return

        yield from super().modify_tensors(data_torch, name, bid)

    def prepare_tensors(self):
        super().prepare_tensors()

        if self._experts is not None:
            # flatten `list[dict[str, Tensor]]` into `list[str]`
            experts = [k for d in self._experts for k in d.keys()]
            if len(experts) > 0:
                raise ValueError(f"Unprocessed experts: {experts}")