diff options
Diffstat (limited to 'xpcom/io/nsPipe3.cpp')
-rw-r--r-- | xpcom/io/nsPipe3.cpp | 2007 |
1 files changed, 2007 insertions, 0 deletions
diff --git a/xpcom/io/nsPipe3.cpp b/xpcom/io/nsPipe3.cpp new file mode 100644 index 000000000..56932adfc --- /dev/null +++ b/xpcom/io/nsPipe3.cpp @@ -0,0 +1,2007 @@ +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* vim: set ts=8 sts=2 et sw=2 tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include <algorithm> +#include "mozilla/Attributes.h" +#include "mozilla/ReentrantMonitor.h" +#include "nsIBufferedStreams.h" +#include "nsICloneableInputStream.h" +#include "nsIPipe.h" +#include "nsIEventTarget.h" +#include "nsISeekableStream.h" +#include "mozilla/RefPtr.h" +#include "nsSegmentedBuffer.h" +#include "nsStreamUtils.h" +#include "nsCOMPtr.h" +#include "nsCRT.h" +#include "mozilla/Logging.h" +#include "nsIClassInfoImpl.h" +#include "nsAlgorithm.h" +#include "nsMemory.h" +#include "nsIAsyncInputStream.h" +#include "nsIAsyncOutputStream.h" + +using namespace mozilla; + +#ifdef LOG +#undef LOG +#endif +// +// set MOZ_LOG=nsPipe:5 +// +static LazyLogModule sPipeLog("nsPipe"); +#define LOG(args) MOZ_LOG(sPipeLog, mozilla::LogLevel::Debug, args) + +#define DEFAULT_SEGMENT_SIZE 4096 +#define DEFAULT_SEGMENT_COUNT 16 + +class nsPipe; +class nsPipeEvents; +class nsPipeInputStream; +class nsPipeOutputStream; +class AutoReadSegment; + +namespace { + +enum MonitorAction +{ + DoNotNotifyMonitor, + NotifyMonitor +}; + +enum SegmentChangeResult +{ + SegmentNotChanged, + SegmentAdvanceBufferRead +}; + +} // namespace + +//----------------------------------------------------------------------------- + +// this class is used to delay notifications until the end of a particular +// scope. it helps avoid the complexity of issuing callbacks while inside +// a critical section. +class nsPipeEvents +{ +public: + nsPipeEvents() { } + ~nsPipeEvents(); + + inline void NotifyInputReady(nsIAsyncInputStream* aStream, + nsIInputStreamCallback* aCallback) + { + mInputList.AppendElement(InputEntry(aStream, aCallback)); + } + + inline void NotifyOutputReady(nsIAsyncOutputStream* aStream, + nsIOutputStreamCallback* aCallback) + { + NS_ASSERTION(!mOutputCallback, "already have an output event"); + mOutputStream = aStream; + mOutputCallback = aCallback; + } + +private: + struct InputEntry + { + InputEntry(nsIAsyncInputStream* aStream, nsIInputStreamCallback* aCallback) + : mStream(aStream) + , mCallback(aCallback) + { + MOZ_ASSERT(mStream); + MOZ_ASSERT(mCallback); + } + + nsCOMPtr<nsIAsyncInputStream> mStream; + nsCOMPtr<nsIInputStreamCallback> mCallback; + }; + + nsTArray<InputEntry> mInputList; + + nsCOMPtr<nsIAsyncOutputStream> mOutputStream; + nsCOMPtr<nsIOutputStreamCallback> mOutputCallback; +}; + +//----------------------------------------------------------------------------- + +// This class is used to maintain input stream state. Its broken out from the +// nsPipeInputStream class because generally the nsPipe should be modifying +// this state and not the input stream itself. +struct nsPipeReadState +{ + nsPipeReadState() + : mReadCursor(nullptr) + , mReadLimit(nullptr) + , mSegment(0) + , mAvailable(0) + , mActiveRead(false) + , mNeedDrain(false) + { } + + char* mReadCursor; + char* mReadLimit; + int32_t mSegment; + uint32_t mAvailable; + + // This flag is managed using the AutoReadSegment RAII stack class. + bool mActiveRead; + + // Set to indicate that the input stream has closed and should be drained, + // but that drain has been delayed due to an active read. When the read + // completes, this flag indicate the drain should then be performed. + bool mNeedDrain; +}; + +//----------------------------------------------------------------------------- + +// an input end of a pipe (maintained as a list of refs within the pipe) +class nsPipeInputStream final + : public nsIAsyncInputStream + , public nsISeekableStream + , public nsISearchableInputStream + , public nsICloneableInputStream + , public nsIClassInfo + , public nsIBufferedInputStream +{ +public: + NS_DECL_THREADSAFE_ISUPPORTS + NS_DECL_NSIINPUTSTREAM + NS_DECL_NSIASYNCINPUTSTREAM + NS_DECL_NSISEEKABLESTREAM + NS_DECL_NSISEARCHABLEINPUTSTREAM + NS_DECL_NSICLONEABLEINPUTSTREAM + NS_DECL_NSICLASSINFO + NS_DECL_NSIBUFFEREDINPUTSTREAM + + explicit nsPipeInputStream(nsPipe* aPipe) + : mPipe(aPipe) + , mLogicalOffset(0) + , mInputStatus(NS_OK) + , mBlocking(true) + , mBlocked(false) + , mCallbackFlags(0) + { } + + explicit nsPipeInputStream(const nsPipeInputStream& aOther) + : mPipe(aOther.mPipe) + , mLogicalOffset(aOther.mLogicalOffset) + , mInputStatus(aOther.mInputStatus) + , mBlocking(aOther.mBlocking) + , mBlocked(false) + , mCallbackFlags(0) + , mReadState(aOther.mReadState) + { } + + nsresult Fill(); + void SetNonBlocking(bool aNonBlocking) + { + mBlocking = !aNonBlocking; + } + + uint32_t Available(); + + // synchronously wait for the pipe to become readable. + nsresult Wait(); + + // These two don't acquire the monitor themselves. Instead they + // expect their caller to have done so and to pass the monitor as + // evidence. + MonitorAction OnInputReadable(uint32_t aBytesWritten, nsPipeEvents&, + const ReentrantMonitorAutoEnter& ev); + MonitorAction OnInputException(nsresult, nsPipeEvents&, + const ReentrantMonitorAutoEnter& ev); + + nsPipeReadState& ReadState() + { + return mReadState; + } + + const nsPipeReadState& ReadState() const + { + return mReadState; + } + + nsresult Status() const; + + // A version of Status() that doesn't acquire the monitor. + nsresult Status(const ReentrantMonitorAutoEnter& ev) const; + +private: + virtual ~nsPipeInputStream(); + + RefPtr<nsPipe> mPipe; + + int64_t mLogicalOffset; + // Individual input streams can be closed without effecting the rest of the + // pipe. So track individual input stream status separately. |mInputStatus| + // is protected by |mPipe->mReentrantMonitor|. + nsresult mInputStatus; + bool mBlocking; + + // these variables can only be accessed while inside the pipe's monitor + bool mBlocked; + nsCOMPtr<nsIInputStreamCallback> mCallback; + uint32_t mCallbackFlags; + + // requires pipe's monitor; usually treat as an opaque token to pass to nsPipe + nsPipeReadState mReadState; +}; + +//----------------------------------------------------------------------------- + +// the output end of a pipe (allocated as a member of the pipe). +class nsPipeOutputStream + : public nsIAsyncOutputStream + , public nsIClassInfo +{ +public: + // since this class will be allocated as a member of the pipe, we do not + // need our own ref count. instead, we share the lifetime (the ref count) + // of the entire pipe. this macro is just convenience since it does not + // declare a mRefCount variable; however, don't let the name fool you... + // we are not inheriting from nsPipe ;-) + NS_DECL_ISUPPORTS_INHERITED + + NS_DECL_NSIOUTPUTSTREAM + NS_DECL_NSIASYNCOUTPUTSTREAM + NS_DECL_NSICLASSINFO + + explicit nsPipeOutputStream(nsPipe* aPipe) + : mPipe(aPipe) + , mWriterRefCnt(0) + , mLogicalOffset(0) + , mBlocking(true) + , mBlocked(false) + , mWritable(true) + , mCallbackFlags(0) + { } + + void SetNonBlocking(bool aNonBlocking) + { + mBlocking = !aNonBlocking; + } + void SetWritable(bool aWritable) + { + mWritable = aWritable; + } + + // synchronously wait for the pipe to become writable. + nsresult Wait(); + + MonitorAction OnOutputWritable(nsPipeEvents&); + MonitorAction OnOutputException(nsresult, nsPipeEvents&); + +private: + nsPipe* mPipe; + + // separate refcnt so that we know when to close the producer + mozilla::ThreadSafeAutoRefCnt mWriterRefCnt; + int64_t mLogicalOffset; + bool mBlocking; + + // these variables can only be accessed while inside the pipe's monitor + bool mBlocked; + bool mWritable; + nsCOMPtr<nsIOutputStreamCallback> mCallback; + uint32_t mCallbackFlags; +}; + +//----------------------------------------------------------------------------- + +class nsPipe final : public nsIPipe +{ +public: + friend class nsPipeInputStream; + friend class nsPipeOutputStream; + friend class AutoReadSegment; + + NS_DECL_THREADSAFE_ISUPPORTS + NS_DECL_NSIPIPE + + // nsPipe methods: + nsPipe(); + +private: + ~nsPipe(); + + // + // Methods below may only be called while inside the pipe's monitor. Some + // of these methods require passing a ReentrantMonitorAutoEnter to prove the + // monitor is held. + // + + void PeekSegment(const nsPipeReadState& aReadState, uint32_t aIndex, + char*& aCursor, char*& aLimit); + SegmentChangeResult AdvanceReadSegment(nsPipeReadState& aReadState, + const ReentrantMonitorAutoEnter &ev); + bool ReadSegmentBeingWritten(nsPipeReadState& aReadState); + uint32_t CountSegmentReferences(int32_t aSegment); + void SetAllNullReadCursors(); + bool AllReadCursorsMatchWriteCursor(); + void RollBackAllReadCursors(char* aWriteCursor); + void UpdateAllReadCursors(char* aWriteCursor); + void ValidateAllReadCursors(); + uint32_t GetBufferSegmentCount(const nsPipeReadState& aReadState, + const ReentrantMonitorAutoEnter& ev) const; + bool IsAdvanceBufferFull(const ReentrantMonitorAutoEnter& ev) const; + + // + // methods below may be called while outside the pipe's monitor + // + + void DrainInputStream(nsPipeReadState& aReadState, nsPipeEvents& aEvents); + nsresult GetWriteSegment(char*& aSegment, uint32_t& aSegmentLen); + void AdvanceWriteCursor(uint32_t aCount); + + void OnInputStreamException(nsPipeInputStream* aStream, nsresult aReason); + void OnPipeException(nsresult aReason, bool aOutputOnly = false); + + nsresult CloneInputStream(nsPipeInputStream* aOriginal, + nsIInputStream** aCloneOut); + + // methods below should only be called by AutoReadSegment + nsresult GetReadSegment(nsPipeReadState& aReadState, const char*& aSegment, + uint32_t& aLength); + void ReleaseReadSegment(nsPipeReadState& aReadState, + nsPipeEvents& aEvents); + void AdvanceReadCursor(nsPipeReadState& aReadState, uint32_t aCount); + + // We can't inherit from both nsIInputStream and nsIOutputStream + // because they collide on their Close method. Consequently we nest their + // implementations to avoid the extra object allocation. + nsPipeOutputStream mOutput; + + // Since the input stream can be cloned, we may have more than one. Use + // a weak reference as the streams will clear their entry here in their + // destructor. Using a strong reference would create a reference cycle. + // Only usable while mReentrantMonitor is locked. + nsTArray<nsPipeInputStream*> mInputList; + + // But hold a strong ref to our original input stream. For backward + // compatibility we need to be able to consistently return this same + // object from GetInputStream(). Note, mOriginalInput is also stored + // in mInputList as a weak ref. + RefPtr<nsPipeInputStream> mOriginalInput; + + ReentrantMonitor mReentrantMonitor; + nsSegmentedBuffer mBuffer; + + // The maximum number of segments to allow to be buffered in advance + // of the fastest reader. This is collection of segments is called + // the "advance buffer". + uint32_t mMaxAdvanceBufferSegmentCount; + + int32_t mWriteSegment; + char* mWriteCursor; + char* mWriteLimit; + + // |mStatus| is protected by |mReentrantMonitor|. + nsresult mStatus; + bool mInited; +}; + +//----------------------------------------------------------------------------- + +// RAII class representing an active read segment. When it goes out of scope +// it automatically updates the read cursor and releases the read segment. +class MOZ_STACK_CLASS AutoReadSegment final +{ +public: + AutoReadSegment(nsPipe* aPipe, nsPipeReadState& aReadState, + uint32_t aMaxLength) + : mPipe(aPipe) + , mReadState(aReadState) + , mStatus(NS_ERROR_FAILURE) + , mSegment(nullptr) + , mLength(0) + , mOffset(0) + { + MOZ_ASSERT(mPipe); + MOZ_ASSERT(!mReadState.mActiveRead); + mStatus = mPipe->GetReadSegment(mReadState, mSegment, mLength); + if (NS_SUCCEEDED(mStatus)) { + MOZ_ASSERT(mReadState.mActiveRead); + MOZ_ASSERT(mSegment); + mLength = std::min(mLength, aMaxLength); + MOZ_ASSERT(mLength); + } + } + + ~AutoReadSegment() + { + if (NS_SUCCEEDED(mStatus)) { + if (mOffset) { + mPipe->AdvanceReadCursor(mReadState, mOffset); + } else { + nsPipeEvents events; + mPipe->ReleaseReadSegment(mReadState, events); + } + } + MOZ_ASSERT(!mReadState.mActiveRead); + } + + nsresult Status() const + { + return mStatus; + } + + const char* Data() const + { + MOZ_ASSERT(NS_SUCCEEDED(mStatus)); + MOZ_ASSERT(mSegment); + return mSegment + mOffset; + } + + uint32_t Length() const + { + MOZ_ASSERT(NS_SUCCEEDED(mStatus)); + MOZ_ASSERT(mLength >= mOffset); + return mLength - mOffset; + } + + void + Advance(uint32_t aCount) + { + MOZ_ASSERT(NS_SUCCEEDED(mStatus)); + MOZ_ASSERT(aCount <= (mLength - mOffset)); + mOffset += aCount; + } + + nsPipeReadState& + ReadState() const + { + return mReadState; + } + +private: + // guaranteed to remain alive due to limited stack lifetime of AutoReadSegment + nsPipe* mPipe; + nsPipeReadState& mReadState; + nsresult mStatus; + const char* mSegment; + uint32_t mLength; + uint32_t mOffset; +}; + +// +// NOTES on buffer architecture: +// +// +-----------------+ - - mBuffer.GetSegment(0) +// | | +// + - - - - - - - - + - - nsPipeReadState.mReadCursor +// |/////////////////| +// |/////////////////| +// |/////////////////| +// |/////////////////| +// +-----------------+ - - nsPipeReadState.mReadLimit +// | +// +-----------------+ +// |/////////////////| +// |/////////////////| +// |/////////////////| +// |/////////////////| +// |/////////////////| +// |/////////////////| +// +-----------------+ +// | +// +-----------------+ - - mBuffer.GetSegment(mWriteSegment) +// |/////////////////| +// |/////////////////| +// |/////////////////| +// + - - - - - - - - + - - mWriteCursor +// | | +// | | +// +-----------------+ - - mWriteLimit +// +// (shaded region contains data) +// +// NOTE: Each input stream produced by the nsPipe contains its own, separate +// nsPipeReadState. This means there are multiple mReadCursor and +// mReadLimit values in play. The pipe cannot discard old data until +// all mReadCursors have moved beyond that point in the stream. +// +// Likewise, each input stream reader will have it's own amount of +// buffered data. The pipe size threshold, however, is only applied +// to the input stream that is being read fastest. We call this +// the "advance buffer" in that its in advance of all readers. We +// allow slower input streams to buffer more data so that we don't +// stall processing of the faster input stream. +// +// NOTE: on some systems (notably OS/2), the heap allocator uses an arena for +// small allocations (e.g., 64 byte allocations). this means that buffers may +// be allocated back-to-back. in the diagram above, for example, mReadLimit +// would actually be pointing at the beginning of the next segment. when +// making changes to this file, please keep this fact in mind. +// + +//----------------------------------------------------------------------------- +// nsPipe methods: +//----------------------------------------------------------------------------- + +nsPipe::nsPipe() + : mOutput(this) + , mOriginalInput(new nsPipeInputStream(this)) + , mReentrantMonitor("nsPipe.mReentrantMonitor") + , mMaxAdvanceBufferSegmentCount(0) + , mWriteSegment(-1) + , mWriteCursor(nullptr) + , mWriteLimit(nullptr) + , mStatus(NS_OK) + , mInited(false) +{ + mInputList.AppendElement(mOriginalInput); +} + +nsPipe::~nsPipe() +{ +} + +NS_IMPL_ADDREF(nsPipe) +NS_IMPL_QUERY_INTERFACE(nsPipe, nsIPipe) + +NS_IMETHODIMP_(MozExternalRefCountType) +nsPipe::Release() +{ + MOZ_ASSERT(int32_t(mRefCnt) > 0, "dup release"); + nsrefcnt count = --mRefCnt; + NS_LOG_RELEASE(this, count, "nsPipe"); + if (count == 0) { + delete (this); + return 0; + } + // Avoid racing on |mOriginalInput| by only looking at it when + // the refcount is 1, that is, we are the only pointer (hence only + // thread) to access it. + if (count == 1 && mOriginalInput) { + mOriginalInput = nullptr; + return 1; + } + return count; +} + +NS_IMETHODIMP +nsPipe::Init(bool aNonBlockingIn, + bool aNonBlockingOut, + uint32_t aSegmentSize, + uint32_t aSegmentCount) +{ + mInited = true; + + if (aSegmentSize == 0) { + aSegmentSize = DEFAULT_SEGMENT_SIZE; + } + if (aSegmentCount == 0) { + aSegmentCount = DEFAULT_SEGMENT_COUNT; + } + + // protect against overflow + uint32_t maxCount = uint32_t(-1) / aSegmentSize; + if (aSegmentCount > maxCount) { + aSegmentCount = maxCount; + } + + // The internal buffer is always "infinite" so that we can allow + // the size to expand when cloned streams are read at different + // rates. We enforce a limit on how much data can be buffered + // ahead of the fastest reader in GetWriteSegment(). + nsresult rv = mBuffer.Init(aSegmentSize, UINT32_MAX); + if (NS_FAILED(rv)) { + return rv; + } + + mMaxAdvanceBufferSegmentCount = aSegmentCount; + + mOutput.SetNonBlocking(aNonBlockingOut); + mOriginalInput->SetNonBlocking(aNonBlockingIn); + + return NS_OK; +} + +NS_IMETHODIMP +nsPipe::GetInputStream(nsIAsyncInputStream** aInputStream) +{ + if (NS_WARN_IF(!mInited)) { + return NS_ERROR_NOT_INITIALIZED; + } + RefPtr<nsPipeInputStream> ref = mOriginalInput; + ref.forget(aInputStream); + return NS_OK; +} + +NS_IMETHODIMP +nsPipe::GetOutputStream(nsIAsyncOutputStream** aOutputStream) +{ + if (NS_WARN_IF(!mInited)) { + return NS_ERROR_NOT_INITIALIZED; + } + NS_ADDREF(*aOutputStream = &mOutput); + return NS_OK; +} + +void +nsPipe::PeekSegment(const nsPipeReadState& aReadState, uint32_t aIndex, + char*& aCursor, char*& aLimit) +{ + if (aIndex == 0) { + NS_ASSERTION(!aReadState.mReadCursor || mBuffer.GetSegmentCount(), + "unexpected state"); + aCursor = aReadState.mReadCursor; + aLimit = aReadState.mReadLimit; + } else { + uint32_t absoluteIndex = aReadState.mSegment + aIndex; + uint32_t numSegments = mBuffer.GetSegmentCount(); + if (absoluteIndex >= numSegments) { + aCursor = aLimit = nullptr; + } else { + aCursor = mBuffer.GetSegment(absoluteIndex); + if (mWriteSegment == (int32_t)absoluteIndex) { + aLimit = mWriteCursor; + } else { + aLimit = aCursor + mBuffer.GetSegmentSize(); + } + } + } +} + +nsresult +nsPipe::GetReadSegment(nsPipeReadState& aReadState, const char*& aSegment, + uint32_t& aLength) +{ + ReentrantMonitorAutoEnter mon(mReentrantMonitor); + + if (aReadState.mReadCursor == aReadState.mReadLimit) { + return NS_FAILED(mStatus) ? mStatus : NS_BASE_STREAM_WOULD_BLOCK; + } + + // The input stream locks the pipe while getting the buffer to read from, + // but then unlocks while actual data copying is taking place. In + // order to avoid deleting the buffer out from under this lockless read + // set a flag to indicate a read is active. This flag is only modified + // while the lock is held. + MOZ_ASSERT(!aReadState.mActiveRead); + aReadState.mActiveRead = true; + + aSegment = aReadState.mReadCursor; + aLength = aReadState.mReadLimit - aReadState.mReadCursor; + + return NS_OK; +} + +void +nsPipe::ReleaseReadSegment(nsPipeReadState& aReadState, nsPipeEvents& aEvents) +{ + ReentrantMonitorAutoEnter mon(mReentrantMonitor); + + MOZ_ASSERT(aReadState.mActiveRead); + aReadState.mActiveRead = false; + + // When a read completes and releases the mActiveRead flag, we may have blocked + // a drain from completing. This occurs when the input stream is closed during + // the read. In these cases, we need to complete the drain as soon as the + // active read completes. + if (aReadState.mNeedDrain) { + aReadState.mNeedDrain = false; + DrainInputStream(aReadState, aEvents); + } +} + +void +nsPipe::AdvanceReadCursor(nsPipeReadState& aReadState, uint32_t aBytesRead) +{ + NS_ASSERTION(aBytesRead, "don't call if no bytes read"); + + nsPipeEvents events; + { + ReentrantMonitorAutoEnter mon(mReentrantMonitor); + + LOG(("III advancing read cursor by %u\n", aBytesRead)); + NS_ASSERTION(aBytesRead <= mBuffer.GetSegmentSize(), "read too much"); + + aReadState.mReadCursor += aBytesRead; + NS_ASSERTION(aReadState.mReadCursor <= aReadState.mReadLimit, + "read cursor exceeds limit"); + + MOZ_ASSERT(aReadState.mAvailable >= aBytesRead); + aReadState.mAvailable -= aBytesRead; + + // Check to see if we're at the end of the available read data. If we + // are, and this segment is not still being written, then we can possibly + // free up the segment. + if (aReadState.mReadCursor == aReadState.mReadLimit && + !ReadSegmentBeingWritten(aReadState)) { + + // Advance the segment position. If we have read any segments from the + // advance buffer then we can potentially notify blocked writers. + if (AdvanceReadSegment(aReadState, mon) == SegmentAdvanceBufferRead && + mOutput.OnOutputWritable(events) == NotifyMonitor) { + mon.NotifyAll(); + } + } + + ReleaseReadSegment(aReadState, events); + } +} + +SegmentChangeResult +nsPipe::AdvanceReadSegment(nsPipeReadState& aReadState, + const ReentrantMonitorAutoEnter &ev) +{ + // Calculate how many segments are buffered for this stream to start. + uint32_t startBufferSegments = GetBufferSegmentCount(aReadState, ev); + + int32_t currentSegment = aReadState.mSegment; + + // Move to the next segment to read + aReadState.mSegment += 1; + + // If this was the last reference to the first segment, then remove it. + if (currentSegment == 0 && CountSegmentReferences(currentSegment) == 0) { + + // shift write and read segment index (-1 indicates an empty buffer). + mWriteSegment -= 1; + + // Directly modify the current read state. If the associated input + // stream is closed simultaneous with reading, then it may not be + // in the mInputList any more. + aReadState.mSegment -= 1; + + for (uint32_t i = 0; i < mInputList.Length(); ++i) { + // Skip the current read state structure since we modify it manually + // before entering this loop. + if (&mInputList[i]->ReadState() == &aReadState) { + continue; + } + mInputList[i]->ReadState().mSegment -= 1; + } + + // done with this segment + mBuffer.DeleteFirstSegment(); + LOG(("III deleting first segment\n")); + } + + if (mWriteSegment < aReadState.mSegment) { + // read cursor has hit the end of written data, so reset it + MOZ_ASSERT(mWriteSegment == (aReadState.mSegment - 1)); + aReadState.mReadCursor = nullptr; + aReadState.mReadLimit = nullptr; + // also, the buffer is completely empty, so reset the write cursor + if (mWriteSegment == -1) { + mWriteCursor = nullptr; + mWriteLimit = nullptr; + } + } else { + // advance read cursor and limit to next buffer segment + aReadState.mReadCursor = mBuffer.GetSegment(aReadState.mSegment); + if (mWriteSegment == aReadState.mSegment) { + aReadState.mReadLimit = mWriteCursor; + } else { + aReadState.mReadLimit = aReadState.mReadCursor + mBuffer.GetSegmentSize(); + } + } + + // Calculate how many segments are buffered for the stream after + // reading. + uint32_t endBufferSegments = GetBufferSegmentCount(aReadState, ev); + + // If the stream has read a segment out of the set of advanced buffer + // segments, then the writer may advance. + if (startBufferSegments >= mMaxAdvanceBufferSegmentCount && + endBufferSegments < mMaxAdvanceBufferSegmentCount) { + return SegmentAdvanceBufferRead; + } + + // Otherwise there are no significant changes to the segment structure. + return SegmentNotChanged; +} + +void +nsPipe::DrainInputStream(nsPipeReadState& aReadState, nsPipeEvents& aEvents) +{ + ReentrantMonitorAutoEnter mon(mReentrantMonitor); + + // If a segment is actively being read in ReadSegments() for this input + // stream, then we cannot drain the stream. This can happen because + // ReadSegments() does not hold the lock while copying from the buffer. + // If we detect this condition, simply note that we need a drain once + // the read completes and return immediately. + if (aReadState.mActiveRead) { + MOZ_ASSERT(!aReadState.mNeedDrain); + aReadState.mNeedDrain = true; + return; + } + + aReadState.mAvailable = 0; + + while(mWriteSegment >= aReadState.mSegment) { + + // If the last segment to free is still being written to, we're done + // draining. We can't free any more. + if (ReadSegmentBeingWritten(aReadState)) { + break; + } + + // Don't bother checking if this results in an advance buffer segment + // read. Since we are draining the entire stream we will read an + // advance buffer segment no matter what. + AdvanceReadSegment(aReadState, mon); + } + + // If we have read any segments from the advance buffer then we can + // potentially notify blocked writers. + if (!IsAdvanceBufferFull(mon) && + mOutput.OnOutputWritable(aEvents) == NotifyMonitor) { + mon.NotifyAll(); + } +} + +bool +nsPipe::ReadSegmentBeingWritten(nsPipeReadState& aReadState) +{ + mReentrantMonitor.AssertCurrentThreadIn(); + bool beingWritten = mWriteSegment == aReadState.mSegment && + mWriteLimit > mWriteCursor; + NS_ASSERTION(!beingWritten || aReadState.mReadLimit == mWriteCursor, + "unexpected state"); + return beingWritten; +} + +nsresult +nsPipe::GetWriteSegment(char*& aSegment, uint32_t& aSegmentLen) +{ + ReentrantMonitorAutoEnter mon(mReentrantMonitor); + + if (NS_FAILED(mStatus)) { + return mStatus; + } + + // write cursor and limit may both be null indicating an empty buffer. + if (mWriteCursor == mWriteLimit) { + // The pipe is full if we have hit our limit on advance data buffering. + // This means the fastest reader is still reading slower than data is + // being written into the pipe. + if (IsAdvanceBufferFull(mon)) { + return NS_BASE_STREAM_WOULD_BLOCK; + } + + // The nsSegmentedBuffer is configured to be "infinite", so this + // should never return nullptr here. + char* seg = mBuffer.AppendNewSegment(); + if (!seg) { + return NS_ERROR_OUT_OF_MEMORY; + } + + LOG(("OOO appended new segment\n")); + mWriteCursor = seg; + mWriteLimit = mWriteCursor + mBuffer.GetSegmentSize(); + ++mWriteSegment; + } + + // make sure read cursor is initialized + SetAllNullReadCursors(); + + // check to see if we can roll-back our read and write cursors to the + // beginning of the current/first segment. this is purely an optimization. + if (mWriteSegment == 0 && AllReadCursorsMatchWriteCursor()) { + char* head = mBuffer.GetSegment(0); + LOG(("OOO rolling back write cursor %u bytes\n", mWriteCursor - head)); + RollBackAllReadCursors(head); + mWriteCursor = head; + } + + aSegment = mWriteCursor; + aSegmentLen = mWriteLimit - mWriteCursor; + return NS_OK; +} + +void +nsPipe::AdvanceWriteCursor(uint32_t aBytesWritten) +{ + NS_ASSERTION(aBytesWritten, "don't call if no bytes written"); + + nsPipeEvents events; + { + ReentrantMonitorAutoEnter mon(mReentrantMonitor); + + LOG(("OOO advancing write cursor by %u\n", aBytesWritten)); + + char* newWriteCursor = mWriteCursor + aBytesWritten; + NS_ASSERTION(newWriteCursor <= mWriteLimit, "write cursor exceeds limit"); + + // update read limit if reading in the same segment + UpdateAllReadCursors(newWriteCursor); + + mWriteCursor = newWriteCursor; + + ValidateAllReadCursors(); + + // update the writable flag on the output stream + if (mWriteCursor == mWriteLimit) { + mOutput.SetWritable(!IsAdvanceBufferFull(mon)); + } + + // notify input stream that pipe now contains additional data + bool needNotify = false; + for (uint32_t i = 0; i < mInputList.Length(); ++i) { + if (mInputList[i]->OnInputReadable(aBytesWritten, events, mon) + == NotifyMonitor) { + needNotify = true; + } + } + + if (needNotify) { + mon.NotifyAll(); + } + } +} + +void +nsPipe::OnInputStreamException(nsPipeInputStream* aStream, nsresult aReason) +{ + MOZ_ASSERT(NS_FAILED(aReason)); + + nsPipeEvents events; + { + ReentrantMonitorAutoEnter mon(mReentrantMonitor); + + // Its possible to re-enter this method when we call OnPipeException() or + // OnInputExection() below. If there is a caller stuck in our synchronous + // Wait() method, then they will get woken up with a failure code which + // re-enters this method. Therefore, gracefully handle unknown streams + // here. + + // If we only have one stream open and it is the given stream, then shut + // down the entire pipe. + if (mInputList.Length() == 1) { + if (mInputList[0] == aStream) { + OnPipeException(aReason); + } + return; + } + + // Otherwise just close the particular stream that hit an exception. + for (uint32_t i = 0; i < mInputList.Length(); ++i) { + if (mInputList[i] != aStream) { + continue; + } + + MonitorAction action = mInputList[i]->OnInputException(aReason, events, + mon); + mInputList.RemoveElementAt(i); + + // Notify after element is removed in case we re-enter as a result. + if (action == NotifyMonitor) { + mon.NotifyAll(); + } + + return; + } + } +} + +void +nsPipe::OnPipeException(nsresult aReason, bool aOutputOnly) +{ + LOG(("PPP nsPipe::OnPipeException [reason=%x output-only=%d]\n", + aReason, aOutputOnly)); + + nsPipeEvents events; + { + ReentrantMonitorAutoEnter mon(mReentrantMonitor); + + // if we've already hit an exception, then ignore this one. + if (NS_FAILED(mStatus)) { + return; + } + + mStatus = aReason; + + bool needNotify = false; + + nsTArray<nsPipeInputStream*> tmpInputList; + for (uint32_t i = 0; i < mInputList.Length(); ++i) { + // an output-only exception applies to the input end if the pipe has + // zero bytes available. + if (aOutputOnly && mInputList[i]->Available()) { + tmpInputList.AppendElement(mInputList[i]); + continue; + } + + if (mInputList[i]->OnInputException(aReason, events, mon) + == NotifyMonitor) { + needNotify = true; + } + } + mInputList = tmpInputList; + + if (mOutput.OnOutputException(aReason, events) == NotifyMonitor) { + needNotify = true; + } + + // Notify after we have removed any input streams from mInputList + if (needNotify) { + mon.NotifyAll(); + } + } +} + +nsresult +nsPipe::CloneInputStream(nsPipeInputStream* aOriginal, + nsIInputStream** aCloneOut) +{ + ReentrantMonitorAutoEnter mon(mReentrantMonitor); + RefPtr<nsPipeInputStream> ref = new nsPipeInputStream(*aOriginal); + mInputList.AppendElement(ref); + nsCOMPtr<nsIAsyncInputStream> downcast = ref.forget(); + downcast.forget(aCloneOut); + return NS_OK; +} + +uint32_t +nsPipe::CountSegmentReferences(int32_t aSegment) +{ + mReentrantMonitor.AssertCurrentThreadIn(); + uint32_t count = 0; + for (uint32_t i = 0; i < mInputList.Length(); ++i) { + if (aSegment >= mInputList[i]->ReadState().mSegment) { + count += 1; + } + } + return count; +} + +void +nsPipe::SetAllNullReadCursors() +{ + mReentrantMonitor.AssertCurrentThreadIn(); + for (uint32_t i = 0; i < mInputList.Length(); ++i) { + nsPipeReadState& readState = mInputList[i]->ReadState(); + if (!readState.mReadCursor) { + NS_ASSERTION(mWriteSegment == readState.mSegment, + "unexpected null read cursor"); + readState.mReadCursor = readState.mReadLimit = mWriteCursor; + } + } +} + +bool +nsPipe::AllReadCursorsMatchWriteCursor() +{ + mReentrantMonitor.AssertCurrentThreadIn(); + for (uint32_t i = 0; i < mInputList.Length(); ++i) { + const nsPipeReadState& readState = mInputList[i]->ReadState(); + if (readState.mSegment != mWriteSegment || + readState.mReadCursor != mWriteCursor) { + return false; + } + } + return true; +} + +void +nsPipe::RollBackAllReadCursors(char* aWriteCursor) +{ + mReentrantMonitor.AssertCurrentThreadIn(); + for (uint32_t i = 0; i < mInputList.Length(); ++i) { + nsPipeReadState& readState = mInputList[i]->ReadState(); + MOZ_ASSERT(mWriteSegment == readState.mSegment); + MOZ_ASSERT(mWriteCursor == readState.mReadCursor); + MOZ_ASSERT(mWriteCursor == readState.mReadLimit); + readState.mReadCursor = aWriteCursor; + readState.mReadLimit = aWriteCursor; + } +} + +void +nsPipe::UpdateAllReadCursors(char* aWriteCursor) +{ + mReentrantMonitor.AssertCurrentThreadIn(); + for (uint32_t i = 0; i < mInputList.Length(); ++i) { + nsPipeReadState& readState = mInputList[i]->ReadState(); + if (mWriteSegment == readState.mSegment && + readState.mReadLimit == mWriteCursor) { + readState.mReadLimit = aWriteCursor; + } + } +} + +void +nsPipe::ValidateAllReadCursors() +{ + mReentrantMonitor.AssertCurrentThreadIn(); + // The only way mReadCursor == mWriteCursor is if: + // + // - mReadCursor is at the start of a segment (which, based on how + // nsSegmentedBuffer works, means that this segment is the "first" + // segment) + // - mWriteCursor points at the location past the end of the current + // write segment (so the current write filled the current write + // segment, so we've incremented mWriteCursor to point past the end + // of it) + // - the segment to which data has just been written is located + // exactly one segment's worth of bytes before the first segment + // where mReadCursor is located + // + // Consequently, the byte immediately after the end of the current + // write segment is the first byte of the first segment, so + // mReadCursor == mWriteCursor. (Another way to think about this is + // to consider the buffer architecture diagram above, but consider it + // with an arena allocator which allocates from the *end* of the + // arena to the *beginning* of the arena.) +#ifdef DEBUG + for (uint32_t i = 0; i < mInputList.Length(); ++i) { + const nsPipeReadState& state = mInputList[i]->ReadState(); + NS_ASSERTION(state.mReadCursor != mWriteCursor || + (mBuffer.GetSegment(state.mSegment) == state.mReadCursor && + mWriteCursor == mWriteLimit), + "read cursor is bad"); + } +#endif +} + +uint32_t +nsPipe::GetBufferSegmentCount(const nsPipeReadState& aReadState, + const ReentrantMonitorAutoEnter& ev) const +{ + // The write segment can be smaller than the current reader position + // in some cases. For example, when the first write segment has not + // been allocated yet mWriteSegment is negative. In these cases + // the stream is effectively using zero segments. + if (mWriteSegment < aReadState.mSegment) { + return 0; + } + + MOZ_ASSERT(mWriteSegment >= 0); + MOZ_ASSERT(aReadState.mSegment >= 0); + + // Otherwise at least one segment is being used. We add one here + // since a single segment is being used when the write and read + // segment indices are the same. + return 1 + mWriteSegment - aReadState.mSegment; +} + +bool +nsPipe::IsAdvanceBufferFull(const ReentrantMonitorAutoEnter& ev) const +{ + // If we have fewer total segments than the limit we can immediately + // determine we are not full. Note, we must add one to mWriteSegment + // to convert from a index to a count. + MOZ_DIAGNOSTIC_ASSERT(mWriteSegment >= -1); + MOZ_DIAGNOSTIC_ASSERT(mWriteSegment < INT32_MAX); + uint32_t totalWriteSegments = mWriteSegment + 1; + if (totalWriteSegments < mMaxAdvanceBufferSegmentCount) { + return false; + } + + // Otherwise we must inspect all of our reader streams. We need + // to determine the buffer depth of the fastest reader. + uint32_t minBufferSegments = UINT32_MAX; + for (uint32_t i = 0; i < mInputList.Length(); ++i) { + // Only count buffer segments from input streams that are open. + if (NS_FAILED(mInputList[i]->Status(ev))) { + continue; + } + const nsPipeReadState& state = mInputList[i]->ReadState(); + uint32_t bufferSegments = GetBufferSegmentCount(state, ev); + minBufferSegments = std::min(minBufferSegments, bufferSegments); + // We only care if any reader has fewer segments buffered than + // our threshold. We can stop once we hit that threshold. + if (minBufferSegments < mMaxAdvanceBufferSegmentCount) { + return false; + } + } + + // Note, its possible for minBufferSegments to exceed our + // mMaxAdvanceBufferSegmentCount here. This happens when a cloned + // reader gets far behind, but then the fastest reader stream is + // closed. This leaves us with a single stream that is buffered + // beyond our max. Naturally we continue to indicate the pipe + // is full at this point. + + return true; +} + +//----------------------------------------------------------------------------- +// nsPipeEvents methods: +//----------------------------------------------------------------------------- + +nsPipeEvents::~nsPipeEvents() +{ + // dispatch any pending events + + for (uint32_t i = 0; i < mInputList.Length(); ++i) { + mInputList[i].mCallback->OnInputStreamReady(mInputList[i].mStream); + } + mInputList.Clear(); + + if (mOutputCallback) { + mOutputCallback->OnOutputStreamReady(mOutputStream); + mOutputCallback = nullptr; + mOutputStream = nullptr; + } +} + +//----------------------------------------------------------------------------- +// nsPipeInputStream methods: +//----------------------------------------------------------------------------- + +NS_IMPL_ADDREF(nsPipeInputStream); +NS_IMPL_RELEASE(nsPipeInputStream); + +NS_INTERFACE_TABLE_HEAD(nsPipeInputStream) + NS_INTERFACE_TABLE_BEGIN + NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsIAsyncInputStream) + NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsISeekableStream) + NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsISearchableInputStream) + NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsICloneableInputStream) + NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsIBufferedInputStream) + NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsIClassInfo) + NS_INTERFACE_TABLE_ENTRY_AMBIGUOUS(nsPipeInputStream, nsIInputStream, + nsIAsyncInputStream) + NS_INTERFACE_TABLE_ENTRY_AMBIGUOUS(nsPipeInputStream, nsISupports, + nsIAsyncInputStream) + NS_INTERFACE_TABLE_END +NS_INTERFACE_TABLE_TAIL + +NS_IMPL_CI_INTERFACE_GETTER(nsPipeInputStream, + nsIInputStream, + nsIAsyncInputStream, + nsISeekableStream, + nsISearchableInputStream, + nsICloneableInputStream, + nsIBufferedInputStream) + +NS_IMPL_THREADSAFE_CI(nsPipeInputStream) + +NS_IMETHODIMP +nsPipeInputStream::Init(nsIInputStream*, uint32_t) +{ + MOZ_CRASH("nsPipeInputStream should never be initialized with " + "nsIBufferedInputStream::Init!\n"); +} + +uint32_t +nsPipeInputStream::Available() +{ + mPipe->mReentrantMonitor.AssertCurrentThreadIn(); + return mReadState.mAvailable; +} + +nsresult +nsPipeInputStream::Wait() +{ + NS_ASSERTION(mBlocking, "wait on non-blocking pipe input stream"); + + ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); + + while (NS_SUCCEEDED(Status(mon)) && (mReadState.mAvailable == 0)) { + LOG(("III pipe input: waiting for data\n")); + + mBlocked = true; + mon.Wait(); + mBlocked = false; + + LOG(("III pipe input: woke up [status=%x available=%u]\n", + Status(mon), mReadState.mAvailable)); + } + + return Status(mon) == NS_BASE_STREAM_CLOSED ? NS_OK : Status(mon); +} + +MonitorAction +nsPipeInputStream::OnInputReadable(uint32_t aBytesWritten, + nsPipeEvents& aEvents, + const ReentrantMonitorAutoEnter& ev) +{ + MonitorAction result = DoNotNotifyMonitor; + + mPipe->mReentrantMonitor.AssertCurrentThreadIn(); + mReadState.mAvailable += aBytesWritten; + + if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) { + aEvents.NotifyInputReady(this, mCallback); + mCallback = nullptr; + mCallbackFlags = 0; + } else if (mBlocked) { + result = NotifyMonitor; + } + + return result; +} + +MonitorAction +nsPipeInputStream::OnInputException(nsresult aReason, nsPipeEvents& aEvents, + const ReentrantMonitorAutoEnter& ev) +{ + LOG(("nsPipeInputStream::OnInputException [this=%x reason=%x]\n", + this, aReason)); + + MonitorAction result = DoNotNotifyMonitor; + + NS_ASSERTION(NS_FAILED(aReason), "huh? successful exception"); + + if (NS_SUCCEEDED(mInputStatus)) { + mInputStatus = aReason; + } + + // force count of available bytes to zero. + mPipe->DrainInputStream(mReadState, aEvents); + + if (mCallback) { + aEvents.NotifyInputReady(this, mCallback); + mCallback = nullptr; + mCallbackFlags = 0; + } else if (mBlocked) { + result = NotifyMonitor; + } + + return result; +} + +NS_IMETHODIMP +nsPipeInputStream::CloseWithStatus(nsresult aReason) +{ + LOG(("III CloseWithStatus [this=%x reason=%x]\n", this, aReason)); + + ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); + + if (NS_FAILED(mInputStatus)) { + return NS_OK; + } + + if (NS_SUCCEEDED(aReason)) { + aReason = NS_BASE_STREAM_CLOSED; + } + + mPipe->OnInputStreamException(this, aReason); + return NS_OK; +} + +NS_IMETHODIMP +nsPipeInputStream::Close() +{ + return CloseWithStatus(NS_BASE_STREAM_CLOSED); +} + +NS_IMETHODIMP +nsPipeInputStream::Available(uint64_t* aResult) +{ + // nsPipeInputStream supports under 4GB stream only + ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); + + // return error if closed + if (!mReadState.mAvailable && NS_FAILED(Status(mon))) { + return Status(mon); + } + + *aResult = (uint64_t)mReadState.mAvailable; + return NS_OK; +} + +NS_IMETHODIMP +nsPipeInputStream::ReadSegments(nsWriteSegmentFun aWriter, + void* aClosure, + uint32_t aCount, + uint32_t* aReadCount) +{ + LOG(("III ReadSegments [this=%x count=%u]\n", this, aCount)); + + nsresult rv = NS_OK; + + *aReadCount = 0; + while (aCount) { + AutoReadSegment segment(mPipe, mReadState, aCount); + rv = segment.Status(); + if (NS_FAILED(rv)) { + // ignore this error if we've already read something. + if (*aReadCount > 0) { + rv = NS_OK; + break; + } + if (rv == NS_BASE_STREAM_WOULD_BLOCK) { + // pipe is empty + if (!mBlocking) { + break; + } + // wait for some data to be written to the pipe + rv = Wait(); + if (NS_SUCCEEDED(rv)) { + continue; + } + } + // ignore this error, just return. + if (rv == NS_BASE_STREAM_CLOSED) { + rv = NS_OK; + break; + } + mPipe->OnInputStreamException(this, rv); + break; + } + + uint32_t writeCount; + while (segment.Length()) { + writeCount = 0; + + rv = aWriter(static_cast<nsIAsyncInputStream*>(this), aClosure, + segment.Data(), *aReadCount, segment.Length(), &writeCount); + + if (NS_FAILED(rv) || writeCount == 0) { + aCount = 0; + // any errors returned from the writer end here: do not + // propagate to the caller of ReadSegments. + rv = NS_OK; + break; + } + + NS_ASSERTION(writeCount <= segment.Length(), "wrote more than expected"); + segment.Advance(writeCount); + aCount -= writeCount; + *aReadCount += writeCount; + mLogicalOffset += writeCount; + } + } + + return rv; +} + +NS_IMETHODIMP +nsPipeInputStream::Read(char* aToBuf, uint32_t aBufLen, uint32_t* aReadCount) +{ + return ReadSegments(NS_CopySegmentToBuffer, aToBuf, aBufLen, aReadCount); +} + +NS_IMETHODIMP +nsPipeInputStream::IsNonBlocking(bool* aNonBlocking) +{ + *aNonBlocking = !mBlocking; + return NS_OK; +} + +NS_IMETHODIMP +nsPipeInputStream::AsyncWait(nsIInputStreamCallback* aCallback, + uint32_t aFlags, + uint32_t aRequestedCount, + nsIEventTarget* aTarget) +{ + LOG(("III AsyncWait [this=%x]\n", this)); + + nsPipeEvents pipeEvents; + { + ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); + + // replace a pending callback + mCallback = nullptr; + mCallbackFlags = 0; + + if (!aCallback) { + return NS_OK; + } + + nsCOMPtr<nsIInputStreamCallback> proxy; + if (aTarget) { + proxy = NS_NewInputStreamReadyEvent(aCallback, aTarget); + aCallback = proxy; + } + + if (NS_FAILED(Status(mon)) || + (mReadState.mAvailable && !(aFlags & WAIT_CLOSURE_ONLY))) { + // stream is already closed or readable; post event. + pipeEvents.NotifyInputReady(this, aCallback); + } else { + // queue up callback object to be notified when data becomes available + mCallback = aCallback; + mCallbackFlags = aFlags; + } + } + return NS_OK; +} + +NS_IMETHODIMP +nsPipeInputStream::Seek(int32_t aWhence, int64_t aOffset) +{ + NS_NOTREACHED("nsPipeInputStream::Seek"); + return NS_ERROR_NOT_IMPLEMENTED; +} + +NS_IMETHODIMP +nsPipeInputStream::Tell(int64_t* aOffset) +{ + ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); + + // return error if closed + if (!mReadState.mAvailable && NS_FAILED(Status(mon))) { + return Status(mon); + } + + *aOffset = mLogicalOffset; + return NS_OK; +} + +NS_IMETHODIMP +nsPipeInputStream::SetEOF() +{ + NS_NOTREACHED("nsPipeInputStream::SetEOF"); + return NS_ERROR_NOT_IMPLEMENTED; +} + +static bool strings_equal(bool aIgnoreCase, + const char* aS1, const char* aS2, uint32_t aLen) +{ + return aIgnoreCase + ? !nsCRT::strncasecmp(aS1, aS2, aLen) : !nsCRT::strncmp(aS1, aS2, aLen); +} + +NS_IMETHODIMP +nsPipeInputStream::Search(const char* aForString, + bool aIgnoreCase, + bool* aFound, + uint32_t* aOffsetSearchedTo) +{ + LOG(("III Search [for=%s ic=%u]\n", aForString, aIgnoreCase)); + + ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); + + char* cursor1; + char* limit1; + uint32_t index = 0, offset = 0; + uint32_t strLen = strlen(aForString); + + mPipe->PeekSegment(mReadState, 0, cursor1, limit1); + if (cursor1 == limit1) { + *aFound = false; + *aOffsetSearchedTo = 0; + LOG((" result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo)); + return NS_OK; + } + + while (true) { + uint32_t i, len1 = limit1 - cursor1; + + // check if the string is in the buffer segment + for (i = 0; i < len1 - strLen + 1; i++) { + if (strings_equal(aIgnoreCase, &cursor1[i], aForString, strLen)) { + *aFound = true; + *aOffsetSearchedTo = offset + i; + LOG((" result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo)); + return NS_OK; + } + } + + // get the next segment + char* cursor2; + char* limit2; + uint32_t len2; + + index++; + offset += len1; + + mPipe->PeekSegment(mReadState, index, cursor2, limit2); + if (cursor2 == limit2) { + *aFound = false; + *aOffsetSearchedTo = offset - strLen + 1; + LOG((" result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo)); + return NS_OK; + } + len2 = limit2 - cursor2; + + // check if the string is straddling the next buffer segment + uint32_t lim = XPCOM_MIN(strLen, len2 + 1); + for (i = 0; i < lim; ++i) { + uint32_t strPart1Len = strLen - i - 1; + uint32_t strPart2Len = strLen - strPart1Len; + const char* strPart2 = &aForString[strLen - strPart2Len]; + uint32_t bufSeg1Offset = len1 - strPart1Len; + if (strings_equal(aIgnoreCase, &cursor1[bufSeg1Offset], aForString, strPart1Len) && + strings_equal(aIgnoreCase, cursor2, strPart2, strPart2Len)) { + *aFound = true; + *aOffsetSearchedTo = offset - strPart1Len; + LOG((" result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo)); + return NS_OK; + } + } + + // finally continue with the next buffer + cursor1 = cursor2; + limit1 = limit2; + } + + NS_NOTREACHED("can't get here"); + return NS_ERROR_UNEXPECTED; // keep compiler happy +} + +NS_IMETHODIMP +nsPipeInputStream::GetCloneable(bool* aCloneableOut) +{ + *aCloneableOut = true; + return NS_OK; +} + +NS_IMETHODIMP +nsPipeInputStream::Clone(nsIInputStream** aCloneOut) +{ + return mPipe->CloneInputStream(this, aCloneOut); +} + +nsresult +nsPipeInputStream::Status(const ReentrantMonitorAutoEnter& ev) const +{ + if (NS_FAILED(mInputStatus)) { + return mInputStatus; + } + + if (mReadState.mAvailable) { + // Still something to read and this input stream state is OK. + return NS_OK; + } + + // Nothing to read, just fall through to the pipe's state that + // may reflect state of its output stream side (already closed). + return mPipe->mStatus; +} + +nsresult +nsPipeInputStream::Status() const +{ + ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); + return Status(mon); +} + +nsPipeInputStream::~nsPipeInputStream() +{ + Close(); +} + +//----------------------------------------------------------------------------- +// nsPipeOutputStream methods: +//----------------------------------------------------------------------------- + +NS_IMPL_QUERY_INTERFACE(nsPipeOutputStream, + nsIOutputStream, + nsIAsyncOutputStream, + nsIClassInfo) + +NS_IMPL_CI_INTERFACE_GETTER(nsPipeOutputStream, + nsIOutputStream, + nsIAsyncOutputStream) + +NS_IMPL_THREADSAFE_CI(nsPipeOutputStream) + +nsresult +nsPipeOutputStream::Wait() +{ + NS_ASSERTION(mBlocking, "wait on non-blocking pipe output stream"); + + ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); + + if (NS_SUCCEEDED(mPipe->mStatus) && !mWritable) { + LOG(("OOO pipe output: waiting for space\n")); + mBlocked = true; + mon.Wait(); + mBlocked = false; + LOG(("OOO pipe output: woke up [pipe-status=%x writable=%u]\n", + mPipe->mStatus, mWritable)); + } + + return mPipe->mStatus == NS_BASE_STREAM_CLOSED ? NS_OK : mPipe->mStatus; +} + +MonitorAction +nsPipeOutputStream::OnOutputWritable(nsPipeEvents& aEvents) +{ + MonitorAction result = DoNotNotifyMonitor; + + mWritable = true; + + if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) { + aEvents.NotifyOutputReady(this, mCallback); + mCallback = nullptr; + mCallbackFlags = 0; + } else if (mBlocked) { + result = NotifyMonitor; + } + + return result; +} + +MonitorAction +nsPipeOutputStream::OnOutputException(nsresult aReason, nsPipeEvents& aEvents) +{ + LOG(("nsPipeOutputStream::OnOutputException [this=%x reason=%x]\n", + this, aReason)); + + MonitorAction result = DoNotNotifyMonitor; + + NS_ASSERTION(NS_FAILED(aReason), "huh? successful exception"); + mWritable = false; + + if (mCallback) { + aEvents.NotifyOutputReady(this, mCallback); + mCallback = nullptr; + mCallbackFlags = 0; + } else if (mBlocked) { + result = NotifyMonitor; + } + + return result; +} + + +NS_IMETHODIMP_(MozExternalRefCountType) +nsPipeOutputStream::AddRef() +{ + ++mWriterRefCnt; + return mPipe->AddRef(); +} + +NS_IMETHODIMP_(MozExternalRefCountType) +nsPipeOutputStream::Release() +{ + if (--mWriterRefCnt == 0) { + Close(); + } + return mPipe->Release(); +} + +NS_IMETHODIMP +nsPipeOutputStream::CloseWithStatus(nsresult aReason) +{ + LOG(("OOO CloseWithStatus [this=%x reason=%x]\n", this, aReason)); + + if (NS_SUCCEEDED(aReason)) { + aReason = NS_BASE_STREAM_CLOSED; + } + + // input stream may remain open + mPipe->OnPipeException(aReason, true); + return NS_OK; +} + +NS_IMETHODIMP +nsPipeOutputStream::Close() +{ + return CloseWithStatus(NS_BASE_STREAM_CLOSED); +} + +NS_IMETHODIMP +nsPipeOutputStream::WriteSegments(nsReadSegmentFun aReader, + void* aClosure, + uint32_t aCount, + uint32_t* aWriteCount) +{ + LOG(("OOO WriteSegments [this=%x count=%u]\n", this, aCount)); + + nsresult rv = NS_OK; + + char* segment; + uint32_t segmentLen; + + *aWriteCount = 0; + while (aCount) { + rv = mPipe->GetWriteSegment(segment, segmentLen); + if (NS_FAILED(rv)) { + if (rv == NS_BASE_STREAM_WOULD_BLOCK) { + // pipe is full + if (!mBlocking) { + // ignore this error if we've already written something + if (*aWriteCount > 0) { + rv = NS_OK; + } + break; + } + // wait for the pipe to have an empty segment. + rv = Wait(); + if (NS_SUCCEEDED(rv)) { + continue; + } + } + mPipe->OnPipeException(rv); + break; + } + + // write no more than aCount + if (segmentLen > aCount) { + segmentLen = aCount; + } + + uint32_t readCount, originalLen = segmentLen; + while (segmentLen) { + readCount = 0; + + rv = aReader(this, aClosure, segment, *aWriteCount, segmentLen, &readCount); + + if (NS_FAILED(rv) || readCount == 0) { + aCount = 0; + // any errors returned from the aReader end here: do not + // propagate to the caller of WriteSegments. + rv = NS_OK; + break; + } + + NS_ASSERTION(readCount <= segmentLen, "read more than expected"); + segment += readCount; + segmentLen -= readCount; + aCount -= readCount; + *aWriteCount += readCount; + mLogicalOffset += readCount; + } + + if (segmentLen < originalLen) { + mPipe->AdvanceWriteCursor(originalLen - segmentLen); + } + } + + return rv; +} + +static nsresult +nsReadFromRawBuffer(nsIOutputStream* aOutStr, + void* aClosure, + char* aToRawSegment, + uint32_t aOffset, + uint32_t aCount, + uint32_t* aReadCount) +{ + const char* fromBuf = (const char*)aClosure; + memcpy(aToRawSegment, &fromBuf[aOffset], aCount); + *aReadCount = aCount; + return NS_OK; +} + +NS_IMETHODIMP +nsPipeOutputStream::Write(const char* aFromBuf, + uint32_t aBufLen, + uint32_t* aWriteCount) +{ + return WriteSegments(nsReadFromRawBuffer, (void*)aFromBuf, aBufLen, aWriteCount); +} + +NS_IMETHODIMP +nsPipeOutputStream::Flush(void) +{ + // nothing to do + return NS_OK; +} + +static nsresult +nsReadFromInputStream(nsIOutputStream* aOutStr, + void* aClosure, + char* aToRawSegment, + uint32_t aOffset, + uint32_t aCount, + uint32_t* aReadCount) +{ + nsIInputStream* fromStream = (nsIInputStream*)aClosure; + return fromStream->Read(aToRawSegment, aCount, aReadCount); +} + +NS_IMETHODIMP +nsPipeOutputStream::WriteFrom(nsIInputStream* aFromStream, + uint32_t aCount, + uint32_t* aWriteCount) +{ + return WriteSegments(nsReadFromInputStream, aFromStream, aCount, aWriteCount); +} + +NS_IMETHODIMP +nsPipeOutputStream::IsNonBlocking(bool* aNonBlocking) +{ + *aNonBlocking = !mBlocking; + return NS_OK; +} + +NS_IMETHODIMP +nsPipeOutputStream::AsyncWait(nsIOutputStreamCallback* aCallback, + uint32_t aFlags, + uint32_t aRequestedCount, + nsIEventTarget* aTarget) +{ + LOG(("OOO AsyncWait [this=%x]\n", this)); + + nsPipeEvents pipeEvents; + { + ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); + + // replace a pending callback + mCallback = nullptr; + mCallbackFlags = 0; + + if (!aCallback) { + return NS_OK; + } + + nsCOMPtr<nsIOutputStreamCallback> proxy; + if (aTarget) { + proxy = NS_NewOutputStreamReadyEvent(aCallback, aTarget); + aCallback = proxy; + } + + if (NS_FAILED(mPipe->mStatus) || + (mWritable && !(aFlags & WAIT_CLOSURE_ONLY))) { + // stream is already closed or writable; post event. + pipeEvents.NotifyOutputReady(this, aCallback); + } else { + // queue up callback object to be notified when data becomes available + mCallback = aCallback; + mCallbackFlags = aFlags; + } + } + return NS_OK; +} + +//////////////////////////////////////////////////////////////////////////////// + +nsresult +NS_NewPipe(nsIInputStream** aPipeIn, + nsIOutputStream** aPipeOut, + uint32_t aSegmentSize, + uint32_t aMaxSize, + bool aNonBlockingInput, + bool aNonBlockingOutput) +{ + if (aSegmentSize == 0) { + aSegmentSize = DEFAULT_SEGMENT_SIZE; + } + + // Handle aMaxSize of UINT32_MAX as a special case + uint32_t segmentCount; + if (aMaxSize == UINT32_MAX) { + segmentCount = UINT32_MAX; + } else { + segmentCount = aMaxSize / aSegmentSize; + } + + nsIAsyncInputStream* in; + nsIAsyncOutputStream* out; + nsresult rv = NS_NewPipe2(&in, &out, aNonBlockingInput, aNonBlockingOutput, + aSegmentSize, segmentCount); + if (NS_FAILED(rv)) { + return rv; + } + + *aPipeIn = in; + *aPipeOut = out; + return NS_OK; +} + +nsresult +NS_NewPipe2(nsIAsyncInputStream** aPipeIn, + nsIAsyncOutputStream** aPipeOut, + bool aNonBlockingInput, + bool aNonBlockingOutput, + uint32_t aSegmentSize, + uint32_t aSegmentCount) +{ + nsPipe* pipe = new nsPipe(); + nsresult rv = pipe->Init(aNonBlockingInput, + aNonBlockingOutput, + aSegmentSize, + aSegmentCount); + if (NS_FAILED(rv)) { + NS_ADDREF(pipe); + NS_RELEASE(pipe); + return rv; + } + + // These always succeed because the pipe is initialized above. + MOZ_ALWAYS_SUCCEEDS(pipe->GetInputStream(aPipeIn)); + MOZ_ALWAYS_SUCCEEDS(pipe->GetOutputStream(aPipeOut)); + return NS_OK; +} + +nsresult +nsPipeConstructor(nsISupports* aOuter, REFNSIID aIID, void** aResult) +{ + if (aOuter) { + return NS_ERROR_NO_AGGREGATION; + } + nsPipe* pipe = new nsPipe(); + NS_ADDREF(pipe); + nsresult rv = pipe->QueryInterface(aIID, aResult); + NS_RELEASE(pipe); + return rv; +} + +//////////////////////////////////////////////////////////////////////////////// |