summaryrefslogtreecommitdiffstats
path: root/dom/fetch/Fetch.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'dom/fetch/Fetch.cpp')
-rw-r--r--dom/fetch/Fetch.cpp289
1 files changed, 259 insertions, 30 deletions
diff --git a/dom/fetch/Fetch.cpp b/dom/fetch/Fetch.cpp
index f944352e3..191f4cfc3 100644
--- a/dom/fetch/Fetch.cpp
+++ b/dom/fetch/Fetch.cpp
@@ -39,6 +39,7 @@
#include "mozilla/dom/URLSearchParams.h"
#include "mozilla/dom/workers/ServiceWorkerManager.h"
+#include "FetchObserver.h"
#include "InternalRequest.h"
#include "InternalResponse.h"
@@ -52,38 +53,141 @@ namespace dom {
using namespace workers;
+// This class helps the proxying of AbortSignal changes cross threads.
+class AbortSignalProxy final : public AbortSignal::Follower
+{
+ // This is created and released on the main-thread.
+ RefPtr<AbortSignal> mSignalMainThread;
+
+ // This value is used only for the creation of AbortSignal on the
+ // main-thread. They are not updated.
+ const bool mAborted;
+
+ // This runnable propagates changes from the AbortSignal on workers to the
+ // AbortSignal on main-thread.
+ class AbortSignalProxyRunnable final : public Runnable
+ {
+ RefPtr<AbortSignalProxy> mProxy;
+
+ public:
+ explicit AbortSignalProxyRunnable(AbortSignalProxy* aProxy)
+ : mProxy(aProxy)
+ {}
+
+ NS_IMETHOD
+ Run() override
+ {
+ MOZ_ASSERT(NS_IsMainThread());
+ AbortSignal* signal = mProxy->GetOrCreateSignalForMainThread();
+ signal->Abort();
+ return NS_OK;
+ }
+ };
+
+public:
+ NS_INLINE_DECL_THREADSAFE_REFCOUNTING(AbortSignalProxy)
+
+ explicit AbortSignalProxy(AbortSignal* aSignal)
+ : mAborted(aSignal->Aborted())
+ {
+ Follow(aSignal);
+ }
+
+ void
+ Aborted() override
+ {
+ RefPtr<AbortSignalProxyRunnable> runnable =
+ new AbortSignalProxyRunnable(this);
+ NS_DispatchToMainThread(runnable);
+ }
+
+ AbortSignal*
+ GetOrCreateSignalForMainThread()
+ {
+ MOZ_ASSERT(NS_IsMainThread());
+ if (!mSignalMainThread) {
+ mSignalMainThread = new AbortSignal(mAborted);
+ }
+ return mSignalMainThread;
+ }
+
+ void
+ Shutdown()
+ {
+ Unfollow();
+ }
+
+private:
+ ~AbortSignalProxy()
+ {
+ NS_ReleaseOnMainThread(mSignalMainThread.forget());
+ }
+};
+
class WorkerFetchResolver final : public FetchDriverObserver
{
friend class MainThreadFetchRunnable;
+ friend class WorkerDataAvailableRunnable;
+ friend class WorkerFetchResponseEndBase;
friend class WorkerFetchResponseEndRunnable;
friend class WorkerFetchResponseRunnable;
RefPtr<PromiseWorkerProxy> mPromiseProxy;
+ RefPtr<AbortSignalProxy> mSignalProxy;
+ RefPtr<FetchObserver> mFetchObserver;
+
public:
// Returns null if worker is shutting down.
static already_AddRefed<WorkerFetchResolver>
- Create(workers::WorkerPrivate* aWorkerPrivate, Promise* aPromise)
+ Create(workers::WorkerPrivate* aWorkerPrivate, Promise* aPromise,
+ AbortSignal* aSignal, FetchObserver* aObserver)
{
MOZ_ASSERT(aWorkerPrivate);
aWorkerPrivate->AssertIsOnWorkerThread();
- RefPtr<PromiseWorkerProxy> proxy = PromiseWorkerProxy::Create(aWorkerPrivate, aPromise);
+ RefPtr<PromiseWorkerProxy> proxy =
+ PromiseWorkerProxy::Create(aWorkerPrivate, aPromise);
if (!proxy) {
return nullptr;
}
- RefPtr<WorkerFetchResolver> r = new WorkerFetchResolver(proxy);
+ RefPtr<AbortSignalProxy> signalProxy;
+ if (aSignal) {
+ signalProxy = new AbortSignalProxy(aSignal);
+ }
+
+ RefPtr<WorkerFetchResolver> r =
+ new WorkerFetchResolver(proxy, signalProxy, aObserver);
return r.forget();
}
+ AbortSignal*
+ GetAbortSignal()
+ {
+ MOZ_ASSERT(NS_IsMainThread());
+
+ if (!mSignalProxy) {
+ return nullptr;
+ }
+
+ return mSignalProxy->GetOrCreateSignalForMainThread();
+ }
+
void
OnResponseAvailableInternal(InternalResponse* aResponse) override;
void
- OnResponseEnd() override;
+ OnResponseEnd(FetchDriverObserver::EndReason eReason) override;
+
+ void
+ OnDataAvailable() override;
private:
- explicit WorkerFetchResolver(PromiseWorkerProxy* aProxy)
+ WorkerFetchResolver(PromiseWorkerProxy* aProxy,
+ AbortSignalProxy* aSignalProxy,
+ FetchObserver* aObserver)
: mPromiseProxy(aProxy)
+ , mSignalProxy(aSignalProxy)
+ , mFetchObserver(aObserver)
{
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(mPromiseProxy);
@@ -100,12 +204,16 @@ class MainThreadFetchResolver final : public FetchDriverObserver
{
RefPtr<Promise> mPromise;
RefPtr<Response> mResponse;
+ RefPtr<FetchObserver> mFetchObserver;
nsCOMPtr<nsIDocument> mDocument;
NS_DECL_OWNINGTHREAD
public:
- explicit MainThreadFetchResolver(Promise* aPromise);
+ MainThreadFetchResolver(Promise* aPromise, FetchObserver* aObserver)
+ : mPromise(aPromise)
+ , mFetchObserver(aObserver)
+ {}
void
OnResponseAvailableInternal(InternalResponse* aResponse) override;
@@ -115,11 +223,20 @@ public:
mDocument = aDocument;
}
- virtual void OnResponseEnd() override
+ void OnResponseEnd(FetchDriverObserver::EndReason aReason) override
{
+ if (aReason == eAborted) {
+ mPromise->MaybeReject(NS_ERROR_DOM_ABORT_ERR);
+ }
+
+ mFetchObserver = nullptr;
+
FlushConsoleReport();
}
+ void
+ OnDataAvailable() override;
+
private:
~MainThreadFetchResolver();
@@ -170,9 +287,11 @@ public:
fetch->SetWorkerScript(spec);
}
+ RefPtr<AbortSignal> signal = mResolver->GetAbortSignal();
+
// ...but release it before calling Fetch, because mResolver's callback can
// be called synchronously and they want the mutex, too.
- return fetch->Fetch(mResolver);
+ return fetch->Fetch(signal, mResolver);
}
};
@@ -210,6 +329,23 @@ FetchRequest(nsIGlobalObject* aGlobal, const RequestOrUSVString& aInput,
RefPtr<InternalRequest> r = request->GetInternalRequest();
+ RefPtr<AbortSignal> signal;
+ if (aInit.mSignal.WasPassed()) {
+ signal = &aInit.mSignal.Value();
+ }
+
+ if (signal && signal->Aborted()) {
+ // An already aborted signal should reject immediately.
+ aRv.Throw(NS_ERROR_DOM_ABORT_ERR);
+ return nullptr;
+ }
+
+ RefPtr<FetchObserver> observer;
+ if (aInit.mObserve.WasPassed()) {
+ observer = new FetchObserver(aGlobal, signal);
+ aInit.mObserve.Value().HandleEvent(*observer);
+ }
+
if (NS_IsMainThread()) {
nsCOMPtr<nsPIDOMWindowInner> window = do_QueryInterface(aGlobal);
nsCOMPtr<nsIDocument> doc;
@@ -236,11 +372,12 @@ FetchRequest(nsIGlobalObject* aGlobal, const RequestOrUSVString& aInput,
}
}
- RefPtr<MainThreadFetchResolver> resolver = new MainThreadFetchResolver(p);
+ RefPtr<MainThreadFetchResolver> resolver =
+ new MainThreadFetchResolver(p, observer);
RefPtr<FetchDriver> fetch = new FetchDriver(r, principal, loadGroup);
fetch->SetDocument(doc);
resolver->SetDocument(doc);
- aRv = fetch->Fetch(resolver);
+ aRv = fetch->Fetch(signal, resolver);
if (NS_WARN_IF(aRv.Failed())) {
return nullptr;
}
@@ -252,7 +389,8 @@ FetchRequest(nsIGlobalObject* aGlobal, const RequestOrUSVString& aInput,
r->SetSkipServiceWorker();
}
- RefPtr<WorkerFetchResolver> resolver = WorkerFetchResolver::Create(worker, p);
+ RefPtr<WorkerFetchResolver> resolver =
+ WorkerFetchResolver::Create(worker, p, signal, observer);
if (!resolver) {
NS_WARNING("Could not add WorkerFetchResolver workerHolder to worker");
aRv.Throw(NS_ERROR_DOM_ABORT_ERR);
@@ -266,11 +404,6 @@ FetchRequest(nsIGlobalObject* aGlobal, const RequestOrUSVString& aInput,
return p.forget();
}
-MainThreadFetchResolver::MainThreadFetchResolver(Promise* aPromise)
- : mPromise(aPromise)
-{
-}
-
void
MainThreadFetchResolver::OnResponseAvailableInternal(InternalResponse* aResponse)
{
@@ -278,16 +411,39 @@ MainThreadFetchResolver::OnResponseAvailableInternal(InternalResponse* aResponse
AssertIsOnMainThread();
if (aResponse->Type() != ResponseType::Error) {
+ if (mFetchObserver) {
+ mFetchObserver->SetState(FetchState::Complete);
+ }
+
nsCOMPtr<nsIGlobalObject> go = mPromise->GetParentObject();
mResponse = new Response(go, aResponse);
mPromise->MaybeResolve(mResponse);
} else {
+ if (mFetchObserver) {
+ mFetchObserver->SetState(FetchState::Errored);
+ }
+
ErrorResult result;
result.ThrowTypeError<MSG_FETCH_FAILED>();
mPromise->MaybeReject(result);
}
}
+void
+MainThreadFetchResolver::OnDataAvailable()
+{
+ NS_ASSERT_OWNINGTHREAD(MainThreadFetchResolver);
+ AssertIsOnMainThread();
+
+ if (!mFetchObserver) {
+ return;
+ }
+
+ if (mFetchObserver->State() == FetchState::Requesting) {
+ mFetchObserver->SetState(FetchState::Responding);
+ }
+}
+
MainThreadFetchResolver::~MainThreadFetchResolver()
{
NS_ASSERT_OWNINGTHREAD(MainThreadFetchResolver);
@@ -306,6 +462,7 @@ public:
, mResolver(aResolver)
, mInternalResponse(aResponse)
{
+ MOZ_ASSERT(mResolver);
}
bool
@@ -317,10 +474,18 @@ public:
RefPtr<Promise> promise = mResolver->mPromiseProxy->WorkerPromise();
if (mInternalResponse->Type() != ResponseType::Error) {
+ if (mResolver->mFetchObserver) {
+ mResolver->mFetchObserver->SetState(FetchState::Complete);
+ }
+
RefPtr<nsIGlobalObject> global = aWorkerPrivate->GlobalScope();
RefPtr<Response> response = new Response(global, mInternalResponse);
promise->MaybeResolve(response);
} else {
+ if (mResolver->mFetchObserver) {
+ mResolver->mFetchObserver->SetState(FetchState::Errored);
+ }
+
ErrorResult result;
result.ThrowTypeError<MSG_FETCH_FAILED>();
promise->MaybeReject(result);
@@ -329,14 +494,42 @@ public:
}
};
+class WorkerDataAvailableRunnable final : public MainThreadWorkerRunnable
+{
+ RefPtr<WorkerFetchResolver> mResolver;
+public:
+ WorkerDataAvailableRunnable(WorkerPrivate* aWorkerPrivate,
+ WorkerFetchResolver* aResolver)
+ : MainThreadWorkerRunnable(aWorkerPrivate)
+ , mResolver(aResolver)
+ {
+ }
+
+ bool
+ WorkerRun(JSContext* aCx, WorkerPrivate* aWorkerPrivate) override
+ {
+ MOZ_ASSERT(aWorkerPrivate);
+ aWorkerPrivate->AssertIsOnWorkerThread();
+
+ if (mResolver->mFetchObserver &&
+ mResolver->mFetchObserver->State() == FetchState::Requesting) {
+ mResolver->mFetchObserver->SetState(FetchState::Responding);
+ }
+
+ return true;
+ }
+};
+
class WorkerFetchResponseEndBase
{
- RefPtr<PromiseWorkerProxy> mPromiseProxy;
+protected:
+ RefPtr<WorkerFetchResolver> mResolver;
+
public:
- explicit WorkerFetchResponseEndBase(PromiseWorkerProxy* aPromiseProxy)
- : mPromiseProxy(aPromiseProxy)
+ explicit WorkerFetchResponseEndBase(WorkerFetchResolver* aResolver)
+ : mResolver(aResolver)
{
- MOZ_ASSERT(mPromiseProxy);
+ MOZ_ASSERT(aResolver);
}
void
@@ -344,23 +537,41 @@ public:
{
MOZ_ASSERT(aWorkerPrivate);
aWorkerPrivate->AssertIsOnWorkerThread();
- mPromiseProxy->CleanUp();
+
+ mResolver->mPromiseProxy->CleanUp();
+
+ mResolver->mFetchObserver = nullptr;
+
+ if (mResolver->mSignalProxy) {
+ mResolver->mSignalProxy->Shutdown();
+ mResolver->mSignalProxy = nullptr;
+ }
}
};
class WorkerFetchResponseEndRunnable final : public MainThreadWorkerRunnable
, public WorkerFetchResponseEndBase
{
+ FetchDriverObserver::EndReason mReason;
+
public:
- explicit WorkerFetchResponseEndRunnable(PromiseWorkerProxy* aPromiseProxy)
- : MainThreadWorkerRunnable(aPromiseProxy->GetWorkerPrivate())
- , WorkerFetchResponseEndBase(aPromiseProxy)
+ WorkerFetchResponseEndRunnable(WorkerPrivate* aWorkerPrivate,
+ WorkerFetchResolver* aResolver,
+ FetchDriverObserver::EndReason aReason)
+ : MainThreadWorkerRunnable(aWorkerPrivate)
+ , WorkerFetchResponseEndBase(aResolver)
+ , mReason(aReason)
{
}
bool
WorkerRun(JSContext* aCx, WorkerPrivate* aWorkerPrivate) override
{
+ if (mReason == FetchDriverObserver::eAborted) {
+ RefPtr<Promise> promise = mResolver->mPromiseProxy->WorkerPromise();
+ promise->MaybeReject(NS_ERROR_DOM_ABORT_ERR);
+ }
+
WorkerRunInternal(aWorkerPrivate);
return true;
}
@@ -379,9 +590,10 @@ class WorkerFetchResponseEndControlRunnable final : public MainThreadWorkerContr
, public WorkerFetchResponseEndBase
{
public:
- explicit WorkerFetchResponseEndControlRunnable(PromiseWorkerProxy* aPromiseProxy)
- : MainThreadWorkerControlRunnable(aPromiseProxy->GetWorkerPrivate())
- , WorkerFetchResponseEndBase(aPromiseProxy)
+ WorkerFetchResponseEndControlRunnable(WorkerPrivate* aWorkerPrivate,
+ WorkerFetchResolver* aResolver)
+ : MainThreadWorkerControlRunnable(aWorkerPrivate)
+ , WorkerFetchResponseEndBase(aResolver)
{
}
@@ -415,7 +627,22 @@ WorkerFetchResolver::OnResponseAvailableInternal(InternalResponse* aResponse)
}
void
-WorkerFetchResolver::OnResponseEnd()
+WorkerFetchResolver::OnDataAvailable()
+{
+ AssertIsOnMainThread();
+
+ MutexAutoLock lock(mPromiseProxy->Lock());
+ if (mPromiseProxy->CleanedUp()) {
+ return;
+ }
+
+ RefPtr<WorkerDataAvailableRunnable> r =
+ new WorkerDataAvailableRunnable(mPromiseProxy->GetWorkerPrivate(), this);
+ Unused << r->Dispatch();
+}
+
+void
+WorkerFetchResolver::OnResponseEnd(FetchDriverObserver::EndReason aReason)
{
AssertIsOnMainThread();
MutexAutoLock lock(mPromiseProxy->Lock());
@@ -426,11 +653,13 @@ WorkerFetchResolver::OnResponseEnd()
FlushConsoleReport();
RefPtr<WorkerFetchResponseEndRunnable> r =
- new WorkerFetchResponseEndRunnable(mPromiseProxy);
+ new WorkerFetchResponseEndRunnable(mPromiseProxy->GetWorkerPrivate(),
+ this, aReason);
if (!r->Dispatch()) {
RefPtr<WorkerFetchResponseEndControlRunnable> cr =
- new WorkerFetchResponseEndControlRunnable(mPromiseProxy);
+ new WorkerFetchResponseEndControlRunnable(mPromiseProxy->GetWorkerPrivate(),
+ this);
// This can fail if the worker thread is canceled or killed causing
// the PromiseWorkerProxy to give up its WorkerHolder immediately,
// allowing the worker thread to become Dead.