summaryrefslogtreecommitdiffstats
path: root/netwerk/base/nsInputStreamPump.cpp
diff options
context:
space:
mode:
authorMatt A. Tobin <mattatobin@localhost.localdomain>2018-02-02 04:16:08 -0500
committerMatt A. Tobin <mattatobin@localhost.localdomain>2018-02-02 04:16:08 -0500
commit5f8de423f190bbb79a62f804151bc24824fa32d8 (patch)
tree10027f336435511475e392454359edea8e25895d /netwerk/base/nsInputStreamPump.cpp
parent49ee0794b5d912db1f95dce6eb52d781dc210db5 (diff)
downloadUXP-5f8de423f190bbb79a62f804151bc24824fa32d8.tar
UXP-5f8de423f190bbb79a62f804151bc24824fa32d8.tar.gz
UXP-5f8de423f190bbb79a62f804151bc24824fa32d8.tar.lz
UXP-5f8de423f190bbb79a62f804151bc24824fa32d8.tar.xz
UXP-5f8de423f190bbb79a62f804151bc24824fa32d8.zip
Add m-esr52 at 52.6.0
Diffstat (limited to 'netwerk/base/nsInputStreamPump.cpp')
-rw-r--r--netwerk/base/nsInputStreamPump.cpp766
1 files changed, 766 insertions, 0 deletions
diff --git a/netwerk/base/nsInputStreamPump.cpp b/netwerk/base/nsInputStreamPump.cpp
new file mode 100644
index 000000000..19c2a790a
--- /dev/null
+++ b/netwerk/base/nsInputStreamPump.cpp
@@ -0,0 +1,766 @@
+/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+/* vim:set ts=4 sts=4 sw=4 et cin: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#include "nsIOService.h"
+#include "nsInputStreamPump.h"
+#include "nsIStreamTransportService.h"
+#include "nsISeekableStream.h"
+#include "nsITransport.h"
+#include "nsIThreadRetargetableStreamListener.h"
+#include "nsThreadUtils.h"
+#include "nsCOMPtr.h"
+#include "mozilla/Logging.h"
+#include "GeckoProfiler.h"
+#include "nsIStreamListener.h"
+#include "nsILoadGroup.h"
+#include "nsNetCID.h"
+#include <algorithm>
+
+static NS_DEFINE_CID(kStreamTransportServiceCID, NS_STREAMTRANSPORTSERVICE_CID);
+
+//
+// MOZ_LOG=nsStreamPump:5
+//
+static mozilla::LazyLogModule gStreamPumpLog("nsStreamPump");
+#undef LOG
+#define LOG(args) MOZ_LOG(gStreamPumpLog, mozilla::LogLevel::Debug, args)
+
+//-----------------------------------------------------------------------------
+// nsInputStreamPump methods
+//-----------------------------------------------------------------------------
+
+nsInputStreamPump::nsInputStreamPump()
+ : mState(STATE_IDLE)
+ , mStreamOffset(0)
+ , mStreamLength(UINT64_MAX)
+ , mStatus(NS_OK)
+ , mSuspendCount(0)
+ , mLoadFlags(LOAD_NORMAL)
+ , mProcessingCallbacks(false)
+ , mWaitingForInputStreamReady(false)
+ , mCloseWhenDone(false)
+ , mRetargeting(false)
+ , mMonitor("nsInputStreamPump")
+{
+}
+
+nsInputStreamPump::~nsInputStreamPump()
+{
+}
+
+nsresult
+nsInputStreamPump::Create(nsInputStreamPump **result,
+ nsIInputStream *stream,
+ int64_t streamPos,
+ int64_t streamLen,
+ uint32_t segsize,
+ uint32_t segcount,
+ bool closeWhenDone)
+{
+ nsresult rv = NS_ERROR_OUT_OF_MEMORY;
+ RefPtr<nsInputStreamPump> pump = new nsInputStreamPump();
+ if (pump) {
+ rv = pump->Init(stream, streamPos, streamLen,
+ segsize, segcount, closeWhenDone);
+ if (NS_SUCCEEDED(rv)) {
+ pump.forget(result);
+ }
+ }
+ return rv;
+}
+
+struct PeekData {
+ PeekData(nsInputStreamPump::PeekSegmentFun fun, void* closure)
+ : mFunc(fun), mClosure(closure) {}
+
+ nsInputStreamPump::PeekSegmentFun mFunc;
+ void* mClosure;
+};
+
+static nsresult
+CallPeekFunc(nsIInputStream *aInStream, void *aClosure,
+ const char *aFromSegment, uint32_t aToOffset, uint32_t aCount,
+ uint32_t *aWriteCount)
+{
+ NS_ASSERTION(aToOffset == 0, "Called more than once?");
+ NS_ASSERTION(aCount > 0, "Called without data?");
+
+ PeekData* data = static_cast<PeekData*>(aClosure);
+ data->mFunc(data->mClosure,
+ reinterpret_cast<const uint8_t*>(aFromSegment), aCount);
+ return NS_BINDING_ABORTED;
+}
+
+nsresult
+nsInputStreamPump::PeekStream(PeekSegmentFun callback, void* closure)
+{
+ ReentrantMonitorAutoEnter mon(mMonitor);
+
+ NS_ASSERTION(mAsyncStream, "PeekStream called without stream");
+
+ // See if the pipe is closed by checking the return of Available.
+ uint64_t dummy64;
+ nsresult rv = mAsyncStream->Available(&dummy64);
+ if (NS_FAILED(rv))
+ return rv;
+ uint32_t dummy = (uint32_t)std::min(dummy64, (uint64_t)UINT32_MAX);
+
+ PeekData data(callback, closure);
+ return mAsyncStream->ReadSegments(CallPeekFunc,
+ &data,
+ nsIOService::gDefaultSegmentSize,
+ &dummy);
+}
+
+nsresult
+nsInputStreamPump::EnsureWaiting()
+{
+ mMonitor.AssertCurrentThreadIn();
+
+ // no need to worry about multiple threads... an input stream pump lives
+ // on only one thread at a time.
+ MOZ_ASSERT(mAsyncStream);
+ if (!mWaitingForInputStreamReady && !mProcessingCallbacks) {
+ // Ensure OnStateStop is called on the main thread.
+ if (mState == STATE_STOP) {
+ nsCOMPtr<nsIThread> mainThread = do_GetMainThread();
+ if (mTargetThread != mainThread) {
+ mTargetThread = do_QueryInterface(mainThread);
+ }
+ }
+ MOZ_ASSERT(mTargetThread);
+ nsresult rv = mAsyncStream->AsyncWait(this, 0, 0, mTargetThread);
+ if (NS_FAILED(rv)) {
+ NS_ERROR("AsyncWait failed");
+ return rv;
+ }
+ // Any retargeting during STATE_START or START_TRANSFER is complete
+ // after the call to AsyncWait; next callback wil be on mTargetThread.
+ mRetargeting = false;
+ mWaitingForInputStreamReady = true;
+ }
+ return NS_OK;
+}
+
+//-----------------------------------------------------------------------------
+// nsInputStreamPump::nsISupports
+//-----------------------------------------------------------------------------
+
+// although this class can only be accessed from one thread at a time, we do
+// allow its ownership to move from thread to thread, assuming the consumer
+// understands the limitations of this.
+NS_IMPL_ISUPPORTS(nsInputStreamPump,
+ nsIRequest,
+ nsIThreadRetargetableRequest,
+ nsIInputStreamCallback,
+ nsIInputStreamPump)
+
+//-----------------------------------------------------------------------------
+// nsInputStreamPump::nsIRequest
+//-----------------------------------------------------------------------------
+
+NS_IMETHODIMP
+nsInputStreamPump::GetName(nsACString &result)
+{
+ ReentrantMonitorAutoEnter mon(mMonitor);
+
+ result.Truncate();
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+nsInputStreamPump::IsPending(bool *result)
+{
+ ReentrantMonitorAutoEnter mon(mMonitor);
+
+ *result = (mState != STATE_IDLE);
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+nsInputStreamPump::GetStatus(nsresult *status)
+{
+ ReentrantMonitorAutoEnter mon(mMonitor);
+
+ *status = mStatus;
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+nsInputStreamPump::Cancel(nsresult status)
+{
+ MOZ_ASSERT(NS_IsMainThread());
+
+ ReentrantMonitorAutoEnter mon(mMonitor);
+
+ LOG(("nsInputStreamPump::Cancel [this=%p status=%x]\n",
+ this, status));
+
+ if (NS_FAILED(mStatus)) {
+ LOG((" already canceled\n"));
+ return NS_OK;
+ }
+
+ NS_ASSERTION(NS_FAILED(status), "cancel with non-failure status code");
+ mStatus = status;
+
+ // close input stream
+ if (mAsyncStream) {
+ mAsyncStream->CloseWithStatus(status);
+ if (mSuspendCount == 0)
+ EnsureWaiting();
+ // Otherwise, EnsureWaiting will be called by Resume().
+ // Note that while suspended, OnInputStreamReady will
+ // not do anything, and also note that calling asyncWait
+ // on a closed stream works and will dispatch an event immediately.
+ }
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+nsInputStreamPump::Suspend()
+{
+ ReentrantMonitorAutoEnter mon(mMonitor);
+
+ LOG(("nsInputStreamPump::Suspend [this=%p]\n", this));
+ NS_ENSURE_TRUE(mState != STATE_IDLE, NS_ERROR_UNEXPECTED);
+ ++mSuspendCount;
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+nsInputStreamPump::Resume()
+{
+ ReentrantMonitorAutoEnter mon(mMonitor);
+
+ LOG(("nsInputStreamPump::Resume [this=%p]\n", this));
+ NS_ENSURE_TRUE(mSuspendCount > 0, NS_ERROR_UNEXPECTED);
+ NS_ENSURE_TRUE(mState != STATE_IDLE, NS_ERROR_UNEXPECTED);
+
+ if (--mSuspendCount == 0)
+ EnsureWaiting();
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+nsInputStreamPump::GetLoadFlags(nsLoadFlags *aLoadFlags)
+{
+ ReentrantMonitorAutoEnter mon(mMonitor);
+
+ *aLoadFlags = mLoadFlags;
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+nsInputStreamPump::SetLoadFlags(nsLoadFlags aLoadFlags)
+{
+ ReentrantMonitorAutoEnter mon(mMonitor);
+
+ mLoadFlags = aLoadFlags;
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+nsInputStreamPump::GetLoadGroup(nsILoadGroup **aLoadGroup)
+{
+ ReentrantMonitorAutoEnter mon(mMonitor);
+
+ NS_IF_ADDREF(*aLoadGroup = mLoadGroup);
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+nsInputStreamPump::SetLoadGroup(nsILoadGroup *aLoadGroup)
+{
+ ReentrantMonitorAutoEnter mon(mMonitor);
+
+ mLoadGroup = aLoadGroup;
+ return NS_OK;
+}
+
+//-----------------------------------------------------------------------------
+// nsInputStreamPump::nsIInputStreamPump implementation
+//-----------------------------------------------------------------------------
+
+NS_IMETHODIMP
+nsInputStreamPump::Init(nsIInputStream *stream,
+ int64_t streamPos, int64_t streamLen,
+ uint32_t segsize, uint32_t segcount,
+ bool closeWhenDone)
+{
+ NS_ENSURE_TRUE(mState == STATE_IDLE, NS_ERROR_IN_PROGRESS);
+
+ mStreamOffset = uint64_t(streamPos);
+ if (int64_t(streamLen) >= int64_t(0))
+ mStreamLength = uint64_t(streamLen);
+ mStream = stream;
+ mSegSize = segsize;
+ mSegCount = segcount;
+ mCloseWhenDone = closeWhenDone;
+
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+nsInputStreamPump::AsyncRead(nsIStreamListener *listener, nsISupports *ctxt)
+{
+ ReentrantMonitorAutoEnter mon(mMonitor);
+
+ NS_ENSURE_TRUE(mState == STATE_IDLE, NS_ERROR_IN_PROGRESS);
+ NS_ENSURE_ARG_POINTER(listener);
+ MOZ_ASSERT(NS_IsMainThread(), "nsInputStreamPump should be read from the "
+ "main thread only.");
+
+ //
+ // OK, we need to use the stream transport service if
+ //
+ // (1) the stream is blocking
+ // (2) the stream does not support nsIAsyncInputStream
+ //
+
+ bool nonBlocking;
+ nsresult rv = mStream->IsNonBlocking(&nonBlocking);
+ if (NS_FAILED(rv)) return rv;
+
+ if (nonBlocking) {
+ mAsyncStream = do_QueryInterface(mStream);
+ //
+ // if the stream supports nsIAsyncInputStream, and if we need to seek
+ // to a starting offset, then we must do so here. in the non-async
+ // stream case, the stream transport service will take care of seeking
+ // for us.
+ //
+ if (mAsyncStream && (mStreamOffset != UINT64_MAX)) {
+ nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mStream);
+ if (seekable)
+ seekable->Seek(nsISeekableStream::NS_SEEK_SET, mStreamOffset);
+ }
+ }
+
+ if (!mAsyncStream) {
+ // ok, let's use the stream transport service to read this stream.
+ nsCOMPtr<nsIStreamTransportService> sts =
+ do_GetService(kStreamTransportServiceCID, &rv);
+ if (NS_FAILED(rv)) return rv;
+
+ nsCOMPtr<nsITransport> transport;
+ rv = sts->CreateInputTransport(mStream, mStreamOffset, mStreamLength,
+ mCloseWhenDone, getter_AddRefs(transport));
+ if (NS_FAILED(rv)) return rv;
+
+ nsCOMPtr<nsIInputStream> wrapper;
+ rv = transport->OpenInputStream(0, mSegSize, mSegCount, getter_AddRefs(wrapper));
+ if (NS_FAILED(rv)) return rv;
+
+ mAsyncStream = do_QueryInterface(wrapper, &rv);
+ if (NS_FAILED(rv)) return rv;
+ }
+
+ // release our reference to the original stream. from this point forward,
+ // we only reference the "stream" via mAsyncStream.
+ mStream = nullptr;
+
+ // mStreamOffset now holds the number of bytes currently read. we use this
+ // to enforce the mStreamLength restriction.
+ mStreamOffset = 0;
+
+ // grab event queue (we must do this here by contract, since all notifications
+ // must go to the thread which called AsyncRead)
+ mTargetThread = do_GetCurrentThread();
+ NS_ENSURE_STATE(mTargetThread);
+
+ rv = EnsureWaiting();
+ if (NS_FAILED(rv)) return rv;
+
+ if (mLoadGroup)
+ mLoadGroup->AddRequest(this, nullptr);
+
+ mState = STATE_START;
+ mListener = listener;
+ mListenerContext = ctxt;
+ return NS_OK;
+}
+
+//-----------------------------------------------------------------------------
+// nsInputStreamPump::nsIInputStreamCallback implementation
+//-----------------------------------------------------------------------------
+
+NS_IMETHODIMP
+nsInputStreamPump::OnInputStreamReady(nsIAsyncInputStream *stream)
+{
+ LOG(("nsInputStreamPump::OnInputStreamReady [this=%p]\n", this));
+
+ PROFILER_LABEL("nsInputStreamPump", "OnInputStreamReady",
+ js::ProfileEntry::Category::NETWORK);
+
+ // this function has been called from a PLEvent, so we can safely call
+ // any listener or progress sink methods directly from here.
+
+ for (;;) {
+ // There should only be one iteration of this loop happening at a time.
+ // To prevent AsyncWait() (called during callbacks or on other threads)
+ // from creating a parallel OnInputStreamReady(), we use:
+ // -- a monitor; and
+ // -- a boolean mProcessingCallbacks to detect parallel loops
+ // when exiting the monitor for callbacks.
+ ReentrantMonitorAutoEnter lock(mMonitor);
+
+ // Prevent parallel execution during callbacks, while out of monitor.
+ if (mProcessingCallbacks) {
+ MOZ_ASSERT(!mProcessingCallbacks);
+ break;
+ }
+ mProcessingCallbacks = true;
+ if (mSuspendCount || mState == STATE_IDLE) {
+ mWaitingForInputStreamReady = false;
+ mProcessingCallbacks = false;
+ break;
+ }
+
+ uint32_t nextState;
+ switch (mState) {
+ case STATE_START:
+ nextState = OnStateStart();
+ break;
+ case STATE_TRANSFER:
+ nextState = OnStateTransfer();
+ break;
+ case STATE_STOP:
+ mRetargeting = false;
+ nextState = OnStateStop();
+ break;
+ default:
+ nextState = 0;
+ NS_NOTREACHED("Unknown enum value.");
+ return NS_ERROR_UNEXPECTED;
+ }
+
+ bool stillTransferring = (mState == STATE_TRANSFER &&
+ nextState == STATE_TRANSFER);
+ if (stillTransferring) {
+ NS_ASSERTION(NS_SUCCEEDED(mStatus),
+ "Should not have failed status for ongoing transfer");
+ } else {
+ NS_ASSERTION(mState != nextState,
+ "Only OnStateTransfer can be called more than once.");
+ }
+ if (mRetargeting) {
+ NS_ASSERTION(mState != STATE_STOP,
+ "Retargeting should not happen during OnStateStop.");
+ }
+
+ // Set mRetargeting so EnsureWaiting will be called. It ensures that
+ // OnStateStop is called on the main thread.
+ if (nextState == STATE_STOP && !NS_IsMainThread()) {
+ mRetargeting = true;
+ }
+
+ // Unset mProcessingCallbacks here (while we have lock) so our own call to
+ // EnsureWaiting isn't blocked by it.
+ mProcessingCallbacks = false;
+
+ // We must break the loop when we're switching event delivery to another
+ // thread and the input stream pump is suspended, otherwise
+ // OnStateStop() might be called off the main thread. See bug 1026951
+ // comment #107 for the exact scenario.
+ if (mSuspendCount && mRetargeting) {
+ mState = nextState;
+ mWaitingForInputStreamReady = false;
+ break;
+ }
+
+ // Wait asynchronously if there is still data to transfer, or we're
+ // switching event delivery to another thread.
+ if (!mSuspendCount && (stillTransferring || mRetargeting)) {
+ mState = nextState;
+ mWaitingForInputStreamReady = false;
+ nsresult rv = EnsureWaiting();
+ if (NS_SUCCEEDED(rv))
+ break;
+
+ // Failure to start asynchronous wait: stop transfer.
+ // Do not set mStatus if it was previously set to report a failure.
+ if (NS_SUCCEEDED(mStatus)) {
+ mStatus = rv;
+ }
+ nextState = STATE_STOP;
+ }
+
+ mState = nextState;
+ }
+ return NS_OK;
+}
+
+uint32_t
+nsInputStreamPump::OnStateStart()
+{
+ mMonitor.AssertCurrentThreadIn();
+
+ PROFILER_LABEL("nsInputStreamPump", "OnStateStart",
+ js::ProfileEntry::Category::NETWORK);
+
+ LOG((" OnStateStart [this=%p]\n", this));
+
+ nsresult rv;
+
+ // need to check the reason why the stream is ready. this is required
+ // so our listener can check our status from OnStartRequest.
+ // XXX async streams should have a GetStatus method!
+ if (NS_SUCCEEDED(mStatus)) {
+ uint64_t avail;
+ rv = mAsyncStream->Available(&avail);
+ if (NS_FAILED(rv) && rv != NS_BASE_STREAM_CLOSED)
+ mStatus = rv;
+ }
+
+ {
+ // Note: Must exit monitor for call to OnStartRequest to avoid
+ // deadlocks when calls to RetargetDeliveryTo for multiple
+ // nsInputStreamPumps are needed (e.g. nsHttpChannel).
+ mMonitor.Exit();
+ rv = mListener->OnStartRequest(this, mListenerContext);
+ mMonitor.Enter();
+ }
+
+ // an error returned from OnStartRequest should cause us to abort; however,
+ // we must not stomp on mStatus if already canceled.
+ if (NS_FAILED(rv) && NS_SUCCEEDED(mStatus))
+ mStatus = rv;
+
+ return NS_SUCCEEDED(mStatus) ? STATE_TRANSFER : STATE_STOP;
+}
+
+uint32_t
+nsInputStreamPump::OnStateTransfer()
+{
+ mMonitor.AssertCurrentThreadIn();
+
+ PROFILER_LABEL("nsInputStreamPump", "OnStateTransfer",
+ js::ProfileEntry::Category::NETWORK);
+
+ LOG((" OnStateTransfer [this=%p]\n", this));
+
+ // if canceled, go directly to STATE_STOP...
+ if (NS_FAILED(mStatus))
+ return STATE_STOP;
+
+ nsresult rv;
+
+ uint64_t avail;
+ rv = mAsyncStream->Available(&avail);
+ LOG((" Available returned [stream=%x rv=%x avail=%llu]\n", mAsyncStream.get(), rv, avail));
+
+ if (rv == NS_BASE_STREAM_CLOSED) {
+ rv = NS_OK;
+ avail = 0;
+ }
+ else if (NS_SUCCEEDED(rv) && avail) {
+ // figure out how much data to report (XXX detect overflow??)
+ if (avail > mStreamLength - mStreamOffset)
+ avail = mStreamLength - mStreamOffset;
+
+ if (avail) {
+ // we used to limit avail to 16K - we were afraid some ODA handlers
+ // might assume they wouldn't get more than 16K at once
+ // we're removing that limit since it speeds up local file access.
+ // Now there's an implicit 64K limit of 4 16K segments
+ // NOTE: ok, so the story is as follows. OnDataAvailable impls
+ // are by contract supposed to consume exactly |avail| bytes.
+ // however, many do not... mailnews... stream converters...
+ // cough, cough. the input stream pump is fairly tolerant
+ // in this regard; however, if an ODA does not consume any
+ // data from the stream, then we could potentially end up in
+ // an infinite loop. we do our best here to try to catch
+ // such an error. (see bug 189672)
+
+ // in most cases this QI will succeed (mAsyncStream is almost always
+ // a nsPipeInputStream, which implements nsISeekableStream::Tell).
+ int64_t offsetBefore;
+ nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mAsyncStream);
+ if (seekable && NS_FAILED(seekable->Tell(&offsetBefore))) {
+ NS_NOTREACHED("Tell failed on readable stream");
+ offsetBefore = 0;
+ }
+
+ uint32_t odaAvail =
+ avail > UINT32_MAX ?
+ UINT32_MAX : uint32_t(avail);
+
+ LOG((" calling OnDataAvailable [offset=%llu count=%llu(%u)]\n",
+ mStreamOffset, avail, odaAvail));
+
+ {
+ // Note: Must exit monitor for call to OnStartRequest to avoid
+ // deadlocks when calls to RetargetDeliveryTo for multiple
+ // nsInputStreamPumps are needed (e.g. nsHttpChannel).
+ mMonitor.Exit();
+ rv = mListener->OnDataAvailable(this, mListenerContext,
+ mAsyncStream, mStreamOffset,
+ odaAvail);
+ mMonitor.Enter();
+ }
+
+ // don't enter this code if ODA failed or called Cancel
+ if (NS_SUCCEEDED(rv) && NS_SUCCEEDED(mStatus)) {
+ // test to see if this ODA failed to consume data
+ if (seekable) {
+ // NOTE: if Tell fails, which can happen if the stream is
+ // now closed, then we assume that everything was read.
+ int64_t offsetAfter;
+ if (NS_FAILED(seekable->Tell(&offsetAfter)))
+ offsetAfter = offsetBefore + odaAvail;
+ if (offsetAfter > offsetBefore)
+ mStreamOffset += (offsetAfter - offsetBefore);
+ else if (mSuspendCount == 0) {
+ //
+ // possible infinite loop if we continue pumping data!
+ //
+ // NOTE: although not allowed by nsIStreamListener, we
+ // will allow the ODA impl to Suspend the pump. IMAP
+ // does this :-(
+ //
+ NS_ERROR("OnDataAvailable implementation consumed no data");
+ mStatus = NS_ERROR_UNEXPECTED;
+ }
+ }
+ else
+ mStreamOffset += odaAvail; // assume ODA behaved well
+ }
+ }
+ }
+
+ // an error returned from Available or OnDataAvailable should cause us to
+ // abort; however, we must not stomp on mStatus if already canceled.
+
+ if (NS_SUCCEEDED(mStatus)) {
+ if (NS_FAILED(rv))
+ mStatus = rv;
+ else if (avail) {
+ // if stream is now closed, advance to STATE_STOP right away.
+ // Available may return 0 bytes available at the moment; that
+ // would not mean that we are done.
+ // XXX async streams should have a GetStatus method!
+ rv = mAsyncStream->Available(&avail);
+ if (NS_SUCCEEDED(rv))
+ return STATE_TRANSFER;
+ if (rv != NS_BASE_STREAM_CLOSED)
+ mStatus = rv;
+ }
+ }
+ return STATE_STOP;
+}
+
+nsresult
+nsInputStreamPump::CallOnStateStop()
+{
+ ReentrantMonitorAutoEnter mon(mMonitor);
+
+ MOZ_ASSERT(NS_IsMainThread(),
+ "CallOnStateStop should only be called on the main thread.");
+
+ mState = OnStateStop();
+ return NS_OK;
+}
+
+uint32_t
+nsInputStreamPump::OnStateStop()
+{
+ mMonitor.AssertCurrentThreadIn();
+
+ if (!NS_IsMainThread()) {
+ // Hopefully temporary hack: OnStateStop should only run on the main
+ // thread, but we're seeing some rare off-main-thread calls. For now
+ // just redispatch to the main thread in release builds, and crash in
+ // debug builds.
+ MOZ_ASSERT(NS_IsMainThread(),
+ "OnStateStop should only be called on the main thread.");
+ nsresult rv = NS_DispatchToMainThread(
+ NewRunnableMethod(this, &nsInputStreamPump::CallOnStateStop));
+ NS_ENSURE_SUCCESS(rv, STATE_IDLE);
+ return STATE_IDLE;
+ }
+
+ PROFILER_LABEL("nsInputStreamPump", "OnStateStop",
+ js::ProfileEntry::Category::NETWORK);
+
+ LOG((" OnStateStop [this=%p status=%x]\n", this, mStatus));
+
+ // if an error occurred, we must be sure to pass the error onto the async
+ // stream. in some cases, this is redundant, but since close is idempotent,
+ // this is OK. otherwise, be sure to honor the "close-when-done" option.
+
+ if (!mAsyncStream || !mListener) {
+ MOZ_ASSERT(mAsyncStream, "null mAsyncStream: OnStateStop called twice?");
+ MOZ_ASSERT(mListener, "null mListener: OnStateStop called twice?");
+ return STATE_IDLE;
+ }
+
+ if (NS_FAILED(mStatus))
+ mAsyncStream->CloseWithStatus(mStatus);
+ else if (mCloseWhenDone)
+ mAsyncStream->Close();
+
+ mAsyncStream = nullptr;
+ mTargetThread = nullptr;
+ mIsPending = false;
+ {
+ // Note: Must exit monitor for call to OnStartRequest to avoid
+ // deadlocks when calls to RetargetDeliveryTo for multiple
+ // nsInputStreamPumps are needed (e.g. nsHttpChannel).
+ mMonitor.Exit();
+ mListener->OnStopRequest(this, mListenerContext, mStatus);
+ mMonitor.Enter();
+ }
+ mListener = nullptr;
+ mListenerContext = nullptr;
+
+ if (mLoadGroup)
+ mLoadGroup->RemoveRequest(this, nullptr, mStatus);
+
+ return STATE_IDLE;
+}
+
+//-----------------------------------------------------------------------------
+// nsIThreadRetargetableRequest
+//-----------------------------------------------------------------------------
+
+NS_IMETHODIMP
+nsInputStreamPump::RetargetDeliveryTo(nsIEventTarget* aNewTarget)
+{
+ ReentrantMonitorAutoEnter mon(mMonitor);
+
+ NS_ENSURE_ARG(aNewTarget);
+ NS_ENSURE_TRUE(mState == STATE_START || mState == STATE_TRANSFER,
+ NS_ERROR_UNEXPECTED);
+
+ // If canceled, do not retarget. Return with canceled status.
+ if (NS_FAILED(mStatus)) {
+ return mStatus;
+ }
+
+ if (aNewTarget == mTargetThread) {
+ NS_WARNING("Retargeting delivery to same thread");
+ return NS_OK;
+ }
+
+ // Ensure that |mListener| and any subsequent listeners can be retargeted
+ // to another thread.
+ nsresult rv = NS_OK;
+ nsCOMPtr<nsIThreadRetargetableStreamListener> retargetableListener =
+ do_QueryInterface(mListener, &rv);
+ if (NS_SUCCEEDED(rv) && retargetableListener) {
+ rv = retargetableListener->CheckListenerChain();
+ if (NS_SUCCEEDED(rv)) {
+ mTargetThread = aNewTarget;
+ mRetargeting = true;
+ }
+ }
+ LOG(("nsInputStreamPump::RetargetDeliveryTo [this=%x aNewTarget=%p] "
+ "%s listener [%p] rv[%x]",
+ this, aNewTarget, (mTargetThread == aNewTarget ? "success" : "failure"),
+ (nsIStreamListener*)mListener, rv));
+ return rv;
+}