Here's an attempt at new socket and signal code for win32.

It works on the principle of switching sockets to non-blocking mode and
then emulating blocking behaviour on top of that, while allowing signals
to run. Signals are now implemented using an event instead of APCs, thus
getting rid of the issue of APCs not being compatible with "old style"
socket functions.

It also moves the win32 specific code away from pqsignal.h/c into
port/win32, and also removes the "thread style workaround" of the APC
issue previously in place.

In order to make things work, a few things are also changed in pgstat.c:

1) There is now a separate pipe to the collector and the bufferer. This
is required because the pipe will otherwise only be signalled in one of
the processes when the postmaster goes down. The MS winsock code for
select() must have some kind of workaround for this behaviour, but I
have found no stable way of doing that. You really are not supposed to
use the same socket from more than one process (unless you use
WSADuplicateSocket(), in which case the docs specifically say that only
one will be flagged).

2) The check for "postmaster death" is moved into a separate select()
call after the main loop. The previous behaviour select():ed on the
postmaster pipe, while later explicitly saying "we do NOT check for
postmaster exit inside the loop".
The issue was that the code relies on the same select() call seeing both
the postmaster pipe *and* the pgstat pipe go away. This does not always
happen, and it appears that using WSAEventSelect() makes it even more
common that it does not.
Since it's only called when the process exits, I don't think using a
separate select() call will have any significant impact on how the stats
collector works.

Magnus Hagander
This commit is contained in:
Bruce Momjian 2004-04-12 16:19:18 +00:00
parent abdabeb995
commit a4c40f140d
11 changed files with 831 additions and 374 deletions

View File

@ -9,7 +9,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/libpq/pqsignal.c,v 1.32 2004/02/18 16:25:12 momjian Exp $
* $PostgreSQL: pgsql/src/backend/libpq/pqsignal.c,v 1.33 2004/04/12 16:19:18 momjian Exp $
*
* NOTES
* This shouldn't be in libpq, but the monitor and some other
@ -38,9 +38,6 @@
* is to do signal-handler reinstallation, which doesn't work well
* at all.
* ------------------------------------------------------------------------*/
#ifdef WIN32
#define _WIN32_WINNT 0x0400
#endif
#include "postgres.h"
@ -131,7 +128,9 @@ pqinitmask(void)
}
/* Win32 signal handling is in backend/port/win32/signal.c */
#ifndef WIN32
/*
* Set up a signal handler
*/
@ -154,290 +153,4 @@ pqsignal(int signo, pqsigfunc func)
return oact.sa_handler;
#endif /* !HAVE_POSIX_SIGNALS */
}
#else
/* Win32 specific signals code */
/* pg_signal_crit_sec is used to protect only pg_signal_queue. That is the only
* variable that can be accessed from the signal sending threads! */
static CRITICAL_SECTION pg_signal_crit_sec;
static int pg_signal_queue;
#define PG_SIGNAL_COUNT 32
static pqsigfunc pg_signal_array[PG_SIGNAL_COUNT];
static pqsigfunc pg_signal_defaults[PG_SIGNAL_COUNT];
static int pg_signal_mask;
HANDLE pgwin32_main_thread_handle;
/* Signal handling thread function */
static DWORD WINAPI pg_signal_thread(LPVOID param);
static BOOL WINAPI pg_console_handler(DWORD dwCtrlType);
/* Initialization */
void
pgwin32_signal_initialize(void)
{
int i;
HANDLE signal_thread_handle;
InitializeCriticalSection(&pg_signal_crit_sec);
for (i = 0; i < PG_SIGNAL_COUNT; i++)
{
pg_signal_array[i] = SIG_DFL;
pg_signal_defaults[i] = SIG_IGN;
}
pg_signal_mask = 0;
pg_signal_queue = 0;
/* Get handle to main thread so we can post calls to it later */
if (!DuplicateHandle(GetCurrentProcess(), GetCurrentThread(),
GetCurrentProcess(), &pgwin32_main_thread_handle,
0, FALSE, DUPLICATE_SAME_ACCESS))
ereport(FATAL,
(errmsg_internal("Failed to get main thread handle!")));
/* Create thread for handling signals */
signal_thread_handle = CreateThread(NULL, 0, pg_signal_thread, NULL, 0, NULL);
if (signal_thread_handle == NULL)
ereport(FATAL,
(errmsg_internal("Failed to create signal handler thread!")));
if (!SetConsoleCtrlHandler(pg_console_handler, TRUE))
ereport(FATAL,
(errmsg_internal("Failed to set console control handler!")));
}
/* Dispatch all signals currently queued and not blocked
* Blocked signals are ignored, and will be fired at the time of
* the sigsetmask() call. */
static void
dispatch_queued_signals(void)
{
int i;
EnterCriticalSection(&pg_signal_crit_sec);
while (pg_signal_queue & ~pg_signal_mask)
{
/* One or more unblocked signals queued for execution */
int exec_mask = pg_signal_queue & ~pg_signal_mask;
for (i = 0; i < PG_SIGNAL_COUNT; i++)
{
if (exec_mask & sigmask(i))
{
/* Execute this signal */
pqsigfunc sig = pg_signal_array[i];
if (sig == SIG_DFL)
sig = pg_signal_defaults[i];
pg_signal_queue &= ~sigmask(i);
if (sig != SIG_ERR && sig != SIG_IGN && sig != SIG_DFL)
{
LeaveCriticalSection(&pg_signal_crit_sec);
sig(i);
EnterCriticalSection(&pg_signal_crit_sec);
break; /* Restart outer loop, in case signal mask
* or queue has been modified inside
* signal handler */
}
}
}
}
LeaveCriticalSection(&pg_signal_crit_sec);
}
/* signal masking. Only called on main thread, no sync required */
int
pqsigsetmask(int mask)
{
int prevmask;
prevmask = pg_signal_mask;
pg_signal_mask = mask;
/*
* Dispatch any signals queued up right away, in case we have
* unblocked one or more signals previously queued
*/
dispatch_queued_signals();
return prevmask;
}
/* signal manipulation. Only called on main thread, no sync required */
pqsigfunc
pqsignal(int signum, pqsigfunc handler)
{
pqsigfunc prevfunc;
if (signum >= PG_SIGNAL_COUNT || signum < 0)
return SIG_ERR;
prevfunc = pg_signal_array[signum];
pg_signal_array[signum] = handler;
return prevfunc;
}
/* signal sending */
int
pqkill(int pid, int sig)
{
char pipename[128];
BYTE sigData = sig;
BYTE sigRet = 0;
DWORD bytes;
if (sig >= PG_SIGNAL_COUNT || sig <= 0)
{
errno = EINVAL;
return -1;
}
if (pid <= 0)
{
/* No support for process groups */
errno = EINVAL;
return -1;
}
wsprintf(pipename, "\\\\.\\pipe\\pgsignal_%i", pid);
if (!CallNamedPipe(pipename, &sigData, 1, &sigRet, 1, &bytes, 1000))
{
if (GetLastError() == ERROR_FILE_NOT_FOUND)
errno = ESRCH;
else if (GetLastError() == ERROR_ACCESS_DENIED)
errno = EPERM;
else
errno = EINVAL;
return -1;
}
if (bytes != 1 || sigRet != sig)
{
errno = ESRCH;
return -1;
}
return 0;
}
/* APC callback scheduled on main thread when signals are fired */
static void CALLBACK
pg_signal_apc(ULONG_PTR param)
{
dispatch_queued_signals();
}
/*
* All functions below execute on the signal handler thread
* and must be synchronized as such!
* NOTE! The only global variable that can be used is
* pg_signal_queue!
*/
void
pg_queue_signal(int signum)
{
if (signum >= PG_SIGNAL_COUNT || signum < 0)
return;
EnterCriticalSection(&pg_signal_crit_sec);
pg_signal_queue |= sigmask(signum);
LeaveCriticalSection(&pg_signal_crit_sec);
QueueUserAPC(pg_signal_apc, pgwin32_main_thread_handle, (ULONG_PTR) NULL);
}
/* Signal dispatching thread */
static DWORD WINAPI
pg_signal_dispatch_thread(LPVOID param)
{
HANDLE pipe = (HANDLE) param;
BYTE sigNum;
DWORD bytes;
if (!ReadFile(pipe, &sigNum, 1, &bytes, NULL))
{
/* Client died before sending */
CloseHandle(pipe);
return 0;
}
if (bytes != 1)
{
/* Received <bytes> bytes over signal pipe (should be 1) */
CloseHandle(pipe);
return 0;
}
WriteFile(pipe, &sigNum, 1, &bytes, NULL); /* Don't care if it works
* or not.. */
FlushFileBuffers(pipe);
DisconnectNamedPipe(pipe);
CloseHandle(pipe);
pg_queue_signal(sigNum);
return 0;
}
/* Signal handling thread */
static DWORD WINAPI
pg_signal_thread(LPVOID param)
{
char pipename[128];
HANDLE pipe = INVALID_HANDLE_VALUE;
wsprintf(pipename, "\\\\.\\pipe\\pgsignal_%i", GetCurrentProcessId());
for (;;)
{
BOOL fConnected;
HANDLE hThread;
pipe = CreateNamedPipe(pipename, PIPE_ACCESS_DUPLEX,
PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT,
PIPE_UNLIMITED_INSTANCES, 16, 16, 1000, NULL);
if (pipe == INVALID_HANDLE_VALUE)
{
fprintf(stderr, gettext("Failed to create signal listener pipe: %i. Retrying.\n"), (int) GetLastError());
SleepEx(500, TRUE);
continue;
}
fConnected = ConnectNamedPipe(pipe, NULL) ? TRUE : (GetLastError() == ERROR_PIPE_CONNECTED);
if (fConnected)
{
hThread = CreateThread(NULL, 0,
(LPTHREAD_START_ROUTINE) pg_signal_dispatch_thread,
(LPVOID) pipe, 0, NULL);
if (hThread == INVALID_HANDLE_VALUE)
fprintf(stderr, gettext("Failed to create signal dispatch thread: %i\n"), (int) GetLastError());
else
CloseHandle(hThread);
}
else
/* Connection failed. Cleanup and try again */
CloseHandle(pipe);
}
return 0;
}
/* Console control handler will execute on a thread created
by the OS at the time of invocation */
static BOOL WINAPI pg_console_handler(DWORD dwCtrlType) {
if (dwCtrlType == CTRL_C_EVENT ||
dwCtrlType == CTRL_BREAK_EVENT ||
dwCtrlType == CTRL_CLOSE_EVENT ||
dwCtrlType == CTRL_SHUTDOWN_EVENT) {
pg_queue_signal(SIGINT);
return TRUE;
}
return FALSE;
}
#endif
#endif /* WIN32 */

View File

@ -4,7 +4,7 @@
# Makefile for port/win32
#
# IDENTIFICATION
# $PostgreSQL: pgsql/src/backend/port/win32/Makefile,v 1.3 2004/02/18 16:25:12 momjian Exp $
# $PostgreSQL: pgsql/src/backend/port/win32/Makefile,v 1.4 2004/04/12 16:19:18 momjian Exp $
#
#-------------------------------------------------------------------------
@ -12,7 +12,7 @@ subdir = src/backend/port/win32
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
OBJS = sema.o shmem.o timer.o
OBJS = sema.o shmem.o timer.o socket.o signal.o
all: SUBSYS.o

View File

@ -6,7 +6,7 @@
* Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/port/win32/sema.c,v 1.5 2004/02/12 20:37:34 momjian Exp $
* $PostgreSQL: pgsql/src/backend/port/win32/sema.c,v 1.6 2004/04/12 16:19:18 momjian Exp $
*
*-------------------------------------------------------------------------
*/
@ -228,11 +228,12 @@ semop(int semId, struct sembuf * sops, int nsops)
if (sops[0].sem_op == -1)
{
DWORD ret;
HANDLE wh[2];
if (sops[0].sem_flg & IPC_NOWAIT)
ret = WaitForSingleObject(cur_handle, 0);
else
ret = WaitForSingleObject(cur_handle, INFINITE);
wh[0] = cur_handle;
wh[1] = pgwin32_signal_event;
ret = WaitForMultipleObjects(2, wh, FALSE, (sops[0].sem_flg & IPC_NOWAIT)?0:INFINITE);
if (ret == WAIT_OBJECT_0)
{
@ -240,6 +241,12 @@ semop(int semId, struct sembuf * sops, int nsops)
sem_counts[sops[0].sem_num]--;
return 0;
}
else if (ret == WAIT_OBJECT_0+1)
{
/* Signal event is set - we have a signal to deliver */
pgwin32_dispatch_queued_signals();
errno = EINTR;
}
else if (ret == WAIT_TIMEOUT)
/* Couldn't get it */
errno = EAGAIN;

View File

@ -0,0 +1,300 @@
/*-------------------------------------------------------------------------
*
* signal.c
* Microsoft Windows Win32 Signal Emulation Functions
*
* Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/port/win32/signal.c,v 1.1 2004/04/12 16:19:18 momjian Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <libpq/pqsignal.h>
/* pg_signal_crit_sec is used to protect only pg_signal_queue. That is the only
* variable that can be accessed from the signal sending threads! */
static CRITICAL_SECTION pg_signal_crit_sec;
static int pg_signal_queue;
#define PG_SIGNAL_COUNT 32
static pqsigfunc pg_signal_array[PG_SIGNAL_COUNT];
static pqsigfunc pg_signal_defaults[PG_SIGNAL_COUNT];
static int pg_signal_mask;
DLLIMPORT HANDLE pgwin32_signal_event;
/* Signal handling thread function */
static DWORD WINAPI pg_signal_thread(LPVOID param);
static BOOL WINAPI pg_console_handler(DWORD dwCtrlType);
/* Sleep function that can be interrupted by signals */
void pgwin32_backend_usleep(long microsec) {
if (WaitForSingleObject(pgwin32_signal_event, (microsec < 500 ? 1 : (microsec + 500) / 1000)) == WAIT_OBJECT_0) {
pgwin32_dispatch_queued_signals();
errno = EINTR;
return;
}
}
/*
 * Set up the Win32 signal emulation.
 *
 * Initializes the critical section and signal tables, creates the event
 * used to flag pending signals, starts the thread that listens for
 * incoming signals, and installs the console control handler.
 * Any failure is reported with ereport(FATAL).
 */
void
pgwin32_signal_initialize(void)
{
    HANDLE      listener_thread;
    int         signo;

    InitializeCriticalSection(&pg_signal_crit_sec);

    /* Nothing queued, nothing blocked */
    pg_signal_queue = 0;
    pg_signal_mask = 0;

    /* Every signal starts with the default action, which is "ignore" */
    signo = 0;
    while (signo < PG_SIGNAL_COUNT)
    {
        pg_signal_defaults[signo] = SIG_IGN;
        pg_signal_array[signo] = SIG_DFL;
        signo++;
    }

    /* Manual-reset, initially non-signalled event that flags queued signals */
    pgwin32_signal_event = CreateEvent(NULL, TRUE, FALSE, NULL);
    if (pgwin32_signal_event == NULL)
        ereport(FATAL,
                (errmsg_internal("Failed to create signal event: %i!",(int)GetLastError())));

    /* Thread that accepts signals sent by other processes */
    listener_thread = CreateThread(NULL, 0, pg_signal_thread, NULL, 0, NULL);
    if (listener_thread == NULL)
        ereport(FATAL,
                (errmsg_internal("Failed to create signal handler thread!")));

    /* Console control handler, to pick up Ctrl-C etc */
    if (!SetConsoleCtrlHandler(pg_console_handler, TRUE))
        ereport(FATAL,
                (errmsg_internal("Failed to set console control handler!")));
}
/*
 * Dispatch all signals currently queued and not blocked.
 *
 * Blocked signals are left in pg_signal_queue; they are delivered when
 * pqsigsetmask() later unblocks them, since that function calls this one.
 *
 * For each deliverable signal the queue bit is cleared first, then the
 * installed handler (or the entry in pg_signal_defaults when the handler
 * is SIG_DFL) is invoked, unless it is SIG_ERR, SIG_IGN or SIG_DFL.
 *
 * The signal event is reset on exit even when blocked signals remain
 * queued.
 *
 * NOTE(review): the scan starts at i = 0. If sigmask(sig) is still defined
 * as ( 1 << (sig-1) ), as it was in the old pqsignal.h, sigmask(0) shifts
 * by a negative count -- confirm against the current definition.
 */
void
pgwin32_dispatch_queued_signals(void)
{
int i;
EnterCriticalSection(&pg_signal_crit_sec);
while (pg_signal_queue & ~pg_signal_mask)
{
/* One or more unblocked signals queued for execution */
int exec_mask = pg_signal_queue & ~pg_signal_mask;
for (i = 0; i < PG_SIGNAL_COUNT; i++)
{
if (exec_mask & sigmask(i))
{
/* Execute this signal */
pqsigfunc sig = pg_signal_array[i];
/* Fall back to the built-in default action */
if (sig == SIG_DFL)
sig = pg_signal_defaults[i];
/* Mark the signal as delivered before running the handler */
pg_signal_queue &= ~sigmask(i);
if (sig != SIG_ERR && sig != SIG_IGN && sig != SIG_DFL)
{
/* Run the handler without holding the critical section */
LeaveCriticalSection(&pg_signal_crit_sec);
sig(i);
EnterCriticalSection(&pg_signal_crit_sec);
break; /* Restart outer loop, in case signal mask
* or queue has been modified inside
* signal handler */
}
}
}
}
/* Clear the event; remaining (blocked) signals wait for pqsigsetmask() */
ResetEvent(pgwin32_signal_event);
LeaveCriticalSection(&pg_signal_crit_sec);
}
/*
 * Replace the blocked-signal mask and return the previous one.
 *
 * Only called on the main thread, so no synchronization is needed for
 * pg_signal_mask itself.
 */
int
pqsigsetmask(int mask)
{
    const int   oldmask = pg_signal_mask;

    pg_signal_mask = mask;

    /*
     * The new mask may have unblocked signals that were queued earlier,
     * so deliver anything that is now eligible right away.
     */
    pgwin32_dispatch_queued_signals();

    return oldmask;
}
/*
 * Install "handler" for signal "signum" and return the handler that was
 * installed before.  Returns SIG_ERR when signum is outside
 * [0, PG_SIGNAL_COUNT).
 *
 * Only called on the main thread, so no synchronization is required.
 */
pqsigfunc
pqsignal(int signum, pqsigfunc handler)
{
    pqsigfunc  *slot;
    pqsigfunc   previous;

    if (signum < 0 || signum >= PG_SIGNAL_COUNT)
        return SIG_ERR;

    slot = &pg_signal_array[signum];
    previous = *slot;
    *slot = handler;
    return previous;
}
/*
 * Send signal "sig" to process "pid".
 *
 * The signal number is written as a single byte to the target's named pipe
 * ("\\.\pipe\pgsignal_<pid>"); the target is expected to echo it back.
 *
 * Returns 0 on success, or -1 with errno set:
 *   EINVAL  bad signal number, pid <= 0 (process groups are not
 *           supported), or an unclassified pipe error
 *   ESRCH   pipe not found, or an unexpected reply
 *   EPERM   access to the pipe was denied
 */
int
pqkill(int pid, int sig)
{
    char        pipename[128];
    BYTE        request = sig;
    BYTE        reply = 0;
    DWORD       nread;

    if (sig <= 0 || sig >= PG_SIGNAL_COUNT || pid <= 0)
    {
        errno = EINVAL;
        return -1;
    }

    wsprintf(pipename, "\\\\.\\pipe\\pgsignal_%i", pid);

    if (!CallNamedPipe(pipename, &request, 1, &reply, 1, &nread, 1000))
    {
        switch (GetLastError())
        {
            case ERROR_FILE_NOT_FOUND:
                errno = ESRCH;
                break;
            case ERROR_ACCESS_DENIED:
                errno = EPERM;
                break;
            default:
                errno = EINVAL;
                break;
        }
        return -1;
    }

    /* The receiver must echo exactly the one byte we sent */
    if (nread == 1 && reply == sig)
        return 0;

    errno = ESRCH;
    return -1;
}
/*
* All functions below execute on the signal handler thread
* and must be synchronized as such!
* NOTE! The only global variable that can be used is
* pg_signal_queue!
*/
/*
 * Queue signal "signum" for later delivery by
 * pgwin32_dispatch_queued_signals() and set the signal event so that
 * waiters notice it.
 *
 * Out-of-range signal numbers are silently ignored.  Signal 0 is rejected
 * too: it is not a deliverable signal (pqkill() refuses to send it), yet
 * the number arrives unvalidated from the signal pipe.  Presumably
 * sigmask(0) would also shift by a negative count, given the
 * ( 1 << (sig-1) ) definition formerly in pqsignal.h -- TODO confirm
 * against the current definition.
 */
void
pg_queue_signal(int signum)
{
    if (signum >= PG_SIGNAL_COUNT || signum <= 0)
        return;

    /* pg_signal_queue is shared with the main thread */
    EnterCriticalSection(&pg_signal_crit_sec);
    pg_signal_queue |= sigmask(signum);
    LeaveCriticalSection(&pg_signal_crit_sec);

    SetEvent(pgwin32_signal_event);
}
/*
 * Per-connection signal thread.
 *
 * "param" is the connected pipe handle.  Reads the one-byte signal number
 * sent by the client, echoes it back as an acknowledgement, closes the
 * pipe and queues the signal.  Always returns 0.
 */
static DWORD WINAPI
pg_signal_dispatch_thread(LPVOID param)
{
    HANDLE      conn = (HANDLE) param;
    BYTE        signo;
    DWORD       count;

    /*
     * Either the client died before sending, or we received something
     * other than the single byte we expect.
     */
    if (!ReadFile(conn, &signo, 1, &count, NULL) || count != 1)
    {
        CloseHandle(conn);
        return 0;
    }

    /* Acknowledge; we don't care whether the reply gets through */
    WriteFile(conn, &signo, 1, &count, NULL);
    FlushFileBuffers(conn);
    DisconnectNamedPipe(conn);
    CloseHandle(conn);

    pg_queue_signal(signo);
    return 0;
}
/*
 * Signal listener thread.
 *
 * Loops forever: creates an instance of this process's signal pipe
 * ("\\.\pipe\pgsignal_<pid>"), waits for a client to connect, and hands
 * the connected pipe to a freshly created pg_signal_dispatch_thread, which
 * then owns and closes the pipe handle.
 *
 * "param" is unused.  The function never returns in practice.
 */
static DWORD WINAPI
pg_signal_thread(LPVOID param)
{
    char        pipename[128];
    HANDLE      pipe = INVALID_HANDLE_VALUE;

    wsprintf(pipename, "\\\\.\\pipe\\pgsignal_%i", GetCurrentProcessId());

    for (;;)
    {
        BOOL        fConnected;
        HANDLE      hThread;

        pipe = CreateNamedPipe(pipename, PIPE_ACCESS_DUPLEX,
                               PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT,
                               PIPE_UNLIMITED_INSTANCES, 16, 16, 1000, NULL);
        if (pipe == INVALID_HANDLE_VALUE)
        {
            fprintf(stderr, gettext("Failed to create signal listener pipe: %i. Retrying.\n"), (int) GetLastError());
            SleepEx(500, FALSE);
            continue;
        }

        /* A client may already have connected before we got here */
        fConnected = ConnectNamedPipe(pipe, NULL) ? TRUE : (GetLastError() == ERROR_PIPE_CONNECTED);
        if (!fConnected)
        {
            /* Connection failed. Cleanup and try again */
            CloseHandle(pipe);
            continue;
        }

        hThread = CreateThread(NULL, 0,
                               (LPTHREAD_START_ROUTINE) pg_signal_dispatch_thread,
                               (LPVOID) pipe, 0, NULL);

        /* CreateThread() reports failure with NULL, not INVALID_HANDLE_VALUE */
        if (hThread == NULL)
        {
            fprintf(stderr, gettext("Failed to create signal dispatch thread: %i\n"), (int) GetLastError());
            /* No dispatch thread took ownership, so release the pipe here */
            CloseHandle(pipe);
        }
        else
            CloseHandle(hThread);
    }
    return 0;
}
/*
 * Console control handler; executes on a thread created by the OS at the
 * time of invocation.
 *
 * Ctrl-C, Ctrl-Break, console close and system shutdown are all turned
 * into a queued SIGINT and reported as handled (TRUE).  Any other control
 * type is left to the next handler (FALSE).
 */
static BOOL WINAPI
pg_console_handler(DWORD dwCtrlType)
{
    switch (dwCtrlType)
    {
        case CTRL_C_EVENT:
        case CTRL_BREAK_EVENT:
        case CTRL_CLOSE_EVENT:
        case CTRL_SHUTDOWN_EVENT:
            pg_queue_signal(SIGINT);
            return TRUE;
        default:
            return FALSE;
    }
}

View File

@ -0,0 +1,434 @@
/*-------------------------------------------------------------------------
*
* socket.c
* Microsoft Windows Win32 Socket Functions
*
* Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/port/win32/socket.c,v 1.1 2004/04/12 16:19:18 momjian Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#undef socket
#undef accept
#undef connect
#undef select
#undef recv
#undef send
/*
* Blocking socket functions implemented so they listen on both
* the socket and the signal event, required for signal handling.
*/
/*
 * Map the most recent Winsock error (WSAGetLastError()) onto errno.
 *
 * Codes without a specific mapping are reported with a NOTICE and
 * translated to EINVAL.
 */
static void
TranslateSocketError(void)
{
    int         wsaerr = WSAGetLastError();

    switch (wsaerr)
    {
            /* Codes with a direct errno counterpart */
        case WSAEINTR:
            errno = EINTR;
            break;
        case WSAEWOULDBLOCK:
            errno = EWOULDBLOCK;
            break;
        case WSAEACCES:
            errno = EACCES;
            break;
        case WSAEMFILE:
            errno = EMFILE;
            break;
        case WSAENOBUFS:
            errno = ENOBUFS;
            break;
        case WSAEAFNOSUPPORT:
            errno = EAFNOSUPPORT;
            break;
        case WSAEOPNOTSUPP:
            errno = EOPNOTSUPP;
            break;
        case WSAENOTSOCK:
            errno = EBADFD;
            break;

            /* Protocol problems */
        case WSAEPROTONOSUPPORT:
        case WSAEPROTOTYPE:
            errno = EPROTONOSUPPORT;
            break;

            /* Refused, lost or closed connections */
        case WSAECONNREFUSED:
            errno = ECONNREFUSED;
            break;
        case WSAENOTCONN:
        case WSAENETRESET:
        case WSAECONNRESET:
        case WSAESHUTDOWN:
        case WSAECONNABORTED:
        case WSAEDISCON:
            errno = ECONNREFUSED;   /* ENOTCONN? */
            break;

            /* Everything that is reported as an invalid argument */
        case WSANOTINITIALISED:
        case WSAENETDOWN:
        case WSAEINPROGRESS:
        case WSAEINVAL:
        case WSAESOCKTNOSUPPORT:
        case WSAEFAULT:
        case WSAEINVALIDPROVIDER:
        case WSAEINVALIDPROCTABLE:
        case WSAEMSGSIZE:
            errno = EINVAL;
            break;

        default:
            ereport(NOTICE,
                    (errmsg_internal("Unknown win32 socket error code: %i",wsaerr)));
            errno = EINVAL;
    }
}
/*
 * Non-blocking check for pending signals.
 *
 * If the signal event is set, dispatches the queued signals, sets errno to
 * EINTR and returns 1.  Otherwise returns 0 without touching errno.
 */
static int
pgwin32_poll_signals(void)
{
    /* Zero timeout: just sample the current state of the event */
    if (WaitForSingleObject(pgwin32_signal_event,0) != WAIT_OBJECT_0)
        return 0;

    pgwin32_dispatch_queued_signals();
    errno = EINTR;
    return 1;
}
/*
 * Block until one of the network events in "what" (FD_READ, FD_WRITE, ...)
 * occurs on socket "s", or until a signal arrives.
 *
 * Returns 1 when the socket event fired.  Returns 0 when a signal was
 * delivered instead (errno = EINTR), or when WSAEventSelect() failed
 * (errno set by TranslateSocketError()).
 *
 * The event object is created on first use and cached in a static
 * variable for subsequent calls.
 */
static int
pgwin32_waitforsinglesocket(SOCKET s, int what)
{
    static HANDLE waitevent = INVALID_HANDLE_VALUE;
    HANDLE      events[2];
    int         r;

    if (waitevent == INVALID_HANDLE_VALUE)
    {
        /*
         * CreateEvent() reports failure with NULL, not INVALID_HANDLE_VALUE.
         * Only cache the handle once we know it is valid, so that a failed
         * attempt is retried on the next call.
         */
        HANDLE      newevent = CreateEvent(NULL, TRUE, FALSE, NULL);

        if (newevent == NULL)
            ereport(ERROR,
                    (errmsg_internal("Failed to create socket waiting event: %i",(int)GetLastError())));
        waitevent = newevent;
    }
    else if (!ResetEvent(waitevent))
        ereport(ERROR,
                (errmsg_internal("Failed to reset socket waiting event: %i",(int)GetLastError())));

    /* Have the event signalled when any of the requested events occurs */
    if (WSAEventSelect(s, waitevent, what) == SOCKET_ERROR)
    {
        TranslateSocketError();
        return 0;
    }

    /* The signal event comes first, so a pending signal takes priority */
    events[0] = pgwin32_signal_event;
    events[1] = waitevent;
    r = WaitForMultipleObjects(2, events, FALSE, INFINITE);

    if (r == WAIT_OBJECT_0)
    {
        pgwin32_dispatch_queued_signals();
        errno = EINTR;
        return 0;
    }
    if (r == WAIT_OBJECT_0+1)
        return 1;

    ereport(ERROR,
            (errmsg_internal("Bad return from WaitForMultipleObjects: %i (%i)",r,(int)GetLastError())));
    return 0;
}
/*
 * Create a socket, setting it to overlapped and non-blocking.
 *
 * Returns the new socket with errno cleared, or INVALID_SOCKET with errno
 * set by TranslateSocketError().
 */
SOCKET
pgwin32_socket(int af, int type, int protocol)
{
    SOCKET      s;
    unsigned long on = 1;

    s = WSASocket(af, type, protocol, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (s == INVALID_SOCKET)
    {
        TranslateSocketError();
        return INVALID_SOCKET;
    }

    if (ioctlsocket(s, FIONBIO, &on))
    {
        /* Translate first: closesocket() may change the last error code */
        TranslateSocketError();
        /* Don't leak the socket we just created */
        closesocket(s);
        return INVALID_SOCKET;
    }

    errno = 0;
    return s;
}
/*
 * Accept a connection on listening socket "s".
 *
 * Pending signals are delivered first, but the call does not return with
 * EINTR because of them, since pqcomm.c does not handle that.
 *
 * Returns the accepted socket, or INVALID_SOCKET with errno set by
 * TranslateSocketError().
 */
SOCKET
pgwin32_accept(SOCKET s, struct sockaddr *addr, int *addrlen)
{
    SOCKET      client;

    /* Result deliberately ignored, see above */
    pgwin32_poll_signals();

    client = WSAAccept(s, addr, addrlen, NULL, 0);
    if (client != INVALID_SOCKET)
        return client;

    TranslateSocketError();
    return INVALID_SOCKET;
}
/*
 * Connect socket "s" to "addr".  The caller is not interrupted by signals:
 * signals arriving while waiting are dispatched, and the wait is resumed.
 *
 * Returns 0 once the connect completed immediately or FD_CONNECT was
 * signalled; returns -1 with errno set when WSAConnect() fails with
 * anything other than WSAEWOULDBLOCK.
 *
 * NOTE(review): the outcome carried by FD_CONNECT is not inspected, so a
 * failed asynchronous connect appears to be reported as success here --
 * confirm whether callers rely on a later send/recv failing instead.
 * NOTE(review): the wait helper also returns 0 when WSAEventSelect()
 * fails, in which case this loop would presumably never end -- confirm.
 */
int
pgwin32_connect(SOCKET s, const struct sockaddr *addr, int addrlen)
{
    if (WSAConnect(s, addr, addrlen, NULL, NULL, NULL, NULL) == 0)
        return 0;

    if (WSAGetLastError() != WSAEWOULDBLOCK)
    {
        TranslateSocketError();
        return -1;
    }

    /* Keep waiting for as long as the wait merely delivered signals */
    for (;;)
    {
        if (pgwin32_waitforsinglesocket(s, FD_CONNECT) != 0)
            break;
    }
    return 0;
}
/*
 * Receive up to "len" bytes from socket "s" into "buf", emulating a
 * blocking recv() on a non-blocking socket while allowing signals to run.
 *
 * Returns the number of bytes received, or -1 with errno set.  errno is
 * EINTR when a signal was delivered before or while waiting.
 */
int
pgwin32_recv(SOCKET s, char *buf, int len, int f)
{
    WSABUF      iov;
    DWORD       nread;
    DWORD       flags = f;
    int         rc;

    if (pgwin32_poll_signals())
        return -1;

    iov.buf = buf;
    iov.len = len;

    rc = WSARecv(s, &iov, 1, &nread, &flags, NULL, NULL);
    if (rc == SOCKET_ERROR)
    {
        if (WSAGetLastError() != WSAEWOULDBLOCK)
        {
            TranslateSocketError();
            return -1;
        }
    }
    else if (nread > 0)
    {
        /* Read succeeded right away */
        return nread;
    }

    /* No error, zero bytes (win2000+) or error+WSAEWOULDBLOCK (<=nt4) */
    if (pgwin32_waitforsinglesocket(s, FD_READ | FD_CLOSE | FD_ACCEPT) == 0)
        return -1;

    /* The socket was flagged; try once more */
    rc = WSARecv(s, &iov, 1, &nread, &flags, NULL, NULL);
    if (rc == SOCKET_ERROR)
    {
        TranslateSocketError();
        return -1;
    }
    return nread;
}
/*
 * Send up to "len" bytes from "buf" on socket "s", emulating a blocking
 * send() on a non-blocking socket while allowing signals to run.
 *
 * Returns the number of bytes sent, or -1 with errno set.  errno is EINTR
 * when a signal was delivered before or while waiting.
 */
int
pgwin32_send(SOCKET s, char *buf, int len, int flags)
{
    WSABUF      iov;
    DWORD       nsent;
    int         rc;

    if (pgwin32_poll_signals())
        return -1;

    iov.buf = buf;
    iov.len = len;

    rc = WSASend(s, &iov, 1, &nsent, flags, NULL, NULL);
    if (rc == SOCKET_ERROR)
    {
        if (WSAGetLastError() != WSAEWOULDBLOCK)
        {
            TranslateSocketError();
            return -1;
        }
    }
    else if (nsent > 0)
    {
        /* Write succeeded right away */
        return nsent;
    }

    /* No error, zero bytes (win2000+) or error+WSAEWOULDBLOCK (<=nt4) */
    if (pgwin32_waitforsinglesocket(s, FD_WRITE | FD_CLOSE) == 0)
        return -1;

    /* The socket was flagged; try once more */
    rc = WSASend(s, &iov, 1, &nsent, flags, NULL, NULL);
    if (rc == SOCKET_ERROR)
    {
        TranslateSocketError();
        return -1;
    }
    return nsent;
}
/*
 * Wait for activity on one or more sockets.
 * While waiting, allow signals to run.
 *
 * readfds/writefds are overwritten with the sockets that are ready (or
 * zeroed on timeout or interruption).  nfds is not used.
 *
 * Returns the number of matches, 0 on timeout, or -1 with errno set
 * (EINTR when a signal was delivered).
 *
 * NOTE! Currently does not implement exceptfds check,
 * since it is not used in postgresql!
 */
int
pgwin32_select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, const struct timeval *timeout)
{
    /*
     * Worst case is readfds totally different from writefds, so
     * 2*FD_SETSIZE sockets -- plus one extra slot, because the signal event
     * is appended after the socket events.
     */
    WSAEVENT    events[FD_SETSIZE*2 + 1];
    SOCKET      sockets[FD_SETSIZE*2];
    int         numevents=0;
    int         i;
    int         r;
    DWORD       timeoutval = WSA_INFINITE;
    FD_SET      outreadfds;
    FD_SET      outwritefds;
    int         nummatches = 0;

    Assert(exceptfds == NULL);

    if (pgwin32_poll_signals())
        return -1;

    FD_ZERO(&outreadfds);
    FD_ZERO(&outwritefds);

    /*
     * Write FDs are different in the way that it is only flagged by
     * WSAEventSelect() if we have tried to write to them first. So try
     * an empty write
     */
    if (writefds)
    {
        for (i = 0; i < writefds->fd_count; i++)
        {
            char        c;
            WSABUF      buf;
            DWORD       sent;

            buf.buf = &c;
            buf.len = 0;
            r = WSASend(writefds->fd_array[i], &buf, 1, &sent, 0, NULL, NULL);
            if (r == 0)         /* Completed - means things are fine! */
                FD_SET(writefds->fd_array[i], &outwritefds);
            else
            {                   /* Not completed */
                if (WSAGetLastError() != WSAEWOULDBLOCK)

                    /*
                     * Not completed, and not just "would block", so an
                     * error occurred
                     */
                    FD_SET(writefds->fd_array[i], &outwritefds);
            }
        }
        /* Some socket is writable (or failed) already: report it at once */
        if (outwritefds.fd_count > 0)
        {
            memcpy(writefds,&outwritefds,sizeof(fd_set));
            if (readfds)
                FD_ZERO(readfds);
            return outwritefds.fd_count;
        }
    }

    /* Now set up for an actual select */
    if (timeout != NULL)
    {
        /* timeoutval is in milliseconds */
        timeoutval = timeout->tv_sec*1000 + timeout->tv_usec / 1000;
    }

    if (readfds != NULL)
    {
        for (i=0; i < readfds->fd_count; i++)
        {
            events[numevents] = WSACreateEvent();
            sockets[numevents] = readfds->fd_array[i];
            numevents++;
        }
    }
    if (writefds != NULL)
    {
        for (i=0; i < writefds->fd_count; i++)
        {
            if (!readfds ||
                !FD_ISSET(writefds->fd_array[i], readfds))
            {
                /* If the socket is not in the read list */
                events[numevents] = WSACreateEvent();
                sockets[numevents] = writefds->fd_array[i];
                numevents++;
            }
        }
    }

    /* Associate each socket with its event and the events of interest */
    for (i=0; i < numevents; i++)
    {
        int         flags = 0;

        if (readfds && FD_ISSET(sockets[i],readfds))
            flags |= FD_READ | FD_ACCEPT | FD_CLOSE;

        if (writefds && FD_ISSET(sockets[i],writefds))
            flags |= FD_WRITE | FD_CLOSE;

        if (WSAEventSelect(sockets[i], events[i], flags) == SOCKET_ERROR)
        {
            TranslateSocketError();
            for (i = 0; i < numevents; i++)
                WSACloseEvent(events[i]);
            return -1;
        }
    }

    /* The signal event goes last, in the extra slot reserved above */
    events[numevents] = pgwin32_signal_event;
    r = WaitForMultipleObjectsEx(numevents+1, events, FALSE, timeoutval, FALSE);
    if (r != WSA_WAIT_TIMEOUT && r != (WAIT_OBJECT_0+numevents))
    {
        /*
         * We scan all events, even those not signalled, in case more
         * than one event has been tagged but Wait.. can only return one.
         */
        WSANETWORKEVENTS resEvents;

        for (i=0; i < numevents; i++)
        {
            ZeroMemory(&resEvents,sizeof(resEvents));
            if (WSAEnumNetworkEvents(sockets[i],events[i],&resEvents) == SOCKET_ERROR)
                ereport(FATAL,
                        (errmsg_internal("failed to enumerate network events: %i",(int)GetLastError())));
            /* Read activity? */
            if (readfds && FD_ISSET(sockets[i], readfds))
            {
                if ((resEvents.lNetworkEvents & FD_READ) ||
                    (resEvents.lNetworkEvents & FD_ACCEPT) ||
                    (resEvents.lNetworkEvents & FD_CLOSE))
                {
                    FD_SET(sockets[i],&outreadfds);
                    nummatches++;
                }
            }
            /* Write activity? */
            if (writefds && FD_ISSET(sockets[i], writefds))
            {
                if ((resEvents.lNetworkEvents & FD_WRITE) ||
                    (resEvents.lNetworkEvents & FD_CLOSE))
                {
                    FD_SET(sockets[i],&outwritefds);
                    nummatches++;
                }
            }
        }
    }

    /* Clean up all handles */
    for (i = 0; i < numevents; i++)
    {
        WSAEventSelect(sockets[i], events[i], 0);
        WSACloseEvent(events[i]);
    }

    if (r == WSA_WAIT_TIMEOUT)
    {
        if (readfds)
            FD_ZERO(readfds);
        if (writefds)
            FD_ZERO(writefds);
        return 0;
    }

    if (r == WAIT_OBJECT_0+numevents)
    {
        /* The signal event fired: deliver signals and report EINTR */
        pgwin32_dispatch_queued_signals();
        errno = EINTR;
        if (readfds)
            FD_ZERO(readfds);
        if (writefds)
            FD_ZERO(writefds);
        return -1;
    }

    /* Overwrite socket sets with our resulting values */
    if (readfds)
        memcpy(readfds, &outreadfds, sizeof(fd_set));
    if (writefds)
        memcpy(writefds, &outwritefds, sizeof(fd_set));
    return nummatches;
}

View File

@ -13,7 +13,7 @@
*
* Copyright (c) 2001-2003, PostgreSQL Global Development Group
*
* $PostgreSQL: pgsql/src/backend/postmaster/pgstat.c,v 1.65 2004/04/05 03:16:21 momjian Exp $
* $PostgreSQL: pgsql/src/backend/postmaster/pgstat.c,v 1.66 2004/04/12 16:19:18 momjian Exp $
* ----------
*/
#include "postgres.h"
@ -83,6 +83,7 @@ NON_EXEC_STATIC int pgStatSock = -1;
static int pgStatPipe[2];
static struct sockaddr_storage pgStatAddr;
static int pgStatPmPipe[2] = {-1, -1};
static int pgStatCollectorPmPipe[2] = {-1, -1};
static int pgStatPid;
static time_t last_pgstat_start_time;
@ -410,7 +411,7 @@ pgstat_init(void)
/*
* Create the pipe that controls the statistics collector shutdown
*/
if (pgpipe(pgStatPmPipe) < 0)
if (pgpipe(pgStatPmPipe) < 0 || pgpipe(pgStatCollectorPmPipe) < 0)
{
ereport(LOG,
(errcode_for_socket_access(),
@ -451,9 +452,9 @@ static pid_t
pgstat_forkexec(STATS_PROCESS_TYPE procType)
{
pid_t pid;
char *av[13];
char *av[15];
int ac = 0, bufc = 0, i;
char pgstatBuf[10][MAXPGPATH];
char pgstatBuf[12][MAXPGPATH];
av[ac++] = "postgres";
switch (procType)
@ -475,6 +476,8 @@ pgstat_forkexec(STATS_PROCESS_TYPE procType)
snprintf(pgstatBuf[bufc++],MAXPGPATH,"%d",pgStatSock);
snprintf(pgstatBuf[bufc++],MAXPGPATH,"%d",pgStatPmPipe[0]);
snprintf(pgstatBuf[bufc++],MAXPGPATH,"%d",pgStatPmPipe[1]);
snprintf(pgstatBuf[bufc++],MAXPGPATH,"%d",pgStatCollectorPmPipe[0]);
snprintf(pgstatBuf[bufc++],MAXPGPATH,"%d",pgStatCollectorPmPipe[1]);
snprintf(pgstatBuf[bufc++],MAXPGPATH,"%d",pgStatPipe[0]);
snprintf(pgstatBuf[bufc++],MAXPGPATH,"%d",pgStatPipe[1]);
@ -518,11 +521,13 @@ pgstat_forkexec(STATS_PROCESS_TYPE procType)
static void
pgstat_parseArgs(PGSTAT_FORK_ARGS)
{
Assert(argc == 10);
Assert(argc == 12);
argc = 0;
pgStatSock = atoi(argv[argc++]);
pgStatPmPipe[0] = atoi(argv[argc++]);
pgStatPmPipe[1] = atoi(argv[argc++]);
pgStatCollectorPmPipe[0] = atoi(argv[argc++]);
pgStatCollectorPmPipe[1] = atoi(argv[argc++]);
pgStatPipe[0] = atoi(argv[argc++]);
pgStatPipe[1] = atoi(argv[argc++]);
MaxBackends = atoi(argv[argc++]);
@ -676,6 +681,12 @@ pgstat_close_sockets(void)
if (pgStatPmPipe[1] >= 0)
closesocket(pgStatPmPipe[1]);
pgStatPmPipe[1] = -1;
if (pgStatCollectorPmPipe[0] >= 0)
closesocket(pgStatCollectorPmPipe[0]);
pgStatCollectorPmPipe[0] = -1;
if (pgStatCollectorPmPipe[1] >= 0)
closesocket(pgStatCollectorPmPipe[1]);
pgStatCollectorPmPipe[1] = -1;
}
@ -1491,6 +1502,8 @@ pgstat_main(PGSTAT_FORK_ARGS)
*/
closesocket(pgStatPmPipe[1]);
pgStatPmPipe[1] = -1;
closesocket(pgStatCollectorPmPipe[1]);
pgStatCollectorPmPipe[1] = -1;
/*
* Start a buffering process to read from the socket, so we have a
@ -1547,7 +1560,6 @@ pgstat_mainChild(PGSTAT_FORK_ARGS)
fd_set rfds;
int readPipe;
int pmPipe;
int maxfd;
int nready;
int len = 0;
struct timeval timeout;
@ -1564,7 +1576,7 @@ pgstat_mainChild(PGSTAT_FORK_ARGS)
closesocket(pgStatPipe[1]);
closesocket(pgStatSock);
pmPipe = pgStatPmPipe[0];
pmPipe = pgStatCollectorPmPipe[0];
/*
* In the child we can have default SIGCHLD handling (in case we want
@ -1666,14 +1678,11 @@ pgstat_mainChild(PGSTAT_FORK_ARGS)
*/
FD_ZERO(&rfds);
FD_SET(readPipe, &rfds);
FD_SET(pmPipe, &rfds);
maxfd = Max(readPipe, pmPipe);
/*
* Now wait for something to do.
*/
nready = select(maxfd + 1, &rfds, NULL, NULL,
nready = select(readPipe+1, &rfds, NULL, NULL,
(need_statwrite) ? &timeout : NULL);
if (nready < 0)
{
@ -1845,7 +1854,12 @@ pgstat_mainChild(PGSTAT_FORK_ARGS)
* is still open. If it is read-ready (ie, EOF), the postmaster must
* have quit.
*/
if (FD_ISSET(pmPipe, &rfds))
FD_ZERO(&rfds);
FD_SET(pmPipe, &rfds);
timeout.tv_sec = 0;
timeout.tv_usec = 0;
nready = select(pmPipe+1,&rfds,NULL,NULL,&timeout);
if (nready > 0 && FD_ISSET(pmPipe, &rfds))
pgstat_write_statsfile();
}

View File

@ -37,7 +37,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/postmaster/postmaster.c,v 1.379 2004/03/24 15:20:54 tgl Exp $
* $PostgreSQL: pgsql/src/backend/postmaster/postmaster.c,v 1.380 2004/04/12 16:19:18 momjian Exp $
*
* NOTES
*
@ -2017,35 +2017,6 @@ reaper_done:
}
#ifdef WIN32
/*
* On WIN32, we cannot use socket functions inside
* an APC (signal handler). If we do, select() will return
* with incorrect return values, causing the postmaster to
* enter a blocking accept(). We work around this by
* running it on a separate thread. We still block the main
* thread until it is done, so we don't scribble over any
* data from the wrong thread (pgstat functions aqre not
* thread safe).
*/
static DWORD WINAPI win32_pgstat_beterm_thread(LPVOID param)
{
pgstat_beterm((int)param);
return 0;
}
static void win32_pgstat_beterm(int pid) {
HANDLE beterm_thread = CreateThread(NULL, 64*1024, win32_pgstat_beterm_thread, (LPVOID)pid, 0, NULL);
if (!beterm_thread)
ereport(FATAL,
(errmsg_internal("failed to create beterm sender thread: %i", (int)GetLastError())));
if (WaitForSingleObject(beterm_thread,INFINITE) != WAIT_OBJECT_0)
ereport(FATAL,
(errmsg_internal("failed to wait for beterm sender thread: %i", (int)GetLastError())));
CloseHandle(beterm_thread);
}
#endif
/*
* CleanupProc -- cleanup after terminated backend.
*
@ -2099,11 +2070,7 @@ CleanupProc(int pid,
else if (pid == BgWriterPID)
BgWriterPID = 0;
else
#ifndef WIN32
pgstat_beterm(pid);
#else
win32_pgstat_beterm(pid);
#endif
return;
}

View File

@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/libpq/pqsignal.h,v 1.25 2004/02/08 22:28:57 neilc Exp $
* $PostgreSQL: pgsql/src/include/libpq/pqsignal.h,v 1.26 2004/04/12 16:19:18 momjian Exp $
*
* NOTES
* This shouldn't be in libpq, but the monitor and some other
@ -18,9 +18,7 @@
#ifndef PQSIGNAL_H
#define PQSIGNAL_H
#ifndef WIN32
#include <signal.h>
#endif
#ifdef HAVE_SIGPROCMASK
extern sigset_t UnBlockSig,
@ -46,23 +44,5 @@ typedef void (*pqsigfunc) (int);
extern void pqinitmask(void);
extern pqsigfunc pqsignal(int signo, pqsigfunc func);
extern void pg_queue_signal(int signum);
#ifdef WIN32
#define sigmask(sig) ( 1 << (sig-1) )
void pgwin32_signal_initialize(void);
extern HANDLE pgwin32_main_thread_handle;
#define PG_POLL_SIGNALS() WaitForSingleObjectEx(pgwin32_main_thread_handle,0,TRUE);
/* Signal function return values */
#undef SIG_DFL
#undef SIG_ERR
#undef SIG_IGN
#define SIG_DFL ((pqsigfunc)0)
#define SIG_ERR ((pqsigfunc)-1)
#define SIG_IGN ((pqsigfunc)1)
#endif
#endif /* PQSIGNAL_H */

View File

@ -13,7 +13,7 @@
* Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/miscadmin.h,v 1.155 2004/03/24 22:40:29 tgl Exp $
* $PostgreSQL: pgsql/src/include/miscadmin.h,v 1.156 2004/04/12 16:19:18 momjian Exp $
*
* NOTES
* some of the information in this file should be moved to
@ -83,7 +83,8 @@ do { \
#else
#define CHECK_FOR_INTERRUPTS() \
do { \
WaitForSingleObjectEx(GetCurrentThread(),0,TRUE); \
if (WaitForSingleObject(pgwin32_signal_event,0) == WAIT_OBJECT_0) \
pgwin32_dispatch_queued_signals(); \
if (InterruptPending) \
ProcessInterrupts(); \
} while(0)

View File

@ -1,4 +1,4 @@
/* $PostgreSQL: pgsql/src/include/port/win32.h,v 1.20 2004/03/02 18:35:59 momjian Exp $ */
/* $PostgreSQL: pgsql/src/include/port/win32.h,v 1.21 2004/04/12 16:19:18 momjian Exp $ */
/* undefine and redefine after #include */
#undef mkdir
@ -98,15 +98,49 @@ int semctl(int semId, int semNum, int flag, union semun);
int semget(int semKey, int semNum, int flags);
int semop(int semId, struct sembuf * sops, int flag);
/*
 * sleep(sec): delay for "sec" seconds by calling Sleep() with the value
 * scaled by 1000; the expression always evaluates to 0.  The argument is
 * parenthesized so that an expression such as "a + b" is scaled as a whole.
 */
#define sleep(sec) (Sleep((sec) * 1000), /* no return value */ 0)
/* In backend/port/win32/signal.c */
void pgwin32_signal_initialize(void);
extern DLLIMPORT HANDLE pgwin32_signal_event;
void pgwin32_dispatch_queued_signals(void);
void pg_queue_signal(int signum);
/*
 * sigmask(sig): single-bit mask for signal number "sig"; signal 1 maps to
 * bit 0.  The argument is parenthesized so that an expression argument is
 * fully evaluated before 1 is subtracted from it.
 */
#define sigmask(sig) ( 1 << ((sig)-1) )
/* Signal function return values */
#undef SIG_DFL
#undef SIG_ERR
#undef SIG_IGN
#define SIG_DFL ((pqsigfunc)0)
#define SIG_ERR ((pqsigfunc)-1)
#define SIG_IGN ((pqsigfunc)1)
#ifndef FRONTEND
/* In libpq/pqsignal.c */
#define kill(pid,sig) pqkill(pid,sig)
int pqkill(int pid, int sig);
extern int pqkill(int pid, int sig);
#define pg_usleep(t) pgwin32_backend_usleep(t)
void pgwin32_backend_usleep(long microsec);
#endif
/* In backend/port/win32/socket.c */
#ifndef FRONTEND
#define socket(af, type, protocol) pgwin32_socket(af, type, protocol)
#define accept(s, addr, addrlen) pgwin32_accept(s, addr, addrlen)
#define connect(s, name, namelen) pgwin32_connect(s, name, namelen)
#define select(n, r, w, e, timeout) pgwin32_select(n, r, w, e, timeout)
#define recv(s, buf, len, flags) pgwin32_recv(s, buf, len, flags)
#define send(s, buf, len, flags) pgwin32_send(s, buf, len, flags)
SOCKET pgwin32_socket(int af, int type, int protocol);
SOCKET pgwin32_accept(SOCKET s, struct sockaddr* addr, int* addrlen);
int pgwin32_connect(SOCKET s, const struct sockaddr* name, int namelen);
int pgwin32_select(int nfds, fd_set* readfs, fd_set* writefds, fd_set* exceptfds, const struct timeval* timeout);
int pgwin32_recv(SOCKET s, char* buf, int len, int flags);
int pgwin32_send(SOCKET s, char* buf, int len, int flags);
#endif
/* Some extra signals */
#define SIGHUP 1
#define SIGQUIT 3
@ -179,3 +213,8 @@ int setitimer(int which, const struct itimerval *value, struct itimerval *ovalue
#define EWOULDBLOCK WSAEWOULDBLOCK
#define ECONNRESET WSAECONNRESET
#define EINPROGRESS WSAEINPROGRESS
#define ENOBUFS WSAENOBUFS
#define EPROTONOSUPPORT WSAEPROTONOSUPPORT
#define ECONNREFUSED WSAECONNREFUSED
#define EBADFD WSAENOTSOCK
#define EOPNOTSUPP WSAEOPNOTSUPP

View File

@ -6,7 +6,7 @@
*
* Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
*
* $PostgreSQL: pgsql/src/port/pgsleep.c,v 1.2 2004/02/10 04:23:03 tgl Exp $
* $PostgreSQL: pgsql/src/port/pgsleep.c,v 1.3 2004/04/12 16:19:18 momjian Exp $
*
*-------------------------------------------------------------------------
*/
@ -15,7 +15,6 @@
#include <unistd.h>
#include <sys/time.h>
/*
* pg_usleep --- delay the specified number of microseconds.
*
@ -25,6 +24,9 @@
*
* On machines where "long" is 32 bits, the maximum delay is ~2000 seconds.
*/
#ifdef pg_usleep
#undef pg_usleep
#endif
void
pg_usleep(long microsec)
{
@ -37,7 +39,7 @@ pg_usleep(long microsec)
delay.tv_usec = microsec % 1000000L;
(void) select(0, NULL, NULL, NULL, &delay);
#else
SleepEx((microsec < 500 ? 1 : (microsec + 500) / 1000), TRUE);
SleepEx((microsec < 500 ? 1 : (microsec + 500) / 1000), FALSE);
#endif
}
}