diff --git a/configure b/configure index bd9b347f25..432cd58c8d 100755 --- a/configure +++ b/configure @@ -27773,6 +27773,13 @@ _ACEOF SHMEM_IMPLEMENTATION="src/backend/port/win32_shmem.c" fi +# Select latch implementation type. +if test "$PORTNAME" != "win32"; then + LATCH_IMPLEMENTATION="src/backend/port/unix_latch.c" +else + LATCH_IMPLEMENTATION="src/backend/port/win32_latch.c" +fi + # If not set in template file, set bytes to use libc memset() if test x"$MEMSET_LOOP_LIMIT" = x"" ; then MEMSET_LOOP_LIMIT=1024 @@ -29098,7 +29105,7 @@ fi ac_config_files="$ac_config_files GNUmakefile src/Makefile.global" -ac_config_links="$ac_config_links src/backend/port/dynloader.c:src/backend/port/dynloader/${template}.c src/backend/port/pg_sema.c:${SEMA_IMPLEMENTATION} src/backend/port/pg_shmem.c:${SHMEM_IMPLEMENTATION} src/include/dynloader.h:src/backend/port/dynloader/${template}.h src/include/pg_config_os.h:src/include/port/${template}.h src/Makefile.port:src/makefiles/Makefile.${template}" +ac_config_links="$ac_config_links src/backend/port/dynloader.c:src/backend/port/dynloader/${template}.c src/backend/port/pg_sema.c:${SEMA_IMPLEMENTATION} src/backend/port/pg_shmem.c:${SHMEM_IMPLEMENTATION} src/backend/port/pg_latch.c:${LATCH_IMPLEMENTATION} src/include/dynloader.h:src/backend/port/dynloader/${template}.h src/include/pg_config_os.h:src/include/port/${template}.h src/Makefile.port:src/makefiles/Makefile.${template}" if test "$PORTNAME" = "win32"; then @@ -29722,6 +29729,7 @@ do "src/backend/port/dynloader.c") CONFIG_LINKS="$CONFIG_LINKS src/backend/port/dynloader.c:src/backend/port/dynloader/${template}.c" ;; "src/backend/port/pg_sema.c") CONFIG_LINKS="$CONFIG_LINKS src/backend/port/pg_sema.c:${SEMA_IMPLEMENTATION}" ;; "src/backend/port/pg_shmem.c") CONFIG_LINKS="$CONFIG_LINKS src/backend/port/pg_shmem.c:${SHMEM_IMPLEMENTATION}" ;; + "src/backend/port/pg_latch.c") CONFIG_LINKS="$CONFIG_LINKS src/backend/port/pg_latch.c:${LATCH_IMPLEMENTATION}" ;; "src/include/dynloader.h") CONFIG_LINKS="$CONFIG_LINKS src/include/dynloader.h:src/backend/port/dynloader/${template}.h" ;; "src/include/pg_config_os.h") CONFIG_LINKS="$CONFIG_LINKS src/include/pg_config_os.h:src/include/port/${template}.h" ;; "src/Makefile.port") CONFIG_LINKS="$CONFIG_LINKS src/Makefile.port:src/makefiles/Makefile.${template}" ;; diff --git a/configure.in b/configure.in index 4b2e88d15a..a4ba42e7dc 100644 --- a/configure.in +++ b/configure.in @@ -1,5 +1,5 @@ dnl Process this file with autoconf to produce a configure script. -dnl $PostgreSQL: pgsql/configure.in,v 1.633 2010/07/09 04:10:58 tgl Exp $ +dnl $PostgreSQL: pgsql/configure.in,v 1.634 2010/09/11 15:48:04 heikki Exp $ dnl dnl Developers, please strive to achieve this order: dnl @@ -1700,6 +1700,13 @@ else SHMEM_IMPLEMENTATION="src/backend/port/win32_shmem.c" fi +# Select latch implementation type. +if test "$PORTNAME" != "win32"; then + LATCH_IMPLEMENTATION="src/backend/port/unix_latch.c" +else + LATCH_IMPLEMENTATION="src/backend/port/win32_latch.c" +fi + # If not set in template file, set bytes to use libc memset() if test x"$MEMSET_LOOP_LIMIT" = x"" ; then MEMSET_LOOP_LIMIT=1024 @@ -1841,6 +1848,7 @@ AC_CONFIG_LINKS([ src/backend/port/dynloader.c:src/backend/port/dynloader/${template}.c src/backend/port/pg_sema.c:${SEMA_IMPLEMENTATION} src/backend/port/pg_shmem.c:${SHMEM_IMPLEMENTATION} + src/backend/port/pg_latch.c:${LATCH_IMPLEMENTATION} src/include/dynloader.h:src/backend/port/dynloader/${template}.h src/include/pg_config_os.h:src/include/port/${template}.h src/Makefile.port:src/makefiles/Makefile.${template} diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index e3c3bc8dbc..3a3302e8b8 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -7,7 +7,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/transam/twophase.c,v 1.63 2010/08/13 20:10:50 rhaas Exp $ + * $PostgreSQL: pgsql/src/backend/access/transam/twophase.c,v 1.64 2010/09/11 15:48:04 heikki Exp $ * * NOTES * Each global transaction is associated with a global transaction @@ -55,6 +55,7 @@ #include "miscadmin.h" #include "pg_trace.h" #include "pgstat.h" +#include "replication/walsender.h" #include "storage/fd.h" #include "storage/procarray.h" #include "storage/sinvaladt.h" @@ -1025,6 +1026,13 @@ EndPrepare(GlobalTransaction gxact) /* If we crash now, we have prepared: WAL replay will fix things */ + /* + * Wake up all walsenders to send WAL up to the PREPARE record + * immediately if replication is enabled + */ + if (max_wal_senders > 0) + WalSndWakeup(); + /* write correct CRC and close file */ if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32)) { @@ -2005,6 +2013,13 @@ RecordTransactionCommitPrepared(TransactionId xid, /* Flush XLOG to disk */ XLogFlush(recptr); + /* + * Wake up all walsenders to send WAL up to the COMMIT PREPARED record + * immediately if replication is enabled + */ + if (max_wal_senders > 0) + WalSndWakeup(); + /* Mark the transaction committed in pg_clog */ TransactionIdCommitTree(xid, nchildren, children); @@ -2077,6 +2092,13 @@ RecordTransactionAbortPrepared(TransactionId xid, /* Always flush, since we're about to remove the 2PC state file */ XLogFlush(recptr); + /* + * Wake up all walsenders to send WAL up to the ABORT PREPARED record + * immediately if replication is enabled + */ + if (max_wal_senders > 0) + WalSndWakeup(); + /* * Mark the transaction aborted in clog. This is not absolutely necessary * but we may as well do it while we are here. diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 6015eaab1d..89f17a9d16 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -10,7 +10,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/transam/xact.c,v 1.298 2010/08/13 20:10:50 rhaas Exp $ + * $PostgreSQL: pgsql/src/backend/access/transam/xact.c,v 1.299 2010/09/11 15:48:04 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -36,6 +36,7 @@ #include "libpq/be-fsstubs.h" #include "miscadmin.h" #include "pgstat.h" +#include "replication/walsender.h" #include "storage/bufmgr.h" #include "storage/fd.h" #include "storage/lmgr.h" @@ -1067,6 +1068,13 @@ RecordTransactionCommit(void) XLogFlush(XactLastRecEnd); + /* + * Wake up all walsenders to send WAL up to the COMMIT record + * immediately if replication is enabled + */ + if (max_wal_senders > 0) + WalSndWakeup(); + /* * Now we may update the CLOG, if we wrote a COMMIT record above */ diff --git a/src/backend/port/Makefile b/src/backend/port/Makefile index 2009919ad9..ed02523f38 100644 --- a/src/backend/port/Makefile +++ b/src/backend/port/Makefile @@ -13,7 +13,7 @@ # be converted to Method 2. # # IDENTIFICATION -# $PostgreSQL: pgsql/src/backend/port/Makefile,v 1.28 2010/07/05 18:54:37 tgl Exp $ +# $PostgreSQL: pgsql/src/backend/port/Makefile,v 1.29 2010/09/11 15:48:04 heikki Exp $ # #------------------------------------------------------------------------- @@ -21,7 +21,7 @@ subdir = src/backend/port top_builddir = ../../.. include $(top_builddir)/src/Makefile.global -OBJS = dynloader.o pg_sema.o pg_shmem.o $(TAS) +OBJS = dynloader.o pg_sema.o pg_shmem.o pg_latch.o $(TAS) ifeq ($(PORTNAME), darwin) SUBDIRS += darwin diff --git a/src/backend/port/unix_latch.c b/src/backend/port/unix_latch.c new file mode 100644 index 0000000000..ba136ea5ae --- /dev/null +++ b/src/backend/port/unix_latch.c @@ -0,0 +1,435 @@ +/*------------------------------------------------------------------------- + * + * unix_latch.c + * Routines for inter-process latches + * + * A latch is a boolean variable, with operations that let you to sleep + * until it is set. A latch can be set from another process, or a signal + * handler within the same process. + * + * The latch interface is a reliable replacement for the common pattern of + * using pg_usleep() or select() to wait until a signal arrives, where the + * signal handler sets a global variable. Because on some platforms, an + * incoming signal doesn't interrupt sleep, and even on platforms where it + * does there is a race condition if the signal arrives just before + * entering the sleep, the common pattern must periodically wake up and + * poll the global variable. pselect() system call was invented to solve + * the problem, but it is not portable enough. Latches are designed to + * overcome these limitations, allowing you to sleep without polling and + * ensuring a quick response to signals from other processes. + * + * There are two kinds of latches: local and shared. A local latch is + * initialized by InitLatch, and can only be set from the same process. + * A local latch can be used to wait for a signal to arrive, by calling + * SetLatch in the signal handler. A shared latch resides in shared memory, + * and must be initialized at postmaster startup by InitSharedLatch. Before + * a shared latch can be waited on, it must be associated with a process + * with OwnLatch. Only the process owning the latch can wait on it, but any + * process can set it. + * + * There are three basic operations on a latch: + * + * SetLatch - Sets the latch + * ResetLatch - Clears the latch, allowing it to be set again + * WaitLatch - Waits for the latch to become set + * + * The correct pattern to wait for an event is: + * + * for (;;) + * { + * ResetLatch(); + * if (work to do) + * Do Stuff(); + * + * WaitLatch(); + * } + * + * It's important to reset the latch *before* checking if there's work to + * do. Otherwise, if someone sets the latch between the check and the + * ResetLatch call, you will miss it and Wait will block. + * + * To wake up the waiter, you must first set a global flag or something + * else that the main loop tests in the "if (work to do)" part, and call + * SetLatch *after* that. SetLatch is designed to return quickly if the + * latch is already set. + * + * + * Implementation + * -------------- + * + * The Unix implementation uses the so-called self-pipe trick to overcome + * the race condition involved with select() and setting a global flag + * in the signal handler. When a latch is set and the current process + * is waiting for it, the signal handler wakes up the select() in + * WaitLatch by writing a byte to a pipe. A signal by itself doesn't + * interrupt select() on all platforms, and even on platforms where it + * does, a signal that arrives just before the select() call does not + * prevent the select() from entering sleep. An incoming byte on a pipe + * however reliably interrupts the sleep, and makes select() to return + * immediately if the signal arrives just before select() begins. + * + * When SetLatch is called from the same process that owns the latch, + * SetLatch writes the byte directly to the pipe. If it's owned by another + * process, SIGUSR1 is sent and the signal handler in the waiting process + * writes the byte to the pipe on behalf of the signaling process. + * + * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * $PostgreSQL: pgsql/src/backend/port/unix_latch.c,v 1.1 2010/09/11 15:48:04 heikki Exp $ + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include +#include +#include + +#include "miscadmin.h" +#include "storage/latch.h" +#include "storage/shmem.h" + +/* Are we currently in WaitLatch? The signal handler would like to know. */ +static volatile sig_atomic_t waiting = false; + +/* Read and write end of the self-pipe */ +static int selfpipe_readfd = -1; +static int selfpipe_writefd = -1; + +/* private function prototypes */ +static void initSelfPipe(void); +static void drainSelfPipe(void); +static void sendSelfPipeByte(void); + + +/* + * Initialize a backend-local latch. + */ +void +InitLatch(volatile Latch *latch) +{ + /* Initialize the self pipe if this is our first latch in the process */ + if (selfpipe_readfd == -1) + initSelfPipe(); + + latch->is_set = false; + latch->owner_pid = MyProcPid; + latch->is_shared = false; +} + +/* + * Initialize a shared latch that can be set from other processes. The latch + * is initially owned by no-one, use OwnLatch to associate it with the + * current process. + * + * NB: When you introduce a new shared latch, you must increase the shared + * latch count in NumSharedLatches in win32_latch.c! + */ +void +InitSharedLatch(volatile Latch *latch) +{ + latch->is_set = false; + latch->owner_pid = 0; + latch->is_shared = true; +} + +/* + * Associate a shared latch with the current process, allowing it to + * wait on it. + * + * Make sure that latch_sigusr1_handler() is called from the SIGUSR1 signal + * handler, as shared latches use SIGUSR1 to for inter-process communication. + */ +void +OwnLatch(volatile Latch *latch) +{ + Assert(latch->is_shared); + + /* Initialize the self pipe if this is our first latch in the process */ + if (selfpipe_readfd == -1) + initSelfPipe(); + + if (latch->owner_pid != 0) + elog(ERROR, "latch already owned"); + latch->owner_pid = MyProcPid; +} + +/* + * Disown a shared latch currently owned by the current process. + */ +void +DisownLatch(volatile Latch *latch) +{ + Assert(latch->is_shared); + Assert(latch->owner_pid == MyProcPid); + latch->owner_pid = 0; +} + +/* + * Wait for given latch to be set or until timeout is exceeded. + * If the latch is already set, the function returns immediately. + * + * The 'timeout' is given in microseconds, and -1 means wait forever. + * On some platforms, signals cause the timeout to be restarted, so beware + * that the function can sleep for several times longer than the specified + * timeout. + * + * The latch must be owned by the current process, ie. it must be a + * backend-local latch initialized with InitLatch, or a shared latch + * associated with the current process by calling OwnLatch. + * + * Returns 'true' if the latch was set, or 'false' if timeout was reached. + */ +bool +WaitLatch(volatile Latch *latch, long timeout) +{ + return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0; +} + +/* + * Like WaitLatch, but will also return when there's data available in + * 'sock' for reading. Returns 0 if timeout was reached, 1 if the latch + * was set, or 2 if the scoket became readable. + */ +int +WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout) +{ + struct timeval tv, *tvp = NULL; + fd_set input_mask; + int rc; + int result = 0; + + if (latch->owner_pid != MyProcPid) + elog(ERROR, "cannot wait on a latch owned by another process"); + + /* Initialize timeout */ + if (timeout >= 0) + { + tv.tv_sec = timeout / 1000000L; + tv.tv_usec = timeout % 1000000L; + tvp = &tv; + } + + waiting = true; + for (;;) + { + int hifd; + + /* + * Clear the pipe, and check if the latch is set already. If someone + * sets the latch between this and the select() below, the setter + * will write a byte to the pipe (or signal us and the signal handler + * will do that), and the select() will return immediately. + */ + drainSelfPipe(); + if (latch->is_set) + { + result = 1; + break; + } + + FD_ZERO(&input_mask); + FD_SET(selfpipe_readfd, &input_mask); + hifd = selfpipe_readfd; + if (sock != PGINVALID_SOCKET) + { + FD_SET(sock, &input_mask); + if (sock > hifd) + hifd = sock; + } + + rc = select(hifd + 1, &input_mask, NULL, NULL, tvp); + if (rc < 0) + { + if (errno == EINTR) + continue; + ereport(ERROR, + (errcode_for_socket_access(), + errmsg("select() failed: %m"))); + } + if (rc == 0) + { + /* timeout exceeded */ + result = 0; + break; + } + if (sock != PGINVALID_SOCKET && FD_ISSET(sock, &input_mask)) + { + result = 2; + break; /* data available in socket */ + } + } + waiting = false; + + return result; +} + +/* + * Sets a latch and wakes up anyone waiting on it. Returns quickly if the + * latch is already set. + */ +void +SetLatch(volatile Latch *latch) +{ + pid_t owner_pid; + + /* Quick exit if already set */ + if (latch->is_set) + return; + + latch->is_set = true; + + /* + * See if anyone's waiting for the latch. It can be the current process + * if we're in a signal handler. We use the self-pipe to wake up the + * select() in that case. If it's another process, send a signal. + * + * Fetch owner_pid only once, in case the owner simultaneously disowns + * the latch and clears owner_pid. XXX: This assumes that pid_t is + * atomic, which isn't guaranteed to be true! In practice, the effective + * range of pid_t fits in a 32 bit integer, and so should be atomic. In + * the worst case, we might end up signaling wrong process if the right + * one disowns the latch just as we fetch owner_pid. Even then, you're + * very unlucky if a process with that bogus pid exists. + */ + owner_pid = latch->owner_pid; + if (owner_pid == 0) + return; + else if (owner_pid == MyProcPid) + sendSelfPipeByte(); + else + kill(owner_pid, SIGUSR1); +} + +/* + * Clear the latch. Calling WaitLatch after this will sleep, unless + * the latch is set again before the WaitLatch call. + */ +void +ResetLatch(volatile Latch *latch) +{ + /* Only the owner should reset the latch */ + Assert(latch->owner_pid == MyProcPid); + + latch->is_set = false; +} + +/* + * LatchShmemSize + * Compute space needed for latch's shared memory + * + * Not needed for Unix implementation. + */ +Size +LatchShmemSize(void) +{ + return 0; +} + +/* + * LatchShmemInit + * Allocate and initialize shared memory needed for latches + * + * Not needed for Unix implementation. + */ +void +LatchShmemInit(void) +{ +} + +/* + * SetLatch uses SIGUSR1 to wake up the process waiting on the latch. Wake + * up WaitLatch. + */ +void +latch_sigusr1_handler(void) +{ + if (waiting) + sendSelfPipeByte(); +} + +/* initialize the self-pipe */ +static void +initSelfPipe(void) +{ + int pipefd[2]; + + /* + * Set up the self-pipe that allows a signal handler to wake up the + * select() in WaitLatch. Make the write-end non-blocking, so that + * SetLatch won't block if the event has already been set many times + * filling the kernel buffer. Make the read-end non-blocking too, so + * that we can easily clear the pipe by reading until EAGAIN or + * EWOULDBLOCK. + */ + if (pipe(pipefd) < 0) + elog(FATAL, "pipe() failed: %m"); + if (fcntl(pipefd[0], F_SETFL, O_NONBLOCK) < 0) + elog(FATAL, "fcntl() failed on read-end of self-pipe: %m"); + if (fcntl(pipefd[1], F_SETFL, O_NONBLOCK) < 0) + elog(FATAL, "fcntl() failed on write-end of self-pipe: %m"); + + selfpipe_readfd = pipefd[0]; + selfpipe_writefd = pipefd[1]; +} + +/* Send one byte to the self-pipe, to wake up WaitLatch */ +static void +sendSelfPipeByte(void) +{ + int rc; + char dummy = 0; + +retry: + rc = write(selfpipe_writefd, &dummy, 1); + if (rc < 0) + { + /* If interrupted by signal, just retry */ + if (errno == EINTR) + goto retry; + + /* + * If the pipe is full, we don't need to retry, the data that's + * there already is enough to wake up WaitLatch. + */ + if (errno == EAGAIN || errno == EWOULDBLOCK) + return; + + /* + * Oops, the write() failed for some other reason. We might be in + * a signal handler, so it's not safe to elog(). We have no choice + * but silently ignore the error. + */ + return; + } +} + +/* Read all available data from the self-pipe */ +static void +drainSelfPipe(void) +{ + /* + * There shouldn't normally be more than one byte in the pipe, or maybe + * a few more if multiple processes run SetLatch at the same instant. + */ + char buf[16]; + int rc; + + for (;;) + { + rc = read(selfpipe_readfd, buf, sizeof(buf)); + if (rc < 0) + { + if (errno == EAGAIN || errno == EWOULDBLOCK) + break; /* the pipe is empty */ + else if (errno == EINTR) + continue; /* retry */ + else + elog(ERROR, "read() on self-pipe failed: %m"); + } + else if (rc == 0) + elog(ERROR, "unexpected EOF on self-pipe"); + } +} diff --git a/src/backend/port/win32_latch.c b/src/backend/port/win32_latch.c new file mode 100644 index 0000000000..f6d4920e46 --- /dev/null +++ b/src/backend/port/win32_latch.c @@ -0,0 +1,284 @@ +/*------------------------------------------------------------------------- + * + * win32_latch.c + * Windows implementation of latches. + * + * The Windows implementation uses Windows events. See unix_latch.c for + * information on usage. + * + * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * $PostgreSQL: pgsql/src/backend/port/win32_latch.c,v 1.1 2010/09/11 15:48:04 heikki Exp $ + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include +#include +#include + +#include "miscadmin.h" +#include "replication/walsender.h" +#include "storage/latch.h" +#include "storage/shmem.h" +#include "storage/spin.h" + +/* + * Shared latches are implemented with Windows events that are shared by + * all postmaster child processes. At postmaster startup we create enough + * Event objects, and mark them as inheritable so that they are accessible + * in child processes. The handles are stored in sharedHandles. + */ +typedef struct +{ + slock_t mutex; /* protects all the other fields */ + + int maxhandles; /* number of shared handles created */ + int nfreehandles; /* number of free handles in array */ + HANDLE handles[1]; /* free handles, variable length */ +} SharedEventHandles; + +static SharedEventHandles *sharedHandles; + +void +InitLatch(volatile Latch *latch) +{ + latch->event = CreateEvent(NULL, TRUE, FALSE, NULL); + latch->is_shared = false; + latch->is_set = false; +} + +void +InitSharedLatch(volatile Latch *latch) +{ + latch->is_shared = true; + latch->is_set = false; + latch->event = NULL; +} + +void +OwnLatch(volatile Latch *latch) +{ + HANDLE event; + + /* Sanity checks */ + Assert(latch->is_shared); + if (latch->event != 0) + elog(ERROR, "latch already owned"); + + /* Reserve an event handle from the shared handles array */ + SpinLockAcquire(&sharedHandles->mutex); + if (sharedHandles->nfreehandles <= 0) + { + SpinLockRelease(&sharedHandles->mutex); + elog(ERROR, "out of shared event objects"); + } + sharedHandles->nfreehandles--; + event = sharedHandles->handles[sharedHandles->nfreehandles]; + SpinLockRelease(&sharedHandles->mutex); + + latch->event = event; +} + +void +DisownLatch(volatile Latch *latch) +{ + Assert(latch->is_shared); + Assert(latch->event != NULL); + + /* Put the event handle back to the pool */ + SpinLockAcquire(&sharedHandles->mutex); + if (sharedHandles->nfreehandles >= sharedHandles->maxhandles) + { + SpinLockRelease(&sharedHandles->mutex); + elog(PANIC, "too many free event handles"); + } + sharedHandles->handles[sharedHandles->nfreehandles] = latch->event; + sharedHandles->nfreehandles++; + SpinLockRelease(&sharedHandles->mutex); + + latch->event = NULL; +} + +bool +WaitLatch(volatile Latch *latch, long timeout) +{ + return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0; +} + +int +WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout) +{ + DWORD rc; + HANDLE events[3]; + HANDLE latchevent; + HANDLE sockevent; + int numevents; + int result = 0; + + latchevent = latch->event; + + events[0] = latchevent; + events[1] = pgwin32_signal_event; + numevents = 2; + if (sock != PGINVALID_SOCKET) + { + sockevent = WSACreateEvent(); + WSAEventSelect(sock, sockevent, FD_READ); + events[numevents++] = sockevent; + } + + for (;;) + { + /* + * Reset the event, and check if the latch is set already. If someone + * sets the latch between this and the WaitForMultipleObjects() call + * below, the setter will set the event and WaitForMultipleObjects() + * will return immediately. + */ + if (!ResetEvent(latchevent)) + elog(ERROR, "ResetEvent failed: error code %d", (int) GetLastError()); + if (latch->is_set) + { + result = 1; + break; + } + + rc = WaitForMultipleObjects(numevents, events, FALSE, + (timeout >= 0) ? (timeout / 1000) : INFINITE); + if (rc == WAIT_FAILED) + elog(ERROR, "WaitForMultipleObjects() failed: error code %d", (int) GetLastError()); + else if (rc == WAIT_TIMEOUT) + { + result = 0; + break; + } + else if (rc == WAIT_OBJECT_0 + 1) + pgwin32_dispatch_queued_signals(); + else if (rc == WAIT_OBJECT_0 + 2) + { + Assert(sock != PGINVALID_SOCKET); + result = 2; + break; + } + else if (rc != WAIT_OBJECT_0) + elog(ERROR, "unexpected return code from WaitForMultipleObjects(): %d", rc); + } + + /* Clean up the handle we created for the socket */ + if (sock != PGINVALID_SOCKET) + { + WSAEventSelect(sock, sockevent, 0); + WSACloseEvent(sockevent); + } + + return result; +} + +void +SetLatch(volatile Latch *latch) +{ + HANDLE handle; + + /* Quick exit if already set */ + if (latch->is_set) + return; + + latch->is_set = true; + + /* + * See if anyone's waiting for the latch. It can be the current process + * if we're in a signal handler. Use a local variable here in case the + * latch is just disowned between the test and the SetEvent call, and + * event field set to NULL. + * + * Fetch handle field only once, in case the owner simultaneously + * disowns the latch and clears handle. This assumes that HANDLE is + * atomic, which isn't guaranteed to be true! In practice, it should be, + * and in the worst case we end up calling SetEvent with a bogus handle, + * and SetEvent will return an error with no harm done. + */ + handle = latch->event; + if (handle) + { + SetEvent(handle); + /* + * Note that we silently ignore any errors. We might be in a signal + * handler or other critical path where it's not safe to call elog(). + */ + } +} + +void +ResetLatch(volatile Latch *latch) +{ + latch->is_set = false; +} + +/* + * Number of shared latches, used to allocate the right number of shared + * Event handles at postmaster startup. You must update this if you + * introduce a new shared latch! + */ +static int +NumSharedLatches(void) +{ + int numLatches = 0; + + /* Each walsender needs one latch */ + numLatches += max_wal_senders; + + return numLatches; +} + +/* + * LatchShmemSize + * Compute space needed for latch's shared memory + */ +Size +LatchShmemSize(void) +{ + return offsetof(SharedEventHandles, handles) + + NumSharedLatches() * sizeof(HANDLE); +} + +/* + * LatchShmemInit + * Allocate and initialize shared memory needed for latches + */ +void +LatchShmemInit(void) +{ + Size size = LatchShmemSize(); + bool found; + + sharedHandles = ShmemInitStruct("SharedEventHandles", size, &found); + + /* If we're first, initialize the struct and allocate handles */ + if (!found) + { + int i; + SECURITY_ATTRIBUTES sa; + + /* + * Set up security attributes to specify that the events are + * inherited. + */ + ZeroMemory(&sa, sizeof(sa)); + sa.nLength = sizeof(sa); + sa.bInheritHandle = TRUE; + + SpinLockInit(&sharedHandles->mutex); + sharedHandles->maxhandles = NumSharedLatches(); + sharedHandles->nfreehandles = sharedHandles->maxhandles; + for (i = 0; i < sharedHandles->maxhandles; i++) + { + sharedHandles->handles[i] = CreateEvent(&sa, TRUE, FALSE, NULL); + if (sharedHandles->handles[i] == NULL) + elog(ERROR, "CreateEvent failed: error code %d", (int) GetLastError()); + } + } +} diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 9bfb3de15e..2ef90497bb 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -28,12 +28,13 @@ * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.29 2010/07/22 13:03:11 rhaas Exp $ + * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.30 2010/09/11 15:48:04 heikki Exp $ * *------------------------------------------------------------------------- */ #include "postgres.h" +#include #include #include "access/xlog_internal.h" @@ -66,9 +67,6 @@ bool am_walsender = false; /* Am I a walsender process ? */ int max_wal_senders = 0; /* the maximum number of concurrent walsenders */ int WalSndDelay = 200; /* max sleep time between some actions */ -#define NAPTIME_PER_CYCLE 100000L /* max sleep time between cycles - * (100ms) */ - /* * These variables are used similarly to openLogFile/Id/Seg/Off, * but for walsender to read the XLOG. @@ -93,6 +91,7 @@ static volatile sig_atomic_t ready_to_stop = false; static void WalSndSigHupHandler(SIGNAL_ARGS); static void WalSndShutdownHandler(SIGNAL_ARGS); static void WalSndQuickDieHandler(SIGNAL_ARGS); +static void WalSndXLogSendHandler(SIGNAL_ARGS); static void WalSndLastCycleHandler(SIGNAL_ARGS); /* Prototypes for private functions */ @@ -144,6 +143,16 @@ WalSenderMain(void) /* Handle handshake messages before streaming */ WalSndHandshake(); + /* Initialize shared memory status */ + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = MyWalSnd; + + SpinLockAcquire(&walsnd->mutex); + walsnd->sentPtr = sentPtr; + SpinLockRelease(&walsnd->mutex); + } + /* Main loop of walsender */ return WalSndLoop(); } @@ -380,8 +389,6 @@ WalSndLoop(void) /* Loop forever, unless we get an error */ for (;;) { - long remain; /* remaining time (us) */ - /* * Emergency bailout if postmaster has died. This is to avoid the * necessity for manual cleanup of all postmaster children. @@ -421,32 +428,42 @@ WalSndLoop(void) /* * If we had sent all accumulated WAL in last round, nap for the * configured time before retrying. - * - * On some platforms, signals won't interrupt the sleep. To ensure we - * respond reasonably promptly when someone signals us, break down the - * sleep into NAPTIME_PER_CYCLE increments, and check for interrupts - * after each nap. */ if (caughtup) { - remain = WalSndDelay * 1000L; - while (remain > 0) + /* + * Even if we wrote all the WAL that was available when we started + * sending, more might have arrived while we were sending this + * batch. We had the latch set while sending, so we have not + * received any signals from that time. Let's arm the latch + * again, and after that check that we're still up-to-date. + */ + ResetLatch(&MyWalSnd->latch); + + if (!XLogSend(output_message, &caughtup)) + break; + if (caughtup && !got_SIGHUP && !ready_to_stop && !shutdown_requested) { - /* Check for interrupts */ - if (got_SIGHUP || shutdown_requested || ready_to_stop) - break; + /* + * XXX: We don't really need the periodic wakeups anymore, + * WaitLatchOrSocket should reliably wake up as soon as + * something interesting happens. + */ - /* Sleep and check that the connection is still alive */ - pg_usleep(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain); - CheckClosedConnection(); - - remain -= NAPTIME_PER_CYCLE; + /* Sleep */ + WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock, + WalSndDelay); } - } - /* Attempt to send the log once every loop */ - if (!XLogSend(output_message, &caughtup)) - break; + /* Check if the connection was closed */ + CheckClosedConnection(); + } + else + { + /* Attempt to send the log once every loop */ + if (!XLogSend(output_message, &caughtup)) + break; + } } /* @@ -493,10 +510,15 @@ InitWalSnd(void) } else { - /* found */ - MyWalSnd = (WalSnd *) walsnd; + /* + * Found a free slot. Take ownership of the latch and initialize + * the other fields. + */ + OwnLatch((Latch *) &walsnd->latch); walsnd->pid = MyProcPid; - MemSet(&MyWalSnd->sentPtr, 0, sizeof(XLogRecPtr)); + MemSet(&walsnd->sentPtr, 0, sizeof(XLogRecPtr)); + /* Set MyWalSnd only after it's fully initialized. */ + MyWalSnd = (WalSnd *) walsnd; SpinLockRelease(&walsnd->mutex); break; } @@ -523,6 +545,7 @@ WalSndKill(int code, Datum arg) * for this. */ MyWalSnd->pid = 0; + DisownLatch(&MyWalSnd->latch); /* WalSnd struct isn't mine anymore */ MyWalSnd = NULL; @@ -787,6 +810,8 @@ static void WalSndSigHupHandler(SIGNAL_ARGS) { got_SIGHUP = true; + if (MyWalSnd) + SetLatch(&MyWalSnd->latch); } /* SIGTERM: set flag to shut down */ @@ -794,6 +819,8 @@ static void WalSndShutdownHandler(SIGNAL_ARGS) { shutdown_requested = true; + if (MyWalSnd) + SetLatch(&MyWalSnd->latch); } /* @@ -828,11 +855,20 @@ WalSndQuickDieHandler(SIGNAL_ARGS) exit(2); } +/* SIGUSR1: set flag to send WAL records */ +static void +WalSndXLogSendHandler(SIGNAL_ARGS) +{ + latch_sigusr1_handler(); +} + /* SIGUSR2: set flag to do a last cycle and shut down afterwards */ static void WalSndLastCycleHandler(SIGNAL_ARGS) { ready_to_stop = true; + if (MyWalSnd) + SetLatch(&MyWalSnd->latch); } /* Set up signal handlers */ @@ -847,7 +883,7 @@ WalSndSignals(void) pqsignal(SIGQUIT, WalSndQuickDieHandler); /* hard crash time */ pqsignal(SIGALRM, SIG_IGN); pqsignal(SIGPIPE, SIG_IGN); - pqsignal(SIGUSR1, SIG_IGN); /* not used */ + pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */ pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and * shutdown */ @@ -891,10 +927,21 @@ WalSndShmemInit(void) WalSnd *walsnd = &WalSndCtl->walsnds[i]; SpinLockInit(&walsnd->mutex); + InitSharedLatch(&walsnd->latch); } } } +/* Wake up all walsenders */ +void +WalSndWakeup(void) +{ + int i; + + for (i = 0; i < max_wal_senders; i++) + SetLatch(&WalSndCtl->walsnds[i].latch); +} + /* * This isn't currently used for anything. Monitoring tools might be * interested in the future, and we'll need something like this in the diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index b4923d278a..881dae830c 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/storage/ipc/ipci.c,v 1.104 2010/02/16 22:34:50 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/storage/ipc/ipci.c,v 1.105 2010/09/11 15:48:04 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -30,6 +30,7 @@ #include "replication/walsender.h" #include "storage/bufmgr.h" #include "storage/ipc.h" +#include "storage/latch.h" #include "storage/pg_shmem.h" #include "storage/pmsignal.h" #include "storage/procarray.h" @@ -117,6 +118,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) size = add_size(size, SInvalShmemSize()); size = add_size(size, PMSignalShmemSize()); size = add_size(size, ProcSignalShmemSize()); + size = add_size(size, LatchShmemSize()); size = add_size(size, BgWriterShmemSize()); size = add_size(size, AutoVacuumShmemSize()); size = add_size(size, WalSndShmemSize()); @@ -217,6 +219,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) */ PMSignalShmemInit(); ProcSignalShmemInit(); + LatchShmemInit(); BgWriterShmemInit(); AutoVacuumShmemInit(); WalSndShmemInit(); diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index 5b66dfb320..0b8842a466 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/storage/ipc/procsignal.c,v 1.7 2010/08/30 06:33:22 heikki Exp $ + * $PostgreSQL: pgsql/src/backend/storage/ipc/procsignal.c,v 1.8 2010/09/11 15:48:04 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -21,6 +21,7 @@ #include "commands/async.h" #include "miscadmin.h" #include "storage/ipc.h" +#include "storage/latch.h" #include "storage/procsignal.h" #include "storage/shmem.h" #include "storage/sinval.h" @@ -278,5 +279,7 @@ procsignal_sigusr1_handler(SIGNAL_ARGS) if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN); + latch_sigusr1_handler(); + errno = save_errno; } diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index 891fea57bc..87e01207c1 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -5,7 +5,7 @@ * * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group * - * $PostgreSQL: pgsql/src/include/replication/walsender.h,v 1.4 2010/06/17 00:06:34 itagaki Exp $ + * $PostgreSQL: pgsql/src/include/replication/walsender.h,v 1.5 2010/09/11 15:48:04 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -13,6 +13,7 @@ #define _WALSENDER_H #include "access/xlog.h" +#include "storage/latch.h" #include "storage/spin.h" /* @@ -24,6 +25,12 @@ typedef struct WalSnd XLogRecPtr sentPtr; /* WAL has been sent up to this point */ slock_t mutex; /* locks shared variables shown above */ + + /* + * Latch used by backends to wake up this walsender when it has work + * to do. + */ + Latch latch; } WalSnd; /* There is one WalSndCtl struct for the whole database cluster */ @@ -45,5 +52,6 @@ extern int WalSenderMain(void); extern void WalSndSignals(void); extern Size WalSndShmemSize(void); extern void WalSndShmemInit(void); +extern void WalSndWakeup(void); #endif /* _WALSENDER_H */ diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h new file mode 100644 index 0000000000..2c697741e4 --- /dev/null +++ b/src/include/storage/latch.h @@ -0,0 +1,62 @@ +/*------------------------------------------------------------------------- + * + * latch.h + * Routines for interprocess latches + * + * + * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * $PostgreSQL: pgsql/src/include/storage/latch.h,v 1.1 2010/09/11 15:48:04 heikki Exp $ + * + *------------------------------------------------------------------------- + */ +#ifndef LATCH_H +#define LATCH_H + +#include + +/* + * Latch structure should be treated as opaque and only accessed through + * the public functions. It is defined here to allow embedding Latches as + * part of bigger structs. + */ +typedef struct +{ + sig_atomic_t is_set; + bool is_shared; +#ifndef WIN32 + int owner_pid; +#else + HANDLE event; +#endif +} Latch; + +/* + * prototypes for functions in latch.c + */ +extern void InitLatch(volatile Latch *latch); +extern void InitSharedLatch(volatile Latch *latch); +extern void OwnLatch(volatile Latch *latch); +extern void DisownLatch(volatile Latch *latch); +extern bool WaitLatch(volatile Latch *latch, long timeout); +extern int WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, + long timeout); +extern void SetLatch(volatile Latch *latch); +extern void ResetLatch(volatile Latch *latch); +#define TestLatch(latch) (((volatile Latch *) latch)->is_set) + +extern Size LatchShmemSize(void); +extern void LatchShmemInit(void); + +/* + * Unix implementation uses SIGUSR1 for inter-process signaling, Win32 doesn't + * need this. + */ +#ifndef WIN32 +extern void latch_sigusr1_handler(void); +#else +#define latch_sigusr1_handler() +#endif + +#endif /* LATCH_H */ diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm index 72c58751d0..201fd7aa74 100644 --- a/src/tools/msvc/Mkvcbuild.pm +++ b/src/tools/msvc/Mkvcbuild.pm @@ -3,7 +3,7 @@ package Mkvcbuild; # # Package that generates build files for msvc build # -# $PostgreSQL: pgsql/src/tools/msvc/Mkvcbuild.pm,v 1.59 2010/07/02 23:25:27 adunstan Exp $ +# $PostgreSQL: pgsql/src/tools/msvc/Mkvcbuild.pm,v 1.60 2010/09/11 15:48:04 heikki Exp $ # use Carp; use Win32; @@ -64,6 +64,7 @@ sub mkvcbuild $postgres->ReplaceFile('src\backend\port\dynloader.c','src\backend\port\dynloader\win32.c'); $postgres->ReplaceFile('src\backend\port\pg_sema.c','src\backend\port\win32_sema.c'); $postgres->ReplaceFile('src\backend\port\pg_shmem.c','src\backend\port\win32_shmem.c'); + $postgres->ReplaceFile('src\backend\port\pg_latch.c','src\backend\port\win32_latch.c'); $postgres->AddFiles('src\port',@pgportfiles); $postgres->AddDir('src\timezone'); $postgres->AddFiles('src\backend\parser','scan.l','gram.y');