mirror of
https://github.com/Lizonghang/prima.cpp.git
synced 2025-09-06 14:19:02 +00:00
Merge pull request #19 from yezhizi/feat/auto-exit
feat: ranks with only 1 layer auto-exit and rebuild topology
This commit is contained in:
commit
e25c739ecf
6 changed files with 250 additions and 11 deletions
|
@ -1522,6 +1522,61 @@ static bool assign_layers_to_device(
|
|||
return true;
|
||||
}
|
||||
|
||||
static bool tune_layer_allocation(
|
||||
uint32_t n_world,
|
||||
uint32_t my_rank,
|
||||
std::vector<device_info> dev_infos,
|
||||
uint32_t * n_layer_window,
|
||||
uint32_t * n_gpu_layers,
|
||||
struct llama_model * model,
|
||||
const struct llama_context_params cparams,
|
||||
float min_disk_read_speed = 0.1f) {
|
||||
memset(n_layer_window, 0, n_world * sizeof(uint32_t));
|
||||
memset(n_gpu_layers, 0, n_world * sizeof(uint32_t));
|
||||
std::vector<device_info> dev_infos_temp = dev_infos;
|
||||
std::vector<uint32_t> n_layer_windows_temp;
|
||||
std::vector<uint32_t> n_gpu_layers_temp;
|
||||
while(n_world > 0) {
|
||||
std::vector<device_info> dev_infos_ = dev_infos_temp;
|
||||
std::vector<uint32_t> n_layer_windows_(n_world, 0);
|
||||
std::vector<uint32_t> n_gpu_layers_(n_world, 0);
|
||||
if (!assign_layers_to_device(n_world, my_rank, dev_infos_.data(),
|
||||
n_layer_windows_.data(), n_gpu_layers_.data(), model, cparams)) {
|
||||
return false;
|
||||
}
|
||||
dev_infos_temp.clear();
|
||||
n_layer_windows_temp.clear();
|
||||
n_gpu_layers_temp.clear();
|
||||
for(uint32_t i=0; i<n_world; i++) {
|
||||
if (n_layer_windows_[i] > 1 || i==0 ) {
|
||||
dev_infos_temp.push_back(dev_infos_[i]);
|
||||
n_layer_windows_temp.push_back(n_layer_windows_[i]);
|
||||
n_gpu_layers_temp.push_back(n_gpu_layers_[i]);
|
||||
}
|
||||
}
|
||||
if(dev_infos_temp.size() == n_world) {
|
||||
// no device be removed
|
||||
break;
|
||||
}
|
||||
|
||||
n_world = dev_infos_temp.size();
|
||||
}
|
||||
uint32_t i =0 , j =0;
|
||||
while(j < n_world) {
|
||||
if(dev_infos[i].rank == dev_infos_temp[j].rank){
|
||||
n_layer_window[i] = n_layer_windows_temp[j];
|
||||
n_gpu_layers[i] = n_gpu_layers_temp[j];
|
||||
j++;
|
||||
i++;
|
||||
} else {
|
||||
n_layer_window[i] = 0;
|
||||
n_gpu_layers[i] = 0;
|
||||
i++;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
//
|
||||
// Model utils
|
||||
//
|
||||
|
@ -1625,6 +1680,7 @@ struct llama_init_result llama_init_from_gpt_params(gpt_params & params) {
|
|||
// get device profile
|
||||
LOG_INF("\nstart profiling this device, this may take some seconds ...\n");
|
||||
dev_info.rank = params.rank;
|
||||
dev_info.next_ip = params.next_node_ip.c_str();
|
||||
if (n_world > 1) {
|
||||
llama_profile_device(&dev_info, model, ml, params.gpu_mem, params.n_predict, params.n_ctx, params.cpuparams.n_threads, params.flash_attn);
|
||||
}
|
||||
|
@ -1633,21 +1689,23 @@ struct llama_init_result llama_init_from_gpt_params(gpt_params & params) {
|
|||
// sychronize device profile to the master node
|
||||
if (my_rank == 0) {
|
||||
if (auto_schedule) {
|
||||
struct device_info * dev_info_set = nullptr;
|
||||
dev_info_set = (struct device_info *)malloc(n_world * sizeof(struct device_info));
|
||||
std::vector<device_info> dev_info_set(n_world);
|
||||
dev_info_set[0] = dev_info;
|
||||
|
||||
llama_gather_device_info(lctx, dev_info_set);
|
||||
device_print_props(dev_info_set, n_world, model, cparams);
|
||||
llama_gather_device_info(lctx, dev_info_set.data());
|
||||
device_print_props(dev_info_set.data(), n_world, model, cparams);
|
||||
|
||||
// automatically determine n_layer_window and n_gpu_layers
|
||||
if (!assign_layers_to_device(n_world, my_rank, dev_info_set, n_layer_window, n_gpu_layers, model, cparams)) {
|
||||
if (!tune_layer_allocation(n_world, my_rank, dev_info_set, n_layer_window, n_gpu_layers, model, cparams)) {
|
||||
LOG_ERR("%s: Invalid allocation by HiGHS solver\n", __func__);
|
||||
llama_free(lctx);
|
||||
llama_free_model(model);
|
||||
return iparams;
|
||||
}
|
||||
llama_bcast_layer_setup(lctx, n_layer_window, n_gpu_layers);
|
||||
|
||||
//rebuild topo
|
||||
llama_rebuild_topo(lctx, n_layer_window, dev_info_set.data());
|
||||
} else {
|
||||
// use the user-defined n_layer_window
|
||||
std::copy(std::begin(params.n_layer_window), std::end(params.n_layer_window), n_layer_window);
|
||||
|
@ -1656,9 +1714,51 @@ struct llama_init_result llama_init_from_gpt_params(gpt_params & params) {
|
|||
} else {
|
||||
if (auto_schedule){
|
||||
llama_send_device_info(lctx, &dev_info);
|
||||
}
|
||||
llama_recv_layer_setup(lctx, n_layer_window, n_gpu_layers);
|
||||
// rebuild topo
|
||||
llama_rebuild_topo(lctx,n_layer_window, nullptr);
|
||||
}else{
|
||||
llama_recv_layer_setup(lctx, n_layer_window, n_gpu_layers);
|
||||
}
|
||||
}
|
||||
if(n_layer_window[my_rank]<=0){
|
||||
LOG_INF("%s: info: rank %d has no layers to run, skipping\n", __func__, my_rank);
|
||||
llama_free(lctx);
|
||||
llama_free_model(model);
|
||||
exit(0);
|
||||
}
|
||||
|
||||
//update rank and n_world for consistency
|
||||
uint32_t update_rank = 0;
|
||||
uint32_t update_n_world = 1;
|
||||
std::vector<uint32_t> n_layer_window_temp = {n_layer_window[0]};
|
||||
std::vector<uint32_t> n_gpu_layers_temp = {n_gpu_layers[0]};
|
||||
for(uint32_t i=1; i<n_world; i++) {
|
||||
if(n_layer_window[i] <= 0 ){
|
||||
continue;
|
||||
}
|
||||
if(i <= my_rank){
|
||||
update_rank++;
|
||||
}
|
||||
update_n_world++;
|
||||
n_layer_window_temp.push_back(n_layer_window[i]);
|
||||
n_gpu_layers_temp.push_back(n_gpu_layers[i]);
|
||||
}
|
||||
memset(n_layer_window, 0, n_world * sizeof(uint32_t));
|
||||
memset(n_gpu_layers, 0, n_world * sizeof(uint32_t));
|
||||
for (uint32_t i=0; i<update_n_world; i++) {
|
||||
n_layer_window[i] = n_layer_window_temp[i];
|
||||
n_gpu_layers[i] = n_gpu_layers_temp[i];
|
||||
}
|
||||
llama_update_context_with_rankworld(lctx, update_rank, update_n_world);
|
||||
cparams.rank = update_rank;
|
||||
cparams.n_world = update_n_world;
|
||||
mparams.rank = update_rank;
|
||||
mparams.n_world = update_n_world;
|
||||
params.rank = update_rank;
|
||||
params.n_world = update_n_world;
|
||||
my_rank = update_rank;
|
||||
n_world = update_n_world;
|
||||
|
||||
// update n_layer_window and n_gpu_layers
|
||||
std::copy(std::begin(n_layer_window), std::end(n_layer_window), params.n_layer_window);
|
||||
|
|
|
@ -2357,15 +2357,17 @@ size_t serialize(const struct device_info * dev_info, char ** buffer) {
|
|||
// calculate total size for serialized buffer
|
||||
size_t device_name_len = strlen(dev_info->device_name) + 1;
|
||||
size_t device_os_len = strlen(dev_info->device_os) + 1;
|
||||
size_t next_ip_len = strlen(dev_info->next_ip) + 1;
|
||||
size_t cpu_name_len = strlen(dev_info->cpu_props.name) + 1;
|
||||
size_t cpu_description_len = strlen(dev_info->cpu_props.description) + 1;
|
||||
size_t gpu_name_len = strlen(dev_info->gpu_props.name) + 1;
|
||||
size_t gpu_description_len = strlen(dev_info->gpu_props.description) + 1;
|
||||
|
||||
size_t total_size = sizeof(uint32_t)
|
||||
+ sizeof(size_t) * 6 // for lengths of strings
|
||||
+ sizeof(size_t) * 7 // for lengths of strings
|
||||
+ device_name_len
|
||||
+ device_os_len
|
||||
+ next_ip_len
|
||||
+ cpu_name_len
|
||||
+ cpu_description_len
|
||||
+ gpu_name_len
|
||||
|
@ -2425,6 +2427,11 @@ size_t serialize(const struct device_info * dev_info, char ** buffer) {
|
|||
memcpy(ptr, dev_info->device_os, device_os_len);
|
||||
ptr += device_os_len;
|
||||
|
||||
memcpy(ptr, &next_ip_len, sizeof(size_t));
|
||||
ptr += sizeof(size_t);
|
||||
memcpy(ptr, dev_info->next_ip, next_ip_len);
|
||||
ptr += next_ip_len;
|
||||
|
||||
memcpy(ptr, &cpu_name_len, sizeof(size_t));
|
||||
ptr += sizeof(size_t);
|
||||
memcpy(ptr, dev_info->cpu_props.name, cpu_name_len);
|
||||
|
@ -2610,6 +2617,14 @@ void deserialize(const char * buffer, struct device_info * dev_info) {
|
|||
memcpy(const_cast<void*>(static_cast<const void*>(dev_info->device_os)), ptr, device_os_len);
|
||||
ptr += device_os_len;
|
||||
|
||||
// next ip
|
||||
size_t next_ip_len;
|
||||
memcpy(&next_ip_len, ptr, sizeof(size_t));
|
||||
ptr += sizeof(size_t);
|
||||
dev_info->next_ip = (char *)malloc(next_ip_len);
|
||||
memcpy(const_cast<void*>(static_cast<const void*>(dev_info->next_ip)), ptr, next_ip_len);
|
||||
ptr += next_ip_len;
|
||||
|
||||
// cpu_props.name
|
||||
size_t cpu_name_len;
|
||||
memcpy(&cpu_name_len, ptr, sizeof(size_t));
|
||||
|
|
|
@ -321,6 +321,7 @@ struct device_info {
|
|||
uint32_t rank;
|
||||
const char * device_name;
|
||||
const char * device_os;
|
||||
const char * next_ip;
|
||||
struct disk_props disk;
|
||||
struct cpu_props cpu_props;
|
||||
struct memory_info memory;
|
||||
|
@ -334,6 +335,7 @@ struct device_info {
|
|||
rank(0),
|
||||
device_name(""),
|
||||
device_os(""),
|
||||
next_ip(""),
|
||||
disk(),
|
||||
cpu_props(),
|
||||
memory(),
|
||||
|
|
|
@ -143,8 +143,8 @@ int main(int argc, char ** argv) {
|
|||
return 1;
|
||||
}
|
||||
|
||||
const uint32_t n_world = params.n_world;
|
||||
const uint32_t my_rank = params.rank;
|
||||
uint32_t n_world = params.n_world;
|
||||
uint32_t my_rank = params.rank;
|
||||
GGML_ASSERT(!(n_world == 1 && my_rank > 0));
|
||||
|
||||
// check if --n-layer-window and --world is matched
|
||||
|
@ -200,6 +200,9 @@ int main(int argc, char ** argv) {
|
|||
// load the model and apply lora adapter, if any
|
||||
LOG_INF("%s: load the model and apply lora adapter, if any\n", __func__);
|
||||
llama_init_result llama_init = llama_init_from_gpt_params(params);
|
||||
// update
|
||||
my_rank = params.rank;
|
||||
n_world = params.n_world;
|
||||
|
||||
model = llama_init.model;
|
||||
ctx = llama_init.context;
|
||||
|
|
|
@ -455,6 +455,7 @@ extern "C" {
|
|||
LLAMA_API int llama_send_device_info (struct llama_context * ctx, struct device_info * dev_info);
|
||||
LLAMA_API int llama_bcast_startup_args(struct llama_context * ctx, uint32_t rank, struct startup_args * args);
|
||||
LLAMA_API int llama_bcast_layer_setup (struct llama_context * ctx, uint32_t * n_layer_window, uint32_t * n_gpu_layers);
|
||||
LLAMA_API int llama_rebuild_topo (struct llama_context * ctx, uint32_t * n_layer_window, struct device_info * dev_info_set);
|
||||
LLAMA_API int llama_recv_layer_setup (struct llama_context * ctx, uint32_t * n_layer_window, uint32_t * n_gpu_layers);
|
||||
|
||||
LLAMA_API int llm_load_tensors(
|
||||
|
@ -462,6 +463,10 @@ extern "C" {
|
|||
struct llama_model * model,
|
||||
struct llama_model_params params);
|
||||
|
||||
LLAMA_API void llama_update_context_with_rankworld(struct llama_context * ctx,
|
||||
uint32_t rank,
|
||||
uint32_t n_world);
|
||||
|
||||
LLAMA_API struct llama_context * llama_new_context_with_model(
|
||||
struct llama_model * model,
|
||||
struct llama_context_params params);
|
||||
|
|
116
src/llama.cpp
116
src/llama.cpp
|
@ -172,6 +172,19 @@ static void zeros(std::ofstream & file, size_t n) {
|
|||
}
|
||||
}
|
||||
|
||||
// zmq helpers
|
||||
static std::vector<zmq::message_t> dev_infos_to_messages(const device_info* infos,
|
||||
uint32_t n_world){
|
||||
std::vector<zmq::message_t> res;
|
||||
for (uint32_t i = 0; i < n_world; ++i) {
|
||||
char * buffer = nullptr;
|
||||
size_t buffer_size = serialize(&infos[i], &buffer);
|
||||
res.emplace_back(buffer, buffer_size);
|
||||
free(buffer);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
LLAMA_ATTRIBUTE_FORMAT(1, 2)
|
||||
static std::string format(const char * fmt, ...) {
|
||||
va_list ap;
|
||||
|
@ -2583,6 +2596,7 @@ static_assert(std::is_trivially_copyable<llama_hparams>::value, "llama_hparams m
|
|||
struct llama_cparams {
|
||||
uint32_t n_world;
|
||||
uint32_t rank;
|
||||
uint32_t original_next_rank; // original rank of the next node
|
||||
uint32_t n_layer_window[32];
|
||||
bool prefetch;
|
||||
bool force;
|
||||
|
@ -20511,6 +20525,95 @@ int llama_bcast_layer_setup(struct llama_context * ctx, uint32_t * n_layer_windo
|
|||
return 0;
|
||||
}
|
||||
|
||||
LLAMA_API int llama_rebuild_topo(llama_context *ctx,
|
||||
uint32_t *n_layer_window,
|
||||
device_info *dev_info_set) {
|
||||
uint32_t n_world = ctx->cparams.n_world;
|
||||
uint32_t my_rank = ctx->cparams.rank;
|
||||
device_info* dev_info_ptr = nullptr;
|
||||
if (dev_info_set == nullptr){
|
||||
// for rank!=0, recv all devices info
|
||||
std::vector<zmq::message_t> msgs;
|
||||
if (!zmq::recv_multipart(*ctx->recv_socket, std::back_inserter(msgs))) {
|
||||
return -1;
|
||||
}
|
||||
dev_info_ptr = new device_info[n_world];
|
||||
for (size_t i = 0; i < msgs.size(); i++) {
|
||||
deserialize((const char *)msgs[i].data(), &dev_info_ptr[i]);
|
||||
}
|
||||
GGML_ASSERT(msgs.size() == n_world);
|
||||
}else{
|
||||
dev_info_ptr = dev_info_set;
|
||||
}
|
||||
|
||||
GGML_ASSERT(ctx != nullptr && ctx->send_socket != nullptr);
|
||||
|
||||
// notify next rank
|
||||
auto next_rank = (my_rank + 1) % n_world;
|
||||
if(n_layer_window[next_rank] <= 0 && next_rank != 0){
|
||||
try {
|
||||
auto msgs = dev_infos_to_messages(dev_info_ptr, n_world);
|
||||
ctx->send_socket->set(zmq::sockopt::linger, 3500);
|
||||
zmq::send_multipart(*ctx->send_socket, msgs);
|
||||
} catch (const zmq::error_t& e) {
|
||||
LLAMA_LOG_INFO("Failed to send data: %s\n", e.what());
|
||||
if(!dev_info_set){
|
||||
delete[] dev_info_ptr;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
// check myself's layer
|
||||
zmq::socket_t* socket_to_close = nullptr;
|
||||
if(n_layer_window[my_rank] > 0) {
|
||||
// reconstruct socket to the next valid rank
|
||||
std::string next_ip;
|
||||
auto current_rank = my_rank;
|
||||
while(next_rank!=my_rank){
|
||||
if(n_layer_window[next_rank] > 0){
|
||||
next_ip = dev_info_ptr[current_rank].next_ip;
|
||||
break;
|
||||
}
|
||||
next_rank = (next_rank + 1) % n_world;
|
||||
current_rank = (current_rank + 1) % n_world;
|
||||
}
|
||||
if(!next_ip.empty()){
|
||||
if((my_rank+1)%n_world != next_rank){
|
||||
socket_to_close = ctx->send_socket;
|
||||
ctx->send_socket = new zmq::socket_t(*ctx->sock_context, zmq::socket_type::push);
|
||||
std::string send_endp = "tcp://" + next_ip + ":" + std::to_string(map_rank_to_port(next_rank, ctx->data_port));
|
||||
ctx->send_socket->connect(send_endp);
|
||||
ctx->next_node_ip = next_ip;
|
||||
ctx->cparams.original_next_rank = next_rank;
|
||||
}
|
||||
if(next_rank != 0){
|
||||
try {
|
||||
auto msgs = dev_infos_to_messages(dev_info_ptr, n_world);
|
||||
zmq::send_multipart(*ctx->send_socket, msgs);
|
||||
} catch (const zmq::error_t &e) {
|
||||
LLAMA_LOG_INFO("Error binding/connecting recv socket to endpoint: %s", e.what());
|
||||
if(!dev_info_set){
|
||||
delete[] dev_info_ptr;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}else{
|
||||
// only one node
|
||||
ctx->next_node_ip = "";
|
||||
}
|
||||
}
|
||||
if(!dev_info_set){
|
||||
delete[] dev_info_ptr;
|
||||
}
|
||||
if(socket_to_close != nullptr){
|
||||
socket_to_close->close();
|
||||
delete socket_to_close;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int llama_recv_layer_setup(struct llama_context * ctx, uint32_t * n_layer_window, uint32_t * n_gpu_layers) {
|
||||
uint32_t n_world = ctx->cparams.n_world;
|
||||
uint32_t my_rank = ctx->cparams.rank;
|
||||
|
@ -20545,7 +20648,8 @@ int llama_recv_layer_setup(struct llama_context * ctx, uint32_t * n_layer_window
|
|||
void llama_free_sockets(struct llama_context * ctx, char ** msg) {
|
||||
const uint32_t n_world = ctx->cparams.n_world;
|
||||
const uint32_t my_rank = ctx->cparams.rank;
|
||||
const uint32_t next_rank = (my_rank + 1) % n_world;
|
||||
// to adapt to the new topology, use old next_rank
|
||||
const uint32_t next_rank = ctx->cparams.original_next_rank;
|
||||
|
||||
if (n_world == 1) {
|
||||
return;
|
||||
|
@ -20571,6 +20675,15 @@ void llama_free_sockets(struct llama_context * ctx, char ** msg) {
|
|||
}
|
||||
}
|
||||
|
||||
void llama_update_context_with_rankworld(struct llama_context * ctx,
|
||||
uint32_t rank,
|
||||
uint32_t n_world) {
|
||||
if(ctx) {
|
||||
ctx->cparams.rank = rank;
|
||||
ctx->cparams.n_world = n_world;
|
||||
}
|
||||
}
|
||||
|
||||
struct llama_context * llama_new_context_with_model(
|
||||
struct llama_model * model,
|
||||
struct llama_context_params params) {
|
||||
|
@ -20587,6 +20700,7 @@ struct llama_context * llama_new_context_with_model(
|
|||
ctx->cparams.n_world = params.n_world;
|
||||
ctx->cparams.rank = params.rank;
|
||||
ctx->cparams.force = params.force;
|
||||
ctx->cparams.original_next_rank = (params.rank + 1) % params.n_world;
|
||||
return ctx;
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue