Skip to content

Commit

Permalink
修复管道无效导致poller线程cpu占用太高问题 (ZLMediaKit#163)
Browse files Browse the repository at this point in the history
  • Loading branch information
xia-chu committed Jun 10, 2023
1 parent 67ea918 commit e4744a0
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 19 deletions.
34 changes: 23 additions & 11 deletions src/Poller/EventPoller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,19 @@ EventPoller &EventPoller::Instance() {
return *(EventPollerPool::Instance().getFirstPoller());
}

void EventPoller::addEventPipe() {
SockUtil::setNoBlocked(_pipe.readFD());
SockUtil::setNoBlocked(_pipe.writeFD());

// 添加内部管道事件
if (addEvent(_pipe.readFD(), EventPoller::Event_Read, [this](int event) { onPipeEvent(); }) == -1) {
throw std::runtime_error("Add pipe fd to poller failed");
}
}

EventPoller::EventPoller(std::string name, ThreadPool::Priority priority) {
_name = std::move(name);
_priority = priority;
SockUtil::setNoBlocked(_pipe.readFD());
SockUtil::setNoBlocked(_pipe.writeFD());

#if defined(HAS_EPOLL)
_epoll_fd = epoll_create(EPOLL_SIZE);
Expand All @@ -64,11 +72,7 @@ EventPoller::EventPoller(std::string name, ThreadPool::Priority priority) {
#endif //HAS_EPOLL
_logger = Logger::Instance().shared_from_this();
_loop_thread_id = this_thread::get_id();

//添加内部管道事件
if (addEvent(_pipe.readFD(), Event_Read, [this](int event) { onPipeEvent(); }) == -1) {
throw std::runtime_error("Add pipe fd to poller failed");
}
addEventPipe();
}

void EventPoller::shutdown() {
Expand Down Expand Up @@ -221,12 +225,20 @@ bool EventPoller::isCurrentThread() {
inline void EventPoller::onPipeEvent() {
char buf[1024];
int err = 0;
do {
if (_pipe.read(buf, sizeof(buf)) > 0) {
while (true) {
if ((err = _pipe.read(buf, sizeof(buf))) > 0) {
// 读到管道数据,继续读,直到读空为止
continue;
}
err = get_uv_error(true);
} while (err == UV_EINTR);
if (err == 0 || get_uv_error(true) != UV_EAGAIN) {
// 收到eof或非EAGAIN(无更多数据)错误,说明管道无效了,重新打开管道
ErrorL << "Invalid pipe fd of event poller, reopen it";
delEvent(_pipe.readFD());
_pipe.reOpen();
addEventPipe();
}
break;
}

decltype(_list_task) _list_swap;
{
Expand Down
5 changes: 5 additions & 0 deletions src/Poller/EventPoller.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,11 @@ class EventPoller : public TaskExecutor, public AnyStorage, public std::enable_s
*/
uint64_t getMinDelay();

/**
* 添加管道监听事件
*/
void addEventPipe();

private:
class ExitException : public std::exception {};

Expand Down
7 changes: 6 additions & 1 deletion src/Poller/PipeWrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ using namespace std;
namespace toolkit {

PipeWrap::PipeWrap() {
reOpen();
}

void PipeWrap::reOpen() {
clearFD();
#if defined(_WIN32)
const char *localip = SockUtil::support_ipv6() ? "::1" : "127.0.0.1";
auto listener_fd = SockUtil::listen(0, localip);
Expand All @@ -48,7 +53,7 @@ PipeWrap::PipeWrap() {
if (pipe(_pipe_fd) == -1) {
throw runtime_error(StrPrinter << "Create posix pipe failed: " << get_uv_errmsg());
}
#endif // defined(_WIN32)
#endif // defined(_WIN32)
SockUtil::setNoBlocked(_pipe_fd[0], true);
SockUtil::setNoBlocked(_pipe_fd[1], false);
SockUtil::setCloExec(_pipe_fd[0]);
Expand Down
11 changes: 4 additions & 7 deletions src/Poller/PipeWrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,15 @@ class PipeWrap {
~PipeWrap();
int write(const void *buf, int n);
int read(void *buf, int n);
int readFD() const {
return _pipe_fd[0];
}
int writeFD() const {
return _pipe_fd[1];
}
int readFD() const { return _pipe_fd[0]; }
int writeFD() const { return _pipe_fd[1]; }
void reOpen();

private:
void clearFD();

private:
int _pipe_fd[2] = {-1, -1};
int _pipe_fd[2] = { -1, -1 };
};

} /* namespace toolkit */
Expand Down

0 comments on commit e4744a0

Please sign in to comment.