fix: improve distributed sync robustness with retry mechanism and longer timeout

This commit is contained in:
leeetao 2025-07-20 08:12:21 +00:00
parent 663ad2896d
commit 749934a5e0
2 changed files with 28 additions and 4 deletions

View file

@@ -509,13 +509,14 @@ static results_perplexity perplexity(llama_context * ctx, const gpt_params & par
if (my_rank == 0) {
auto tim1 = std::chrono::high_resolution_clock::now();
-LOG_INF("%s: rank %d tokenizing the input ..\n", __func__, my_rank);
+LOG_INF("%s: rank %d tokenizing the input (this may take 1-2 seconds)...\n", __func__, my_rank);
tokens = ::llama_tokenize(ctx, params.prompt, true);
tokens_size = tokens.size();
auto tim2 = std::chrono::high_resolution_clock::now();
-LOG_INF("%s: rank %d tokenization took %g ms\n", __func__, my_rank, 1e-3*std::chrono::duration_cast<std::chrono::microseconds>(tim2-tim1).count());
+LOG_INF("%s: rank %d tokenization completed in %g ms, tokens_size = %zu\n", __func__, my_rank,
+1e-3*std::chrono::duration_cast<std::chrono::microseconds>(tim2-tim1).count(), tokens_size);
}
if (my_rank != 0) {
@@ -526,9 +527,32 @@ static results_perplexity perplexity(llama_context * ctx, const gpt_params & par
sync_meta meta;
if (my_rank != 0) {
-if (llama_recv_meta(ctx, &meta) == -1) {
+// Initial synchronization with extended retry mechanism for tokenization wait
+int retry_count = 0;
+const int max_retries = 10; // More retries for initialization
+bool recv_success = false;
+LOG_INF("rank %d: waiting for initial sync (tokenization may take time)...\n", my_rank);
+while (retry_count < max_retries && !recv_success) {
+if (llama_recv_meta(ctx, &meta) == 0) {
+recv_success = true;
+LOG_INF("rank %d: successfully received initial sync on attempt %d\n", my_rank, retry_count + 1);
+} else {
+retry_count++;
+LOG_WRN("rank %d: failed to recv initial sync, retry %d/%d (tokenization may still be in progress)\n",
+my_rank, retry_count, max_retries);
+// Longer delay for initialization phase
+std::this_thread::sleep_for(std::chrono::milliseconds(2000));
+}
+}
+if (!recv_success) {
+LOG_ERR("rank %d: failed to recv initial sync after %d retries (total wait time: %d seconds)\n",
+my_rank, max_retries, max_retries * 2);
return { {}, -1.0, {}, {} };
}
tokens_size = meta.tokens_size;
n_chunks = meta.n_chunks;
}

View file

@@ -18001,7 +18001,7 @@ void llama_send_meta(llama_context * ctx, struct sync_meta * meta) {
int llama_recv_meta(llama_context * ctx, struct sync_meta * meta) {
zmq::socket_t * recv_socket = ctx->recv_socket;
GGML_ASSERT(recv_socket != nullptr);
-recv_socket->set(zmq::sockopt::rcvtimeo, 5000); // Increase timeout to 5 seconds
+recv_socket->set(zmq::sockopt::rcvtimeo, 10000); // Increase timeout to 10 seconds for better robustness
std::vector<zmq::message_t> recv_msgs;