diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index ef9f09a2c1..09fb99bb73 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -126,6 +126,7 @@ #include "miscadmin.h" #include "storage/ipc.h" #include "storage/lmgr.h" +#include "storage/procarray.h" #include "storage/procsignal.h" #include "storage/sinval.h" #include "tcop/tcopprot.h" @@ -1978,7 +1979,27 @@ asyncQueueProcessPageEntries(QueuePosition *current, /* Ignore messages destined for other databases */ if (qe->dboid == MyDatabaseId) { - if (TransactionIdDidCommit(qe->xid)) + if (TransactionIdIsInProgress(qe->xid)) + { + /* + * The source transaction is still in progress, so we can't + * process this message yet. Break out of the loop, but first + * back up *current so we will reprocess the message next + * time. (Note: it is unlikely but not impossible for + * TransactionIdDidCommit to fail, so we can't really avoid + * this advance-then-back-up behavior when dealing with an + * uncommitted message.) + * + * Note that we must test TransactionIdIsInProgress before we + * test TransactionIdDidCommit, else we might return a message + * from a transaction that is not yet visible to snapshots; + * compare the comments at the head of tqual.c. + */ + *current = thisentry; + reachedStop = true; + break; + } + else if (TransactionIdDidCommit(qe->xid)) { /* qe->data is the null-terminated channel name */ char *channel = qe->data; @@ -1991,27 +2012,12 @@ asyncQueueProcessPageEntries(QueuePosition *current, NotifyMyFrontEnd(channel, payload, qe->srcPid); } } - else if (TransactionIdDidAbort(qe->xid)) - { - /* - * If the source transaction aborted, we just ignore its - * notifications. - */ - } else { /* - * The transaction has neither committed nor aborted so far, - * so we can't process its message yet. Break out of the - * loop, but first back up *current so we will reprocess the - * message next time. (Note: it is unlikely but not - * impossible for TransactionIdDidCommit to fail, so we can't - * really avoid this advance-then-back-up behavior when - * dealing with an uncommitted message.) + * The source transaction aborted or crashed, so we just + * ignore its notifications. */ - *current = thisentry; - reachedStop = true; - break; } }