from __future__ import annotations

import json
import sys

from typing import Callable, Iterable, TYPE_CHECKING

if TYPE_CHECKING:
    from torch import Tensor

from .base import ModelBase, SentencePieceTokenTypes, TextModel, gguf, logger
from .llama import LlamaModel


@ModelBase.register("InternLM2ForCausalLM")
class InternLM2Model(TextModel):
    """Converter for checkpoints registered as ``InternLM2ForCausalLM``."""

    model_arch = gguf.MODEL_ARCH.INTERNLM2

    def set_vocab(self):
        """Build the token list from ``tokenizer.model`` and write it to the GGUF writer.

        The vocabulary is read from the SentencePiece model, extended with the keys of
        ``added_tokens.json`` (if present) and then patched with the entries found in
        ``tokenizer_config.json`` (``added_tokens_decoder``) and ``tokenizer.json``
        (``added_tokens``). If one of those entries is ``<|im_end|>``, its id replaces
        the eos id of the special vocabulary.

        Exits the process with status 1 when ``tokenizer.model`` is missing.
        """
        # (TODO): Is there a better way?
        # Copy from _set_vocab_sentencepiece, The only difference is that we will treat the character
        # \x00 specially and convert it into an emoji character to prevent it from being mistakenly
        # recognized as an empty string in C++.
        from sentencepiece import SentencePieceProcessor
        from sentencepiece import sentencepiece_model_pb2 as model

        tokenizer_path = self.dir_model / 'tokenizer.model'

        tokens: list[bytes] = []
        scores: list[float] = []
        toktypes: list[int] = []

        if not tokenizer_path.is_file():
            logger.error(f'Error: Missing {tokenizer_path}')
            sys.exit(1)

        sentencepiece_model = model.ModelProto()  # pyright: ignore[reportAttributeAccessIssue] # ty: ignore[unresolved-attribute]
        # Read through a context manager so the file handle is closed deterministically.
        with open(tokenizer_path, "rb") as f:
            sentencepiece_model.ParseFromString(f.read())
        add_prefix = sentencepiece_model.normalizer_spec.add_dummy_prefix

        tokenizer = SentencePieceProcessor()
        tokenizer.LoadFromFile(str(tokenizer_path))

        vocab_size = self.hparams.get('vocab_size', tokenizer.vocab_size())

        for token_id in range(vocab_size):
            piece = tokenizer.IdToPiece(token_id)
            text = piece.encode("utf-8")
            score = tokenizer.GetScore(token_id)
            if text == b"\x00":
                # (TODO): fixme
                # Hack here and replace the \x00 characters.
                logger.warning(f"InternLM2 convert token '{text}' to '🐉'!")
                text = "🐉".encode("utf-8")

            toktype = SentencePieceTokenTypes.NORMAL
            if tokenizer.IsUnknown(token_id):
                toktype = SentencePieceTokenTypes.UNKNOWN
            elif tokenizer.IsControl(token_id):
                toktype = SentencePieceTokenTypes.CONTROL
            elif tokenizer.IsUnused(token_id):
                toktype = SentencePieceTokenTypes.UNUSED
            elif tokenizer.IsByte(token_id):
                toktype = SentencePieceTokenTypes.BYTE
            # take care of unused raw token
            if piece.startswith('[UNUSED'):
                toktype = SentencePieceTokenTypes.UNUSED

            tokens.append(text)
            scores.append(score)
            toktypes.append(toktype)

        added_tokens_file = self.dir_model / 'added_tokens.json'
        if added_tokens_file.is_file():
            with open(added_tokens_file, "r", encoding="utf-8") as f:
                added_tokens_json = json.load(f)
            # Only the keys (token texts) are used; they are appended after the base vocab.
            for key in added_tokens_json:
                tokens.append(key.encode("utf-8"))
                scores.append(-1000.0)
                toktypes.append(SentencePieceTokenTypes.USER_DEFINED)

        chat_eos_token = '<|im_end|>'
        chat_eos_token_id = None

        def override_token(token_id: int, token_data: dict) -> None:
            """Overwrite slot ``token_id`` with the added-token entry ``token_data``."""
            nonlocal chat_eos_token_id
            token = token_data["content"]
            if token == chat_eos_token:
                chat_eos_token_id = token_id
            token = token.encode("utf-8")
            # Replacing an UNUSED placeholder is expected; only warn for other slots.
            if toktypes[token_id] != SentencePieceTokenTypes.UNUSED:
                if tokens[token_id] != token:
                    logger.warning(f'replacing token {token_id}: {tokens[token_id].decode("utf-8")!r} -> {token.decode("utf-8")!r}')
            tokens[token_id] = token
            scores[token_id] = -1000.0
            toktypes[token_id] = SentencePieceTokenTypes.USER_DEFINED
            if token_data.get("special"):
                toktypes[token_id] = SentencePieceTokenTypes.CONTROL

        tokenizer_config_file = self.dir_model / 'tokenizer_config.json'
        if tokenizer_config_file.is_file():
            with open(tokenizer_config_file, "r", encoding="utf-8") as f:
                tokenizer_config_json = json.load(f)
            added_tokens_decoder = tokenizer_config_json.get("added_tokens_decoder", {})
            # Keys of added_tokens_decoder are token ids stored as strings.
            for token_id, token_data in added_tokens_decoder.items():
                override_token(int(token_id), token_data)

        tokenizer_file = self.dir_model / 'tokenizer.json'
        if tokenizer_file.is_file():
            with open(tokenizer_file, "r", encoding="utf-8") as f:
                tokenizer_json = json.load(f)
            added_tokens = tokenizer_json.get("added_tokens", [])
            for token_data in added_tokens:
                override_token(int(token_data["id"]), token_data)

        self.gguf_writer.add_tokenizer_model("llama")
        self.gguf_writer.add_tokenizer_pre("default")
        self.gguf_writer.add_token_list(tokens)
        self.gguf_writer.add_token_scores(scores)
        self.gguf_writer.add_token_types(toktypes)
        self.gguf_writer.add_add_space_prefix(add_prefix)

        special_vocab = gguf.SpecialVocab(self.dir_model, n_vocab=len(tokens))
        if chat_eos_token_id is not None:
            # For the chat model, we replace the eos with '<|im_end|>'.
            # TODO: this is a hack, should be fixed
            #       https://github.com/ggml-org/llama.cpp/pull/6745#issuecomment-2067687048
            # The previous eos id is only needed for the log message, so a missing
            # entry must not abort the conversion.
            old_eos = special_vocab.special_token_ids.get("eos")
            special_vocab.special_token_ids["eos"] = chat_eos_token_id
            logger.warning(f"Replace eos:{old_eos} with a special token:{chat_eos_token_id}"
                           " in chat mode so that the conversation can end normally.")

        special_vocab.add_to_gguf(self.gguf_writer)

    def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
        """Split the fused ``attention.wqkv`` tensor into separate Q, K and V tensors.

        Tensors whose name does not contain ``model.layers.{bid}.attention.wqkv`` are
        forwarded unchanged to the parent implementation.
        """
        num_heads = self.hparams["num_attention_heads"]
        num_kv_heads = self.hparams["num_key_value_heads"]
        n_embd = self.hparams["hidden_size"]
        q_per_kv = num_heads // num_kv_heads
        head_dim = n_embd // num_heads
        num_groups = num_heads // q_per_kv

        if bid is not None and f"model.layers.{bid}.attention.wqkv" in name:
            qkv = data_torch

            # Each group holds q_per_kv query heads followed by one key and one value head.
            qkv = qkv.reshape((num_groups, q_per_kv + 2, head_dim, n_embd))
            q, k, v = qkv[:, : q_per_kv], qkv[:, -2], qkv[:, -1]

            # The model weights of q and k require additional reshape.
            q = LlamaModel.permute(q.reshape((-1, q.shape[-1])), num_heads, num_heads)
            k = LlamaModel.permute(k.reshape((-1, k.shape[-1])), num_heads, num_kv_heads)
            v = v.reshape((-1, v.shape[-1]))

            yield from super().modify_tensors(q, self.format_tensor_name(gguf.MODEL_TENSOR.ATTN_Q, bid), bid)
            yield from super().modify_tensors(k, self.format_tensor_name(gguf.MODEL_TENSOR.ATTN_K, bid), bid)
            yield from super().modify_tensors(v, self.format_tensor_name(gguf.MODEL_TENSOR.ATTN_V, bid), bid)
        else:
            yield from super().modify_tensors(data_torch, name, bid)


@ModelBase.register("InternLM3ForCausalLM")
class InternLM3Model(TextModel):
    """Converter for checkpoints registered as ``InternLM3ForCausalLM`` (written as LLAMA arch)."""

    model_arch = gguf.MODEL_ARCH.LLAMA

    def set_vocab(self):
        """Write the SentencePiece vocabulary and special tokens to the GGUF writer.

        When ``tokenizer_config.json`` exists, its ``add_prefix_space`` value is
        forwarded to the writer and every entry of ``added_tokens_decoder`` flagged
        ``special`` is registered on the special vocabulary. A special ``<|im_end|>``
        entry replaces the eos id if an eos id is already known.
        """
        tokens, scores, toktypes = self._create_vocab_sentencepiece()

        self.gguf_writer.add_tokenizer_model("llama")
        self.gguf_writer.add_tokenizer_pre("default")
        self.gguf_writer.add_token_list(tokens)
        self.gguf_writer.add_token_scores(scores)
        self.gguf_writer.add_token_types(toktypes)

        special_vocab = gguf.SpecialVocab(self.dir_model, n_vocab=len(tokens))

        tokenizer_config_file = self.dir_model / 'tokenizer_config.json'
        if tokenizer_config_file.is_file():
            with open(tokenizer_config_file, "r", encoding="utf-8") as f:
                tokenizer_config_json = json.load(f)

            if "add_prefix_space" in tokenizer_config_json:
                self.gguf_writer.add_add_space_prefix(tokenizer_config_json["add_prefix_space"])

            if "added_tokens_decoder" in tokenizer_config_json:
                # Keys of added_tokens_decoder are token ids stored as strings.
                for token_id, token_data in tokenizer_config_json["added_tokens_decoder"].items():
                    if token_data.get("special"):
                        token_id = int(token_id)
                        token = token_data["content"]
                        special_vocab._set_special_token(token, token_id)
                        # update eos token
                        if token == '<|im_end|>' and "eos" in special_vocab.special_token_ids:
                            special_vocab.special_token_ids["eos"] = token_id

        special_vocab.add_to_gguf(self.gguf_writer)

    def set_gguf_parameters(self):
        """Write the parent parameters plus the vocab size and rope dimension count."""
        super().set_gguf_parameters()
        hparams = self.hparams
        self.gguf_writer.add_vocab_size(hparams["vocab_size"])

        # Fall back to hidden_size / num_attention_heads when head_dim is absent or None.
        if (rope_dim := hparams.get("head_dim")) is None:
            rope_dim = hparams["hidden_size"] // hparams["num_attention_heads"]
        self.gguf_writer.add_rope_dimension_count(rope_dim)

    @classmethod
    def filter_tensors(cls, item: tuple[str, Callable[[], Tensor]]) -> tuple[str, Callable[[], Tensor]] | None:
        """Return ``None`` for tensors named ``mlp*`` / ``vision_model*``, else defer to the parent."""
        name, gen = item
        if name.startswith(("mlp", "vision_model")):
            # skip visual tensors
            return None

        return super().filter_tensors(item)

    def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
        """Apply ``LlamaModel.permute`` to q_proj / k_proj weights and biases, then defer to the parent."""
        n_head = self.hparams["num_attention_heads"]
        n_kv_head = self.hparams.get("num_key_value_heads")
        if name.endswith(("q_proj.weight", "q_proj.bias")):
            data_torch = LlamaModel.permute(data_torch, n_head, n_head)
        if name.endswith(("k_proj.weight", "k_proj.bias")):
            data_torch = LlamaModel.permute(data_torch, n_head, n_kv_head)
        yield from super().modify_tensors(data_torch, name, bid)