Merge commit '947538acb8' into concedo_experimental

# Conflicts:
#	.github/workflows/build.yml
#	.github/workflows/docker.yml
#	CMakePresets.json
#	examples/llama-bench/llama-bench.cpp
#	ggml/CMakeLists.txt
#	ggml/src/CMakeLists.txt
#	tests/test-backend-ops.cpp
#	tests/test-quantize-fns.cpp
This commit is contained in:
Concedo 2024-09-09 11:26:34 +08:00
commit 70cdb55cc9
29 changed files with 93418 additions and 92397 deletions

View file

@ -49,3 +49,12 @@ There are 2 modes of operation:
| 128 | 256 | 8 | 3072 | 0.751 | 1363.92 | 15.110 | 135.54 | 15.861 | 193.69 |
| 128 | 256 | 16 | 6144 | 1.569 | 1304.93 | 18.073 | 226.64 | 19.642 | 312.80 |
| 128 | 256 | 32 | 12288 | 3.409 | 1201.35 | 19.223 | 426.15 | 22.633 | 542.93 |
### JSONL output
Pass `--output-format jsonl` to output JSONL instead of Markdown, á la
```json lines
{"n_kv_max": 2048, "n_batch": 2048, "n_ubatch": 512, "flash_attn": 0, "is_pp_shared": 0, "n_gpu_layers": 99, "n_threads": 8, "n_threads_batch": 8, "pp": 128, "tg": 128, "pl": 1, "n_kv": 256, "t_pp": 0.233810, "speed_pp": 547.453064, "t_tg": 3.503684, "speed_tg": 36.532974, "t": 3.737494, "speed": 68.495094}
{"n_kv_max": 2048, "n_batch": 2048, "n_ubatch": 512, "flash_attn": 0, "is_pp_shared": 0, "n_gpu_layers": 99, "n_threads": 8, "n_threads_batch": 8, "pp": 128, "tg": 128, "pl": 2, "n_kv": 512, "t_pp": 0.422602, "speed_pp": 605.770935, "t_tg": 11.106112, "speed_tg": 23.050371, "t": 11.528713, "speed": 44.410854}
```

View file

@ -122,12 +122,13 @@ int main(int argc, char ** argv) {
}
}
LOG_TEE("\n");
LOG_TEE("%s: n_kv_max = %d, n_batch = %d, n_ubatch = %d, flash_attn = %d, is_pp_shared = %d, n_gpu_layers = %d, n_threads = %u, n_threads_batch = %u\n", __func__, n_kv_max, params.n_batch, params.n_ubatch, params.flash_attn, params.is_pp_shared, params.n_gpu_layers, ctx_params.n_threads, ctx_params.n_threads_batch);
LOG_TEE("\n");
LOG_TEE("|%6s | %6s | %4s | %6s | %8s | %8s | %8s | %8s | %8s | %8s |\n", "PP", "TG", "B", "N_KV", "T_PP s", "S_PP t/s", "T_TG s", "S_TG t/s", "T s", "S t/s");
LOG_TEE("|%6s-|-%6s-|-%4s-|-%6s-|-%8s-|-%8s-|-%8s-|-%8s-|-%8s-|-%8s-|\n", "------", "------", "----", "------", "--------", "--------", "--------", "--------", "--------", "--------");
if (!params.batched_bench_output_jsonl) {
LOG_TEE("\n");
LOG_TEE("%s: n_kv_max = %d, n_batch = %d, n_ubatch = %d, flash_attn = %d, is_pp_shared = %d, n_gpu_layers = %d, n_threads = %u, n_threads_batch = %u\n", __func__, n_kv_max, params.n_batch, params.n_ubatch, params.flash_attn, params.is_pp_shared, params.n_gpu_layers, ctx_params.n_threads, ctx_params.n_threads_batch);
LOG_TEE("\n");
LOG_TEE("|%6s | %6s | %4s | %6s | %8s | %8s | %8s | %8s | %8s | %8s |\n", "PP", "TG", "B", "N_KV", "T_PP s", "S_PP t/s", "T_TG s", "S_TG t/s", "T s", "S t/s");
LOG_TEE("|%6s-|-%6s-|-%4s-|-%6s-|-%8s-|-%8s-|-%8s-|-%8s-|-%8s-|-%8s-|\n", "------", "------", "----", "------", "--------", "--------", "--------", "--------", "--------", "--------");
}
for ( int i_pp = 0; i_pp < (int) n_pp.size(); ++i_pp) {
for ( int i_tg = 0; i_tg < (int) n_tg.size(); ++i_tg) {
@ -195,7 +196,16 @@ int main(int argc, char ** argv) {
const float speed_tg = pl*tg / t_tg;
const float speed = n_kv / t;
LOG_TEE("|%6d | %6d | %4d | %6d | %8.3f | %8.2f | %8.3f | %8.2f | %8.3f | %8.2f |\n", pp, tg, pl, n_kv, t_pp, speed_pp, t_tg, speed_tg, t, speed);
if(params.batched_bench_output_jsonl) {
LOG_TEE(
"{\"n_kv_max\": %d, \"n_batch\": %d, \"n_ubatch\": %d, \"flash_attn\": %d, \"is_pp_shared\": %d, \"n_gpu_layers\": %d, \"n_threads\": %u, \"n_threads_batch\": %u, "
"\"pp\": %d, \"tg\": %d, \"pl\": %d, \"n_kv\": %d, \"t_pp\": %f, \"speed_pp\": %f, \"t_tg\": %f, \"speed_tg\": %f, \"t\": %f, \"speed\": %f}\n",
n_kv_max, params.n_batch, params.n_ubatch, params.flash_attn, params.is_pp_shared, params.n_gpu_layers, ctx_params.n_threads, ctx_params.n_threads_batch,
pp, tg, pl, n_kv, t_pp, speed_pp, t_tg, speed_tg, t, speed
);
} else {
LOG_TEE("|%6d | %6d | %4d | %6d | %8.3f | %8.2f | %8.3f | %8.2f | %8.3f | %8.2f |\n", pp, tg, pl, n_kv, t_pp, speed_pp, t_tg, speed_tg, t, speed);
}
}
}
}

View file

@ -27,6 +27,8 @@ static const std::vector<struct quant_option> QUANT_OPTIONS = {
{ "IQ2_M", LLAMA_FTYPE_MOSTLY_IQ2_M, " 2.7 bpw quantization", },
{ "IQ1_S", LLAMA_FTYPE_MOSTLY_IQ1_S, " 1.56 bpw quantization", },
{ "IQ1_M", LLAMA_FTYPE_MOSTLY_IQ1_M, " 1.75 bpw quantization", },
{ "TQ1_0", LLAMA_FTYPE_MOSTLY_TQ1_0, " 1.69 bpw ternarization", },
{ "TQ2_0", LLAMA_FTYPE_MOSTLY_TQ2_0, " 2.06 bpw ternarization", },
{ "Q2_K", LLAMA_FTYPE_MOSTLY_Q2_K, " 2.96G, +3.5199 ppl @ Llama-3-8B", },
{ "Q2_K_S", LLAMA_FTYPE_MOSTLY_Q2_K_S, " 2.96G, +3.1836 ppl @ Llama-3-8B", },
{ "IQ3_XXS", LLAMA_FTYPE_MOSTLY_IQ3_XXS, " 3.06 bpw quantization", },

View file

@ -51,15 +51,12 @@ enum stop_type {
STOP_TYPE_PARTIAL,
};
// state diagram: https://github.com/ggerganov/llama.cpp/pull/9283
enum slot_state {
SLOT_STATE_IDLE,
SLOT_STATE_PROCESSING,
};
enum slot_command {
SLOT_COMMAND_NONE,
SLOT_COMMAND_LOAD_PROMPT,
SLOT_COMMAND_RELEASE,
SLOT_STATE_PROCESSING_PROMPT,
SLOT_STATE_DONE_PROMPT,
SLOT_STATE_GENERATING,
};
enum server_state {
@ -136,7 +133,6 @@ struct server_slot {
struct slot_params params;
slot_state state = SLOT_STATE_IDLE;
slot_command command = SLOT_COMMAND_NONE;
// used to determine the slot that has been used the longest
int64_t t_last_used = -1;
@ -195,6 +191,8 @@ struct server_slot {
double t_prompt_processing; // ms
double t_token_generation; // ms
std::function<void(int)> callback_on_release;
void reset() {
n_prompt_tokens = 0;
generated_text = "";
@ -229,25 +227,28 @@ struct server_slot {
return n_remaining > 0; // no budget
}
bool available() const {
return state == SLOT_STATE_IDLE && command == SLOT_COMMAND_NONE;
}
bool is_processing() const {
return (state == SLOT_STATE_IDLE && command == SLOT_COMMAND_LOAD_PROMPT) || state == SLOT_STATE_PROCESSING;
return state != SLOT_STATE_IDLE;
}
void add_token_string(const completion_token_output & token) {
if (command == SLOT_COMMAND_RELEASE) {
if (!is_processing()) {
return;
}
generated_token_probs.push_back(token);
}
void release() {
if (state == SLOT_STATE_PROCESSING) {
if (is_processing()) {
t_token_generation = (ggml_time_us() - t_start_generation) / 1e3;
command = SLOT_COMMAND_RELEASE;
state = SLOT_STATE_IDLE;
LOG_INFO("slot released", {
{"id_slot", id},
{"id_task", id_task},
{"n_past", n_past},
{"truncated", truncated},
});
callback_on_release(id);
}
}
@ -354,6 +355,9 @@ struct server_metrics {
uint64_t n_tokens_predicted = 0;
uint64_t t_tokens_generation = 0;
uint64_t n_decode_total = 0;
uint64_t n_busy_slots_total = 0;
void init() {
t_start = ggml_time_us();
}
@ -372,6 +376,15 @@ struct server_metrics {
t_tokens_generation_total += slot.t_token_generation;
}
void on_decoded(const std::vector<server_slot> & slots) {
n_decode_total++;
for (const auto & slot : slots) {
if (slot.is_processing()) {
n_busy_slots_total++;
}
}
}
void reset_bucket() {
n_prompt_tokens_processed = 0;
t_prompt_processing = 0;
@ -413,6 +426,7 @@ struct server_queue {
// multi-task version of post()
int post(std::vector<server_task> & tasks, bool front = false) {
std::unique_lock<std::mutex> lock(mutex_tasks);
for (auto & task : tasks) {
if (task.id == -1) {
task.id = id++;
@ -432,6 +446,7 @@ struct server_queue {
void defer(server_task task) {
std::unique_lock<std::mutex> lock(mutex_tasks);
queue_tasks_deferred.push_back(std::move(task));
condition_tasks.notify_one();
}
// Get the next id for creating a new task
@ -452,14 +467,14 @@ struct server_queue {
callback_update_slots = std::move(callback);
}
// Call when the state of one slot is changed
void notify_slot_changed() {
// move deferred tasks back to main loop
// Call when the state of one slot is changed, it will move one task from deferred to main queue
void pop_deferred_task() {
std::unique_lock<std::mutex> lock(mutex_tasks);
for (auto & task : queue_tasks_deferred) {
queue_tasks.push_back(std::move(task));
if (!queue_tasks_deferred.empty()) {
queue_tasks.emplace_back(std::move(queue_tasks_deferred.front()));
queue_tasks_deferred.pop_front();
}
queue_tasks_deferred.clear();
condition_tasks.notify_one();
}
// end the start_loop routine
@ -489,7 +504,7 @@ struct server_queue {
break;
}
server_task task = queue_tasks.front();
queue_tasks.erase(queue_tasks.begin());
queue_tasks.pop_front();
lock.unlock();
LOG_VERBOSE("callback_new_task", {{"id_task", task.id}});
callback_new_task(task);
@ -717,6 +732,10 @@ struct server_context {
slot.sparams = params.sparams;
slot.callback_on_release = [this](int) {
queue_tasks.pop_deferred_task();
};
slot.reset();
slots.push_back(slot);
@ -798,7 +817,7 @@ struct server_context {
for (server_slot & slot : slots) {
// skip the slot if it is not available
if (!slot.available()) {
if (slot.is_processing()) {
continue;
}
@ -840,7 +859,7 @@ struct server_context {
int64_t t_last = ggml_time_us();
for (server_slot & slot : slots) {
// skip the slot if it is not available
if (!slot.available()) {
if (slot.is_processing()) {
continue;
}
@ -1078,7 +1097,7 @@ struct server_context {
}
}
slot.command = SLOT_COMMAND_LOAD_PROMPT;
slot.state = SLOT_STATE_PROCESSING_PROMPT;
slot.prompt_tokens.clear();
LOG_INFO("slot is processing task", {
@ -1622,7 +1641,7 @@ struct server_context {
queue_tasks.defer(task);
break;
}
if (!slot->available()) {
if (slot->is_processing()) {
// if requested slot is unavailable, we defer this task for processing later
LOG_VERBOSE("requested slot is unavailable", {{"id_task", task.id}});
queue_tasks.defer(task);
@ -1728,6 +1747,9 @@ struct server_context {
{ "n_tokens_predicted", metrics.n_tokens_predicted},
{ "t_tokens_generation", metrics.t_tokens_generation},
{ "n_decode_total", metrics.n_decode_total},
{ "n_busy_slots_total", metrics.n_busy_slots_total},
{ "kv_cache_tokens_count", llama_get_kv_cache_token_count(ctx)},
{ "kv_cache_used_cells", llama_get_kv_cache_used_cells(ctx)},
@ -1747,7 +1769,7 @@ struct server_context {
send_error(task, "Invalid slot ID", ERROR_TYPE_INVALID_REQUEST);
break;
}
if (!slot->available()) {
if (slot->is_processing()) {
// if requested slot is unavailable, we defer this task for processing later
LOG_VERBOSE("requested slot is unavailable", {{"id_task", task.id}});
queue_tasks.defer(task);
@ -1788,7 +1810,7 @@ struct server_context {
send_error(task, "Invalid slot ID", ERROR_TYPE_INVALID_REQUEST);
break;
}
if (!slot->available()) {
if (slot->is_processing()) {
// if requested slot is unavailable, we defer this task for processing later
LOG_VERBOSE("requested slot is unavailable", {{"id_task", task.id}});
queue_tasks.defer(task);
@ -1836,7 +1858,7 @@ struct server_context {
send_error(task, "Invalid slot ID", ERROR_TYPE_INVALID_REQUEST);
break;
}
if (!slot->available()) {
if (slot->is_processing()) {
// if requested slot is unavailable, we defer this task for processing later
LOG_VERBOSE("requested slot is unavailable", {{"id_task", task.id}});
queue_tasks.defer(task);
@ -1876,33 +1898,12 @@ struct server_context {
system_prompt_update();
}
// release slots
for (auto & slot : slots) {
if (slot.command == SLOT_COMMAND_RELEASE) {
slot.state = SLOT_STATE_IDLE;
slot.command = SLOT_COMMAND_NONE;
slot.t_last_used = ggml_time_us();
LOG_INFO("slot released", {
{"id_slot", slot.id},
{"id_task", slot.id_task},
{"n_ctx", n_ctx},
{"n_past", slot.n_past},
{"n_system_tokens", system_tokens.size()},
{"n_cache_tokens", slot.cache_tokens.size()},
{"truncated", slot.truncated}
});
queue_tasks.notify_slot_changed();
}
}
// check if all slots are idle
{
bool all_idle = true;
for (auto & slot : slots) {
if (slot.state != SLOT_STATE_IDLE || slot.command != SLOT_COMMAND_NONE) {
if (slot.is_processing()) {
all_idle = false;
break;
}
@ -1973,7 +1974,7 @@ struct server_context {
// frist, add sampled tokens from any ongoing sequences
for (auto & slot : slots) {
if (slot.state == SLOT_STATE_IDLE) {
if (slot.state != SLOT_STATE_GENERATING) {
continue;
}
@ -2015,7 +2016,7 @@ struct server_context {
if (params.cont_batching || batch.n_tokens == 0) {
for (auto & slot : slots) {
// this slot still has a prompt to be processed
if (slot.state == SLOT_STATE_IDLE && slot.command == SLOT_COMMAND_LOAD_PROMPT) {
if (slot.state == SLOT_STATE_PROCESSING_PROMPT) {
auto & prompt_tokens = slot.prompt_tokens;
// we haven't tokenized the prompt yet - do it now:
@ -2083,8 +2084,6 @@ struct server_context {
{"id_task", slot.id_task}
});
slot.state = SLOT_STATE_PROCESSING;
slot.command = SLOT_COMMAND_NONE;
slot.release();
slot.print_timings();
send_final_response(slot);
@ -2094,8 +2093,6 @@ struct server_context {
if (slot.cmpl_type == SERVER_TASK_CMPL_TYPE_EMBEDDING) {
// this prompt is too large to process - discard it
if (slot.n_prompt_tokens > n_ubatch) {
slot.state = SLOT_STATE_PROCESSING;
slot.command = SLOT_COMMAND_NONE;
slot.release();
send_error(slot, "input is too large to process. increase the physical batch size", ERROR_TYPE_SERVER);
continue;
@ -2253,10 +2250,9 @@ struct server_context {
{"progress", (float) slot.n_prompt_tokens_processed / slot.n_prompt_tokens},
});
// entire prompt has been processed - start decoding new tokens
// entire prompt has been processed
if (slot.n_past == slot.n_prompt_tokens) {
slot.state = SLOT_STATE_PROCESSING;
slot.command = SLOT_COMMAND_NONE;
slot.state = SLOT_STATE_DONE_PROMPT;
GGML_ASSERT(batch.n_tokens > 0);
@ -2338,18 +2334,17 @@ struct server_context {
};
const int ret = llama_decode(ctx, batch_view);
metrics.on_decoded(slots);
if (ret != 0) {
if (n_batch == 1 || ret < 0) {
// if you get here, it means the KV cache is full - try increasing it via the context size
LOG_ERROR("failed to decode the batch: KV cache is full - try increasing it via the context size", {
{"i", i},
{"n_batch", ret},
{"ret", ret},
{"i", i},
{"n_batch", n_batch},
{"ret", ret},
});
for (auto & slot : slots) {
slot.state = SLOT_STATE_PROCESSING;
slot.command = SLOT_COMMAND_NONE;
slot.release();
send_error(slot, "Input prompt is too big compared to KV size. Please try increasing KV size.");
}
@ -2361,24 +2356,31 @@ struct server_context {
i -= n_batch;
LOG_WARNING("failed to find free space in the KV cache, retrying with smaller batch size - try increasing it via the context size or enable defragmentation", {
{"i", i},
{"n_batch", n_batch},
{"ret", ret},
{"i", i},
{"n_batch", n_batch},
{"ret", ret},
});
continue; // continue loop of n_batch
}
for (auto & slot : slots) {
if (slot.state != SLOT_STATE_PROCESSING || slot.i_batch < (int) i || slot.i_batch >= (int) (i + n_tokens)) {
if (slot.i_batch < (int) i || slot.i_batch >= (int) (i + n_tokens)) {
continue; // continue loop of slots
}
// prompt evaluated for embedding
if (slot.cmpl_type == SERVER_TASK_CMPL_TYPE_EMBEDDING) {
send_embedding(slot, batch_view);
slot.release();
slot.i_batch = -1;
if (slot.state == SLOT_STATE_DONE_PROMPT) {
if (slot.cmpl_type == SERVER_TASK_CMPL_TYPE_EMBEDDING) {
// prompt evaluated for embedding
send_embedding(slot, batch_view);
slot.release();
slot.i_batch = -1;
continue; // continue loop of slots
} else {
// prompt evaluated for next-token prediction
slot.state = SLOT_STATE_GENERATING;
}
} else if (slot.state != SLOT_STATE_GENERATING) {
continue; // continue loop of slots
}
@ -2425,6 +2427,7 @@ struct server_context {
}
if (!process_token(result, slot)) {
// release slot because of stop condition
slot.release();
slot.print_timings();
send_final_response(slot);
@ -2705,7 +2708,7 @@ int main(int argc, char ** argv) {
task.type = SERVER_TASK_TYPE_METRICS;
ctx_server.queue_results.add_waiting_task_id(task.id);
ctx_server.queue_tasks.post(task);
ctx_server.queue_tasks.post(task, true); // high-priority task
// get the result
server_task_result result = ctx_server.queue_results.recv(task.id);
@ -2737,7 +2740,7 @@ int main(int argc, char ** argv) {
task.data.push_back({{"reset_bucket", true}});
ctx_server.queue_results.add_waiting_task_id(task.id);
ctx_server.queue_tasks.post(task);
ctx_server.queue_tasks.post(task, true); // high-priority task
// get the result
server_task_result result = ctx_server.queue_results.recv(task.id);
@ -2751,6 +2754,9 @@ int main(int argc, char ** argv) {
const uint64_t n_tokens_predicted = data.at("n_tokens_predicted");
const uint64_t t_tokens_generation = data.at("t_tokens_generation");
const uint64_t n_decode_total = data.at("n_decode_total");
const uint64_t n_busy_slots_total = data.at("n_busy_slots_total");
const int32_t kv_cache_used_cells = data.at("kv_cache_used_cells");
// metrics definition: https://prometheus.io/docs/practices/naming/#metric-names
@ -2771,6 +2777,14 @@ int main(int argc, char ** argv) {
{"name", "tokens_predicted_seconds_total"},
{"help", "Predict process time"},
{"value", (uint64_t) data.at("t_tokens_generation_total") / 1.e3}
}, {
{"name", "n_decode_total"},
{"help", "Total number of llama_decode() calls"},
{"value", n_decode_total}
}, {
{"name", "n_busy_slots_per_decode"},
{"help", "Average number of busy slots per llama_decode() call"},
{"value", (float) n_busy_slots_total / (float) n_decode_total}
}}},
{"gauge", {{
{"name", "prompt_tokens_seconds"},
@ -2837,7 +2851,7 @@ int main(int argc, char ** argv) {
task.data = {
{ "id_slot", id_slot },
{ "filename", filename },
{ "filepath", filepath }
{ "filepath", filepath },
};
const int id_task = ctx_server.queue_tasks.post(task);
@ -2867,7 +2881,7 @@ int main(int argc, char ** argv) {
task.data = {
{ "id_slot", id_slot },
{ "filename", filename },
{ "filepath", filepath }
{ "filepath", filepath },
};
const int id_task = ctx_server.queue_tasks.post(task);
@ -2945,7 +2959,7 @@ int main(int argc, char ** argv) {
{ "system_prompt", ctx_server.system_prompt.c_str() },
{ "default_generation_settings", ctx_server.default_generation_settings_for_props },
{ "total_slots", ctx_server.params.n_parallel },
{ "chat_template", curr_tmpl.c_str() }
{ "chat_template", curr_tmpl.c_str() },
};
res_ok(res, data);
@ -3056,13 +3070,13 @@ int main(int argc, char ** argv) {
json models = {
{"object", "list"},
{"data", {
{
{"id", params.model_alias},
{"object", "model"},
{"created", std::time(0)},
{"owned_by", "llamacpp"},
{"meta", ctx_server.model_meta()}
},
{
{"id", params.model_alias},
{"object", "model"},
{"created", std::time(0)},
{"owned_by", "llamacpp"},
{"meta", ctx_server.model_meta()}
},
}}
};

View file

@ -77,6 +77,35 @@ Feature: Parallel
| disabled | 128 |
| enabled | 64 |
Scenario Outline: Multi users with number of prompts exceeding number of slots
Given a system prompt You are a writer.
And a model tinyllama-2
Given a prompt:
"""
Write a very long book.
"""
And a prompt:
"""
Write another a poem.
"""
And a prompt:
"""
What is LLM?
"""
And a prompt:
"""
The sky is blue and I love it.
"""
And <n_predict> max tokens to predict
And streaming is <streaming>
Given concurrent OAI completions requests
Then the server is busy
Then the server is idle
Then all prompts are predicted with <n_predict> tokens
Examples:
| streaming | n_predict |
| disabled | 128 |
| enabled | 64 |
Scenario: Multi users with total number of tokens to predict exceeds the KV Cache size #3969
Given a prompt:

View file

@ -15,6 +15,7 @@ Feature: Passkey / Self-extend with context shift
And <n_junk> as number of junk
And <n_predicted> server max tokens to predict
And 42 as seed
And 0.0 temperature
And <n_ctx> KV cache size
And 1 slots
And <n_ga> group attention factor to extend context size through self-extend
@ -22,7 +23,8 @@ Feature: Passkey / Self-extend with context shift
# Can be override with N_GPU_LAYERS
And <ngl> GPU offloaded layers
Then the server is starting
Then the server is healthy
# Higher timeout because the model may need to be downloaded from the internet
Then the server is healthy with timeout 120 seconds
Given available models
Then model 0 is trained on <n_ctx_train> tokens context
Given a prefix prompt:

View file

@ -202,17 +202,15 @@ def step_start_server(context):
time.sleep(0.1)
@step("the server is {expecting_status}")
@async_run_until_complete
async def step_wait_for_the_server_to_be_started(context, expecting_status: Literal['healthy', 'ready', 'idle', 'busy'] | str):
async def wait_for_server_status_with_timeout(context, expecting_status: Literal['healthy', 'ready', 'idle', 'busy'] | str, timeout: int):
match expecting_status:
case 'healthy':
await wait_for_slots_status(context, context.base_url, 200,
timeout=30)
timeout=timeout)
case 'ready' | 'idle':
await wait_for_slots_status(context, context.base_url, 200,
timeout=30,
timeout=timeout,
params={'fail_on_no_slot': 1},
slots_idle=context.n_slots,
slots_processing=0)
@ -225,6 +223,18 @@ async def step_wait_for_the_server_to_be_started(context, expecting_status: Lite
assert False, "unknown status"
@step("the server is {expecting_status} with timeout {timeout:d} seconds")
@async_run_until_complete
async def step_wait_for_server_status_with_timeout(context, expecting_status: Literal['healthy', 'ready', 'idle', 'busy'] | str, timeout: int):
await wait_for_server_status_with_timeout(context, expecting_status, timeout)
@step("the server is {expecting_status}")
@async_run_until_complete
async def step_wait_for_server_status(context, expecting_status: Literal['healthy', 'ready', 'idle', 'busy'] | str):
await wait_for_server_status_with_timeout(context, expecting_status, 30)
@step('all slots are {expected_slot_status_string}')
@async_run_until_complete
async def step_all_slots_status(context, expected_slot_status_string: Literal['idle', 'busy'] | str):