diff --git a/.gitmodules b/.gitmodules
index 68242f3..0716ec2 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -4,3 +4,19 @@
[submodule "third_party/pybind11"]
path = third_party/pybind11
url = https://github.com/pybind/pybind11.git
+
+[submodule "third_party/spdlog"]
+ path = third_party/spdlog
+ url = https://github.com/gabime/spdlog.git
+[submodule "third_party/custom_flashinfer"]
+ path = third_party/custom_flashinfer
+ url = https://github.com/kvcache-ai/custom_flashinfer.git
+ branch = fix-precision-mla-merge-main
+
+[submodule "third_party/xxHash"]
+ path = third_party/xxHash
+ url = https://github.com/Cyan4973/xxHash.git
+
+[submodule "third_party/prometheus-cpp"]
+ path = third_party/prometheus-cpp
+ url = https://github.com/jupp0r/prometheus-cpp
diff --git a/MANIFEST.in b/MANIFEST.in
index dac9b32..4097ce6 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -1,6 +1,7 @@
graft third_party
graft ktransformers
graft local_chat.py
+graft csrc
include LICENSE README.md
prune ktransformers/website
prune ktransformers/logs
@@ -9,3 +10,4 @@ prune third_party/llama.cpp/models
graft ktransformers/website/dist
global-exclude __pycache__
include KTransformersOps.*.so
+include cpuinfer_ext.*.so
diff --git a/Makefile b/Makefile
index 8349809..74cb3c9 100644
--- a/Makefile
+++ b/Makefile
@@ -29,4 +29,4 @@ clean:
install_numa:
USE_NUMA=1 make dev_install
install_no_numa:
- env -u USE_NUMA make dev_install
+ env -u USE_NUMA make dev_install
\ No newline at end of file
diff --git a/README.md b/README.md
index 63728d2..d90caf2 100644
--- a/README.md
+++ b/README.md
@@ -23,17 +23,20 @@ Our vision for KTransformers is to serve as a flexible platform for experimentin
@@ -45,16 +48,16 @@ https://github.com/user-attachments/assets/ebd70bfa-b2c1-4abb-ae3b-296ed38aa285
- **[NEW!!!] Local 671B DeepSeek-Coder-V3/R1:** Running its Q4_K_M version using only 14GB VRAM and 382GB DRAM([Tutorial](./doc/en/DeepseekR1_V3_tutorial.md)).
- - Prefill Speed (tokens/s):
- - KTransformers: 54.21 (32 cores) → 74.362 (dual-socket, 2×32 cores) → 255.26 (optimized AMX-based MoE kernel, V0.3 only) → 286.55 (selectively using 6 experts, V0.3 only)
- - Compared to 10.31 tokens/s in llama.cpp with 2×32 cores, achieving up to **27.79× speedup**.
- - Decode Speed (tokens/s):
- - KTransformers: 8.73 (32 cores) → 11.26 (dual-socket, 2×32 cores) → 13.69 (selectively using 6 experts, V0.3 only)
- - Compared to 4.51 tokens/s in llama.cpp with 2×32 cores, achieving up to **3.03× speedup**.
- - Upcoming Open Source Release:
- - AMX optimizations and selective expert activation will be open-sourced in V0.3.
- - Currently available only in preview binary distribution, which can be downloaded [here](./doc/en/DeepseekR1_V3_tutorial.md).
+ - Prefill Speed (tokens/s):
+ - KTransformers: 54.21 (32 cores) → 74.362 (dual-socket, 2×32 cores) → 255.26 (optimized AMX-based MoE kernel, V0.3 only) → 286.55 (selectively using 6 experts, V0.3 only)
+ - Compared to 10.31 tokens/s in llama.cpp with 2×32 cores, achieving up to **27.79× speedup**.
+ - Decode Speed (tokens/s):
+ - KTransformers: 8.73 (32 cores) → 11.26 (dual-socket, 2×32 cores) → 13.69 (selectively using 6 experts, V0.3 only)
+ - Compared to 4.51 tokens/s in llama.cpp with 2×32 cores, achieving up to **3.03× speedup**.
+ - Upcoming Open Source Release:
+ - AMX optimizations and selective expert activation will be open-sourced in V0.3.
+ - Currently available only in preview binary distribution, which can be downloaded [here](./doc/en/DeepseekR1_V3_tutorial.md).
- **Local 236B DeepSeek-Coder-V2:** Running its Q4_K_M version using only 21GB VRAM and 136GB DRAM, attainable on a local desktop machine, which scores even better than GPT4-0613 in [BigCodeBench](https://huggingface.co/blog/leaderboard-bigcodebench).
@@ -96,19 +99,16 @@ https://github.com/user-attachments/assets/a865e5e4-bca3-401e-94b8-af3c080e6c12
* **Flexible Sparse Attention Framework**: Offers a flexible block sparse attention framework for CPU offloaded decoding. Compatible with SnapKV, Quest, and InfLLm. Further information is available [here](./doc/en/long_context_introduction.md).
-->
-
More advanced features will be coming soon, so stay tuned!
🚀 Quick Start
-
Getting started with KTransformers is simple! Follow the steps below to set up and start using it.
### 📥 Installation
To install KTransformers, follow the official [Installation Guide](https://kvcache-ai.github.io/ktransformers/en/install.html).
-
📃 Brief Injection Tutorial
At the heart of KTransformers is a user-friendly, template-based injection framework.
This allows researchers to easily replace original torch modules with optimized variants. It also simplifies the process of combining multiple optimizations, allowing the exploration of their synergistic effects.
@@ -167,7 +167,6 @@ The development of KTransformer is based on the flexible and versatile framework
KTransformers is actively maintained and developed by contributors from the
MADSys group at Tsinghua University and members from
Approaching.AI. We welcome new contributors to join us in making KTransformers faster and easier to use.
-
Discussion
If you have any questions, feel free to open an issue. Alternatively, you can join our WeChat group for further discussion. QR Code: [WeChat Group](WeChatGroup.png)
diff --git a/csrc/balance_serve/CMakeLists.txt b/csrc/balance_serve/CMakeLists.txt
new file mode 100644
index 0000000..e9b6072
--- /dev/null
+++ b/csrc/balance_serve/CMakeLists.txt
@@ -0,0 +1,69 @@
+
+cmake_minimum_required(VERSION 3.21)
+find_program(GCC_COMPILER NAMES g++-13 g++-12 g++-11 REQUIRED)
+set(CMAKE_CXX_COMPILER ${GCC_COMPILER})
+
+# Show the selected compiler
+message(STATUS "Using compiler: ${CMAKE_CXX_COMPILER}")
+
+
+project(balance_serve VERSION 0.1.0)
+
+set(CMAKE_CXX_STANDARD 20)
+# set(CMAKE_CXX_FLAGS "-Og -march=native -Wall -Wextra -g -fPIC")
+# set(CMAKE_BUILD_TYPE "Debug")
+set(CMAKE_CXX_FLAGS "-O3 -march=native -Wall -Wextra -fPIC")
+set(CMAKE_BUILD_TYPE "Release")
+
+file(GLOB_RECURSE FMT_SOURCES "${CMAKE_CURRENT_SOURCE_DIR}/*.cpp" "${CMAKE_CURRENT_SOURCE_DIR}/*.hpp" "${CMAKE_CURRENT_SOURCE_DIR}/*.h")
+
+add_custom_target(
+ format
+ COMMAND clang-format
+ -i
+ -style=file
+ ${FMT_SOURCES}
+ COMMENT "Running clang-format on all source files"
+)
+
+
+
+add_definitions(-D_GLIBCXX_USE_CXX11_ABI=0)
+set(BUILD_SHARED_LIBS ON)
+set(ENABLE_PUSH OFF)
+set(ENABLE_COMPRESSION OFF)
+
+# set(CMAKE_BUILD_TYPE "Release")
+set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
+
+set(THIRD_PARTY_DIR ${CMAKE_CURRENT_SOURCE_DIR}/../../third_party)
+set(THIRD_PARTY_BUILD_DIR ${CMAKE_CURRENT_BINARY_DIR}/third_party)
+add_subdirectory(${THIRD_PARTY_DIR}/prometheus-cpp ${THIRD_PARTY_BUILD_DIR}/prometheus-cpp EXCLUDE_FROM_ALL)
+add_subdirectory(${THIRD_PARTY_DIR}/xxHash/cmake_unofficial ${THIRD_PARTY_BUILD_DIR}/xxHash EXCLUDE_FROM_ALL)
+
+# add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/third_party/prometheus-cpp ${CMAKE_CURRENT_BINARY_DIR}/third_party/prometheus-cpp)
+set(SPDLOG_DIR ${THIRD_PARTY_DIR}/spdlog)
+set(FMT_DIR ${THIRD_PARTY_DIR}/fmt)
+
+set(KVC2_INCLUDE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/kvc2/src)
+
+include_directories(${THIRD_PARTY_DIR})
+
+add_subdirectory(${THIRD_PARTY_DIR}/pybind11 ${THIRD_PARTY_BUILD_DIR}/pybind11)
+
+execute_process(
+ COMMAND python3 -c "import torch; print(torch.__path__[0])"
+ OUTPUT_VARIABLE TORCH_INSTALL_PREFIX
+ OUTPUT_STRIP_TRAILING_WHITESPACE
+)
+
+message(STATUS "Found PyTorch at: ${TORCH_INSTALL_PREFIX}")
+
+# set(TORCH_INSTALL_PREFIX "/home/xwy/.conda/envs/kvc/lib/python3.12/site-packages/torch")
+find_library(TORCH_PYTHON_LIBRARY torch_python PATHS "${TORCH_INSTALL_PREFIX}/lib")
+find_package(Torch REQUIRED PATHS "${TORCH_INSTALL_PREFIX}/share/cmake/Torch" NO_DEFAULT_PATH)
+
+add_subdirectory(kvc2)
+add_subdirectory(sched)
+
+# add_subdirectory(test)
diff --git a/csrc/balance_serve/kvc2/.clang-format b/csrc/balance_serve/kvc2/.clang-format
new file mode 100644
index 0000000..752070f
--- /dev/null
+++ b/csrc/balance_serve/kvc2/.clang-format
@@ -0,0 +1,25 @@
+Language: Cpp
+# Base formatting style: LLVM, Google, Chromium, Mozilla, WebKit, etc., or a custom style
+BasedOnStyle: Google
+
+# Indentation settings
+IndentWidth: 2
+TabWidth: 2
+UseTab: Never
+
+# Line-breaking settings
+BreakBeforeBraces: Attach
+AllowShortIfStatementsOnASingleLine: false
+AllowShortFunctionsOnASingleLine: Inline
+AllowShortLoopsOnASingleLine: false
+
+# Classes and structs
+DerivePointerAlignment: false
+PointerAlignment: Left
+
+# Include sorting and grouping
+IncludeBlocks: Preserve
+SortIncludes: true
+
+# Maximum line width
+ColumnLimit: 120
diff --git a/csrc/balance_serve/kvc2/CMakeLists.txt b/csrc/balance_serve/kvc2/CMakeLists.txt
new file mode 100644
index 0000000..4238f15
--- /dev/null
+++ b/csrc/balance_serve/kvc2/CMakeLists.txt
@@ -0,0 +1,104 @@
+cmake_minimum_required(VERSION 3.21)
+
+find_program(GCC_COMPILER NAMES g++-13 g++-12 g++-11 REQUIRED)
+set(CMAKE_CXX_COMPILER ${GCC_COMPILER})
+
+project(kvcache-manager VERSION 0.1.0)
+
+set(CMAKE_CXX_STANDARD 20)
+
+# set(CMAKE_CXX_FLAGS "-fPIC -O3 -ffast-math -march=native -Wall -Wextra -Wpedantic -fvisibility=hidden -s")
+# set(CMAKE_CXX_FLAGS "-Og -march=native -Wall -Wextra -Wpedantic -g -fsanitize=address")
+# set(CMAKE_CXX_FLAGS "-march=native -Wall -Wextra -Wpedantic -g")
+# set(CMAKE_CXX_FLAGS "-fPIC -O3 -ffast-math -march=native -Wall -Wextra -g")
+# set(CMAKE_BUILD_TYPE "Release")
+set(CMAKE_BUILD_TYPE "Debug")
+set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
+set(BUILD_TEST OFF)
+set(BUILD_PYTHON_EXT OFF)
+
+add_definitions(-D_GLIBCXX_USE_CXX11_ABI=0)
+
+# set(USE_IO_URING ON)
+if(USE_IO_URING)
+ message(STATUS "Using io_uring")
+ add_compile_definitions(USE_IO_URING)
+else()
+ message(STATUS "Using aio")
+endif()
+
+file(GLOB_RECURSE ALL_SOURCE_FILES src/*.cpp src/*.h test/*.cpp test/*.h test/*.hpp)
+
+# Add a custom target that formats all source files
+if(NOT TARGET format)
+ add_custom_target(
+ format
+ COMMAND clang-format
+ -i
+ -style=file
+ ${ALL_SOURCE_FILES}
+ COMMENT "Running clang-format on all source files"
+ )
+endif()
+
+execute_process(
+ COMMAND python3 -c "import torch; print(torch.__path__[0])"
+ OUTPUT_VARIABLE TORCH_INSTALL_PREFIX
+ OUTPUT_STRIP_TRAILING_WHITESPACE
+)
+
+message(STATUS "Found PyTorch at: ${TORCH_INSTALL_PREFIX}")
+
+# set(TORCH_INSTALL_PREFIX "/home/xwy/.conda/envs/kvc/lib/python3.12/site-packages/torch")
+find_library(TORCH_PYTHON_LIBRARY torch_python PATHS "${TORCH_INSTALL_PREFIX}/lib")
+find_package(Torch REQUIRED PATHS "${TORCH_INSTALL_PREFIX}/share/cmake/Torch" NO_DEFAULT_PATH)
+
+include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../../third_party)
+
+find_package(TBB REQUIRED)
+find_package(CUDA REQUIRED)
+
+# find_package(prometheus-cpp CONFIG REQUIRED)
+if(NOT TARGET prometheus-cpp::pull)
+ message(FATAL_ERROR "prometheus-cpp::pull not found")
+else()
+ message(STATUS "prometheus Found!")
+endif()
+
+if(CUDA_FOUND)
+ message(STATUS "CUDA Found!")
+ message(STATUS "CUDA Version: ${CUDA_VERSION_STRING}")
+ message(STATUS "CUDA Toolkit Root: ${CUDA_TOOLKIT_ROOT_DIR}")
+else()
+ message(FATAL_ERROR "CUDA not found!")
+endif()
+
+add_subdirectory(src)
+
+if(BUILD_TEST)
+ add_subdirectory(test)
+endif()
+
+message(STATUS "BUILD_PYTHON_EXT: ${BUILD_PYTHON_EXT}")
+
+if(BUILD_PYTHON_EXT)
+ if(NOT TARGET pybind11::pybind11)
+ add_subdirectory(${THIRD_PARTY_DIR}/pybind11 ${THIRD_PARTY_BUILD_DIR}/pybind11)
+ endif()
+
+ pybind11_add_module(kvc2_ext src/bind.cpp)
+
+ # EXAMPLE_VERSION_INFO is defined by setup.py and passed into the C++ code as a
+ # define (VERSION_INFO) here.
+ target_compile_definitions(kvc2_ext PRIVATE VERSION_INFO=${EXAMPLE_VERSION_INFO})
+ message(STATUS "CMAKE_CURRENT_SOURCE_DIR: ${CMAKE_CURRENT_SOURCE_DIR}")
+ target_include_directories(kvc2_ext PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../../third_party/spdlog/include)
+
+ target_link_libraries(kvc2_ext PUBLIC kvc2 async_store)
+
+ install(TARGETS kvc2_ext LIBRARY
+ DESTINATION ${CMAKE_BINARY_DIR}/output)
+ install(FILES src/kvc2_utils.py
+ DESTINATION ${CMAKE_BINARY_DIR}/output)
+endif()
+
diff --git a/csrc/balance_serve/kvc2/README.md b/csrc/balance_serve/kvc2/README.md
new file mode 100644
index 0000000..e4c4745
--- /dev/null
+++ b/csrc/balance_serve/kvc2/README.md
@@ -0,0 +1,38 @@
+# KVC2
+
+# Build
+Run the following commands to build kvc2. Note that installing some dependencies may require sudo privileges.
+```shell
+git clone https://github.com/kvcache-ai/kvc2
+cd kvc2
+./install_deps.sh
+mkdir build
+cd build
+cmake ..
+make -j && make install
+```
+After the build completes, a `build/output` directory is generated, containing `kvc2_ext.cpython-312-x86_64-linux-gnu.so` and `kvc2_utils.py` for convenient use.
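+
+To sanity-check the build, a minimal loading sketch (assumes Python 3.12, matching the `.so` name above, and is run from the kvc2 root):
+
+```python
+import sys
+
+# Make the compiled extension importable; this directory is produced by `make install`
+sys.path.append("build/output")
+
+import kvc2_ext  # the pybind11 extension built above
+print(kvc2_ext.__file__)
+```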
+
+
+
+# Troubleshooting
+When running in a Python environment, you may need to install the related dependencies in conda.
+```shell
+conda install -c conda-forge gcc_linux-64 gxx_linux-64
+```
+
+You can also try setting the following environment variables before running.
+```shell
+export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH
+export LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libffi.so.7
+```
+
+
diff --git a/csrc/balance_serve/kvc2/config/model_configs.json b/csrc/balance_serve/kvc2/config/model_configs.json
new file mode 100644
index 0000000..4d8195a
--- /dev/null
+++ b/csrc/balance_serve/kvc2/config/model_configs.json
@@ -0,0 +1,42 @@
+{
+ "DeepSeek-Coder-V2-Instruct": {
+ "hidden_size": 5120,
+ "intermediate_size": 12288,
+ "max_position_embeddings": 163840,
+ "model_type": "deepseek_v2",
+ "num_attention_heads": 128,
+ "num_hidden_layers": 60,
+ "num_key_value_heads": 128,
+ "vocab_size": 102400
+ },
+ "LLaMA-2-7B-32K": {
+ "hidden_size": 4096,
+ "intermediate_size": 11008,
+ "max_position_embeddings": 32768,
+ "model_type": "llama",
+ "num_attention_heads": 32,
+ "num_hidden_layers": 32,
+ "num_key_value_heads": 32,
+ "vocab_size": 32000
+ },
+ "Qwen2.5-7B-Instruct": {
+ "hidden_size": 3584,
+ "intermediate_size": 18944,
+ "max_position_embeddings": 32768,
+ "model_type": "qwen2",
+ "num_attention_heads": 28,
+ "num_hidden_layers": 28,
+ "num_key_value_heads": 4,
+ "vocab_size": 152064
+ },
+ "qwen2-72b-instruct": {
+ "hidden_size": 8192,
+ "intermediate_size": 29568,
+ "max_position_embeddings": 32768,
+ "model_type": "qwen2",
+ "num_attention_heads": 64,
+ "num_hidden_layers": 80,
+ "num_key_value_heads": 8,
+ "vocab_size": 152064
+ }
+}
\ No newline at end of file
diff --git a/csrc/balance_serve/kvc2/config/quant_configs.json b/csrc/balance_serve/kvc2/config/quant_configs.json
new file mode 100644
index 0000000..191df5a
--- /dev/null
+++ b/csrc/balance_serve/kvc2/config/quant_configs.json
@@ -0,0 +1,57 @@
+{
+ "BF16": {
+ "block_element_count": 1,
+ "block_element_size": 2,
+ "bytes_per_element": 2.0,
+ "can_be_used_as_vector": true,
+ "has_min": false,
+ "has_scale": false,
+ "name": "BF16",
+ "reference": "",
+ "type_of_dot_vector": "BF16"
+ },
+ "FP16": {
+ "block_element_count": 1,
+ "block_element_size": 2,
+ "bytes_per_element": 2.0,
+ "can_be_used_as_vector": true,
+ "has_min": false,
+ "has_scale": false,
+ "name": "FP16",
+ "reference": "",
+ "type_of_dot_vector": "FP16"
+ },
+ "FP32": {
+ "block_element_count": 1,
+ "block_element_size": 4,
+ "bytes_per_element": 4.0,
+ "can_be_used_as_vector": true,
+ "has_min": false,
+ "has_scale": false,
+ "name": "FP32",
+ "reference": "",
+ "type_of_dot_vector": "FP32"
+ },
+ "Q4_0": {
+ "block_element_count": 32,
+ "block_element_size": 18,
+ "bytes_per_element": 0.5625,
+ "can_be_used_as_vector": false,
+ "has_min": false,
+ "has_scale": true,
+ "name": "Q4_0",
+ "reference": "https://huggingface.co/docs/hub/gguf",
+ "type_of_dot_vector": "Q8_0"
+ },
+ "Q8_0": {
+ "block_element_count": 32,
+ "block_element_size": 34,
+ "bytes_per_element": 1.0625,
+ "can_be_used_as_vector": true,
+ "has_min": false,
+ "has_scale": true,
+ "name": "Q8_0",
+ "reference": "https://huggingface.co/docs/hub/gguf",
+ "type_of_dot_vector": "Q8_0"
+ }
+}
\ No newline at end of file
diff --git a/csrc/balance_serve/kvc2/export_envs_before_run.sh b/csrc/balance_serve/kvc2/export_envs_before_run.sh
new file mode 100755
index 0000000..b2fcd9b
--- /dev/null
+++ b/csrc/balance_serve/kvc2/export_envs_before_run.sh
@@ -0,0 +1,2 @@
+export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH
+export LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libffi.so.7
diff --git a/csrc/balance_serve/kvc2/install_deps.sh b/csrc/balance_serve/kvc2/install_deps.sh
new file mode 100755
index 0000000..336a32a
--- /dev/null
+++ b/csrc/balance_serve/kvc2/install_deps.sh
@@ -0,0 +1,15 @@
+#!/bin/bash
+
+cd "${0%/*}"
+git submodule update --init --recursive
+
+sudo apt update
+sudo apt install libtbb-dev
+sudo apt install libcurl4-openssl-dev
+sudo apt install libaio-dev
+
+cd third_party/xxHash/
+make -j
+sudo make install
+cd ../..
+
diff --git a/csrc/balance_serve/kvc2/mkfs.sh b/csrc/balance_serve/kvc2/mkfs.sh
new file mode 100755
index 0000000..aadcb02
--- /dev/null
+++ b/csrc/balance_serve/kvc2/mkfs.sh
@@ -0,0 +1,4 @@
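+# WARNING: developer convenience script; reformats /dev/nvme0n1 (destroying all data on it) and remounts it for the kvc2 store.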
+sudo umount /mnt/xwy
+sudo mkfs.xfs /dev/nvme0n1 -f
+sudo mount /dev/nvme0n1 /mnt/xwy
+sudo chown -R xwy /mnt/xwy/
\ No newline at end of file
diff --git a/csrc/balance_serve/kvc2/src/CMakeLists.txt b/csrc/balance_serve/kvc2/src/CMakeLists.txt
new file mode 100644
index 0000000..98ea626
--- /dev/null
+++ b/csrc/balance_serve/kvc2/src/CMakeLists.txt
@@ -0,0 +1,45 @@
+include_directories(${THIRD_PARTY_DIR}/asyncio/include)
+
+add_library(kvc2_metrics STATIC metrics.cpp)
+target_link_libraries(kvc2_metrics PUBLIC prometheus-cpp::pull)
+
+add_library(page_aligned_memory_pool page_aligned_memory_pool.cpp)
+target_include_directories(page_aligned_memory_pool PRIVATE ${THIRD_PARTY_DIR}/spdlog/include)
+
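+# Helper: add the prometheus-cpp (source and generated) and spdlog include paths to a target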
+function(add_third_party_includes TARGET_NAME)
+ target_include_directories(${TARGET_NAME} PRIVATE
+ ${THIRD_PARTY_BUILD_DIR}/prometheus-cpp/core/include
+ ${THIRD_PARTY_BUILD_DIR}/prometheus-cpp/pull/include
+ ${THIRD_PARTY_DIR}/prometheus-cpp/core/include
+ ${THIRD_PARTY_DIR}/prometheus-cpp/pull/include
+ ${THIRD_PARTY_DIR}/spdlog/include
+ )
+endfunction()
+
+
+add_library(cache_entry cache_entry.cpp)
+add_third_party_includes(cache_entry)
+target_link_libraries(cache_entry PUBLIC gpu_cache)
+
+add_library(gpu_cache gpu_cache.cpp)
+add_third_party_includes(gpu_cache)
+target_link_libraries(gpu_cache PUBLIC xxHash::xxhash ${TORCH_LIBRARIES} cuda_stream_manager)
+
+add_library(kvc2 prefix.cpp)
+target_include_directories(kvc2 PRIVATE ${THIRD_PARTY_DIR}/nlohmann/single_include)
+add_third_party_includes(kvc2)
+target_link_libraries(kvc2 PUBLIC TBB::tbb xxHash::xxhash cache_entry cuda_stream_manager page_aligned_memory_pool ${TORCH_LIBRARIES} prometheus-cpp::pull kvc2_metrics)
+
+message(STATUS "CMAKE_SOURCE_DIR: " ${CMAKE_SOURCE_DIR})
+add_library(async_store async_store.cpp)
+target_include_directories(async_store PRIVATE ${THIRD_PARTY_DIR}/nlohmann/single_include)
+target_include_directories(async_store PRIVATE ${THIRD_PARTY_DIR}/spdlog/include)
+target_link_libraries(async_store PUBLIC pthread)
+
+
+
+add_library(cuda_stream_manager cuda_stream_manager.cpp)
+target_include_directories(cuda_stream_manager PUBLIC ${THIRD_PARTY_DIR}/nlohmann/single_include)
+target_include_directories(cuda_stream_manager PUBLIC ${THIRD_PARTY_DIR}/spdlog/include)
+target_include_directories(cuda_stream_manager PUBLIC ${CUDAToolkit_INCLUDE_DIRS})
+target_link_libraries(cuda_stream_manager PUBLIC CUDA::cudart)
diff --git a/csrc/balance_serve/kvc2/src/async_store.cpp b/csrc/balance_serve/kvc2/src/async_store.cpp
new file mode 100644
index 0000000..b5400c9
--- /dev/null
+++ b/csrc/balance_serve/kvc2/src/async_store.cpp
@@ -0,0 +1,137 @@
+
+#include <atomic>
+#include <cassert>
+#include <cstdint>
+#include <cstring>
+#include <fcntl.h>
+#include <filesystem>
+#include <functional>
+#include <future>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include "utils/lock_free_queue.hpp"
+
+#include "async_store.hh"
+
+namespace async_store {
+
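+// Fixed-size-element array persisted at data_path. Each element is padded up
+// to the 512-byte device block size so offsets stay block-aligned for direct
+// IO. The read/write/extend bodies are still TODO stubs in this version.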
+struct ArrayStore {
+ static const size_t DeviceBlockSize = 512;
+
+ const size_t element_size;
+ const size_t element_size_aligned;
+
+ size_t size;
+
+ size_t size_in_bytes() { return size * element_size_aligned; }
+
+ std::filesystem::path data_path;
+
+ void extend(size_t to) {
+ if (to <= size) {
+ return;
+ }
+ //TODO: extend file
+ size = to;
+ //LOG_INFO("Extend file to `, size `", to, size_in_bytes());
+ }
+
+ ArrayStore(size_t element_size, size_t size, std::filesystem::path data_path)
+ : element_size(element_size),
+      element_size_aligned((element_size + DeviceBlockSize - 1) / DeviceBlockSize * DeviceBlockSize),
+ data_path(data_path) {
+ //TODO: prefix cache
+ }
+
+ void read(size_t index, void* buffer) {
+ //TODO: read from file
+ }
+ void write(size_t index, void* buffer) {
+ //TODO: write to file
+ }
+};
+
+ArrayStore* create_or_open_store(size_t element_size, size_t size, std::filesystem::path data_path) {
+ return new ArrayStore(element_size, size, data_path);
+}
+
+void close_store(ArrayStore* store) {
+ delete store;
+}
+
+size_t capacity(ArrayStore* store) {
+ return store->size;
+}
+
+void extend(ArrayStore* store, size_t to) {
+ store->extend(to);
+}
+
+template <typename T>
+struct ArrayStoreT {
+ ArrayStore store;
+ ArrayStoreT(size_t element_count, std::filesystem::path data_path) : store(sizeof(T), element_count, data_path) {}
+
+ void read(size_t index, void* output) { store.read(index, output); }
+
+ void write(size_t index, T& value) { store.write(index, &value); }
+ void write(size_t index, void* value) { store.write(index, value); }
+};
+
+std::string request_to_string(IORequest* req) {
+  return fmt::format("IORequest {} {} to {}[{}]", req->write ? "Write" : "Read ", req->data,
+ req->store->data_path.c_str(), req->index);
+}
+
+struct IODealerImpl {
+  MPSCQueue<std::shared_ptr<IORequest>> ioQueue;
+ uint64_t io_cnt = 0;
+ size_t io_amount = 0;
+ bool use_io_uring;
+ int IO_DEPTH;
+
+ bool stop = false;
+ IODealerImpl(bool use_io_uring, int IO_DEPTH) : use_io_uring(use_io_uring), IO_DEPTH(IO_DEPTH) {}
+
+ void queue_consumer() {
+ //TODO:
+ }
+
+ void io_perf() {
+ //TODO:
+ }
+
+ void io_dealer() {
+ //TODO:
+ }
+};
+
+IODealer::IODealer(bool use_io_uring, int IO_DEPTH) {
+ io_impl = new IODealerImpl(use_io_uring, IO_DEPTH);
+}
+
+IODealer::~IODealer() {
+ stop();
+ delete io_impl;
+}
+
+void IODealer::enqueue(std::shared_ptr<IORequest> req) {
+ io_impl->ioQueue.enqueue(req);
+}
+
+std::thread IODealer::start_io_thread() {
+ return std::thread([this]() { io_impl->io_dealer(); });
+}
+void IODealer::stop() {
+ if (io_impl->stop) {
+ return;
+ }
+ //LOG_INFO("Stopping IO Dealer");
+ io_impl->stop = true;
+}
+
+} // namespace async_store
diff --git a/csrc/balance_serve/kvc2/src/async_store.hh b/csrc/balance_serve/kvc2/src/async_store.hh
new file mode 100644
index 0000000..046e990
--- /dev/null
+++ b/csrc/balance_serve/kvc2/src/async_store.hh
@@ -0,0 +1,51 @@
+#pragma once
+#include <filesystem>
+#include <thread>
+
+#define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_DEBUG
+#define FMT_HEADER_ONLY
+#include "spdlog/spdlog.h"
+
+#include "io_helper.hpp"
+
+namespace async_store {
+
+struct ArrayStore;
+
+ArrayStore* create_or_open_store(size_t element_size, size_t size, std::filesystem::path data_path);
+void close_store(ArrayStore* store);
+size_t capacity(ArrayStore* store);
+void extend(ArrayStore* store, size_t to);
+
+
+
+struct IORequest {
+ ArrayStore* store;
+ bool write;
+ void* data;
+ size_t index;
+
+ // for sync
+ bool need_promise = false;
+ BatchPromise* promise;
+};
+
+std::string request_to_string(IORequest* req);
+
+struct IODealerImpl;
+struct IODealer {
+ IODealerImpl* io_impl;
+
+ IODealer(bool use_io_uring = false, int IO_DEPTH = 128);
+ ~IODealer();
+ IODealer(const IODealer&) = delete;
+ IODealer& operator=(const IODealer&) = delete;
+ IODealer(IODealer&&) = default;
+ IODealer& operator=(IODealer&&) = default;
+
+  void enqueue(std::shared_ptr<IORequest> req);
+ std::thread start_io_thread();
+ void stop();
+};
+
+} // namespace async_store
diff --git a/csrc/balance_serve/kvc2/src/bind.cpp b/csrc/balance_serve/kvc2/src/bind.cpp
new file mode 100644
index 0000000..c76bb26
--- /dev/null
+++ b/csrc/balance_serve/kvc2/src/bind.cpp
@@ -0,0 +1,53 @@
+// #include <pybind11/functional.h>
+// #include <pybind11/pybind11.h>
+// #include <pybind11/stl.h>
+// #include <memory>
+// #include <string>
+// #include <vector>
+// #include "kvc2.h"
+// #define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_DEBUG
+// #define FMT_HEADER_ONLY
+// #include "spdlog/spdlog.h"
+// #include "utils/arithmetic.hpp"
+
+// namespace py = pybind11;
+
+// PYBIND11_MODULE(kvc2_ext, m) {
+// // Bind KVC2Config struct
+//   py::class_<kvc2::KVC2Config>(m, "KVC2Config")
+// .def(py::init<>())
+// .def_readwrite("path", &kvc2::KVC2Config::path)
+// .def_readwrite("block_length", &kvc2::KVC2Config::num_token_per_page)
+// .def_readwrite("memory_pool_size", &kvc2::KVC2Config::memory_pool_size)
+// .def_readwrite("evict_count", &kvc2::KVC2Config::evict_count);
+
+// // Bind CacheInfo struct
+//   py::class_<kvc2::CacheInfo>(m, "CacheInfo")
+// .def(py::init<>())
+// .def_readwrite("model_name", &kvc2::CacheInfo::model_name)
+// .def_readwrite("is_key_cache", &kvc2::CacheInfo::is_key_cache)
+// .def_readwrite("quant_type", &kvc2::CacheInfo::quant_type)
+// .def("hidden_layer_count", &kvc2::CacheInfo::hidden_layer_count)
+// .def("path", &kvc2::CacheInfo::path, py::arg("which_layer") = std::nullopt)
+// .def("__eq__", &kvc2::CacheInfo::operator==)
+// .def("element_size", &kvc2::CacheInfo::element_size)
+// .def("hash_value", &kvc2::CacheInfo::hash_value);
+
+// // Bind KVC2HandleInterface class
+//   py::class_<kvc2::KVC2HandleInterface, std::shared_ptr<kvc2::KVC2HandleInterface>>(m, "KVC2HandleInterface")
+// .def("matched_length", &kvc2::SingleCacheHandleInterface::matched_length)
+// .def("handle_data", &kvc2::KVC2HandleInterface::handle_data);
+
+// // Bind KVC2Interface class
+//   py::class_<kvc2::KVC2Interface, std::shared_ptr<kvc2::KVC2Interface>>(m, "KVC2Interface")
+// .def("start_io_thread", [](kvc2::KVC2Interface& self) { self.start_io_thread(); })
+// .def("stop_io_thread", &kvc2::KVC2Interface::stop_io_thread)
+// .def("load", &kvc2::KVC2Interface::load)
+// .def("save", &kvc2::KVC2Interface::save)
+// .def("raw_insert", &kvc2::KVC2Interface::raw_insert)
+// .def("raw_read", &kvc2::KVC2Interface::raw_read)
+// .def("lookup", &kvc2::KVC2Interface::lookup);
+
+// // Bind create_kvc2 function
+// m.def("create_kvc2", &kvc2::create_kvc2, py::arg("config"));
+// }
\ No newline at end of file
diff --git a/csrc/balance_serve/kvc2/src/cache_entry.cpp b/csrc/balance_serve/kvc2/src/cache_entry.cpp
new file mode 100644
index 0000000..3fe6b0a
--- /dev/null
+++ b/csrc/balance_serve/kvc2/src/cache_entry.cpp
@@ -0,0 +1,263 @@
+#include "cache_entry.hh"
+#include <cassert>
+
+#define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_DEBUG
+#define FMT_HEADER_ONLY
+#include "spdlog/spdlog.h"
+
+#include "gpu_cache.hh"
+
+namespace kvc2 {
+
+bool ConcurrentControlUnit::can_desert() {
+ if (ref_count.load() == 0 && dirty.load() == false) {
+ tc.reset();
+ return true;
+ } else {
+ return false;
+ }
+}
+void ConcurrentControlUnit::debug() {
+ SPDLOG_DEBUG("ref count {}, dirty {}, {}", ref_count.load(), dirty.load(), tc.debug());
+}
+
+CacheBlockEntry::~CacheBlockEntry() {
+ if (data != nullptr && manager && manager->pool) {
+ SPDLOG_WARN("Free {} when destruct", data);
+ free_on_cpu();
+ }
+}
+
+bool CacheBlockEntry::alloc_on_cpu() {
+ assert(data == nullptr);
+ data = manager->pool->alloc(size);
+ if (data == nullptr) {
+ manager->evict_for_cpu_cache();
+ data = manager->pool->alloc(size);
+ if (data == nullptr) {
+ SPDLOG_ERROR("Not enough memory for Block Cache");
+ return false;
+ }
+ }
+ return true;
+}
+
+void CacheBlockEntry::free_on_cpu() {
+ manager->pool->free(data, size);
+ data = nullptr;
+}
+
+bool CacheBlockEntry::alloc_on_cpu_no_lock() {
+ if (data == nullptr) {
+ if (alloc_on_cpu() == false) {
+ return false;
+ }
+ }
+ return true;
+}
+
+bool CacheBlockEntry::inc_ref_or_alloc_on_cpu() {
+ std::lock_guard lg(lock);
+ if (data == nullptr) {
+ if (alloc_on_cpu()) {
+ cpu_cc.ref_count.fetch_add(1);
+ return true;
+ } else {
+ return false;
+ }
+ } else {
+ cpu_cc.ref_count.fetch_add(1);
+ return true;
+ }
+}
+
+std::unique_lock<CacheBlockEntry::MutexT> CacheBlockEntry::try_lock() {
+  return std::unique_lock(lock, std::try_to_lock);
+}
+
+std::lock_guard<CacheBlockEntry::MutexT> CacheBlockEntry::lock_guard() {
+  return std::lock_guard(lock);
+}
+
+void CacheBlockEntry::debug() {
+ SPDLOG_DEBUG(
+ "CacheBlockEntry: disk[{:4},{:7}], with key {}, hash {:016x}, data: {}, ref_count: {}, size: {}, cpu tc: {}, "
+ "in page cache: {}, gpu ref count:{}, gpu tc: {}",
+ layer, idx, with_key, hash, data, cpu_cc.ref_count.load(), size, cpu_cc.tc.debug(), manager != nullptr,
+ gpu_cc.ref_count.load(), gpu_cc.tc.debug());
+}
+
+CacheBlockEntryCollector::CacheBlockEntryCollector(std::function<void(CacheBlockEntry*)> exit_fn) : exit_fn(exit_fn) {}
+
+CacheBlockEntryCollector::~CacheBlockEntryCollector() {
+ // SPDLOG_DEBUG("Collector Destruct");
+ for (auto& e : entries) {
+ exit_fn(e);
+ }
+}
+
+void CacheBlockEntry::io_with(async_store::IODealer* dealer, IO_Helper<CacheBlockEntry>& io_helper,
+                              async_store::ArrayStore* store, size_t layer, size_t index, IOOption option) {
+ bool write;
+
+ auto& batch_promise = io_helper.batch_promise;
+
+ switch (option) {
+ case IO_Read: {
+ write = false;
+ if (io_helper.absorb_tc(this, cpu_cc.tc)) {
+ // need read
+ } else {
+ return;
+ }
+ break;
+ }
+ case IO_ForceRead: {
+ // Not change
+ write = false;
+ break;
+ }
+ case IO_ForceWrite: {
+ // Not change
+ write = true;
+ break;
+ }
+ case IO_Write: {
+ write = true;
+ break;
+ }
+ default: {
+ assert(0);
+ }
+ }
+ io_helper.new_task();
+ this->layer = layer;
+ this->idx = index;
+
+  auto req = std::make_shared<async_store::IORequest>();
+ req->store = store;
+ req->data = data;
+ req->index = index;
+ req->write = write;
+ req->need_promise = true;
+ req->promise = &batch_promise;
+
+ SPDLOG_TRACE("Submitting {}", async_store::request_to_string(req.get()));
+ dealer->enqueue(std::move(req));
+}
+
+CacheEntryManager::CacheEntryManager(CacheEntryManagerConfig config) : config(config) {}
+
+void CacheEntryManager::evict_for_cpu_cache() {
+ size_t count = 0;
+ evict(
+ [&count](const BlockPtr& block) {
+ // here we assume each with gpu must resides on cpu
+ if (block->data != nullptr && block->cpu_cc.can_desert() &&
+ block->gpu_cc.can_desert() /*For now If A Cache Entry Block is on GPU, it must on cpu. */) {
+ block->free_on_cpu();
+ count += 1;
+ return true;
+ } else {
+ return false;
+ }
+ },
+ [&count, this]() {
+ return false;
+ // return count == this->config.evict_count;
+ });
+}
+
+void CacheEntryManager::insert(BlockPtr entry) {
+ assert(entry->with_key);
+ assert(key_entry_map.count(entry->hash) == 0);
+ usage_list.push_front(entry);
+ key_entry_map[entry->hash] = usage_list.begin();
+}
+
+CacheEntryManager::BlockPtr CacheEntryManager::access(const Key& key) {
+ auto it = key_entry_map.at(key);
+ auto entry = *it;
+ usage_list.erase(it);
+ usage_list.push_front(entry);
+ key_entry_map[key] = usage_list.begin();
+ return entry;
+}
+
+// void CacheEntryManager::remove(const Key& key) {
+// auto it = key_entry_map[key];
+// usage_list.erase(it);
+// key_entry_map.erase(key);
+// }
+
+void CacheEntryManager::evict(std::function<bool(const BlockPtr&)> filter, std::function<bool()> stop_condition) {
+ auto evict_count = 0;
+ auto inspect_count = 0;
+
+ std::lock_guard lg(lock);
+ for (auto it = usage_list.rbegin(); it != usage_list.rend();) {
+ inspect_count += 1;
+ // SPDLOG_DEBUG("Map Size {}, List Size {}, Evicted {} blocks, Inspected {}, {}", key_entry_map.size(),
+ // usage_list.size(), evict_count, inspect_count, pool->debug());
+ // (*it)->debug();
+ if (stop_condition())
+ break;
+ auto entry_ul = (*it)->try_lock();
+ if (entry_ul.owns_lock() == false) {
+ ++it; // Ensure iterator advances when locking fails
+ continue;
+ }
+ if (filter(*it)) {
+ // SPDLOG_DEBUG("Evicting {}", fmt::ptr(it->get()));
+ evict_count++;
+ if ((*it)->with_key)
+ key_entry_map.erase((*it)->hash);
+ it = decltype(it)(usage_list.erase(std::next(it).base())); // Use base() to adjust for reverse iterator
+ } else {
+ ++it; // Ensure iterator advances when filter fails
+ }
+ }
+
+ if (evict_count > 0) {
+ SPDLOG_DEBUG("Map Size {}, List Size {}, Evicted {} blocks, Inspected {}, {}", key_entry_map.size(),
+ usage_list.size(), evict_count, inspect_count, pool->debug());
+ }
+}
+
+CacheEntryManager::BlockPtr CacheEntryManager::get(bool& is_new, size_t size, std::optional<Key> key) {
+ std::unique_lock ul(lock);
+ if (key.has_value()) {
+ if (key_entry_map.count(key.value())) {
+ is_new = false;
+ return access(key.value());
+ } else {
+      auto entry = std::make_shared<CacheBlockEntry>();
+ entry->with_key = true;
+ entry->hash = key.value();
+ entry->size = size;
+ entry->manager = this;
+ insert(entry);
+ is_new = true;
+ return entry;
+ }
+ } else {
+    auto entry = std::make_shared<CacheBlockEntry>();
+ entry->with_key = false;
+ entry->size = size;
+ entry->manager = this;
+ is_new = true;
+ return entry;
+ }
+}
+
+void CacheEntryManager::debug() {
+ fmt::print("Cache Manager: {} entries\n", key_entry_map.size());
+ pool->debug();
+  fmt::print("Layer 0 Entries in Order\n");
+ for (auto& it : usage_list) {
+ if (it->layer == 0)
+ it->debug();
+ }
+}
+
+}; // namespace kvc2
diff --git a/csrc/balance_serve/kvc2/src/cache_entry.hh b/csrc/balance_serve/kvc2/src/cache_entry.hh
new file mode 100644
index 0000000..16f9b84
--- /dev/null
+++ b/csrc/balance_serve/kvc2/src/cache_entry.hh
@@ -0,0 +1,182 @@
+#ifndef __CACHE_ENTRY_HH_
+#define __CACHE_ENTRY_HH_
+#include "async_store.hh"
+#include "cuda_stream_manager.hh"
+#include "defs.h"
+#include "hasher.hpp"
+#include "io_helper.hpp"
+#include "page_aligned_memory_pool.h"
+#include "utils/periodic_task.hpp"
+
+#include <list>
+#include <memory>
+#include <unordered_map>
+#include "utils/mutex_extend.hpp"
+
+namespace kvc2 {
+using CacheBlockKey = TokensHash;
+
+class CacheEntryManager;
+struct DoubleVerticalBlocksHandle;
+class GPUPageCache;
+
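+// Per-location concurrency state: a reference count, a dirty bit, and the
+// transfer state of the block; a block may be reclaimed only when it is
+// unreferenced and clean (see can_desert()).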
+struct ConcurrentControlUnit {
+ std::atomic_size_t ref_count = 0;
+ std::atomic_bool dirty = false;
+  TransferControl<> tc;
+
+ bool can_desert();
+ void debug();
+};
+
+enum IOOption {
+ IO_ForceRead,
+ IO_ForceWrite,
+ IO_Read,
+ IO_Write,
+};
+
+inline std::string to_string(IOOption op) {
+ switch (op) {
+ case IO_ForceRead:
+ return "IO_ForceRead";
+ case IO_ForceWrite:
+ return "IO_ForceWrite";
+ case IO_Read:
+ return "IO_Read";
+ case IO_Write:
+ return "IO_Write";
+ default:
+ return "Unknown";
+ }
+}
+
+struct CacheBlockEntry {
+ friend CacheEntryManager;
+ using MutexT = non_recursive_mutex;
+ // using MutexT = std::mutex;
+ MutexT lock;
+
+ // for cache
+ bool with_key = true;
+ CacheBlockKey hash = 0;
+ CacheBlockKey hash_check = 0;
+
+ CacheInfo cache_info;
+ CacheEntryManager* manager = nullptr;
+
+ // for memory pool
+ void* data = nullptr;
+ size_t size = 0;
+
+ ConcurrentControlUnit cpu_cc;
+
+ // for disk
+ size_t layer = -1;
+ size_t idx = -1;
+
+ // for gpu
+
+  std::optional<size_t> gpu_block_idx = std::nullopt;
+ ConcurrentControlUnit gpu_cc;
+
+ CacheBlockEntry() =default;
+ CacheBlockEntry(const CacheBlockEntry& other) = delete;
+ CacheBlockEntry& operator=(const CacheBlockEntry& other) = delete;
+ CacheBlockEntry(CacheBlockEntry&& other) = delete;
+ CacheBlockEntry& operator=(CacheBlockEntry&& other) = delete;
+ ~CacheBlockEntry();
+
+ private:
+ bool alloc_on_cpu();
+
+
+ public:
+ void free_on_cpu();
+ bool alloc_on_cpu_no_lock();
+
+ bool inc_ref_or_alloc_on_cpu();
+  void set_key(TokensHash key, std::shared_ptr<CacheBlockEntry> me);
+
+  std::unique_lock<MutexT> try_lock();
+  std::lock_guard<MutexT> lock_guard();
+
+ // will not get lock
+  void io_with(async_store::IODealer* dealer, IO_Helper<CacheBlockEntry>& io_helper, async_store::ArrayStore* store,
+ size_t layer, size_t index, IOOption option);
+  void flush_back_async(IO_Helper<CacheBlockEntry>& helper, std::vector<bool>& dirty_flags);
+
+ void debug();
+};
+
+struct CacheBlockEntryCollector{
+
+  std::vector<CacheBlockEntry*> entries;
+  std::function<void(CacheBlockEntry*)> exit_fn;
+
+  CacheBlockEntryCollector(std::function<void(CacheBlockEntry*)> exit_fn);
+ ~CacheBlockEntryCollector();
+
+ CacheBlockEntryCollector(const CacheBlockEntryCollector& other) = delete;
+ CacheBlockEntryCollector(CacheBlockEntryCollector&& other) = delete;
+ CacheBlockEntryCollector& operator=(const CacheBlockEntryCollector& other) = delete;
+ CacheBlockEntryCollector& operator=(CacheBlockEntryCollector&& other) = delete;
+
+
+
+};
+
+
+struct KVC2;
+struct CacheEntryManagerConfig {
+ size_t evict_count = 100;
+ KVC2* kvc2_top = nullptr;
+};
+
+class CacheEntryManager {
+ public:
+ using Key = CacheBlockKey;
+  using BlockPtr = std::shared_ptr<CacheBlockEntry>;
+
+ private:
+ friend CacheBlockEntry;
+
+ CacheEntryManagerConfig config;
+
+ std::mutex lock;
+ std::list usage_list;
+  std::unordered_map<Key, std::list<BlockPtr>::iterator> key_entry_map;
+
+ void insert(BlockPtr entry);
+ BlockPtr access(const Key& key);
+
+ // void remove(const Key& key);
+  void evict(std::function<bool(const BlockPtr&)> filter, std::function<bool()> stop_condition);
+
+
+ public:
+ std::unique_ptr background_flush_back=nullptr;
+  std::shared_ptr<PageAlignedMemoryPool> pool;
+  std::shared_ptr<GPUPageCache> gpu_cache;
+
+ CacheEntryManager(CacheEntryManagerConfig config);
+
+ // disable all move and copy
+ CacheEntryManager(const CacheEntryManager& other) = delete;
+ CacheEntryManager& operator=(const CacheEntryManager& other) = delete;
+ CacheEntryManager(CacheEntryManager&& other) = delete;
+ CacheEntryManager& operator=(CacheEntryManager&& other) = delete;
+
+ void cpu_background_flush();
+
+ void evict_for_cpu_cache();
+
+ // just get block pointers, not allocate them, will not return nullptr
+  BlockPtr get(bool& is_new, size_t size, std::optional<Key> key = std::nullopt);
+
+ void debug();
+};
+
+} // namespace kvc2
+
+#endif
\ No newline at end of file
diff --git a/csrc/balance_serve/kvc2/src/common.h b/csrc/balance_serve/kvc2/src/common.h
new file mode 100644
index 0000000..e69de29
diff --git a/csrc/balance_serve/kvc2/src/cuda_stream_manager.cpp b/csrc/balance_serve/kvc2/src/cuda_stream_manager.cpp
new file mode 100644
index 0000000..c696bf4
--- /dev/null
+++ b/csrc/balance_serve/kvc2/src/cuda_stream_manager.cpp
@@ -0,0 +1,135 @@
+#include "cuda_stream_manager.hh"
+#include <functional>
+#include <memory>
+#include <stdexcept>
+#include <utility>
+#include <vector>
+#define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_INFO
+// #define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_DEBUG
+#define FMT_HEADER_ONLY
+#include "spdlog/spdlog.h"
+
+CudaStreamManager::CudaStreamManager(const std::vector<int>& device_ids, int num_streams_per_device) {
+ for (int device_id : device_ids) {
+    auto x = std::unique_ptr<DeviceInfo>(new DeviceInfo);
+ DeviceInfo& device_info = *x;
+ device_info.device_id = device_id;
+ device_info.next_stream_index = 0;
+ device_info.stop_flag = false;
+
+    // Select the CUDA device
+ cudaError_t err = cudaSetDevice(device_id);
+ if (err != cudaSuccess) {
+ SPDLOG_WARN("cudaSetDevice failed on device {}: {}", device_id, cudaGetErrorString(err));
+ throw std::runtime_error("cudaSetDevice failed");
+ }
+
+    // Create the CUDA streams
+ device_info.streams.resize(num_streams_per_device);
+ for (int i = 0; i < num_streams_per_device; ++i) {
+ err = cudaStreamCreate(&device_info.streams[i]);
+ if (err != cudaSuccess) {
+ SPDLOG_WARN("Failed to create CUDA stream on device {}: {}", device_id, cudaGetErrorString(err));
+ throw std::runtime_error("Failed to create CUDA stream");
+ }
+ }
+
+    // Start the per-device worker thread
+ device_info.worker_thread = std::thread(&CudaStreamManager::deviceWorker, this, std::ref(device_info));
+
+ devices_.push_back(std::move(x));
+ }
+}
+
+CudaStreamManager::~CudaStreamManager() {
+  // Signal every device worker thread to stop
+  for (auto& device_info : devices_) {
+    device_info->stop_flag.store(true);
+    auto request = std::shared_ptr<Request>(new Request);
+    request->should_exit = true;
+    device_info->request_queue.enqueue(std::move(request));
+  }
+
+  // Wait for all worker threads to finish
+ for (auto& device_info : devices_) {
+ if (device_info->worker_thread.joinable()) {
+ device_info->worker_thread.join();
+ }
+
+    // Destroy the CUDA streams
+ cudaSetDevice(device_info->device_id);
+ for (auto& stream : device_info->streams) {
+ cudaStreamDestroy(stream);
+ }
+ }
+}
+
+void CudaStreamManager::submitRequest(std::shared_ptr<Request> request) {
+  // Find the device this request targets
+ for (auto& device_info : devices_) {
+ if (device_info->device_id == request->device_id) {
+ device_info->request_queue.enqueue(request);
+ return;
+ }
+ }
+ throw std::runtime_error("Invalid device ID in request");
+}
+
+void CudaStreamManager::deviceWorker(DeviceInfo& device_info) {
+  // Bind this worker thread to its device
+ cudaError_t err = cudaSetDevice(device_info.device_id);
+ if (err != cudaSuccess) {
+ SPDLOG_WARN("cudaSetDevice failed in worker thread for device {}: {}", device_info.device_id,
+ cudaGetErrorString(err));
+ return;
+ }
+
+ while (device_info.stop_flag.load() == false) {
+ auto request = device_info.request_queue.dequeue();
+ if (request->should_exit) {
+ return;
+ }
+    // Process the request, rotating through this device's streams round-robin
+ SPDLOG_DEBUG("Getting request on device {}, count {}", device_info.device_id, request->host_mem_addresses.size());
+ int stream_index = device_info.next_stream_index;
+ cudaStream_t stream = device_info.streams[stream_index];
+ device_info.next_stream_index = (device_info.next_stream_index + 1) % device_info.streams.size();
+
+ size_t num_transfers = request->host_mem_addresses.size();
+ for (size_t i = 0; i < num_transfers; ++i) {
+ void* dst = request->device_mem_addresses[i];
+ void* src = request->host_mem_addresses[i];
+ if (request->direction == cudaMemcpyDeviceToHost) {
+ std::swap(dst, src);
+ }
+
+ cudaError_t err = cudaMemcpyAsync(dst, src, request->sizes[i], request->direction, stream);
+ if (err != cudaSuccess) {
+ SPDLOG_WARN("cudaMemcpyAsync failed on device {}: {}", device_info.device_id, cudaGetErrorString(err));
+        // Errors could be handled as needed; for now, simply continue
+ continue;
+ }
+ }
+
+    // Wrap the callback on the heap: the copies are asynchronous, so the
+    // callable must stay alive until the stream reaches the host function
+    struct CallbackData {
+      std::function<void()> callback;
+    };
+    CallbackData* cb_data = new CallbackData{request->callback};
+
+ err = cudaLaunchHostFunc(
+ stream,
+ [](void* data) {
+ // SPDLOG_DEBUG("Callback function called");
+          CallbackData* cb_data = static_cast<CallbackData*>(data);
+ cb_data->callback();
+ delete cb_data;
+ },
+ cb_data);
+
+ if (err != cudaSuccess) {
+ SPDLOG_WARN("cudaLaunchHostFunc failed on device {}: {}", device_info.device_id, cudaGetErrorString(err));
+      // Handle the error as needed
+ }
+ }
+}
diff --git a/csrc/balance_serve/kvc2/src/cuda_stream_manager.hh b/csrc/balance_serve/kvc2/src/cuda_stream_manager.hh
new file mode 100644
index 0000000..d4fe215
--- /dev/null
+++ b/csrc/balance_serve/kvc2/src/cuda_stream_manager.hh
@@ -0,0 +1,54 @@
+/*
+ * @Author: Xie Weiyu ervinxie@qq.com
+ * @Date: 2024-11-19 09:24:47
+ * @LastEditors: Xie Weiyu ervinxie@qq.com
+ * @LastEditTime: 2024-11-20 02:55:49
+ * @FilePath: /kvc2/src/cuda_stream_manager.hh
+ */
+#pragma once
+
+#include <cuda_runtime.h>
+#include <atomic>
+#include <functional>
+#include <memory>
+#include <thread>
+#include <vector>
+#include "utils/mpsc.hpp"
+
+class CudaStreamManager {
+ public:
+  // Constructor: takes the list of device IDs to use and the number of streams per device
+  CudaStreamManager(const std::vector<int>& device_ids, int num_streams_per_device);
+ ~CudaStreamManager();
+
+  // Request descriptor for a batch of host<->device copies
+  struct Request {
+    bool should_exit = false;
+    int device_id;
+    std::vector<void*> host_mem_addresses;
+    std::vector<void*> device_mem_addresses;
+    std::vector<size_t> sizes;
+    cudaMemcpyKind direction;
+    std::function<void()> callback;
+ };
+
+  void submitRequest(std::shared_ptr<Request> request);
+
+ private:
+  // Per-device state
+  struct DeviceInfo {
+    int device_id;
+    std::thread worker_thread;
+    std::vector<cudaStream_t> streams;
+    int next_stream_index;
+    MPSCQueueConsumerLock<std::shared_ptr<Request>> request_queue;
+    std::atomic_bool stop_flag;
+ };
+
+  // Devices managed by this instance, looked up by device_id
+  std::vector<std::unique_ptr<DeviceInfo>> devices_;
+
+  // Private methods
+ void deviceWorker(DeviceInfo& device_info);
+};
diff --git a/csrc/balance_serve/kvc2/src/defs.h b/csrc/balance_serve/kvc2/src/defs.h
new file mode 100644
index 0000000..b21f4e2
--- /dev/null
+++ b/csrc/balance_serve/kvc2/src/defs.h
@@ -0,0 +1,35 @@
+#ifndef __DEFS_H_
+#define __DEFS_H_
+
+#include <cstdint>
+#include <filesystem>
+#include <vector>
+#include "model_config.h"
+
+namespace kvc2 {
+using kvc2_ptr = void*;
+// using data_block_ptr = std::intptr_t;
+using data_block_ptr = void*;
+using layer_data = std::vector<data_block_ptr>;
+using kvc2_handle = void*;
+
+using Token = uint32_t;
+using Tokens = std::vector<Token>;
+using TokenPtr = std::intptr_t;
+using TokenLength = size_t;
+using BlockLength = size_t;
+
+struct CacheInfo {
+ ModelName model_name;
+ bool is_key_cache;
+ QuantType quant_type;
+
+ size_t hidden_layer_count();
+  std::filesystem::path path(std::optional<size_t> which_layer = std::nullopt);
+ bool operator==(const CacheInfo& other) const;
+ size_t element_size(size_t block_length);
+ size_t hash_value() const;
+};
+
+}; // namespace kvc2
+#endif
diff --git a/csrc/balance_serve/kvc2/src/gpu_cache.cpp b/csrc/balance_serve/kvc2/src/gpu_cache.cpp
new file mode 100644
index 0000000..2bfe945
--- /dev/null
+++ b/csrc/balance_serve/kvc2/src/gpu_cache.cpp
@@ -0,0 +1,282 @@
+#include "gpu_cache.hh"
+
+#define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_DEBUG
+#define FMT_HEADER_ONLY
+#include "spdlog/spdlog.h"
+
+#include "cache_entry.hh"
+#include "utils/arithmetic.hpp"
+
+namespace kvc2 {
+
+GPUPageCache::GPUPageCache(GPUPageCacheConfig& config) : config(config) {
+ if (torch::cuda::is_available()) {
+ size_t gpu_count = torch::cuda::device_count();
+ SPDLOG_INFO("Number of available GPUs: {}, want {}", gpu_count, config.gpu_devices_id.size());
+ if (gpu_count < config.gpu_devices_id.size()) {
+ SPDLOG_ERROR("Not enough GPUs available.");
+ exit(0);
+ }
+ for (auto x : config.gpu_devices_id) {
+ gpu_devices.push_back(torch::Device(torch::kCUDA, x));
+ }
+ } else {
+ SPDLOG_ERROR("CUDA is not available on this system.");
+ exit(0);
+ }
+
+ SPDLOG_WARN("Creating GPU Cache");
+ shape.push_back(config.layer_count);
+ shape.push_back(config.total_kvcache_pages);
+ shape.push_back(config.num_token_per_page);
+ if (config.full_kv_cache_on_each_gpu) {
+ if (config.gpu_devices_id.size() > 1) {
+ SPDLOG_WARN("Replicated KVCache on multiple gpu");
+ }
+ shape.push_back(config.num_k_heads);
+ } else {
+ shape.push_back(config.num_k_heads / config.gpu_devices_id.size());
+ }
+ shape.push_back(config.k_head_dim);
+ tensor_size = torch::elementSize(config.tensor_type);
+ for (auto& s : shape) {
+ tensor_size *= s;
+ }
+ SPDLOG_INFO("Creating KV Page Cache, Shape ({},{},{},{},{}), Size {} MiB", shape[0], shape[1], shape[2], shape[3],
+ shape[4], tensor_size / (1 << 20));
+ if (config.k_cache_on) {
+ for (size_t i = 0; i < config.gpu_devices_id.size(); i++) {
+ auto k = torch::zeros(shape, torch::TensorOptions().dtype(config.tensor_type));
+ k = k.to(gpu_devices[i]);
+
+ k_cache.push_back(k);
+
+ SPDLOG_INFO("K Page Cache of GPU {} is created", config.gpu_devices_id[i]);
+ }
+ occupations.resize(config.layer_count);
+ } else {
+    SPDLOG_WARN("Disable K Cache");
+ assert(config.gpu_only);
+ }
+
+ if (config.v_cache_on) {
+ for (size_t i = 0; i < config.gpu_devices_id.size(); i++) {
+ auto v = torch::zeros(shape, torch::TensorOptions().dtype(config.tensor_type));
+ v = v.to(gpu_devices[i]);
+ v_cache.push_back(v);
+
+ SPDLOG_INFO("V Page Cache of GPU {} is created", config.gpu_devices_id[i]);
+ }
+ v_occupations.resize(config.layer_count);
+ } else {
+    SPDLOG_WARN("Disable V Cache");
+ // assert(config.gpu_only); // should not assert
+ }
+
+ if (config.gpu_only) {
+ gpu_only_occupations.resize(config.total_kvcache_pages, false);
+ }
+
+
+ num_free_pages = config.total_kvcache_pages;
+ for (size_t i = 0; i < config.layer_count; i++) {
+ if (config.k_cache_on)
+ occupations[i].resize(config.total_kvcache_pages, nullptr);
+ if (config.v_cache_on)
+ v_occupations[i].resize(config.total_kvcache_pages, nullptr);
+ }
+
+ tp_size.resize(config.gpu_devices_id.size(), shape[2] * shape[3] * shape[4] * c10::elementSize(config.tensor_type));
+ tp_offset.resize(config.gpu_devices_id.size(), 0);
+ for (size_t i = 1; i < tp_offset.size(); i++) {
+ tp_offset[i] = tp_offset[i - 1] + tp_size[i - 1];
+ }
+
+  stream_manager =
+      std::unique_ptr<CudaStreamManager>(new CudaStreamManager(config.gpu_devices_id, config.num_streams_per_device));
+}
+
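+// Claim one free GPU page column (one page index across all layers) for the
+// given K/V entry column and record the backing CPU entries so later copies
+// and eviction can find them; returns false if no page is free even after
+// eviction.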
+bool GPUPageCache::alloc_col(std::vector<std::vector<std::shared_ptr<CacheBlockEntry>>>& k_entries,
+                             std::vector<std::vector<std::shared_ptr<CacheBlockEntry>>>& v_entries, size_t at) {
+ std::lock_guard lg(lock);
+ auto idx = next_empty_col();
+ if (idx.has_value()) {
+ // must have entry lock
+ auto& k0_entry = k_entries[0][at];
+ k0_entry->gpu_block_idx = idx;
+
+ for (size_t l = 0; l < config.layer_count; l++) {
+ if (config.k_cache_on) {
+ assert(k_entries[l][at]->data != nullptr);
+ occupations[l][idx.value()] = k_entries[l][at];
+ }
+ if (config.v_cache_on) {
+ assert(v_entries[l][at]->data != nullptr);
+ v_occupations[l][idx.value()] = v_entries[l][at];
+ }
+ }
+ return true;
+ } else {
+ return false;
+ }
+}
+
+std::vector<size_t> GPUPageCache::gpu_only_alloc_col(size_t count) {
+  assert(config.gpu_only);
+  std::lock_guard lg(lock);
+  std::vector<size_t> re;
+
+ for (size_t i = 0; i < config.total_kvcache_pages; i++) {
+ if (gpu_only_occupations[i] == false) {
+ re.push_back(i);
+ if (re.size() == count) {
+ break;
+ }
+ }
+ }
+
+ if (re.size() == count) {
+ for (auto at : re) {
+ gpu_only_occupations[at] = true;
+ }
+ } else {
+ SPDLOG_WARN("GPU ONLY: Cannot allocate {} cols", count);
+ re.clear();
+ }
+ return re;
+}
+
+void GPUPageCache::gpu_only_free_cols(std::vector<size_t> cols) {
+ assert(config.gpu_only);
+ std::lock_guard lg(lock);
+ for (auto at : cols) {
+ assert(gpu_only_occupations[at]);
+ gpu_only_occupations[at] = false;
+ }
+}
+
+std::optional<size_t> GPUPageCache::next_empty_col() {
+ if (num_free_pages == 0) {
+ evict_cols();
+ if (num_free_pages == 0) {
+ return std::nullopt;
+ }
+ }
+ while (occupations[0][_col_idx] != nullptr) {
+ _col_idx = (_col_idx + 1) % config.total_kvcache_pages;
+ }
+ num_free_pages -= 1;
+ return _col_idx;
+}
+
+void GPUPageCache::evict_cols() {
+ auto evicted_count = 0;
+ for (size_t i = 0; i < config.total_kvcache_pages; i++) {
+ auto& h = occupations[0][i];
+ if (h == nullptr) {
+ continue;
+ }
+ auto lg = h->lock_guard();
+ if (h->gpu_cc.can_desert()) {
+ h->gpu_cc.tc.reset();
+ h = nullptr;
+ num_free_pages += 1;
+ evicted_count += 1;
+ }
+ }
+ if (evicted_count > 0)
+ SPDLOG_INFO("GPU: Evicted {} GPU pages", evicted_count);
+}
+
+std::vector<std::unique_lock<CacheBlockEntry::MutexT>> GPUPageCache::try_lock_col(size_t at) {
+  std::vector<std::unique_lock<CacheBlockEntry::MutexT>> re;
+ if (config.k_cache_on) {
+ for (size_t l = 0; l < config.layer_count; l++) {
+ if (occupations[l][at] == nullptr) {
+ return {};
+ }
+ auto ul = occupations[l][at]->try_lock();
+ if (ul.owns_lock()) {
+ re.push_back(std::move(ul));
+ } else {
+ return {};
+ }
+ }
+ }
+ if (config.v_cache_on) {
+ for (size_t l = 0; l < config.layer_count; l++) {
+ if (v_occupations[l][at] == nullptr) {
+ return {};
+ }
+ auto ul = v_occupations[l][at]->try_lock();
+ if (ul.owns_lock()) {
+ re.push_back(std::move(ul));
+ } else {
+ return {};
+ }
+ }
+ }
+ return re;
+}
+
+std::vector<std::shared_ptr<CudaStreamManager::Request>> GPUPageCache::basic_request(
+    cudaMemcpyKind direction, std::function<void()> callback) {
+  std::vector<std::shared_ptr<CudaStreamManager::Request>> re;
+  re.resize(config.gpu_devices_id.size(), nullptr);
+  for (size_t i = 0; i < re.size(); i++) {
+    re[i] = std::shared_ptr<CudaStreamManager::Request>(new CudaStreamManager::Request);
+ re[i]->direction = direction;
+ re[i]->device_id = config.gpu_devices_id[i];
+ re[i]->callback = callback;
+ }
+ return re;
+}
+
+void GPUPageCache::submit_requests(std::vector<std::shared_ptr<CudaStreamManager::Request>> reqs) {
+ for (auto& r : reqs) {
+ stream_manager->submitRequest(r);
+ }
+}
+
+void GPUPageCache::append_col_to_request(std::vector<std::shared_ptr<CudaStreamManager::Request>>& reqs,
+                                         std::vector<std::vector<std::shared_ptr<CacheBlockEntry>>>& k_handles,
+                                         std::vector<std::vector<std::shared_ptr<CacheBlockEntry>>>& v_handles,
+                                         size_t at) {
+ if (config.k_cache_on == false && config.v_cache_on == false) {
+ return;
+ }
+ auto gpu_block_idx = k_handles[0][at]->gpu_block_idx.value();
+ for (size_t layer = 0; layer < config.layer_count; layer++) {
+ for (size_t which_gpu = 0; which_gpu < config.gpu_devices_id.size(); which_gpu++) {
+
+ if (config.k_cache_on) {
+ assert(k_handles[layer][at]->data != nullptr);
+ reqs[which_gpu]->sizes.push_back(tp_size[which_gpu]);
+ reqs[which_gpu]->host_mem_addresses.push_back(offset_by_bytes(k_handles[layer][at]->data, tp_offset[which_gpu]));
+ reqs[which_gpu]->device_mem_addresses.push_back(k_cache[which_gpu][layer][gpu_block_idx].data_ptr());
+ }
+
+ if (config.v_cache_on) {
+ assert(v_handles[layer][at]->data != nullptr);
+ reqs[which_gpu]->sizes.push_back(tp_size[which_gpu]);
+ reqs[which_gpu]->host_mem_addresses.push_back(offset_by_bytes(v_handles[layer][at]->data, tp_offset[which_gpu]));
+ reqs[which_gpu]->device_mem_addresses.push_back(v_cache[which_gpu][layer][gpu_block_idx].data_ptr());
+ }
+ }
+ }
+ // SPDLOG_DEBUG("GPU: Appended Vertical Handle to Request, count {}", reqs[0]->sizes.size());
+}
+
+void GPUPageCache::debug() {
+ size_t count = 0;
+ for (size_t i = 0; i < config.total_kvcache_pages; i++) {
+ if (occupations[0][i] == nullptr) {
+ count += 1;
+ } else {
+ // occupations[0][i]->gpu_cc.debug();
+ }
+ }
+ SPDLOG_DEBUG("Free Page: {}/{}", count, config.total_kvcache_pages);
+}
+
+} // namespace kvc2
diff --git a/csrc/balance_serve/kvc2/src/gpu_cache.hh b/csrc/balance_serve/kvc2/src/gpu_cache.hh
new file mode 100644
index 0000000..0621056
--- /dev/null
+++ b/csrc/balance_serve/kvc2/src/gpu_cache.hh
@@ -0,0 +1,74 @@
+#ifndef __GPU_CACHE_HH_
+#define __GPU_CACHE_HH_
+
+#include <torch/torch.h>
+#include "cache_entry.hh"
+#include "cuda_stream_manager.hh"
+#include "defs.h"
+#include "kvc2.h"
+#include "metrics.h"
+#include "utils/periodic_task.hpp"
+
+namespace kvc2 {
+
+class GPUPageCache {
+  std::vector<torch::Device> gpu_devices;
+
+  std::vector<int64_t> shape;
+  size_t tensor_size;
+  std::vector<size_t> tp_offset;
+  std::vector<size_t> tp_size;
+
+
+
+  // metrics
+  std::shared_ptr<Metrics> met;
+
+ // states
+ std::mutex lock;
+ size_t num_free_pages;
+  std::vector<bool> gpu_only_occupations;
+  std::vector<std::vector<std::shared_ptr<CacheBlockEntry>>> occupations, v_occupations;
+ size_t _col_idx = 0;
+
+
+ // cuda stream manager
+  std::optional<size_t> next_empty_col();
+
+ public:
+ GPUPageCacheConfig config;
+ std::unique_ptr stream_manager;
+  std::vector<torch::Tensor> k_cache;
+  std::vector<torch::Tensor> v_cache;
+ std::unique_ptr background_flush_back =nullptr;
+
+ GPUPageCache(GPUPageCacheConfig& config);
+
+  std::vector<size_t> gpu_only_alloc_col(size_t count);
+  void gpu_only_free_cols(std::vector<size_t> cols);
+
+
+ void gpu_background_flush();
+
+
+  bool alloc_col(std::vector<std::vector<std::shared_ptr<CacheBlockEntry>>>& k_entries,
+                 std::vector<std::vector<std::shared_ptr<CacheBlockEntry>>>& v_entries, size_t at);
+ void evict_cols();
+ void flush_col(size_t at);
+  std::vector<std::unique_lock<CacheBlockEntry::MutexT>> try_lock_col(size_t at);
+
+ void free_col(size_t at);
+
+  std::vector<std::shared_ptr<CudaStreamManager::Request>> basic_request(cudaMemcpyKind direction,
+                                                                         std::function<void()> callback);
+
+  void submit_requests(std::vector<std::shared_ptr<CudaStreamManager::Request>> reqs);
+
+  void append_col_to_request(std::vector<std::shared_ptr<CudaStreamManager::Request>>& reqs,
+                             std::vector<std::vector<std::shared_ptr<CacheBlockEntry>>>& k_handles,
+                             std::vector<std::vector<std::shared_ptr<CacheBlockEntry>>>& v_handles, size_t at);
+
+ void debug();
+};
+} // namespace kvc2
+#endif
\ No newline at end of file
diff --git a/csrc/balance_serve/kvc2/src/hasher.hpp b/csrc/balance_serve/kvc2/src/hasher.hpp
new file mode 100644
index 0000000..7b328ae
--- /dev/null
+++ b/csrc/balance_serve/kvc2/src/hasher.hpp
@@ -0,0 +1,40 @@
+#ifndef __HASHER_HPP_
+#define __HASHER_HPP_
+
+#include "defs.h"
+#include "xxhash.h"
+
+namespace kvc2 {
+
+const uint64_t hash_seed = 4123512;
+const uint64_t check_hash_seed = 1025753;
+
+using TokensHash = XXH64_hash_t;
+struct TokensHasher {
+ XXH64_state_t* state;
+ TokensHasher() {
+ state = XXH64_createState();
+ reset();
+ }
+ ~TokensHasher() { XXH64_freeState(state); }
+
+ TokensHasher(TokensHasher& other) = delete;
+ TokensHasher& operator=(TokensHasher& other) = delete;
+ TokensHasher(TokensHasher&& other) = delete;
+ TokensHasher& operator=(TokensHasher&& other) = delete;
+ TokensHash get() { return XXH64_digest(state); }
+ void reset(size_t seed = hash_seed) { XXH64_reset(state, seed); }
+ TokensHash update(Token* data, TokenLength length) {
+ XXH64_update(state, data, length * sizeof(Token));
+ return get();
+ }
+
+ TokensHash update_raw(void* data, size_t size) {
+ XXH64_update(state, data, size);
+ return get();
+ }
+
+ static TokensHash hash(Token* data, TokenLength length) { return XXH64(data, length * sizeof(Token), hash_seed); }
+};
+} // namespace kvc2
+#endif
\ No newline at end of file
diff --git a/csrc/balance_serve/kvc2/src/io_helper.hpp b/csrc/balance_serve/kvc2/src/io_helper.hpp
new file mode 100644
index 0000000..b9f3021
--- /dev/null
+++ b/csrc/balance_serve/kvc2/src/io_helper.hpp
@@ -0,0 +1,155 @@
+/**
+ * @Description :
+ * @Author : Xie Weiyu
+ * @Date : 2024-12-11 06:35:31
+ * @Version : 1.0.0
+ * @LastEditors : Xie Weiyu
+ * @LastEditTime : 2024-12-11 06:50:55
+ * @Copyright (c) 2024 by KVCache.AI, All Rights Reserved.
+ **/
+
+#pragma once
+#include <atomic>
+#include <functional>
+#include <future>
+#include <mutex>
+#include <optional>
+#include <string>
+#include <vector>
+
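+// Completion counter for a batch of async operations: each set() decrements
+// the count, and the shared future becomes ready when the last one lands.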
+struct BatchPromise {
+  std::promise<void> promise;
+  std::shared_future<void> fut;
+ std::atomic_size_t count;
+
+ inline BatchPromise(size_t count) : count(count) { fut = promise.get_future().share(); }
+
+ inline void inc(size_t count = 1) { this->count.fetch_add(count, std::memory_order_seq_cst); }
+
+ inline void set() {
+ if (count.fetch_sub(1, std::memory_order_seq_cst) == 1) {
+ promise.set_value();
+ }
+ }
+  inline std::shared_future<void> get_shared_fut() { return fut; }
+};
+
+template <typename Lock = std::mutex>
+struct TransferControl {
+ Lock lock;
+
+  std::optional<std::shared_future<void>> transfer_ok = std::nullopt;
+ bool has_data = false;
+
+ TransferControl() {}
+
+ /*
+ true, std::nullopt : Already has data
+ false, shared_future : Transfer already started, should wait for the future
+ false, std::nullopt : should transfer by you
+ true, shared_future: Should not appear
+ */
+  std::pair<bool, std::optional<std::shared_future<void>>> has_data_or_transfer(std::shared_future<void> shared_fut) {
+ std::lock_guard lg(lock);
+ if (has_data) {
+ return {true, std::nullopt};
+ } else {
+ if (transfer_ok.has_value()) {
+ return {false, transfer_ok};
+ } else {
+ transfer_ok = shared_fut;
+ return {false, std::nullopt};
+ }
+ }
+ }
+
+ void set_has_data() {
+ std::lock_guard lg(lock);
+ has_data = true;
+ transfer_ok = std::nullopt;
+ }
+
+ bool get_has_data() {
+   std::lock_guard lg(lock);
+   return has_data;
+ }
+
+ void reset() {
+ std::lock_guard lg(lock);
+ transfer_ok = std::nullopt;
+ has_data = false;
+ }
+
+ std::string debug() {
+ std::lock_guard lg(lock);
+ return std::string("") + (has_data ? "has data" : "no data") + " " +
+ (transfer_ok.has_value() ? "transfer " : "no transfer");
+ }
+};
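+
+// Illustrative usage: the three reachable outcomes of has_data_or_transfer(),
+// mirroring the table above (`bp` and `do_transfer` are hypothetical).
+//   auto [ok, fut] = tc.has_data_or_transfer(bp.get_shared_fut());
+//   if (ok)       { /* data already resident */ }
+//   else if (fut) { fut->wait();               /* another thread owns the transfer */ }
+//   else          { do_transfer(); tc.set_has_data(); /* this thread owns it */ }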
+
+struct ConcurrentController {
+ std::atomic_bool dirty = false;
+ std::atomic_size_t ref_count = 0;
+ TransferControl<std::mutex> tc;
+};
+
+template <typename T>
+struct IO_Helper {
+ BatchPromise batch_promise;
+ std::function<void(T)> call_back_on_unit = nullptr;
+ std::function<void()> call_back = nullptr;
+
+ std::vector<std::shared_future<void>> futs;
+ std::vector<T> units_by_myself;
+
+ IO_Helper(std::function<void(T)> call_back_on_unit, std::function<void()> call_back = nullptr)
+ : batch_promise(1), call_back_on_unit(call_back_on_unit), call_back(call_back) {}
+
+ IO_Helper(const IO_Helper& other) = delete;
+ IO_Helper& operator=(const IO_Helper& other) = delete;
+ IO_Helper(IO_Helper&& other) = delete;
+ IO_Helper& operator=(IO_Helper&& other) = delete;
+ ~IO_Helper() {
+   // std::cout << "Destroy IO helper" << std::endl;
+ }
+
+ // Returns true when the caller must transfer this unit itself (the method
+ // name is reconstructed; the original declaration was lost).
+ bool transfer_by_me(T unit, TransferControl<std::mutex>& tc) {
+ auto [ok, fut] = tc.has_data_or_transfer(batch_promise.get_shared_fut());
+ if (ok) {
+ return false;
+ } else {
+ if (fut.has_value()) {
+ futs.push_back(fut.value());
+ // printf("Transfer started\n");
+ return false;
+ } else {
+ units_by_myself.push_back(unit);
+ // printf("Not Transfer\n");
+ return true;
+ }
+ }
+ }
+
+ void wait() {
+ for (auto& fut : futs) {
+ fut.wait();
+ }
+ batch_promise.get_shared_fut().wait();
+ for (auto& b : units_by_myself) {
+ call_back_on_unit(b);
+ }
+ if (call_back)
+ call_back();
+ }
+};
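+
+// Illustrative flow (hypothetical names): gather the units this helper must
+// transfer itself, issue the IO, then block until all peer transfers finish.
+//   IO_Helper<Block> helper([](Block b) { mark_ready(b); });
+//   for (auto& b : blocks)
+//     if (helper.transfer_by_me(b, tc_of(b))) enqueue_io(b);
+//   helper.batch_promise.set();  // resolve this helper's own future (count was 1)
+//   helper.wait();               // waits on peers, then runs per-unit callbacks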
diff --git a/csrc/balance_serve/kvc2/src/kvc2.h b/csrc/balance_serve/kvc2/src/kvc2.h
new file mode 100644
index 0000000..e93a6cf
--- /dev/null
+++ b/csrc/balance_serve/kvc2/src/kvc2.h
@@ -0,0 +1,138 @@
+#pragma once
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+#include "defs.h"
+#include "model_config.h"
+
+namespace kvc2 {
+struct GPUPageCacheConfig {
+ bool gpu_only;
+ std::vector gpu_devices_id;
+
+ size_t layer_count;
+ size_t total_kvcache_pages;
+ size_t num_token_per_page;
+ size_t num_k_heads;
+ size_t k_head_dim;
+
+ bool full_kv_cache_on_each_gpu = false;
+ bool k_cache_on = true;
+ bool v_cache_on = true;
+ torch::ScalarType tensor_type;
+
+ // for cuda stream manager
+ size_t num_streams_per_device = 4;
+};
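+
+// Sizing sketch (illustrative): per page and per layer, the K cache holds
+// num_token_per_page * num_k_heads * k_head_dim elements of tensor_type, e.g.
+// 256 tokens * 8 heads * 128 dims * 2 bytes (fp16) = 512 KiB per page per layer.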
+
+struct KVC2Config {
+ bool k_cache_on = true;
+ bool v_cache_on = true;
+ bool gpu_only = false;
+ bool load_from_disk = true;
+ bool save_to_disk = true;
+ std::string path;
+ std::string config_path;
+ TokenLength num_token_per_page = 256;
+ size_t memory_pool_size = 10e9;
+ size_t evict_count = 20;
+ std::optional<GPUPageCacheConfig> gpu_cache_config = std::nullopt;
+ size_t metrics_port;
+ double recompute_ratio = 0.2;
+};
+
+class DoubleCacheHandleInterface;
+class KVC2Interface {
+ public:
+ virtual ~KVC2Interface() = default;
+
+ virtual void load() = 0;
+ virtual void save() = 0;
+ /*
+Raw Insert
+Insert kvcache from kvcache_data to disk.
+
+model_name, quant_type: cache info
+id: start pointer of token array
+length: length of token array
+kvcache_data: data of kvcache
+
+This first matches the token array against the existing kvcache, then inserts the unmatched part to disk.
+*/
+ virtual void raw_insert(ModelName model_name, QuantType quant_type, Token* id, TokenLength length,
+ const std::vector& k_cache, const std::vector& v_cache) = 0;
+
+ /*
+Raw Read
+Read kvcache from disk to user specified pointers.
+
+model_name, quant_type: cache info
+id: start pointer of token array
+length: length of token array
+kvcache_data: data of kvcache
+Return: matched length of prefix, in tokens
+
+This does not read from the memory pool; it reads directly from disk.
+*/
+ virtual TokenLength raw_read(ModelName model_name, QuantType quant_type, Token* id, TokenLength length,
+ const std::vector& k_cache, const std::vector& v_cache) = 0;
+
+ /*
+ Lookup
+ Lookup kvcache and load it from disk to memory pool if needed.
+
+ model_name, quant_type: cache info
+ id: start pointer of token array
+ length: length of token array
+
+ Return: kvc2_handle, holds kvcache until being released.
+ If nothing matches, matched_length() will return 0.
+ If the memory pool is full, returns nullptr.
+ */
+ virtual std::shared_ptr<DoubleCacheHandleInterface> lookup(ModelName model_name, QuantType quant_type, Token* id,
+ TokenLength length, TokenLength estimated_length) = 0;
+
+ /*
+ Lookup and allocate to gpu
+ info.is_k_cache does not matter here
+ */
+ virtual std::shared_ptr<DoubleCacheHandleInterface> lookup_to_gpu(ModelName model_name, QuantType quant_type,
+ Token* id, TokenLength length,
+ TokenLength estimated_length) = 0;
+
+ virtual void lookup_to_gpu_async(ModelName model_name, QuantType quant_type, Token* id, TokenLength length,
+ TokenLength estimated_length,
+ std::function<void(std::shared_ptr<DoubleCacheHandleInterface>)> call_back) = 0;
+
+ virtual std::pair<std::vector<torch::Tensor>, std::vector<torch::Tensor>> get_kvcache() = 0;
+
+ virtual void debug() = 0;
+};
+
+std::shared_ptr<KVC2Interface> create_kvc2(KVC2Config config);
+
+enum MatchStatus {
+ Exact,
+ Partial,
+ NotMatchExact,
+ NotMatchPartial,
+};
+
+class DoubleCacheHandleInterface {
+ public:
+ virtual ~DoubleCacheHandleInterface() = default;
+ virtual TokenLength matched_length() = 0;
+ virtual std::vector<MatchStatus> matched_status() = 0;
+ virtual std::vector handle_data(bool is_key_cache) = 0;
+ virtual bool to_gpu() = 0;
+ virtual void to_gpu_async(std::function call_back) = 0;
+ virtual std::vector get_gpu_block_idx() = 0;
+ virtual std::vector get_gpu_attached_block_idx() = 0;
+
+ virtual void append_tokens(Token* tokens, TokenLength length) = 0; // update generated tokens
+
+ virtual void debug() = 0;
+};
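+
+// Illustrative call sequence over the two interfaces above (values are
+// placeholders):
+//   KVC2Config cfg;
+//   cfg.path = "/path/to/kvc2";  // placeholder
+//   auto kvc2 = create_kvc2(cfg);
+//   kvc2->load();
+//   auto h = kvc2->lookup_to_gpu(model, quant, ids, len, est_len);
+//   if (h && h->matched_length() > 0) { auto pages = h->get_gpu_block_idx(); }
+//   // the handle releases its kvcache when the shared_ptr is dropped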
+
+}; // namespace kvc2
diff --git a/csrc/balance_serve/kvc2/src/kvc2_utils.py b/csrc/balance_serve/kvc2/src/kvc2_utils.py
new file mode 100644
index 0000000..954f63c
--- /dev/null
+++ b/csrc/balance_serve/kvc2/src/kvc2_utils.py
@@ -0,0 +1,64 @@
+import torch
+import ctypes
+
+def aligned_tensor(size, alignment=4096):
+    """Allocate `size` bytes via posix_memalign and wrap them as an int8 tensor.
+
+    Returns (tensor, raw_ptr); the caller must keep raw_ptr alive and free it
+    when the tensor is no longer needed (see dealloc_aligned_cache).
+    """
+    num_bytes = size
+    mem = ctypes.c_void_p()
+    error_code = ctypes.CDLL(None).posix_memalign(
+        ctypes.byref(mem), ctypes.c_size_t(alignment), ctypes.c_size_t(num_bytes)
+    )
+
+    if error_code != 0:
+        raise MemoryError(f"posix_memalign failed with error code {error_code}")
+
+    array_type = (ctypes.c_int8 * size)
+    raw_array = array_type.from_address(mem.value)
+
+    tensor = torch.frombuffer(raw_array, dtype=torch.int8)
+
+    if tensor.data_ptr() % alignment != 0:
+        raise ValueError(f"Tensor data_ptr {tensor.data_ptr()} is not aligned to {alignment} bytes")
+
+    return tensor, mem
+
+def alloc_aligned_cache(layer_count, block_count, element_size):
+    """Allocate a [layer][block] grid of page-aligned int8 tensors."""
+    cache = []
+    cache_mem = []
+    for i in range(layer_count):
+        layer_data = []
+        layer_mem = []
+        for j in range(block_count):
+            tensor, mem_ptr = aligned_tensor(element_size, alignment=4096)
+            layer_data.append(tensor)
+            layer_mem.append(mem_ptr)
+        cache.append(layer_data)
+        cache_mem.append(layer_mem)
+    return cache, cache_mem
+
+def dealloc_aligned_cache(cache_mem):
+    """Free every raw pointer returned by alloc_aligned_cache."""
+    for layer_mem in cache_mem:
+        for mem_ptr in layer_mem:
+            ctypes.CDLL(None).free(mem_ptr)
+
+def get_tensor_ptr(tensors):
+    """Collect data_ptr() for each tensor in a [layer][block] grid."""
+    tensor_ptr = []
+    for layer in tensors:
+        layer_ptr = []
+        for data in layer:
+            layer_ptr.append(data.data_ptr())
+        tensor_ptr.append(layer_ptr)
+    return tensor_ptr
+
+def get_tensor_from_data_ptr(matched_data, element_size):
+    """Rewrap raw data pointers (e.g. returned by kvc2) as int8 tensors."""
+    re = []
+    for layer in matched_data:
+        re_layer = []
+        for data_ptr in layer:
+            array_type = (ctypes.c_int8 * element_size)
+            raw_array = array_type.from_address(data_ptr)
+            tensor = torch.frombuffer(raw_array, dtype=torch.int8)
+            re_layer.append(tensor)
+        re.append(re_layer)
+    return re
+
+if __name__ == "__main__":
+    pass
\ No newline at end of file
diff --git a/csrc/balance_serve/kvc2/src/metrics.cpp b/csrc/balance_serve/kvc2/src/metrics.cpp
new file mode 100644
index 0000000..9dd2c9e
--- /dev/null
+++ b/csrc/balance_serve/kvc2/src/metrics.cpp
@@ -0,0 +1,141 @@
+#include "metrics.h"
+
+namespace kvc2 {
+
+Metrics::Metrics(const MetricsConfig& config)
+ : registry_(std::make_shared<prometheus::Registry>()), exposer_(config.endpoint) {
+ // Register the prefix_nodes counter
+ auto& prefix_nodes_family = prometheus::BuildCounter()
+ .Name(std::string(METRIC_PREFIX) + "_prefix_nodes")
+ .Help("Number of prefix nodes")
+ .Register(*registry_);
+ prefix_nodes = &prefix_nodes_family.Add({});
+
+ // Register the prefix_block_count counter
+ auto& prefix_block_count_family = prometheus::BuildCounter()
+ .Name(std::string(METRIC_PREFIX) + "_prefix_block_count")
+ .Help("Number of prefix blocks")
+ .Register(*registry_);
+ prefix_block_count = &prefix_block_count_family.Add({});
+
+ // Shared histogram buckets, capped at 10000 ms (10 s)
+ std::vector<double> common_buckets = {1.0, 5.0, 10.0, 50.0, 100.0, 500.0, 1000.0, 5000.0, 10000.0};
+
+ // Register the raw_insert_time_ms histogram
+ auto& raw_insert_time_ms_family = prometheus::BuildHistogram()
+ .Name(std::string(METRIC_PREFIX) + "_raw_insert_time_ms")
+ .Help("function raw insert's time in milliseconds")
+ .Register(*registry_);
+ raw_insert_time_ms = &raw_insert_time_ms_family.Add({}, common_buckets);
+
+ // Register the lookup_time_ms histogram
+ auto& lookup_time_ms_family = prometheus::BuildHistogram()
+ .Name(std::string(METRIC_PREFIX) + "_lookup_time_ms")
+ .Help("function lookup's time in milliseconds")
+ .Register(*registry_);
+ lookup_time_ms = &lookup_time_ms_family.Add({}, common_buckets);
+
+ // Register the lookup_prefixmatch_length histogram
+ auto& lookup_prefixmatch_length_family = prometheus::BuildHistogram()
+ .Name(std::string(METRIC_PREFIX) + "_lookup_prefixmatch_length")
+ .Help("function lookup's prefix match length")
+ .Register(*registry_);
+ lookup_prefixmatch_length = &lookup_prefixmatch_length_family.Add({}, common_buckets);
+
+ // Register the matched_length_percentage histogram
+ auto& matched_length_percentage_family = prometheus::BuildHistogram()
+ .Name(std::string(METRIC_PREFIX) + "_matched_length_percentage")
+ .Help("function matched length percentage")
+ .Register(*registry_);
+ matched_length_percentage = &matched_length_percentage_family.Add({}, common_buckets);
+
+ // Register the disk_usage gauge
+ auto& disk_usage_family =
+ prometheus::BuildGauge().Name(std::string(METRIC_PREFIX) + "_disk_usage").Help("disk usage").Register(*registry_);
+ disk_usage = &disk_usage_family.Add({});
+
+ // Register the memory_pool_size gauge family
+ memory_pool_size_family_ = &prometheus::BuildGauge()
+ .Name(std::string(METRIC_PREFIX) + "_memory_pool_size")
+ .Help("memory pool size")
+ .Register(*registry_);
+
+ // Register the memory_pool_node_count gauge family
+ memory_pool_node_count_family_ = &prometheus::BuildGauge()
+ .Name(std::string(METRIC_PREFIX) + "_memory_pool_node_count")
+ .Help("memory pool node count")
+ .Register(*registry_);
+
+ // Register the lru_entry_count gauge family
+ lru_entry_count_family_ = &prometheus::BuildGauge()
+ .Name(std::string(METRIC_PREFIX) + "_lru_entry_count")
+ .Help("lru entry count")
+ .Register(*registry_);
+
+ // Register the gpu_page_count gauge family
+ gpu_page_count_family_ = &prometheus::BuildGauge()
+ .Name(std::string(METRIC_PREFIX) + "_gpu_page_count")
+ .Help("gpu page count")
+ .Register(*registry_);
+
+ // Register the append_tokens_time_ms histogram
+ auto& append_tokens_time_ms_family = prometheus::BuildHistogram()
+ .Name(std::string(METRIC_PREFIX) + "_append_tokens_time_ms")
+ .Help("append tokens time in milliseconds")
+ .Register(*registry_);
+ append_tokens_time_ms = &append_tokens_time_ms_family.Add({}, common_buckets);
+
+ // Register the gpu_flush_back_time_ms histogram
+ auto& gpu_flush_back_time_ms_family = prometheus::BuildHistogram()
+ .Name(std::string(METRIC_PREFIX) + "_gpu_flush_back_time_ms")
+ .Help("gpu flush back time in milliseconds")
+ .Register(*registry_);
+ gpu_flush_back_time_ms = &gpu_flush_back_time_ms_family.Add({}, common_buckets);
+
+ // Register the cpu_flush_back_time_ms histogram
+ auto& cpu_flush_back_time_ms_family = prometheus::BuildHistogram()
+ .Name(std::string(METRIC_PREFIX) + "_cpu_flush_back_time_ms")
+ .Help("cpu flush back time in milliseconds")
+ .Register(*registry_);
+ cpu_flush_back_time_ms = &cpu_flush_back_time_ms_family.Add({}, common_buckets);
+
+ exposer_.RegisterCollectable(registry_);
+}
+
+// Destructor
+Metrics::~Metrics() {
+ // Stop exposing metrics
+ // exposer_.Stop();
+}
+
+// Accessor for the memory_pool_size gauge, labeled by type
+prometheus::Gauge* Metrics::memory_pool_size(const std::string& type) {
+ return &memory_pool_size_family_->Add({{"type", type}});
+}
+
+// Accessor for the memory_pool_node_count gauge, labeled by type
+prometheus::Gauge* Metrics::memory_pool_node_count(const std::string& type) {
+ return &memory_pool_node_count_family_->Add({{"type", type}});
+}
+
+// Accessor for the lru_entry_count gauge, labeled by type
+prometheus::Gauge* Metrics::lru_entry_count(const std::string& type) {
+ return &lru_entry_count_family_->Add({{"type", type}});
+}
+
+// Accessor for the gpu_page_count gauge, labeled by type
+prometheus::Gauge* Metrics::gpu_page_count(std::string type) {
+ return &gpu_page_count_family_->Add({{"type", type}});
+}
+
+TimeObserver::TimeObserver(prometheus::Histogram* h) {
+ histogram_ = h;
+ timer_.start();
+}
+
+TimeObserver::~TimeObserver() {
+ timer_.stop();
+ histogram_->Observe(timer_.elapsedNs() / 1e6); // ns -> ms
+}
+
+} // namespace kvc2
\ No newline at end of file
diff --git a/csrc/balance_serve/kvc2/src/metrics.h b/csrc/balance_serve/kvc2/src/metrics.h
new file mode 100644
index 0000000..fa88785
--- /dev/null
+++ b/csrc/balance_serve/kvc2/src/metrics.h
@@ -0,0 +1,77 @@
+#pragma once
+
+#include "prometheus/counter.h"
+#include "prometheus/exposer.h"
+#include "prometheus/gauge.h"
+#include "prometheus/histogram.h"
+#include "prometheus/registry.h"
+#include <chrono>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <vector>
+
+#include "utils/timer.hpp"
+
+namespace kvc2 {
+
+// Metric name prefix
+#define METRIC_PREFIX "kvc2"
+
+struct MetricsConfig {
+ std::string endpoint; // listen endpoint, e.g. "0.0.0.0:8080"
+};
+
+class Metrics {
+ public:
+ // Construct with a MetricsConfig
+ Metrics(const MetricsConfig& config);
+ ~Metrics();
+
+ // Non-copyable, non-assignable
+ Metrics(const Metrics&) = delete;
+ Metrics& operator=(const Metrics&) = delete;
+
+ // Metric pointers
+ prometheus::Counter* prefix_nodes;
+ prometheus::Counter* prefix_block_count;
+
+ prometheus::Histogram* raw_insert_time_ms;
+ prometheus::Histogram* lookup_time_ms;
+ prometheus::Histogram* lookup_prefixmatch_length;
+ prometheus::Histogram* matched_length_percentage;
+
+ prometheus::Gauge* disk_usage;
+
+ prometheus::Gauge* memory_pool_size(const std::string& type);
+ prometheus::Gauge* memory_pool_node_count(const std::string& type);
+
+ prometheus::Gauge* lru_entry_count(const std::string& type);
+ prometheus::Gauge* gpu_page_count(std::string type);
+
+ prometheus::Histogram* append_tokens_time_ms;
+ prometheus::Histogram* gpu_flush_back_time_ms;
+ prometheus::Histogram* cpu_flush_back_time_ms;
+
+ private:
+ std::shared_ptr<prometheus::Registry> registry_;
+ prometheus::Exposer exposer_;
+
+ prometheus::Family<prometheus::Gauge>* memory_pool_size_family_;
+ prometheus::Family<prometheus::Gauge>* memory_pool_node_count_family_;
+ prometheus::Family<prometheus::Gauge>* lru_entry_count_family_;
+ prometheus::Family<prometheus::Gauge>* gpu_page_count_family_;
+};
+
+class TimeObserver {
+ public:
+ TimeObserver(prometheus::Histogram* h);
+ ~TimeObserver();
+
+ private:
+ Timer timer_;
+ prometheus::Histogram* histogram_;
+};
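+
+// Illustrative RAII usage: record a scope's duration into a histogram.
+//   { TimeObserver t(metrics.lookup_time_ms); do_lookup(); }  // observed on scope exit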
+
+} // namespace kvc2
\ No newline at end of file
diff --git a/csrc/balance_serve/kvc2/src/model_config.h b/csrc/balance_serve/kvc2/src/model_config.h
new file mode 100644
index 0000000..7ad1d90
--- /dev/null
+++ b/csrc/balance_serve/kvc2/src/model_config.h
@@ -0,0 +1,103 @@
+#ifndef __MODEL_CONFIG_HPP_
+#define __MODEL_CONFIG_HPP_
+
+#include <filesystem>
+#include "nlohmann/json.hpp"
+
+#include <fstream>
+#include <iostream>
+
+using DimSize = size_t;
+using URL = std::string;
+using ModelName = std::string;
+
+// This must remain loadable from a model's config.json
+class ModelConfig {
+ public:
+ DimSize hidden_size;
+ DimSize intermediate_size;
+ size_t max_position_embeddings;
+ std::string model_type;
+ size_t num_attention_heads;
+ size_t num_hidden_layers;
+ size_t num_key_value_heads;
+ size_t vocab_size;
+
+ NLOHMANN_DEFINE_TYPE_INTRUSIVE(ModelConfig, hidden_size, intermediate_size, max_position_embeddings, model_type,
+ num_attention_heads, num_hidden_layers, num_key_value_heads, vocab_size);
+
+ void load_from(std::filesystem::path path) {
+ std::ifstream i(path);
+ nlohmann::json j;
+ i >> j;
+ *this = j.get<ModelConfig>();
+ }
+};
+
+using QuantType = std::string;
+static const QuantType NoQuantType = "";
+
+class QuantConfig {
+ public:
+ QuantType name;
+
+ // For GEMV
+ QuantType type_of_dot_vector = NoQuantType;
+ inline bool can_be_used_as_matrix() { return type_of_dot_vector != NoQuantType; }
+
+ bool can_be_used_as_vector;
+
+ double bytes_per_element;
+ bool has_scale;
+ bool has_min;
+
+ size_t block_element_count;
+ size_t block_element_size;
+
+ URL reference = "";
+
+ NLOHMANN_DEFINE_TYPE_INTRUSIVE_WITH_DEFAULT(QuantConfig, name, type_of_dot_vector, can_be_used_as_vector,
+ bytes_per_element, has_scale, has_min, block_element_count,
+ block_element_size, reference);
+};
+
+inline std::map<QuantType, QuantConfig> quant_configs;
+inline std::map<ModelName, ModelConfig> model_configs;
+
+inline void load_quant_configs(std::filesystem::path path) {
+ std::cout << __FUNCTION__ << " from " << path << std::endl;
+ std::ifstream i(path);
+ nlohmann::json j;
+ i >> j;
+ quant_configs = j.get<std::map<QuantType, QuantConfig>>();
+ std::cout << "Loaded Quant Configs" << std::endl;
+ for (auto& [k, v] : quant_configs) {
+ std::cout << " - " << k << std::endl;
+ }
+}
+
+inline void dump_quant_configs(std::filesystem::path path) {
+ std::ofstream o(path);
+ nlohmann::json j = quant_configs;
+ o << j.dump(4);
+}
+
+inline void load_model_configs(std::filesystem::path path) {
+ std::cout << __FUNCTION__ << " from " << path << std::endl;
+ std::ifstream i(path);
+ nlohmann::json j;
+ i >> j;
+ model_configs = j.get<std::map<ModelName, ModelConfig>>();
+ std::cout << "Loaded Model Configs" << std::endl;
+ for (auto& [k, v] : model_configs) {
+ std::cout << " - " << k << std::endl;
+ }
+}
+
+inline void dump_model_configs(std::filesystem::path path) {
+ std::ofstream o(path);
+ nlohmann::json j = model_configs;
+ o << j.dump(4);
+}
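+
+// Illustrative usage (paths and keys are placeholders): load both config maps
+// once at startup, then read entries by name.
+//   load_model_configs("model_configs.json");
+//   load_quant_configs("quant_configs.json");
+//   ModelConfig mc = model_configs.at("some-model");
+//   QuantConfig qc = quant_configs.at("Q4_K_M");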
+
+#endif
\ No newline at end of file
diff --git a/csrc/balance_serve/kvc2/src/page_aligned_memory_pool.cpp b/csrc/balance_serve/kvc2/src/page_aligned_memory_pool.cpp
new file mode 100644
index 0000000..d70ed2e
--- /dev/null
+++ b/csrc/balance_serve/kvc2/src/page_aligned_memory_pool.cpp
@@ -0,0 +1,123 @@
+#include "page_aligned_memory_pool.h"
+
+#define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_DEBUG
+#define FMT_HEADER_ONLY
+#include "spdlog/spdlog.h"
+
+#include "utils/arithmetic.hpp"
+#include "utils/easy_format.hpp"
+
+/// Constructor
+PageAlignedMemoryPool::PageAlignedMemoryPool(size_t size_in_bytes) {
+ total_size = (size_in_bytes / PageSize) * PageSize;
+ // Aligned allocation (C++17 aligned new); if the compiler lacks support, use another method
+ data = ::operator new[](total_size, std::align_val_t(PageSize));
+ total_pages = total_size / PageSize;
+
+ assert(total_pages >= Blocks);
+ page_per_block = total_pages / Blocks;
+
+ for (size_t block_index = 0; block_index < Blocks; block_index++) {
+ first_page[block_index] = reinterpret_cast(reinterpret_cast(data) + static_cast(block_index) * page_per_block * PageSize);
+ count_page[block_index] =
+ block_index == Blocks - 1 ? (total_pages - page_per_block * (Blocks - 1)) : page_per_block;
+ SPDLOG_DEBUG("first_page[{}] = {}, count_page[{}] = {}",
+ block_index, reinterpret_cast(first_page[block_index]) - reinterpret_cast(data),
+ block_index, count_page[block_index]);
+ bitmap[block_index].resize(count_page[block_index], 0);
+ }
+ SPDLOG_INFO("PageAlignedMemoryPool with size {} Mbytes, {} pages", total_size / (1 << 20), page_count());
+}
+
+/// Destructor
+PageAlignedMemoryPool::~PageAlignedMemoryPool() {
+ if (data) {
+ // Note: must match the alignment used when allocating
+ ::operator delete[](data, std::align_val_t(PageSize));
+ data = nullptr;
+ }
+}
+
+/// Returns the total number of pages
+size_t PageAlignedMemoryPool::page_count() {
+ return total_size / PageSize;
+}
+
+/// Returns the size in bytes rounded up to whole pages
+size_t PageAlignedMemoryPool::page_padded_size(size_t size) {
+ return div_up(size, PageSize) * PageSize;
+}
+
+void* PageAlignedMemoryPool::alloc_in_block(size_t block_index, size_t alloc_size) {
+ std::lock_guard guard(lock[block_index]);
+ size_t free_pages = 0;
+ for (size_t i = 0; i < count_page[block_index]; i++) {
+ if (bitmap[block_index][i] == 0) {
+ free_pages++;
+ if (free_pages == alloc_size) {
+ size_t page_index = i + 1 - free_pages;
+ for (size_t page = page_index; page < page_index + alloc_size; page++) {
+ bitmap[block_index][page] = 1;
+ // SPDLOG_DEBUG("alloc page {} in block {}", page, block_index);
+ }
+ return reinterpret_cast(reinterpret_cast(first_page[block_index]) + page_index * PageSize);
+ }
+ } else {
+ free_pages = 0;
+ }
+ }
+ return nullptr;
+}
+
+/// Allocate
+void* PageAlignedMemoryPool::alloc(size_t size) {
+ size_t alloc_size = div_up(size, PageSize);
+ auto cnt = now_block.fetch_add(1, std::memory_order_relaxed);
+ for (size_t i = 0; i < Blocks; i ++) {
+ auto result = alloc_in_block((i + cnt) % Blocks, alloc_size);
+ if (result != nullptr) {
+ allocated.fetch_add(alloc_size * PageSize, std::memory_order_relaxed);
+ alloc_count.fetch_add(1, std::memory_order_relaxed);
+ return result;
+ }
+ }
+ return nullptr;
+}
+
+/// Free
+void PageAlignedMemoryPool::free(void* p, size_t size) {
+ auto alloc_size = div_up(size, PageSize);
+ size_t block_index = (reinterpret_cast(p) - reinterpret_cast(data)) / page_per_block / PageSize;
+ size_t page_index = (reinterpret_cast(p) - reinterpret_cast(first_page[block_index])) / PageSize;
+
+ std::lock_guard guard(lock[block_index]);
+
+ for (size_t page = page_index; page < page_index + alloc_size; page++)
+ bitmap[block_index][page] = 0;
+
+ allocated.fetch_sub(alloc_size * PageSize, std::memory_order_relaxed);
+ free_count.fetch_add(1, std::memory_order_relaxed);
+}
+// TODO: too slow
+std::vector PageAlignedMemoryPool::alloc_multiple(size_t size, size_t count) {
+ std::vector result;
+ for (size_t i = 0; i < count; i++) {
+ auto p = alloc(size);
+ if (p == nullptr) {
+ for (auto ptr : result) {
+ free(ptr, size);
+ }
+ return {};
+ }
+ result.push_back(p);
+ }
+ return result;
+}
+
+void PageAlignedMemoryPool::defragment() {}
+
+/// Debug printout
+std::string PageAlignedMemoryPool::debug() {
+ return fmt::format("PageAlignedMemoryPool: total_size: {}MB, allocated: {}, alloc/free count: {}/{}\n",
+ readable_number(total_size), readable_number(size_t(allocated)), size_t(alloc_count), size_t(free_count));
+}
diff --git a/csrc/balance_serve/kvc2/src/page_aligned_memory_pool.h b/csrc/balance_serve/kvc2/src/page_aligned_memory_pool.h
new file mode 100644
index 0000000..c65a740
--- /dev/null
+++ b/csrc/balance_serve/kvc2/src/page_aligned_memory_pool.h
@@ -0,0 +1,53 @@
+#pragma once
+
+#include <algorithm> // std::sort
+#include <cstddef>   // size_t
+#include <mutex>     // std::mutex
+#include <atomic>
+#include <cstdint>
+#include <string>
+#include <vector>
+
+constexpr size_t PageSize = 4096;
+
+/// Declaration of the PageAlignedMemoryPool class
+struct PageAlignedMemoryPool {
+ private:
+ constexpr static size_t Blocks = 16;
+
+ void* data = nullptr;
+
+ size_t total_size = 0, total_pages = 0;
+
+ std::atomic_size_t now_block = 0;
+ std::atomic_size_t allocated = 0; // allocated_size
+ std::atomic_size_t alloc_count = 0;
+ std::atomic_size_t free_count = 0;
+
+ std::mutex lock[Blocks];
+ size_t page_per_block = 0;
+ void *first_page[Blocks];
+ size_t count_page[Blocks];
+ std::vector bitmap[Blocks];
+ void* alloc_in_block(size_t block_index, size_t alloc_size);
+ public:
+ /// Constructor and destructor
+ explicit PageAlignedMemoryPool(size_t size_in_bytes);
+ ~PageAlignedMemoryPool();
+
+ /// Copy and move are disabled
+ PageAlignedMemoryPool(PageAlignedMemoryPool&& other) = delete;
+ PageAlignedMemoryPool& operator=(PageAlignedMemoryPool&& other) = delete;
+ PageAlignedMemoryPool(const PageAlignedMemoryPool& other) = delete;
+ PageAlignedMemoryPool& operator=(const PageAlignedMemoryPool& other) = delete;
+
+ /// Member functions
+ size_t page_count();
+ size_t page_padded_size(size_t size);
+
+ void* alloc(size_t size);
+ std::vector alloc_multiple(size_t size, size_t count);
+ void free(void* data, size_t size);
+ void defragment();
+ std::string debug();
+};
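+
+// Illustrative usage: carve page-aligned regions out of one pre-reserved pool.
+//   PageAlignedMemoryPool pool(size_t(1) << 30);      // ~1 GiB backing store
+//   void* p = pool.alloc(3 * PageSize + 1);           // rounds up to 4 pages
+//   if (p != nullptr) pool.free(p, 3 * PageSize + 1); // size must match the alloc
+//   // alloc() returns nullptr if no block has enough contiguous free pages.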
diff --git a/csrc/balance_serve/kvc2/src/prefix.cpp b/csrc/balance_serve/kvc2/src/prefix.cpp
new file mode 100644
index 0000000..add1cd4
--- /dev/null
+++ b/csrc/balance_serve/kvc2/src/prefix.cpp
@@ -0,0 +1,1746 @@
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include