From 26bb86c09bc19e5e17549294ffddb5620b133f27 Mon Sep 17 00:00:00 2001
From: DeEMO
Date: Wed, 14 May 2025 07:50:04 +0000
Subject: [PATCH 1/8] Add tune_layer_allocation

Signed-off-by: DeEMO
---
 common/common.cpp | 64 +++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 59 insertions(+), 5 deletions(-)

diff --git a/common/common.cpp b/common/common.cpp
index 55807f78..021f1640 100644
--- a/common/common.cpp
+++ b/common/common.cpp
@@ -1522,6 +1522,61 @@ static bool assign_layers_to_device(
     return true;
 }
 
+static bool tune_layer_allocation(
+                uint32_t   n_world,
+                uint32_t   my_rank,
+                std::vector<device_info> dev_infos,
+                uint32_t * n_layer_window,
+                uint32_t * n_gpu_layers,
+                struct llama_model * model,
+                const struct llama_context_params cparams,
+                float min_disk_read_speed = 0.1f) {
+    memset(n_layer_window, 0, n_world * sizeof(uint32_t));
+    memset(n_gpu_layers, 0, n_world * sizeof(uint32_t));
+    std::vector<device_info> dev_infos_temp = dev_infos;
+    std::vector<uint32_t> n_layer_windows_temp;
+    std::vector<uint32_t> n_gpu_layers_temp;
+    while(n_world > 0) {
+        std::vector<device_info> dev_infos_ = dev_infos_temp;
+        std::vector<uint32_t> n_layer_windows_(n_world, 0);
+        std::vector<uint32_t> n_gpu_layers_(n_world, 0);
+        if (!assign_layers_to_device(n_world, my_rank, dev_infos_.data(),
+                                     n_layer_windows_.data(), n_gpu_layers_.data(), model, cparams)) {
+            return false;
+        }
+        dev_infos_temp.clear();
+        n_layer_windows_temp.clear();
+        n_gpu_layers_temp.clear();
+        for(auto i=0; i<n_world; i++) {
+            if(n_layer_windows_[i] > 1 || i==0 ) {
+                dev_infos_temp.push_back(dev_infos_[i]);
+                n_layer_windows_temp.push_back(n_layer_windows_[i]);
+                n_gpu_layers_temp.push_back(n_gpu_layers_[i]);
+            }
+        }
+        if(dev_infos_temp.size() == n_world) {
+            // no device be removed
+            break;
+        }
+
+        n_world = dev_infos_temp.size();
+    }
+    int i =0 , j =0;
+    while(j < n_world) {
+        if(dev_infos[i].rank == dev_infos_temp[j].rank){
+            n_layer_window[i] = n_layer_windows_temp[j];
+            n_gpu_layers[i] = n_gpu_layers_temp[j];
+            j++;
+            i++;
+        } else {
+            n_layer_window[i] = 0;
+            n_gpu_layers[i] = 0;
+            i++;
+        }
+    }
+    return true;
+}
+
 //
 // Model utils
 //
@@ -1613,15 +1668,14 @@ struct llama_init_result llama_init_from_gpt_params(gpt_params & params) {
 
     // sychronize device profile to the master node
     if (my_rank == 0) {
         if (auto_schedule) {
-            struct device_info * dev_info_set = nullptr;
-            dev_info_set = (struct device_info *)malloc(n_world * sizeof(struct device_info));
+            std::vector<device_info> dev_info_set(n_world);
             dev_info_set[0] = dev_info;
-            llama_gather_device_info(lctx, dev_info_set);
-            device_print_props(dev_info_set, n_world, model, cparams);
+            llama_gather_device_info(lctx, dev_info_set.data());
+            device_print_props(dev_info_set.data(), n_world, model, cparams);
 
             // automatically determine n_layer_window and n_gpu_layers
-            if (!assign_layers_to_device(n_world, my_rank, dev_info_set, n_layer_window, n_gpu_layers, model, cparams)) {
+            if (!tune_layer_allocation(n_world, my_rank, dev_info_set, n_layer_window, n_gpu_layers, model, cparams)) {
                 LOG_ERR("%s: Invalid allocation by HiGHS solver\n", __func__);
                 llama_free(lctx);
                 llama_free_model(model);
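The hunk above re-runs the HiGHS-based solver in a loop, drops every device that ends up with at most one layer (rank 0 is always kept), and finally scatters the surviving windows back onto the original ranks. A minimal standalone sketch of that pruning loop, for illustration only: solve() is a hypothetical stand-in for assign_layers_to_device and every name here is invented, not part of the patch.

    #include <cstdint>
    #include <vector>

    struct alloc_result { std::vector<uint32_t> layers; };

    // placeholder: a real solver distributes the model layers across devices
    static alloc_result solve(uint32_t n_devices) {
        return alloc_result{ std::vector<uint32_t>(n_devices, 2) };
    }

    // Repeatedly solve, dropping every device (except rank 0) that received
    // at most one layer, until the device set stops shrinking.
    static std::vector<uint32_t> prune_and_allocate(uint32_t n_devices) {
        std::vector<uint32_t> active(n_devices);
        for (uint32_t i = 0; i < n_devices; ++i) active[i] = i;   // ranks still in play

        std::vector<uint32_t> layers;
        while (true) {
            layers = solve((uint32_t) active.size()).layers;
            std::vector<uint32_t> kept;
            for (size_t i = 0; i < active.size(); ++i) {
                if (layers[i] > 1 || i == 0) kept.push_back(active[i]);
            }
            if (kept.size() == active.size()) break;  // nothing removed, allocation is stable
            active = kept;
        }

        // scatter the surviving windows back to the original rank positions
        std::vector<uint32_t> window(n_devices, 0);
        for (size_t j = 0; j < active.size(); ++j) window[active[j]] = layers[j];
        return window;
    }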
From fdd669463398483b81def2c7fbd723e65ed5a087 Mon Sep 17 00:00:00 2001
From: DeEMO
Date: Thu, 15 May 2025 04:22:12 +0000
Subject: [PATCH 2/8] add topo rebuild

Signed-off-by: DeEMO
---
 common/common.cpp | 10 +++++-
 common/profiler.h |  2 ++
 include/llama.h   |  1 +
 src/llama.cpp     | 86 +++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 98 insertions(+), 1 deletion(-)

diff --git a/common/common.cpp b/common/common.cpp
index 021f1640..991b34a5 100644
--- a/common/common.cpp
+++ b/common/common.cpp
@@ -1660,6 +1660,7 @@ struct llama_init_result llama_init_from_gpt_params(gpt_params & params) {
     // get device profile
     LOG_INF("\nstart profiling this device, this may take some seconds ...\n");
     dev_info.rank = params.rank;
+    dev_info.next_ip = params.next_node_ip.c_str();
     if (n_world > 1) {
         llama_profile_device(&dev_info, model, ml, params.gpu_mem, params.n_predict, params.n_ctx, params.cpuparams.n_threads, params.flash_attn);
     }
@@ -1682,6 +1683,9 @@ struct llama_init_result llama_init_from_gpt_params(gpt_params & params) {
                 return iparams;
             }
             llama_bcast_layer_setup(lctx, n_layer_window, n_gpu_layers);
+
+            //rebuild topo
+            llama_rebuild_topo(lctx, n_layer_window, dev_info_set.data());
         } else {
             // use the user-defined n_layer_window
             std::copy(std::begin(params.n_layer_window), std::end(params.n_layer_window), n_layer_window);
@@ -1690,8 +1694,12 @@ struct llama_init_result llama_init_from_gpt_params(gpt_params & params) {
     } else {
         if (auto_schedule){
             llama_send_device_info(lctx, &dev_info);
+            llama_recv_layer_setup(lctx, n_layer_window, n_gpu_layers);
+            // rebuild topo
+            llama_rebuild_topo(lctx,n_layer_window, nullptr);
+        }else{
+            llama_recv_layer_setup(lctx, n_layer_window, n_gpu_layers);
         }
-        llama_recv_layer_setup(lctx, n_layer_window, n_gpu_layers);
     }
 
     // update n_layer_window and n_gpu_layers
diff --git a/common/profiler.h b/common/profiler.h
index a685ff8c..a3110299 100644
--- a/common/profiler.h
+++ b/common/profiler.h
@@ -320,6 +320,7 @@ struct device_info {
     uint32_t rank;
     const char * device_name;
     const char * device_os;
+    const char * next_ip;
     struct disk_props disk;
     struct cpu_props cpu_props;
     struct memory_info memory;
@@ -333,6 +334,7 @@ struct device_info {
         rank(0),
         device_name(""),
        device_os(""),
+        next_ip(""),
        disk(),
        cpu_props(),
        memory(),
diff --git a/include/llama.h b/include/llama.h
index 9f3da708..515dfd93 100644
--- a/include/llama.h
+++ b/include/llama.h
@@ -455,6 +455,7 @@ extern "C" {
     LLAMA_API int llama_send_device_info (struct llama_context * ctx, struct device_info * dev_info);
     LLAMA_API int llama_bcast_startup_args(struct llama_context * ctx, uint32_t rank, struct startup_args * args);
     LLAMA_API int llama_bcast_layer_setup (struct llama_context * ctx, uint32_t * n_layer_window, uint32_t * n_gpu_layers);
+    LLAMA_API int llama_rebuild_topo      (struct llama_context * ctx, uint32_t * n_layer_window, struct device_info * dev_info_set);
     LLAMA_API int llama_recv_layer_setup  (struct llama_context * ctx, uint32_t * n_layer_window, uint32_t * n_gpu_layers);
 
     LLAMA_API int llm_load_tensors(
diff --git a/src/llama.cpp b/src/llama.cpp
index 87ae83ac..8cc37213 100644
--- a/src/llama.cpp
+++ b/src/llama.cpp
@@ -20329,6 +20329,92 @@ int llama_bcast_layer_setup(struct llama_context * ctx, uint32_t * n_layer_windo
     return 0;
 }
 
+LLAMA_API int llama_rebuild_topo(llama_context *ctx,
+                                 uint32_t *n_layer_window,
+                                 device_info *dev_info_set) {
+    uint32_t n_world = ctx->cparams.n_world;
+    uint32_t my_rank = ctx->cparams.rank;
+    std::vector<zmq::message_t> msgs;
+    device_info* dev_info_ptr = nullptr;
+    if (dev_info_set == nullptr){
+        // for rank!=0, recv all devices info
+        if (!zmq::recv_multipart(*ctx->recv_socket, std::back_inserter(msgs))) {
+            return -1;
+        }
+        dev_info_ptr = new device_info[n_world];
+        for (size_t i = 0; i < msgs.size(); i++) {
+            deserialize((const char *)msgs[i].data(), &dev_info_set[i]);
+        }
+    }else{
+        char * buffer = nullptr;
+        for(size_t i = 0; i < n_world; i++) {
+            size_t buffer_size = serialize(&dev_info_set[i], &buffer);
+            msgs.emplace_back(buffer, buffer_size);
+
+            free(buffer);
+        }
+        dev_info_ptr = dev_info_set;
+    }
+
+    GGML_ASSERT(ctx != nullptr && ctx->send_socket != nullptr);
+    GGML_ASSERT(msgs.size() == n_world);
+
+    // notify next rank
+    auto next_rank = (my_rank + 1) % n_world;
+    if(n_layer_window[next_rank] <= 0){
+        try {
+            ctx->send_socket->setsockopt(ZMQ_LINGER, 3500);
+            zmq::send_multipart(*ctx->send_socket, msgs);
+        } catch (const zmq::error_t& e) {
+            LLAMA_LOG_INFO("Failed to send data: %s\n", e.what());
+            if(!dev_info_set){
+                delete[] dev_info_ptr;
+            }
+            return -1;
+        }
+    }
+
+    // check myself's layer
+    auto* socket_to_close = ctx->send_socket;
+    if(n_layer_window[my_rank] > 0) {
+        // reconstruct socket to the next valid rank
+        std::string next_ip;
+        auto current_rank = my_rank;
+        while(next_rank!=my_rank){
+            if(n_layer_window[next_rank] > 0){
+                next_ip = dev_info_ptr[next_rank].next_ip;
+                break;
+            }
+            next_rank = (next_rank + 1) % n_world;
+            current_rank = (current_rank + 1) % n_world;
+        }
+        if(!next_ip.empty()){
+            ctx->send_socket = new zmq::socket_t(*ctx->sock_context, zmq::socket_type::push);
+            std::string send_endp = "tcp://" + next_ip + ":" + std::to_string(map_rank_to_port(next_rank, ctx->data_port));
+            ctx->next_node_ip = next_ip;
+            try {
+                ctx->send_socket->connect(send_endp);
+                zmq::send_multipart(*ctx->send_socket, msgs);
+            } catch (const zmq::error_t &e) {
+                LLAMA_LOG_INFO("Error binding/connecting recv socket to endpoint: %s", e.what());
+                if(!dev_info_set){
+                    delete[] dev_info_ptr;
+                }
+                return -1;
+            }
+        }
+    }
+    if(!dev_info_set){
+        delete[] dev_info_ptr;
+    }
+    socket_to_close->close();
+    delete socket_to_close;
+    if(n_layer_window[my_rank]<=0){
+        exit(0);
+    }
+    return true;
+}
+
 int llama_recv_layer_setup(struct llama_context * ctx, uint32_t * n_layer_window, uint32_t * n_gpu_layers) {
     uint32_t n_world = ctx->cparams.n_world;
     uint32_t my_rank = ctx->cparams.rank;
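llama_rebuild_topo above has every node that still owns layers walk the ring until it finds the next rank with a non-zero layer window, then reconnect to that node's address, while nodes with an empty window just forward the device info and exit. A toy sketch of only that walk (no ZMQ, illustrative names, not the patch's code):

    #include <cstdint>
    #include <vector>

    // Returns the next rank that still has layers assigned, walking the ring
    // from my_rank, or -1 if no other node is active and the ring collapses
    // to a single device.
    static int next_active_rank(const std::vector<uint32_t> & n_layer_window, uint32_t my_rank) {
        const uint32_t n_world = (uint32_t) n_layer_window.size();
        uint32_t next = (my_rank + 1) % n_world;
        while (next != my_rank) {
            if (n_layer_window[next] > 0) return (int) next;  // new downstream peer
            next = (next + 1) % n_world;
        }
        return -1;
    }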
From cc46aa9828f3ff3223d895cd92d124cdccc06dc3 Mon Sep 17 00:00:00 2001
From: DeEMO
Date: Thu, 15 May 2025 13:57:16 +0800
Subject: [PATCH 3/8] update rank and n_world

Signed-off-by: DeEMO
---
 common/common.cpp | 24 ++++++++++++++++++++++++
 include/llama.h   |  6 +++++-
 src/llama.cpp     |  9 +++++++++
 3 files changed, 38 insertions(+), 1 deletion(-)

diff --git a/common/common.cpp b/common/common.cpp
index 991b34a5..88b00075 100644
--- a/common/common.cpp
+++ b/common/common.cpp
@@ -1702,6 +1702,30 @@ struct llama_init_result llama_init_from_gpt_params(gpt_params & params) {
         }
     }
 
+    //update rank and n_world for consistency
+    uint32_t update_rank = 0;
+    uint32_t update_n_world = 1;
+    std::vector<uint32_t> n_layer_window_temp = {n_layer_window[0]};
+    std::vector<uint32_t> n_gpu_layers_temp = {n_gpu_layers[0]};
+    for(auto i=1; i<n_world; i++) {
[... remainder of this hunk, the include/llama.h hunk, and the start of the src/llama.cpp hunk that adds llama_update_context_with_rankworld are missing in the source ...]
+        ctx->cparams.rank = rank;
+        ctx->cparams.n_world = n_world;
+    }
+}
+
 struct llama_context * llama_new_context_with_model(
         struct llama_model * model,
         struct llama_context_params params) {
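The block above (partly lost in this copy) renumbers the surviving nodes so that rank and n_world stay consistent once empty nodes drop out of the pipeline. A rough sketch of that renumbering, assuming a zero layer window marks a removed node; the struct and function names are illustrative, not the patch's:

    #include <cstdint>
    #include <vector>

    struct rank_update { uint32_t new_rank; uint32_t new_n_world; };

    // Ranks with a zero layer window are removed and the survivors are
    // renumbered in order, so rank IDs stay contiguous.
    static rank_update compact_ranks(const std::vector<uint32_t> & n_layer_window, uint32_t my_rank) {
        rank_update u{0, 0};
        for (uint32_t i = 0; i < (uint32_t) n_layer_window.size(); ++i) {
            if (n_layer_window[i] == 0) continue;   // this node left the topology
            if (i == my_rank) u.new_rank = u.new_n_world;
            ++u.new_n_world;
        }
        return u;
    }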
From 4b36aef157e5234b3f971c9523214595859c3dec Mon Sep 17 00:00:00 2001
From: DeEMO
Date: Thu, 15 May 2025 06:25:12 +0000
Subject: [PATCH 4/8] fix some bugs

Signed-off-by: DeEMO
---
 common/common.cpp | 14 ++++++++++----
 src/llama.cpp     | 20 ++++++++++----------
 2 files changed, 20 insertions(+), 14 deletions(-)

diff --git a/common/common.cpp b/common/common.cpp
index 88b00075..a98337d3 100644
--- a/common/common.cpp
+++ b/common/common.cpp
@@ -1547,7 +1547,7 @@ static bool tune_layer_allocation(
         dev_infos_temp.clear();
         n_layer_windows_temp.clear();
         n_gpu_layers_temp.clear();
-        for(auto i=0; i<n_world; i++) {
+        for(uint32_t i=0; i<n_world; i++) {
             if(n_layer_windows_[i] > 1 || i==0 ) {
                 dev_infos_temp.push_back(dev_infos_[i]);
                 n_layer_windows_temp.push_back(n_layer_windows_[i]);
@@ -1561,7 +1561,7 @@ static bool tune_layer_allocation(
 
         n_world = dev_infos_temp.size();
     }
-    int i =0 , j =0;
+    uint32_t i =0 , j =0;
     while(j < n_world) {
         if(dev_infos[i].rank == dev_infos_temp[j].rank){
             n_layer_window[i] = n_layer_windows_temp[j];
@@ -1701,13 +1701,19 @@ struct llama_init_result llama_init_from_gpt_params(gpt_params & params) {
             llama_recv_layer_setup(lctx, n_layer_window, n_gpu_layers);
         }
     }
+    if(n_layer_window[my_rank]<=0){
+        LOG_INF("%s: info: rank %d has no layers to run, skipping\n", __func__, my_rank);
+        llama_free(lctx);
+        llama_free_model(model);
+        exit(0);
+    }
 
     //update rank and n_world for consistency
     uint32_t update_rank = 0;
     uint32_t update_n_world = 1;
     std::vector<uint32_t> n_layer_window_temp = {n_layer_window[0]};
     std::vector<uint32_t> n_gpu_layers_temp = {n_gpu_layers[0]};
-    for(auto i=1; i<n_world; i++) {
+    for(uint32_t i=1; i<n_world; i++) {
[... remaining context of this hunk and the start of the src/llama.cpp diff are missing in the source ...]
-    uint32_t n_world = ctx->cparams.n_world;
+    auto n_world = ctx->cparams.n_world;
     if (n_world == 1) {
         return 0;
     }
@@ -20343,14 +20343,14 @@ LLAMA_API int llama_rebuild_topo(llama_context *ctx,
         }
         dev_info_ptr = new device_info[n_world];
         for (size_t i = 0; i < msgs.size(); i++) {
-            deserialize((const char *)msgs[i].data(), &dev_info_set[i]);
+            deserialize((const char *)msgs[i].data(), &dev_info_ptr[i]);
         }
     }else{
         char * buffer = nullptr;
         for(size_t i = 0; i < n_world; i++) {
             size_t buffer_size = serialize(&dev_info_set[i], &buffer);
             msgs.emplace_back(buffer, buffer_size);
-            
+
             free(buffer);
         }
         dev_info_ptr = dev_info_set;
@@ -20361,9 +20361,9 @@ LLAMA_API int llama_rebuild_topo(llama_context *ctx,
 
     // notify next rank
     auto next_rank = (my_rank + 1) % n_world;
-    if(n_layer_window[next_rank] <= 0){
+    if(n_layer_window[next_rank] <= 0 && next_rank != 0){
         try {
-            ctx->send_socket->setsockopt(ZMQ_LINGER, 3500);
+            ctx->send_socket->set(zmq::sockopt::linger, 3500);
             zmq::send_multipart(*ctx->send_socket, msgs);
         } catch (const zmq::error_t& e) {
             LLAMA_LOG_INFO("Failed to send data: %s\n", e.what());
@@ -20382,7 +20382,7 @@ LLAMA_API int llama_rebuild_topo(llama_context *ctx,
         auto current_rank = my_rank;
         while(next_rank!=my_rank){
             if(n_layer_window[next_rank] > 0){
-                next_ip = dev_info_ptr[next_rank].next_ip;
+                next_ip = dev_info_ptr[current_rank].next_ip;
                 break;
             }
             next_rank = (next_rank + 1) % n_world;
             current_rank = (current_rank + 1) % n_world;
@@ -20402,6 +20402,9 @@ LLAMA_API int llama_rebuild_topo(llama_context *ctx,
                 }
                 return -1;
             }
+        }else{
+            // only one node
+            ctx->next_node_ip = "";
         }
     }
     if(!dev_info_set){
@@ -20409,10 +20412,7 @@ LLAMA_API int llama_rebuild_topo(llama_context *ctx,
     }
     socket_to_close->close();
     delete socket_to_close;
-    if(n_layer_window[my_rank]<=0){
-        exit(0);
-    }
-    return true;
+    return 0;
 }
 
 int llama_recv_layer_setup(struct llama_context * ctx, uint32_t * n_layer_window, uint32_t * n_gpu_layers) {
     uint32_t n_world = ctx->cparams.n_world;
     uint32_t my_rank = ctx->cparams.rank;
From 0ad009a2f45363579219c3c21b24c191ea3a4e0f Mon Sep 17 00:00:00 2001
From: DeEMO
Date: Fri, 16 May 2025 15:26:16 +0800
Subject: [PATCH 5/8] fix: update serialization and deserialization for next_ip in device_info

Signed-off-by: DeEMO
---
 common/profiler.cpp | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git a/common/profiler.cpp b/common/profiler.cpp
index 18b345a9..380bc5b0 100644
--- a/common/profiler.cpp
+++ b/common/profiler.cpp
@@ -2358,15 +2358,17 @@ size_t serialize(const struct device_info * dev_info, char ** buffer) {
     // calculate total size for serialized buffer
     size_t device_name_len     = strlen(dev_info->device_name) + 1;
     size_t device_os_len       = strlen(dev_info->device_os) + 1;
+    size_t next_ip_len         = strlen(dev_info->next_ip) + 1;
     size_t cpu_name_len        = strlen(dev_info->cpu_props.name) + 1;
     size_t cpu_description_len = strlen(dev_info->cpu_props.description) + 1;
     size_t gpu_name_len        = strlen(dev_info->gpu_props.name) + 1;
     size_t gpu_description_len = strlen(dev_info->gpu_props.description) + 1;
 
     size_t total_size = sizeof(uint32_t)
-                      + sizeof(size_t) * 6 // for lengths of strings
+                      + sizeof(size_t) * 7 // for lengths of strings
                       + device_name_len
                       + device_os_len
+                      + next_ip_len
                       + cpu_name_len
                       + cpu_description_len
                       + gpu_name_len
@@ -2426,6 +2428,11 @@ size_t serialize(const struct device_info * dev_info, char ** buffer) {
     memcpy(ptr, dev_info->device_os, device_os_len);
     ptr += device_os_len;
 
+    memcpy(ptr, &next_ip_len, sizeof(size_t));
+    ptr += sizeof(size_t);
+    memcpy(ptr, dev_info->next_ip, next_ip_len);
+    ptr += next_ip_len;
+
     memcpy(ptr, &cpu_name_len, sizeof(size_t));
     ptr += sizeof(size_t);
     memcpy(ptr, dev_info->cpu_props.name, cpu_name_len);
@@ -2611,6 +2618,14 @@ void deserialize(const char * buffer, struct device_info * dev_info) {
     memcpy(const_cast<void *>(static_cast<const void *>(dev_info->device_os)), ptr, device_os_len);
     ptr += device_os_len;
 
+    // next ip
+    size_t next_ip_len;
+    memcpy(&next_ip_len, ptr, sizeof(size_t));
+    ptr += sizeof(size_t);
+    dev_info->next_ip = (char *)malloc(next_ip_len);
+    memcpy(const_cast<void *>(static_cast<const void *>(dev_info->next_ip)), ptr, next_ip_len);
+    ptr += next_ip_len;
+
     // cpu_props.name
     size_t cpu_name_len;
     memcpy(&cpu_name_len, ptr, sizeof(size_t));
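The serialize/deserialize hunks above pack the new next_ip field the same way as the other strings in device_info: a size_t length that includes the terminating NUL, followed by the raw bytes. A small self-contained sketch of that length-prefixed layout; the helper names are invented for illustration and are not part of the patch:

    #include <cstdlib>
    #include <cstring>

    static char * write_string(char * ptr, const char * s) {
        size_t len = strlen(s) + 1;                 // include the terminating NUL
        memcpy(ptr, &len, sizeof(size_t));  ptr += sizeof(size_t);
        memcpy(ptr, s, len);                ptr += len;
        return ptr;
    }

    static const char * read_string(const char * ptr, char ** out) {
        size_t len;
        memcpy(&len, ptr, sizeof(size_t));  ptr += sizeof(size_t);
        *out = (char *) malloc(len);                // caller owns the buffer
        memcpy(*out, ptr, len);             ptr += len;
        return ptr;
    }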
From df16b1876f316958dbef6e3843f63e1888cb4725 Mon Sep 17 00:00:00 2001
From: DeEMO
Date: Fri, 16 May 2025 16:02:25 +0800
Subject: [PATCH 6/8] refactor: add zmq helper to generate message

Signed-off-by: DeEMO
---
 src/llama.cpp | 28 ++++++++++++++++++----------
 1 file changed, 18 insertions(+), 10 deletions(-)

diff --git a/src/llama.cpp b/src/llama.cpp
index 5c640b90..a0b91edd 100644
--- a/src/llama.cpp
+++ b/src/llama.cpp
@@ -161,6 +161,19 @@ static void zeros(std::ofstream & file, size_t n) {
     }
 }
 
+// zmq helpers
+static std::vector<zmq::message_t> dev_infos_to_messages(const device_info* infos,
+                                                         uint32_t n_world){
+    std::vector<zmq::message_t> res;
+    for (uint32_t i = 0; i < n_world; ++i) {
+        char * buffer = nullptr;
+        size_t buffer_size = serialize(&infos[i], &buffer);
+        res.emplace_back(buffer, buffer_size);
+        free(buffer);
+    }
+    return res;
+}
+
 LLAMA_ATTRIBUTE_FORMAT(1, 2)
 static std::string format(const char * fmt, ...) {
     va_list ap;
@@ -20334,10 +20347,10 @@ LLAMA_API int llama_rebuild_topo(llama_context *ctx,
                                  device_info *dev_info_set) {
     uint32_t n_world = ctx->cparams.n_world;
     uint32_t my_rank = ctx->cparams.rank;
-    std::vector<zmq::message_t> msgs;
     device_info* dev_info_ptr = nullptr;
     if (dev_info_set == nullptr){
         // for rank!=0, recv all devices info
+        std::vector<zmq::message_t> msgs;
         if (!zmq::recv_multipart(*ctx->recv_socket, std::back_inserter(msgs))) {
             return -1;
         }
@@ -20345,24 +20358,18 @@ LLAMA_API int llama_rebuild_topo(llama_context *ctx,
        for (size_t i = 0; i < msgs.size(); i++) {
            deserialize((const char *)msgs[i].data(), &dev_info_ptr[i]);
        }
+        GGML_ASSERT(msgs.size() == n_world);
     }else{
-        char * buffer = nullptr;
-        for(size_t i = 0; i < n_world; i++) {
-            size_t buffer_size = serialize(&dev_info_set[i], &buffer);
-            msgs.emplace_back(buffer, buffer_size);
-
-            free(buffer);
-        }
        dev_info_ptr = dev_info_set;
     }
 
     GGML_ASSERT(ctx != nullptr && ctx->send_socket != nullptr);
-    GGML_ASSERT(msgs.size() == n_world);
 
     // notify next rank
     auto next_rank = (my_rank + 1) % n_world;
     if(n_layer_window[next_rank] <= 0 && next_rank != 0){
         try {
+            auto msgs = dev_infos_to_messages(dev_info_ptr, n_world);
             ctx->send_socket->set(zmq::sockopt::linger, 3500);
             zmq::send_multipart(*ctx->send_socket, msgs);
         } catch (const zmq::error_t& e) {
@@ -20394,6 +20401,7 @@ LLAMA_API int llama_rebuild_topo(llama_context *ctx,
             ctx->next_node_ip = next_ip;
             try {
                 ctx->send_socket->connect(send_endp);
+                auto msgs = dev_infos_to_messages(dev_info_ptr, n_world);
                 zmq::send_multipart(*ctx->send_socket, msgs);
             } catch (const zmq::error_t &e) {
                 LLAMA_LOG_INFO("Error binding/connecting recv socket to endpoint: %s", e.what());
@@ -20477,7 +20485,7 @@ void llama_free_sockets(struct llama_context * ctx, char ** msg) {
 
 void llama_update_context_with_rankworld(struct llama_context * ctx,
                                          uint32_t rank,
-                                        uint32_t n_world) {
+                                         uint32_t n_world) {
     if(ctx) {
         ctx->cparams.rank = rank;
         ctx->cparams.n_world = n_world;
From 8b61cb2fa4b30952bc303a587dc1d77bfc8af5dd Mon Sep 17 00:00:00 2001
From: DeEMO
Date: Fri, 16 May 2025 17:03:36 +0800
Subject: [PATCH 7/8] fix: adapt the new topo

Signed-off-by: DeEMO
---
 common/common.cpp      | 8 ++++++++
 examples/main/main.cpp | 7 +++++--
 src/llama.cpp          | 6 +++++-
 3 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/common/common.cpp b/common/common.cpp
index a98337d3..35d285c6 100644
--- a/common/common.cpp
+++ b/common/common.cpp
@@ -1731,6 +1731,14 @@ struct llama_init_result llama_init_from_gpt_params(gpt_params & params) {
         n_gpu_layers[i] = n_gpu_layers_temp[i];
     }
     llama_update_context_with_rankworld(lctx, update_rank, update_n_world);
+    cparams.rank = update_rank;
+    cparams.n_world = update_n_world;
+    mparams.rank = update_rank;
+    mparams.n_world = update_n_world;
+    params.rank = update_rank;
+    params.n_world = update_n_world;
+    my_rank = update_rank;
+    n_world = update_n_world;
 
     // update n_layer_window and n_gpu_layers
     std::copy(std::begin(n_layer_window), std::end(n_layer_window), params.n_layer_window);
diff --git a/examples/main/main.cpp b/examples/main/main.cpp
index 39d4b60c..04680373 100644
--- a/examples/main/main.cpp
+++ b/examples/main/main.cpp
@@ -143,8 +143,8 @@ int main(int argc, char ** argv) {
         return 1;
     }
 
-    const uint32_t n_world = params.n_world;
-    const uint32_t my_rank = params.rank;
+    uint32_t n_world = params.n_world;
+    uint32_t my_rank = params.rank;
     GGML_ASSERT(!(n_world == 1 && my_rank > 0));
 
     // check if --n-layer-window and --world is matched
@@ -200,6 +200,9 @@ int main(int argc, char ** argv) {
     // load the model and apply lora adapter, if any
     LOG_INF("%s: load the model and apply lora adapter, if any\n", __func__);
     llama_init_result llama_init = llama_init_from_gpt_params(params);
+    // update
+    my_rank = params.rank;
+    n_world = params.n_world;
 
     model = llama_init.model;
     ctx = llama_init.context;
diff --git a/src/llama.cpp b/src/llama.cpp
index a0b91edd..f083e8e5 100644
--- a/src/llama.cpp
+++ b/src/llama.cpp
@@ -2585,6 +2585,7 @@ static_assert(std::is_trivially_copyable<llama_hparams>::value, "llama_hparams m
 struct llama_cparams {
     uint32_t n_world;
     uint32_t rank;
+    uint32_t original_next_rank; // original rank of the next node
     uint32_t n_layer_window[32];
     bool prefetch;
     bool force;
@@ -20399,6 +20400,7 @@ LLAMA_API int llama_rebuild_topo(llama_context *ctx,
             ctx->send_socket = new zmq::socket_t(*ctx->sock_context, zmq::socket_type::push);
             std::string send_endp = "tcp://" + next_ip + ":" + std::to_string(map_rank_to_port(next_rank, ctx->data_port));
             ctx->next_node_ip = next_ip;
+            ctx->cparams.original_next_rank = next_rank;
             try {
                 ctx->send_socket->connect(send_endp);
                 auto msgs = dev_infos_to_messages(dev_info_ptr, n_world);
                 zmq::send_multipart(*ctx->send_socket, msgs);
             } catch (const zmq::error_t &e) {
                 LLAMA_LOG_INFO("Error binding/connecting recv socket to endpoint: %s", e.what());
@@ -20457,7 +20459,8 @@ int llama_recv_layer_setup(struct llama_context * ctx, uint32_t * n_layer_window
 void llama_free_sockets(struct llama_context * ctx, char ** msg) {
     const uint32_t n_world = ctx->cparams.n_world;
     const uint32_t my_rank = ctx->cparams.rank;
-    const uint32_t next_rank = (my_rank + 1) % n_world;
+    // to adapt to the new topology, use old next_rank
+    const uint32_t next_rank = ctx->cparams.original_next_rank;
 
     if (n_world == 1) {
         return;
@@ -20508,6 +20511,7 @@ struct llama_context * llama_new_context_with_model(
     ctx->cparams.n_world = params.n_world;
     ctx->cparams.rank    = params.rank;
     ctx->cparams.force   = params.force;
+    ctx->cparams.original_next_rank = (params.rank + 1) % params.n_world;
 
     return ctx;
 }
From 34eaa8224d18952c92a81ebaf341ba26c30347fa Mon Sep 17 00:00:00 2001
From: DeEMO
Date: Fri, 16 May 2025 20:48:51 +0800
Subject: [PATCH 8/8] fix: handle socket closure and connection in llama_rebuild_topo

Signed-off-by: DeEMO
---
 src/llama.cpp | 37 ++++++++++++++++++++++---------------
 1 file changed, 22 insertions(+), 15 deletions(-)

diff --git a/src/llama.cpp b/src/llama.cpp
index f083e8e5..121a00b6 100644
--- a/src/llama.cpp
+++ b/src/llama.cpp
@@ -20383,7 +20383,7 @@ LLAMA_API int llama_rebuild_topo(llama_context *ctx,
     }
 
     // check myself's layer
-    auto* socket_to_close = ctx->send_socket;
+    zmq::socket_t* socket_to_close = nullptr;
     if(n_layer_window[my_rank] > 0) {
         // reconstruct socket to the next valid rank
         std::string next_ip;
@@ -20397,20 +20397,25 @@ LLAMA_API int llama_rebuild_topo(llama_context *ctx,
             current_rank = (current_rank + 1) % n_world;
         }
         if(!next_ip.empty()){
-            ctx->send_socket = new zmq::socket_t(*ctx->sock_context, zmq::socket_type::push);
-            std::string send_endp = "tcp://" + next_ip + ":" + std::to_string(map_rank_to_port(next_rank, ctx->data_port));
-            ctx->next_node_ip = next_ip;
-            ctx->cparams.original_next_rank = next_rank;
-            try {
+            if((my_rank+1)%n_world != next_rank){
+                socket_to_close = ctx->send_socket;
+                ctx->send_socket = new zmq::socket_t(*ctx->sock_context, zmq::socket_type::push);
+                std::string send_endp = "tcp://" + next_ip + ":" + std::to_string(map_rank_to_port(next_rank, ctx->data_port));
                 ctx->send_socket->connect(send_endp);
-                auto msgs = dev_infos_to_messages(dev_info_ptr, n_world);
-                zmq::send_multipart(*ctx->send_socket, msgs);
-            } catch (const zmq::error_t &e) {
-                LLAMA_LOG_INFO("Error binding/connecting recv socket to endpoint: %s", e.what());
-                if(!dev_info_set){
-                    delete[] dev_info_ptr;
+                ctx->next_node_ip = next_ip;
+                ctx->cparams.original_next_rank = next_rank;
+            }
+            if(next_rank != 0){
+                try {
+                    auto msgs = dev_infos_to_messages(dev_info_ptr, n_world);
+                    zmq::send_multipart(*ctx->send_socket, msgs);
+                } catch (const zmq::error_t &e) {
+                    LLAMA_LOG_INFO("Error binding/connecting recv socket to endpoint: %s", e.what());
+                    if(!dev_info_set){
+                        delete[] dev_info_ptr;
+                    }
+                    return -1;
                 }
-                return -1;
             }
         }else{
             // only one node
@@ -20420,8 +20425,10 @@ LLAMA_API int llama_rebuild_topo(llama_context *ctx,
     if(!dev_info_set){
         delete[] dev_info_ptr;
     }
-    socket_to_close->close();
-    delete socket_to_close;
+    if(socket_to_close != nullptr){
+        socket_to_close->close();
+        delete socket_to_close;
+    }
     return 0;
 }
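The last patch only replaces the send socket when the next active rank differs from the default ring neighbour, defers closing the old socket until the new one is wired up, and skips forwarding to rank 0. A condensed sketch of that guard, with replace_socket() as a hypothetical stand-in for the ZMQ reconnect logic and all names invented for illustration:

    #include <cstdint>

    struct socket_t { };  // placeholder for zmq::socket_t

    // Returns the socket that should be closed afterwards, or nullptr if the
    // existing connection was kept.
    static socket_t * rewire_send_socket(socket_t *& send_socket,
                                         uint32_t my_rank, uint32_t n_world, uint32_t next_rank,
                                         socket_t * (*replace_socket)(uint32_t)) {
        socket_t * to_close = nullptr;
        if ((my_rank + 1) % n_world != next_rank) {
            to_close    = send_socket;               // remember the old socket
            send_socket = replace_socket(next_rank); // connect to the new downstream rank
        }
        return to_close;                             // caller closes it only if non-null
    }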