diff --git a/src/test/isolation/expected/async-notify.out b/src/test/isolation/expected/async-notify.out index 92d281a7d1..60ba50658d 100644 --- a/src/test/isolation/expected/async-notify.out +++ b/src/test/isolation/expected/async-notify.out @@ -1,17 +1,102 @@ Parsed test spec with 2 sessions -starting permutation: listen begin check notify check -step listen: LISTEN a; -step begin: BEGIN; -step check: SELECT pg_notification_queue_usage() > 0 AS nonzero; +starting permutation: listenc notify1 notify2 notify3 notifyf +step listenc: LISTEN c1; LISTEN c2; +step notify1: NOTIFY c1; +notifier: NOTIFY "c1" with payload "" from notifier +step notify2: NOTIFY c2, 'payload'; +notifier: NOTIFY "c2" with payload "payload" from notifier +step notify3: NOTIFY c3, 'payload3'; +step notifyf: SELECT pg_notify('c2', NULL); +pg_notify + + +notifier: NOTIFY "c2" with payload "" from notifier + +starting permutation: listenc notifyd1 notifyd2 notifys1 +step listenc: LISTEN c1; LISTEN c2; +step notifyd1: NOTIFY c2, 'payload'; NOTIFY c1; NOTIFY "c2", 'payload'; +notifier: NOTIFY "c2" with payload "payload" from notifier +notifier: NOTIFY "c1" with payload "" from notifier +step notifyd2: NOTIFY c1; NOTIFY c1; NOTIFY c1, 'p1'; NOTIFY c1, 'p2'; +notifier: NOTIFY "c1" with payload "" from notifier +notifier: NOTIFY "c1" with payload "p1" from notifier +notifier: NOTIFY "c1" with payload "p2" from notifier +step notifys1: + BEGIN; + NOTIFY c1, 'payload'; NOTIFY "c2", 'payload'; + NOTIFY c1, 'payload'; NOTIFY "c2", 'payload'; + SAVEPOINT s1; + NOTIFY c1, 'payload'; NOTIFY "c2", 'payload'; + NOTIFY c1, 'payloads'; NOTIFY "c2", 'payloads'; + NOTIFY c1, 'payload'; NOTIFY "c2", 'payload'; + NOTIFY c1, 'payloads'; NOTIFY "c2", 'payloads'; + RELEASE SAVEPOINT s1; + SAVEPOINT s2; + NOTIFY c1, 'rpayload'; NOTIFY "c2", 'rpayload'; + NOTIFY c1, 'rpayloads'; NOTIFY "c2", 'rpayloads'; + NOTIFY c1, 'rpayload'; NOTIFY "c2", 'rpayload'; + NOTIFY c1, 'rpayloads'; NOTIFY "c2", 'rpayloads'; + ROLLBACK TO SAVEPOINT s2; + COMMIT; + +notifier: NOTIFY "c1" with payload "payload" from notifier +notifier: NOTIFY "c2" with payload "payload" from notifier +notifier: NOTIFY "c1" with payload "payload" from notifier +notifier: NOTIFY "c2" with payload "payload" from notifier +notifier: NOTIFY "c1" with payload "payloads" from notifier +notifier: NOTIFY "c2" with payload "payloads" from notifier + +starting permutation: llisten notify1 notify2 notify3 notifyf lcheck +step llisten: LISTEN c1; LISTEN c2; +step notify1: NOTIFY c1; +step notify2: NOTIFY c2, 'payload'; +step notify3: NOTIFY c3, 'payload3'; +step notifyf: SELECT pg_notify('c2', NULL); +pg_notify + + +step lcheck: SELECT 1 AS x; +x + +1 +listener: NOTIFY "c1" with payload "" from notifier +listener: NOTIFY "c2" with payload "payload" from notifier +listener: NOTIFY "c2" with payload "" from notifier + +starting permutation: listenc llisten notify1 notify2 notify3 notifyf lcheck +step listenc: LISTEN c1; LISTEN c2; +step llisten: LISTEN c1; LISTEN c2; +step notify1: NOTIFY c1; +notifier: NOTIFY "c1" with payload "" from notifier +step notify2: NOTIFY c2, 'payload'; +notifier: NOTIFY "c2" with payload "payload" from notifier +step notify3: NOTIFY c3, 'payload3'; +step notifyf: SELECT pg_notify('c2', NULL); +pg_notify + + +notifier: NOTIFY "c2" with payload "" from notifier +step lcheck: SELECT 1 AS x; +x + +1 +listener: NOTIFY "c1" with payload "" from notifier +listener: NOTIFY "c2" with payload "payload" from notifier +listener: NOTIFY "c2" with payload "" from notifier + +starting permutation: llisten lbegin usage bignotify usage +step llisten: LISTEN c1; LISTEN c2; +step lbegin: BEGIN; +step usage: SELECT pg_notification_queue_usage() > 0 AS nonzero; nonzero f -step notify: SELECT count(pg_notify('a', s::text)) FROM generate_series(1, 1000) s; +step bignotify: SELECT count(pg_notify('c1', s::text)) FROM generate_series(1, 1000) s; count 1000 -step check: SELECT pg_notification_queue_usage() > 0 AS nonzero; +step usage: SELECT pg_notification_queue_usage() > 0 AS nonzero; nonzero t diff --git a/src/test/isolation/isolationtester.c b/src/test/isolation/isolationtester.c index 1d1cc12306..8a49520395 100644 --- a/src/test/isolation/isolationtester.c +++ b/src/test/isolation/isolationtester.c @@ -23,10 +23,12 @@ /* * conns[0] is the global setup, teardown, and watchdog connection. Additional - * connections represent spec-defined sessions. + * connections represent spec-defined sessions. We also track the backend + * PID, in numeric and string formats, for each connection. */ static PGconn **conns = NULL; -static const char **backend_pids = NULL; +static int *backend_pids = NULL; +static const char **backend_pid_strs = NULL; static int nconns = 0; /* In dry run only output permutations to be run by the tester. */ @@ -42,7 +44,7 @@ static void run_permutation(TestSpec *testspec, int nsteps, Step **steps); #define STEP_NONBLOCK 0x1 /* return 0 as soon as cmd waits for a lock */ #define STEP_RETRY 0x2 /* this is a retry of a previously-waiting cmd */ -static bool try_complete_step(Step *step, int flags); +static bool try_complete_step(TestSpec *testspec, Step *step, int flags); static int step_qsort_cmp(const void *a, const void *b); static int step_bsearch_cmp(const void *a, const void *b); @@ -161,8 +163,10 @@ main(int argc, char **argv) * extra for lock wait detection and global work. */ nconns = 1 + testspec->nsessions; - conns = calloc(nconns, sizeof(PGconn *)); - backend_pids = calloc(nconns, sizeof(*backend_pids)); + conns = (PGconn **) pg_malloc0(nconns * sizeof(PGconn *)); + backend_pids = pg_malloc0(nconns * sizeof(*backend_pids)); + backend_pid_strs = pg_malloc0(nconns * sizeof(*backend_pid_strs)); + for (i = 0; i < nconns; i++) { conns[i] = PQconnectdb(conninfo); @@ -188,26 +192,9 @@ main(int argc, char **argv) blackholeNoticeProcessor, NULL); - /* Get the backend pid for lock wait checking. */ - res = PQexec(conns[i], "SELECT pg_backend_pid()"); - if (PQresultStatus(res) == PGRES_TUPLES_OK) - { - if (PQntuples(res) == 1 && PQnfields(res) == 1) - backend_pids[i] = pg_strdup(PQgetvalue(res, 0, 0)); - else - { - fprintf(stderr, "backend pid query returned %d rows and %d columns, expected 1 row and 1 column", - PQntuples(res), PQnfields(res)); - exit_nicely(); - } - } - else - { - fprintf(stderr, "backend pid query failed: %s", - PQerrorMessage(conns[i])); - exit_nicely(); - } - PQclear(res); + /* Save each connection's backend PID for subsequent use. */ + backend_pids[i] = PQbackendPID(conns[i]); + backend_pid_strs[i] = psprintf("%d", backend_pids[i]); } /* Set the session index fields in steps. */ @@ -232,9 +219,9 @@ main(int argc, char **argv) appendPQExpBufferStr(&wait_query, "SELECT pg_catalog.pg_isolation_test_session_is_blocked($1, '{"); /* The spec syntax requires at least one session; assume that here. */ - appendPQExpBufferStr(&wait_query, backend_pids[1]); + appendPQExpBufferStr(&wait_query, backend_pid_strs[1]); for (i = 2; i < nconns; i++) - appendPQExpBuffer(&wait_query, ",%s", backend_pids[i]); + appendPQExpBuffer(&wait_query, ",%s", backend_pid_strs[i]); appendPQExpBufferStr(&wait_query, "}')"); res = PQprepare(conns[0], PREP_WAITING, wait_query.data, 0, NULL); @@ -553,7 +540,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps) oldstep = waiting[w]; /* Wait for previous step on this connection. */ - try_complete_step(oldstep, STEP_RETRY); + try_complete_step(testspec, oldstep, STEP_RETRY); /* Remove that step from the waiting[] array. */ if (w + 1 < nwaiting) @@ -575,7 +562,8 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps) nerrorstep = 0; while (w < nwaiting) { - if (try_complete_step(waiting[w], STEP_NONBLOCK | STEP_RETRY)) + if (try_complete_step(testspec, waiting[w], + STEP_NONBLOCK | STEP_RETRY)) { /* Still blocked on a lock, leave it alone. */ w++; @@ -604,14 +592,15 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps) } /* Try to complete this step without blocking. */ - mustwait = try_complete_step(step, STEP_NONBLOCK); + mustwait = try_complete_step(testspec, step, STEP_NONBLOCK); /* Check for completion of any steps that were previously waiting. */ w = 0; nerrorstep = 0; while (w < nwaiting) { - if (try_complete_step(waiting[w], STEP_NONBLOCK | STEP_RETRY)) + if (try_complete_step(testspec, waiting[w], + STEP_NONBLOCK | STEP_RETRY)) w++; else { @@ -634,7 +623,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps) /* Wait for any remaining queries. */ for (w = 0; w < nwaiting; ++w) { - try_complete_step(waiting[w], STEP_RETRY); + try_complete_step(testspec, waiting[w], STEP_RETRY); report_error_message(waiting[w]); } @@ -697,7 +686,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps) * a lock, returns true. Otherwise, returns false. */ static bool -try_complete_step(Step *step, int flags) +try_complete_step(TestSpec *testspec, Step *step, int flags) { PGconn *conn = conns[1 + step->session]; fd_set read_set; @@ -706,6 +695,7 @@ try_complete_step(Step *step, int flags) int sock = PQsocket(conn); int ret; PGresult *res; + PGnotify *notify; bool canceled = false; if (sock < 0) @@ -742,7 +732,7 @@ try_complete_step(Step *step, int flags) bool waiting; res = PQexecPrepared(conns[0], PREP_WAITING, 1, - &backend_pids[step->session + 1], + &backend_pid_strs[step->session + 1], NULL, NULL, 0); if (PQresultStatus(res) != PGRES_TUPLES_OK || PQntuples(res) != 1) @@ -884,6 +874,36 @@ try_complete_step(Step *step, int flags) PQclear(res); } + /* Report any available NOTIFY messages, too */ + PQconsumeInput(conn); + while ((notify = PQnotifies(conn)) != NULL) + { + /* Try to identify which session it came from */ + const char *sendername = NULL; + char pidstring[32]; + int i; + + for (i = 0; i < testspec->nsessions; i++) + { + if (notify->be_pid == backend_pids[i + 1]) + { + sendername = testspec->sessions[i]->name; + break; + } + } + if (sendername == NULL) + { + /* Doesn't seem to be any test session, so show the hard way */ + snprintf(pidstring, sizeof(pidstring), "PID %d", notify->be_pid); + sendername = pidstring; + } + printf("%s: NOTIFY \"%s\" with payload \"%s\" from %s\n", + testspec->sessions[step->session]->name, + notify->relname, notify->extra, sendername); + PQfreemem(notify); + PQconsumeInput(conn); + } + return false; } diff --git a/src/test/isolation/specs/async-notify.spec b/src/test/isolation/specs/async-notify.spec index 8adad42c7c..daf7bef903 100644 --- a/src/test/isolation/specs/async-notify.spec +++ b/src/test/isolation/specs/async-notify.spec @@ -1,14 +1,70 @@ -# Verify that pg_notification_queue_usage correctly reports a non-zero result, -# after submitting notifications while another connection is listening for -# those notifications and waiting inside an active transaction. +# Tests for LISTEN/NOTIFY -session "listener" -step "listen" { LISTEN a; } -step "begin" { BEGIN; } -teardown { ROLLBACK; UNLISTEN *; } +# Most of these tests use only the "notifier" session and hence exercise only +# self-notifies, which are convenient because they minimize timing concerns. +# Note we assume that each step is delivered to the backend as a single Query +# message so it will run as one transaction. session "notifier" -step "check" { SELECT pg_notification_queue_usage() > 0 AS nonzero; } -step "notify" { SELECT count(pg_notify('a', s::text)) FROM generate_series(1, 1000) s; } +step "listenc" { LISTEN c1; LISTEN c2; } +step "notify1" { NOTIFY c1; } +step "notify2" { NOTIFY c2, 'payload'; } +step "notify3" { NOTIFY c3, 'payload3'; } # not listening to c3 +step "notifyf" { SELECT pg_notify('c2', NULL); } +step "notifyd1" { NOTIFY c2, 'payload'; NOTIFY c1; NOTIFY "c2", 'payload'; } +step "notifyd2" { NOTIFY c1; NOTIFY c1; NOTIFY c1, 'p1'; NOTIFY c1, 'p2'; } +step "notifys1" { + BEGIN; + NOTIFY c1, 'payload'; NOTIFY "c2", 'payload'; + NOTIFY c1, 'payload'; NOTIFY "c2", 'payload'; + SAVEPOINT s1; + NOTIFY c1, 'payload'; NOTIFY "c2", 'payload'; + NOTIFY c1, 'payloads'; NOTIFY "c2", 'payloads'; + NOTIFY c1, 'payload'; NOTIFY "c2", 'payload'; + NOTIFY c1, 'payloads'; NOTIFY "c2", 'payloads'; + RELEASE SAVEPOINT s1; + SAVEPOINT s2; + NOTIFY c1, 'rpayload'; NOTIFY "c2", 'rpayload'; + NOTIFY c1, 'rpayloads'; NOTIFY "c2", 'rpayloads'; + NOTIFY c1, 'rpayload'; NOTIFY "c2", 'rpayload'; + NOTIFY c1, 'rpayloads'; NOTIFY "c2", 'rpayloads'; + ROLLBACK TO SAVEPOINT s2; + COMMIT; +} +step "usage" { SELECT pg_notification_queue_usage() > 0 AS nonzero; } +step "bignotify" { SELECT count(pg_notify('c1', s::text)) FROM generate_series(1, 1000) s; } +teardown { UNLISTEN *; } -permutation "listen" "begin" "check" "notify" "check" +# The listener session is used for cross-backend notify checks. + +session "listener" +step "llisten" { LISTEN c1; LISTEN c2; } +step "lcheck" { SELECT 1 AS x; } +step "lbegin" { BEGIN; } +teardown { UNLISTEN *; } + + +# Trivial cases. +permutation "listenc" "notify1" "notify2" "notify3" "notifyf" + +# Check simple and less-simple deduplication. +permutation "listenc" "notifyd1" "notifyd2" "notifys1" + +# Cross-backend notification delivery. We use a "select 1" to force the +# listener session to check for notifies. In principle we could just wait +# for delivery, but that would require extra support in isolationtester +# and might have portability-of-timing issues. +permutation "llisten" "notify1" "notify2" "notify3" "notifyf" "lcheck" + +# Again, with local delivery too. +permutation "listenc" "llisten" "notify1" "notify2" "notify3" "notifyf" "lcheck" + +# Verify that pg_notification_queue_usage correctly reports a non-zero result, +# after submitting notifications while another connection is listening for +# those notifications and waiting inside an active transaction. We have to +# fill a page of the notify SLRU to make this happen, which is a good deal +# of traffic. To not bloat the expected output, we intentionally don't +# commit the listener's transaction, so that it never reports these events. +# Hence, this should be the last test in this script. + +permutation "llisten" "lbegin" "usage" "bignotify" "usage"