mirror of
https://github.com/PixlOne/logiops.git
synced 2025-07-13 21:02:43 +08:00
Use RAII for IOMonitor locks
May solve faulty IO monitor locking and solve #374.
This commit is contained in:
parent
30ade71edf
commit
4ae58b81a3
@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
* Copyright 2022 PixlOne
|
* Copyright 2019-2023 PixlOne
|
||||||
*
|
*
|
||||||
* This program is free software: you can redistribute it and/or modify
|
* This program is free software: you can redistribute it and/or modify
|
||||||
* it under the terms of the GNU General Public License as published by
|
* it under the terms of the GNU General Public License as published by
|
||||||
@ -17,6 +17,7 @@
|
|||||||
*/
|
*/
|
||||||
#include <backend/raw/IOMonitor.h>
|
#include <backend/raw/IOMonitor.h>
|
||||||
#include <cassert>
|
#include <cassert>
|
||||||
|
#include <optional>
|
||||||
|
|
||||||
extern "C"
|
extern "C"
|
||||||
{
|
{
|
||||||
@ -35,6 +36,55 @@ IOHandler::IOHandler(std::function<void()> r,
|
|||||||
error(std::move(err)) {
|
error(std::move(err)) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class IOMonitor::io_lock {
|
||||||
|
std::optional<std::unique_lock<std::mutex>> _lock;
|
||||||
|
IOMonitor* _io_monitor;
|
||||||
|
const uint64_t counter = 1;
|
||||||
|
|
||||||
|
public:
|
||||||
|
explicit io_lock(IOMonitor* io_monitor) : _io_monitor(io_monitor) {
|
||||||
|
_io_monitor->_interrupting = true;
|
||||||
|
[[maybe_unused]] ssize_t ret = ::write(_io_monitor->_event_fd, &counter, sizeof(counter));
|
||||||
|
assert(ret == sizeof(counter));
|
||||||
|
_lock.emplace(_io_monitor->_run_mutex);
|
||||||
|
}
|
||||||
|
|
||||||
|
io_lock(const io_lock&) = delete;
|
||||||
|
|
||||||
|
io_lock& operator=(const io_lock&) = delete;
|
||||||
|
|
||||||
|
io_lock(io_lock&& o) noexcept: _lock(std::move(o._lock)), _io_monitor(o._io_monitor) {
|
||||||
|
o._lock.reset();
|
||||||
|
o._io_monitor = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
io_lock& operator=(io_lock&& o) noexcept {
|
||||||
|
if (this != &o) {
|
||||||
|
_lock = std::move(o._lock);
|
||||||
|
_io_monitor = o._io_monitor;
|
||||||
|
o._lock.reset();
|
||||||
|
o._io_monitor = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
~io_lock() noexcept {
|
||||||
|
if (_lock && _io_monitor) {
|
||||||
|
uint64_t buf{};
|
||||||
|
[[maybe_unused]] const ssize_t ret = ::read(
|
||||||
|
_io_monitor->_event_fd, &buf, sizeof(counter));
|
||||||
|
|
||||||
|
assert(ret != -1);
|
||||||
|
|
||||||
|
if (buf == counter) {
|
||||||
|
_io_monitor->_interrupting = false;
|
||||||
|
_io_monitor->_interrupt_cv.notify_one();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
IOMonitor::IOMonitor() : _epoll_fd(epoll_create1(0)),
|
IOMonitor::IOMonitor() : _epoll_fd(epoll_create1(0)),
|
||||||
_event_fd(eventfd(0, EFD_NONBLOCK)) {
|
_event_fd(eventfd(0, EFD_NONBLOCK)) {
|
||||||
if (_epoll_fd < 0) {
|
if (_epoll_fd < 0) {
|
||||||
@ -69,7 +119,6 @@ IOMonitor::IOMonitor() : _epoll_fd(epoll_create1(0)),
|
|||||||
}
|
}
|
||||||
|
|
||||||
IOMonitor::~IOMonitor() noexcept {
|
IOMonitor::~IOMonitor() noexcept {
|
||||||
std::lock_guard<std::mutex> ctl_lock(_ctl_lock);
|
|
||||||
_stop();
|
_stop();
|
||||||
|
|
||||||
if (_event_fd >= 0)
|
if (_event_fd >= 0)
|
||||||
@ -80,23 +129,21 @@ IOMonitor::~IOMonitor() noexcept {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void IOMonitor::_listen() {
|
void IOMonitor::_listen() {
|
||||||
std::lock_guard<std::mutex> run_lock(_run_lock);
|
std::unique_lock lock(_run_mutex);
|
||||||
std::vector<struct epoll_event> events;
|
std::vector<struct epoll_event> events;
|
||||||
|
|
||||||
_is_running = true;
|
_is_running = true;
|
||||||
|
|
||||||
while (_is_running) {
|
while (_is_running) {
|
||||||
if (_interrupting) {
|
if (_interrupting) {
|
||||||
std::unique_lock<std::mutex> lock(_interrupt_mutex);
|
|
||||||
_interrupt_cv.wait(lock, [this]() {
|
_interrupt_cv.wait(lock, [this]() {
|
||||||
return !(bool) _interrupting;
|
return !_interrupting;
|
||||||
});
|
});
|
||||||
|
|
||||||
if (!_is_running)
|
if (!_is_running)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::lock_guard<std::mutex> io_lock(_io_lock);
|
|
||||||
if (events.size() != _fds.size())
|
if (events.size() != _fds.size())
|
||||||
events.resize(_fds.size());
|
events.resize(_fds.size());
|
||||||
int ev_count = ::epoll_wait(_epoll_fd, events.data(), (int) events.size(), -1);
|
int ev_count = ::epoll_wait(_epoll_fd, events.data(), (int) events.size(), -1);
|
||||||
@ -113,76 +160,32 @@ void IOMonitor::_listen() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void IOMonitor::_stop() noexcept {
|
void IOMonitor::_stop() noexcept {
|
||||||
_interrupt();
|
{
|
||||||
_is_running = false;
|
[[maybe_unused]] const io_lock lock(this);
|
||||||
_continue();
|
_is_running = false;
|
||||||
|
}
|
||||||
_io_thread->join();
|
_io_thread->join();
|
||||||
}
|
}
|
||||||
|
|
||||||
[[maybe_unused]]
|
|
||||||
bool IOMonitor::_running() const {
|
|
||||||
std::unique_lock<std::mutex> run_lock(_run_lock, std::try_to_lock);
|
|
||||||
return !run_lock.owns_lock() || _is_running;
|
|
||||||
}
|
|
||||||
|
|
||||||
void IOMonitor::add(int fd, IOHandler handler) {
|
void IOMonitor::add(int fd, IOHandler handler) {
|
||||||
std::lock_guard<std::mutex> lock(_ctl_lock);
|
[[maybe_unused]] const io_lock lock(this);
|
||||||
_interrupt();
|
|
||||||
|
|
||||||
struct epoll_event event{};
|
struct epoll_event event{};
|
||||||
event.events = EPOLLIN | EPOLLHUP | EPOLLERR;
|
event.events = EPOLLIN | EPOLLHUP | EPOLLERR;
|
||||||
event.data.fd = fd;
|
event.data.fd = fd;
|
||||||
|
|
||||||
// TODO: EPOLL_CTL_MOD
|
// TODO: EPOLL_CTL_MOD
|
||||||
if (_fds.contains(fd)) {
|
if (_fds.contains(fd))
|
||||||
_continue();
|
|
||||||
throw std::runtime_error("duplicate io fd");
|
throw std::runtime_error("duplicate io fd");
|
||||||
}
|
|
||||||
|
|
||||||
if (::epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, fd, &event)) {
|
if (::epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, fd, &event))
|
||||||
_continue();
|
|
||||||
throw std::system_error(errno, std::generic_category());
|
throw std::system_error(errno, std::generic_category());
|
||||||
}
|
|
||||||
_fds.emplace(fd, std::move(handler));
|
_fds.emplace(fd, std::move(handler));
|
||||||
|
|
||||||
_continue();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void IOMonitor::remove(int fd) noexcept {
|
void IOMonitor::remove(int fd) noexcept {
|
||||||
std::lock_guard<std::mutex> lock(_ctl_lock);
|
[[maybe_unused]] const io_lock lock(this);
|
||||||
_interrupt();
|
|
||||||
std::lock_guard<std::mutex> io_lock(_io_lock);
|
|
||||||
|
|
||||||
::epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
|
::epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
|
||||||
_fds.erase(fd);
|
_fds.erase(fd);
|
||||||
|
}
|
||||||
_continue();
|
|
||||||
}
|
|
||||||
|
|
||||||
void IOMonitor::_interrupt() noexcept {
|
|
||||||
std::unique_lock<std::mutex> run_lock(_run_lock, std::try_to_lock);
|
|
||||||
|
|
||||||
_interrupting = true;
|
|
||||||
|
|
||||||
uint64_t counter = 1;
|
|
||||||
[[maybe_unused]] ssize_t ret = ::write(_event_fd, &counter, sizeof(counter));
|
|
||||||
assert(ret == sizeof(counter));
|
|
||||||
|
|
||||||
// Wait for the IO monitor to _stop
|
|
||||||
std::lock_guard<std::mutex> io_lock(_io_lock);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
void IOMonitor::_continue() noexcept {
|
|
||||||
std::unique_lock<std::mutex> run_lock(_run_lock, std::try_to_lock);
|
|
||||||
|
|
||||||
uint64_t counter;
|
|
||||||
[[maybe_unused]] ssize_t ret = ::read(_event_fd, &counter, sizeof(counter));
|
|
||||||
|
|
||||||
assert(ret != -1);
|
|
||||||
|
|
||||||
if (counter == 1) {
|
|
||||||
_interrupting = false;
|
|
||||||
_interrupt_cv.notify_all();
|
|
||||||
}
|
|
||||||
}
|
|
@ -40,6 +40,14 @@ namespace logid::backend::raw {
|
|||||||
public:
|
public:
|
||||||
IOMonitor();
|
IOMonitor();
|
||||||
|
|
||||||
|
IOMonitor(IOMonitor&&) = delete;
|
||||||
|
|
||||||
|
IOMonitor(const IOMonitor&) = delete;
|
||||||
|
|
||||||
|
IOMonitor& operator=(IOMonitor&&) = delete;
|
||||||
|
|
||||||
|
IOMonitor& operator=(const IOMonitor&) = delete;
|
||||||
|
|
||||||
~IOMonitor() noexcept;
|
~IOMonitor() noexcept;
|
||||||
|
|
||||||
void add(int fd, IOHandler handler);
|
void add(int fd, IOHandler handler);
|
||||||
@ -50,26 +58,19 @@ namespace logid::backend::raw {
|
|||||||
void _listen(); // This is a blocking call
|
void _listen(); // This is a blocking call
|
||||||
void _stop() noexcept;
|
void _stop() noexcept;
|
||||||
|
|
||||||
[[maybe_unused]]
|
|
||||||
[[nodiscard]] bool _running() const;
|
|
||||||
|
|
||||||
void _interrupt() noexcept;
|
|
||||||
|
|
||||||
void _continue() noexcept;
|
|
||||||
|
|
||||||
std::unique_ptr<std::thread> _io_thread;
|
std::unique_ptr<std::thread> _io_thread;
|
||||||
|
|
||||||
std::map<int, IOHandler> _fds;
|
std::map<int, IOHandler> _fds;
|
||||||
std::mutex _io_lock, _ctl_lock;
|
mutable std::mutex _run_mutex;
|
||||||
mutable std::mutex _run_lock;
|
|
||||||
std::atomic_bool _is_running;
|
std::atomic_bool _is_running;
|
||||||
|
|
||||||
std::atomic_bool _interrupting;
|
std::atomic_bool _interrupting;
|
||||||
std::mutex _interrupt_mutex;
|
|
||||||
std::condition_variable _interrupt_cv;
|
std::condition_variable _interrupt_cv;
|
||||||
|
|
||||||
const int _epoll_fd;
|
const int _epoll_fd;
|
||||||
const int _event_fd;
|
const int _event_fd;
|
||||||
|
|
||||||
|
class io_lock;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user