Don't be so trusting that shm_toc_lookup() will always succeed.
Given the possibility of race conditions and so on, it seems entirely unsafe to just assume that shm_toc_lookup() always finds the key it's looking for --- but that was exactly what all but one call site were doing. To fix, add a "bool noError" argument, similarly to what we have in many other functions, and throw an error on an unexpected lookup failure. Remove now-redundant Asserts that a rather random subset of call sites had. I doubt this will throw any light on buildfarm member lorikeet's recent failures, because if an unnoticed lookup failure were involved, you'd kind of expect a null-pointer-dereference crash rather than the observed symptom. But you never know ... and this is better coding practice even if it never catches anything. Discussion: https://postgr.es/m/9697.1496675981@sss.pgh.pa.us
This commit is contained in:
parent
af51fea039
commit
d466335064
|
@@ -392,12 +392,12 @@ ReinitializeParallelDSM(ParallelContext *pcxt)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Reset a few bits of fixed parallel state to a clean state. */
|
/* Reset a few bits of fixed parallel state to a clean state. */
|
||||||
fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
|
fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
|
||||||
fps->last_xlog_end = 0;
|
fps->last_xlog_end = 0;
|
||||||
|
|
||||||
/* Recreate error queues. */
|
/* Recreate error queues. */
|
||||||
error_queue_space =
|
error_queue_space =
|
||||||
shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE);
|
shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, false);
|
||||||
for (i = 0; i < pcxt->nworkers; ++i)
|
for (i = 0; i < pcxt->nworkers; ++i)
|
||||||
{
|
{
|
||||||
char *start;
|
char *start;
|
||||||
|
@@ -536,7 +536,7 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
|
||||||
{
|
{
|
||||||
FixedParallelState *fps;
|
FixedParallelState *fps;
|
||||||
|
|
||||||
fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
|
fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
|
||||||
if (fps->last_xlog_end > XactLastRecEnd)
|
if (fps->last_xlog_end > XactLastRecEnd)
|
||||||
XactLastRecEnd = fps->last_xlog_end;
|
XactLastRecEnd = fps->last_xlog_end;
|
||||||
}
|
}
|
||||||
|
@@ -973,8 +973,7 @@ ParallelWorkerMain(Datum main_arg)
|
||||||
errmsg("invalid magic number in dynamic shared memory segment")));
|
errmsg("invalid magic number in dynamic shared memory segment")));
|
||||||
|
|
||||||
/* Look up fixed parallel state. */
|
/* Look up fixed parallel state. */
|
||||||
fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED);
|
fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false);
|
||||||
Assert(fps != NULL);
|
|
||||||
MyFixedParallelState = fps;
|
MyFixedParallelState = fps;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@@ -983,7 +982,7 @@ ParallelWorkerMain(Datum main_arg)
|
||||||
* errors that happen here will not be reported back to the process that
|
* errors that happen here will not be reported back to the process that
|
||||||
* requested that this worker be launched.
|
* requested that this worker be launched.
|
||||||
*/
|
*/
|
||||||
error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE);
|
error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
|
||||||
mq = (shm_mq *) (error_queue_space +
|
mq = (shm_mq *) (error_queue_space +
|
||||||
ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
|
ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
|
||||||
shm_mq_set_sender(mq, MyProc);
|
shm_mq_set_sender(mq, MyProc);
|
||||||
|
@@ -1027,8 +1026,7 @@ ParallelWorkerMain(Datum main_arg)
|
||||||
* this before restoring GUCs, because the libraries might define custom
|
* this before restoring GUCs, because the libraries might define custom
|
||||||
* variables.
|
* variables.
|
||||||
*/
|
*/
|
||||||
libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY);
|
libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY, false);
|
||||||
Assert(libraryspace != NULL);
|
|
||||||
RestoreLibraryState(libraryspace);
|
RestoreLibraryState(libraryspace);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@@ -1036,8 +1034,7 @@ ParallelWorkerMain(Datum main_arg)
|
||||||
* loading an additional library, though most likely the entry point is in
|
* loading an additional library, though most likely the entry point is in
|
||||||
* the core backend or in a library we just loaded.
|
* the core backend or in a library we just loaded.
|
||||||
*/
|
*/
|
||||||
entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT);
|
entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT, false);
|
||||||
Assert(entrypointstate != NULL);
|
|
||||||
library_name = entrypointstate;
|
library_name = entrypointstate;
|
||||||
function_name = entrypointstate + strlen(library_name) + 1;
|
function_name = entrypointstate + strlen(library_name) + 1;
|
||||||
|
|
||||||
|
@@ -1054,30 +1051,26 @@ ParallelWorkerMain(Datum main_arg)
|
||||||
SetClientEncoding(GetDatabaseEncoding());
|
SetClientEncoding(GetDatabaseEncoding());
|
||||||
|
|
||||||
/* Restore GUC values from launching backend. */
|
/* Restore GUC values from launching backend. */
|
||||||
gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC);
|
gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC, false);
|
||||||
Assert(gucspace != NULL);
|
|
||||||
StartTransactionCommand();
|
StartTransactionCommand();
|
||||||
RestoreGUCState(gucspace);
|
RestoreGUCState(gucspace);
|
||||||
CommitTransactionCommand();
|
CommitTransactionCommand();
|
||||||
|
|
||||||
/* Crank up a transaction state appropriate to a parallel worker. */
|
/* Crank up a transaction state appropriate to a parallel worker. */
|
||||||
tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE);
|
tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE, false);
|
||||||
StartParallelWorkerTransaction(tstatespace);
|
StartParallelWorkerTransaction(tstatespace);
|
||||||
|
|
||||||
/* Restore combo CID state. */
|
/* Restore combo CID state. */
|
||||||
combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID);
|
combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID, false);
|
||||||
Assert(combocidspace != NULL);
|
|
||||||
RestoreComboCIDState(combocidspace);
|
RestoreComboCIDState(combocidspace);
|
||||||
|
|
||||||
/* Restore transaction snapshot. */
|
/* Restore transaction snapshot. */
|
||||||
tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT);
|
tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, false);
|
||||||
Assert(tsnapspace != NULL);
|
|
||||||
RestoreTransactionSnapshot(RestoreSnapshot(tsnapspace),
|
RestoreTransactionSnapshot(RestoreSnapshot(tsnapspace),
|
||||||
fps->parallel_master_pgproc);
|
fps->parallel_master_pgproc);
|
||||||
|
|
||||||
/* Restore active snapshot. */
|
/* Restore active snapshot. */
|
||||||
asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT);
|
asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, false);
|
||||||
Assert(asnapspace != NULL);
|
|
||||||
PushActiveSnapshot(RestoreSnapshot(asnapspace));
|
PushActiveSnapshot(RestoreSnapshot(asnapspace));
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@@ -341,7 +341,7 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
|
||||||
mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
|
mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
|
||||||
pcxt->nworkers));
|
pcxt->nworkers));
|
||||||
else
|
else
|
||||||
tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE);
|
tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);
|
||||||
|
|
||||||
/* Create the queues, and become the receiver for each. */
|
/* Create the queues, and become the receiver for each. */
|
||||||
for (i = 0; i < pcxt->nworkers; ++i)
|
for (i = 0; i < pcxt->nworkers; ++i)
|
||||||
|
@@ -684,7 +684,7 @@ ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
|
||||||
char *mqspace;
|
char *mqspace;
|
||||||
shm_mq *mq;
|
shm_mq *mq;
|
||||||
|
|
||||||
mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE);
|
mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
|
||||||
mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
|
mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
|
||||||
mq = (shm_mq *) mqspace;
|
mq = (shm_mq *) mqspace;
|
||||||
shm_mq_set_sender(mq, MyProc);
|
shm_mq_set_sender(mq, MyProc);
|
||||||
|
@@ -705,14 +705,14 @@ ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
|
||||||
char *queryString;
|
char *queryString;
|
||||||
|
|
||||||
/* Get the query string from shared memory */
|
/* Get the query string from shared memory */
|
||||||
queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT);
|
queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false);
|
||||||
|
|
||||||
/* Reconstruct leader-supplied PlannedStmt. */
|
/* Reconstruct leader-supplied PlannedStmt. */
|
||||||
pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT);
|
pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
|
||||||
pstmt = (PlannedStmt *) stringToNode(pstmtspace);
|
pstmt = (PlannedStmt *) stringToNode(pstmtspace);
|
||||||
|
|
||||||
/* Reconstruct ParamListInfo. */
|
/* Reconstruct ParamListInfo. */
|
||||||
paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMS);
|
paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMS, false);
|
||||||
paramLI = RestoreParamList(¶mspace);
|
paramLI = RestoreParamList(¶mspace);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@@ -843,7 +843,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
|
||||||
|
|
||||||
/* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
|
/* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
|
||||||
receiver = ExecParallelGetReceiver(seg, toc);
|
receiver = ExecParallelGetReceiver(seg, toc);
|
||||||
instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION);
|
instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
|
||||||
if (instrumentation != NULL)
|
if (instrumentation != NULL)
|
||||||
instrument_options = instrumentation->instrument_options;
|
instrument_options = instrumentation->instrument_options;
|
||||||
queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
|
queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
|
||||||
|
@@ -858,7 +858,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
|
||||||
InstrStartParallelQuery();
|
InstrStartParallelQuery();
|
||||||
|
|
||||||
/* Attach to the dynamic shared memory area. */
|
/* Attach to the dynamic shared memory area. */
|
||||||
area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA);
|
area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
|
||||||
area = dsa_attach_in_place(area_space, seg);
|
area = dsa_attach_in_place(area_space, seg);
|
||||||
|
|
||||||
/* Start up the executor */
|
/* Start up the executor */
|
||||||
|
@@ -875,7 +875,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
|
||||||
ExecutorFinish(queryDesc);
|
ExecutorFinish(queryDesc);
|
||||||
|
|
||||||
/* Report buffer usage during parallel execution. */
|
/* Report buffer usage during parallel execution. */
|
||||||
buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE);
|
buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
|
||||||
InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber]);
|
InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber]);
|
||||||
|
|
||||||
/* Report instrumentation data if any instrumentation options are set. */
|
/* Report instrumentation data if any instrumentation options are set. */
|
||||||
|
|
|
@@ -1005,7 +1005,7 @@ ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node, shm_toc *toc)
|
||||||
ParallelBitmapHeapState *pstate;
|
ParallelBitmapHeapState *pstate;
|
||||||
Snapshot snapshot;
|
Snapshot snapshot;
|
||||||
|
|
||||||
pstate = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id);
|
pstate = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, false);
|
||||||
node->pstate = pstate;
|
node->pstate = pstate;
|
||||||
|
|
||||||
snapshot = RestoreSnapshot(pstate->phs_snapshot_data);
|
snapshot = RestoreSnapshot(pstate->phs_snapshot_data);
|
||||||
|
|
|
@@ -194,7 +194,7 @@ ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc)
|
||||||
int plan_node_id = node->ss.ps.plan->plan_node_id;
|
int plan_node_id = node->ss.ps.plan->plan_node_id;
|
||||||
void *coordinate;
|
void *coordinate;
|
||||||
|
|
||||||
coordinate = shm_toc_lookup(toc, plan_node_id);
|
coordinate = shm_toc_lookup(toc, plan_node_id, false);
|
||||||
methods->InitializeWorkerCustomScan(node, toc, coordinate);
|
methods->InitializeWorkerCustomScan(node, toc, coordinate);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@@ -344,7 +344,7 @@ ExecForeignScanInitializeWorker(ForeignScanState *node, shm_toc *toc)
|
||||||
int plan_node_id = node->ss.ps.plan->plan_node_id;
|
int plan_node_id = node->ss.ps.plan->plan_node_id;
|
||||||
void *coordinate;
|
void *coordinate;
|
||||||
|
|
||||||
coordinate = shm_toc_lookup(toc, plan_node_id);
|
coordinate = shm_toc_lookup(toc, plan_node_id, false);
|
||||||
fdwroutine->InitializeWorkerForeignScan(node, toc, coordinate);
|
fdwroutine->InitializeWorkerForeignScan(node, toc, coordinate);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@@ -676,7 +676,7 @@ ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node, shm_toc *toc)
|
||||||
{
|
{
|
||||||
ParallelIndexScanDesc piscan;
|
ParallelIndexScanDesc piscan;
|
||||||
|
|
||||||
piscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id);
|
piscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, false);
|
||||||
node->ioss_ScanDesc =
|
node->ioss_ScanDesc =
|
||||||
index_beginscan_parallel(node->ss.ss_currentRelation,
|
index_beginscan_parallel(node->ss.ss_currentRelation,
|
||||||
node->ioss_RelationDesc,
|
node->ioss_RelationDesc,
|
||||||
|
|
|
@@ -1714,7 +1714,7 @@ ExecIndexScanInitializeWorker(IndexScanState *node, shm_toc *toc)
|
||||||
{
|
{
|
||||||
ParallelIndexScanDesc piscan;
|
ParallelIndexScanDesc piscan;
|
||||||
|
|
||||||
piscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id);
|
piscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, false);
|
||||||
node->iss_ScanDesc =
|
node->iss_ScanDesc =
|
||||||
index_beginscan_parallel(node->ss.ss_currentRelation,
|
index_beginscan_parallel(node->ss.ss_currentRelation,
|
||||||
node->iss_RelationDesc,
|
node->iss_RelationDesc,
|
||||||
|
|
|
@@ -332,7 +332,7 @@ ExecSeqScanInitializeWorker(SeqScanState *node, shm_toc *toc)
|
||||||
{
|
{
|
||||||
ParallelHeapScanDesc pscan;
|
ParallelHeapScanDesc pscan;
|
||||||
|
|
||||||
pscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id);
|
pscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, false);
|
||||||
node->ss.ss_currentScanDesc =
|
node->ss.ss_currentScanDesc =
|
||||||
heap_beginscan_parallel(node->ss.ss_currentRelation, pscan);
|
heap_beginscan_parallel(node->ss.ss_currentRelation, pscan);
|
||||||
}
|
}
|
||||||
|
|
|
@@ -208,6 +208,9 @@ shm_toc_insert(shm_toc *toc, uint64 key, void *address)
|
||||||
/*
|
/*
|
||||||
* Look up a TOC entry.
|
* Look up a TOC entry.
|
||||||
*
|
*
|
||||||
|
* If the key is not found, returns NULL if noError is true, otherwise
|
||||||
|
* throws elog(ERROR).
|
||||||
|
*
|
||||||
* Unlike the other functions in this file, this operation acquires no lock;
|
* Unlike the other functions in this file, this operation acquires no lock;
|
||||||
* it uses only barriers. It probably wouldn't hurt concurrency very much even
|
* it uses only barriers. It probably wouldn't hurt concurrency very much even
|
||||||
* if it did get a lock, but since it's reasonably likely that a group of
|
* if it did get a lock, but since it's reasonably likely that a group of
|
||||||
|
@@ -215,7 +218,7 @@ shm_toc_insert(shm_toc *toc, uint64 key, void *address)
|
||||||
* right around the same time, there seems to be some value in avoiding it.
|
* right around the same time, there seems to be some value in avoiding it.
|
||||||
*/
|
*/
|
||||||
void *
|
void *
|
||||||
shm_toc_lookup(shm_toc *toc, uint64 key)
|
shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
|
||||||
{
|
{
|
||||||
uint64 nentry;
|
uint64 nentry;
|
||||||
uint64 i;
|
uint64 i;
|
||||||
|
@@ -226,10 +229,15 @@ shm_toc_lookup(shm_toc *toc, uint64 key)
|
||||||
|
|
||||||
/* Now search for a matching entry. */
|
/* Now search for a matching entry. */
|
||||||
for (i = 0; i < nentry; ++i)
|
for (i = 0; i < nentry; ++i)
|
||||||
|
{
|
||||||
if (toc->toc_entry[i].key == key)
|
if (toc->toc_entry[i].key == key)
|
||||||
return ((char *) toc) + toc->toc_entry[i].offset;
|
return ((char *) toc) + toc->toc_entry[i].offset;
|
||||||
|
}
|
||||||
|
|
||||||
/* No matching entry was found. */
|
/* No matching entry was found. */
|
||||||
|
if (!noError)
|
||||||
|
elog(ERROR, "could not find key " UINT64_FORMAT " in shm TOC at %p",
|
||||||
|
key, toc);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@@ -32,7 +32,7 @@ extern shm_toc *shm_toc_attach(uint64 magic, void *address);
|
||||||
extern void *shm_toc_allocate(shm_toc *toc, Size nbytes);
|
extern void *shm_toc_allocate(shm_toc *toc, Size nbytes);
|
||||||
extern Size shm_toc_freespace(shm_toc *toc);
|
extern Size shm_toc_freespace(shm_toc *toc);
|
||||||
extern void shm_toc_insert(shm_toc *toc, uint64 key, void *address);
|
extern void shm_toc_insert(shm_toc *toc, uint64 key, void *address);
|
||||||
extern void *shm_toc_lookup(shm_toc *toc, uint64 key);
|
extern void *shm_toc_lookup(shm_toc *toc, uint64 key, bool noError);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Tools for estimating how large a chunk of shared memory will be needed
|
* Tools for estimating how large a chunk of shared memory will be needed
|
||||||
|
|
|
@@ -95,7 +95,7 @@ test_shm_mq_main(Datum main_arg)
|
||||||
* find it. Our worker number gives our identity: there may be just one
|
* find it. Our worker number gives our identity: there may be just one
|
||||||
* worker involved in this parallel operation, or there may be many.
|
* worker involved in this parallel operation, or there may be many.
|
||||||
*/
|
*/
|
||||||
hdr = shm_toc_lookup(toc, 0);
|
hdr = shm_toc_lookup(toc, 0, false);
|
||||||
SpinLockAcquire(&hdr->mutex);
|
SpinLockAcquire(&hdr->mutex);
|
||||||
myworkernumber = ++hdr->workers_attached;
|
myworkernumber = ++hdr->workers_attached;
|
||||||
SpinLockRelease(&hdr->mutex);
|
SpinLockRelease(&hdr->mutex);
|
||||||
|
@@ -158,10 +158,10 @@ attach_to_queues(dsm_segment *seg, shm_toc *toc, int myworkernumber,
|
||||||
shm_mq *inq;
|
shm_mq *inq;
|
||||||
shm_mq *outq;
|
shm_mq *outq;
|
||||||
|
|
||||||
inq = shm_toc_lookup(toc, myworkernumber);
|
inq = shm_toc_lookup(toc, myworkernumber, false);
|
||||||
shm_mq_set_receiver(inq, MyProc);
|
shm_mq_set_receiver(inq, MyProc);
|
||||||
*inqhp = shm_mq_attach(inq, seg, NULL);
|
*inqhp = shm_mq_attach(inq, seg, NULL);
|
||||||
outq = shm_toc_lookup(toc, myworkernumber + 1);
|
outq = shm_toc_lookup(toc, myworkernumber + 1, false);
|
||||||
shm_mq_set_sender(outq, MyProc);
|
shm_mq_set_sender(outq, MyProc);
|
||||||
*outqhp = shm_mq_attach(outq, seg, NULL);
|
*outqhp = shm_mq_attach(outq, seg, NULL);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue