Block interrupts during HandleParallelMessages().
As noted by Alvaro, there are CHECK_FOR_INTERRUPTS() calls in the shm_mq.c functions called by HandleParallelMessages(). I believe they're all unreachable since we always pass nowait = true, but it doesn't seem like a great idea to assume that no such call will ever be reachable from HandleParallelMessages(). If that did happen, there would be a risk of a recursive call to HandleParallelMessages(), which it does not appear to be designed for --- for example, there's nothing that would prevent out-of-order processing of received messages. And certainly such cases cannot easily be tested. So let's prevent it by holding off interrupts for the duration of the function. Back-patch to 9.5 which contains identical code. Discussion: <14869.1470083848@sss.pgh.pa.us>
This commit is contained in:
parent
c4d3a039f0
commit
b6a97b91ff
|
@ -704,14 +704,21 @@ HandleParallelMessages(void)
|
||||||
{
|
{
|
||||||
dlist_iter iter;
|
dlist_iter iter;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This is invoked from ProcessInterrupts(), and since some of the
|
||||||
|
* functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
|
||||||
|
* for recursive calls if more signals are received while this runs. It's
|
||||||
|
* unclear that recursive entry would be safe, and it doesn't seem useful
|
||||||
|
* even if it is safe, so let's block interrupts until done.
|
||||||
|
*/
|
||||||
|
HOLD_INTERRUPTS();
|
||||||
|
|
||||||
ParallelMessagePending = false;
|
ParallelMessagePending = false;
|
||||||
|
|
||||||
dlist_foreach(iter, &pcxt_list)
|
dlist_foreach(iter, &pcxt_list)
|
||||||
{
|
{
|
||||||
ParallelContext *pcxt;
|
ParallelContext *pcxt;
|
||||||
int i;
|
int i;
|
||||||
Size nbytes;
|
|
||||||
void *data;
|
|
||||||
|
|
||||||
pcxt = dlist_container(ParallelContext, node, iter.cur);
|
pcxt = dlist_container(ParallelContext, node, iter.cur);
|
||||||
if (pcxt->worker == NULL)
|
if (pcxt->worker == NULL)
|
||||||
|
@ -721,13 +728,15 @@ HandleParallelMessages(void)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* Read as many messages as we can from each worker, but stop when
|
* Read as many messages as we can from each worker, but stop when
|
||||||
* either (1) the error queue goes away, which can happen if we
|
* either (1) the worker's error queue goes away, which can happen
|
||||||
* receive a Terminate message from the worker; or (2) no more
|
* if we receive a Terminate message from the worker; or (2) no
|
||||||
* messages can be read from the worker without blocking.
|
* more messages can be read from the worker without blocking.
|
||||||
*/
|
*/
|
||||||
while (pcxt->worker[i].error_mqh != NULL)
|
while (pcxt->worker[i].error_mqh != NULL)
|
||||||
{
|
{
|
||||||
shm_mq_result res;
|
shm_mq_result res;
|
||||||
|
Size nbytes;
|
||||||
|
void *data;
|
||||||
|
|
||||||
res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,
|
res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,
|
||||||
&data, true);
|
&data, true);
|
||||||
|
@ -749,6 +758,8 @@ HandleParallelMessages(void)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
RESUME_INTERRUPTS();
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
Loading…
Reference in New Issue