Reduce thread footprint by using a work queue

Also allows tasks on queue to be run after a certain amount of time.
This commit is contained in:
pixl 2023-05-01 22:43:22 -04:00
parent 1ff386a4bf
commit 8a4e2cce81
No known key found for this signature in database
GPG Key ID: 1866C148CD593B6E
18 changed files with 178 additions and 103 deletions

View File

@ -29,6 +29,7 @@
namespace logid { namespace logid {
namespace defaults { namespace defaults {
static constexpr double io_timeout = 500; static constexpr double io_timeout = 500;
static constexpr int workers = 4;
static constexpr int gesture_threshold = 50; static constexpr int gesture_threshold = 50;
} }

View File

@ -178,7 +178,6 @@ void Device::sleep() {
void Device::wakeup() { void Device::wakeup() {
std::lock_guard<std::mutex> lock(_state_lock); std::lock_guard<std::mutex> lock(_state_lock);
logPrintf(INFO, "%s:%d woke up.", _path.c_str(), _index); logPrintf(INFO, "%s:%d woke up.", _path.c_str(), _index);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
reset(); reset();

View File

@ -93,8 +93,7 @@ void Receiver::addDevice(hidpp::DeviceConnectionEvent event) {
try { try {
// Check if device is ignored before continuing // Check if device is ignored before continuing
if (manager->config()->ignore.value_or( if (manager->config()->ignore.value_or(std::set<uint16_t>()).contains(event.pid)) {
std::set<uint16_t>()).contains(event.pid)) {
logPrintf(DEBUG, "%s:%d: Device 0x%04x ignored.", logPrintf(DEBUG, "%s:%d: Device 0x%04x ignored.",
_path.c_str(), event.index, event.pid); _path.c_str(), event.index, event.pid);
return; return;

View File

@ -48,8 +48,7 @@ void ChangeDPI::press() {
_pressed = true; _pressed = true;
std::shared_lock lock(_config_mutex); std::shared_lock lock(_config_mutex);
if (_dpi && _config.inc.has_value()) { if (_dpi && _config.inc.has_value()) {
spawn_task( run_task([this, sensor = _config.sensor.value_or(0),
[this, sensor = _config.sensor.value_or(0),
inc = _config.inc.value()] { inc = _config.inc.value()] {
try { try {
uint16_t last_dpi = _dpi->getDPI(sensor); uint16_t last_dpi = _dpi->getDPI(sensor);

View File

@ -84,8 +84,7 @@ void ChangeHostAction::press() {
void ChangeHostAction::release() { void ChangeHostAction::release() {
std::shared_lock lock(_config_mutex); std::shared_lock lock(_config_mutex);
if (_change_host && _config.host.has_value()) { if (_change_host && _config.host.has_value()) {
spawn_task( run_task([this, host = _config.host.value()] {
[this, host=_config.host.value()] {
auto host_info = _change_host->getHostInfo(); auto host_info = _change_host->getHostInfo();
int next_host; int next_host;
if (std::holds_alternative<std::string>(host)) { if (std::holds_alternative<std::string>(host)) {

View File

@ -71,7 +71,7 @@ void CycleDPI::press() {
if (_current_dpi == _config.dpis.value().end()) if (_current_dpi == _config.dpis.value().end())
_current_dpi = _config.dpis.value().begin(); _current_dpi = _config.dpis.value().begin();
spawn_task([this, dpi = *_current_dpi] { run_task([this, dpi = *_current_dpi] {
try { try {
_dpi->setDPI(dpi, _config.sensor.value_or(0)); _dpi->setDPI(dpi, _config.sensor.value_or(0));
} catch (backend::hidpp20::Error& e) { } catch (backend::hidpp20::Error& e) {

View File

@ -41,8 +41,7 @@ ToggleHiresScroll::ToggleHiresScroll(
void ToggleHiresScroll::press() { void ToggleHiresScroll::press() {
_pressed = true; _pressed = true;
if (_hires_scroll) { if (_hires_scroll) {
spawn_task( run_task([hires = this->_hires_scroll]() {
[hires = this->_hires_scroll]() {
auto mode = hires->getMode(); auto mode = hires->getMode();
mode ^= backend::hidpp20::HiresScroll::HiRes; mode ^= backend::hidpp20::HiresScroll::HiRes;
hires->setMode(mode); hires->setMode(mode);

View File

@ -42,8 +42,7 @@ ToggleSmartShift::ToggleSmartShift(
void ToggleSmartShift::press() { void ToggleSmartShift::press() {
_pressed = true; _pressed = true;
if (_smartshift) { if (_smartshift) {
spawn_task( run_task([ss = this->_smartshift]() {
[ss = this->_smartshift]() {
auto status = ss->getStatus(); auto status = ss->getStatus();
status.setActive = true; status.setActive = true;
status.active = !status.active; status.active = !status.active;

View File

@ -22,7 +22,7 @@
#include <stdexcept> #include <stdexcept>
namespace logid::backend { namespace logid::backend {
class DeviceNotReady : std::exception { class DeviceNotReady : public std::exception {
public: public:
[[nodiscard]] const char* what() const noexcept override; [[nodiscard]] const char* what() const noexcept override;
}; };

View File

@ -57,8 +57,7 @@ hidpp::Report Device::sendReport(const hidpp::Report& report) {
response_slot.sub_id = report.subId(); response_slot.sub_id = report.subId();
_sendReport(report); _sendReport(report);
bool valid = _response_cv.wait_for(lock, io_timeout, bool valid = _response_cv.wait_for(lock, io_timeout, [&response_slot]() {
[&response_slot]() {
return response_slot.response.has_value(); return response_slot.response.has_value();
}); });

View File

@ -49,7 +49,7 @@ void ReceiverMonitor::ready() {
*/ */
hidpp::Report report(raw); hidpp::Report report(raw);
spawn_task([this, report, path = this->_receiver->rawDevice()->rawPath()]() { run_task([this, report, path = this->_receiver->rawDevice()->rawPath()]() {
if (report.subId() == Receiver::DeviceConnection) { if (report.subId() == Receiver::DeviceConnection) {
try { try {
this->addDevice(this->_receiver->deviceConnectionEvent(report)); this->addDevice(this->_receiver->deviceConnectionEvent(report));
@ -85,7 +85,7 @@ void ReceiverMonitor::ready() {
if (filled) { if (filled) {
_pair_state = FindingPasskey; _pair_state = FindingPasskey;
spawn_task([this, event = _discovery_event]() { run_task([this, event = _discovery_event]() {
receiver()->startBoltPairing(event); receiver()->startBoltPairing(event);
}); });
} }
@ -165,7 +165,7 @@ void ReceiverMonitor::waitForDevice(hidpp::DeviceIndex index) {
event.index = index; event.index = index;
event.fromTimeoutCheck = true; event.fromTimeoutCheck = true;
spawn_task([this, event, handler_id]() { run_task([this, event, handler_id]() {
*handler_id = {}; *handler_id = {};
try { try {
addDevice(event); addDevice(event);

View File

@ -96,9 +96,9 @@ void DeviceMonitor::ready() {
std::string dev_node = udev_device_get_devnode(device); std::string dev_node = udev_device_get_devnode(device);
if (action == "add") if (action == "add")
spawn_task([this, dev_node]() { _addHandler(dev_node); }); run_task([this, dev_node]() { _addHandler(dev_node); });
else if (action == "remove") else if (action == "remove")
spawn_task([this, dev_node]() { _removeHandler(dev_node); }); run_task([this, dev_node]() { _removeHandler(dev_node); });
udev_device_unref(device); udev_device_unref(device);
}, },
@ -129,23 +129,20 @@ void DeviceMonitor::enumerate() {
udev_enumerate_get_list_entry(udev_enum)) { udev_enumerate_get_list_entry(udev_enum)) {
const char* name = udev_list_entry_get_name(udev_enum_entry); const char* name = udev_list_entry_get_name(udev_enum_entry);
struct udev_device* device = udev_device_new_from_syspath(_udev_context, struct udev_device* device = udev_device_new_from_syspath(_udev_context, name);
name);
if (!device) if (!device)
throw std::runtime_error("udev_device_new_from_syspath failed"); throw std::runtime_error("udev_device_new_from_syspath failed");
std::string dev_node = udev_device_get_devnode(device); std::string dev_node = udev_device_get_devnode(device);
udev_device_unref(device); udev_device_unref(device);
spawn_task([this, dev_node]() { _addHandler(dev_node); }); _addHandler(dev_node);
} }
udev_enumerate_unref(udev_enum); udev_enumerate_unref(udev_enum);
} }
void DeviceMonitor::_addHandler(const std::string& device) { void DeviceMonitor::_addHandler(const std::string& device, int tries) {
int tries;
for (tries = 0; tries < max_tries; ++tries) {
try { try {
auto supported_reports = backend::hidpp::getSupportedReports( auto supported_reports = backend::hidpp::getSupportedReports(
RawDevice::getReportDescriptor(device)); RawDevice::getReportDescriptor(device));
@ -153,21 +150,19 @@ void DeviceMonitor::_addHandler(const std::string& device) {
this->addDevice(device); this->addDevice(device);
else else
logPrintf(DEBUG, "Unsupported device %s ignored", device.c_str()); logPrintf(DEBUG, "Unsupported device %s ignored", device.c_str());
break;
} catch (backend::DeviceNotReady& e) { } catch (backend::DeviceNotReady& e) {
/* Do exponential backoff for 2^tries * backoff ms. */
std::chrono::milliseconds timeout((1 << tries) * ready_backoff);
logPrintf(DEBUG, "Failed to add device %s on try %d, backing off for %dms",
device.c_str(), tries + 1, timeout.count());
std::this_thread::sleep_for(timeout);
} catch (std::exception& e) {
logPrintf(WARN, "Error adding device %s: %s", device.c_str(), e.what());
break;
}
}
if (tries == max_tries) { if (tries == max_tries) {
logPrintf(WARN, "Failed to add device %s after %d tries. Treating as failure.", logPrintf(WARN, "Failed to add device %s after %d tries. Treating as failure.",
device.c_str(), max_tries); device.c_str(), max_tries);
} else {
/* Do exponential backoff for 2^tries * backoff ms. */
std::chrono::milliseconds wait((1 << tries) * ready_backoff);
logPrintf(DEBUG, "Failed to add device %s on try %d, backing off for %dms",
device.c_str(), tries + 1, wait.count());
run_task_after([this, device, tries](){ _addHandler(device, tries + 1); }, wait);
}
} catch (std::exception& e) {
logPrintf(WARN, "Error adding device %s: %s", device.c_str(), e.what());
} }
} }

View File

@ -55,7 +55,7 @@ namespace logid::backend::raw {
virtual void removeDevice(std::string device) = 0; virtual void removeDevice(std::string device) = 0;
private: private:
void _addHandler(const std::string& device); void _addHandler(const std::string& device, int tries = 0);
void _removeHandler(const std::string& device); void _removeHandler(const std::string& device);

View File

@ -304,11 +304,13 @@ namespace logid::config {
std::variant<Device, Profile>, "name">> devices; std::variant<Device, Profile>, "name">> devices;
std::optional<std::set<uint16_t>> ignore; std::optional<std::set<uint16_t>> ignore;
std::optional<double> io_timeout; std::optional<double> io_timeout;
std::optional<int> workers;
Config() : group({"devices", "ignore", "io_timeout"}, Config() : group({"devices", "ignore", "io_timeout", "workers"},
&Config::devices, &Config::devices,
&Config::ignore, &Config::ignore,
&Config::io_timeout) {} &Config::io_timeout,
&Config::workers) {}
}; };
} }

View File

@ -57,7 +57,8 @@ void DeviceStatus::listen() {
auto event = auto event =
hidpp20::WirelessDeviceStatus::statusBroadcastEvent(report); hidpp20::WirelessDeviceStatus::statusBroadcastEvent(report);
if (event.reconfNeeded) if (event.reconfNeeded)
spawn_task([dev]() { dev->wakeup(); }); run_task_after([dev]() { dev->wakeup(); },
std::chrono::milliseconds(100));
} }
}); });
} }

View File

@ -18,6 +18,7 @@
#include <DeviceManager.h> #include <DeviceManager.h>
#include <InputDevice.h> #include <InputDevice.h>
#include <util/task.h>
#include <util/log.h> #include <util/log.h>
#include <algorithm> #include <algorithm>
#include <ipc_defs.h> #include <ipc_defs.h>
@ -142,6 +143,8 @@ int main(int argc, char** argv) {
config = std::make_shared<Configuration>(); config = std::make_shared<Configuration>();
} }
init_workers(config->workers.value_or(defaults::workers));
#ifdef USE_USER_BUS #ifdef USE_USER_BUS
auto server_bus = ipcgull::IPCGULL_USER; auto server_bus = ipcgull::IPCGULL_USER;
#else #else

View File

@ -16,16 +16,90 @@
* *
*/ */
#include <util/task.h> #include <util/task.h>
#include <queue>
#include <optional>
using namespace logid; using namespace logid;
using namespace std::chrono;
void logid::spawn_task(const std::function<void()>& function) { struct task_less {
auto future = std::make_shared<std::future<void>>(); private:
*future = std::async(std::launch::async, [function, future]() { std::greater<> greater;
public:
bool operator()(const task& a, const task& b) const {
return greater(a.time, b.time);
}
};
static std::priority_queue<task, std::vector<task>, task_less> tasks {};
static std::mutex task_mutex {};
static std::condition_variable task_cv {};
static std::atomic_bool workers_init = false;
[[noreturn]] static void worker() {
std::unique_lock lock(task_mutex);
while (true) {
task_cv.wait(lock, []() { return !tasks.empty(); });
/* top task is in the future, wait */
if (tasks.top().time >= system_clock::now()) {
auto wait = tasks.top().time - system_clock::now();
task_cv.wait_for(lock, wait, []() {
return !tasks.empty() && (tasks.top().time < system_clock::now());
});
}
if (!tasks.empty()) {
/* May have timed out and is no longer empty */
auto f = tasks.top().function;
tasks.pop();
lock.unlock();
try { try {
function(); f();
} catch(std::exception& e) { } catch(std::exception& e) {
ExceptionHandler::Default(e); ExceptionHandler::Default(e);
} }
}); lock.lock();
}
}
}
void logid::init_workers(int worker_count) {
std::lock_guard lock(task_mutex);
for (int i = 0; i < worker_count; ++i)
std::thread(&worker).detach();
workers_init = true;
}
void logid::run_task(std::function<void()> function) {
task t{
.function = std::move(function),
.time = std::chrono::system_clock::now()
};
run_task(t);
}
void logid::run_task_after(std::function<void()> function, std::chrono::milliseconds delay) {
task t{
.function = std::move(function),
.time = system_clock::now() + delay
};
run_task(t);
}
void logid::run_task(task t) {
std::lock_guard lock(task_mutex);
if (!workers_init) {
throw std::runtime_error("tasks queued before work queue ready");
}
tasks.emplace(std::move(t));
// TODO: only need to wake up at top
task_cv.notify_one();
} }

View File

@ -24,10 +24,16 @@
#include <future> #include <future>
namespace logid { namespace logid {
/* This function spawns a new task into the least used worker queue struct task {
* and forgets about it. std::function<void()> function;
*/ std::chrono::time_point<std::chrono::system_clock> time;
void spawn_task(const std::function<void()>& function); };
void init_workers(int worker_count);
void run_task(std::function<void()> function);
void run_task_after(std::function<void()> function, std::chrono::milliseconds delay);
void run_task(task t);
} }
#endif //LOGID_TASK_H #endif //LOGID_TASK_H