Add WL_SOCKET_CLOSED for socket shutdown events.

Provide a way for WaitEventSet to report that the remote peer has shut
down its socket, independently of whether there is any buffered data
remaining to be read.  This works only on systems where the kernel
exposes that information, namely:

* WAIT_USE_POLL builds using POLLRDHUP, if available
* WAIT_USE_EPOLL builds using EPOLLRDHUP
* WAIT_USE_KQUEUE builds using EV_EOF

Reviewed-by: Zhihong Yu <zyu@yugabyte.com>
Reviewed-by: Maksim Milyutin <milyutinma@gmail.com>
Discussion: https://postgr.es/m/77def86b27e41f0efcba411460e929ae%40postgrespro.ru
This commit is contained in:
Thomas Munro 2022-02-14 16:29:28 +13:00
parent 5e01001ffb
commit 50e570a59e
2 changed files with 74 additions and 11 deletions

View File

@ -840,6 +840,7 @@ FreeWaitEventSet(WaitEventSet *set)
* - WL_SOCKET_CONNECTED: Wait for socket connection to be established,
* can be combined with other WL_SOCKET_* events (on non-Windows
* platforms, this is the same as WL_SOCKET_WRITEABLE)
* - WL_SOCKET_CLOSED: Wait for socket to be closed by remote peer.
* - WL_EXIT_ON_PM_DEATH: Exit immediately if the postmaster dies
*
* Returns the offset in WaitEventSet->events (starting from 0), which can be
@ -1042,12 +1043,16 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
else
{
Assert(event->fd != PGINVALID_SOCKET);
Assert(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE));
Assert(event->events & (WL_SOCKET_READABLE |
WL_SOCKET_WRITEABLE |
WL_SOCKET_CLOSED));
if (event->events & WL_SOCKET_READABLE)
epoll_ev.events |= EPOLLIN;
if (event->events & WL_SOCKET_WRITEABLE)
epoll_ev.events |= EPOLLOUT;
if (event->events & WL_SOCKET_CLOSED)
epoll_ev.events |= EPOLLRDHUP;
}
/*
@ -1086,12 +1091,18 @@ WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
}
else
{
Assert(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE));
Assert(event->events & (WL_SOCKET_READABLE |
WL_SOCKET_WRITEABLE |
WL_SOCKET_CLOSED));
pollfd->events = 0;
if (event->events & WL_SOCKET_READABLE)
pollfd->events |= POLLIN;
if (event->events & WL_SOCKET_WRITEABLE)
pollfd->events |= POLLOUT;
#ifdef POLLRDHUP
if (event->events & WL_SOCKET_CLOSED)
pollfd->events |= POLLRDHUP;
#endif
}
Assert(event->fd != PGINVALID_SOCKET);
@ -1164,7 +1175,9 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
Assert(event->events != WL_LATCH_SET || set->latch != NULL);
Assert(event->events == WL_LATCH_SET ||
event->events == WL_POSTMASTER_DEATH ||
(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)));
(event->events & (WL_SOCKET_READABLE |
WL_SOCKET_WRITEABLE |
WL_SOCKET_CLOSED)));
if (event->events == WL_POSTMASTER_DEATH)
{
@ -1187,9 +1200,9 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
* old event mask to the new event mask, since kevent treats readable
* and writable as separate events.
*/
if (old_events & WL_SOCKET_READABLE)
if (old_events & (WL_SOCKET_READABLE | WL_SOCKET_CLOSED))
old_filt_read = true;
if (event->events & WL_SOCKET_READABLE)
if (event->events & (WL_SOCKET_READABLE | WL_SOCKET_CLOSED))
new_filt_read = true;
if (old_events & WL_SOCKET_WRITEABLE)
old_filt_write = true;
@ -1209,7 +1222,10 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
event);
}
Assert(count > 0);
/* For WL_SOCKET_READ -> WL_SOCKET_CLOSED, no change needed. */
if (count == 0)
return;
Assert(count <= 2);
rc = kevent(set->kqueue_fd, &k_ev[0], count, NULL, 0, NULL);
@ -1524,7 +1540,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
returned_events++;
}
}
else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
else if (cur_event->events & (WL_SOCKET_READABLE |
WL_SOCKET_WRITEABLE |
WL_SOCKET_CLOSED))
{
Assert(cur_event->fd != PGINVALID_SOCKET);
@ -1542,6 +1560,13 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
occurred_events->events |= WL_SOCKET_WRITEABLE;
}
if ((cur_event->events & WL_SOCKET_CLOSED) &&
(cur_epoll_event->events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)))
{
/* remote peer shut down, or error */
occurred_events->events |= WL_SOCKET_CLOSED;
}
if (occurred_events->events != 0)
{
occurred_events->fd = cur_event->fd;
@ -1667,7 +1692,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
occurred_events++;
returned_events++;
}
else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
else if (cur_event->events & (WL_SOCKET_READABLE |
WL_SOCKET_WRITEABLE |
WL_SOCKET_CLOSED))
{
Assert(cur_event->fd >= 0);
@ -1678,6 +1705,14 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
occurred_events->events |= WL_SOCKET_READABLE;
}
if ((cur_event->events & WL_SOCKET_CLOSED) &&
(cur_kqueue_event->filter == EVFILT_READ) &&
(cur_kqueue_event->flags & EV_EOF))
{
/* the remote peer has shut down */
occurred_events->events |= WL_SOCKET_CLOSED;
}
if ((cur_event->events & WL_SOCKET_WRITEABLE) &&
(cur_kqueue_event->filter == EVFILT_WRITE))
{
@ -1788,7 +1823,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
returned_events++;
}
}
else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
else if (cur_event->events & (WL_SOCKET_READABLE |
WL_SOCKET_WRITEABLE |
WL_SOCKET_CLOSED))
{
int errflags = POLLHUP | POLLERR | POLLNVAL;
@ -1808,6 +1845,15 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
occurred_events->events |= WL_SOCKET_WRITEABLE;
}
#ifdef POLLRDHUP
if ((cur_event->events & WL_SOCKET_CLOSED) &&
(cur_pollfd->revents & (POLLRDHUP | errflags)))
{
/* remote peer closed, or error */
occurred_events->events |= WL_SOCKET_CLOSED;
}
#endif
if (occurred_events->events != 0)
{
occurred_events->fd = cur_event->fd;
@ -2014,6 +2060,21 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
}
#endif
/*
* Return whether the current build options can report WL_SOCKET_CLOSED.
*/
bool
WaitEventSetCanReportClosed(void)
{
#if (defined(WAIT_USE_POLL) && defined(POLLRDHUP)) || \
defined(WAIT_USE_EPOLL) || \
defined(WAIT_USE_KQUEUE)
return true;
#else
return false;
#endif
}
/*
* Get the number of wait events registered in a given WaitEventSet.
*/

View File

@ -134,10 +134,11 @@ typedef struct Latch
/* avoid having to deal with case on platforms not requiring it */
#define WL_SOCKET_CONNECTED WL_SOCKET_WRITEABLE
#endif
#define WL_SOCKET_CLOSED (1 << 7)
#define WL_SOCKET_MASK (WL_SOCKET_READABLE | \
WL_SOCKET_WRITEABLE | \
WL_SOCKET_CONNECTED)
WL_SOCKET_CONNECTED | \
WL_SOCKET_CLOSED)
typedef struct WaitEvent
{
@ -180,5 +181,6 @@ extern int WaitLatchOrSocket(Latch *latch, int wakeEvents,
pgsocket sock, long timeout, uint32 wait_event_info);
extern void InitializeLatchWaitSet(void);
extern int GetNumRegisteredWaitEvents(WaitEventSet *set);
extern bool WaitEventSetCanReportClosed(void);
#endif /* LATCH_H */