diff --git a/Makefile b/Makefile index 06d91984..0d794eb8 100644 --- a/Makefile +++ b/Makefile @@ -950,7 +950,8 @@ OBJ_LLAMA = \ src/llama-grammar.o \ src/llama-sampling.o \ src/unicode.o \ - src/unicode-data.o + src/unicode-data.o \ + src/network-utils.o \ OBJ_COMMON = \ common/profiler.o \ @@ -1139,6 +1140,11 @@ src/unicode-data.o: \ src/unicode-data.cpp \ src/unicode-data.h $(CXX) $(CXXFLAGS) -c $< -o $@ + +src/network-utils.o: \ + src/network-utils.cpp \ + src/network-utils.h + $(CXX) $(CXXFLAGS) -c $< -o $@ src/llama.o: \ src/llama.cpp \ @@ -1147,6 +1153,7 @@ src/llama.o: \ src/llama-grammar.h \ src/llama-sampling.h \ src/unicode.h \ + src/network-utils.h \ include/llama.h \ ggml/include/ggml-cuda.h \ ggml/include/ggml-metal.h \ diff --git a/common/common.cpp b/common/common.cpp index 18e0804f..d40647ec 100644 --- a/common/common.cpp +++ b/common/common.cpp @@ -1718,7 +1718,7 @@ struct llama_init_result llama_init_from_gpt_params(gpt_params & params) { // sychronize device profile to the master node NodeType node_type; - char is_fowarder[32] = {0}; + char is_forwarder[32] = {0}; if (my_rank == 0) { if (auto_schedule) { std::vector dev_info_set(n_world); @@ -1735,7 +1735,7 @@ struct llama_init_result llama_init_from_gpt_params(gpt_params & params) { return iparams; } llama_bcast_layer_setup(lctx, n_layer_window, n_gpu_layers); - llama_rebuild_topo(lctx, n_layer_window, dev_info_set.data()); + llama_rebuild_topo(lctx, n_layer_window, dev_info_set.data(), &node_type, is_forwarder); } else { // use the user-defined n_layer_window std::copy(std::begin(params.n_layer_window), std::end(params.n_layer_window), n_layer_window); @@ -1745,7 +1745,7 @@ struct llama_init_result llama_init_from_gpt_params(gpt_params & params) { if (auto_schedule){ llama_send_device_info(lctx, &dev_info); llama_recv_layer_setup(lctx, n_layer_window, n_gpu_layers); - llama_rebuild_topo (lctx, n_layer_window, nullptr, &node_type, is_fowarder); + llama_rebuild_topo (lctx, n_layer_window, nullptr, &node_type, is_forwarder); } else { llama_recv_layer_setup(lctx, n_layer_window, n_gpu_layers); } @@ -1764,7 +1764,7 @@ struct llama_init_result llama_init_from_gpt_params(gpt_params & params) { std::vector n_layer_window_temp = {n_layer_window[0]}, n_gpu_layers_temp = {n_gpu_layers[0]}; for (uint32_t i = 1; i < n_world; i++) { - if (n_layer_window[i] <= 0 && is_fowarder[i] == 0) { + if (n_layer_window[i] <= 0 && is_forwarder[i] == 0) { continue; } if (i <= my_rank) { @@ -1797,10 +1797,10 @@ struct llama_init_result llama_init_from_gpt_params(gpt_params & params) { llama_update_context_with_rankworld(lctx, update_rank, update_n_world); - if(node_type == NodeType::NODE_TYPE_EXIT){ + if(node_type == NodeType::NODE_TYPE_FORWARDER){ //just foward while (true) { - llama_foward_messages(lctx); + llama_forward_messages(lctx); } } diff --git a/common/profiler.cpp b/common/profiler.cpp index fa1a56b5..18fe795d 100644 --- a/common/profiler.cpp +++ b/common/profiler.cpp @@ -2833,7 +2833,7 @@ void TopoRebuildHelperInfo::deserialize(const char *buffer) { LOG_ERR("%s: failed to deserialize device info\n", __func__); return; } - memcpy(&is_fowarder, buffer + buffer_size, 1); + memcpy(&is_forwarder, buffer + buffer_size, 1); } size_t TopoRebuildHelperInfo::serialize(char **buffer) const{ @@ -2845,7 +2845,7 @@ size_t TopoRebuildHelperInfo::serialize(char **buffer) const{ return 0; } memcpy(buffer_, *buffer, buffer_size); - memcpy(buffer_ + buffer_size, &is_fowarder, 1); + memcpy(buffer_ + buffer_size, &is_forwarder, 1); free(*buffer); *buffer = buffer_; return buffer_size + 1; diff --git a/common/profiler.h b/common/profiler.h index 5ac73a8c..ff69a454 100644 --- a/common/profiler.h +++ b/common/profiler.h @@ -348,11 +348,11 @@ struct device_info { struct TopoRebuildHelperInfo{ struct device_info dev_info; - char is_fowarder; + char is_forwarder; TopoRebuildHelperInfo(): dev_info(), - is_fowarder(0){} + is_forwarder(0){} void deserialize(const char * buffer); size_t serialize(char ** buffer) const; diff --git a/include/llama.h b/include/llama.h index c2e4d43c..21f77288 100644 --- a/include/llama.h +++ b/include/llama.h @@ -451,7 +451,7 @@ extern "C" { enum NodeType{ NODE_TYPE_WORKER, - NODE_TYPE_FOWARDER, + NODE_TYPE_FORWARDER, NODE_TYPE_EXIT, }; @@ -463,10 +463,10 @@ extern "C" { LLAMA_API int llama_bcast_layer_setup (struct llama_context * ctx, uint32_t * n_layer_window, uint32_t * n_gpu_layers); LLAMA_API int llama_rebuild_topo (struct llama_context * ctx, uint32_t * n_layer_window, - struct device_info * dev_info_set, + struct device_info * desv_info_set, NodeType* node_type, - char * is_fowarder); - LLAMA_API int llama_foward_messages (struct llama_context * ctx); + char * is_forwarder); + LLAMA_API int llama_forward_messages (struct llama_context * ctx); LLAMA_API int llama_recv_layer_setup (struct llama_context * ctx, uint32_t * n_layer_window, uint32_t * n_gpu_layers); LLAMA_API int llm_load_tensors( diff --git a/src/llama.cpp b/src/llama.cpp index dd7b0c82..281c7360 100644 --- a/src/llama.cpp +++ b/src/llama.cpp @@ -11,6 +11,7 @@ #include "ggml-backend.h" #include "profiler.h" +#include "network-utils.h" #ifdef GGML_USE_RPC # include "ggml-rpc.h" @@ -20431,34 +20432,22 @@ static uint32_t map_rank_to_port(uint32_t rank, uint32_t data_port) { static std::string try_connect(llama_context *ctx, uint32_t rank, TopoRebuildHelperInfo* infos, uint32_t n_world, zmq::socket_t** socket){ auto prv_rank = (rank - 1 + n_world) % n_world; std::string ip = infos[prv_rank].dev_info.next_ip; - std::string send_endp = "tcp://" + ip + ":" + std::to_string(map_rank_to_port(rank, ctx->data_port)); + auto port = map_rank_to_port(rank, ctx->data_port); + + if(!isPortOpen(ip, port)){ + *socket = nullptr; + return ""; + } + std::string send_endp = "tcp://" + ip + ":" + std::to_string(port); *socket = new zmq::socket_t(*ctx->sock_context, zmq::socket_type::push); - int events = 0; try { - (*socket)->set(zmq::sockopt::linger, 0); - (*socket)->set(zmq::sockopt::sndtimeo, 500); - (*socket)->connect(send_endp); - - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - - size_t events_size = sizeof(events); - (*socket)->getsockopt(ZMQ_EVENTS, &events, &events_size); - } catch (const zmq::error_t& e) { delete *socket; *socket = nullptr; return ""; } - - if((events & ZMQ_POLLOUT) != 0){ - return ip; - }else{ - delete *socket; - *socket = nullptr; - return ""; - } - + return ip; } void llama_init_sockets(struct llama_context * ctx, uint32_t n_world, uint32_t my_rank) { @@ -20639,7 +20628,7 @@ int llama_rebuild_topo(llama_context * ctx, uint32_t * n_layer_window, device_info * dev_info_set, NodeType * node_type, - char * is_fowarder) { + char * is_forwarder) { uint32_t n_world = ctx->cparams.n_world; uint32_t my_rank = ctx->cparams.rank; TopoRebuildHelperInfo* topo_helper = new TopoRebuildHelperInfo[n_world]; @@ -20657,7 +20646,7 @@ int llama_rebuild_topo(llama_context * ctx, } else { for (size_t i = 0; i < n_world; i++) { topo_helper[i].dev_info = dev_info_set[i]; - topo_helper[i].is_fowarder = 0; + topo_helper[i].is_forwarder = 0; } } @@ -20666,7 +20655,7 @@ int llama_rebuild_topo(llama_context * ctx, auto next_rank = (my_rank + 1) % n_world; auto next_connect_rank = (my_rank + 1) % n_world; zmq::socket_t* socket_to_close = nullptr; - bool is_not_exit = n_layer_window[my_rank] > 0 || topo_helper[my_rank].is_fowarder == 1; + bool is_not_exit = n_layer_window[my_rank] > 0 || topo_helper[my_rank].is_forwarder == 1; if (is_not_exit){ // reconstruct socket to the next valid rank auto current_rank = my_rank; @@ -20692,13 +20681,13 @@ int llama_rebuild_topo(llama_context * ctx, for (int i = nodes.size() - 1; i > 0; --i) { auto rank = nodes[i]; ip = try_connect(ctx, rank, topo_helper, n_world, &socket); - if(!ip.empty()){ - topo_helper[rank].is_fowarder = 1; + if (!ip.empty()) { next_connect_rank = rank; break; } } - if(next_connect_rank != next_rank){ + topo_helper[next_connect_rank].is_forwarder = 1; + if (next_connect_rank != next_rank) { // reset socket GGML_ASSERT(socket != nullptr); GGML_ASSERT(!ip.empty()); @@ -20708,13 +20697,13 @@ int llama_rebuild_topo(llama_context * ctx, ctx->cparams.original_next_rank = next_connect_rank; } } - }else if(n_layer_window[next_rank] <= 0 && topo_helper[my_rank].is_fowarder == 0){ + }else if (n_layer_window[next_rank] <= 0 && topo_helper[next_rank].is_forwarder == 0) { socket_to_close = ctx->send_socket; } // notify next exiting node if (socket_to_close != nullptr) { - GGML_ASSERT(n_layer_window[next_rank] <= 0 && topo_helper[next_rank].is_fowarder == 0); + GGML_ASSERT(n_layer_window[next_rank] <= 0 && topo_helper[next_rank].is_forwarder == 0); try { auto msgs = topohelper_to_messages(topo_helper, n_world); socket_to_close->set(zmq::sockopt::linger, 3500); @@ -20739,8 +20728,8 @@ int llama_rebuild_topo(llama_context * ctx, if(n_layer_window[my_rank] > 0){ *node_type = NodeType::NODE_TYPE_WORKER; - }else if (topo_helper[my_rank].is_fowarder == 1){ - *node_type = NodeType::NODE_TYPE_FOWARDER; + }else if (topo_helper[my_rank].is_forwarder == 1){ + *node_type = NodeType::NODE_TYPE_FORWARDER; }else{ *node_type = NodeType::NODE_TYPE_EXIT; } @@ -20766,11 +20755,10 @@ int llama_rebuild_topo(llama_context * ctx, } } for(size_t i = 0; i < n_world; i++) { - is_fowarder[i] = topo_helper[i].is_fowarder; + is_forwarder[i] = topo_helper[i].is_forwarder; } - - if(socket_to_close != nullptr){ + if (socket_to_close != nullptr) { socket_to_close->close(); delete socket_to_close; } @@ -20778,9 +20766,9 @@ int llama_rebuild_topo(llama_context * ctx, return 0; } -LLAMA_API int llama_foward_messages(llama_context *ctx) { +int llama_forward_messages(llama_context *ctx) { zmq::message_t message; - bool more = true; + int more = true; while (more) { ctx->recv_socket->recv(message, zmq::recv_flags::none); diff --git a/src/network-utils.cpp b/src/network-utils.cpp new file mode 100644 index 00000000..b960153e --- /dev/null +++ b/src/network-utils.cpp @@ -0,0 +1,26 @@ +#include "network-utils.h" + +#include +#include +#include +#include + +bool isPortOpen(const std::string& ip, uint32_t port, int timeout_sec) { + int sock = socket(AF_INET, SOCK_STREAM, 0); + if (sock < 0) return false; + + struct timeval tv; + tv.tv_sec = timeout_sec; + tv.tv_usec = 0; + setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); + setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); + + struct sockaddr_in server; + server.sin_addr.s_addr = inet_addr(ip.c_str()); + server.sin_family = AF_INET; + server.sin_port = htons(port); + + int res = connect(sock, (struct sockaddr*)&server, sizeof(server)); + close(sock); + return res == 0; +} \ No newline at end of file diff --git a/src/network-utils.h b/src/network-utils.h new file mode 100644 index 00000000..8f0921e4 --- /dev/null +++ b/src/network-utils.h @@ -0,0 +1,7 @@ +#pragma once + +#include + +typedef unsigned int uint32_t; + +bool isPortOpen(const std::string& ip, uint32_t port, int timeout_sec = 2); \ No newline at end of file