/* -*- Mode: C++; tab-width: 20; indent-tabs-mode: nil; c-basic-offset: 2 -*-
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

#include "JobScheduler.h"
#include "Logging.h"

namespace mozilla {
namespace gfx {

JobScheduler* JobScheduler::sSingleton = nullptr;

bool JobScheduler::Init(uint32_t aNumThreads, uint32_t aNumQueues)
{
  MOZ_ASSERT(!sSingleton);
  MOZ_ASSERT(aNumThreads >= aNumQueues);

  sSingleton = new JobScheduler();
  sSingleton->mNextQueue = 0;

  for (uint32_t i = 0; i < aNumQueues; ++i) {
    sSingleton->mDrawingQueues.push_back(new MultiThreadedJobQueue());
  }

  for (uint32_t i = 0; i < aNumThreads; ++i) {
    sSingleton->mWorkerThreads.push_back(WorkerThread::Create(sSingleton->mDrawingQueues[i%aNumQueues]));
  }
  return true;
}

void JobScheduler::ShutDown()
{
  MOZ_ASSERT(IsEnabled());
  if (!IsEnabled()) {
    return;
  }

  for (auto queue : sSingleton->mDrawingQueues) {
    queue->ShutDown();
    delete queue;
  }

  for (WorkerThread* thread : sSingleton->mWorkerThreads) {
    // this will block until the thread is joined.
    delete thread;
  }

  sSingleton->mWorkerThreads.clear();
  delete sSingleton;
  sSingleton = nullptr;
}

JobStatus
JobScheduler::ProcessJob(Job* aJob)
{
  MOZ_ASSERT(aJob);
  auto status = aJob->Run();
  if (status == JobStatus::Error || status == JobStatus::Complete) {
    delete aJob;
  }
  return status;
}

void
JobScheduler::SubmitJob(Job* aJob)
{
  MOZ_ASSERT(aJob);
  RefPtr<SyncObject> start = aJob->GetStartSync();
  if (start && start->Register(aJob)) {
    // The Job buffer starts with a non-signaled sync object, it
    // is now registered in the list of task buffers waiting on the
    // sync object, so we should not place it in the queue.
    return;
  }

  GetQueueForJob(aJob)->SubmitJob(aJob);
}

void
JobScheduler::Join(SyncObject* aCompletion)
{
  RefPtr<EventObject> waitForCompletion = new EventObject();
  JobScheduler::SubmitJob(new SetEventJob(waitForCompletion, aCompletion));
  waitForCompletion->Wait();
}

MultiThreadedJobQueue*
JobScheduler::GetQueueForJob(Job* aJob)
{
  return aJob->IsPinnedToAThread() ? aJob->GetWorkerThread()->GetJobQueue()
                                    : GetDrawingQueue();
}

Job::Job(SyncObject* aStart, SyncObject* aCompletion, WorkerThread* aThread)
: mNextWaitingJob(nullptr)
, mStartSync(aStart)
, mCompletionSync(aCompletion)
, mPinToThread(aThread)
{
  if (mStartSync) {
    mStartSync->AddSubsequent(this);
  }
  if (mCompletionSync) {
    mCompletionSync->AddPrerequisite(this);
  }
}

Job::~Job()
{
  if (mCompletionSync) {
    //printf(" -- Job %p dtor completion %p\n", this, mCompletionSync);
    mCompletionSync->Signal();
    mCompletionSync = nullptr;
  }
}

JobStatus
SetEventJob::Run()
{
  mEvent->Set();
  return JobStatus::Complete;
}

SetEventJob::SetEventJob(EventObject* aEvent,
                           SyncObject* aStart, SyncObject* aCompletion,
                           WorkerThread* aWorker)
: Job(aStart, aCompletion, aWorker)
, mEvent(aEvent)
{}

SetEventJob::~SetEventJob()
{}

SyncObject::SyncObject(uint32_t aNumPrerequisites)
: mSignals(aNumPrerequisites)
, mFirstWaitingJob(nullptr)
#ifdef DEBUG
, mNumPrerequisites(aNumPrerequisites)
, mAddedPrerequisites(0)
#endif
{}

SyncObject::~SyncObject()
{
  MOZ_ASSERT(mFirstWaitingJob == nullptr);
}

bool
SyncObject::Register(Job* aJob)
{
  MOZ_ASSERT(aJob);

  // For now, ensure that when we schedule the first subsequent, we have already
  // created all of the prerequisites. This is an arbitrary restriction because
  // we specify the number of prerequisites in the constructor, but in the typical
  // scenario, if the assertion FreezePrerequisite blows up here it probably means
  // we got the initial nmber of prerequisites wrong. We can decide to remove
  // this restriction if needed.
  FreezePrerequisites();

  int32_t signals = mSignals;

  if (signals > 0) {
    AddWaitingJob(aJob);
    // Since Register and Signal can be called concurrently, it can happen that
    // reading mSignals in Register happens before decrementing mSignals in Signal,
    // but SubmitWaitingJobs happens before AddWaitingJob. This ordering means
    // the SyncObject ends up in the signaled state with a task sitting in the
    // waiting list. To prevent that we check mSignals a second time and submit
    // again if signals reached zero in the mean time.
    // We do this instead of holding a mutex around mSignals+mJobs to reduce
    // lock contention.
    int32_t signals2 = mSignals;
    if (signals2 == 0) {
      SubmitWaitingJobs();
    }
    return true;
  }

  return false;
}

void
SyncObject::Signal()
{
  int32_t signals = --mSignals;
  MOZ_ASSERT(signals >= 0);

  if (signals == 0) {
    SubmitWaitingJobs();
  }
}

void
SyncObject::AddWaitingJob(Job* aJob)
{
  // Push (using atomics) the task into the list of waiting tasks.
  for (;;) {
    Job* first = mFirstWaitingJob;
    aJob->mNextWaitingJob = first;
    if (mFirstWaitingJob.compareExchange(first, aJob)) {
      break;
    }
  }
}

void SyncObject::SubmitWaitingJobs()
{
  // Scheduling the tasks can cause code that modifies <this>'s reference
  // count to run concurrently, and cause the caller of this function to
  // be owned by another thread. We need to make sure the reference count
  // does not reach 0 on another thread before the end of this method, so
  // hold a strong ref to prevent that!
  RefPtr<SyncObject> kungFuDeathGrip(this);

  // First atomically swap mFirstWaitingJob and waitingJobs...
  Job* waitingJobs = nullptr;
  for (;;) {
    waitingJobs = mFirstWaitingJob;
    if (mFirstWaitingJob.compareExchange(waitingJobs, nullptr)) {
      break;
    }
  }

  // ... and submit all of the waiting tasks in waitingJob now that they belong
  // to this thread.
  while (waitingJobs) {
    Job* next = waitingJobs->mNextWaitingJob;
    waitingJobs->mNextWaitingJob = nullptr;
    JobScheduler::GetQueueForJob(waitingJobs)->SubmitJob(waitingJobs);
    waitingJobs = next;
  }
}

bool
SyncObject::IsSignaled()
{
  return mSignals == 0;
}

void
SyncObject::FreezePrerequisites()
{
  MOZ_ASSERT(mAddedPrerequisites == mNumPrerequisites);
}

void
SyncObject::AddPrerequisite(Job* aJob)
{
  MOZ_ASSERT(++mAddedPrerequisites <= mNumPrerequisites);
}

void
SyncObject::AddSubsequent(Job* aJob)
{
}

WorkerThread::WorkerThread(MultiThreadedJobQueue* aJobQueue)
: mQueue(aJobQueue)
{
  aJobQueue->RegisterThread();
}

void
WorkerThread::Run()
{
  SetName("gfx worker");

  for (;;) {
    Job* commands = nullptr;
    if (!mQueue->WaitForJob(commands)) {
      mQueue->UnregisterThread();
      return;
    }

    JobStatus status = JobScheduler::ProcessJob(commands);

    if (status == JobStatus::Error) {
      // Don't try to handle errors for now, but that's open to discussions.
      // I expect errors to be mostly OOM issues.
      gfxDevCrash(LogReason::JobStatusError) << "Invalid job status " << (int)status;
    }
  }
}

} //namespace
} //namespace