/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

#include "prio.h"
#include "prmem.h"
#include "prprf.h"
#include "prlog.h"
#include "prerror.h"
#include "prnetdb.h"
#include "prthread.h"

#include "plerror.h"
#include "plgetopt.h"
#include "prwin16.h"

#include <stdlib.h>
#include <string.h>

/*
** Testing layering of I/O
**
**      The layered server
** A thread that acts as a server. It creates a TCP listener with a dummy
** layer pushed on top. Then listens for incoming connections. Each connection
** request for connection will be layered as well, accept one request, echo
** it back and close.
**
**      The layered client
** Pretty much what you'd expect.
*/

static PRFileDesc *logFile;
static PRDescIdentity identity;
static PRNetAddr server_address;

static PRIOMethods myMethods;

typedef enum {rcv_get_debit, rcv_send_credit, rcv_data} RcvState;
typedef enum {xmt_send_debit, xmt_recv_credit, xmt_data} XmtState;

struct PRFilePrivate
{
    RcvState rcvstate;
    XmtState xmtstate;
    PRInt32 rcvreq, rcvinprogress;
    PRInt32 xmtreq, xmtinprogress;
};

typedef enum Verbosity {silent, quiet, chatty, noisy} Verbosity;

static PRIntn minor_iterations = 5;
static PRIntn major_iterations = 1;
static Verbosity verbosity = quiet;
static PRUint16 default_port = 12273;

static PRFileDesc *PushLayer(PRFileDesc *stack)
{
    PRStatus rv;
    PRFileDesc *layer = PR_CreateIOLayerStub(identity, &myMethods);
    layer->secret = PR_NEWZAP(PRFilePrivate);
    rv = PR_PushIOLayer(stack, PR_GetLayersIdentity(stack), layer);
    PR_ASSERT(PR_SUCCESS == rv);
    if (verbosity > quiet) {
        PR_fprintf(logFile, "Pushed layer(0x%x) onto stack(0x%x)\n", layer, stack);
    }
    return stack;
}  /* PushLayer */

static PRFileDesc *PopLayer(PRFileDesc *stack)
{
    PRFileDesc *popped = PR_PopIOLayer(stack, identity);
    if (verbosity > quiet) {
        PR_fprintf(logFile, "Popped layer(0x%x) from stack(0x%x)\n", popped, stack);
    }
    PR_DELETE(popped->secret);
    popped->dtor(popped);
    return stack;
}  /* PopLayer */

static void PR_CALLBACK Client(void *arg)
{
    PRStatus rv;
    PRIntn mits;
    PRInt32 ready;
    PRUint8 buffer[100];
    PRPollDesc polldesc;
    PRIntn empty_flags = 0;
    PRIntn bytes_read, bytes_sent;
    PRFileDesc *stack = (PRFileDesc*)arg;

    /* Initialize the buffer so that Purify won't complain */
    memset(buffer, 0, sizeof(buffer));

    rv = PR_Connect(stack, &server_address, PR_INTERVAL_NO_TIMEOUT);
    if ((PR_FAILURE == rv) && (PR_IN_PROGRESS_ERROR == PR_GetError()))
    {
        if (verbosity > quiet) {
            PR_fprintf(logFile, "Client connect 'in progress'\n");
        }
        do
        {
            polldesc.fd = stack;
            polldesc.out_flags = 0;
            polldesc.in_flags = PR_POLL_WRITE | PR_POLL_EXCEPT;
            ready = PR_Poll(&polldesc, 1, PR_INTERVAL_NO_TIMEOUT);
            if ((1 != ready)  /* if not 1, then we're dead */
                || (0 == (polldesc.in_flags & polldesc.out_flags)))
            {
                PR_NOT_REACHED("Whoa!");
                break;
            }
            if (verbosity > quiet)
                PR_fprintf(
                    logFile, "Client connect 'in progress' [0x%x]\n",
                    polldesc.out_flags);
            rv = PR_GetConnectStatus(&polldesc);
            if ((PR_FAILURE == rv)
                && (PR_IN_PROGRESS_ERROR != PR_GetError())) {
                break;
            }
        } while (PR_FAILURE == rv);
    }
    PR_ASSERT(PR_SUCCESS == rv);
    if (verbosity > chatty) {
        PR_fprintf(logFile, "Client created connection\n");
    }

    for (mits = 0; mits < minor_iterations; ++mits)
    {
        bytes_sent = 0;
        if (verbosity > quiet) {
            PR_fprintf(logFile, "Client sending %d bytes\n", sizeof(buffer));
        }
        do
        {
            if (verbosity > chatty)
                PR_fprintf(
                    logFile, "Client sending %d bytes\n",
                    sizeof(buffer) - bytes_sent);
            ready = PR_Send(
                        stack, buffer + bytes_sent, sizeof(buffer) - bytes_sent,
                        empty_flags, PR_INTERVAL_NO_TIMEOUT);
            if (verbosity > chatty) {
                PR_fprintf(logFile, "Client send status [%d]\n", ready);
            }
            if (0 < ready) {
                bytes_sent += ready;
            }
            else if ((-1 == ready) && (PR_WOULD_BLOCK_ERROR == PR_GetError()))
            {
                polldesc.fd = stack;
                polldesc.out_flags = 0;
                polldesc.in_flags = PR_POLL_WRITE;
                ready = PR_Poll(&polldesc, 1, PR_INTERVAL_NO_TIMEOUT);
                if ((1 != ready)  /* if not 1, then we're dead */
                    || (0 == (polldesc.in_flags & polldesc.out_flags)))
                {
                    PR_NOT_REACHED("Whoa!");
                    break;
                }
            }
            else {
                break;
            }
        } while (bytes_sent < sizeof(buffer));
        PR_ASSERT(sizeof(buffer) == bytes_sent);

        bytes_read = 0;
        do
        {
            if (verbosity > chatty)
                PR_fprintf(
                    logFile, "Client receiving %d bytes\n",
                    bytes_sent - bytes_read);
            ready = PR_Recv(
                        stack, buffer + bytes_read, bytes_sent - bytes_read,
                        empty_flags, PR_INTERVAL_NO_TIMEOUT);
            if (verbosity > chatty)
                PR_fprintf(
                    logFile, "Client receive status [%d]\n", ready);
            if (0 < ready) {
                bytes_read += ready;
            }
            else if ((-1 == ready) && (PR_WOULD_BLOCK_ERROR == PR_GetError()))
            {
                polldesc.fd = stack;
                polldesc.out_flags = 0;
                polldesc.in_flags = PR_POLL_READ;
                ready = PR_Poll(&polldesc, 1, PR_INTERVAL_NO_TIMEOUT);
                if ((1 != ready)  /* if not 1, then we're dead */
                    || (0 == (polldesc.in_flags & polldesc.out_flags)))
                {
                    PR_NOT_REACHED("Whoa!");
                    break;
                }
            }
            else {
                break;
            }
        } while (bytes_read < bytes_sent);
        if (verbosity > chatty) {
            PR_fprintf(logFile, "Client received %d bytes\n", bytes_read);
        }
        PR_ASSERT(bytes_read == bytes_sent);
    }

    if (verbosity > quiet) {
        PR_fprintf(logFile, "Client shutting down stack\n");
    }

    rv = PR_Shutdown(stack, PR_SHUTDOWN_BOTH); PR_ASSERT(PR_SUCCESS == rv);
}  /* Client */

static void PR_CALLBACK Server(void *arg)
{
    PRStatus rv;
    PRInt32 ready;
    PRUint8 buffer[100];
    PRFileDesc *service;
    PRUintn empty_flags = 0;
    struct PRPollDesc polldesc;
    PRIntn bytes_read, bytes_sent;
    PRFileDesc *stack = (PRFileDesc*)arg;
    PRNetAddr client_address;

    do
    {
        if (verbosity > chatty) {
            PR_fprintf(logFile, "Server accepting connection\n");
        }
        service = PR_Accept(stack, &client_address, PR_INTERVAL_NO_TIMEOUT);
        if (verbosity > chatty) {
            PR_fprintf(logFile, "Server accept status [0x%p]\n", service);
        }
        if ((NULL == service) && (PR_WOULD_BLOCK_ERROR == PR_GetError()))
        {
            polldesc.fd = stack;
            polldesc.out_flags = 0;
            polldesc.in_flags = PR_POLL_READ | PR_POLL_EXCEPT;
            ready = PR_Poll(&polldesc, 1, PR_INTERVAL_NO_TIMEOUT);
            if ((1 != ready)  /* if not 1, then we're dead */
                || (0 == (polldesc.in_flags & polldesc.out_flags)))
            {
                PR_NOT_REACHED("Whoa!");
                break;
            }
        }
    } while (NULL == service);
    PR_ASSERT(NULL != service);

    if (verbosity > quiet) {
        PR_fprintf(logFile, "Server accepting connection\n");
    }

    do
    {
        bytes_read = 0;
        do
        {
            if (verbosity > chatty)
                PR_fprintf(
                    logFile, "Server receiving %d bytes\n",
                    sizeof(buffer) - bytes_read);
            ready = PR_Recv(
                        service, buffer + bytes_read, sizeof(buffer) - bytes_read,
                        empty_flags, PR_INTERVAL_NO_TIMEOUT);
            if (verbosity > chatty) {
                PR_fprintf(logFile, "Server receive status [%d]\n", ready);
            }
            if (0 < ready) {
                bytes_read += ready;
            }
            else if ((-1 == ready) && (PR_WOULD_BLOCK_ERROR == PR_GetError()))
            {
                polldesc.fd = service;
                polldesc.out_flags = 0;
                polldesc.in_flags = PR_POLL_READ;
                ready = PR_Poll(&polldesc, 1, PR_INTERVAL_NO_TIMEOUT);
                if ((1 != ready)  /* if not 1, then we're dead */
                    || (0 == (polldesc.in_flags & polldesc.out_flags)))
                {
                    PR_NOT_REACHED("Whoa!");
                    break;
                }
            }
            else {
                break;
            }
        } while (bytes_read < sizeof(buffer));

        if (0 != bytes_read)
        {
            if (verbosity > chatty) {
                PR_fprintf(logFile, "Server received %d bytes\n", bytes_read);
            }
            PR_ASSERT(bytes_read > 0);

            bytes_sent = 0;
            do
            {
                ready = PR_Send(
                            service, buffer + bytes_sent, bytes_read - bytes_sent,
                            empty_flags, PR_INTERVAL_NO_TIMEOUT);
                if (0 < ready)
                {
                    bytes_sent += ready;
                }
                else if ((-1 == ready) && (PR_WOULD_BLOCK_ERROR == PR_GetError()))
                {
                    polldesc.fd = service;
                    polldesc.out_flags = 0;
                    polldesc.in_flags = PR_POLL_WRITE;
                    ready = PR_Poll(&polldesc, 1, PR_INTERVAL_NO_TIMEOUT);
                    if ((1 != ready)  /* if not 1, then we're dead */
                        || (0 == (polldesc.in_flags & polldesc.out_flags)))
                    {
                        PR_NOT_REACHED("Whoa!");
                        break;
                    }
                }
                else {
                    break;
                }
            } while (bytes_sent < bytes_read);
            PR_ASSERT(bytes_read == bytes_sent);
            if (verbosity > chatty) {
                PR_fprintf(logFile, "Server sent %d bytes\n", bytes_sent);
            }
        }
    } while (0 != bytes_read);

    if (verbosity > quiet) {
        PR_fprintf(logFile, "Server shutting down stack\n");
    }
    rv = PR_Shutdown(service, PR_SHUTDOWN_BOTH); PR_ASSERT(PR_SUCCESS == rv);
    rv = PR_Close(service); PR_ASSERT(PR_SUCCESS == rv);

}  /* Server */

static PRStatus PR_CALLBACK MyClose(PRFileDesc *fd)
{
    PR_DELETE(fd->secret);  /* manage my secret file object */
    return (PR_GetDefaultIOMethods())->close(fd);  /* let him do all the work */
}  /* MyClose */

static PRInt16 PR_CALLBACK MyPoll(
    PRFileDesc *fd, PRInt16 in_flags, PRInt16 *out_flags)
{
    PRInt16 my_flags, new_flags;
    PRFilePrivate *mine = (PRFilePrivate*)fd->secret;
    if (0 != (PR_POLL_READ & in_flags))
    {
        /* client thinks he's reading */
        switch (mine->rcvstate)
        {
            case rcv_send_credit:
                my_flags = (in_flags & ~PR_POLL_READ) | PR_POLL_WRITE;
                break;
            case rcv_data:
            case rcv_get_debit:
                my_flags = in_flags;
            default: break;
        }
    }
    else if (0 != (PR_POLL_WRITE & in_flags))
    {
        /* client thinks he's writing */
        switch (mine->xmtstate)
        {
            case xmt_recv_credit:
                my_flags = (in_flags & ~PR_POLL_WRITE) | PR_POLL_READ;
                break;
            case xmt_send_debit:
            case xmt_data:
                my_flags = in_flags;
            default: break;
        }
    }
    else {
        PR_NOT_REACHED("How'd I get here?");
    }
    new_flags = (fd->lower->methods->poll)(fd->lower, my_flags, out_flags);
    if (verbosity > chatty)
        PR_fprintf(
            logFile, "Poll [i: 0x%x, m: 0x%x, o: 0x%x, n: 0x%x]\n",
            in_flags, my_flags, *out_flags, new_flags);
    return new_flags;
}  /* MyPoll */

static PRFileDesc * PR_CALLBACK MyAccept(
    PRFileDesc *fd, PRNetAddr *addr, PRIntervalTime timeout)
{
    PRStatus rv;
    PRFileDesc *newfd, *layer = fd;
    PRFileDesc *newstack;
    PRFilePrivate *newsecret;

    PR_ASSERT(fd != NULL);
    PR_ASSERT(fd->lower != NULL);

    newstack = PR_NEW(PRFileDesc);
    if (NULL == newstack)
    {
        PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
        return NULL;
    }
    newsecret = PR_NEW(PRFilePrivate);
    if (NULL == newsecret)
    {
        PR_DELETE(newstack);
        PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
        return NULL;
    }
    *newstack = *fd;  /* make a copy of the accepting layer */
    *newsecret = *fd->secret;
    newstack->secret = newsecret;

    newfd = (fd->lower->methods->accept)(fd->lower, addr, timeout);
    if (NULL == newfd)
    {
        PR_DELETE(newsecret);
        PR_DELETE(newstack);
        return NULL;
    }

    /* this PR_PushIOLayer call cannot fail */
    rv = PR_PushIOLayer(newfd, PR_TOP_IO_LAYER, newstack);
    PR_ASSERT(PR_SUCCESS == rv);
    return newfd;  /* that's it */
}

static PRInt32 PR_CALLBACK MyRecv(
    PRFileDesc *fd, void *buf, PRInt32 amount,
    PRIntn flags, PRIntervalTime timeout)
{
    char *b;
    PRInt32 rv;
    PRFileDesc *lo = fd->lower;
    PRFilePrivate *mine = (PRFilePrivate*)fd->secret;

    do
    {
        switch (mine->rcvstate)
        {
            case rcv_get_debit:
                b = (char*)&mine->rcvreq;
                mine->rcvreq = amount;
                rv = lo->methods->recv(
                         lo, b + mine->rcvinprogress,
                         sizeof(mine->rcvreq) - mine->rcvinprogress, flags, timeout);
                if (0 == rv) {
                    goto closed;
                }
                if ((-1 == rv) && (PR_WOULD_BLOCK_ERROR == PR_GetError())) {
                    break;
                }
                mine->rcvinprogress += rv;  /* accumulate the read */
                if (mine->rcvinprogress < sizeof(mine->rcvreq)) {
                    break;    /* loop */
                }
                mine->rcvstate = rcv_send_credit;
                mine->rcvinprogress = 0;
            case rcv_send_credit:
                b = (char*)&mine->rcvreq;
                rv = lo->methods->send(
                         lo, b + mine->rcvinprogress,
                         sizeof(mine->rcvreq) - mine->rcvinprogress, flags, timeout);
                if ((-1 == rv) && (PR_WOULD_BLOCK_ERROR == PR_GetError())) {
                    break;
                }
                mine->rcvinprogress += rv;  /* accumulate the read */
                if (mine->rcvinprogress < sizeof(mine->rcvreq)) {
                    break;    /* loop */
                }
                mine->rcvstate = rcv_data;
                mine->rcvinprogress = 0;
            case rcv_data:
                b = (char*)buf;
                rv = lo->methods->recv(
                         lo, b + mine->rcvinprogress,
                         mine->rcvreq - mine->rcvinprogress, flags, timeout);
                if (0 == rv) {
                    goto closed;
                }
                if ((-1 == rv) && (PR_WOULD_BLOCK_ERROR == PR_GetError())) {
                    break;
                }
                mine->rcvinprogress += rv;  /* accumulate the read */
                if (mine->rcvinprogress < amount) {
                    break;    /* loop */
                }
                mine->rcvstate = rcv_get_debit;
                mine->rcvinprogress = 0;
                return mine->rcvreq;  /* << -- that's it! */
            default:
                break;
        }
    } while (-1 != rv);
    return rv;
closed:
    mine->rcvinprogress = 0;
    mine->rcvstate = rcv_get_debit;
    return 0;
}  /* MyRecv */

static PRInt32 PR_CALLBACK MySend(
    PRFileDesc *fd, const void *buf, PRInt32 amount,
    PRIntn flags, PRIntervalTime timeout)
{
    char *b;
    PRInt32 rv;
    PRFileDesc *lo = fd->lower;
    PRFilePrivate *mine = (PRFilePrivate*)fd->secret;

    do
    {
        switch (mine->xmtstate)
        {
            case xmt_send_debit:
                b = (char*)&mine->xmtreq;
                mine->xmtreq = amount;
                rv = lo->methods->send(
                         lo, b - mine->xmtinprogress,
                         sizeof(mine->xmtreq) - mine->xmtinprogress, flags, timeout);
                if ((-1 == rv) && (PR_WOULD_BLOCK_ERROR == PR_GetError())) {
                    break;
                }
                mine->xmtinprogress += rv;
                if (mine->xmtinprogress < sizeof(mine->xmtreq)) {
                    break;
                }
                mine->xmtstate = xmt_recv_credit;
                mine->xmtinprogress = 0;
            case xmt_recv_credit:
                b = (char*)&mine->xmtreq;
                rv = lo->methods->recv(
                         lo, b + mine->xmtinprogress,
                         sizeof(mine->xmtreq) - mine->xmtinprogress, flags, timeout);
                if ((-1 == rv) && (PR_WOULD_BLOCK_ERROR == PR_GetError())) {
                    break;
                }
                mine->xmtinprogress += rv;
                if (mine->xmtinprogress < sizeof(mine->xmtreq)) {
                    break;
                }
                mine->xmtstate = xmt_data;
                mine->xmtinprogress = 0;
            case xmt_data:
                b = (char*)buf;
                rv = lo->methods->send(
                         lo, b + mine->xmtinprogress,
                         mine->xmtreq - mine->xmtinprogress, flags, timeout);
                if ((-1 == rv) && (PR_WOULD_BLOCK_ERROR == PR_GetError())) {
                    break;
                }
                mine->xmtinprogress += rv;
                if (mine->xmtinprogress < amount) {
                    break;
                }
                mine->xmtstate = xmt_send_debit;
                mine->xmtinprogress = 0;
                return mine->xmtreq;  /* <<-- That's the one! */
            default:
                break;
        }
    } while (-1 != rv);
    return rv;
}  /* MySend */

static Verbosity ChangeVerbosity(Verbosity verbosity, PRIntn delta)
{
    PRIntn verbage = (PRIntn)verbosity + delta;
    if (verbage < (PRIntn)silent) {
        verbage = (PRIntn)silent;
    }
    else if (verbage > (PRIntn)noisy) {
        verbage = (PRIntn)noisy;
    }
    return (Verbosity)verbage;
}  /* ChangeVerbosity */

int main(int argc, char **argv)
{
    PRStatus rv;
    PLOptStatus os;
    PRFileDesc *client, *service;
    PRNetAddr any_address;
    const char *server_name = NULL;
    const PRIOMethods *stubMethods;
    PRThread *client_thread, *server_thread;
    PRThreadScope thread_scope = PR_LOCAL_THREAD;
    PRSocketOptionData socket_noblock, socket_nodelay;
    PLOptState *opt = PL_CreateOptState(argc, argv, "dqGC:c:p:");
    while (PL_OPT_EOL != (os = PL_GetNextOpt(opt)))
    {
        if (PL_OPT_BAD == os) {
            continue;
        }
        switch (opt->option)
        {
            case 0:
                server_name = opt->value;
                break;
            case 'd':  /* debug mode */
                if (verbosity < noisy) {
                    verbosity = ChangeVerbosity(verbosity, 1);
                }
                break;
            case 'q':  /* debug mode */
                if (verbosity > silent) {
                    verbosity = ChangeVerbosity(verbosity, -1);
                }
                break;
            case 'G':  /* use global threads */
                thread_scope = PR_GLOBAL_THREAD;
                break;
            case 'C':  /* number of threads waiting */
                major_iterations = atoi(opt->value);
                break;
            case 'c':  /* number of client threads */
                minor_iterations = atoi(opt->value);
                break;
            case 'p':  /* default port */
                default_port = atoi(opt->value);
                break;
            default:
                break;
        }
    }
    PL_DestroyOptState(opt);
    PR_STDIO_INIT();

    logFile = PR_GetSpecialFD(PR_StandardError);
    identity = PR_GetUniqueIdentity("Dummy");
    stubMethods = PR_GetDefaultIOMethods();

    /*
    ** The protocol we're going to implement is one where in order to initiate
    ** a send, the sender must first solicit permission. Therefore, every
    ** send is really a send - receive - send sequence.
    */
    myMethods = *stubMethods;  /* first get the entire batch */
    myMethods.accept = MyAccept;  /* then override the ones we care about */
    myMethods.recv = MyRecv;  /* then override the ones we care about */
    myMethods.send = MySend;  /* then override the ones we care about */
    myMethods.close = MyClose;  /* then override the ones we care about */
    myMethods.poll = MyPoll;  /* then override the ones we care about */

    if (NULL == server_name)
        rv = PR_InitializeNetAddr(
                 PR_IpAddrLoopback, default_port, &server_address);
    else
    {
        rv = PR_StringToNetAddr(server_name, &server_address);
        PR_ASSERT(PR_SUCCESS == rv);
        rv = PR_InitializeNetAddr(
                 PR_IpAddrNull, default_port, &server_address);
    }
    PR_ASSERT(PR_SUCCESS == rv);

    socket_noblock.value.non_blocking = PR_TRUE;
    socket_noblock.option = PR_SockOpt_Nonblocking;
    socket_nodelay.value.no_delay = PR_TRUE;
    socket_nodelay.option = PR_SockOpt_NoDelay;

    /* one type w/o layering */

    while (major_iterations-- > 0)
    {
        if (verbosity > silent) {
            PR_fprintf(logFile, "Beginning non-layered test\n");
        }

        client = PR_NewTCPSocket(); PR_ASSERT(NULL != client);
        service = PR_NewTCPSocket(); PR_ASSERT(NULL != service);

        rv = PR_SetSocketOption(client, &socket_noblock);
        PR_ASSERT(PR_SUCCESS == rv);
        rv = PR_SetSocketOption(service, &socket_noblock);
        PR_ASSERT(PR_SUCCESS == rv);
        rv = PR_SetSocketOption(client, &socket_nodelay);
        PR_ASSERT(PR_SUCCESS == rv);
        rv = PR_SetSocketOption(service, &socket_nodelay);
        PR_ASSERT(PR_SUCCESS == rv);

        rv = PR_InitializeNetAddr(PR_IpAddrAny, default_port, &any_address);
        PR_ASSERT(PR_SUCCESS == rv);
        rv = PR_Bind(service, &any_address); PR_ASSERT(PR_SUCCESS == rv);
        rv = PR_Listen(service, 10); PR_ASSERT(PR_SUCCESS == rv);

        server_thread = PR_CreateThread(
                            PR_USER_THREAD, Server, service,
                            PR_PRIORITY_HIGH, thread_scope,
                            PR_JOINABLE_THREAD, 16 * 1024);
        PR_ASSERT(NULL != server_thread);

        client_thread = PR_CreateThread(
                            PR_USER_THREAD, Client, client,
                            PR_PRIORITY_NORMAL, thread_scope,
                            PR_JOINABLE_THREAD, 16 * 1024);
        PR_ASSERT(NULL != client_thread);

        rv = PR_JoinThread(client_thread);
        PR_ASSERT(PR_SUCCESS == rv);
        rv = PR_JoinThread(server_thread);
        PR_ASSERT(PR_SUCCESS == rv);

        rv = PR_Close(client); PR_ASSERT(PR_SUCCESS == rv);
        rv = PR_Close(service); PR_ASSERT(PR_SUCCESS == rv);
        if (verbosity > silent) {
            PR_fprintf(logFile, "Ending non-layered test\n");
        }

        /* with layering */
        if (verbosity > silent) {
            PR_fprintf(logFile, "Beginning layered test\n");
        }
        client = PR_NewTCPSocket(); PR_ASSERT(NULL != client);
        service = PR_NewTCPSocket(); PR_ASSERT(NULL != service);

        rv = PR_SetSocketOption(client, &socket_noblock);
        PR_ASSERT(PR_SUCCESS == rv);
        rv = PR_SetSocketOption(service, &socket_noblock);
        PR_ASSERT(PR_SUCCESS == rv);
        rv = PR_SetSocketOption(client, &socket_nodelay);
        PR_ASSERT(PR_SUCCESS == rv);
        rv = PR_SetSocketOption(service, &socket_nodelay);
        PR_ASSERT(PR_SUCCESS == rv);

        PushLayer(client);
        PushLayer(service);

        rv = PR_InitializeNetAddr(PR_IpAddrAny, default_port, &any_address);
        PR_ASSERT(PR_SUCCESS == rv);
        rv = PR_Bind(service, &any_address); PR_ASSERT(PR_SUCCESS == rv);
        rv = PR_Listen(service, 10); PR_ASSERT(PR_SUCCESS == rv);

        server_thread = PR_CreateThread(
                            PR_USER_THREAD, Server, service,
                            PR_PRIORITY_HIGH, thread_scope,
                            PR_JOINABLE_THREAD, 16 * 1024);
        PR_ASSERT(NULL != server_thread);

        client_thread = PR_CreateThread(
                            PR_USER_THREAD, Client, client,
                            PR_PRIORITY_NORMAL, thread_scope,
                            PR_JOINABLE_THREAD, 16 * 1024);
        PR_ASSERT(NULL != client_thread);

        rv = PR_JoinThread(client_thread);
        PR_ASSERT(PR_SUCCESS == rv);
        rv = PR_JoinThread(server_thread);
        PR_ASSERT(PR_SUCCESS == rv);

        rv = PR_Close(PopLayer(client)); PR_ASSERT(PR_SUCCESS == rv);
        rv = PR_Close(PopLayer(service)); PR_ASSERT(PR_SUCCESS == rv);
        if (verbosity > silent) {
            PR_fprintf(logFile, "Ending layered test\n");
        }
    }
    return 0;
}  /* main */

/* nblayer.c */