From dbe24f9350212c1ea68dc2de60f431117214fb0e Mon Sep 17 00:00:00 2001 From: pixl Date: Fri, 21 Jan 2022 23:24:47 -0500 Subject: [PATCH] Phase out workqueue --- src/logid/CMakeLists.txt | 5 +- src/logid/Device.cpp | 16 +-- src/logid/Device.h | 1 - src/logid/DeviceManager.cpp | 9 +- src/logid/DeviceManager.h | 1 - src/logid/Receiver.cpp | 3 +- src/logid/actions/ChangeDPI.cpp | 2 +- src/logid/actions/ChangeHostAction.cpp | 2 +- src/logid/actions/CycleDPI.cpp | 2 +- src/logid/actions/ToggleHiresScroll.cpp | 2 +- src/logid/actions/ToggleSmartShift.cpp | 2 +- src/logid/backend/dj/Receiver.cpp | 7 +- src/logid/backend/dj/Receiver.h | 4 +- src/logid/backend/dj/ReceiverMonitor.cpp | 68 ++++++------ src/logid/backend/dj/ReceiverMonitor.h | 4 +- src/logid/backend/hidpp/Device.cpp | 6 +- src/logid/backend/hidpp/Device.h | 3 +- src/logid/backend/hidpp10/Device.cpp | 5 +- src/logid/backend/hidpp10/Device.h | 3 +- src/logid/backend/hidpp20/Device.cpp | 5 +- src/logid/backend/hidpp20/Device.h | 2 +- src/logid/backend/raw/DeviceMonitor.cpp | 72 ++++++------ src/logid/backend/raw/DeviceMonitor.h | 11 +- src/logid/backend/raw/RawDevice.cpp | 41 ++----- src/logid/backend/raw/RawDevice.h | 12 +- src/logid/config/schema.h | 6 +- src/logid/features/DeviceStatus.cpp | 2 +- src/logid/logid.cpp | 1 - src/logid/util/task.cpp | 67 ++---------- src/logid/util/task.h | 46 +------- src/logid/util/worker_thread.cpp | 89 --------------- src/logid/util/worker_thread.h | 56 ---------- src/logid/util/workqueue.cpp | 134 ----------------------- src/logid/util/workqueue.h | 63 ----------- 34 files changed, 128 insertions(+), 624 deletions(-) delete mode 100644 src/logid/util/worker_thread.cpp delete mode 100644 src/logid/util/worker_thread.h delete mode 100644 src/logid/util/workqueue.cpp delete mode 100644 src/logid/util/workqueue.h diff --git a/src/logid/CMakeLists.txt b/src/logid/CMakeLists.txt index 3fe8150..63bac6b 100644 --- a/src/logid/CMakeLists.txt +++ b/src/logid/CMakeLists.txt @@ -64,11 +64,8 @@ add_executable(logid backend/hidpp20/features/WirelessDeviceStatus.cpp backend/hidpp20/features/ThumbWheel.cpp backend/dj/Report.cpp - util/mutex_queue.h - util/workqueue.cpp - util/worker_thread.cpp - util/task.cpp util/thread.cpp + util/task.cpp util/ExceptionHandler.cpp) set_target_properties(logid PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}) diff --git a/src/logid/Device.cpp b/src/logid/Device.cpp index 2163e24..bbc176e 100644 --- a/src/logid/Device.cpp +++ b/src/logid/Device.cpp @@ -98,8 +98,7 @@ std::shared_ptr Device::make( Device::Device(std::string path, backend::hidpp::DeviceIndex index, std::shared_ptr manager) : _hidpp20 (path, index, - manager->config()->io_timeout.value_or(defaults::io_timeout), - manager->workQueue()), + manager->config()->io_timeout.value_or(defaults::io_timeout)), _path (std::move(path)), _index (index), _config (_getConfig(manager, _hidpp20.name())), _receiver (nullptr), @@ -213,19 +212,6 @@ void Device::reset() "available.", _path.c_str(), _index); } -std::shared_ptr Device::workQueue() const -{ - if(auto manager = _manager.lock()) { - return manager->workQueue(); - } else { - logPrintf(ERROR, "Device manager lost"); - logPrintf(ERROR, - "Fatal error occurred, file a bug report," - " the program will now exit."); - std::terminate(); - } -} - std::shared_ptr Device::virtualInput() const { if(auto manager = _manager.lock()) { diff --git a/src/logid/Device.h b/src/logid/Device.h index 0c143d9..503ab44 100644 --- a/src/logid/Device.h +++ b/src/logid/Device.h @@ -82,7 +82,6 @@ namespace logid void reset(); - [[nodiscard]] std::shared_ptr workQueue() const; [[nodiscard]] std::shared_ptr virtualInput() const; [[nodiscard]] std::shared_ptr ipcNode() const; diff --git a/src/logid/DeviceManager.cpp b/src/logid/DeviceManager.cpp index 854df9d..7ae827d 100644 --- a/src/logid/DeviceManager.cpp +++ b/src/logid/DeviceManager.cpp @@ -24,7 +24,6 @@ #include "DeviceManager.h" #include "Receiver.h" #include "util/log.h" -#include "util/workqueue.h" #include "backend/hidpp10/Error.h" #include "backend/Error.h" @@ -44,7 +43,7 @@ namespace logid { DeviceManager::DeviceManager(std::shared_ptr config, std::shared_ptr virtual_input, std::shared_ptr server) : - backend::raw::DeviceMonitor(config->workers.value_or(defaults::worker_count)), + backend::raw::DeviceMonitor(), _server (std::move(server)), _config (std::move(config)), _virtual_input (std::move(virtual_input)), _root_node (ipcgull::node::make_root("")), @@ -96,8 +95,7 @@ void DeviceManager::addDevice(std::string path) // Check if device is ignored before continuing { raw::RawDevice raw_dev( - path,config()->io_timeout.value_or(defaults::io_timeout), - workQueue()); + path,config()->io_timeout.value_or(defaults::io_timeout)); if(config()->ignore.has_value() && config()->ignore.value().contains(raw_dev.productId())) { logPrintf(DEBUG, "%s: Device 0x%04x ignored.", @@ -109,8 +107,7 @@ void DeviceManager::addDevice(std::string path) try { hidpp::Device device( path, hidpp::DefaultDevice, - config()->io_timeout.value_or(defaults::io_timeout), - workQueue()); + config()->io_timeout.value_or(defaults::io_timeout)); isReceiver = device.version() == std::make_tuple(1, 0); } catch(hidpp10::Error &e) { if(e.code() != hidpp10::Error::UnknownDevice) diff --git a/src/logid/DeviceManager.h b/src/logid/DeviceManager.h index 8f67934..aecc991 100644 --- a/src/logid/DeviceManager.h +++ b/src/logid/DeviceManager.h @@ -32,7 +32,6 @@ namespace logid { - class workqueue; class InputDevice; class DeviceManager : public backend::raw::DeviceMonitor diff --git a/src/logid/Receiver.cpp b/src/logid/Receiver.cpp index cc1edaf..7ea7f5c 100644 --- a/src/logid/Receiver.cpp +++ b/src/logid/Receiver.cpp @@ -66,8 +66,7 @@ Receiver::Receiver(const std::string& path, const std::shared_ptr& manager) : dj::ReceiverMonitor(path, manager->config()->io_timeout.value_or( - defaults::io_timeout), - manager->workQueue()), + defaults::io_timeout)), _path (path), _manager (manager), _nickname (manager), _ipc_node (manager->receiversNode()->make_child(_nickname)), _ipc_interface (_ipc_node->make_interface(this)) diff --git a/src/logid/actions/ChangeDPI.cpp b/src/logid/actions/ChangeDPI.cpp index 7eee2c9..c1f7f62 100644 --- a/src/logid/actions/ChangeDPI.cpp +++ b/src/logid/actions/ChangeDPI.cpp @@ -39,7 +39,7 @@ void ChangeDPI::press() { _pressed = true; if(_dpi) { - task::spawn(_device->workQueue(), + spawn_task( [this]{ try { uint16_t last_dpi = _dpi->getDPI(_config.sensor.value_or(0)); diff --git a/src/logid/actions/ChangeHostAction.cpp b/src/logid/actions/ChangeHostAction.cpp index 0b87507..ad0a8b3 100644 --- a/src/logid/actions/ChangeHostAction.cpp +++ b/src/logid/actions/ChangeHostAction.cpp @@ -49,7 +49,7 @@ void ChangeHostAction::press() void ChangeHostAction::release() { if(_change_host) { - task::spawn(_device->workQueue(), + spawn_task( [this] { auto host_info = _change_host->getHostInfo(); int next_host; diff --git a/src/logid/actions/CycleDPI.cpp b/src/logid/actions/CycleDPI.cpp index 499e29d..60c0f04 100644 --- a/src/logid/actions/CycleDPI.cpp +++ b/src/logid/actions/CycleDPI.cpp @@ -40,7 +40,7 @@ void CycleDPI::press() { _pressed = true; if(_dpi && !_config.dpis.empty()) { - task::spawn(_device->workQueue(), + spawn_task( [this](){ std::lock_guard lock(_dpi_lock); ++_current_dpi; diff --git a/src/logid/actions/ToggleHiresScroll.cpp b/src/logid/actions/ToggleHiresScroll.cpp index 591e09b..35e5f8d 100644 --- a/src/logid/actions/ToggleHiresScroll.cpp +++ b/src/logid/actions/ToggleHiresScroll.cpp @@ -38,7 +38,7 @@ void ToggleHiresScroll::press() _pressed = true; if(_hires_scroll) { - task::spawn(_device->workQueue(), + spawn_task( [hires=this->_hires_scroll](){ auto mode = hires->getMode(); mode ^= backend::hidpp20::HiresScroll::HiRes; diff --git a/src/logid/actions/ToggleSmartShift.cpp b/src/logid/actions/ToggleSmartShift.cpp index 58c5e36..fe5efe1 100644 --- a/src/logid/actions/ToggleSmartShift.cpp +++ b/src/logid/actions/ToggleSmartShift.cpp @@ -37,7 +37,7 @@ void ToggleSmartShift::press() { _pressed = true; if(_smartshift) { - task::spawn(_device->workQueue(), + spawn_task( [ss=this->_smartshift](){ auto status = ss->getStatus(); status.setActive = true; diff --git a/src/logid/backend/dj/Receiver.cpp b/src/logid/backend/dj/Receiver.cpp index f0909e5..a97ece1 100644 --- a/src/logid/backend/dj/Receiver.cpp +++ b/src/logid/backend/dj/Receiver.cpp @@ -20,7 +20,6 @@ #include "Report.h" #include "Receiver.h" #include "Error.h" -#include "../../util/thread.h" using namespace logid::backend::dj; using namespace logid::backend; @@ -44,11 +43,9 @@ InvalidReceiver::Reason InvalidReceiver::code() const noexcept return _reason; } -Receiver::Receiver(std::string path, - double io_timeout, - const std::shared_ptr& wq) : +Receiver::Receiver(std::string path, double io_timeout) : _raw_device (std::make_shared( - std::move(path), io_timeout, wq)), + std::move(path), io_timeout)), _hidpp10_device (_raw_device, hidpp::DefaultDevice) { if(!supportsDjReports(_raw_device->reportDescriptor())) diff --git a/src/logid/backend/dj/Receiver.h b/src/logid/backend/dj/Receiver.h index 270a4eb..7152918 100644 --- a/src/logid/backend/dj/Receiver.h +++ b/src/logid/backend/dj/Receiver.h @@ -52,9 +52,7 @@ namespace dj class Receiver final { public: - Receiver(std::string path, - double io_timeout, - const std::shared_ptr& wq); + Receiver(std::string path, double io_timeout); enum DjEvents : uint8_t { diff --git a/src/logid/backend/dj/ReceiverMonitor.cpp b/src/logid/backend/dj/ReceiverMonitor.cpp index ae51314..55e527d 100644 --- a/src/logid/backend/dj/ReceiverMonitor.cpp +++ b/src/logid/backend/dj/ReceiverMonitor.cpp @@ -25,12 +25,8 @@ using namespace logid::backend::dj; -ReceiverMonitor::ReceiverMonitor( - std::string path, - double io_timeout, - const std::shared_ptr& wq) : - _workqueue (wq), - _receiver (std::make_shared(std::move(path), io_timeout, wq)) +ReceiverMonitor::ReceiverMonitor(std::string path, double io_timeout) : + _receiver (std::make_shared(std::move(path), io_timeout)) { assert(_receiver->hidppEventHandlers().find("RECVMON") == _receiver->hidppEventHandlers().end()); @@ -66,25 +62,29 @@ void ReceiverMonitor::run() /* Running in a new thread prevents deadlocks since the * receiver may be enumerating. */ - task::spawn(_workqueue, - [this, report]() { - if (report.subId() == Receiver::DeviceConnection) - this->addDevice(this->_receiver->deviceConnectionEvent - (report)); - else if (report.subId() == Receiver::DeviceDisconnection) - this->removeDevice(this->_receiver-> - deviceDisconnectionEvent(report)); - }, {[report, path=this->_receiver->rawDevice()->hidrawPath()] - (std::exception& e) { - if(report.subId() == Receiver::DeviceConnection) - logPrintf(ERROR, "Failed to add device %d to receiver " - "on %s: %s", report.deviceIndex(), - path.c_str(), e.what()); - else if(report.subId() == Receiver::DeviceDisconnection) - logPrintf(ERROR, "Failed to remove device %d from " - "receiver on %s: %s", report.deviceIndex() - ,path.c_str(), e.what()); - }}); + std::async([this, report, + path=this->_receiver->rawDevice()->hidrawPath()]() { + if (report.subId() == Receiver::DeviceConnection) { + try { + this->addDevice(this->_receiver->deviceConnectionEvent + (report)); + } catch(std::exception& e) { + logPrintf(ERROR, "Failed to add device %d to receiver " + "on %s: %s", report.deviceIndex(), + path.c_str(), e.what()); + } + } + else if (report.subId() == Receiver::DeviceDisconnection) { + try { + this->removeDevice(this->_receiver-> + deviceDisconnectionEvent(report)); + } catch(std::exception& e) { + logPrintf(ERROR, "Failed to remove device %d from " + "receiver on %s: %s", report.deviceIndex(), + path.c_str(), e.what()); + } + } + }); }; _receiver->addHidppEventHandler("RECVMON", event_handler); @@ -122,16 +122,18 @@ void ReceiverMonitor::waitForDevice(hidpp::DeviceIndex index) event.index = index; event.fromTimeoutCheck = true; - task::spawn(_workqueue, - {[this, event, nickname]() { + spawn_task( + [this, event, nickname]() { + try { _receiver->rawDevice()->removeEventHandler(nickname); this->addDevice(event); - }}, {[path=_receiver->rawDevice()->hidrawPath(), event] - (std::exception& e) { - logPrintf(ERROR, "Failed to add device %d to receiver " - "on %s: %s", event.index, - path.c_str(), e.what()); - }}); + } catch(std::exception& e) { + logPrintf(ERROR, "Failed to add device %d to receiver " + "on %s: %s", event.index, + _receiver->rawDevice()->hidrawPath().c_str(), + e.what()); + } + }); }; _receiver->rawDevice()->addEventHandler(nickname, handler); diff --git a/src/logid/backend/dj/ReceiverMonitor.h b/src/logid/backend/dj/ReceiverMonitor.h index 1349bf8..33517b4 100644 --- a/src/logid/backend/dj/ReceiverMonitor.h +++ b/src/logid/backend/dj/ReceiverMonitor.h @@ -33,8 +33,7 @@ namespace dj { public: ReceiverMonitor(std::string path, - double io_timeout, - const std::shared_ptr& wq); + double io_timeout); virtual ~ReceiverMonitor(); void enumerate(); @@ -55,7 +54,6 @@ namespace dj std::shared_ptr receiver() const; private: - std::shared_ptr _workqueue; std::shared_ptr _receiver; }; diff --git a/src/logid/backend/hidpp/Device.cpp b/src/logid/backend/hidpp/Device.cpp index a4d727c..7bc521d 100644 --- a/src/logid/backend/hidpp/Device.cpp +++ b/src/logid/backend/hidpp/Device.cpp @@ -18,7 +18,6 @@ #include #include -#include "../../util/thread.h" #include "Device.h" #include "Report.h" #include "../hidpp20/features/Root.h" @@ -50,9 +49,8 @@ Device::InvalidDevice::Reason Device::InvalidDevice::code() const noexcept } Device::Device(const std::string& path, DeviceIndex index, - double io_timeout, - const std::shared_ptr& wq): - _raw_device (std::make_shared(path, io_timeout, wq)), + double io_timeout): + _raw_device (std::make_shared(path, io_timeout)), _receiver (nullptr), _path (path), _index (index) { _init(); diff --git a/src/logid/backend/hidpp/Device.h b/src/logid/backend/hidpp/Device.h index 1dea588..9c36b85 100644 --- a/src/logid/backend/hidpp/Device.h +++ b/src/logid/backend/hidpp/Device.h @@ -62,8 +62,7 @@ namespace hidpp }; Device(const std::string& path, DeviceIndex index, - double io_timeout, - const std::shared_ptr& wq); + double io_timeout); Device(std::shared_ptr raw_device, DeviceIndex index); Device(std::shared_ptr receiver, diff --git a/src/logid/backend/hidpp10/Device.cpp b/src/logid/backend/hidpp10/Device.cpp index 8f6e54e..65cf538 100644 --- a/src/logid/backend/hidpp10/Device.cpp +++ b/src/logid/backend/hidpp10/Device.cpp @@ -25,9 +25,8 @@ using namespace logid::backend; using namespace logid::backend::hidpp10; Device::Device(const std::string &path, hidpp::DeviceIndex index, - double io_timeout, - const std::shared_ptr& wq) : - hidpp::Device(path, index, io_timeout, wq) + double io_timeout) : + hidpp::Device(path, index, io_timeout) { assert(version() == std::make_tuple(1, 0)); } diff --git a/src/logid/backend/hidpp10/Device.h b/src/logid/backend/hidpp10/Device.h index b61e054..be98dd0 100644 --- a/src/logid/backend/hidpp10/Device.h +++ b/src/logid/backend/hidpp10/Device.h @@ -29,8 +29,7 @@ namespace hidpp10 { public: Device(const std::string& path, hidpp::DeviceIndex index, - double io_timeout, - const std::shared_ptr& wq); + double io_timeout); Device(std::shared_ptr raw_dev, hidpp::DeviceIndex index); Device(std::shared_ptr receiver, diff --git a/src/logid/backend/hidpp20/Device.cpp b/src/logid/backend/hidpp20/Device.cpp index ed93b70..a3ac6a3 100644 --- a/src/logid/backend/hidpp20/Device.cpp +++ b/src/logid/backend/hidpp20/Device.cpp @@ -23,9 +23,8 @@ using namespace logid::backend::hidpp20; -Device::Device(std::string path, hidpp::DeviceIndex index, - double io_timeout, const std::shared_ptr& wq) : - hidpp::Device(path, index, io_timeout, wq) +Device::Device(std::string path, hidpp::DeviceIndex index, double io_timeout) : + hidpp::Device(path, index, io_timeout) { assert(std::get<0>(version()) >= 2); } diff --git a/src/logid/backend/hidpp20/Device.h b/src/logid/backend/hidpp20/Device.h index 482abe8..64f80c3 100644 --- a/src/logid/backend/hidpp20/Device.h +++ b/src/logid/backend/hidpp20/Device.h @@ -29,7 +29,7 @@ namespace hidpp20 { { public: Device(std::string path, hidpp::DeviceIndex index, - double io_timeout, const std::shared_ptr& wq); + double io_timeout); Device(std::shared_ptr raw_device, hidpp::DeviceIndex index); Device(std::shared_ptr receiver, hidpp::DeviceIndex index); diff --git a/src/logid/backend/raw/DeviceMonitor.cpp b/src/logid/backend/raw/DeviceMonitor.cpp index 8520c23..1553e61 100644 --- a/src/logid/backend/raw/DeviceMonitor.cpp +++ b/src/logid/backend/raw/DeviceMonitor.cpp @@ -18,7 +18,6 @@ #include "DeviceMonitor.h" #include "../../util/task.h" -#include "../../util/workqueue.h" #include "../../util/log.h" #include "RawDevice.h" #include "../hidpp/Device.h" @@ -35,8 +34,7 @@ extern "C" using namespace logid; using namespace logid::backend::raw; -DeviceMonitor::DeviceMonitor(int worker_count) : - _workqueue (std::make_shared(worker_count)) +DeviceMonitor::DeviceMonitor() { if(-1 == pipe(_pipe)) throw std::system_error(errno, std::system_category(), @@ -103,28 +101,32 @@ void DeviceMonitor::run() std::string devnode = udev_device_get_devnode(device); if (action == "add") - task::spawn(_workqueue, + spawn_task( [this, name=devnode]() { - // Wait for device to initialise - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - auto supported_reports = backend::hidpp::getSupportedReports( - RawDevice::getReportDescriptor(name)); - if(supported_reports) - this->addDevice(name); - else - logPrintf(DEBUG, "Unsupported device %s ignored", - name.c_str()); - }, [name=devnode](std::exception& e){ - logPrintf(WARN, "Error adding device %s: %s", - name.c_str(), e.what()); + try { + // Wait for device to initialise + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + auto supported_reports = backend::hidpp::getSupportedReports( + RawDevice::getReportDescriptor(name)); + if(supported_reports) + this->addDevice(name); + else + logPrintf(DEBUG, "Unsupported device %s ignored", + name.c_str()); + } catch(std::exception& e) { + logPrintf(WARN, "Error adding device %s: %s", + name.c_str(), e.what()); + } }); else if (action == "remove") - task::spawn(_workqueue, + spawn_task( [this, name=devnode]() { - this->removeDevice(name); - }, [name=devnode](std::exception& e){ - logPrintf(WARN, "Error removing device %s: %s", - name.c_str(), e.what()); + try { + this->removeDevice(name); + } catch(std::exception& e) { + logPrintf(WARN, "Error removing device %s: %s", + name.c_str(), e.what()); + } }); udev_device_unref (device); @@ -172,24 +174,22 @@ void DeviceMonitor::enumerate() std::string devnode = udev_device_get_devnode(device); udev_device_unref(device); - task::spawn(_workqueue, + spawn_task( [this, name=devnode]() { - auto supported_reports = backend::hidpp::getSupportedReports( - RawDevice::getReportDescriptor(name)); - if(supported_reports) - this->addDevice(name); - else - logPrintf(DEBUG, "Unsupported device %s ignored", - name.c_str()); - }, [name=devnode](std::exception& e){ - logPrintf(WARN, "Error adding device %s: %s", - name.c_str(), e.what()); + try { + auto supported_reports = backend::hidpp::getSupportedReports( + RawDevice::getReportDescriptor(name)); + if(supported_reports) + this->addDevice(name); + else + logPrintf(DEBUG, "Unsupported device %s ignored", + name.c_str()); + } catch(std::exception& e) { + logPrintf(WARN, "Error adding device %s: %s", + name.c_str(), e.what()); + } }); } udev_enumerate_unref(udev_enum); } - -std::shared_ptr DeviceMonitor::workQueue() const { - return _workqueue; -} diff --git a/src/logid/backend/raw/DeviceMonitor.h b/src/logid/backend/raw/DeviceMonitor.h index c54cb64..9b7fc8e 100644 --- a/src/logid/backend/raw/DeviceMonitor.h +++ b/src/logid/backend/raw/DeviceMonitor.h @@ -26,10 +26,7 @@ struct udev; -namespace logid { - class workqueue; -namespace backend { -namespace raw +namespace logid::backend::raw { class DeviceMonitor { @@ -38,9 +35,8 @@ namespace raw void run(); void stop(); - std::shared_ptr workQueue() const; protected: - explicit DeviceMonitor(int worker_count); + DeviceMonitor(); virtual ~DeviceMonitor(); virtual void addDevice(std::string device) = 0; virtual void removeDevice(std::string device) = 0; @@ -49,8 +45,7 @@ namespace raw int _pipe[2]; std::atomic _run_monitor; std::mutex _running; - std::shared_ptr _workqueue; }; -}}} +} #endif //LOGID_BACKEND_RAW_DEVICEMONITOR_H \ No newline at end of file diff --git a/src/logid/backend/raw/RawDevice.cpp b/src/logid/backend/raw/RawDevice.cpp index dd2103e..0c86eb6 100644 --- a/src/logid/backend/raw/RawDevice.cpp +++ b/src/logid/backend/raw/RawDevice.cpp @@ -22,10 +22,7 @@ #include "../dj/defs.h" #include "../../util/log.h" #include "../hidpp/Report.h" -#include "../../Configuration.h" #include "../../util/thread.h" -#include "../../util/task.h" -#include "../../util/workqueue.h" #include #include @@ -64,14 +61,11 @@ bool RawDevice::supportedReport(uint8_t id, uint8_t length) } } -RawDevice::RawDevice(std::string path, - double io_timeout, - const std::shared_ptr& wq) : +RawDevice::RawDevice(std::string path, double io_timeout) : _path (std::move(path)), _continue_listen (false), _continue_respond (false), _io_timeout (duration_cast( - duration(io_timeout))), - _workqueue (wq) + duration(io_timeout))) { int ret; @@ -204,32 +198,19 @@ std::vector RawDevice::sendReport(const std::vector& report) return f.get(); } else { - std::vector response; std::exception_ptr _exception; - std::shared_ptr t = std::make_shared( - [this, report, &response]() { - response = _respondToReport(report); - }, [&_exception](std::exception& e) { - try { - throw e; - } catch(std::exception& e) { - _exception = std::make_exception_ptr(e); - } - }); - _workqueue->queue(t); - t->waitStart(); - auto status = t->waitFor(_io_timeout); - if(_exception) - std::rethrow_exception(_exception); + auto response = std::async(std::launch::deferred, + [this, report]()->std::vector { + return _respondToReport(report); + }); + auto status = response.wait_for(_io_timeout); if(status == std::future_status::timeout) { - _continue_respond = false; interruptRead(); - t->wait(); - if(_exception) - std::rethrow_exception(_exception); + if(response.valid()) + response.wait(); throw TimeoutError(); - } else - return response; + } + return response.get(); } } diff --git a/src/logid/backend/raw/RawDevice.h b/src/logid/backend/raw/RawDevice.h index 2619e90..0a41caa 100644 --- a/src/logid/backend/raw/RawDevice.h +++ b/src/logid/backend/raw/RawDevice.h @@ -30,11 +30,7 @@ #include "defs.h" #include "../../util/mutex_queue.h" -namespace logid { - class workqueue; - -namespace backend { -namespace raw +namespace logid::backend::raw { class RawDevice { @@ -42,8 +38,7 @@ namespace raw static bool supportedReport(uint8_t id, uint8_t length); explicit RawDevice(std::string path, - double io_timeout, - const std::shared_ptr& wq); + double io_timeout); ~RawDevice(); std::string hidrawPath() const; @@ -85,7 +80,6 @@ namespace raw std::condition_variable _listen_condition; const std::chrono::milliseconds _io_timeout; - const std::shared_ptr _workqueue; std::map> _event_handlers; @@ -104,6 +98,6 @@ namespace raw mutex_queue()>>> _io_queue; }; -}}} +} #endif //LOGID_BACKEND_RAWDEVICE_H \ No newline at end of file diff --git a/src/logid/config/schema.h b/src/logid/config/schema.h index ecffbf3..3fe3f52 100644 --- a/src/logid/config/schema.h +++ b/src/logid/config/schema.h @@ -276,12 +276,10 @@ namespace logid::config { std::variant, "name">> devices; std::optional> ignore; std::optional io_timeout; - std::optional workers; - Config() : group({"devices", "ignore", "io_timeout", "workers"}, + Config() : group({"devices", "ignore", "io_timeout"}, &Config::devices, &Config::ignore, - &Config::io_timeout, - &Config::workers) { } + &Config::io_timeout) { } }; } diff --git a/src/logid/features/DeviceStatus.cpp b/src/logid/features/DeviceStatus.cpp index e7c2bc7..0757cd8 100644 --- a/src/logid/features/DeviceStatus.cpp +++ b/src/logid/features/DeviceStatus.cpp @@ -62,7 +62,7 @@ void DeviceStatus::listen() auto event = hidpp20::WirelessDeviceStatus::statusBroadcastEvent( report); if(event.reconfNeeded) - task::spawn(dev->workQueue(), [dev](){ dev->wakeup(); }); + spawn_task( [dev](){ dev->wakeup(); }); }; _device->hidpp20().addEventHandler(EVENTHANDLER_NAME, handler); diff --git a/src/logid/logid.cpp b/src/logid/logid.cpp index b3ffcd6..223546f 100644 --- a/src/logid/logid.cpp +++ b/src/logid/logid.cpp @@ -25,7 +25,6 @@ #include "util/log.h" #include "DeviceManager.h" #include "InputDevice.h" -#include "util/workqueue.h" #define LOGID_VIRTUAL_INPUT_NAME "LogiOps Virtual Input" #define DEFAULT_CONFIG_FILE "/etc/logid.cfg" diff --git a/src/logid/util/task.cpp b/src/logid/util/task.cpp index 7a680d7..32f4a05 100644 --- a/src/logid/util/task.cpp +++ b/src/logid/util/task.cpp @@ -16,66 +16,17 @@ * */ #include "task.h" -#include "workqueue.h" using namespace logid; -task::task(const std::function& function, - const std::function& exception_handler) : - _function (std::make_shared>(function)), - _exception_handler (std::make_shared> - (exception_handler)), _status (Waiting), - _task_pkg ([this](){ - try { - (*_function)(); - } catch(std::exception& e) { - (*_exception_handler)(e); - } - }), _future (_task_pkg.get_future()) +void logid::spawn_task(const std::function& function) { -} - -void task::run() -{ - _status = Running; - _status_cv.notify_all(); - _task_pkg(); - _status = Completed; - _status_cv.notify_all(); -} - -task::Status task::getStatus() -{ - return _status; -} - -void task::wait() -{ - if(_future.valid()) - _future.wait(); - else { - std::mutex wait_start; - std::unique_lock lock(wait_start); - _status_cv.wait(lock, [this](){ return _status == Completed; }); - } -} - -void task::waitStart() -{ - std::mutex wait_start; - std::unique_lock lock(wait_start); - _status_cv.wait(lock, [this](){ return _status != Waiting; }); -} - -std::future_status task::waitFor(std::chrono::milliseconds ms) -{ - return _future.wait_for(ms); -} - -void task::spawn(std::shared_ptr wq, - const std::function& function, - const std::function& exception_handler) -{ - auto t = std::make_shared(function, exception_handler); - wq->queue(t); + auto future = std::make_shared>(); + *future = std::async(std::launch::async,[function, future]() { + try { + function(); + } catch(std::exception& e) { + ExceptionHandler::Default(e); + } + }); } \ No newline at end of file diff --git a/src/logid/util/task.h b/src/logid/util/task.h index 728504d..e258465 100644 --- a/src/logid/util/task.h +++ b/src/logid/util/task.h @@ -25,48 +25,10 @@ namespace logid { - class workqueue; - - class task - { - public: - enum Status - { - Waiting, - Running, - Completed - }; - - explicit task(const std::function& function, - const std::function& - exception_handler={[](std::exception& e) - {ExceptionHandler::Default(e);}}); - - Status getStatus(); - - void run(); // Runs synchronously - void wait(); - void waitStart(); - std::future_status waitFor(std::chrono::milliseconds ms); - - /* This function spawns a new task into the least used worker queue - * and forgets about it. - */ - static void spawn(std::shared_ptr wq, - const std::function& function, - const std::function& - exception_handler={[](std::exception& e) - {ExceptionHandler::Default(e);}}); - - private: - std::shared_ptr> _function; - std::shared_ptr> - _exception_handler; - std::atomic _status; - std::condition_variable _status_cv; - std::packaged_task _task_pkg; - std::future _future; - }; + /* This function spawns a new task into the least used worker queue + * and forgets about it. + */ + void spawn_task(const std::function& function); } #endif //LOGID_TASK_H diff --git a/src/logid/util/worker_thread.cpp b/src/logid/util/worker_thread.cpp deleted file mode 100644 index 81dda91..0000000 --- a/src/logid/util/worker_thread.cpp +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright 2019-2020 PixlOne - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - * - */ -#include -#include "worker_thread.h" -#include "log.h" -#include "workqueue.h" - -using namespace logid; - -worker_thread::worker_thread(workqueue* parent, std::size_t worker_number) : -_parent (parent), _worker_number (worker_number), _continue_run (false), -_thread (std::make_unique ([this](){ - _run(); }, [this](std::exception& e){ _exception_handler(e); })) -{ - _thread->run(); -} - -worker_thread::~worker_thread() -{ - _continue_run = false; - _queue_cv.notify_all(); - // Block until task is complete - std::unique_lock lock(_busy); - - while(!_queue.empty()) { - _parent->queue(_queue.front()); - _queue.pop(); - } -} - -void worker_thread::queue(const std::shared_ptr& t) -{ - _queue.push(t); - _queue_cv.notify_all(); -} - -bool worker_thread::busy() -{ - bool not_busy = _busy.try_lock(); - - if(not_busy) - _busy.unlock(); - - return !not_busy; -} - -void worker_thread::_run() -{ - std::unique_lock lock(_run_lock); - _continue_run = true; - while(_continue_run) { - _parent->busyUpdate(); - _queue_cv.wait(lock, [this]{ return !_queue.empty() || - !_continue_run; }); - if(!_continue_run) - return; - std::unique_lock busy_lock(_busy); - while(!_queue.empty()) { - _queue.front()->run(); - _queue.pop(); - } - _parent->notifyFree(); - } -} - -void worker_thread::_exception_handler(std::exception &e) -{ - logPrintf(WARN, "Exception caught on worker thread %d, restarting: %s", - _worker_number, e.what()); - // This action destroys the logid::thread, std::thread should detach safely. - _thread = std::make_unique([this](){ _run(); }, - [this](std::exception& e) { _exception_handler(e); }); - _thread->run(); -} \ No newline at end of file diff --git a/src/logid/util/worker_thread.h b/src/logid/util/worker_thread.h deleted file mode 100644 index 0dd53ca..0000000 --- a/src/logid/util/worker_thread.h +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright 2019-2020 PixlOne - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - * - */ -#ifndef LOGID_WORKER_THREAD_H -#define LOGID_WORKER_THREAD_H - -#include "mutex_queue.h" -#include "task.h" -#include "thread.h" - -namespace logid -{ - class workqueue; - - class worker_thread - { - public: - worker_thread(workqueue* parent, std::size_t worker_number); - ~worker_thread(); - - void queue(const std::shared_ptr& t); - - bool busy(); - private: - void _run(); - void _exception_handler(std::exception& e); - - workqueue* _parent; - const std::size_t _worker_number; - - std::mutex _run_lock; - std::atomic _continue_run; - std::condition_variable _queue_cv; - - std::unique_ptr _thread; - std::mutex _busy; - - mutex_queue> _queue; - }; -} - -#endif //LOGID_WORKER_THREAD_H diff --git a/src/logid/util/workqueue.cpp b/src/logid/util/workqueue.cpp deleted file mode 100644 index 44fb347..0000000 --- a/src/logid/util/workqueue.cpp +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Copyright 2019-2020 PixlOne - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - * - */ -#include -#include "workqueue.h" -#include "worker_thread.h" -#include "log.h" - -using namespace logid; - -workqueue::workqueue(std::size_t thread_count) : _manager_thread ( - std::make_unique( - [this](){ _run(); } - , [this](std::exception& e){ _exception_handler(e); } - )), _continue_run (false), _worker_count (thread_count) -{ - _workers.reserve(_worker_count); - for(std::size_t i = 0; i < _worker_count; i++) - _workers.push_back(std::make_unique(this, i)); - _manager_thread->run(); -} - -workqueue::~workqueue() -{ - stop(); - - while(_workers.empty()) - _workers.pop_back(); - - // Queue should have been empty before, but just confirm here. - while(!_queue.empty()) { - thread::spawn([t=_queue.front()](){ t->run(); }); - _queue.pop(); - } -} - -void workqueue::queue(const std::shared_ptr& t) -{ - assert(t != nullptr); - _queue.push(t); - _queue_cv.notify_all(); -} - -void workqueue::busyUpdate() -{ - _busy_cv.notify_all(); -} - -void workqueue::stop() -{ - _continue_run = false; - std::unique_lock lock(_run_lock); -} - -std::size_t workqueue::threadCount() const -{ - return _workers.size(); -} - -void workqueue::notifyFree() -{ - _busy_cv.notify_all(); -} - -void workqueue::_run() -{ - using namespace std::chrono_literals; - - std::unique_lock lock(_run_lock); - _continue_run = true; - while(_continue_run) { - bool queued = false; - _queue_cv.wait(lock, [this]{ return !(_queue.empty()); }); - while(!_queue.empty()) { - if(_workers.empty()) { - if(_worker_count) - logPrintf(DEBUG, "No workers were found, running task in" - " a new thread."); - thread::spawn([t=_queue.front()](){ t->run(); }); - _queue.pop(); - continue; - } - - for(auto& worker : _workers) { - if(!worker->busy()) { - worker->queue(_queue.front()); - queued = true; - break; - } - } - if(!queued) { - if(_busy_cv.wait_for(lock, 500ms) == std::cv_status::no_timeout) { - for(auto& worker : _workers) { - if(!worker->busy()) { - worker->queue(_queue.front()); - break; - } - } - } else{ - // Workers busy, launch in new thread - logPrintf(DEBUG, "All workers were busy for 500ms, " - "running task in new thread."); - thread::spawn([t = _queue.front()]() { t->run(); }); - } - } - - _queue.pop(); - } - } -} - -void workqueue::_exception_handler(std::exception &e) -{ - logPrintf(WARN, "Exception caught on workqueue manager thread, " - "restarting: %s" , e.what()); - // This action destroys the logid::thread, std::thread should detach safely. - _manager_thread = std::make_unique([this](){ _run(); }, - [this](std::exception& e) { _exception_handler(e); }); - _manager_thread->run(); -} \ No newline at end of file diff --git a/src/logid/util/workqueue.h b/src/logid/util/workqueue.h deleted file mode 100644 index 39cb891..0000000 --- a/src/logid/util/workqueue.h +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright 2019-2020 PixlOne - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - * - */ -#ifndef LOGID_WORKQUEUE_H -#define LOGID_WORKQUEUE_H - -#include "mutex_queue.h" -#include "task.h" -#include "thread.h" - -namespace logid -{ - class worker_thread; - - class workqueue - { - public: - explicit workqueue(std::size_t thread_count); - ~workqueue(); - - void queue(const std::shared_ptr& t); - - void busyUpdate(); - - void stop(); - - [[nodiscard]] std::size_t threadCount() const; - - friend class worker_thread; - void notifyFree(); - private: - void _run(); - - void _exception_handler(std::exception& e); - std::unique_ptr _manager_thread; - - mutex_queue> _queue; - std::condition_variable _queue_cv; - std::condition_variable _busy_cv; - std::mutex _run_lock; - std::atomic _continue_run; - - std::vector> _workers; - std::size_t _worker_count; - }; - -} - -#endif //LOGID_WORKQUEUE_H