Pass the source text for a parallel query to the workers.

With this change, you can see the query that a parallel worker is
executing in pg_stat_activity, and if the worker crashes you can
see what query it was executing when it crashed.

Rafia Sabih, reviewed by Kuntal Ghosh and Amit Kapila and slightly
revised by me.
This commit is contained in:
Robert Haas 2017-02-22 12:15:17 +05:30
parent b4316928d5
commit 4c728f3829
4 changed files with 29 additions and 1 deletions

View File

@ -190,6 +190,8 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
estate->es_param_exec_vals = (ParamExecData *)
palloc0(queryDesc->plannedstmt->nParamExec * sizeof(ParamExecData));
estate->es_sourceText = queryDesc->sourceText;
/*
* If non-read-only query, set the command ID to mark output tuples with
*/

View File

@ -39,6 +39,7 @@
#include "utils/dsa.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
#include "pgstat.h"
/*
* Magic numbers for parallel executor communication. We use constants
@ -51,6 +52,7 @@
#define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000004)
#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000005)
#define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000006)
#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000007)
#define PARALLEL_TUPLE_QUEUE_SIZE 65536
@ -368,6 +370,8 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
int instrumentation_len = 0;
int instrument_offset = 0;
Size dsa_minsize = dsa_minimum_size();
char *query_string;
int query_len;
/* Allocate object for return value. */
pei = palloc0(sizeof(ParallelExecutorInfo));
@ -387,6 +391,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
* for the various things we need to store.
*/
/* Estimate space for query text. */
query_len = strlen(estate->es_sourceText);
shm_toc_estimate_chunk(&pcxt->estimator, query_len);
shm_toc_estimate_keys(&pcxt->estimator, 1);
/* Estimate space for serialized PlannedStmt. */
pstmt_len = strlen(pstmt_data) + 1;
shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
@ -451,6 +460,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
* asked for has been allocated or initialized yet, though, so do that.
*/
/* Store query string */
query_string = shm_toc_allocate(pcxt->toc, query_len);
memcpy(query_string, estate->es_sourceText, query_len);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
/* Store serialized PlannedStmt. */
pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
memcpy(pstmt_space, pstmt_data, pstmt_len);
@ -661,6 +675,10 @@ ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
char *paramspace;
PlannedStmt *pstmt;
ParamListInfo paramLI;
char *queryString;
/* Get the query string from shared memory */
queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT);
/* Reconstruct leader-supplied PlannedStmt. */
pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT);
@ -679,7 +697,7 @@ ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
* revising this someday.
*/
return CreateQueryDesc(pstmt,
"<parallel query>",
queryString,
GetActiveSnapshot(), InvalidSnapshot,
receiver, paramLI, instrument_options);
}
@ -799,6 +817,12 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
instrument_options = instrumentation->instrument_options;
queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
/* Setting debug_query_string for individual workers */
debug_query_string = queryDesc->sourceText;
/* Report workers' query for monitoring purposes */
pgstat_report_activity(STATE_RUNNING, debug_query_string);
/* Prepare to track buffer usage during query execution. */
InstrStartParallelQuery();

View File

@ -139,6 +139,7 @@ CreateExecutorState(void)
estate->es_epqTuple = NULL;
estate->es_epqTupleSet = NULL;
estate->es_epqScanDone = NULL;
estate->es_sourceText = NULL;
/*
* Return the executor state structure

View File

@ -371,6 +371,7 @@ typedef struct EState
Snapshot es_crosscheck_snapshot; /* crosscheck time qual for RI */
List *es_range_table; /* List of RangeTblEntry */
PlannedStmt *es_plannedstmt; /* link to top of plan tree */
const char *es_sourceText; /* Source text from QueryDesc */
JunkFilter *es_junkFilter; /* top-level junk filter, if any */