/*------------------------------------------------------------------------- * * async.c * Asynchronous notification: NOTIFY, LISTEN, UNLISTEN * * Portions Copyright (c) 1996-2004, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION * $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.115 2004/08/29 05:06:41 momjian Exp $ * *------------------------------------------------------------------------- */ /*------------------------------------------------------------------------- * New Async Notification Model: * 1. Multiple backends on same machine. Multiple backends listening on * one relation. (Note: "listening on a relation" is not really the * right way to think about it, since the notify names need not have * anything to do with the names of relations actually in the database. * But this terminology is all over the code and docs, and I don't feel * like trying to replace it.) * * 2. There is a tuple in relation "pg_listener" for each active LISTEN, * ie, each relname/listenerPID pair. The "notification" field of the * tuple is zero when no NOTIFY is pending for that listener, or the PID * of the originating backend when a cross-backend NOTIFY is pending. * (We skip writing to pg_listener when doing a self-NOTIFY, so the * notification field should never be equal to the listenerPID field.) * * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target * relname to a list of outstanding NOTIFY requests. Actual processing * happens if and only if we reach transaction commit. At that time (in * routine AtCommit_Notify) we scan pg_listener for matching relnames. * If the listenerPID in a matching tuple is ours, we just send a notify * message to our own front end. If it is not ours, and "notification" * is not already nonzero, we set notification to our own PID and send a * SIGUSR2 signal to the receiving process (indicated by listenerPID). * BTW: if the signal operation fails, we presume that the listener backend * crashed without removing this tuple, and remove the tuple for it. * * 4. Upon receipt of a SIGUSR2 signal, the signal handler can call inbound- * notify processing immediately if this backend is idle (ie, it is * waiting for a frontend command and is not within a transaction block). * Otherwise the handler may only set a flag, which will cause the * processing to occur just before we next go idle. * * 5. Inbound-notify processing consists of scanning pg_listener for tuples * matching our own listenerPID and having nonzero notification fields. * For each such tuple, we send a message to our frontend and clear the * notification field. BTW: this routine has to start/commit its own * transaction, since by assumption it is only called from outside any * transaction. * * Although we grab ExclusiveLock on pg_listener for any operation, * the lock is never held very long, so it shouldn't cause too much of * a performance problem. (Previously we used AccessExclusiveLock, but * there's no real reason to forbid concurrent reads.) * * An application that listens on the same relname it notifies will get * NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful, * by comparing be_pid in the NOTIFY message to the application's own backend's * PID. (As of FE/BE protocol 2.0, the backend's PID is provided to the * frontend during startup.) The above design guarantees that notifies from * other backends will never be missed by ignoring self-notifies. Note, * however, that we do *not* guarantee that a separate frontend message will * be sent for every outside NOTIFY. Since there is only room for one * originating PID in pg_listener, outside notifies occurring at about the * same time may be collapsed into a single message bearing the PID of the * first outside backend to perform the NOTIFY. *------------------------------------------------------------------------- */ #include "postgres.h" #include #include #include #include #include "access/heapam.h" #include "catalog/catname.h" #include "catalog/pg_listener.h" #include "commands/async.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" #include "miscadmin.h" #include "storage/ipc.h" #include "storage/sinval.h" #include "tcop/tcopprot.h" #include "utils/fmgroids.h" #include "utils/ps_status.h" #include "utils/syscache.h" /* * State for outbound notifies consists of a list of all relnames NOTIFYed * in the current transaction. We do not actually perform a NOTIFY until * and unless the transaction commits. pendingNotifies is NIL if no * NOTIFYs have been done in the current transaction. * * The list is kept in CurTransactionContext. In subtransactions, each * subtransaction has its own list in its own CurTransactionContext, but * successful subtransactions attach their lists to their parent's list. * Failed subtransactions simply discard their lists. */ static List *pendingNotifies = NIL; static List *upperPendingNotifies = NIL; /* list of upper-xact * lists */ /* * State for inbound notifies consists of two flags: one saying whether * the signal handler is currently allowed to call ProcessIncomingNotify * directly, and one saying whether the signal has occurred but the handler * was not allowed to call ProcessIncomingNotify at the time. * * NB: the "volatile" on these declarations is critical! If your compiler * does not grok "volatile", you'd be best advised to compile this file * with all optimization turned off. */ static volatile int notifyInterruptEnabled = 0; static volatile int notifyInterruptOccurred = 0; /* True if we've registered an on_shmem_exit cleanup */ static bool unlistenExitRegistered = false; bool Trace_notify = false; static void Async_UnlistenAll(void); static void Async_UnlistenOnExit(int code, Datum arg); static void ProcessIncomingNotify(void); static void NotifyMyFrontEnd(char *relname, int32 listenerPID); static bool AsyncExistsPendingNotify(const char *relname); static void ClearPendingNotifies(void); /* *-------------------------------------------------------------- * Async_Notify * * This is executed by the SQL notify command. * * Adds the relation to the list of pending notifies. * Actual notification happens during transaction commit. * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ * * Results: * XXX * *-------------------------------------------------------------- */ void Async_Notify(char *relname) { if (Trace_notify) elog(DEBUG1, "Async_Notify(%s)", relname); /* no point in making duplicate entries in the list ... */ if (!AsyncExistsPendingNotify(relname)) { /* * The name list needs to live until end of transaction, so store * it in the transaction context. */ MemoryContext oldcontext; oldcontext = MemoryContextSwitchTo(CurTransactionContext); pendingNotifies = lcons(pstrdup(relname), pendingNotifies); MemoryContextSwitchTo(oldcontext); } } /* *-------------------------------------------------------------- * Async_Listen * * This is executed by the SQL listen command. * * Register a backend (identified by its Unix PID) as listening * on the specified relation. * * Results: * XXX * * Side effects: * pg_listener is updated. * *-------------------------------------------------------------- */ void Async_Listen(char *relname, int pid) { Relation lRel; HeapScanDesc scan; HeapTuple tuple; Datum values[Natts_pg_listener]; char nulls[Natts_pg_listener]; int i; bool alreadyListener = false; if (Trace_notify) elog(DEBUG1, "Async_Listen(%s,%d)", relname, pid); lRel = heap_openr(ListenerRelationName, ExclusiveLock); /* Detect whether we are already listening on this relname */ scan = heap_beginscan(lRel, SnapshotNow, 0, NULL); while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) { Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple); if (listener->listenerpid == pid && strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0) { alreadyListener = true; /* No need to scan the rest of the table */ break; } } heap_endscan(scan); if (alreadyListener) { heap_close(lRel, ExclusiveLock); return; } /* * OK to insert a new tuple */ for (i = 0; i < Natts_pg_listener; i++) { nulls[i] = ' '; values[i] = PointerGetDatum(NULL); } i = 0; values[i++] = (Datum) relname; values[i++] = (Datum) pid; values[i++] = (Datum) 0; /* no notifies pending */ tuple = heap_formtuple(RelationGetDescr(lRel), values, nulls); simple_heap_insert(lRel, tuple); #ifdef NOT_USED /* currently there are no indexes */ CatalogUpdateIndexes(lRel, tuple); #endif heap_freetuple(tuple); heap_close(lRel, ExclusiveLock); /* * now that we are listening, make sure we will unlisten before dying. */ if (!unlistenExitRegistered) { on_shmem_exit(Async_UnlistenOnExit, 0); unlistenExitRegistered = true; } } /* *-------------------------------------------------------------- * Async_Unlisten * * This is executed by the SQL unlisten command. * * Remove the backend from the list of listening backends * for the specified relation. * * Results: * XXX * * Side effects: * pg_listener is updated. * *-------------------------------------------------------------- */ void Async_Unlisten(char *relname, int pid) { Relation lRel; HeapScanDesc scan; HeapTuple tuple; /* Handle specially the `unlisten "*"' command */ if ((!relname) || (*relname == '\0') || (strcmp(relname, "*") == 0)) { Async_UnlistenAll(); return; } if (Trace_notify) elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, pid); lRel = heap_openr(ListenerRelationName, ExclusiveLock); scan = heap_beginscan(lRel, SnapshotNow, 0, NULL); while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) { Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple); if (listener->listenerpid == pid && strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0) { /* Found the matching tuple, delete it */ simple_heap_delete(lRel, &tuple->t_self); /* * We assume there can be only one match, so no need to scan * the rest of the table */ break; } } heap_endscan(scan); heap_close(lRel, ExclusiveLock); /* * We do not complain about unlistening something not being listened; * should we? */ } /* *-------------------------------------------------------------- * Async_UnlistenAll * * Unlisten all relations for this backend. * * This is invoked by UNLISTEN "*" command, and also at backend exit. * * Results: * XXX * * Side effects: * pg_listener is updated. * *-------------------------------------------------------------- */ static void Async_UnlistenAll(void) { Relation lRel; TupleDesc tdesc; HeapScanDesc scan; HeapTuple lTuple; ScanKeyData key[1]; if (Trace_notify) elog(DEBUG1, "Async_UnlistenAll"); lRel = heap_openr(ListenerRelationName, ExclusiveLock); tdesc = RelationGetDescr(lRel); /* Find and delete all entries with my listenerPID */ ScanKeyInit(&key[0], Anum_pg_listener_pid, BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(MyProcPid)); scan = heap_beginscan(lRel, SnapshotNow, 1, key); while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL) simple_heap_delete(lRel, &lTuple->t_self); heap_endscan(scan); heap_close(lRel, ExclusiveLock); } /* *-------------------------------------------------------------- * Async_UnlistenOnExit * * Clean up the pg_listener table at backend exit. * * This is executed if we have done any LISTENs in this backend. * It might not be necessary anymore, if the user UNLISTENed everything, * but we don't try to detect that case. * * Results: * XXX * * Side effects: * pg_listener is updated if necessary. * *-------------------------------------------------------------- */ static void Async_UnlistenOnExit(int code, Datum arg) { /* * We need to start/commit a transaction for the unlisten, but if * there is already an active transaction we had better abort that one * first. Otherwise we'd end up committing changes that probably * ought to be discarded. */ AbortOutOfAnyTransaction(); /* Now we can do the unlisten */ StartTransactionCommand(); Async_UnlistenAll(); CommitTransactionCommand(); } /* *-------------------------------------------------------------- * AtCommit_Notify * * This is called at transaction commit. * * If there are outbound notify requests in the pendingNotifies list, * scan pg_listener for matching tuples, and either signal the other * backend or send a message to our own frontend. * * NOTE: we are still inside the current transaction, therefore can * piggyback on its committing of changes. * * Results: * XXX * * Side effects: * Tuples in pg_listener that have matching relnames and other peoples' * listenerPIDs are updated with a nonzero notification field. * *-------------------------------------------------------------- */ void AtCommit_Notify(void) { Relation lRel; TupleDesc tdesc; HeapScanDesc scan; HeapTuple lTuple, rTuple; Datum value[Natts_pg_listener]; char repl[Natts_pg_listener], nulls[Natts_pg_listener]; if (pendingNotifies == NIL) return; /* no NOTIFY statements in this * transaction */ /* * NOTIFY is disabled if not normal processing mode. This test used to * be in xact.c, but it seems cleaner to do it here. */ if (!IsNormalProcessingMode()) { ClearPendingNotifies(); return; } if (Trace_notify) elog(DEBUG1, "AtCommit_Notify"); /* preset data to update notify column to MyProcPid */ nulls[0] = nulls[1] = nulls[2] = ' '; repl[0] = repl[1] = repl[2] = ' '; repl[Anum_pg_listener_notify - 1] = 'r'; value[0] = value[1] = value[2] = (Datum) 0; value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid); lRel = heap_openr(ListenerRelationName, ExclusiveLock); tdesc = RelationGetDescr(lRel); scan = heap_beginscan(lRel, SnapshotNow, 0, NULL); while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL) { Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple); char *relname = NameStr(listener->relname); int32 listenerPID = listener->listenerpid; if (!AsyncExistsPendingNotify(relname)) continue; if (listenerPID == MyProcPid) { /* * Self-notify: no need to bother with table update. Indeed, * we *must not* clear the notification field in this path, or * we could lose an outside notify, which'd be bad for * applications that ignore self-notify messages. */ if (Trace_notify) elog(DEBUG1, "AtCommit_Notify: notifying self"); NotifyMyFrontEnd(relname, listenerPID); } else { if (Trace_notify) elog(DEBUG1, "AtCommit_Notify: notifying pid %d", listenerPID); /* * If someone has already notified this listener, we don't * bother modifying the table, but we do still send a SIGUSR2 * signal, just in case that backend missed the earlier signal * for some reason. It's OK to send the signal first, because * the other guy can't read pg_listener until we unlock it. */ if (kill(listenerPID, SIGUSR2) < 0) { /* * Get rid of pg_listener entry if it refers to a PID that * no longer exists. Presumably, that backend crashed * without deleting its pg_listener entries. This code * used to only delete the entry if errno==ESRCH, but as * far as I can see we should just do it for any failure * (certainly at least for EPERM too...) */ simple_heap_delete(lRel, &lTuple->t_self); } else if (listener->notification == 0) { ItemPointerData ctid; int result; rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl); /* * We cannot use simple_heap_update here because the tuple * could have been modified by an uncommitted transaction; * specifically, since UNLISTEN releases exclusive lock on * the table before commit, the other guy could already * have tried to unlisten. There are no other cases where * we should be able to see an uncommitted update or * delete. Therefore, our response to a * HeapTupleBeingUpdated result is just to ignore it. We * do *not* wait for the other guy to commit --- that * would risk deadlock, and we don't want to block while * holding the table lock anyway for performance reasons. * We also ignore HeapTupleUpdated, which could occur if * the other guy commits between our heap_getnext and * heap_update calls. */ result = heap_update(lRel, &lTuple->t_self, rTuple, &ctid, GetCurrentCommandId(), SnapshotAny, false /* no wait for commit */ ); switch (result) { case HeapTupleSelfUpdated: /* Tuple was already updated in current command? */ elog(ERROR, "tuple already updated by self"); break; case HeapTupleMayBeUpdated: /* done successfully */ #ifdef NOT_USED /* currently there are no indexes */ CatalogUpdateIndexes(lRel, rTuple); #endif break; case HeapTupleBeingUpdated: /* ignore uncommitted tuples */ break; case HeapTupleUpdated: /* ignore just-committed tuples */ break; default: elog(ERROR, "unrecognized heap_update status: %u", result); break; } } } } heap_endscan(scan); /* * We do NOT release the lock on pg_listener here; we need to hold it * until end of transaction (which is about to happen, anyway) to * ensure that notified backends see our tuple updates when they look. * Else they might disregard the signal, which would make the * application programmer very unhappy. */ heap_close(lRel, NoLock); ClearPendingNotifies(); if (Trace_notify) elog(DEBUG1, "AtCommit_Notify: done"); } /* *-------------------------------------------------------------- * AtAbort_Notify * * This is called at transaction abort. * * Gets rid of pending outbound notifies that we would have executed * if the transaction got committed. * * Results: * XXX * *-------------------------------------------------------------- */ void AtAbort_Notify(void) { ClearPendingNotifies(); } /* * AtSubStart_Notify() --- Take care of subtransaction start. * * Push empty state for the new subtransaction. */ void AtSubStart_Notify(void) { MemoryContext old_cxt; /* Keep the list-of-lists in TopTransactionContext for simplicity */ old_cxt = MemoryContextSwitchTo(TopTransactionContext); upperPendingNotifies = lcons(pendingNotifies, upperPendingNotifies); pendingNotifies = NIL; MemoryContextSwitchTo(old_cxt); } /* * AtSubCommit_Notify() --- Take care of subtransaction commit. * * Reassign all items in the pending notifies list to the parent transaction. */ void AtSubCommit_Notify(void) { List *parentPendingNotifies; parentPendingNotifies = (List *) linitial(upperPendingNotifies); upperPendingNotifies = list_delete_first(upperPendingNotifies); /* * We could try to eliminate duplicates here, but it seems not * worthwhile. */ pendingNotifies = list_concat(parentPendingNotifies, pendingNotifies); } /* * AtSubAbort_Notify() --- Take care of subtransaction abort. */ void AtSubAbort_Notify(void) { /* * All we have to do is pop the stack --- the notifies made in this * subxact are no longer interesting, and the space will be freed when * CurTransactionContext is recycled. */ pendingNotifies = (List *) linitial(upperPendingNotifies); upperPendingNotifies = list_delete_first(upperPendingNotifies); } /* *-------------------------------------------------------------- * NotifyInterruptHandler * * This is the signal handler for SIGUSR2. * * If we are idle (notifyInterruptEnabled is set), we can safely invoke * ProcessIncomingNotify directly. Otherwise, just set a flag * to do it later. * * Results: * none * * Side effects: * per above *-------------------------------------------------------------- */ void NotifyInterruptHandler(SIGNAL_ARGS) { int save_errno = errno; /* * Note: this is a SIGNAL HANDLER. You must be very wary what you do * here. Some helpful soul had this routine sprinkled with TPRINTFs, * which would likely lead to corruption of stdio buffers if they were * ever turned on. */ /* Don't joggle the elbow of proc_exit */ if (proc_exit_inprogress) return; if (notifyInterruptEnabled) { bool save_ImmediateInterruptOK = ImmediateInterruptOK; /* * We may be called while ImmediateInterruptOK is true; turn it * off while messing with the NOTIFY state. (We would have to * save and restore it anyway, because PGSemaphore operations * inside ProcessIncomingNotify() might reset it.) */ ImmediateInterruptOK = false; /* * I'm not sure whether some flavors of Unix might allow another * SIGUSR2 occurrence to recursively interrupt this routine. To * cope with the possibility, we do the same sort of dance that * EnableNotifyInterrupt must do --- see that routine for * comments. */ notifyInterruptEnabled = 0; /* disable any recursive signal */ notifyInterruptOccurred = 1; /* do at least one iteration */ for (;;) { notifyInterruptEnabled = 1; if (!notifyInterruptOccurred) break; notifyInterruptEnabled = 0; if (notifyInterruptOccurred) { /* Here, it is finally safe to do stuff. */ if (Trace_notify) elog(DEBUG1, "NotifyInterruptHandler: perform async notify"); ProcessIncomingNotify(); if (Trace_notify) elog(DEBUG1, "NotifyInterruptHandler: done"); } } /* * Restore ImmediateInterruptOK, and check for interrupts if * needed. */ ImmediateInterruptOK = save_ImmediateInterruptOK; if (save_ImmediateInterruptOK) CHECK_FOR_INTERRUPTS(); } else { /* * In this path it is NOT SAFE to do much of anything, except * this: */ notifyInterruptOccurred = 1; } errno = save_errno; } /* * -------------------------------------------------------------- * EnableNotifyInterrupt * * This is called by the PostgresMain main loop just before waiting * for a frontend command. If we are truly idle (ie, *not* inside * a transaction block), then process any pending inbound notifies, * and enable the signal handler to process future notifies directly. * * NOTE: the signal handler starts out disabled, and stays so until * PostgresMain calls this the first time. * -------------------------------------------------------------- */ void EnableNotifyInterrupt(void) { if (IsTransactionOrTransactionBlock()) return; /* not really idle */ /* * This code is tricky because we are communicating with a signal * handler that could interrupt us at any point. If we just checked * notifyInterruptOccurred and then set notifyInterruptEnabled, we * could fail to respond promptly to a signal that happens in between * those two steps. (A very small time window, perhaps, but Murphy's * Law says you can hit it...) Instead, we first set the enable flag, * then test the occurred flag. If we see an unserviced interrupt has * occurred, we re-clear the enable flag before going off to do the * service work. (That prevents re-entrant invocation of * ProcessIncomingNotify() if another interrupt occurs.) If an * interrupt comes in between the setting and clearing of * notifyInterruptEnabled, then it will have done the service work and * left notifyInterruptOccurred zero, so we have to check again after * clearing enable. The whole thing has to be in a loop in case * another interrupt occurs while we're servicing the first. Once we * get out of the loop, enable is set and we know there is no * unserviced interrupt. * * NB: an overenthusiastic optimizing compiler could easily break this * code. Hopefully, they all understand what "volatile" means these * days. */ for (;;) { notifyInterruptEnabled = 1; if (!notifyInterruptOccurred) break; notifyInterruptEnabled = 0; if (notifyInterruptOccurred) { if (Trace_notify) elog(DEBUG1, "EnableNotifyInterrupt: perform async notify"); ProcessIncomingNotify(); if (Trace_notify) elog(DEBUG1, "EnableNotifyInterrupt: done"); } } } /* * -------------------------------------------------------------- * DisableNotifyInterrupt * * This is called by the PostgresMain main loop just after receiving * a frontend command. Signal handler execution of inbound notifies * is disabled until the next EnableNotifyInterrupt call. * * The SIGUSR1 signal handler also needs to call this, so as to * prevent conflicts if one signal interrupts the other. So we * must return the previous state of the flag. * -------------------------------------------------------------- */ bool DisableNotifyInterrupt(void) { bool result = (notifyInterruptEnabled != 0); notifyInterruptEnabled = 0; return result; } /* * -------------------------------------------------------------- * ProcessIncomingNotify * * Deal with arriving NOTIFYs from other backends. * This is called either directly from the SIGUSR2 signal handler, * or the next time control reaches the outer idle loop. * Scan pg_listener for arriving notifies, report them to my front end, * and clear the notification field in pg_listener until next time. * * NOTE: since we are outside any transaction, we must create our own. * -------------------------------------------------------------- */ static void ProcessIncomingNotify(void) { Relation lRel; TupleDesc tdesc; ScanKeyData key[1]; HeapScanDesc scan; HeapTuple lTuple, rTuple; Datum value[Natts_pg_listener]; char repl[Natts_pg_listener], nulls[Natts_pg_listener]; bool catchup_enabled; /* Must prevent SIGUSR1 interrupt while I am running */ catchup_enabled = DisableCatchupInterrupt(); if (Trace_notify) elog(DEBUG1, "ProcessIncomingNotify"); set_ps_display("notify interrupt"); notifyInterruptOccurred = 0; StartTransactionCommand(); lRel = heap_openr(ListenerRelationName, ExclusiveLock); tdesc = RelationGetDescr(lRel); /* Scan only entries with my listenerPID */ ScanKeyInit(&key[0], Anum_pg_listener_pid, BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(MyProcPid)); scan = heap_beginscan(lRel, SnapshotNow, 1, key); /* Prepare data for rewriting 0 into notification field */ nulls[0] = nulls[1] = nulls[2] = ' '; repl[0] = repl[1] = repl[2] = ' '; repl[Anum_pg_listener_notify - 1] = 'r'; value[0] = value[1] = value[2] = (Datum) 0; value[Anum_pg_listener_notify - 1] = Int32GetDatum(0); while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL) { Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple); char *relname = NameStr(listener->relname); int32 sourcePID = listener->notification; if (sourcePID != 0) { /* Notify the frontend */ if (Trace_notify) elog(DEBUG1, "ProcessIncomingNotify: received %s from %d", relname, (int) sourcePID); NotifyMyFrontEnd(relname, sourcePID); /* * Rewrite the tuple with 0 in notification column. * * simple_heap_update is safe here because no one else would have * tried to UNLISTEN us, so there can be no uncommitted * changes. */ rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl); simple_heap_update(lRel, &lTuple->t_self, rTuple); #ifdef NOT_USED /* currently there are no indexes */ CatalogUpdateIndexes(lRel, rTuple); #endif } } heap_endscan(scan); /* * We do NOT release the lock on pg_listener here; we need to hold it * until end of transaction (which is about to happen, anyway) to * ensure that other backends see our tuple updates when they look. * Otherwise, a transaction started after this one might mistakenly * think it doesn't need to send this backend a new NOTIFY. */ heap_close(lRel, NoLock); CommitTransactionCommand(); /* * Must flush the notify messages to ensure frontend gets them * promptly. */ pq_flush(); set_ps_display("idle"); if (Trace_notify) elog(DEBUG1, "ProcessIncomingNotify: done"); if (catchup_enabled) EnableCatchupInterrupt(); } /* * Send NOTIFY message to my front end. */ static void NotifyMyFrontEnd(char *relname, int32 listenerPID) { if (whereToSendOutput == Remote) { StringInfoData buf; pq_beginmessage(&buf, 'A'); pq_sendint(&buf, listenerPID, sizeof(int32)); pq_sendstring(&buf, relname); if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3) { /* XXX Add parameter string here later */ pq_sendstring(&buf, ""); } pq_endmessage(&buf); /* * NOTE: we do not do pq_flush() here. For a self-notify, it will * happen at the end of the transaction, and for incoming notifies * ProcessIncomingNotify will do it after finding all the * notifies. */ } else elog(INFO, "NOTIFY for %s", relname); } /* Does pendingNotifies include the given relname? */ static bool AsyncExistsPendingNotify(const char *relname) { ListCell *p; foreach(p, pendingNotifies) { /* Use NAMEDATALEN for relname comparison. DZ - 26-08-1996 */ if (strncmp((const char *) lfirst(p), relname, NAMEDATALEN) == 0) return true; } return false; } /* Clear the pendingNotifies list. */ static void ClearPendingNotifies(void) { /* * We used to have to explicitly deallocate the list members and * nodes, because they were malloc'd. Now, since we know they are * palloc'd in CurTransactionContext, we need not do that --- they'll * go away automatically at transaction exit. We need only reset the * list head pointer. */ pendingNotifies = NIL; }