mirror of
https://github.com/Lizonghang/prima.cpp.git
synced 2025-09-08 04:49:02 +00:00
fix: change default ip to 127.0.0.1
& improve args for setting ports
fix: change default ip to `127.0.0.1` & improve args for setting ports
This commit is contained in:
commit
2b902f89bd
7 changed files with 46 additions and 21 deletions
|
@ -675,6 +675,20 @@ gpt_params_context gpt_params_parser_init(gpt_params & params, llama_example ex,
|
|||
params.rank = value;
|
||||
}
|
||||
).set_env("LLAMA_ARG_RANK"));
|
||||
add_opt(llama_arg(
|
||||
{"--data-port"}, "N",
|
||||
format("data port for distributed inference (default: %d)", params.data_port),
|
||||
[](gpt_params & params, int value) {
|
||||
params.data_port = value;
|
||||
}
|
||||
).set_env("LLAMA_ARG_DATA_PORT"));
|
||||
add_opt(llama_arg(
|
||||
{"--signal-port"}, "N",
|
||||
format("signal port for distributed inference (default: %d)", params.signal_port),
|
||||
[](gpt_params & params, int value) {
|
||||
params.signal_port = value;
|
||||
}
|
||||
).set_env("LLAMA_ARG_SIGNAL_PORT"));
|
||||
add_opt(llama_arg(
|
||||
{"-lw", "--layer-window", "--n-layer-window"}, "N",
|
||||
format("number of layers to process in each compute (e.g., 16,16)"),
|
||||
|
|
|
@ -1810,9 +1810,10 @@ struct llama_init_result llama_init_from_gpt_params(gpt_params & params) {
|
|||
n_world = update_n_world;
|
||||
|
||||
llama_update_context_with_rankworld(lctx, update_rank, update_n_world, worker_rank, n_worker);
|
||||
|
||||
if(node_type == NodeType::NODE_TYPE_FORWARDER){
|
||||
|
||||
if (node_type == NodeType::NODE_TYPE_FORWARDER) {
|
||||
//just forward
|
||||
LOG_INF("No layer is assigned to me, and I serve as a network proxy.\n");
|
||||
std::atomic<bool> should_exit{false};
|
||||
auto t = std::thread([lctx, &should_exit]() {
|
||||
while(!should_exit) {
|
||||
|
@ -2032,6 +2033,8 @@ struct llama_context_params llama_context_params_from_gpt_params(const gpt_param
|
|||
}
|
||||
cparams.master_ip = new char[params.master_ip.length() + 1];
|
||||
std::strcpy(cparams.master_ip, params.master_ip.c_str());
|
||||
cparams.data_port = params.data_port;
|
||||
cparams.signal_port = params.signal_port;
|
||||
|
||||
if (cparams.next_node_ip != nullptr) {
|
||||
delete[] cparams.next_node_ip;
|
||||
|
|
|
@ -145,8 +145,10 @@ struct gpt_params {
|
|||
int32_t n_world = 1; // number of devices to use
|
||||
int32_t rank = 0; // my rank for distributed inference
|
||||
uint32_t n_layer_window[32] = {0}; // layer window size on each node
|
||||
std::string master_ip = "localhost"; // ip address of the master node
|
||||
std::string next_node_ip = "localhost"; // ip address of my next node
|
||||
std::string master_ip = "127.0.0.1"; // ip address of the master node
|
||||
std::string next_node_ip = "127.0.0.1"; // ip address of my next node
|
||||
uint32_t data_port = 9000; // data port for distributed inference
|
||||
uint32_t signal_port = 10000; // signal port for distributed inference
|
||||
bool prefetch = false; // prefetch layer weights
|
||||
bool keep_out_in_metal = true; // whether to keep output weights in metal memory, true by default
|
||||
bool force = false; // force to start prefetching after computation
|
||||
|
|
|
@ -330,6 +330,8 @@ extern "C" {
|
|||
bool keep_out_in_metal; // whether to keep output weights in metal memory
|
||||
char * master_ip; // ip address of the master node
|
||||
char * next_node_ip; // ip address of the next node
|
||||
uint32_t data_port; // data port for distributed inference
|
||||
uint32_t signal_port; // signal port for distributed inference
|
||||
uint32_t n_ctx; // text context, 0 = from model
|
||||
uint32_t n_predict; // number of tokens to predict
|
||||
uint32_t n_batch; // logical maximum batch size that can be submitted to llama_decode
|
||||
|
|
|
@ -3438,8 +3438,8 @@ struct llama_context {
|
|||
struct ggml_tensor * inp_KQ_mask_cross; // F32 [n_outputs_enc, n_batch]
|
||||
|
||||
// sockets
|
||||
std::string master_ip = "localhost";
|
||||
std::string next_node_ip = "localhost";
|
||||
std::string master_ip = "127.0.0.1";
|
||||
std::string next_node_ip = "127.0.0.1";
|
||||
uint32_t data_port = 9000;
|
||||
uint32_t signal_port = 10000;
|
||||
zmq::context_t * sock_context = nullptr;
|
||||
|
@ -20266,6 +20266,8 @@ struct llama_context_params llama_context_default_params() {
|
|||
/*.keep_out_in_metal =*/ true,
|
||||
/*.master_ip =*/ nullptr,
|
||||
/*.next_node_ip =*/ nullptr,
|
||||
/*.data_port =*/ 9000,
|
||||
/*.signal_port =*/ 10000,
|
||||
/*.n_ctx =*/ 512,
|
||||
/*.n_predict =*/ 512,
|
||||
/*.n_batch =*/ 2048,
|
||||
|
@ -20452,12 +20454,12 @@ static uint32_t map_rank_to_port(uint32_t rank, uint32_t data_port) {
|
|||
return data_port + rank;
|
||||
}
|
||||
|
||||
static std::string try_connect(llama_context *ctx, uint32_t rank, TopoRebuildHelperInfo* infos, uint32_t n_world, zmq::socket_t** socket){
|
||||
auto prv_rank = (rank - 1 + n_world) % n_world;
|
||||
std::string ip = infos[prv_rank].dev_info.next_ip;
|
||||
static std::string try_connect(llama_context * ctx, uint32_t rank, TopoRebuildHelperInfo * infos, uint32_t n_world, zmq::socket_t ** socket){
|
||||
auto prev_rank = (rank - 1 + n_world) % n_world;
|
||||
std::string ip = infos[prev_rank].dev_info.next_ip;
|
||||
auto port = map_rank_to_port(rank, ctx->data_port);
|
||||
|
||||
if(!isPortOpen(ip, port)){
|
||||
if (!is_port_open(ip, port)) {
|
||||
*socket = nullptr;
|
||||
return "";
|
||||
}
|
||||
|
@ -20679,7 +20681,7 @@ int llama_rebuild_topo(llama_context * ctx,
|
|||
auto next_connect_rank = (my_rank + 1) % n_world;
|
||||
zmq::socket_t* socket_to_close = nullptr;
|
||||
bool is_not_exit = n_layer_window[my_rank] > 0 || topo_helper[my_rank].is_forwarder == 1;
|
||||
if (is_not_exit){
|
||||
if (is_not_exit) {
|
||||
// reconstruct socket to the next valid rank
|
||||
auto current_rank = my_rank;
|
||||
std::vector<uint32_t> nodes;
|
||||
|
@ -20738,7 +20740,7 @@ int llama_rebuild_topo(llama_context * ctx,
|
|||
}
|
||||
|
||||
// notify next connect node
|
||||
if(!ctx->next_node_ip.empty() && is_not_exit){
|
||||
if (!ctx->next_node_ip.empty() && is_not_exit) {
|
||||
GGML_ASSERT(ctx->send_socket != nullptr);
|
||||
try {
|
||||
auto msgs = topohelper_to_messages(topo_helper, n_world);
|
||||
|
@ -20749,15 +20751,15 @@ int llama_rebuild_topo(llama_context * ctx,
|
|||
}
|
||||
}
|
||||
|
||||
if(n_layer_window[my_rank] > 0){
|
||||
if (n_layer_window[my_rank] > 0) {
|
||||
*node_type = NodeType::NODE_TYPE_WORKER;
|
||||
}else if (topo_helper[my_rank].is_forwarder == 1){
|
||||
} else if (topo_helper[my_rank].is_forwarder == 1) {
|
||||
*node_type = NodeType::NODE_TYPE_FORWARDER;
|
||||
}else{
|
||||
} else {
|
||||
*node_type = NodeType::NODE_TYPE_EXIT;
|
||||
}
|
||||
|
||||
if(ctx->send_socket != nullptr && *node_type!=NodeType::NODE_TYPE_EXIT){
|
||||
|
||||
if (ctx->send_socket != nullptr && *node_type != NodeType::NODE_TYPE_EXIT) {
|
||||
// recv the whole view of all nodes
|
||||
std::vector<zmq::message_t> msgs;
|
||||
if (!zmq::recv_multipart(*ctx->recv_socket, std::back_inserter(msgs))) {
|
||||
|
@ -20768,7 +20770,7 @@ int llama_rebuild_topo(llama_context * ctx,
|
|||
topo_helper[i].deserialize((char *)msgs[i].data());
|
||||
}
|
||||
// broadcast the whole view
|
||||
if(next_connect_rank!=0){
|
||||
if (next_connect_rank!=0) {
|
||||
try {
|
||||
zmq::send_multipart(*ctx->send_socket, msgs);
|
||||
} catch (const zmq::error_t& e) {
|
||||
|
@ -20777,7 +20779,7 @@ int llama_rebuild_topo(llama_context * ctx,
|
|||
}
|
||||
}
|
||||
}
|
||||
for(size_t i = 0; i < n_world; i++) {
|
||||
for (size_t i = 0; i < n_world; i++) {
|
||||
is_forwarder[i] = topo_helper[i].is_forwarder;
|
||||
}
|
||||
ctx->cparams.node_type = *node_type;
|
||||
|
@ -20896,6 +20898,8 @@ struct llama_context * llama_new_context_with_model(
|
|||
|
||||
ctx->master_ip = params.master_ip;
|
||||
ctx->next_node_ip = params.next_node_ip;
|
||||
ctx->data_port = params.data_port;
|
||||
ctx->signal_port = params.signal_port;
|
||||
ctx->cparams.n_world = params.n_world;
|
||||
ctx->cparams.rank = params.rank;
|
||||
ctx->cparams.force = params.force;
|
||||
|
|
|
@ -5,7 +5,7 @@
|
|||
#include <arpa/inet.h>
|
||||
#include <unistd.h>
|
||||
|
||||
bool isPortOpen(const std::string& ip, uint32_t port, int timeout_sec) {
|
||||
bool is_port_open(const std::string& ip, uint32_t port, int timeout_sec) {
|
||||
int sock = socket(AF_INET, SOCK_STREAM, 0);
|
||||
if (sock < 0) return false;
|
||||
|
||||
|
|
|
@ -4,4 +4,4 @@
|
|||
|
||||
typedef unsigned int uint32_t;
|
||||
|
||||
bool isPortOpen(const std::string& ip, uint32_t port, int timeout_sec = 2);
|
||||
bool is_port_open(const std::string& ip, uint32_t port, int timeout_sec = 2);
|
Loading…
Add table
Reference in a new issue