diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c index 80956ce430..61f9298489 100644 --- a/src/backend/storage/ipc/shm_mq.c +++ b/src/backend/storage/ipc/shm_mq.c @@ -142,6 +142,8 @@ static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes, const void *data, bool nowait, Size *bytes_written); static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait, Size *nbytesp, void **datap); +static bool shm_mq_counterparty_gone(volatile shm_mq *mq, + BackgroundWorkerHandle *handle); static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile * ptr, BackgroundWorkerHandle *handle); static uint64 shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached); @@ -499,6 +501,8 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait) { if (nowait) { + if (shm_mq_counterparty_gone(mq, mqh->mqh_handle)) + return SHM_MQ_DETACHED; if (shm_mq_get_sender(mq) == NULL) return SHM_MQ_WOULD_BLOCK; } @@ -794,6 +798,11 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data, */ if (nowait) { + if (shm_mq_counterparty_gone(mq, mqh->mqh_handle)) + { + *bytes_written = sent; + return SHM_MQ_DETACHED; + } if (shm_mq_get_receiver(mq) == NULL) { *bytes_written = sent; @@ -947,6 +956,45 @@ shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait, } } +/* + * Test whether a counterparty who may not even be alive yet is definitely gone. + */ +static bool +shm_mq_counterparty_gone(volatile shm_mq *mq, BackgroundWorkerHandle *handle) +{ + bool detached; + pid_t pid; + + /* Acquire the lock just long enough to check the pointer. */ + SpinLockAcquire(&mq->mq_mutex); + detached = mq->mq_detached; + SpinLockRelease(&mq->mq_mutex); + + /* If the queue has been detached, counterparty is definitely gone. */ + if (detached) + return true; + + /* If there's a handle, check worker status. */ + if (handle != NULL) + { + BgwHandleStatus status; + + /* Check for unexpected worker death. */ + status = GetBackgroundWorkerPid(handle, &pid); + if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED) + { + /* Mark it detached, just to make it official. */ + SpinLockAcquire(&mq->mq_mutex); + mq->mq_detached = true; + SpinLockRelease(&mq->mq_mutex); + return true; + } + } + + /* Counterparty is not definitively gone. */ + return false; +} + /* * This is used when a process is waiting for its counterpart to attach to the * queue. We exit when the other process attaches as expected, or, if