mirror of
https://github.com/kvcache-ai/ktransformers.git
synced 2026-04-28 03:39:48 +00:00
Some checks failed
Book-CI / test (push) Has been cancelled
Book-CI / test-1 (push) Has been cancelled
Book-CI / test-2 (push) Has been cancelled
Deploy / deploy (macos-latest) (push) Has been cancelled
Deploy / deploy (ubuntu-latest) (push) Has been cancelled
Deploy / deploy (windows-latest) (push) Has been cancelled
* initial fix for issue 1858 * [fix]: add done flag check to sync() wait predicate to prevent deadlock during destruction Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Ben Appleby <Ben.Appleby@microsoft.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
85 lines · No EOL · 2.2 KiB · C++
/**
 * @Description  :
 * @Author       : chenht2022
 * @Date         : 2024-07-17 12:25:51
 * @Version      : 1.0.0
 * @LastEditors  : chenht2022
 * @LastEditTime : 2024-10-09 11:08:10
 * @Copyright (c) 2024 by KVCache.AI, All Rights Reserved.
 **/
|
|
#include "task_queue.h"

#include <pthread.h>
#include <sched.h>

#include <chrono>
#include <iostream>
#include <thread>
#include <utility>
|
|
|
|
/**
 * Construct an idle queue: a single sentinel node makes head == tail
 * (empty list), then the consumer thread is started last so it only
 * ever observes a fully initialized queue.
 */
TaskQueue::TaskQueue() : done(false), pending(0) {
    Node* sentinel = new Node();
    tail.store(sentinel, std::memory_order_relaxed);
    head.store(sentinel, std::memory_order_relaxed);
    workerThread = std::thread([this] { worker(); });
}
|
|
|
|
/**
 * Shut down: raise the done flag while holding the mutex (so the worker
 * cannot check its wait predicate and go to sleep between the store and
 * the notify), wake every waiter, join the worker, then reclaim the
 * remaining node chain — no synchronization needed once the worker is
 * joined.
 */
TaskQueue::~TaskQueue() {
    {
        std::lock_guard<std::mutex> lock(mtx);
        done.store(true, std::memory_order_release);
    }
    cv.notify_all();
    if (workerThread.joinable()) {
        workerThread.join();
    }

    // Single-threaded from here on: free everything from head onward.
    for (Node* n = head.load(std::memory_order_relaxed); n != nullptr;) {
        Node* successor = n->next.load(std::memory_order_relaxed);
        delete n;
        n = successor;
    }
}
|
|
|
|
void TaskQueue::enqueue(std::function<void()> task) {
|
|
pending.fetch_add(1, std::memory_order_acq_rel);
|
|
Node* node = new Node(task);
|
|
Node* prev = tail.exchange(node, std::memory_order_acq_rel);
|
|
prev->next.store(node, std::memory_order_release);
|
|
{
|
|
std::lock_guard<std::mutex> lock(mtx);
|
|
}
|
|
cv.notify_one();
|
|
}
|
|
|
|
void TaskQueue::sync(size_t allow_n_pending) {
|
|
std::unique_lock<std::mutex> lock(mtx);
|
|
cv.wait(lock, [&] {
|
|
return pending.load(std::memory_order_acquire) <= allow_n_pending
|
|
|| done.load(std::memory_order_acquire);
|
|
});
|
|
}
|
|
|
|
/**
 * Consumer loop (single worker thread). `curr` always points at the
 * sentinel / last-consumed node; the next task to run, if any, lives in
 * curr->next. Exits when the destructor sets `done`.
 */
void TaskQueue::worker() {
    Node* curr = head.load(std::memory_order_relaxed);
    while (!done.load(std::memory_order_acquire)) {
        // acquire pairs with the producer's release store of next in
        // enqueue(): the node's task is fully visible once next is seen.
        Node* next = curr->next.load(std::memory_order_acquire);
        if (next) {
            if (next->task) {
                next->task();
            }
            // The old sentinel is no longer reachable by the producer
            // (tail moved past it), so it is safe to free; `next`
            // becomes the new sentinel and head is republished.
            delete curr;
            curr = next;
            head.store(curr, std::memory_order_release);
            // Decrement under the mutex so a sync() caller sitting
            // between its predicate check and sleep cannot miss it.
            {
                std::lock_guard<std::mutex> lock(mtx);
                pending.fetch_sub(1, std::memory_order_acq_rel);
            }
            cv.notify_all();
        } else {
            // List drained: sleep until a producer links a node or the
            // destructor raises done.
            std::unique_lock<std::mutex> lock(mtx);
            cv.wait(lock, [&] {
                return curr->next.load(std::memory_order_acquire) != nullptr
                || done.load(std::memory_order_acquire);
            });
        }
    }
}