diff --git a/common/common.cpp b/common/common.cpp
index 1aba9933..f5b4cffb 100644
--- a/common/common.cpp
+++ b/common/common.cpp
@@ -1778,7 +1778,7 @@ struct llama_init_result llama_init_from_gpt_params(gpt_params & params) {
             dev_info_set[0] = dev_info;

             llama_gather_device_info(lctx, dev_info_set.data());
-            device_print_props(dev_info_set.data(), n_world, model, cparams);
+            device_print_props (dev_info_set.data(), n_world, model, cparams);

             // assign layers to devices and remove weak devices
             if (!assign_layers_and_select_devices(n_world, dev_info_set, n_layer_window, n_gpu_layers, model, cparams)) {
@@ -1788,8 +1788,7 @@ struct llama_init_result llama_init_from_gpt_params(gpt_params & params) {
                 return iparams;
             }
             llama_bcast_layer_setup(lctx, n_layer_window, n_gpu_layers);
-            std::this_thread::sleep_for(std::chrono::milliseconds(200)); // add a delay to avoid packet interleaving
-            llama_rebuild_topo(lctx, n_layer_window, dev_info_set.data(), &node_type, is_forwarder);
+            llama_rebuild_topo (lctx, n_layer_window, dev_info_set.data(), &node_type, is_forwarder);
         } else {
             // use the user-defined n_layer_window
             std::copy(std::begin(params.n_layer_window), std::end(params.n_layer_window), n_layer_window);
@@ -1797,12 +1796,11 @@ struct llama_init_result llama_init_from_gpt_params(gpt_params & params) {
         }
     } else {
         if (auto_schedule){
-            llama_send_device_info(lctx, &dev_info);
-            llama_recv_layer_setup(lctx, n_layer_window, n_gpu_layers);
-            std::this_thread::sleep_for(std::chrono::milliseconds(200)); // add a delay to avoid packet interleaving
-            llama_rebuild_topo (lctx, n_layer_window, nullptr, &node_type, is_forwarder);
+            llama_send_device_info (lctx, &dev_info);
+            llama_recv_layer_setup (lctx, n_layer_window, n_gpu_layers);
+            llama_rebuild_topo (lctx, n_layer_window, nullptr, &node_type, is_forwarder);
         } else {
-            llama_recv_layer_setup(lctx, n_layer_window, n_gpu_layers);
+            llama_recv_layer_setup (lctx, n_layer_window, n_gpu_layers);
         }
     }

@@ -1823,17 +1821,13 @@ struct llama_init_result llama_init_from_gpt_params(gpt_params & params) {
         if (n_layer_window[i] <= 0 && is_forwarder[i] == 0) {
             continue;
         }
-        if (i <= my_rank) {
-            update_rank++;
-        }
+        if (i <= my_rank) update_rank++;
         update_n_world++;
         n_layer_window_temp.push_back(n_layer_window[i]);
         n_gpu_layers_temp.push_back(n_gpu_layers[i]);

         if (n_layer_window[i] > 0) {
-            if (i <= my_rank) {
-                worker_rank++;
-            }
+            if (i <= my_rank) worker_rank++;
             n_worker++;
         }
     }
diff --git a/src/llama.cpp b/src/llama.cpp
index 7b07a291..b70f338b 100644
--- a/src/llama.cpp
+++ b/src/llama.cpp
@@ -20518,12 +20518,11 @@ void llama_init_sockets(struct llama_context * ctx, uint32_t n_world, uint32_t m

 int llama_gather_device_info(struct llama_context * ctx, struct device_info * dev_info_set) {
     uint32_t n_world = ctx->cparams.n_world;
-    if (n_world == 1) {
-        return 0;
-    }
-
+    if (n_world == 1) return 0;
     GGML_ASSERT(dev_info_set != nullptr);
     GGML_ASSERT(ctx != nullptr && ctx->send_socket != nullptr);
+
+    // master rank sends its device info to its next rank
     try {
         char * buffer = nullptr;
         size_t buffer_size = serialize(&dev_info_set[0], &buffer);
@@ -20538,6 +20537,7 @@ int llama_gather_device_info(struct llama_context * ctx, struct device_info * de
         return -1;
     }
+    // master rank receives aggregated device info from its previous rank (a barrier op)
     std::vector<zmq::message_t> recv_msgs;
     if (!zmq::recv_multipart(*ctx->recv_socket, std::back_inserter(recv_msgs))) {
         return -1;
     }
@@ -20551,6 +20551,7 @@ int llama_gather_device_info(struct llama_context * ctx, struct device_info * de
 }
 int llama_send_device_info(struct llama_context * ctx, struct device_info * dev_info) {
+    // non-master ranks receive device info from their previous rank
     std::vector<zmq::message_t> recv_msgs;
     if (!zmq::recv_multipart(*ctx->recv_socket, std::back_inserter(recv_msgs))) {
         return -1;
     }
@@ -20559,6 +20560,7 @@ int llama_send_device_info(struct llama_context * ctx, struct device_info * dev_
     GGML_ASSERT(dev_info != nullptr);
     GGML_ASSERT(ctx != nullptr && ctx->send_socket != nullptr);

+    // non-master ranks aggregate their device info and forward it to their next rank
     try {
         char * buffer = nullptr;
         size_t buffer_size = serialize(dev_info, &buffer);
@@ -20577,79 +20579,95 @@ int llama_send_device_info(struct llama_context * ctx, struct device_info * dev_

 int llama_bcast_startup_args(llama_context * ctx, uint32_t rank, startup_args * args) {
     int32_t n_world = ctx->cparams.n_world;
-    GGML_ASSERT(n_world > 0);
+    if (n_world == 1) return 0;
     GGML_ASSERT(ctx != nullptr && ctx->send_socket != nullptr);

+    std::vector<zmq::message_t> msgs;
+
     if (rank == 0){
-        // send
+        // master rank sends its startup args to its next rank
         try {
-            std::vector<zmq::message_t> send_msgs;
-
-            send_msgs.emplace_back("should_profile", strlen("should_profile"));
-            send_msgs.emplace_back(&args->should_profile, sizeof(args->should_profile));
-
-            send_msgs.emplace_back("n_ctx", strlen("n_ctx"));
-            send_msgs.emplace_back(&args->n_ctx, sizeof(args->n_ctx));
-
-            zmq::send_multipart(*ctx->send_socket, send_msgs);
+            msgs.emplace_back("should_profile", strlen("should_profile"));
+            msgs.emplace_back(&args->should_profile, sizeof(args->should_profile));
+            msgs.emplace_back("n_ctx", strlen("n_ctx"));
+            msgs.emplace_back(&args->n_ctx, sizeof(args->n_ctx));
+            zmq::send_multipart(*ctx->send_socket, msgs);
         } catch (const zmq::error_t& e) {
             LLAMA_LOG_INFO("Failed to send data: %s\n", e.what());
             return -1;
         }
-    } else {
-        // receive
-        std::vector<zmq::message_t> recv_msgs;
-        if (!zmq::recv_multipart(*ctx->recv_socket, std::back_inserter(recv_msgs))) {
+
+        // master rank receives ack from its previous rank (a barrier op)
+        msgs.clear();
+        if (!zmq::recv_multipart(*ctx->recv_socket, std::back_inserter(msgs))) {
             return -1;
         }

-        GGML_ASSERT(recv_msgs[0].to_string() == "should_profile");
-        GGML_ASSERT(recv_msgs[1].size() == sizeof(bool));
-        bool should_profile = *static_cast<bool*>(recv_msgs[1].data());
-        args->should_profile = should_profile;
+        GGML_ASSERT(msgs.size() == 1);
+        GGML_ASSERT(msgs[0].to_string() == "ACK");
+    } else {
+        // non-master ranks receive startup args from their previous rank
+        if (!zmq::recv_multipart(*ctx->recv_socket, std::back_inserter(msgs))) {
+            return -1;
+        }

-        GGML_ASSERT(recv_msgs[2].to_string() == "n_ctx");
-        GGML_ASSERT(recv_msgs[3].size() == sizeof(uint32_t));
-        uint32_t n_ctx = *static_cast<uint32_t*>(recv_msgs[3].data());
-        args->n_ctx = n_ctx;
+        GGML_ASSERT(msgs[0].to_string() == "should_profile");
+        GGML_ASSERT(msgs[1].size() == sizeof(bool));
+        GGML_ASSERT(msgs[2].to_string() == "n_ctx");
+        GGML_ASSERT(msgs[3].size() == sizeof(uint32_t));

-        if ((int)rank != (int)n_world - 1){
-            // send
-            try {
-                zmq::send_multipart(*ctx->send_socket, recv_msgs);
-            } catch (const zmq::error_t & e) {
-                LLAMA_LOG_INFO("Failed to send data: %s\n", e.what());
-                return -1;
+        args->should_profile = *static_cast<bool*>(msgs[1].data());
+        args->n_ctx = *static_cast<uint32_t*>(msgs[3].data());
+
+        // non-master ranks forward the startup args to their next rank
+        try {
+            // last rank just sends an ack to pass the barrier
+            if ((int)rank == (int)n_world - 1) {
+                msgs.clear();
+                msgs.emplace_back("ACK", strlen("ACK"));
             }
+            zmq::send_multipart(*ctx->send_socket, msgs);
+        } catch (const zmq::error_t & e) {
+            LLAMA_LOG_INFO("Failed to send data: %s\n", e.what());
+            return -1;
         }
     }
+
     return 0;
 }

 int llama_bcast_layer_setup(struct llama_context * ctx, uint32_t * n_layer_window, uint32_t * n_gpu_layers) {
     uint32_t n_world = ctx->cparams.n_world;
-    if (n_world == 1) {
-        return 0;
-    }
-
+    if (n_world == 1) return 0;
     GGML_ASSERT(ctx != nullptr && ctx->send_socket != nullptr);

-    try {
-        std::vector<zmq::message_t> send_msgs;
-        send_msgs.emplace_back("n_layer_window", strlen("n_layer_window"));
-        send_msgs.emplace_back(n_layer_window, sizeof(uint32_t) * 32);
+    std::vector<zmq::message_t> msgs;
+
+    // master rank sends its layer setup to its next rank
+    try {
+        msgs.emplace_back("n_layer_window", strlen("n_layer_window"));
+        msgs.emplace_back(n_layer_window, sizeof(uint32_t) * 32);

         if (n_gpu_layers != nullptr) {
-            send_msgs.emplace_back("n_gpu_layers", strlen("n_gpu_layers"));
-            send_msgs.emplace_back(n_gpu_layers, sizeof(uint32_t) * 32);
+            msgs.emplace_back("n_gpu_layers", strlen("n_gpu_layers"));
+            msgs.emplace_back(n_gpu_layers, sizeof(uint32_t) * 32);
         }

-        zmq::send_multipart(*ctx->send_socket, send_msgs);
-    } catch (const zmq::error_t& e) {
+        zmq::send_multipart(*ctx->send_socket, msgs);
+    } catch (const zmq::error_t & e) {
         LLAMA_LOG_INFO("Failed to send data: %s\n", e.what());
         return -1;
     }

+    // master rank receives ack from its previous rank (a barrier op)
+    msgs.clear();
+    if (!zmq::recv_multipart(*ctx->recv_socket, std::back_inserter(msgs))) {
+        return -1;
+    }
+
+    GGML_ASSERT(msgs.size() == 1);
+    GGML_ASSERT(msgs[0].to_string() == "ACK");
+
     return 0;
 }
@@ -20663,7 +20681,7 @@ int llama_rebuild_topo(llama_context * ctx,
     TopoRebuildHelperInfo* topo_helper = new TopoRebuildHelperInfo[n_world];

     if (dev_info_set == nullptr) {
-        // for rank != 0, recv all devices info
+        // for non-master ranks, receive devices info from their previous rank
         std::vector<zmq::message_t> msgs;
         if (!zmq::recv_multipart(*ctx->recv_socket, std::back_inserter(msgs))) {
             return -1;
@@ -20824,34 +20842,33 @@ int llama_recv_layer_setup(struct llama_context * ctx, uint32_t * n_layer_window
     uint32_t n_world = ctx->cparams.n_world;
     uint32_t my_rank = ctx->cparams.rank;

-    std::vector<zmq::message_t> recv_msgs;
-    while (true) {
-        recv_msgs.clear();
-        if (!zmq::recv_multipart(*ctx->recv_socket, std::back_inserter(recv_msgs))) {
-            return -1;
-        }
-        if (!recv_msgs.empty() && recv_msgs[0].to_string() == "n_layer_window") {
-            break;
-        }
+    // non-master ranks receive data from their previous rank
+    std::vector<zmq::message_t> msgs;
+    if (!zmq::recv_multipart(*ctx->recv_socket, std::back_inserter(msgs))) {
+        return -1;
     }

-    GGML_ASSERT(recv_msgs[0].to_string() == "n_layer_window");
-    GGML_ASSERT(recv_msgs[1].size() == sizeof(uint32_t) * 32);
-    memcpy(n_layer_window, recv_msgs[1].data(), sizeof(uint32_t) * 32);
+    GGML_ASSERT(msgs[0].to_string() == "n_layer_window");
+    GGML_ASSERT(msgs[1].size() == sizeof(uint32_t) * 32);
+    memcpy(n_layer_window, msgs[1].data(), sizeof(uint32_t) * 32);

-    if (recv_msgs.size() > 2) {
-        GGML_ASSERT(recv_msgs[2].to_string() == "n_gpu_layers");
-        GGML_ASSERT(recv_msgs[3].size() == sizeof(uint32_t) * 32);
-        memcpy(n_gpu_layers, recv_msgs[3].data(), sizeof(uint32_t) * 32);
+    if (msgs.size() > 2) {
+        GGML_ASSERT(msgs[2].to_string() == "n_gpu_layers");
+        GGML_ASSERT(msgs[3].size() == sizeof(uint32_t) * 32);
+        memcpy(n_gpu_layers, msgs[3].data(), sizeof(uint32_t) * 32);
     }

-    if (my_rank != n_world - 1) {
-        try {
-            zmq::send_multipart(*ctx->send_socket, recv_msgs);
-        } catch (const zmq::error_t& e) {
-            LLAMA_LOG_INFO("Failed to send data: %s\n", e.what());
-            return -1;
+    // non-master ranks forward the received message to their next rank
+    try {
+        if (my_rank == n_world - 1) {
+            // last rank just sends an ack to pass the barrier
+            msgs.clear();
+            msgs.emplace_back("ACK", strlen("ACK"));
         }
+        zmq::send_multipart(*ctx->send_socket, msgs);
+    } catch (const zmq::error_t& e) {
+        LLAMA_LOG_INFO("Failed to send data: %s\n", e.what());
+        return -1;
     }

     return 0;
@@ -20863,9 +20880,7 @@ void llama_free_sockets(struct llama_context * ctx, char ** msg) {
     // to adapt to the new topology, use old next_rank
     const uint32_t next_rank = ctx->cparams.original_next_rank;

-    if (n_world == 1) {
-        return;
-    }
+    if (n_world == 1) return;

     zmq::socket_t signal_sender(*ctx->sock_context, zmq::socket_type::push);
     std::string endp = "tcp://" + ctx->next_node_ip + ":" + std::to_string(map_rank_to_port(next_rank, ctx->signal_port));