diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 088700e17c..eef1dc2b18 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -810,7 +810,17 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg) case 'A': /* NotifyResponse */ { /* Propagate NotifyResponse. */ - pq_putmessage(msg->data[0], &msg->data[1], msg->len - 1); + int32 pid; + const char *channel; + const char *payload; + + pid = pq_getmsgint(msg, 4); + channel = pq_getmsgrawstring(msg); + payload = pq_getmsgrawstring(msg); + pq_endmessage(msg); + + NotifyMyFrontEnd(channel, payload, pid); + break; } @@ -988,6 +998,12 @@ ParallelWorkerMain(Datum main_arg) BackgroundWorkerInitializeConnectionByOid(fps->database_id, fps->authenticated_user_id); + /* + * Set the client encoding to the database encoding, since that is what + * the leader will expect. + */ + SetClientEncoding(GetDatabaseEncoding()); + /* Restore GUC values from launching backend. */ gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC); Assert(gucspace != NULL); diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index c39ac3aeef..716f1c3318 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -390,9 +390,6 @@ static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, char *page_buffer); static void asyncQueueAdvanceTail(void); static void ProcessIncomingNotify(void); -static void NotifyMyFrontEnd(const char *channel, - const char *payload, - int32 srcPid); static bool AsyncExistsPendingNotify(const char *channel, const char *payload); static void ClearPendingActionsAndNotifies(void); @@ -2076,7 +2073,7 @@ ProcessIncomingNotify(void) /* * Send NOTIFY message to my front end. */ -static void +void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid) { if (whereToSendOutput == DestRemote) diff --git a/src/backend/commands/variable.c b/src/backend/commands/variable.c index 962d75db6e..4ad8266a51 100644 --- a/src/backend/commands/variable.c +++ b/src/backend/commands/variable.c @@ -755,6 +755,30 @@ assign_client_encoding(const char *newval, void *extra) { int encoding = *((int *) extra); + /* + * Parallel workers send data to the leader, not the client. They always + * send data using the database encoding. + */ + if (IsParallelWorker()) + { + /* + * During parallel worker startup, we want to accept the leader's + * client_encoding setting so that anyone who looks at the value in + * the worker sees the same value that they would see in the leader. + */ + if (InitializingParallelWorker) + return; + + /* + * A change other than during startup, for example due to a SET clause + * attached to a function definition, should be rejected, as there is + * nothing we can do inside the worker to make it take effect. + */ + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot change client_encoding in a parallel worker"))); + } + /* We do not expect an error if PrepareClientEncoding succeeded */ if (SetClientEncoding(encoding) < 0) elog(LOG, "SetClientEncoding(%d) failed", encoding); diff --git a/src/backend/libpq/pqformat.c b/src/backend/libpq/pqformat.c index 4ddea8285f..b5d9d64e54 100644 --- a/src/backend/libpq/pqformat.c +++ b/src/backend/libpq/pqformat.c @@ -65,6 +65,7 @@ * pq_copymsgbytes - copy raw data from a message buffer * pq_getmsgtext - get a counted text string (with conversion) * pq_getmsgstring - get a null-terminated text string (with conversion) + * pq_getmsgrawstring - get a null-terminated text string - NO conversion * pq_getmsgend - verify message fully consumed */ @@ -639,6 +640,35 @@ pq_getmsgstring(StringInfo msg) return pg_client_to_server(str, slen); } +/* -------------------------------- + * pq_getmsgrawstring - get a null-terminated text string - NO conversion + * + * Returns a pointer directly into the message buffer. + * -------------------------------- + */ +const char * +pq_getmsgrawstring(StringInfo msg) +{ + char *str; + int slen; + + str = &msg->data[msg->cursor]; + + /* + * It's safe to use strlen() here because a StringInfo is guaranteed to + * have a trailing null byte. But check we found a null inside the + * message. + */ + slen = strlen(str); + if (msg->cursor + slen >= msg->len) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid string in message"))); + msg->cursor += slen + 1; + + return str; +} + /* -------------------------------- * pq_getmsgend - verify message fully consumed * -------------------------------- diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c index 3225c1fa0e..0dcdee03db 100644 --- a/src/backend/libpq/pqmq.c +++ b/src/backend/libpq/pqmq.c @@ -232,7 +232,7 @@ pq_parse_errornotice(StringInfo msg, ErrorData *edata) pq_getmsgend(msg); break; } - value = pq_getmsgstring(msg); + value = pq_getmsgrawstring(msg); switch (code) { diff --git a/src/include/commands/async.h b/src/include/commands/async.h index b4c13fac4a..95559df19f 100644 --- a/src/include/commands/async.h +++ b/src/include/commands/async.h @@ -28,6 +28,10 @@ extern volatile sig_atomic_t notifyInterruptPending; extern Size AsyncShmemSize(void); extern void AsyncShmemInit(void); +extern void NotifyMyFrontEnd(const char *channel, + const char *payload, + int32 srcPid); + /* notify-related SQL statements */ extern void Async_Notify(const char *channel, const char *payload); extern void Async_Listen(const char *channel); diff --git a/src/include/libpq/pqformat.h b/src/include/libpq/pqformat.h index 65ebf37fbc..3c0d4b2622 100644 --- a/src/include/libpq/pqformat.h +++ b/src/include/libpq/pqformat.h @@ -44,6 +44,7 @@ extern const char *pq_getmsgbytes(StringInfo msg, int datalen); extern void pq_copymsgbytes(StringInfo msg, char *buf, int datalen); extern char *pq_getmsgtext(StringInfo msg, int rawbytes, int *nbytes); extern const char *pq_getmsgstring(StringInfo msg); +extern const char *pq_getmsgrawstring(StringInfo msg); extern void pq_getmsgend(StringInfo msg); #endif /* PQFORMAT_H */