Merge pull request #390 from PixlOne/fix-iomon-locks

Resolve deadlocking when adding to IOMonitor
This commit is contained in:
pixl 2023-07-12 13:59:18 -04:00 committed by GitHub
commit 31d1955faf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 128 additions and 119 deletions

View File

@ -66,11 +66,11 @@ void DeviceManager::addDevice(std::string path) {
// Check if device is ignored before continuing // Check if device is ignored before continuing
{ {
raw::RawDevice raw_dev(path, self<DeviceManager>().lock()); auto raw_dev = raw::RawDevice::make(path, self<DeviceManager>().lock());
if (config()->ignore.has_value() && if (config()->ignore.has_value() &&
config()->ignore.value().contains(raw_dev.productId())) { config()->ignore.value().contains(raw_dev->productId())) {
logPrintf(DEBUG, "%s: Device 0x%04x ignored.", logPrintf(DEBUG, "%s: Device 0x%04x ignored.",
path.c_str(), raw_dev.productId()); path.c_str(), raw_dev->productId());
return; return;
} }
} }

View File

@ -53,7 +53,7 @@ Device::Device(const std::string& path, DeviceIndex index,
const std::shared_ptr<raw::DeviceMonitor>& monitor, double timeout) : const std::shared_ptr<raw::DeviceMonitor>& monitor, double timeout) :
io_timeout(duration_cast<milliseconds>( io_timeout(duration_cast<milliseconds>(
duration<double, std::milli>(timeout))), duration<double, std::milli>(timeout))),
_raw_device(std::make_shared<raw::RawDevice>(path, monitor)), _raw_device(raw::RawDevice::make(path, monitor)),
_receiver(nullptr), _path(path), _index(index) { _receiver(nullptr), _path(path), _index(index) {
} }

View File

@ -89,23 +89,25 @@ void DeviceMonitor::ready() {
_ready = true; _ready = true;
_io_monitor->add(_fd, { _io_monitor->add(_fd, {
[this]() { [self_weak = _self]() {
struct udev_device* device = udev_monitor_receive_device(_udev_monitor); if (auto self = self_weak.lock()) {
std::string action = udev_device_get_action(device); struct udev_device* device = udev_monitor_receive_device(self->_udev_monitor);
std::string dev_node = udev_device_get_devnode(device); std::string action = udev_device_get_action(device);
std::string dev_node = udev_device_get_devnode(device);
if (action == "add") if (action == "add")
run_task([self_weak = _self, dev_node]() { run_task([self_weak, dev_node]() {
if (auto self = self_weak.lock()) if (auto self = self_weak.lock())
self->_addHandler(dev_node); self->_addHandler(dev_node);
}); });
else if (action == "remove") else if (action == "remove")
run_task([self_weak = _self, dev_node]() { run_task([self_weak, dev_node]() {
if (auto self = self_weak.lock()) if (auto self = self_weak.lock())
self->_removeHandler(dev_node); self->_removeHandler(dev_node);
}); });
udev_device_unref(device); udev_device_unref(device);
}
}, },
[]() { []() {
throw std::runtime_error("udev hangup"); throw std::runtime_error("udev hangup");

View File

@ -16,7 +16,7 @@
* *
*/ */
#include <backend/raw/IOMonitor.h> #include <backend/raw/IOMonitor.h>
#include <cassert> #include <util/log.h>
#include <optional> #include <optional>
extern "C" extern "C"
@ -36,55 +36,6 @@ IOHandler::IOHandler(std::function<void()> r,
error(std::move(err)) { error(std::move(err)) {
} }
class IOMonitor::io_lock {
std::optional<std::unique_lock<std::mutex>> _lock;
IOMonitor* _io_monitor;
const uint64_t counter = 1;
public:
explicit io_lock(IOMonitor* io_monitor) : _io_monitor(io_monitor) {
_io_monitor->_interrupting = true;
[[maybe_unused]] ssize_t ret = ::write(_io_monitor->_event_fd, &counter, sizeof(counter));
assert(ret == sizeof(counter));
_lock.emplace(_io_monitor->_run_mutex);
}
io_lock(const io_lock&) = delete;
io_lock& operator=(const io_lock&) = delete;
io_lock(io_lock&& o) noexcept: _lock(std::move(o._lock)), _io_monitor(o._io_monitor) {
o._lock.reset();
o._io_monitor = nullptr;
}
io_lock& operator=(io_lock&& o) noexcept {
if (this != &o) {
_lock = std::move(o._lock);
_io_monitor = o._io_monitor;
o._lock.reset();
o._io_monitor = nullptr;
}
return *this;
}
~io_lock() noexcept {
if (_lock && _io_monitor) {
uint64_t buf{};
[[maybe_unused]] const ssize_t ret = ::read(
_io_monitor->_event_fd, &buf, sizeof(counter));
assert(ret != -1);
if (buf == counter) {
_io_monitor->_interrupting = false;
_io_monitor->_interrupt_cv.notify_one();
}
}
}
};
IOMonitor::IOMonitor() : _epoll_fd(epoll_create1(0)), IOMonitor::IOMonitor() : _epoll_fd(epoll_create1(0)),
_event_fd(eventfd(0, EFD_NONBLOCK)) { _event_fd(eventfd(0, EFD_NONBLOCK)) {
if (_epoll_fd < 0) { if (_epoll_fd < 0) {
@ -106,12 +57,7 @@ IOMonitor::IOMonitor() : _epoll_fd(epoll_create1(0)),
throw std::system_error(errno, std::generic_category()); throw std::system_error(errno, std::generic_category());
} }
_fds.emplace(std::piecewise_construct, std::forward_as_tuple(_event_fd), _fds.emplace(_event_fd, nullptr);
std::forward_as_tuple([]() {}, []() {
throw std::runtime_error("event_fd hangup");
}, []() {
throw std::runtime_error("event_fd error");
}));
_io_thread = std::make_unique<std::thread>([this]() { _io_thread = std::make_unique<std::thread>([this]() {
_listen(); _listen();
@ -122,70 +68,100 @@ IOMonitor::~IOMonitor() noexcept {
_stop(); _stop();
if (_event_fd >= 0) if (_event_fd >= 0)
close(_event_fd); ::close(_event_fd);
if (_epoll_fd >= 0) if (_epoll_fd >= 0)
close(_epoll_fd); ::close(_epoll_fd);
} }
void IOMonitor::_listen() { void IOMonitor::_listen() {
std::unique_lock lock(_run_mutex); std::unique_lock lock(_run_mutex);
std::vector<struct epoll_event> events; std::vector<struct epoll_event> events;
if (_is_running)
throw std::runtime_error("IOMonitor double run");
_is_running = true; _is_running = true;
while (_is_running) { while (_is_running) {
if (_interrupting) {
_interrupt_cv.wait(lock, [this]() {
return !_interrupting;
});
if (!_is_running)
break;
}
if (events.size() != _fds.size()) if (events.size() != _fds.size())
events.resize(_fds.size()); events.resize(_fds.size());
int ev_count = ::epoll_wait(_epoll_fd, events.data(), (int) events.size(), -1); int ev_count = ::epoll_wait(_epoll_fd, events.data(), (int) events.size(), -1);
for (int i = 0; i < ev_count; ++i) { for (int i = 0; i < ev_count; ++i) {
const auto& handler = _fds.at(events[i].data.fd); std::shared_ptr<IOHandler> handler;
if (events[i].events & EPOLLIN)
handler.read(); if (events[i].data.fd == _event_fd) {
if (events[i].events & EPOLLHUP) if (events[i].events & EPOLLIN) {
handler.hangup(); lock.unlock();
if (events[i].events & EPOLLERR) /* Wait until done yielding */
handler.error(); const std::lock_guard yield_lock(_yield_mutex);
uint64_t event;
while (-1 != ::eventfd_read(_event_fd, &event)) { }
lock.lock();
}
} else {
try {
handler = _fds.at(events[i].data.fd);
} catch (std::out_of_range& e) {
continue;
}
lock.unlock();
try {
if (events[i].events & EPOLLIN)
handler->read();
if (events[i].events & EPOLLHUP)
handler->hangup();
if (events[i].events & EPOLLERR)
handler->error();
} catch (std::exception& e) {
logPrintf(ERROR, "Unhandled I/O handler error: %s", e.what());
}
lock.lock();
}
} }
} }
} }
void IOMonitor::_stop() noexcept { void IOMonitor::_stop() noexcept {
{ _is_running = false;
[[maybe_unused]] const io_lock lock(this); _yield();
_is_running = false;
}
_io_thread->join(); _io_thread->join();
} }
std::unique_lock<std::mutex> IOMonitor::_yield() noexcept {
/* Prevent listener thread from grabbing lock during yielding */
std::unique_lock yield_lock(_yield_mutex);
std::unique_lock run_lock(_run_mutex, std::try_to_lock);
if (!run_lock.owns_lock()) {
::eventfd_write(_event_fd, 1);
run_lock = std::unique_lock<std::mutex>(_run_mutex);
}
return run_lock;
}
void IOMonitor::add(int fd, IOHandler handler) { void IOMonitor::add(int fd, IOHandler handler) {
[[maybe_unused]] const io_lock lock(this); const auto lock = _yield();
struct epoll_event event{}; struct epoll_event event{};
event.events = EPOLLIN | EPOLLHUP | EPOLLERR; event.events = EPOLLIN | EPOLLHUP | EPOLLERR;
event.data.fd = fd; event.data.fd = fd;
// TODO: EPOLL_CTL_MOD if (!_fds.contains(fd)) {
if (_fds.contains(fd)) if (::epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, fd, &event))
throw std::system_error(errno, std::generic_category());
_fds.emplace(fd, std::make_shared<IOHandler>(std::move(handler)));
} else {
throw std::runtime_error("duplicate io fd"); throw std::runtime_error("duplicate io fd");
}
if (::epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, fd, &event))
throw std::system_error(errno, std::generic_category());
_fds.emplace(fd, std::move(handler));
} }
void IOMonitor::remove(int fd) noexcept { void IOMonitor::remove(int fd) noexcept {
[[maybe_unused]] const io_lock lock(this); const auto lock = _yield();
::epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, fd, nullptr); ::epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
_fds.erase(fd); _fds.erase(fd);
} }

View File

@ -21,9 +21,10 @@
#include <atomic> #include <atomic>
#include <functional> #include <functional>
#include <map> #include <map>
#include <memory>
#include <mutex> #include <mutex>
#include <thread>
#include <condition_variable> #include <condition_variable>
#include <thread>
namespace logid::backend::raw { namespace logid::backend::raw {
struct IOHandler { struct IOHandler {
@ -53,24 +54,21 @@ namespace logid::backend::raw {
void add(int fd, IOHandler handler); void add(int fd, IOHandler handler);
void remove(int fd) noexcept; void remove(int fd) noexcept;
private: private:
void _listen(); // This is a blocking call void _listen(); // This is a blocking call
void _stop() noexcept; void _stop() noexcept;
std::unique_lock<std::mutex> _yield() noexcept;
std::unique_ptr<std::thread> _io_thread; std::unique_ptr<std::thread> _io_thread;
std::map<int, IOHandler> _fds; std::mutex _run_mutex;
mutable std::mutex _run_mutex; std::mutex _yield_mutex;
std::atomic_bool _is_running;
std::atomic_bool _interrupting; std::map<int, std::shared_ptr<IOHandler>> _fds;
std::condition_variable _interrupt_cv; std::atomic_bool _is_running;
const int _epoll_fd; const int _epoll_fd;
const int _event_fd; const int _event_fd;
class io_lock;
}; };
} }

View File

@ -117,11 +117,22 @@ RawDevice::RawDevice(std::string path, const std::shared_ptr<DeviceMonitor>& mon
auto phys = get_phys(_fd); auto phys = get_phys(_fd);
_sub_device = std::regex_match(phys, virtual_path_regex); _sub_device = std::regex_match(phys, virtual_path_regex);
} }
}
void RawDevice::_ready() {
_io_monitor->add(_fd, { _io_monitor->add(_fd, {
[this]() { _readReports(); }, [self_weak = _self]() {
[this]() { _valid = false; }, if (auto self = self_weak.lock())
[this]() { _valid = false; } self->_readReports();
},
[self_weak = _self]() {
if (auto self = self_weak.lock())
self->_valid = false;
},
[self_weak = _self]() {
if (auto self = self_weak.lock())
self->_valid = false;
}
}); });
} }

View File

@ -34,7 +34,16 @@ namespace logid::backend::raw {
class IOMonitor; class IOMonitor;
template <typename T>
class RawDeviceWrapper : public T {
public:
template <typename... Args>
RawDeviceWrapper(Args... args) : T(std::forward<Args>(args)...) { }
};
class RawDevice { class RawDevice {
template <typename>
friend class RawDeviceWrapper;
public: public:
static constexpr int max_data_length = 32; static constexpr int max_data_length = 32;
typedef RawEventHandler EventHandler; typedef RawEventHandler EventHandler;
@ -51,7 +60,14 @@ namespace logid::backend::raw {
BusType bus_type; BusType bus_type;
}; };
RawDevice(std::string path, const std::shared_ptr<DeviceMonitor>& monitor); template <typename... Args>
static std::shared_ptr<RawDevice> make(Args... args) {
auto raw_dev = std::make_shared<RawDeviceWrapper<RawDevice>>(
std::forward<Args>(args)...);
raw_dev->_self = raw_dev;
raw_dev->_ready();
return raw_dev;
}
~RawDevice() noexcept; ~RawDevice() noexcept;
@ -79,6 +95,10 @@ namespace logid::backend::raw {
[[nodiscard]] EventHandlerLock<RawDevice> addEventHandler(RawEventHandler handler); [[nodiscard]] EventHandlerLock<RawDevice> addEventHandler(RawEventHandler handler);
private: private:
RawDevice(std::string path, const std::shared_ptr<DeviceMonitor>& monitor);
void _ready();
void _readReports(); void _readReports();
std::atomic_bool _valid; std::atomic_bool _valid;
@ -91,6 +111,8 @@ namespace logid::backend::raw {
std::shared_ptr<IOMonitor> _io_monitor; std::shared_ptr<IOMonitor> _io_monitor;
std::weak_ptr<RawDevice> _self;
bool _sub_device = false; bool _sub_device = false;
std::shared_ptr<EventHandlerList<RawDevice>> _event_handlers; std::shared_ptr<EventHandlerList<RawDevice>> _event_handlers;