diff --git a/src/logid/Configuration.h b/src/logid/Configuration.h index 0687b5c..a064469 100644 --- a/src/logid/Configuration.h +++ b/src/logid/Configuration.h @@ -29,6 +29,7 @@ namespace logid { namespace defaults { static constexpr double io_timeout = 500; + static constexpr int workers = 4; static constexpr int gesture_threshold = 50; } diff --git a/src/logid/Device.cpp b/src/logid/Device.cpp index a556e33..db68230 100644 --- a/src/logid/Device.cpp +++ b/src/logid/Device.cpp @@ -178,7 +178,6 @@ void Device::sleep() { void Device::wakeup() { std::lock_guard lock(_state_lock); logPrintf(INFO, "%s:%d woke up.", _path.c_str(), _index); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); reset(); diff --git a/src/logid/Receiver.cpp b/src/logid/Receiver.cpp index afb870d..56c94a2 100644 --- a/src/logid/Receiver.cpp +++ b/src/logid/Receiver.cpp @@ -93,8 +93,7 @@ void Receiver::addDevice(hidpp::DeviceConnectionEvent event) { try { // Check if device is ignored before continuing - if (manager->config()->ignore.value_or( - std::set()).contains(event.pid)) { + if (manager->config()->ignore.value_or(std::set()).contains(event.pid)) { logPrintf(DEBUG, "%s:%d: Device 0x%04x ignored.", _path.c_str(), event.index, event.pid); return; diff --git a/src/logid/actions/ChangeDPI.cpp b/src/logid/actions/ChangeDPI.cpp index 88b418d..c2c3c79 100644 --- a/src/logid/actions/ChangeDPI.cpp +++ b/src/logid/actions/ChangeDPI.cpp @@ -48,21 +48,20 @@ void ChangeDPI::press() { _pressed = true; std::shared_lock lock(_config_mutex); if (_dpi && _config.inc.has_value()) { - spawn_task( - [this, sensor = _config.sensor.value_or(0), - inc = _config.inc.value()] { - try { - uint16_t last_dpi = _dpi->getDPI(sensor); - _dpi->setDPI(last_dpi + inc,sensor); - } catch (backend::hidpp20::Error& e) { - if (e.code() == backend::hidpp20::Error::InvalidArgument) - logPrintf(WARN, "%s:%d: Could not get/set DPI for sensor %d", - _device->hidpp20().devicePath().c_str(), - _device->hidpp20().deviceIndex(), sensor); - else - throw e; - } - }); + run_task([this, sensor = _config.sensor.value_or(0), + inc = _config.inc.value()] { + try { + uint16_t last_dpi = _dpi->getDPI(sensor); + _dpi->setDPI(last_dpi + inc, sensor); + } catch (backend::hidpp20::Error& e) { + if (e.code() == backend::hidpp20::Error::InvalidArgument) + logPrintf(WARN, "%s:%d: Could not get/set DPI for sensor %d", + _device->hidpp20().devicePath().c_str(), + _device->hidpp20().deviceIndex(), sensor); + else + throw e; + } + }); } } diff --git a/src/logid/actions/ChangeHostAction.cpp b/src/logid/actions/ChangeHostAction.cpp index 30840cb..fd5f9ad 100644 --- a/src/logid/actions/ChangeHostAction.cpp +++ b/src/logid/actions/ChangeHostAction.cpp @@ -84,25 +84,24 @@ void ChangeHostAction::press() { void ChangeHostAction::release() { std::shared_lock lock(_config_mutex); if (_change_host && _config.host.has_value()) { - spawn_task( - [this, host=_config.host.value()] { - auto host_info = _change_host->getHostInfo(); - int next_host; - if (std::holds_alternative(host)) { - const auto& host_str = std::get(host); - if (host_str == "next") - next_host = host_info.currentHost + 1; - else if (host_str == "prev" || host_str == "previous") - next_host = host_info.currentHost - 1; - else - next_host = host_info.currentHost; - } else { - next_host = std::get(host) - 1; - } - next_host %= host_info.hostCount; - if (next_host != host_info.currentHost) - _change_host->setHost(next_host); - }); + run_task([this, host = _config.host.value()] { + auto host_info = _change_host->getHostInfo(); + int next_host; + if (std::holds_alternative(host)) { + const auto& host_str = std::get(host); + if (host_str == "next") + next_host = host_info.currentHost + 1; + else if (host_str == "prev" || host_str == "previous") + next_host = host_info.currentHost - 1; + else + next_host = host_info.currentHost; + } else { + next_host = std::get(host) - 1; + } + next_host %= host_info.hostCount; + if (next_host != host_info.currentHost) + _change_host->setHost(next_host); + }); } } diff --git a/src/logid/actions/CycleDPI.cpp b/src/logid/actions/CycleDPI.cpp index f6ab5ae..41b43c6 100644 --- a/src/logid/actions/CycleDPI.cpp +++ b/src/logid/actions/CycleDPI.cpp @@ -71,7 +71,7 @@ void CycleDPI::press() { if (_current_dpi == _config.dpis.value().end()) _current_dpi = _config.dpis.value().begin(); - spawn_task([this, dpi = *_current_dpi] { + run_task([this, dpi = *_current_dpi] { try { _dpi->setDPI(dpi, _config.sensor.value_or(0)); } catch (backend::hidpp20::Error& e) { diff --git a/src/logid/actions/ToggleHiresScroll.cpp b/src/logid/actions/ToggleHiresScroll.cpp index 1345184..0b4dded 100644 --- a/src/logid/actions/ToggleHiresScroll.cpp +++ b/src/logid/actions/ToggleHiresScroll.cpp @@ -41,12 +41,11 @@ ToggleHiresScroll::ToggleHiresScroll( void ToggleHiresScroll::press() { _pressed = true; if (_hires_scroll) { - spawn_task( - [hires = this->_hires_scroll]() { - auto mode = hires->getMode(); - mode ^= backend::hidpp20::HiresScroll::HiRes; - hires->setMode(mode); - }); + run_task([hires = this->_hires_scroll]() { + auto mode = hires->getMode(); + mode ^= backend::hidpp20::HiresScroll::HiRes; + hires->setMode(mode); + }); } } diff --git a/src/logid/actions/ToggleSmartShift.cpp b/src/logid/actions/ToggleSmartShift.cpp index f6445fe..ceb3167 100644 --- a/src/logid/actions/ToggleSmartShift.cpp +++ b/src/logid/actions/ToggleSmartShift.cpp @@ -42,13 +42,12 @@ ToggleSmartShift::ToggleSmartShift( void ToggleSmartShift::press() { _pressed = true; if (_smartshift) { - spawn_task( - [ss = this->_smartshift]() { - auto status = ss->getStatus(); - status.setActive = true; - status.active = !status.active; - ss->setStatus(status); - }); + run_task([ss = this->_smartshift]() { + auto status = ss->getStatus(); + status.setActive = true; + status.active = !status.active; + ss->setStatus(status); + }); } } diff --git a/src/logid/backend/Error.h b/src/logid/backend/Error.h index 26b309d..520923e 100644 --- a/src/logid/backend/Error.h +++ b/src/logid/backend/Error.h @@ -22,7 +22,7 @@ #include namespace logid::backend { - class DeviceNotReady : std::exception { + class DeviceNotReady : public std::exception { public: [[nodiscard]] const char* what() const noexcept override; }; diff --git a/src/logid/backend/hidpp10/Device.cpp b/src/logid/backend/hidpp10/Device.cpp index 8d40a8b..023abe0 100644 --- a/src/logid/backend/hidpp10/Device.cpp +++ b/src/logid/backend/hidpp10/Device.cpp @@ -57,10 +57,9 @@ hidpp::Report Device::sendReport(const hidpp::Report& report) { response_slot.sub_id = report.subId(); _sendReport(report); - bool valid = _response_cv.wait_for(lock, io_timeout, - [&response_slot]() { - return response_slot.response.has_value(); - }); + bool valid = _response_cv.wait_for(lock, io_timeout, [&response_slot]() { + return response_slot.response.has_value(); + }); if (!valid) { response_slot.reset(); diff --git a/src/logid/backend/hidpp10/ReceiverMonitor.cpp b/src/logid/backend/hidpp10/ReceiverMonitor.cpp index db65f89..4f50052 100644 --- a/src/logid/backend/hidpp10/ReceiverMonitor.cpp +++ b/src/logid/backend/hidpp10/ReceiverMonitor.cpp @@ -49,7 +49,7 @@ void ReceiverMonitor::ready() { */ hidpp::Report report(raw); - spawn_task([this, report, path = this->_receiver->rawDevice()->rawPath()]() { + run_task([this, report, path = this->_receiver->rawDevice()->rawPath()]() { if (report.subId() == Receiver::DeviceConnection) { try { this->addDevice(this->_receiver->deviceConnectionEvent(report)); @@ -85,7 +85,7 @@ void ReceiverMonitor::ready() { if (filled) { _pair_state = FindingPasskey; - spawn_task([this, event = _discovery_event]() { + run_task([this, event = _discovery_event]() { receiver()->startBoltPairing(event); }); } @@ -165,7 +165,7 @@ void ReceiverMonitor::waitForDevice(hidpp::DeviceIndex index) { event.index = index; event.fromTimeoutCheck = true; - spawn_task([this, event, handler_id]() { + run_task([this, event, handler_id]() { *handler_id = {}; try { addDevice(event); diff --git a/src/logid/backend/raw/DeviceMonitor.cpp b/src/logid/backend/raw/DeviceMonitor.cpp index 71c744e..3c5f20b 100644 --- a/src/logid/backend/raw/DeviceMonitor.cpp +++ b/src/logid/backend/raw/DeviceMonitor.cpp @@ -96,9 +96,9 @@ void DeviceMonitor::ready() { std::string dev_node = udev_device_get_devnode(device); if (action == "add") - spawn_task([this, dev_node]() { _addHandler(dev_node); }); + run_task([this, dev_node]() { _addHandler(dev_node); }); else if (action == "remove") - spawn_task([this, dev_node]() { _removeHandler(dev_node); }); + run_task([this, dev_node]() { _removeHandler(dev_node); }); udev_device_unref(device); }, @@ -129,45 +129,40 @@ void DeviceMonitor::enumerate() { udev_enumerate_get_list_entry(udev_enum)) { const char* name = udev_list_entry_get_name(udev_enum_entry); - struct udev_device* device = udev_device_new_from_syspath(_udev_context, - name); + struct udev_device* device = udev_device_new_from_syspath(_udev_context, name); if (!device) throw std::runtime_error("udev_device_new_from_syspath failed"); std::string dev_node = udev_device_get_devnode(device); udev_device_unref(device); - spawn_task([this, dev_node]() { _addHandler(dev_node); }); + _addHandler(dev_node); } udev_enumerate_unref(udev_enum); } -void DeviceMonitor::_addHandler(const std::string& device) { - int tries; - for (tries = 0; tries < max_tries; ++tries) { - try { - auto supported_reports = backend::hidpp::getSupportedReports( - RawDevice::getReportDescriptor(device)); - if (supported_reports) - this->addDevice(device); - else - logPrintf(DEBUG, "Unsupported device %s ignored", device.c_str()); - break; - } catch (backend::DeviceNotReady& e) { +void DeviceMonitor::_addHandler(const std::string& device, int tries) { + try { + auto supported_reports = backend::hidpp::getSupportedReports( + RawDevice::getReportDescriptor(device)); + if (supported_reports) + this->addDevice(device); + else + logPrintf(DEBUG, "Unsupported device %s ignored", device.c_str()); + } catch (backend::DeviceNotReady& e) { + if (tries == max_tries) { + logPrintf(WARN, "Failed to add device %s after %d tries. Treating as failure.", + device.c_str(), max_tries); + } else { /* Do exponential backoff for 2^tries * backoff ms. */ - std::chrono::milliseconds timeout((1 << tries) * ready_backoff); + std::chrono::milliseconds wait((1 << tries) * ready_backoff); logPrintf(DEBUG, "Failed to add device %s on try %d, backing off for %dms", - device.c_str(), tries + 1, timeout.count()); - std::this_thread::sleep_for(timeout); - } catch (std::exception& e) { - logPrintf(WARN, "Error adding device %s: %s", device.c_str(), e.what()); - break; + device.c_str(), tries + 1, wait.count()); + run_task_after([this, device, tries](){ _addHandler(device, tries + 1); }, wait); } - } - if (tries == max_tries) { - logPrintf(WARN, "Failed to add device %s after %d tries. Treating as failure.", - device.c_str(), max_tries); + } catch (std::exception& e) { + logPrintf(WARN, "Error adding device %s: %s", device.c_str(), e.what()); } } diff --git a/src/logid/backend/raw/DeviceMonitor.h b/src/logid/backend/raw/DeviceMonitor.h index ac3061e..a0c8473 100644 --- a/src/logid/backend/raw/DeviceMonitor.h +++ b/src/logid/backend/raw/DeviceMonitor.h @@ -55,7 +55,7 @@ namespace logid::backend::raw { virtual void removeDevice(std::string device) = 0; private: - void _addHandler(const std::string& device); + void _addHandler(const std::string& device, int tries = 0); void _removeHandler(const std::string& device); diff --git a/src/logid/config/schema.h b/src/logid/config/schema.h index 97b8b39..1fac35c 100644 --- a/src/logid/config/schema.h +++ b/src/logid/config/schema.h @@ -304,11 +304,13 @@ namespace logid::config { std::variant, "name">> devices; std::optional> ignore; std::optional io_timeout; + std::optional workers; - Config() : group({"devices", "ignore", "io_timeout"}, + Config() : group({"devices", "ignore", "io_timeout", "workers"}, &Config::devices, &Config::ignore, - &Config::io_timeout) {} + &Config::io_timeout, + &Config::workers) {} }; } diff --git a/src/logid/features/DeviceStatus.cpp b/src/logid/features/DeviceStatus.cpp index a88bdbf..7a84c00 100644 --- a/src/logid/features/DeviceStatus.cpp +++ b/src/logid/features/DeviceStatus.cpp @@ -57,7 +57,8 @@ void DeviceStatus::listen() { auto event = hidpp20::WirelessDeviceStatus::statusBroadcastEvent(report); if (event.reconfNeeded) - spawn_task([dev]() { dev->wakeup(); }); + run_task_after([dev]() { dev->wakeup(); }, + std::chrono::milliseconds(100)); } }); } diff --git a/src/logid/logid.cpp b/src/logid/logid.cpp index 4babd3f..3a402ea 100644 --- a/src/logid/logid.cpp +++ b/src/logid/logid.cpp @@ -18,6 +18,7 @@ #include #include +#include #include #include #include @@ -142,6 +143,8 @@ int main(int argc, char** argv) { config = std::make_shared(); } + init_workers(config->workers.value_or(defaults::workers)); + #ifdef USE_USER_BUS auto server_bus = ipcgull::IPCGULL_USER; #else diff --git a/src/logid/util/task.cpp b/src/logid/util/task.cpp index 4eaacf3..fea2f5e 100644 --- a/src/logid/util/task.cpp +++ b/src/logid/util/task.cpp @@ -16,16 +16,90 @@ * */ #include +#include +#include using namespace logid; +using namespace std::chrono; -void logid::spawn_task(const std::function& function) { - auto future = std::make_shared>(); - *future = std::async(std::launch::async, [function, future]() { - try { - function(); - } catch (std::exception& e) { - ExceptionHandler::Default(e); +struct task_less { +private: + std::greater<> greater; +public: + bool operator()(const task& a, const task& b) const { + return greater(a.time, b.time); + } +}; + +static std::priority_queue, task_less> tasks {}; +static std::mutex task_mutex {}; +static std::condition_variable task_cv {}; +static std::atomic_bool workers_init = false; + +[[noreturn]] static void worker() { + std::unique_lock lock(task_mutex); + while (true) { + task_cv.wait(lock, []() { return !tasks.empty(); }); + + /* top task is in the future, wait */ + if (tasks.top().time >= system_clock::now()) { + auto wait = tasks.top().time - system_clock::now(); + task_cv.wait_for(lock, wait, []() { + return !tasks.empty() && (tasks.top().time < system_clock::now()); + }); } - }); -} \ No newline at end of file + + if (!tasks.empty()) { + /* May have timed out and is no longer empty */ + auto f = tasks.top().function; + tasks.pop(); + + lock.unlock(); + try { + f(); + } catch(std::exception& e) { + ExceptionHandler::Default(e); + } + lock.lock(); + } + } +} + +void logid::init_workers(int worker_count) { + std::lock_guard lock(task_mutex); + + for (int i = 0; i < worker_count; ++i) + std::thread(&worker).detach(); + + workers_init = true; +} + +void logid::run_task(std::function function) { + task t{ + .function = std::move(function), + .time = std::chrono::system_clock::now() + }; + + run_task(t); +} + +void logid::run_task_after(std::function function, std::chrono::milliseconds delay) { + task t{ + .function = std::move(function), + .time = system_clock::now() + delay + }; + + run_task(t); +} + +void logid::run_task(task t) { + std::lock_guard lock(task_mutex); + + if (!workers_init) { + throw std::runtime_error("tasks queued before work queue ready"); + } + + tasks.emplace(std::move(t)); + // TODO: only need to wake up at top + task_cv.notify_one(); +} diff --git a/src/logid/util/task.h b/src/logid/util/task.h index a48468f..59d75a1 100644 --- a/src/logid/util/task.h +++ b/src/logid/util/task.h @@ -24,10 +24,16 @@ #include namespace logid { - /* This function spawns a new task into the least used worker queue - * and forgets about it. - */ - void spawn_task(const std::function& function); + struct task { + std::function function; + std::chrono::time_point time; + }; + + void init_workers(int worker_count); + + void run_task(std::function function); + void run_task_after(std::function function, std::chrono::milliseconds delay); + void run_task(task t); } #endif //LOGID_TASK_H