From 9de77b5453130242654ff0b30a551c9c862ed661 Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Sat, 18 Jul 2020 12:44:51 -0400 Subject: [PATCH] Allow logical replication to transfer data in binary format. This patch adds a "binary" option to CREATE/ALTER SUBSCRIPTION. When that's set, the publisher will send data using the data type's typsend function if any, rather than typoutput. This is generally faster, if slightly less robust. As committed, we won't try to transfer user-defined array or composite types in binary, for fear that type OIDs won't match at the subscriber. This might be changed later, but it seems like fit material for a follow-on patch. Dave Cramer, reviewed by Daniel Gustafsson, Petr Jelinek, and others; adjusted some by me Discussion: https://postgr.es/m/CADK3HH+R3xMn=8t3Ct+uD+qJ1KD=Hbif5NFMJ+d5DkoCzp6Vgw@mail.gmail.com --- doc/src/sgml/catalogs.sgml | 26 +- doc/src/sgml/ref/alter_subscription.sgml | 6 +- doc/src/sgml/ref/create_subscription.sgml | 26 +- src/backend/catalog/pg_subscription.c | 1 + src/backend/catalog/system_views.sql | 2 +- src/backend/commands/subscriptioncmds.c | 100 ++++++-- .../libpqwalreceiver/libpqwalreceiver.c | 4 + src/backend/replication/logical/proto.c | 125 +++++++--- src/backend/replication/logical/worker.c | 225 ++++++++++-------- src/backend/replication/pgoutput/pgoutput.c | 28 ++- src/bin/pg_dump/pg_dump.c | 33 ++- src/bin/pg_dump/pg_dump.h | 1 + src/bin/psql/describe.c | 8 +- src/include/catalog/catversion.h | 2 +- src/include/catalog/pg_subscription.h | 5 + src/include/replication/logicalproto.h | 21 +- src/include/replication/pgoutput.h | 4 +- src/include/replication/walreceiver.h | 1 + src/test/regress/expected/subscription.out | 47 +++- src/test/regress/sql/subscription.sql | 15 ++ src/test/subscription/t/014_binary.pl | 134 +++++++++++ 21 files changed, 606 insertions(+), 208 deletions(-) create mode 100644 src/test/subscription/t/014_binary.pl diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml 
index e9cdff4864..18ab3d434c 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -7472,7 +7472,7 @@ SCRAM-SHA-256$<iteration count>:&l (references pg_database.oid) - OID of the database which the subscription resides in + OID of the database that the subscription resides in @@ -7500,7 +7500,17 @@ SCRAM-SHA-256$<iteration count>:&l subenabled bool - If true, the subscription is enabled and should be replicating. + If true, the subscription is enabled and should be replicating + + + + + + subbinary bool + + + If true, the subscription will request that the publisher send data + in binary format @@ -7518,8 +7528,8 @@ SCRAM-SHA-256$<iteration count>:&l subslotname name - Name of the replication slot in the upstream database. Also used - for local replication origin name. + Name of the replication slot in the upstream database (also used + for the local replication origin name) @@ -7528,8 +7538,8 @@ SCRAM-SHA-256$<iteration count>:&l subsynccommit text - Contains the value of the synchronous_commit - setting for the subscription workers. + The synchronous_commit + setting for the subscription's workers to use @@ -7538,8 +7548,8 @@ SCRAM-SHA-256$<iteration count>:&l subpublications text[] - Array of subscribed publication names. These reference the - publications on the publisher server. For more on publications + Array of subscribed publication names. These reference + publications defined in the upstream database. For more on publications see . diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index c24ace14d1..81c4e70cdf 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -163,8 +163,10 @@ ALTER SUBSCRIPTION name RENAME TO < This clause alters parameters originally set by . See there for more - information. The allowed options are slot_name and - synchronous_commit + information. 
The parameters that can be altered + are slot_name, + synchronous_commit, and + binary. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 5bbc165f70..cdb22c54fe 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -152,8 +152,9 @@ CREATE SUBSCRIPTION subscription_name The value of this parameter overrides the - setting. The default - value is off. + setting within this + subscription's apply worker processes. The default value + is off. @@ -178,6 +179,27 @@ CREATE SUBSCRIPTION subscription_name + + binary (boolean) + + + Specifies whether the subscription will request the publisher to + send the data in binary format (as opposed to text). + The default is false. + Even when this option is enabled, only data types that have + binary send and receive functions will be transferred in binary. + + + + When doing cross-version replication, it could happen that the + publisher has a binary send function for some data type, but the + subscriber lacks a binary receive function for the type. In + such a case, data transfer will fail, and + the binary option cannot be used. 
+ + + + connect (boolean) diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index cb15731115..e6afb3203e 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -65,6 +65,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->name = pstrdup(NameStr(subform->subname)); sub->owner = subform->subowner; sub->enabled = subform->subenabled; + sub->binary = subform->subbinary; /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 5ecd2e986b..8625cbeab6 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1122,7 +1122,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public; -- All columns of pg_subscription except subconninfo are readable. REVOKE ALL ON pg_subscription FROM public; -GRANT SELECT (subdbid, subname, subowner, subenabled, subslotname, subpublications) +GRANT SELECT (subdbid, subname, subowner, subenabled, subbinary, subslotname, subpublications) ON pg_subscription TO public; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 9ebb026187..40b6377a85 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -55,11 +55,15 @@ static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); * accommodate that. 
*/ static void -parse_subscription_options(List *options, bool *connect, bool *enabled_given, - bool *enabled, bool *create_slot, +parse_subscription_options(List *options, + bool *connect, + bool *enabled_given, bool *enabled, + bool *create_slot, bool *slot_name_given, char **slot_name, - bool *copy_data, char **synchronous_commit, - bool *refresh) + bool *copy_data, + char **synchronous_commit, + bool *refresh, + bool *binary_given, bool *binary) { ListCell *lc; bool connect_given = false; @@ -90,6 +94,11 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, *synchronous_commit = NULL; if (refresh) *refresh = true; + if (binary) + { + *binary_given = false; + *binary = false; + } /* Parse options */ foreach(lc, options) @@ -175,6 +184,16 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, refresh_given = true; *refresh = defGetBoolean(defel); } + else if (strcmp(defel->defname, "binary") == 0 && binary) + { + if (*binary_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + *binary_given = true; + *binary = defGetBoolean(defel); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -322,6 +341,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) char *conninfo; char *slotname; bool slotname_given; + bool binary; + bool binary_given; char originname[NAMEDATALEN]; bool create_slot; List *publications; @@ -331,10 +352,15 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) * * Connection and publication should not be specified here. 
*/ - parse_subscription_options(stmt->options, &connect, &enabled_given, - &enabled, &create_slot, &slotname_given, - &slotname, &copy_data, &synchronous_commit, - NULL); + parse_subscription_options(stmt->options, + &connect, + &enabled_given, &enabled, + &create_slot, + &slotname_given, &slotname, + &copy_data, + &synchronous_commit, + NULL, /* no "refresh" */ + &binary_given, &binary); /* * Since creating a replication slot is not transactional, rolling back @@ -400,6 +426,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) DirectFunctionCall1(namein, CStringGetDatum(stmt->subname)); values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner); values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled); + values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (slotname) @@ -669,10 +696,18 @@ AlterSubscription(AlterSubscriptionStmt *stmt) char *slotname; bool slotname_given; char *synchronous_commit; + bool binary_given; + bool binary; - parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, &slotname_given, &slotname, - NULL, &synchronous_commit, NULL); + parse_subscription_options(stmt->options, + NULL, /* no "connect" */ + NULL, NULL, /* no "enabled" */ + NULL, /* no "create_slot" */ + &slotname_given, &slotname, + NULL, /* no "copy_data" */ + &synchronous_commit, + NULL, /* no "refresh" */ + &binary_given, &binary); if (slotname_given) { @@ -697,6 +732,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt) replaces[Anum_pg_subscription_subsynccommit - 1] = true; } + if (binary_given) + { + values[Anum_pg_subscription_subbinary - 1] = + BoolGetDatum(binary); + replaces[Anum_pg_subscription_subbinary - 1] = true; + } + update_tuple = true; break; } @@ -706,9 +748,15 @@ AlterSubscription(AlterSubscriptionStmt *stmt) bool enabled, enabled_given; - parse_subscription_options(stmt->options, NULL, - &enabled_given, &enabled, NULL, - 
NULL, NULL, NULL, NULL, NULL); + parse_subscription_options(stmt->options, + NULL, /* no "connect" */ + &enabled_given, &enabled, + NULL, /* no "create_slot" */ + NULL, NULL, /* no "slot_name" */ + NULL, /* no "copy_data" */ + NULL, /* no "synchronous_commit" */ + NULL, /* no "refresh" */ + NULL, NULL); /* no "binary" */ Assert(enabled_given); if (!sub->slotname && enabled) @@ -744,9 +792,15 @@ AlterSubscription(AlterSubscriptionStmt *stmt) bool copy_data; bool refresh; - parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, NULL, NULL, &copy_data, - NULL, &refresh); + parse_subscription_options(stmt->options, + NULL, /* no "connect" */ + NULL, NULL, /* no "enabled" */ + NULL, /* no "create_slot" */ + NULL, NULL, /* no "slot_name" */ + &copy_data, + NULL, /* no "synchronous_commit" */ + &refresh, + NULL, NULL); /* no "binary" */ values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); @@ -781,9 +835,15 @@ AlterSubscription(AlterSubscriptionStmt *stmt) (errcode(ERRCODE_SYNTAX_ERROR), errmsg("ALTER SUBSCRIPTION ... 
REFRESH is not allowed for disabled subscriptions"))); - parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, NULL, NULL, &copy_data, - NULL, NULL); + parse_subscription_options(stmt->options, + NULL, /* no "connect" */ + NULL, NULL, /* no "enabled" */ + NULL, /* no "create_slot" */ + NULL, NULL, /* no "slot_name" */ + &copy_data, + NULL, /* no "synchronous_commit" */ + NULL, /* no "refresh" */ + NULL, NULL); /* no "binary" */ AlterSubscription_refresh(sub, copy_data); diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index e4fd1f9bb6..e9057230e4 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -424,6 +424,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn, PQfreemem(pubnames_literal); pfree(pubnames_str); + if (options->proto.logical.binary && + PQserverVersion(conn->streamConn) >= 140000) + appendStringInfoString(&cmd, ", binary 'true'"); + appendStringInfoChar(&cmd, ')'); } else diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 3c6d0cd171..2b1356ee24 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -17,7 +17,6 @@ #include "catalog/pg_type.h" #include "libpq/pqformat.h" #include "replication/logicalproto.h" -#include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/syscache.h" @@ -31,7 +30,7 @@ static void logicalrep_write_attrs(StringInfo out, Relation rel); static void logicalrep_write_tuple(StringInfo out, Relation rel, - HeapTuple tuple); + HeapTuple tuple, bool binary); static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel); static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple); @@ -139,7 +138,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn) * Write INSERT to the output stream. 
*/ void -logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple) +logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple, bool binary) { pq_sendbyte(out, 'I'); /* action INSERT */ @@ -147,7 +146,7 @@ logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple) pq_sendint32(out, RelationGetRelid(rel)); pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newtuple); + logicalrep_write_tuple(out, rel, newtuple, binary); } /* @@ -179,7 +178,7 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) */ void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, - HeapTuple newtuple) + HeapTuple newtuple, bool binary) { pq_sendbyte(out, 'U'); /* action UPDATE */ @@ -196,11 +195,11 @@ logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, pq_sendbyte(out, 'O'); /* old tuple follows */ else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldtuple); + logicalrep_write_tuple(out, rel, oldtuple, binary); } pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newtuple); + logicalrep_write_tuple(out, rel, newtuple, binary); } /* @@ -248,7 +247,7 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple, * Write DELETE to the output stream. 
*/ void -logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple) +logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple, bool binary) { Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || @@ -264,7 +263,7 @@ logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple) else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldtuple); + logicalrep_write_tuple(out, rel, oldtuple, binary); } /* @@ -437,7 +436,7 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp) * Write a tuple to the outputstream, in the most efficient format possible. */ static void -logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple) +logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary) { TupleDesc desc; Datum values[MaxTupleAttributeNumber]; @@ -474,12 +473,18 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple) if (isnull[i]) { - pq_sendbyte(out, 'n'); /* null column */ + pq_sendbyte(out, LOGICALREP_COLUMN_NULL); continue; } - else if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i])) + + if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i])) { - pq_sendbyte(out, 'u'); /* unchanged toast column */ + /* + * Unchanged toasted datum. (Note that we don't promise to detect + * unchanged data in general; this is just a cheap check to avoid + * sending large values unnecessarily.) + */ + pq_sendbyte(out, LOGICALREP_COLUMN_UNCHANGED); continue; } @@ -488,20 +493,48 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple) elog(ERROR, "cache lookup failed for type %u", att->atttypid); typclass = (Form_pg_type) GETSTRUCT(typtup); - pq_sendbyte(out, 't'); /* 'text' data follows */ + /* + * Choose whether to send in binary. Obviously, the option must be + * requested and the type must have a send function. 
Also, if the + * type is not built-in then it must not be a composite or array type. + * Such types contain type OIDs, which will likely not match at the + * receiver if it's not a built-in type. + * + * XXX this could be relaxed if we changed record_recv and array_recv + * to be less picky. + * + * XXX this fails to apply the restriction to domains over such types. + */ + if (binary && + OidIsValid(typclass->typsend) && + (att->atttypid < FirstGenbkiObjectId || + (typclass->typtype != TYPTYPE_COMPOSITE && + typclass->typelem == InvalidOid))) + { + bytea *outputbytes; + int len; - outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]); - pq_sendcountedtext(out, outputstr, strlen(outputstr), false); - pfree(outputstr); + pq_sendbyte(out, LOGICALREP_COLUMN_BINARY); + outputbytes = OidSendFunctionCall(typclass->typsend, values[i]); + len = VARSIZE(outputbytes) - VARHDRSZ; + pq_sendint(out, len, 4); /* length */ + pq_sendbytes(out, VARDATA(outputbytes), len); /* data */ + pfree(outputbytes); + } + else + { + pq_sendbyte(out, LOGICALREP_COLUMN_TEXT); + outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]); + pq_sendcountedtext(out, outputstr, strlen(outputstr), false); + pfree(outputstr); + } ReleaseSysCache(typtup); } } /* - * Read tuple in remote format from stream. - * - * The returned tuple points into the input stringinfo. + * Read tuple in logical replication format from stream. 
*/ static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) @@ -512,38 +545,52 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) /* Get number of attributes */ natts = pq_getmsgint(in, 2); - memset(tuple->changed, 0, sizeof(tuple->changed)); + /* Allocate space for per-column values; zero out unused StringInfoDatas */ + tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData)); + tuple->colstatus = (char *) palloc(natts * sizeof(char)); /* Read the data */ for (i = 0; i < natts; i++) { char kind; + int len; + StringInfo value = &tuple->colvalues[i]; kind = pq_getmsgbyte(in); + tuple->colstatus[i] = kind; switch (kind) { - case 'n': /* null */ - tuple->values[i] = NULL; - tuple->changed[i] = true; + case LOGICALREP_COLUMN_NULL: + /* nothing more to do */ break; - case 'u': /* unchanged column */ + case LOGICALREP_COLUMN_UNCHANGED: /* we don't receive the value of an unchanged column */ - tuple->values[i] = NULL; break; - case 't': /* text formatted value */ - { - int len; + case LOGICALREP_COLUMN_TEXT: + len = pq_getmsgint(in, 4); /* read length */ - tuple->changed[i] = true; + /* and data */ + value->data = palloc(len + 1); + pq_copymsgbytes(in, value->data, len); + value->data[len] = '\0'; + /* make StringInfo fully valid */ + value->len = len; + value->cursor = 0; + value->maxlen = len; + break; + case LOGICALREP_COLUMN_BINARY: + len = pq_getmsgint(in, 4); /* read length */ - len = pq_getmsgint(in, 4); /* read length */ - - /* and data */ - tuple->values[i] = palloc(len + 1); - pq_copymsgbytes(in, tuple->values[i], len); - tuple->values[i][len] = '\0'; - } + /* and data */ + value->data = palloc(len + 1); + pq_copymsgbytes(in, value->data, len); + /* not strictly necessary but per StringInfo practice */ + value->data[len] = '\0'; + /* make StringInfo fully valid */ + value->len = len; + value->cursor = 0; + value->maxlen = len; break; default: elog(ERROR, "unrecognized data representation type '%c'", 
kind); @@ -552,7 +599,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) } /* - * Write relation attributes to the stream. + * Write relation attribute metadata to the stream. */ static void logicalrep_write_attrs(StringInfo out, Relation rel) @@ -611,7 +658,7 @@ logicalrep_write_attrs(StringInfo out, Relation rel) } /* - * Read relation attribute names from the stream. + * Read relation attribute metadata from the stream. */ static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index f90a896fc3..407eee3c0b 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -319,13 +319,13 @@ slot_store_error_callback(void *arg) } /* - * Store data in C string form into slot. - * This is similar to BuildTupleFromCStrings but TupleTableSlot fits our - * use better. + * Store tuple data into slot. + * + * Incoming data can be either text or binary format. 
*/ static void -slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, - char **values) +slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, + LogicalRepTupleData *tupleData) { int natts = slot->tts_tupleDescriptor->natts; int i; @@ -343,27 +343,65 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, errcallback.previous = error_context_stack; error_context_stack = &errcallback; - /* Call the "in" function for each non-dropped attribute */ + /* Call the "in" function for each non-dropped, non-null attribute */ Assert(natts == rel->attrmap->maplen); for (i = 0; i < natts; i++) { Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i); int remoteattnum = rel->attrmap->attnums[i]; - if (!att->attisdropped && remoteattnum >= 0 && - values[remoteattnum] != NULL) + if (!att->attisdropped && remoteattnum >= 0) { - Oid typinput; - Oid typioparam; + StringInfo colvalue = &tupleData->colvalues[remoteattnum]; errarg.local_attnum = i; errarg.remote_attnum = remoteattnum; - getTypeInputInfo(att->atttypid, &typinput, &typioparam); - slot->tts_values[i] = - OidInputFunctionCall(typinput, values[remoteattnum], - typioparam, att->atttypmod); - slot->tts_isnull[i] = false; + if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT) + { + Oid typinput; + Oid typioparam; + + getTypeInputInfo(att->atttypid, &typinput, &typioparam); + slot->tts_values[i] = + OidInputFunctionCall(typinput, colvalue->data, + typioparam, att->atttypmod); + slot->tts_isnull[i] = false; + } + else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY) + { + Oid typreceive; + Oid typioparam; + + /* + * In some code paths we may be asked to re-parse the same + * tuple data. Reset the StringInfo's cursor so that works. 
+ */ + colvalue->cursor = 0; + + getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam); + slot->tts_values[i] = + OidReceiveFunctionCall(typreceive, colvalue, + typioparam, att->atttypmod); + + /* Trouble if it didn't eat the whole buffer */ + if (colvalue->cursor != colvalue->len) + ereport(ERROR, + (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION), + errmsg("incorrect binary data format in logical replication column %d", + remoteattnum + 1))); + slot->tts_isnull[i] = false; + } + else + { + /* + * NULL value from remote. (We don't expect to see + * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as + * NULL.) + */ + slot->tts_values[i] = (Datum) 0; + slot->tts_isnull[i] = true; + } errarg.local_attnum = -1; errarg.remote_attnum = -1; @@ -371,8 +409,8 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, else { /* - * We assign NULL to dropped attributes, NULL values, and missing - * values (missing values should be later filled using + * We assign NULL to dropped attributes and missing values + * (missing values should be later filled using * slot_fill_defaults). */ slot->tts_values[i] = (Datum) 0; @@ -387,20 +425,21 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, } /* - * Replace selected columns with user data provided as C strings. + * Replace updated columns with data from the LogicalRepTupleData struct. * This is somewhat similar to heap_modify_tuple but also calls the type * input functions on the user data. - * "slot" is filled with a copy of the tuple in "srcslot", with - * columns selected by the "replaces" array replaced with data values - * from "values". + * + * "slot" is filled with a copy of the tuple in "srcslot", replacing + * columns provided in "tupleData" and leaving others as-is. + * * Caution: unreplaced pass-by-ref columns in "slot" will point into the * storage for "srcslot". 
This is OK for current usage, but someday we may * need to materialize "slot" at the end to make it independent of "srcslot". */ static void -slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot, - LogicalRepRelMapEntry *rel, - char **values, bool *replaces) +slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, + LogicalRepRelMapEntry *rel, + LogicalRepTupleData *tupleData) { int natts = slot->tts_tupleDescriptor->natts; int i; @@ -438,31 +477,58 @@ slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot, if (remoteattnum < 0) continue; - if (!replaces[remoteattnum]) - continue; - - if (values[remoteattnum] != NULL) + if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED) { - Oid typinput; - Oid typioparam; + StringInfo colvalue = &tupleData->colvalues[remoteattnum]; errarg.local_attnum = i; errarg.remote_attnum = remoteattnum; - getTypeInputInfo(att->atttypid, &typinput, &typioparam); - slot->tts_values[i] = - OidInputFunctionCall(typinput, values[remoteattnum], - typioparam, att->atttypmod); - slot->tts_isnull[i] = false; + if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT) + { + Oid typinput; + Oid typioparam; + + getTypeInputInfo(att->atttypid, &typinput, &typioparam); + slot->tts_values[i] = + OidInputFunctionCall(typinput, colvalue->data, + typioparam, att->atttypmod); + slot->tts_isnull[i] = false; + } + else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY) + { + Oid typreceive; + Oid typioparam; + + /* + * In some code paths we may be asked to re-parse the same + * tuple data. Reset the StringInfo's cursor so that works. 
+ */ + colvalue->cursor = 0; + + getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam); + slot->tts_values[i] = + OidReceiveFunctionCall(typreceive, colvalue, + typioparam, att->atttypmod); + + /* Trouble if it didn't eat the whole buffer */ + if (colvalue->cursor != colvalue->len) + ereport(ERROR, + (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION), + errmsg("incorrect binary data format in logical replication column %d", + remoteattnum + 1))); + slot->tts_isnull[i] = false; + } + else + { + /* must be LOGICALREP_COLUMN_NULL */ + slot->tts_values[i] = (Datum) 0; + slot->tts_isnull[i] = true; + } errarg.local_attnum = -1; errarg.remote_attnum = -1; } - else - { - slot->tts_values[i] = (Datum) 0; - slot->tts_isnull[i] = true; - } } /* Pop the error context stack */ @@ -641,7 +707,7 @@ apply_handle_insert(StringInfo s) /* Process and store remote tuple in the slot */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - slot_store_cstrings(remoteslot, rel, newtup.values); + slot_store_data(remoteslot, rel, &newtup); slot_fill_defaults(rel, estate, remoteslot); MemoryContextSwitchTo(oldctx); @@ -765,7 +831,7 @@ apply_handle_update(StringInfo s) target_rte = list_nth(estate->es_range_table, 0); for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++) { - if (newtup.changed[i]) + if (newtup.colstatus[i] != LOGICALREP_COLUMN_UNCHANGED) target_rte->updatedCols = bms_add_member(target_rte->updatedCols, i + 1 - FirstLowInvalidHeapAttributeNumber); } @@ -776,8 +842,8 @@ apply_handle_update(StringInfo s) /* Build the search tuple. */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - slot_store_cstrings(remoteslot, rel, - has_oldtup ? oldtup.values : newtup.values); + slot_store_data(remoteslot, rel, + has_oldtup ? &oldtup : &newtup); MemoryContextSwitchTo(oldctx); /* For a partitioned table, apply update to correct partition. 
*/ @@ -831,8 +897,7 @@ apply_handle_update_internal(ResultRelInfo *relinfo, { /* Process and store remote tuple in the slot */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - slot_modify_cstrings(remoteslot, localslot, relmapentry, - newtup->values, newtup->changed); + slot_modify_data(remoteslot, localslot, relmapentry, newtup); MemoryContextSwitchTo(oldctx); EvalPlanQualSetSlot(&epqstate, remoteslot); @@ -900,7 +965,7 @@ apply_handle_delete(StringInfo s) /* Build the search tuple. */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - slot_store_cstrings(remoteslot, rel, oldtup.values); + slot_store_data(remoteslot, rel, &oldtup); MemoryContextSwitchTo(oldctx); /* For a partitioned table, apply delete to correct partition. */ @@ -1096,9 +1161,9 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo, if (found) { /* Apply the update. */ - slot_modify_cstrings(remoteslot_part, localslot, - part_entry, - newtup->values, newtup->changed); + slot_modify_data(remoteslot_part, localslot, + part_entry, + newtup); MemoryContextSwitchTo(oldctx); } else @@ -1312,8 +1377,8 @@ apply_handle_truncate(StringInfo s) } /* - * Even if we used CASCADE on the upstream primary we explicitly default to - * replaying changes without further cascading. This might be later + * Even if we used CASCADE on the upstream primary we explicitly default + * to replaying changes without further cascading. This might be later * changeable with a user specified option. */ ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs); @@ -1850,60 +1915,21 @@ maybe_reread_subscription(void) proc_exit(0); } - /* - * Exit if connection string was changed. The launcher will start new - * worker. 
- */ - if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0) - { - ereport(LOG, - (errmsg("logical replication apply worker for subscription \"%s\" will " - "restart because the connection information was changed", - MySubscription->name))); - - proc_exit(0); - } - - /* - * Exit if subscription name was changed (it's used for - * fallback_application_name). The launcher will start new worker. - */ - if (strcmp(newsub->name, MySubscription->name) != 0) - { - ereport(LOG, - (errmsg("logical replication apply worker for subscription \"%s\" will " - "restart because subscription was renamed", - MySubscription->name))); - - proc_exit(0); - } - /* !slotname should never happen when enabled is true. */ Assert(newsub->slotname); /* - * We need to make new connection to new slot if slot name has changed so - * exit here as well if that's the case. + * Exit if any parameter that affects the remote connection was changed. + * The launcher will start a new worker. */ - if (strcmp(newsub->slotname, MySubscription->slotname) != 0) + if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 || + strcmp(newsub->name, MySubscription->name) != 0 || + strcmp(newsub->slotname, MySubscription->slotname) != 0 || + newsub->binary != MySubscription->binary || + !equal(newsub->publications, MySubscription->publications)) { ereport(LOG, - (errmsg("logical replication apply worker for subscription \"%s\" will " - "restart because the replication slot name was changed", - MySubscription->name))); - - proc_exit(0); - } - - /* - * Exit if publication list was changed. The launcher will start new - * worker. 
- */ - if (!equal(newsub->publications, MySubscription->publications)) - { - ereport(LOG, - (errmsg("logical replication apply worker for subscription \"%s\" will " - "restart because subscription's publications were changed", + (errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change", MySubscription->name))); proc_exit(0); @@ -2106,6 +2132,7 @@ ApplyWorkerMain(Datum main_arg) options.slotname = myslotname; options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM; options.proto.logical.publication_names = MySubscription->publications; + options.proto.logical.binary = MySubscription->binary; /* Start normal logical streaming replication. */ walrcv_startstreaming(wrconn, &options); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 15379e3118..81ef7dc4c1 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -15,6 +15,7 @@ #include "access/tupconvert.h" #include "catalog/partition.h" #include "catalog/pg_publication.h" +#include "commands/defrem.h" #include "fmgr.h" #include "replication/logical.h" #include "replication/logicalproto.h" @@ -118,11 +119,14 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) static void parse_output_parameters(List *options, uint32 *protocol_version, - List **publication_names) + List **publication_names, bool *binary) { ListCell *lc; bool protocol_version_given = false; bool publication_names_given = false; + bool binary_option_given = false; + + *binary = false; foreach(lc, options) { @@ -168,6 +172,16 @@ parse_output_parameters(List *options, uint32 *protocol_version, (errcode(ERRCODE_INVALID_NAME), errmsg("invalid publication_names syntax"))); } + else if (strcmp(defel->defname, "binary") == 0) + { + if (binary_option_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + binary_option_given = true; + + 
*binary = defGetBoolean(defel); + } else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); } @@ -202,7 +216,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, /* Parse the params and ERROR if we see any we don't recognize */ parse_output_parameters(ctx->output_plugin_options, &data->protocol_version, - &data->publication_names); + &data->publication_names, + &data->binary); /* Check if we support requested protocol */ if (data->protocol_version > LOGICALREP_PROTO_VERSION_NUM) @@ -411,7 +426,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } OutputPluginPrepareWrite(ctx, true); - logicalrep_write_insert(ctx->out, relation, tuple); + logicalrep_write_insert(ctx->out, relation, tuple, + data->binary); OutputPluginWrite(ctx, true); break; } @@ -435,7 +451,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } OutputPluginPrepareWrite(ctx, true); - logicalrep_write_update(ctx->out, relation, oldtuple, newtuple); + logicalrep_write_update(ctx->out, relation, oldtuple, newtuple, + data->binary); OutputPluginWrite(ctx, true); break; } @@ -455,7 +472,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } OutputPluginPrepareWrite(ctx, true); - logicalrep_write_delete(ctx->out, relation, oldtuple); + logicalrep_write_delete(ctx->out, relation, oldtuple, + data->binary); OutputPluginWrite(ctx, true); } else diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 857c7c2278..94459b3539 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4205,6 +4205,7 @@ getSubscriptions(Archive *fout) int i_subslotname; int i_subsynccommit; int i_subpublications; + int i_subbinary; int i, ntups; @@ -4229,18 +4230,26 @@ getSubscriptions(Archive *fout) query = createPQExpBuffer(); - resetPQExpBuffer(query); - /* Get the subscriptions in current database. 
*/ appendPQExpBuffer(query, - "SELECT s.tableoid, s.oid, s.subname," - "(%s s.subowner) AS rolname, " - " s.subconninfo, s.subslotname, s.subsynccommit, " - " s.subpublications " - "FROM pg_subscription s " - "WHERE s.subdbid = (SELECT oid FROM pg_database" - " WHERE datname = current_database())", + "SELECT s.tableoid, s.oid, s.subname,\n" + " (%s s.subowner) AS rolname,\n" + " s.subconninfo, s.subslotname, s.subsynccommit,\n" + " s.subpublications,\n", username_subquery); + + if (fout->remoteVersion >= 140000) + appendPQExpBuffer(query, + " s.subbinary\n"); + else + appendPQExpBuffer(query, + " false AS subbinary\n"); + + appendPQExpBuffer(query, + "FROM pg_subscription s\n" + "WHERE s.subdbid = (SELECT oid FROM pg_database\n" + " WHERE datname = current_database())"); + res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK); ntups = PQntuples(res); @@ -4253,6 +4262,7 @@ getSubscriptions(Archive *fout) i_subslotname = PQfnumber(res, "subslotname"); i_subsynccommit = PQfnumber(res, "subsynccommit"); i_subpublications = PQfnumber(res, "subpublications"); + i_subbinary = PQfnumber(res, "subbinary"); subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo)); @@ -4274,6 +4284,8 @@ getSubscriptions(Archive *fout) pg_strdup(PQgetvalue(res, i, i_subsynccommit)); subinfo[i].subpublications = pg_strdup(PQgetvalue(res, i, i_subpublications)); + subinfo[i].subbinary = + pg_strdup(PQgetvalue(res, i, i_subbinary)); if (strlen(subinfo[i].rolname) == 0) pg_log_warning("owner of subscription \"%s\" appears to be invalid", @@ -4342,6 +4354,9 @@ dumpSubscription(Archive *fout, SubscriptionInfo *subinfo) else appendPQExpBufferStr(query, "NONE"); + if (strcmp(subinfo->subbinary, "t") == 0) + appendPQExpBuffer(query, ", binary = true"); + if (strcmp(subinfo->subsynccommit, "off") != 0) appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit)); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 0c2fcfb3a9..da97b731b1 100644 --- 
a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -625,6 +625,7 @@ typedef struct _SubscriptionInfo char *rolname; char *subconninfo; char *subslotname; + char *subbinary; char *subsynccommit; char *subpublications; } SubscriptionInfo; diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 3b870c3b17..e197dcdb4d 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -5963,7 +5963,7 @@ describeSubscriptions(const char *pattern, bool verbose) PGresult *res; printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, - false, false}; + false, false, false}; if (pset.sversion < 100000) { @@ -5989,6 +5989,12 @@ describeSubscriptions(const char *pattern, bool verbose) if (verbose) { + /* Binary mode is only supported in v14 and higher */ + if (pset.sversion >= 140000) + appendPQExpBuffer(&buf, + ", subbinary AS \"%s\"\n", + gettext_noop("Binary")); + appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" ", subconninfo AS \"%s\"\n", diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 6b3aa7c006..ccb586ad00 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202007131 +#define CATALOG_VERSION_NO 202007181 #endif diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 0a756d42d8..a041ce9740 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -48,6 +48,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subenabled; /* True if the subscription is enabled (the * worker should be running) */ + bool subbinary; /* True if the subscription wants the + * publisher to send data in binary */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -73,6 +76,8 
@@ typedef struct Subscription char *name; /* Name of the subscription */ Oid owner; /* Oid of the subscription owner */ bool enabled; /* Indicates if the subscription is enabled */ + bool binary; /* Indicates if the subscription wants data in + * binary format */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 4860561be9..287288ab41 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -30,12 +30,19 @@ /* Tuple coming via logical replication. */ typedef struct LogicalRepTupleData { - /* column values in text format, or NULL for a null value: */ - char *values[MaxTupleAttributeNumber]; - /* markers for changed/unchanged column values: */ - bool changed[MaxTupleAttributeNumber]; + /* Array of StringInfos, one per column; some may be unused */ + StringInfoData *colvalues; + /* Array of markers for null/unchanged/text/binary, one per column */ + char *colstatus; } LogicalRepTupleData; +/* Possible values for LogicalRepTupleData.colstatus[colnum] */ +/* These values are also used in the on-the-wire protocol */ +#define LOGICALREP_COLUMN_NULL 'n' +#define LOGICALREP_COLUMN_UNCHANGED 'u' +#define LOGICALREP_COLUMN_TEXT 't' +#define LOGICALREP_COLUMN_BINARY 'b' /* added in PG14 */ + typedef uint32 LogicalRepRelId; /* Relation information */ @@ -87,15 +94,15 @@ extern void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn); extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn); extern void logicalrep_write_insert(StringInfo out, Relation rel, - HeapTuple newtuple); + HeapTuple newtuple, bool binary); extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup); extern void logicalrep_write_update(StringInfo out, Relation rel, 
HeapTuple oldtuple, - HeapTuple newtuple); + HeapTuple newtuple, bool binary); extern LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup); extern void logicalrep_write_delete(StringInfo out, Relation rel, - HeapTuple oldtuple); + HeapTuple oldtuple, bool binary); extern LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup); extern void logicalrep_write_truncate(StringInfo out, int nrelids, Oid relids[], diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index 2e8e9daf44..a8c676ed23 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -20,11 +20,11 @@ typedef struct PGOutputData MemoryContext context; /* private memory context for transient * allocations */ - /* client info */ + /* client-supplied info: */ uint32 protocol_version; - List *publication_names; List *publications; + bool binary; } PGOutputData; #endif /* PGOUTPUT_H */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index c75dcebea0..c2d5dbee54 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -177,6 +177,7 @@ typedef struct { uint32 proto_version; /* Logical protocol version */ List *publication_names; /* String list of publications */ + bool binary; /* Ask publisher to use binary */ } logical; } proto; } WalRcvStreamOptions; diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index e7add9d2b8..d71db0d520 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -76,10 +76,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Synchronous commit | Conninfo 
------------------+---------------------------+---------+-------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -91,10 +91,10 @@ ERROR: subscription "regress_doesnotexist" does not exist ALTER SUBSCRIPTION regress_testsub SET (create_slot = false); ERROR: unrecognized subscription parameter: "create_slot" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Synchronous commit | Conninfo ------------------+---------------------------+---------+---------------------+--------------------+------------------------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | off | dbname=regress_doesnotexist2 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo +-----------------+---------------------------+---------+---------------------+--------+--------------------+------------------------------ + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | dbname=regress_doesnotexist2 (1 row) BEGIN; @@ -126,10 +126,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. 
\dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Synchronous commit | Conninfo ----------------------+---------------------------+---------+---------------------+--------------------+------------------------------ - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | local | dbname=regress_doesnotexist2 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo +---------------------+---------------------------+---------+---------------------+--------+--------------------+------------------------------ + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | local | dbname=regress_doesnotexist2 (1 row) -- rename back to keep the rest simple @@ -155,6 +155,29 @@ DROP SUBSCRIPTION IF EXISTS regress_testsub; NOTICE: subscription "regress_testsub" does not exist, skipping DROP SUBSCRIPTION regress_testsub; -- fail ERROR: subscription "regress_testsub" does not exist +-- fail - binary must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = foo); +ERROR: binary requires a Boolean value +-- now it works +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true); +WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... 
REFRESH PUBLICATION to subscribe the tables +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | t | off | dbname=regress_doesnotexist +(1 row) + +ALTER SUBSCRIPTION regress_testsub SET (binary = false); +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | dbname=regress_doesnotexist +(1 row) + +DROP SUBSCRIPTION regress_testsub; RESET SESSION AUTHORIZATION; DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user2; diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 9e234ab8b3..eeb2ec06eb 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -117,6 +117,21 @@ COMMIT; DROP SUBSCRIPTION IF EXISTS regress_testsub; DROP SUBSCRIPTION regress_testsub; -- fail +-- fail - binary must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = foo); + +-- now it works +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true); + +\dRs+ + +ALTER SUBSCRIPTION regress_testsub SET (binary = false); +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); + +\dRs+ + +DROP SUBSCRIPTION regress_testsub; + RESET SESSION AUTHORIZATION; DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user2; diff --git 
a/src/test/subscription/t/014_binary.pl b/src/test/subscription/t/014_binary.pl new file mode 100644 index 0000000000..36a2f58e17 --- /dev/null +++ b/src/test/subscription/t/014_binary.pl @@ -0,0 +1,134 @@ +# Binary mode logical replication test + +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 5; + +# Create and initialize a publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create and initialize subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create tables on both sides of the replication +my $ddl = qq( + CREATE TABLE public.test_numerical ( + a INTEGER PRIMARY KEY, + b NUMERIC, + c FLOAT, + d BIGINT + ); + CREATE TABLE public.test_arrays ( + a INTEGER[] PRIMARY KEY, + b NUMERIC[], + c TEXT[] + );); + +$node_publisher->safe_psql('postgres', $ddl); +$node_subscriber->safe_psql('postgres', $ddl); + +# Configure logical replication +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tpub FOR ALL TABLES"); + +my $publisher_connstring = $node_publisher->connstr . ' dbname=postgres'; +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tsub CONNECTION '$publisher_connstring' " + . 
"PUBLICATION tpub WITH (slot_name = tpub_slot, binary = true)"); + +# Ensure nodes are in sync with each other +$node_publisher->wait_for_catchup('tsub'); +$node_subscriber->poll_query_until('postgres', + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');" +) or die "Timed out while waiting for subscriber to synchronize data"; + +# Insert some content and make sure it's replicated across +$node_publisher->safe_psql( + 'postgres', qq( + INSERT INTO public.test_arrays (a, b, c) VALUES + ('{1,2,3}', '{1.1, 1.2, 1.3}', '{"one", "two", "three"}'), + ('{3,1,2}', '{1.3, 1.1, 1.2}', '{"three", "one", "two"}'); + + INSERT INTO public.test_numerical (a, b, c, d) VALUES + (1, 1.2, 1.3, 10), + (2, 2.2, 2.3, 20), + (3, 3.2, 3.3, 30); + )); + +$node_publisher->wait_for_catchup('tsub'); + +my $result = $node_subscriber->safe_psql('postgres', + "SELECT a, b, c, d FROM test_numerical ORDER BY a"); + +is( $result, '1|1.2|1.3|10 +2|2.2|2.3|20 +3|3.2|3.3|30', 'check replicated data on subscriber'); + +# Test updates as well +$node_publisher->safe_psql( + 'postgres', qq( + UPDATE public.test_arrays SET b[1] = 42, c = NULL; + UPDATE public.test_numerical SET b = 42, c = NULL; + )); + +$node_publisher->wait_for_catchup('tsub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT a, b, c FROM test_arrays ORDER BY a"); + +is( $result, '{1,2,3}|{42,1.2,1.3}| +{3,1,2}|{42,1.1,1.2}|', 'check updated replicated data on subscriber'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT a, b, c, d FROM test_numerical ORDER BY a"); + +is( $result, '1|42||10 +2|42||20 +3|42||30', 'check updated replicated data on subscriber'); + +# Test to reset back to text formatting, and then to binary again +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tsub SET (binary = false);"); + +$node_publisher->safe_psql( + 'postgres', qq( + INSERT INTO public.test_numerical (a, b, c, d) VALUES + (4, 4.2, 4.3, 40); + )); + 
+$node_publisher->wait_for_catchup('tsub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT a, b, c, d FROM test_numerical ORDER BY a"); + +is( $result, '1|42||10 +2|42||20 +3|42||30 +4|4.2|4.3|40', 'check replicated data on subscriber'); + +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tsub SET (binary = true);"); + +$node_publisher->safe_psql( + 'postgres', qq( + INSERT INTO public.test_arrays (a, b, c) VALUES + ('{2,3,1}', '{1.2, 1.3, 1.1}', '{"two", "three", "one"}'); + )); + +$node_publisher->wait_for_catchup('tsub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT a, b, c FROM test_arrays ORDER BY a"); + +is( $result, '{1,2,3}|{42,1.2,1.3}| +{2,3,1}|{1.2,1.3,1.1}|{two,three,one} +{3,1,2}|{42,1.1,1.2}|', 'check replicated data on subscriber'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast');