/*------------------------------------------------------------------------- * * pqcomm.c * Communication functions between the Frontend and the Backend * * Copyright (c) 1994, Regents of the University of California * * $Id: pqcomm.c,v 1.67 1999/02/18 01:13:26 tgl Exp $ * *------------------------------------------------------------------------- */ /* * INTERFACE ROUTINES * pq_init - initialize libpq * pq_getport - return the PGPORT setting * pq_close - close input / output connections * pq_flush - flush pending output * pq_recvbuf - load some bytes into the input buffer * pq_getstr - get a null terminated string from connection * pq_getchar - get 1 character from connection * pq_peekchar - peek at next character from connection * pq_getnchar - get n characters from connection, and null-terminate * pq_getint - get an integer from connection * pq_putchar - send 1 character to connection * pq_putstr - send a null terminated string to connection * pq_putnchar - send n characters to connection * pq_putint - send an integer to connection * pq_putncharlen - send n characters to connection * (also send an int header indicating * the length) * pq_getinaddr - initialize address from host and port number * pq_getinserv - initialize address from host and service name * * StreamDoUnlink - Shutdown UNIX socket connection * StreamServerPort - Open socket stream * StreamConnection - Create new connection with client * StreamClose - Close a client/backend connection * * NOTES * Frontend is now completely in interfaces/libpq, and no * functions from this file are used there. * */ #include "postgres.h" #include #if defined(HAVE_STRING_H) #include #else #include #endif #include #include #include #include /* for ttyname() */ #include #include #include #include #include #include #include #include #if defined(linux) #ifndef SOMAXCONN #define SOMAXCONN 5 /* from Linux listen(2) man page */ #endif /* SOMAXCONN */ #endif /* linux */ #include "miscadmin.h" #include "libpq/pqsignal.h" #include "libpq/auth.h" #include "libpq/libpq.h" /* where the declarations go */ #include "storage/ipc.h" #ifdef MULTIBYTE #include "mb/pg_wchar.h" #endif #include "utils/trace.h" extern FILE * debug_port; /* in util.c */ /* * Buffers */ unsigned char PqSendBuffer[PQ_BUFFER_SIZE]; unsigned char PqRecvBuffer[PQ_BUFFER_SIZE]; int PqSendPointer,PqRecvPointer,PqRecvLength; /* -------------------------------- * pq_init - open portal file descriptors * -------------------------------- */ void pq_init(int fd) { PqSendPointer = PqRecvPointer = PqRecvLength = 0; PQnotifies_init(); if (getenv("LIBPQ_DEBUG")) debug_port = stderr; } /* ------------------------- * pq_getchar() * * get a character from the input file, or EOF if trouble * -------------------------------- */ int pq_getchar(void) { while (PqRecvPointer >= PqRecvLength) { if (pq_recvbuf()) /* If nothing in buffer, then recv some */ return EOF; /* Failed to recv data */ } return PqRecvBuffer[PqRecvPointer++]; } /* ------------------------- * pq_peekchar() * * get a character from the connection, but leave it in the buffer * to be read again * -------------------------------- */ int pq_peekchar(void) { while (PqRecvPointer >= PqRecvLength) { if (pq_recvbuf()) /* If nothing in buffer, then recv some */ return EOF; /* Failed to recv data */ } /* Note we don't bump the pointer... */ return PqRecvBuffer[PqRecvPointer]; } /* -------------------------------- * pq_getport - return the PGPORT setting * -------------------------------- */ int pq_getport() { char *envport = getenv("PGPORT"); if (envport) return atoi(envport); return atoi(DEF_PGPORT); } /* -------------------------------- * pq_close - close input / output connections * -------------------------------- */ void pq_close() { close(MyProcPort->sock); PQnotifies_init(); } /* -------------------------------- * pq_flush - flush pending output * * returns 0 if OK, EOF if trouble * -------------------------------- */ int pq_flush() { unsigned char *bufptr = PqSendBuffer; unsigned char *bufend = PqSendBuffer + PqSendPointer; while (bufptr < bufend) { int r = send(MyProcPort->sock, bufptr, bufend - bufptr, 0); if (r <= 0) { if (errno == EINTR) continue; /* Ok if we were interrupted */ /* We would like to use elog() here, but cannot because elog * tries to write to the client, which would cause a recursive * flush attempt! So just write it out to the postmaster log. */ fprintf(stderr, "pq_flush: send() failed, errno %d\n", errno); /* We drop the buffered data anyway so that processing * can continue, even though we'll probably quit soon. */ PqSendPointer = 0; return EOF; } bufptr += r; } PqSendPointer = 0; return 0; } /* -------------------------------- * pq_recvbuf - load some bytes into the input buffer * * returns 0 if OK, EOF if trouble * -------------------------------- */ int pq_recvbuf() { if (PqRecvPointer > 0) { if (PqRecvLength > PqRecvPointer) { /* still some unread data, left-justify it in the buffer */ memmove(PqRecvBuffer, PqRecvBuffer+PqRecvPointer, PqRecvLength-PqRecvPointer); PqRecvLength -= PqRecvPointer; PqRecvPointer = 0; } else PqRecvLength = PqRecvPointer = 0; } /* Can fill buffer from PqRecvLength and upwards */ for (;;) { int r = recv(MyProcPort->sock, PqRecvBuffer + PqRecvLength, PQ_BUFFER_SIZE - PqRecvLength, 0); if (r < 0) { if (errno == EINTR) continue; /* Ok if interrupted */ /* We would like to use elog() here, but dare not because elog * tries to write to the client, which will cause problems * if we have a hard communications failure ... * So just write the message to the postmaster log. */ fprintf(stderr, "pq_recvbuf: recv() failed, errno=%d\n", errno); return EOF; } if (r == 0) { /* as above, elog not safe */ fprintf(stderr, "pq_recvbuf: unexpected EOF on client connection\n"); return EOF; } /* r contains number of bytes read, so just incr length */ PqRecvLength += r; return 0; } } /* -------------------------------- * pq_getstr - get a null terminated string from connection * -------------------------------- */ int pq_getstr(char *s, int maxlen) { int c; #ifdef MULTIBYTE char *p; #endif c = pqGetString(s, maxlen); #ifdef MULTIBYTE p = (char*) pg_client_to_server((unsigned char *) s, maxlen); if (s != p) /* actual conversion has been done? */ strcpy(s, p); #endif return c; } /* -------------------------------- * pq_getnchar - get n characters from connection, and null terminate * -------------------------------- */ int pq_getnchar(char *s, int off, int maxlen) { int r = pqGetNBytes(s + off, maxlen); s[off+maxlen] = '\0'; return r; } /* -------------------------------- * pq_getint - get an integer from connection * we receive an integer a byte at a type and reconstruct it so that * machines with different ENDIAN representations can talk to each * other * -------------------------------- */ int pq_getint(int b) { int n, status = 1; /* * mjl: Seems inconsisten w/ return value of pq_putint (void). Also, * EOF is a valid return value for an int! XXX */ switch (b) { case 1: status = ((n = pq_getchar()) == EOF); break; case 2: status = pqGetShort(&n); break; case 4: status = pqGetLong(&n); break; default: fprintf(stderr, "** Unsupported size %d\n", b); } if (status) { snprintf(PQerrormsg, ERROR_MSG_LENGTH, "FATAL: pq_getint failed: errno=%d\n", errno); fputs(PQerrormsg, stderr); pqdebug("%s", PQerrormsg); n = 0; } return n; } /* -------------------------------- * pq_putstr - send a null terminated string to connection * -------------------------------- */ void pq_putstr(char *s) { #ifdef MULTIBYTE unsigned char *p; p = pg_server_to_client(s, strlen(s)); if (pqPutString(p)) #else if (pqPutString(s)) #endif { snprintf(PQerrormsg, ERROR_MSG_LENGTH, "FATAL: pq_putstr: fputs() failed: errno=%d\n", errno); fputs(PQerrormsg, stderr); pqdebug("%s", PQerrormsg); } } /* -------------------------------- * pq_putnchar - send n characters to connection * -------------------------------- */ void pq_putnchar(char *s, int n) { if (pqPutNBytes(s, n)) { snprintf(PQerrormsg, ERROR_MSG_LENGTH, "FATAL: pq_putnchar: pqPutNBytes() failed: errno=%d\n", errno); fputs(PQerrormsg, stderr); pqdebug("%s", PQerrormsg); } } /* -------------------------------- * pq_putint - send an integer to connection * we chop an integer into bytes and send individual bytes * machines with different ENDIAN representations can still talk to each * other * -------------------------------- */ void pq_putint(int i, int b) { int status; status = 1; switch (b) { case 1: status = (pq_putchar(i) == EOF); break; case 2: status = pqPutShort(i); break; case 4: status = pqPutLong(i); break; default: fprintf(stderr, "** Unsupported size %d\n", b); } if (status) { snprintf(PQerrormsg, ERROR_MSG_LENGTH, "FATAL: pq_putint failed: errno=%d\n", errno); fputs(PQerrormsg, stderr); pqdebug("%s", PQerrormsg); } } /* -------------------------------- * pq_getinaddr - initialize address from host and port number * -------------------------------- */ int pq_getinaddr(struct sockaddr_in * sin, char *host, int port) { struct hostent *hs; MemSet((char *) sin, 0, sizeof(*sin)); if (host) { if (*host >= '0' && *host <= '9') sin->sin_addr.s_addr = inet_addr(host); else { if (!(hs = gethostbyname(host))) { perror(host); return 1; } if (hs->h_addrtype != AF_INET) { snprintf(PQerrormsg, ERROR_MSG_LENGTH, "FATAL: pq_getinaddr: %s not on Internet\n", host); fputs(PQerrormsg, stderr); pqdebug("%s", PQerrormsg); return 1; } memmove((char *) &sin->sin_addr, hs->h_addr, hs->h_length); } } sin->sin_family = AF_INET; sin->sin_port = htons(port); return 0; } /* -------------------------------- * pq_getinserv - initialize address from host and servive name * -------------------------------- */ int pq_getinserv(struct sockaddr_in * sin, char *host, char *serv) { struct servent *ss; if (*serv >= '0' && *serv <= '9') return pq_getinaddr(sin, host, atoi(serv)); if (!(ss = getservbyname(serv, NULL))) { snprintf(PQerrormsg, ERROR_MSG_LENGTH, "FATAL: pq_getinserv: unknown service: %s\n", serv); fputs(PQerrormsg, stderr); pqdebug("%s", PQerrormsg); return 1; } return pq_getinaddr(sin, host, ntohs(ss->s_port)); } /* * Streams -- wrapper around Unix socket system calls * * * Stream functions are used for vanilla TCP connection protocol. */ static char sock_path[MAXPGPATH + 1] = ""; /* StreamDoUnlink() * Shutdown routine for backend connection * If a Unix socket is used for communication, explicitly close it. */ void StreamDoUnlink() { Assert(sock_path[0]); unlink(sock_path); } /* * StreamServerPort -- open a sock stream "listening" port. * * This initializes the Postmaster's connection * accepting port. * * ASSUME: that this doesn't need to be non-blocking because * the Postmaster uses select() to tell when the socket * is ready. * * RETURNS: STATUS_OK or STATUS_ERROR */ int StreamServerPort(char *hostName, short portName, int *fdP) { SockAddr saddr; int fd, err, family; size_t len; int one = 1; #ifdef HAVE_FCNTL_SETLK int lock_fd; #endif family = ((hostName != NULL) ? AF_INET : AF_UNIX); if ((fd = socket(family, SOCK_STREAM, 0)) < 0) { snprintf(PQerrormsg, ERROR_MSG_LENGTH, "FATAL: StreamServerPort: socket() failed: errno=%d\n", errno); fputs(PQerrormsg, stderr); pqdebug("%s", PQerrormsg); return STATUS_ERROR; } if ((setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *) &one, sizeof(one))) == -1) { snprintf(PQerrormsg, ERROR_MSG_LENGTH, "FATAL: StreamServerPort: setsockopt (SO_REUSEADDR) failed: errno=%d\n", errno); fputs(PQerrormsg, stderr); pqdebug("%s", PQerrormsg); return STATUS_ERROR; } MemSet((char *) &saddr, 0, sizeof(saddr)); saddr.sa.sa_family = family; if (family == AF_UNIX) { len = UNIXSOCK_PATH(saddr.un, portName); strcpy(sock_path, saddr.un.sun_path); /* * If the socket exists but nobody has an advisory lock on it we * can safely delete the file. */ #ifdef HAVE_FCNTL_SETLK #ifndef __CYGWIN32__ if ((lock_fd = open(sock_path, O_WRONLY | O_NONBLOCK, 0666)) >= 0) #else if ((lock_fd = open(sock_path, O_WRONLY | O_NONBLOCK | O_BINARY, 0666)) >= 0) #endif { struct flock lck; lck.l_whence = SEEK_SET; lck.l_start = lck.l_len = 0; lck.l_type = F_WRLCK; if (fcntl(lock_fd, F_SETLK, &lck) == 0) { TPRINTF(TRACE_VERBOSE, "flock on %s, deleting", sock_path); unlink(sock_path); } else TPRINTF(TRACE_VERBOSE, "flock failed for %s", sock_path); close(lock_fd); } #endif /* HAVE_FCNTL_SETLK */ } else { saddr.in.sin_addr.s_addr = htonl(INADDR_ANY); saddr.in.sin_port = htons(portName); len = sizeof(struct sockaddr_in); } err = bind(fd, &saddr.sa, len); if (err < 0) { snprintf(PQerrormsg, ERROR_MSG_LENGTH, "FATAL: StreamServerPort: bind() failed: errno=%d\n", errno); pqdebug("%s", PQerrormsg); strcat(PQerrormsg, "\tIs another postmaster already running on that port?\n"); if (family == AF_UNIX) { snprintf(PQerrormsg + strlen(PQerrormsg), ERROR_MSG_LENGTH, "\tIf not, remove socket node (%s) and retry.\n", sock_path); } else { strcat(PQerrormsg, "\tIf not, wait a few seconds and retry.\n"); } fputs(PQerrormsg, stderr); return STATUS_ERROR; } if (family == AF_UNIX) { on_proc_exit(StreamDoUnlink, NULL); /* * Open the socket file and get an advisory lock on it. The * lock_fd is left open to keep the lock. */ #ifdef HAVE_FCNTL_SETLK #ifndef __CYGWIN32__ if ((lock_fd = open(sock_path, O_WRONLY | O_NONBLOCK, 0666)) >= 0) #else if ((lock_fd = open(sock_path, O_WRONLY | O_NONBLOCK | O_BINARY, 0666)) >= 0) #endif { struct flock lck; lck.l_whence = SEEK_SET; lck.l_start = lck.l_len = 0; lck.l_type = F_WRLCK; if (fcntl(lock_fd, F_SETLK, &lck) != 0) TPRINTF(TRACE_VERBOSE, "flock error for %s", sock_path); } #endif /* HAVE_FCNTL_SETLK */ } listen(fd, SOMAXCONN); /* * MS: I took this code from Dillon's version. It makes the listening * port non-blocking. That is not necessary (and may tickle kernel * bugs). * * fcntl(fd, F_SETFD, 1); fcntl(fd, F_SETFL, FNDELAY); */ *fdP = fd; if (family == AF_UNIX) chmod(sock_path, 0777); return STATUS_OK; } /* * StreamConnection -- create a new connection with client using * server port. * * This one should be non-blocking. * * RETURNS: STATUS_OK or STATUS_ERROR */ int StreamConnection(int server_fd, Port *port) { SOCKET_SIZE_TYPE addrlen; /* accept connection (and fill in the client (remote) address) */ addrlen = sizeof(port->raddr); if ((port->sock = accept(server_fd, (struct sockaddr *) & port->raddr, &addrlen)) < 0) { elog(ERROR, "postmaster: StreamConnection: accept: %m"); return STATUS_ERROR; } /* fill in the server (local) address */ addrlen = sizeof(port->laddr); if (getsockname(port->sock, (struct sockaddr *) & port->laddr, &addrlen) < 0) { elog(ERROR, "postmaster: StreamConnection: getsockname: %m"); return STATUS_ERROR; } /* select TCP_NODELAY option if it's a TCP connection */ if (port->laddr.sa.sa_family == AF_INET) { struct protoent *pe; int on = 1; pe = getprotobyname("TCP"); if (pe == NULL) { elog(ERROR, "postmaster: getprotobyname failed"); return STATUS_ERROR; } if (setsockopt(port->sock, pe->p_proto, TCP_NODELAY, &on, sizeof(on)) < 0) { elog(ERROR, "postmaster: setsockopt failed: %m"); return STATUS_ERROR; } } /* reset to non-blocking */ fcntl(port->sock, F_SETFL, 1); return STATUS_OK; } /* * StreamClose -- close a client/backend connection */ void StreamClose(int sock) { close(sock); } #ifdef MULTIBYTE void pq_putncharlen(char *s, int n) { unsigned char *p; int len; p = pg_server_to_client(s, n); len = strlen(p); pq_putint(len, sizeof(int)); pq_putnchar(p, len); } #endif /* * Act like the stdio putc() function. Write one character * to the stream. Return this character, or EOF on error. */ int pq_putchar(unsigned char c) { if (PqSendPointer >= PQ_BUFFER_SIZE) if (pq_flush()) /* If buffer is full, then flush it out */ return EOF; PqSendBuffer[PqSendPointer++] = c; /* Put in buffer */ return c; }