merge: integrate origin/main into sft branch

Resolved 6 conflicts:
- CMakeLists.txt: keep cpptrace + debug flag, accept flexible build type
- worker_pool.cpp: keep SFT profiling + main's block=1 spin fix
- ext_bindings.cpp: keep both SFT MOE bindings and AVX2/BF16/FP8 bindings
- common.hpp: keep gpu_experts_mask + SFT backward weight fields
- __init__.py: export both generate_gpu_experts_masks and AMXSFTMoEWrapper
- experts.py: gpu_experts_mask for inference, num_gpu_experts for SFT, new methods
commit a98d544833 (mrhaoxx, 2026-04-08 23:19:28 +08:00)
112 changed files with 21976 additions and 1917 deletions


@@ -291,6 +291,11 @@ class FP8SafeTensorLoader(SafeTensorLoader):
Supported formats:
- DeepSeek style: {base}.mlp.experts.{id}.{gate,up,down}_proj.weight
- Mixtral/MiniMax style: {base}.block_sparse_moe.experts.{id}.{w1,w3,w2}.weight
- Mistral style: {base}.experts.{id}.{w1,w3,w2}.weight
Supported scale formats (auto-detected):
- Block-wise: weight_scale_inv (DeepSeek FP8)
- Per-channel: weight_scale (GLM-4.7-FP8)
The format is auto-detected during initialization.
"""
@@ -299,15 +304,32 @@ class FP8SafeTensorLoader(SafeTensorLoader):
MOE_FORMATS = {
"deepseek": ("{base}.mlp.experts", "gate_proj", "up_proj", "down_proj"),
"mixtral": ("{base}.block_sparse_moe.experts", "w1", "w3", "w2"),
"mistral": ("{base}.experts", "w1", "w3", "w2"),
}
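# --- Hedged sketch (not part of this diff): how the templates above expand,
# --- for a hypothetical base_key "model.layers.3" and expert id 17.
formats = {
    "deepseek": ("{base}.mlp.experts", "gate_proj"),
    "mixtral": ("{base}.block_sparse_moe.experts", "w1"),
    "mistral": ("{base}.experts", "w1"),
}
for name, (tpl, gate) in formats.items():
    print(name, "->", f"{tpl.format(base='model.layers.3')}.17.{gate}.weight")
# deepseek -> model.layers.3.mlp.experts.17.gate_proj.weight
# mixtral -> model.layers.3.block_sparse_moe.experts.17.w1.weight
# mistral -> model.layers.3.experts.17.w1.weight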
def __init__(self, file_path: str, scale_suffix: str = None):
"""Initialize FP8 loader with optional scale suffix override.
Args:
file_path: Path to safetensor files
scale_suffix: Optional scale key suffix. If None, auto-detect between
'weight_scale_inv' (block-wise) and 'weight_scale' (per-channel).
"""
super().__init__(file_path)
self._detected_format = None
self._scale_suffix = scale_suffix # None means auto-detect
# Set per_channel based on explicit scale_suffix if provided
if scale_suffix == "weight_scale":
self._is_per_channel = True
elif scale_suffix == "weight_scale_inv":
self._is_per_channel = False
else:
self._is_per_channel = False # Will be updated in _detect_format if auto-detect
self._is_vl_model = False
self._detect_format()
def _detect_format(self):
"""Auto-detect the MoE naming format by checking tensor keys."""
"""Auto-detect the MoE naming format and scale format by checking tensor keys."""
# Sample some tensor names to detect format
sample_keys = list(self.tensor_file_map.keys())[:1000]
@@ -320,20 +342,78 @@ class FP8SafeTensorLoader(SafeTensorLoader):
if "block_sparse_moe.experts" in key and fmt_name == "mixtral":
self._detected_format = fmt_name
print(f"[FP8SafeTensorLoader] Detected format: {fmt_name}")
break
elif "mlp.experts" in key and "block_sparse_moe" not in key and fmt_name == "deepseek":
self._detected_format = fmt_name
print(f"[FP8SafeTensorLoader] Detected format: {fmt_name}")
break
elif fmt_name == "mistral" and ".mlp.experts" not in key and ".block_sparse_moe.experts" not in key:
self._detected_format = fmt_name
print(f"[FP8SafeTensorLoader] Detected format: {fmt_name}")
break
if self._detected_format:
break
# Default to deepseek if no format detected
if not self._detected_format:
self._detected_format = "deepseek"
print("[FP8SafeTensorLoader] No MoE format detected, defaulting to: deepseek")
# Auto-detect scale suffix if not specified
if self._scale_suffix is None:
_, gate, _, _ = self.MOE_FORMATS[self._detected_format]
# Check for per-channel scale (weight_scale) vs block-wise (weight_scale_inv)
for key in sample_keys:
if f".{gate}.weight_scale_inv" in key:
self._scale_suffix = "weight_scale_inv"
self._is_per_channel = False
print("[FP8SafeTensorLoader] Detected scale format: block-wise (weight_scale_inv)")
if key.startswith("model.language_model.") and self._detected_format == "deepseek":
# VL models (Qwen3.5): model.layers.{N} -> model.language_model.layers.{N}
self._is_vl_model = True
print("[FP8SafeTensorLoader] Detected VL model")
return
elif f".{gate}.weight_scale" in key and "weight_scale_inv" not in key:
self._scale_suffix = "weight_scale"
# Some models (e.g., Mistral) use block-wise FP8 scales but keep
# the key suffix as `weight_scale` (without `_inv`). Infer format
# from scale tensor shape instead of suffix alone:
# - per-channel: [N] or [N, 1]
# - block-wise: [N_block, K_block] (both dims > 1)
scale_tensor = self.load_tensor(key, device="cpu")
if scale_tensor.dim() == 1:
self._is_per_channel = True
elif scale_tensor.dim() == 2 and scale_tensor.shape[1] == 1:
self._is_per_channel = True
else:
self._is_per_channel = False
scale_kind = "per-channel" if self._is_per_channel else "block-wise"
print(f"[FP8SafeTensorLoader] Detected scale format: {scale_kind} (weight_scale)")
return
# Default to weight_scale_inv
self._scale_suffix = "weight_scale_inv"
self._is_per_channel = False
print("[FP8SafeTensorLoader] No scale format detected, defaulting to: weight_scale_inv")
else:
# Scale suffix was explicitly provided
scale_type = "per-channel" if self._is_per_channel else "block-wise"
print(f"[FP8SafeTensorLoader] Using explicit scale format: {scale_type} ({self._scale_suffix})")
def _get_experts_prefix_candidates(self, base_key: str) -> list[str]:
"""Get candidate experts prefixes based on detected format and base key variants."""
path_tpl, _, _, _ = self.MOE_FORMATS[self._detected_format]
candidates = []
if self._is_vl_model:
base_key = base_key.replace("model.layers", "model.language_model.layers")
candidates.append(path_tpl.format(base=base_key))
# Some model weights (e.g., Mistral native format) do not have "model." prefix.
if base_key.startswith("model."):
candidates.append(path_tpl.format(base=base_key[len("model.") :]))
# Deduplicate while preserving order.
return list(dict.fromkeys(candidates))
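# Hedged usage example (hypothetical loader instance): for a Mistral-format
# checkpoint whose keys lack the "model." prefix,
#   loader._get_experts_prefix_candidates("model.layers.0")
# yields ["model.layers.0.experts", "layers.0.experts"]; dict.fromkeys keeps
# insertion order, so the unmodified base_key is always tried first.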
def _get_proj_names(self):
"""Get projection names (gate, up, down) based on detected format."""
@@ -353,16 +433,26 @@ class FP8SafeTensorLoader(SafeTensorLoader):
return tensor.to(device)
def load_experts(self, base_key: str, device: str = "cpu"):
"""Load FP8 expert weights and their block-wise scale_inv tensors."""
experts_prefix = self._get_experts_prefix(base_key)
"""Load FP8 expert weights and their scale tensors.
Supports both block-wise (weight_scale_inv) and per-channel (weight_scale) formats.
Per-channel scales are squeezed from [N, 1] to [N] if needed.
"""
experts_prefix_candidates = self._get_experts_prefix_candidates(base_key)
gate_name, up_name, down_name = self._get_proj_names()
expert_count = 0
experts_prefix = None
for prefix in experts_prefix_candidates:
expert_count = 0
while self.has_tensor(f"{prefix}.{expert_count}.{gate_name}.weight"):
expert_count += 1
if expert_count > 0:
experts_prefix = prefix
break
if expert_count == 0 or experts_prefix is None:
raise ValueError(f"No experts found for keys: {experts_prefix_candidates}")
gate_weights = [None] * expert_count
up_weights = [None] * expert_count
@@ -375,16 +465,30 @@ class FP8SafeTensorLoader(SafeTensorLoader):
gate_w_key = f"{experts_prefix}.{exp_id}.{gate_name}.weight"
up_w_key = f"{experts_prefix}.{exp_id}.{up_name}.weight"
down_w_key = f"{experts_prefix}.{exp_id}.{down_name}.weight"
gate_s_key = f"{experts_prefix}.{exp_id}.{gate_name}.{self._scale_suffix}"
up_s_key = f"{experts_prefix}.{exp_id}.{up_name}.{self._scale_suffix}"
down_s_key = f"{experts_prefix}.{exp_id}.{down_name}.{self._scale_suffix}"
gate_weights[exp_id] = self.load_tensor(gate_w_key, device).contiguous()
up_weights[exp_id] = self.load_tensor(up_w_key, device).contiguous()
down_weights[exp_id] = self.load_tensor(down_w_key, device).contiguous()
gate_scale = self.load_tensor(gate_s_key, device)
up_scale = self.load_tensor(up_s_key, device)
down_scale = self.load_tensor(down_s_key, device)
# For per-channel scales, squeeze [N, 1] -> [N] if needed
if self._is_per_channel:
if gate_scale.dim() == 2 and gate_scale.shape[1] == 1:
gate_scale = gate_scale.squeeze(1)
if up_scale.dim() == 2 and up_scale.shape[1] == 1:
up_scale = up_scale.squeeze(1)
if down_scale.dim() == 2 and down_scale.shape[1] == 1:
down_scale = down_scale.squeeze(1)
gate_scales[exp_id] = gate_scale.contiguous()
up_scales[exp_id] = up_scale.contiguous()
down_scales[exp_id] = down_scale.contiguous()
return {
"gate": gate_weights,
@@ -395,6 +499,174 @@ class FP8SafeTensorLoader(SafeTensorLoader):
"down_scale": down_scales,
}
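# --- Hedged sketch (not part of this diff): dequantizing one expert weight
# --- from the dict above. For block-wise FP8 the weights are multiplied by
# --- weight_scale_inv per block (block size 128 is an assumption here);
# --- per-channel applies one scale per output row.
import torch

def dequant_fp8(w: torch.Tensor, scale: torch.Tensor, per_channel: bool,
                block: int = 128) -> torch.Tensor:
    w = w.to(torch.float32)
    if per_channel:
        return w * scale.view(-1, 1)                        # scale: [N]
    s = scale.repeat_interleave(block, 0)[: w.shape[0]]     # scale: [N/B, K/B]
    s = s.repeat_interleave(block, 1)[:, : w.shape[1]]
    return w * s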
def is_per_channel(self) -> bool:
"""Return True if using per-channel quantization, False for block-wise."""
return self._is_per_channel
class BF16SafeTensorLoader(SafeTensorLoader):
"""Loader for native BF16 expert weights (no quantization, no scales).
Supported formats:
- DeepSeek style: {base}.mlp.experts.{id}.{gate,up,down}_proj.weight
- Mixtral/MiniMax style: {base}.block_sparse_moe.experts.{id}.{w1,w3,w2}.weight
- Mistral style: {base}.experts.{id}.{w1,w3,w2}.weight
The format is auto-detected during initialization.
"""
MOE_FORMATS = {
"deepseek": ("{base}.mlp.experts", "gate_proj", "up_proj", "down_proj"),
"mixtral": ("{base}.block_sparse_moe.experts", "w1", "w3", "w2"),
"mistral": ("{base}.experts", "w1", "w3", "w2"),
}
def __init__(self, file_path: str):
super().__init__(file_path)
self._detected_format = None
self._detect_format()
def _detect_format(self):
"""Auto-detect the MoE naming format by checking tensor keys."""
sample_keys = list(self.tensor_file_map.keys())[:1000]
# Check for packed format first (Qwen3.5 MoE style: all experts stacked in 3D tensors)
for key in sample_keys:
if key.endswith(".mlp.experts.gate_up_proj"):
self._detected_format = "packed"
print("[BF16SafeTensorLoader] Detected format: packed (Qwen3.5 MoE style)")
return
for fmt_name, (path_tpl, gate, up, down) in self.MOE_FORMATS.items():
for key in sample_keys:
if ".experts." in key and f".{gate}.weight" in key:
if "block_sparse_moe.experts" in key and fmt_name == "mixtral":
self._detected_format = fmt_name
print(f"[BF16SafeTensorLoader] Detected format: {fmt_name}")
return
elif "mlp.experts" in key and "block_sparse_moe" not in key and fmt_name == "deepseek":
self._detected_format = fmt_name
print(f"[BF16SafeTensorLoader] Detected format: {fmt_name}")
return
elif fmt_name == "mistral" and ".mlp.experts" not in key and ".block_sparse_moe.experts" not in key:
self._detected_format = fmt_name
print(f"[BF16SafeTensorLoader] Detected format: {fmt_name}")
return
self._detected_format = "deepseek"
print("[BF16SafeTensorLoader] No MoE format detected, defaulting to: deepseek")
def _get_experts_prefix_candidates(self, base_key: str) -> list[str]:
"""Get candidate experts prefixes based on detected format and base key variants."""
path_tpl, _, _, _ = self.MOE_FORMATS[self._detected_format]
candidates = [path_tpl.format(base=base_key)]
# Some model weights (e.g., Mistral native format) do not have "model." prefix.
if base_key.startswith("model."):
candidates.append(path_tpl.format(base=base_key[len("model.") :]))
return list(dict.fromkeys(candidates))
def _get_proj_names(self):
"""Get projection names (gate, up, down) based on detected format."""
_, gate, up, down = self.MOE_FORMATS[self._detected_format]
return gate, up, down
def load_tensor(self, key: str, device: str = "cpu"):
if key not in self.tensor_file_map:
raise KeyError(f"Key {key} not found in Safetensor files")
file = self.tensor_file_map[key]
f = self.file_handle_map.get(file)
if f is None:
raise FileNotFoundError(f"File {file} not found in Safetensor files")
tensor = f.get_tensor(key)
if device == "cpu":
return tensor
return tensor.to(device)
def load_experts(self, base_key: str, device: str = "cpu"):
"""Load BF16 expert weights (no scales needed)."""
if self._detected_format == "packed":
return self._load_experts_packed(base_key, device)
experts_prefix_candidates = self._get_experts_prefix_candidates(base_key)
gate_name, up_name, down_name = self._get_proj_names()
expert_count = 0
experts_prefix = None
for prefix in experts_prefix_candidates:
expert_count = 0
while self.has_tensor(f"{prefix}.{expert_count}.{gate_name}.weight"):
expert_count += 1
if expert_count > 0:
experts_prefix = prefix
break
if expert_count == 0 or experts_prefix is None:
raise ValueError(f"No experts found for keys: {experts_prefix_candidates}")
gate_weights = [None] * expert_count
up_weights = [None] * expert_count
down_weights = [None] * expert_count
for exp_id in range(expert_count):
gate_w_key = f"{experts_prefix}.{exp_id}.{gate_name}.weight"
up_w_key = f"{experts_prefix}.{exp_id}.{up_name}.weight"
down_w_key = f"{experts_prefix}.{exp_id}.{down_name}.weight"
gate_weights[exp_id] = self.load_tensor(gate_w_key, device).contiguous()
up_weights[exp_id] = self.load_tensor(up_w_key, device).contiguous()
down_weights[exp_id] = self.load_tensor(down_w_key, device).contiguous()
return {
"gate": gate_weights,
"up": up_weights,
"down": down_weights,
}
def _resolve_packed_experts_prefix(self, base_key: str) -> str:
"""Resolve the experts prefix for packed format, trying fallbacks."""
# Direct: model.layers.{N}.mlp.experts
experts_prefix = f"{base_key}.mlp.experts"
if self.has_tensor(f"{experts_prefix}.gate_up_proj"):
return experts_prefix
# VL models: model.layers.{N} -> model.language_model.layers.{N}
parts = base_key.split(".", 1)
if len(parts) == 2:
alt_base = f"{parts[0]}.language_model.{parts[1]}"
experts_prefix = f"{alt_base}.mlp.experts"
if self.has_tensor(f"{experts_prefix}.gate_up_proj"):
return experts_prefix
raise ValueError(f"No packed experts found for base_key '{base_key}'.")
def _load_experts_packed(self, base_key: str, device: str = "cpu"):
"""Load packed expert weights (Qwen3.5 MoE style).
Packed format stores all experts in stacked 3D tensors:
- gate_up_proj: [num_experts, 2 * intermediate_size, hidden_size]
- down_proj: [num_experts, hidden_size, intermediate_size]
"""
experts_prefix = self._resolve_packed_experts_prefix(base_key)
gate_up_key = f"{experts_prefix}.gate_up_proj"
down_key = f"{experts_prefix}.down_proj"
gate_up = self.load_tensor(gate_up_key, device) # [E, 2*I, H]
down = self.load_tensor(down_key, device) # [E, H, I]
mid = gate_up.shape[1] // 2
gate_list = [gate_up[i, :mid, :].contiguous() for i in range(gate_up.shape[0])]
up_list = [gate_up[i, mid:, :].contiguous() for i in range(gate_up.shape[0])]
down_list = [down[i].contiguous() for i in range(down.shape[0])]
return {
"gate": gate_list,
"up": up_list,
"down": down_list,
}
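# --- Hedged shape walk-through (not part of this diff), for hypothetical
# --- sizes E=8 experts, intermediate I=1536, hidden H=2048:
import torch
gate_up = torch.empty(8, 3072, 2048)   # [E, 2*I, H]
mid = gate_up.shape[1] // 2            # 1536
assert gate_up[0, :mid, :].shape == (1536, 2048)            # per-expert gate
assert gate_up[0, mid:, :].shape == (1536, 2048)            # per-expert up
assert torch.empty(8, 2048, 1536)[0].shape == (2048, 1536)  # per-expert down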
class CompressedSafeTensorLoader(SafeTensorLoader):
"""Loader for compressed SafeTensor layouts (RAWINT4 weights)."""
@@ -409,7 +681,12 @@ class CompressedSafeTensorLoader(SafeTensorLoader):
expert_idx += 1
if expert_idx == 0:
experts_prefix = f"language_model.{base_key}.mlp.experts"
expert_idx = 0
while self.has_tensor(f"{experts_prefix}.{expert_idx}.up_proj.weight_packed"):
expert_idx += 1
if expert_idx == 0:
raise ValueError(f"No experts found for key {experts_prefix}")
def load_projection(proj_name: str):
weight_entries = []
@@ -837,3 +1114,120 @@ class GGUFLoader:
data = torch.from_numpy(np.frombuffer(data_bytes, dtype=np.uint8).copy())
return data, ggml_type
class GPTQSafeTensorLoader(FP8SafeTensorLoader):
"""Loader for symmetric GPTQ-Int4 expert weights (qweight + scales, no qzeros).
Only supports sym=true, desc_act=false GPTQ models.
Tensor keys:
- qweight: {prefix}.{id}.{proj}.qweight (int32, packed 8x4-bit along K)
- scales: {prefix}.{id}.{proj}.scales (fp16 -> converted to fp32)
"""
def __init__(self, file_path: str):
# Call FP8SafeTensorLoader init (which calls SafeTensorLoader init + format detection)
super().__init__(file_path, scale_suffix="scales")
# Verify GPTQ config
self._verify_gptq_config(file_path)
def _detect_format(self):
"""Override FP8 format detection to look for .qweight instead of .weight."""
sample_keys = list(self.tensor_file_map.keys())[:2000]
for fmt_name, (path_tpl, gate, up, down) in self.MOE_FORMATS.items():
for key in sample_keys:
if ".experts." in key and f".{gate}.qweight" in key:
if "block_sparse_moe.experts" in key and fmt_name == "mixtral":
self._detected_format = fmt_name
break
elif "mlp.experts" in key and "block_sparse_moe" not in key and fmt_name == "deepseek":
self._detected_format = fmt_name
# Check for VL model (language_model prefix)
if "language_model." in key:
self._is_vl_model = True
break
elif fmt_name == "mistral" and "block_sparse_moe" not in key and "mlp" not in key:
self._detected_format = fmt_name
break
if self._detected_format is not None:
break
if self._detected_format is None:
self._detected_format = "deepseek"
vl_str = " (VL model)" if self._is_vl_model else ""
print(f"[GPTQSafeTensorLoader] Detected format: {self._detected_format}{vl_str}")
def _verify_gptq_config(self, file_path):
"""Check that the model uses sym=true, desc_act=false."""
import json
import os
config_path = os.path.join(os.path.dirname(file_path), "config.json")
if not os.path.exists(config_path):
# Fall back to treating file_path as the model directory itself
config_path = os.path.join(file_path, "config.json")
if os.path.exists(config_path):
with open(config_path) as f:
config = json.load(f)
qc = config.get("quantization_config", {})
if qc.get("quant_method") == "gptq":
if qc.get("desc_act", False):
raise NotImplementedError(
"GPTQ desc_act=true is not supported. Only desc_act=false models are supported."
)
if not qc.get("sym", True):
raise NotImplementedError(
"GPTQ sym=false (asymmetric) is not supported. Only sym=true models are supported."
)
print(f"[GPTQSafeTensorLoader] Verified: sym={qc.get('sym')}, desc_act={qc.get('desc_act')}, "
f"bits={qc.get('bits')}, group_size={qc.get('group_size')}")
def load_experts(self, base_key: str, device: str = "cpu"):
"""Load GPTQ expert qweight and scales.
Returns dict with keys: gate, up, down (qweight int32), gate_scale, up_scale, down_scale (fp32).
"""
experts_prefix_candidates = self._get_experts_prefix_candidates(base_key)
gate_name, up_name, down_name = self._get_proj_names()
expert_count = 0
experts_prefix = None
for prefix in experts_prefix_candidates:
expert_count = 0
while self.has_tensor(f"{prefix}.{expert_count}.{gate_name}.qweight"):
expert_count += 1
if expert_count > 0:
experts_prefix = prefix
break
if expert_count == 0 or experts_prefix is None:
raise ValueError(f"No GPTQ experts found for keys: {experts_prefix_candidates}")
gate_weights = [None] * expert_count
up_weights = [None] * expert_count
down_weights = [None] * expert_count
gate_scales = [None] * expert_count
up_scales = [None] * expert_count
down_scales = [None] * expert_count
for exp_id in range(expert_count):
gate_weights[exp_id] = self.load_tensor(f"{experts_prefix}.{exp_id}.{gate_name}.qweight", device).contiguous()
up_weights[exp_id] = self.load_tensor(f"{experts_prefix}.{exp_id}.{up_name}.qweight", device).contiguous()
down_weights[exp_id] = self.load_tensor(f"{experts_prefix}.{exp_id}.{down_name}.qweight", device).contiguous()
gate_scales[exp_id] = self.load_tensor(f"{experts_prefix}.{exp_id}.{gate_name}.scales", device).float().contiguous()
up_scales[exp_id] = self.load_tensor(f"{experts_prefix}.{exp_id}.{up_name}.scales", device).float().contiguous()
down_scales[exp_id] = self.load_tensor(f"{experts_prefix}.{exp_id}.{down_name}.scales", device).float().contiguous()
print(f"[GPTQSafeTensorLoader] Loaded {expert_count} experts from {experts_prefix}")
return {
"gate": gate_weights,
"up": up_weights,
"down": down_weights,
"gate_scale": gate_scales,
"up_scale": up_scales,
"down_scale": down_scales,
}
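# --- Hedged sketch (not part of this diff): dequantizing one projection from
# --- the dict returned above, reusing unpack_gptq_int4 from the earlier
# --- sketch. Scales are assumed [K // group_size, N] with group_size 128.
import torch

def dequant_gptq(qweight: torch.Tensor, scales: torch.Tensor,
                 group_size: int = 128) -> torch.Tensor:
    q = unpack_gptq_int4(qweight).to(torch.float32)             # [K, N]
    return q * scales.repeat_interleave(group_size, dim=0)      # [K, N] fp32

# e.g. experts = loader.load_experts("model.layers.3")
#      gate_fp32 = dequant_gptq(experts["gate"][0], experts["gate_scale"][0])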