/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ /* vim:set ts=2 sw=2 sts=2 et cindent: */ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ #include "ScriptProcessorNode.h" #include "mozilla/dom/ScriptProcessorNodeBinding.h" #include "AudioBuffer.h" #include "AudioDestinationNode.h" #include "AudioNodeEngine.h" #include "AudioNodeStream.h" #include "AudioProcessingEvent.h" #include "WebAudioUtils.h" #include "mozilla/dom/ScriptSettings.h" #include "mozilla/Mutex.h" #include "mozilla/PodOperations.h" #include "nsAutoPtr.h" #include <deque> namespace mozilla { namespace dom { // The maximum latency, in seconds, that we can live with before dropping // buffers. static const float MAX_LATENCY_S = 0.5; NS_IMPL_ISUPPORTS_INHERITED0(ScriptProcessorNode, AudioNode) // This class manages a queue of output buffers shared between // the main thread and the Media Stream Graph thread. class SharedBuffers final { private: class OutputQueue final { public: explicit OutputQueue(const char* aName) : mMutex(aName) {} size_t SizeOfExcludingThis(MallocSizeOf aMallocSizeOf) const { mMutex.AssertCurrentThreadOwns(); size_t amount = 0; for (size_t i = 0; i < mBufferList.size(); i++) { amount += mBufferList[i].SizeOfExcludingThis(aMallocSizeOf, false); } return amount; } Mutex& Lock() const { return const_cast<OutputQueue*>(this)->mMutex; } size_t ReadyToConsume() const { // Accessed on both main thread and media graph thread. mMutex.AssertCurrentThreadOwns(); return mBufferList.size(); } // Produce one buffer AudioChunk& Produce() { mMutex.AssertCurrentThreadOwns(); MOZ_ASSERT(NS_IsMainThread()); mBufferList.push_back(AudioChunk()); return mBufferList.back(); } // Consumes one buffer. AudioChunk Consume() { mMutex.AssertCurrentThreadOwns(); MOZ_ASSERT(!NS_IsMainThread()); MOZ_ASSERT(ReadyToConsume() > 0); AudioChunk front = mBufferList.front(); mBufferList.pop_front(); return front; } // Empties the buffer queue. void Clear() { mMutex.AssertCurrentThreadOwns(); mBufferList.clear(); } private: typedef std::deque<AudioChunk> BufferList; // Synchronizes access to mBufferList. Note that it's the responsibility // of the callers to perform the required locking, and we assert that every // time we access mBufferList. Mutex mMutex; // The list representing the queue. BufferList mBufferList; }; public: explicit SharedBuffers(float aSampleRate) : mOutputQueue("SharedBuffers::outputQueue") , mDelaySoFar(STREAM_TIME_MAX) , mSampleRate(aSampleRate) , mLatency(0.0) , mDroppingBuffers(false) { } size_t SizeOfIncludingThis(MallocSizeOf aMallocSizeOf) const { size_t amount = aMallocSizeOf(this); { MutexAutoLock lock(mOutputQueue.Lock()); amount += mOutputQueue.SizeOfExcludingThis(aMallocSizeOf); } return amount; } // main thread void FinishProducingOutputBuffer(ThreadSharedFloatArrayBufferList* aBuffer, uint32_t aBufferSize) { MOZ_ASSERT(NS_IsMainThread()); TimeStamp now = TimeStamp::Now(); if (mLastEventTime.IsNull()) { mLastEventTime = now; } else { // When main thread blocking has built up enough so // |mLatency > MAX_LATENCY_S|, frame dropping starts. It continues until // the output buffer is completely empty, at which point the accumulated // latency is also reset to 0. // It could happen that the output queue becomes empty before the input // node has fully caught up. In this case there will be events where // |(now - mLastEventTime)| is very short, making mLatency negative. // As this happens and the size of |mLatency| becomes greater than // MAX_LATENCY_S, frame dropping starts again to maintain an as short // output queue as possible. float latency = (now - mLastEventTime).ToSeconds(); float bufferDuration = aBufferSize / mSampleRate; mLatency += latency - bufferDuration; mLastEventTime = now; if (fabs(mLatency) > MAX_LATENCY_S) { mDroppingBuffers = true; } } MutexAutoLock lock(mOutputQueue.Lock()); if (mDroppingBuffers) { if (mOutputQueue.ReadyToConsume()) { return; } mDroppingBuffers = false; mLatency = 0; } for (uint32_t offset = 0; offset < aBufferSize; offset += WEBAUDIO_BLOCK_SIZE) { AudioChunk& chunk = mOutputQueue.Produce(); if (aBuffer) { chunk.mDuration = WEBAUDIO_BLOCK_SIZE; chunk.mBuffer = aBuffer; chunk.mChannelData.SetLength(aBuffer->GetChannels()); for (uint32_t i = 0; i < aBuffer->GetChannels(); ++i) { chunk.mChannelData[i] = aBuffer->GetData(i) + offset; } chunk.mVolume = 1.0f; chunk.mBufferFormat = AUDIO_FORMAT_FLOAT32; } else { chunk.SetNull(WEBAUDIO_BLOCK_SIZE); } } } // graph thread AudioChunk GetOutputBuffer() { MOZ_ASSERT(!NS_IsMainThread()); AudioChunk buffer; { MutexAutoLock lock(mOutputQueue.Lock()); if (mOutputQueue.ReadyToConsume() > 0) { if (mDelaySoFar == STREAM_TIME_MAX) { mDelaySoFar = 0; } buffer = mOutputQueue.Consume(); } else { // If we're out of buffers to consume, just output silence buffer.SetNull(WEBAUDIO_BLOCK_SIZE); if (mDelaySoFar != STREAM_TIME_MAX) { // Remember the delay that we just hit mDelaySoFar += WEBAUDIO_BLOCK_SIZE; } } } return buffer; } StreamTime DelaySoFar() const { MOZ_ASSERT(!NS_IsMainThread()); return mDelaySoFar == STREAM_TIME_MAX ? 0 : mDelaySoFar; } void Reset() { MOZ_ASSERT(!NS_IsMainThread()); mDelaySoFar = STREAM_TIME_MAX; mLatency = 0.0f; { MutexAutoLock lock(mOutputQueue.Lock()); mOutputQueue.Clear(); } mLastEventTime = TimeStamp(); } private: OutputQueue mOutputQueue; // How much delay we've seen so far. This measures the amount of delay // caused by the main thread lagging behind in producing output buffers. // STREAM_TIME_MAX means that we have not received our first buffer yet. StreamTime mDelaySoFar; // The samplerate of the context. float mSampleRate; // This is the latency caused by the buffering. If this grows too high, we // will drop buffers until it is acceptable. float mLatency; // This is the time at which we last produced a buffer, to detect if the main // thread has been blocked. TimeStamp mLastEventTime; // True if we should be dropping buffers. bool mDroppingBuffers; }; class ScriptProcessorNodeEngine final : public AudioNodeEngine { public: ScriptProcessorNodeEngine(ScriptProcessorNode* aNode, AudioDestinationNode* aDestination, uint32_t aBufferSize, uint32_t aNumberOfInputChannels) : AudioNodeEngine(aNode) , mDestination(aDestination->Stream()) , mSharedBuffers(new SharedBuffers(mDestination->SampleRate())) , mBufferSize(aBufferSize) , mInputChannelCount(aNumberOfInputChannels) , mInputWriteIndex(0) { } SharedBuffers* GetSharedBuffers() const { return mSharedBuffers; } enum { IS_CONNECTED, }; void SetInt32Parameter(uint32_t aIndex, int32_t aParam) override { switch (aIndex) { case IS_CONNECTED: mIsConnected = aParam; break; default: NS_ERROR("Bad Int32Parameter"); } // End index switch. } void ProcessBlock(AudioNodeStream* aStream, GraphTime aFrom, const AudioBlock& aInput, AudioBlock* aOutput, bool* aFinished) override { // This node is not connected to anything. Per spec, we don't fire the // onaudioprocess event. We also want to clear out the input and output // buffer queue, and output a null buffer. if (!mIsConnected) { aOutput->SetNull(WEBAUDIO_BLOCK_SIZE); mSharedBuffers->Reset(); mInputWriteIndex = 0; return; } // The input buffer is allocated lazily when non-null input is received. if (!aInput.IsNull() && !mInputBuffer) { mInputBuffer = ThreadSharedFloatArrayBufferList:: Create(mInputChannelCount, mBufferSize, fallible); if (mInputBuffer && mInputWriteIndex) { // Zero leading for null chunks that were skipped. for (uint32_t i = 0; i < mInputChannelCount; ++i) { float* channelData = mInputBuffer->GetDataForWrite(i); PodZero(channelData, mInputWriteIndex); } } } // First, record our input buffer, if its allocation succeeded. uint32_t inputChannelCount = mInputBuffer ? mInputBuffer->GetChannels() : 0; for (uint32_t i = 0; i < inputChannelCount; ++i) { float* writeData = mInputBuffer->GetDataForWrite(i) + mInputWriteIndex; if (aInput.IsNull()) { PodZero(writeData, aInput.GetDuration()); } else { MOZ_ASSERT(aInput.GetDuration() == WEBAUDIO_BLOCK_SIZE, "sanity check"); MOZ_ASSERT(aInput.ChannelCount() == inputChannelCount); AudioBlockCopyChannelWithScale(static_cast<const float*>(aInput.mChannelData[i]), aInput.mVolume, writeData); } } mInputWriteIndex += aInput.GetDuration(); // Now, see if we have data to output // Note that we need to do this before sending the buffer to the main // thread so that our delay time is updated. *aOutput = mSharedBuffers->GetOutputBuffer(); if (mInputWriteIndex >= mBufferSize) { SendBuffersToMainThread(aStream, aFrom); mInputWriteIndex -= mBufferSize; } } bool IsActive() const override { // Could return false when !mIsConnected after all output chunks produced // by main thread events calling // SharedBuffers::FinishProducingOutputBuffer() have been processed. return true; } size_t SizeOfExcludingThis(MallocSizeOf aMallocSizeOf) const override { // Not owned: // - mDestination (probably) size_t amount = AudioNodeEngine::SizeOfExcludingThis(aMallocSizeOf); amount += mSharedBuffers->SizeOfIncludingThis(aMallocSizeOf); if (mInputBuffer) { amount += mInputBuffer->SizeOfIncludingThis(aMallocSizeOf); } return amount; } size_t SizeOfIncludingThis(MallocSizeOf aMallocSizeOf) const override { return aMallocSizeOf(this) + SizeOfExcludingThis(aMallocSizeOf); } private: void SendBuffersToMainThread(AudioNodeStream* aStream, GraphTime aFrom) { MOZ_ASSERT(!NS_IsMainThread()); // we now have a full input buffer ready to be sent to the main thread. StreamTime playbackTick = mDestination->GraphTimeToStreamTime(aFrom); // Add the duration of the current sample playbackTick += WEBAUDIO_BLOCK_SIZE; // Add the delay caused by the main thread playbackTick += mSharedBuffers->DelaySoFar(); // Compute the playback time in the coordinate system of the destination double playbackTime = mDestination->StreamTimeToSeconds(playbackTick); class Command final : public Runnable { public: Command(AudioNodeStream* aStream, already_AddRefed<ThreadSharedFloatArrayBufferList> aInputBuffer, double aPlaybackTime) : mStream(aStream) , mInputBuffer(aInputBuffer) , mPlaybackTime(aPlaybackTime) { } NS_IMETHOD Run() override { RefPtr<ThreadSharedFloatArrayBufferList> output; auto engine = static_cast<ScriptProcessorNodeEngine*>(mStream->Engine()); { auto node = static_cast<ScriptProcessorNode*> (engine->NodeMainThread()); if (!node) { return NS_OK; } if (node->HasListenersFor(nsGkAtoms::onaudioprocess)) { output = DispatchAudioProcessEvent(node); } // The node may have been destroyed during event dispatch. } // Append it to our output buffer queue engine->GetSharedBuffers()-> FinishProducingOutputBuffer(output, engine->mBufferSize); return NS_OK; } // Returns the output buffers if set in event handlers. ThreadSharedFloatArrayBufferList* DispatchAudioProcessEvent(ScriptProcessorNode* aNode) { AudioContext* context = aNode->Context(); if (!context) { return nullptr; } AutoJSAPI jsapi; if (NS_WARN_IF(!jsapi.Init(aNode->GetOwner()))) { return nullptr; } JSContext* cx = jsapi.cx(); uint32_t inputChannelCount = aNode->ChannelCount(); // Create the input buffer RefPtr<AudioBuffer> inputBuffer; if (mInputBuffer) { ErrorResult rv; inputBuffer = AudioBuffer::Create(context, inputChannelCount, aNode->BufferSize(), context->SampleRate(), mInputBuffer.forget(), rv); if (rv.Failed()) { rv.SuppressException(); return nullptr; } } // Ask content to produce data in the output buffer // Note that we always avoid creating the output buffer here, and we try to // avoid creating the input buffer as well. The AudioProcessingEvent class // knows how to lazily create them if needed once the script tries to access // them. Otherwise, we may be able to get away without creating them! RefPtr<AudioProcessingEvent> event = new AudioProcessingEvent(aNode, nullptr, nullptr); event->InitEvent(inputBuffer, inputChannelCount, mPlaybackTime); aNode->DispatchTrustedEvent(event); // Steal the output buffers if they have been set. // Don't create a buffer if it hasn't been used to return output; // FinishProducingOutputBuffer() will optimize output = null. // GetThreadSharedChannelsForRate() may also return null after OOM. if (event->HasOutputBuffer()) { ErrorResult rv; AudioBuffer* buffer = event->GetOutputBuffer(rv); // HasOutputBuffer() returning true means that GetOutputBuffer() // will not fail. MOZ_ASSERT(!rv.Failed()); return buffer->GetThreadSharedChannelsForRate(cx); } return nullptr; } private: RefPtr<AudioNodeStream> mStream; RefPtr<ThreadSharedFloatArrayBufferList> mInputBuffer; double mPlaybackTime; }; NS_DispatchToMainThread(new Command(aStream, mInputBuffer.forget(), playbackTime)); } friend class ScriptProcessorNode; AudioNodeStream* mDestination; nsAutoPtr<SharedBuffers> mSharedBuffers; RefPtr<ThreadSharedFloatArrayBufferList> mInputBuffer; const uint32_t mBufferSize; const uint32_t mInputChannelCount; // The write index into the current input buffer uint32_t mInputWriteIndex; bool mIsConnected = false; }; ScriptProcessorNode::ScriptProcessorNode(AudioContext* aContext, uint32_t aBufferSize, uint32_t aNumberOfInputChannels, uint32_t aNumberOfOutputChannels) : AudioNode(aContext, aNumberOfInputChannels, mozilla::dom::ChannelCountMode::Explicit, mozilla::dom::ChannelInterpretation::Speakers) , mBufferSize(aBufferSize ? aBufferSize : // respect what the web developer requested 4096) // choose our own buffer size -- 4KB for now , mNumberOfOutputChannels(aNumberOfOutputChannels) { MOZ_ASSERT(BufferSize() % WEBAUDIO_BLOCK_SIZE == 0, "Invalid buffer size"); ScriptProcessorNodeEngine* engine = new ScriptProcessorNodeEngine(this, aContext->Destination(), BufferSize(), aNumberOfInputChannels); mStream = AudioNodeStream::Create(aContext, engine, AudioNodeStream::NO_STREAM_FLAGS, aContext->Graph()); } ScriptProcessorNode::~ScriptProcessorNode() { } size_t ScriptProcessorNode::SizeOfExcludingThis(MallocSizeOf aMallocSizeOf) const { size_t amount = AudioNode::SizeOfExcludingThis(aMallocSizeOf); return amount; } size_t ScriptProcessorNode::SizeOfIncludingThis(MallocSizeOf aMallocSizeOf) const { return aMallocSizeOf(this) + SizeOfExcludingThis(aMallocSizeOf); } void ScriptProcessorNode::EventListenerAdded(nsIAtom* aType) { AudioNode::EventListenerAdded(aType); if (aType == nsGkAtoms::onaudioprocess) { UpdateConnectedStatus(); } } void ScriptProcessorNode::EventListenerRemoved(nsIAtom* aType) { AudioNode::EventListenerRemoved(aType); if (aType == nsGkAtoms::onaudioprocess) { UpdateConnectedStatus(); } } JSObject* ScriptProcessorNode::WrapObject(JSContext* aCx, JS::Handle<JSObject*> aGivenProto) { return ScriptProcessorNodeBinding::Wrap(aCx, this, aGivenProto); } void ScriptProcessorNode::UpdateConnectedStatus() { bool isConnected = mHasPhantomInput || !(OutputNodes().IsEmpty() && OutputParams().IsEmpty() && InputNodes().IsEmpty()); // Events are queued even when there is no listener because a listener // may be added while events are in the queue. SendInt32ParameterToStream(ScriptProcessorNodeEngine::IS_CONNECTED, isConnected); if (isConnected && HasListenersFor(nsGkAtoms::onaudioprocess)) { MarkActive(); } else { MarkInactive(); } } } // namespace dom } // namespace mozilla