From e4f95cfeb37a6547957d57ac4522af0e8d412f35 Mon Sep 17 00:00:00 2001 From: wangshaoyi Date: Mon, 19 May 2025 11:04:09 +0800 Subject: [PATCH] fix connections overflow when command processing takes too long and client close connection --- src/net/include/net_define.h | 3 ++- src/net/src/net_epoll.cc | 14 +++++++++++++- src/net/src/worker_thread.cc | 9 ++++++++- tests/integration/pubsub_test.go | 5 ++++- 4 files changed, 27 insertions(+), 4 deletions(-) diff --git a/src/net/include/net_define.h b/src/net/include/net_define.h index 4ec16cc4e3..45c8367471 100644 --- a/src/net/include/net_define.h +++ b/src/net/include/net_define.h @@ -48,7 +48,8 @@ enum EventStatus { kNone = 0, kReadable = 0x1, kWritable = 0x1 << 1, - kErrorEvent = 0x1 << 2, + kPeerClose = 0x1 << 2, + kErrorEvent = 0x1 << 3, }; enum ConnStatus { diff --git a/src/net/src/net_epoll.cc b/src/net/src/net_epoll.cc index 2215a62764..752e83a38e 100644 --- a/src/net/src/net_epoll.cc +++ b/src/net/src/net_epoll.cc @@ -14,6 +14,10 @@ #include "net/include/net_define.h" #include "pstd/include/xdebug.h" +#ifndef EPOLLRDHUP +#define EPOLLRDHUP 0x2000 +#endif + namespace net { NetMultiplexer* CreateNetMultiplexer(int limit) { return new NetEpoll(limit); } @@ -45,7 +49,7 @@ int NetEpoll::NetAddEvent(int fd, int mask) { } if (mask & kWritable) { ee.events |= EPOLLOUT; - } + } return epoll_ctl(multiplexer_, EPOLL_CTL_ADD, fd, &ee); } @@ -62,6 +66,10 @@ int NetEpoll::NetModEvent(int fd, int old_mask, int mask) { if ((old_mask | mask) & kWritable) { ee.events |= EPOLLOUT; } + + if ((old_mask | mask) & kPeerClose) { + ee.events |= EPOLLRDHUP; + } return epoll_ctl(multiplexer_, EPOLL_CTL_MOD, fd, &ee); } @@ -93,6 +101,10 @@ int NetEpoll::NetPoll(int timeout) { ev.mask |= kWritable; } + if (events_[i].events & EPOLLRDHUP) { + ev.mask |= kPeerClose; + } + if (events_[i].events & (EPOLLERR | EPOLLHUP)) { ev.mask |= kErrorEvent; } diff --git a/src/net/src/worker_thread.cc b/src/net/src/worker_thread.cc index c4735f46b4..c4fb3694d6 100644 --- a/src/net/src/worker_thread.cc +++ b/src/net/src/worker_thread.cc @@ -189,7 +189,7 @@ void* WorkerThread::ThreadMain() { ReadStatus read_status = in_conn->GetRequest(); in_conn->set_last_interaction(now); if (read_status == kReadAll) { - net_multiplexer_->NetModEvent(pfe->fd, 0, 0); + net_multiplexer_->NetModEvent(pfe->fd, 0, kPeerClose); // Wait for the conn complete asynchronous task and // Mod Event to kWritable } else if (read_status == kReadHalf) { @@ -199,8 +199,15 @@ void* WorkerThread::ThreadMain() { } } + if ((should_close == 0) && ((pfe->mask & kPeerClose) != 0)) { + should_close = 1; + } + if (((pfe->mask & kErrorEvent) != 0) || (should_close != 0)) { net_multiplexer_->NetDelEvent(pfe->fd, 0); + // TODO: in_conn may live longer than fd. + // eg. in_conn are being transferred to net_pubsub + // while peer client closing this connection CloseFd(in_conn); in_conn = nullptr; { diff --git a/tests/integration/pubsub_test.go b/tests/integration/pubsub_test.go index fd167da01a..62492e69ee 100644 --- a/tests/integration/pubsub_test.go +++ b/tests/integration/pubsub_test.go @@ -39,7 +39,10 @@ var _ = Describe("PubSub", func() { It("implements Stringer", func() { pubsub := client.PSubscribe(ctx, "mychannel*") - defer pubsub.Close() + defer func() { + time.Sleep(100 * time.Millisecond) + pubsub.Close() + }() Expect(pubsub.String()).To(Equal("PubSub(mychannel*)")) })