diff --git a/src/logid/util/worker_thread.cpp b/src/logid/util/worker_thread.cpp index 116e991..81dda91 100644 --- a/src/logid/util/worker_thread.cpp +++ b/src/logid/util/worker_thread.cpp @@ -43,7 +43,7 @@ worker_thread::~worker_thread() } } -void worker_thread::queue(std::shared_ptr t) +void worker_thread::queue(const std::shared_ptr& t) { _queue.push(t); _queue_cv.notify_all(); @@ -74,6 +74,7 @@ void worker_thread::_run() _queue.front()->run(); _queue.pop(); } + _parent->notifyFree(); } } diff --git a/src/logid/util/worker_thread.h b/src/logid/util/worker_thread.h index 0c252e1..0dd53ca 100644 --- a/src/logid/util/worker_thread.h +++ b/src/logid/util/worker_thread.h @@ -32,7 +32,7 @@ namespace logid worker_thread(workqueue* parent, std::size_t worker_number); ~worker_thread(); - void queue(std::shared_ptr t); + void queue(const std::shared_ptr& t); bool busy(); private: @@ -40,7 +40,7 @@ namespace logid void _exception_handler(std::exception& e); workqueue* _parent; - std::size_t _worker_number; + const std::size_t _worker_number; std::mutex _run_lock; std::atomic _continue_run; diff --git a/src/logid/util/workqueue.cpp b/src/logid/util/workqueue.cpp index 2bf409d..44fb347 100644 --- a/src/logid/util/workqueue.cpp +++ b/src/logid/util/workqueue.cpp @@ -17,6 +17,7 @@ */ #include #include "workqueue.h" +#include "worker_thread.h" #include "log.h" using namespace logid; @@ -47,7 +48,7 @@ workqueue::~workqueue() } } -void workqueue::queue(std::shared_ptr t) +void workqueue::queue(const std::shared_ptr& t) { assert(t != nullptr); _queue.push(t); @@ -70,6 +71,11 @@ std::size_t workqueue::threadCount() const return _workers.size(); } +void workqueue::notifyFree() +{ + _busy_cv.notify_all(); +} + void workqueue::_run() { using namespace std::chrono_literals; @@ -77,6 +83,7 @@ void workqueue::_run() std::unique_lock lock(_run_lock); _continue_run = true; while(_continue_run) { + bool queued = false; _queue_cv.wait(lock, [this]{ return !(_queue.empty()); }); while(!_queue.empty()) { if(_workers.empty()) { @@ -88,33 +95,29 @@ void workqueue::_run() continue; } - auto worker = _workers.begin(); - for(; worker != _workers.end(); worker++) { - if(!(*worker)->busy()) + for(auto& worker : _workers) { + if(!worker->busy()) { + worker->queue(_queue.front()); + queued = true; break; + } } - if(worker != _workers.end()) - (*worker)->queue(_queue.front()); - else { - _busy_cv.wait_for(lock, 500ms, [this, &worker]{ - for(worker = _workers.begin(); worker != _workers.end(); - worker++) { - if (!(*worker)->busy()) { - return true; + if(!queued) { + if(_busy_cv.wait_for(lock, 500ms) == std::cv_status::no_timeout) { + for(auto& worker : _workers) { + if(!worker->busy()) { + worker->queue(_queue.front()); + break; } } - return false; - }); - - if(worker != _workers.end()) - (*worker)->queue(_queue.front()); - else{ + } else{ // Workers busy, launch in new thread logPrintf(DEBUG, "All workers were busy for 500ms, " "running task in new thread."); thread::spawn([t = _queue.front()]() { t->run(); }); } } + _queue.pop(); } } diff --git a/src/logid/util/workqueue.h b/src/logid/util/workqueue.h index b8191fd..39cb891 100644 --- a/src/logid/util/workqueue.h +++ b/src/logid/util/workqueue.h @@ -18,24 +18,30 @@ #ifndef LOGID_WORKQUEUE_H #define LOGID_WORKQUEUE_H -#include "worker_thread.h" +#include "mutex_queue.h" +#include "task.h" #include "thread.h" namespace logid { + class worker_thread; + class workqueue { public: explicit workqueue(std::size_t thread_count); ~workqueue(); - void queue(std::shared_ptr t); + void queue(const std::shared_ptr& t); void busyUpdate(); void stop(); - std::size_t threadCount() const; + [[nodiscard]] std::size_t threadCount() const; + + friend class worker_thread; + void notifyFree(); private: void _run();