From 5f8de423f190bbb79a62f804151bc24824fa32d8 Mon Sep 17 00:00:00 2001 From: "Matt A. Tobin" Date: Fri, 2 Feb 2018 04:16:08 -0500 Subject: Add m-esr52 at 52.6.0 --- gfx/2d/JobScheduler.cpp | 288 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 288 insertions(+) create mode 100644 gfx/2d/JobScheduler.cpp (limited to 'gfx/2d/JobScheduler.cpp') diff --git a/gfx/2d/JobScheduler.cpp b/gfx/2d/JobScheduler.cpp new file mode 100644 index 000000000..2c687cde0 --- /dev/null +++ b/gfx/2d/JobScheduler.cpp @@ -0,0 +1,288 @@ +/* -*- Mode: C++; tab-width: 20; indent-tabs-mode: nil; c-basic-offset: 2 -*- + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "JobScheduler.h" +#include "Logging.h" + +namespace mozilla { +namespace gfx { + +JobScheduler* JobScheduler::sSingleton = nullptr; + +bool JobScheduler::Init(uint32_t aNumThreads, uint32_t aNumQueues) +{ + MOZ_ASSERT(!sSingleton); + MOZ_ASSERT(aNumThreads >= aNumQueues); + + sSingleton = new JobScheduler(); + sSingleton->mNextQueue = 0; + + for (uint32_t i = 0; i < aNumQueues; ++i) { + sSingleton->mDrawingQueues.push_back(new MultiThreadedJobQueue()); + } + + for (uint32_t i = 0; i < aNumThreads; ++i) { + sSingleton->mWorkerThreads.push_back(WorkerThread::Create(sSingleton->mDrawingQueues[i%aNumQueues])); + } + return true; +} + +void JobScheduler::ShutDown() +{ + MOZ_ASSERT(IsEnabled()); + if (!IsEnabled()) { + return; + } + + for (auto queue : sSingleton->mDrawingQueues) { + queue->ShutDown(); + delete queue; + } + + for (WorkerThread* thread : sSingleton->mWorkerThreads) { + // this will block until the thread is joined. + delete thread; + } + + sSingleton->mWorkerThreads.clear(); + delete sSingleton; + sSingleton = nullptr; +} + +JobStatus +JobScheduler::ProcessJob(Job* aJob) +{ + MOZ_ASSERT(aJob); + auto status = aJob->Run(); + if (status == JobStatus::Error || status == JobStatus::Complete) { + delete aJob; + } + return status; +} + +void +JobScheduler::SubmitJob(Job* aJob) +{ + MOZ_ASSERT(aJob); + RefPtr start = aJob->GetStartSync(); + if (start && start->Register(aJob)) { + // The Job buffer starts with a non-signaled sync object, it + // is now registered in the list of task buffers waiting on the + // sync object, so we should not place it in the queue. + return; + } + + GetQueueForJob(aJob)->SubmitJob(aJob); +} + +void +JobScheduler::Join(SyncObject* aCompletion) +{ + RefPtr waitForCompletion = new EventObject(); + JobScheduler::SubmitJob(new SetEventJob(waitForCompletion, aCompletion)); + waitForCompletion->Wait(); +} + +MultiThreadedJobQueue* +JobScheduler::GetQueueForJob(Job* aJob) +{ + return aJob->IsPinnedToAThread() ? aJob->GetWorkerThread()->GetJobQueue() + : GetDrawingQueue(); +} + +Job::Job(SyncObject* aStart, SyncObject* aCompletion, WorkerThread* aThread) +: mNextWaitingJob(nullptr) +, mStartSync(aStart) +, mCompletionSync(aCompletion) +, mPinToThread(aThread) +{ + if (mStartSync) { + mStartSync->AddSubsequent(this); + } + if (mCompletionSync) { + mCompletionSync->AddPrerequisite(this); + } +} + +Job::~Job() +{ + if (mCompletionSync) { + //printf(" -- Job %p dtor completion %p\n", this, mCompletionSync); + mCompletionSync->Signal(); + mCompletionSync = nullptr; + } +} + +JobStatus +SetEventJob::Run() +{ + mEvent->Set(); + return JobStatus::Complete; +} + +SetEventJob::SetEventJob(EventObject* aEvent, + SyncObject* aStart, SyncObject* aCompletion, + WorkerThread* aWorker) +: Job(aStart, aCompletion, aWorker) +, mEvent(aEvent) +{} + +SetEventJob::~SetEventJob() +{} + +SyncObject::SyncObject(uint32_t aNumPrerequisites) +: mSignals(aNumPrerequisites) +, mFirstWaitingJob(nullptr) +#ifdef DEBUG +, mNumPrerequisites(aNumPrerequisites) +, mAddedPrerequisites(0) +#endif +{} + +SyncObject::~SyncObject() +{ + MOZ_ASSERT(mFirstWaitingJob == nullptr); +} + +bool +SyncObject::Register(Job* aJob) +{ + MOZ_ASSERT(aJob); + + // For now, ensure that when we schedule the first subsequent, we have already + // created all of the prerequisites. This is an arbitrary restriction because + // we specify the number of prerequisites in the constructor, but in the typical + // scenario, if the assertion FreezePrerequisite blows up here it probably means + // we got the initial nmber of prerequisites wrong. We can decide to remove + // this restriction if needed. + FreezePrerequisites(); + + int32_t signals = mSignals; + + if (signals > 0) { + AddWaitingJob(aJob); + // Since Register and Signal can be called concurrently, it can happen that + // reading mSignals in Register happens before decrementing mSignals in Signal, + // but SubmitWaitingJobs happens before AddWaitingJob. This ordering means + // the SyncObject ends up in the signaled state with a task sitting in the + // waiting list. To prevent that we check mSignals a second time and submit + // again if signals reached zero in the mean time. + // We do this instead of holding a mutex around mSignals+mJobs to reduce + // lock contention. + int32_t signals2 = mSignals; + if (signals2 == 0) { + SubmitWaitingJobs(); + } + return true; + } + + return false; +} + +void +SyncObject::Signal() +{ + int32_t signals = --mSignals; + MOZ_ASSERT(signals >= 0); + + if (signals == 0) { + SubmitWaitingJobs(); + } +} + +void +SyncObject::AddWaitingJob(Job* aJob) +{ + // Push (using atomics) the task into the list of waiting tasks. + for (;;) { + Job* first = mFirstWaitingJob; + aJob->mNextWaitingJob = first; + if (mFirstWaitingJob.compareExchange(first, aJob)) { + break; + } + } +} + +void SyncObject::SubmitWaitingJobs() +{ + // Scheduling the tasks can cause code that modifies 's reference + // count to run concurrently, and cause the caller of this function to + // be owned by another thread. We need to make sure the reference count + // does not reach 0 on another thread before the end of this method, so + // hold a strong ref to prevent that! + RefPtr kungFuDeathGrip(this); + + // First atomically swap mFirstWaitingJob and waitingJobs... + Job* waitingJobs = nullptr; + for (;;) { + waitingJobs = mFirstWaitingJob; + if (mFirstWaitingJob.compareExchange(waitingJobs, nullptr)) { + break; + } + } + + // ... and submit all of the waiting tasks in waitingJob now that they belong + // to this thread. + while (waitingJobs) { + Job* next = waitingJobs->mNextWaitingJob; + waitingJobs->mNextWaitingJob = nullptr; + JobScheduler::GetQueueForJob(waitingJobs)->SubmitJob(waitingJobs); + waitingJobs = next; + } +} + +bool +SyncObject::IsSignaled() +{ + return mSignals == 0; +} + +void +SyncObject::FreezePrerequisites() +{ + MOZ_ASSERT(mAddedPrerequisites == mNumPrerequisites); +} + +void +SyncObject::AddPrerequisite(Job* aJob) +{ + MOZ_ASSERT(++mAddedPrerequisites <= mNumPrerequisites); +} + +void +SyncObject::AddSubsequent(Job* aJob) +{ +} + +WorkerThread::WorkerThread(MultiThreadedJobQueue* aJobQueue) +: mQueue(aJobQueue) +{ + aJobQueue->RegisterThread(); +} + +void +WorkerThread::Run() +{ + SetName("gfx worker"); + + for (;;) { + Job* commands = nullptr; + if (!mQueue->WaitForJob(commands)) { + mQueue->UnregisterThread(); + return; + } + + JobStatus status = JobScheduler::ProcessJob(commands); + + if (status == JobStatus::Error) { + // Don't try to handle errors for now, but that's open to discussions. + // I expect errors to be mostly OOM issues. + gfxDevCrash(LogReason::JobStatusError) << "Invalid job status " << (int)status; + } + } +} + +} //namespace +} //namespace -- cgit v1.2.3