communication: use barrier instead of manually adding delay

This commit is contained in:
Li, Zonghang 2025-06-26 17:30:47 +04:00
parent 3f27a25340
commit a05022c05a
2 changed files with 95 additions and 86 deletions

View file

@ -1788,7 +1788,6 @@ struct llama_init_result llama_init_from_gpt_params(gpt_params & params) {
return iparams;
}
llama_bcast_layer_setup(lctx, n_layer_window, n_gpu_layers);
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // add a delay to avoid packet interleaving
llama_rebuild_topo (lctx, n_layer_window, dev_info_set.data(), &node_type, is_forwarder);
} else {
// use the user-defined n_layer_window
@ -1799,7 +1798,6 @@ struct llama_init_result llama_init_from_gpt_params(gpt_params & params) {
if (auto_schedule){
llama_send_device_info (lctx, &dev_info);
llama_recv_layer_setup (lctx, n_layer_window, n_gpu_layers);
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // add a delay to avoid packet interleaving
llama_rebuild_topo (lctx, n_layer_window, nullptr, &node_type, is_forwarder);
} else {
llama_recv_layer_setup (lctx, n_layer_window, n_gpu_layers);
@ -1823,17 +1821,13 @@ struct llama_init_result llama_init_from_gpt_params(gpt_params & params) {
if (n_layer_window[i] <= 0 && is_forwarder[i] == 0) {
continue;
}
if (i <= my_rank) {
update_rank++;
}
if (i <= my_rank) update_rank++;
update_n_world++;
n_layer_window_temp.push_back(n_layer_window[i]);
n_gpu_layers_temp.push_back(n_gpu_layers[i]);
if (n_layer_window[i] > 0) {
if (i <= my_rank) {
worker_rank++;
}
if (i <= my_rank) worker_rank++;
n_worker++;
}
}

View file

@ -20518,12 +20518,11 @@ void llama_init_sockets(struct llama_context * ctx, uint32_t n_world, uint32_t m
int llama_gather_device_info(struct llama_context * ctx, struct device_info * dev_info_set) {
uint32_t n_world = ctx->cparams.n_world;
if (n_world == 1) {
return 0;
}
if (n_world == 1) return 0;
GGML_ASSERT(dev_info_set != nullptr);
GGML_ASSERT(ctx != nullptr && ctx->send_socket != nullptr);
// master rank sends its device info to its next rank
try {
char * buffer = nullptr;
size_t buffer_size = serialize(&dev_info_set[0], &buffer);
@ -20538,6 +20537,7 @@ int llama_gather_device_info(struct llama_context * ctx, struct device_info * de
return -1;
}
// master rank receives aggregated device info from its previous rank (a barrier op)
std::vector<zmq::message_t> recv_msgs;
if (!zmq::recv_multipart(*ctx->recv_socket, std::back_inserter(recv_msgs))) {
return -1;
@ -20551,6 +20551,7 @@ int llama_gather_device_info(struct llama_context * ctx, struct device_info * de
}
int llama_send_device_info(struct llama_context * ctx, struct device_info * dev_info) {
// non-master ranks receive device info from their previous rank
std::vector<zmq::message_t> recv_msgs;
if (!zmq::recv_multipart(*ctx->recv_socket, std::back_inserter(recv_msgs))) {
return -1;
@ -20559,6 +20560,7 @@ int llama_send_device_info(struct llama_context * ctx, struct device_info * dev_
GGML_ASSERT(dev_info != nullptr);
GGML_ASSERT(ctx != nullptr && ctx->send_socket != nullptr);
// non-master ranks aggregate their device info and forward it to their next rank
try {
char * buffer = nullptr;
size_t buffer_size = serialize(dev_info, &buffer);
@ -20577,79 +20579,95 @@ int llama_send_device_info(struct llama_context * ctx, struct device_info * dev_
int llama_bcast_startup_args(llama_context * ctx, uint32_t rank, startup_args * args) {
int32_t n_world = ctx->cparams.n_world;
GGML_ASSERT(n_world > 0);
if (n_world == 1) return 0;
GGML_ASSERT(ctx != nullptr && ctx->send_socket != nullptr);
std::vector<zmq::message_t> msgs;
if (rank == 0){
// send
// master rank sends its startup args to its next rank
try {
std::vector<zmq::message_t> send_msgs;
send_msgs.emplace_back("should_profile", strlen("should_profile"));
send_msgs.emplace_back(&args->should_profile, sizeof(args->should_profile));
send_msgs.emplace_back("n_ctx", strlen("n_ctx"));
send_msgs.emplace_back(&args->n_ctx, sizeof(args->n_ctx));
zmq::send_multipart(*ctx->send_socket, send_msgs);
msgs.emplace_back("should_profile", strlen("should_profile"));
msgs.emplace_back(&args->should_profile, sizeof(args->should_profile));
msgs.emplace_back("n_ctx", strlen("n_ctx"));
msgs.emplace_back(&args->n_ctx, sizeof(args->n_ctx));
zmq::send_multipart(*ctx->send_socket, msgs);
} catch (const zmq::error_t& e) {
LLAMA_LOG_INFO("Failed to send data: %s\n", e.what());
return -1;
}
// master rank receives ack from its previous rank (a barrier op)
msgs.clear();
if (!zmq::recv_multipart(*ctx->recv_socket, std::back_inserter(msgs))) {
return -1;
}
GGML_ASSERT(msgs.size() == 1);
GGML_ASSERT(msgs[0].to_string() == "ACK");
} else {
// receive
std::vector<zmq::message_t> recv_msgs;
if (!zmq::recv_multipart(*ctx->recv_socket, std::back_inserter(recv_msgs))) {
// non-master ranks receive startup args from their previous rank
if (!zmq::recv_multipart(*ctx->recv_socket, std::back_inserter(msgs))) {
return -1;
}
GGML_ASSERT(recv_msgs[0].to_string() == "should_profile");
GGML_ASSERT(recv_msgs[1].size() == sizeof(bool));
bool should_profile = *static_cast<bool*>(recv_msgs[1].data());
args->should_profile = should_profile;
GGML_ASSERT(msgs[0].to_string() == "should_profile");
GGML_ASSERT(msgs[1].size() == sizeof(bool));
GGML_ASSERT(msgs[2].to_string() == "n_ctx");
GGML_ASSERT(msgs[3].size() == sizeof(uint32_t));
GGML_ASSERT(recv_msgs[2].to_string() == "n_ctx");
GGML_ASSERT(recv_msgs[3].size() == sizeof(uint32_t));
uint32_t n_ctx = *static_cast<uint32_t*>(recv_msgs[3].data());
args->n_ctx = n_ctx;
args->should_profile = *static_cast<bool*>(msgs[1].data());
args->n_ctx = *static_cast<uint32_t*>(msgs[3].data());
if ((int)rank != (int)n_world - 1){
// send
// non-master ranks forward the startup args to their next rank
try {
zmq::send_multipart(*ctx->send_socket, recv_msgs);
// last rank just sends an ack to pass the barrier
if ((int)rank == (int)n_world - 1) {
msgs.clear();
msgs.emplace_back("ACK", strlen("ACK"));
}
zmq::send_multipart(*ctx->send_socket, msgs);
} catch (const zmq::error_t & e) {
LLAMA_LOG_INFO("Failed to send data: %s\n", e.what());
return -1;
}
}
}
return 0;
}
int llama_bcast_layer_setup(struct llama_context * ctx, uint32_t * n_layer_window, uint32_t * n_gpu_layers) {
uint32_t n_world = ctx->cparams.n_world;
if (n_world == 1) {
return 0;
}
if (n_world == 1) return 0;
GGML_ASSERT(ctx != nullptr && ctx->send_socket != nullptr);
try {
std::vector<zmq::message_t> send_msgs;
send_msgs.emplace_back("n_layer_window", strlen("n_layer_window"));
send_msgs.emplace_back(n_layer_window, sizeof(uint32_t) * 32);
std::vector<zmq::message_t> msgs;
// master rank sends its layer setup to its next rank
try {
msgs.emplace_back("n_layer_window", strlen("n_layer_window"));
msgs.emplace_back(n_layer_window, sizeof(uint32_t) * 32);
if (n_gpu_layers != nullptr) {
send_msgs.emplace_back("n_gpu_layers", strlen("n_gpu_layers"));
send_msgs.emplace_back(n_gpu_layers, sizeof(uint32_t) * 32);
msgs.emplace_back("n_gpu_layers", strlen("n_gpu_layers"));
msgs.emplace_back(n_gpu_layers, sizeof(uint32_t) * 32);
}
zmq::send_multipart(*ctx->send_socket, send_msgs);
zmq::send_multipart(*ctx->send_socket, msgs);
} catch (const zmq::error_t & e) {
LLAMA_LOG_INFO("Failed to send data: %s\n", e.what());
return -1;
}
// master rank receives ack from its previous rank (a barrier op)
msgs.clear();
if (!zmq::recv_multipart(*ctx->recv_socket, std::back_inserter(msgs))) {
return -1;
}
GGML_ASSERT(msgs.size() == 1);
GGML_ASSERT(msgs[0].to_string() == "ACK");
return 0;
}
@ -20663,7 +20681,7 @@ int llama_rebuild_topo(llama_context * ctx,
TopoRebuildHelperInfo* topo_helper = new TopoRebuildHelperInfo[n_world];
if (dev_info_set == nullptr) {
// for rank != 0, recv all devices info
// for non-master ranks, receive devices info from their previous rank
std::vector<zmq::message_t> msgs;
if (!zmq::recv_multipart(*ctx->recv_socket, std::back_inserter(msgs))) {
return -1;
@ -20824,35 +20842,34 @@ int llama_recv_layer_setup(struct llama_context * ctx, uint32_t * n_layer_window
uint32_t n_world = ctx->cparams.n_world;
uint32_t my_rank = ctx->cparams.rank;
std::vector<zmq::message_t> recv_msgs;
while (true) {
recv_msgs.clear();
if (!zmq::recv_multipart(*ctx->recv_socket, std::back_inserter(recv_msgs))) {
// non-master ranks receive data from their previous rank
std::vector<zmq::message_t> msgs;
if (!zmq::recv_multipart(*ctx->recv_socket, std::back_inserter(msgs))) {
return -1;
}
if (!recv_msgs.empty() && recv_msgs[0].to_string() == "n_layer_window") {
break;
}
GGML_ASSERT(msgs[0].to_string() == "n_layer_window");
GGML_ASSERT(msgs[1].size() == sizeof(uint32_t) * 32);
memcpy(n_layer_window, msgs[1].data(), sizeof(uint32_t) * 32);
if (msgs.size() > 2) {
GGML_ASSERT(msgs[2].to_string() == "n_gpu_layers");
GGML_ASSERT(msgs[3].size() == sizeof(uint32_t) * 32);
memcpy(n_gpu_layers, msgs[3].data(), sizeof(uint32_t) * 32);
}
GGML_ASSERT(recv_msgs[0].to_string() == "n_layer_window");
GGML_ASSERT(recv_msgs[1].size() == sizeof(uint32_t) * 32);
memcpy(n_layer_window, recv_msgs[1].data(), sizeof(uint32_t) * 32);
if (recv_msgs.size() > 2) {
GGML_ASSERT(recv_msgs[2].to_string() == "n_gpu_layers");
GGML_ASSERT(recv_msgs[3].size() == sizeof(uint32_t) * 32);
memcpy(n_gpu_layers, recv_msgs[3].data(), sizeof(uint32_t) * 32);
}
if (my_rank != n_world - 1) {
// non-master ranks forward the received message to their next rank
try {
zmq::send_multipart(*ctx->send_socket, recv_msgs);
if (my_rank == n_world - 1) {
// last rank just sends an ack to pass the barrier
msgs.clear();
msgs.emplace_back("ACK", strlen("ACK"));
}
zmq::send_multipart(*ctx->send_socket, msgs);
} catch (const zmq::error_t& e) {
LLAMA_LOG_INFO("Failed to send data: %s\n", e.what());
return -1;
}
}
return 0;
}
@ -20863,9 +20880,7 @@ void llama_free_sockets(struct llama_context * ctx, char ** msg) {
// to adapt to the new topology, use old next_rank
const uint32_t next_rank = ctx->cparams.original_next_rank;
if (n_world == 1) {
return;
}
if (n_world == 1) return;
zmq::socket_t signal_sender(*ctx->sock_context, zmq::socket_type::push);
std::string endp = "tcp://" + ctx->next_node_ip + ":" + std::to_string(map_rank_to_port(next_rank, ctx->signal_port));