diff --git a/common/arg.cpp b/common/arg.cpp
index 47d3c5e6..e282c80d 100644
--- a/common/arg.cpp
+++ b/common/arg.cpp
@@ -675,6 +675,20 @@ gpt_params_context gpt_params_parser_init(gpt_params & params, llama_example ex,
             params.rank = value;
         }
     ).set_env("LLAMA_ARG_RANK"));
+    add_opt(llama_arg(
+        {"--data-port"}, "N",
+        format("data port for distributed inference (default: %d)", params.data_port),
+        [](gpt_params & params, int value) {
+            params.data_port = value;
+        }
+    ).set_env("LLAMA_ARG_DATA_PORT"));
+    add_opt(llama_arg(
+        {"--signal-port"}, "N",
+        format("signal port for distributed inference (default: %d)", params.signal_port),
+        [](gpt_params & params, int value) {
+            params.signal_port = value;
+        }
+    ).set_env("LLAMA_ARG_SIGNAL_PORT"));
     add_opt(llama_arg(
         {"-lw", "--layer-window", "--n-layer-window"}, "N",
         format("number of layers to process in each compute (e.g., 16,16)"),
diff --git a/common/common.cpp b/common/common.cpp
index 374ae1f8..d1d40a9e 100644
--- a/common/common.cpp
+++ b/common/common.cpp
@@ -1810,9 +1810,10 @@ struct llama_init_result llama_init_from_gpt_params(gpt_params & params) {
     n_world = update_n_world;
     llama_update_context_with_rankworld(lctx, update_rank, update_n_world, worker_rank, n_worker);
-
-    if(node_type == NodeType::NODE_TYPE_FORWARDER){
+
+    if (node_type == NodeType::NODE_TYPE_FORWARDER) {
         //just forward
+        LOG_INF("No layer is assigned to me, and I serve as a network proxy.\n");
         std::atomic should_exit{false};
         auto t = std::thread([lctx, &should_exit]() {
             while(!should_exit) {
@@ -2032,6 +2033,8 @@ struct llama_context_params llama_context_params_from_gpt_params(const gpt_param
     }
     cparams.master_ip = new char[params.master_ip.length() + 1];
     std::strcpy(cparams.master_ip, params.master_ip.c_str());
+    cparams.data_port = params.data_port;
+    cparams.signal_port = params.signal_port;
 
     if (cparams.next_node_ip != nullptr) {
         delete[] cparams.next_node_ip;
diff --git a/common/common.h b/common/common.h
index 0a679213..c6ffe136 100644
--- a/common/common.h
+++ b/common/common.h
@@ -145,8 +145,10 @@ struct gpt_params {
     int32_t n_world = 1;                     // number of devices to use
     int32_t rank = 0;                        // my rank for distributed inference
     uint32_t n_layer_window[32] = {0};       // layer window size on each node
-    std::string master_ip = "localhost";     // ip address of the master node
-    std::string next_node_ip = "localhost";  // ip address of my next node
+    std::string master_ip = "127.0.0.1";     // ip address of the master node
+    std::string next_node_ip = "127.0.0.1";  // ip address of my next node
+    uint32_t data_port = 9000;               // data port for distributed inference
+    uint32_t signal_port = 10000;            // signal port for distributed inference
     bool prefetch = false;                   // prefetch layer weights
     bool keep_out_in_metal = true;           // whether to keep output weights in metal memory, true by default
     bool force = false;                      // force to start prefetching after computation
diff --git a/include/llama.h b/include/llama.h
index 4d6dd80d..3c220562 100644
--- a/include/llama.h
+++ b/include/llama.h
@@ -330,6 +330,8 @@ extern "C" {
         bool keep_out_in_metal; // whether to keep output weights in metal memory
         char * master_ip;       // ip address of the master node
         char * next_node_ip;    // ip address of the next node
+        uint32_t data_port;     // data port for distributed inference
+        uint32_t signal_port;   // signal port for distributed inference
         uint32_t n_ctx;         // text context, 0 = from model
         uint32_t n_predict;     // number of tokens to predict
         uint32_t n_batch;       // logical maximum batch size that can be submitted to llama_decode
diff --git a/src/llama.cpp b/src/llama.cpp
index 915278cc..8b5af567 100644
--- a/src/llama.cpp
+++ b/src/llama.cpp
@@ -3438,8 +3438,8 @@ struct llama_context {
     struct ggml_tensor * inp_KQ_mask_cross; // F32 [n_outputs_enc, n_batch]
 
     // sockets
-    std::string master_ip = "localhost";
-    std::string next_node_ip = "localhost";
+    std::string master_ip = "127.0.0.1";
+    std::string next_node_ip = "127.0.0.1";
     uint32_t data_port = 9000;
     uint32_t signal_port = 10000;
     zmq::context_t * sock_context = nullptr;
@@ -20266,6 +20266,8 @@ struct llama_context_params llama_context_default_params() {
         /*.keep_out_in_metal =*/ true,
         /*.master_ip         =*/ nullptr,
         /*.next_node_ip      =*/ nullptr,
+        /*.data_port         =*/ 9000,
+        /*.signal_port       =*/ 10000,
         /*.n_ctx             =*/ 512,
         /*.n_predict         =*/ 512,
         /*.n_batch           =*/ 2048,
@@ -20452,12 +20454,12 @@ static uint32_t map_rank_to_port(uint32_t rank, uint32_t data_port) {
     return data_port + rank;
 }
 
-static std::string try_connect(llama_context *ctx, uint32_t rank, TopoRebuildHelperInfo* infos, uint32_t n_world, zmq::socket_t** socket){
-    auto prv_rank = (rank - 1 + n_world) % n_world;
-    std::string ip = infos[prv_rank].dev_info.next_ip;
+static std::string try_connect(llama_context * ctx, uint32_t rank, TopoRebuildHelperInfo * infos, uint32_t n_world, zmq::socket_t ** socket){
+    auto prev_rank = (rank - 1 + n_world) % n_world;
+    std::string ip = infos[prev_rank].dev_info.next_ip;
     auto port = map_rank_to_port(rank, ctx->data_port);
 
-    if(!isPortOpen(ip, port)){
+    if (!is_port_open(ip, port)) {
         *socket = nullptr;
         return "";
     }
@@ -20679,7 +20681,7 @@ int llama_rebuild_topo(llama_context * ctx,
     auto next_connect_rank = (my_rank + 1) % n_world;
     zmq::socket_t* socket_to_close = nullptr;
     bool is_not_exit = n_layer_window[my_rank] > 0 || topo_helper[my_rank].is_forwarder == 1;
-    if (is_not_exit){
+    if (is_not_exit) {
         // reconstruct socket to the next valid rank
         auto current_rank = my_rank;
         std::vector nodes;
@@ -20738,7 +20740,7 @@
     }
 
     // notify next connect node
-    if(!ctx->next_node_ip.empty() && is_not_exit){
+    if (!ctx->next_node_ip.empty() && is_not_exit) {
         GGML_ASSERT(ctx->send_socket != nullptr);
         try {
             auto msgs = topohelper_to_messages(topo_helper, n_world);
@@ -20749,15 +20751,15 @@
         }
     }
 
-    if(n_layer_window[my_rank] > 0){
+    if (n_layer_window[my_rank] > 0) {
         *node_type = NodeType::NODE_TYPE_WORKER;
-    }else if (topo_helper[my_rank].is_forwarder == 1){
+    } else if (topo_helper[my_rank].is_forwarder == 1) {
         *node_type = NodeType::NODE_TYPE_FORWARDER;
-    }else{
+    } else {
         *node_type = NodeType::NODE_TYPE_EXIT;
     }
-
-    if(ctx->send_socket != nullptr && *node_type!=NodeType::NODE_TYPE_EXIT){
+
+    if (ctx->send_socket != nullptr && *node_type != NodeType::NODE_TYPE_EXIT) {
         // recv the whole view of all nodes
         std::vector msgs;
         if (!zmq::recv_multipart(*ctx->recv_socket, std::back_inserter(msgs))) {
@@ -20768,7 +20770,7 @@
             topo_helper[i].deserialize((char *)msgs[i].data());
         }
         // broadcast the whole view
-        if(next_connect_rank!=0){
+        if (next_connect_rank!=0) {
             try {
                 zmq::send_multipart(*ctx->send_socket, msgs);
             } catch (const zmq::error_t& e) {
@@ -20777,7 +20779,7 @@
             }
         }
     }
-    for(size_t i = 0; i < n_world; i++) {
+    for (size_t i = 0; i < n_world; i++) {
         is_forwarder[i] = topo_helper[i].is_forwarder;
     }
     ctx->cparams.node_type = *node_type;
@@ -20896,6 +20898,8 @@ struct llama_context * llama_new_context_with_model(
 
     ctx->master_ip = params.master_ip;
     ctx->next_node_ip = params.next_node_ip;
+    ctx->data_port = params.data_port;
+    ctx->signal_port = params.signal_port;
     ctx->cparams.n_world = params.n_world;
     ctx->cparams.rank = params.rank;
     ctx->cparams.force = params.force;
diff --git a/src/network-utils.cpp b/src/network-utils.cpp
index b960153e..e7fa5ab1 100644
--- a/src/network-utils.cpp
+++ b/src/network-utils.cpp
@@ -5,7 +5,7 @@
 #include 
 #include 
 
-bool isPortOpen(const std::string& ip, uint32_t port, int timeout_sec) {
+bool is_port_open(const std::string& ip, uint32_t port, int timeout_sec) {
     int sock = socket(AF_INET, SOCK_STREAM, 0);
     if (sock < 0) return false;
diff --git a/src/network-utils.h b/src/network-utils.h
index 8f0921e4..7a35475a 100644
--- a/src/network-utils.h
+++ b/src/network-utils.h
@@ -4,4 +4,4 @@
 typedef unsigned int uint32_t;
 
-bool isPortOpen(const std::string& ip, uint32_t port, int timeout_sec = 2);
\ No newline at end of file
+bool is_port_open(const std::string& ip, uint32_t port, int timeout_sec = 2);
\ No newline at end of file
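For reference, a minimal standalone sketch (not part of the patch) of the port layout these defaults imply. The patched map_rank_to_port() computes data_port + rank; the parallel signal_port + rank mapping and the example cluster size below are assumptions made for illustration only.

// port_layout_sketch.cpp -- illustrative only, not from the patch
#include <cstdint>
#include <iostream>

// Same arithmetic as the patched map_rank_to_port(): one port per rank above a base port.
static uint32_t map_rank_to_port(uint32_t rank, uint32_t base_port) {
    return base_port + rank;
}

int main() {
    const uint32_t data_port   = 9000;   // default added by this patch (--data-port / LLAMA_ARG_DATA_PORT)
    const uint32_t signal_port = 10000;  // default added by this patch (--signal-port / LLAMA_ARG_SIGNAL_PORT)
    const uint32_t n_world     = 4;      // hypothetical cluster size, for illustration

    for (uint32_t rank = 0; rank < n_world; rank++) {
        // rank 0 -> 9000/10000, rank 1 -> 9001/10001, ... (the signal mapping is assumed, not shown in the diff)
        std::cout << "rank " << rank
                  << ": data port "   << map_rank_to_port(rank, data_port)
                  << ", signal port " << map_rank_to_port(rank, signal_port) << "\n";
    }
    return 0;
}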