diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index e9cdff4864..18ab3d434c 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -7472,7 +7472,7 @@ SCRAM-SHA-256$<iteration count>:&l (references pg_database.oid) - OID of the database which the subscription resides in + OID of the database that the subscription resides in @@ -7500,7 +7500,17 @@ SCRAM-SHA-256$<iteration count>:&l subenabled bool - If true, the subscription is enabled and should be replicating. + If true, the subscription is enabled and should be replicating + + + + + + subbinary bool + + + If true, the subscription will request that the publisher send data + in binary format @@ -7518,8 +7528,8 @@ SCRAM-SHA-256$<iteration count>:&l subslotname name - Name of the replication slot in the upstream database. Also used - for local replication origin name. + Name of the replication slot in the upstream database (also used + for the local replication origin name) @@ -7528,8 +7538,8 @@ SCRAM-SHA-256$<iteration count>:&l subsynccommit text - Contains the value of the synchronous_commit - setting for the subscription workers. + The synchronous_commit + setting for the subscription's workers to use @@ -7538,8 +7548,8 @@ SCRAM-SHA-256$<iteration count>:&l subpublications text[] - Array of subscribed publication names. These reference the - publications on the publisher server. For more on publications + Array of subscribed publication names. These reference + publications defined in the upstream database. For more on publications see . diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index c24ace14d1..81c4e70cdf 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -163,8 +163,10 @@ ALTER SUBSCRIPTION name RENAME TO < This clause alters parameters originally set by . See there for more - information. The allowed options are slot_name and - synchronous_commit + information. The parameters that can be altered + are slot_name, + synchronous_commit, and + binary. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 5bbc165f70..cdb22c54fe 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -152,8 +152,9 @@ CREATE SUBSCRIPTION subscription_name The value of this parameter overrides the - setting. The default - value is off. + setting within this + subscription's apply worker processes. The default value + is off. @@ -178,6 +179,27 @@ CREATE SUBSCRIPTION subscription_name + + binary (boolean) + + + Specifies whether the subscription will request the publisher to + send the data in binary format (as opposed to text). + The default is false. + Even when this option is enabled, only data types that have + binary send and receive functions will be transferred in binary. + + + + When doing cross-version replication, it could happen that the + publisher has a binary send function for some data type, but the + subscriber lacks a binary receive function for the type. In + such a case, data transfer will fail, and + the binary option cannot be used. + + + + connect (boolean) diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index cb15731115..e6afb3203e 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -65,6 +65,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->name = pstrdup(NameStr(subform->subname)); sub->owner = subform->subowner; sub->enabled = subform->subenabled; + sub->binary = subform->subbinary; /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 5ecd2e986b..8625cbeab6 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1122,7 +1122,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public; -- All columns of pg_subscription except subconninfo are readable. REVOKE ALL ON pg_subscription FROM public; -GRANT SELECT (subdbid, subname, subowner, subenabled, subslotname, subpublications) +GRANT SELECT (subdbid, subname, subowner, subenabled, subbinary, subslotname, subpublications) ON pg_subscription TO public; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 9ebb026187..40b6377a85 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -55,11 +55,15 @@ static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); * accommodate that. */ static void -parse_subscription_options(List *options, bool *connect, bool *enabled_given, - bool *enabled, bool *create_slot, +parse_subscription_options(List *options, + bool *connect, + bool *enabled_given, bool *enabled, + bool *create_slot, bool *slot_name_given, char **slot_name, - bool *copy_data, char **synchronous_commit, - bool *refresh) + bool *copy_data, + char **synchronous_commit, + bool *refresh, + bool *binary_given, bool *binary) { ListCell *lc; bool connect_given = false; @@ -90,6 +94,11 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, *synchronous_commit = NULL; if (refresh) *refresh = true; + if (binary) + { + *binary_given = false; + *binary = false; + } /* Parse options */ foreach(lc, options) @@ -175,6 +184,16 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, refresh_given = true; *refresh = defGetBoolean(defel); } + else if (strcmp(defel->defname, "binary") == 0 && binary) + { + if (*binary_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + *binary_given = true; + *binary = defGetBoolean(defel); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -322,6 +341,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) char *conninfo; char *slotname; bool slotname_given; + bool binary; + bool binary_given; char originname[NAMEDATALEN]; bool create_slot; List *publications; @@ -331,10 +352,15 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) * * Connection and publication should not be specified here. */ - parse_subscription_options(stmt->options, &connect, &enabled_given, - &enabled, &create_slot, &slotname_given, - &slotname, ©_data, &synchronous_commit, - NULL); + parse_subscription_options(stmt->options, + &connect, + &enabled_given, &enabled, + &create_slot, + &slotname_given, &slotname, + ©_data, + &synchronous_commit, + NULL, /* no "refresh" */ + &binary_given, &binary); /* * Since creating a replication slot is not transactional, rolling back @@ -400,6 +426,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) DirectFunctionCall1(namein, CStringGetDatum(stmt->subname)); values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner); values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled); + values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (slotname) @@ -669,10 +696,18 @@ AlterSubscription(AlterSubscriptionStmt *stmt) char *slotname; bool slotname_given; char *synchronous_commit; + bool binary_given; + bool binary; - parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, &slotname_given, &slotname, - NULL, &synchronous_commit, NULL); + parse_subscription_options(stmt->options, + NULL, /* no "connect" */ + NULL, NULL, /* no "enabled" */ + NULL, /* no "create_slot" */ + &slotname_given, &slotname, + NULL, /* no "copy_data" */ + &synchronous_commit, + NULL, /* no "refresh" */ + &binary_given, &binary); if (slotname_given) { @@ -697,6 +732,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt) replaces[Anum_pg_subscription_subsynccommit - 1] = true; } + if (binary_given) + { + values[Anum_pg_subscription_subbinary - 1] = + BoolGetDatum(binary); + replaces[Anum_pg_subscription_subbinary - 1] = true; + } + update_tuple = true; break; } @@ -706,9 +748,15 @@ AlterSubscription(AlterSubscriptionStmt *stmt) bool enabled, enabled_given; - parse_subscription_options(stmt->options, NULL, - &enabled_given, &enabled, NULL, - NULL, NULL, NULL, NULL, NULL); + parse_subscription_options(stmt->options, + NULL, /* no "connect" */ + &enabled_given, &enabled, + NULL, /* no "create_slot" */ + NULL, NULL, /* no "slot_name" */ + NULL, /* no "copy_data" */ + NULL, /* no "synchronous_commit" */ + NULL, /* no "refresh" */ + NULL, NULL); /* no "binary" */ Assert(enabled_given); if (!sub->slotname && enabled) @@ -744,9 +792,15 @@ AlterSubscription(AlterSubscriptionStmt *stmt) bool copy_data; bool refresh; - parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, NULL, NULL, ©_data, - NULL, &refresh); + parse_subscription_options(stmt->options, + NULL, /* no "connect" */ + NULL, NULL, /* no "enabled" */ + NULL, /* no "create_slot" */ + NULL, NULL, /* no "slot_name" */ + ©_data, + NULL, /* no "synchronous_commit" */ + &refresh, + NULL, NULL); /* no "binary" */ values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); @@ -781,9 +835,15 @@ AlterSubscription(AlterSubscriptionStmt *stmt) (errcode(ERRCODE_SYNTAX_ERROR), errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions"))); - parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, NULL, NULL, ©_data, - NULL, NULL); + parse_subscription_options(stmt->options, + NULL, /* no "connect" */ + NULL, NULL, /* no "enabled" */ + NULL, /* no "create_slot" */ + NULL, NULL, /* no "slot_name" */ + ©_data, + NULL, /* no "synchronous_commit" */ + NULL, /* no "refresh" */ + NULL, NULL); /* no "binary" */ AlterSubscription_refresh(sub, copy_data); diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index e4fd1f9bb6..e9057230e4 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -424,6 +424,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn, PQfreemem(pubnames_literal); pfree(pubnames_str); + if (options->proto.logical.binary && + PQserverVersion(conn->streamConn) >= 140000) + appendStringInfoString(&cmd, ", binary 'true'"); + appendStringInfoChar(&cmd, ')'); } else diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 3c6d0cd171..2b1356ee24 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -17,7 +17,6 @@ #include "catalog/pg_type.h" #include "libpq/pqformat.h" #include "replication/logicalproto.h" -#include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/syscache.h" @@ -31,7 +30,7 @@ static void logicalrep_write_attrs(StringInfo out, Relation rel); static void logicalrep_write_tuple(StringInfo out, Relation rel, - HeapTuple tuple); + HeapTuple tuple, bool binary); static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel); static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple); @@ -139,7 +138,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn) * Write INSERT to the output stream. */ void -logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple) +logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple, bool binary) { pq_sendbyte(out, 'I'); /* action INSERT */ @@ -147,7 +146,7 @@ logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple) pq_sendint32(out, RelationGetRelid(rel)); pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newtuple); + logicalrep_write_tuple(out, rel, newtuple, binary); } /* @@ -179,7 +178,7 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) */ void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, - HeapTuple newtuple) + HeapTuple newtuple, bool binary) { pq_sendbyte(out, 'U'); /* action UPDATE */ @@ -196,11 +195,11 @@ logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, pq_sendbyte(out, 'O'); /* old tuple follows */ else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldtuple); + logicalrep_write_tuple(out, rel, oldtuple, binary); } pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newtuple); + logicalrep_write_tuple(out, rel, newtuple, binary); } /* @@ -248,7 +247,7 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple, * Write DELETE to the output stream. */ void -logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple) +logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple, bool binary) { Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || @@ -264,7 +263,7 @@ logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple) else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldtuple); + logicalrep_write_tuple(out, rel, oldtuple, binary); } /* @@ -437,7 +436,7 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp) * Write a tuple to the outputstream, in the most efficient format possible. */ static void -logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple) +logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary) { TupleDesc desc; Datum values[MaxTupleAttributeNumber]; @@ -474,12 +473,18 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple) if (isnull[i]) { - pq_sendbyte(out, 'n'); /* null column */ + pq_sendbyte(out, LOGICALREP_COLUMN_NULL); continue; } - else if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i])) + + if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i])) { - pq_sendbyte(out, 'u'); /* unchanged toast column */ + /* + * Unchanged toasted datum. (Note that we don't promise to detect + * unchanged data in general; this is just a cheap check to avoid + * sending large values unnecessarily.) + */ + pq_sendbyte(out, LOGICALREP_COLUMN_UNCHANGED); continue; } @@ -488,20 +493,48 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple) elog(ERROR, "cache lookup failed for type %u", att->atttypid); typclass = (Form_pg_type) GETSTRUCT(typtup); - pq_sendbyte(out, 't'); /* 'text' data follows */ + /* + * Choose whether to send in binary. Obviously, the option must be + * requested and the type must have a send function. Also, if the + * type is not built-in then it must not be a composite or array type. + * Such types contain type OIDs, which will likely not match at the + * receiver if it's not a built-in type. + * + * XXX this could be relaxed if we changed record_recv and array_recv + * to be less picky. + * + * XXX this fails to apply the restriction to domains over such types. + */ + if (binary && + OidIsValid(typclass->typsend) && + (att->atttypid < FirstGenbkiObjectId || + (typclass->typtype != TYPTYPE_COMPOSITE && + typclass->typelem == InvalidOid))) + { + bytea *outputbytes; + int len; - outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]); - pq_sendcountedtext(out, outputstr, strlen(outputstr), false); - pfree(outputstr); + pq_sendbyte(out, LOGICALREP_COLUMN_BINARY); + outputbytes = OidSendFunctionCall(typclass->typsend, values[i]); + len = VARSIZE(outputbytes) - VARHDRSZ; + pq_sendint(out, len, 4); /* length */ + pq_sendbytes(out, VARDATA(outputbytes), len); /* data */ + pfree(outputbytes); + } + else + { + pq_sendbyte(out, LOGICALREP_COLUMN_TEXT); + outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]); + pq_sendcountedtext(out, outputstr, strlen(outputstr), false); + pfree(outputstr); + } ReleaseSysCache(typtup); } } /* - * Read tuple in remote format from stream. - * - * The returned tuple points into the input stringinfo. + * Read tuple in logical replication format from stream. */ static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) @@ -512,38 +545,52 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) /* Get number of attributes */ natts = pq_getmsgint(in, 2); - memset(tuple->changed, 0, sizeof(tuple->changed)); + /* Allocate space for per-column values; zero out unused StringInfoDatas */ + tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData)); + tuple->colstatus = (char *) palloc(natts * sizeof(char)); /* Read the data */ for (i = 0; i < natts; i++) { char kind; + int len; + StringInfo value = &tuple->colvalues[i]; kind = pq_getmsgbyte(in); + tuple->colstatus[i] = kind; switch (kind) { - case 'n': /* null */ - tuple->values[i] = NULL; - tuple->changed[i] = true; + case LOGICALREP_COLUMN_NULL: + /* nothing more to do */ break; - case 'u': /* unchanged column */ + case LOGICALREP_COLUMN_UNCHANGED: /* we don't receive the value of an unchanged column */ - tuple->values[i] = NULL; break; - case 't': /* text formatted value */ - { - int len; + case LOGICALREP_COLUMN_TEXT: + len = pq_getmsgint(in, 4); /* read length */ - tuple->changed[i] = true; + /* and data */ + value->data = palloc(len + 1); + pq_copymsgbytes(in, value->data, len); + value->data[len] = '\0'; + /* make StringInfo fully valid */ + value->len = len; + value->cursor = 0; + value->maxlen = len; + break; + case LOGICALREP_COLUMN_BINARY: + len = pq_getmsgint(in, 4); /* read length */ - len = pq_getmsgint(in, 4); /* read length */ - - /* and data */ - tuple->values[i] = palloc(len + 1); - pq_copymsgbytes(in, tuple->values[i], len); - tuple->values[i][len] = '\0'; - } + /* and data */ + value->data = palloc(len + 1); + pq_copymsgbytes(in, value->data, len); + /* not strictly necessary but per StringInfo practice */ + value->data[len] = '\0'; + /* make StringInfo fully valid */ + value->len = len; + value->cursor = 0; + value->maxlen = len; break; default: elog(ERROR, "unrecognized data representation type '%c'", kind); @@ -552,7 +599,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) } /* - * Write relation attributes to the stream. + * Write relation attribute metadata to the stream. */ static void logicalrep_write_attrs(StringInfo out, Relation rel) @@ -611,7 +658,7 @@ logicalrep_write_attrs(StringInfo out, Relation rel) } /* - * Read relation attribute names from the stream. + * Read relation attribute metadata from the stream. */ static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index f90a896fc3..407eee3c0b 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -319,13 +319,13 @@ slot_store_error_callback(void *arg) } /* - * Store data in C string form into slot. - * This is similar to BuildTupleFromCStrings but TupleTableSlot fits our - * use better. + * Store tuple data into slot. + * + * Incoming data can be either text or binary format. */ static void -slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, - char **values) +slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, + LogicalRepTupleData *tupleData) { int natts = slot->tts_tupleDescriptor->natts; int i; @@ -343,27 +343,65 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, errcallback.previous = error_context_stack; error_context_stack = &errcallback; - /* Call the "in" function for each non-dropped attribute */ + /* Call the "in" function for each non-dropped, non-null attribute */ Assert(natts == rel->attrmap->maplen); for (i = 0; i < natts; i++) { Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i); int remoteattnum = rel->attrmap->attnums[i]; - if (!att->attisdropped && remoteattnum >= 0 && - values[remoteattnum] != NULL) + if (!att->attisdropped && remoteattnum >= 0) { - Oid typinput; - Oid typioparam; + StringInfo colvalue = &tupleData->colvalues[remoteattnum]; errarg.local_attnum = i; errarg.remote_attnum = remoteattnum; - getTypeInputInfo(att->atttypid, &typinput, &typioparam); - slot->tts_values[i] = - OidInputFunctionCall(typinput, values[remoteattnum], - typioparam, att->atttypmod); - slot->tts_isnull[i] = false; + if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT) + { + Oid typinput; + Oid typioparam; + + getTypeInputInfo(att->atttypid, &typinput, &typioparam); + slot->tts_values[i] = + OidInputFunctionCall(typinput, colvalue->data, + typioparam, att->atttypmod); + slot->tts_isnull[i] = false; + } + else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY) + { + Oid typreceive; + Oid typioparam; + + /* + * In some code paths we may be asked to re-parse the same + * tuple data. Reset the StringInfo's cursor so that works. + */ + colvalue->cursor = 0; + + getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam); + slot->tts_values[i] = + OidReceiveFunctionCall(typreceive, colvalue, + typioparam, att->atttypmod); + + /* Trouble if it didn't eat the whole buffer */ + if (colvalue->cursor != colvalue->len) + ereport(ERROR, + (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION), + errmsg("incorrect binary data format in logical replication column %d", + remoteattnum + 1))); + slot->tts_isnull[i] = false; + } + else + { + /* + * NULL value from remote. (We don't expect to see + * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as + * NULL.) + */ + slot->tts_values[i] = (Datum) 0; + slot->tts_isnull[i] = true; + } errarg.local_attnum = -1; errarg.remote_attnum = -1; @@ -371,8 +409,8 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, else { /* - * We assign NULL to dropped attributes, NULL values, and missing - * values (missing values should be later filled using + * We assign NULL to dropped attributes and missing values + * (missing values should be later filled using * slot_fill_defaults). */ slot->tts_values[i] = (Datum) 0; @@ -387,20 +425,21 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, } /* - * Replace selected columns with user data provided as C strings. + * Replace updated columns with data from the LogicalRepTupleData struct. * This is somewhat similar to heap_modify_tuple but also calls the type * input functions on the user data. - * "slot" is filled with a copy of the tuple in "srcslot", with - * columns selected by the "replaces" array replaced with data values - * from "values". + * + * "slot" is filled with a copy of the tuple in "srcslot", replacing + * columns provided in "tupleData" and leaving others as-is. + * * Caution: unreplaced pass-by-ref columns in "slot" will point into the * storage for "srcslot". This is OK for current usage, but someday we may * need to materialize "slot" at the end to make it independent of "srcslot". */ static void -slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot, - LogicalRepRelMapEntry *rel, - char **values, bool *replaces) +slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, + LogicalRepRelMapEntry *rel, + LogicalRepTupleData *tupleData) { int natts = slot->tts_tupleDescriptor->natts; int i; @@ -438,31 +477,58 @@ slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot, if (remoteattnum < 0) continue; - if (!replaces[remoteattnum]) - continue; - - if (values[remoteattnum] != NULL) + if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED) { - Oid typinput; - Oid typioparam; + StringInfo colvalue = &tupleData->colvalues[remoteattnum]; errarg.local_attnum = i; errarg.remote_attnum = remoteattnum; - getTypeInputInfo(att->atttypid, &typinput, &typioparam); - slot->tts_values[i] = - OidInputFunctionCall(typinput, values[remoteattnum], - typioparam, att->atttypmod); - slot->tts_isnull[i] = false; + if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT) + { + Oid typinput; + Oid typioparam; + + getTypeInputInfo(att->atttypid, &typinput, &typioparam); + slot->tts_values[i] = + OidInputFunctionCall(typinput, colvalue->data, + typioparam, att->atttypmod); + slot->tts_isnull[i] = false; + } + else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY) + { + Oid typreceive; + Oid typioparam; + + /* + * In some code paths we may be asked to re-parse the same + * tuple data. Reset the StringInfo's cursor so that works. + */ + colvalue->cursor = 0; + + getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam); + slot->tts_values[i] = + OidReceiveFunctionCall(typreceive, colvalue, + typioparam, att->atttypmod); + + /* Trouble if it didn't eat the whole buffer */ + if (colvalue->cursor != colvalue->len) + ereport(ERROR, + (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION), + errmsg("incorrect binary data format in logical replication column %d", + remoteattnum + 1))); + slot->tts_isnull[i] = false; + } + else + { + /* must be LOGICALREP_COLUMN_NULL */ + slot->tts_values[i] = (Datum) 0; + slot->tts_isnull[i] = true; + } errarg.local_attnum = -1; errarg.remote_attnum = -1; } - else - { - slot->tts_values[i] = (Datum) 0; - slot->tts_isnull[i] = true; - } } /* Pop the error context stack */ @@ -641,7 +707,7 @@ apply_handle_insert(StringInfo s) /* Process and store remote tuple in the slot */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - slot_store_cstrings(remoteslot, rel, newtup.values); + slot_store_data(remoteslot, rel, &newtup); slot_fill_defaults(rel, estate, remoteslot); MemoryContextSwitchTo(oldctx); @@ -765,7 +831,7 @@ apply_handle_update(StringInfo s) target_rte = list_nth(estate->es_range_table, 0); for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++) { - if (newtup.changed[i]) + if (newtup.colstatus[i] != LOGICALREP_COLUMN_UNCHANGED) target_rte->updatedCols = bms_add_member(target_rte->updatedCols, i + 1 - FirstLowInvalidHeapAttributeNumber); } @@ -776,8 +842,8 @@ apply_handle_update(StringInfo s) /* Build the search tuple. */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - slot_store_cstrings(remoteslot, rel, - has_oldtup ? oldtup.values : newtup.values); + slot_store_data(remoteslot, rel, + has_oldtup ? &oldtup : &newtup); MemoryContextSwitchTo(oldctx); /* For a partitioned table, apply update to correct partition. */ @@ -831,8 +897,7 @@ apply_handle_update_internal(ResultRelInfo *relinfo, { /* Process and store remote tuple in the slot */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - slot_modify_cstrings(remoteslot, localslot, relmapentry, - newtup->values, newtup->changed); + slot_modify_data(remoteslot, localslot, relmapentry, newtup); MemoryContextSwitchTo(oldctx); EvalPlanQualSetSlot(&epqstate, remoteslot); @@ -900,7 +965,7 @@ apply_handle_delete(StringInfo s) /* Build the search tuple. */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - slot_store_cstrings(remoteslot, rel, oldtup.values); + slot_store_data(remoteslot, rel, &oldtup); MemoryContextSwitchTo(oldctx); /* For a partitioned table, apply delete to correct partition. */ @@ -1096,9 +1161,9 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo, if (found) { /* Apply the update. */ - slot_modify_cstrings(remoteslot_part, localslot, - part_entry, - newtup->values, newtup->changed); + slot_modify_data(remoteslot_part, localslot, + part_entry, + newtup); MemoryContextSwitchTo(oldctx); } else @@ -1312,8 +1377,8 @@ apply_handle_truncate(StringInfo s) } /* - * Even if we used CASCADE on the upstream primary we explicitly default to - * replaying changes without further cascading. This might be later + * Even if we used CASCADE on the upstream primary we explicitly default + * to replaying changes without further cascading. This might be later * changeable with a user specified option. */ ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs); @@ -1850,60 +1915,21 @@ maybe_reread_subscription(void) proc_exit(0); } - /* - * Exit if connection string was changed. The launcher will start new - * worker. - */ - if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0) - { - ereport(LOG, - (errmsg("logical replication apply worker for subscription \"%s\" will " - "restart because the connection information was changed", - MySubscription->name))); - - proc_exit(0); - } - - /* - * Exit if subscription name was changed (it's used for - * fallback_application_name). The launcher will start new worker. - */ - if (strcmp(newsub->name, MySubscription->name) != 0) - { - ereport(LOG, - (errmsg("logical replication apply worker for subscription \"%s\" will " - "restart because subscription was renamed", - MySubscription->name))); - - proc_exit(0); - } - /* !slotname should never happen when enabled is true. */ Assert(newsub->slotname); /* - * We need to make new connection to new slot if slot name has changed so - * exit here as well if that's the case. + * Exit if any parameter that affects the remote connection was changed. + * The launcher will start a new worker. */ - if (strcmp(newsub->slotname, MySubscription->slotname) != 0) + if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 || + strcmp(newsub->name, MySubscription->name) != 0 || + strcmp(newsub->slotname, MySubscription->slotname) != 0 || + newsub->binary != MySubscription->binary || + !equal(newsub->publications, MySubscription->publications)) { ereport(LOG, - (errmsg("logical replication apply worker for subscription \"%s\" will " - "restart because the replication slot name was changed", - MySubscription->name))); - - proc_exit(0); - } - - /* - * Exit if publication list was changed. The launcher will start new - * worker. - */ - if (!equal(newsub->publications, MySubscription->publications)) - { - ereport(LOG, - (errmsg("logical replication apply worker for subscription \"%s\" will " - "restart because subscription's publications were changed", + (errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change", MySubscription->name))); proc_exit(0); @@ -2106,6 +2132,7 @@ ApplyWorkerMain(Datum main_arg) options.slotname = myslotname; options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM; options.proto.logical.publication_names = MySubscription->publications; + options.proto.logical.binary = MySubscription->binary; /* Start normal logical streaming replication. */ walrcv_startstreaming(wrconn, &options); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 15379e3118..81ef7dc4c1 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -15,6 +15,7 @@ #include "access/tupconvert.h" #include "catalog/partition.h" #include "catalog/pg_publication.h" +#include "commands/defrem.h" #include "fmgr.h" #include "replication/logical.h" #include "replication/logicalproto.h" @@ -118,11 +119,14 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) static void parse_output_parameters(List *options, uint32 *protocol_version, - List **publication_names) + List **publication_names, bool *binary) { ListCell *lc; bool protocol_version_given = false; bool publication_names_given = false; + bool binary_option_given = false; + + *binary = false; foreach(lc, options) { @@ -168,6 +172,16 @@ parse_output_parameters(List *options, uint32 *protocol_version, (errcode(ERRCODE_INVALID_NAME), errmsg("invalid publication_names syntax"))); } + else if (strcmp(defel->defname, "binary") == 0) + { + if (binary_option_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + binary_option_given = true; + + *binary = defGetBoolean(defel); + } else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); } @@ -202,7 +216,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, /* Parse the params and ERROR if we see any we don't recognize */ parse_output_parameters(ctx->output_plugin_options, &data->protocol_version, - &data->publication_names); + &data->publication_names, + &data->binary); /* Check if we support requested protocol */ if (data->protocol_version > LOGICALREP_PROTO_VERSION_NUM) @@ -411,7 +426,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } OutputPluginPrepareWrite(ctx, true); - logicalrep_write_insert(ctx->out, relation, tuple); + logicalrep_write_insert(ctx->out, relation, tuple, + data->binary); OutputPluginWrite(ctx, true); break; } @@ -435,7 +451,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } OutputPluginPrepareWrite(ctx, true); - logicalrep_write_update(ctx->out, relation, oldtuple, newtuple); + logicalrep_write_update(ctx->out, relation, oldtuple, newtuple, + data->binary); OutputPluginWrite(ctx, true); break; } @@ -455,7 +472,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } OutputPluginPrepareWrite(ctx, true); - logicalrep_write_delete(ctx->out, relation, oldtuple); + logicalrep_write_delete(ctx->out, relation, oldtuple, + data->binary); OutputPluginWrite(ctx, true); } else diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 857c7c2278..94459b3539 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4205,6 +4205,7 @@ getSubscriptions(Archive *fout) int i_subslotname; int i_subsynccommit; int i_subpublications; + int i_subbinary; int i, ntups; @@ -4229,18 +4230,26 @@ getSubscriptions(Archive *fout) query = createPQExpBuffer(); - resetPQExpBuffer(query); - /* Get the subscriptions in current database. */ appendPQExpBuffer(query, - "SELECT s.tableoid, s.oid, s.subname," - "(%s s.subowner) AS rolname, " - " s.subconninfo, s.subslotname, s.subsynccommit, " - " s.subpublications " - "FROM pg_subscription s " - "WHERE s.subdbid = (SELECT oid FROM pg_database" - " WHERE datname = current_database())", + "SELECT s.tableoid, s.oid, s.subname,\n" + " (%s s.subowner) AS rolname,\n" + " s.subconninfo, s.subslotname, s.subsynccommit,\n" + " s.subpublications,\n", username_subquery); + + if (fout->remoteVersion >= 140000) + appendPQExpBuffer(query, + " s.subbinary\n"); + else + appendPQExpBuffer(query, + " false AS subbinary\n"); + + appendPQExpBuffer(query, + "FROM pg_subscription s\n" + "WHERE s.subdbid = (SELECT oid FROM pg_database\n" + " WHERE datname = current_database())"); + res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK); ntups = PQntuples(res); @@ -4253,6 +4262,7 @@ getSubscriptions(Archive *fout) i_subslotname = PQfnumber(res, "subslotname"); i_subsynccommit = PQfnumber(res, "subsynccommit"); i_subpublications = PQfnumber(res, "subpublications"); + i_subbinary = PQfnumber(res, "subbinary"); subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo)); @@ -4274,6 +4284,8 @@ getSubscriptions(Archive *fout) pg_strdup(PQgetvalue(res, i, i_subsynccommit)); subinfo[i].subpublications = pg_strdup(PQgetvalue(res, i, i_subpublications)); + subinfo[i].subbinary = + pg_strdup(PQgetvalue(res, i, i_subbinary)); if (strlen(subinfo[i].rolname) == 0) pg_log_warning("owner of subscription \"%s\" appears to be invalid", @@ -4342,6 +4354,9 @@ dumpSubscription(Archive *fout, SubscriptionInfo *subinfo) else appendPQExpBufferStr(query, "NONE"); + if (strcmp(subinfo->subbinary, "t") == 0) + appendPQExpBuffer(query, ", binary = true"); + if (strcmp(subinfo->subsynccommit, "off") != 0) appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit)); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 0c2fcfb3a9..da97b731b1 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -625,6 +625,7 @@ typedef struct _SubscriptionInfo char *rolname; char *subconninfo; char *subslotname; + char *subbinary; char *subsynccommit; char *subpublications; } SubscriptionInfo; diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 3b870c3b17..e197dcdb4d 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -5963,7 +5963,7 @@ describeSubscriptions(const char *pattern, bool verbose) PGresult *res; printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, - false, false}; + false, false, false}; if (pset.sversion < 100000) { @@ -5989,6 +5989,12 @@ describeSubscriptions(const char *pattern, bool verbose) if (verbose) { + /* Binary mode is only supported in v14 and higher */ + if (pset.sversion >= 140000) + appendPQExpBuffer(&buf, + ", subbinary AS \"%s\"\n", + gettext_noop("Binary")); + appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" ", subconninfo AS \"%s\"\n", diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 6b3aa7c006..ccb586ad00 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202007131 +#define CATALOG_VERSION_NO 202007181 #endif diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 0a756d42d8..a041ce9740 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -48,6 +48,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subenabled; /* True if the subscription is enabled (the * worker should be running) */ + bool subbinary; /* True if the subscription wants the + * publisher to send data in binary */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -73,6 +76,8 @@ typedef struct Subscription char *name; /* Name of the subscription */ Oid owner; /* Oid of the subscription owner */ bool enabled; /* Indicates if the subscription is enabled */ + bool binary; /* Indicates if the subscription wants data in + * binary format */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 4860561be9..287288ab41 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -30,12 +30,19 @@ /* Tuple coming via logical replication. */ typedef struct LogicalRepTupleData { - /* column values in text format, or NULL for a null value: */ - char *values[MaxTupleAttributeNumber]; - /* markers for changed/unchanged column values: */ - bool changed[MaxTupleAttributeNumber]; + /* Array of StringInfos, one per column; some may be unused */ + StringInfoData *colvalues; + /* Array of markers for null/unchanged/text/binary, one per column */ + char *colstatus; } LogicalRepTupleData; +/* Possible values for LogicalRepTupleData.colstatus[colnum] */ +/* These values are also used in the on-the-wire protocol */ +#define LOGICALREP_COLUMN_NULL 'n' +#define LOGICALREP_COLUMN_UNCHANGED 'u' +#define LOGICALREP_COLUMN_TEXT 't' +#define LOGICALREP_COLUMN_BINARY 'b' /* added in PG14 */ + typedef uint32 LogicalRepRelId; /* Relation information */ @@ -87,15 +94,15 @@ extern void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn); extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn); extern void logicalrep_write_insert(StringInfo out, Relation rel, - HeapTuple newtuple); + HeapTuple newtuple, bool binary); extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup); extern void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, - HeapTuple newtuple); + HeapTuple newtuple, bool binary); extern LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup); extern void logicalrep_write_delete(StringInfo out, Relation rel, - HeapTuple oldtuple); + HeapTuple oldtuple, bool binary); extern LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup); extern void logicalrep_write_truncate(StringInfo out, int nrelids, Oid relids[], diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index 2e8e9daf44..a8c676ed23 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -20,11 +20,11 @@ typedef struct PGOutputData MemoryContext context; /* private memory context for transient * allocations */ - /* client info */ + /* client-supplied info: */ uint32 protocol_version; - List *publication_names; List *publications; + bool binary; } PGOutputData; #endif /* PGOUTPUT_H */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index c75dcebea0..c2d5dbee54 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -177,6 +177,7 @@ typedef struct { uint32 proto_version; /* Logical protocol version */ List *publication_names; /* String list of publications */ + bool binary; /* Ask publisher to use binary */ } logical; } proto; } WalRcvStreamOptions; diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index e7add9d2b8..d71db0d520 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -76,10 +76,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -91,10 +91,10 @@ ERROR: subscription "regress_doesnotexist" does not exist ALTER SUBSCRIPTION regress_testsub SET (create_slot = false); ERROR: unrecognized subscription parameter: "create_slot" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Synchronous commit | Conninfo ------------------+---------------------------+---------+---------------------+--------------------+------------------------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | off | dbname=regress_doesnotexist2 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo +-----------------+---------------------------+---------+---------------------+--------+--------------------+------------------------------ + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | dbname=regress_doesnotexist2 (1 row) BEGIN; @@ -126,10 +126,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Synchronous commit | Conninfo ----------------------+---------------------------+---------+---------------------+--------------------+------------------------------ - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | local | dbname=regress_doesnotexist2 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo +---------------------+---------------------------+---------+---------------------+--------+--------------------+------------------------------ + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | local | dbname=regress_doesnotexist2 (1 row) -- rename back to keep the rest simple @@ -155,6 +155,29 @@ DROP SUBSCRIPTION IF EXISTS regress_testsub; NOTICE: subscription "regress_testsub" does not exist, skipping DROP SUBSCRIPTION regress_testsub; -- fail ERROR: subscription "regress_testsub" does not exist +-- fail - binary must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = foo); +ERROR: binary requires a Boolean value +-- now it works +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true); +WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | t | off | dbname=regress_doesnotexist +(1 row) + +ALTER SUBSCRIPTION regress_testsub SET (binary = false); +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | dbname=regress_doesnotexist +(1 row) + +DROP SUBSCRIPTION regress_testsub; RESET SESSION AUTHORIZATION; DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user2; diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 9e234ab8b3..eeb2ec06eb 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -117,6 +117,21 @@ COMMIT; DROP SUBSCRIPTION IF EXISTS regress_testsub; DROP SUBSCRIPTION regress_testsub; -- fail +-- fail - binary must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = foo); + +-- now it works +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true); + +\dRs+ + +ALTER SUBSCRIPTION regress_testsub SET (binary = false); +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); + +\dRs+ + +DROP SUBSCRIPTION regress_testsub; + RESET SESSION AUTHORIZATION; DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user2; diff --git a/src/test/subscription/t/014_binary.pl b/src/test/subscription/t/014_binary.pl new file mode 100644 index 0000000000..36a2f58e17 --- /dev/null +++ b/src/test/subscription/t/014_binary.pl @@ -0,0 +1,134 @@ +# Binary mode logical replication test + +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 5; + +# Create and initialize a publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create and initialize subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create tables on both sides of the replication +my $ddl = qq( + CREATE TABLE public.test_numerical ( + a INTEGER PRIMARY KEY, + b NUMERIC, + c FLOAT, + d BIGINT + ); + CREATE TABLE public.test_arrays ( + a INTEGER[] PRIMARY KEY, + b NUMERIC[], + c TEXT[] + );); + +$node_publisher->safe_psql('postgres', $ddl); +$node_subscriber->safe_psql('postgres', $ddl); + +# Configure logical replication +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tpub FOR ALL TABLES"); + +my $publisher_connstring = $node_publisher->connstr . ' dbname=postgres'; +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tsub CONNECTION '$publisher_connstring' " + . "PUBLICATION tpub WITH (slot_name = tpub_slot, binary = true)"); + +# Ensure nodes are in sync with each other +$node_publisher->wait_for_catchup('tsub'); +$node_subscriber->poll_query_until('postgres', + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');" +) or die "Timed out while waiting for subscriber to synchronize data"; + +# Insert some content and make sure it's replicated across +$node_publisher->safe_psql( + 'postgres', qq( + INSERT INTO public.test_arrays (a, b, c) VALUES + ('{1,2,3}', '{1.1, 1.2, 1.3}', '{"one", "two", "three"}'), + ('{3,1,2}', '{1.3, 1.1, 1.2}', '{"three", "one", "two"}'); + + INSERT INTO public.test_numerical (a, b, c, d) VALUES + (1, 1.2, 1.3, 10), + (2, 2.2, 2.3, 20), + (3, 3.2, 3.3, 30); + )); + +$node_publisher->wait_for_catchup('tsub'); + +my $result = $node_subscriber->safe_psql('postgres', + "SELECT a, b, c, d FROM test_numerical ORDER BY a"); + +is( $result, '1|1.2|1.3|10 +2|2.2|2.3|20 +3|3.2|3.3|30', 'check replicated data on subscriber'); + +# Test updates as well +$node_publisher->safe_psql( + 'postgres', qq( + UPDATE public.test_arrays SET b[1] = 42, c = NULL; + UPDATE public.test_numerical SET b = 42, c = NULL; + )); + +$node_publisher->wait_for_catchup('tsub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT a, b, c FROM test_arrays ORDER BY a"); + +is( $result, '{1,2,3}|{42,1.2,1.3}| +{3,1,2}|{42,1.1,1.2}|', 'check updated replicated data on subscriber'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT a, b, c, d FROM test_numerical ORDER BY a"); + +is( $result, '1|42||10 +2|42||20 +3|42||30', 'check updated replicated data on subscriber'); + +# Test to reset back to text formatting, and then to binary again +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tsub SET (binary = false);"); + +$node_publisher->safe_psql( + 'postgres', qq( + INSERT INTO public.test_numerical (a, b, c, d) VALUES + (4, 4.2, 4.3, 40); + )); + +$node_publisher->wait_for_catchup('tsub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT a, b, c, d FROM test_numerical ORDER BY a"); + +is( $result, '1|42||10 +2|42||20 +3|42||30 +4|4.2|4.3|40', 'check replicated data on subscriber'); + +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tsub SET (binary = true);"); + +$node_publisher->safe_psql( + 'postgres', qq( + INSERT INTO public.test_arrays (a, b, c) VALUES + ('{2,3,1}', '{1.2, 1.3, 1.1}', '{"two", "three", "one"}'); + )); + +$node_publisher->wait_for_catchup('tsub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT a, b, c FROM test_arrays ORDER BY a"); + +is( $result, '{1,2,3}|{42,1.2,1.3}| +{2,3,1}|{1.2,1.3,1.1}|{two,three,one} +{3,1,2}|{42,1.1,1.2}|', 'check replicated data on subscriber'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast');