From 7a85073290856554416353a89799a4c04d09b74b Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Tue, 1 Mar 2022 06:17:52 +0530 Subject: [PATCH] Reconsider pg_stat_subscription_workers view. It was decided (refer to the Discussion link below) that the stats collector is not an appropriate place to store the error information of subscription workers. This patch changes the pg_stat_subscription_workers view (introduced by commit 8d74fc96db) so that it stores only statistics counters: apply_error_count and sync_error_count, and has one entry for each subscription. The removed error information, such as the error XID and the error message, will be stored in the future in another way that is more reliable and persistent. After removing these error details, there is no longer any relation information, so the subscription statistics are now cluster-wide statistics. The patch also changes the view name to pg_stat_subscription_stats since the word "worker" refers to an implementation detail, namely that we use one worker for each tablesync and one for apply. 
Author: Masahiko Sawada, based on suggestions by Andres Freund Reviewed-by: Peter Smith, Haiying Tang, Takamichi Osumi, Amit Kapila Discussion: https://postgr.es/m/20220125063131.4cmvsxbz2tdg6g65@alap3.anarazel.de --- doc/src/sgml/logical-replication.sgml | 4 +- doc/src/sgml/monitoring.sgml | 99 +-- src/backend/catalog/system_functions.sql | 4 +- src/backend/catalog/system_views.sql | 27 +- src/backend/postmaster/pgstat.c | 656 +++++++++----------- src/backend/replication/logical/worker.c | 42 +- src/backend/utils/adt/pgstatfuncs.c | 160 ++--- src/include/catalog/catversion.h | 2 +- src/include/catalog/pg_proc.dat | 27 +- src/include/pgstat.h | 129 ++-- src/test/regress/expected/rules.out | 23 +- src/test/subscription/t/026_stats.pl | 102 +++ src/test/subscription/t/026_worker_stats.pl | 165 ----- src/tools/pgindent/typedefs.list | 8 +- 14 files changed, 583 insertions(+), 865 deletions(-) create mode 100644 src/test/subscription/t/026_stats.pl delete mode 100644 src/test/subscription/t/026_worker_stats.pl diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 96b4886e08..fb4472356d 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -346,9 +346,7 @@ A conflict will produce an error and will stop the replication; it must be resolved manually by the user. Details about the conflict can be found in - - pg_stat_subscription_workers and the - subscriber's server log. + the subscriber's server log. diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index bf7625d988..9fb62fec8e 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -628,11 +628,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser - pg_stat_subscription_workerspg_stat_subscription_workers - One row per subscription worker, showing statistics about errors - that occurred on that subscription worker. - See - pg_stat_subscription_workers for details. 
+ pg_stat_subscription_statspg_stat_subscription_stats + One row per subscription, showing statistics about errors. + See + pg_stat_subscription_stats for details. @@ -3063,23 +3062,20 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i - - <structname>pg_stat_subscription_workers</structname> + + <structname>pg_stat_subscription_stats</structname> - pg_stat_subscription_workers + pg_stat_subscription_stats - The pg_stat_subscription_workers view will contain - one row per subscription worker on which errors have occurred, for workers - applying logical replication changes and workers handling the initial data - copy of the subscribed tables. The statistics entry is removed when the - corresponding subscription is dropped. + The pg_stat_subscription_stats view will contain + one row per subscription. - - <structname>pg_stat_subscription_workers</structname> View +
+ <structname>pg_stat_subscription_stats</structname> View @@ -3113,72 +3109,31 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i - subrelid oid + apply_error_count bigint - OID of the relation that the worker is synchronizing; null for the - main apply worker + Number of times an error occurred while applying changes - last_error_relid oid + sync_error_count bigint - OID of the relation that the worker was processing when the - error occurred + Number of times an error occurred during the initial table + synchronization - last_error_command text + stats_reset timestamp with time zone - Name of command being applied when the error occurred. This field - is null if the error was reported during the initial data copy. + Time at which these statistics were last reset - - - - last_error_xid xid - - - Transaction ID of the publisher node being applied when the error - occurred. This field is null if the error was reported - during the initial data copy. - - - - - - last_error_count uint8 - - - Number of consecutive times the error occurred - - - - - - last_error_message text - - - The error message - - - - - - last_error_time timestamp with time zone - - - Last time at which this error occurred - - -
@@ -5320,22 +5275,16 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i - pg_stat_reset_subscription_worker + pg_stat_reset_subscription_stats - pg_stat_reset_subscription_worker ( subid oid , relid oid ) + pg_stat_reset_subscription_stats ( oid ) void - Resets the statistics of subscription workers running on the - subscription with subid shown in the - pg_stat_subscription_workers view. If the - argument relid is not NULL, - resets statistics of the subscription worker handling the initial data - copy of the relation with relid. Otherwise, - resets the subscription worker statistics of the main apply worker. - If the argument relid is omitted, resets the - statistics of all subscription workers running on the subscription - with subid. + Resets statistics for a single subscription shown in the + pg_stat_subscription_stats view to zero. If + the argument is NULL, reset statistics for all + subscriptions. This function is restricted to superusers by default, but other users diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql index fd1421788e..758ab6e25a 100644 --- a/src/backend/catalog/system_functions.sql +++ b/src/backend/catalog/system_functions.sql @@ -639,9 +639,7 @@ REVOKE EXECUTE ON FUNCTION pg_stat_reset_single_function_counters(oid) FROM publ REVOKE EXECUTE ON FUNCTION pg_stat_reset_replication_slot(text) FROM public; -REVOKE EXECUTE ON FUNCTION pg_stat_reset_subscription_worker(oid) FROM public; - -REVOKE EXECUTE ON FUNCTION pg_stat_reset_subscription_worker(oid, oid) FROM public; +REVOKE EXECUTE ON FUNCTION pg_stat_reset_subscription_stats(oid) FROM public; REVOKE EXECUTE ON FUNCTION lo_import(text) FROM public; diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 3cb69b1f87..40b7bca5a9 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1264,25 +1264,12 @@ GRANT SELECT (oid, subdbid, 
subname, subowner, subenabled, subbinary, substream, subtwophasestate, subslotname, subsynccommit, subpublications) ON pg_subscription TO public; -CREATE VIEW pg_stat_subscription_workers AS +CREATE VIEW pg_stat_subscription_stats AS SELECT - w.subid, + ss.subid, s.subname, - w.subrelid, - w.last_error_relid, - w.last_error_command, - w.last_error_xid, - w.last_error_count, - w.last_error_message, - w.last_error_time - FROM (SELECT - oid as subid, - NULL as relid - FROM pg_subscription - UNION ALL - SELECT - srsubid as subid, - srrelid as relid - FROM pg_subscription_rel) sr, - LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) w - JOIN pg_subscription s ON (w.subid = s.oid); + ss.apply_error_count, + ss.sync_error_count, + ss.stats_reset + FROM pg_subscription as s, + pg_stat_get_subscription_stats(s.oid) as ss; diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 0646f53098..53ddd930e6 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -106,7 +106,7 @@ #define PGSTAT_DB_HASH_SIZE 16 #define PGSTAT_TAB_HASH_SIZE 512 #define PGSTAT_FUNCTION_HASH_SIZE 512 -#define PGSTAT_SUBWORKER_HASH_SIZE 32 +#define PGSTAT_SUBSCRIPTION_HASH_SIZE 32 #define PGSTAT_REPLSLOT_HASH_SIZE 32 @@ -284,6 +284,7 @@ static PgStat_GlobalStats globalStats; static PgStat_WalStats walStats; static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS]; static HTAB *replSlotStatHash = NULL; +static HTAB *subscriptionStatHash = NULL; /* * List of OIDs of databases we need to write out. 
If an entry is InvalidOid, @@ -322,14 +323,13 @@ NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]) pg_attribute_no static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid, bool create); static PgStat_StatTabEntry *pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry, Oid tableoid, bool create); -static PgStat_StatSubWorkerEntry *pgstat_get_subworker_entry(PgStat_StatDBEntry *dbentry, - Oid subid, Oid subrelid, - bool create); +static PgStat_StatSubEntry *pgstat_get_subscription_entry(Oid subid, bool create); +static void pgstat_reset_subscription(PgStat_StatSubEntry *subentry, TimestampTz ts); static void pgstat_write_statsfiles(bool permanent, bool allDbs); static void pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent); static HTAB *pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep); static void pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash, - HTAB *subworkerhash, bool permanent); + bool permanent); static void backend_read_statsfile(void); static bool pgstat_write_statsfile_needed(void); @@ -341,7 +341,6 @@ static void pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotstats, Timestamp static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg, TimestampTz now); static void pgstat_send_funcstats(void); static void pgstat_send_slru(void); -static void pgstat_send_subscription_purge(PgStat_MsgSubscriptionPurge *msg); static HTAB *pgstat_collect_oids(Oid catalogid, AttrNumber anum_oid); static bool pgstat_should_report_connstat(void); static void pgstat_report_disconnect(Oid dboid); @@ -363,6 +362,7 @@ static void pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, in static void pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len); static void pgstat_recv_resetslrucounter(PgStat_MsgResetslrucounter *msg, int len); static void pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg, int len); +static void 
pgstat_recv_resetsubcounter(PgStat_MsgResetsubcounter *msg, int len); static void pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len); static void pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len); static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len); @@ -380,8 +380,8 @@ static void pgstat_recv_connect(PgStat_MsgConnect *msg, int len); static void pgstat_recv_disconnect(PgStat_MsgDisconnect *msg, int len); static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len); static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len); -static void pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len); -static void pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len); +static void pgstat_recv_subscription_drop(PgStat_MsgSubscriptionDrop *msg, int len); +static void pgstat_recv_subscription_error(PgStat_MsgSubscriptionError *msg, int len); /* ------------------------------------------------------------ * Public functions called from postmaster follow @@ -1187,6 +1187,32 @@ pgstat_vacuum_stat(void) } } + /* + * Repeat the above steps for subscriptions, if subscription stats are + * being collected. + */ + if (subscriptionStatHash) + { + PgStat_StatSubEntry *subentry; + + /* + * Read pg_subscription and make a list of OIDs of all existing + * subscriptions. + */ + htab = pgstat_collect_oids(SubscriptionRelationId, Anum_pg_subscription_oid); + + hash_seq_init(&hstat, subscriptionStatHash); + while ((subentry = (PgStat_StatSubEntry *) hash_seq_search(&hstat)) != NULL) + { + CHECK_FOR_INTERRUPTS(); + + if (hash_search(htab, (void *) &(subentry->subid), HASH_FIND, NULL) == NULL) + pgstat_report_subscription_drop(subentry->subid); + } + + hash_destroy(htab); + } + /* * Lookup our own database entry; if not found, nothing more to do. */ @@ -1311,74 +1337,6 @@ pgstat_vacuum_stat(void) hash_destroy(htab); } - - /* - * Repeat for subscription workers. 
Similarly, we needn't bother in the - * common case where no subscription workers' stats are being collected. - */ - if (dbentry->subworkers != NULL && - hash_get_num_entries(dbentry->subworkers) > 0) - { - PgStat_StatSubWorkerEntry *subwentry; - PgStat_MsgSubscriptionPurge spmsg; - - /* - * Read pg_subscription and make a list of OIDs of all existing - * subscriptions - */ - htab = pgstat_collect_oids(SubscriptionRelationId, Anum_pg_subscription_oid); - - spmsg.m_databaseid = MyDatabaseId; - spmsg.m_nentries = 0; - - hash_seq_init(&hstat, dbentry->subworkers); - while ((subwentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&hstat)) != NULL) - { - bool exists = false; - Oid subid = subwentry->key.subid; - - CHECK_FOR_INTERRUPTS(); - - if (hash_search(htab, (void *) &subid, HASH_FIND, NULL) != NULL) - continue; - - /* - * It is possible that we have multiple entries for the - * subscription corresponding to apply worker and tablesync - * workers. In such cases, we don't need to add the same subid - * again. 
- */ - for (int i = 0; i < spmsg.m_nentries; i++) - { - if (spmsg.m_subids[i] == subid) - { - exists = true; - break; - } - } - - if (exists) - continue; - - /* This subscription is dead, add the subid to the message */ - spmsg.m_subids[spmsg.m_nentries++] = subid; - - /* - * If the message is full, send it out and reinitialize to empty - */ - if (spmsg.m_nentries >= PGSTAT_NUM_SUBSCRIPTIONPURGE) - { - pgstat_send_subscription_purge(&spmsg); - spmsg.m_nentries = 0; - } - } - - /* Send the rest of dead subscriptions */ - if (spmsg.m_nentries > 0) - pgstat_send_subscription_purge(&spmsg); - - hash_destroy(htab); - } } @@ -1551,8 +1509,7 @@ pgstat_reset_shared_counters(const char *target) * ---------- */ void -pgstat_reset_single_counter(Oid objoid, Oid subobjoid, - PgStat_Single_Reset_Type type) +pgstat_reset_single_counter(Oid objoid, PgStat_Single_Reset_Type type) { PgStat_MsgResetsinglecounter msg; @@ -1563,7 +1520,6 @@ pgstat_reset_single_counter(Oid objoid, Oid subobjoid, msg.m_databaseid = MyDatabaseId; msg.m_resettype = type; msg.m_objectid = objoid; - msg.m_subobjectid = subobjoid; pgstat_send(&msg, sizeof(msg)); } @@ -1623,6 +1579,30 @@ pgstat_reset_replslot_counter(const char *name) pgstat_send(&msg, sizeof(msg)); } +/* ---------- + * pgstat_reset_subscription_counter() - + * + * Tell the statistics collector to reset a single subscription + * counter, or all subscription counters (when subid is InvalidOid). + * + * Permission checking for this function is managed through the normal + * GRANT system. 
+ * ---------- + */ +void +pgstat_reset_subscription_counter(Oid subid) +{ + PgStat_MsgResetsubcounter msg; + + if (pgStatSock == PGINVALID_SOCKET) + return; + + msg.m_subid = subid; + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSUBCOUNTER); + + pgstat_send(&msg, sizeof(msg)); +} + /* ---------- * pgstat_report_autovac() - * @@ -1949,31 +1929,20 @@ pgstat_report_replslot_drop(const char *slotname) } /* ---------- - * pgstat_report_subworker_error() - + * pgstat_report_subscription_error() - * - * Tell the collector about the subscription worker error. + * Tell the collector about the subscription error. * ---------- */ void -pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid, - LogicalRepMsgType command, TransactionId xid, - const char *errmsg) +pgstat_report_subscription_error(Oid subid, bool is_apply_error) { - PgStat_MsgSubWorkerError msg; - int len; + PgStat_MsgSubscriptionError msg; - pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBWORKERERROR); - msg.m_databaseid = MyDatabaseId; + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONERROR); msg.m_subid = subid; - msg.m_subrelid = subrelid; - msg.m_relid = relid; - msg.m_command = command; - msg.m_xid = xid; - msg.m_timestamp = GetCurrentTimestamp(); - strlcpy(msg.m_message, errmsg, PGSTAT_SUBWORKERERROR_MSGLEN); - - len = offsetof(PgStat_MsgSubWorkerError, m_message) + strlen(msg.m_message) + 1; - pgstat_send(&msg, len); + msg.m_is_apply_error = is_apply_error; + pgstat_send(&msg, sizeof(PgStat_MsgSubscriptionError)); } /* ---------- @@ -1985,12 +1954,11 @@ pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid, void pgstat_report_subscription_drop(Oid subid) { - PgStat_MsgSubscriptionPurge msg; + PgStat_MsgSubscriptionDrop msg; - msg.m_databaseid = MyDatabaseId; - msg.m_subids[0] = subid; - msg.m_nentries = 1; - pgstat_send_subscription_purge(&msg); + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONDROP); + msg.m_subid = subid; + pgstat_send(&msg, 
sizeof(PgStat_MsgSubscriptionDrop)); } /* ---------- @@ -2998,36 +2966,6 @@ pgstat_fetch_stat_funcentry(Oid func_id) return funcentry; } -/* - * --------- - * pgstat_fetch_stat_subworker_entry() - - * - * Support function for the SQL-callable pgstat* functions. Returns - * the collected statistics for subscription worker or NULL. - * --------- - */ -PgStat_StatSubWorkerEntry * -pgstat_fetch_stat_subworker_entry(Oid subid, Oid subrelid) -{ - PgStat_StatDBEntry *dbentry; - PgStat_StatSubWorkerEntry *wentry = NULL; - - /* Load the stats file if needed */ - backend_read_statsfile(); - - /* - * Lookup our database, then find the requested subscription worker stats. - */ - dbentry = pgstat_fetch_stat_dbentry(MyDatabaseId); - if (dbentry != NULL && dbentry->subworkers != NULL) - { - wentry = pgstat_get_subworker_entry(dbentry, subid, subrelid, - false); - } - - return wentry; -} - /* * --------- * pgstat_fetch_stat_archiver() - @@ -3140,6 +3078,23 @@ pgstat_fetch_replslot(NameData slotname) return pgstat_get_replslot_entry(slotname, false); } +/* + * --------- + * pgstat_fetch_stat_subscription() - + * + * Support function for the SQL-callable pgstat* functions. Returns + * the collected statistics for one subscription or NULL. + * --------- + */ +PgStat_StatSubEntry * +pgstat_fetch_stat_subscription(Oid subid) +{ + /* Load the stats file if needed */ + backend_read_statsfile(); + + return pgstat_get_subscription_entry(subid, false); +} + /* * Shut down a single backend's statistics reporting at process exit. 
* @@ -3465,24 +3420,6 @@ pgstat_send_slru(void) } } -/* -------- - * pgstat_send_subscription_purge() - - * - * Send a subscription purge message to the collector - * -------- - */ -static void -pgstat_send_subscription_purge(PgStat_MsgSubscriptionPurge *msg) -{ - int len; - - len = offsetof(PgStat_MsgSubscriptionPurge, m_subids[0]) - + msg->m_nentries * sizeof(Oid); - - pgstat_setheader(&msg->m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONPURGE); - pgstat_send(msg, len); -} - /* ---------- * PgstatCollectorMain() - * @@ -3668,6 +3605,10 @@ PgstatCollectorMain(int argc, char *argv[]) len); break; + case PGSTAT_MTYPE_RESETSUBCOUNTER: + pgstat_recv_resetsubcounter(&msg.msg_resetsubcounter, len); + break; + case PGSTAT_MTYPE_AUTOVAC_START: pgstat_recv_autovac(&msg.msg_autovacuum_start, len); break; @@ -3738,12 +3679,12 @@ PgstatCollectorMain(int argc, char *argv[]) pgstat_recv_disconnect(&msg.msg_disconnect, len); break; - case PGSTAT_MTYPE_SUBSCRIPTIONPURGE: - pgstat_recv_subscription_purge(&msg.msg_subscriptionpurge, len); + case PGSTAT_MTYPE_SUBSCRIPTIONDROP: + pgstat_recv_subscription_drop(&msg.msg_subscriptiondrop, len); break; - case PGSTAT_MTYPE_SUBWORKERERROR: - pgstat_recv_subworker_error(&msg.msg_subworkererror, len); + case PGSTAT_MTYPE_SUBSCRIPTIONERROR: + pgstat_recv_subscription_error(&msg.msg_subscriptionerror, len); break; default: @@ -3791,8 +3732,7 @@ PgstatCollectorMain(int argc, char *argv[]) /* * Subroutine to clear stats in a database entry * - * Tables, functions, and subscription workers hashes are initialized - * to empty. + * Tables and functions hashes are initialized to empty. 
*/ static void reset_dbentry_counters(PgStat_StatDBEntry *dbentry) @@ -3845,13 +3785,6 @@ reset_dbentry_counters(PgStat_StatDBEntry *dbentry) PGSTAT_FUNCTION_HASH_SIZE, &hash_ctl, HASH_ELEM | HASH_BLOBS); - - hash_ctl.keysize = sizeof(PgStat_StatSubWorkerKey); - hash_ctl.entrysize = sizeof(PgStat_StatSubWorkerEntry); - dbentry->subworkers = hash_create("Per-database subscription worker", - PGSTAT_SUBWORKER_HASH_SIZE, - &hash_ctl, - HASH_ELEM | HASH_BLOBS); } /* @@ -3876,7 +3809,7 @@ pgstat_get_db_entry(Oid databaseid, bool create) /* * If not found, initialize the new one. This creates empty hash tables - * for tables, functions, and subscription workers, too. + * for tables and functions, too. */ if (!found) reset_dbentry_counters(result); @@ -3934,48 +3867,6 @@ pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry, Oid tableoid, bool create) return result; } -/* ---------- - * pgstat_get_subworker_entry - * - * Return subscription worker entry with the given subscription OID and - * relation OID. If subrelid is InvalidOid, it returns an entry of the - * apply worker otherwise returns an entry of the table sync worker - * associated with subrelid. If no subscription worker entry exists, - * initialize it, if the create parameter is true. Else, return NULL. - * ---------- - */ -static PgStat_StatSubWorkerEntry * -pgstat_get_subworker_entry(PgStat_StatDBEntry *dbentry, Oid subid, Oid subrelid, - bool create) -{ - PgStat_StatSubWorkerEntry *subwentry; - PgStat_StatSubWorkerKey key; - bool found; - HASHACTION action = (create ? 
HASH_ENTER : HASH_FIND); - - key.subid = subid; - key.subrelid = subrelid; - subwentry = (PgStat_StatSubWorkerEntry *) hash_search(dbentry->subworkers, - (void *) &key, - action, &found); - - if (!create && !found) - return NULL; - - /* If not found, initialize the new one */ - if (!found) - { - subwentry->last_error_relid = InvalidOid; - subwentry->last_error_command = 0; - subwentry->last_error_xid = InvalidTransactionId; - subwentry->last_error_count = 0; - subwentry->last_error_time = 0; - subwentry->last_error_message[0] = '\0'; - } - - return subwentry; -} - /* ---------- * pgstat_write_statsfiles() - * Write the global statistics file, as well as requested DB files. @@ -4059,8 +3950,8 @@ pgstat_write_statsfiles(bool permanent, bool allDbs) while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL) { /* - * Write out the table, function, and subscription-worker stats for - * this DB into the appropriate per-DB stat file, if required. + * Write out the table and function stats for this DB into the + * appropriate per-DB stat file, if required. */ if (allDbs || pgstat_db_requested(dbentry->databaseid)) { @@ -4095,6 +3986,22 @@ pgstat_write_statsfiles(bool permanent, bool allDbs) } } + /* + * Write subscription stats struct + */ + if (subscriptionStatHash) + { + PgStat_StatSubEntry *subentry; + + hash_seq_init(&hstat, subscriptionStatHash); + while ((subentry = (PgStat_StatSubEntry *) hash_seq_search(&hstat)) != NULL) + { + fputc('S', fpout); + rc = fwrite(subentry, sizeof(PgStat_StatSubEntry), 1, fpout); + (void) rc; /* we'll check for error with ferror */ + } + } + /* * No more output to be done. Close the temp file and replace the old * pgstat.stat with it. 
The ferror() check replaces testing for error @@ -4174,10 +4081,8 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent) { HASH_SEQ_STATUS tstat; HASH_SEQ_STATUS fstat; - HASH_SEQ_STATUS sstat; PgStat_StatTabEntry *tabentry; PgStat_StatFuncEntry *funcentry; - PgStat_StatSubWorkerEntry *subwentry; FILE *fpout; int32 format_id; Oid dbid = dbentry->databaseid; @@ -4232,17 +4137,6 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent) (void) rc; /* we'll check for error with ferror */ } - /* - * Walk through the database's subscription worker stats table. - */ - hash_seq_init(&sstat, dbentry->subworkers); - while ((subwentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&sstat)) != NULL) - { - fputc('S', fpout); - rc = fwrite(subwentry, sizeof(PgStat_StatSubWorkerEntry), 1, fpout); - (void) rc; /* we'll check for error with ferror */ - } - /* * No more output to be done. Close the temp file and replace the old * pgstat.stat with it. The ferror() check replaces testing for error @@ -4301,9 +4195,8 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent) * files after reading; the in-memory status is now authoritative, and the * files would be out of date in case somebody else reads them. * - * If a 'deep' read is requested, table/function/subscription-worker stats are - * read, otherwise the table/function/subscription-worker hash tables remain - * empty. + * If a 'deep' read is requested, table/function stats are read, otherwise + * the table/function hash tables remain empty. 
* ---------- */ static HTAB * @@ -4482,7 +4375,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) memcpy(dbentry, &dbbuf, sizeof(PgStat_StatDBEntry)); dbentry->tables = NULL; dbentry->functions = NULL; - dbentry->subworkers = NULL; /* * In the collector, disregard the timestamp we read from the @@ -4494,8 +4386,8 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) dbentry->stats_timestamp = 0; /* - * Don't create tables/functions/subworkers hashtables for - * uninteresting databases. + * Don't create tables/functions hashtables for uninteresting + * databases. */ if (onlydb != InvalidOid) { @@ -4520,14 +4412,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) &hash_ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); - hash_ctl.keysize = sizeof(PgStat_StatSubWorkerKey); - hash_ctl.entrysize = sizeof(PgStat_StatSubWorkerEntry); - hash_ctl.hcxt = pgStatLocalContext; - dbentry->subworkers = hash_create("Per-database subscription worker", - PGSTAT_SUBWORKER_HASH_SIZE, - &hash_ctl, - HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); - /* * If requested, read the data from the database-specific * file. Otherwise we just leave the hashtables empty. @@ -4536,7 +4420,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) pgstat_read_db_statsfile(dbentry->databaseid, dbentry->tables, dbentry->functions, - dbentry->subworkers, permanent); break; @@ -4580,6 +4463,45 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) break; } + /* + * 'S' A PgStat_StatSubEntry struct describing subscription + * statistics. + */ + case 'S': + { + PgStat_StatSubEntry subbuf; + PgStat_StatSubEntry *subentry; + + if (fread(&subbuf, 1, sizeof(PgStat_StatSubEntry), fpin) + != sizeof(PgStat_StatSubEntry)) + { + ereport(pgStatRunningInCollector ? 
LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", + statfile))); + goto done; + } + + if (subscriptionStatHash == NULL) + { + HASHCTL hash_ctl; + + hash_ctl.keysize = sizeof(Oid); + hash_ctl.entrysize = sizeof(PgStat_StatSubEntry); + hash_ctl.hcxt = pgStatLocalContext; + subscriptionStatHash = hash_create("Subscription hash", + PGSTAT_SUBSCRIPTION_HASH_SIZE, + &hash_ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + } + + subentry = (PgStat_StatSubEntry *) hash_search(subscriptionStatHash, + (void *) &subbuf.subid, + HASH_ENTER, NULL); + + memcpy(subentry, &subbuf, sizeof(subbuf)); + break; + } + case 'E': goto done; @@ -4614,21 +4536,19 @@ done: * As in pgstat_read_statsfiles, if the permanent file is requested, it is * removed after reading. * - * Note: this code has the ability to skip storing per-table, per-function, or - * per-subscription-worker data, if NULL is passed for the corresponding hashtable. - * That's not used at the moment though. + * Note: this code has the ability to skip storing per-table or per-function + * data, if NULL is passed for the corresponding hashtable. That's not used + * at the moment though. * ---------- */ static void pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash, - HTAB *subworkerhash, bool permanent) + bool permanent) { PgStat_StatTabEntry *tabentry; PgStat_StatTabEntry tabbuf; PgStat_StatFuncEntry funcbuf; PgStat_StatFuncEntry *funcentry; - PgStat_StatSubWorkerEntry subwbuf; - PgStat_StatSubWorkerEntry *subwentry; FILE *fpin; int32 format_id; bool found; @@ -4742,41 +4662,6 @@ pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash, memcpy(funcentry, &funcbuf, sizeof(funcbuf)); break; - /* - * 'S' A PgStat_StatSubWorkerEntry struct describing - * subscription worker statistics. - */ - case 'S': - if (fread(&subwbuf, 1, sizeof(PgStat_StatSubWorkerEntry), - fpin) != sizeof(PgStat_StatSubWorkerEntry)) - { - ereport(pgStatRunningInCollector ? 
LOG : WARNING, - (errmsg("corrupted statistics file \"%s\"", - statfile))); - goto done; - } - - /* - * Skip if subscription worker data not wanted. - */ - if (subworkerhash == NULL) - break; - - subwentry = (PgStat_StatSubWorkerEntry *) hash_search(subworkerhash, - (void *) &subwbuf.key, - HASH_ENTER, &found); - - if (found) - { - ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted statistics file \"%s\"", - statfile))); - goto done; - } - - memcpy(subwentry, &subwbuf, sizeof(subwbuf)); - break; - /* * 'E' The EOF marker of a complete stats file. */ @@ -4829,6 +4714,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent, PgStat_WalStats myWalStats; PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS]; PgStat_StatReplSlotEntry myReplSlotStats; + PgStat_StatSubEntry mySubStats; FILE *fpin; int32 format_id; const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename; @@ -4959,6 +4845,22 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent, } break; + /* + * 'S' A PgStat_StatSubEntry struct describing subscription + * statistics follows. + */ + case 'S': + if (fread(&mySubStats, 1, sizeof(PgStat_StatSubEntry), fpin) + != sizeof(PgStat_StatSubEntry)) + { + ereport(pgStatRunningInCollector ? 
LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", + statfile))); + FreeFile(fpin); + return false; + } + break; + case 'E': goto done; @@ -5164,6 +5066,7 @@ pgstat_clear_snapshot(void) pgStatLocalContext = NULL; pgStatDBHash = NULL; replSlotStatHash = NULL; + subscriptionStatHash = NULL; /* * Historically the backend_status.c facilities lived in this file, and @@ -5450,8 +5353,6 @@ pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len) hash_destroy(dbentry->tables); if (dbentry->functions != NULL) hash_destroy(dbentry->functions); - if (dbentry->subworkers != NULL) - hash_destroy(dbentry->subworkers); if (hash_search(pgStatDBHash, (void *) &dbid, @@ -5489,16 +5390,13 @@ pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len) hash_destroy(dbentry->tables); if (dbentry->functions != NULL) hash_destroy(dbentry->functions); - if (dbentry->subworkers != NULL) - hash_destroy(dbentry->subworkers); dbentry->tables = NULL; dbentry->functions = NULL; - dbentry->subworkers = NULL; /* * Reset database-level stats, too. This creates empty hash tables for - * tables, functions, and subscription workers. + * tables and functions. */ reset_dbentry_counters(dbentry); } @@ -5567,14 +5465,6 @@ pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len) else if (msg->m_resettype == RESET_FUNCTION) (void) hash_search(dbentry->functions, (void *) &(msg->m_objectid), HASH_REMOVE, NULL); - else if (msg->m_resettype == RESET_SUBWORKER) - { - PgStat_StatSubWorkerKey key; - - key.subid = msg->m_objectid; - key.subrelid = msg->m_subobjectid; - (void) hash_search(dbentry->subworkers, (void *) &key, HASH_REMOVE, NULL); - } } /* ---------- @@ -5645,6 +5535,51 @@ pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg, } } +/* ---------- + * pgstat_recv_resetsubcounter() - + * + * Reset some subscription statistics of the cluster. 
+ * ---------- + */ +static void +pgstat_recv_resetsubcounter(PgStat_MsgResetsubcounter *msg, int len) +{ + PgStat_StatSubEntry *subentry; + TimestampTz ts; + + /* Return if we don't have replication subscription statistics */ + if (subscriptionStatHash == NULL) + return; + + ts = GetCurrentTimestamp(); + if (!OidIsValid(msg->m_subid)) + { + HASH_SEQ_STATUS sstat; + + /* Clear all subscription counters */ + hash_seq_init(&sstat, subscriptionStatHash); + while ((subentry = (PgStat_StatSubEntry *) hash_seq_search(&sstat)) != NULL) + pgstat_reset_subscription(subentry, ts); + } + else + { + /* Get the subscription statistics to reset */ + subentry = pgstat_get_subscription_entry(msg->m_subid, false); + + /* + * Nothing to do if the given subscription entry is not found. This + * could happen when the subscription with the subid is removed and + * the corresponding statistics entry is also removed before receiving + * the reset message. + */ + if (!subentry) + return; + + /* Reset the stats for the requested subscription */ + pgstat_reset_subscription(subentry, ts); + } +} + /* ---------- * pgstat_recv_autovac() - @@ -6118,81 +6053,42 @@ pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len) } /* ---------- - * pgstat_recv_subscription_purge() - + * pgstat_recv_subscription_drop() - * - * Process a SUBSCRIPTIONPURGE message. + * Process a SUBSCRIPTIONDROP message. 
* ---------- */ static void -pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len) +pgstat_recv_subscription_drop(PgStat_MsgSubscriptionDrop *msg, int len) { - HASH_SEQ_STATUS hstat; - PgStat_StatDBEntry *dbentry; - PgStat_StatSubWorkerEntry *subwentry; - - dbentry = pgstat_get_db_entry(msg->m_databaseid, false); - - /* No need to purge if we don't even know the database */ - if (!dbentry || !dbentry->subworkers) + /* Return if we don't have replication subscription statistics */ + if (subscriptionStatHash == NULL) return; - /* Remove all subscription worker statistics for the given subscriptions */ - hash_seq_init(&hstat, dbentry->subworkers); - while ((subwentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&hstat)) != NULL) - { - for (int i = 0; i < msg->m_nentries; i++) - { - if (subwentry->key.subid == msg->m_subids[i]) - { - (void) hash_search(dbentry->subworkers, (void *) &(subwentry->key), - HASH_REMOVE, NULL); - break; - } - } - } + /* Remove from hashtable if present; we don't care if it's not */ + (void) hash_search(subscriptionStatHash, (void *) &(msg->m_subid), + HASH_REMOVE, NULL); } /* ---------- - * pgstat_recv_subworker_error() - + * pgstat_recv_subscription_error() - * - * Process a SUBWORKERERROR message. + * Process a SUBSCRIPTIONERROR message. 
* ---------- */ static void -pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len) +pgstat_recv_subscription_error(PgStat_MsgSubscriptionError *msg, int len) { - PgStat_StatDBEntry *dbentry; - PgStat_StatSubWorkerEntry *subwentry; + PgStat_StatSubEntry *subentry; - dbentry = pgstat_get_db_entry(msg->m_databaseid, true); + /* Get the subscription stats */ + subentry = pgstat_get_subscription_entry(msg->m_subid, true); + Assert(subentry); - /* Get the subscription worker stats */ - subwentry = pgstat_get_subworker_entry(dbentry, msg->m_subid, - msg->m_subrelid, true); - Assert(subwentry); - - if (subwentry->last_error_relid == msg->m_relid && - subwentry->last_error_command == msg->m_command && - subwentry->last_error_xid == msg->m_xid && - strcmp(subwentry->last_error_message, msg->m_message) == 0) - { - /* - * The same error occurred again in succession, just update its - * timestamp and count. - */ - subwentry->last_error_count++; - subwentry->last_error_time = msg->m_timestamp; - return; - } - - /* Otherwise, update the error information */ - subwentry->last_error_relid = msg->m_relid; - subwentry->last_error_command = msg->m_command; - subwentry->last_error_xid = msg->m_xid; - subwentry->last_error_count = 1; - subwentry->last_error_time = msg->m_timestamp; - strlcpy(subwentry->last_error_message, msg->m_message, - PGSTAT_SUBWORKERERROR_MSGLEN); + if (msg->m_is_apply_error) + subentry->apply_error_count++; + else + subentry->sync_error_count++; } /* ---------- @@ -6313,6 +6209,68 @@ pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotent, TimestampTz ts) slotent->stat_reset_timestamp = ts; } +/* ---------- + * pgstat_get_subscription_entry + * + * Return the subscription statistics entry with the given subscription OID. + * If no subscription entry exists, initialize it, if the create parameter is + * true. Else, return NULL. 
+ * ---------- + */ +static PgStat_StatSubEntry * +pgstat_get_subscription_entry(Oid subid, bool create) +{ + PgStat_StatSubEntry *subentry; + bool found; + HASHACTION action = (create ? HASH_ENTER : HASH_FIND); + + if (subscriptionStatHash == NULL) + { + HASHCTL hash_ctl; + + /* + * Quick return NULL if the hash table is empty and the caller didn't + * request to create the entry. + */ + if (!create) + return NULL; + + hash_ctl.keysize = sizeof(Oid); + hash_ctl.entrysize = sizeof(PgStat_StatSubEntry); + subscriptionStatHash = hash_create("Subscription hash", + PGSTAT_SUBSCRIPTION_HASH_SIZE, + &hash_ctl, + HASH_ELEM | HASH_BLOBS); + } + + subentry = (PgStat_StatSubEntry *) hash_search(subscriptionStatHash, + (void *) &subid, + action, &found); + + if (!create && !found) + return NULL; + + /* If not found, initialize the new one */ + if (!found) + pgstat_reset_subscription(subentry, 0); + + return subentry; +} + +/* ---------- + * pgstat_reset_subscription + * + * Reset the given subscription stats. + * ---------- + */ +static void +pgstat_reset_subscription(PgStat_StatSubEntry *subentry, TimestampTz ts) +{ + subentry->apply_error_count = 0; + subentry->sync_error_count = 0; + subentry->stat_reset_timestamp = ts; +} + /* * pgstat_slru_index * diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 5d9acc6173..7e267f7960 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3377,7 +3377,6 @@ void ApplyWorkerMain(Datum main_arg) { int worker_slot = DatumGetInt32(main_arg); - MemoryContext cctx = CurrentMemoryContext; MemoryContext oldctx; char originname[NAMEDATALEN]; XLogRecPtr origin_startpos; @@ -3485,20 +3484,15 @@ ApplyWorkerMain(Datum main_arg) } PG_CATCH(); { - MemoryContext ecxt = MemoryContextSwitchTo(cctx); - ErrorData *errdata = CopyErrorData(); - /* - * Report the table sync error. There is no corresponding message - * type for table synchronization. 
+ * Abort the current transaction so that we send the stats message + * in an idle state. */ - pgstat_report_subworker_error(MyLogicalRepWorker->subid, - MyLogicalRepWorker->relid, - MyLogicalRepWorker->relid, - 0, /* message type */ - InvalidTransactionId, - errdata->message); - MemoryContextSwitchTo(ecxt); + AbortOutOfAnyTransaction(); + + /* Report the worker failed during table synchronization */ + pgstat_report_subscription_error(MySubscription->oid, false); + PG_RE_THROW(); } PG_END_TRY(); @@ -3625,22 +3619,14 @@ ApplyWorkerMain(Datum main_arg) } PG_CATCH(); { - /* report the apply error */ - if (apply_error_callback_arg.command != 0) - { - MemoryContext ecxt = MemoryContextSwitchTo(cctx); - ErrorData *errdata = CopyErrorData(); + /* + * Abort the current transaction so that we send the stats message in + * an idle state. + */ + AbortOutOfAnyTransaction(); - pgstat_report_subworker_error(MyLogicalRepWorker->subid, - MyLogicalRepWorker->relid, - apply_error_callback_arg.rel != NULL - ? 
apply_error_callback_arg.rel->localreloid - : InvalidOid, - apply_error_callback_arg.command, - apply_error_callback_arg.remote_xid, - errdata->message); - MemoryContextSwitchTo(ecxt); - } + /* Report the worker failed while applying changes */ + pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker()); PG_RE_THROW(); } diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 30e8dfa7c1..fd993d0d5f 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2163,7 +2163,7 @@ pg_stat_reset_single_table_counters(PG_FUNCTION_ARGS) { Oid taboid = PG_GETARG_OID(0); - pgstat_reset_single_counter(taboid, InvalidOid, RESET_TABLE); + pgstat_reset_single_counter(taboid, RESET_TABLE); PG_RETURN_VOID(); } @@ -2173,38 +2173,11 @@ pg_stat_reset_single_function_counters(PG_FUNCTION_ARGS) { Oid funcoid = PG_GETARG_OID(0); - pgstat_reset_single_counter(funcoid, InvalidOid, RESET_FUNCTION); + pgstat_reset_single_counter(funcoid, RESET_FUNCTION); PG_RETURN_VOID(); } -Datum -pg_stat_reset_subscription_worker_subrel(PG_FUNCTION_ARGS) -{ - Oid subid = PG_GETARG_OID(0); - Oid relid = PG_ARGISNULL(1) ? InvalidOid : PG_GETARG_OID(1); - - pgstat_reset_single_counter(subid, relid, RESET_SUBWORKER); - - PG_RETURN_VOID(); -} - -/* Reset all subscription worker stats associated with the given subscription */ -Datum -pg_stat_reset_subscription_worker_sub(PG_FUNCTION_ARGS) -{ - Oid subid = PG_GETARG_OID(0); - - /* - * Use subscription drop message to remove statistics of all subscription - * workers. - */ - pgstat_report_subscription_drop(subid); - - PG_RETURN_VOID(); -} - - /* Reset SLRU counters (a specific one or all of them). 
*/ Datum pg_stat_reset_slru(PG_FUNCTION_ARGS) @@ -2258,6 +2231,32 @@ pg_stat_reset_replication_slot(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } +/* Reset subscription stats (a specific one or all of them) */ +Datum +pg_stat_reset_subscription_stats(PG_FUNCTION_ARGS) +{ + Oid subid; + + if (PG_ARGISNULL(0)) + { + /* Clear all subscription stats */ + subid = InvalidOid; + } + else + { + subid = PG_GETARG_OID(0); + + if (!OidIsValid(subid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid subscription OID %u", subid))); + } + + pgstat_reset_subscription_counter(subid); + + PG_RETURN_VOID(); +} + Datum pg_stat_get_archiver(PG_FUNCTION_ARGS) { @@ -2400,50 +2399,32 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) } /* - * Get the subscription worker statistics for the given subscription - * (and relation). + * Get the subscription statistics for the given subscription. If the + * subscription statistics is not available, return all-zeros stats. */ Datum -pg_stat_get_subscription_worker(PG_FUNCTION_ARGS) +pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_SUBSCRIPTION_WORKER_COLS 8 +#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 4 Oid subid = PG_GETARG_OID(0); - Oid subrelid; TupleDesc tupdesc; - Datum values[PG_STAT_GET_SUBSCRIPTION_WORKER_COLS]; - bool nulls[PG_STAT_GET_SUBSCRIPTION_WORKER_COLS]; - PgStat_StatSubWorkerEntry *wentry; - int i; + Datum values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS]; + bool nulls[PG_STAT_GET_SUBSCRIPTION_STATS_COLS]; + PgStat_StatSubEntry *subentry; + PgStat_StatSubEntry allzero; - if (PG_ARGISNULL(1)) - subrelid = InvalidOid; - else - subrelid = PG_GETARG_OID(1); - - /* Get subscription worker stats */ - wentry = pgstat_fetch_stat_subworker_entry(subid, subrelid); - - /* Return NULL if there is no worker statistics */ - if (wentry == NULL) - PG_RETURN_NULL(); + /* Get subscription stats */ + subentry = pgstat_fetch_stat_subscription(subid); /* Initialise attributes information in the tuple 
descriptor */ - tupdesc = CreateTemplateTupleDesc(PG_STAT_GET_SUBSCRIPTION_WORKER_COLS); + tupdesc = CreateTemplateTupleDesc(PG_STAT_GET_SUBSCRIPTION_STATS_COLS); TupleDescInitEntry(tupdesc, (AttrNumber) 1, "subid", OIDOID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 2, "subrelid", - OIDOID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 3, "last_error_relid", - OIDOID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 4, "last_error_command", - TEXTOID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 5, "last_error_xid", - XIDOID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 6, "last_error_count", + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "apply_error_count", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 7, "last_error_message", - TEXTOID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 8, "last_error_time", + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "sync_error_count", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "stats_reset", TIMESTAMPTZOID, -1, 0); BlessTupleDesc(tupdesc); @@ -2451,46 +2432,27 @@ pg_stat_get_subscription_worker(PG_FUNCTION_ARGS) MemSet(values, 0, sizeof(values)); MemSet(nulls, 0, sizeof(nulls)); - i = 0; + if (!subentry) + { + /* If the subscription is not found, initialise its stats */ + memset(&allzero, 0, sizeof(PgStat_StatSubEntry)); + subentry = &allzero; + } + /* subid */ - values[i++] = ObjectIdGetDatum(subid); + values[0] = ObjectIdGetDatum(subid); - /* subrelid */ - if (OidIsValid(subrelid)) - values[i++] = ObjectIdGetDatum(subrelid); + /* apply_error_count */ + values[1] = Int64GetDatum(subentry->apply_error_count); + + /* sync_error_count */ + values[2] = Int64GetDatum(subentry->sync_error_count); + + /* stats_reset */ + if (subentry->stat_reset_timestamp == 0) + nulls[3] = true; else - nulls[i++] = true; - - /* last_error_relid */ - if (OidIsValid(wentry->last_error_relid)) - values[i++] = ObjectIdGetDatum(wentry->last_error_relid); - else - nulls[i++] = true; - - /* 
last_error_command */ - if (wentry->last_error_command != 0) - values[i++] = - CStringGetTextDatum(logicalrep_message_type(wentry->last_error_command)); - else - nulls[i++] = true; - - /* last_error_xid */ - if (TransactionIdIsValid(wentry->last_error_xid)) - values[i++] = TransactionIdGetDatum(wentry->last_error_xid); - else - nulls[i++] = true; - - /* last_error_count */ - values[i++] = Int64GetDatum(wentry->last_error_count); - - /* last_error_message */ - values[i++] = CStringGetTextDatum(wentry->last_error_message); - - /* last_error_time */ - if (wentry->last_error_time != 0) - values[i++] = TimestampTzGetDatum(wentry->last_error_time); - else - nulls[i++] = true; + values[3] = TimestampTzGetDatum(subentry->stat_reset_timestamp); /* Returns the record as Datum */ PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 14194afe1c..5cf18059b8 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202202251 +#define CATALOG_VERSION_NO 202203011 #endif diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 7de8cfc7e9..bf88858171 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5376,14 +5376,14 @@ proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}', proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}', prosrc => 'pg_stat_get_replication_slot' }, -{ oid => '8523', descr => 'statistics: information about subscription worker', - proname => 'pg_stat_get_subscription_worker', prorows => '1', proisstrict => 'f', - proretset => 't', provolatile => 's', proparallel => 'r', - prorettype => 'record', proargtypes => 'oid oid', - proallargtypes => '{oid,oid,oid,oid,oid,text,xid,int8,text,timestamptz}', - proargmodes => 
'{i,i,o,o,o,o,o,o,o,o}', - proargnames => '{subid,subrelid,subid,subrelid,last_error_relid,last_error_command,last_error_xid,last_error_count,last_error_message,last_error_time}', - prosrc => 'pg_stat_get_subscription_worker' }, +{ oid => '8523', descr => 'statistics: information about subscription stats', + proname => 'pg_stat_get_subscription_stats', proisstrict => 'f', + provolatile => 's', proparallel => 'r', + prorettype => 'record', proargtypes => 'oid', + proallargtypes => '{oid,oid,int8,int8,timestamptz}', + proargmodes => '{i,o,o,o,o}', + proargnames => '{subid,subid,apply_error_count,sync_error_count,stats_reset}', + prosrc => 'pg_stat_get_subscription_stats' }, { oid => '6118', descr => 'statistics: information about subscription', proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', @@ -5772,15 +5772,10 @@ provolatile => 'v', prorettype => 'void', proargtypes => 'text', prosrc => 'pg_stat_reset_replication_slot' }, { oid => '8524', - descr => 'statistics: reset collected statistics for a single subscription worker', - proname => 'pg_stat_reset_subscription_worker', proisstrict => 'f', - provolatile => 'v', prorettype => 'void', proargtypes => 'oid oid', - prosrc => 'pg_stat_reset_subscription_worker_subrel' }, -{ oid => '8525', - descr => 'statistics: reset all collected statistics for a single subscription', - proname => 'pg_stat_reset_subscription_worker', + descr => 'statistics: reset collected statistics for a single subscription', + proname => 'pg_stat_reset_subscription_stats', proisstrict => 'f', provolatile => 'v', prorettype => 'void', proargtypes => 'oid', - prosrc => 'pg_stat_reset_subscription_worker_sub' }, + prosrc => 'pg_stat_reset_subscription_stats' }, { oid => '3163', descr => 'current trigger depth', proname => 'pg_trigger_depth', provolatile => 's', proparallel => 'r', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index e10d20222a..be2f7e2bcc 
100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -67,6 +67,7 @@ typedef enum StatMsgType PGSTAT_MTYPE_RESETSINGLECOUNTER, PGSTAT_MTYPE_RESETSLRUCOUNTER, PGSTAT_MTYPE_RESETREPLSLOTCOUNTER, + PGSTAT_MTYPE_RESETSUBCOUNTER, PGSTAT_MTYPE_AUTOVAC_START, PGSTAT_MTYPE_VACUUM, PGSTAT_MTYPE_ANALYZE, @@ -84,8 +85,8 @@ typedef enum StatMsgType PGSTAT_MTYPE_REPLSLOT, PGSTAT_MTYPE_CONNECT, PGSTAT_MTYPE_DISCONNECT, - PGSTAT_MTYPE_SUBSCRIPTIONPURGE, - PGSTAT_MTYPE_SUBWORKERERROR, + PGSTAT_MTYPE_SUBSCRIPTIONDROP, + PGSTAT_MTYPE_SUBSCRIPTIONERROR, } StatMsgType; /* ---------- @@ -148,8 +149,7 @@ typedef enum PgStat_Shared_Reset_Target typedef enum PgStat_Single_Reset_Type { RESET_TABLE, - RESET_FUNCTION, - RESET_SUBWORKER + RESET_FUNCTION } PgStat_Single_Reset_Type; /* ------------------------------------------------------------ @@ -368,7 +368,6 @@ typedef struct PgStat_MsgResetsinglecounter Oid m_databaseid; PgStat_Single_Reset_Type m_resettype; Oid m_objectid; - Oid m_subobjectid; } PgStat_MsgResetsinglecounter; /* ---------- @@ -394,6 +393,19 @@ typedef struct PgStat_MsgResetreplslotcounter bool clearall; } PgStat_MsgResetreplslotcounter; +/* ---------- + * PgStat_MsgResetsubcounter Sent by the backend to tell the collector + * to reset subscription counter(s) + * ---------- + */ +typedef struct PgStat_MsgResetsubcounter +{ + PgStat_MsgHdr m_hdr; + Oid m_subid; /* InvalidOid means reset all subscription + * stats */ +} PgStat_MsgResetsubcounter; + + /* ---------- * PgStat_MsgAutovacStart Sent by the autovacuum daemon to signal * that a database is going to be processed @@ -542,53 +554,28 @@ typedef struct PgStat_MsgReplSlot } PgStat_MsgReplSlot; /* ---------- - * PgStat_MsgSubscriptionPurge Sent by the backend and autovacuum to tell the - * collector about the dead subscriptions. + * PgStat_MsgSubscriptionDrop Sent by the backend and autovacuum to tell the + * collector about the dead subscription. 
* ---------- */ -#define PGSTAT_NUM_SUBSCRIPTIONPURGE \ - ((PGSTAT_MSG_PAYLOAD - sizeof(Oid) - sizeof(int)) / sizeof(Oid)) - -typedef struct PgStat_MsgSubscriptionPurge +typedef struct PgStat_MsgSubscriptionDrop { PgStat_MsgHdr m_hdr; - Oid m_databaseid; - int m_nentries; - Oid m_subids[PGSTAT_NUM_SUBSCRIPTIONPURGE]; -} PgStat_MsgSubscriptionPurge; + Oid m_subid; +} PgStat_MsgSubscriptionDrop; /* ---------- - * PgStat_MsgSubWorkerError Sent by the apply worker or the table sync - * worker to report the error occurred while - * processing changes. + * PgStat_MsgSubscriptionError Sent by the apply worker or the table sync + * worker to report an error on the subscription. * ---------- */ -#define PGSTAT_SUBWORKERERROR_MSGLEN 256 -typedef struct PgStat_MsgSubWorkerError +typedef struct PgStat_MsgSubscriptionError { PgStat_MsgHdr m_hdr; - /* - * m_subid and m_subrelid are used to determine the subscription and the - * reporter of the error. m_subrelid is InvalidOid if reported by an apply - * worker otherwise reported by a table sync worker. - */ - Oid m_databaseid; Oid m_subid; - Oid m_subrelid; - - /* - * Oid of the table that the reporter was actually processing. m_relid can - * be InvalidOid if an error occurred during worker applying a - * non-data-modification message such as RELATION. 
- */ - Oid m_relid; - - LogicalRepMsgType m_command; - TransactionId m_xid; - TimestampTz m_timestamp; - char m_message[PGSTAT_SUBWORKERERROR_MSGLEN]; -} PgStat_MsgSubWorkerError; + bool m_is_apply_error; +} PgStat_MsgSubscriptionError; /* ---------- * PgStat_MsgRecoveryConflict Sent by the backend upon recovery conflict @@ -750,6 +737,7 @@ typedef union PgStat_Msg PgStat_MsgResetsinglecounter msg_resetsinglecounter; PgStat_MsgResetslrucounter msg_resetslrucounter; PgStat_MsgResetreplslotcounter msg_resetreplslotcounter; + PgStat_MsgResetsubcounter msg_resetsubcounter; PgStat_MsgAutovacStart msg_autovacuum_start; PgStat_MsgVacuum msg_vacuum; PgStat_MsgAnalyze msg_analyze; @@ -767,8 +755,8 @@ typedef union PgStat_Msg PgStat_MsgReplSlot msg_replslot; PgStat_MsgConnect msg_connect; PgStat_MsgDisconnect msg_disconnect; - PgStat_MsgSubscriptionPurge msg_subscriptionpurge; - PgStat_MsgSubWorkerError msg_subworkererror; + PgStat_MsgSubscriptionError msg_subscriptionerror; + PgStat_MsgSubscriptionDrop msg_subscriptiondrop; } PgStat_Msg; @@ -780,7 +768,7 @@ typedef union PgStat_Msg * ------------------------------------------------------------ */ -#define PGSTAT_FILE_FORMAT_ID 0x01A5BCA5 +#define PGSTAT_FILE_FORMAT_ID 0x01A5BCA6 /* ---------- * PgStat_StatDBEntry The collector's data per database @@ -823,16 +811,11 @@ typedef struct PgStat_StatDBEntry TimestampTz stats_timestamp; /* time of db stats file update */ /* - * tables, functions, and subscription workers must be last in the struct, - * because we don't write the pointers out to the stats file. - * - * subworkers is the hash table of PgStat_StatSubWorkerEntry which stores - * statistics of logical replication workers: apply worker and table sync - * worker. + * tables and functions must be last in the struct, because we don't write + * the pointers out to the stats file. 
*/ HTAB *tables; HTAB *functions; - HTAB *subworkers; } PgStat_StatDBEntry; @@ -989,38 +972,17 @@ typedef struct PgStat_StatReplSlotEntry TimestampTz stat_reset_timestamp; } PgStat_StatReplSlotEntry; -/* The lookup key for subscription worker hash table */ -typedef struct PgStat_StatSubWorkerKey -{ - Oid subid; - - /* - * Oid of the table for which tablesync worker will copy the initial data. - * An InvalidOid will be assigned for apply workers. - */ - Oid subrelid; -} PgStat_StatSubWorkerKey; - /* - * Logical replication apply worker and table sync worker statistics kept in the - * stats collector. + * Subscription statistics kept in the stats collector. */ -typedef struct PgStat_StatSubWorkerEntry +typedef struct PgStat_StatSubEntry { - PgStat_StatSubWorkerKey key; /* hash key (must be first) */ + Oid subid; /* hash key (must be first) */ - /* - * Subscription worker error statistics representing an error that - * occurred during application of changes or the initial table - * synchronization. - */ - Oid last_error_relid; - LogicalRepMsgType last_error_command; - TransactionId last_error_xid; - PgStat_Counter last_error_count; - TimestampTz last_error_time; - char last_error_message[PGSTAT_SUBWORKERERROR_MSGLEN]; -} PgStat_StatSubWorkerEntry; + PgStat_Counter apply_error_count; + PgStat_Counter sync_error_count; + TimestampTz stat_reset_timestamp; +} PgStat_StatSubEntry; /* * Working state needed to accumulate per-function-call timing statistics. 
@@ -1111,10 +1073,10 @@ extern void pgstat_drop_database(Oid databaseid); extern void pgstat_clear_snapshot(void); extern void pgstat_reset_counters(void); extern void pgstat_reset_shared_counters(const char *); -extern void pgstat_reset_single_counter(Oid objectid, Oid subobjectid, - PgStat_Single_Reset_Type type); +extern void pgstat_reset_single_counter(Oid objectid, PgStat_Single_Reset_Type type); extern void pgstat_reset_slru_counter(const char *); extern void pgstat_reset_replslot_counter(const char *name); +extern void pgstat_reset_subscription_counter(Oid subid); extern void pgstat_report_connect(Oid dboid); extern void pgstat_report_autovac(Oid dboid); @@ -1131,9 +1093,7 @@ extern void pgstat_report_checksum_failure(void); extern void pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat); extern void pgstat_report_replslot_create(const char *slotname); extern void pgstat_report_replslot_drop(const char *slotname); -extern void pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid, - LogicalRepMsgType command, - TransactionId xid, const char *errmsg); +extern void pgstat_report_subscription_error(Oid subid, bool is_apply_error); extern void pgstat_report_subscription_drop(Oid subid); extern void pgstat_initialize(void); @@ -1226,8 +1186,7 @@ extern void pgstat_send_wal(bool force); extern PgStat_StatDBEntry *pgstat_fetch_stat_dbentry(Oid dbid); extern PgStat_StatTabEntry *pgstat_fetch_stat_tabentry(Oid relid); extern PgStat_StatFuncEntry *pgstat_fetch_stat_funcentry(Oid funcid); -extern PgStat_StatSubWorkerEntry *pgstat_fetch_stat_subworker_entry(Oid subid, - Oid subrelid); +extern PgStat_StatSubEntry *pgstat_fetch_stat_subscription(Oid subid); extern PgStat_ArchiverStats *pgstat_fetch_stat_archiver(void); extern PgStat_BgWriterStats *pgstat_fetch_stat_bgwriter(void); extern PgStat_CheckpointerStats *pgstat_fetch_stat_checkpointer(void); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 
1420288d67..ac468568a1 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2072,24 +2072,13 @@ pg_stat_subscription| SELECT su.oid AS subid, st.latest_end_time FROM (pg_subscription su LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid))); -pg_stat_subscription_workers| SELECT w.subid, +pg_stat_subscription_stats| SELECT ss.subid, s.subname, - w.subrelid, - w.last_error_relid, - w.last_error_command, - w.last_error_xid, - w.last_error_count, - w.last_error_message, - w.last_error_time - FROM ( SELECT pg_subscription.oid AS subid, - NULL::oid AS relid - FROM pg_subscription - UNION ALL - SELECT pg_subscription_rel.srsubid AS subid, - pg_subscription_rel.srrelid AS relid - FROM pg_subscription_rel) sr, - (LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) w(subid, subrelid, last_error_relid, last_error_command, last_error_xid, last_error_count, last_error_message, last_error_time) - JOIN pg_subscription s ON ((w.subid = s.oid))); + ss.apply_error_count, + ss.sync_error_count, + ss.stats_reset + FROM pg_subscription s, + LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, stats_reset); pg_stat_sys_indexes| SELECT pg_stat_all_indexes.relid, pg_stat_all_indexes.indexrelid, pg_stat_all_indexes.schemaname, diff --git a/src/test/subscription/t/026_stats.pl b/src/test/subscription/t/026_stats.pl new file mode 100644 index 0000000000..a42ea3170e --- /dev/null +++ b/src/test/subscription/t/026_stats.pl @@ -0,0 +1,102 @@ + +# Copyright (c) 2021-2022, PostgreSQL Global Development Group + +# Tests for subscription stats. +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Create publisher node. 
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create subscriber node. +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Initial table setup on both publisher and subscriber. On subscriber we +# create the same tables but with primary keys. Also, insert some data that +# will conflict with the data replicated from publisher later. +$node_publisher->safe_psql( + 'postgres', + qq[ +BEGIN; +CREATE TABLE test_tab1 (a int); +INSERT INTO test_tab1 VALUES (1); +COMMIT; +]); +$node_subscriber->safe_psql( + 'postgres', + qq[ +BEGIN; +CREATE TABLE test_tab1 (a int primary key); +INSERT INTO test_tab1 VALUES (1); +COMMIT; +]); + +# Setup publication. +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub FOR TABLE test_tab1;"); + +# There shouldn't be any subscription errors before starting logical replication. +my $result = $node_subscriber->safe_psql('postgres', + "SELECT count(1) FROM pg_stat_subscription_stats"); +is($result, qq(0), 'check no subscription error'); + +# Create subscription. The tablesync for test_tab1 on tap_sub will enter into +# infinite error loop due to violating the unique constraint. +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub;" +); + +$node_publisher->wait_for_catchup('tap_sub'); + +# Wait for the tablesync error to be reported. +$node_subscriber->poll_query_until( + 'postgres', + qq[ +SELECT sync_error_count > 0 +FROM pg_stat_subscription_stats +WHERE subname = 'tap_sub' +]) or die "Timed out while waiting for tablesync error"; + +# Truncate test_tab1 so that tablesync worker can continue. 
+$node_subscriber->safe_psql('postgres', "TRUNCATE test_tab1;"); + +# Wait for initial tablesync for test_tab1 to finish. +$node_subscriber->poll_query_until( + 'postgres', + qq[ +SELECT count(1) = 1 FROM pg_subscription_rel +WHERE srrelid = 'test_tab1'::regclass AND srsubstate in ('r', 's') +]) or die "Timed out while waiting for subscriber to synchronize data"; + +# Check test_tab1 on the subscriber has one row. +$result = $node_subscriber->safe_psql('postgres', "SELECT a FROM test_tab1"); +is($result, qq(1), 'check the table has now row'); + +# Insert data to test_tab1 on the publisher, raising an error on the subscriber +# due to violation of the unique constraint on test_tab1. +$node_publisher->safe_psql('postgres', "INSERT INTO test_tab1 VALUES (1)"); + +# Wait for the apply error to be reported. +$node_subscriber->poll_query_until( + 'postgres', + qq[ +SELECT apply_error_count > 0 +FROM pg_stat_subscription_stats +WHERE subname = 'tap_sub' +]) or die "Timed out while waiting for apply error"; + +# Truncate test_tab1 so that apply worker can continue. +$node_subscriber->safe_psql('postgres', "TRUNCATE test_tab1;"); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); + +done_testing(); diff --git a/src/test/subscription/t/026_worker_stats.pl b/src/test/subscription/t/026_worker_stats.pl deleted file mode 100644 index f72e4766e8..0000000000 --- a/src/test/subscription/t/026_worker_stats.pl +++ /dev/null @@ -1,165 +0,0 @@ - -# Copyright (c) 2021-2022, PostgreSQL Global Development Group - -# Tests for subscription error stats. -use strict; -use warnings; -use PostgreSQL::Test::Cluster; -use PostgreSQL::Test::Utils; -use Test::More; - -# Test if the error reported on pg_stat_subscription_workers view is expected. 
-sub test_subscription_error -{ - my ($node, $relname, $command, $xid, $by_apply_worker, $errmsg_prefix, $msg) - = @_; - - my $check_sql = qq[ -SELECT count(1) > 0 -FROM pg_stat_subscription_workers -WHERE last_error_relid = '$relname'::regclass - AND starts_with(last_error_message, '$errmsg_prefix')]; - - # subrelid - $check_sql .= $by_apply_worker - ? qq[ AND subrelid IS NULL] - : qq[ AND subrelid = '$relname'::regclass]; - - # last_error_command - $check_sql .= $command eq '' - ? qq[ AND last_error_command IS NULL] - : qq[ AND last_error_command = '$command']; - - # last_error_xid - $check_sql .= $xid eq '' - ? qq[ AND last_error_xid IS NULL] - : qq[ AND last_error_xid = '$xid'::xid]; - - # Wait for the particular error statistics to be reported. - $node->poll_query_until('postgres', $check_sql, -) or die "Timed out while waiting for " . $msg; -} - -# Create publisher node. -my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); -$node_publisher->init(allows_streaming => 'logical'); -$node_publisher->start; - -# Create subscriber node. -my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); -$node_subscriber->init(allows_streaming => 'logical'); - -# The subscriber will enter an infinite error loop, so we don't want -# to overflow the server log with error messages. -$node_subscriber->append_conf('postgresql.conf', - qq[ -wal_retrieve_retry_interval = 2s -]); -$node_subscriber->start; - -# Initial table setup on both publisher and subscriber. On subscriber we -# create the same tables but with primary keys. Also, insert some data that -# will conflict with the data replicated from publisher later. 
-$node_publisher->safe_psql( - 'postgres', - qq[ -BEGIN; -CREATE TABLE test_tab1 (a int); -CREATE TABLE test_tab2 (a int); -INSERT INTO test_tab1 VALUES (1); -INSERT INTO test_tab2 VALUES (1); -COMMIT; -]); -$node_subscriber->safe_psql( - 'postgres', - qq[ -BEGIN; -CREATE TABLE test_tab1 (a int primary key); -CREATE TABLE test_tab2 (a int primary key); -INSERT INTO test_tab2 VALUES (1); -COMMIT; -]); - -# Setup publications. -my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; -$node_publisher->safe_psql( - 'postgres', - "CREATE PUBLICATION tap_pub FOR TABLE test_tab1, test_tab2;"); - -# There shouldn't be any subscription errors before starting logical replication. -my $result = $node_subscriber->safe_psql( - 'postgres', - "SELECT count(1) FROM pg_stat_subscription_workers"); -is($result, qq(0), 'check no subscription error'); - -# Create subscription. The table sync for test_tab2 on tap_sub will enter into -# infinite error loop due to violating the unique constraint. -$node_subscriber->safe_psql( - 'postgres', - "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub;"); - -$node_publisher->wait_for_catchup('tap_sub'); - -# Wait for initial table sync for test_tab1 to finish. -$node_subscriber->poll_query_until( - 'postgres', - qq[ -SELECT count(1) = 1 FROM pg_subscription_rel -WHERE srrelid = 'test_tab1'::regclass AND srsubstate in ('r', 's') -]) or die "Timed out while waiting for subscriber to synchronize data"; - -# Check the initial data. -$result = $node_subscriber->safe_psql( - 'postgres', - "SELECT count(a) FROM test_tab1"); -is($result, q(1), 'check initial data are copied to subscriber'); - -# Insert more data to test_tab1, raising an error on the subscriber due to -# violation of the unique constraint on test_tab1. 
-my $xid = $node_publisher->safe_psql( - 'postgres', - qq[ -BEGIN; -INSERT INTO test_tab1 VALUES (1); -SELECT pg_current_xact_id()::xid; -COMMIT; -]); -test_subscription_error($node_subscriber, 'test_tab1', 'INSERT', $xid, - 1, # check apply worker error - qq(duplicate key value violates unique constraint), - 'error reported by the apply worker'); - -# Check the table sync worker's error in the view. -test_subscription_error($node_subscriber, 'test_tab2', '', '', - 0, # check tablesync worker error - qq(duplicate key value violates unique constraint), - 'the error reported by the table sync worker'); - -# Test for resetting subscription worker statistics. -# Truncate test_tab1 and test_tab2 so that applying changes and table sync can -# continue, respectively. -$node_subscriber->safe_psql( - 'postgres', - "TRUNCATE test_tab1, test_tab2;"); - -# Wait for the data to be replicated. -$node_subscriber->poll_query_until( - 'postgres', - "SELECT count(1) > 0 FROM test_tab1"); -$node_subscriber->poll_query_until( - 'postgres', - "SELECT count(1) > 0 FROM test_tab2"); - -# There shouldn't be any errors in the view after dropping the subscription. 
-$node_subscriber->safe_psql( - 'postgres', - "DROP SUBSCRIPTION tap_sub;"); -$result = $node_subscriber->safe_psql( - 'postgres', - "SELECT count(1) FROM pg_stat_subscription_workers"); -is($result, q(0), 'no error after dropping subscription'); - -$node_subscriber->stop('fast'); -$node_publisher->stop('fast'); - -done_testing(); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index c6b302c7b2..d9b83f744f 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1943,9 +1943,10 @@ PgStat_MsgResetreplslotcounter PgStat_MsgResetsharedcounter PgStat_MsgResetsinglecounter PgStat_MsgResetslrucounter +PgStat_MsgResetsubcounter PgStat_MsgSLRU -PgStat_MsgSubscriptionPurge -PgStat_MsgSubWorkerError +PgStat_MsgSubscriptionDrop +PgStat_MsgSubscriptionError PgStat_MsgTabpurge PgStat_MsgTabstat PgStat_MsgTempFile @@ -1957,8 +1958,7 @@ PgStat_Single_Reset_Type PgStat_StatDBEntry PgStat_StatFuncEntry PgStat_StatReplSlotEntry -PgStat_StatSubWorkerEntry -PgStat_StatSubWorkerKey +PgStat_StatSubEntry PgStat_StatTabEntry PgStat_SubXactStatus PgStat_TableCounts