Properly handle full workqueue

This commit is contained in:
pixl 2022-01-08 16:29:17 -05:00
parent 918ea63755
commit 436729b07b
No known key found for this signature in database
GPG Key ID: 1866C148CD593B6E
4 changed files with 34 additions and 24 deletions

View File

@ -43,7 +43,7 @@ worker_thread::~worker_thread()
} }
} }
void worker_thread::queue(std::shared_ptr<task> t) void worker_thread::queue(const std::shared_ptr<task>& t)
{ {
_queue.push(t); _queue.push(t);
_queue_cv.notify_all(); _queue_cv.notify_all();
@ -74,6 +74,7 @@ void worker_thread::_run()
_queue.front()->run(); _queue.front()->run();
_queue.pop(); _queue.pop();
} }
_parent->notifyFree();
} }
} }

View File

@ -32,7 +32,7 @@ namespace logid
worker_thread(workqueue* parent, std::size_t worker_number); worker_thread(workqueue* parent, std::size_t worker_number);
~worker_thread(); ~worker_thread();
void queue(std::shared_ptr<task> t); void queue(const std::shared_ptr<task>& t);
bool busy(); bool busy();
private: private:
@ -40,7 +40,7 @@ namespace logid
void _exception_handler(std::exception& e); void _exception_handler(std::exception& e);
workqueue* _parent; workqueue* _parent;
std::size_t _worker_number; const std::size_t _worker_number;
std::mutex _run_lock; std::mutex _run_lock;
std::atomic<bool> _continue_run; std::atomic<bool> _continue_run;

View File

@ -17,6 +17,7 @@
*/ */
#include <cassert> #include <cassert>
#include "workqueue.h" #include "workqueue.h"
#include "worker_thread.h"
#include "log.h" #include "log.h"
using namespace logid; using namespace logid;
@ -47,7 +48,7 @@ workqueue::~workqueue()
} }
} }
void workqueue::queue(std::shared_ptr<task> t) void workqueue::queue(const std::shared_ptr<task>& t)
{ {
assert(t != nullptr); assert(t != nullptr);
_queue.push(t); _queue.push(t);
@ -70,6 +71,11 @@ std::size_t workqueue::threadCount() const
return _workers.size(); return _workers.size();
} }
void workqueue::notifyFree()
{
_busy_cv.notify_all();
}
void workqueue::_run() void workqueue::_run()
{ {
using namespace std::chrono_literals; using namespace std::chrono_literals;
@ -77,6 +83,7 @@ void workqueue::_run()
std::unique_lock<std::mutex> lock(_run_lock); std::unique_lock<std::mutex> lock(_run_lock);
_continue_run = true; _continue_run = true;
while(_continue_run) { while(_continue_run) {
bool queued = false;
_queue_cv.wait(lock, [this]{ return !(_queue.empty()); }); _queue_cv.wait(lock, [this]{ return !(_queue.empty()); });
while(!_queue.empty()) { while(!_queue.empty()) {
if(_workers.empty()) { if(_workers.empty()) {
@ -88,33 +95,29 @@ void workqueue::_run()
continue; continue;
} }
auto worker = _workers.begin(); for(auto& worker : _workers) {
for(; worker != _workers.end(); worker++) { if(!worker->busy()) {
if(!(*worker)->busy()) worker->queue(_queue.front());
queued = true;
break; break;
}
} }
if(worker != _workers.end()) if(!queued) {
(*worker)->queue(_queue.front()); if(_busy_cv.wait_for(lock, 500ms) == std::cv_status::no_timeout) {
else { for(auto& worker : _workers) {
_busy_cv.wait_for(lock, 500ms, [this, &worker]{ if(!worker->busy()) {
for(worker = _workers.begin(); worker != _workers.end(); worker->queue(_queue.front());
worker++) { break;
if (!(*worker)->busy()) {
return true;
} }
} }
return false; } else{
});
if(worker != _workers.end())
(*worker)->queue(_queue.front());
else{
// Workers busy, launch in new thread // Workers busy, launch in new thread
logPrintf(DEBUG, "All workers were busy for 500ms, " logPrintf(DEBUG, "All workers were busy for 500ms, "
"running task in new thread."); "running task in new thread.");
thread::spawn([t = _queue.front()]() { t->run(); }); thread::spawn([t = _queue.front()]() { t->run(); });
} }
} }
_queue.pop(); _queue.pop();
} }
} }

View File

@ -18,24 +18,30 @@
#ifndef LOGID_WORKQUEUE_H #ifndef LOGID_WORKQUEUE_H
#define LOGID_WORKQUEUE_H #define LOGID_WORKQUEUE_H
#include "worker_thread.h" #include "mutex_queue.h"
#include "task.h"
#include "thread.h" #include "thread.h"
namespace logid namespace logid
{ {
class worker_thread;
class workqueue class workqueue
{ {
public: public:
explicit workqueue(std::size_t thread_count); explicit workqueue(std::size_t thread_count);
~workqueue(); ~workqueue();
void queue(std::shared_ptr<task> t); void queue(const std::shared_ptr<task>& t);
void busyUpdate(); void busyUpdate();
void stop(); void stop();
std::size_t threadCount() const; [[nodiscard]] std::size_t threadCount() const;
friend class worker_thread;
void notifyFree();
private: private:
void _run(); void _run();