diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index edbc9ab02a..de858e165a 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -1347,25 +1347,16 @@ Datum dblink_cancel_query(PG_FUNCTION_ARGS) { PGconn *conn; - PGcancelConn *cancelConn; char *msg; + TimestampTz endtime; dblink_init(); conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0))); - cancelConn = PQcancelCreate(conn); - - PG_TRY(); - { - if (!PQcancelBlocking(cancelConn)) - msg = pchomp(PQcancelErrorMessage(cancelConn)); - else - msg = "OK"; - } - PG_FINALLY(); - { - PQcancelFinish(cancelConn); - } - PG_END_TRY(); + endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), + 30000); + msg = libpqsrv_cancel(conn, endtime); + if (msg == NULL) + msg = "OK"; PG_RETURN_TEXT_P(cstring_to_text(msg)); } diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 4931ebf591..2532e453c4 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -133,7 +133,7 @@ static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue); static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry); static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel); static bool pgfdw_cancel_query(PGconn *conn); -static bool pgfdw_cancel_query_begin(PGconn *conn); +static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime); static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, bool consume_input); static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, @@ -1315,36 +1315,31 @@ pgfdw_cancel_query(PGconn *conn) endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), CONNECTION_CLEANUP_TIMEOUT); - if (!pgfdw_cancel_query_begin(conn)) + if (!pgfdw_cancel_query_begin(conn, endtime)) return false; return pgfdw_cancel_query_end(conn, endtime, false); } +/* + * Submit a cancel request to the given connection, waiting only until + * the given time. + * + * We sleep interruptibly until we receive confirmation that the cancel + * request has been accepted, and if it is, return true; if the timeout + * lapses without that, or the request fails for whatever reason, return + * false. + */ static bool -pgfdw_cancel_query_begin(PGconn *conn) +pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime) { - PGcancel *cancel; - char errbuf[256]; + char *errormsg = libpqsrv_cancel(conn, endtime); - /* - * Issue cancel request. Unfortunately, there's no good way to limit the - * amount of time that we might block inside PQgetCancel(). - */ - if ((cancel = PQgetCancel(conn))) - { - if (!PQcancel(cancel, errbuf, sizeof(errbuf))) - { - ereport(WARNING, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("could not send cancel request: %s", - errbuf))); - PQfreeCancel(cancel); - return false; - } - PQfreeCancel(cancel); - } + if (errormsg != NULL) + ereport(WARNING, + errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not send cancel request: %s", errormsg)); - return true; + return errormsg == NULL; } static bool @@ -1685,7 +1680,11 @@ pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel, */ if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE) { - if (!pgfdw_cancel_query_begin(entry->conn)) + TimestampTz endtime; + + endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), + CONNECTION_CLEANUP_TIMEOUT); + if (!pgfdw_cancel_query_begin(entry->conn, endtime)) return false; /* Unable to cancel running query */ *cancel_requested = lappend(*cancel_requested, entry); } diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 3f0110c52b..b7af86d351 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -2739,6 +2739,21 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c (10 rows) ALTER VIEW v4 OWNER TO regress_view_owner; +-- Make sure this big CROSS JOIN query is pushed down +EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; + QUERY PLAN +--------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Foreign Scan + Output: (count(*)) + Relations: Aggregate on ((((public.ft1) INNER JOIN (public.ft2)) INNER JOIN (public.ft4)) INNER JOIN (public.ft5)) + Remote SQL: SELECT count(*) FROM ((("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (TRUE)) INNER JOIN "S 1"."T 3" r4 ON (TRUE)) INNER JOIN "S 1"."T 4" r6 ON (TRUE)) +(4 rows) + +-- Make sure query cancellation works +SET statement_timeout = '10ms'; +select count(*) from ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; -- this takes very long +ERROR: canceling statement due to statement timeout +RESET statement_timeout; -- ==================================================================== -- Check that userid to use when querying the remote table is correctly -- propagated into foreign rels present in subqueries under an UNION ALL diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 5fffc4c53b..6e1c819159 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -737,6 +737,13 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1, t2.c1 OFFSET 10 LIMIT 10; ALTER VIEW v4 OWNER TO regress_view_owner; +-- Make sure this big CROSS JOIN query is pushed down +EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; +-- Make sure query cancellation works +SET statement_timeout = '10ms'; +select count(*) from ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; -- this takes very long +RESET statement_timeout; + -- ==================================================================== -- Check that userid to use when querying the remote table is correctly -- propagated into foreign rels present in subqueries under an UNION ALL diff --git a/src/include/libpq/libpq-be-fe-helpers.h b/src/include/libpq/libpq-be-fe-helpers.h index 5d33bcf32f..2adf92030a 100644 --- a/src/include/libpq/libpq-be-fe-helpers.h +++ b/src/include/libpq/libpq-be-fe-helpers.h @@ -44,6 +44,8 @@ #include "miscadmin.h" #include "storage/fd.h" #include "storage/latch.h" +#include "utils/timestamp.h" +#include "utils/wait_event.h" static inline void libpqsrv_connect_prepare(void); @@ -365,4 +367,91 @@ libpqsrv_get_result(PGconn *conn, uint32 wait_event_info) return PQgetResult(conn); } +/* + * Submit a cancel request to the given connection, waiting only until + * the given time. + * + * We sleep interruptibly until we receive confirmation that the cancel + * request has been accepted, and if it is, return NULL; if the cancel + * request fails, return an error message string (which is not to be + * freed). + * + * For other problems (to wit: OOM when strdup'ing an error message from + * libpq), this function can ereport(ERROR). + * + * Note: this function leaks a string's worth of memory when reporting + * libpq errors. Make sure to call it in a transient memory context. + */ +static inline char * +libpqsrv_cancel(PGconn *conn, TimestampTz endtime) +{ + PGcancelConn *cancel_conn; + char *error = NULL; + + cancel_conn = PQcancelCreate(conn); + if (cancel_conn == NULL) + return _("out of memory"); + + /* In what follows, do not leak any PGcancelConn on any errors. */ + + PG_TRY(); + { + if (!PQcancelStart(cancel_conn)) + { + error = pchomp(PQcancelErrorMessage(cancel_conn)); + goto exit; + } + + for (;;) + { + PostgresPollingStatusType pollres; + TimestampTz now; + long cur_timeout; + int waitEvents = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH; + + pollres = PQcancelPoll(cancel_conn); + if (pollres == PGRES_POLLING_OK) + break; /* success! */ + + /* If timeout has expired, give up, else get sleep time. */ + now = GetCurrentTimestamp(); + cur_timeout = TimestampDifferenceMilliseconds(now, endtime); + if (cur_timeout <= 0) + { + error = _("cancel request timed out"); + break; + } + + switch (pollres) + { + case PGRES_POLLING_READING: + waitEvents |= WL_SOCKET_READABLE; + break; + case PGRES_POLLING_WRITING: + waitEvents |= WL_SOCKET_WRITEABLE; + break; + default: + error = pchomp(PQcancelErrorMessage(cancel_conn)); + goto exit; + } + + /* Sleep until there's something to do */ + WaitLatchOrSocket(MyLatch, waitEvents, PQcancelSocket(cancel_conn), + cur_timeout, PG_WAIT_CLIENT); + + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + } +exit: ; + } + PG_FINALLY(); + { + PQcancelFinish(cancel_conn); + } + PG_END_TRY(); + + return error; +} + #endif /* LIBPQ_BE_FE_HELPERS_H */