3 #include <geneial/utility/ThreadedExecutionManager.h>
5 geneial_private_namespace(geneial)
7 geneial_private_namespace(utility)
10 geneial_export_namespace
// Worker routine executed by the threads created in initializeThreads():
// waits for work, copies a batch of tasks out of _tasks into a local deque,
// and tracks that batch in _activeTasks.
// NOTE(review): the leading numbers on these lines have gaps and no braces are
// visible, so this listing appears to omit lines (loop bodies, scope ends,
// any handling of _finish) -- confirm against the original file.
12 void ThreadedExecutionManager::executor()
// Local batch of tasks taken from the shared queue.
14 std::deque<std::function<void()>> innerTask;
// Lock _mutex and block on _condEntry until _finish is set or _tasks is non-empty.
19 std::unique_lock<std::mutex> l(_mutex);
20 _condEntry.wait(l, [
this]()
22 return _finish || _tasks.size() != 0;
// Take at most _amountPerThread tasks, or fewer if the queue runs empty first.
27 int i = _amountPerThread;
29 while (i-- && !_tasks.empty())
// Copies the front task; presumably it is also removed from _tasks on a line
// not shown here -- TODO confirm.
31 auto task = _tasks.front();
33 innerTask.emplace_back(task);
// Add the batch size to _activeTasks so waitForTasks() sees it as outstanding.
35 const size_t taskToProcess = innerTask.size();
36 _activeTasks += taskToProcess;
// Iterate over the batch (each element is copied by value); the loop body is
// not visible -- presumably it invokes the task and increments processedTasks.
39 size_t processedTasks = 0;
40 for (
auto task : innerTask)
// Sanity check: every task taken in this batch must have been processed.
46 assert(processedTasks == taskToProcess);
// Remove the finished batch from the outstanding count and wake every thread
// waiting on _condExit (see waitForTasks()).
48 _activeTasks -= processedTasks;
55 _condExit.notify_all();
// Creates amountThreads worker threads and stores them in _threads.
// @param amountThreads number of threads to create.
59 void ThreadedExecutionManager::initializeThreads(
const unsigned int amountThreads)
61 for (
unsigned int i = 0; i < amountThreads; ++i)
// Each std::thread is held in a std::shared_ptr and runs executor() on this object.
63 _threads.emplace_back(
64 std::make_shared<std::thread>(&ThreadedExecutionManager::executor,
this));
// Appends a task to the shared queue and wakes one thread waiting on _condEntry.
// @param task callable to enqueue; it is copied into _tasks.
68 void ThreadedExecutionManager::addTask(std::function<
void()>
const &task)
// _mutex guards _tasks; executor() takes the same mutex before reading the queue.
70 std::unique_lock < std::mutex > l(_mutex);
71 _tasks.emplace_back(task);
// Wake a single waiter on _condEntry (the condition executor() blocks on).
72 _condEntry.notify_one();
// Blocks the calling thread until _tasks is empty and _activeTasks is zero.
// NOTE(review): the listing has gaps (no `do`, no braces around the locked
// region, and any guard around the wait is not visible) -- confirm the exact
// control flow against the original file.
75 void ThreadedExecutionManager::waitForTasks()
77 unsigned int activeTasks = 0;
// Under _mutex, snapshot the outstanding work: queued tasks plus _activeTasks
// (the count executor() raises for each batch it takes and lowers afterwards).
80 std::unique_lock < std::mutex > l(_mutex);
81 activeTasks = _tasks.size() + _activeTasks;
// Wait on _condExit (notified by executor()) until nothing is queued and
// _activeTasks is zero; the mutex is released while waiting.
84 _condExit.wait(l, [
this]()
85 {
return _tasks.size() == 0 && _activeTasks == 0;});
87 }
// Presumably the tail of a do/while: repeat while the snapshot above was non-zero.
while (activeTasks != 0);
90 void ThreadedExecutionManager::joinAll()
93 std::unique_lock < std::mutex > l(_mutex);
96 _condEntry.notify_all();
97 for (
auto t : _threads)