diff --git a/configure b/configure index c10d9549a4..24655dc096 100755 --- a/configure +++ b/configure @@ -10193,7 +10193,7 @@ fi ## Header files ## -for ac_header in atomic.h crypt.h dld.h fp_class.h getopt.h ieeefp.h ifaddrs.h langinfo.h mbarrier.h poll.h pwd.h sys/ioctl.h sys/ipc.h sys/poll.h sys/pstat.h sys/resource.h sys/select.h sys/sem.h sys/shm.h sys/socket.h sys/sockio.h sys/tas.h sys/time.h sys/un.h termios.h ucred.h utime.h wchar.h wctype.h +for ac_header in atomic.h crypt.h dld.h fp_class.h getopt.h ieeefp.h ifaddrs.h langinfo.h mbarrier.h poll.h pwd.h sys/epoll.h sys/ioctl.h sys/ipc.h sys/poll.h sys/pstat.h sys/resource.h sys/select.h sys/sem.h sys/shm.h sys/socket.h sys/sockio.h sys/tas.h sys/time.h sys/un.h termios.h ucred.h utime.h wchar.h wctype.h do : as_ac_Header=`$as_echo "ac_cv_header_$ac_header" | $as_tr_sh` ac_fn_c_check_header_mongrel "$LINENO" "$ac_header" "$as_ac_Header" "$ac_includes_default" diff --git a/configure.in b/configure.in index 47d0f584de..c564a7695f 100644 --- a/configure.in +++ b/configure.in @@ -1183,7 +1183,7 @@ AC_SUBST(UUID_LIBS) ## dnl sys/socket.h is required by AC_FUNC_ACCEPT_ARGTYPES -AC_CHECK_HEADERS([atomic.h crypt.h dld.h fp_class.h getopt.h ieeefp.h ifaddrs.h langinfo.h mbarrier.h poll.h pwd.h sys/ioctl.h sys/ipc.h sys/poll.h sys/pstat.h sys/resource.h sys/select.h sys/sem.h sys/shm.h sys/socket.h sys/sockio.h sys/tas.h sys/time.h sys/un.h termios.h ucred.h utime.h wchar.h wctype.h]) +AC_CHECK_HEADERS([atomic.h crypt.h dld.h fp_class.h getopt.h ieeefp.h ifaddrs.h langinfo.h mbarrier.h poll.h pwd.h sys/epoll.h sys/ioctl.h sys/ipc.h sys/poll.h sys/pstat.h sys/resource.h sys/select.h sys/sem.h sys/shm.h sys/socket.h sys/sockio.h sys/tas.h sys/time.h sys/un.h termios.h ucred.h utime.h wchar.h wctype.h]) # On BSD, test for net/if.h will fail unless sys/socket.h # is included first. diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c index ac709d1d1b..29297e7299 100644 --- a/src/backend/libpq/be-secure.c +++ b/src/backend/libpq/be-secure.c @@ -140,13 +140,13 @@ retry: /* In blocking mode, wait until the socket is ready */ if (n < 0 && !port->noblock && (errno == EWOULDBLOCK || errno == EAGAIN)) { - int w; + WaitEvent event; Assert(waitfor); - w = WaitLatchOrSocket(MyLatch, - WL_LATCH_SET | WL_POSTMASTER_DEATH | waitfor, - port->sock, 0); + ModifyWaitEvent(FeBeWaitSet, 0, waitfor, NULL); + + WaitEventSetWait(FeBeWaitSet, -1 /* no timeout */, &event, 1); /* * If the postmaster has died, it's not safe to continue running, @@ -165,13 +165,13 @@ retry: * cycles checking for this very rare condition, and this should cause * us to exit quickly in most cases.) */ - if (w & WL_POSTMASTER_DEATH) + if (event.events & WL_POSTMASTER_DEATH) ereport(FATAL, (errcode(ERRCODE_ADMIN_SHUTDOWN), errmsg("terminating connection due to unexpected postmaster exit"))); /* Handle interrupt. */ - if (w & WL_LATCH_SET) + if (event.events & WL_LATCH_SET) { ResetLatch(MyLatch); ProcessClientReadInterrupt(true); @@ -241,22 +241,22 @@ retry: if (n < 0 && !port->noblock && (errno == EWOULDBLOCK || errno == EAGAIN)) { - int w; + WaitEvent event; Assert(waitfor); - w = WaitLatchOrSocket(MyLatch, - WL_LATCH_SET | WL_POSTMASTER_DEATH | waitfor, - port->sock, 0); + ModifyWaitEvent(FeBeWaitSet, 0, waitfor, NULL); + + WaitEventSetWait(FeBeWaitSet, -1 /* no timeout */, &event, 1); /* See comments in secure_read. 
*/ - if (w & WL_POSTMASTER_DEATH) + if (event.events & WL_POSTMASTER_DEATH) ereport(FATAL, (errcode(ERRCODE_ADMIN_SHUTDOWN), errmsg("terminating connection due to unexpected postmaster exit"))); /* Handle interrupt. */ - if (w & WL_LATCH_SET) + if (event.events & WL_LATCH_SET) { ResetLatch(MyLatch); ProcessClientWriteInterrupt(true); diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index 71473db997..acd005eb97 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -201,6 +201,11 @@ pq_init(void) (errmsg("could not set socket to nonblocking mode: %m"))); #endif + FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3); + AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock, + NULL, NULL); + AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, -1, MyLatch, NULL); + AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, -1, NULL, NULL); } /* -------------------------------- diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c index d42c9c6fdf..42c2f52f25 100644 --- a/src/backend/storage/ipc/latch.c +++ b/src/backend/storage/ipc/latch.c @@ -14,8 +14,8 @@ * however reliably interrupts the sleep, and causes select() to return * immediately even if the signal arrives before select() begins. * - * (Actually, we prefer poll() over select() where available, but the - * same comments apply to it.) + * (Actually, we prefer epoll_wait() over poll() over select() where + * available, but the same comments apply.) * * When SetLatch is called from the same process that owns the latch, * SetLatch writes the byte directly to the pipe. If it's owned by another @@ -41,6 +41,9 @@ #include #include #include +#ifdef HAVE_SYS_EPOLL_H +#include +#endif #ifdef HAVE_POLL_H #include #endif @@ -65,19 +68,60 @@ * useful to manually specify the used primitive. If desired, just add a * define somewhere before this block. */ -#if defined(LATCH_USE_POLL) || defined(LATCH_USE_SELECT) \ - || defined(LATCH_USE_WIN32) +#if defined(WAIT_USE_EPOLL) || defined(WAIT_USE_POLL) || \ + defined(WAIT_USE_SELECT) || defined(WAIT_USE_WIN32) /* don't overwrite manual choice */ +#elif defined(HAVE_SYS_EPOLL_H) +#define WAIT_USE_EPOLL #elif defined(HAVE_POLL) -#define LATCH_USE_POLL +#define WAIT_USE_POLL #elif HAVE_SYS_SELECT_H -#define LATCH_USE_SELECT +#define WAIT_USE_SELECT #elif WIN32 -#define LATCH_USE_WIN32 +#define WAIT_USE_WIN32 #else -#error "no latch implementation available" +#error "no wait set implementation available" #endif +/* typedef in latch.h */ +struct WaitEventSet +{ + int nevents; /* number of registered events */ + int nevents_space; /* maximum number of events in this set */ + + /* + * Array, of nevents_space length, storing the definition of events this + * set is waiting for. + */ + WaitEvent *events; + + /* + * If WL_LATCH_SET is specified in any wait event, latch is a pointer to + * said latch, and latch_pos the offset in the ->events array. This is + * useful because we check the state of the latch before performing doing + * syscalls related to waiting. + */ + Latch *latch; + int latch_pos; + +#if defined(WAIT_USE_EPOLL) + int epoll_fd; + /* epoll_wait returns events in a user provided arrays, allocate once */ + struct epoll_event *epoll_ret_events; +#elif defined(WAIT_USE_POLL) + /* poll expects events to be waited on every poll() call, prepare once */ + struct pollfd *pollfds; +#elif defined(WAIT_USE_WIN32) + + /* + * Array of windows events. 
The first element always contains + * pgwin32_signal_event, so the remaining elements are offset by one (i.e. + * event->pos + 1). + */ + HANDLE *handles; +#endif +}; + #ifndef WIN32 /* Are we currently in WaitLatch? The signal handler would like to know. */ static volatile sig_atomic_t waiting = false; @@ -91,6 +135,16 @@ static void sendSelfPipeByte(void); static void drainSelfPipe(void); #endif /* WIN32 */ +#if defined(WAIT_USE_EPOLL) +static void WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action); +#elif defined(WAIT_USE_POLL) +static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event); +#elif defined(WAIT_USE_WIN32) +static void WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event); +#endif + +static int WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, + WaitEvent *occurred_events, int nevents); /* * Initialize the process-local latch infrastructure. @@ -255,531 +309,57 @@ WaitLatch(volatile Latch *latch, int wakeEvents, long timeout) * When waiting on a socket, EOF and error conditions are reported by * returning the socket as readable/writable or both, depending on * WL_SOCKET_READABLE/WL_SOCKET_WRITEABLE being specified. + * + * NB: These days this is just a wrapper around the WaitEventSet API. When + * using a latch very frequently, consider creating a longer living + * WaitEventSet instead; that's more efficient. */ -#ifndef LATCH_USE_WIN32 int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout) { - int result = 0; + int ret = 0; int rc; - instr_time start_time, - cur_time; - long cur_timeout; + WaitEvent event; + WaitEventSet *set = CreateWaitEventSet(CurrentMemoryContext, 3); -#if defined(LATCH_USE_POLL) - struct pollfd pfds[3]; - int nfds; -#elif defined(LATCH_USE_SELECT) - struct timeval tv, - *tvp; - fd_set input_mask; - fd_set output_mask; - int hifd; -#endif - - Assert(wakeEvents != 0); /* must have at least one wake event */ - - /* waiting for socket readiness without a socket indicates a bug */ - if (sock == PGINVALID_SOCKET && - (wakeEvents & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) != 0) - elog(ERROR, "cannot wait on socket event without a socket"); - - if ((wakeEvents & WL_LATCH_SET) && latch->owner_pid != MyProcPid) - elog(ERROR, "cannot wait on a latch owned by another process"); - - /* - * Initialize timeout if requested. We must record the current time so - * that we can determine the remaining timeout if the poll() or select() - * is interrupted. (On some platforms, select() will update the contents - * of "tv" for us, but unfortunately we can't rely on that.) - */ if (wakeEvents & WL_TIMEOUT) - { - INSTR_TIME_SET_CURRENT(start_time); - Assert(timeout >= 0 && timeout <= INT_MAX); - cur_timeout = timeout; - -#ifdef LATCH_USE_SELECT - tv.tv_sec = cur_timeout / 1000L; - tv.tv_usec = (cur_timeout % 1000L) * 1000L; - tvp = &tv; -#endif - } + Assert(timeout >= 0); else - { - cur_timeout = -1; + timeout = -1; -#ifdef LATCH_USE_SELECT - tvp = NULL; -#endif - } + if (wakeEvents & WL_LATCH_SET) + AddWaitEventToSet(set, WL_LATCH_SET, PGINVALID_SOCKET, + (Latch *) latch, NULL); - waiting = true; - do - { - /* - * Check if the latch is set already. If so, leave loop immediately, - * avoid blocking again. We don't attempt to report any other events - * that might also be satisfied. 
- * - * If someone sets the latch between this and the poll()/select() - * below, the setter will write a byte to the pipe (or signal us and - * the signal handler will do that), and the poll()/select() will - * return immediately. - * - * If there's a pending byte in the self pipe, we'll notice whenever - * blocking. Only clearing the pipe in that case avoids having to - * drain it every time WaitLatchOrSocket() is used. Should the - * pipe-buffer fill up we're still ok, because the pipe is in - * nonblocking mode. It's unlikely for that to happen, because the - * self pipe isn't filled unless we're blocking (waiting = true), or - * from inside a signal handler in latch_sigusr1_handler(). - * - * Note: we assume that the kernel calls involved in drainSelfPipe() - * and SetLatch() will provide adequate synchronization on machines - * with weak memory ordering, so that we cannot miss seeing is_set if - * the signal byte is already in the pipe when we drain it. - */ - if ((wakeEvents & WL_LATCH_SET) && latch->is_set) - { - result |= WL_LATCH_SET; - break; - } + if (wakeEvents & WL_POSTMASTER_DEATH) + AddWaitEventToSet(set, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, + NULL, NULL); - /* - * Must wait ... we use the polling interface determined at the top of - * this file to do so. - */ -#if defined(LATCH_USE_POLL) - nfds = 0; - - /* selfpipe is always in pfds[0] */ - pfds[0].fd = selfpipe_readfd; - pfds[0].events = POLLIN; - pfds[0].revents = 0; - nfds++; - - if (wakeEvents & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) - { - /* socket, if used, is always in pfds[1] */ - pfds[1].fd = sock; - pfds[1].events = 0; - if (wakeEvents & WL_SOCKET_READABLE) - pfds[1].events |= POLLIN; - if (wakeEvents & WL_SOCKET_WRITEABLE) - pfds[1].events |= POLLOUT; - pfds[1].revents = 0; - nfds++; - } - - if (wakeEvents & WL_POSTMASTER_DEATH) - { - /* postmaster fd, if used, is always in pfds[nfds - 1] */ - pfds[nfds].fd = postmaster_alive_fds[POSTMASTER_FD_WATCH]; - pfds[nfds].events = POLLIN; - pfds[nfds].revents = 0; - nfds++; - } - - /* Sleep */ - rc = poll(pfds, nfds, (int) cur_timeout); - - /* Check return code */ - if (rc < 0) - { - /* EINTR is okay, otherwise complain */ - if (errno != EINTR) - { - waiting = false; - ereport(ERROR, - (errcode_for_socket_access(), - errmsg("poll() failed: %m"))); - } - } - else if (rc == 0) - { - /* timeout exceeded */ - if (wakeEvents & WL_TIMEOUT) - result |= WL_TIMEOUT; - } - else - { - /* at least one event occurred, so check revents values */ - - if (pfds[0].revents & POLLIN) - { - /* There's data in the self-pipe, clear it. */ - drainSelfPipe(); - } - - if ((wakeEvents & WL_SOCKET_READABLE) && - (pfds[1].revents & POLLIN)) - { - /* data available in socket, or EOF/error condition */ - result |= WL_SOCKET_READABLE; - } - if ((wakeEvents & WL_SOCKET_WRITEABLE) && - (pfds[1].revents & POLLOUT)) - { - /* socket is writable */ - result |= WL_SOCKET_WRITEABLE; - } - if ((wakeEvents & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) && - (pfds[1].revents & (POLLHUP | POLLERR | POLLNVAL))) - { - /* EOF/error condition */ - if (wakeEvents & WL_SOCKET_READABLE) - result |= WL_SOCKET_READABLE; - if (wakeEvents & WL_SOCKET_WRITEABLE) - result |= WL_SOCKET_WRITEABLE; - } - - /* - * We expect a POLLHUP when the remote end is closed, but because - * we don't expect the pipe to become readable or to have any - * errors either, treat those cases as postmaster death, too. 
- */ - if ((wakeEvents & WL_POSTMASTER_DEATH) && - (pfds[nfds - 1].revents & (POLLHUP | POLLIN | POLLERR | POLLNVAL))) - { - /* - * According to the select(2) man page on Linux, select(2) may - * spuriously return and report a file descriptor as readable, - * when it's not; and presumably so can poll(2). It's not - * clear that the relevant cases would ever apply to the - * postmaster pipe, but since the consequences of falsely - * returning WL_POSTMASTER_DEATH could be pretty unpleasant, - * we take the trouble to positively verify EOF with - * PostmasterIsAlive(). - */ - if (!PostmasterIsAlive()) - result |= WL_POSTMASTER_DEATH; - } - } -#elif defined(LATCH_USE_SELECT) - - /* - * On at least older linux kernels select(), in violation of POSIX, - * doesn't reliably return a socket as writable if closed - but we - * rely on that. So far all the known cases of this problem are on - * platforms that also provide a poll() implementation without that - * bug. If we find one where that's not the case, we'll need to add a - * workaround. - */ - FD_ZERO(&input_mask); - FD_ZERO(&output_mask); - - FD_SET(selfpipe_readfd, &input_mask); - hifd = selfpipe_readfd; - - if (wakeEvents & WL_POSTMASTER_DEATH) - { - FD_SET(postmaster_alive_fds[POSTMASTER_FD_WATCH], &input_mask); - if (postmaster_alive_fds[POSTMASTER_FD_WATCH] > hifd) - hifd = postmaster_alive_fds[POSTMASTER_FD_WATCH]; - } - - if (wakeEvents & WL_SOCKET_READABLE) - { - FD_SET(sock, &input_mask); - if (sock > hifd) - hifd = sock; - } - - if (wakeEvents & WL_SOCKET_WRITEABLE) - { - FD_SET(sock, &output_mask); - if (sock > hifd) - hifd = sock; - } - - /* Sleep */ - rc = select(hifd + 1, &input_mask, &output_mask, NULL, tvp); - - /* Check return code */ - if (rc < 0) - { - /* EINTR is okay, otherwise complain */ - if (errno != EINTR) - { - waiting = false; - ereport(ERROR, - (errcode_for_socket_access(), - errmsg("select() failed: %m"))); - } - } - else if (rc == 0) - { - /* timeout exceeded */ - if (wakeEvents & WL_TIMEOUT) - result |= WL_TIMEOUT; - } - else - { - /* at least one event occurred, so check masks */ - if (FD_ISSET(selfpipe_readfd, &input_mask)) - { - /* There's data in the self-pipe, clear it. */ - drainSelfPipe(); - } - if ((wakeEvents & WL_SOCKET_READABLE) && FD_ISSET(sock, &input_mask)) - { - /* data available in socket, or EOF */ - result |= WL_SOCKET_READABLE; - } - if ((wakeEvents & WL_SOCKET_WRITEABLE) && FD_ISSET(sock, &output_mask)) - { - /* socket is writable, or EOF */ - result |= WL_SOCKET_WRITEABLE; - } - if ((wakeEvents & WL_POSTMASTER_DEATH) && - FD_ISSET(postmaster_alive_fds[POSTMASTER_FD_WATCH], - &input_mask)) - { - /* - * According to the select(2) man page on Linux, select(2) may - * spuriously return and report a file descriptor as readable, - * when it's not; and presumably so can poll(2). It's not - * clear that the relevant cases would ever apply to the - * postmaster pipe, but since the consequences of falsely - * returning WL_POSTMASTER_DEATH could be pretty unpleasant, - * we take the trouble to positively verify EOF with - * PostmasterIsAlive(). - */ - if (!PostmasterIsAlive()) - result |= WL_POSTMASTER_DEATH; - } - } -#endif /* LATCH_USE_SELECT */ - - /* - * Check again whether latch is set, the arrival of a signal/self-byte - * might be what stopped our sleep. It's not required for correctness - * to signal the latch as being set (we'd just loop if there's no - * other event), but it seems good to report an arrived latch asap. - * This way we also don't have to compute the current timestamp again. 
- */ - if ((wakeEvents & WL_LATCH_SET) && latch->is_set) - result |= WL_LATCH_SET; - - /* If we're not done, update cur_timeout for next iteration */ - if (result == 0 && (wakeEvents & WL_TIMEOUT)) - { - INSTR_TIME_SET_CURRENT(cur_time); - INSTR_TIME_SUBTRACT(cur_time, start_time); - cur_timeout = timeout - (long) INSTR_TIME_GET_MILLISEC(cur_time); - if (cur_timeout <= 0) - { - /* Timeout has expired, no need to continue looping */ - result |= WL_TIMEOUT; - } -#ifdef LATCH_USE_SELECT - else - { - tv.tv_sec = cur_timeout / 1000L; - tv.tv_usec = (cur_timeout % 1000L) * 1000L; - } -#endif - } - } while (result == 0); - waiting = false; - - return result; -} -#else /* LATCH_USE_WIN32 */ -int -WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, - long timeout) -{ - DWORD rc; - instr_time start_time, - cur_time; - long cur_timeout; - HANDLE events[4]; - HANDLE latchevent; - HANDLE sockevent = WSA_INVALID_EVENT; - int numevents; - int result = 0; - int pmdeath_eventno = 0; - - Assert(wakeEvents != 0); /* must have at least one wake event */ - - /* waiting for socket readiness without a socket indicates a bug */ - if (sock == PGINVALID_SOCKET && - (wakeEvents & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) != 0) - elog(ERROR, "cannot wait on socket event without a socket"); - - if ((wakeEvents & WL_LATCH_SET) && latch->owner_pid != MyProcPid) - elog(ERROR, "cannot wait on a latch owned by another process"); - - /* - * Initialize timeout if requested. We must record the current time so - * that we can determine the remaining timeout if WaitForMultipleObjects - * is interrupted. - */ - if (wakeEvents & WL_TIMEOUT) - { - INSTR_TIME_SET_CURRENT(start_time); - Assert(timeout >= 0 && timeout <= INT_MAX); - cur_timeout = timeout; - } - else - cur_timeout = INFINITE; - - /* - * Construct an array of event handles for WaitforMultipleObjects(). - * - * Note: pgwin32_signal_event should be first to ensure that it will be - * reported when multiple events are set. We want to guarantee that - * pending signals are serviced. - */ - latchevent = latch->event; - - events[0] = pgwin32_signal_event; - events[1] = latchevent; - numevents = 2; if (wakeEvents & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) { - /* Need an event object to represent events on the socket */ - int flags = FD_CLOSE; /* always check for errors/EOF */ + int ev; - if (wakeEvents & WL_SOCKET_READABLE) - flags |= FD_READ; - if (wakeEvents & WL_SOCKET_WRITEABLE) - flags |= FD_WRITE; - - sockevent = WSACreateEvent(); - if (sockevent == WSA_INVALID_EVENT) - elog(ERROR, "failed to create event for socket: error code %u", - WSAGetLastError()); - if (WSAEventSelect(sock, sockevent, flags) != 0) - elog(ERROR, "failed to set up event for socket: error code %u", - WSAGetLastError()); - - events[numevents++] = sockevent; - } - if (wakeEvents & WL_POSTMASTER_DEATH) - { - pmdeath_eventno = numevents; - events[numevents++] = PostmasterHandle; + ev = wakeEvents & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE); + AddWaitEventToSet(set, ev, sock, NULL, NULL); } - /* Ensure that signals are serviced even if latch is already set */ - pgwin32_dispatch_queued_signals(); + rc = WaitEventSetWait(set, timeout, &event, 1); - do + if (rc == 0) + ret |= WL_TIMEOUT; + else { - /* - * The comment in the unix version above applies here as well. At - * least after mentally replacing self-pipe with windows event. - * There's no danger of overflowing, as "Setting an event that is - * already set has no effect.". 
- */ - if ((wakeEvents & WL_LATCH_SET) && latch->is_set) - { - result |= WL_LATCH_SET; - - /* - * Leave loop immediately, avoid blocking again. We don't attempt - * to report any other events that might also be satisfied. - */ - break; - } - - rc = WaitForMultipleObjects(numevents, events, FALSE, cur_timeout); - - if (rc == WAIT_FAILED) - elog(ERROR, "WaitForMultipleObjects() failed: error code %lu", - GetLastError()); - else if (rc == WAIT_TIMEOUT) - { - result |= WL_TIMEOUT; - } - else if (rc == WAIT_OBJECT_0) - { - /* Service newly-arrived signals */ - pgwin32_dispatch_queued_signals(); - } - else if (rc == WAIT_OBJECT_0 + 1) - { - /* - * Reset the event. We'll re-check the, potentially, set latch on - * next iteration of loop, but let's not waste the cycles to - * update cur_timeout below. - */ - if (!ResetEvent(latchevent)) - elog(ERROR, "ResetEvent failed: error code %lu", GetLastError()); - - continue; - } - else if ((wakeEvents & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) && - rc == WAIT_OBJECT_0 + 2) /* socket is at event slot 2 */ - { - WSANETWORKEVENTS resEvents; - - ZeroMemory(&resEvents, sizeof(resEvents)); - if (WSAEnumNetworkEvents(sock, sockevent, &resEvents) != 0) - elog(ERROR, "failed to enumerate network events: error code %u", - WSAGetLastError()); - if ((wakeEvents & WL_SOCKET_READABLE) && - (resEvents.lNetworkEvents & FD_READ)) - { - result |= WL_SOCKET_READABLE; - } - if ((wakeEvents & WL_SOCKET_WRITEABLE) && - (resEvents.lNetworkEvents & FD_WRITE)) - { - result |= WL_SOCKET_WRITEABLE; - } - if (resEvents.lNetworkEvents & FD_CLOSE) - { - if (wakeEvents & WL_SOCKET_READABLE) - result |= WL_SOCKET_READABLE; - if (wakeEvents & WL_SOCKET_WRITEABLE) - result |= WL_SOCKET_WRITEABLE; - } - } - else if ((wakeEvents & WL_POSTMASTER_DEATH) && - rc == WAIT_OBJECT_0 + pmdeath_eventno) - { - /* - * Postmaster apparently died. Since the consequences of falsely - * returning WL_POSTMASTER_DEATH could be pretty unpleasant, we - * take the trouble to positively verify this with - * PostmasterIsAlive(), even though there is no known reason to - * think that the event could be falsely set on Windows. - */ - if (!PostmasterIsAlive()) - result |= WL_POSTMASTER_DEATH; - } - else - elog(ERROR, "unexpected return code from WaitForMultipleObjects(): %lu", rc); - - /* If we're not done, update cur_timeout for next iteration */ - if (result == 0 && (wakeEvents & WL_TIMEOUT)) - { - INSTR_TIME_SET_CURRENT(cur_time); - INSTR_TIME_SUBTRACT(cur_time, start_time); - cur_timeout = timeout - (long) INSTR_TIME_GET_MILLISEC(cur_time); - if (cur_timeout <= 0) - { - /* Timeout has expired, no need to continue looping */ - result |= WL_TIMEOUT; - } - } - } while (result == 0); - - /* Clean up the event object we created for the socket */ - if (sockevent != WSA_INVALID_EVENT) - { - WSAEventSelect(sock, NULL, 0); - WSACloseEvent(sockevent); + ret |= event.events & (WL_LATCH_SET | + WL_POSTMASTER_DEATH | + WL_SOCKET_READABLE | + WL_SOCKET_WRITEABLE); } - return result; + FreeWaitEventSet(set); + + return ret; } -#endif /* LATCH_USE_WIN32 */ /* * Sets a latch and wakes up anyone waiting on it. @@ -892,6 +472,1019 @@ ResetLatch(volatile Latch *latch) pg_memory_barrier(); } +/* + * Create a WaitEventSet with space for nevents different events to wait for. + * + * These events can then efficiently waited upon together, using + * WaitEventSetWait(). 
+ */ +WaitEventSet * +CreateWaitEventSet(MemoryContext context, int nevents) +{ + WaitEventSet *set; + char *data; + Size sz = 0; + + sz += sizeof(WaitEventSet); + sz += sizeof(WaitEvent) * nevents; + +#if defined(WAIT_USE_EPOLL) + sz += sizeof(struct epoll_event) * nevents; +#elif defined(WAIT_USE_POLL) + sz += sizeof(struct pollfd) * nevents; +#elif defined(WAIT_USE_WIN32) + /* need space for the pgwin32_signal_event */ + sz += sizeof(HANDLE) * (nevents + 1); +#endif + + data = (char *) MemoryContextAllocZero(context, sz); + + set = (WaitEventSet *) data; + data += sizeof(WaitEventSet); + + set->events = (WaitEvent *) data; + data += sizeof(WaitEvent) * nevents; + +#if defined(WAIT_USE_EPOLL) + set->epoll_ret_events = (struct epoll_event *) data; + data += sizeof(struct epoll_event) * nevents; +#elif defined(WAIT_USE_POLL) + set->pollfds = (struct pollfd *) data; + data += sizeof(struct pollfd) * nevents; +#elif defined(WAIT_USE_WIN32) + set->handles = (HANDLE) data; + data += sizeof(HANDLE) * nevents; +#endif + + set->latch = NULL; + set->nevents_space = nevents; + +#if defined(WAIT_USE_EPOLL) + set->epoll_fd = epoll_create(nevents); + if (set->epoll_fd < 0) + elog(ERROR, "epoll_create failed: %m"); +#elif defined(WAIT_USE_WIN32) + + /* + * To handle signals while waiting, we need to add a win32 specific event. + * We accounted for the additional event at the top of this routine. See + * port/win32/signal.c for more details. + * + * Note: pgwin32_signal_event should be first to ensure that it will be + * reported when multiple events are set. We want to guarantee that + * pending signals are serviced. + */ + set->handles[0] = pgwin32_signal_event; + StaticAssertStmt(WSA_INVALID_EVENT == NULL, ""); +#endif + + return set; +} + +/* + * Free a previously created WaitEventSet. + */ +void +FreeWaitEventSet(WaitEventSet *set) +{ +#if defined(WAIT_USE_EPOLL) + close(set->epoll_fd); +#elif defined(WAIT_USE_WIN32) + WaitEvent *cur_event; + + for (cur_event = set->events; + cur_event < (set->events + set->nevents); + cur_event++) + { + if (cur_event->events & WL_LATCH_SET) + { + /* uses the latch's HANDLE */ + } + else if (cur_event->events & WL_POSTMASTER_DEATH) + { + /* uses PostmasterHandle */ + } + else + { + /* Clean up the event object we created for the socket */ + WSAEventSelect(cur_event->fd, NULL, 0); + WSACloseEvent(set->handles[cur_event->pos + 1]); + } + } +#endif + + pfree(set); +} + +/* --- + * Add an event to the set. Possible events are: + * - WL_LATCH_SET: Wait for the latch to be set + * - WL_POSTMASTER_DEATH: Wait for postmaster to die + * - WL_SOCKET_READABLE: Wait for socket to become readable + * can be combined in one event with WL_SOCKET_WRITEABLE + * - WL_SOCKET_WRITEABLE: Wait for socket to become writeable + * can be combined with WL_SOCKET_READABLE + * + * Returns the offset in WaitEventSet->events (starting from 0), which can be + * used to modify previously added wait events using ModifyWaitEvent(). + * + * In the WL_LATCH_SET case the latch must be owned by the current process, + * i.e. it must be a backend-local latch initialized with InitLatch, or a + * shared latch associated with the current process by calling OwnLatch. + * + * In the WL_SOCKET_READABLE/WRITEABLE case, EOF and error conditions are + * reported by returning the socket as readable/writable or both, depending on + * WL_SOCKET_READABLE/WRITEABLE being specified. 
+ * + * The user_data pointer specified here will be set for the events returned + * by WaitEventSetWait(), allowing to easily associate additional data with + * events. + */ +int +AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, + void *user_data) +{ + WaitEvent *event; + + /* not enough space */ + Assert(set->nevents < set->nevents_space); + + if (latch) + { + if (latch->owner_pid != MyProcPid) + elog(ERROR, "cannot wait on a latch owned by another process"); + if (set->latch) + elog(ERROR, "cannot wait on more than one latch"); + if ((events & WL_LATCH_SET) != WL_LATCH_SET) + elog(ERROR, "latch events only spuport being set"); + } + else + { + if (events & WL_LATCH_SET) + elog(ERROR, "cannot wait on latch without a specified latch"); + } + + /* waiting for socket readiness without a socket indicates a bug */ + if (fd == PGINVALID_SOCKET && + (events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))) + elog(ERROR, "cannot wait on socket event without a socket"); + + event = &set->events[set->nevents]; + event->pos = set->nevents++; + event->fd = fd; + event->events = events; + event->user_data = user_data; + + if (events == WL_LATCH_SET) + { + set->latch = latch; + set->latch_pos = event->pos; +#ifndef WIN32 + event->fd = selfpipe_readfd; +#endif + } + else if (events == WL_POSTMASTER_DEATH) + { +#ifndef WIN32 + event->fd = postmaster_alive_fds[POSTMASTER_FD_WATCH]; +#endif + } + + /* perform wait primitive specific initialization, if needed */ +#if defined(WAIT_USE_EPOLL) + WaitEventAdjustEpoll(set, event, EPOLL_CTL_ADD); +#elif defined(WAIT_USE_POLL) + WaitEventAdjustPoll(set, event); +#elif defined(WAIT_USE_SELECT) + /* nothing to do */ +#elif defined(WAIT_USE_WIN32) + WaitEventAdjustWin32(set, event); +#endif + + return event->pos; +} + +/* + * Change the event mask and, in the WL_LATCH_SET case, the latch associated + * with the WaitEvent. + * + * 'pos' is the id returned by AddWaitEventToSet. + */ +void +ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch) +{ + WaitEvent *event; + + Assert(pos < set->nevents); + + event = &set->events[pos]; + + /* + * If neither the event mask nor the associated latch changes, return + * early. That's an important optimization for some sockets, where + * ModifyWaitEvent is frequently used to switch from waiting for reads to + * waiting on writes. 
+ */ + if (events == event->events && + (!(event->events & WL_LATCH_SET) || set->latch == latch)) + return; + + if (event->events & WL_LATCH_SET && + events != event->events) + { + /* we could allow to disable latch events for a while */ + elog(ERROR, "cannot modify latch event"); + } + + if (event->events & WL_POSTMASTER_DEATH) + { + elog(ERROR, "cannot modify postmaster death event"); + } + + /* FIXME: validate event mask */ + event->events = events; + + if (events == WL_LATCH_SET) + { + set->latch = latch; + } + +#if defined(WAIT_USE_EPOLL) + WaitEventAdjustEpoll(set, event, EPOLL_CTL_MOD); +#elif defined(WAIT_USE_POLL) + WaitEventAdjustPoll(set, event); +#elif defined(WAIT_USE_SELECT) + /* nothing to do */ +#elif defined(WAIT_USE_WIN32) + WaitEventAdjustWin32(set, event); +#endif +} + +#if defined(WAIT_USE_EPOLL) +/* + * action can be one of EPOLL_CTL_ADD | EPOLL_CTL_MOD | EPOLL_CTL_DEL + */ +static void +WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action) +{ + struct epoll_event epoll_ev; + int rc; + + /* pointer to our event, returned by epoll_wait */ + epoll_ev.data.ptr = event; + /* always wait for errors */ + epoll_ev.events = EPOLLERR | EPOLLHUP; + + /* prepare pollfd entry once */ + if (event->events == WL_LATCH_SET) + { + Assert(set->latch != NULL); + epoll_ev.events |= EPOLLIN; + } + else if (event->events == WL_POSTMASTER_DEATH) + { + epoll_ev.events |= EPOLLIN; + } + else + { + Assert(event->fd != PGINVALID_SOCKET); + Assert(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)); + + if (event->events & WL_SOCKET_READABLE) + epoll_ev.events |= EPOLLIN; + if (event->events & WL_SOCKET_WRITEABLE) + epoll_ev.events |= EPOLLOUT; + } + + /* + * Even though unused, we also pass epoll_ev as the data argument if + * EPOLL_CTL_DEL is passed as action. There used to be an epoll bug + * requiring that, and actually it makes the code simpler... 
+ */ + rc = epoll_ctl(set->epoll_fd, action, event->fd, &epoll_ev); + + if (rc < 0) + ereport(ERROR, + (errcode_for_socket_access(), + errmsg("epoll_ctl() failed: %m"))); +} +#endif + +#if defined(WAIT_USE_POLL) +static void +WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event) +{ + struct pollfd *pollfd = &set->pollfds[event->pos]; + + pollfd->revents = 0; + pollfd->fd = event->fd; + + /* prepare pollfd entry once */ + if (event->events == WL_LATCH_SET) + { + Assert(set->latch != NULL); + pollfd->events = POLLIN; + } + else if (event->events == WL_POSTMASTER_DEATH) + { + pollfd->events = POLLIN; + } + else + { + Assert(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)); + pollfd->events = 0; + if (event->events & WL_SOCKET_READABLE) + pollfd->events |= POLLIN; + if (event->events & WL_SOCKET_WRITEABLE) + pollfd->events |= POLLOUT; + } + + Assert(event->fd != PGINVALID_SOCKET); +} +#endif + +#if defined(WAIT_USE_WIN32) +static void +WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event) +{ + HANDLE *handle = &set->handles[event->pos + 1]; + + if (event->events == WL_LATCH_SET) + { + Assert(set->latch != NULL); + *handle = set->latch->event; + } + else if (event->events == WL_POSTMASTER_DEATH) + { + *handle = PostmasterHandle; + } + else + { + int flags = FD_CLOSE; /* always check for errors/EOF */ + + if (event->events & WL_SOCKET_READABLE) + flags |= FD_READ; + if (event->events & WL_SOCKET_WRITEABLE) + flags |= FD_WRITE; + + if (*handle == WSA_INVALID_EVENT) + { + *handle = WSACreateEvent(); + if (*handle == WSA_INVALID_EVENT) + elog(ERROR, "failed to create event for socket: error code %u", + WSAGetLastError()); + } + if (WSAEventSelect(event->fd, *handle, flags) != 0) + elog(ERROR, "failed to set up event for socket: error code %u", + WSAGetLastError()); + + Assert(event->fd != PGINVALID_SOCKET); + } +} +#endif + +/* + * Wait for events added to the set to happen, or until the timeout is + * reached. At most nevents occurred events are returned. + * + * If timeout = -1, block until an event occurs; if 0, check sockets for + * readiness, but don't block; if > 0, block for at most timeout miliseconds. + * + * Returns the number of events occurred, or 0 if the timeout was reached. + * + * Returned events will have the fd, pos, user_data fields set to the + * values associated with the registered event. + */ +int +WaitEventSetWait(WaitEventSet *set, long timeout, + WaitEvent *occurred_events, int nevents) +{ + int returned_events = 0; + instr_time start_time; + instr_time cur_time; + long cur_timeout = -1; + + Assert(nevents > 0); + + /* + * Initialize timeout if requested. We must record the current time so + * that we can determine the remaining timeout if interrupted. + */ + if (timeout >= 0) + { + INSTR_TIME_SET_CURRENT(start_time); + Assert(timeout >= 0 && timeout <= INT_MAX); + cur_timeout = timeout; + } + +#ifndef WIN32 + waiting = true; +#else + /* Ensure that signals are serviced even if latch is already set */ + pgwin32_dispatch_queued_signals(); +#endif + while (returned_events == 0) + { + int rc; + + /* + * Check if the latch is set already. If so, leave the loop + * immediately, avoid blocking again. We don't attempt to report any + * other events that might also be satisfied. + * + * If someone sets the latch between this and the + * WaitEventSetWaitBlock() below, the setter will write a byte to the + * pipe (or signal us and the signal handler will do that), and the + * readiness routine will return immediately. 
+ * + * On unix, If there's a pending byte in the self pipe, we'll notice + * whenever blocking. Only clearing the pipe in that case avoids + * having to drain it every time WaitLatchOrSocket() is used. Should + * the pipe-buffer fill up we're still ok, because the pipe is in + * nonblocking mode. It's unlikely for that to happen, because the + * self pipe isn't filled unless we're blocking (waiting = true), or + * from inside a signal handler in latch_sigusr1_handler(). + * + * On windows, we'll also notice if there's a pending event for the + * latch when blocking, but there's no danger of anything filling up, + * as "Setting an event that is already set has no effect.". + * + * Note: we assume that the kernel calls involved in latch management + * will provide adequate synchronization on machines with weak memory + * ordering, so that we cannot miss seeing is_set if a notification + * has already been queued. + */ + if (set->latch && set->latch->is_set) + { + occurred_events->fd = PGINVALID_SOCKET; + occurred_events->pos = set->latch_pos; + occurred_events->user_data = + set->events[set->latch_pos].user_data; + occurred_events->events = WL_LATCH_SET; + occurred_events++; + returned_events++; + + break; + } + + /* + * Wait for events using the readiness primitive chosen at the top of + * this file. If -1 is returned, a timeout has occurred, if 0 we have + * to retry, everything >= 1 is the number of returned events. + */ + rc = WaitEventSetWaitBlock(set, cur_timeout, + occurred_events, nevents); + + if (rc == -1) + break; /* timeout occurred */ + else + returned_events = rc; + + /* If we're not done, update cur_timeout for next iteration */ + if (returned_events == 0 && timeout >= 0) + { + INSTR_TIME_SET_CURRENT(cur_time); + INSTR_TIME_SUBTRACT(cur_time, start_time); + cur_timeout = timeout - (long) INSTR_TIME_GET_MILLISEC(cur_time); + if (cur_timeout <= 0) + break; + } + } +#ifndef WIN32 + waiting = false; +#endif + + return returned_events; +} + + +#if defined(WAIT_USE_EPOLL) + +/* + * Wait using linux's epoll_wait(2). + * + * This is the preferrable wait method, as several readiness notifications are + * delivered, without having to iterate through all of set->events. The return + * epoll_event struct contain a pointer to our events, making association + * easy. + */ +static int +WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, + WaitEvent *occurred_events, int nevents) +{ + int returned_events = 0; + int rc; + WaitEvent *cur_event; + struct epoll_event *cur_epoll_event; + + /* Sleep */ + rc = epoll_wait(set->epoll_fd, set->epoll_ret_events, + nevents, cur_timeout); + + /* Check return code */ + if (rc < 0) + { + /* EINTR is okay, otherwise complain */ + if (errno != EINTR) + { + waiting = false; + ereport(ERROR, + (errcode_for_socket_access(), + errmsg("epoll_wait() failed: %m"))); + } + return 0; + } + else if (rc == 0) + { + /* timeout exceeded */ + return -1; + } + + /* + * At least one event occurred, iterate over the returned epoll events + * until they're either all processed, or we've returned all the events + * the caller desired. 
+ */ + for (cur_epoll_event = set->epoll_ret_events; + cur_epoll_event < (set->epoll_ret_events + rc) && + returned_events < nevents; + cur_epoll_event++) + { + /* epoll's data pointer is set to the associated WaitEvent */ + cur_event = (WaitEvent *) cur_epoll_event->data.ptr; + + occurred_events->pos = cur_event->pos; + occurred_events->user_data = cur_event->user_data; + occurred_events->events = 0; + + if (cur_event->events == WL_LATCH_SET && + cur_epoll_event->events & (EPOLLIN | EPOLLERR | EPOLLHUP)) + { + /* There's data in the self-pipe, clear it. */ + drainSelfPipe(); + + if (set->latch->is_set) + { + occurred_events->fd = PGINVALID_SOCKET; + occurred_events->events = WL_LATCH_SET; + occurred_events++; + returned_events++; + } + } + else if (cur_event->events == WL_POSTMASTER_DEATH && + cur_epoll_event->events & (EPOLLIN | EPOLLERR | EPOLLHUP)) + { + /* + * We expect an EPOLLHUP when the remote end is closed, but + * because we don't expect the pipe to become readable or to have + * any errors either, treat those cases as postmaster death, too. + * + * As explained in the WAIT_USE_SELECT implementation, select(2) + * may spuriously return. Be paranoid about that here too, a + * spurious WL_POSTMASTER_DEATH would be painful. + */ + if (!PostmasterIsAlive()) + { + occurred_events->fd = PGINVALID_SOCKET; + occurred_events->events = WL_POSTMASTER_DEATH; + occurred_events++; + returned_events++; + } + } + else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) + { + Assert(cur_event->fd != PGINVALID_SOCKET); + + if ((cur_event->events & WL_SOCKET_READABLE) && + (cur_epoll_event->events & (EPOLLIN | EPOLLERR | EPOLLHUP))) + { + /* data available in socket, or EOF */ + occurred_events->events |= WL_SOCKET_READABLE; + } + + if ((cur_event->events & WL_SOCKET_WRITEABLE) && + (cur_epoll_event->events & (EPOLLOUT | EPOLLERR | EPOLLHUP))) + { + /* writable, or EOF */ + occurred_events->events |= WL_SOCKET_WRITEABLE; + } + + if (occurred_events->events != 0) + { + occurred_events->fd = cur_event->fd; + occurred_events++; + returned_events++; + } + } + } + + return returned_events; +} + +#elif defined(WAIT_USE_POLL) + +/* + * Wait using poll(2). + * + * This allows to receive readiness notifications for several events at once, + * but requires iterating through all of set->pollfds. + */ +static inline int +WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, + WaitEvent *occurred_events, int nevents) +{ + int returned_events = 0; + int rc; + WaitEvent *cur_event; + struct pollfd *cur_pollfd; + + /* Sleep */ + rc = poll(set->pollfds, set->nevents, (int) cur_timeout); + + /* Check return code */ + if (rc < 0) + { + /* EINTR is okay, otherwise complain */ + if (errno != EINTR) + { + waiting = false; + ereport(ERROR, + (errcode_for_socket_access(), + errmsg("poll() failed: %m"))); + } + return 0; + } + else if (rc == 0) + { + /* timeout exceeded */ + return -1; + } + + for (cur_event = set->events, cur_pollfd = set->pollfds; + cur_event < (set->events + set->nevents) && + returned_events < nevents; + cur_event++, cur_pollfd++) + { + /* no activity on this FD, skip */ + if (cur_pollfd->revents == 0) + continue; + + occurred_events->pos = cur_event->pos; + occurred_events->user_data = cur_event->user_data; + occurred_events->events = 0; + + if (cur_event->events == WL_LATCH_SET && + (cur_pollfd->revents & (POLLIN | POLLHUP | POLLERR | POLLNVAL))) + { + /* There's data in the self-pipe, clear it. 
*/ + drainSelfPipe(); + + if (set->latch->is_set) + { + occurred_events->fd = PGINVALID_SOCKET; + occurred_events->events = WL_LATCH_SET; + occurred_events++; + returned_events++; + } + } + else if (cur_event->events == WL_POSTMASTER_DEATH && + (cur_pollfd->revents & (POLLIN | POLLHUP | POLLERR | POLLNVAL))) + { + /* + * We expect an POLLHUP when the remote end is closed, but because + * we don't expect the pipe to become readable or to have any + * errors either, treat those cases as postmaster death, too. + * + * As explained in the WAIT_USE_SELECT implementation, select(2) + * may spuriously return. Be paranoid about that here too, a + * spurious WL_POSTMASTER_DEATH would be painful. + */ + if (!PostmasterIsAlive()) + { + occurred_events->fd = PGINVALID_SOCKET; + occurred_events->events = WL_POSTMASTER_DEATH; + occurred_events++; + returned_events++; + } + } + else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) + { + int errflags = POLLHUP | POLLERR | POLLNVAL; + + Assert(cur_event->fd >= PGINVALID_SOCKET); + + if ((cur_event->events & WL_SOCKET_READABLE) && + (cur_pollfd->revents & (POLLIN | errflags))) + { + /* data available in socket, or EOF */ + occurred_events->events |= WL_SOCKET_READABLE; + } + + if ((cur_event->events & WL_SOCKET_WRITEABLE) && + (cur_pollfd->revents & (POLLOUT | errflags))) + { + /* writeable, or EOF */ + occurred_events->events |= WL_SOCKET_WRITEABLE; + } + + if (occurred_events->events != 0) + { + occurred_events->fd = cur_event->fd; + occurred_events++; + returned_events++; + } + } + } + return returned_events; +} + +#elif defined(WAIT_USE_SELECT) + +/* + * Wait using select(2). + * + * XXX: On at least older linux kernels select(), in violation of POSIX, + * doesn't reliably return a socket as writable if closed - but we rely on + * that. So far all the known cases of this problem are on platforms that also + * provide a poll() implementation without that bug. If we find one where + * that's not the case, we'll need to add a workaround. + */ +static inline int +WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, + WaitEvent *occurred_events, int nevents) +{ + int returned_events = 0; + int rc; + WaitEvent *cur_event; + fd_set input_mask; + fd_set output_mask; + int hifd; + struct timeval tv; + struct timeval *tvp = NULL; + + FD_ZERO(&input_mask); + FD_ZERO(&output_mask); + + /* + * Prepare input/output masks. We do so every loop iteration as there's no + * entirely portable way to copy fd_sets. 
+ */ + for (cur_event = set->events; + cur_event < (set->events + set->nevents); + cur_event++) + { + if (cur_event->events == WL_LATCH_SET) + FD_SET(cur_event->fd, &input_mask); + else if (cur_event->events == WL_POSTMASTER_DEATH) + FD_SET(cur_event->fd, &input_mask); + else + { + Assert(cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)); + if (cur_event->events == WL_SOCKET_READABLE) + FD_SET(cur_event->fd, &input_mask); + else if (cur_event->events == WL_SOCKET_WRITEABLE) + FD_SET(cur_event->fd, &output_mask); + } + + if (cur_event->fd > hifd) + hifd = cur_event->fd; + } + + /* Sleep */ + if (cur_timeout >= 0) + { + tv.tv_sec = cur_timeout / 1000L; + tv.tv_usec = (cur_timeout % 1000L) * 1000L; + tvp = &tv; + } + rc = select(hifd + 1, &input_mask, &output_mask, NULL, tvp); + + /* Check return code */ + if (rc < 0) + { + /* EINTR is okay, otherwise complain */ + if (errno != EINTR) + { + waiting = false; + ereport(ERROR, + (errcode_for_socket_access(), + errmsg("select() failed: %m"))); + } + return 0; /* retry */ + } + else if (rc == 0) + { + /* timeout exceeded */ + return -1; + } + + /* + * To associate events with select's masks, we have to check the status of + * the file descriptors associated with an event; by looping through all + * events. + */ + for (cur_event = set->events; + cur_event < (set->events + set->nevents) + && returned_events < nevents; + cur_event++) + { + occurred_events->pos = cur_event->pos; + occurred_events->user_data = cur_event->user_data; + occurred_events->events = 0; + + if (cur_event->events == WL_LATCH_SET && + FD_ISSET(cur_event->fd, &input_mask)) + { + /* There's data in the self-pipe, clear it. */ + drainSelfPipe(); + + if (set->latch->is_set) + { + occurred_events->fd = PGINVALID_SOCKET; + occurred_events->events = WL_LATCH_SET; + occurred_events++; + returned_events++; + } + } + else if (cur_event->events == WL_POSTMASTER_DEATH && + FD_ISSET(cur_event->fd, &input_mask)) + { + /* + * According to the select(2) man page on Linux, select(2) may + * spuriously return and report a file descriptor as readable, + * when it's not; and presumably so can poll(2). It's not clear + * that the relevant cases would ever apply to the postmaster + * pipe, but since the consequences of falsely returning + * WL_POSTMASTER_DEATH could be pretty unpleasant, we take the + * trouble to positively verify EOF with PostmasterIsAlive(). + */ + if (!PostmasterIsAlive()) + { + occurred_events->fd = PGINVALID_SOCKET; + occurred_events->events = WL_POSTMASTER_DEATH; + occurred_events++; + returned_events++; + } + } + else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) + { + Assert(cur_event->fd != PGINVALID_SOCKET); + + if ((cur_event->events & WL_SOCKET_READABLE) && + FD_ISSET(cur_event->fd, &input_mask)) + { + /* data available in socket, or EOF */ + occurred_events->events |= WL_SOCKET_READABLE; + } + + if ((cur_event->events & WL_SOCKET_WRITEABLE) && + FD_ISSET(cur_event->fd, &output_mask)) + { + /* socket is writeable, or EOF */ + occurred_events->events |= WL_SOCKET_WRITEABLE; + } + + if (occurred_events->events != 0) + { + occurred_events->fd = cur_event->fd; + occurred_events++; + returned_events++; + } + } + } + return returned_events; +} + +#elif defined(WAIT_USE_WIN32) + +/* + * Wait using Windows' WaitForMultipleObjects(). + * + * Unfortunately this will only ever return a single readiness notification at + * a time. 
Note that while the official documentation for + * WaitForMultipleObjects is ambiguous about multiple events being "consumed" + * with a single bWaitAll = FALSE call, + * https://blogs.msdn.microsoft.com/oldnewthing/20150409-00/?p=44273 confirms + * that only one event is "consumed". + */ +static inline int +WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, + WaitEvent *occurred_events, int nevents) +{ + int returned_events = 0; + DWORD rc; + WaitEvent *cur_event; + + /* + * Sleep. + * + * Need to wait for ->nevents + 1, because signal handle is in [0]. + */ + rc = WaitForMultipleObjects(set->nevents + 1, set->handles, FALSE, + cur_timeout); + + /* Check return code */ + if (rc == WAIT_FAILED) + elog(ERROR, "WaitForMultipleObjects() failed: error code %lu", + GetLastError()); + else if (rc == WAIT_TIMEOUT) + { + /* timeout exceeded */ + return -1; + } + + if (rc == WAIT_OBJECT_0) + { + /* Service newly-arrived signals */ + pgwin32_dispatch_queued_signals(); + return 0; /* retry */ + } + + /* + * With an offset of one, due to the always present pgwin32_signal_event, + * the handle offset directly corresponds to a wait event. + */ + cur_event = (WaitEvent *) &set->events[rc - WAIT_OBJECT_0 - 1]; + + occurred_events->pos = cur_event->pos; + occurred_events->user_data = cur_event->user_data; + occurred_events->events = 0; + + if (cur_event->events == WL_LATCH_SET) + { + if (!ResetEvent(set->latch->event)) + elog(ERROR, "ResetEvent failed: error code %lu", GetLastError()); + + if (set->latch->is_set) + { + occurred_events->fd = PGINVALID_SOCKET; + occurred_events->events = WL_LATCH_SET; + occurred_events++; + returned_events++; + } + } + else if (cur_event->events == WL_POSTMASTER_DEATH) + { + /* + * Postmaster apparently died. Since the consequences of falsely + * returning WL_POSTMASTER_DEATH could be pretty unpleasant, we take + * the trouble to positively verify this with PostmasterIsAlive(), + * even though there is no known reason to think that the event could + * be falsely set on Windows. + */ + if (!PostmasterIsAlive()) + { + occurred_events->fd = PGINVALID_SOCKET; + occurred_events->events = WL_POSTMASTER_DEATH; + occurred_events++; + returned_events++; + } + } + else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) + { + WSANETWORKEVENTS resEvents; + HANDLE handle = set->handles[cur_event->pos + 1]; + + Assert(cur_event->fd); + + occurred_events->fd = cur_event->fd; + + ZeroMemory(&resEvents, sizeof(resEvents)); + if (WSAEnumNetworkEvents(cur_event->fd, handle, &resEvents) != 0) + elog(ERROR, "failed to enumerate network events: error code %u", + WSAGetLastError()); + if ((cur_event->events & WL_SOCKET_READABLE) && + (resEvents.lNetworkEvents & FD_READ)) + { + /* data available in socket */ + occurred_events->events |= WL_SOCKET_READABLE; + } + if ((cur_event->events & WL_SOCKET_WRITEABLE) && + (resEvents.lNetworkEvents & FD_WRITE)) + { + /* writeable */ + occurred_events->events |= WL_SOCKET_WRITEABLE; + } + if (resEvents.lNetworkEvents & FD_CLOSE) + { + /* EOF */ + if (cur_event->events & WL_SOCKET_READABLE) + occurred_events->events |= WL_SOCKET_READABLE; + if (cur_event->events & WL_SOCKET_WRITEABLE) + occurred_events->events |= WL_SOCKET_WRITEABLE; + } + + if (occurred_events->events != 0) + { + occurred_events++; + returned_events++; + } + } + + return returned_events; +} +#endif + /* * SetLatch uses SIGUSR1 to wake up the process waiting on the latch. 
* diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c index 18f5e6fbfe..d13355bf66 100644 --- a/src/backend/utils/init/miscinit.c +++ b/src/backend/utils/init/miscinit.c @@ -33,6 +33,7 @@ #include "access/htup_details.h" #include "catalog/pg_authid.h" +#include "libpq/libpq.h" #include "mb/pg_wchar.h" #include "miscadmin.h" #include "postmaster/autovacuum.h" @@ -247,6 +248,9 @@ SwitchToSharedLatch(void) MyLatch = &MyProc->procLatch; + if (FeBeWaitSet) + ModifyWaitEvent(FeBeWaitSet, 1, WL_LATCH_SET, MyLatch); + /* * Set the shared latch as the local one might have been set. This * shouldn't normally be necessary as code is supposed to check the @@ -262,6 +266,10 @@ SwitchBackToLocalLatch(void) Assert(MyProc != NULL && MyLatch == &MyProc->procLatch); MyLatch = &LocalLatchData; + + if (FeBeWaitSet) + ModifyWaitEvent(FeBeWaitSet, 1, WL_LATCH_SET, MyLatch); + SetLatch(MyLatch); } diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h index 0569994f75..109fdf72b5 100644 --- a/src/include/libpq/libpq.h +++ b/src/include/libpq/libpq.h @@ -19,6 +19,7 @@ #include "lib/stringinfo.h" #include "libpq/libpq-be.h" +#include "storage/latch.h" typedef struct @@ -95,6 +96,8 @@ extern ssize_t secure_raw_write(Port *port, const void *ptr, size_t len); extern bool ssl_loaded_verify_locations; +WaitEventSet *FeBeWaitSet; + /* GUCs */ extern char *SSLCipherSuites; extern char *SSLECDHCurve; diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in index 3813226929..c72635ca96 100644 --- a/src/include/pg_config.h.in +++ b/src/include/pg_config.h.in @@ -530,6 +530,9 @@ /* Define to 1 if you have the syslog interface. */ #undef HAVE_SYSLOG +/* Define to 1 if you have the header file. */ +#undef HAVE_SYS_EPOLL_H + /* Define to 1 if you have the header file. */ #undef HAVE_SYS_IOCTL_H diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h index 1b9521f5a6..85d211c0e1 100644 --- a/src/include/storage/latch.h +++ b/src/include/storage/latch.h @@ -68,6 +68,12 @@ * use of any generic handler. * * + * WaitEventSets allow to wait for latches being set and additional events - + * postmaster dying and socket readiness of several sockets currently - at the + * same time. On many platforms using a long lived event set is more + * efficient than using WaitLatch or WaitLatchOrSocket. + * + * * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * @@ -95,13 +101,27 @@ typedef struct Latch #endif } Latch; -/* Bitmasks for events that may wake-up WaitLatch() clients */ +/* + * Bitmasks for events that may wake-up WaitLatch(), WaitLatchOrSocket(), or + * WaitEventSetWait(). 
+ */ #define WL_LATCH_SET (1 << 0) #define WL_SOCKET_READABLE (1 << 1) #define WL_SOCKET_WRITEABLE (1 << 2) -#define WL_TIMEOUT (1 << 3) +#define WL_TIMEOUT (1 << 3) /* not for WaitEventSetWait() */ #define WL_POSTMASTER_DEATH (1 << 4) +typedef struct WaitEvent +{ + int pos; /* position in the event data structure */ + uint32 events; /* triggered events */ + pgsocket fd; /* socket fd associated with event */ + void *user_data; /* pointer provided in AddWaitEventToSet */ +} WaitEvent; + +/* forward declaration to avoid exposing latch.c implementation details */ +typedef struct WaitEventSet WaitEventSet; + /* * prototypes for functions in latch.c */ @@ -110,12 +130,19 @@ extern void InitLatch(volatile Latch *latch); extern void InitSharedLatch(volatile Latch *latch); extern void OwnLatch(volatile Latch *latch); extern void DisownLatch(volatile Latch *latch); -extern int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout); -extern int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, - pgsocket sock, long timeout); extern void SetLatch(volatile Latch *latch); extern void ResetLatch(volatile Latch *latch); +extern WaitEventSet *CreateWaitEventSet(MemoryContext context, int nevents); +extern void FreeWaitEventSet(WaitEventSet *set); +extern int AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, + Latch *latch, void *user_data); +extern void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch); + +extern int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents); +extern int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout); +extern int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, + pgsocket sock, long timeout); /* * Unix implementation uses SIGUSR1 for inter-process signaling. diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index b850db0b05..c2511def9e 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2113,6 +2113,8 @@ WalSnd WalSndCtlData WalSndSendDataCallback WalSndState +WaitEvent +WaitEventSet WholeRowVarExprState WindowAgg WindowAggState
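
For reference, here is a minimal usage sketch of the WaitEventSet API added by this patch. It is not part of the patch itself: the function name wait_for_socket and the assumption of a backend context that already has MyLatch set up and a connected socket are illustrative only. It mirrors what pqcomm.c/be-secure.c now do with FeBeWaitSet: register the socket, the process latch, and postmaster death once, then wait repeatedly.

/*
 * Illustrative sketch only (not part of the patch): wait on one socket plus
 * the process latch using the new WaitEventSet API.  Assumes a backend
 * context providing MyLatch and an already-connected socket "sock".
 */
#include "postgres.h"

#include "miscadmin.h"
#include "storage/ipc.h"
#include "storage/latch.h"

static void
wait_for_socket(pgsocket sock)
{
	WaitEventSet *set = CreateWaitEventSet(CurrentMemoryContext, 3);
	WaitEvent	event;
	int			sock_pos;

	/* register events once; positions can later be passed to ModifyWaitEvent */
	sock_pos = AddWaitEventToSet(set, WL_SOCKET_READABLE, sock, NULL, NULL);
	AddWaitEventToSet(set, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
	AddWaitEventToSet(set, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL);

	for (;;)
	{
		/*
		 * To wait for writability instead (as secure_write does), switch the
		 * socket entry with:
		 *     ModifyWaitEvent(set, sock_pos, WL_SOCKET_WRITEABLE, NULL);
		 */
		if (WaitEventSetWait(set, -1 /* no timeout */ , &event, 1) == 0)
			continue;			/* timeout; cannot happen with -1 */

		if (event.events & WL_POSTMASTER_DEATH)
			proc_exit(1);		/* postmaster is gone, bail out */

		if (event.events & WL_LATCH_SET)
			ResetLatch(MyLatch);	/* handle interrupt, then wait again */

		if (event.events & WL_SOCKET_READABLE)
			break;				/* data available, or EOF/error */
	}

	FreeWaitEventSet(set);
}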