Phase out workqueue

This commit is contained in:
pixl 2022-01-21 23:24:47 -05:00
parent 1dd6dbfe02
commit dbe24f9350
No known key found for this signature in database
GPG Key ID: 1866C148CD593B6E
34 changed files with 128 additions and 624 deletions

View File

@ -64,11 +64,8 @@ add_executable(logid
backend/hidpp20/features/WirelessDeviceStatus.cpp backend/hidpp20/features/WirelessDeviceStatus.cpp
backend/hidpp20/features/ThumbWheel.cpp backend/hidpp20/features/ThumbWheel.cpp
backend/dj/Report.cpp backend/dj/Report.cpp
util/mutex_queue.h
util/workqueue.cpp
util/worker_thread.cpp
util/task.cpp
util/thread.cpp util/thread.cpp
util/task.cpp
util/ExceptionHandler.cpp) util/ExceptionHandler.cpp)
set_target_properties(logid PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}) set_target_properties(logid PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR})

View File

@ -98,8 +98,7 @@ std::shared_ptr<Device> Device::make(
Device::Device(std::string path, backend::hidpp::DeviceIndex index, Device::Device(std::string path, backend::hidpp::DeviceIndex index,
std::shared_ptr<DeviceManager> manager) : std::shared_ptr<DeviceManager> manager) :
_hidpp20 (path, index, _hidpp20 (path, index,
manager->config()->io_timeout.value_or(defaults::io_timeout), manager->config()->io_timeout.value_or(defaults::io_timeout)),
manager->workQueue()),
_path (std::move(path)), _index (index), _path (std::move(path)), _index (index),
_config (_getConfig(manager, _hidpp20.name())), _config (_getConfig(manager, _hidpp20.name())),
_receiver (nullptr), _receiver (nullptr),
@ -213,19 +212,6 @@ void Device::reset()
"available.", _path.c_str(), _index); "available.", _path.c_str(), _index);
} }
std::shared_ptr<workqueue> Device::workQueue() const
{
if(auto manager = _manager.lock()) {
return manager->workQueue();
} else {
logPrintf(ERROR, "Device manager lost");
logPrintf(ERROR,
"Fatal error occurred, file a bug report,"
" the program will now exit.");
std::terminate();
}
}
std::shared_ptr<InputDevice> Device::virtualInput() const std::shared_ptr<InputDevice> Device::virtualInput() const
{ {
if(auto manager = _manager.lock()) { if(auto manager = _manager.lock()) {

View File

@ -82,7 +82,6 @@ namespace logid
void reset(); void reset();
[[nodiscard]] std::shared_ptr<workqueue> workQueue() const;
[[nodiscard]] std::shared_ptr<InputDevice> virtualInput() const; [[nodiscard]] std::shared_ptr<InputDevice> virtualInput() const;
[[nodiscard]] std::shared_ptr<ipcgull::node> ipcNode() const; [[nodiscard]] std::shared_ptr<ipcgull::node> ipcNode() const;

View File

@ -24,7 +24,6 @@
#include "DeviceManager.h" #include "DeviceManager.h"
#include "Receiver.h" #include "Receiver.h"
#include "util/log.h" #include "util/log.h"
#include "util/workqueue.h"
#include "backend/hidpp10/Error.h" #include "backend/hidpp10/Error.h"
#include "backend/Error.h" #include "backend/Error.h"
@ -44,7 +43,7 @@ namespace logid {
DeviceManager::DeviceManager(std::shared_ptr<Configuration> config, DeviceManager::DeviceManager(std::shared_ptr<Configuration> config,
std::shared_ptr<InputDevice> virtual_input, std::shared_ptr<InputDevice> virtual_input,
std::shared_ptr<ipcgull::server> server) : std::shared_ptr<ipcgull::server> server) :
backend::raw::DeviceMonitor(config->workers.value_or(defaults::worker_count)), backend::raw::DeviceMonitor(),
_server (std::move(server)), _config (std::move(config)), _server (std::move(server)), _config (std::move(config)),
_virtual_input (std::move(virtual_input)), _virtual_input (std::move(virtual_input)),
_root_node (ipcgull::node::make_root("")), _root_node (ipcgull::node::make_root("")),
@ -96,8 +95,7 @@ void DeviceManager::addDevice(std::string path)
// Check if device is ignored before continuing // Check if device is ignored before continuing
{ {
raw::RawDevice raw_dev( raw::RawDevice raw_dev(
path,config()->io_timeout.value_or(defaults::io_timeout), path,config()->io_timeout.value_or(defaults::io_timeout));
workQueue());
if(config()->ignore.has_value() && if(config()->ignore.has_value() &&
config()->ignore.value().contains(raw_dev.productId())) { config()->ignore.value().contains(raw_dev.productId())) {
logPrintf(DEBUG, "%s: Device 0x%04x ignored.", logPrintf(DEBUG, "%s: Device 0x%04x ignored.",
@ -109,8 +107,7 @@ void DeviceManager::addDevice(std::string path)
try { try {
hidpp::Device device( hidpp::Device device(
path, hidpp::DefaultDevice, path, hidpp::DefaultDevice,
config()->io_timeout.value_or(defaults::io_timeout), config()->io_timeout.value_or(defaults::io_timeout));
workQueue());
isReceiver = device.version() == std::make_tuple(1, 0); isReceiver = device.version() == std::make_tuple(1, 0);
} catch(hidpp10::Error &e) { } catch(hidpp10::Error &e) {
if(e.code() != hidpp10::Error::UnknownDevice) if(e.code() != hidpp10::Error::UnknownDevice)

View File

@ -32,7 +32,6 @@
namespace logid namespace logid
{ {
class workqueue;
class InputDevice; class InputDevice;
class DeviceManager : public backend::raw::DeviceMonitor class DeviceManager : public backend::raw::DeviceMonitor

View File

@ -66,8 +66,7 @@ Receiver::Receiver(const std::string& path,
const std::shared_ptr<DeviceManager>& manager) : const std::shared_ptr<DeviceManager>& manager) :
dj::ReceiverMonitor(path, dj::ReceiverMonitor(path,
manager->config()->io_timeout.value_or( manager->config()->io_timeout.value_or(
defaults::io_timeout), defaults::io_timeout)),
manager->workQueue()),
_path (path), _manager (manager), _nickname (manager), _path (path), _manager (manager), _nickname (manager),
_ipc_node (manager->receiversNode()->make_child(_nickname)), _ipc_node (manager->receiversNode()->make_child(_nickname)),
_ipc_interface (_ipc_node->make_interface<ReceiverIPC>(this)) _ipc_interface (_ipc_node->make_interface<ReceiverIPC>(this))

View File

@ -39,7 +39,7 @@ void ChangeDPI::press()
{ {
_pressed = true; _pressed = true;
if(_dpi) { if(_dpi) {
task::spawn(_device->workQueue(), spawn_task(
[this]{ [this]{
try { try {
uint16_t last_dpi = _dpi->getDPI(_config.sensor.value_or(0)); uint16_t last_dpi = _dpi->getDPI(_config.sensor.value_or(0));

View File

@ -49,7 +49,7 @@ void ChangeHostAction::press()
void ChangeHostAction::release() void ChangeHostAction::release()
{ {
if(_change_host) { if(_change_host) {
task::spawn(_device->workQueue(), spawn_task(
[this] { [this] {
auto host_info = _change_host->getHostInfo(); auto host_info = _change_host->getHostInfo();
int next_host; int next_host;

View File

@ -40,7 +40,7 @@ void CycleDPI::press()
{ {
_pressed = true; _pressed = true;
if(_dpi && !_config.dpis.empty()) { if(_dpi && !_config.dpis.empty()) {
task::spawn(_device->workQueue(), spawn_task(
[this](){ [this](){
std::lock_guard<std::mutex> lock(_dpi_lock); std::lock_guard<std::mutex> lock(_dpi_lock);
++_current_dpi; ++_current_dpi;

View File

@ -38,7 +38,7 @@ void ToggleHiresScroll::press()
_pressed = true; _pressed = true;
if(_hires_scroll) if(_hires_scroll)
{ {
task::spawn(_device->workQueue(), spawn_task(
[hires=this->_hires_scroll](){ [hires=this->_hires_scroll](){
auto mode = hires->getMode(); auto mode = hires->getMode();
mode ^= backend::hidpp20::HiresScroll::HiRes; mode ^= backend::hidpp20::HiresScroll::HiRes;

View File

@ -37,7 +37,7 @@ void ToggleSmartShift::press()
{ {
_pressed = true; _pressed = true;
if(_smartshift) { if(_smartshift) {
task::spawn(_device->workQueue(), spawn_task(
[ss=this->_smartshift](){ [ss=this->_smartshift](){
auto status = ss->getStatus(); auto status = ss->getStatus();
status.setActive = true; status.setActive = true;

View File

@ -20,7 +20,6 @@
#include "Report.h" #include "Report.h"
#include "Receiver.h" #include "Receiver.h"
#include "Error.h" #include "Error.h"
#include "../../util/thread.h"
using namespace logid::backend::dj; using namespace logid::backend::dj;
using namespace logid::backend; using namespace logid::backend;
@ -44,11 +43,9 @@ InvalidReceiver::Reason InvalidReceiver::code() const noexcept
return _reason; return _reason;
} }
Receiver::Receiver(std::string path, Receiver::Receiver(std::string path, double io_timeout) :
double io_timeout,
const std::shared_ptr<workqueue>& wq) :
_raw_device (std::make_shared<raw::RawDevice>( _raw_device (std::make_shared<raw::RawDevice>(
std::move(path), io_timeout, wq)), std::move(path), io_timeout)),
_hidpp10_device (_raw_device, hidpp::DefaultDevice) _hidpp10_device (_raw_device, hidpp::DefaultDevice)
{ {
if(!supportsDjReports(_raw_device->reportDescriptor())) if(!supportsDjReports(_raw_device->reportDescriptor()))

View File

@ -52,9 +52,7 @@ namespace dj
class Receiver final class Receiver final
{ {
public: public:
Receiver(std::string path, Receiver(std::string path, double io_timeout);
double io_timeout,
const std::shared_ptr<workqueue>& wq);
enum DjEvents : uint8_t enum DjEvents : uint8_t
{ {

View File

@ -25,12 +25,8 @@
using namespace logid::backend::dj; using namespace logid::backend::dj;
ReceiverMonitor::ReceiverMonitor( ReceiverMonitor::ReceiverMonitor(std::string path, double io_timeout) :
std::string path, _receiver (std::make_shared<Receiver>(std::move(path), io_timeout))
double io_timeout,
const std::shared_ptr<workqueue>& wq) :
_workqueue (wq),
_receiver (std::make_shared<Receiver>(std::move(path), io_timeout, wq))
{ {
assert(_receiver->hidppEventHandlers().find("RECVMON") == assert(_receiver->hidppEventHandlers().find("RECVMON") ==
_receiver->hidppEventHandlers().end()); _receiver->hidppEventHandlers().end());
@ -66,25 +62,29 @@ void ReceiverMonitor::run()
/* Running in a new thread prevents deadlocks since the /* Running in a new thread prevents deadlocks since the
* receiver may be enumerating. * receiver may be enumerating.
*/ */
task::spawn(_workqueue, std::async([this, report,
[this, report]() { path=this->_receiver->rawDevice()->hidrawPath()]() {
if (report.subId() == Receiver::DeviceConnection) if (report.subId() == Receiver::DeviceConnection) {
this->addDevice(this->_receiver->deviceConnectionEvent try {
(report)); this->addDevice(this->_receiver->deviceConnectionEvent
else if (report.subId() == Receiver::DeviceDisconnection) (report));
this->removeDevice(this->_receiver-> } catch(std::exception& e) {
deviceDisconnectionEvent(report)); logPrintf(ERROR, "Failed to add device %d to receiver "
}, {[report, path=this->_receiver->rawDevice()->hidrawPath()] "on %s: %s", report.deviceIndex(),
(std::exception& e) { path.c_str(), e.what());
if(report.subId() == Receiver::DeviceConnection) }
logPrintf(ERROR, "Failed to add device %d to receiver " }
"on %s: %s", report.deviceIndex(), else if (report.subId() == Receiver::DeviceDisconnection) {
path.c_str(), e.what()); try {
else if(report.subId() == Receiver::DeviceDisconnection) this->removeDevice(this->_receiver->
logPrintf(ERROR, "Failed to remove device %d from " deviceDisconnectionEvent(report));
"receiver on %s: %s", report.deviceIndex() } catch(std::exception& e) {
,path.c_str(), e.what()); logPrintf(ERROR, "Failed to remove device %d from "
}}); "receiver on %s: %s", report.deviceIndex(),
path.c_str(), e.what());
}
}
});
}; };
_receiver->addHidppEventHandler("RECVMON", event_handler); _receiver->addHidppEventHandler("RECVMON", event_handler);
@ -122,16 +122,18 @@ void ReceiverMonitor::waitForDevice(hidpp::DeviceIndex index)
event.index = index; event.index = index;
event.fromTimeoutCheck = true; event.fromTimeoutCheck = true;
task::spawn(_workqueue, spawn_task(
{[this, event, nickname]() { [this, event, nickname]() {
try {
_receiver->rawDevice()->removeEventHandler(nickname); _receiver->rawDevice()->removeEventHandler(nickname);
this->addDevice(event); this->addDevice(event);
}}, {[path=_receiver->rawDevice()->hidrawPath(), event] } catch(std::exception& e) {
(std::exception& e) { logPrintf(ERROR, "Failed to add device %d to receiver "
logPrintf(ERROR, "Failed to add device %d to receiver " "on %s: %s", event.index,
"on %s: %s", event.index, _receiver->rawDevice()->hidrawPath().c_str(),
path.c_str(), e.what()); e.what());
}}); }
});
}; };
_receiver->rawDevice()->addEventHandler(nickname, handler); _receiver->rawDevice()->addEventHandler(nickname, handler);

View File

@ -33,8 +33,7 @@ namespace dj
{ {
public: public:
ReceiverMonitor(std::string path, ReceiverMonitor(std::string path,
double io_timeout, double io_timeout);
const std::shared_ptr<workqueue>& wq);
virtual ~ReceiverMonitor(); virtual ~ReceiverMonitor();
void enumerate(); void enumerate();
@ -55,7 +54,6 @@ namespace dj
std::shared_ptr<Receiver> receiver() const; std::shared_ptr<Receiver> receiver() const;
private: private:
std::shared_ptr<workqueue> _workqueue;
std::shared_ptr<Receiver> _receiver; std::shared_ptr<Receiver> _receiver;
}; };

View File

@ -18,7 +18,6 @@
#include <cassert> #include <cassert>
#include <utility> #include <utility>
#include "../../util/thread.h"
#include "Device.h" #include "Device.h"
#include "Report.h" #include "Report.h"
#include "../hidpp20/features/Root.h" #include "../hidpp20/features/Root.h"
@ -50,9 +49,8 @@ Device::InvalidDevice::Reason Device::InvalidDevice::code() const noexcept
} }
Device::Device(const std::string& path, DeviceIndex index, Device::Device(const std::string& path, DeviceIndex index,
double io_timeout, double io_timeout):
const std::shared_ptr<workqueue>& wq): _raw_device (std::make_shared<raw::RawDevice>(path, io_timeout)),
_raw_device (std::make_shared<raw::RawDevice>(path, io_timeout, wq)),
_receiver (nullptr), _path (path), _index (index) _receiver (nullptr), _path (path), _index (index)
{ {
_init(); _init();

View File

@ -62,8 +62,7 @@ namespace hidpp
}; };
Device(const std::string& path, DeviceIndex index, Device(const std::string& path, DeviceIndex index,
double io_timeout, double io_timeout);
const std::shared_ptr<workqueue>& wq);
Device(std::shared_ptr<raw::RawDevice> raw_device, Device(std::shared_ptr<raw::RawDevice> raw_device,
DeviceIndex index); DeviceIndex index);
Device(std::shared_ptr<dj::Receiver> receiver, Device(std::shared_ptr<dj::Receiver> receiver,

View File

@ -25,9 +25,8 @@ using namespace logid::backend;
using namespace logid::backend::hidpp10; using namespace logid::backend::hidpp10;
Device::Device(const std::string &path, hidpp::DeviceIndex index, Device::Device(const std::string &path, hidpp::DeviceIndex index,
double io_timeout, double io_timeout) :
const std::shared_ptr<workqueue>& wq) : hidpp::Device(path, index, io_timeout)
hidpp::Device(path, index, io_timeout, wq)
{ {
assert(version() == std::make_tuple(1, 0)); assert(version() == std::make_tuple(1, 0));
} }

View File

@ -29,8 +29,7 @@ namespace hidpp10
{ {
public: public:
Device(const std::string& path, hidpp::DeviceIndex index, Device(const std::string& path, hidpp::DeviceIndex index,
double io_timeout, double io_timeout);
const std::shared_ptr<workqueue>& wq);
Device(std::shared_ptr<raw::RawDevice> raw_dev, Device(std::shared_ptr<raw::RawDevice> raw_dev,
hidpp::DeviceIndex index); hidpp::DeviceIndex index);
Device(std::shared_ptr<dj::Receiver> receiver, Device(std::shared_ptr<dj::Receiver> receiver,

View File

@ -23,9 +23,8 @@
using namespace logid::backend::hidpp20; using namespace logid::backend::hidpp20;
Device::Device(std::string path, hidpp::DeviceIndex index, Device::Device(std::string path, hidpp::DeviceIndex index, double io_timeout) :
double io_timeout, const std::shared_ptr<workqueue>& wq) : hidpp::Device(path, index, io_timeout)
hidpp::Device(path, index, io_timeout, wq)
{ {
assert(std::get<0>(version()) >= 2); assert(std::get<0>(version()) >= 2);
} }

View File

@ -29,7 +29,7 @@ namespace hidpp20 {
{ {
public: public:
Device(std::string path, hidpp::DeviceIndex index, Device(std::string path, hidpp::DeviceIndex index,
double io_timeout, const std::shared_ptr<workqueue>& wq); double io_timeout);
Device(std::shared_ptr<raw::RawDevice> raw_device, hidpp::DeviceIndex index); Device(std::shared_ptr<raw::RawDevice> raw_device, hidpp::DeviceIndex index);
Device(std::shared_ptr<dj::Receiver> receiver, hidpp::DeviceIndex Device(std::shared_ptr<dj::Receiver> receiver, hidpp::DeviceIndex
index); index);

View File

@ -18,7 +18,6 @@
#include "DeviceMonitor.h" #include "DeviceMonitor.h"
#include "../../util/task.h" #include "../../util/task.h"
#include "../../util/workqueue.h"
#include "../../util/log.h" #include "../../util/log.h"
#include "RawDevice.h" #include "RawDevice.h"
#include "../hidpp/Device.h" #include "../hidpp/Device.h"
@ -35,8 +34,7 @@ extern "C"
using namespace logid; using namespace logid;
using namespace logid::backend::raw; using namespace logid::backend::raw;
DeviceMonitor::DeviceMonitor(int worker_count) : DeviceMonitor::DeviceMonitor()
_workqueue (std::make_shared<workqueue>(worker_count))
{ {
if(-1 == pipe(_pipe)) if(-1 == pipe(_pipe))
throw std::system_error(errno, std::system_category(), throw std::system_error(errno, std::system_category(),
@ -103,28 +101,32 @@ void DeviceMonitor::run()
std::string devnode = udev_device_get_devnode(device); std::string devnode = udev_device_get_devnode(device);
if (action == "add") if (action == "add")
task::spawn(_workqueue, spawn_task(
[this, name=devnode]() { [this, name=devnode]() {
// Wait for device to initialise try {
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait for device to initialise
auto supported_reports = backend::hidpp::getSupportedReports( std::this_thread::sleep_for(std::chrono::milliseconds(100));
RawDevice::getReportDescriptor(name)); auto supported_reports = backend::hidpp::getSupportedReports(
if(supported_reports) RawDevice::getReportDescriptor(name));
this->addDevice(name); if(supported_reports)
else this->addDevice(name);
logPrintf(DEBUG, "Unsupported device %s ignored", else
name.c_str()); logPrintf(DEBUG, "Unsupported device %s ignored",
}, [name=devnode](std::exception& e){ name.c_str());
logPrintf(WARN, "Error adding device %s: %s", } catch(std::exception& e) {
name.c_str(), e.what()); logPrintf(WARN, "Error adding device %s: %s",
name.c_str(), e.what());
}
}); });
else if (action == "remove") else if (action == "remove")
task::spawn(_workqueue, spawn_task(
[this, name=devnode]() { [this, name=devnode]() {
this->removeDevice(name); try {
}, [name=devnode](std::exception& e){ this->removeDevice(name);
logPrintf(WARN, "Error removing device %s: %s", } catch(std::exception& e) {
name.c_str(), e.what()); logPrintf(WARN, "Error removing device %s: %s",
name.c_str(), e.what());
}
}); });
udev_device_unref (device); udev_device_unref (device);
@ -172,24 +174,22 @@ void DeviceMonitor::enumerate()
std::string devnode = udev_device_get_devnode(device); std::string devnode = udev_device_get_devnode(device);
udev_device_unref(device); udev_device_unref(device);
task::spawn(_workqueue, spawn_task(
[this, name=devnode]() { [this, name=devnode]() {
auto supported_reports = backend::hidpp::getSupportedReports( try {
RawDevice::getReportDescriptor(name)); auto supported_reports = backend::hidpp::getSupportedReports(
if(supported_reports) RawDevice::getReportDescriptor(name));
this->addDevice(name); if(supported_reports)
else this->addDevice(name);
logPrintf(DEBUG, "Unsupported device %s ignored", else
name.c_str()); logPrintf(DEBUG, "Unsupported device %s ignored",
}, [name=devnode](std::exception& e){ name.c_str());
logPrintf(WARN, "Error adding device %s: %s", } catch(std::exception& e) {
name.c_str(), e.what()); logPrintf(WARN, "Error adding device %s: %s",
name.c_str(), e.what());
}
}); });
} }
udev_enumerate_unref(udev_enum); udev_enumerate_unref(udev_enum);
} }
std::shared_ptr<workqueue> DeviceMonitor::workQueue() const {
return _workqueue;
}

View File

@ -26,10 +26,7 @@
struct udev; struct udev;
namespace logid { namespace logid::backend::raw
class workqueue;
namespace backend {
namespace raw
{ {
class DeviceMonitor class DeviceMonitor
{ {
@ -38,9 +35,8 @@ namespace raw
void run(); void run();
void stop(); void stop();
std::shared_ptr<workqueue> workQueue() const;
protected: protected:
explicit DeviceMonitor(int worker_count); DeviceMonitor();
virtual ~DeviceMonitor(); virtual ~DeviceMonitor();
virtual void addDevice(std::string device) = 0; virtual void addDevice(std::string device) = 0;
virtual void removeDevice(std::string device) = 0; virtual void removeDevice(std::string device) = 0;
@ -49,8 +45,7 @@ namespace raw
int _pipe[2]; int _pipe[2];
std::atomic<bool> _run_monitor; std::atomic<bool> _run_monitor;
std::mutex _running; std::mutex _running;
std::shared_ptr<workqueue> _workqueue;
}; };
}}} }
#endif //LOGID_BACKEND_RAW_DEVICEMONITOR_H #endif //LOGID_BACKEND_RAW_DEVICEMONITOR_H

View File

@ -22,10 +22,7 @@
#include "../dj/defs.h" #include "../dj/defs.h"
#include "../../util/log.h" #include "../../util/log.h"
#include "../hidpp/Report.h" #include "../hidpp/Report.h"
#include "../../Configuration.h"
#include "../../util/thread.h" #include "../../util/thread.h"
#include "../../util/task.h"
#include "../../util/workqueue.h"
#include <string> #include <string>
#include <system_error> #include <system_error>
@ -64,14 +61,11 @@ bool RawDevice::supportedReport(uint8_t id, uint8_t length)
} }
} }
RawDevice::RawDevice(std::string path, RawDevice::RawDevice(std::string path, double io_timeout) :
double io_timeout,
const std::shared_ptr<workqueue>& wq) :
_path (std::move(path)), _path (std::move(path)),
_continue_listen (false), _continue_respond (false), _continue_listen (false), _continue_respond (false),
_io_timeout (duration_cast<milliseconds>( _io_timeout (duration_cast<milliseconds>(
duration<double, std::milli>(io_timeout))), duration<double, std::milli>(io_timeout)))
_workqueue (wq)
{ {
int ret; int ret;
@ -204,32 +198,19 @@ std::vector<uint8_t> RawDevice::sendReport(const std::vector<uint8_t>& report)
return f.get(); return f.get();
} }
else { else {
std::vector<uint8_t> response;
std::exception_ptr _exception; std::exception_ptr _exception;
std::shared_ptr<task> t = std::make_shared<task>( auto response = std::async(std::launch::deferred,
[this, report, &response]() { [this, report]()->std::vector<uint8_t> {
response = _respondToReport(report); return _respondToReport(report);
}, [&_exception](std::exception& e) { });
try { auto status = response.wait_for(_io_timeout);
throw e;
} catch(std::exception& e) {
_exception = std::make_exception_ptr(e);
}
});
_workqueue->queue(t);
t->waitStart();
auto status = t->waitFor(_io_timeout);
if(_exception)
std::rethrow_exception(_exception);
if(status == std::future_status::timeout) { if(status == std::future_status::timeout) {
_continue_respond = false;
interruptRead(); interruptRead();
t->wait(); if(response.valid())
if(_exception) response.wait();
std::rethrow_exception(_exception);
throw TimeoutError(); throw TimeoutError();
} else }
return response; return response.get();
} }
} }

View File

@ -30,11 +30,7 @@
#include "defs.h" #include "defs.h"
#include "../../util/mutex_queue.h" #include "../../util/mutex_queue.h"
namespace logid { namespace logid::backend::raw
class workqueue;
namespace backend {
namespace raw
{ {
class RawDevice class RawDevice
{ {
@ -42,8 +38,7 @@ namespace raw
static bool supportedReport(uint8_t id, uint8_t length); static bool supportedReport(uint8_t id, uint8_t length);
explicit RawDevice(std::string path, explicit RawDevice(std::string path,
double io_timeout, double io_timeout);
const std::shared_ptr<workqueue>& wq);
~RawDevice(); ~RawDevice();
std::string hidrawPath() const; std::string hidrawPath() const;
@ -85,7 +80,6 @@ namespace raw
std::condition_variable _listen_condition; std::condition_variable _listen_condition;
const std::chrono::milliseconds _io_timeout; const std::chrono::milliseconds _io_timeout;
const std::shared_ptr<workqueue> _workqueue;
std::map<std::string, std::shared_ptr<RawEventHandler>> std::map<std::string, std::shared_ptr<RawEventHandler>>
_event_handlers; _event_handlers;
@ -104,6 +98,6 @@ namespace raw
mutex_queue<std::shared_ptr<std::packaged_task<std::vector<uint8_t>()>>> mutex_queue<std::shared_ptr<std::packaged_task<std::vector<uint8_t>()>>>
_io_queue; _io_queue;
}; };
}}} }
#endif //LOGID_BACKEND_RAWDEVICE_H #endif //LOGID_BACKEND_RAWDEVICE_H

View File

@ -276,12 +276,10 @@ namespace logid::config {
std::variant<Device, Profile>, "name">> devices; std::variant<Device, Profile>, "name">> devices;
std::optional<std::set<uint16_t>> ignore; std::optional<std::set<uint16_t>> ignore;
std::optional<double> io_timeout; std::optional<double> io_timeout;
std::optional<int> workers; Config() : group({"devices", "ignore", "io_timeout"},
Config() : group({"devices", "ignore", "io_timeout", "workers"},
&Config::devices, &Config::devices,
&Config::ignore, &Config::ignore,
&Config::io_timeout, &Config::io_timeout) { }
&Config::workers) { }
}; };
} }

View File

@ -62,7 +62,7 @@ void DeviceStatus::listen()
auto event = hidpp20::WirelessDeviceStatus::statusBroadcastEvent( auto event = hidpp20::WirelessDeviceStatus::statusBroadcastEvent(
report); report);
if(event.reconfNeeded) if(event.reconfNeeded)
task::spawn(dev->workQueue(), [dev](){ dev->wakeup(); }); spawn_task( [dev](){ dev->wakeup(); });
}; };
_device->hidpp20().addEventHandler(EVENTHANDLER_NAME, handler); _device->hidpp20().addEventHandler(EVENTHANDLER_NAME, handler);

View File

@ -25,7 +25,6 @@
#include "util/log.h" #include "util/log.h"
#include "DeviceManager.h" #include "DeviceManager.h"
#include "InputDevice.h" #include "InputDevice.h"
#include "util/workqueue.h"
#define LOGID_VIRTUAL_INPUT_NAME "LogiOps Virtual Input" #define LOGID_VIRTUAL_INPUT_NAME "LogiOps Virtual Input"
#define DEFAULT_CONFIG_FILE "/etc/logid.cfg" #define DEFAULT_CONFIG_FILE "/etc/logid.cfg"

View File

@ -16,66 +16,17 @@
* *
*/ */
#include "task.h" #include "task.h"
#include "workqueue.h"
using namespace logid; using namespace logid;
task::task(const std::function<void()>& function, void logid::spawn_task(const std::function<void ()>& function)
const std::function<void(std::exception&)>& exception_handler) :
_function (std::make_shared<std::function<void()>>(function)),
_exception_handler (std::make_shared<std::function<void(std::exception&)>>
(exception_handler)), _status (Waiting),
_task_pkg ([this](){
try {
(*_function)();
} catch(std::exception& e) {
(*_exception_handler)(e);
}
}), _future (_task_pkg.get_future())
{ {
} auto future = std::make_shared<std::future<void>>();
*future = std::async(std::launch::async,[function, future]() {
void task::run() try {
{ function();
_status = Running; } catch(std::exception& e) {
_status_cv.notify_all(); ExceptionHandler::Default(e);
_task_pkg(); }
_status = Completed; });
_status_cv.notify_all();
}
task::Status task::getStatus()
{
return _status;
}
void task::wait()
{
if(_future.valid())
_future.wait();
else {
std::mutex wait_start;
std::unique_lock<std::mutex> lock(wait_start);
_status_cv.wait(lock, [this](){ return _status == Completed; });
}
}
void task::waitStart()
{
std::mutex wait_start;
std::unique_lock<std::mutex> lock(wait_start);
_status_cv.wait(lock, [this](){ return _status != Waiting; });
}
std::future_status task::waitFor(std::chrono::milliseconds ms)
{
return _future.wait_for(ms);
}
void task::spawn(std::shared_ptr<workqueue> wq,
const std::function<void ()>& function,
const std::function<void (std::exception&)>& exception_handler)
{
auto t = std::make_shared<task>(function, exception_handler);
wq->queue(t);
} }

View File

@ -25,48 +25,10 @@
namespace logid namespace logid
{ {
class workqueue; /* This function spawns a new task into the least used worker queue
* and forgets about it.
class task */
{ void spawn_task(const std::function<void()>& function);
public:
enum Status
{
Waiting,
Running,
Completed
};
explicit task(const std::function<void()>& function,
const std::function<void(std::exception&)>&
exception_handler={[](std::exception& e)
{ExceptionHandler::Default(e);}});
Status getStatus();
void run(); // Runs synchronously
void wait();
void waitStart();
std::future_status waitFor(std::chrono::milliseconds ms);
/* This function spawns a new task into the least used worker queue
* and forgets about it.
*/
static void spawn(std::shared_ptr<workqueue> wq,
const std::function<void()>& function,
const std::function<void(std::exception&)>&
exception_handler={[](std::exception& e)
{ExceptionHandler::Default(e);}});
private:
std::shared_ptr<std::function<void()>> _function;
std::shared_ptr<std::function<void(std::exception&)>>
_exception_handler;
std::atomic<Status> _status;
std::condition_variable _status_cv;
std::packaged_task<void()> _task_pkg;
std::future<void> _future;
};
} }
#endif //LOGID_TASK_H #endif //LOGID_TASK_H

View File

@ -1,89 +0,0 @@
/*
* Copyright 2019-2020 PixlOne
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
#include <vector>
#include "worker_thread.h"
#include "log.h"
#include "workqueue.h"
using namespace logid;
worker_thread::worker_thread(workqueue* parent, std::size_t worker_number) :
_parent (parent), _worker_number (worker_number), _continue_run (false),
_thread (std::make_unique<thread> ([this](){
_run(); }, [this](std::exception& e){ _exception_handler(e); }))
{
_thread->run();
}
worker_thread::~worker_thread()
{
_continue_run = false;
_queue_cv.notify_all();
// Block until task is complete
std::unique_lock<std::mutex> lock(_busy);
while(!_queue.empty()) {
_parent->queue(_queue.front());
_queue.pop();
}
}
void worker_thread::queue(const std::shared_ptr<task>& t)
{
_queue.push(t);
_queue_cv.notify_all();
}
bool worker_thread::busy()
{
bool not_busy = _busy.try_lock();
if(not_busy)
_busy.unlock();
return !not_busy;
}
void worker_thread::_run()
{
std::unique_lock<std::mutex> lock(_run_lock);
_continue_run = true;
while(_continue_run) {
_parent->busyUpdate();
_queue_cv.wait(lock, [this]{ return !_queue.empty() ||
!_continue_run; });
if(!_continue_run)
return;
std::unique_lock<std::mutex> busy_lock(_busy);
while(!_queue.empty()) {
_queue.front()->run();
_queue.pop();
}
_parent->notifyFree();
}
}
void worker_thread::_exception_handler(std::exception &e)
{
logPrintf(WARN, "Exception caught on worker thread %d, restarting: %s",
_worker_number, e.what());
// This action destroys the logid::thread, std::thread should detach safely.
_thread = std::make_unique<thread>([this](){ _run(); },
[this](std::exception& e) { _exception_handler(e); });
_thread->run();
}

View File

@ -1,56 +0,0 @@
/*
* Copyright 2019-2020 PixlOne
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
#ifndef LOGID_WORKER_THREAD_H
#define LOGID_WORKER_THREAD_H
#include "mutex_queue.h"
#include "task.h"
#include "thread.h"
namespace logid
{
class workqueue;
class worker_thread
{
public:
worker_thread(workqueue* parent, std::size_t worker_number);
~worker_thread();
void queue(const std::shared_ptr<task>& t);
bool busy();
private:
void _run();
void _exception_handler(std::exception& e);
workqueue* _parent;
const std::size_t _worker_number;
std::mutex _run_lock;
std::atomic<bool> _continue_run;
std::condition_variable _queue_cv;
std::unique_ptr<thread> _thread;
std::mutex _busy;
mutex_queue<std::shared_ptr<task>> _queue;
};
}
#endif //LOGID_WORKER_THREAD_H

View File

@ -1,134 +0,0 @@
/*
* Copyright 2019-2020 PixlOne
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
#include <cassert>
#include "workqueue.h"
#include "worker_thread.h"
#include "log.h"
using namespace logid;
workqueue::workqueue(std::size_t thread_count) : _manager_thread (
std::make_unique<thread>(
[this](){ _run(); }
, [this](std::exception& e){ _exception_handler(e); }
)), _continue_run (false), _worker_count (thread_count)
{
_workers.reserve(_worker_count);
for(std::size_t i = 0; i < _worker_count; i++)
_workers.push_back(std::make_unique<worker_thread>(this, i));
_manager_thread->run();
}
workqueue::~workqueue()
{
stop();
while(_workers.empty())
_workers.pop_back();
// Queue should have been empty before, but just confirm here.
while(!_queue.empty()) {
thread::spawn([t=_queue.front()](){ t->run(); });
_queue.pop();
}
}
void workqueue::queue(const std::shared_ptr<task>& t)
{
assert(t != nullptr);
_queue.push(t);
_queue_cv.notify_all();
}
void workqueue::busyUpdate()
{
_busy_cv.notify_all();
}
void workqueue::stop()
{
_continue_run = false;
std::unique_lock<std::mutex> lock(_run_lock);
}
std::size_t workqueue::threadCount() const
{
return _workers.size();
}
void workqueue::notifyFree()
{
_busy_cv.notify_all();
}
void workqueue::_run()
{
using namespace std::chrono_literals;
std::unique_lock<std::mutex> lock(_run_lock);
_continue_run = true;
while(_continue_run) {
bool queued = false;
_queue_cv.wait(lock, [this]{ return !(_queue.empty()); });
while(!_queue.empty()) {
if(_workers.empty()) {
if(_worker_count)
logPrintf(DEBUG, "No workers were found, running task in"
" a new thread.");
thread::spawn([t=_queue.front()](){ t->run(); });
_queue.pop();
continue;
}
for(auto& worker : _workers) {
if(!worker->busy()) {
worker->queue(_queue.front());
queued = true;
break;
}
}
if(!queued) {
if(_busy_cv.wait_for(lock, 500ms) == std::cv_status::no_timeout) {
for(auto& worker : _workers) {
if(!worker->busy()) {
worker->queue(_queue.front());
break;
}
}
} else{
// Workers busy, launch in new thread
logPrintf(DEBUG, "All workers were busy for 500ms, "
"running task in new thread.");
thread::spawn([t = _queue.front()]() { t->run(); });
}
}
_queue.pop();
}
}
}
void workqueue::_exception_handler(std::exception &e)
{
logPrintf(WARN, "Exception caught on workqueue manager thread, "
"restarting: %s" , e.what());
// This action destroys the logid::thread, std::thread should detach safely.
_manager_thread = std::make_unique<thread>([this](){ _run(); },
[this](std::exception& e) { _exception_handler(e); });
_manager_thread->run();
}

View File

@ -1,63 +0,0 @@
/*
* Copyright 2019-2020 PixlOne
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
#ifndef LOGID_WORKQUEUE_H
#define LOGID_WORKQUEUE_H
#include "mutex_queue.h"
#include "task.h"
#include "thread.h"
namespace logid
{
class worker_thread;
class workqueue
{
public:
explicit workqueue(std::size_t thread_count);
~workqueue();
void queue(const std::shared_ptr<task>& t);
void busyUpdate();
void stop();
[[nodiscard]] std::size_t threadCount() const;
friend class worker_thread;
void notifyFree();
private:
void _run();
void _exception_handler(std::exception& e);
std::unique_ptr<thread> _manager_thread;
mutex_queue<std::shared_ptr<task>> _queue;
std::condition_variable _queue_cv;
std::condition_variable _busy_cv;
std::mutex _run_lock;
std::atomic<bool> _continue_run;
std::vector<std::unique_ptr<worker_thread>> _workers;
std::size_t _worker_count;
};
}
#endif //LOGID_WORKQUEUE_H