diff --git a/src/backend/libpq/be-secure-openssl.c b/src/backend/libpq/be-secure-openssl.c index 1dd7770e5d..20b4742d12 100644 --- a/src/backend/libpq/be-secure-openssl.c +++ b/src/backend/libpq/be-secure-openssl.c @@ -71,6 +71,8 @@ #endif #include "libpq/libpq.h" +#include "miscadmin.h" +#include "storage/latch.h" #include "tcop/tcopprot.h" #include "utils/memutils.h" @@ -338,6 +340,7 @@ be_tls_open_server(Port *port) { int r; int err; + int waitfor; Assert(!port->ssl); Assert(!port->peer); @@ -371,12 +374,15 @@ aloop: { case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_WRITE: -#ifdef WIN32 - pgwin32_waitforsinglesocket(SSL_get_fd(port->ssl), - (err == SSL_ERROR_WANT_READ) ? - FD_READ | FD_CLOSE | FD_ACCEPT : FD_WRITE | FD_CLOSE, - INFINITE); -#endif + /* not allowed during connection establishment */ + Assert(!port->noblock); + + if (err == SSL_ERROR_WANT_READ) + waitfor = WL_SOCKET_READABLE; + else + waitfor = WL_SOCKET_WRITEABLE; + + WaitLatchOrSocket(MyLatch, waitfor, port->sock, 0); goto aloop; case SSL_ERROR_SYSCALL: if (r < 0) @@ -504,6 +510,7 @@ be_tls_read(Port *port, void *ptr, size_t len) { ssize_t n; int err; + int waitfor; rloop: errno = 0; @@ -516,18 +523,20 @@ rloop: break; case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_WRITE: + /* Don't retry if the socket is in nonblocking mode. */ if (port->noblock) { errno = EWOULDBLOCK; n = -1; break; } -#ifdef WIN32 - pgwin32_waitforsinglesocket(SSL_get_fd(port->ssl), - (err == SSL_ERROR_WANT_READ) ? - FD_READ | FD_CLOSE : FD_WRITE | FD_CLOSE, - INFINITE); -#endif + + if (err == SSL_ERROR_WANT_READ) + waitfor = WL_SOCKET_READABLE; + else + waitfor = WL_SOCKET_WRITEABLE; + + WaitLatchOrSocket(MyLatch, waitfor, port->sock, 0); goto rloop; case SSL_ERROR_SYSCALL: /* leave it to caller to ereport the value of errno */ @@ -567,6 +576,7 @@ be_tls_write(Port *port, void *ptr, size_t len) { ssize_t n; int err; + int waitfor; /* * If SSL renegotiations are enabled and we're getting close to the @@ -630,12 +640,13 @@ wloop: break; case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_WRITE: -#ifdef WIN32 - pgwin32_waitforsinglesocket(SSL_get_fd(port->ssl), - (err == SSL_ERROR_WANT_READ) ? - FD_READ | FD_CLOSE : FD_WRITE | FD_CLOSE, - INFINITE); -#endif + + if (err == SSL_ERROR_WANT_READ) + waitfor = WL_SOCKET_READABLE; + else + waitfor = WL_SOCKET_WRITEABLE; + + WaitLatchOrSocket(MyLatch, waitfor, port->sock, 0); goto wloop; case SSL_ERROR_SYSCALL: /* leave it to caller to ereport the value of errno */ @@ -722,7 +733,7 @@ my_sock_read(BIO *h, char *buf, int size) if (res <= 0) { /* If we were interrupted, tell caller to retry */ - if (errno == EINTR) + if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) { BIO_set_retry_read(h); } @@ -741,7 +752,8 @@ my_sock_write(BIO *h, const char *buf, int size) BIO_clear_retry_flags(h); if (res <= 0) { - if (errno == EINTR) + /* If we were interrupted, tell caller to retry */ + if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) { BIO_set_retry_write(h); } diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c index c592f850c2..c9a8f6df4f 100644 --- a/src/backend/libpq/be-secure.c +++ b/src/backend/libpq/be-secure.c @@ -32,8 +32,10 @@ #endif #include "libpq/libpq.h" +#include "miscadmin.h" #include "tcop/tcopprot.h" #include "utils/memutils.h" +#include "storage/proc.h" char *ssl_cert_file; @@ -147,7 +149,39 @@ secure_raw_read(Port *port, void *ptr, size_t len) prepare_for_client_read(); + /* + * Try to read from the socket without blocking. If it succeeds we're + * done, otherwise we'll wait for the socket using the latch mechanism. + */ +rloop: +#ifdef WIN32 + pgwin32_noblock = true; +#endif n = recv(port->sock, ptr, len, 0); +#ifdef WIN32 + pgwin32_noblock = false; +#endif + + if (n < 0 && !port->noblock && (errno == EWOULDBLOCK || errno == EAGAIN)) + { + int w; + int save_errno = errno; + + w = WaitLatchOrSocket(MyLatch, + WL_SOCKET_READABLE, + port->sock, 0); + + if (w & WL_SOCKET_READABLE) + { + goto rloop; + } + + /* + * Restore errno, clobbered by WaitLatchOrSocket, so the caller can + * react properly. + */ + errno = save_errno; + } client_read_ended(); @@ -170,7 +204,9 @@ secure_write(Port *port, void *ptr, size_t len) } else #endif + { n = secure_raw_write(port, ptr, len); + } return n; } @@ -178,5 +214,44 @@ secure_write(Port *port, void *ptr, size_t len) ssize_t secure_raw_write(Port *port, const void *ptr, size_t len) { - return send(port->sock, ptr, len, 0); + ssize_t n; + +wloop: + +#ifdef WIN32 + pgwin32_noblock = true; +#endif + n = send(port->sock, ptr, len, 0); +#ifdef WIN32 + pgwin32_noblock = false; +#endif + + if (n < 0 && !port->noblock && (errno == EWOULDBLOCK || errno == EAGAIN)) + { + int w; + int save_errno = errno; + + /* + * We probably want to check for latches being set at some point + * here. That'd allow us to handle interrupts while blocked on + * writes. If set we'd not retry directly, but return. That way we + * don't do anything while (possibly) inside a ssl library. + */ + w = WaitLatchOrSocket(MyLatch, + WL_SOCKET_WRITEABLE, + port->sock, 0); + + if (w & WL_SOCKET_WRITEABLE) + { + goto wloop; + } + + /* + * Restore errno, clobbered by WaitLatchOrSocket, so the caller can + * react properly. + */ + errno = save_errno; + } + + return n; } diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index 254fd8285b..0d97aa45fb 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -181,6 +181,22 @@ pq_init(void) PqCommReadingMsg = false; DoingCopyOut = false; on_proc_exit(socket_close, 0); + + /* + * In backends (as soon as forked) we operate the underlying socket in + * nonblocking mode and use latches to implement blocking semantics if + * needed. That allows us to provide safely interruptible reads. + * + * Use COMMERROR on failure, because ERROR would try to send the error to + * the client, which might require changing the mode again, leading to + * infinite recursion. + */ +#ifndef WIN32 + if (!pg_set_noblock(MyProcPort->sock)) + ereport(COMMERROR, + (errmsg("could not set socket to nonblocking mode: %m"))); +#endif + } /* -------------------------------- @@ -820,31 +836,6 @@ socket_set_nonblocking(bool nonblocking) (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), errmsg("there is no client connection"))); - if (MyProcPort->noblock == nonblocking) - return; - -#ifdef WIN32 - pgwin32_noblock = nonblocking ? 1 : 0; -#else - - /* - * Use COMMERROR on failure, because ERROR would try to send the error to - * the client, which might require changing the mode again, leading to - * infinite recursion. - */ - if (nonblocking) - { - if (!pg_set_noblock(MyProcPort->sock)) - ereport(COMMERROR, - (errmsg("could not set socket to nonblocking mode: %m"))); - } - else - { - if (!pg_set_block(MyProcPort->sock)) - ereport(COMMERROR, - (errmsg("could not set socket to blocking mode: %m"))); - } -#endif MyProcPort->noblock = nonblocking; }