mirror of
https://github.com/Lizonghang/prima.cpp.git
synced 2025-09-10 04:34:34 +00:00
fix try_connect
This commit is contained in:
parent
d1b97f798e
commit
d6c8d322cd
8 changed files with 78 additions and 50 deletions
9
Makefile
9
Makefile
|
@ -950,7 +950,8 @@ OBJ_LLAMA = \
|
||||||
src/llama-grammar.o \
|
src/llama-grammar.o \
|
||||||
src/llama-sampling.o \
|
src/llama-sampling.o \
|
||||||
src/unicode.o \
|
src/unicode.o \
|
||||||
src/unicode-data.o
|
src/unicode-data.o \
|
||||||
|
src/network-utils.o \
|
||||||
|
|
||||||
OBJ_COMMON = \
|
OBJ_COMMON = \
|
||||||
common/profiler.o \
|
common/profiler.o \
|
||||||
|
@ -1139,6 +1140,11 @@ src/unicode-data.o: \
|
||||||
src/unicode-data.cpp \
|
src/unicode-data.cpp \
|
||||||
src/unicode-data.h
|
src/unicode-data.h
|
||||||
$(CXX) $(CXXFLAGS) -c $< -o $@
|
$(CXX) $(CXXFLAGS) -c $< -o $@
|
||||||
|
|
||||||
|
src/network-utils.o: \
|
||||||
|
src/network-utils.cpp \
|
||||||
|
src/network-utils.h
|
||||||
|
$(CXX) $(CXXFLAGS) -c $< -o $@
|
||||||
|
|
||||||
src/llama.o: \
|
src/llama.o: \
|
||||||
src/llama.cpp \
|
src/llama.cpp \
|
||||||
|
@ -1147,6 +1153,7 @@ src/llama.o: \
|
||||||
src/llama-grammar.h \
|
src/llama-grammar.h \
|
||||||
src/llama-sampling.h \
|
src/llama-sampling.h \
|
||||||
src/unicode.h \
|
src/unicode.h \
|
||||||
|
src/network-utils.h \
|
||||||
include/llama.h \
|
include/llama.h \
|
||||||
ggml/include/ggml-cuda.h \
|
ggml/include/ggml-cuda.h \
|
||||||
ggml/include/ggml-metal.h \
|
ggml/include/ggml-metal.h \
|
||||||
|
|
|
@ -1718,7 +1718,7 @@ struct llama_init_result llama_init_from_gpt_params(gpt_params & params) {
|
||||||
|
|
||||||
// sychronize device profile to the master node
|
// sychronize device profile to the master node
|
||||||
NodeType node_type;
|
NodeType node_type;
|
||||||
char is_fowarder[32] = {0};
|
char is_forwarder[32] = {0};
|
||||||
if (my_rank == 0) {
|
if (my_rank == 0) {
|
||||||
if (auto_schedule) {
|
if (auto_schedule) {
|
||||||
std::vector<device_info> dev_info_set(n_world);
|
std::vector<device_info> dev_info_set(n_world);
|
||||||
|
@ -1735,7 +1735,7 @@ struct llama_init_result llama_init_from_gpt_params(gpt_params & params) {
|
||||||
return iparams;
|
return iparams;
|
||||||
}
|
}
|
||||||
llama_bcast_layer_setup(lctx, n_layer_window, n_gpu_layers);
|
llama_bcast_layer_setup(lctx, n_layer_window, n_gpu_layers);
|
||||||
llama_rebuild_topo(lctx, n_layer_window, dev_info_set.data());
|
llama_rebuild_topo(lctx, n_layer_window, dev_info_set.data(), &node_type, is_forwarder);
|
||||||
} else {
|
} else {
|
||||||
// use the user-defined n_layer_window
|
// use the user-defined n_layer_window
|
||||||
std::copy(std::begin(params.n_layer_window), std::end(params.n_layer_window), n_layer_window);
|
std::copy(std::begin(params.n_layer_window), std::end(params.n_layer_window), n_layer_window);
|
||||||
|
@ -1745,7 +1745,7 @@ struct llama_init_result llama_init_from_gpt_params(gpt_params & params) {
|
||||||
if (auto_schedule){
|
if (auto_schedule){
|
||||||
llama_send_device_info(lctx, &dev_info);
|
llama_send_device_info(lctx, &dev_info);
|
||||||
llama_recv_layer_setup(lctx, n_layer_window, n_gpu_layers);
|
llama_recv_layer_setup(lctx, n_layer_window, n_gpu_layers);
|
||||||
llama_rebuild_topo (lctx, n_layer_window, nullptr, &node_type, is_fowarder);
|
llama_rebuild_topo (lctx, n_layer_window, nullptr, &node_type, is_forwarder);
|
||||||
} else {
|
} else {
|
||||||
llama_recv_layer_setup(lctx, n_layer_window, n_gpu_layers);
|
llama_recv_layer_setup(lctx, n_layer_window, n_gpu_layers);
|
||||||
}
|
}
|
||||||
|
@ -1764,7 +1764,7 @@ struct llama_init_result llama_init_from_gpt_params(gpt_params & params) {
|
||||||
std::vector<uint32_t> n_layer_window_temp = {n_layer_window[0]}, n_gpu_layers_temp = {n_gpu_layers[0]};
|
std::vector<uint32_t> n_layer_window_temp = {n_layer_window[0]}, n_gpu_layers_temp = {n_gpu_layers[0]};
|
||||||
|
|
||||||
for (uint32_t i = 1; i < n_world; i++) {
|
for (uint32_t i = 1; i < n_world; i++) {
|
||||||
if (n_layer_window[i] <= 0 && is_fowarder[i] == 0) {
|
if (n_layer_window[i] <= 0 && is_forwarder[i] == 0) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (i <= my_rank) {
|
if (i <= my_rank) {
|
||||||
|
@ -1797,10 +1797,10 @@ struct llama_init_result llama_init_from_gpt_params(gpt_params & params) {
|
||||||
|
|
||||||
llama_update_context_with_rankworld(lctx, update_rank, update_n_world);
|
llama_update_context_with_rankworld(lctx, update_rank, update_n_world);
|
||||||
|
|
||||||
if(node_type == NodeType::NODE_TYPE_EXIT){
|
if(node_type == NodeType::NODE_TYPE_FORWARDER){
|
||||||
//just foward
|
//just foward
|
||||||
while (true) {
|
while (true) {
|
||||||
llama_foward_messages(lctx);
|
llama_forward_messages(lctx);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2833,7 +2833,7 @@ void TopoRebuildHelperInfo::deserialize(const char *buffer) {
|
||||||
LOG_ERR("%s: failed to deserialize device info\n", __func__);
|
LOG_ERR("%s: failed to deserialize device info\n", __func__);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
memcpy(&is_fowarder, buffer + buffer_size, 1);
|
memcpy(&is_forwarder, buffer + buffer_size, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t TopoRebuildHelperInfo::serialize(char **buffer) const{
|
size_t TopoRebuildHelperInfo::serialize(char **buffer) const{
|
||||||
|
@ -2845,7 +2845,7 @@ size_t TopoRebuildHelperInfo::serialize(char **buffer) const{
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
memcpy(buffer_, *buffer, buffer_size);
|
memcpy(buffer_, *buffer, buffer_size);
|
||||||
memcpy(buffer_ + buffer_size, &is_fowarder, 1);
|
memcpy(buffer_ + buffer_size, &is_forwarder, 1);
|
||||||
free(*buffer);
|
free(*buffer);
|
||||||
*buffer = buffer_;
|
*buffer = buffer_;
|
||||||
return buffer_size + 1;
|
return buffer_size + 1;
|
||||||
|
|
|
@ -348,11 +348,11 @@ struct device_info {
|
||||||
|
|
||||||
struct TopoRebuildHelperInfo{
|
struct TopoRebuildHelperInfo{
|
||||||
struct device_info dev_info;
|
struct device_info dev_info;
|
||||||
char is_fowarder;
|
char is_forwarder;
|
||||||
|
|
||||||
TopoRebuildHelperInfo():
|
TopoRebuildHelperInfo():
|
||||||
dev_info(),
|
dev_info(),
|
||||||
is_fowarder(0){}
|
is_forwarder(0){}
|
||||||
|
|
||||||
void deserialize(const char * buffer);
|
void deserialize(const char * buffer);
|
||||||
size_t serialize(char ** buffer) const;
|
size_t serialize(char ** buffer) const;
|
||||||
|
|
|
@ -451,7 +451,7 @@ extern "C" {
|
||||||
|
|
||||||
enum NodeType{
|
enum NodeType{
|
||||||
NODE_TYPE_WORKER,
|
NODE_TYPE_WORKER,
|
||||||
NODE_TYPE_FOWARDER,
|
NODE_TYPE_FORWARDER,
|
||||||
NODE_TYPE_EXIT,
|
NODE_TYPE_EXIT,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -463,10 +463,10 @@ extern "C" {
|
||||||
LLAMA_API int llama_bcast_layer_setup (struct llama_context * ctx, uint32_t * n_layer_window, uint32_t * n_gpu_layers);
|
LLAMA_API int llama_bcast_layer_setup (struct llama_context * ctx, uint32_t * n_layer_window, uint32_t * n_gpu_layers);
|
||||||
LLAMA_API int llama_rebuild_topo (struct llama_context * ctx,
|
LLAMA_API int llama_rebuild_topo (struct llama_context * ctx,
|
||||||
uint32_t * n_layer_window,
|
uint32_t * n_layer_window,
|
||||||
struct device_info * dev_info_set,
|
struct device_info * desv_info_set,
|
||||||
NodeType* node_type,
|
NodeType* node_type,
|
||||||
char * is_fowarder);
|
char * is_forwarder);
|
||||||
LLAMA_API int llama_foward_messages (struct llama_context * ctx);
|
LLAMA_API int llama_forward_messages (struct llama_context * ctx);
|
||||||
LLAMA_API int llama_recv_layer_setup (struct llama_context * ctx, uint32_t * n_layer_window, uint32_t * n_gpu_layers);
|
LLAMA_API int llama_recv_layer_setup (struct llama_context * ctx, uint32_t * n_layer_window, uint32_t * n_gpu_layers);
|
||||||
|
|
||||||
LLAMA_API int llm_load_tensors(
|
LLAMA_API int llm_load_tensors(
|
||||||
|
|
|
@ -11,6 +11,7 @@
|
||||||
#include "ggml-backend.h"
|
#include "ggml-backend.h"
|
||||||
|
|
||||||
#include "profiler.h"
|
#include "profiler.h"
|
||||||
|
#include "network-utils.h"
|
||||||
|
|
||||||
#ifdef GGML_USE_RPC
|
#ifdef GGML_USE_RPC
|
||||||
# include "ggml-rpc.h"
|
# include "ggml-rpc.h"
|
||||||
|
@ -20431,34 +20432,22 @@ static uint32_t map_rank_to_port(uint32_t rank, uint32_t data_port) {
|
||||||
static std::string try_connect(llama_context *ctx, uint32_t rank, TopoRebuildHelperInfo* infos, uint32_t n_world, zmq::socket_t** socket){
|
static std::string try_connect(llama_context *ctx, uint32_t rank, TopoRebuildHelperInfo* infos, uint32_t n_world, zmq::socket_t** socket){
|
||||||
auto prv_rank = (rank - 1 + n_world) % n_world;
|
auto prv_rank = (rank - 1 + n_world) % n_world;
|
||||||
std::string ip = infos[prv_rank].dev_info.next_ip;
|
std::string ip = infos[prv_rank].dev_info.next_ip;
|
||||||
std::string send_endp = "tcp://" + ip + ":" + std::to_string(map_rank_to_port(rank, ctx->data_port));
|
auto port = map_rank_to_port(rank, ctx->data_port);
|
||||||
|
|
||||||
|
if(!isPortOpen(ip, port)){
|
||||||
|
*socket = nullptr;
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
std::string send_endp = "tcp://" + ip + ":" + std::to_string(port);
|
||||||
*socket = new zmq::socket_t(*ctx->sock_context, zmq::socket_type::push);
|
*socket = new zmq::socket_t(*ctx->sock_context, zmq::socket_type::push);
|
||||||
int events = 0;
|
|
||||||
try {
|
try {
|
||||||
(*socket)->set(zmq::sockopt::linger, 0);
|
|
||||||
(*socket)->set(zmq::sockopt::sndtimeo, 500);
|
|
||||||
|
|
||||||
(*socket)->connect(send_endp);
|
(*socket)->connect(send_endp);
|
||||||
|
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
|
||||||
|
|
||||||
size_t events_size = sizeof(events);
|
|
||||||
(*socket)->getsockopt(ZMQ_EVENTS, &events, &events_size);
|
|
||||||
|
|
||||||
} catch (const zmq::error_t& e) {
|
} catch (const zmq::error_t& e) {
|
||||||
delete *socket;
|
delete *socket;
|
||||||
*socket = nullptr;
|
*socket = nullptr;
|
||||||
return "";
|
return "";
|
||||||
}
|
}
|
||||||
|
return ip;
|
||||||
if((events & ZMQ_POLLOUT) != 0){
|
|
||||||
return ip;
|
|
||||||
}else{
|
|
||||||
delete *socket;
|
|
||||||
*socket = nullptr;
|
|
||||||
return "";
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void llama_init_sockets(struct llama_context * ctx, uint32_t n_world, uint32_t my_rank) {
|
void llama_init_sockets(struct llama_context * ctx, uint32_t n_world, uint32_t my_rank) {
|
||||||
|
@ -20639,7 +20628,7 @@ int llama_rebuild_topo(llama_context * ctx,
|
||||||
uint32_t * n_layer_window,
|
uint32_t * n_layer_window,
|
||||||
device_info * dev_info_set,
|
device_info * dev_info_set,
|
||||||
NodeType * node_type,
|
NodeType * node_type,
|
||||||
char * is_fowarder) {
|
char * is_forwarder) {
|
||||||
uint32_t n_world = ctx->cparams.n_world;
|
uint32_t n_world = ctx->cparams.n_world;
|
||||||
uint32_t my_rank = ctx->cparams.rank;
|
uint32_t my_rank = ctx->cparams.rank;
|
||||||
TopoRebuildHelperInfo* topo_helper = new TopoRebuildHelperInfo[n_world];
|
TopoRebuildHelperInfo* topo_helper = new TopoRebuildHelperInfo[n_world];
|
||||||
|
@ -20657,7 +20646,7 @@ int llama_rebuild_topo(llama_context * ctx,
|
||||||
} else {
|
} else {
|
||||||
for (size_t i = 0; i < n_world; i++) {
|
for (size_t i = 0; i < n_world; i++) {
|
||||||
topo_helper[i].dev_info = dev_info_set[i];
|
topo_helper[i].dev_info = dev_info_set[i];
|
||||||
topo_helper[i].is_fowarder = 0;
|
topo_helper[i].is_forwarder = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -20666,7 +20655,7 @@ int llama_rebuild_topo(llama_context * ctx,
|
||||||
auto next_rank = (my_rank + 1) % n_world;
|
auto next_rank = (my_rank + 1) % n_world;
|
||||||
auto next_connect_rank = (my_rank + 1) % n_world;
|
auto next_connect_rank = (my_rank + 1) % n_world;
|
||||||
zmq::socket_t* socket_to_close = nullptr;
|
zmq::socket_t* socket_to_close = nullptr;
|
||||||
bool is_not_exit = n_layer_window[my_rank] > 0 || topo_helper[my_rank].is_fowarder == 1;
|
bool is_not_exit = n_layer_window[my_rank] > 0 || topo_helper[my_rank].is_forwarder == 1;
|
||||||
if (is_not_exit){
|
if (is_not_exit){
|
||||||
// reconstruct socket to the next valid rank
|
// reconstruct socket to the next valid rank
|
||||||
auto current_rank = my_rank;
|
auto current_rank = my_rank;
|
||||||
|
@ -20692,13 +20681,13 @@ int llama_rebuild_topo(llama_context * ctx,
|
||||||
for (int i = nodes.size() - 1; i > 0; --i) {
|
for (int i = nodes.size() - 1; i > 0; --i) {
|
||||||
auto rank = nodes[i];
|
auto rank = nodes[i];
|
||||||
ip = try_connect(ctx, rank, topo_helper, n_world, &socket);
|
ip = try_connect(ctx, rank, topo_helper, n_world, &socket);
|
||||||
if(!ip.empty()){
|
if (!ip.empty()) {
|
||||||
topo_helper[rank].is_fowarder = 1;
|
|
||||||
next_connect_rank = rank;
|
next_connect_rank = rank;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if(next_connect_rank != next_rank){
|
topo_helper[next_connect_rank].is_forwarder = 1;
|
||||||
|
if (next_connect_rank != next_rank) {
|
||||||
// reset socket
|
// reset socket
|
||||||
GGML_ASSERT(socket != nullptr);
|
GGML_ASSERT(socket != nullptr);
|
||||||
GGML_ASSERT(!ip.empty());
|
GGML_ASSERT(!ip.empty());
|
||||||
|
@ -20708,13 +20697,13 @@ int llama_rebuild_topo(llama_context * ctx,
|
||||||
ctx->cparams.original_next_rank = next_connect_rank;
|
ctx->cparams.original_next_rank = next_connect_rank;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}else if(n_layer_window[next_rank] <= 0 && topo_helper[my_rank].is_fowarder == 0){
|
}else if (n_layer_window[next_rank] <= 0 && topo_helper[next_rank].is_forwarder == 0) {
|
||||||
socket_to_close = ctx->send_socket;
|
socket_to_close = ctx->send_socket;
|
||||||
}
|
}
|
||||||
|
|
||||||
// notify next exiting node
|
// notify next exiting node
|
||||||
if (socket_to_close != nullptr) {
|
if (socket_to_close != nullptr) {
|
||||||
GGML_ASSERT(n_layer_window[next_rank] <= 0 && topo_helper[next_rank].is_fowarder == 0);
|
GGML_ASSERT(n_layer_window[next_rank] <= 0 && topo_helper[next_rank].is_forwarder == 0);
|
||||||
try {
|
try {
|
||||||
auto msgs = topohelper_to_messages(topo_helper, n_world);
|
auto msgs = topohelper_to_messages(topo_helper, n_world);
|
||||||
socket_to_close->set(zmq::sockopt::linger, 3500);
|
socket_to_close->set(zmq::sockopt::linger, 3500);
|
||||||
|
@ -20739,8 +20728,8 @@ int llama_rebuild_topo(llama_context * ctx,
|
||||||
|
|
||||||
if(n_layer_window[my_rank] > 0){
|
if(n_layer_window[my_rank] > 0){
|
||||||
*node_type = NodeType::NODE_TYPE_WORKER;
|
*node_type = NodeType::NODE_TYPE_WORKER;
|
||||||
}else if (topo_helper[my_rank].is_fowarder == 1){
|
}else if (topo_helper[my_rank].is_forwarder == 1){
|
||||||
*node_type = NodeType::NODE_TYPE_FOWARDER;
|
*node_type = NodeType::NODE_TYPE_FORWARDER;
|
||||||
}else{
|
}else{
|
||||||
*node_type = NodeType::NODE_TYPE_EXIT;
|
*node_type = NodeType::NODE_TYPE_EXIT;
|
||||||
}
|
}
|
||||||
|
@ -20766,11 +20755,10 @@ int llama_rebuild_topo(llama_context * ctx,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for(size_t i = 0; i < n_world; i++) {
|
for(size_t i = 0; i < n_world; i++) {
|
||||||
is_fowarder[i] = topo_helper[i].is_fowarder;
|
is_forwarder[i] = topo_helper[i].is_forwarder;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (socket_to_close != nullptr) {
|
||||||
if(socket_to_close != nullptr){
|
|
||||||
socket_to_close->close();
|
socket_to_close->close();
|
||||||
delete socket_to_close;
|
delete socket_to_close;
|
||||||
}
|
}
|
||||||
|
@ -20778,9 +20766,9 @@ int llama_rebuild_topo(llama_context * ctx,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
LLAMA_API int llama_foward_messages(llama_context *ctx) {
|
int llama_forward_messages(llama_context *ctx) {
|
||||||
zmq::message_t message;
|
zmq::message_t message;
|
||||||
bool more = true;
|
int more = true;
|
||||||
|
|
||||||
while (more) {
|
while (more) {
|
||||||
ctx->recv_socket->recv(message, zmq::recv_flags::none);
|
ctx->recv_socket->recv(message, zmq::recv_flags::none);
|
||||||
|
|
26
src/network-utils.cpp
Normal file
26
src/network-utils.cpp
Normal file
|
@ -0,0 +1,26 @@
|
||||||
|
#include "network-utils.h"
|
||||||
|
|
||||||
|
#include <sys/socket.h>
|
||||||
|
#include <netinet/in.h>
|
||||||
|
#include <arpa/inet.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
bool isPortOpen(const std::string& ip, uint32_t port, int timeout_sec) {
|
||||||
|
int sock = socket(AF_INET, SOCK_STREAM, 0);
|
||||||
|
if (sock < 0) return false;
|
||||||
|
|
||||||
|
struct timeval tv;
|
||||||
|
tv.tv_sec = timeout_sec;
|
||||||
|
tv.tv_usec = 0;
|
||||||
|
setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
|
||||||
|
setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
|
||||||
|
|
||||||
|
struct sockaddr_in server;
|
||||||
|
server.sin_addr.s_addr = inet_addr(ip.c_str());
|
||||||
|
server.sin_family = AF_INET;
|
||||||
|
server.sin_port = htons(port);
|
||||||
|
|
||||||
|
int res = connect(sock, (struct sockaddr*)&server, sizeof(server));
|
||||||
|
close(sock);
|
||||||
|
return res == 0;
|
||||||
|
}
|
7
src/network-utils.h
Normal file
7
src/network-utils.h
Normal file
|
@ -0,0 +1,7 @@
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <string>
|
||||||
|
|
||||||
|
typedef unsigned int uint32_t;
|
||||||
|
|
||||||
|
bool isPortOpen(const std::string& ip, uint32_t port, int timeout_sec = 2);
|
Loading…
Add table
Add a link
Reference in a new issue