From ec9037df2634ddcd6a3b036463722c8ee009b132 Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Tue, 14 Jan 2014 12:23:22 -0500 Subject: [PATCH] Single-reader, single-writer, lightweight shared message queue. This code provides infrastructure for user backends to communicate relatively easily with background workers. The message queue is structured as a ring buffer and allows messages of arbitary length to be sent and received. Patch by me. Review by KaiGai Kohei and Andres Freund. --- src/backend/storage/ipc/Makefile | 2 +- src/backend/storage/ipc/shm_mq.c | 945 +++++++++++++++++++++++++++++++ src/include/storage/shm_mq.h | 70 +++ 3 files changed, 1016 insertions(+), 1 deletion(-) create mode 100644 src/backend/storage/ipc/shm_mq.c create mode 100644 src/include/storage/shm_mq.h diff --git a/src/backend/storage/ipc/Makefile b/src/backend/storage/ipc/Makefile index df0a49ed6c..850347c367 100644 --- a/src/backend/storage/ipc/Makefile +++ b/src/backend/storage/ipc/Makefile @@ -16,6 +16,6 @@ endif endif OBJS = dsm_impl.o dsm.o ipc.o ipci.o pmsignal.o procarray.o procsignal.o \ - shmem.o shmqueue.o shm_toc.o sinval.o sinvaladt.o standby.o + shmem.o shmqueue.o shm_mq.o shm_toc.o sinval.o sinvaladt.o standby.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c new file mode 100644 index 0000000000..2d298a3598 --- /dev/null +++ b/src/backend/storage/ipc/shm_mq.c @@ -0,0 +1,945 @@ +/*------------------------------------------------------------------------- + * + * shm_mq.c + * single-reader, single-writer shared memory message queue + * + * Both the sender and the receiver must have a PGPROC; their respective + * process latches are used for synchronization. Only the sender may send, + * and only the receiver may receive. This is intended to allow a user + * backend to communicate with worker backends that it has registered. + * + * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/storage/shm_mq.h + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "miscadmin.h" +#include "postmaster/bgworker.h" +#include "storage/procsignal.h" +#include "storage/shm_mq.h" +#include "storage/spin.h" + +/* + * This structure represents the actual queue, stored in shared memory. + * + * Some notes on synchronization: + * + * mq_receiver and mq_bytes_read can only be changed by the receiver; and + * mq_sender and mq_bytes_written can only be changed by the sender. However, + * because most of these fields are 8 bytes and we don't assume that 8 byte + * reads and writes are atomic, the spinlock must be taken whenever the field + * is updated, and whenever it is read by a process other than the one allowed + * to modify it. But the process that is allowed to modify it is also allowed + * to read it without the lock. On architectures where 8-byte writes are + * atomic, we could replace these spinlocks with memory barriers, but + * testing found no performance benefit, so it seems best to keep things + * simple for now. + * + * mq_detached can be set by either the sender or the receiver, so the mutex + * must be held to read or write it. Memory barriers could be used here as + * well, if needed. + * + * mq_ring_size and mq_ring_offset never change after initialization, and + * can therefore be read without the lock. + * + * Importantly, mq_ring can be safely read and written without a lock. Were + * this not the case, we'd have to hold the spinlock for much longer + * intervals, and performance might suffer. Fortunately, that's not + * necessary. At any given time, the difference between mq_bytes_read and + * mq_bytes_written defines the number of bytes within mq_ring that contain + * unread data, and mq_bytes_read defines the position where those bytes + * begin. The sender can increase the number of unread bytes at any time, + * but only the receiver can give license to overwrite those bytes, by + * incrementing mq_bytes_read. Therefore, it's safe for the receiver to read + * the unread bytes it knows to be present without the lock. Conversely, + * the sender can write to the unused portion of the ring buffer without + * the lock, because nobody else can be reading or writing those bytes. The + * receiver could be making more bytes unused by incrementing mq_bytes_read, + * but that's OK. Note that it would be unsafe for the receiver to read any + * data it's already marked as read, or to write any data; and it would be + * unsafe for the sender to reread any data after incrementing + * mq_bytes_written, but fortunately there's no need for any of that. + */ +struct shm_mq +{ + slock_t mq_mutex; + PGPROC *mq_receiver; + PGPROC *mq_sender; + uint64 mq_bytes_read; + uint64 mq_bytes_written; + uint64 mq_ring_size; + bool mq_detached; + uint8 mq_ring_offset; + char mq_ring[FLEXIBLE_ARRAY_MEMBER]; +}; + +/* + * This structure is a backend-private handle for access to a queue. + * + * mqh_queue is a pointer to the queue we've attached, and mqh_segment is + * a pointer to the dynamic shared memory segment that contains it. + * + * If this queue is intended to connect the current process with a background + * worker that started it, the user can pass a pointer to the worker handle + * to shm_mq_attach(), and we'll store it in mqh_handle. The point of this + * is to allow us to begin sending to or receiving from that queue before the + * process we'll be communicating with has even been started. If it fails + * to start, the handle will allow us to notice that and fail cleanly, rather + * than waiting forever; see shm_mq_wait_internal. This is mostly useful in + * simple cases - e.g. where there are just 2 processes communicating; in + * more complex scenarios, every process may not have a BackgroundWorkerHandle + * available, or may need to watch for the failure of more than one other + * process at a time. + * + * When a message exists as a contiguous chunk of bytes in the queue - that is, + * it is smaller than the size of the ring buffer and does not wrap around + * the end - we return the message to the caller as a pointer into the buffer. + * For messages that are larger or happen to wrap, we reassemble the message + * locally by copying the chunks into a backend-local buffer. mqh_buffer is + * the buffer, and mqh_buflen is the number of bytes allocated for it. + * + * mqh_partial_message_bytes, mqh_expected_bytes, and mqh_did_length_word + * are used to track the state of non-blocking operations. When the caller + * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they + * are expected to retry the call at a later time with the same argument; + * we need to retain enough state to pick up where we left off. + * mqh_did_length_word tracks whether we read or wrote the length word, + * mqh_partial_message_bytes tracks the number of payload bytes read or + * written, and mqh_expected_bytes - which is used only for reads - tracks + * the expected total size of the payload. + * + * mqh_counterparty_attached tracks whether we know the counterparty to have + * attached to the queue at some previous point. This lets us avoid some + * mutex acquisitions. + * + * mqh_context is the memory context in effect at the time we attached to + * the shm_mq. The shm_mq_handle itself is allocated in this context, and + * we make sure any other allocations we do happen in this context as well, + * to avoid nasty surprises. + */ +struct shm_mq_handle +{ + shm_mq *mqh_queue; + dsm_segment *mqh_segment; + BackgroundWorkerHandle *mqh_handle; + char *mqh_buffer; + uint64 mqh_buflen; + uint64 mqh_consume_pending; + uint64 mqh_partial_message_bytes; + uint64 mqh_expected_bytes; + bool mqh_did_length_word; + bool mqh_counterparty_attached; + MemoryContext mqh_context; +}; + +static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, uint64 nbytes, + void *data, bool nowait, uint64 *bytes_written); +static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, uint64 bytes_needed, + bool nowait, uint64 *nbytesp, void **datap); +static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC * volatile *ptr, + BackgroundWorkerHandle *handle); +static uint64 shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached); +static void shm_mq_inc_bytes_read(volatile shm_mq *mq, uint64 n); +static uint64 shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached); +static void shm_mq_inc_bytes_written(volatile shm_mq *mq, uint64 n); +static shm_mq_result shm_mq_notify_receiver(volatile shm_mq *mq); +static void shm_mq_detach_callback(dsm_segment *seg, Datum arg); + +/* Minimum queue size is enough for header and at least one chunk of data. */ +const Size shm_mq_minimum_size = + MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF; + +#define MQH_INITIAL_BUFSIZE 8192 + +/* + * Initialize a new shared message queue. + */ +shm_mq * +shm_mq_create(void *address, Size size) +{ + shm_mq *mq = address; + uint64 data_offset = MAXALIGN(offsetof(shm_mq, mq_ring)); + + /* If the size isn't MAXALIGN'd, just discard the odd bytes. */ + size = MAXALIGN_DOWN(size); + + /* Queue size must be large enough to hold some data. */ + Assert(size > data_offset); + + /* Initialize queue header. */ + SpinLockInit(&mq->mq_mutex); + mq->mq_receiver = NULL; + mq->mq_sender = NULL; + mq->mq_bytes_read = 0; + mq->mq_bytes_written = 0; + mq->mq_ring_size = size - data_offset; + mq->mq_detached = false; + mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring); + + return mq; +} + +/* + * Set the identity of the process that will receive from a shared message + * queue. + */ +void +shm_mq_set_receiver(shm_mq *mq, PGPROC *proc) +{ + volatile shm_mq *vmq = mq; + PGPROC *sender; + + SpinLockAcquire(&mq->mq_mutex); + Assert(vmq->mq_receiver == NULL); + vmq->mq_receiver = proc; + sender = vmq->mq_sender; + SpinLockRelease(&mq->mq_mutex); + + if (sender != NULL) + SetLatch(&sender->procLatch); +} + +/* + * Set the identity of the process that will send to a shared message queue. + */ +void +shm_mq_set_sender(shm_mq *mq, PGPROC *proc) +{ + volatile shm_mq *vmq = mq; + PGPROC *receiver; + + SpinLockAcquire(&mq->mq_mutex); + Assert(vmq->mq_sender == NULL); + vmq->mq_sender = proc; + receiver = vmq->mq_receiver; + SpinLockRelease(&mq->mq_mutex); + + if (receiver != NULL) + SetLatch(&receiver->procLatch); +} + +/* + * Get the configured receiver. + */ +PGPROC * +shm_mq_get_receiver(shm_mq *mq) +{ + volatile shm_mq *vmq = mq; + PGPROC *receiver; + + SpinLockAcquire(&mq->mq_mutex); + receiver = vmq->mq_receiver; + SpinLockRelease(&mq->mq_mutex); + + return receiver; +} + +/* + * Get the configured sender. + */ +PGPROC * +shm_mq_get_sender(shm_mq *mq) +{ + volatile shm_mq *vmq = mq; + PGPROC *sender; + + SpinLockAcquire(&mq->mq_mutex); + sender = vmq->mq_sender; + SpinLockRelease(&mq->mq_mutex); + + return sender; +} + +/* + * Attach to a shared message queue so we can send or receive messages. + * + * The memory context in effect at the time this function is called should + * be one which will last for at least as long as the message queue itself. + * We'll allocate the handle in that context, and future allocations that + * are needed to buffer incoming data will happen in that context as well. + * + * If seg != NULL, the queue will be automatically detached when that dynamic + * shared memory segment is detached. + * + * If handle != NULL, the queue can be read or written even before the + * other process has attached. We'll wait for it to do so if needed. The + * handle must be for a background worker initialized with bgw_notify_pid + * equal to our PID. + * + * shm_mq_detach() should be called when done. This will free the + * shm_mq_handle and mark the queue itself as detached, so that our + * counterpart won't get stuck waiting for us to fill or drain the queue + * after we've already lost interest. + */ +shm_mq_handle * +shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle) +{ + shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle)); + + Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc); + mqh->mqh_queue = mq; + mqh->mqh_segment = seg; + mqh->mqh_buffer = NULL; + mqh->mqh_handle = handle; + mqh->mqh_buflen = 0; + mqh->mqh_consume_pending = 0; + mqh->mqh_context = CurrentMemoryContext; + mqh->mqh_partial_message_bytes = 0; + mqh->mqh_did_length_word = false; + mqh->mqh_counterparty_attached = false; + + if (seg != NULL) + on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq)); + + return mqh; +} + +/* + * Write a message into a shared message queue. + * + * When nowait = false, we'll wait on our process latch when the ring buffer + * fills up, and then continue writing once the receiver has drained some data. + * The process latch is reset after each wait. + * + * When nowait = true, we do not manipulate the state of the process latch; + * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK. In + * this case, the caller should call this function again, with the same + * arguments, each time the process latch is set. (Once begun, the sending + * of a message cannot be aborted except by detaching from the queue; changing + * the length or payload will corrupt the queue.) + */ +shm_mq_result +shm_mq_send(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait) +{ + shm_mq_result res; + shm_mq *mq = mqh->mqh_queue; + uint64 bytes_written; + + Assert(mq->mq_sender == MyProc); + + /* Write the message length into the buffer. */ + if (!mqh->mqh_did_length_word) + { + res = shm_mq_send_bytes(mqh, sizeof(uint64), &nbytes, nowait, + &bytes_written); + if (res != SHM_MQ_SUCCESS) + return res; + + /* + * We're sure to have sent the length in full, since we always + * write a MAXALIGN'd chunk. + */ + Assert(bytes_written == MAXALIGN64(sizeof(uint64))); + mqh->mqh_did_length_word = true; + } + + /* Write the actual data bytes into the buffer. */ + Assert(mqh->mqh_partial_message_bytes <= nbytes); + res = shm_mq_send_bytes(mqh, nbytes - mqh->mqh_partial_message_bytes, + ((char *) data) + mqh->mqh_partial_message_bytes, + nowait, &bytes_written); + if (res == SHM_MQ_WOULD_BLOCK) + mqh->mqh_partial_message_bytes += bytes_written; + else + { + mqh->mqh_partial_message_bytes = 0; + mqh->mqh_did_length_word = false; + } + if (res != SHM_MQ_SUCCESS) + return res; + + /* Notify receiver of the newly-written data, and return. */ + return shm_mq_notify_receiver(mq); +} + +/* + * Receive a message from a shared message queue. + * + * We set *nbytes to the message length and *data to point to the message + * payload. If the entire message exists in the queue as a single, + * contiguous chunk, *data will point directly into shared memory; otherwise, + * it will point to a temporary buffer. This mostly avoids data copying in + * the hoped-for case where messages are short compared to the buffer size, + * while still allowing longer messages. In either case, the return value + * remains valid until the next receive operation is perfomed on the queue. + * + * When nowait = false, we'll wait on our process latch when the ring buffer + * is empty and we have not yet received a full message. The sender will + * set our process latch after more data has been written, and we'll resume + * processing. Each call will therefore return a complete message + * (unless the sender detaches the queue). + * + * When nowait = true, we do not manipulate the state of the process latch; + * instead, whenever the buffer is empty and we need to read from it, we + * return SHM_MQ_WOULD_BLOCK. In this case, the caller should call this + * function again after the process latch has been set. + */ +shm_mq_result +shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait) +{ + shm_mq *mq = mqh->mqh_queue; + shm_mq_result res; + uint64 rb = 0; + uint64 nbytes; + uint64 needed; + void *rawdata; + + Assert(mq->mq_receiver == MyProc); + + /* We can't receive data until the sender has attached. */ + if (!mqh->mqh_counterparty_attached) + { + if (nowait) + { + if (shm_mq_get_sender(mq) == NULL) + return SHM_MQ_WOULD_BLOCK; + } + else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)) + { + mq->mq_detached = true; + return SHM_MQ_DETACHED; + } + mqh->mqh_counterparty_attached = true; + } + + /* Consume any zero-copy data from previous receive operation. */ + if (mqh->mqh_consume_pending > 0) + { + shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending); + mqh->mqh_consume_pending = 0; + } + + /* Determine the message length. */ + if (mqh->mqh_did_length_word) + { + /* We've partially received a message; recall expected length. */ + nbytes = mqh->mqh_expected_bytes; + } + else + { + /* Try to receive the message length word. */ + res = shm_mq_receive_bytes(mq, sizeof(uint64), nowait, &rb, &rawdata); + if (res != SHM_MQ_SUCCESS) + return res; + Assert(rb >= sizeof(uint64)); + memcpy(&nbytes, rawdata, sizeof(uint64)); + mqh->mqh_expected_bytes = nbytes; + + /* If we've already got the whole message, we're done. */ + needed = MAXALIGN64(sizeof(uint64)) + MAXALIGN64(nbytes); + if (rb >= needed) + { + /* + * Technically, we could consume the message length information at + * this point, but the extra write to shared memory wouldn't be + * free and in most cases we would reap no benefit. + */ + mqh->mqh_consume_pending = needed; + *nbytesp = nbytes; + *datap = ((char *) rawdata) + MAXALIGN64(sizeof(uint64)); + return SHM_MQ_SUCCESS; + } + + /* Consume the length word. */ + shm_mq_inc_bytes_read(mq, MAXALIGN64(sizeof(uint64))); + mqh->mqh_did_length_word = true; + rb -= MAXALIGN64(sizeof(uint64)); + } + + if (mqh->mqh_partial_message_bytes == 0) + { + /* + * Try to obtain the whole message in a single chunk. If this works, + * we need not copy the data and can return a pointer directly into + * shared memory. + */ + res = shm_mq_receive_bytes(mq, nbytes, nowait, &rb, &rawdata); + if (res != SHM_MQ_SUCCESS) + return res; + if (rb >= nbytes) + { + mqh->mqh_did_length_word = false; + mqh->mqh_consume_pending = MAXALIGN64(nbytes); + *nbytesp = nbytes; + *datap = rawdata; + return SHM_MQ_SUCCESS; + } + + /* + * The message has wrapped the buffer. We'll need to copy it in order + * to return it to the client in one chunk. First, make sure we have a + * large enough buffer available. + */ + if (mqh->mqh_buflen < nbytes) + { + uint64 newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE); + + while (newbuflen < nbytes) + newbuflen *= 2; + + if (mqh->mqh_buffer != NULL) + { + pfree(mqh->mqh_buffer); + mqh->mqh_buffer = NULL; + mqh->mqh_buflen = 0; + } + mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen); + mqh->mqh_buflen = newbuflen; + } + } + + /* Loop until we've copied the entire message. */ + for (;;) + { + uint64 still_needed; + + /* Copy as much as we can. */ + Assert(mqh->mqh_partial_message_bytes + rb <= nbytes); + memcpy(&mqh->mqh_buffer[mqh->mqh_partial_message_bytes], rawdata, rb); + mqh->mqh_partial_message_bytes += rb; + + /* + * Update count of bytes read, with alignment padding. Note + * that this will never actually insert any padding except at the + * end of a message, because the buffer size is a multiple of + * MAXIMUM_ALIGNOF, and each read and write is as well. + */ + Assert(mqh->mqh_partial_message_bytes == nbytes || + rb == MAXALIGN64(rb)); + shm_mq_inc_bytes_read(mq, MAXALIGN64(rb)); + + /* If we got all the data, exit the loop. */ + if (mqh->mqh_partial_message_bytes >= nbytes) + break; + + /* Wait for some more data. */ + still_needed = nbytes - mqh->mqh_partial_message_bytes; + res = shm_mq_receive_bytes(mq, still_needed, nowait, &rb, &rawdata); + if (res != SHM_MQ_SUCCESS) + return res; + if (rb > still_needed) + rb = still_needed; + } + + /* Return the complete message, and reset for next message. */ + *nbytesp = nbytes; + *datap = mqh->mqh_buffer; + mqh->mqh_did_length_word = false; + mqh->mqh_partial_message_bytes = 0; + return SHM_MQ_SUCCESS; +} + +/* + * Wait for the other process that's supposed to use this queue to attach + * to it. + * + * The return value is SHM_MQ_DETACHED if the worker has already detached or + * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached. + * Note that we will only be able to detect that the worker has died before + * attaching if a background worker handle was passed to shm_mq_attach(). + */ +shm_mq_result +shm_mq_wait_for_attach(shm_mq_handle *mqh) +{ + shm_mq *mq = mqh->mqh_queue; + PGPROC **victim; + + if (shm_mq_get_receiver(mq) == MyProc) + victim = &mq->mq_sender; + else + { + Assert(shm_mq_get_sender(mq) == MyProc); + victim = &mq->mq_receiver; + } + + if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle)) + return SHM_MQ_SUCCESS; + else + return SHM_MQ_DETACHED; +} + +/* + * Detach a shared message queue. + * + * The purpose of this function is to make sure that the process + * with which we're communicating doesn't block forever waiting for us to + * fill or drain the queue once we've lost interest. Whem the sender + * detaches, the receiver can read any messages remaining in the queue; + * further reads will return SHM_MQ_DETACHED. If the receiver detaches, + * further attempts to send messages will likewise return SHM_MQ_DETACHED. + */ +void +shm_mq_detach(shm_mq *mq) +{ + volatile shm_mq *vmq = mq; + PGPROC *victim; + + SpinLockAcquire(&mq->mq_mutex); + if (vmq->mq_sender == MyProc) + victim = vmq->mq_receiver; + else + { + Assert(vmq->mq_receiver == MyProc); + victim = vmq->mq_sender; + } + vmq->mq_detached = true; + SpinLockRelease(&mq->mq_mutex); + + if (victim != NULL) + SetLatch(&victim->procLatch); +} + +/* + * Write bytes into a shared message queue. + */ +static shm_mq_result +shm_mq_send_bytes(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait, + uint64 *bytes_written) +{ + shm_mq *mq = mqh->mqh_queue; + uint64 sent = 0; + uint64 used; + uint64 ringsize = mq->mq_ring_size; + uint64 available; + + while (sent < nbytes) + { + bool detached; + uint64 rb; + + /* Compute number of ring buffer bytes used and available. */ + rb = shm_mq_get_bytes_read(mq, &detached); + Assert(mq->mq_bytes_written >= rb); + used = mq->mq_bytes_written - rb; + Assert(used <= ringsize); + available = Min(ringsize - used, nbytes - sent); + + /* Bail out if the queue has been detached. */ + if (detached) + return SHM_MQ_DETACHED; + + if (available == 0) + { + shm_mq_result res; + + /* + * The queue is full, so if the receiver isn't yet known to be + * attached, we must wait for that to happen. + */ + if (!mqh->mqh_counterparty_attached) + { + if (nowait) + { + if (shm_mq_get_receiver(mq) == NULL) + return SHM_MQ_WOULD_BLOCK; + } + else if (!shm_mq_wait_internal(mq, &mq->mq_receiver, + mqh->mqh_handle)) + { + mq->mq_detached = true; + return SHM_MQ_DETACHED; + } + mqh->mqh_counterparty_attached = true; + } + + /* Let the receiver know that we need them to read some data. */ + res = shm_mq_notify_receiver(mq); + if (res != SHM_MQ_SUCCESS) + { + *bytes_written = res; + return res; + } + + /* Skip manipulation of our latch if nowait = true. */ + if (nowait) + { + *bytes_written = sent; + return SHM_MQ_WOULD_BLOCK; + } + + /* + * Wait for our latch to be set. It might already be set for + * some unrelated reason, but that'll just result in one extra + * trip through the loop. It's worth it to avoid resetting the + * latch at top of loop, because setting an already-set latch is + * much cheaper than setting one that has been reset. + */ + WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0); + + /* An interrupt may have occurred while we were waiting. */ + CHECK_FOR_INTERRUPTS(); + + /* Reset the latch so we don't spin. */ + ResetLatch(&MyProc->procLatch); + } + else + { + uint64 offset = mq->mq_bytes_written % ringsize; + uint64 sendnow = Min(available, ringsize - offset); + + /* Write as much data as we can via a single memcpy(). */ + memcpy(&mq->mq_ring[mq->mq_ring_offset + offset], + (char *) data + sent, sendnow); + sent += sendnow; + + /* + * Update count of bytes written, with alignment padding. Note + * that this will never actually insert any padding except at the + * end of a run of bytes, because the buffer size is a multiple of + * MAXIMUM_ALIGNOF, and each read is as well. + */ + Assert(sent == nbytes || sendnow == MAXALIGN64(sendnow)); + shm_mq_inc_bytes_written(mq, MAXALIGN64(sendnow)); + + /* + * For efficiency, we don't set the reader's latch here. We'll + * do that only when the buffer fills up or after writing an + * entire message. + */ + } + } + + *bytes_written = sent; + return SHM_MQ_SUCCESS; +} + +/* + * Wait until at least *nbytesp bytes are available to be read from the + * shared message queue, or until the buffer wraps around. On return, + * *datap is set to the location at which data bytes can be read. The + * return value is the number of bytes available to be read starting at + * that offset; if the message has wrapped the buffer, it may be less than + * bytes_needed. + */ +static shm_mq_result +shm_mq_receive_bytes(shm_mq *mq, uint64 bytes_needed, bool nowait, + uint64 *nbytesp, void **datap) +{ + uint64 used; + uint64 ringsize = mq->mq_ring_size; + uint64 written; + + for (;;) + { + uint64 offset; + bool detached; + + /* Get bytes written, so we can compute what's available to read. */ + written = shm_mq_get_bytes_written(mq, &detached); + used = written - mq->mq_bytes_read; + Assert(used <= ringsize); + offset = mq->mq_bytes_read % ringsize; + + /* If we have enough data or buffer has wrapped, we're done. */ + if (used >= bytes_needed || offset + used >= ringsize) + { + *nbytesp = Min(used, ringsize - offset); + *datap = &mq->mq_ring[mq->mq_ring_offset + offset]; + return SHM_MQ_SUCCESS; + } + + /* + * Fall out before waiting if the queue has been detached. + * + * Note that we don't check for this until *after* considering + * whether the data already available is enough, since the + * receiver can finish receiving a message stored in the buffer + * even after the sender has detached. + */ + if (detached) + return SHM_MQ_DETACHED; + + /* Skip manipulation of our latch if nowait = true. */ + if (nowait) + return SHM_MQ_WOULD_BLOCK; + + /* + * Wait for our latch to be set. It might already be set for + * some unrelated reason, but that'll just result in one extra + * trip through the loop. It's worth it to avoid resetting the + * latch at top of loop, because setting an already-set latch is + * much cheaper than setting one that has been reset. + */ + WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0); + + /* An interrupt may have occurred while we were waiting. */ + CHECK_FOR_INTERRUPTS(); + + /* Reset the latch so we don't spin. */ + ResetLatch(&MyProc->procLatch); + } +} + +/* + * This is used when a process is waiting for its counterpart to attach to the + * queue. We exit when the other process attaches as expected, or, if + * handle != NULL, when the referenced background process or the postmaster + * dies. Note that if handle == NULL, and the process fails to attach, we'll + * potentially get stuck here forever waiting for a process that may never + * start. We do check for interrupts, though. + * + * ptr is a pointer to the memory address that we're expecting to become + * non-NULL when our counterpart attaches to the queue. + */ +static bool +shm_mq_wait_internal(volatile shm_mq *mq, PGPROC * volatile *ptr, + BackgroundWorkerHandle *handle) +{ + bool save_set_latch_on_sigusr1; + bool result = false; + + save_set_latch_on_sigusr1 = set_latch_on_sigusr1; + if (handle != NULL) + set_latch_on_sigusr1 = true; + + PG_TRY(); + { + for (;;) + { + BgwHandleStatus status; + pid_t pid; + bool detached; + + /* Acquire the lock just long enough to check the pointer. */ + SpinLockAcquire(&mq->mq_mutex); + detached = mq->mq_detached; + result = (*ptr != NULL); + SpinLockRelease(&mq->mq_mutex); + + /* Fail if detached; else succeed if initialized. */ + if (detached) + { + result = false; + break; + } + if (result) + break; + + if (handle != NULL) + { + /* Check for unexpected worker death. */ + status = GetBackgroundWorkerPid(handle, &pid); + if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED) + { + result = false; + break; + } + } + + /* Wait to be signalled. */ + WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0); + + /* An interrupt may have occurred while we were waiting. */ + CHECK_FOR_INTERRUPTS(); + + /* Reset the latch so we don't spin. */ + ResetLatch(&MyProc->procLatch); + } + } + PG_CATCH(); + { + set_latch_on_sigusr1 = save_set_latch_on_sigusr1; + PG_RE_THROW(); + } + PG_END_TRY(); + + return result; +} + +/* + * Get the number of bytes read. The receiver need not use this to access + * the count of bytes read, but the sender must. + */ +static uint64 +shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached) +{ + uint64 v; + + SpinLockAcquire(&mq->mq_mutex); + v = mq->mq_bytes_read; + *detached = mq->mq_detached; + SpinLockRelease(&mq->mq_mutex); + + return v; +} + +/* + * Increment the number of bytes read. + */ +static void +shm_mq_inc_bytes_read(volatile shm_mq *mq, uint64 n) +{ + PGPROC *sender; + + SpinLockAcquire(&mq->mq_mutex); + mq->mq_bytes_read += n; + sender = mq->mq_sender; + SpinLockRelease(&mq->mq_mutex); + + /* We shoudn't have any bytes to read without a sender. */ + Assert(sender != NULL); + SetLatch(&sender->procLatch); +} + +/* + * Get the number of bytes written. The sender need not use this to access + * the count of bytes written, but the reciever must. + */ +static uint64 +shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached) +{ + uint64 v; + + SpinLockAcquire(&mq->mq_mutex); + v = mq->mq_bytes_written; + *detached = mq->mq_detached; + SpinLockRelease(&mq->mq_mutex); + + return v; +} + +/* + * Increment the number of bytes written. + */ +static void +shm_mq_inc_bytes_written(volatile shm_mq *mq, uint64 n) +{ + SpinLockAcquire(&mq->mq_mutex); + mq->mq_bytes_written += n; + SpinLockRelease(&mq->mq_mutex); +} + +/* + * Set sender's latch, unless queue is detached. + */ +static shm_mq_result +shm_mq_notify_receiver(volatile shm_mq *mq) +{ + PGPROC *receiver; + bool detached; + + SpinLockAcquire(&mq->mq_mutex); + detached = mq->mq_detached; + receiver = mq->mq_receiver; + SpinLockRelease(&mq->mq_mutex); + + if (detached) + return SHM_MQ_DETACHED; + if (receiver) + SetLatch(&receiver->procLatch); + return SHM_MQ_SUCCESS; +} + +/* Shim for on_dsm_callback. */ +static void +shm_mq_detach_callback(dsm_segment *seg, Datum arg) +{ + shm_mq *mq = (shm_mq *) DatumGetPointer(arg); + + shm_mq_detach(mq); +} diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h new file mode 100644 index 0000000000..1ce88a1c4b --- /dev/null +++ b/src/include/storage/shm_mq.h @@ -0,0 +1,70 @@ +/*------------------------------------------------------------------------- + * + * shm_mq.h + * single-reader, single-writer shared memory message queue + * + * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/storage/shm_mq.h + * + *------------------------------------------------------------------------- + */ +#ifndef SHM_MQ_H +#define SHM_MQ_H + +#include "postmaster/bgworker.h" +#include "storage/dsm.h" +#include "storage/proc.h" + +/* The queue itself, in shared memory. */ +struct shm_mq; +typedef struct shm_mq shm_mq; + +/* Backend-private state. */ +struct shm_mq_handle; +typedef struct shm_mq_handle shm_mq_handle; + +/* Possible results of a send or receive operation. */ +typedef enum +{ + SHM_MQ_SUCCESS, /* Sent or received a message. */ + SHM_MQ_WOULD_BLOCK, /* Not completed; retry later. */ + SHM_MQ_DETACHED /* Other process has detached queue. */ +} shm_mq_result; + +/* + * Primitives to create a queue and set the sender and receiver. + * + * Both the sender and the receiver must be set before any messages are read + * or written, but they need not be set by the same process. Each must be + * set exactly once. + */ +extern shm_mq *shm_mq_create(void *address, Size size); +extern void shm_mq_set_receiver(shm_mq *mq, PGPROC *); +extern void shm_mq_set_sender(shm_mq *mq, PGPROC *); + +/* Accessor methods for sender and receiver. */ +extern PGPROC *shm_mq_get_receiver(shm_mq *); +extern PGPROC *shm_mq_get_sender(shm_mq *); + +/* Set up backend-local queue state. */ +extern shm_mq_handle *shm_mq_attach(shm_mq *mq, dsm_segment *seg, + BackgroundWorkerHandle *handle); + +/* Break connection. */ +extern void shm_mq_detach(shm_mq *); + +/* Send or receive messages. */ +extern shm_mq_result shm_mq_send(shm_mq_handle *mqh, + uint64 nbytes, void *data, bool nowait); +extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh, + uint64 *nbytesp, void **datap, bool nowait); + +/* Wait for our counterparty to attach to the queue. */ +extern shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh); + +/* Smallest possible queue. */ +extern const Size shm_mq_minimum_size; + +#endif /* SHM_MQ_H */