diff options
Diffstat (limited to 'dom/cache/ReadStream.cpp')
-rw-r--r-- | dom/cache/ReadStream.cpp | 578 |
1 files changed, 578 insertions, 0 deletions
diff --git a/dom/cache/ReadStream.cpp b/dom/cache/ReadStream.cpp new file mode 100644 index 000000000..efce027a1 --- /dev/null +++ b/dom/cache/ReadStream.cpp @@ -0,0 +1,578 @@ +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* vim: set ts=8 sts=2 et sw=2 tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "mozilla/dom/cache/ReadStream.h" + +#include "mozilla/Unused.h" +#include "mozilla/dom/cache/CacheStreamControlChild.h" +#include "mozilla/dom/cache/CacheStreamControlParent.h" +#include "mozilla/dom/cache/CacheTypes.h" +#include "mozilla/ipc/IPCStreamUtils.h" +#include "mozilla/SnappyUncompressInputStream.h" +#include "nsIAsyncInputStream.h" +#include "nsTArray.h" + +namespace mozilla { +namespace dom { +namespace cache { + +using mozilla::Unused; +using mozilla::ipc::AutoIPCStream; +using mozilla::ipc::IPCStream; + +// ---------------------------------------------------------------------------- + +// The inner stream class. This is where all of the real work is done. As +// an invariant Inner::Close() must be called before ~Inner(). This is +// guaranteed by our outer ReadStream class. +class ReadStream::Inner final : public ReadStream::Controllable +{ +public: + Inner(StreamControl* aControl, const nsID& aId, + nsIInputStream* aStream); + + void + Serialize(CacheReadStreamOrVoid* aReadStreamOut, + nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList, + ErrorResult& aRv); + + void + Serialize(CacheReadStream* aReadStreamOut, + nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList, + ErrorResult& aRv); + + // ReadStream::Controllable methods + virtual void + CloseStream() override; + + virtual void + CloseStreamWithoutReporting() override; + + virtual bool + MatchId(const nsID& aId) const override; + + virtual bool + HasEverBeenRead() const override; + + // Simulate nsIInputStream methods, but we don't actually inherit from it + nsresult + Close(); + + nsresult + Available(uint64_t *aNumAvailableOut); + + nsresult + Read(char *aBuf, uint32_t aCount, uint32_t *aNumReadOut); + + nsresult + ReadSegments(nsWriteSegmentFun aWriter, void *aClosure, uint32_t aCount, + uint32_t *aNumReadOut); + + nsresult + IsNonBlocking(bool *aNonBlockingOut); + +private: + class NoteClosedRunnable; + class ForgetRunnable; + + ~Inner(); + + void + NoteClosed(); + + void + Forget(); + + void + NoteClosedOnOwningThread(); + + void + ForgetOnOwningThread(); + + // Weak ref to the stream control actor. The actor will always call either + // CloseStream() or CloseStreamWithoutReporting() before it's destroyed. The + // weak ref is cleared in the resulting NoteClosedOnOwningThread() or + // ForgetOnOwningThread() method call. + StreamControl* mControl; + + const nsID mId; + nsCOMPtr<nsIThread> mOwningThread; + + enum State + { + Open, + Closed, + NumStates + }; + Atomic<State> mState; + Atomic<bool> mHasEverBeenRead; + + + // The wrapped stream objects may not be threadsafe. We need to be able + // to close a stream on our owning thread while an IO thread is simultaneously + // reading the same stream. Therefore, protect all access to these stream + // objects with a mutex. + Mutex mMutex; + nsCOMPtr<nsIInputStream> mStream; + nsCOMPtr<nsIInputStream> mSnappyStream; + + NS_INLINE_DECL_THREADSAFE_REFCOUNTING(cache::ReadStream::Inner, override) +}; + +// ---------------------------------------------------------------------------- + +// Runnable to notify actors that the ReadStream has closed. This must +// be done on the thread associated with the PBackground actor. Must be +// cancelable to execute on Worker threads (which can occur when the +// ReadStream is constructed on a child process Worker thread). +class ReadStream::Inner::NoteClosedRunnable final : public CancelableRunnable +{ +public: + explicit NoteClosedRunnable(ReadStream::Inner* aStream) + : mStream(aStream) + { } + + NS_IMETHOD Run() override + { + mStream->NoteClosedOnOwningThread(); + mStream = nullptr; + return NS_OK; + } + + // Note, we must proceed with the Run() method since our actor will not + // clean itself up until we note that the stream is closed. + nsresult Cancel() override + { + Run(); + return NS_OK; + } + +private: + ~NoteClosedRunnable() { } + + RefPtr<ReadStream::Inner> mStream; +}; + +// ---------------------------------------------------------------------------- + +// Runnable to clear actors without reporting that the ReadStream has +// closed. Since this can trigger actor destruction, we need to do +// it on the thread associated with the PBackground actor. Must be +// cancelable to execute on Worker threads (which can occur when the +// ReadStream is constructed on a child process Worker thread). +class ReadStream::Inner::ForgetRunnable final : public CancelableRunnable +{ +public: + explicit ForgetRunnable(ReadStream::Inner* aStream) + : mStream(aStream) + { } + + NS_IMETHOD Run() override + { + mStream->ForgetOnOwningThread(); + mStream = nullptr; + return NS_OK; + } + + // Note, we must proceed with the Run() method so that we properly + // call RemoveListener on the actor. + nsresult Cancel() override + { + Run(); + return NS_OK; + } + +private: + ~ForgetRunnable() { } + + RefPtr<ReadStream::Inner> mStream; +}; + +// ---------------------------------------------------------------------------- + +ReadStream::Inner::Inner(StreamControl* aControl, const nsID& aId, + nsIInputStream* aStream) + : mControl(aControl) + , mId(aId) + , mOwningThread(NS_GetCurrentThread()) + , mState(Open) + , mHasEverBeenRead(false) + , mMutex("dom::cache::ReadStream") + , mStream(aStream) + , mSnappyStream(new SnappyUncompressInputStream(aStream)) +{ + MOZ_DIAGNOSTIC_ASSERT(mStream); + MOZ_DIAGNOSTIC_ASSERT(mControl); + mControl->AddReadStream(this); +} + +void +ReadStream::Inner::Serialize(CacheReadStreamOrVoid* aReadStreamOut, + nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList, + ErrorResult& aRv) +{ + MOZ_ASSERT(NS_GetCurrentThread() == mOwningThread); + MOZ_DIAGNOSTIC_ASSERT(aReadStreamOut); + *aReadStreamOut = CacheReadStream(); + Serialize(&aReadStreamOut->get_CacheReadStream(), aStreamCleanupList, aRv); +} + +void +ReadStream::Inner::Serialize(CacheReadStream* aReadStreamOut, + nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList, + ErrorResult& aRv) +{ + MOZ_ASSERT(NS_GetCurrentThread() == mOwningThread); + MOZ_DIAGNOSTIC_ASSERT(aReadStreamOut); + + if (mState != Open) { + aRv.ThrowTypeError<MSG_CACHE_STREAM_CLOSED>(); + return; + } + + MOZ_DIAGNOSTIC_ASSERT(mControl); + + aReadStreamOut->id() = mId; + mControl->SerializeControl(aReadStreamOut); + + { + MutexAutoLock lock(mMutex); + mControl->SerializeStream(aReadStreamOut, mStream, aStreamCleanupList); + } + + MOZ_DIAGNOSTIC_ASSERT(aReadStreamOut->stream().type() == + IPCStream::TInputStreamParamsWithFds); + + // We're passing ownership across the IPC barrier with the control, so + // do not signal that the stream is closed here. + Forget(); +} + +void +ReadStream::Inner::CloseStream() +{ + MOZ_ASSERT(NS_GetCurrentThread() == mOwningThread); + Close(); +} + +void +ReadStream::Inner::CloseStreamWithoutReporting() +{ + MOZ_ASSERT(NS_GetCurrentThread() == mOwningThread); + Forget(); +} + +bool +ReadStream::Inner::MatchId(const nsID& aId) const +{ + MOZ_ASSERT(NS_GetCurrentThread() == mOwningThread); + return mId.Equals(aId); +} + +bool +ReadStream::Inner::HasEverBeenRead() const +{ + MOZ_ASSERT(NS_GetCurrentThread() == mOwningThread); + return mHasEverBeenRead; +} + +nsresult +ReadStream::Inner::Close() +{ + // stream ops can happen on any thread + nsresult rv = NS_OK; + { + MutexAutoLock lock(mMutex); + rv = mSnappyStream->Close(); + } + NoteClosed(); + return rv; +} + +nsresult +ReadStream::Inner::Available(uint64_t* aNumAvailableOut) +{ + // stream ops can happen on any thread + nsresult rv = NS_OK; + { + MutexAutoLock lock(mMutex); + rv = mSnappyStream->Available(aNumAvailableOut); + } + + if (NS_FAILED(rv)) { + Close(); + } + + return rv; +} + +nsresult +ReadStream::Inner::Read(char* aBuf, uint32_t aCount, uint32_t* aNumReadOut) +{ + // stream ops can happen on any thread + MOZ_DIAGNOSTIC_ASSERT(aNumReadOut); + + nsresult rv = NS_OK; + { + MutexAutoLock lock(mMutex); + rv = mSnappyStream->Read(aBuf, aCount, aNumReadOut); + } + + if ((NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK) || + *aNumReadOut == 0) { + Close(); + } + + mHasEverBeenRead = true; + + return rv; +} + +nsresult +ReadStream::Inner::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure, + uint32_t aCount, uint32_t* aNumReadOut) +{ + // stream ops can happen on any thread + MOZ_DIAGNOSTIC_ASSERT(aNumReadOut); + + if (aCount) { + mHasEverBeenRead = true; + } + + + nsresult rv = NS_OK; + { + MutexAutoLock lock(mMutex); + rv = mSnappyStream->ReadSegments(aWriter, aClosure, aCount, aNumReadOut); + } + + if ((NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK && + rv != NS_ERROR_NOT_IMPLEMENTED) || *aNumReadOut == 0) { + Close(); + } + + // Verify bytes were actually read before marking as being ever read. For + // example, code can test if the stream supports ReadSegments() by calling + // this method with a dummy callback which doesn't read anything. We don't + // want to trigger on that. + if (*aNumReadOut) { + mHasEverBeenRead = true; + } + + return rv; +} + +nsresult +ReadStream::Inner::IsNonBlocking(bool* aNonBlockingOut) +{ + // stream ops can happen on any thread + MutexAutoLock lock(mMutex); + return mSnappyStream->IsNonBlocking(aNonBlockingOut); +} + +ReadStream::Inner::~Inner() +{ + // Any thread + MOZ_DIAGNOSTIC_ASSERT(mState == Closed); + MOZ_DIAGNOSTIC_ASSERT(!mControl); +} + +void +ReadStream::Inner::NoteClosed() +{ + // Any thread + if (mState == Closed) { + return; + } + + if (NS_GetCurrentThread() == mOwningThread) { + NoteClosedOnOwningThread(); + return; + } + + nsCOMPtr<nsIRunnable> runnable = new NoteClosedRunnable(this); + MOZ_ALWAYS_SUCCEEDS( + mOwningThread->Dispatch(runnable, nsIThread::DISPATCH_NORMAL)); +} + +void +ReadStream::Inner::Forget() +{ + // Any thread + if (mState == Closed) { + return; + } + + if (NS_GetCurrentThread() == mOwningThread) { + ForgetOnOwningThread(); + return; + } + + nsCOMPtr<nsIRunnable> runnable = new ForgetRunnable(this); + MOZ_ALWAYS_SUCCEEDS( + mOwningThread->Dispatch(runnable, nsIThread::DISPATCH_NORMAL)); +} + +void +ReadStream::Inner::NoteClosedOnOwningThread() +{ + MOZ_ASSERT(NS_GetCurrentThread() == mOwningThread); + + // Mark closed and do nothing if we were already closed + if (!mState.compareExchange(Open, Closed)) { + return; + } + + MOZ_DIAGNOSTIC_ASSERT(mControl); + mControl->NoteClosed(this, mId); + mControl = nullptr; +} + +void +ReadStream::Inner::ForgetOnOwningThread() +{ + MOZ_ASSERT(NS_GetCurrentThread() == mOwningThread); + + // Mark closed and do nothing if we were already closed + if (!mState.compareExchange(Open, Closed)) { + return; + } + + MOZ_DIAGNOSTIC_ASSERT(mControl); + mControl->ForgetReadStream(this); + mControl = nullptr; +} + +// ---------------------------------------------------------------------------- + +NS_IMPL_ISUPPORTS(cache::ReadStream, nsIInputStream, ReadStream); + +// static +already_AddRefed<ReadStream> +ReadStream::Create(const CacheReadStreamOrVoid& aReadStreamOrVoid) +{ + if (aReadStreamOrVoid.type() == CacheReadStreamOrVoid::Tvoid_t) { + return nullptr; + } + + return Create(aReadStreamOrVoid.get_CacheReadStream()); +} + +// static +already_AddRefed<ReadStream> +ReadStream::Create(const CacheReadStream& aReadStream) +{ + // The parameter may or may not be for a Cache created stream. The way we + // tell is by looking at the stream control actor. If the actor exists, + // then we know the Cache created it. + if (!aReadStream.controlChild() && !aReadStream.controlParent()) { + return nullptr; + } + + MOZ_DIAGNOSTIC_ASSERT(aReadStream.stream().type() == + IPCStream::TInputStreamParamsWithFds); + + // Control is guaranteed to survive this method as ActorDestroy() cannot + // run on this thread until we complete. + StreamControl* control; + if (aReadStream.controlChild()) { + auto actor = static_cast<CacheStreamControlChild*>(aReadStream.controlChild()); + control = actor; + } else { + auto actor = static_cast<CacheStreamControlParent*>(aReadStream.controlParent()); + control = actor; + } + MOZ_DIAGNOSTIC_ASSERT(control); + + nsCOMPtr<nsIInputStream> stream = DeserializeIPCStream(aReadStream.stream()); + MOZ_DIAGNOSTIC_ASSERT(stream); + + // Currently we expect all cache read streams to be blocking file streams. +#if !defined(RELEASE_OR_BETA) + nsCOMPtr<nsIAsyncInputStream> asyncStream = do_QueryInterface(stream); + MOZ_DIAGNOSTIC_ASSERT(!asyncStream); +#endif + + RefPtr<Inner> inner = new Inner(control, aReadStream.id(), stream); + RefPtr<ReadStream> ref = new ReadStream(inner); + return ref.forget(); +} + +// static +already_AddRefed<ReadStream> +ReadStream::Create(PCacheStreamControlParent* aControl, const nsID& aId, + nsIInputStream* aStream) +{ + MOZ_DIAGNOSTIC_ASSERT(aControl); + auto actor = static_cast<CacheStreamControlParent*>(aControl); + RefPtr<Inner> inner = new Inner(actor, aId, aStream); + RefPtr<ReadStream> ref = new ReadStream(inner); + return ref.forget(); +} + +void +ReadStream::Serialize(CacheReadStreamOrVoid* aReadStreamOut, + nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList, + ErrorResult& aRv) +{ + mInner->Serialize(aReadStreamOut, aStreamCleanupList, aRv); +} + +void +ReadStream::Serialize(CacheReadStream* aReadStreamOut, + nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList, + ErrorResult& aRv) +{ + mInner->Serialize(aReadStreamOut, aStreamCleanupList, aRv); +} + +ReadStream::ReadStream(ReadStream::Inner* aInner) + : mInner(aInner) +{ + MOZ_DIAGNOSTIC_ASSERT(mInner); +} + +ReadStream::~ReadStream() +{ + // Explicitly close the inner stream so that it does not have to + // deal with implicitly closing at destruction time. + mInner->Close(); +} + +NS_IMETHODIMP +ReadStream::Close() +{ + return mInner->Close(); +} + +NS_IMETHODIMP +ReadStream::Available(uint64_t* aNumAvailableOut) +{ + return mInner->Available(aNumAvailableOut); +} + +NS_IMETHODIMP +ReadStream::Read(char* aBuf, uint32_t aCount, uint32_t* aNumReadOut) +{ + return mInner->Read(aBuf, aCount, aNumReadOut); +} + +NS_IMETHODIMP +ReadStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure, + uint32_t aCount, uint32_t* aNumReadOut) +{ + return mInner->ReadSegments(aWriter, aClosure, aCount, aNumReadOut); +} + +NS_IMETHODIMP +ReadStream::IsNonBlocking(bool* aNonBlockingOut) +{ + return mInner->IsNonBlocking(aNonBlockingOut); +} + +} // namespace cache +} // namespace dom +} // namespace mozilla |