add mmap prefetch and unloading

This commit is contained in:
Lizonghang 2024-10-25 16:33:56 +04:00
parent ba5117581e
commit c97ea10617
7 changed files with 161 additions and 11 deletions

View file

@ -1938,7 +1938,7 @@ struct llama_mmap {
if (prefetch > 0) {
// advise the kernel to preload the mapped memory
if (posix_madvise(addr, std::min(file->size, prefetch), POSIX_MADV_WILLNEED)) {
if (posix_madvise(addr, std::min(file->size, prefetch), POSIX_MADV_WILLNEED)) {
LLAMA_LOG_WARN("warning: posix_madvise(.., POSIX_MADV_WILLNEED) failed: %s\n",
strerror(errno));
}
@ -1968,6 +1968,10 @@ struct llama_mmap {
if (*last <= *first) {
*last = *first;
}
GGML_ASSERT(*first % page_size == 0);
GGML_ASSERT(*last % page_size == 0);
GGML_ASSERT(*last >= *first);
}
// partially unmap the file in the range [first, last)
@ -2562,6 +2566,7 @@ struct llama_cparams {
uint32_t n_world;
uint32_t rank;
uint32_t n_layer_window;
bool unload;
uint32_t n_ctx; // context size used during inference
uint32_t n_batch;
uint32_t n_ubatch;
@ -8935,7 +8940,7 @@ static bool llm_load_tensors(
}
}
ml.init_mappings(true, use_mlock ? &model.mlock_mmaps : nullptr);
ml.init_mappings(false, use_mlock ? &model.mlock_mmaps : nullptr); // disable prefetch here
model.mappings.reserve(ml.mappings.size());
// create the backend buffers
@ -17286,14 +17291,14 @@ static void llama_send_tensors(zmq::socket_t & socket, struct input_tensors * te
}
static int llama_recv_meta(zmq::socket_t & socket, struct sync_meta * meta) {
socket.setsockopt(ZMQ_RCVTIMEO, 1000);
socket.set(zmq::sockopt::rcvtimeo, 1000);
std::vector<zmq::message_t> recv_msgs;
if (!zmq::recv_multipart(socket, std::back_inserter(recv_msgs))) {
return -1;
}
socket.setsockopt(ZMQ_RCVTIMEO, -1);
socket.set(zmq::sockopt::rcvtimeo, -1);
for (size_t i = 0; i < recv_msgs.size(); i += 2) {
std::string key = recv_msgs[i].to_string();
@ -17335,6 +17340,106 @@ static void llama_recv_tensors(zmq::socket_t & socket, input_tensors * tensors)
}
}
static bool is_tensor_loaded(struct ggml_tensor * tensor) {
void * addr = (void *)tensor->data;
size_t size = ggml_nbytes(tensor);
#ifdef _WIN32
MEMORY_BASIC_INFORMATION mbi;
size_t bytes_checked = 0;
while (bytes_checked < size) {
if (VirtualQuery((char *)addr + bytes_checked, &mbi, sizeof(mbi)) == 0) {
LLAMA_LOG_ERROR("VirtualQuery failed\n");
return false;
}
if (mbi.State != MEM_COMMIT) {
return false; // memory not loaded
}
bytes_checked += mbi.RegionSize;
}
return true;
#else
size_t first = (size_t)addr;
size_t last = first + size;
long page_size = sysconf(_SC_PAGESIZE);
// align addr
llama_mmap::align_range(&first, &last, page_size);
size_t len = std::max(last - first, static_cast<size_t>(page_size));
// calculate the number of pages to check
size_t page_count = (len + page_size - 1) / page_size;
#ifdef __APPLE__
char * mincore_res = new char[page_count];
#else
unsigned char *mincore_res = new unsigned char[page_count]; // use 'unsigned char' for Linux
#endif
// call mincore to check if pages are resident in memory
if (mincore((void *)first, len, mincore_res) == 0) {
for (size_t i = 0; i < page_count; ++i) {
if (!(mincore_res[i] & 1)) {
delete[] mincore_res;
return false; // page not loaded
}
}
delete[] mincore_res;
return true; // page loaded
} else {
LLAMA_LOG_ERROR("mincore failed\n");
delete[] mincore_res;
return false;
}
#endif
}
static float is_graph_loaded(struct ggml_cgraph * cgraph) {
uint32_t n_loaded = 0;
uint32_t n_total = 0;
for (int i = 0; i < ggml_graph_n_leafs(cgraph); i++) {
struct ggml_tensor * cur = ggml_graph_leaf(cgraph, i);
if (strstr(cur->name, "weight") == nullptr || cur->data == nullptr) {
continue;
}
if (is_tensor_loaded(cur)) n_loaded++;
n_total++;
}
return float(n_loaded) / float(n_total) * 100.0f;
}
static void manage_graph_tensors(struct ggml_cgraph * cgraph, int advice, bool force = false) {
for (int i = 0; i < ggml_graph_n_leafs(cgraph); i++) {
struct ggml_tensor * cur = ggml_graph_leaf(cgraph, i);
if (strstr(cur->name, "weight") == nullptr || cur->data == nullptr) {
continue;
}
void * addr = (void *)cur->data;
size_t size = ggml_nbytes(cur);
size_t first = (size_t)addr;
size_t last = first + size;
long page_size = sysconf(_SC_PAGESIZE);
// align addr
llama_mmap::align_range(&first, &last, page_size);
size_t len = std::max(last - first, static_cast<size_t>(page_size));
// hint to load memory
posix_madvise((void *)first, len, advice);
// if advice is POSIX_MADV_WILLNEED, force to prefetch data
if (force && advice == POSIX_MADV_WILLNEED) {
volatile char * ptr = (volatile char *)first;
for (size_t off = 0; off < len; off += page_size) {
volatile char data = ptr[off];
(void)data;
}
}
}
}
// decode a batch of tokens by evaluating the transformer
//
// - lctx: llama context
@ -17533,6 +17638,8 @@ static int llama_decode_internal(
ggml_backend_sched_alloc_graph(lctx.sched[i], gf[i]);
}
ggml_cgraph * sub_gf = nullptr;
ggml_cgraph * next_gf = nullptr;
const uint32_t n_layer = hparams.n_layer;
const char * layer_str = nullptr;
int cur_l = -1;
@ -17541,7 +17648,8 @@ static int llama_decode_internal(
GGML_ASSERT(my_rank == 0 || n_world > 1);
for (size_t i = 0; i < (size_t)gf.size(); ++i) {
ggml_cgraph * sub_gf = gf[i];
sub_gf = gf[i];
next_gf = gf[(i + 1) % gf.size()];
if (n_world > 1 && !(my_rank == 0 && i == 0) && !(my_rank == 0 && is_last_l)) {
{ // receive data from previous nodes
@ -17600,6 +17708,17 @@ static int llama_decode_internal(
zmq::socket_t * s = is_to_master ? lctx.master_socket : lctx.send_socket;
llama_send_tensors(*s, &tensors);
}
// overlap memory scheduling with other nodes' communication and computing
if (cparams.unload) {
timer(manage_graph_tensors);
if (n_world != 1) {
manage_graph_tensors(sub_gf, POSIX_MADV_DONTNEED);
if (!(my_rank == 0 && is_last_l)) {
manage_graph_tensors(next_gf, POSIX_MADV_WILLNEED, true);
}
}
}
}
// update the kv ring buffer
@ -19314,6 +19433,7 @@ struct llama_context_params llama_context_default_params() {
/*.n_world =*/ 1,
/*.rank =*/ 0,
/*.n_layer_window =*/ 32,
/*.unload =*/ false,
/*.master_ip =*/ nullptr,
/*.next_node_ip =*/ nullptr,
/*.n_ctx =*/ 512,
@ -19617,6 +19737,7 @@ struct llama_context * llama_new_context_with_model(
cparams.n_world = params.n_world;
cparams.rank = params.rank;
cparams.n_layer_window = params.n_layer_window;
cparams.unload = params.unload;
cparams.n_seq_max = std::max(1u, params.n_seq_max);
cparams.n_threads = params.n_threads;
cparams.n_threads_batch = params.n_threads_batch;
@ -19995,6 +20116,9 @@ struct llama_context * llama_new_context_with_model(
std::vector<ggml_cgraph *> gf = llama_build_graph(*ctx, ubatch, true);
ctx->sched.resize(gf.size());
// prefetch the first subgraph weights
manage_graph_tensors(gf.front(), POSIX_MADV_WILLNEED, true);
// initialize scheduler with the worst-case graph
bool ok = true;