mirror of
https://github.com/LostRuins/koboldcpp.git
synced 2026-05-17 04:09:19 +00:00
* move conversion code to a dedicated conversion directory and split the files akin to the src/models architecture --------- Co-authored-by: Sigbjørn Skjæret <sigbjorn.skjaeret@scala.com>
162 lines
6.6 KiB
Python
162 lines
6.6 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import sys
|
|
|
|
from typing import Iterable, TYPE_CHECKING
|
|
|
|
import torch
|
|
|
|
if TYPE_CHECKING:
|
|
from torch import Tensor
|
|
|
|
from .base import ModelBase, SentencePieceTokenTypes, TextModel, gguf, logger
|
|
|
|
from .llama import LlamaModel
|
|
|
|
|
|
@ModelBase.register("ArcticForCausalLM")
class ArcticModel(TextModel):
    """GGUF converter for ``ArcticForCausalLM`` checkpoints.

    Compared with the generic text-model handling inherited from
    ``TextModel``, this class:

    * builds the vocabulary itself so that tokens redefined through
      ``tokenizer_config.json`` replace the pieces stored in
      ``tokenizer.model`` (see ``set_vocab``);
    * passes q/k projection weights through ``LlamaModel.permute``;
    * buffers the per-expert MoE weights of each block and emits them as one
      stacked tensor per projection (see ``modify_tensors``).
    """

    model_arch = gguf.MODEL_ARCH.ARCTIC

    def set_vocab(self):
        """Write the tokenizer vocabulary and special-token metadata to GGUF.

        The reason for using a custom implementation here is that the
        snowflake-arctic-instruct model redefined tokens 31998 and 31999 from
        tokenizer.model and used them as BOS and EOS instead of adding new
        tokens.

        The vocabulary is sized from ``hparams['vocab_size']`` when present,
        otherwise from the tokenizer. Slots not covered by the tokenizer keep
        a ``[PAD<i>]`` placeholder of type UNUSED with score -10000.0.

        Exits the process with status 1 if ``tokenizer.model`` is missing
        from the model directory.
        """
        from sentencepiece import SentencePieceProcessor

        tokenizer_path = self.dir_model / 'tokenizer.model'

        if not tokenizer_path.is_file():
            logger.error(f'Error: Missing {tokenizer_path}')
            sys.exit(1)

        # Read the whole vocabulary from the tokenizer.model file
        tokenizer = SentencePieceProcessor()
        tokenizer.LoadFromFile(str(tokenizer_path))

        vocab_size = self.hparams.get('vocab_size', tokenizer.vocab_size())

        tokens: list[bytes] = [f"[PAD{i}]".encode("utf-8") for i in range(vocab_size)]
        scores: list[float] = [-10000.0] * vocab_size
        toktypes: list[int] = [SentencePieceTokenTypes.UNUSED] * vocab_size

        # The configured vocab size may be smaller than the tokenizer's own
        # vocabulary; never index past the lists allocated above.
        n_pieces = tokenizer.vocab_size()
        if n_pieces > vocab_size:
            logger.warning(f'ignore tokens from {vocab_size}: id is out of range, max={vocab_size - 1}')
            n_pieces = vocab_size

        for token_id in range(n_pieces):
            toktype = SentencePieceTokenTypes.NORMAL
            if tokenizer.IsUnknown(token_id):
                toktype = SentencePieceTokenTypes.UNKNOWN
            elif tokenizer.IsControl(token_id):
                toktype = SentencePieceTokenTypes.CONTROL
            elif tokenizer.IsUnused(token_id):
                toktype = SentencePieceTokenTypes.UNUSED
            elif tokenizer.IsByte(token_id):
                toktype = SentencePieceTokenTypes.BYTE

            tokens[token_id] = tokenizer.IdToPiece(token_id).encode("utf-8")
            scores[token_id] = tokenizer.GetScore(token_id)
            toktypes[token_id] = toktype

        # Use the added_tokens_decoder field from tokenizer_config.json as the source
        # of information about added/redefined tokens and modify them accordingly.
        tokenizer_config_file = self.dir_model / 'tokenizer_config.json'
        if tokenizer_config_file.is_file():
            with open(tokenizer_config_file, "r", encoding="utf-8") as f:
                tokenizer_config_json = json.load(f)

            # A config without "unk_token" is tolerated: no added token is
            # then classified as UNKNOWN.
            unk_token = tokenizer_config_json.get("unk_token")
            added_tokens_decoder = tokenizer_config_json.get("added_tokens_decoder", {})

            for raw_id, token_json in added_tokens_decoder.items():
                token_id = int(raw_id)
                if not 0 <= token_id < vocab_size:
                    logger.debug(f'ignore token {token_id}: id is out of range, max={vocab_size - 1}')
                    continue

                token_content = token_json["content"]
                token_type = SentencePieceTokenTypes.USER_DEFINED
                token_score = -10000.0

                # Map unk_token to UNKNOWN, other special tokens to CONTROL
                # Set the score to 0.0 as in the original tokenizer.model
                if token_json.get("special"):
                    if token_content == unk_token:
                        token_type = SentencePieceTokenTypes.UNKNOWN
                    else:
                        token_type = SentencePieceTokenTypes.CONTROL
                    token_score = 0.0

                logger.info(f"Setting added token {token_id} to '{token_content}' (type: {token_type}, score: {token_score:.2f})")
                tokens[token_id] = token_content.encode("utf-8")
                toktypes[token_id] = token_type
                scores[token_id] = token_score

        self.gguf_writer.add_tokenizer_model("llama")
        self.gguf_writer.add_tokenizer_pre("default")
        self.gguf_writer.add_token_list(tokens)
        self.gguf_writer.add_token_scores(scores)
        self.gguf_writer.add_token_types(toktypes)

        special_vocab = gguf.SpecialVocab(self.dir_model, n_vocab=len(tokens))
        special_vocab.add_to_gguf(self.gguf_writer)

    def set_gguf_parameters(self):
        """Write the inherited parameters plus vocab size and RoPE dimension count.

        The RoPE dimension count is ``hidden_size // num_attention_heads``.

        Raises:
            KeyError: if ``vocab_size``, ``hidden_size`` or
                ``num_attention_heads`` is absent from ``self.hparams``.
        """
        super().set_gguf_parameters()
        hparams = self.hparams
        self.gguf_writer.add_vocab_size(hparams["vocab_size"])
        self.gguf_writer.add_rope_dimension_count(hparams["hidden_size"] // hparams["num_attention_heads"])

    # Staging area for expert weights, one dict per block keyed by the original
    # tensor name. Created lazily on the first expert tensor and drained as
    # each block's experts are merged.
    _experts: list[dict[str, Tensor]] | None = None

    def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
        """Transform one checkpoint tensor into zero or more output tensors.

        * ``q_proj.weight`` / ``k_proj.weight`` tensors are first passed
          through ``LlamaModel.permute`` (defined elsewhere).
        * Tensors whose name contains ``block_sparse_moe.experts`` are held
          back until all ``num_local_experts * 3`` tensors (w1, w2, w3 of
          every expert) of the block have arrived. They are then stacked
          along a new leading axis, one stacked tensor per projection, and
          forwarded to the parent implementation under the name
          ``layers.<bid>.feed_forward.experts.<wid>.weight``.
        * Every other tensor is forwarded to the parent implementation as is.

        Args:
            data_torch: The tensor read from the checkpoint.
            name: The tensor's name in the checkpoint.
            bid: Block index of the tensor; must not be ``None`` for expert
                tensors.

        Yields:
            Whatever the parent ``modify_tensors`` yields for the forwarded
            tensors. Nothing is yielded while a block's experts are still
            incomplete.
        """
        n_head = self.hparams["num_attention_heads"]
        n_kv_head = self.hparams.get("num_key_value_heads")

        if name.endswith("q_proj.weight"):
            data_torch = LlamaModel.permute(data_torch, n_head, n_head)
        elif name.endswith("k_proj.weight"):
            data_torch = LlamaModel.permute(data_torch, n_head, n_kv_head)

        # process the experts separately
        if "block_sparse_moe.experts" in name:
            n_experts = self.hparams["num_local_experts"]

            assert bid is not None

            if self._experts is None:
                self._experts = [{} for _ in range(self.block_count)]

            pending = self._experts[bid]
            pending[name] = data_torch

            # Keep buffering until every expert of this block has delivered
            # its three projections.
            if len(pending) < n_experts * 3:
                return

            # merge the experts into a single stacked tensor per projection
            for wid in ["w1", "w2", "w3"]:
                # pop() both fetches the tensor and releases it from the buffer
                datas: list[Tensor] = [
                    pending.pop(f"model.layers.{bid}.block_sparse_moe.experts.{xid}.{wid}.weight")
                    for xid in range(n_experts)
                ]

                merged = torch.stack(datas, dim=0)
                merged_name = f"layers.{bid}.feed_forward.experts.{wid}.weight"

                yield from super().modify_tensors(merged, merged_name, bid)
            return

        yield from super().modify_tensors(data_torch, name, bid)

    def prepare_tensors(self):
        """Run the inherited tensor preparation, then check no expert was left unmerged.

        Raises:
            ValueError: if any expert tensor is still buffered, i.e. at least
                one block never received its full set of expert weights.
        """
        super().prepare_tensors()

        if self._experts is not None:
            # flatten `list[dict[str, Tensor]]` into `list[str]`
            experts = [tensor_name for block in self._experts for tensor_name in block]
            if experts:
                raise ValueError(f"Unprocessed experts: {experts}")
|