/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

#include "mozilla/dom/cache/ReadStream.h"

#include "mozilla/Unused.h"
#include "mozilla/dom/cache/CacheStreamControlChild.h"
#include "mozilla/dom/cache/CacheStreamControlParent.h"
#include "mozilla/dom/cache/CacheTypes.h"
#include "mozilla/ipc/IPCStreamUtils.h"
#include "mozilla/SnappyUncompressInputStream.h"
#include "nsIAsyncInputStream.h"
#include "nsTArray.h"

namespace mozilla {
namespace dom {
namespace cache {

using mozilla::Unused;
using mozilla::ipc::AutoIPCStream;
using mozilla::ipc::IPCStream;

// ----------------------------------------------------------------------------

// The inner stream class.  This is where all of the real work is done.  As
// an invariant Inner::Close() must be called before ~Inner().  This is
// guaranteed by our outer ReadStream class.
class ReadStream::Inner final : public ReadStream::Controllable
{
public:
  Inner(StreamControl* aControl, const nsID& aId,
        nsIInputStream* aStream);

  void
  Serialize(CacheReadStreamOrVoid* aReadStreamOut,
            nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList,
            ErrorResult& aRv);

  void
  Serialize(CacheReadStream* aReadStreamOut,
            nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList,
            ErrorResult& aRv);

  // ReadStream::Controllable methods
  virtual void
  CloseStream() override;

  virtual void
  CloseStreamWithoutReporting() override;

  virtual bool
  MatchId(const nsID& aId) const override;

  virtual bool
  HasEverBeenRead() const override;

  // Simulate nsIInputStream methods, but we don't actually inherit from it
  nsresult
  Close();

  nsresult
  Available(uint64_t *aNumAvailableOut);

  nsresult
  Read(char *aBuf, uint32_t aCount, uint32_t *aNumReadOut);

  nsresult
  ReadSegments(nsWriteSegmentFun aWriter, void *aClosure, uint32_t aCount,
               uint32_t *aNumReadOut);

  nsresult
  IsNonBlocking(bool *aNonBlockingOut);

private:
  class NoteClosedRunnable;
  class ForgetRunnable;

  ~Inner();

  void
  NoteClosed();

  void
  Forget();

  void
  NoteClosedOnOwningThread();

  void
  ForgetOnOwningThread();

  // Weak ref to the stream control actor.  The actor will always call either
  // CloseStream() or CloseStreamWithoutReporting() before it's destroyed.  The
  // weak ref is cleared in the resulting NoteClosedOnOwningThread() or
  // ForgetOnOwningThread() method call.
  StreamControl* mControl;

  const nsID mId;
  nsCOMPtr<nsIThread> mOwningThread;

  enum State
  {
    Open,
    Closed,
    NumStates
  };
  Atomic<State> mState;
  Atomic<bool> mHasEverBeenRead;


  // The wrapped stream objects may not be threadsafe.  We need to be able
  // to close a stream on our owning thread while an IO thread is simultaneously
  // reading the same stream.  Therefore, protect all access to these stream
  // objects with a mutex.
  Mutex mMutex;
  nsCOMPtr<nsIInputStream> mStream;
  nsCOMPtr<nsIInputStream> mSnappyStream;

  NS_INLINE_DECL_THREADSAFE_REFCOUNTING(cache::ReadStream::Inner, override)
};

// ----------------------------------------------------------------------------

// Runnable to notify actors that the ReadStream has closed.  This must
// be done on the thread associated with the PBackground actor.  Must be
// cancelable to execute on Worker threads (which can occur when the
// ReadStream is constructed on a child process Worker thread).
class ReadStream::Inner::NoteClosedRunnable final : public CancelableRunnable
{
public:
  explicit NoteClosedRunnable(ReadStream::Inner* aStream)
    : mStream(aStream)
  { }

  NS_IMETHOD Run() override
  {
    mStream->NoteClosedOnOwningThread();
    mStream = nullptr;
    return NS_OK;
  }

  // Note, we must proceed with the Run() method since our actor will not
  // clean itself up until we note that the stream is closed.
  nsresult Cancel() override
  {
    Run();
    return NS_OK;
  }

private:
  ~NoteClosedRunnable() { }

  RefPtr<ReadStream::Inner> mStream;
};

// ----------------------------------------------------------------------------

// Runnable to clear actors without reporting that the ReadStream has
// closed.  Since this can trigger actor destruction, we need to do
// it on the thread associated with the PBackground actor.  Must be
// cancelable to execute on Worker threads (which can occur when the
// ReadStream is constructed on a child process Worker thread).
class ReadStream::Inner::ForgetRunnable final : public CancelableRunnable
{
public:
  explicit ForgetRunnable(ReadStream::Inner* aStream)
    : mStream(aStream)
  { }

  NS_IMETHOD Run() override
  {
    mStream->ForgetOnOwningThread();
    mStream = nullptr;
    return NS_OK;
  }

  // Note, we must proceed with the Run() method so that we properly
  // call RemoveListener on the actor.
  nsresult Cancel() override
  {
    Run();
    return NS_OK;
  }

private:
  ~ForgetRunnable() { }

  RefPtr<ReadStream::Inner> mStream;
};

// ----------------------------------------------------------------------------

ReadStream::Inner::Inner(StreamControl* aControl, const nsID& aId,
                         nsIInputStream* aStream)
  : mControl(aControl)
  , mId(aId)
  , mOwningThread(NS_GetCurrentThread())
  , mState(Open)
  , mHasEverBeenRead(false)
  , mMutex("dom::cache::ReadStream")
  , mStream(aStream)
  , mSnappyStream(new SnappyUncompressInputStream(aStream))
{
  MOZ_DIAGNOSTIC_ASSERT(mStream);
  MOZ_DIAGNOSTIC_ASSERT(mControl);
  mControl->AddReadStream(this);
}

void
ReadStream::Inner::Serialize(CacheReadStreamOrVoid* aReadStreamOut,
                             nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList,
                             ErrorResult& aRv)
{
  MOZ_ASSERT(NS_GetCurrentThread() == mOwningThread);
  MOZ_DIAGNOSTIC_ASSERT(aReadStreamOut);
  *aReadStreamOut = CacheReadStream();
  Serialize(&aReadStreamOut->get_CacheReadStream(), aStreamCleanupList, aRv);
}

void
ReadStream::Inner::Serialize(CacheReadStream* aReadStreamOut,
                             nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList,
                             ErrorResult& aRv)
{
  MOZ_ASSERT(NS_GetCurrentThread() == mOwningThread);
  MOZ_DIAGNOSTIC_ASSERT(aReadStreamOut);

  if (mState != Open) {
    aRv.ThrowTypeError<MSG_CACHE_STREAM_CLOSED>();
    return;
  }

  MOZ_DIAGNOSTIC_ASSERT(mControl);

  aReadStreamOut->id() = mId;
  mControl->SerializeControl(aReadStreamOut);

  {
    MutexAutoLock lock(mMutex);
    mControl->SerializeStream(aReadStreamOut, mStream, aStreamCleanupList);
  }

  MOZ_DIAGNOSTIC_ASSERT(aReadStreamOut->stream().type() ==
                        IPCStream::TInputStreamParamsWithFds);

  // We're passing ownership across the IPC barrier with the control, so
  // do not signal that the stream is closed here.
  Forget();
}

void
ReadStream::Inner::CloseStream()
{
  MOZ_ASSERT(NS_GetCurrentThread() == mOwningThread);
  Close();
}

void
ReadStream::Inner::CloseStreamWithoutReporting()
{
  MOZ_ASSERT(NS_GetCurrentThread() == mOwningThread);
  Forget();
}

bool
ReadStream::Inner::MatchId(const nsID& aId) const
{
  MOZ_ASSERT(NS_GetCurrentThread() == mOwningThread);
  return mId.Equals(aId);
}

bool
ReadStream::Inner::HasEverBeenRead() const
{
  MOZ_ASSERT(NS_GetCurrentThread() == mOwningThread);
  return mHasEverBeenRead;
}

nsresult
ReadStream::Inner::Close()
{
  // stream ops can happen on any thread
  nsresult rv = NS_OK;
  {
    MutexAutoLock lock(mMutex);
    rv = mSnappyStream->Close();
  }
  NoteClosed();
  return rv;
}

nsresult
ReadStream::Inner::Available(uint64_t* aNumAvailableOut)
{
  // stream ops can happen on any thread
  nsresult rv = NS_OK;
  {
    MutexAutoLock lock(mMutex);
    rv = mSnappyStream->Available(aNumAvailableOut);
  }

  if (NS_FAILED(rv)) {
    Close();
  }

  return rv;
}

nsresult
ReadStream::Inner::Read(char* aBuf, uint32_t aCount, uint32_t* aNumReadOut)
{
  // stream ops can happen on any thread
  MOZ_DIAGNOSTIC_ASSERT(aNumReadOut);

  nsresult rv = NS_OK;
  {
    MutexAutoLock lock(mMutex);
    rv = mSnappyStream->Read(aBuf, aCount, aNumReadOut);
  }

  if ((NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK) ||
      *aNumReadOut == 0) {
    Close();
  }

  mHasEverBeenRead = true;

  return rv;
}

nsresult
ReadStream::Inner::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
                                uint32_t aCount, uint32_t* aNumReadOut)
{
  // stream ops can happen on any thread
  MOZ_DIAGNOSTIC_ASSERT(aNumReadOut);

  if (aCount) {
    mHasEverBeenRead = true;
  }


  nsresult rv = NS_OK;
  {
    MutexAutoLock lock(mMutex);
    rv = mSnappyStream->ReadSegments(aWriter, aClosure, aCount, aNumReadOut);
  }

  if ((NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK &&
                        rv != NS_ERROR_NOT_IMPLEMENTED) || *aNumReadOut == 0) {
    Close();
  }

  // Verify bytes were actually read before marking as being ever read.  For
  // example, code can test if the stream supports ReadSegments() by calling
  // this method with a dummy callback which doesn't read anything.  We don't
  // want to trigger on that.
  if (*aNumReadOut) {
    mHasEverBeenRead = true;
  }

  return rv;
}

nsresult
ReadStream::Inner::IsNonBlocking(bool* aNonBlockingOut)
{
  // stream ops can happen on any thread
  MutexAutoLock lock(mMutex);
  return mSnappyStream->IsNonBlocking(aNonBlockingOut);
}

ReadStream::Inner::~Inner()
{
  // Any thread
  MOZ_DIAGNOSTIC_ASSERT(mState == Closed);
  MOZ_DIAGNOSTIC_ASSERT(!mControl);
}

void
ReadStream::Inner::NoteClosed()
{
  // Any thread
  if (mState == Closed) {
    return;
  }

  if (NS_GetCurrentThread() == mOwningThread) {
    NoteClosedOnOwningThread();
    return;
  }

  nsCOMPtr<nsIRunnable> runnable = new NoteClosedRunnable(this);
  MOZ_ALWAYS_SUCCEEDS(
    mOwningThread->Dispatch(runnable, nsIThread::DISPATCH_NORMAL));
}

void
ReadStream::Inner::Forget()
{
  // Any thread
  if (mState == Closed) {
    return;
  }

  if (NS_GetCurrentThread() == mOwningThread) {
    ForgetOnOwningThread();
    return;
  }

  nsCOMPtr<nsIRunnable> runnable = new ForgetRunnable(this);
  MOZ_ALWAYS_SUCCEEDS(
    mOwningThread->Dispatch(runnable, nsIThread::DISPATCH_NORMAL));
}

void
ReadStream::Inner::NoteClosedOnOwningThread()
{
  MOZ_ASSERT(NS_GetCurrentThread() == mOwningThread);

  // Mark closed and do nothing if we were already closed
  if (!mState.compareExchange(Open, Closed)) {
    return;
  }

  MOZ_DIAGNOSTIC_ASSERT(mControl);
  mControl->NoteClosed(this, mId);
  mControl = nullptr;
}

void
ReadStream::Inner::ForgetOnOwningThread()
{
  MOZ_ASSERT(NS_GetCurrentThread() == mOwningThread);

  // Mark closed and do nothing if we were already closed
  if (!mState.compareExchange(Open, Closed)) {
    return;
  }

  MOZ_DIAGNOSTIC_ASSERT(mControl);
  mControl->ForgetReadStream(this);
  mControl = nullptr;
}

// ----------------------------------------------------------------------------

NS_IMPL_ISUPPORTS(cache::ReadStream, nsIInputStream, ReadStream);

// static
already_AddRefed<ReadStream>
ReadStream::Create(const CacheReadStreamOrVoid& aReadStreamOrVoid)
{
  if (aReadStreamOrVoid.type() == CacheReadStreamOrVoid::Tvoid_t) {
    return nullptr;
  }

  return Create(aReadStreamOrVoid.get_CacheReadStream());
}

// static
already_AddRefed<ReadStream>
ReadStream::Create(const CacheReadStream& aReadStream)
{
  // The parameter may or may not be for a Cache created stream.  The way we
  // tell is by looking at the stream control actor.  If the actor exists,
  // then we know the Cache created it.
  if (!aReadStream.controlChild() && !aReadStream.controlParent()) {
    return nullptr;
  }

  MOZ_DIAGNOSTIC_ASSERT(aReadStream.stream().type() ==
                        IPCStream::TInputStreamParamsWithFds);

  // Control is guaranteed to survive this method as ActorDestroy() cannot
  // run on this thread until we complete.
  StreamControl* control;
  if (aReadStream.controlChild()) {
    auto actor = static_cast<CacheStreamControlChild*>(aReadStream.controlChild());
    control = actor;
  } else {
    auto actor = static_cast<CacheStreamControlParent*>(aReadStream.controlParent());
    control = actor;
  }
  MOZ_DIAGNOSTIC_ASSERT(control);

  nsCOMPtr<nsIInputStream> stream = DeserializeIPCStream(aReadStream.stream());
  MOZ_DIAGNOSTIC_ASSERT(stream);

  // Currently we expect all cache read streams to be blocking file streams.
#if !defined(RELEASE_OR_BETA)
  nsCOMPtr<nsIAsyncInputStream> asyncStream = do_QueryInterface(stream);
  MOZ_DIAGNOSTIC_ASSERT(!asyncStream);
#endif

  RefPtr<Inner> inner = new Inner(control, aReadStream.id(), stream);
  RefPtr<ReadStream> ref = new ReadStream(inner);
  return ref.forget();
}

// static
already_AddRefed<ReadStream>
ReadStream::Create(PCacheStreamControlParent* aControl, const nsID& aId,
                   nsIInputStream* aStream)
{
  MOZ_DIAGNOSTIC_ASSERT(aControl);
  auto actor = static_cast<CacheStreamControlParent*>(aControl);
  RefPtr<Inner> inner = new Inner(actor, aId, aStream);
  RefPtr<ReadStream> ref = new ReadStream(inner);
  return ref.forget();
}

void
ReadStream::Serialize(CacheReadStreamOrVoid* aReadStreamOut,
                      nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList,
                      ErrorResult& aRv)
{
  mInner->Serialize(aReadStreamOut, aStreamCleanupList, aRv);
}

void
ReadStream::Serialize(CacheReadStream* aReadStreamOut,
                      nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList,
                      ErrorResult& aRv)
{
  mInner->Serialize(aReadStreamOut, aStreamCleanupList, aRv);
}

ReadStream::ReadStream(ReadStream::Inner* aInner)
  : mInner(aInner)
{
  MOZ_DIAGNOSTIC_ASSERT(mInner);
}

ReadStream::~ReadStream()
{
  // Explicitly close the inner stream so that it does not have to
  // deal with implicitly closing at destruction time.
  mInner->Close();
}

NS_IMETHODIMP
ReadStream::Close()
{
  return mInner->Close();
}

NS_IMETHODIMP
ReadStream::Available(uint64_t* aNumAvailableOut)
{
  return mInner->Available(aNumAvailableOut);
}

NS_IMETHODIMP
ReadStream::Read(char* aBuf, uint32_t aCount, uint32_t* aNumReadOut)
{
  return mInner->Read(aBuf, aCount, aNumReadOut);
}

NS_IMETHODIMP
ReadStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
                         uint32_t aCount, uint32_t* aNumReadOut)
{
  return mInner->ReadSegments(aWriter, aClosure, aCount, aNumReadOut);
}

NS_IMETHODIMP
ReadStream::IsNonBlocking(bool* aNonBlockingOut)
{
  return mInner->IsNonBlocking(aNonBlockingOut);
}

} // namespace cache
} // namespace dom
} // namespace mozilla