From 959b38d770ba1f8f35edab27ef3ccf8b1d99f5dd Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Mon, 1 Apr 2024 16:46:24 -0400 Subject: [PATCH] Invent --transaction-size option for pg_restore. This patch allows pg_restore to wrap its commands into transaction blocks, somewhat like --single-transaction, except that we commit and start a new block after every N objects. Using this mode with a size limit of 1000 or so objects greatly reduces the number of transactions consumed by the restore, while preventing any one transaction from taking enough locks to overrun the receiving server's shared lock table. (A value of 1000 works well with the default lock table size of around 6400 locks. Higher --transaction-size values can be used if one has increased the receiving server's lock table size.) Excessive consumption of XIDs has been reported as a problem for pg_upgrade in particular, but it could be bad for any restore; and the change also reduces the number of fsyncs and amount of WAL generated, so it should provide speed benefits too. This patch does not try to make parallel workers batch the SQL commands they issue. The trouble with doing that is that other workers may need to see the objects a worker creates right away. Possibly this can be improved later. In this patch I have hard-wired pg_upgrade to use a transaction size of 1000 divided by the number of parallel restore jobs allowed (without that, we'd still be at risk of overrunning the shared lock table). Perhaps there would be value in adding another pg_upgrade option to allow user control of that, but I'm unsure that it's worth the trouble; I think few users would use it, and any who did would see not that much benefit compared to the default. Patch by me, but the original idea to batch SQL commands during restore is due to Robins Tharakan. Discussion: https://postgr.es/m/a9f9376f1c3343a6bb319dce294e20ac@EX13D05UWC001.ant.amazon.com --- doc/src/sgml/ref/pg_restore.sgml | 24 +++++ src/bin/pg_dump/pg_backup.h | 4 +- src/bin/pg_dump/pg_backup_archiver.c | 139 +++++++++++++++++++++++++-- src/bin/pg_dump/pg_backup_archiver.h | 3 + src/bin/pg_dump/pg_backup_db.c | 18 ++++ src/bin/pg_dump/pg_restore.c | 15 ++- src/bin/pg_upgrade/pg_upgrade.c | 25 +++++ 7 files changed, 220 insertions(+), 8 deletions(-) diff --git a/doc/src/sgml/ref/pg_restore.sgml b/doc/src/sgml/ref/pg_restore.sgml index 1a23874da6..2e3ba80258 100644 --- a/doc/src/sgml/ref/pg_restore.sgml +++ b/doc/src/sgml/ref/pg_restore.sgml @@ -786,6 +786,30 @@ PostgreSQL documentation + + + + + Execute the restore as a series of transactions, each processing + up to N database + objects. This option implies . + + + offers an intermediate choice + between the default behavior (one transaction per SQL command) + and / + (one transaction for all restored objects). + While has the least + overhead, it may be impractical for large databases because the + transaction will take a lock on each restored object, possibly + exhausting the server's lock table space. + Using with a size of a few + thousand objects offers nearly the same performance benefits while + capping the amount of lock table space needed. + + + + diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h index 9ef2f2017e..fbf5f1c515 100644 --- a/src/bin/pg_dump/pg_backup.h +++ b/src/bin/pg_dump/pg_backup.h @@ -149,7 +149,9 @@ typedef struct _restoreOptions * compression */ int suppressDumpWarnings; /* Suppress output of WARNING entries * to stderr */ - bool single_txn; + + bool single_txn; /* restore all TOCs in one transaction */ + int txn_size; /* restore this many TOCs per txn, if > 0 */ bool *idWanted; /* array showing which dump IDs to emit */ int enable_row_security; diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index d6e15e25a1..c7a6c918a6 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -502,7 +502,28 @@ RestoreArchive(Archive *AHX) /* Otherwise, drop anything that's selected and has a dropStmt */ if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt) { + bool not_allowed_in_txn = false; + pg_log_info("dropping %s %s", te->desc, te->tag); + + /* + * In --transaction-size mode, we have to temporarily exit our + * transaction block to drop objects that can't be dropped + * within a transaction. + */ + if (ropt->txn_size > 0) + { + if (strcmp(te->desc, "DATABASE") == 0 || + strcmp(te->desc, "DATABASE PROPERTIES") == 0) + { + not_allowed_in_txn = true; + if (AH->connection) + CommitTransaction(AHX); + else + ahprintf(AH, "COMMIT;\n"); + } + } + /* Select owner and schema as necessary */ _becomeOwner(AH, te); _selectOutputSchema(AH, te->namespace); @@ -628,6 +649,33 @@ RestoreArchive(Archive *AHX) } } } + + /* + * In --transaction-size mode, re-establish the transaction + * block if needed; otherwise, commit after every N drops. + */ + if (ropt->txn_size > 0) + { + if (not_allowed_in_txn) + { + if (AH->connection) + StartTransaction(AHX); + else + ahprintf(AH, "BEGIN;\n"); + AH->txnCount = 0; + } + else if (++AH->txnCount >= ropt->txn_size) + { + if (AH->connection) + { + CommitTransaction(AHX); + StartTransaction(AHX); + } + else + ahprintf(AH, "COMMIT;\nBEGIN;\n"); + AH->txnCount = 0; + } + } } } @@ -724,7 +772,11 @@ RestoreArchive(Archive *AHX) } } - if (ropt->single_txn) + /* + * Close out any persistent transaction we may have. While these two + * cases are started in different places, we can end both cases here. + */ + if (ropt->single_txn || ropt->txn_size > 0) { if (AH->connection) CommitTransaction(AHX); @@ -785,6 +837,25 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel) */ if ((reqs & REQ_SCHEMA) != 0) { + bool object_is_db = false; + + /* + * In --transaction-size mode, must exit our transaction block to + * create a database or set its properties. + */ + if (strcmp(te->desc, "DATABASE") == 0 || + strcmp(te->desc, "DATABASE PROPERTIES") == 0) + { + object_is_db = true; + if (ropt->txn_size > 0) + { + if (AH->connection) + CommitTransaction(&AH->public); + else + ahprintf(AH, "COMMIT;\n\n"); + } + } + /* Show namespace in log message if available */ if (te->namespace) pg_log_info("creating %s \"%s.%s\"", @@ -835,10 +906,10 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel) /* * If we created a DB, connect to it. Also, if we changed DB * properties, reconnect to ensure that relevant GUC settings are - * applied to our session. + * applied to our session. (That also restarts the transaction block + * in --transaction-size mode.) */ - if (strcmp(te->desc, "DATABASE") == 0 || - strcmp(te->desc, "DATABASE PROPERTIES") == 0) + if (object_is_db) { pg_log_info("connecting to new database \"%s\"", te->tag); _reconnectToDB(AH, te->tag); @@ -964,6 +1035,25 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel) } } + /* + * If we emitted anything for this TOC entry, that counts as one action + * against the transaction-size limit. Commit if it's time to. + */ + if ((reqs & (REQ_SCHEMA | REQ_DATA)) != 0 && ropt->txn_size > 0) + { + if (++AH->txnCount >= ropt->txn_size) + { + if (AH->connection) + { + CommitTransaction(&AH->public); + StartTransaction(&AH->public); + } + else + ahprintf(AH, "COMMIT;\nBEGIN;\n\n"); + AH->txnCount = 0; + } + } + if (AH->public.n_errors > 0 && status == WORKER_OK) status = WORKER_IGNORED_ERRORS; @@ -1310,7 +1400,12 @@ StartRestoreLOs(ArchiveHandle *AH) { RestoreOptions *ropt = AH->public.ropt; - if (!ropt->single_txn) + /* + * LOs must be restored within a transaction block, since we need the LO + * handle to stay open while we write it. Establish a transaction unless + * there's one being used globally. + */ + if (!(ropt->single_txn || ropt->txn_size > 0)) { if (AH->connection) StartTransaction(&AH->public); @@ -1329,7 +1424,7 @@ EndRestoreLOs(ArchiveHandle *AH) { RestoreOptions *ropt = AH->public.ropt; - if (!ropt->single_txn) + if (!(ropt->single_txn || ropt->txn_size > 0)) { if (AH->connection) CommitTransaction(&AH->public); @@ -3171,6 +3266,19 @@ _doSetFixedOutputState(ArchiveHandle *AH) else ahprintf(AH, "SET row_security = off;\n"); + /* + * In --transaction-size mode, we should always be in a transaction when + * we begin to restore objects. + */ + if (ropt && ropt->txn_size > 0) + { + if (AH->connection) + StartTransaction(&AH->public); + else + ahprintf(AH, "\nBEGIN;\n"); + AH->txnCount = 0; + } + ahprintf(AH, "\n"); } @@ -4043,6 +4151,14 @@ restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list) } } + /* + * In --transaction-size mode, we must commit the open transaction before + * dropping the database connection. This also ensures that child workers + * can see the objects we've created so far. + */ + if (AH->public.ropt->txn_size > 0) + CommitTransaction(&AH->public); + /* * Now close parent connection in prep for parallel steps. We do this * mainly to ensure that we don't exceed the specified number of parallel @@ -4782,6 +4898,10 @@ CloneArchive(ArchiveHandle *AH) clone = (ArchiveHandle *) pg_malloc(sizeof(ArchiveHandle)); memcpy(clone, AH, sizeof(ArchiveHandle)); + /* Likewise flat-copy the RestoreOptions, so we can alter them locally */ + clone->public.ropt = (RestoreOptions *) pg_malloc(sizeof(RestoreOptions)); + memcpy(clone->public.ropt, AH->public.ropt, sizeof(RestoreOptions)); + /* Handle format-independent fields */ memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse)); @@ -4803,6 +4923,13 @@ CloneArchive(ArchiveHandle *AH) /* clones should not share lo_buf */ clone->lo_buf = NULL; + /* + * Clone connections disregard --transaction-size; they must commit after + * each command so that the results are immediately visible to other + * workers. + */ + clone->public.ropt->txn_size = 0; + /* * Connect our new clone object to the database, using the same connection * parameters used for the original connection. diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h index 14aeb29dca..d6104a7196 100644 --- a/src/bin/pg_dump/pg_backup_archiver.h +++ b/src/bin/pg_dump/pg_backup_archiver.h @@ -324,6 +324,9 @@ struct _archiveHandle char *currTablespace; /* current tablespace, or NULL */ char *currTableAm; /* current table access method, or NULL */ + /* in --transaction-size mode, this counts objects emitted in cur xact */ + int txnCount; + void *lo_buf; size_t lo_buf_used; size_t lo_buf_size; diff --git a/src/bin/pg_dump/pg_backup_db.c b/src/bin/pg_dump/pg_backup_db.c index f9683fb0c5..a02841c405 100644 --- a/src/bin/pg_dump/pg_backup_db.c +++ b/src/bin/pg_dump/pg_backup_db.c @@ -554,6 +554,7 @@ IssueCommandPerBlob(ArchiveHandle *AH, TocEntry *te, { /* Make a writable copy of the command string */ char *buf = pg_strdup(te->defn); + RestoreOptions *ropt = AH->public.ropt; char *st; char *en; @@ -562,6 +563,23 @@ IssueCommandPerBlob(ArchiveHandle *AH, TocEntry *te, { *en++ = '\0'; ahprintf(AH, "%s%s%s;\n", cmdBegin, st, cmdEnd); + + /* In --transaction-size mode, count each command as an action */ + if (ropt && ropt->txn_size > 0) + { + if (++AH->txnCount >= ropt->txn_size) + { + if (AH->connection) + { + CommitTransaction(&AH->public); + StartTransaction(&AH->public); + } + else + ahprintf(AH, "COMMIT;\nBEGIN;\n\n"); + AH->txnCount = 0; + } + } + st = en; } ahprintf(AH, "\n"); diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c index c3beacdec1..5ea78cf7cc 100644 --- a/src/bin/pg_dump/pg_restore.c +++ b/src/bin/pg_dump/pg_restore.c @@ -120,6 +120,7 @@ main(int argc, char **argv) {"role", required_argument, NULL, 2}, {"section", required_argument, NULL, 3}, {"strict-names", no_argument, &strict_names, 1}, + {"transaction-size", required_argument, NULL, 5}, {"use-set-session-authorization", no_argument, &use_setsessauth, 1}, {"no-comments", no_argument, &no_comments, 1}, {"no-publications", no_argument, &no_publications, 1}, @@ -289,10 +290,18 @@ main(int argc, char **argv) set_dump_section(optarg, &(opts->dumpSections)); break; - case 4: + case 4: /* filter */ read_restore_filters(optarg, opts); break; + case 5: /* transaction-size */ + if (!option_parse_int(optarg, "--transaction-size", + 1, INT_MAX, + &opts->txn_size)) + exit(1); + opts->exit_on_error = true; + break; + default: /* getopt_long already emitted a complaint */ pg_log_error_hint("Try \"%s --help\" for more information.", progname); @@ -337,6 +346,9 @@ main(int argc, char **argv) if (opts->dataOnly && opts->dropSchema) pg_fatal("options -c/--clean and -a/--data-only cannot be used together"); + if (opts->single_txn && opts->txn_size > 0) + pg_fatal("options -1/--single-transaction and --transaction-size cannot be used together"); + /* * -C is not compatible with -1, because we can't create a database inside * a transaction block. @@ -484,6 +496,7 @@ usage(const char *progname) printf(_(" --section=SECTION restore named section (pre-data, data, or post-data)\n")); printf(_(" --strict-names require table and/or schema include patterns to\n" " match at least one entity each\n")); + printf(_(" --transaction-size=N commit after every N objects\n")); printf(_(" --use-set-session-authorization\n" " use SET SESSION AUTHORIZATION commands instead of\n" " ALTER OWNER commands to set ownership\n")); diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c index f6143b6bc4..af370768b6 100644 --- a/src/bin/pg_upgrade/pg_upgrade.c +++ b/src/bin/pg_upgrade/pg_upgrade.c @@ -51,6 +51,13 @@ #include "fe_utils/string_utils.h" #include "pg_upgrade.h" +/* + * Maximum number of pg_restore actions (TOC entries) to process within one + * transaction. At some point we might want to make this user-controllable, + * but for now a hard-wired setting will suffice. + */ +#define RESTORE_TRANSACTION_SIZE 1000 + static void set_locale_and_encoding(void); static void prepare_new_cluster(void); static void prepare_new_globals(void); @@ -562,10 +569,12 @@ create_new_objects(void) true, true, "\"%s/pg_restore\" %s %s --exit-on-error --verbose " + "--transaction-size=%d " "--dbname postgres \"%s/%s\"", new_cluster.bindir, cluster_conn_opts(&new_cluster), create_opts, + RESTORE_TRANSACTION_SIZE, log_opts.dumpdir, sql_file_name); @@ -578,6 +587,7 @@ create_new_objects(void) log_file_name[MAXPGPATH]; DbInfo *old_db = &old_cluster.dbarr.dbs[dbnum]; const char *create_opts; + int txn_size; /* Skip template1 in this pass */ if (strcmp(old_db->db_name, "template1") == 0) @@ -597,13 +607,28 @@ create_new_objects(void) else create_opts = "--create"; + /* + * In parallel mode, reduce the --transaction-size of each restore job + * so that the total number of locks that could be held across all the + * jobs stays in bounds. + */ + txn_size = RESTORE_TRANSACTION_SIZE; + if (user_opts.jobs > 1) + { + txn_size /= user_opts.jobs; + /* Keep some sanity if -j is huge */ + txn_size = Max(txn_size, 10); + } + parallel_exec_prog(log_file_name, NULL, "\"%s/pg_restore\" %s %s --exit-on-error --verbose " + "--transaction-size=%d " "--dbname template1 \"%s/%s\"", new_cluster.bindir, cluster_conn_opts(&new_cluster), create_opts, + txn_size, log_opts.dumpdir, sql_file_name); }