koboldcpp/conversion/phi.py
Piotr Wilkin (ilintar) cc7200bf12
Refactor: convert_hf_to_gguf.py (#17114)
* move conversion code to a dedicated conversion directory and split the files akin to the src/models architecture

---------

Co-authored-by: Sigbjørn Skjæret <sigbjorn.skjaeret@scala.com>
2026-05-15 15:18:12 +02:00

390 lines
18 KiB
Python

from __future__ import annotations
import json
import math
from typing import Callable, Iterable, TYPE_CHECKING
import torch
if TYPE_CHECKING:
from torch import Tensor
from .base import MmprojModel, ModelBase, SentencePieceTokenTypes, TextModel, gguf, logger
@ModelBase.register("PhiForCausalLM")
class Phi2Model(TextModel):
    """Converter for ``PhiForCausalLM`` checkpoints, tagged with the PHI2 architecture."""

    model_arch = gguf.MODEL_ARCH.PHI2

    def set_gguf_parameters(self):
        """Write the Phi-2 hyperparameters to ``self.gguf_writer``.

        Values are looked up through ``self.find_hparam``, which is given the
        alternative key spellings to try. The feed-forward length is not read
        from the config: it is written as four times the embedding width, and
        the KV head count is written equal to the attention head count.
        """
        rotary_fraction = self.find_hparam(["partial_rotary_factor"])
        hidden_size = self.find_hparam(["hidden_size", "n_embd"])
        head_count = self.find_hparam(["num_attention_heads", "n_head"])
        context_length = self.find_hparam(["n_positions", "max_position_embeddings"])

        writer = self.gguf_writer
        writer.add_context_length(context_length)
        writer.add_embedding_length(hidden_size)
        writer.add_feed_forward_length(4 * hidden_size)
        writer.add_block_count(self.block_count)
        writer.add_head_count(head_count)
        writer.add_head_count_kv(head_count)
        writer.add_layer_norm_eps(self.find_hparam(["layer_norm_epsilon", "layer_norm_eps"]))
        # Rotary dims per head: the rotary share of the embedding width,
        # truncated to an int, then split evenly across the heads.
        writer.add_rope_dimension_count(int(rotary_fraction * hidden_size) // head_count)
        writer.add_file_type(self.ftype)
        writer.add_add_bos_token(False)
@ModelBase.register("Phi3ForCausalLM", "Phi4ForCausalLMV")
class Phi3MiniModel(TextModel):
    """Converter for ``Phi3ForCausalLM`` / ``Phi4ForCausalLMV`` text checkpoints (PHI3 architecture)."""

    model_arch = gguf.MODEL_ARCH.PHI3

    @staticmethod
    def _apply_added_token(
        tokens: list[bytes],
        scores: list[float],
        toktypes: list[int],
        token_id: int,
        token_data: dict,
    ) -> None:
        """Overwrite one vocabulary slot with an added-token entry.

        Shared by the ``tokenizer_config.json`` (``added_tokens_decoder``) and
        ``tokenizer.json`` (``added_tokens``) passes of ``set_vocab``.

        Args:
            tokens, scores, toktypes: parallel vocabulary lists, mutated in place.
            token_id: slot to overwrite.
            token_data: entry providing ``"content"`` and optionally ``"special"``.
        """
        # Mirror the range check applied to added_tokens.json: an id outside
        # the vocabulary is skipped instead of raising IndexError (or, for a
        # negative id, silently overwriting a slot counted from the end).
        if not 0 <= token_id < len(tokens):
            logger.warning(f'ignore token {token_id}: id is out of range, max={len(tokens) - 1}')
            return

        token = token_data["content"].encode("utf-8")
        # Only warn when a slot that already holds a real token changes text.
        if toktypes[token_id] != SentencePieceTokenTypes.UNUSED and tokens[token_id] != token:
            logger.warning(f'replacing token {token_id}: {tokens[token_id].decode("utf-8")!r} -> {token.decode("utf-8")!r}')

        tokens[token_id] = token
        scores[token_id] = -1000.0
        if token_data.get("special"):
            toktypes[token_id] = SentencePieceTokenTypes.CONTROL
        else:
            toktypes[token_id] = SentencePieceTokenTypes.USER_DEFINED

    def set_vocab(self):
        """Write the tokenizer vocabulary to ``self.gguf_writer``.

        When ``tokenizer_config.json`` names ``GPT2Tokenizer`` the work is
        delegated to ``self._set_vocab_gpt2()``. Otherwise ``tokenizer.model``
        is loaded with SentencePiece and then overlaid, in this order, with
        ``added_tokens.json``, the ``added_tokens_decoder`` section of
        ``tokenizer_config.json`` and the ``added_tokens`` section of
        ``tokenizer.json`` (each only if the file exists).

        Raises:
            ValueError: if the SentencePiece path is taken and
                ``tokenizer.model`` does not exist.
        """
        # Parse tokenizer_config.json once; it is needed both for the
        # tokenizer-class dispatch and for the added-token overlay below.
        tokenizer_config_json = None
        tokenizer_config_file = self.dir_model / 'tokenizer_config.json'
        if tokenizer_config_file.is_file():
            with open(tokenizer_config_file, "r", encoding="utf-8") as f:
                tokenizer_config_json = json.load(f)
            # Phi-4 model uses GPT2Tokenizer
            # (.get: a config without 'tokenizer_class' falls through to SentencePiece)
            if tokenizer_config_json.get('tokenizer_class') == 'GPT2Tokenizer':
                return self._set_vocab_gpt2()

        from sentencepiece import SentencePieceProcessor

        tokenizer_path = self.dir_model / 'tokenizer.model'
        if not tokenizer_path.is_file():
            raise ValueError(f'Error: Missing {tokenizer_path}')

        tokenizer = SentencePieceProcessor()
        tokenizer.LoadFromFile(str(tokenizer_path))

        vocab_size = self.hparams.get('vocab_size', tokenizer.vocab_size())

        # Every slot starts as an UNUSED "[PAD<i>]" placeholder so that ids not
        # covered by the SentencePiece model still have an entry.
        tokens: list[bytes] = [f"[PAD{i}]".encode("utf-8") for i in range(vocab_size)]
        scores: list[float] = [-10000.0] * vocab_size
        toktypes: list[int] = [SentencePieceTokenTypes.UNUSED] * vocab_size

        for token_id in range(tokenizer.vocab_size()):
            piece = tokenizer.IdToPiece(token_id)
            text = piece.encode("utf-8")
            score = tokenizer.GetScore(token_id)

            toktype = SentencePieceTokenTypes.NORMAL
            if tokenizer.IsUnknown(token_id):
                toktype = SentencePieceTokenTypes.UNKNOWN
            elif tokenizer.IsControl(token_id):
                toktype = SentencePieceTokenTypes.CONTROL
            elif tokenizer.IsUnused(token_id):
                toktype = SentencePieceTokenTypes.UNUSED
            elif tokenizer.IsByte(token_id):
                toktype = SentencePieceTokenTypes.BYTE

            tokens[token_id] = text
            scores[token_id] = score
            toktypes[token_id] = toktype

        added_tokens_file = self.dir_model / 'added_tokens.json'
        if added_tokens_file.is_file():
            with open(added_tokens_file, "r", encoding="utf-8") as f:
                added_tokens_json = json.load(f)
            for key in added_tokens_json:
                token_id = added_tokens_json[key]
                if token_id >= vocab_size:
                    logger.debug(f'ignore token {token_id}: id is out of range, max={vocab_size - 1}')
                    continue

                tokens[token_id] = key.encode("utf-8")
                scores[token_id] = -1000.0
                toktypes[token_id] = SentencePieceTokenTypes.USER_DEFINED

        if tokenizer_config_json is not None:
            added_tokens_decoder = tokenizer_config_json.get("added_tokens_decoder", {})
            for token_id, token_data in added_tokens_decoder.items():
                self._apply_added_token(tokens, scores, toktypes, int(token_id), token_data)

        tokenizer_file = self.dir_model / 'tokenizer.json'
        if tokenizer_file.is_file():
            with open(tokenizer_file, "r", encoding="utf-8") as f:
                tokenizer_json = json.load(f)
            for token_data in tokenizer_json.get("added_tokens", []):
                self._apply_added_token(tokens, scores, toktypes, int(token_data["id"]), token_data)

        self.gguf_writer.add_tokenizer_model("llama")
        self.gguf_writer.add_tokenizer_pre("default")
        self.gguf_writer.add_token_list(tokens)
        self.gguf_writer.add_token_scores(scores)
        self.gguf_writer.add_token_types(toktypes)

        special_vocab = gguf.SpecialVocab(self.dir_model, n_vocab=len(tokens))
        special_vocab.add_to_gguf(self.gguf_writer)

    def set_gguf_parameters(self):
        """Write the Phi-3 hyperparameters to ``self.gguf_writer``.

        ``partial_rotary_factor`` defaults to 1.0 when absent from the config.
        A missing ``sliding_window`` is written as 0.
        """
        n_embd = self.find_hparam(["hidden_size", "n_embd"])
        n_head = self.find_hparam(["num_attention_heads", "n_head"])
        n_head_kv = self.find_hparam(["num_key_value_heads", "n_head_kv"])
        rms_eps = self.find_hparam(["rms_norm_eps"])
        max_pos_embds = self.find_hparam(["n_positions", "max_position_embeddings"])
        orig_max_pos_embds = self.find_hparam(["original_max_position_embeddings"])
        rot_pct = self.hparams.get("partial_rotary_factor", 1.0)
        rope_dims = int(rot_pct * n_embd) // n_head

        self.gguf_writer.add_context_length(max_pos_embds)
        self.gguf_writer.add_rope_scaling_orig_ctx_len(orig_max_pos_embds)
        self.gguf_writer.add_embedding_length(n_embd)
        self.gguf_writer.add_feed_forward_length(self.find_hparam(["intermediate_size"]))
        self.gguf_writer.add_block_count(self.block_count)
        self.gguf_writer.add_head_count(n_head)
        self.gguf_writer.add_head_count_kv(n_head_kv)
        self.gguf_writer.add_layer_norm_rms_eps(rms_eps)
        self.gguf_writer.add_rope_dimension_count(rope_dims)
        # Use the "full_attention" sub-dict of rope_parameters when present,
        # otherwise read rope_theta from rope_parameters itself.
        self.gguf_writer.add_rope_freq_base(self.rope_parameters.get("full_attention", self.rope_parameters)["rope_theta"])
        self.gguf_writer.add_file_type(self.ftype)

        sliding_window = self.hparams.get("sliding_window")
        # use zero value of sliding_window to distinguish Phi-4 from other PHI3 models
        if sliding_window is None:
            sliding_window = 0
        self.gguf_writer.add_sliding_window(sliding_window)

    def generate_extra_tensors(self) -> Iterable[tuple[str, Tensor]]:
        """Yield the long/short rope factor tensors for scaled-rope checkpoints.

        Nothing is yielded when the config has no ``rope_scaling`` entry.
        Otherwise the attention factor is written to the GGUF writer and two
        float32 tensors (long factors, then short factors) are yielded.

        Raises:
            KeyError: if the rope scaling type, ``long_factor`` or
                ``short_factor`` is missing.
            NotImplementedError: for a scaling type other than ``su``,
                ``longrope`` or ``yarn``.
            ValueError: if the factor lists differ in length or are not half
                the rope dimension count long.
        """
        n_embd = self.find_hparam(["hidden_size", "n_embd"])
        n_head = self.find_hparam(["num_attention_heads", "n_head"])
        max_pos_embds = self.find_hparam(["n_positions", "max_position_embeddings"])
        orig_max_pos_embds = self.find_hparam(["original_max_position_embeddings"])
        rot_pct = self.hparams.get("partial_rotary_factor", 1.0)
        rope_dims = int(rot_pct * n_embd) // n_head

        # write rope scaling for long context (128k) model
        rope_scaling = self.find_hparam(['rope_scaling'], True)
        if rope_scaling is None:
            return

        scale = max_pos_embds / orig_max_pos_embds

        # Accept both the 'rope_type' and the 'type' spelling of the key.
        rope_scaling_type = rope_scaling.get('rope_type', rope_scaling.get('type', '')).lower()
        if not rope_scaling_type:
            raise KeyError('Missing the required key rope_scaling.type')

        if rope_scaling_type in ('su', 'longrope'):
            attn_factor = math.sqrt(1 + math.log(scale) / math.log(orig_max_pos_embds)) if scale > 1.0 else 1.0
        elif rope_scaling_type == 'yarn':
            attn_factor = 0.1 * math.log(scale) + 1.0 if scale > 1.0 else 1.0
        else:
            raise NotImplementedError(f'The rope scaling type {rope_scaling_type} is not supported yet')

        self.gguf_writer.add_rope_scaling_attn_factors(attn_factor)

        long_factors = rope_scaling.get('long_factor', None)
        short_factors = rope_scaling.get('short_factor', None)

        if long_factors is None or short_factors is None:
            raise KeyError('Missing the required key rope_scaling.long_factor or rope_scaling.short_factor')

        if len(long_factors) != len(short_factors) or len(long_factors) != rope_dims / 2:
            raise ValueError(f'The length of rope long and short factors must be {rope_dims / 2}. long_factors = {len(long_factors)}, short_factors = {len(short_factors)}.')

        yield (self.format_tensor_name(gguf.MODEL_TENSOR.ROPE_FACTORS_LONG), torch.tensor(long_factors, dtype=torch.float32))
        yield (self.format_tensor_name(gguf.MODEL_TENSOR.ROPE_FACTORS_SHORT), torch.tensor(short_factors, dtype=torch.float32))
@ModelBase.register("Phi4ForCausalLMV")
class Phi4VisionMmprojModel(MmprojModel):
    """Converter for the vision tower and projector of ``Phi4ForCausalLMV`` checkpoints."""

    def __init__(self, *args, **kwargs):
        """Set up layer counts, patch/image sizes and pixel limits for export.

        Raises:
            ValueError: if fewer than two vision layers are configured, or the
                position embedding row count is not a perfect square.
            KeyError: if ``patch_size``, the position embedding tensor, or the
                min/max patch counts cannot be found.
        """
        super().__init__(*args, **kwargs)
        assert self.hparams_vision is not None

        total_layers = int(self.find_vparam(self.n_block_keys))
        self.vision_total_layers = total_layers
        if total_layers < 2:
            raise ValueError(
                f"Phi-4 vision mmproj conversion requires at least 2 vision layers, got {self.vision_total_layers}"
            )

        # Phi-4 uses SigLIP2 hidden_states[-2], so export one fewer encoder block and
        # drop post-layernorm/head weights. This makes the GGUF runtime output match
        # the feature map consumed by the patched siglip.cpp Phi-4 projector path.
        self.vision_export_layers = total_layers - 1
        self.vision_last_layer_idx = total_layers - 1

        # Rewrite the first block-count key present in the vision hparams so it
        # reports the reduced layer count.
        block_key = next((key for key in self.n_block_keys if key in self.hparams_vision), None)
        if block_key is not None:
            self.hparams_vision[block_key] = self.vision_export_layers

        self.block_count = self.vision_export_layers
        self.tensor_map = gguf.get_tensor_name_map(gguf.MODEL_ARCH.MMPROJ, self.block_count)

        patch_size = self.preprocessor_config.get("patch_size")
        if patch_size is None:
            raise KeyError("Phi-4 vision mmproj conversion requires patch_size in preprocessor_config.json")
        self.hparams_vision["patch_size"] = patch_size

        # Locate the first tensor whose name ends with the position embedding suffix.
        pos_emb_name = None
        for tensor_name in self.model_tensors:
            if tensor_name.endswith("vision_model.embeddings.position_embedding.weight"):
                pos_emb_name = tensor_name
                break
        if pos_emb_name is None:
            raise KeyError("Phi-4 vision mmproj conversion could not find position_embedding.weight")

        # The first dimension of the position embedding must be a perfect
        # square; its square root times the patch size becomes the image size.
        pos_emb_shape = self.model_tensors[pos_emb_name]().shape
        base_grid_tokens = int(pos_emb_shape[0])
        grid_side = math.isqrt(base_grid_tokens)
        if grid_side ** 2 != base_grid_tokens:
            raise ValueError(f"Unexpected Phi-4 position embedding shape: {tuple(pos_emb_shape)}")
        self.hparams_vision["image_size"] = grid_side * patch_size

        # Patch-count limits come from the preprocessor config, with the
        # global config as the fallback for a missing key.
        min_num_patches = self.preprocessor_config.get("min_num_patches", self.global_config.get("min_num_patches"))
        max_num_patches = self.preprocessor_config.get("max_num_patches", self.global_config.get("max_num_patches"))
        if min_num_patches is None or max_num_patches is None:
            raise KeyError("Phi-4 vision mmproj conversion requires min_num_patches and max_num_patches")

        pixels_per_patch = patch_size * patch_size
        self.min_pixels = int(min_num_patches) * pixels_per_patch
        self.max_pixels = int(max_num_patches) * pixels_per_patch

    def set_gguf_parameters(self):
        """Write the base mmproj parameters, then the Phi-4 specific vision keys."""
        super().set_gguf_parameters()
        assert self.hparams_vision is not None

        writer = self.gguf_writer
        writer.add_clip_projector_type(gguf.VisionProjectorType.PHI4)
        writer.add_vision_min_pixels(self.min_pixels)
        writer.add_vision_max_pixels(self.max_pixels)
        writer.add_vision_use_gelu(True)
        writer.add_vision_attention_layernorm_eps(self.hparams_vision.get("layer_norm_eps", 1e-6))

    @classmethod
    def filter_tensors(cls, item: tuple[str, Callable[[], Tensor]]) -> tuple[str, Callable[[], Tensor]] | None:
        """Keep only vision-tower and projector tensors, minus head/post-layernorm.

        The ``model.vision_tower.vision_tower.`` prefix is shortened to
        ``vision_tower.`` before the checks. Returns ``None`` for a rejected
        tensor, otherwise the result of the parent class filter.
        """
        name, gen = item
        name = name.replace("model.vision_tower.vision_tower.", "vision_tower.")

        kept_prefixes = ("vision_tower.", "model.mm_projector.", "mm_projector.")
        if not name.startswith(kept_prefixes):
            return None

        dropped_parts = (".vision_model.head.", ".vision_model.post_layernorm.")
        if any(part in name for part in dropped_parts):
            return None

        return super().filter_tensors((name, gen))

    def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
        """Yield the converted ``(name, tensor)`` pairs for one source tensor.

        Vision-tower tensors go through the parent implementation, except those
        of the last encoder block, which are dropped; the patch embedding
        weight is reshaped first. Projector tensors are emitted only for
        sub-modules ``0`` and ``2``. Anything else yields nothing.

        Raises:
            ValueError: if the patch embedding weight is not 2-D or its input
                width is not a multiple of the patch area.
        """
        if name.startswith("vision_tower."):
            # Tensors of the final encoder block are not exported.
            if bid is not None and bid == self.vision_last_layer_idx:
                return

            if name.endswith("vision_model.embeddings.patch_embedding.weight"):
                assert self.hparams_vision is not None
                if data_torch.ndim != 2:
                    raise ValueError(f"Unexpected Phi-4 patch embedding shape: {tuple(data_torch.shape)}")

                patch_size = self.hparams_vision["patch_size"]
                patch_area = patch_size ** 2
                in_features = data_torch.shape[1]
                if in_features % patch_area != 0:
                    raise ValueError(
                        f"Phi-4 patch embedding input dim {in_features} is not divisible by patch area {patch_area}"
                    )
                num_channels = in_features // patch_area

                # View the flat input axis as (patch, patch, channels), then
                # move the channel axis to position 1.
                data_torch = data_torch.view(
                    data_torch.shape[0], patch_size, patch_size, num_channels
                ).permute(0, 3, 1, 2)

            yield from super().modify_tensors(data_torch, name, bid)
            return

        if not name.startswith(("model.mm_projector.", "mm_projector.")):
            return

        local_name = name.replace("model.mm_projector.", "").replace("mm_projector.", "")
        # Only sub-modules 0 and 2 of the projector are exported.
        if not local_name.startswith(("0.", "2.")):
            return

        suffix = ".bias" if local_name.endswith(".bias") else ".weight"
        mm_idx = int(local_name.split(".", maxsplit=1)[0])
        yield (self.format_tensor_name(gguf.MODEL_TENSOR.V_MMPROJ, mm_idx, suffix=suffix), data_torch)
@ModelBase.register("PhiMoEForCausalLM")
class PhiMoeModel(Phi3MiniModel):
    """Converter for ``PhiMoEForCausalLM`` checkpoints (PHIMOE architecture)."""

    model_arch = gguf.MODEL_ARCH.PHIMOE

    # Per-block buffer of expert tensors awaiting merge, keyed by source tensor
    # name; created on first use in modify_tensors.
    _experts: list[dict[str, Tensor]] | None = None

    def set_gguf_parameters(self):
        """Write the parent parameters, then the used/total expert counts."""
        super().set_gguf_parameters()
        writer = self.gguf_writer
        writer.add_expert_used_count(self.find_hparam(["num_experts_per_tok", "num_experts_per_token"]))
        writer.add_expert_count(self.find_hparam(["num_local_experts", "num_experts"]))

    def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
        """Yield converted tensors, merging per-expert weights into stacked ones.

        Expert tensors are buffered per block and yield nothing until the
        block's buffer holds ``n_experts * 3`` entries. At that point the
        ``w1``, ``w2`` and ``w3`` weights are each stacked along a new leading
        dimension and passed to the parent implementation. Non-expert tensors
        go straight to the parent.
        """
        if "block_sparse_moe.experts" not in name:
            yield from super().modify_tensors(data_torch, name, bid)
            return

        # process the experts separately
        n_experts = self.find_hparam(["num_local_experts", "num_experts"])
        assert bid is not None

        if self._experts is None:
            self._experts = [{} for _ in range(self.block_count)]

        pending = self._experts[bid]
        pending[name] = data_torch

        # Wait until the buffer for this block is full.
        if len(pending) < n_experts * 3:
            return

        # merge the experts into a single 3d tensor
        for w_name in ("w1", "w2", "w3"):
            expert_names = [
                f"model.layers.{bid}.block_sparse_moe.experts.{xid}.{w_name}.weight"
                for xid in range(n_experts)
            ]
            # pop() removes each tensor from the buffer as it is collected.
            stacked = torch.stack([pending.pop(ename) for ename in expert_names], dim=0)
            merged_name = f"model.layers.{bid}.block_sparse_moe.experts.{w_name}.weight"
            yield from super().modify_tensors(stacked, merged_name, bid)

    def prepare_tensors(self):
        """Run the parent preparation, then fail if any expert tensor was left unmerged.

        Raises:
            ValueError: if the expert buffers still contain tensor names.
        """
        super().prepare_tensors()

        if self._experts is None:
            return

        # flatten `list[dict[str, Tensor]]` into `list[str]`
        experts = []
        for layer_experts in self._experts:
            experts.extend(layer_experts)
        if experts:
            raise ValueError(f"Unprocessed experts: {experts}")