/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

#include <algorithm>
#include "gtest/gtest.h"
#include "Helpers.h"
#include "mozilla/ReentrantMonitor.h"
#include "nsCOMPtr.h"
#include "nsCRT.h"
#include "nsIAsyncInputStream.h"
#include "nsIAsyncOutputStream.h"
#include "nsIBufferedStreams.h"
#include "nsIClassInfo.h"
#include "nsICloneableInputStream.h"
#include "nsIInputStream.h"
#include "nsIOutputStream.h"
#include "nsIPipe.h"
#include "nsISeekableStream.h"
#include "nsIThread.h"
#include "nsIRunnable.h"
#include "nsStreamUtils.h"
#include "nsString.h"
#include "nsThreadUtils.h"
#include "prprf.h"
#include "prinrval.h"

using namespace mozilla;

#define ITERATIONS      33333
char kTestPattern[] = "My hovercraft is full of eels.\n";

bool gTrace = false;

static nsresult
WriteAll(nsIOutputStream *os, const char *buf, uint32_t bufLen, uint32_t *lenWritten)
{
    const char *p = buf;
    *lenWritten = 0;
    while (bufLen) {
        uint32_t n;
        nsresult rv = os->Write(p, bufLen, &n);
        if (NS_FAILED(rv)) return rv;
        p += n;
        bufLen -= n;
        *lenWritten += n;
    }
    return NS_OK;
}

class nsReceiver final : public nsIRunnable {
public:
    NS_DECL_THREADSAFE_ISUPPORTS

    NS_IMETHOD Run() override {
        nsresult rv;
        char buf[101];
        uint32_t count;
        PRIntervalTime start = PR_IntervalNow();
        while (true) {
            rv = mIn->Read(buf, 100, &count);
            if (NS_FAILED(rv)) {
                printf("read failed\n");
                break;
            }
            if (count == 0) {
//                printf("EOF count = %d\n", mCount);
                break;
            }

            if (gTrace) {
                buf[count] = '\0';
                printf("read: %s\n", buf);
            }
            mCount += count;
        }
        PRIntervalTime end = PR_IntervalNow();
        printf("read  %d bytes, time = %dms\n", mCount,
               PR_IntervalToMilliseconds(end - start));
        return rv;
    }

    explicit nsReceiver(nsIInputStream* in) : mIn(in), mCount(0) {
    }

    uint32_t GetBytesRead() { return mCount; }

private:
    ~nsReceiver() {}

protected:
    nsCOMPtr<nsIInputStream> mIn;
    uint32_t            mCount;
};

NS_IMPL_ISUPPORTS(nsReceiver, nsIRunnable)

nsresult
TestPipe(nsIInputStream* in, nsIOutputStream* out)
{
    RefPtr<nsReceiver> receiver = new nsReceiver(in);
    if (!receiver)
        return NS_ERROR_OUT_OF_MEMORY;

    nsresult rv;

    nsCOMPtr<nsIThread> thread;
    rv = NS_NewThread(getter_AddRefs(thread), receiver);
    if (NS_FAILED(rv)) return rv;

    uint32_t total = 0;
    PRIntervalTime start = PR_IntervalNow();
    for (uint32_t i = 0; i < ITERATIONS; i++) {
        uint32_t writeCount;
        char *buf = PR_smprintf("%d %s", i, kTestPattern);
        uint32_t len = strlen(buf);
        rv = WriteAll(out, buf, len, &writeCount);
        if (gTrace) {
            printf("wrote: ");
            for (uint32_t j = 0; j < writeCount; j++) {
                putc(buf[j], stdout);
            }
            printf("\n");
        }
        PR_smprintf_free(buf);
        if (NS_FAILED(rv)) return rv;
        total += writeCount;
    }
    rv = out->Close();
    if (NS_FAILED(rv)) return rv;

    PRIntervalTime end = PR_IntervalNow();

    thread->Shutdown();

    printf("wrote %d bytes, time = %dms\n", total,
           PR_IntervalToMilliseconds(end - start));
    EXPECT_EQ(receiver->GetBytesRead(), total);

    return NS_OK;
}

////////////////////////////////////////////////////////////////////////////////

class nsShortReader final : public nsIRunnable {
public:
    NS_DECL_THREADSAFE_ISUPPORTS

    NS_IMETHOD Run() override {
        nsresult rv;
        char buf[101];
        uint32_t count;
        uint32_t total = 0;
        while (true) {
            //if (gTrace)
            //    printf("calling Read\n");
            rv = mIn->Read(buf, 100, &count);
            if (NS_FAILED(rv)) {
                printf("read failed\n");
                break;
            }
            if (count == 0) {
                break;
            }

            if (gTrace) {
                // For next |printf()| call and possible others elsewhere.
                buf[count] = '\0';

                printf("read %d bytes: %s\n", count, buf);
            }

            Received(count);
            total += count;
        }
        printf("read  %d bytes\n", total);
        return rv;
    }

    explicit nsShortReader(nsIInputStream* in) : mIn(in), mReceived(0) {
        mMon = new ReentrantMonitor("nsShortReader");
    }

    void Received(uint32_t count) {
        ReentrantMonitorAutoEnter mon(*mMon);
        mReceived += count;
        mon.Notify();
    }

    uint32_t WaitForReceipt(const uint32_t aWriteCount) {
        ReentrantMonitorAutoEnter mon(*mMon);
        uint32_t result = mReceived;

        while (result < aWriteCount) {
            mon.Wait();

            EXPECT_TRUE(mReceived > result);
            result = mReceived;
        }

        mReceived = 0;
        return result;
    }

private:
    ~nsShortReader() {}

protected:
    nsCOMPtr<nsIInputStream> mIn;
    uint32_t                 mReceived;
    ReentrantMonitor*        mMon;
};

NS_IMPL_ISUPPORTS(nsShortReader, nsIRunnable)

nsresult
TestShortWrites(nsIInputStream* in, nsIOutputStream* out)
{
    RefPtr<nsShortReader> receiver = new nsShortReader(in);
    if (!receiver)
        return NS_ERROR_OUT_OF_MEMORY;

    nsresult rv;

    nsCOMPtr<nsIThread> thread;
    rv = NS_NewThread(getter_AddRefs(thread), receiver);
    if (NS_FAILED(rv)) return rv;

    uint32_t total = 0;
    for (uint32_t i = 0; i < ITERATIONS; i++) {
        uint32_t writeCount;
        char* buf = PR_smprintf("%d %s", i, kTestPattern);
        uint32_t len = strlen(buf);
        len = len * rand() / RAND_MAX;
        len = std::min(1u, len);
        rv = WriteAll(out, buf, len, &writeCount);
        if (NS_FAILED(rv)) return rv;
        EXPECT_EQ(writeCount, len);
        total += writeCount;

        if (gTrace)
            printf("wrote %d bytes: %s\n", writeCount, buf);
        PR_smprintf_free(buf);
        //printf("calling Flush\n");
        out->Flush();
        //printf("calling WaitForReceipt\n");

#ifdef DEBUG
        const uint32_t received =
          receiver->WaitForReceipt(writeCount);
        EXPECT_EQ(received, writeCount);
#endif
    }
    rv = out->Close();
    if (NS_FAILED(rv)) return rv;

    thread->Shutdown();

    printf("wrote %d bytes\n", total);

    return NS_OK;
}

////////////////////////////////////////////////////////////////////////////////

class nsPump final : public nsIRunnable
{
public:
    NS_DECL_THREADSAFE_ISUPPORTS

    NS_IMETHOD Run() override {
        nsresult rv;
        uint32_t count;
        while (true) {
            rv = mOut->WriteFrom(mIn, ~0U, &count);
            if (NS_FAILED(rv)) {
                printf("Write failed\n");
                break;
            }
            if (count == 0) {
                printf("EOF count = %d\n", mCount);
                break;
            }

            if (gTrace) {
                printf("Wrote: %d\n", count);
            }
            mCount += count;
        }
        mOut->Close();
        return rv;
    }

    nsPump(nsIInputStream* in,
           nsIOutputStream* out)
        : mIn(in), mOut(out), mCount(0) {
    }

private:
    ~nsPump() {}

protected:
    nsCOMPtr<nsIInputStream>      mIn;
    nsCOMPtr<nsIOutputStream>     mOut;
    uint32_t                            mCount;
};

NS_IMPL_ISUPPORTS(nsPump, nsIRunnable)

TEST(Pipes, ChainedPipes)
{
    nsresult rv;
    if (gTrace) {
        printf("TestChainedPipes\n");
    }

    nsCOMPtr<nsIInputStream> in1;
    nsCOMPtr<nsIOutputStream> out1;
    rv = NS_NewPipe(getter_AddRefs(in1), getter_AddRefs(out1), 20, 1999);
    if (NS_FAILED(rv)) return;

    nsCOMPtr<nsIInputStream> in2;
    nsCOMPtr<nsIOutputStream> out2;
    rv = NS_NewPipe(getter_AddRefs(in2), getter_AddRefs(out2), 200, 401);
    if (NS_FAILED(rv)) return;

    RefPtr<nsPump> pump = new nsPump(in1, out2);
    if (pump == nullptr) return;

    nsCOMPtr<nsIThread> thread;
    rv = NS_NewThread(getter_AddRefs(thread), pump);
    if (NS_FAILED(rv)) return;

    RefPtr<nsReceiver> receiver = new nsReceiver(in2);
    if (receiver == nullptr) return;

    nsCOMPtr<nsIThread> receiverThread;
    rv = NS_NewThread(getter_AddRefs(receiverThread), receiver);
    if (NS_FAILED(rv)) return;

    uint32_t total = 0;
    for (uint32_t i = 0; i < ITERATIONS; i++) {
        uint32_t writeCount;
        char* buf = PR_smprintf("%d %s", i, kTestPattern);
        uint32_t len = strlen(buf);
        len = len * rand() / RAND_MAX;
        len = std::max(1u, len);
        rv = WriteAll(out1, buf, len, &writeCount);
        if (NS_FAILED(rv)) return;
        EXPECT_EQ(writeCount, len);
        total += writeCount;

        if (gTrace)
            printf("wrote %d bytes: %s\n", writeCount, buf);

        PR_smprintf_free(buf);
    }
    if (gTrace) {
        printf("wrote total of %d bytes\n", total);
    }
    rv = out1->Close();
    if (NS_FAILED(rv)) return;

    thread->Shutdown();
    receiverThread->Shutdown();
}

////////////////////////////////////////////////////////////////////////////////

void
RunTests(uint32_t segSize, uint32_t segCount)
{
    nsresult rv;
    nsCOMPtr<nsIInputStream> in;
    nsCOMPtr<nsIOutputStream> out;
    uint32_t bufSize = segSize * segCount;
    if (gTrace) {
        printf("Testing New Pipes: segment size %d buffer size %d\n", segSize, bufSize);
        printf("Testing long writes...\n");
    }
    rv = NS_NewPipe(getter_AddRefs(in), getter_AddRefs(out), segSize, bufSize);
    EXPECT_TRUE(NS_SUCCEEDED(rv));
    rv = TestPipe(in, out);
    EXPECT_TRUE(NS_SUCCEEDED(rv));

    if (gTrace) {
        printf("Testing short writes...\n");
    }
    rv = NS_NewPipe(getter_AddRefs(in), getter_AddRefs(out), segSize, bufSize);
    EXPECT_TRUE(NS_SUCCEEDED(rv));
    rv = TestShortWrites(in, out);
    EXPECT_TRUE(NS_SUCCEEDED(rv));
}

TEST(Pipes, Main)
{
    RunTests(16, 1);
    RunTests(4096, 16);
}

////////////////////////////////////////////////////////////////////////////////

namespace {

static const uint32_t DEFAULT_SEGMENT_SIZE = 4 * 1024;

// An alternate pipe testing routing that uses NS_ConsumeStream() instead of
// manual read loop.
static void TestPipe2(uint32_t aNumBytes,
                      uint32_t aSegmentSize = DEFAULT_SEGMENT_SIZE)
{
  nsCOMPtr<nsIInputStream> reader;
  nsCOMPtr<nsIOutputStream> writer;

  uint32_t maxSize = std::max(aNumBytes, aSegmentSize);

  nsresult rv = NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer),
                           aSegmentSize, maxSize);
  ASSERT_TRUE(NS_SUCCEEDED(rv));

  nsTArray<char> inputData;
  testing::CreateData(aNumBytes, inputData);
  testing::WriteAllAndClose(writer, inputData);
  testing::ConsumeAndValidateStream(reader, inputData);
}

} // namespace

TEST(Pipes, Blocking_32k)
{
  TestPipe2(32 * 1024);
}

TEST(Pipes, Blocking_64k)
{
  TestPipe2(64 * 1024);
}

TEST(Pipes, Blocking_128k)
{
  TestPipe2(128 * 1024);
}

////////////////////////////////////////////////////////////////////////////////

namespace {

// Utility routine to validate pipe clone before.  There are many knobs.
//
// aTotalBytes              Total number of bytes to write to the pipe.
// aNumWrites               How many separate write calls should be made.  Bytes
//                          are evenly distributed over these write calls.
// aNumInitialClones        How many clones of the pipe input stream should be
//                          made before writing begins.
// aNumToCloseAfterWrite    How many streams should be closed after each write.
//                          One stream is always kept open.  This verifies that
//                          closing one stream does not effect other open
//                          streams.
// aNumToCloneAfterWrite    How many clones to create after each write.  Occurs
//                          after closing any streams.  This tests cloning
//                          active streams on a pipe that is being written to.
// aNumStreamToReadPerWrite How many streams to read fully after each write.
//                          This tests reading cloned streams at different rates
//                          while the pipe is being written to.
static void TestPipeClone(uint32_t aTotalBytes,
                          uint32_t aNumWrites,
                          uint32_t aNumInitialClones,
                          uint32_t aNumToCloseAfterWrite,
                          uint32_t aNumToCloneAfterWrite,
                          uint32_t aNumStreamsToReadPerWrite,
                          uint32_t aSegmentSize = DEFAULT_SEGMENT_SIZE)
{
  nsCOMPtr<nsIInputStream> reader;
  nsCOMPtr<nsIOutputStream> writer;

  uint32_t maxSize = std::max(aTotalBytes, aSegmentSize);

  // Use async input streams so we can NS_ConsumeStream() the current data
  // while the pipe is still being written to.
  nsresult rv = NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer),
                           aSegmentSize, maxSize,
                           true, false); // non-blocking - reader, writer
  ASSERT_TRUE(NS_SUCCEEDED(rv));

  nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(reader);
  ASSERT_TRUE(cloneable);
  ASSERT_TRUE(cloneable->GetCloneable());

  nsTArray<nsCString> outputDataList;

  nsTArray<nsCOMPtr<nsIInputStream>> streamList;

  // first stream is our original reader from the pipe
  streamList.AppendElement(reader);
  outputDataList.AppendElement();

  // Clone the initial input stream the specified number of times
  // before performing any writes.
  for (uint32_t i = 0; i < aNumInitialClones; ++i) {
    nsCOMPtr<nsIInputStream>* clone = streamList.AppendElement();
    rv = cloneable->Clone(getter_AddRefs(*clone));
    ASSERT_TRUE(NS_SUCCEEDED(rv));
    ASSERT_TRUE(*clone);

    outputDataList.AppendElement();
  }

  nsTArray<char> inputData;
  testing::CreateData(aTotalBytes, inputData);

  const uint32_t bytesPerWrite = ((aTotalBytes - 1)/ aNumWrites) + 1;
  uint32_t offset = 0;
  uint32_t remaining = aTotalBytes;
  uint32_t nextStreamToRead = 0;

  while (remaining) {
    uint32_t numToWrite = std::min(bytesPerWrite, remaining);
    testing::Write(writer, inputData, offset, numToWrite);
    offset += numToWrite;
    remaining -= numToWrite;

    // Close the specified number of streams.  This allows us to
    // test that one closed clone does not break other open clones.
    for (uint32_t i = 0; i < aNumToCloseAfterWrite &&
                         streamList.Length() > 1; ++i) {

      uint32_t lastIndex = streamList.Length() - 1;
      streamList[lastIndex]->Close();
      streamList.RemoveElementAt(lastIndex);
      outputDataList.RemoveElementAt(lastIndex);

      if (nextStreamToRead >= streamList.Length()) {
        nextStreamToRead = 0;
      }
    }

    // Create the specified number of clones.  This lets us verify
    // that we can create clones in the middle of pipe reading and
    // writing.
    for (uint32_t i = 0; i < aNumToCloneAfterWrite; ++i) {
      nsCOMPtr<nsIInputStream>* clone = streamList.AppendElement();
      rv = cloneable->Clone(getter_AddRefs(*clone));
      ASSERT_TRUE(NS_SUCCEEDED(rv));
      ASSERT_TRUE(*clone);

      // Initialize the new output data to make whats been read to data for
      // the original stream.  First stream is always the original stream.
      nsCString* outputData = outputDataList.AppendElement();
      *outputData = outputDataList[0];
    }

    // Read the specified number of streams.  This lets us verify that we
    // can read from the clones at different rates while the pipe is being
    // written to.
    for (uint32_t i = 0; i < aNumStreamsToReadPerWrite; ++i) {
      nsCOMPtr<nsIInputStream>& stream = streamList[nextStreamToRead];
      nsCString& outputData = outputDataList[nextStreamToRead];

      // Can't use ConsumeAndValidateStream() here because we're not
      // guaranteed the exact amount read.  It should just be at least
      // as many as numToWrite.
      nsAutoCString tmpOutputData;
      rv = NS_ConsumeStream(stream, UINT32_MAX, tmpOutputData);
      ASSERT_TRUE(rv == NS_BASE_STREAM_WOULD_BLOCK || NS_SUCCEEDED(rv));
      ASSERT_GE(tmpOutputData.Length(), numToWrite);

      outputData += tmpOutputData;

      nextStreamToRead += 1;
      if (nextStreamToRead >= streamList.Length()) {
        // Note: When we wrap around on the streams being read, its possible
        //       we will trigger a segment to be deleted from the pipe.  It
        //       would be nice to validate this here, but we don't have any
        //       QI'able interface that would let us check easily.

        nextStreamToRead = 0;
      }
    }
  }

  rv = writer->Close();
  ASSERT_TRUE(NS_SUCCEEDED(rv));

  nsDependentCSubstring inputString(inputData.Elements(), inputData.Length());

  // Finally, read the remaining bytes from each stream.  This may be
  // different amounts of data depending on how much reading we did while
  // writing.  Verify that the end result matches the input data.
  for (uint32_t i = 0; i < streamList.Length(); ++i) {
    nsCOMPtr<nsIInputStream>& stream = streamList[i];
    nsCString& outputData = outputDataList[i];

    nsAutoCString tmpOutputData;
    rv = NS_ConsumeStream(stream, UINT32_MAX, tmpOutputData);
    ASSERT_TRUE(rv == NS_BASE_STREAM_WOULD_BLOCK || NS_SUCCEEDED(rv));
    stream->Close();

    // Append to total amount read from the stream
    outputData += tmpOutputData;

    ASSERT_EQ(inputString.Length(), outputData.Length());
    ASSERT_TRUE(inputString.Equals(outputData));
  }
}

} // namespace

TEST(Pipes, Clone_BeforeWrite_ReadAtEnd)
{
  TestPipeClone(32 * 1024, // total bytes
                16,        // num writes
                3,         // num initial clones
                0,         // num streams to close after each write
                0,         // num clones to add after each write
                0);        // num streams to read after each write
}

TEST(Pipes, Clone_BeforeWrite_ReadDuringWrite)
{
  // Since this reads all streams on every write, it should trigger the
  // pipe cursor roll back optimization.  Currently we can only verify
  // this with logging.

  TestPipeClone(32 * 1024, // total bytes
                16,        // num writes
                3,         // num initial clones
                0,         // num streams to close after each write
                0,         // num clones to add after each write
                4);        // num streams to read after each write
}

TEST(Pipes, Clone_DuringWrite_ReadAtEnd)
{
  TestPipeClone(32 * 1024, // total bytes
                16,        // num writes
                0,         // num initial clones
                0,         // num streams to close after each write
                1,         // num clones to add after each write
                0);        // num streams to read after each write
}

TEST(Pipes, Clone_DuringWrite_ReadDuringWrite)
{
  TestPipeClone(32 * 1024, // total bytes
                16,        // num writes
                0,         // num initial clones
                0,         // num streams to close after each write
                1,         // num clones to add after each write
                1);        // num streams to read after each write
}

TEST(Pipes, Clone_DuringWrite_ReadDuringWrite_CloseDuringWrite)
{
  // Since this reads streams faster than we clone new ones, it should
  // trigger pipe segment deletion periodically.  Currently we can
  // only verify this with logging.

  TestPipeClone(32 * 1024, // total bytes
                16,        // num writes
                1,         // num initial clones
                1,         // num streams to close after each write
                2,         // num clones to add after each write
                3);        // num streams to read after each write
}

TEST(Pipes, Write_AsyncWait)
{
  nsCOMPtr<nsIAsyncInputStream> reader;
  nsCOMPtr<nsIAsyncOutputStream> writer;

  const uint32_t segmentSize = 1024;
  const uint32_t numSegments = 1;

  nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer),
                            true, true,  // non-blocking - reader, writer
                            segmentSize, numSegments);
  ASSERT_TRUE(NS_SUCCEEDED(rv));

  nsTArray<char> inputData;
  testing::CreateData(segmentSize, inputData);

  uint32_t numWritten = 0;
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
  ASSERT_TRUE(NS_SUCCEEDED(rv));

  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
  ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv);

  RefPtr<testing::OutputStreamCallback> cb =
    new testing::OutputStreamCallback();

  rv = writer->AsyncWait(cb, 0, 0, nullptr);
  ASSERT_TRUE(NS_SUCCEEDED(rv));

  ASSERT_FALSE(cb->Called());

  testing::ConsumeAndValidateStream(reader, inputData);

  ASSERT_TRUE(cb->Called());
}

TEST(Pipes, Write_AsyncWait_Clone)
{
  nsCOMPtr<nsIAsyncInputStream> reader;
  nsCOMPtr<nsIAsyncOutputStream> writer;

  const uint32_t segmentSize = 1024;
  const uint32_t numSegments = 1;

  nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer),
                            true, true,  // non-blocking - reader, writer
                            segmentSize, numSegments);
  ASSERT_TRUE(NS_SUCCEEDED(rv));

  nsCOMPtr<nsIInputStream> clone;
  rv = NS_CloneInputStream(reader, getter_AddRefs(clone));
  ASSERT_TRUE(NS_SUCCEEDED(rv));

  nsTArray<char> inputData;
  testing::CreateData(segmentSize, inputData);

  uint32_t numWritten = 0;
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
  ASSERT_TRUE(NS_SUCCEEDED(rv));

  // This attempts to write data beyond the original pipe size limit.  It
  // should fail since neither side of the clone has been read yet.
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
  ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv);

  RefPtr<testing::OutputStreamCallback> cb =
    new testing::OutputStreamCallback();

  rv = writer->AsyncWait(cb, 0, 0, nullptr);
  ASSERT_TRUE(NS_SUCCEEDED(rv));

  ASSERT_FALSE(cb->Called());

  // Consume data on the original stream, but the clone still has not been read.
  testing::ConsumeAndValidateStream(reader, inputData);

  // A clone that is not being read should not stall the other input stream
  // reader.  Therefore the writer callback should trigger when the fastest
  // reader drains the other input stream.
  ASSERT_TRUE(cb->Called());

  // Attempt to write data.  This will buffer data beyond the pipe size limit in
  // order for the clone stream to still work.  This is allowed because the
  // other input stream has drained its buffered segments and is ready for more
  // data.
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
  ASSERT_TRUE(NS_SUCCEEDED(rv));

  // Again, this should fail since the origin stream has not been read again.
  // The pipe size should still restrict how far ahead we can buffer even
  // when there is a cloned stream not being read.
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
  ASSERT_TRUE(NS_FAILED(rv));

  cb = new testing::OutputStreamCallback();
  rv = writer->AsyncWait(cb, 0, 0, nullptr);
  ASSERT_TRUE(NS_SUCCEEDED(rv));

  // The write should again be blocked since we have written data and the
  // main reader is at its maximum advance buffer.
  ASSERT_FALSE(cb->Called());

  nsTArray<char> expectedCloneData;
  expectedCloneData.AppendElements(inputData);
  expectedCloneData.AppendElements(inputData);

  // We should now be able to consume the entire backlog of buffered data on
  // the cloned stream.
  testing::ConsumeAndValidateStream(clone, expectedCloneData);

  // Draining the clone side should also trigger the AsyncWait() writer
  // callback
  ASSERT_TRUE(cb->Called());

  // Finally, we should be able to consume the remaining data on the original
  // reader.
  testing::ConsumeAndValidateStream(reader, inputData);
}

TEST(Pipes, Write_AsyncWait_Clone_CloseOriginal)
{
  nsCOMPtr<nsIAsyncInputStream> reader;
  nsCOMPtr<nsIAsyncOutputStream> writer;

  const uint32_t segmentSize = 1024;
  const uint32_t numSegments = 1;

  nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer),
                            true, true,  // non-blocking - reader, writer
                            segmentSize, numSegments);
  ASSERT_TRUE(NS_SUCCEEDED(rv));

  nsCOMPtr<nsIInputStream> clone;
  rv = NS_CloneInputStream(reader, getter_AddRefs(clone));
  ASSERT_TRUE(NS_SUCCEEDED(rv));

  nsTArray<char> inputData;
  testing::CreateData(segmentSize, inputData);

  uint32_t numWritten = 0;
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
  ASSERT_TRUE(NS_SUCCEEDED(rv));

  // This attempts to write data beyond the original pipe size limit.  It
  // should fail since neither side of the clone has been read yet.
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
  ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv);

  RefPtr<testing::OutputStreamCallback> cb =
    new testing::OutputStreamCallback();

  rv = writer->AsyncWait(cb, 0, 0, nullptr);
  ASSERT_TRUE(NS_SUCCEEDED(rv));

  ASSERT_FALSE(cb->Called());

  // Consume data on the original stream, but the clone still has not been read.
  testing::ConsumeAndValidateStream(reader, inputData);

  // A clone that is not being read should not stall the other input stream
  // reader.  Therefore the writer callback should trigger when the fastest
  // reader drains the other input stream.
  ASSERT_TRUE(cb->Called());

  // Attempt to write data.  This will buffer data beyond the pipe size limit in
  // order for the clone stream to still work.  This is allowed because the
  // other input stream has drained its buffered segments and is ready for more
  // data.
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
  ASSERT_TRUE(NS_SUCCEEDED(rv));

  // Again, this should fail since the origin stream has not been read again.
  // The pipe size should still restrict how far ahead we can buffer even
  // when there is a cloned stream not being read.
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
  ASSERT_TRUE(NS_FAILED(rv));

  cb = new testing::OutputStreamCallback();
  rv = writer->AsyncWait(cb, 0, 0, nullptr);
  ASSERT_TRUE(NS_SUCCEEDED(rv));

  // The write should again be blocked since we have written data and the
  // main reader is at its maximum advance buffer.
  ASSERT_FALSE(cb->Called());

  // Close the original reader input stream.  This was the fastest reader,
  // so we should have a single stream that is buffered beyond our nominal
  // limit.
  reader->Close();

  // Because the clone stream is still buffered the writable callback should
  // not be fired.
  ASSERT_FALSE(cb->Called());

  // And we should not be able to perform a write.
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
  ASSERT_TRUE(NS_FAILED(rv));

  // Create another clone stream.  Now we have two streams that exceed our
  // maximum size limit
  nsCOMPtr<nsIInputStream> clone2;
  rv = NS_CloneInputStream(clone, getter_AddRefs(clone2));
  ASSERT_TRUE(NS_SUCCEEDED(rv));

  nsTArray<char> expectedCloneData;
  expectedCloneData.AppendElements(inputData);
  expectedCloneData.AppendElements(inputData);

  // We should now be able to consume the entire backlog of buffered data on
  // the cloned stream.
  testing::ConsumeAndValidateStream(clone, expectedCloneData);

  // The pipe should now be writable because we have two open streams, one of which
  // is completely drained.
  ASSERT_TRUE(cb->Called());

  // Write again to reach our limit again.
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
  ASSERT_TRUE(NS_SUCCEEDED(rv));

  // The stream is again non-writeable.
  cb = new testing::OutputStreamCallback();
  rv = writer->AsyncWait(cb, 0, 0, nullptr);
  ASSERT_TRUE(NS_SUCCEEDED(rv));
  ASSERT_FALSE(cb->Called());

  // Close the empty stream.  This is different from our previous close since
  // before we were closing a stream with some data still buffered.
  clone->Close();

  // The pipe should not be writable.  The second clone is still fully buffered
  // over our limit.
  ASSERT_FALSE(cb->Called());
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
  ASSERT_TRUE(NS_FAILED(rv));

  // Finally consume all of the buffered data on the second clone.
  expectedCloneData.AppendElements(inputData);
  testing::ConsumeAndValidateStream(clone2, expectedCloneData);

  // Draining the final clone should make the pipe writable again.
  ASSERT_TRUE(cb->Called());
}

TEST(Pipes, Read_AsyncWait)
{
  nsCOMPtr<nsIAsyncInputStream> reader;
  nsCOMPtr<nsIAsyncOutputStream> writer;

  const uint32_t segmentSize = 1024;
  const uint32_t numSegments = 1;

  nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer),
                            true, true,  // non-blocking - reader, writer
                            segmentSize, numSegments);
  ASSERT_TRUE(NS_SUCCEEDED(rv));

  nsTArray<char> inputData;
  testing::CreateData(segmentSize, inputData);

  RefPtr<testing::InputStreamCallback> cb =
    new testing::InputStreamCallback();

  rv = reader->AsyncWait(cb, 0, 0, nullptr);
  ASSERT_TRUE(NS_SUCCEEDED(rv));

  ASSERT_FALSE(cb->Called());

  uint32_t numWritten = 0;
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
  ASSERT_TRUE(NS_SUCCEEDED(rv));

  ASSERT_TRUE(cb->Called());

  testing::ConsumeAndValidateStream(reader, inputData);
}

TEST(Pipes, Read_AsyncWait_Clone)
{
  nsCOMPtr<nsIAsyncInputStream> reader;
  nsCOMPtr<nsIAsyncOutputStream> writer;

  const uint32_t segmentSize = 1024;
  const uint32_t numSegments = 1;

  nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer),
                            true, true,  // non-blocking - reader, writer
                            segmentSize, numSegments);
  ASSERT_TRUE(NS_SUCCEEDED(rv));

  nsCOMPtr<nsIInputStream> clone;
  rv = NS_CloneInputStream(reader, getter_AddRefs(clone));
  ASSERT_TRUE(NS_SUCCEEDED(rv));

  nsCOMPtr<nsIAsyncInputStream> asyncClone = do_QueryInterface(clone);
  ASSERT_TRUE(asyncClone);

  nsTArray<char> inputData;
  testing::CreateData(segmentSize, inputData);

  RefPtr<testing::InputStreamCallback> cb =
    new testing::InputStreamCallback();

  RefPtr<testing::InputStreamCallback> cb2 =
    new testing::InputStreamCallback();

  rv = reader->AsyncWait(cb, 0, 0, nullptr);
  ASSERT_TRUE(NS_SUCCEEDED(rv));

  ASSERT_FALSE(cb->Called());

  rv = asyncClone->AsyncWait(cb2, 0, 0, nullptr);
  ASSERT_TRUE(NS_SUCCEEDED(rv));

  ASSERT_FALSE(cb2->Called());

  uint32_t numWritten = 0;
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
  ASSERT_TRUE(NS_SUCCEEDED(rv));

  ASSERT_TRUE(cb->Called());
  ASSERT_TRUE(cb2->Called());

  testing::ConsumeAndValidateStream(reader, inputData);
}

namespace {

nsresult
CloseDuringReadFunc(nsIInputStream *aReader,
                    void* aClosure,
                    const char* aFromSegment,
                    uint32_t aToOffset,
                    uint32_t aCount,
                    uint32_t* aWriteCountOut)
{
  MOZ_RELEASE_ASSERT(aReader);
  MOZ_RELEASE_ASSERT(aClosure);
  MOZ_RELEASE_ASSERT(aFromSegment);
  MOZ_RELEASE_ASSERT(aWriteCountOut);
  MOZ_RELEASE_ASSERT(aToOffset == 0);

  // This is insanity and you probably should not do this under normal
  // conditions.  We want to simulate the case where the pipe is closed
  // (possibly from other end on another thread) simultaneously with the
  // read.  This is the easiest way to do trigger this case in a synchronous
  // gtest.
  MOZ_ALWAYS_SUCCEEDS(aReader->Close());

  nsTArray<char>* buffer = static_cast<nsTArray<char>*>(aClosure);
  buffer->AppendElements(aFromSegment, aCount);

  *aWriteCountOut = aCount;

  return NS_OK;
}

void
TestCloseDuringRead(uint32_t aSegmentSize, uint32_t aDataSize)
{
  nsCOMPtr<nsIInputStream> reader;
  nsCOMPtr<nsIOutputStream> writer;

  const uint32_t maxSize = aSegmentSize;

  nsresult rv = NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer),
                           aSegmentSize, maxSize);
  ASSERT_TRUE(NS_SUCCEEDED(rv));

  nsTArray<char> inputData;

  testing::CreateData(aDataSize, inputData);

  uint32_t numWritten = 0;
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
  ASSERT_TRUE(NS_SUCCEEDED(rv));

  nsTArray<char> outputData;

  uint32_t numRead = 0;
  rv = reader->ReadSegments(CloseDuringReadFunc, &outputData,
                            inputData.Length(), &numRead);
  ASSERT_TRUE(NS_SUCCEEDED(rv));
  ASSERT_EQ(inputData.Length(), numRead);

  ASSERT_EQ(inputData, outputData);

  uint64_t available;
  rv = reader->Available(&available);
  ASSERT_EQ(NS_BASE_STREAM_CLOSED, rv);
}

} // namespace

TEST(Pipes, Close_During_Read_Partial_Segment)
{
  TestCloseDuringRead(1024, 512);
}

TEST(Pipes, Close_During_Read_Full_Segment)
{
  TestCloseDuringRead(1024, 1024);
}

TEST(Pipes, Interfaces)
{
  nsCOMPtr<nsIInputStream> reader;
  nsCOMPtr<nsIOutputStream> writer;

  nsresult rv = NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer));
  ASSERT_TRUE(NS_SUCCEEDED(rv));

  nsCOMPtr<nsIAsyncInputStream> readerType1 = do_QueryInterface(reader);
  ASSERT_TRUE(readerType1);

  nsCOMPtr<nsISeekableStream> readerType2 = do_QueryInterface(reader);
  ASSERT_TRUE(readerType2);

  nsCOMPtr<nsISearchableInputStream> readerType3 = do_QueryInterface(reader);
  ASSERT_TRUE(readerType3);

  nsCOMPtr<nsICloneableInputStream> readerType4 = do_QueryInterface(reader);
  ASSERT_TRUE(readerType4);

  nsCOMPtr<nsIClassInfo> readerType5 = do_QueryInterface(reader);
  ASSERT_TRUE(readerType5);

  nsCOMPtr<nsIBufferedInputStream> readerType6 = do_QueryInterface(reader);
  ASSERT_TRUE(readerType6);
}