diff --git a/common/common.cpp b/common/common.cpp
index 89707fb7..86b8fd2d 100644
--- a/common/common.cpp
+++ b/common/common.cpp
@@ -1522,6 +1522,61 @@ static bool assign_layers_to_device(
     return true;
 }
 
+static bool tune_layer_allocation(
+        uint32_t   n_world,
+        uint32_t   my_rank,
+        std::vector<device_info> dev_infos,
+        uint32_t * n_layer_window,
+        uint32_t * n_gpu_layers,
+        struct llama_model * model,
+        const struct llama_context_params cparams,
+        float min_disk_read_speed = 0.1f) {
+    memset(n_layer_window, 0, n_world * sizeof(uint32_t));
+    memset(n_gpu_layers,   0, n_world * sizeof(uint32_t));
+    std::vector<device_info> dev_infos_temp = dev_infos;
+    std::vector<uint32_t> n_layer_windows_temp;
+    std::vector<uint32_t> n_gpu_layers_temp;
+    while (n_world > 0) {
+        std::vector<device_info> dev_infos_ = dev_infos_temp;
+        std::vector<uint32_t> n_layer_windows_(n_world, 0);
+        std::vector<uint32_t> n_gpu_layers_(n_world, 0);
+        if (!assign_layers_to_device(n_world, my_rank, dev_infos_.data(),
+                n_layer_windows_.data(), n_gpu_layers_.data(), model, cparams)) {
+            return false;
+        }
+        dev_infos_temp.clear();
+        n_layer_windows_temp.clear();
+        n_gpu_layers_temp.clear();
+        for (uint32_t i = 0; i < n_world; i++) {
+            // keep devices that received more than one layer; always keep the head node (rank 0)
+            if (n_layer_windows_[i] > 1 || i == 0) {
+                dev_infos_temp.push_back(dev_infos_[i]);
+                n_layer_windows_temp.push_back(n_layer_windows_[i]);
+                n_gpu_layers_temp.push_back(n_gpu_layers_[i]);
+            }
+        }
+        if (dev_infos_temp.size() == n_world) {
+            // no device was removed, the allocation is stable
+            break;
+        }
+
+        n_world = dev_infos_temp.size();
+    }
+    // scatter the compacted results back to the original rank positions;
+    // removed devices get a zero window
+    uint32_t i = 0, j = 0;
+    while (j < n_world) {
+        if (dev_infos[i].rank == dev_infos_temp[j].rank) {
+            n_layer_window[i] = n_layer_windows_temp[j];
+            n_gpu_layers[i]   = n_gpu_layers_temp[j];
+            j++;
+            i++;
+        } else {
+            n_layer_window[i] = 0;
+            n_gpu_layers[i]   = 0;
+            i++;
+        }
+    }
+    return true;
+}
+
 //
 // Model utils
 //
@@ -1625,6 +1680,7 @@ struct llama_init_result llama_init_from_gpt_params(gpt_params & params) {
     // get device profile
     LOG_INF("\nstart profiling this device, this may take some seconds ...\n");
     dev_info.rank = params.rank;
+    dev_info.next_ip = params.next_node_ip.c_str();
     if (n_world > 1) {
         llama_profile_device(&dev_info, model, ml, params.gpu_mem, params.n_predict, params.n_ctx, params.cpuparams.n_threads, params.flash_attn);
     }
@@ -1633,21 +1689,23 @@ struct llama_init_result llama_init_from_gpt_params(gpt_params & params) {
     // sychronize device profile to the master node
     if (my_rank == 0) {
         if (auto_schedule) {
-            struct device_info * dev_info_set = nullptr;
-            dev_info_set = (struct device_info *)malloc(n_world * sizeof(struct device_info));
+            std::vector<device_info> dev_info_set(n_world);
             dev_info_set[0] = dev_info;
-            llama_gather_device_info(lctx, dev_info_set);
-            device_print_props(dev_info_set, n_world, model, cparams);
+            llama_gather_device_info(lctx, dev_info_set.data());
+            device_print_props(dev_info_set.data(), n_world, model, cparams);
 
             // automatically determine n_layer_window and n_gpu_layers
-            if (!assign_layers_to_device(n_world, my_rank, dev_info_set, n_layer_window, n_gpu_layers, model, cparams)) {
+            if (!tune_layer_allocation(n_world, my_rank, dev_info_set, n_layer_window, n_gpu_layers, model, cparams)) {
                 LOG_ERR("%s: Invalid allocation by HiGHS solver\n", __func__);
                 llama_free(lctx);
                 llama_free_model(model);
                 return iparams;
             }
             llama_bcast_layer_setup(lctx, n_layer_window, n_gpu_layers);
+
+            // rebuild topo
+            llama_rebuild_topo(lctx, n_layer_window, dev_info_set.data());
         } else {
             // use the user-defined n_layer_window
             std::copy(std::begin(params.n_layer_window), std::end(params.n_layer_window), n_layer_window);
@@ -1656,9 +1714,51 @@ struct llama_init_result llama_init_from_gpt_params(gpt_params & params) {
     } else {
         if (auto_schedule) {
             llama_send_device_info(lctx, &dev_info);
+            llama_recv_layer_setup(lctx, n_layer_window, n_gpu_layers);
+            // rebuild topo
+            llama_rebuild_topo(lctx, n_layer_window, nullptr);
+        } else {
+            llama_recv_layer_setup(lctx, n_layer_window, n_gpu_layers);
         }
-        llama_recv_layer_setup(lctx, n_layer_window, n_gpu_layers);
     }
 
+    if (n_layer_window[my_rank] <= 0) {
+        LOG_INF("%s: info: rank %d has no layers to run, skipping\n", __func__, my_rank);
+        llama_free(lctx);
+        llama_free_model(model);
+        exit(0);
+    }
+
+    // update rank and n_world for consistency
+    uint32_t update_rank = 0;
+    uint32_t update_n_world = 1;
+    std::vector<uint32_t> n_layer_window_temp = {n_layer_window[0]};
+    std::vector<uint32_t> n_gpu_layers_temp   = {n_gpu_layers[0]};
+    for (uint32_t i = 1; i < n_world; i++) {
+        if (n_layer_window[i] <= 0) {
+            continue; // this device was removed from the topology
+        }
+        if (i == my_rank) {
+            update_rank = update_n_world;
+        }
+        n_layer_window_temp.push_back(n_layer_window[i]);
+        n_gpu_layers_temp.push_back(n_gpu_layers[i]);
+        update_n_world++;
+    }
+    memset(n_layer_window, 0, n_world * sizeof(uint32_t));
+    memset(n_gpu_layers,   0, n_world * sizeof(uint32_t));
+    std::copy(n_layer_window_temp.begin(), n_layer_window_temp.end(), n_layer_window);
+    std::copy(n_gpu_layers_temp.begin(),   n_gpu_layers_temp.end(),   n_gpu_layers);
+    llama_update_context_with_rankworld(lctx, update_rank, update_n_world);
+    params.rank    = update_rank;
+    params.n_world = update_n_world;
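`tune_layer_allocation` wraps the existing solver in a fixed-point loop: after each solve, every device that received at most one layer (rank 0 excepted) is dropped and the solver is re-run on the survivors, until an iteration removes nobody. A minimal standalone sketch of that loop, with a hypothetical `fake_assign` standing in for the HiGHS-backed `assign_layers_to_device`:

```cpp
// Standalone sketch of the pruning loop in tune_layer_allocation.
// Dev and fake_assign are made-up stand-ins, not project code.
#include <cstdint>
#include <cstdio>
#include <vector>

struct Dev { uint32_t rank; float speed; };

// stand-in solver: split 32 layers proportionally to device speed
static void fake_assign(const std::vector<Dev> & devs, std::vector<uint32_t> & win) {
    float total = 0.f;
    for (const auto & d : devs) total += d.speed;
    uint32_t assigned = 0;
    for (size_t i = 0; i < devs.size(); ++i) {
        win[i] = (uint32_t)(32 * devs[i].speed / total);
        assigned += win[i];
    }
    win[0] += 32 - assigned; // give the rounding remainder to the head node
}

int main() {
    std::vector<Dev> devs = {{0, 8.f}, {1, 0.2f}, {2, 4.f}}; // rank 1 is very slow
    while (true) {
        std::vector<uint32_t> win(devs.size(), 0);
        fake_assign(devs, win);
        std::vector<Dev> kept;
        for (size_t i = 0; i < devs.size(); ++i) {
            if (win[i] > 1 || i == 0) kept.push_back(devs[i]); // same keep rule as the patch
        }
        if (kept.size() == devs.size()) { // fixed point: nothing was pruned
            for (size_t i = 0; i < devs.size(); ++i)
                printf("rank %u -> %u layers\n", devs[i].rank, win[i]);
            break;
        }
        devs = kept; // re-solve without the dropped devices
    }
    return 0;
}
```

Keeping rank 0 unconditionally matters: the master owns the sockets that gather profiles and broadcast the final setup, so it can never be pruned.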
diff --git a/common/profiler.cpp b/common/profiler.cpp
--- a/common/profiler.cpp
+++ b/common/profiler.cpp
@@ -2383,13 +2383,15 @@ size_t serialize(const struct device_info * dev_info, char ** buffer) {
     size_t device_name_len     = strlen(dev_info->device_name) + 1;
     size_t device_os_len       = strlen(dev_info->device_os) + 1;
+    size_t next_ip_len         = strlen(dev_info->next_ip) + 1;
     size_t cpu_name_len        = strlen(dev_info->cpu_props.name) + 1;
     size_t cpu_description_len = strlen(dev_info->cpu_props.description) + 1;
     size_t gpu_name_len        = strlen(dev_info->gpu_props.name) + 1;
     size_t gpu_description_len = strlen(dev_info->gpu_props.description) + 1;
 
     size_t total_size = sizeof(uint32_t)
-                      + sizeof(size_t) * 6 // for lengths of strings
+                      + sizeof(size_t) * 7 // for lengths of strings
                       + device_name_len
                       + device_os_len
+                      + next_ip_len
                       + cpu_name_len
                       + cpu_description_len
                       + gpu_name_len
@@ -2425,6 +2427,11 @@ size_t serialize(const struct device_info * dev_info, char ** buffer) {
     memcpy(ptr, dev_info->device_os, device_os_len);
     ptr += device_os_len;
 
+    memcpy(ptr, &next_ip_len, sizeof(size_t));
+    ptr += sizeof(size_t);
+    memcpy(ptr, dev_info->next_ip, next_ip_len);
+    ptr += next_ip_len;
+
     memcpy(ptr, &cpu_name_len, sizeof(size_t));
     ptr += sizeof(size_t);
     memcpy(ptr, dev_info->cpu_props.name, cpu_name_len);
@@ -2610,6 +2617,14 @@ void deserialize(const char * buffer, struct device_info * dev_info) {
     memcpy(const_cast<void *>(static_cast<const void *>(dev_info->device_os)), ptr, device_os_len);
     ptr += device_os_len;
 
+    // next ip
+    size_t next_ip_len;
+    memcpy(&next_ip_len, ptr, sizeof(size_t));
+    ptr += sizeof(size_t);
+    dev_info->next_ip = (char *)malloc(next_ip_len);
+    memcpy(const_cast<void *>(static_cast<const void *>(dev_info->next_ip)), ptr, next_ip_len);
+    ptr += next_ip_len;
+
     // cpu_props.name
     size_t cpu_name_len;
     memcpy(&cpu_name_len, ptr, sizeof(size_t));
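The wire format grows by exactly one length-prefixed string. A self-contained sketch of the `[len][bytes]` round trip used for the new `next_ip` field (the field name mirrors the patch; the rest is scaffolding):

```cpp
// Minimal sketch of the length-prefixed string layout used by
// serialize()/deserialize() for next_ip. Assumed layout only.
#include <cstdio>
#include <cstdlib>
#include <cstring>

int main() {
    const char * next_ip = "192.168.1.2";
    size_t next_ip_len = strlen(next_ip) + 1; // include the NUL terminator

    // pack: [len][bytes], as the new serialize() code does
    char * buf = (char *)malloc(sizeof(size_t) + next_ip_len);
    char * ptr = buf;
    memcpy(ptr, &next_ip_len, sizeof(size_t)); ptr += sizeof(size_t);
    memcpy(ptr, next_ip, next_ip_len);         ptr += next_ip_len;

    // unpack: read the length first, then allocate and copy the bytes
    const char * rptr = buf;
    size_t len;
    memcpy(&len, rptr, sizeof(size_t)); rptr += sizeof(size_t);
    char * out = (char *)malloc(len);
    memcpy(out, rptr, len);

    printf("round-tripped next_ip: %s\n", out);
    free(out);
    free(buf);
    return 0;
}
```

Note that the prefix is a raw `size_t`, so both ends must agree on its width; a mixed 32-/64-bit cluster would mis-parse the buffer.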
diff --git a/common/profiler.h b/common/profiler.h
index 2f3a20a6..06741d6c 100644
--- a/common/profiler.h
+++ b/common/profiler.h
@@ -321,6 +321,7 @@ struct device_info {
     uint32_t rank;
     const char * device_name;
     const char * device_os;
+    const char * next_ip;
     struct disk_props disk;
     struct cpu_props cpu_props;
     struct memory_info memory;
@@ -334,6 +335,7 @@ struct device_info {
         rank(0),
         device_name(""),
         device_os(""),
+        next_ip(""),
         disk(),
         cpu_props(),
         memory(),
diff --git a/examples/main/main.cpp b/examples/main/main.cpp
index ccff70f2..622bc94f 100644
--- a/examples/main/main.cpp
+++ b/examples/main/main.cpp
@@ -143,8 +143,8 @@ int main(int argc, char ** argv) {
         return 1;
     }
 
-    const uint32_t n_world = params.n_world;
-    const uint32_t my_rank = params.rank;
+    uint32_t n_world = params.n_world;
+    uint32_t my_rank = params.rank;
     GGML_ASSERT(!(n_world == 1 && my_rank > 0));
 
     // check if --n-layer-window and --world is matched
@@ -200,6 +200,9 @@ int main(int argc, char ** argv) {
     // load the model and apply lora adapter, if any
     LOG_INF("%s: load the model and apply lora adapter, if any\n", __func__);
     llama_init_result llama_init = llama_init_from_gpt_params(params);
+    // update rank and world size, some devices may have been removed
+    my_rank = params.rank;
+    n_world = params.n_world;
 
     model = llama_init.model;
     ctx   = llama_init.context;
diff --git a/include/llama.h b/include/llama.h
index 05b99624..a39a7bb2 100644
--- a/include/llama.h
+++ b/include/llama.h
@@ -455,13 +455,18 @@ extern "C" {
     LLAMA_API int llama_send_device_info (struct llama_context * ctx, struct device_info * dev_info);
     LLAMA_API int llama_bcast_startup_args(struct llama_context * ctx, uint32_t rank, struct startup_args * args);
     LLAMA_API int llama_bcast_layer_setup (struct llama_context * ctx, uint32_t * n_layer_window, uint32_t * n_gpu_layers);
+    LLAMA_API int llama_rebuild_topo      (struct llama_context * ctx, uint32_t * n_layer_window, struct device_info * dev_info_set);
     LLAMA_API int llama_recv_layer_setup  (struct llama_context * ctx, uint32_t * n_layer_window, uint32_t * n_gpu_layers);
 
     LLAMA_API int llm_load_tensors(
             struct llama_model_loader * ml,
             struct llama_model * model,
             struct llama_model_params params);
-
+
+    LLAMA_API void llama_update_context_with_rankworld(struct llama_context * ctx,
+                                                       uint32_t rank,
+                                                       uint32_t n_world);
+
     LLAMA_API struct llama_context * llama_new_context_with_model(
             struct llama_model * model,
             struct llama_context_params params);
diff --git a/src/llama.cpp b/src/llama.cpp
index cd5a95b1..4ee01700 100644
--- a/src/llama.cpp
+++ b/src/llama.cpp
@@ -172,6 +172,19 @@ static void zeros(std::ofstream & file, size_t n) {
     }
 }
 
+// zmq helpers
+static std::vector<zmq::message_t> dev_infos_to_messages(const device_info * infos,
+                                                         uint32_t n_world) {
+    std::vector<zmq::message_t> res;
+    for (uint32_t i = 0; i < n_world; ++i) {
+        char * buffer = nullptr;
+        size_t buffer_size = serialize(&infos[i], &buffer);
+        res.emplace_back(buffer, buffer_size); // message_t copies the bytes
+        free(buffer);
+    }
+    return res;
+}
+
 LLAMA_ATTRIBUTE_FORMAT(1, 2)
 static std::string format(const char * fmt, ...) {
     va_list ap;
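`dev_infos_to_messages` turns each serialized `device_info` into one part of a ZeroMQ multipart message, so a whole cluster snapshot travels as a single atomic send. A hedged round-trip sketch over an inproc PAIR socket (endpoint and payloads are made up; the real code uses PUSH sockets over TCP):

```cpp
// Multipart round trip with cppzmq, mirroring how the helper's output is
// sent with zmq::send_multipart(). Build with: g++ demo.cpp -lzmq
#include <zmq.hpp>
#include <zmq_addon.hpp>
#include <cstdio>
#include <string>
#include <vector>

int main() {
    zmq::context_t ctx;
    zmq::socket_t tx(ctx, zmq::socket_type::pair);
    zmq::socket_t rx(ctx, zmq::socket_type::pair);
    rx.bind("inproc://topo");
    tx.connect("inproc://topo");

    // one zmq::message_t per device blob; message_t(data, size) copies the
    // bytes, which is why the original helper can free() each buffer at once
    std::vector<zmq::message_t> parts;
    for (const std::string & blob : {std::string("dev0"), std::string("dev1")}) {
        parts.emplace_back(blob.data(), blob.size());
    }
    zmq::send_multipart(tx, parts);

    std::vector<zmq::message_t> recvd;
    zmq::recv_multipart(rx, std::back_inserter(recvd)); // blocks until all parts arrive
    printf("received %zu parts\n", recvd.size());
    return 0;
}
```

The all-or-nothing delivery of multipart messages is what lets `llama_rebuild_topo` assert `msgs.size() == n_world` on the receiving side.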
@@ -2583,6 +2596,7 @@ static_assert(std::is_trivially_copyable<llama_hparams>::value, "llama_hparams m
 struct llama_cparams {
     uint32_t n_world;
     uint32_t rank;
+    uint32_t original_next_rank; // original rank of the next node
     uint32_t n_layer_window[32];
     bool prefetch;
     bool force;
@@ -20511,6 +20525,95 @@ int llama_bcast_layer_setup(struct llama_context * ctx, uint32_t * n_layer_windo
     return 0;
 }
 
+LLAMA_API int llama_rebuild_topo(llama_context * ctx,
+                                 uint32_t * n_layer_window,
+                                 device_info * dev_info_set) {
+    uint32_t n_world = ctx->cparams.n_world;
+    uint32_t my_rank = ctx->cparams.rank;
+    device_info * dev_info_ptr = nullptr;
+    if (dev_info_set == nullptr) {
+        // for rank != 0, receive the full device info set from the predecessor
+        std::vector<zmq::message_t> msgs;
+        if (!zmq::recv_multipart(*ctx->recv_socket, std::back_inserter(msgs))) {
+            return -1;
+        }
+        dev_info_ptr = new device_info[n_world];
+        for (size_t i = 0; i < msgs.size(); i++) {
+            deserialize((const char *)msgs[i].data(), &dev_info_ptr[i]);
+        }
+        GGML_ASSERT(msgs.size() == n_world);
+    } else {
+        dev_info_ptr = dev_info_set;
+    }
+
+    GGML_ASSERT(ctx != nullptr && ctx->send_socket != nullptr);
+
+    // notify the next rank if it has been removed from the topology
+    auto next_rank = (my_rank + 1) % n_world;
+    if (n_layer_window[next_rank] <= 0 && next_rank != 0) {
+        try {
+            auto msgs = dev_infos_to_messages(dev_info_ptr, n_world);
+            ctx->send_socket->set(zmq::sockopt::linger, 3500);
+            zmq::send_multipart(*ctx->send_socket, msgs);
+        } catch (const zmq::error_t & e) {
+            LLAMA_LOG_INFO("Failed to send data: %s\n", e.what());
+            if (!dev_info_set) {
+                delete[] dev_info_ptr;
+            }
+            return -1;
+        }
+    }
+
+    // check my own layers
+    zmq::socket_t * socket_to_close = nullptr;
+    if (n_layer_window[my_rank] > 0) {
+        // reconstruct the socket to the next valid rank
+        std::string next_ip;
+        auto current_rank = my_rank;
+        while (next_rank != my_rank) {
+            if (n_layer_window[next_rank] > 0) {
+                next_ip = dev_info_ptr[current_rank].next_ip;
+                break;
+            }
+            next_rank    = (next_rank + 1) % n_world;
+            current_rank = (current_rank + 1) % n_world;
+        }
+        if (!next_ip.empty()) {
+            if ((my_rank + 1) % n_world != next_rank) {
+                socket_to_close = ctx->send_socket;
+                ctx->send_socket = new zmq::socket_t(*ctx->sock_context, zmq::socket_type::push);
+                std::string send_endp = "tcp://" + next_ip + ":" + std::to_string(map_rank_to_port(next_rank, ctx->data_port));
+                ctx->send_socket->connect(send_endp);
+                ctx->next_node_ip = next_ip;
+                ctx->cparams.original_next_rank = next_rank;
+            }
+            if (next_rank != 0) {
+                try {
+                    auto msgs = dev_infos_to_messages(dev_info_ptr, n_world);
+                    zmq::send_multipart(*ctx->send_socket, msgs);
+                } catch (const zmq::error_t & e) {
+                    LLAMA_LOG_INFO("Error binding/connecting recv socket to endpoint: %s", e.what());
+                    if (!dev_info_set) {
+                        delete[] dev_info_ptr;
+                    }
+                    return -1;
+                }
+            }
+        } else {
+            // only one node left
+            ctx->next_node_ip = "";
+        }
+    }
+    if (!dev_info_set) {
+        delete[] dev_info_ptr;
+    }
+    if (socket_to_close != nullptr) {
+        socket_to_close->close();
+        delete socket_to_close;
+    }
+    return 0;
+}
+
 int llama_recv_layer_setup(struct llama_context * ctx, uint32_t * n_layer_window, uint32_t * n_gpu_layers) {
     uint32_t n_world = ctx->cparams.n_world;
     uint32_t my_rank = ctx->cparams.rank;
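The core of `llama_rebuild_topo` is a ring scan: walk forward from `my_rank`, skip every rank whose layer window is zero, and reconnect to the first survivor. A standalone sketch of just that scan (the real function also advances a `current_rank` cursor in lockstep so it can read the survivor's address out of `dev_info_ptr[current_rank].next_ip`):

```cpp
// Standalone sketch of the successor search in llama_rebuild_topo:
// a zero layer window means the node has exited and must be skipped.
#include <cstdint>
#include <cstdio>

int main() {
    const uint32_t n_world = 4;
    const uint32_t my_rank = 0;
    uint32_t n_layer_window[n_world] = {16, 0, 0, 16}; // ranks 1 and 2 dropped out

    uint32_t next_rank = (my_rank + 1) % n_world;
    while (next_rank != my_rank && n_layer_window[next_rank] == 0) {
        next_rank = (next_rank + 1) % n_world; // skip removed nodes
    }
    if (next_rank == my_rank) {
        printf("only one node left, no successor\n");
    } else {
        printf("rank %u now sends to rank %u\n", my_rank, next_rank); // 0 -> 3
    }
    return 0;
}
```

Only when the found successor differs from the default `(my_rank + 1) % n_world` does the function open a new PUSH socket; the old one is closed last, after the device info has been forwarded, so in-flight data is not lost.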
@@ -20545,7 +20648,8 @@ int llama_recv_layer_setup(struct llama_context * ctx, uint32_t * n_layer_window
 void llama_free_sockets(struct llama_context * ctx, char ** msg) {
     const uint32_t n_world = ctx->cparams.n_world;
     const uint32_t my_rank = ctx->cparams.rank;
-    const uint32_t next_rank = (my_rank + 1) % n_world;
+    // the topology may have been rebuilt, so use the recorded next rank
+    const uint32_t next_rank = ctx->cparams.original_next_rank;
 
     if (n_world == 1) {
         return;
@@ -20571,6 +20675,15 @@ void llama_free_sockets(struct llama_context * ctx, char ** msg) {
     }
 }
 
+void llama_update_context_with_rankworld(struct llama_context * ctx,
+                                         uint32_t rank,
+                                         uint32_t n_world) {
+    if (ctx) {
+        ctx->cparams.rank    = rank;
+        ctx->cparams.n_world = n_world;
+    }
+}
+
 struct llama_context * llama_new_context_with_model(
         struct llama_model * model,
         struct llama_context_params params) {
@@ -20587,6 +20700,7 @@ struct llama_context * llama_new_context_with_model(
     ctx->cparams.n_world = params.n_world;
     ctx->cparams.rank    = params.rank;
     ctx->cparams.force   = params.force;
+    ctx->cparams.original_next_rank = (params.rank + 1) % params.n_world;
 
     return ctx;
 }
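On the surviving nodes, `rank` and `n_world` must be renumbered densely so that `(rank + 1) % n_world` keeps pointing at a live neighbor; `llama_update_context_with_rankworld` then patches the live context. A sketch of that compaction, mirroring the "update rank and n_world for consistency" block in common.cpp (the gathered arrays here are faked):

```cpp
// Dense renumbering of surviving ranks after pruning. The n_layer_window
// values are invented for the example.
#include <cstdint>
#include <cstdio>
#include <vector>

int main() {
    std::vector<uint32_t> n_layer_window = {16, 0, 8, 8}; // rank 1 dropped out
    uint32_t my_rank = 2;

    uint32_t update_rank = 0, update_n_world = 1; // rank 0 always survives
    for (uint32_t i = 1; i < (uint32_t)n_layer_window.size(); ++i) {
        if (n_layer_window[i] == 0) continue; // removed node gets no new rank
        if (i == my_rank) update_rank = update_n_world;
        ++update_n_world;
    }
    // the live context is then patched in place, e.g.:
    // llama_update_context_with_rankworld(ctx, update_rank, update_n_world);
    printf("old rank %u -> new rank %u of %u\n", my_rank, update_rank, update_n_world);
    return 0;
}
```

This is also why main.cpp re-reads `params.rank` and `params.n_world` right after `llama_init_from_gpt_params` returns: the initialization may have shrunk the world.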