diff options
Diffstat (limited to 'netwerk/base/nsStreamTransportService.cpp')
-rw-r--r-- | netwerk/base/nsStreamTransportService.cpp | 563 |
1 files changed, 563 insertions, 0 deletions
diff --git a/netwerk/base/nsStreamTransportService.cpp b/netwerk/base/nsStreamTransportService.cpp new file mode 100644 index 000000000..3461480b6 --- /dev/null +++ b/netwerk/base/nsStreamTransportService.cpp @@ -0,0 +1,563 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "nsStreamTransportService.h" +#include "nsXPCOMCIDInternal.h" +#include "nsNetSegmentUtils.h" +#include "nsTransportUtils.h" +#include "nsStreamUtils.h" +#include "nsError.h" +#include "nsNetCID.h" + +#include "nsIAsyncInputStream.h" +#include "nsIAsyncOutputStream.h" +#include "nsISeekableStream.h" +#include "nsIPipe.h" +#include "nsITransport.h" +#include "nsIObserverService.h" +#include "nsIThreadPool.h" +#include "mozilla/Services.h" + +namespace mozilla { +namespace net { + +//----------------------------------------------------------------------------- +// nsInputStreamTransport +// +// Implements nsIInputStream as a wrapper around the real input stream. This +// allows the transport to support seeking, range-limiting, progress reporting, +// and close-when-done semantics while utilizing NS_AsyncCopy. +//----------------------------------------------------------------------------- + +class nsInputStreamTransport : public nsITransport + , public nsIInputStream +{ +public: + NS_DECL_THREADSAFE_ISUPPORTS + NS_DECL_NSITRANSPORT + NS_DECL_NSIINPUTSTREAM + + nsInputStreamTransport(nsIInputStream *source, + uint64_t offset, + uint64_t limit, + bool closeWhenDone) + : mSource(source) + , mOffset(offset) + , mLimit(limit) + , mCloseWhenDone(closeWhenDone) + , mFirstTime(true) + , mInProgress(false) + { + } + +private: + virtual ~nsInputStreamTransport() + { + } + + nsCOMPtr<nsIAsyncInputStream> mPipeIn; + + // while the copy is active, these members may only be accessed from the + // nsIInputStream implementation. + nsCOMPtr<nsITransportEventSink> mEventSink; + nsCOMPtr<nsIInputStream> mSource; + int64_t mOffset; + int64_t mLimit; + bool mCloseWhenDone; + bool mFirstTime; + + // this variable serves as a lock to prevent the state of the transport + // from being modified once the copy is in progress. + bool mInProgress; +}; + +NS_IMPL_ISUPPORTS(nsInputStreamTransport, + nsITransport, + nsIInputStream) + +/** nsITransport **/ + +NS_IMETHODIMP +nsInputStreamTransport::OpenInputStream(uint32_t flags, + uint32_t segsize, + uint32_t segcount, + nsIInputStream **result) +{ + NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS); + + nsresult rv; + nsCOMPtr<nsIEventTarget> target = + do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID, &rv); + if (NS_FAILED(rv)) return rv; + + // XXX if the caller requests an unbuffered stream, then perhaps + // we'd want to simply return mSource; however, then we would + // not be reading mSource on a background thread. is this ok? + + bool nonblocking = !(flags & OPEN_BLOCKING); + + net_ResolveSegmentParams(segsize, segcount); + + nsCOMPtr<nsIAsyncOutputStream> pipeOut; + rv = NS_NewPipe2(getter_AddRefs(mPipeIn), + getter_AddRefs(pipeOut), + nonblocking, true, + segsize, segcount); + if (NS_FAILED(rv)) return rv; + + mInProgress = true; + + // startup async copy process... + rv = NS_AsyncCopy(this, pipeOut, target, + NS_ASYNCCOPY_VIA_WRITESEGMENTS, segsize); + if (NS_SUCCEEDED(rv)) + NS_ADDREF(*result = mPipeIn); + + return rv; +} + +NS_IMETHODIMP +nsInputStreamTransport::OpenOutputStream(uint32_t flags, + uint32_t segsize, + uint32_t segcount, + nsIOutputStream **result) +{ + // this transport only supports reading! + NS_NOTREACHED("nsInputStreamTransport::OpenOutputStream"); + return NS_ERROR_UNEXPECTED; +} + +NS_IMETHODIMP +nsInputStreamTransport::Close(nsresult reason) +{ + if (NS_SUCCEEDED(reason)) + reason = NS_BASE_STREAM_CLOSED; + + return mPipeIn->CloseWithStatus(reason); +} + +NS_IMETHODIMP +nsInputStreamTransport::SetEventSink(nsITransportEventSink *sink, + nsIEventTarget *target) +{ + NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS); + + if (target) + return net_NewTransportEventSinkProxy(getter_AddRefs(mEventSink), + sink, target); + + mEventSink = sink; + return NS_OK; +} + +/** nsIInputStream **/ + +NS_IMETHODIMP +nsInputStreamTransport::Close() +{ + if (mCloseWhenDone) + mSource->Close(); + + // make additional reads return early... + mOffset = mLimit = 0; + return NS_OK; +} + +NS_IMETHODIMP +nsInputStreamTransport::Available(uint64_t *result) +{ + return NS_ERROR_NOT_IMPLEMENTED; +} + +NS_IMETHODIMP +nsInputStreamTransport::Read(char *buf, uint32_t count, uint32_t *result) +{ + if (mFirstTime) { + mFirstTime = false; + if (mOffset != 0) { + // read from current position if offset equal to max + if (mOffset != -1) { + nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mSource); + if (seekable) + seekable->Seek(nsISeekableStream::NS_SEEK_SET, mOffset); + } + // reset offset to zero so we can use it to enforce limit + mOffset = 0; + } + } + + // limit amount read + uint64_t max = count; + if (mLimit != -1) { + max = mLimit - mOffset; + if (max == 0) { + *result = 0; + return NS_OK; + } + } + + if (count > max) + count = static_cast<uint32_t>(max); + + nsresult rv = mSource->Read(buf, count, result); + + if (NS_SUCCEEDED(rv)) { + mOffset += *result; + if (mEventSink) + mEventSink->OnTransportStatus(this, NS_NET_STATUS_READING, mOffset, + mLimit); + } + return rv; +} + +NS_IMETHODIMP +nsInputStreamTransport::ReadSegments(nsWriteSegmentFun writer, void *closure, + uint32_t count, uint32_t *result) +{ + return NS_ERROR_NOT_IMPLEMENTED; +} + +NS_IMETHODIMP +nsInputStreamTransport::IsNonBlocking(bool *result) +{ + *result = false; + return NS_OK; +} + +//----------------------------------------------------------------------------- +// nsOutputStreamTransport +// +// Implements nsIOutputStream as a wrapper around the real input stream. This +// allows the transport to support seeking, range-limiting, progress reporting, +// and close-when-done semantics while utilizing NS_AsyncCopy. +//----------------------------------------------------------------------------- + +class nsOutputStreamTransport : public nsITransport + , public nsIOutputStream +{ +public: + NS_DECL_THREADSAFE_ISUPPORTS + NS_DECL_NSITRANSPORT + NS_DECL_NSIOUTPUTSTREAM + + nsOutputStreamTransport(nsIOutputStream *sink, + int64_t offset, + int64_t limit, + bool closeWhenDone) + : mSink(sink) + , mOffset(offset) + , mLimit(limit) + , mCloseWhenDone(closeWhenDone) + , mFirstTime(true) + , mInProgress(false) + { + } + +private: + virtual ~nsOutputStreamTransport() + { + } + + nsCOMPtr<nsIAsyncOutputStream> mPipeOut; + + // while the copy is active, these members may only be accessed from the + // nsIOutputStream implementation. + nsCOMPtr<nsITransportEventSink> mEventSink; + nsCOMPtr<nsIOutputStream> mSink; + int64_t mOffset; + int64_t mLimit; + bool mCloseWhenDone; + bool mFirstTime; + + // this variable serves as a lock to prevent the state of the transport + // from being modified once the copy is in progress. + bool mInProgress; +}; + +NS_IMPL_ISUPPORTS(nsOutputStreamTransport, + nsITransport, + nsIOutputStream) + +/** nsITransport **/ + +NS_IMETHODIMP +nsOutputStreamTransport::OpenInputStream(uint32_t flags, + uint32_t segsize, + uint32_t segcount, + nsIInputStream **result) +{ + // this transport only supports writing! + NS_NOTREACHED("nsOutputStreamTransport::OpenInputStream"); + return NS_ERROR_UNEXPECTED; +} + +NS_IMETHODIMP +nsOutputStreamTransport::OpenOutputStream(uint32_t flags, + uint32_t segsize, + uint32_t segcount, + nsIOutputStream **result) +{ + NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS); + + nsresult rv; + nsCOMPtr<nsIEventTarget> target = + do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID, &rv); + if (NS_FAILED(rv)) return rv; + + // XXX if the caller requests an unbuffered stream, then perhaps + // we'd want to simply return mSink; however, then we would + // not be writing to mSink on a background thread. is this ok? + + bool nonblocking = !(flags & OPEN_BLOCKING); + + net_ResolveSegmentParams(segsize, segcount); + + nsCOMPtr<nsIAsyncInputStream> pipeIn; + rv = NS_NewPipe2(getter_AddRefs(pipeIn), + getter_AddRefs(mPipeOut), + true, nonblocking, + segsize, segcount); + if (NS_FAILED(rv)) return rv; + + mInProgress = true; + + // startup async copy process... + rv = NS_AsyncCopy(pipeIn, this, target, + NS_ASYNCCOPY_VIA_READSEGMENTS, segsize); + if (NS_SUCCEEDED(rv)) + NS_ADDREF(*result = mPipeOut); + + return rv; +} + +NS_IMETHODIMP +nsOutputStreamTransport::Close(nsresult reason) +{ + if (NS_SUCCEEDED(reason)) + reason = NS_BASE_STREAM_CLOSED; + + return mPipeOut->CloseWithStatus(reason); +} + +NS_IMETHODIMP +nsOutputStreamTransport::SetEventSink(nsITransportEventSink *sink, + nsIEventTarget *target) +{ + NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS); + + if (target) + return net_NewTransportEventSinkProxy(getter_AddRefs(mEventSink), + sink, target); + + mEventSink = sink; + return NS_OK; +} + +/** nsIOutputStream **/ + +NS_IMETHODIMP +nsOutputStreamTransport::Close() +{ + if (mCloseWhenDone) + mSink->Close(); + + // make additional writes return early... + mOffset = mLimit = 0; + return NS_OK; +} + +NS_IMETHODIMP +nsOutputStreamTransport::Flush() +{ + return NS_OK; +} + +NS_IMETHODIMP +nsOutputStreamTransport::Write(const char *buf, uint32_t count, uint32_t *result) +{ + if (mFirstTime) { + mFirstTime = false; + if (mOffset != 0) { + // write to current position if offset equal to max + if (mOffset != -1) { + nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mSink); + if (seekable) + seekable->Seek(nsISeekableStream::NS_SEEK_SET, mOffset); + } + // reset offset to zero so we can use it to enforce limit + mOffset = 0; + } + } + + // limit amount written + uint64_t max = count; + if (mLimit != -1) { + max = mLimit - mOffset; + if (max == 0) { + *result = 0; + return NS_OK; + } + } + + if (count > max) + count = static_cast<uint32_t>(max); + + nsresult rv = mSink->Write(buf, count, result); + + if (NS_SUCCEEDED(rv)) { + mOffset += *result; + if (mEventSink) + mEventSink->OnTransportStatus(this, NS_NET_STATUS_WRITING, mOffset, + mLimit); + } + return rv; +} + +NS_IMETHODIMP +nsOutputStreamTransport::WriteSegments(nsReadSegmentFun reader, void *closure, + uint32_t count, uint32_t *result) +{ + return NS_ERROR_NOT_IMPLEMENTED; +} + +NS_IMETHODIMP +nsOutputStreamTransport::WriteFrom(nsIInputStream *in, uint32_t count, uint32_t *result) +{ + return NS_ERROR_NOT_IMPLEMENTED; +} + +NS_IMETHODIMP +nsOutputStreamTransport::IsNonBlocking(bool *result) +{ + *result = false; + return NS_OK; +} + +//----------------------------------------------------------------------------- +// nsStreamTransportService +//----------------------------------------------------------------------------- + +nsStreamTransportService::~nsStreamTransportService() +{ + NS_ASSERTION(!mPool, "thread pool wasn't shutdown"); +} + +nsresult +nsStreamTransportService::Init() +{ + mPool = do_CreateInstance(NS_THREADPOOL_CONTRACTID); + NS_ENSURE_STATE(mPool); + + // Configure the pool + mPool->SetName(NS_LITERAL_CSTRING("StreamTrans")); + mPool->SetThreadLimit(25); + mPool->SetIdleThreadLimit(1); + mPool->SetIdleThreadTimeout(PR_SecondsToInterval(30)); + + nsCOMPtr<nsIObserverService> obsSvc = + mozilla::services::GetObserverService(); + if (obsSvc) + obsSvc->AddObserver(this, "xpcom-shutdown-threads", false); + return NS_OK; +} + +NS_IMPL_ISUPPORTS(nsStreamTransportService, + nsIStreamTransportService, + nsIEventTarget, + nsIObserver) + +NS_IMETHODIMP +nsStreamTransportService::DispatchFromScript(nsIRunnable *task, uint32_t flags) +{ + nsCOMPtr<nsIRunnable> event(task); + return Dispatch(event.forget(), flags); +} + +NS_IMETHODIMP +nsStreamTransportService::Dispatch(already_AddRefed<nsIRunnable> task, uint32_t flags) +{ + nsCOMPtr<nsIRunnable> event(task); // so it gets released on failure paths + nsCOMPtr<nsIThreadPool> pool; + { + mozilla::MutexAutoLock lock(mShutdownLock); + if (mIsShutdown) { + return NS_ERROR_NOT_INITIALIZED; + } + pool = mPool; + } + NS_ENSURE_TRUE(pool, NS_ERROR_NOT_INITIALIZED); + return pool->Dispatch(event.forget(), flags); +} + +NS_IMETHODIMP +nsStreamTransportService::DelayedDispatch(already_AddRefed<nsIRunnable>, uint32_t) +{ + return NS_ERROR_NOT_IMPLEMENTED; +} + +NS_IMETHODIMP +nsStreamTransportService::IsOnCurrentThread(bool *result) +{ + nsCOMPtr<nsIThreadPool> pool; + { + mozilla::MutexAutoLock lock(mShutdownLock); + if (mIsShutdown) { + return NS_ERROR_NOT_INITIALIZED; + } + pool = mPool; + } + NS_ENSURE_TRUE(pool, NS_ERROR_NOT_INITIALIZED); + return pool->IsOnCurrentThread(result); +} + +NS_IMETHODIMP +nsStreamTransportService::CreateInputTransport(nsIInputStream *stream, + int64_t offset, + int64_t limit, + bool closeWhenDone, + nsITransport **result) +{ + nsInputStreamTransport *trans = + new nsInputStreamTransport(stream, offset, limit, closeWhenDone); + if (!trans) + return NS_ERROR_OUT_OF_MEMORY; + NS_ADDREF(*result = trans); + return NS_OK; +} + +NS_IMETHODIMP +nsStreamTransportService::CreateOutputTransport(nsIOutputStream *stream, + int64_t offset, + int64_t limit, + bool closeWhenDone, + nsITransport **result) +{ + nsOutputStreamTransport *trans = + new nsOutputStreamTransport(stream, offset, limit, closeWhenDone); + if (!trans) + return NS_ERROR_OUT_OF_MEMORY; + NS_ADDREF(*result = trans); + return NS_OK; +} + +NS_IMETHODIMP +nsStreamTransportService::Observe(nsISupports *subject, const char *topic, + const char16_t *data) +{ + NS_ASSERTION(strcmp(topic, "xpcom-shutdown-threads") == 0, "oops"); + + { + mozilla::MutexAutoLock lock(mShutdownLock); + mIsShutdown = true; + } + + if (mPool) { + mPool->Shutdown(); + mPool = nullptr; + } + return NS_OK; +} + +} // namespace net +} // namespace mozilla |