Merge pull request #27 from Lizonghang/lizh_dev

Fix seq_id mismatch between the head and worker devices.
Zonghang Li 2025-06-11 17:12:08 +04:00 committed by GitHub
commit e50b3aa473
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 65 additions and 62 deletions
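In short, the seq_id values used on the head (server) device were off by one relative to what the worker devices expected, so forwarded tokens could land under the wrong sequence in the workers' KV caches. The commit threads a server_mode flag from the server's llama_decode() calls down to llama_send_meta(), which then subtracts one from each per-token seq_id before broadcasting the batch metadata, and it sizes that metadata by the batch's n_tokens instead of the full context length. A minimal, self-contained sketch of the alignment idea follows; pack_seq_ids() and the example values are illustrative only and not part of this repository.

    // Illustrative sketch of the seq_id alignment applied on the head device.
    // pack_seq_ids() is a hypothetical helper, not a function from this repository.
    #include <cstdint>
    #include <cstdio>
    #include <vector>

    using llama_seq_id = int32_t;

    // Pack one seq_id per token, optionally shifting by -1 so the worker-side
    // numbering lines up with the head-side numbering.
    static std::vector<llama_seq_id> pack_seq_ids(const std::vector<llama_seq_id> & head_ids,
                                                  bool align_seq_ids) {
        const llama_seq_id offset = align_seq_ids ? 1 : 0;
        std::vector<llama_seq_id> packed;
        packed.reserve(head_ids.size());          // sized by n_tokens, not n_ctx
        for (llama_seq_id id : head_ids) {
            packed.push_back(id - offset);
        }
        return packed;
    }

    int main() {
        const std::vector<llama_seq_id> head_ids = {1, 1, 2};   // head-side numbering
        const auto worker_ids = pack_seq_ids(head_ids, /*align_seq_ids=*/true);
        for (size_t i = 0; i < worker_ids.size(); ++i) {
            std::printf("token %zu: head seq_id %d becomes worker seq_id %d\n",
                        i, head_ids[i], worker_ids[i]);
        }
        return 0;
    }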

View file

@@ -1059,13 +1059,9 @@ struct server_context {
     }
 
     void kv_cache_clear() {
-        SRV_DBG("%s", "clearing KV cache\n");
+        SRV_DBG("%s", "clearing all KV cache\n");
 
-        // clear the entire KV cache
         llama_kv_cache_clear(ctx);
         llama_send_kv_cache_clear(ctx);
         clean_kv_cache = false;
     }
@@ -1090,7 +1086,7 @@ struct server_context {
                 llama_batch_add(batch, system_tokens[i + j], i + j, { 0 }, false);
             }
 
-            if (llama_decode(ctx, batch) != 0) {
+            if (llama_decode(ctx, batch, true) != 0) {
                 SRV_ERR("%s", "llama_decode() failed\n");
                 return;
             }
@@ -2311,7 +2307,7 @@ struct server_context {
                     0, 0, 0, // unused
                 };
 
-                const int ret = llama_decode(ctx, batch_view);
+                const int ret = llama_decode(ctx, batch_view, true);
 
                 metrics.on_decoded(slots);
 
                 if (ret != 0) {

View file

@@ -957,7 +957,8 @@ extern "C" {
     //   < 0 - error
     LLAMA_API int32_t llama_decode(
             struct llama_context * ctx,
-            struct llama_batch batch);
+            struct llama_batch batch,
+            bool server_mode = false);
 
     // Set the number of threads used for decoding
     // n_threads is the number of threads used for generation (single token)
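Because the new parameter is defaulted, existing callers of llama_decode() compile unchanged; only the distributed head node passes true, as the server hunks in the first file above show. A hedged usage sketch follows; decode_once() is an illustrative wrapper, not part of the API.

    // Illustrative wrapper around the extended llama_decode(); not part of the API.
    #include "llama.h"
    #include <cstdio>

    static int decode_once(llama_context * ctx, llama_batch & batch, bool on_head_server) {
        // server_mode defaults to false, so ordinary callers are unaffected;
        // the head node in server mode passes true so that seq_id alignment is
        // applied before batch metadata is forwarded to the worker devices.
        const int ret = on_head_server ? llama_decode(ctx, batch, /*server_mode=*/true)
                                       : llama_decode(ctx, batch);
        if (ret != 0) {
            std::fprintf(stderr, "llama_decode() failed (ret = %d)\n", ret);
        }
        return ret;
    }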

View file

@@ -7472,10 +7472,7 @@ static void llm_load_qwen2_tensors(
         uint32_t n_world,
         uint32_t my_rank,
         const uint32_t * n_layer_window,
-        bool * use_mmap_buffer,
         bool set_needed) {
-    (void)use_mmap_buffer; // unused in this function
 
     const auto tn = LLM_TN(model.arch);
     ggml_context * ctx_input = nullptr;
@@ -7590,10 +7587,10 @@ static bool llm_load_tensors_impl(
             GGML_ASSERT(local_i != -1);
             if (local_i % window_size >= window_size - n_gpu_layers) {
-                // LLAMA_LOG_INFO("Layer %i assigned to gpu (cache index %i)\n", i, local_i);
+                LLAMA_LOG_DEBUG("Layer %i assigned to gpu (cache index %i)\n", i, local_i);
                 model.buft_layer[local_i] = llama_default_buffer_type_offload(model, main_gpu);
             } else {
-                // LLAMA_LOG_INFO("Layer %i assigned to cpu (cache index %i)\n", i, local_i);
+                LLAMA_LOG_DEBUG("Layer %i assigned to cpu (cache index %i)\n", i, local_i);
                 model.buft_layer[local_i] = llama_default_buffer_type_cpu(model, true);
             }
         }
@@ -7603,8 +7600,8 @@ static bool llm_load_tensors_impl(
         if (my_rank == 0) {
             model.buft_input = llama_default_buffer_type_cpu(model, true);
             model.buft_output = llama_default_buffer_type_cpu(model, true);
-            // LLAMA_LOG_INFO("Layer input assigned to cpu\n");
-            // LLAMA_LOG_INFO("Layer output assigned to cpu\n");
+            LLAMA_LOG_DEBUG("Layer input assigned to cpu\n");
+            LLAMA_LOG_DEBUG("Layer output assigned to cpu\n");
         }
 
         // count used buffer types
@@ -8212,7 +8209,7 @@ static bool llm_load_tensors_impl(
                     }
                 } break;
             case LLM_ARCH_QWEN2:
-                llm_load_qwen2_tensors(ml, model, ctx_map, n_world, my_rank, n_layer_window, &use_mmap_buffer, true);
+                llm_load_qwen2_tensors(ml, model, ctx_map, n_world, my_rank, n_layer_window, true);
                 break;
             case LLM_ARCH_QWEN2MOE:
                 {
@@ -11182,8 +11179,6 @@ struct llm_build_context {
         cur = llm_build_out_embd(ctx0, lctx, hparams, cb);
 
-        // cur = ggml_get_rows(ctx0, cur, inp_out_ids);
 
         cur = llm_build_norm(ctx0, cur, hparams,
                 model.output_norm, NULL,
                 LLM_NORM_RMS, cb, -1);
@@ -12724,18 +12719,19 @@ struct llm_build_context {
     }
 
     std::vector<ggml_cgraph *> build_qwen2() {
-        // mutable variable, needed during the last layer of the computation to skip unused tokens
-        int32_t n_tokens = this->n_tokens;
         const int64_t n_embd_head = hparams.n_embd_head_v;
         GGML_ASSERT(n_embd_head == hparams.n_embd_head_k);
         GGML_ASSERT(n_embd_head == hparams.n_rot);
 
         // create a vector to hold the subgraphs
         std::vector<struct ggml_cgraph *> sub_gfs;
         struct ggml_cgraph * sub_gf = nullptr;
         struct ggml_tensor * cur = nullptr;
         struct ggml_tensor * inpL = nullptr;
         struct ggml_tensor * inpB = nullptr;
 
         const uint32_t n_world = this->cparams.n_world;
         const uint32_t my_rank = this->cparams.rank;
         const uint32_t * n_layer_window = this->cparams.n_layer_window;
@@ -12751,7 +12747,7 @@ struct llm_build_context {
                 sub_gfs.push_back(sub_gf);
                 sub_gf = nullptr;
                 inpL = nullptr;
             }
 
         // inpB - contains the output embedding from other nodes
@@ -12763,6 +12759,7 @@ struct llm_build_context {
         // KQ_mask (mask for 1 head, it will be broadcasted to all heads)
         struct ggml_tensor * KQ_mask = build_inp_KQ_mask();
+        const float kq_scale = hparams.f_attention_scale == 0.0f ? 1.0f/sqrtf(float(n_embd_head)) : hparams.f_attention_scale;
 
         for (int il = 0; il < n_layer; ++il) {
             if (!this_layer_is_mine(il, n_world, my_rank, n_layer_window)) {
                 // if we have an active sub-graph, add it to the list
@@ -12771,7 +12768,6 @@ struct llm_build_context {
                     sub_gfs.push_back(sub_gf);
                     sub_gf = nullptr;
                 }
-                // synchronous input tensor
                 if (inpL != inpB) {
                     inpL = inpB;
                 }
@@ -12801,21 +12797,27 @@ struct llm_build_context {
                 // compute Q and K and RoPE them
                 struct ggml_tensor * Qcur = llm_build_lora_mm(lctx, ctx0, model.layers[local_il].wq, cur);
                 cb(Qcur, "Qcur", il);
-                Qcur = ggml_add(ctx0, Qcur, model.layers[local_il].bq);
-                cb(Qcur, "Qcur", il);
+                if (model.layers[local_il].bq) {
+                    Qcur = ggml_add(ctx0, Qcur, model.layers[local_il].bq);
+                    cb(Qcur, "Qcur", il);
+                }
 
                 struct ggml_tensor * Kcur = llm_build_lora_mm(lctx, ctx0, model.layers[local_il].wk, cur);
                 cb(Kcur, "Kcur", il);
-                Kcur = ggml_add(ctx0, Kcur, model.layers[local_il].bk);
-                cb(Kcur, "Kcur", il);
+                if (model.layers[local_il].bk) {
+                    Kcur = ggml_add(ctx0, Kcur, model.layers[local_il].bk);
+                    cb(Kcur, "Kcur", il);
+                }
 
                 struct ggml_tensor * Vcur = llm_build_lora_mm(lctx, ctx0, model.layers[local_il].wv, cur);
                 cb(Vcur, "Vcur", il);
-                Vcur = ggml_add(ctx0, Vcur, model.layers[local_il].bv);
-                cb(Vcur, "Vcur", il);
+                if (model.layers[local_il].bv) {
+                    Vcur = ggml_add(ctx0, Vcur, model.layers[local_il].bv);
+                    cb(Vcur, "Vcur", il);
+                }
 
                 Qcur = ggml_rope_ext(
                     ctx0, ggml_reshape_3d(ctx0, Qcur, n_embd_head, n_head, n_tokens), inp_pos, nullptr,
                     n_rot, rope_type, n_ctx_orig, freq_base, freq_scale,
                     ext_factor, attn_factor, beta_fast, beta_slow
                 );
@@ -12830,7 +12832,7 @@ struct llm_build_context {
                 cur = llm_build_kv(ctx0, lctx, kv_self, sub_gf,
                         model.layers[local_il].wo, model.layers[local_il].bo,
-                        Kcur, Vcur, Qcur, KQ_mask, n_tokens, kv_head, n_kv, 1.0f/sqrtf(float(n_embd_head)), cb, il);
+                        Kcur, Vcur, Qcur, KQ_mask, n_tokens, kv_head, n_kv, kq_scale, cb, il);
             }
 
             if (il == n_layer - 1) {
@@ -12840,7 +12842,7 @@ struct llm_build_context {
                 inpSA = ggml_get_rows(ctx0, inpSA, inp_out_ids);
             }
 
-            struct ggml_tensor * ffn_inp = ggml_add(ctx0, cur, inpSA);
+            struct ggml_tensor * ffn_inp = ggml_add(ctx0, cur, inpSA); // shortcut
             cb(ffn_inp, "ffn_inp", il);
 
             // feed-forward network
@@ -12858,6 +12860,8 @@ struct llm_build_context {
             cb(cur, "ffn_out", il);
 
             cur = ggml_add(ctx0, cur, ffn_inp); // shortcut
+            cb(cur, "ffn_out", il);
 
             cur = lctx.cvec.apply_to(ctx0, cur, il);
             cb(cur, "l_out", il);
@@ -17034,7 +17038,6 @@ static std::vector<struct ggml_cgraph *> llama_build_graph(
             } break;
         case LLM_ARCH_QWEN2:
             {
-                // result.push_back(llm.build_qwen2());
                 result = llm.build_qwen2();
             } break;
         case LLM_ARCH_QWEN2MOE:
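For context, the kq_scale introduced above follows the usual attention-scaling rule: use the model's explicit attention scale when one is set, otherwise fall back to 1/sqrt(n_embd_head). A small standalone sketch of that selection (the values in main() are made up):

    #include <cmath>
    #include <cstdint>
    #include <cstdio>

    // Pick the attention scale: an explicit per-model value wins, otherwise 1/sqrt(d_head).
    static float pick_kq_scale(float f_attention_scale, int64_t n_embd_head) {
        return f_attention_scale == 0.0f ? 1.0f / std::sqrt((float) n_embd_head)
                                         : f_attention_scale;
    }

    int main() {
        std::printf("default scale for d_head = 128: %f\n", pick_kq_scale(0.0f, 128));
        std::printf("explicit scale 0.05 wins:       %f\n", pick_kq_scale(0.05f, 128));
        return 0;
    }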
@@ -17846,7 +17849,7 @@ struct sync_meta {
     int div_factor = 1;
 };
 
-static void llama_send_meta(zmq::socket_t & socket, struct sync_meta * meta) {
+static void llama_send_meta(zmq::socket_t & socket, struct sync_meta * meta, bool align_seq_ids = false) {
     GGML_ASSERT(meta != nullptr);
 
     try {
         std::vector<zmq::message_t> send_msgs;
@@ -17861,19 +17864,20 @@ static void llama_send_meta(zmq::socket_t & socket, struct sync_meta * meta) {
         }
 
         if (meta->n_seq_id != nullptr) {
-            GGML_ASSERT(meta->n_ctx > 0);
+            GGML_ASSERT(meta->n_tokens > 0);
             send_msgs.emplace_back("n_seq_id", strlen("n_seq_id"));
-            send_msgs.emplace_back(meta->n_seq_id, meta->n_ctx * sizeof(int32_t));
+            send_msgs.emplace_back(meta->n_seq_id, meta->n_tokens * sizeof(int32_t));
 
             // here we assume only a single seq_id per token is needed
             // pack all single seq_id values into a contiguous array
-            llama_seq_id * all_seq_ids = (llama_seq_id *) malloc(meta->n_ctx * sizeof(llama_seq_id));
-            for (uint32_t i = 0; i < meta->n_ctx; ++i) {
-                all_seq_ids[i] = meta->seq_id[i][0];
+            llama_seq_id * all_seq_ids = (llama_seq_id *) malloc(meta->n_tokens * sizeof(llama_seq_id));
+            int seq_id_offset = align_seq_ids ? 1 : 0;
+            for (int32_t i = 0; i < meta->n_tokens; ++i) {
+                all_seq_ids[i] = meta->seq_id[i][0] - seq_id_offset;
             }
             send_msgs.emplace_back("seq_id", strlen("seq_id"));
-            send_msgs.emplace_back(all_seq_ids, meta->n_ctx * sizeof(llama_seq_id));
+            send_msgs.emplace_back(all_seq_ids, meta->n_tokens * sizeof(llama_seq_id));
             free(all_seq_ids);
         }
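As the hunk above shows, the metadata travels as key/value pairs of ZeroMQ message parts: a short key frame ("n_seq_id", "seq_id") followed by a raw buffer whose length is now derived from n_tokens. Below is a stripped-down sketch of that packing pattern, assuming cppzmq and an already-connected socket; send_seq_id_meta() is a simplified stand-in for illustration, not the repository's llama_send_meta().

    // Sketch of the key/value multipart layout used to ship per-token seq_ids.
    // Assumes an already-connected zmq::socket_t; the helper is illustrative only.
    #include <zmq.hpp>
    #include <zmq_addon.hpp>
    #include <cstdint>
    #include <cstring>
    #include <vector>

    static void send_seq_id_meta(zmq::socket_t & socket,
                                 const std::vector<int32_t> & n_seq_id,
                                 const std::vector<int32_t> & seq_id_flat) {
        std::vector<zmq::message_t> parts;
        // key frame, then a raw buffer frame sized by n_tokens (not n_ctx);
        // the zmq::message_t (data, size) constructor copies the buffer
        parts.emplace_back("n_seq_id", std::strlen("n_seq_id"));
        parts.emplace_back(n_seq_id.data(), n_seq_id.size() * sizeof(int32_t));
        parts.emplace_back("seq_id", std::strlen("seq_id"));
        parts.emplace_back(seq_id_flat.data(), seq_id_flat.size() * sizeof(int32_t));
        const auto parts_sent = zmq::send_multipart(socket, parts);
        (void) parts_sent;
    }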
@@ -17963,18 +17967,18 @@ static int llama_recv_meta(zmq::socket_t & socket, struct sync_meta * meta) {
         }
 
         if (key == "n_seq_id") {
-            GGML_ASSERT(meta->n_ctx > 0);
-            GGML_ASSERT(data_msg.size() == meta->n_ctx * sizeof(int32_t));
-            meta->n_seq_id = (int32_t *) malloc(meta->n_ctx * sizeof(int32_t));
-            std::memcpy(meta->n_seq_id, data_msg.data(), meta->n_ctx * sizeof(int32_t));
+            GGML_ASSERT(meta->n_tokens > 0);
+            GGML_ASSERT(data_msg.size() == meta->n_tokens * sizeof(int32_t));
+            meta->n_seq_id = (int32_t *) malloc(meta->n_tokens * sizeof(int32_t));
+            std::memcpy(meta->n_seq_id, data_msg.data(), meta->n_tokens * sizeof(int32_t));
         }
 
         if (key == "seq_id") {
-            GGML_ASSERT(meta->n_ctx > 0);
-            GGML_ASSERT(data_msg.size() == meta->n_ctx * sizeof(llama_seq_id));
+            GGML_ASSERT(meta->n_tokens > 0);
+            GGML_ASSERT(data_msg.size() == meta->n_tokens * sizeof(llama_seq_id));
             const llama_seq_id * all_seq_ids = (llama_seq_id *) data_msg.data();
-            meta->seq_id = (llama_seq_id **) malloc(meta->n_ctx * sizeof(llama_seq_id *));
-            for (uint32_t i = 0; i < meta->n_ctx; ++i) {
+            meta->seq_id = (llama_seq_id **) malloc(meta->n_tokens * sizeof(llama_seq_id *));
+            for (int32_t i = 0; i < meta->n_tokens; ++i) {
                 meta->seq_id[i] = (llama_seq_id *) malloc(sizeof(llama_seq_id));
                 meta->seq_id[i][0] = all_seq_ids[i];
             }
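The receive side mirrors the sender: it reads the flat seq_id array and rebuilds one heap-allocated entry per token, with every allocation now sized by the batch's n_tokens rather than the context length (with an 8192-token context and a 32-token batch, the old sizing allocated and copied 8192 slots where 32 suffice). A minimal sketch of that rebuild, assuming a single seq_id per token; rebuild_seq_ids() is illustrative, not a function from the repository.

    // Sketch: rebuilding per-token seq_id pointers on the worker from the flat
    // array that was received; sized by the batch's n_tokens, not the context size.
    #include <cstdint>
    #include <cstdlib>

    using llama_seq_id = int32_t;

    static llama_seq_id ** rebuild_seq_ids(const llama_seq_id * flat, int32_t n_tokens) {
        llama_seq_id ** seq_id = (llama_seq_id **) std::malloc(n_tokens * sizeof(llama_seq_id *));
        for (int32_t i = 0; i < n_tokens; ++i) {
            seq_id[i] = (llama_seq_id *) std::malloc(sizeof(llama_seq_id));
            seq_id[i][0] = flat[i];   // one seq_id per token, as the sender packed it
        }
        return seq_id;                // caller frees each entry, then the array
    }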
@@ -18200,7 +18204,8 @@ static void manage_graph_tensors(struct ggml_cgraph * cgraph, int advice, bool f
 //
 static int llama_decode_internal(
          llama_context & lctx,
-         llama_batch batch_all) { // TODO: rename back to batch
+         llama_batch batch_all,
+         bool server_mode) {
     const auto & model = lctx.model;
     const auto & hparams = model.hparams;
     const auto & cparams = lctx.cparams;
@@ -18272,16 +18277,16 @@ static int llama_decode_internal(
         if (meta.n_tokens > 0) {
             batch_all.n_tokens = meta.n_tokens;
             if (meta.pos != nullptr) {
-                batch_all.pos = (llama_pos *) malloc(cparams.n_ctx * sizeof(llama_pos));
-                std::memcpy(batch_all.pos, meta.pos, cparams.n_ctx * sizeof(llama_pos));
+                batch_all.pos = (llama_pos *) malloc(meta.n_ctx * sizeof(llama_pos));
+                std::memcpy(batch_all.pos, meta.pos, meta.n_ctx * sizeof(llama_pos));
             }
             if (meta.n_seq_id != nullptr) {
-                batch_all.n_seq_id = (int32_t *) malloc(cparams.n_ctx * sizeof(int32_t));
-                std::memcpy(batch_all.n_seq_id, meta.n_seq_id, cparams.n_ctx * sizeof(int32_t));
+                batch_all.n_seq_id = (int32_t *) malloc(meta.n_tokens * sizeof(int32_t));
+                std::memcpy(batch_all.n_seq_id, meta.n_seq_id, meta.n_tokens * sizeof(int32_t));
             }
             if (meta.seq_id != nullptr) {
-                batch_all.seq_id = (llama_seq_id **) malloc(cparams.n_ctx * sizeof(llama_seq_id *));
-                for (size_t i = 0; i < cparams.n_ctx; ++i) {
+                batch_all.seq_id = (llama_seq_id **) malloc(meta.n_tokens * sizeof(llama_seq_id *));
+                for (int32_t i = 0; i < meta.n_tokens; ++i) {
                     batch_all.seq_id[i] = (llama_seq_id *) malloc(sizeof(llama_seq_id));
                     batch_all.seq_id[i][0] = meta.seq_id[i][0];
                 }
@@ -18343,7 +18348,7 @@ static int llama_decode_internal(
         meta.logits = batch_all.logits;
         meta.all_pos_0 = batch_all.all_pos_0;
         meta.all_pos_1 = batch_all.all_pos_1;
-        llama_send_meta(*lctx.send_socket, &meta);
+        llama_send_meta(*lctx.send_socket, &meta, server_mode);
     }
 
     lctx.sbatch.from_batch(batch_all, n_embd,
@@ -21896,7 +21901,7 @@ void llama_model_n_flops(
             llm_load_llama_tensors(*ml, *model, ctx_map, 1, 0, n_layer_window, &use_mmap_buffer, false);
             break;
         case LLM_ARCH_QWEN2:
-            llm_load_qwen2_tensors(*ml, *model, ctx_map, 1, 0, n_layer_window, &use_mmap_buffer, false);
+            llm_load_qwen2_tensors(*ml, *model, ctx_map, 1, 0, n_layer_window, false);
            break;
         default:
             throw std::runtime_error("unsupported architecture\n");
@@ -23481,8 +23486,9 @@ int32_t llama_encode(
 int32_t llama_decode(
         struct llama_context * ctx,
-        struct llama_batch batch) {
-    return llama_decode_internal(*ctx, batch);
+        struct llama_batch batch,
+        bool server_mode) {
+    return llama_decode_internal(*ctx, batch, server_mode);
 }
 
 void llama_synchronize(struct llama_context * ctx) {