From 10c0558ffefcd12bf1d3dc35587eba41d1ce4571 Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Thu, 30 Jun 2016 18:35:32 -0400 Subject: [PATCH] Fix several mistakes around parallel workers and client_encoding. Previously, workers sent data to the leader using the client encoding. That mostly worked, but the leader the converted the data back to the server encoding. Since not all encoding conversions are reversible, that could provoke failures. Fix by using the database encoding for all communication between worker and leader. Also, while temporary changes to GUC settings, as from the SET clause of a function, are in general OK for parallel query, changing client_encoding this way inside of a parallel worker is not OK. Previously, that would have confused the leader; with these changes, it would not confuse the leader, but it wouldn't do anything either. So refuse such changes in parallel workers. Also, the previous code naively assumed that when it received a NotifyResonse from the worker, it could pass that directly back to the user. But now that worker-to-leader communication always uses the database encoding, that's clearly no longer correct - though, actually, the old way was always broken for V2 clients. So disassemble and reconstitute the message instead. Issues reported by Peter Eisentraut. Patch by me, reviewed by Peter Eisentraut. --- src/backend/access/transam/parallel.c | 18 +++++++++++++++- src/backend/commands/async.c | 5 +---- src/backend/commands/variable.c | 24 +++++++++++++++++++++ src/backend/libpq/pqformat.c | 30 +++++++++++++++++++++++++++ src/backend/libpq/pqmq.c | 2 +- src/include/commands/async.h | 4 ++++ src/include/libpq/pqformat.h | 1 + 7 files changed, 78 insertions(+), 6 deletions(-) diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 088700e17c..eef1dc2b18 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -810,7 +810,17 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg) case 'A': /* NotifyResponse */ { /* Propagate NotifyResponse. */ - pq_putmessage(msg->data[0], &msg->data[1], msg->len - 1); + int32 pid; + const char *channel; + const char *payload; + + pid = pq_getmsgint(msg, 4); + channel = pq_getmsgrawstring(msg); + payload = pq_getmsgrawstring(msg); + pq_endmessage(msg); + + NotifyMyFrontEnd(channel, payload, pid); + break; } @@ -988,6 +998,12 @@ ParallelWorkerMain(Datum main_arg) BackgroundWorkerInitializeConnectionByOid(fps->database_id, fps->authenticated_user_id); + /* + * Set the client encoding to the database encoding, since that is what + * the leader will expect. + */ + SetClientEncoding(GetDatabaseEncoding()); + /* Restore GUC values from launching backend. */ gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC); Assert(gucspace != NULL); diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index c39ac3aeef..716f1c3318 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -390,9 +390,6 @@ static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, char *page_buffer); static void asyncQueueAdvanceTail(void); static void ProcessIncomingNotify(void); -static void NotifyMyFrontEnd(const char *channel, - const char *payload, - int32 srcPid); static bool AsyncExistsPendingNotify(const char *channel, const char *payload); static void ClearPendingActionsAndNotifies(void); @@ -2076,7 +2073,7 @@ ProcessIncomingNotify(void) /* * Send NOTIFY message to my front end. */ -static void +void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid) { if (whereToSendOutput == DestRemote) diff --git a/src/backend/commands/variable.c b/src/backend/commands/variable.c index 962d75db6e..4ad8266a51 100644 --- a/src/backend/commands/variable.c +++ b/src/backend/commands/variable.c @@ -755,6 +755,30 @@ assign_client_encoding(const char *newval, void *extra) { int encoding = *((int *) extra); + /* + * Parallel workers send data to the leader, not the client. They always + * send data using the database encoding. + */ + if (IsParallelWorker()) + { + /* + * During parallel worker startup, we want to accept the leader's + * client_encoding setting so that anyone who looks at the value in + * the worker sees the same value that they would see in the leader. + */ + if (InitializingParallelWorker) + return; + + /* + * A change other than during startup, for example due to a SET clause + * attached to a function definition, should be rejected, as there is + * nothing we can do inside the worker to make it take effect. + */ + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot change client_encoding in a parallel worker"))); + } + /* We do not expect an error if PrepareClientEncoding succeeded */ if (SetClientEncoding(encoding) < 0) elog(LOG, "SetClientEncoding(%d) failed", encoding); diff --git a/src/backend/libpq/pqformat.c b/src/backend/libpq/pqformat.c index 4ddea8285f..b5d9d64e54 100644 --- a/src/backend/libpq/pqformat.c +++ b/src/backend/libpq/pqformat.c @@ -65,6 +65,7 @@ * pq_copymsgbytes - copy raw data from a message buffer * pq_getmsgtext - get a counted text string (with conversion) * pq_getmsgstring - get a null-terminated text string (with conversion) + * pq_getmsgrawstring - get a null-terminated text string - NO conversion * pq_getmsgend - verify message fully consumed */ @@ -639,6 +640,35 @@ pq_getmsgstring(StringInfo msg) return pg_client_to_server(str, slen); } +/* -------------------------------- + * pq_getmsgrawstring - get a null-terminated text string - NO conversion + * + * Returns a pointer directly into the message buffer. + * -------------------------------- + */ +const char * +pq_getmsgrawstring(StringInfo msg) +{ + char *str; + int slen; + + str = &msg->data[msg->cursor]; + + /* + * It's safe to use strlen() here because a StringInfo is guaranteed to + * have a trailing null byte. But check we found a null inside the + * message. + */ + slen = strlen(str); + if (msg->cursor + slen >= msg->len) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid string in message"))); + msg->cursor += slen + 1; + + return str; +} + /* -------------------------------- * pq_getmsgend - verify message fully consumed * -------------------------------- diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c index 3225c1fa0e..0dcdee03db 100644 --- a/src/backend/libpq/pqmq.c +++ b/src/backend/libpq/pqmq.c @@ -232,7 +232,7 @@ pq_parse_errornotice(StringInfo msg, ErrorData *edata) pq_getmsgend(msg); break; } - value = pq_getmsgstring(msg); + value = pq_getmsgrawstring(msg); switch (code) { diff --git a/src/include/commands/async.h b/src/include/commands/async.h index b4c13fac4a..95559df19f 100644 --- a/src/include/commands/async.h +++ b/src/include/commands/async.h @@ -28,6 +28,10 @@ extern volatile sig_atomic_t notifyInterruptPending; extern Size AsyncShmemSize(void); extern void AsyncShmemInit(void); +extern void NotifyMyFrontEnd(const char *channel, + const char *payload, + int32 srcPid); + /* notify-related SQL statements */ extern void Async_Notify(const char *channel, const char *payload); extern void Async_Listen(const char *channel); diff --git a/src/include/libpq/pqformat.h b/src/include/libpq/pqformat.h index 65ebf37fbc..3c0d4b2622 100644 --- a/src/include/libpq/pqformat.h +++ b/src/include/libpq/pqformat.h @@ -44,6 +44,7 @@ extern const char *pq_getmsgbytes(StringInfo msg, int datalen); extern void pq_copymsgbytes(StringInfo msg, char *buf, int datalen); extern char *pq_getmsgtext(StringInfo msg, int rawbytes, int *nbytes); extern const char *pq_getmsgstring(StringInfo msg); +extern const char *pq_getmsgrawstring(StringInfo msg); extern void pq_getmsgend(StringInfo msg); #endif /* PQFORMAT_H */