diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 53692c0020..51f7338404 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -6531,8 +6531,33 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid, relid oid - OID of the table on which the COPY command is executed. - It is set to 0 if copying from a SELECT query. + OID of the table on which the COPY command is + executed. It is set to 0 if copying from a + SELECT query. + + + + + + command text + + + The command that is running: COPY FROM, or + COPY TO. + + + + + + type text + + + The io type that the data is read from or written to: + FILE, PROGRAM, + PIPE (for COPY FROM STDIN and + COPY TO STDOUT), or CALLBACK + (used for example during the initial table synchronization in + logical replication). @@ -6551,16 +6576,26 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid, Size of source file for COPY FROM command in bytes. - It is set to 0 if not available. + It is set to 0 if not available. - lines_processed bigint + tuples_processed bigint - Number of lines already processed by COPY command. + Number of tuples already processed by COPY command. + + + + + + tuples_excluded bigint + + + Number of tuples not processed because they were excluded by the + WHERE clause of the COPY command. diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index fb1116d09a..15221be67d 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1129,9 +1129,18 @@ CREATE VIEW pg_stat_progress_copy AS SELECT S.pid AS pid, S.datid AS datid, D.datname AS datname, S.relid AS relid, + CASE S.param5 WHEN 1 THEN 'COPY FROM' + WHEN 2 THEN 'COPY TO' + END AS command, + CASE S.param6 WHEN 1 THEN 'FILE' + WHEN 2 THEN 'PROGRAM' + WHEN 3 THEN 'PIPE' + WHEN 4 THEN 'CALLBACK' + END AS "type", S.param1 AS bytes_processed, S.param2 AS bytes_total, - S.param3 AS lines_processed + S.param3 AS tuples_processed, + S.param4 AS tuples_excluded FROM pg_stat_get_progress_info('COPY') AS S LEFT JOIN pg_database D ON S.datid = D.oid; diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index f05e2d2347..2ed696d429 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -539,7 +539,8 @@ CopyFrom(CopyFromState cstate) BulkInsertState bistate = NULL; CopyInsertMethod insertMethod; CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */ - uint64 processed = 0; + int64 processed = 0; + int64 excluded = 0; bool has_before_insert_row_trig; bool has_instead_insert_row_trig; bool leafpart_use_multi_insert = false; @@ -869,7 +870,15 @@ CopyFrom(CopyFromState cstate) econtext->ecxt_scantuple = myslot; /* Skip items that don't match COPY's WHERE clause */ if (!ExecQual(cstate->qualexpr, econtext)) + { + /* + * Report that this tuple was filtered out by the WHERE + * clause. + */ + pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED, + ++excluded); continue; + } } /* Determine the partition to insert the tuple into */ @@ -1104,10 +1113,11 @@ CopyFrom(CopyFromState cstate) /* * We count only tuples not suppressed by a BEFORE INSERT trigger * or FDW; this is the same definition used by nodeModifyTable.c - * for counting tuples inserted by an INSERT command. Update + * for counting tuples inserted by an INSERT command. Update * progress of the COPY command as well. */ - pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed); + pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, + ++processed); } } @@ -1193,6 +1203,16 @@ BeginCopyFrom(ParseState *pstate, ExprState **defexprs; MemoryContext oldcontext; bool volatile_defexprs; + const int progress_cols[] = { + PROGRESS_COPY_COMMAND, + PROGRESS_COPY_TYPE, + PROGRESS_COPY_BYTES_TOTAL + }; + int64 progress_vals[] = { + PROGRESS_COPY_COMMAND_FROM, + 0, + 0 + }; /* Allocate workspace and zero all fields */ cstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData)); @@ -1430,11 +1450,13 @@ BeginCopyFrom(ParseState *pstate, if (data_source_cb) { + progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK; cstate->copy_src = COPY_CALLBACK; cstate->data_source_cb = data_source_cb; } else if (pipe) { + progress_vals[1] = PROGRESS_COPY_TYPE_PIPE; Assert(!is_program); /* the grammar does not allow this */ if (whereToSendOutput == DestRemote) ReceiveCopyBegin(cstate); @@ -1447,6 +1469,7 @@ BeginCopyFrom(ParseState *pstate, if (cstate->is_program) { + progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM; cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R); if (cstate->copy_file == NULL) ereport(ERROR, @@ -1458,6 +1481,7 @@ BeginCopyFrom(ParseState *pstate, { struct stat st; + progress_vals[1] = PROGRESS_COPY_TYPE_FILE; cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R); if (cstate->copy_file == NULL) { @@ -1484,10 +1508,12 @@ BeginCopyFrom(ParseState *pstate, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("\"%s\" is a directory", cstate->filename))); - pgstat_progress_update_param(PROGRESS_COPY_BYTES_TOTAL, st.st_size); + progress_vals[2] = st.st_size; } } + pgstat_progress_update_multi_param(3, progress_cols, progress_vals); + if (cstate->opts.binary) { /* Read and verify binary header */ diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index 46155015cf..7257a54e93 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -353,6 +353,14 @@ BeginCopyTo(ParseState *pstate, TupleDesc tupDesc; int num_phys_attrs; MemoryContext oldcontext; + const int progress_cols[] = { + PROGRESS_COPY_COMMAND, + PROGRESS_COPY_TYPE + }; + int64 progress_vals[] = { + PROGRESS_COPY_COMMAND_TO, + 0 + }; if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION) { @@ -659,6 +667,8 @@ BeginCopyTo(ParseState *pstate, if (pipe) { + progress_vals[1] = PROGRESS_COPY_TYPE_PIPE; + Assert(!is_program); /* the grammar does not allow this */ if (whereToSendOutput != DestRemote) cstate->copy_file = stdout; @@ -670,6 +680,7 @@ BeginCopyTo(ParseState *pstate, if (is_program) { + progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM; cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W); if (cstate->copy_file == NULL) ereport(ERROR, @@ -682,6 +693,8 @@ BeginCopyTo(ParseState *pstate, mode_t oumask; /* Pre-existing umask value */ struct stat st; + progress_vals[1] = PROGRESS_COPY_TYPE_FILE; + /* * Prevent write to relative path ... too easy to shoot oneself in * the foot by overwriting a database file ... @@ -731,6 +744,8 @@ BeginCopyTo(ParseState *pstate, /* initialize progress */ pgstat_progress_start_command(PROGRESS_COMMAND_COPY, cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid); + pgstat_progress_update_multi_param(2, progress_cols, progress_vals); + cstate->bytes_processed = 0; MemoryContextSwitchTo(oldcontext); @@ -881,8 +896,12 @@ DoCopyTo(CopyToState cstate) /* Format and send the data */ CopyOneRowTo(cstate, slot); - /* Increment amount of processed tuples and update the progress */ - pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed); + /* + * Increment the number of processed tuples, and report the + * progress. + */ + pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, + ++processed); } ExecDropSingleTupleTableSlot(slot); @@ -1251,8 +1270,9 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self) /* Send the data */ CopyOneRowTo(cstate, slot); - /* Increment amount of processed tuples and update the progress */ - pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++myState->processed); + /* Increment the number of processed tuples, and report the progress */ + pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, + ++myState->processed); return true; } diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 81fd68348d..b25a2b5a65 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202103091 +#define CATALOG_VERSION_NO 202103092 #endif diff --git a/src/include/commands/progress.h b/src/include/commands/progress.h index 95ec5d02e9..c6b139d57d 100644 --- a/src/include/commands/progress.h +++ b/src/include/commands/progress.h @@ -133,9 +133,22 @@ #define PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE 4 #define PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL 5 -/* Commands of PROGRESS_COPY */ +/* Progress parameters for PROGRESS_COPY */ #define PROGRESS_COPY_BYTES_PROCESSED 0 #define PROGRESS_COPY_BYTES_TOTAL 1 -#define PROGRESS_COPY_LINES_PROCESSED 2 +#define PROGRESS_COPY_TUPLES_PROCESSED 2 +#define PROGRESS_COPY_TUPLES_EXCLUDED 3 +#define PROGRESS_COPY_COMMAND 4 +#define PROGRESS_COPY_TYPE 5 + +/* Commands of COPY (as advertised via PROGRESS_COPY_COMMAND) */ +#define PROGRESS_COPY_COMMAND_FROM 1 +#define PROGRESS_COPY_COMMAND_TO 2 + +/* Types of COPY commands (as advertised via PROGRESS_COPY_TYPE) */ +#define PROGRESS_COPY_TYPE_FILE 1 +#define PROGRESS_COPY_TYPE_PROGRAM 2 +#define PROGRESS_COPY_TYPE_PIPE 3 +#define PROGRESS_COPY_TYPE_CALLBACK 4 #endif diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index dd5cc9c221..c2f8328d18 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1950,9 +1950,22 @@ pg_stat_progress_copy| SELECT s.pid, s.datid, d.datname, s.relid, + CASE s.param5 + WHEN 1 THEN 'COPY FROM'::text + WHEN 2 THEN 'COPY TO'::text + ELSE NULL::text + END AS command, + CASE s.param6 + WHEN 1 THEN 'FILE'::text + WHEN 2 THEN 'PROGRAM'::text + WHEN 3 THEN 'PIPE'::text + WHEN 4 THEN 'CALLBACK'::text + ELSE NULL::text + END AS type, s.param1 AS bytes_processed, s.param2 AS bytes_total, - s.param3 AS lines_processed + s.param3 AS tuples_processed, + s.param4 AS tuples_excluded FROM (pg_stat_get_progress_info('COPY'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20) LEFT JOIN pg_database d ON ((s.datid = d.oid))); pg_stat_progress_create_index| SELECT s.pid,