diff options
Diffstat (limited to 'nsprpub/pr/tests/multiwait.c')
-rw-r--r-- | nsprpub/pr/tests/multiwait.c | 295 |
1 files changed, 188 insertions, 107 deletions
diff --git a/nsprpub/pr/tests/multiwait.c b/nsprpub/pr/tests/multiwait.c index 61b08df15..243b315e8 100644 --- a/nsprpub/pr/tests/multiwait.c +++ b/nsprpub/pr/tests/multiwait.c @@ -45,7 +45,9 @@ static PRIntn client_threads = 20, worker_threads = 2, wait_objects = 50; ((_expr)?((void)0):_MW_Assert(# _expr,__FILE__,__LINE__)) static void _MW_Assert(const char *s, const char *file, PRIntn ln) { - if (NULL != debug) PL_FPrintError(debug, NULL); + if (NULL != debug) { + PL_FPrintError(debug, NULL); + } PR_Assert(s, file, ln); } /* _MW_Assert */ #else @@ -56,7 +58,8 @@ static void PrintRecvDesc(PRRecvWait *desc, const char *msg) { const char *tag[] = { "PR_MW_INTERRUPT", "PR_MW_TIMEOUT", - "PR_MW_FAILURE", "PR_MW_SUCCESS", "PR_MW_PENDING"}; + "PR_MW_FAILURE", "PR_MW_SUCCESS", "PR_MW_PENDING" + }; PR_fprintf( debug, "%s: PRRecvWait(@0x%x): {fd: 0x%x, outcome: %s, tmo: %u}\n", msg, desc, desc->fd, tag[desc->outcome + 3], desc->timeout); @@ -75,8 +78,9 @@ static Shared *MakeShared(const char *title) static void DestroyShared(Shared *shared) { PRStatus rv; - if (verbosity > quiet) + if (verbosity > quiet) { PR_fprintf(debug, "%s: destroying group\n", shared->title); + } rv = PR_DestroyWaitGroup(shared->group); MW_ASSERT(PR_SUCCESS == rv); PR_DestroyLock(shared->list_lock); @@ -96,18 +100,21 @@ static PRRecvWait *CreateRecvWait(PRFileDesc *fd, PRIntervalTime timeout) PR_AtomicIncrement(&desc_allocated); - if (verbosity > chatty) + if (verbosity > chatty) { PrintRecvDesc(desc_out, "Allocated"); + } return desc_out; } /* CreateRecvWait */ static void DestroyRecvWait(PRRecvWait *desc_out) { - if (verbosity > chatty) + if (verbosity > chatty) { PrintRecvDesc(desc_out, "Destroying"); + } PR_Close(desc_out->fd); - if (NULL != desc_out->buffer.start) + if (NULL != desc_out->buffer.start) { PR_DELETE(desc_out->buffer.start); + } PR_Free(desc_out); (void)PR_AtomicDecrement(&desc_allocated); } /* DestroyRecvWait */ @@ -116,13 +123,16 @@ static void CancelGroup(Shared *shared) { PRRecvWait *desc_out; - if (verbosity > quiet) + if (verbosity > quiet) { PR_fprintf(debug, "%s Reclaiming wait descriptors\n", shared->title); + } do { desc_out = PR_CancelWaitGroup(shared->group); - if (NULL != desc_out) DestroyRecvWait(desc_out); + if (NULL != desc_out) { + DestroyRecvWait(desc_out); + } } while (NULL != desc_out); MW_ASSERT(0 == desc_allocated); @@ -139,11 +149,14 @@ static void PR_CALLBACK ClientThread(void* arg) Shared *shared = (Shared*)arg; PRFileDesc *server = PR_NewTCPSocket(); if ((NULL == server) - && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) return; + && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) { + return; + } MW_ASSERT(NULL != server); - if (verbosity > chatty) + if (verbosity > chatty) { PR_fprintf(debug, "%s: Server socket @0x%x\n", shared->title, server); + } /* Initialize the buffer so that Purify won't complain */ memset(buffer, 0, sizeof(buffer)); @@ -151,33 +164,40 @@ static void PR_CALLBACK ClientThread(void* arg) rv = PR_InitializeNetAddr(PR_IpAddrLoopback, default_port, &server_address); MW_ASSERT(PR_SUCCESS == rv); - if (verbosity > quiet) + if (verbosity > quiet) { PR_fprintf(debug, "%s: Client opening connection\n", shared->title); + } rv = PR_Connect(server, &server_address, PR_INTERVAL_NO_TIMEOUT); if (PR_FAILURE == rv) { - if (verbosity > silent) PL_FPrintError(debug, "Client connect failed"); + if (verbosity > silent) { + PL_FPrintError(debug, "Client connect failed"); + } return; } while (ops_done < ops_required) { bytes = PR_Send( - server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT); - if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) break; + server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT); + if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) { + break; + } MW_ASSERT(sizeof(buffer) == bytes); if (verbosity > chatty) PR_fprintf( debug, "%s: Client sent %d bytes\n", shared->title, sizeof(buffer)); bytes = PR_Recv( - server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT); + server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT); if (verbosity > chatty) PR_fprintf( debug, "%s: Client received %d bytes\n", shared->title, sizeof(buffer)); - if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) break; + if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) { + break; + } MW_ASSERT(sizeof(buffer) == bytes); PR_Sleep(shared->timeout); } @@ -196,12 +216,16 @@ static void OneInThenCancelled(Shared *shared) desc_in->fd = PR_NewTCPSocket(); desc_in->timeout = shared->timeout; - if (verbosity > chatty) PrintRecvDesc(desc_in, "Adding desc"); + if (verbosity > chatty) { + PrintRecvDesc(desc_in, "Adding desc"); + } rv = PR_AddWaitFileDesc(shared->group, desc_in); MW_ASSERT(PR_SUCCESS == rv); - if (verbosity > chatty) PrintRecvDesc(desc_in, "Cancelling"); + if (verbosity > chatty) { + PrintRecvDesc(desc_in, "Cancelling"); + } rv = PR_CancelWaitFileDesc(shared->group, desc_in); MW_ASSERT(PR_SUCCESS == rv); @@ -209,13 +233,16 @@ static void OneInThenCancelled(Shared *shared) MW_ASSERT(desc_out == desc_in); MW_ASSERT(PR_MW_INTERRUPT == desc_out->outcome); MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError()); - if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready"); + if (verbosity > chatty) { + PrintRecvDesc(desc_out, "Ready"); + } rv = PR_Close(desc_in->fd); MW_ASSERT(PR_SUCCESS == rv); - if (verbosity > quiet) + if (verbosity > quiet) { PR_fprintf(debug, "%s: destroying group\n", shared->title); + } PR_DELETE(desc_in); } /* OneInThenCancelled */ @@ -228,7 +255,9 @@ static void OneOpOneThread(Shared *shared) desc_in->fd = PR_NewTCPSocket(); desc_in->timeout = shared->timeout; - if (verbosity > chatty) PrintRecvDesc(desc_in, "Adding desc"); + if (verbosity > chatty) { + PrintRecvDesc(desc_in, "Adding desc"); + } rv = PR_AddWaitFileDesc(shared->group, desc_in); MW_ASSERT(PR_SUCCESS == rv); @@ -236,7 +265,9 @@ static void OneOpOneThread(Shared *shared) MW_ASSERT(desc_out == desc_in); MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome); MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError()); - if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready"); + if (verbosity > chatty) { + PrintRecvDesc(desc_out, "Ready"); + } rv = PR_Close(desc_in->fd); MW_ASSERT(PR_SUCCESS == rv); @@ -251,8 +282,9 @@ static void ManyOpOneThread(Shared *shared) PRRecvWait *desc_in; PRRecvWait *desc_out; - if (verbosity > quiet) + if (verbosity > quiet) { PR_fprintf(debug, "%s: adding %d descs\n", shared->title, wait_objects); + } for (index = 0; index < wait_objects; ++index) { @@ -267,7 +299,9 @@ static void ManyOpOneThread(Shared *shared) desc_out = PR_WaitRecvReady(shared->group); MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome); MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError()); - if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready/readding"); + if (verbosity > chatty) { + PrintRecvDesc(desc_out, "Ready/readding"); + } rv = PR_AddWaitFileDesc(shared->group, desc_out); MW_ASSERT(PR_SUCCESS == rv); (void)PR_AtomicIncrement(&ops_done); @@ -287,18 +321,26 @@ static void PR_CALLBACK SomeOpsThread(void *arg) if (NULL == desc_out) { MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError()); - if (verbosity > quiet) PR_fprintf(debug, "Aborted\n"); + if (verbosity > quiet) { + PR_fprintf(debug, "Aborted\n"); + } break; } MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome); MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError()); - if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready"); + if (verbosity > chatty) { + PrintRecvDesc(desc_out, "Ready"); + } - if (verbosity > chatty) PrintRecvDesc(desc_out, "Re-Adding"); + if (verbosity > chatty) { + PrintRecvDesc(desc_out, "Re-Adding"); + } desc_out->timeout = shared->timeout; rv = PR_AddWaitFileDesc(shared->group, desc_out); PR_AtomicIncrement(&ops_done); - if (ops_done > ops_required) break; + if (ops_done > ops_required) { + break; + } } while (PR_SUCCESS == rv); MW_ASSERT(PR_SUCCESS == rv); } /* SomeOpsThread */ @@ -314,19 +356,21 @@ static void SomeOpsSomeThreads(Shared *shared) /* Create some threads */ - if (verbosity > quiet) + if (verbosity > quiet) { PR_fprintf(debug, "%s: creating threads\n", shared->title); + } for (index = 0; index < worker_threads; ++index) { thread[index] = PR_CreateThread( - PR_USER_THREAD, SomeOpsThread, shared, - PR_PRIORITY_HIGH, thread_scope, - PR_JOINABLE_THREAD, 16 * 1024); + PR_USER_THREAD, SomeOpsThread, shared, + PR_PRIORITY_HIGH, thread_scope, + PR_JOINABLE_THREAD, 16 * 1024); } /* then create some operations */ - if (verbosity > quiet) + if (verbosity > quiet) { PR_fprintf(debug, "%s: creating desc\n", shared->title); + } for (index = 0; index < wait_objects; ++index) { desc_in = CreateRecvWait(PR_NewTCPSocket(), shared->timeout); @@ -334,12 +378,16 @@ static void SomeOpsSomeThreads(Shared *shared) MW_ASSERT(PR_SUCCESS == rv); } - if (verbosity > quiet) + if (verbosity > quiet) { PR_fprintf(debug, "%s: sleeping\n", shared->title); - while (ops_done < ops_required) PR_Sleep(shared->timeout); + } + while (ops_done < ops_required) { + PR_Sleep(shared->timeout); + } - if (verbosity > quiet) + if (verbosity > quiet) { PR_fprintf(debug, "%s: interrupting/joining threads\n", shared->title); + } for (index = 0; index < worker_threads; ++index) { rv = PR_Interrupt(thread[index]); @@ -361,19 +409,25 @@ static PRStatus ServiceRequest(Shared *shared, PRRecvWait *desc) debug, "%s: Service received %d bytes\n", shared->title, desc->bytesRecv); - if (0 == desc->bytesRecv) goto quitting; + if (0 == desc->bytesRecv) { + goto quitting; + } if ((-1 == desc->bytesRecv) - && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) goto aborted; + && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) { + goto aborted; + } bytes_out = PR_Send( - desc->fd, desc->buffer.start, desc->bytesRecv, 0, shared->timeout); + desc->fd, desc->buffer.start, desc->bytesRecv, 0, shared->timeout); if (verbosity > chatty) PR_fprintf( debug, "%s: Service sent %d bytes\n", shared->title, bytes_out); if ((-1 == bytes_out) - && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) goto aborted; + && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) { + goto aborted; + } MW_ASSERT(bytes_out == desc->bytesRecv); return PR_SUCCESS; @@ -393,8 +447,9 @@ static void PR_CALLBACK ServiceThread(void *arg) if (NULL != desc_out) { desc_out->timeout = PR_INTERVAL_NO_TIMEOUT; - if (verbosity > chatty) + if (verbosity > chatty) { PrintRecvDesc(desc_out, "Service re-adding"); + } rv = PR_AddWaitFileDesc(shared->group, desc_out); MW_ASSERT(PR_SUCCESS == rv); } @@ -411,8 +466,9 @@ static void PR_CALLBACK ServiceThread(void *arg) case PR_MW_SUCCESS: { PR_AtomicIncrement(&ops_done); - if (verbosity > chatty) + if (verbosity > chatty) { PrintRecvDesc(desc_out, "Service ready"); + } rv = ServiceRequest(shared, desc_out); break; } @@ -423,15 +479,18 @@ static void PR_CALLBACK ServiceThread(void *arg) case PR_MW_TIMEOUT: MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError()); case PR_MW_FAILURE: - if (verbosity > silent) + if (verbosity > silent) { PL_FPrintError(debug, "RecvReady failure"); + } break; default: break; } } while (PR_SUCCESS == rv); - if (NULL != desc_out) DestroyRecvWait(desc_out); + if (NULL != desc_out) { + DestroyRecvWait(desc_out); + } } /* ServiceThread */ @@ -451,12 +510,14 @@ static void PR_CALLBACK EnumerationThread(void *arg) desc = NULL; while (NULL != (desc = PR_EnumerateWaitGroup(enumerator, desc))) { - if (verbosity > chatty) PrintRecvDesc(desc, shared->title); + if (verbosity > chatty) { + PrintRecvDesc(desc, shared->title); + } count += 1; } if (verbosity > silent) PR_fprintf(debug, - "%s Enumerated %d objects\n", shared->title, count); + "%s Enumerated %d objects\n", shared->title, count); } MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError()); @@ -477,14 +538,15 @@ static void PR_CALLBACK ServerThread(void *arg) PRNetAddr server_address, client_address; worker_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads); - if (verbosity > quiet) + if (verbosity > quiet) { PR_fprintf(debug, "%s: Server creating worker_threads\n", shared->title); + } for (index = 0; index < worker_threads; ++index) { worker_thread[index] = PR_CreateThread( - PR_USER_THREAD, ServiceThread, shared, - PR_PRIORITY_HIGH, thread_scope, - PR_JOINABLE_THREAD, 16 * 1024); + PR_USER_THREAD, ServiceThread, shared, + PR_PRIORITY_HIGH, thread_scope, + PR_JOINABLE_THREAD, 16 * 1024); } rv = PR_InitializeNetAddr(PR_IpAddrAny, default_port, &server_address); @@ -499,12 +561,15 @@ static void PR_CALLBACK ServerThread(void *arg) rv = PR_Listen(listener, 10); MW_ASSERT(PR_SUCCESS == rv); while (ops_done < ops_required) { - if (verbosity > quiet) + if (verbosity > quiet) { PR_fprintf(debug, "%s: Server accepting connection\n", shared->title); + } service = PR_Accept(listener, &client_address, PR_INTERVAL_NO_TIMEOUT); if (NULL == service) { - if (PR_PENDING_INTERRUPT_ERROR == PR_GetError()) break; + if (PR_PENDING_INTERRUPT_ERROR == PR_GetError()) { + break; + } PL_PrintError("Accept failed"); MW_ASSERT(PR_FALSE && "Accept failed"); } @@ -512,15 +577,17 @@ static void PR_CALLBACK ServerThread(void *arg) { desc_in = CreateRecvWait(service, shared->timeout); desc_in->timeout = PR_INTERVAL_NO_TIMEOUT; - if (verbosity > chatty) + if (verbosity > chatty) { PrintRecvDesc(desc_in, "Service adding"); + } rv = PR_AddWaitFileDesc(shared->group, desc_in); MW_ASSERT(PR_SUCCESS == rv); } } - if (verbosity > quiet) + if (verbosity > quiet) { PR_fprintf(debug, "%s: Server interrupting worker_threads\n", shared->title); + } for (index = 0; index < worker_threads; ++index) { rv = PR_Interrupt(worker_thread[index]); @@ -552,41 +619,48 @@ static void RealOneGroupIO(Shared *shared) PRIntn index; PRThread *server_thread, *enumeration_thread, **client_thread; - if (verbosity > quiet) + if (verbosity > quiet) { PR_fprintf(debug, "%s: creating server_thread\n", shared->title); + } server_thread = PR_CreateThread( - PR_USER_THREAD, ServerThread, shared, - PR_PRIORITY_HIGH, thread_scope, - PR_JOINABLE_THREAD, 16 * 1024); + PR_USER_THREAD, ServerThread, shared, + PR_PRIORITY_HIGH, thread_scope, + PR_JOINABLE_THREAD, 16 * 1024); - if (verbosity > quiet) + if (verbosity > quiet) { PR_fprintf(debug, "%s: creating enumeration_thread\n", shared->title); + } enumeration_thread = PR_CreateThread( - PR_USER_THREAD, EnumerationThread, shared, - PR_PRIORITY_HIGH, thread_scope, - PR_JOINABLE_THREAD, 16 * 1024); + PR_USER_THREAD, EnumerationThread, shared, + PR_PRIORITY_HIGH, thread_scope, + PR_JOINABLE_THREAD, 16 * 1024); - if (verbosity > quiet) + if (verbosity > quiet) { PR_fprintf(debug, "%s: snoozing before creating clients\n", shared->title); + } PR_Sleep(5 * shared->timeout); - if (verbosity > quiet) + if (verbosity > quiet) { PR_fprintf(debug, "%s: creating client_threads\n", shared->title); + } client_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * client_threads); for (index = 0; index < client_threads; ++index) { client_thread[index] = PR_CreateThread( - PR_USER_THREAD, ClientThread, shared, - PR_PRIORITY_NORMAL, thread_scope, - PR_JOINABLE_THREAD, 16 * 1024); + PR_USER_THREAD, ClientThread, shared, + PR_PRIORITY_NORMAL, thread_scope, + PR_JOINABLE_THREAD, 16 * 1024); } - while (ops_done < ops_required) PR_Sleep(shared->timeout); + while (ops_done < ops_required) { + PR_Sleep(shared->timeout); + } - if (verbosity > quiet) + if (verbosity > quiet) { PR_fprintf(debug, "%s: interrupting/joining client_threads\n", shared->title); + } for (index = 0; index < client_threads; ++index) { rv = PR_Interrupt(client_thread[index]); @@ -596,15 +670,17 @@ static void RealOneGroupIO(Shared *shared) } PR_DELETE(client_thread); - if (verbosity > quiet) + if (verbosity > quiet) { PR_fprintf(debug, "%s: interrupting/joining enumeration_thread\n", shared->title); + } rv = PR_Interrupt(enumeration_thread); MW_ASSERT(PR_SUCCESS == rv); rv = PR_JoinThread(enumeration_thread); MW_ASSERT(PR_SUCCESS == rv); - if (verbosity > quiet) + if (verbosity > quiet) { PR_fprintf(debug, "%s: interrupting/joining server_thread\n", shared->title); + } rv = PR_Interrupt(server_thread); MW_ASSERT(PR_SUCCESS == rv); rv = PR_JoinThread(server_thread); @@ -617,8 +693,9 @@ static void RunThisOne( Shared *shared; if ((NULL == test_name) || (0 == PL_strcmp(name, test_name))) { - if (verbosity > silent) + if (verbosity > silent) { PR_fprintf(debug, "%s()\n", name); + } shared = MakeShared(name); ops_done = 0; func(shared); /* run the test */ @@ -629,8 +706,7 @@ static void RunThisOne( static Verbosity ChangeVerbosity(Verbosity verbosity, PRIntn delta) { - PRIntn verbage = (PRIntn)verbosity; - return (Verbosity)(verbage += delta); + return (Verbosity)(((PRIntn)verbosity) + delta); } /* ChangeVerbosity */ int main(int argc, char **argv) @@ -641,46 +717,51 @@ int main(int argc, char **argv) while (PL_OPT_EOL != (os = PL_GetNextOpt(opt))) { - if (PL_OPT_BAD == os) continue; + if (PL_OPT_BAD == os) { + continue; + } switch (opt->option) { - case 0: - test_name = opt->value; - break; - case 'd': /* debug mode */ - if (verbosity < noisy) - verbosity = ChangeVerbosity(verbosity, 1); - break; - case 'q': /* debug mode */ - if (verbosity > silent) - verbosity = ChangeVerbosity(verbosity, -1); - break; - case 'G': /* use global threads */ - thread_scope = PR_GLOBAL_THREAD; - break; - case 'c': /* number of client threads */ - client_threads = atoi(opt->value); - break; - case 'o': /* operations to compelete */ - ops_required = atoi(opt->value); - break; - case 'p': /* default port */ - default_port = atoi(opt->value); - break; - case 't': /* number of threads waiting */ - worker_threads = atoi(opt->value); - break; - case 'w': /* number of wait objects */ - wait_objects = atoi(opt->value); - break; - default: - break; + case 0: + test_name = opt->value; + break; + case 'd': /* debug mode */ + if (verbosity < noisy) { + verbosity = ChangeVerbosity(verbosity, 1); + } + break; + case 'q': /* debug mode */ + if (verbosity > silent) { + verbosity = ChangeVerbosity(verbosity, -1); + } + break; + case 'G': /* use global threads */ + thread_scope = PR_GLOBAL_THREAD; + break; + case 'c': /* number of client threads */ + client_threads = atoi(opt->value); + break; + case 'o': /* operations to compelete */ + ops_required = atoi(opt->value); + break; + case 'p': /* default port */ + default_port = atoi(opt->value); + break; + case 't': /* number of threads waiting */ + worker_threads = atoi(opt->value); + break; + case 'w': /* number of wait objects */ + wait_objects = atoi(opt->value); + break; + default: + break; } } PL_DestroyOptState(opt); - if (verbosity > 0) + if (verbosity > 0) { debug = PR_GetSpecialFD(PR_StandardError); + } RunThisOne(OneInThenCancelled, "OneInThenCancelled", test_name); RunThisOne(OneOpOneThread, "OneOpOneThread", test_name); |