/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*-*/ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this file, * You can obtain one at http://mozilla.org/MPL/2.0/. */ #include "MediaStreamGraphImpl.h" #include "mozilla/MathAlgorithms.h" #include "mozilla/Unused.h" #include "AudioSegment.h" #include "VideoSegment.h" #include "nsContentUtils.h" #include "nsIObserver.h" #include "nsPrintfCString.h" #include "nsServiceManagerUtils.h" #include "prerror.h" #include "mozilla/Logging.h" #include "mozilla/Attributes.h" #include "TrackUnionStream.h" #include "ImageContainer.h" #include "AudioCaptureStream.h" #include "AudioChannelService.h" #include "AudioNodeStream.h" #include "AudioNodeExternalInputStream.h" #include "MediaStreamListener.h" #include "MediaStreamVideoSink.h" #include "mozilla/dom/AudioContextBinding.h" #include "mozilla/media/MediaUtils.h" #include <algorithm> #include "GeckoProfiler.h" #include "VideoFrameContainer.h" #include "mozilla/Unused.h" #include "mozilla/media/MediaUtils.h" #ifdef MOZ_WEBRTC #include "AudioOutputObserver.h" #endif #include "mtransport/runnable_utils.h" #include "webaudio/blink/HRTFDatabaseLoader.h" using namespace mozilla::layers; using namespace mozilla::dom; using namespace mozilla::gfx; using namespace mozilla::media; namespace mozilla { LazyLogModule gMediaStreamGraphLog("MediaStreamGraph"); #define STREAM_LOG(type, msg) MOZ_LOG(gMediaStreamGraphLog, type, msg) // #define ENABLE_LIFECYCLE_LOG // We don't use NSPR log here because we want this interleaved with adb logcat // on Android/B2G #ifdef ENABLE_LIFECYCLE_LOG # ifdef ANDROID # include "android/log.h" # define LIFECYCLE_LOG(...) __android_log_print(ANDROID_LOG_INFO, "Gecko - MSG", ## __VA_ARGS__); printf(__VA_ARGS__);printf("\n"); # else # define LIFECYCLE_LOG(...) printf(__VA_ARGS__);printf("\n"); # endif #else # define LIFECYCLE_LOG(...) #endif enum SourceMediaStream::TrackCommands : uint32_t { TRACK_CREATE = TrackEventCommand::TRACK_EVENT_CREATED, TRACK_END = TrackEventCommand::TRACK_EVENT_ENDED, TRACK_UNUSED = TrackEventCommand::TRACK_EVENT_UNUSED, }; /** * A hash table containing the graph instances, one per AudioChannel. */ static nsDataHashtable<nsUint32HashKey, MediaStreamGraphImpl*> gGraphs; MediaStreamGraphImpl::~MediaStreamGraphImpl() { NS_ASSERTION(IsEmpty(), "All streams should have been destroyed by messages from the main thread"); STREAM_LOG(LogLevel::Debug, ("MediaStreamGraph %p destroyed", this)); LIFECYCLE_LOG("MediaStreamGraphImpl::~MediaStreamGraphImpl\n"); } void MediaStreamGraphImpl::FinishStream(MediaStream* aStream) { if (aStream->mFinished) return; STREAM_LOG(LogLevel::Debug, ("MediaStream %p will finish", aStream)); #ifdef DEBUG for (StreamTracks::TrackIter track(aStream->mTracks); !track.IsEnded(); track.Next()) { if (!track->IsEnded()) { STREAM_LOG(LogLevel::Error, ("MediaStream %p will finish, but track %d has not ended.", aStream, track->GetID())); NS_ASSERTION(false, "Finished stream cannot contain live track"); } } #endif aStream->mFinished = true; aStream->mTracks.AdvanceKnownTracksTime(STREAM_TIME_MAX); SetStreamOrderDirty(); } void MediaStreamGraphImpl::AddStreamGraphThread(MediaStream* aStream) { aStream->mTracksStartTime = mProcessedTime; if (aStream->AsSourceStream()) { SourceMediaStream* source = aStream->AsSourceStream(); TimeStamp currentTimeStamp = CurrentDriver()->GetCurrentTimeStamp(); TimeStamp processedTimeStamp = currentTimeStamp + TimeDuration::FromSeconds(MediaTimeToSeconds(mProcessedTime - IterationEnd())); source->SetStreamTracksStartTimeStamp(processedTimeStamp); } if (aStream->IsSuspended()) { mSuspendedStreams.AppendElement(aStream); STREAM_LOG(LogLevel::Debug, ("Adding media stream %p to the graph, in the suspended stream array", aStream)); } else { mStreams.AppendElement(aStream); STREAM_LOG(LogLevel::Debug, ("Adding media stream %p to graph %p, count %lu", aStream, this, mStreams.Length())); LIFECYCLE_LOG("Adding media stream %p to graph %p, count %lu", aStream, this, mStreams.Length()); } SetStreamOrderDirty(); } void MediaStreamGraphImpl::RemoveStreamGraphThread(MediaStream* aStream) { // Remove references in mStreamUpdates before we allow aStream to die. // Pending updates are not needed (since the main thread has already given // up the stream) so we will just drop them. { MonitorAutoLock lock(mMonitor); for (uint32_t i = 0; i < mStreamUpdates.Length(); ++i) { if (mStreamUpdates[i].mStream == aStream) { mStreamUpdates[i].mStream = nullptr; } } } // Ensure that mFirstCycleBreaker and mMixer are updated when necessary. SetStreamOrderDirty(); if (aStream->IsSuspended()) { mSuspendedStreams.RemoveElement(aStream); } else { mStreams.RemoveElement(aStream); } STREAM_LOG(LogLevel::Debug, ("Removed media stream %p from graph %p, count %lu", aStream, this, mStreams.Length())); LIFECYCLE_LOG("Removed media stream %p from graph %p, count %lu", aStream, this, mStreams.Length()); NS_RELEASE(aStream); // probably destroying it } void MediaStreamGraphImpl::ExtractPendingInput(SourceMediaStream* aStream, GraphTime aDesiredUpToTime, bool* aEnsureNextIteration) { bool finished; { MutexAutoLock lock(aStream->mMutex); if (aStream->mPullEnabled && !aStream->mFinished && !aStream->mListeners.IsEmpty()) { // Compute how much stream time we'll need assuming we don't block // the stream at all. StreamTime t = aStream->GraphTimeToStreamTime(aDesiredUpToTime); STREAM_LOG(LogLevel::Verbose, ("Calling NotifyPull aStream=%p t=%f current end=%f", aStream, MediaTimeToSeconds(t), MediaTimeToSeconds(aStream->mTracks.GetEnd()))); if (t > aStream->mTracks.GetEnd()) { *aEnsureNextIteration = true; #ifdef DEBUG if (aStream->mListeners.Length() == 0) { STREAM_LOG(LogLevel::Error, ("No listeners in NotifyPull aStream=%p desired=%f current end=%f", aStream, MediaTimeToSeconds(t), MediaTimeToSeconds(aStream->mTracks.GetEnd()))); aStream->DumpTrackInfo(); } #endif for (uint32_t j = 0; j < aStream->mListeners.Length(); ++j) { MediaStreamListener* l = aStream->mListeners[j]; { MutexAutoUnlock unlock(aStream->mMutex); l->NotifyPull(this, t); } } } } finished = aStream->mUpdateFinished; bool shouldNotifyTrackCreated = false; for (int32_t i = aStream->mUpdateTracks.Length() - 1; i >= 0; --i) { SourceMediaStream::TrackData* data = &aStream->mUpdateTracks[i]; aStream->ApplyTrackDisabling(data->mID, data->mData); // Dealing with NotifyQueuedTrackChanges and NotifyQueuedAudioData part. // The logic is different from the manipulating of aStream->mTracks part. // So it is not combined with the manipulating of aStream->mTracks part. StreamTime offset = (data->mCommands & SourceMediaStream::TRACK_CREATE) ? data->mStart : aStream->mTracks.FindTrack(data->mID)->GetSegment()->GetDuration(); // Audio case. if (data->mData->GetType() == MediaSegment::AUDIO) { if (data->mCommands) { MOZ_ASSERT(!(data->mCommands & SourceMediaStream::TRACK_UNUSED)); for (MediaStreamListener* l : aStream->mListeners) { if (data->mCommands & SourceMediaStream::TRACK_END) { l->NotifyQueuedAudioData(this, data->mID, offset, *(static_cast<AudioSegment*>(data->mData.get()))); } l->NotifyQueuedTrackChanges(this, data->mID, offset, static_cast<TrackEventCommand>(data->mCommands), *data->mData); if (data->mCommands & SourceMediaStream::TRACK_CREATE) { l->NotifyQueuedAudioData(this, data->mID, offset, *(static_cast<AudioSegment*>(data->mData.get()))); } } } else { for (MediaStreamListener* l : aStream->mListeners) { l->NotifyQueuedAudioData(this, data->mID, offset, *(static_cast<AudioSegment*>(data->mData.get()))); } } } // Video case. if (data->mData->GetType() == MediaSegment::VIDEO) { if (data->mCommands) { MOZ_ASSERT(!(data->mCommands & SourceMediaStream::TRACK_UNUSED)); for (MediaStreamListener* l : aStream->mListeners) { l->NotifyQueuedTrackChanges(this, data->mID, offset, static_cast<TrackEventCommand>(data->mCommands), *data->mData); } } } for (TrackBound<MediaStreamTrackListener>& b : aStream->mTrackListeners) { if (b.mTrackID != data->mID) { continue; } b.mListener->NotifyQueuedChanges(this, offset, *data->mData); if (data->mCommands & SourceMediaStream::TRACK_END) { b.mListener->NotifyEnded(); } } if (data->mCommands & SourceMediaStream::TRACK_CREATE) { MediaSegment* segment = data->mData.forget(); STREAM_LOG(LogLevel::Debug, ("SourceMediaStream %p creating track %d, start %lld, initial end %lld", aStream, data->mID, int64_t(data->mStart), int64_t(segment->GetDuration()))); data->mEndOfFlushedData += segment->GetDuration(); aStream->mTracks.AddTrack(data->mID, data->mStart, segment); // The track has taken ownership of data->mData, so let's replace // data->mData with an empty clone. data->mData = segment->CreateEmptyClone(); data->mCommands &= ~SourceMediaStream::TRACK_CREATE; shouldNotifyTrackCreated = true; } else if (data->mData->GetDuration() > 0) { MediaSegment* dest = aStream->mTracks.FindTrack(data->mID)->GetSegment(); STREAM_LOG(LogLevel::Verbose, ("SourceMediaStream %p track %d, advancing end from %lld to %lld", aStream, data->mID, int64_t(dest->GetDuration()), int64_t(dest->GetDuration() + data->mData->GetDuration()))); data->mEndOfFlushedData += data->mData->GetDuration(); dest->AppendFrom(data->mData); } if (data->mCommands & SourceMediaStream::TRACK_END) { aStream->mTracks.FindTrack(data->mID)->SetEnded(); aStream->mUpdateTracks.RemoveElementAt(i); } } if (shouldNotifyTrackCreated) { for (MediaStreamListener* l : aStream->mListeners) { l->NotifyFinishedTrackCreation(this); } } if (!aStream->mFinished) { aStream->mTracks.AdvanceKnownTracksTime(aStream->mUpdateKnownTracksTime); } } if (aStream->mTracks.GetEnd() > 0) { aStream->mHasCurrentData = true; } if (finished) { FinishStream(aStream); } } StreamTime MediaStreamGraphImpl::GraphTimeToStreamTimeWithBlocking(MediaStream* aStream, GraphTime aTime) { MOZ_ASSERT(aTime <= mStateComputedTime, "Don't ask about times where we haven't made blocking decisions yet"); return std::max<StreamTime>(0, std::min(aTime, aStream->mStartBlocking) - aStream->mTracksStartTime); } GraphTime MediaStreamGraphImpl::IterationEnd() const { return CurrentDriver()->IterationEnd(); } void MediaStreamGraphImpl::UpdateCurrentTimeForStreams(GraphTime aPrevCurrentTime) { for (MediaStream* stream : AllStreams()) { bool isAnyBlocked = stream->mStartBlocking < mStateComputedTime; bool isAnyUnblocked = stream->mStartBlocking > aPrevCurrentTime; // Calculate blocked time and fire Blocked/Unblocked events GraphTime blockedTime = mStateComputedTime - stream->mStartBlocking; NS_ASSERTION(blockedTime >= 0, "Error in blocking time"); stream->AdvanceTimeVaryingValuesToCurrentTime(mStateComputedTime, blockedTime); STREAM_LOG(LogLevel::Verbose, ("MediaStream %p bufferStartTime=%f blockedTime=%f", stream, MediaTimeToSeconds(stream->mTracksStartTime), MediaTimeToSeconds(blockedTime))); stream->mStartBlocking = mStateComputedTime; if (isAnyUnblocked && stream->mNotifiedBlocked) { for (uint32_t j = 0; j < stream->mListeners.Length(); ++j) { MediaStreamListener* l = stream->mListeners[j]; l->NotifyBlockingChanged(this, MediaStreamListener::UNBLOCKED); } stream->mNotifiedBlocked = false; } if (isAnyBlocked && !stream->mNotifiedBlocked) { for (uint32_t j = 0; j < stream->mListeners.Length(); ++j) { MediaStreamListener* l = stream->mListeners[j]; l->NotifyBlockingChanged(this, MediaStreamListener::BLOCKED); } stream->mNotifiedBlocked = true; } if (isAnyUnblocked) { NS_ASSERTION(!stream->mNotifiedFinished, "Shouldn't have already notified of finish *and* have output!"); for (uint32_t j = 0; j < stream->mListeners.Length(); ++j) { MediaStreamListener* l = stream->mListeners[j]; l->NotifyOutput(this, mProcessedTime); } } // The stream is fully finished when all of its track data has been played // out. if (stream->mFinished && !stream->mNotifiedFinished && mProcessedTime >= stream->StreamTimeToGraphTime(stream->GetStreamTracks().GetAllTracksEnd())) { stream->mNotifiedFinished = true; SetStreamOrderDirty(); for (uint32_t j = 0; j < stream->mListeners.Length(); ++j) { MediaStreamListener* l = stream->mListeners[j]; l->NotifyEvent(this, MediaStreamGraphEvent::EVENT_FINISHED); } } } } template<typename C, typename Chunk> void MediaStreamGraphImpl::ProcessChunkMetadataForInterval(MediaStream* aStream, TrackID aTrackID, C& aSegment, StreamTime aStart, StreamTime aEnd) { MOZ_ASSERT(aStream); MOZ_ASSERT(IsTrackIDExplicit(aTrackID)); StreamTime offset = 0; for (typename C::ConstChunkIterator chunk(aSegment); !chunk.IsEnded(); chunk.Next()) { if (offset >= aEnd) { break; } offset += chunk->GetDuration(); if (chunk->IsNull() || offset < aStart) { continue; } PrincipalHandle principalHandle = chunk->GetPrincipalHandle(); if (principalHandle != aSegment.GetLastPrincipalHandle()) { aSegment.SetLastPrincipalHandle(principalHandle); STREAM_LOG(LogLevel::Debug, ("MediaStream %p track %d, principalHandle " "changed in %sChunk with duration %lld", aStream, aTrackID, aSegment.GetType() == MediaSegment::AUDIO ? "Audio" : "Video", (long long) chunk->GetDuration())); for (const TrackBound<MediaStreamTrackListener>& listener : aStream->mTrackListeners) { if (listener.mTrackID == aTrackID) { listener.mListener->NotifyPrincipalHandleChanged(this, principalHandle); } } } } } void MediaStreamGraphImpl::ProcessChunkMetadata(GraphTime aPrevCurrentTime) { for (MediaStream* stream : AllStreams()) { StreamTime iterationStart = stream->GraphTimeToStreamTime(aPrevCurrentTime); StreamTime iterationEnd = stream->GraphTimeToStreamTime(mProcessedTime); for (StreamTracks::TrackIter tracks(stream->mTracks); !tracks.IsEnded(); tracks.Next()) { MediaSegment* segment = tracks->GetSegment(); if (!segment) { continue; } if (tracks->GetType() == MediaSegment::AUDIO) { AudioSegment* audio = static_cast<AudioSegment*>(segment); ProcessChunkMetadataForInterval<AudioSegment, AudioChunk>( stream, tracks->GetID(), *audio, iterationStart, iterationEnd); } else if (tracks->GetType() == MediaSegment::VIDEO) { VideoSegment* video = static_cast<VideoSegment*>(segment); ProcessChunkMetadataForInterval<VideoSegment, VideoChunk>( stream, tracks->GetID(), *video, iterationStart, iterationEnd); } else { MOZ_CRASH("Unknown track type"); } } } } GraphTime MediaStreamGraphImpl::WillUnderrun(MediaStream* aStream, GraphTime aEndBlockingDecisions) { // Finished streams can't underrun. ProcessedMediaStreams also can't cause // underrun currently, since we'll always be able to produce data for them // unless they block on some other stream. if (aStream->mFinished || aStream->AsProcessedStream()) { return aEndBlockingDecisions; } // This stream isn't finished or suspended. We don't need to call // StreamTimeToGraphTime since an underrun is the only thing that can block // it. GraphTime bufferEnd = aStream->GetTracksEnd() + aStream->mTracksStartTime; #ifdef DEBUG if (bufferEnd < mProcessedTime) { STREAM_LOG(LogLevel::Error, ("MediaStream %p underrun, " "bufferEnd %f < mProcessedTime %f (%lld < %lld), Streamtime %lld", aStream, MediaTimeToSeconds(bufferEnd), MediaTimeToSeconds(mProcessedTime), bufferEnd, mProcessedTime, aStream->GetTracksEnd())); aStream->DumpTrackInfo(); NS_ASSERTION(bufferEnd >= mProcessedTime, "Buffer underran"); } #endif return std::min(bufferEnd, aEndBlockingDecisions); } namespace { // Value of mCycleMarker for unvisited streams in cycle detection. const uint32_t NOT_VISITED = UINT32_MAX; // Value of mCycleMarker for ordered streams in muted cycles. const uint32_t IN_MUTED_CYCLE = 1; } // namespace bool MediaStreamGraphImpl::AudioTrackPresent(bool& aNeedsAEC) { AssertOnGraphThreadOrNotRunning(); bool audioTrackPresent = false; for (uint32_t i = 0; i < mStreams.Length() && audioTrackPresent == false; ++i) { MediaStream* stream = mStreams[i]; SourceMediaStream* source = stream->AsSourceStream(); #ifdef MOZ_WEBRTC if (source && source->NeedsMixing()) { aNeedsAEC = true; } #endif // If this is a AudioNodeStream, force a AudioCallbackDriver. if (stream->AsAudioNodeStream()) { audioTrackPresent = true; } else { for (StreamTracks::TrackIter tracks(stream->GetStreamTracks(), MediaSegment::AUDIO); !tracks.IsEnded(); tracks.Next()) { audioTrackPresent = true; } } if (source) { audioTrackPresent = source->HasPendingAudioTrack(); } } // XXX For some reason, there are race conditions when starting an audio input where // we find no active audio tracks. In any case, if we have an active audio input we // should not allow a switch back to a SystemClockDriver if (!audioTrackPresent && mInputDeviceUsers.Count() != 0) { NS_WARNING("No audio tracks, but full-duplex audio is enabled!!!!!"); audioTrackPresent = true; #ifdef MOZ_WEBRTC aNeedsAEC = true; #endif } return audioTrackPresent; } void MediaStreamGraphImpl::UpdateStreamOrder() { bool shouldAEC = false; bool audioTrackPresent = AudioTrackPresent(shouldAEC); // Note that this looks for any audio streams, input or output, and switches to a // SystemClockDriver if there are none. However, if another is already pending, let that // switch happen. if (!audioTrackPresent && mRealtime && CurrentDriver()->AsAudioCallbackDriver()) { MonitorAutoLock mon(mMonitor); if (CurrentDriver()->AsAudioCallbackDriver()->IsStarted() && !(CurrentDriver()->Switching())) { if (mLifecycleState == LIFECYCLE_RUNNING) { SystemClockDriver* driver = new SystemClockDriver(this); CurrentDriver()->SwitchAtNextIteration(driver); } } } bool switching = false; { MonitorAutoLock mon(mMonitor); switching = CurrentDriver()->Switching(); } if (audioTrackPresent && mRealtime && !CurrentDriver()->AsAudioCallbackDriver() && !switching) { MonitorAutoLock mon(mMonitor); if (mLifecycleState == LIFECYCLE_RUNNING) { AudioCallbackDriver* driver = new AudioCallbackDriver(this); CurrentDriver()->SwitchAtNextIteration(driver); } } #ifdef MOZ_WEBRTC // Whenever we change AEC state, notify the current driver, which also // will sample the state when the driver inits if (shouldAEC && !mFarendObserverRef && gFarendObserver) { mFarendObserverRef = gFarendObserver; mMixer.AddCallback(mFarendObserverRef); if (CurrentDriver()->AsAudioCallbackDriver()) { CurrentDriver()->AsAudioCallbackDriver()->SetMicrophoneActive(true); } } else if (!shouldAEC && mFarendObserverRef){ if (mMixer.FindCallback(mFarendObserverRef)) { mMixer.RemoveCallback(mFarendObserverRef); mFarendObserverRef = nullptr; if (CurrentDriver()->AsAudioCallbackDriver()) { CurrentDriver()->AsAudioCallbackDriver()->SetMicrophoneActive(false); } } } #endif if (!mStreamOrderDirty) { return; } mStreamOrderDirty = false; // The algorithm for finding cycles is based on Tim Leslie's iterative // implementation [1][2] of Pearce's variant [3] of Tarjan's strongly // connected components (SCC) algorithm. There are variations (a) to // distinguish whether streams in SCCs of size 1 are in a cycle and (b) to // re-run the algorithm over SCCs with breaks at DelayNodes. // // [1] http://www.timl.id.au/?p=327 // [2] https://github.com/scipy/scipy/blob/e2c502fca/scipy/sparse/csgraph/_traversal.pyx#L582 // [3] http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.102.1707 // // There are two stacks. One for the depth-first search (DFS), mozilla::LinkedList<MediaStream> dfsStack; // and another for streams popped from the DFS stack, but still being // considered as part of SCCs involving streams on the stack. mozilla::LinkedList<MediaStream> sccStack; // An index into mStreams for the next stream found with no unsatisfied // upstream dependencies. uint32_t orderedStreamCount = 0; for (uint32_t i = 0; i < mStreams.Length(); ++i) { MediaStream* s = mStreams[i]; ProcessedMediaStream* ps = s->AsProcessedStream(); if (ps) { // The dfsStack initially contains a list of all processed streams in // unchanged order. dfsStack.insertBack(s); ps->mCycleMarker = NOT_VISITED; } else { // SourceMediaStreams have no inputs and so can be ordered now. mStreams[orderedStreamCount] = s; ++orderedStreamCount; } } // mNextStackMarker corresponds to "index" in Tarjan's algorithm. It is a // counter to label mCycleMarker on the next visited stream in the DFS // uniquely in the set of visited streams that are still being considered. // // In this implementation, the counter descends so that the values are // strictly greater than the values that mCycleMarker takes when the stream // has been ordered (0 or IN_MUTED_CYCLE). // // Each new stream labelled, as the DFS searches upstream, receives a value // less than those used for all other streams being considered. uint32_t nextStackMarker = NOT_VISITED - 1; // Reset list of DelayNodes in cycles stored at the tail of mStreams. mFirstCycleBreaker = mStreams.Length(); // Rearrange dfsStack order as required to DFS upstream and pop streams // in processing order to place in mStreams. while (auto ps = static_cast<ProcessedMediaStream*>(dfsStack.getFirst())) { const auto& inputs = ps->mInputs; MOZ_ASSERT(ps->AsProcessedStream()); if (ps->mCycleMarker == NOT_VISITED) { // Record the position on the visited stack, so that any searches // finding this stream again know how much of the stack is in the cycle. ps->mCycleMarker = nextStackMarker; --nextStackMarker; // Not-visited input streams should be processed first. // SourceMediaStreams have already been ordered. for (uint32_t i = inputs.Length(); i--; ) { if (inputs[i]->mSource->IsSuspended()) { continue; } auto input = inputs[i]->mSource->AsProcessedStream(); if (input && input->mCycleMarker == NOT_VISITED) { // It can be that this stream has an input which is from a suspended // AudioContext. if (input->isInList()) { input->remove(); dfsStack.insertFront(input); } } } continue; } // Returning from DFS. Pop from dfsStack. ps->remove(); // cycleStackMarker keeps track of the highest marker value on any // upstream stream, if any, found receiving input, directly or indirectly, // from the visited stack (and so from |ps|, making a cycle). In a // variation from Tarjan's SCC algorithm, this does not include |ps| // unless it is part of the cycle. uint32_t cycleStackMarker = 0; for (uint32_t i = inputs.Length(); i--; ) { if (inputs[i]->mSource->IsSuspended()) { continue; } auto input = inputs[i]->mSource->AsProcessedStream(); if (input) { cycleStackMarker = std::max(cycleStackMarker, input->mCycleMarker); } } if (cycleStackMarker <= IN_MUTED_CYCLE) { // All inputs have been ordered and their stack markers have been removed. // This stream is not part of a cycle. It can be processed next. ps->mCycleMarker = 0; mStreams[orderedStreamCount] = ps; ++orderedStreamCount; continue; } // A cycle has been found. Record this stream for ordering when all // streams in this SCC have been popped from the DFS stack. sccStack.insertFront(ps); if (cycleStackMarker > ps->mCycleMarker) { // Cycles have been found that involve streams that remain on the stack. // Leave mCycleMarker indicating the most downstream (last) stream on // the stack known to be part of this SCC. In this way, any searches on // other paths that find |ps| will know (without having to traverse from // this stream again) that they are part of this SCC (i.e. part of an // intersecting cycle). ps->mCycleMarker = cycleStackMarker; continue; } // |ps| is the root of an SCC involving no other streams on dfsStack, the // complete SCC has been recorded, and streams in this SCC are part of at // least one cycle. MOZ_ASSERT(cycleStackMarker == ps->mCycleMarker); // If there are DelayNodes in this SCC, then they may break the cycles. bool haveDelayNode = false; auto next = sccStack.getFirst(); // Streams in this SCC are identified by mCycleMarker <= cycleStackMarker. // (There may be other streams later in sccStack from other incompletely // searched SCCs, involving streams still on dfsStack.) // // DelayNodes in cycles must behave differently from those not in cycles, // so all DelayNodes in the SCC must be identified. while (next && static_cast<ProcessedMediaStream*>(next)-> mCycleMarker <= cycleStackMarker) { auto ns = next->AsAudioNodeStream(); // Get next before perhaps removing from list below. next = next->getNext(); if (ns && ns->Engine()->AsDelayNodeEngine()) { haveDelayNode = true; // DelayNodes break cycles by producing their output in a // preprocessing phase; they do not need to be ordered before their // consumers. Order them at the tail of mStreams so that they can be // handled specially. Do so now, so that DFS ignores them. ns->remove(); ns->mCycleMarker = 0; --mFirstCycleBreaker; mStreams[mFirstCycleBreaker] = ns; } } auto after_scc = next; while ((next = sccStack.getFirst()) != after_scc) { next->remove(); auto removed = static_cast<ProcessedMediaStream*>(next); if (haveDelayNode) { // Return streams to the DFS stack again (to order and detect cycles // without delayNodes). Any of these streams that are still inputs // for streams on the visited stack must be returned to the front of // the stack to be ordered before their dependents. We know that none // of these streams need input from streams on the visited stack, so // they can all be searched and ordered before the current stack head // is popped. removed->mCycleMarker = NOT_VISITED; dfsStack.insertFront(removed); } else { // Streams in cycles without any DelayNodes must be muted, and so do // not need input and can be ordered now. They must be ordered before // their consumers so that their muted output is available. removed->mCycleMarker = IN_MUTED_CYCLE; mStreams[orderedStreamCount] = removed; ++orderedStreamCount; } } } MOZ_ASSERT(orderedStreamCount == mFirstCycleBreaker); } void MediaStreamGraphImpl::NotifyHasCurrentData(MediaStream* aStream) { if (!aStream->mNotifiedHasCurrentData && aStream->mHasCurrentData) { for (uint32_t j = 0; j < aStream->mListeners.Length(); ++j) { MediaStreamListener* l = aStream->mListeners[j]; l->NotifyHasCurrentData(this); } aStream->mNotifiedHasCurrentData = true; } } void MediaStreamGraphImpl::CreateOrDestroyAudioStreams(MediaStream* aStream) { MOZ_ASSERT(mRealtime, "Should only attempt to create audio streams in real-time mode"); if (aStream->mAudioOutputs.IsEmpty()) { aStream->mAudioOutputStreams.Clear(); return; } if (!aStream->GetStreamTracks().GetAndResetTracksDirty() && !aStream->mAudioOutputStreams.IsEmpty()) { return; } STREAM_LOG(LogLevel::Debug, ("Updating AudioOutputStreams for MediaStream %p", aStream)); AutoTArray<bool,2> audioOutputStreamsFound; for (uint32_t i = 0; i < aStream->mAudioOutputStreams.Length(); ++i) { audioOutputStreamsFound.AppendElement(false); } for (StreamTracks::TrackIter tracks(aStream->GetStreamTracks(), MediaSegment::AUDIO); !tracks.IsEnded(); tracks.Next()) { uint32_t i; for (i = 0; i < audioOutputStreamsFound.Length(); ++i) { if (aStream->mAudioOutputStreams[i].mTrackID == tracks->GetID()) { break; } } if (i < audioOutputStreamsFound.Length()) { audioOutputStreamsFound[i] = true; } else { MediaStream::AudioOutputStream* audioOutputStream = aStream->mAudioOutputStreams.AppendElement(); audioOutputStream->mAudioPlaybackStartTime = mProcessedTime; audioOutputStream->mBlockedAudioTime = 0; audioOutputStream->mLastTickWritten = 0; audioOutputStream->mTrackID = tracks->GetID(); bool switching = false; { MonitorAutoLock lock(mMonitor); switching = CurrentDriver()->Switching(); } if (!CurrentDriver()->AsAudioCallbackDriver() && !switching) { MonitorAutoLock mon(mMonitor); if (mLifecycleState == LIFECYCLE_RUNNING) { AudioCallbackDriver* driver = new AudioCallbackDriver(this); CurrentDriver()->SwitchAtNextIteration(driver); } } } } for (int32_t i = audioOutputStreamsFound.Length() - 1; i >= 0; --i) { if (!audioOutputStreamsFound[i]) { aStream->mAudioOutputStreams.RemoveElementAt(i); } } } StreamTime MediaStreamGraphImpl::PlayAudio(MediaStream* aStream) { MOZ_ASSERT(mRealtime, "Should only attempt to play audio in realtime mode"); float volume = 0.0f; for (uint32_t i = 0; i < aStream->mAudioOutputs.Length(); ++i) { volume += aStream->mAudioOutputs[i].mVolume; } StreamTime ticksWritten = 0; for (uint32_t i = 0; i < aStream->mAudioOutputStreams.Length(); ++i) { ticksWritten = 0; MediaStream::AudioOutputStream& audioOutput = aStream->mAudioOutputStreams[i]; StreamTracks::Track* track = aStream->mTracks.FindTrack(audioOutput.mTrackID); AudioSegment* audio = track->Get<AudioSegment>(); AudioSegment output; StreamTime offset = aStream->GraphTimeToStreamTime(mProcessedTime); // We don't update aStream->mTracksStartTime here to account for time spent // blocked. Instead, we'll update it in UpdateCurrentTimeForStreams after // the blocked period has completed. But we do need to make sure we play // from the right offsets in the stream buffer, even if we've already // written silence for some amount of blocked time after the current time. GraphTime t = mProcessedTime; while (t < mStateComputedTime) { bool blocked = t >= aStream->mStartBlocking; GraphTime end = blocked ? mStateComputedTime : aStream->mStartBlocking; NS_ASSERTION(end <= mStateComputedTime, "mStartBlocking is wrong!"); // Check how many ticks of sound we can provide if we are blocked some // time in the middle of this cycle. StreamTime toWrite = end - t; if (blocked) { output.InsertNullDataAtStart(toWrite); ticksWritten += toWrite; STREAM_LOG(LogLevel::Verbose, ("MediaStream %p writing %ld blocking-silence samples for %f to %f (%ld to %ld)\n", aStream, toWrite, MediaTimeToSeconds(t), MediaTimeToSeconds(end), offset, offset + toWrite)); } else { StreamTime endTicksNeeded = offset + toWrite; StreamTime endTicksAvailable = audio->GetDuration(); if (endTicksNeeded <= endTicksAvailable) { STREAM_LOG(LogLevel::Verbose, ("MediaStream %p writing %ld samples for %f to %f " "(samples %ld to %ld)\n", aStream, toWrite, MediaTimeToSeconds(t), MediaTimeToSeconds(end), offset, endTicksNeeded)); output.AppendSlice(*audio, offset, endTicksNeeded); ticksWritten += toWrite; offset = endTicksNeeded; } else { // MOZ_ASSERT(track->IsEnded(), "Not enough data, and track not ended."); // If we are at the end of the track, maybe write the remaining // samples, and pad with/output silence. if (endTicksNeeded > endTicksAvailable && offset < endTicksAvailable) { output.AppendSlice(*audio, offset, endTicksAvailable); STREAM_LOG(LogLevel::Verbose, ("MediaStream %p writing %ld samples for %f to %f " "(samples %ld to %ld)\n", aStream, toWrite, MediaTimeToSeconds(t), MediaTimeToSeconds(end), offset, endTicksNeeded)); uint32_t available = endTicksAvailable - offset; ticksWritten += available; toWrite -= available; offset = endTicksAvailable; } output.AppendNullData(toWrite); STREAM_LOG(LogLevel::Verbose, ("MediaStream %p writing %ld padding slsamples for %f to " "%f (samples %ld to %ld)\n", aStream, toWrite, MediaTimeToSeconds(t), MediaTimeToSeconds(end), offset, endTicksNeeded)); ticksWritten += toWrite; } output.ApplyVolume(volume); } t = end; } audioOutput.mLastTickWritten = offset; // Need unique id for stream & track - and we want it to match the inserter output.WriteTo(LATENCY_STREAM_ID(aStream, track->GetID()), mMixer, AudioChannelCount(), mSampleRate); } return ticksWritten; } void MediaStreamGraphImpl::OpenAudioInputImpl(int aID, AudioDataListener *aListener) { // Bug 1238038 Need support for multiple mics at once if (mInputDeviceUsers.Count() > 0 && !mInputDeviceUsers.Get(aListener, nullptr)) { NS_ASSERTION(false, "Input from multiple mics not yet supported; bug 1238038"); // Need to support separate input-only AudioCallback drivers; they'll // call us back on "other" threads. We will need to echo-cancel them, though. return; } mInputWanted = true; // Add to count of users for this ID. // XXX Since we can't rely on IDs staying valid (ugh), use the listener as // a stand-in for the ID. Fix as part of support for multiple-captures // (Bug 1238038) uint32_t count = 0; mInputDeviceUsers.Get(aListener, &count); // ok if this fails count++; mInputDeviceUsers.Put(aListener, count); // creates a new entry in the hash if needed if (count == 1) { // first open for this listener // aID is a cubeb_devid, and we assume that opaque ptr is valid until // we close cubeb. mInputDeviceID = aID; mAudioInputs.AppendElement(aListener); // always monitor speaker data // Switch Drivers since we're adding input (to input-only or full-duplex) MonitorAutoLock mon(mMonitor); if (mLifecycleState == LIFECYCLE_RUNNING) { AudioCallbackDriver* driver = new AudioCallbackDriver(this); STREAM_LOG(LogLevel::Debug, ("OpenAudioInput: starting new AudioCallbackDriver(input) %p", driver)); LIFECYCLE_LOG("OpenAudioInput: starting new AudioCallbackDriver(input) %p", driver); driver->SetInputListener(aListener); CurrentDriver()->SwitchAtNextIteration(driver); } else { STREAM_LOG(LogLevel::Error, ("OpenAudioInput in shutdown!")); LIFECYCLE_LOG("OpenAudioInput in shutdown!"); NS_ASSERTION(false, "Can't open cubeb inputs in shutdown"); } } } nsresult MediaStreamGraphImpl::OpenAudioInput(int aID, AudioDataListener *aListener) { // So, so, so annoying. Can't AppendMessage except on Mainthread if (!NS_IsMainThread()) { NS_DispatchToMainThread(WrapRunnable(this, &MediaStreamGraphImpl::OpenAudioInput, aID, RefPtr<AudioDataListener>(aListener))); return NS_OK; } class Message : public ControlMessage { public: Message(MediaStreamGraphImpl *aGraph, int aID, AudioDataListener *aListener) : ControlMessage(nullptr), mGraph(aGraph), mID(aID), mListener(aListener) {} virtual void Run() { mGraph->OpenAudioInputImpl(mID, mListener); } MediaStreamGraphImpl *mGraph; int mID; RefPtr<AudioDataListener> mListener; }; // XXX Check not destroyed! this->AppendMessage(MakeUnique<Message>(this, aID, aListener)); return NS_OK; } void MediaStreamGraphImpl::CloseAudioInputImpl(AudioDataListener *aListener) { uint32_t count; DebugOnly<bool> result = mInputDeviceUsers.Get(aListener, &count); MOZ_ASSERT(result); if (--count > 0) { mInputDeviceUsers.Put(aListener, count); return; // still in use } mInputDeviceUsers.Remove(aListener); mInputDeviceID = -1; mInputWanted = false; AudioCallbackDriver *driver = CurrentDriver()->AsAudioCallbackDriver(); if (driver) { driver->RemoveInputListener(aListener); } mAudioInputs.RemoveElement(aListener); // Switch Drivers since we're adding or removing an input (to nothing/system or output only) bool shouldAEC = false; bool audioTrackPresent = AudioTrackPresent(shouldAEC); MonitorAutoLock mon(mMonitor); if (mLifecycleState == LIFECYCLE_RUNNING) { GraphDriver* driver; if (audioTrackPresent) { // We still have audio output STREAM_LOG(LogLevel::Debug, ("CloseInput: output present (AudioCallback)")); driver = new AudioCallbackDriver(this); CurrentDriver()->SwitchAtNextIteration(driver); } else if (CurrentDriver()->AsAudioCallbackDriver()) { STREAM_LOG(LogLevel::Debug, ("CloseInput: no output present (SystemClockCallback)")); driver = new SystemClockDriver(this); CurrentDriver()->SwitchAtNextIteration(driver); } // else SystemClockDriver->SystemClockDriver, no switch } } void MediaStreamGraphImpl::CloseAudioInput(AudioDataListener *aListener) { // So, so, so annoying. Can't AppendMessage except on Mainthread if (!NS_IsMainThread()) { NS_DispatchToMainThread(WrapRunnable(this, &MediaStreamGraphImpl::CloseAudioInput, RefPtr<AudioDataListener>(aListener))); return; } class Message : public ControlMessage { public: Message(MediaStreamGraphImpl *aGraph, AudioDataListener *aListener) : ControlMessage(nullptr), mGraph(aGraph), mListener(aListener) {} virtual void Run() { mGraph->CloseAudioInputImpl(mListener); } MediaStreamGraphImpl *mGraph; RefPtr<AudioDataListener> mListener; }; this->AppendMessage(MakeUnique<Message>(this, aListener)); } // All AudioInput listeners get the same speaker data (at least for now). void MediaStreamGraph::NotifyOutputData(AudioDataValue* aBuffer, size_t aFrames, TrackRate aRate, uint32_t aChannels) { for (auto& listener : mAudioInputs) { listener->NotifyOutputData(this, aBuffer, aFrames, aRate, aChannels); } } void MediaStreamGraph::AssertOnGraphThreadOrNotRunning() const { // either we're on the right thread (and calling CurrentDriver() is safe), // or we're going to assert anyways, so don't cross-check CurrentDriver #ifdef DEBUG MediaStreamGraphImpl const * graph = static_cast<MediaStreamGraphImpl const *>(this); // if all the safety checks fail, assert we own the monitor if (!graph->mDriver->OnThread()) { if (!(graph->mDetectedNotRunning && graph->mLifecycleState > MediaStreamGraphImpl::LIFECYCLE_RUNNING && NS_IsMainThread())) { graph->mMonitor.AssertCurrentThreadOwns(); } } #endif } bool MediaStreamGraphImpl::ShouldUpdateMainThread() { if (mRealtime) { return true; } TimeStamp now = TimeStamp::Now(); if ((now - mLastMainThreadUpdate).ToMilliseconds() > CurrentDriver()->IterationDuration()) { mLastMainThreadUpdate = now; return true; } return false; } void MediaStreamGraphImpl::PrepareUpdatesToMainThreadState(bool aFinalUpdate) { mMonitor.AssertCurrentThreadOwns(); // We don't want to frequently update the main thread about timing update // when we are not running in realtime. if (aFinalUpdate || ShouldUpdateMainThread()) { // Strip updates that will be obsoleted below, so as to keep the length of // mStreamUpdates sane. size_t keptUpdateCount = 0; for (size_t i = 0; i < mStreamUpdates.Length(); ++i) { MediaStream* stream = mStreamUpdates[i].mStream; // RemoveStreamGraphThread() clears mStream in updates for // streams that are removed from the graph. MOZ_ASSERT(!stream || stream->GraphImpl() == this); if (!stream || stream->MainThreadNeedsUpdates()) { // Discard this update as it has either been cleared when the stream // was destroyed or there will be a newer update below. continue; } if (keptUpdateCount != i) { mStreamUpdates[keptUpdateCount] = Move(mStreamUpdates[i]); MOZ_ASSERT(!mStreamUpdates[i].mStream); } ++keptUpdateCount; } mStreamUpdates.TruncateLength(keptUpdateCount); mStreamUpdates.SetCapacity(mStreamUpdates.Length() + mStreams.Length() + mSuspendedStreams.Length()); for (MediaStream* stream : AllStreams()) { if (!stream->MainThreadNeedsUpdates()) { continue; } StreamUpdate* update = mStreamUpdates.AppendElement(); update->mStream = stream; // No blocking to worry about here, since we've passed // UpdateCurrentTimeForStreams. update->mNextMainThreadCurrentTime = stream->GraphTimeToStreamTime(mProcessedTime); update->mNextMainThreadFinished = stream->mNotifiedFinished; } if (!mPendingUpdateRunnables.IsEmpty()) { mUpdateRunnables.AppendElements(Move(mPendingUpdateRunnables)); } } // If this is the final update, then a stable state event will soon be // posted just before this thread finishes, and so there is no need to also // post here. if (!aFinalUpdate && // Don't send the message to the main thread if it's not going to have // any work to do. !(mUpdateRunnables.IsEmpty() && mStreamUpdates.IsEmpty())) { EnsureStableStateEventPosted(); } } GraphTime MediaStreamGraphImpl::RoundUpToNextAudioBlock(GraphTime aTime) { StreamTime ticks = aTime; uint64_t block = ticks >> WEBAUDIO_BLOCK_SIZE_BITS; uint64_t nextBlock = block + 1; StreamTime nextTicks = nextBlock << WEBAUDIO_BLOCK_SIZE_BITS; return nextTicks; } void MediaStreamGraphImpl::ProduceDataForStreamsBlockByBlock(uint32_t aStreamIndex, TrackRate aSampleRate) { MOZ_ASSERT(aStreamIndex <= mFirstCycleBreaker, "Cycle breaker is not AudioNodeStream?"); GraphTime t = mProcessedTime; while (t < mStateComputedTime) { GraphTime next = RoundUpToNextAudioBlock(t); for (uint32_t i = mFirstCycleBreaker; i < mStreams.Length(); ++i) { auto ns = static_cast<AudioNodeStream*>(mStreams[i]); MOZ_ASSERT(ns->AsAudioNodeStream()); ns->ProduceOutputBeforeInput(t); } for (uint32_t i = aStreamIndex; i < mStreams.Length(); ++i) { ProcessedMediaStream* ps = mStreams[i]->AsProcessedStream(); if (ps) { ps->ProcessInput(t, next, (next == mStateComputedTime) ? ProcessedMediaStream::ALLOW_FINISH : 0); } } t = next; } NS_ASSERTION(t == mStateComputedTime, "Something went wrong with rounding to block boundaries"); } bool MediaStreamGraphImpl::AllFinishedStreamsNotified() { for (MediaStream* stream : AllStreams()) { if (stream->mFinished && !stream->mNotifiedFinished) { return false; } } return true; } void MediaStreamGraphImpl::RunMessageAfterProcessing(UniquePtr<ControlMessage> aMessage) { MOZ_ASSERT(CurrentDriver()->OnThread()); if (mFrontMessageQueue.IsEmpty()) { mFrontMessageQueue.AppendElement(); } // Only one block is used for messages from the graph thread. MOZ_ASSERT(mFrontMessageQueue.Length() == 1); mFrontMessageQueue[0].mMessages.AppendElement(Move(aMessage)); } void MediaStreamGraphImpl::RunMessagesInQueue() { // Calculate independent action times for each batch of messages (each // batch corresponding to an event loop task). This isolates the performance // of different scripts to some extent. for (uint32_t i = 0; i < mFrontMessageQueue.Length(); ++i) { nsTArray<UniquePtr<ControlMessage>>& messages = mFrontMessageQueue[i].mMessages; for (uint32_t j = 0; j < messages.Length(); ++j) { messages[j]->Run(); } } mFrontMessageQueue.Clear(); } void MediaStreamGraphImpl::UpdateGraph(GraphTime aEndBlockingDecisions) { MOZ_ASSERT(aEndBlockingDecisions >= mProcessedTime); // The next state computed time can be the same as the previous: it // means the driver would be have been blocking indefinitly, but the graph has // been woken up right after having been to sleep. MOZ_ASSERT(aEndBlockingDecisions >= mStateComputedTime); UpdateStreamOrder(); bool ensureNextIteration = false; // Grab pending stream input and compute blocking time for (MediaStream* stream : mStreams) { if (SourceMediaStream* is = stream->AsSourceStream()) { ExtractPendingInput(is, aEndBlockingDecisions, &ensureNextIteration); } if (stream->mFinished) { // The stream's not suspended, and since it's finished, underruns won't // stop it playing out. So there's no blocking other than what we impose // here. GraphTime endTime = stream->GetStreamTracks().GetAllTracksEnd() + stream->mTracksStartTime; if (endTime <= mStateComputedTime) { STREAM_LOG(LogLevel::Verbose, ("MediaStream %p is blocked due to being finished", stream)); stream->mStartBlocking = mStateComputedTime; } else { STREAM_LOG(LogLevel::Verbose, ("MediaStream %p is finished, but not blocked yet (end at %f, with blocking at %f)", stream, MediaTimeToSeconds(stream->GetTracksEnd()), MediaTimeToSeconds(endTime))); // Data can't be added to a finished stream, so underruns are irrelevant. stream->mStartBlocking = std::min(endTime, aEndBlockingDecisions); } } else { stream->mStartBlocking = WillUnderrun(stream, aEndBlockingDecisions); } } for (MediaStream* stream : mSuspendedStreams) { stream->mStartBlocking = mStateComputedTime; } // The loop is woken up so soon that IterationEnd() barely advances and we // end up having aEndBlockingDecision == mStateComputedTime. // Since stream blocking is computed in the interval of // [mStateComputedTime, aEndBlockingDecision), it won't be computed at all. // We should ensure next iteration so that pending blocking changes will be // computed in next loop. if (ensureNextIteration || aEndBlockingDecisions == mStateComputedTime) { EnsureNextIteration(); } } void MediaStreamGraphImpl::Process() { // Play stream contents. bool allBlockedForever = true; // True when we've done ProcessInput for all processed streams. bool doneAllProducing = false; // This is the number of frame that are written to the AudioStreams, for // this cycle. StreamTime ticksPlayed = 0; mMixer.StartMixing(); // Figure out what each stream wants to do for (uint32_t i = 0; i < mStreams.Length(); ++i) { MediaStream* stream = mStreams[i]; if (!doneAllProducing) { ProcessedMediaStream* ps = stream->AsProcessedStream(); if (ps) { AudioNodeStream* n = stream->AsAudioNodeStream(); if (n) { #ifdef DEBUG // Verify that the sampling rate for all of the following streams is the same for (uint32_t j = i + 1; j < mStreams.Length(); ++j) { AudioNodeStream* nextStream = mStreams[j]->AsAudioNodeStream(); if (nextStream) { MOZ_ASSERT(n->SampleRate() == nextStream->SampleRate(), "All AudioNodeStreams in the graph must have the same sampling rate"); } } #endif // Since an AudioNodeStream is present, go ahead and // produce audio block by block for all the rest of the streams. ProduceDataForStreamsBlockByBlock(i, n->SampleRate()); doneAllProducing = true; } else { ps->ProcessInput(mProcessedTime, mStateComputedTime, ProcessedMediaStream::ALLOW_FINISH); NS_ASSERTION(stream->mTracks.GetEnd() >= GraphTimeToStreamTimeWithBlocking(stream, mStateComputedTime), "Stream did not produce enough data"); } } } NotifyHasCurrentData(stream); // Only playback audio and video in real-time mode if (mRealtime) { CreateOrDestroyAudioStreams(stream); if (CurrentDriver()->AsAudioCallbackDriver()) { StreamTime ticksPlayedForThisStream = PlayAudio(stream); if (!ticksPlayed) { ticksPlayed = ticksPlayedForThisStream; } else { MOZ_ASSERT(!ticksPlayedForThisStream || ticksPlayedForThisStream == ticksPlayed, "Each stream should have the same number of frame."); } } } if (stream->mStartBlocking > mProcessedTime) { allBlockedForever = false; } } if (CurrentDriver()->AsAudioCallbackDriver() && ticksPlayed) { mMixer.FinishMixing(); } if (!allBlockedForever) { EnsureNextIteration(); } } bool MediaStreamGraphImpl::UpdateMainThreadState() { MonitorAutoLock lock(mMonitor); bool finalUpdate = mForceShutDown || (mProcessedTime >= mEndTime && AllFinishedStreamsNotified()) || (IsEmpty() && mBackMessageQueue.IsEmpty()); PrepareUpdatesToMainThreadState(finalUpdate); if (finalUpdate) { // Enter shutdown mode when this iteration is completed. // No need to Destroy streams here. The main-thread owner of each // stream is responsible for calling Destroy on them. return false; } CurrentDriver()->WaitForNextIteration(); SwapMessageQueues(); return true; } bool MediaStreamGraphImpl::OneIteration(GraphTime aStateEnd) { // Process graph message from the main thread for this iteration. RunMessagesInQueue(); UpdateStreamOrder(); GraphTime stateEnd = std::min(aStateEnd, mEndTime); UpdateGraph(stateEnd); mStateComputedTime = stateEnd; Process(); GraphTime oldProcessedTime = mProcessedTime; mProcessedTime = stateEnd; UpdateCurrentTimeForStreams(oldProcessedTime); ProcessChunkMetadata(oldProcessedTime); // Process graph messages queued from RunMessageAfterProcessing() on this // thread during the iteration. RunMessagesInQueue(); return UpdateMainThreadState(); } void MediaStreamGraphImpl::ApplyStreamUpdate(StreamUpdate* aUpdate) { mMonitor.AssertCurrentThreadOwns(); MediaStream* stream = aUpdate->mStream; if (!stream) return; stream->mMainThreadCurrentTime = aUpdate->mNextMainThreadCurrentTime; stream->mMainThreadFinished = aUpdate->mNextMainThreadFinished; if (stream->ShouldNotifyStreamFinished()) { stream->NotifyMainThreadListeners(); } } void MediaStreamGraphImpl::ForceShutDown(ShutdownTicket* aShutdownTicket) { NS_ASSERTION(NS_IsMainThread(), "Must be called on main thread"); STREAM_LOG(LogLevel::Debug, ("MediaStreamGraph %p ForceShutdown", this)); MonitorAutoLock lock(mMonitor); if (aShutdownTicket) { MOZ_ASSERT(!mForceShutdownTicket); // Avoid waiting forever for a graph to shut down // synchronously. Reports are that some 3rd-party audio drivers // occasionally hang in shutdown (both for us and Chrome). mShutdownTimer = do_CreateInstance(NS_TIMER_CONTRACTID); if (!mShutdownTimer) { return; } mShutdownTimer->InitWithCallback(this, MediaStreamGraph::AUDIO_CALLBACK_DRIVER_SHUTDOWN_TIMEOUT, nsITimer::TYPE_ONE_SHOT); } mForceShutDown = true; mForceShutdownTicket = aShutdownTicket; if (mLifecycleState == LIFECYCLE_THREAD_NOT_STARTED) { // We *could* have just sent this a message to start up, so don't // yank the rug out from under it. Tell it to startup and let it // shut down. RefPtr<GraphDriver> driver = CurrentDriver(); MonitorAutoUnlock unlock(mMonitor); driver->Start(); } EnsureNextIterationLocked(); } NS_IMETHODIMP MediaStreamGraphImpl::Notify(nsITimer* aTimer) { MonitorAutoLock lock(mMonitor); NS_ASSERTION(!mForceShutdownTicket, "MediaStreamGraph took too long to shut down!"); // Sigh, graph took too long to shut down. Stop blocking system // shutdown and hope all is well. mForceShutdownTicket = nullptr; return NS_OK; } /* static */ StaticRefPtr<nsIAsyncShutdownBlocker> gMediaStreamGraphShutdownBlocker; namespace { class MediaStreamGraphShutDownRunnable : public Runnable { public: explicit MediaStreamGraphShutDownRunnable(MediaStreamGraphImpl* aGraph) : mGraph(aGraph) {} NS_IMETHOD Run() { NS_ASSERTION(mGraph->mDetectedNotRunning, "We should know the graph thread control loop isn't running!"); LIFECYCLE_LOG("Shutting down graph %p", mGraph.get()); // We've asserted the graph isn't running. Use mDriver instead of CurrentDriver // to avoid thread-safety checks #if 0 // AudioCallbackDrivers are released asynchronously anyways // XXX a better test would be have setting mDetectedNotRunning make sure // any current callback has finished and block future ones -- or just // handle it all in Shutdown()! if (mGraph->mDriver->AsAudioCallbackDriver()) { MOZ_ASSERT(!mGraph->mDriver->AsAudioCallbackDriver()->InCallback()); } #endif mGraph->mDriver->Shutdown(); // This will wait until it's shutdown since // we'll start tearing down the graph after this // Safe to access these without the monitor since the graph isn't running. // We may be one of several graphs. Drop ticket to eventually unblock shutdown. if (mGraph->mShutdownTimer && !mGraph->mForceShutdownTicket) { MOZ_ASSERT(false, "AudioCallbackDriver took too long to shut down and we let shutdown" " continue - freezing and leaking"); // The timer fired, so we may be deeper in shutdown now. Block any further // teardown and just leak, for safety. return NS_OK; } mGraph->mForceShutdownTicket = nullptr; // We can't block past the final LIFECYCLE_WAITING_FOR_STREAM_DESTRUCTION // stage, since completion of that stage requires all streams to be freed, // which requires shutdown to proceed. // mGraph's thread is not running so it's OK to do whatever here if (mGraph->IsEmpty()) { // mGraph is no longer needed, so delete it. mGraph->Destroy(); } else { // The graph is not empty. We must be in a forced shutdown, or a // non-realtime graph that has finished processing. Some later // AppendMessage will detect that the manager has been emptied, and // delete it. NS_ASSERTION(mGraph->mForceShutDown || !mGraph->mRealtime, "Not in forced shutdown?"); for (MediaStream* stream : mGraph->AllStreams()) { // Clean up all MediaSegments since we cannot release Images too // late during shutdown. if (SourceMediaStream* source = stream->AsSourceStream()) { // Finishing a SourceStream prevents new data from being appended. source->Finish(); } stream->GetStreamTracks().Clear(); } mGraph->mLifecycleState = MediaStreamGraphImpl::LIFECYCLE_WAITING_FOR_STREAM_DESTRUCTION; } return NS_OK; } private: RefPtr<MediaStreamGraphImpl> mGraph; }; class MediaStreamGraphStableStateRunnable : public Runnable { public: explicit MediaStreamGraphStableStateRunnable(MediaStreamGraphImpl* aGraph, bool aSourceIsMSG) : mGraph(aGraph) , mSourceIsMSG(aSourceIsMSG) { } NS_IMETHOD Run() override { if (mGraph) { mGraph->RunInStableState(mSourceIsMSG); } return NS_OK; } private: RefPtr<MediaStreamGraphImpl> mGraph; bool mSourceIsMSG; }; /* * Control messages forwarded from main thread to graph manager thread */ class CreateMessage : public ControlMessage { public: explicit CreateMessage(MediaStream* aStream) : ControlMessage(aStream) {} void Run() override { mStream->GraphImpl()->AddStreamGraphThread(mStream); } void RunDuringShutdown() override { // Make sure to run this message during shutdown too, to make sure // that we balance the number of streams registered with the graph // as they're destroyed during shutdown. Run(); } }; } // namespace void MediaStreamGraphImpl::RunInStableState(bool aSourceIsMSG) { NS_ASSERTION(NS_IsMainThread(), "Must be called on main thread"); nsTArray<nsCOMPtr<nsIRunnable> > runnables; // When we're doing a forced shutdown, pending control messages may be // run on the main thread via RunDuringShutdown. Those messages must // run without the graph monitor being held. So, we collect them here. nsTArray<UniquePtr<ControlMessage>> controlMessagesToRunDuringShutdown; { MonitorAutoLock lock(mMonitor); if (aSourceIsMSG) { MOZ_ASSERT(mPostedRunInStableStateEvent); mPostedRunInStableStateEvent = false; } #ifdef ENABLE_LIFECYCLE_LOG // This should be kept in sync with the LifecycleState enum in // MediaStreamGraphImpl.h const char * LifecycleState_str[] = { "LIFECYCLE_THREAD_NOT_STARTED", "LIFECYCLE_RUNNING", "LIFECYCLE_WAITING_FOR_MAIN_THREAD_CLEANUP", "LIFECYCLE_WAITING_FOR_THREAD_SHUTDOWN", "LIFECYCLE_WAITING_FOR_STREAM_DESTRUCTION" }; if (mLifecycleState != LIFECYCLE_RUNNING) { LIFECYCLE_LOG("Running %p in stable state. Current state: %s\n", this, LifecycleState_str[mLifecycleState]); } #endif runnables.SwapElements(mUpdateRunnables); for (uint32_t i = 0; i < mStreamUpdates.Length(); ++i) { StreamUpdate* update = &mStreamUpdates[i]; if (update->mStream) { ApplyStreamUpdate(update); } } mStreamUpdates.Clear(); if (mCurrentTaskMessageQueue.IsEmpty()) { if (mLifecycleState == LIFECYCLE_WAITING_FOR_MAIN_THREAD_CLEANUP && IsEmpty()) { // Complete shutdown. First, ensure that this graph is no longer used. // A new graph graph will be created if one is needed. // Asynchronously clean up old graph. We don't want to do this // synchronously because it spins the event loop waiting for threads // to shut down, and we don't want to do that in a stable state handler. mLifecycleState = LIFECYCLE_WAITING_FOR_THREAD_SHUTDOWN; LIFECYCLE_LOG("Sending MediaStreamGraphShutDownRunnable %p", this); nsCOMPtr<nsIRunnable> event = new MediaStreamGraphShutDownRunnable(this ); NS_DispatchToMainThread(event.forget()); LIFECYCLE_LOG("Disconnecting MediaStreamGraph %p", this); MediaStreamGraphImpl* graph; if (gGraphs.Get(uint32_t(mAudioChannel), &graph) && graph == this) { // null out gGraph if that's the graph being shut down gGraphs.Remove(uint32_t(mAudioChannel)); } } } else { if (mLifecycleState <= LIFECYCLE_WAITING_FOR_MAIN_THREAD_CLEANUP) { MessageBlock* block = mBackMessageQueue.AppendElement(); block->mMessages.SwapElements(mCurrentTaskMessageQueue); EnsureNextIterationLocked(); } // If the MediaStreamGraph has more messages going to it, try to revive // it to process those messages. Don't do this if we're in a forced // shutdown or it's a non-realtime graph that has already terminated // processing. if (mLifecycleState == LIFECYCLE_WAITING_FOR_MAIN_THREAD_CLEANUP && mRealtime && !mForceShutDown) { mLifecycleState = LIFECYCLE_RUNNING; // Revive the MediaStreamGraph since we have more messages going to it. // Note that we need to put messages into its queue before reviving it, // or it might exit immediately. { LIFECYCLE_LOG("Reviving a graph (%p) ! %s\n", this, CurrentDriver()->AsAudioCallbackDriver() ? "AudioDriver" : "SystemDriver"); RefPtr<GraphDriver> driver = CurrentDriver(); MonitorAutoUnlock unlock(mMonitor); driver->Revive(); } } } // Don't start the thread for a non-realtime graph until it has been // explicitly started by StartNonRealtimeProcessing. if (mLifecycleState == LIFECYCLE_THREAD_NOT_STARTED && (mRealtime || mNonRealtimeProcessing)) { mLifecycleState = LIFECYCLE_RUNNING; // Start the thread now. We couldn't start it earlier because // the graph might exit immediately on finding it has no streams. The // first message for a new graph must create a stream. { // We should exit the monitor for now, because starting a stream might // take locks, and we don't want to deadlock. LIFECYCLE_LOG("Starting a graph (%p) ! %s\n", this, CurrentDriver()->AsAudioCallbackDriver() ? "AudioDriver" : "SystemDriver"); RefPtr<GraphDriver> driver = CurrentDriver(); MonitorAutoUnlock unlock(mMonitor); driver->Start(); // It's not safe to Shutdown() a thread from StableState, and // releasing this may shutdown a SystemClockDriver thread. // Proxy the release to outside of StableState. NS_ReleaseOnMainThread(driver.forget(), true); // always proxy } } if ((mForceShutDown || !mRealtime) && mLifecycleState == LIFECYCLE_WAITING_FOR_MAIN_THREAD_CLEANUP) { // Defer calls to RunDuringShutdown() to happen while mMonitor is not held. for (uint32_t i = 0; i < mBackMessageQueue.Length(); ++i) { MessageBlock& mb = mBackMessageQueue[i]; controlMessagesToRunDuringShutdown.AppendElements(Move(mb.mMessages)); } mBackMessageQueue.Clear(); MOZ_ASSERT(mCurrentTaskMessageQueue.IsEmpty()); // Stop MediaStreamGraph threads. Do not clear gGraph since // we have outstanding DOM objects that may need it. mLifecycleState = LIFECYCLE_WAITING_FOR_THREAD_SHUTDOWN; nsCOMPtr<nsIRunnable> event = new MediaStreamGraphShutDownRunnable(this); NS_DispatchToMainThread(event.forget()); } mDetectedNotRunning = mLifecycleState > LIFECYCLE_RUNNING; } // Make sure we get a new current time in the next event loop task if (!aSourceIsMSG) { MOZ_ASSERT(mPostedRunInStableState); mPostedRunInStableState = false; } for (uint32_t i = 0; i < controlMessagesToRunDuringShutdown.Length(); ++i) { controlMessagesToRunDuringShutdown[i]->RunDuringShutdown(); } #ifdef DEBUG mCanRunMessagesSynchronously = mDetectedNotRunning && mLifecycleState >= LIFECYCLE_WAITING_FOR_THREAD_SHUTDOWN; #endif for (uint32_t i = 0; i < runnables.Length(); ++i) { runnables[i]->Run(); // "Direct" tail dispatcher are supposed to run immediately following the // execution of the current task. So the meta-tasking that we do here in // RunInStableState() breaks that abstraction a bit unless we handle it here. // // This is particularly important because we can end up with a "stream // ended" notification immediately following a "stream available" notification, // and we need to make sure that the watcher responding to "stream available" // has a chance to run before the second notification starts tearing things // down. AbstractThread::MainThread()->TailDispatcher().DrainDirectTasks(); } } void MediaStreamGraphImpl::EnsureRunInStableState() { NS_ASSERTION(NS_IsMainThread(), "main thread only"); if (mPostedRunInStableState) return; mPostedRunInStableState = true; nsCOMPtr<nsIRunnable> event = new MediaStreamGraphStableStateRunnable(this, false); nsContentUtils::RunInStableState(event.forget()); } void MediaStreamGraphImpl::EnsureStableStateEventPosted() { mMonitor.AssertCurrentThreadOwns(); if (mPostedRunInStableStateEvent) return; mPostedRunInStableStateEvent = true; nsCOMPtr<nsIRunnable> event = new MediaStreamGraphStableStateRunnable(this, true); NS_DispatchToMainThread(event.forget()); } void MediaStreamGraphImpl::SignalMainThreadCleanup() { MOZ_ASSERT(mDriver->OnThread()); MonitorAutoLock lock(mMonitor); STREAM_LOG(LogLevel::Debug, ("MediaStreamGraph %p waiting for main thread cleanup", this)); mLifecycleState = MediaStreamGraphImpl::LIFECYCLE_WAITING_FOR_MAIN_THREAD_CLEANUP; EnsureStableStateEventPosted(); } void MediaStreamGraphImpl::AppendMessage(UniquePtr<ControlMessage> aMessage) { MOZ_ASSERT(NS_IsMainThread(), "main thread only"); MOZ_ASSERT(!aMessage->GetStream() || !aMessage->GetStream()->IsDestroyed(), "Stream already destroyed"); if (mDetectedNotRunning && mLifecycleState > LIFECYCLE_WAITING_FOR_MAIN_THREAD_CLEANUP) { // The graph control loop is not running and main thread cleanup has // happened. From now on we can't append messages to mCurrentTaskMessageQueue, // because that will never be processed again, so just RunDuringShutdown // this message. // This should only happen during forced shutdown, or after a non-realtime // graph has finished processing. #ifdef DEBUG MOZ_ASSERT(mCanRunMessagesSynchronously); mCanRunMessagesSynchronously = false; #endif aMessage->RunDuringShutdown(); #ifdef DEBUG mCanRunMessagesSynchronously = true; #endif if (IsEmpty() && mLifecycleState >= LIFECYCLE_WAITING_FOR_STREAM_DESTRUCTION) { MediaStreamGraphImpl* graph; if (gGraphs.Get(uint32_t(mAudioChannel), &graph) && graph == this) { gGraphs.Remove(uint32_t(mAudioChannel)); } Destroy(); } return; } mCurrentTaskMessageQueue.AppendElement(Move(aMessage)); EnsureRunInStableState(); } MediaStream::MediaStream() : mTracksStartTime(0) , mStartBlocking(GRAPH_TIME_MAX) , mSuspendedCount(0) , mFinished(false) , mNotifiedFinished(false) , mNotifiedBlocked(false) , mHasCurrentData(false) , mNotifiedHasCurrentData(false) , mMainThreadCurrentTime(0) , mMainThreadFinished(false) , mFinishedNotificationSent(false) , mMainThreadDestroyed(false) , mNrOfMainThreadUsers(0) , mGraph(nullptr) , mAudioChannelType(dom::AudioChannel::Normal) { MOZ_COUNT_CTOR(MediaStream); } MediaStream::~MediaStream() { MOZ_COUNT_DTOR(MediaStream); NS_ASSERTION(mMainThreadDestroyed, "Should have been destroyed already"); NS_ASSERTION(mMainThreadListeners.IsEmpty(), "All main thread listeners should have been removed"); } size_t MediaStream::SizeOfExcludingThis(MallocSizeOf aMallocSizeOf) const { size_t amount = 0; // Not owned: // - mGraph - Not reported here // - mConsumers - elements // Future: // - mVideoOutputs - elements // - mLastPlayedVideoFrame // - mListeners - elements // - mAudioOutputStream - elements amount += mTracks.SizeOfExcludingThis(aMallocSizeOf); amount += mAudioOutputs.ShallowSizeOfExcludingThis(aMallocSizeOf); amount += mVideoOutputs.ShallowSizeOfExcludingThis(aMallocSizeOf); amount += mListeners.ShallowSizeOfExcludingThis(aMallocSizeOf); amount += mMainThreadListeners.ShallowSizeOfExcludingThis(aMallocSizeOf); amount += mDisabledTracks.ShallowSizeOfExcludingThis(aMallocSizeOf); amount += mConsumers.ShallowSizeOfExcludingThis(aMallocSizeOf); return amount; } size_t MediaStream::SizeOfIncludingThis(MallocSizeOf aMallocSizeOf) const { return aMallocSizeOf(this) + SizeOfExcludingThis(aMallocSizeOf); } MediaStreamGraphImpl* MediaStream::GraphImpl() { return mGraph; } MediaStreamGraph* MediaStream::Graph() { return mGraph; } void MediaStream::SetGraphImpl(MediaStreamGraphImpl* aGraph) { MOZ_ASSERT(!mGraph, "Should only be called once"); mGraph = aGraph; mAudioChannelType = aGraph->AudioChannel(); mTracks.InitGraphRate(aGraph->GraphRate()); } void MediaStream::SetGraphImpl(MediaStreamGraph* aGraph) { MediaStreamGraphImpl* graph = static_cast<MediaStreamGraphImpl*>(aGraph); SetGraphImpl(graph); } StreamTime MediaStream::GraphTimeToStreamTime(GraphTime aTime) { NS_ASSERTION(mStartBlocking == GraphImpl()->mStateComputedTime || aTime <= mStartBlocking, "Incorrectly ignoring blocking!"); return aTime - mTracksStartTime; } GraphTime MediaStream::StreamTimeToGraphTime(StreamTime aTime) { NS_ASSERTION(mStartBlocking == GraphImpl()->mStateComputedTime || aTime + mTracksStartTime <= mStartBlocking, "Incorrectly ignoring blocking!"); return aTime + mTracksStartTime; } StreamTime MediaStream::GraphTimeToStreamTimeWithBlocking(GraphTime aTime) { return GraphImpl()->GraphTimeToStreamTimeWithBlocking(this, aTime); } void MediaStream::FinishOnGraphThread() { GraphImpl()->FinishStream(this); } StreamTracks::Track* MediaStream::FindTrack(TrackID aID) { return mTracks.FindTrack(aID); } StreamTracks::Track* MediaStream::EnsureTrack(TrackID aTrackId) { StreamTracks::Track* track = mTracks.FindTrack(aTrackId); if (!track) { nsAutoPtr<MediaSegment> segment(new AudioSegment()); for (uint32_t j = 0; j < mListeners.Length(); ++j) { MediaStreamListener* l = mListeners[j]; l->NotifyQueuedTrackChanges(Graph(), aTrackId, 0, TrackEventCommand::TRACK_EVENT_CREATED, *segment); // TODO If we ever need to ensure several tracks at once, we will have to // change this. l->NotifyFinishedTrackCreation(Graph()); } track = &mTracks.AddTrack(aTrackId, 0, segment.forget()); } return track; } void MediaStream::RemoveAllListenersImpl() { for (int32_t i = mListeners.Length() - 1; i >= 0; --i) { RefPtr<MediaStreamListener> listener = mListeners[i].forget(); listener->NotifyEvent(GraphImpl(), MediaStreamGraphEvent::EVENT_REMOVED); } mListeners.Clear(); } void MediaStream::DestroyImpl() { for (int32_t i = mConsumers.Length() - 1; i >= 0; --i) { mConsumers[i]->Disconnect(); } mGraph = nullptr; } void MediaStream::Destroy() { NS_ASSERTION(mNrOfMainThreadUsers == 0, "Do not mix Destroy() and RegisterUser()/UnregisterUser()"); // Keep this stream alive until we leave this method RefPtr<MediaStream> kungFuDeathGrip = this; class Message : public ControlMessage { public: explicit Message(MediaStream* aStream) : ControlMessage(aStream) {} void Run() override { mStream->RemoveAllListenersImpl(); auto graph = mStream->GraphImpl(); mStream->DestroyImpl(); graph->RemoveStreamGraphThread(mStream); } void RunDuringShutdown() override { Run(); } }; GraphImpl()->AppendMessage(MakeUnique<Message>(this)); // Message::RunDuringShutdown may have removed this stream from the graph, // but our kungFuDeathGrip above will have kept this stream alive if // necessary. mMainThreadDestroyed = true; } void MediaStream::RegisterUser() { MOZ_ASSERT(NS_IsMainThread()); ++mNrOfMainThreadUsers; } void MediaStream::UnregisterUser() { MOZ_ASSERT(NS_IsMainThread()); --mNrOfMainThreadUsers; NS_ASSERTION(mNrOfMainThreadUsers >= 0, "Double-removal of main thread user"); NS_ASSERTION(!IsDestroyed(), "Do not mix Destroy() and RegisterUser()/UnregisterUser()"); if (mNrOfMainThreadUsers == 0) { Destroy(); } } void MediaStream::AddAudioOutput(void* aKey) { class Message : public ControlMessage { public: Message(MediaStream* aStream, void* aKey) : ControlMessage(aStream), mKey(aKey) {} void Run() override { mStream->AddAudioOutputImpl(mKey); } void* mKey; }; GraphImpl()->AppendMessage(MakeUnique<Message>(this, aKey)); } void MediaStream::SetAudioOutputVolumeImpl(void* aKey, float aVolume) { for (uint32_t i = 0; i < mAudioOutputs.Length(); ++i) { if (mAudioOutputs[i].mKey == aKey) { mAudioOutputs[i].mVolume = aVolume; return; } } NS_ERROR("Audio output key not found"); } void MediaStream::SetAudioOutputVolume(void* aKey, float aVolume) { class Message : public ControlMessage { public: Message(MediaStream* aStream, void* aKey, float aVolume) : ControlMessage(aStream), mKey(aKey), mVolume(aVolume) {} void Run() override { mStream->SetAudioOutputVolumeImpl(mKey, mVolume); } void* mKey; float mVolume; }; GraphImpl()->AppendMessage(MakeUnique<Message>(this, aKey, aVolume)); } void MediaStream::AddAudioOutputImpl(void* aKey) { STREAM_LOG(LogLevel::Info, ("MediaStream %p Adding AudioOutput for key %p", this, aKey)); mAudioOutputs.AppendElement(AudioOutput(aKey)); } void MediaStream::RemoveAudioOutputImpl(void* aKey) { STREAM_LOG(LogLevel::Info, ("MediaStream %p Removing AudioOutput for key %p", this, aKey)); for (uint32_t i = 0; i < mAudioOutputs.Length(); ++i) { if (mAudioOutputs[i].mKey == aKey) { mAudioOutputs.RemoveElementAt(i); return; } } NS_ERROR("Audio output key not found"); } void MediaStream::RemoveAudioOutput(void* aKey) { class Message : public ControlMessage { public: Message(MediaStream* aStream, void* aKey) : ControlMessage(aStream), mKey(aKey) {} void Run() override { mStream->RemoveAudioOutputImpl(mKey); } void* mKey; }; GraphImpl()->AppendMessage(MakeUnique<Message>(this, aKey)); } void MediaStream::AddVideoOutputImpl(already_AddRefed<MediaStreamVideoSink> aSink, TrackID aID) { RefPtr<MediaStreamVideoSink> sink = aSink; STREAM_LOG(LogLevel::Info, ("MediaStream %p Adding MediaStreamVideoSink %p as output", this, sink.get())); MOZ_ASSERT(aID != TRACK_NONE); for (auto entry : mVideoOutputs) { if (entry.mListener == sink && (entry.mTrackID == TRACK_ANY || entry.mTrackID == aID)) { return; } } TrackBound<MediaStreamVideoSink>* l = mVideoOutputs.AppendElement(); l->mListener = sink; l->mTrackID = aID; AddDirectTrackListenerImpl(sink.forget(), aID); } void MediaStream::RemoveVideoOutputImpl(MediaStreamVideoSink* aSink, TrackID aID) { STREAM_LOG(LogLevel::Info, ("MediaStream %p Removing MediaStreamVideoSink %p as output", this, aSink)); MOZ_ASSERT(aID != TRACK_NONE); // Ensure that any frames currently queued for playback by the compositor // are removed. aSink->ClearFrames(); for (size_t i = 0; i < mVideoOutputs.Length(); ++i) { if (mVideoOutputs[i].mListener == aSink && (mVideoOutputs[i].mTrackID == TRACK_ANY || mVideoOutputs[i].mTrackID == aID)) { mVideoOutputs.RemoveElementAt(i); } } RemoveDirectTrackListenerImpl(aSink, aID); } void MediaStream::AddVideoOutput(MediaStreamVideoSink* aSink, TrackID aID) { class Message : public ControlMessage { public: Message(MediaStream* aStream, MediaStreamVideoSink* aSink, TrackID aID) : ControlMessage(aStream), mSink(aSink), mID(aID) {} void Run() override { mStream->AddVideoOutputImpl(mSink.forget(), mID); } RefPtr<MediaStreamVideoSink> mSink; TrackID mID; }; GraphImpl()->AppendMessage(MakeUnique<Message>(this, aSink, aID)); } void MediaStream::RemoveVideoOutput(MediaStreamVideoSink* aSink, TrackID aID) { class Message : public ControlMessage { public: Message(MediaStream* aStream, MediaStreamVideoSink* aSink, TrackID aID) : ControlMessage(aStream), mSink(aSink), mID(aID) {} void Run() override { mStream->RemoveVideoOutputImpl(mSink, mID); } RefPtr<MediaStreamVideoSink> mSink; TrackID mID; }; GraphImpl()->AppendMessage(MakeUnique<Message>(this, aSink, aID)); } void MediaStream::Suspend() { class Message : public ControlMessage { public: explicit Message(MediaStream* aStream) : ControlMessage(aStream) {} void Run() override { mStream->GraphImpl()->IncrementSuspendCount(mStream); } }; // This can happen if this method has been called asynchronously, and the // stream has been destroyed since then. if (mMainThreadDestroyed) { return; } GraphImpl()->AppendMessage(MakeUnique<Message>(this)); } void MediaStream::Resume() { class Message : public ControlMessage { public: explicit Message(MediaStream* aStream) : ControlMessage(aStream) {} void Run() override { mStream->GraphImpl()->DecrementSuspendCount(mStream); } }; // This can happen if this method has been called asynchronously, and the // stream has been destroyed since then. if (mMainThreadDestroyed) { return; } GraphImpl()->AppendMessage(MakeUnique<Message>(this)); } void MediaStream::AddListenerImpl(already_AddRefed<MediaStreamListener> aListener) { MediaStreamListener* listener = *mListeners.AppendElement() = aListener; listener->NotifyBlockingChanged(GraphImpl(), mNotifiedBlocked ? MediaStreamListener::BLOCKED : MediaStreamListener::UNBLOCKED); for (StreamTracks::TrackIter it(mTracks); !it.IsEnded(); it.Next()) { MediaStream* inputStream = nullptr; TrackID inputTrackID = TRACK_INVALID; if (ProcessedMediaStream* ps = AsProcessedStream()) { // The only ProcessedMediaStream where we should have listeners is // TrackUnionStream - it's what's used as owned stream in DOMMediaStream, // the only main-thread exposed stream type. // TrackUnionStream guarantees that each of its tracks has an input track. // Other types do not implement GetInputStreamFor() and will return null. inputStream = ps->GetInputStreamFor(it->GetID()); MOZ_ASSERT(inputStream); inputTrackID = ps->GetInputTrackIDFor(it->GetID()); MOZ_ASSERT(IsTrackIDExplicit(inputTrackID)); } uint32_t flags = TrackEventCommand::TRACK_EVENT_CREATED; if (it->IsEnded()) { flags |= TrackEventCommand::TRACK_EVENT_ENDED; } nsAutoPtr<MediaSegment> segment(it->GetSegment()->CreateEmptyClone()); listener->NotifyQueuedTrackChanges(Graph(), it->GetID(), it->GetEnd(), static_cast<TrackEventCommand>(flags), *segment, inputStream, inputTrackID); } if (mNotifiedFinished) { listener->NotifyEvent(GraphImpl(), MediaStreamGraphEvent::EVENT_FINISHED); } if (mNotifiedHasCurrentData) { listener->NotifyHasCurrentData(GraphImpl()); } } void MediaStream::AddListener(MediaStreamListener* aListener) { class Message : public ControlMessage { public: Message(MediaStream* aStream, MediaStreamListener* aListener) : ControlMessage(aStream), mListener(aListener) {} void Run() override { mStream->AddListenerImpl(mListener.forget()); } RefPtr<MediaStreamListener> mListener; }; GraphImpl()->AppendMessage(MakeUnique<Message>(this, aListener)); } void MediaStream::RemoveListenerImpl(MediaStreamListener* aListener) { // wouldn't need this if we could do it in the opposite order RefPtr<MediaStreamListener> listener(aListener); mListeners.RemoveElement(aListener); listener->NotifyEvent(GraphImpl(), MediaStreamGraphEvent::EVENT_REMOVED); } void MediaStream::RemoveListener(MediaStreamListener* aListener) { class Message : public ControlMessage { public: Message(MediaStream* aStream, MediaStreamListener* aListener) : ControlMessage(aStream), mListener(aListener) {} void Run() override { mStream->RemoveListenerImpl(mListener); } RefPtr<MediaStreamListener> mListener; }; // If the stream is destroyed the Listeners have or will be // removed. if (!IsDestroyed()) { GraphImpl()->AppendMessage(MakeUnique<Message>(this, aListener)); } } void MediaStream::AddTrackListenerImpl(already_AddRefed<MediaStreamTrackListener> aListener, TrackID aTrackID) { TrackBound<MediaStreamTrackListener>* l = mTrackListeners.AppendElement(); l->mListener = aListener; l->mTrackID = aTrackID; StreamTracks::Track* track = FindTrack(aTrackID); if (!track) { return; } PrincipalHandle lastPrincipalHandle = track->GetSegment()->GetLastPrincipalHandle(); l->mListener->NotifyPrincipalHandleChanged(Graph(), lastPrincipalHandle); } void MediaStream::AddTrackListener(MediaStreamTrackListener* aListener, TrackID aTrackID) { class Message : public ControlMessage { public: Message(MediaStream* aStream, MediaStreamTrackListener* aListener, TrackID aTrackID) : ControlMessage(aStream), mListener(aListener), mTrackID(aTrackID) {} virtual void Run() { mStream->AddTrackListenerImpl(mListener.forget(), mTrackID); } RefPtr<MediaStreamTrackListener> mListener; TrackID mTrackID; }; GraphImpl()->AppendMessage(MakeUnique<Message>(this, aListener, aTrackID)); } void MediaStream::RemoveTrackListenerImpl(MediaStreamTrackListener* aListener, TrackID aTrackID) { for (size_t i = 0; i < mTrackListeners.Length(); ++i) { if (mTrackListeners[i].mListener == aListener && mTrackListeners[i].mTrackID == aTrackID) { mTrackListeners[i].mListener->NotifyRemoved(); mTrackListeners.RemoveElementAt(i); return; } } } void MediaStream::RemoveTrackListener(MediaStreamTrackListener* aListener, TrackID aTrackID) { class Message : public ControlMessage { public: Message(MediaStream* aStream, MediaStreamTrackListener* aListener, TrackID aTrackID) : ControlMessage(aStream), mListener(aListener), mTrackID(aTrackID) {} virtual void Run() { mStream->RemoveTrackListenerImpl(mListener, mTrackID); } RefPtr<MediaStreamTrackListener> mListener; TrackID mTrackID; }; GraphImpl()->AppendMessage(MakeUnique<Message>(this, aListener, aTrackID)); } void MediaStream::AddDirectTrackListenerImpl(already_AddRefed<DirectMediaStreamTrackListener> aListener, TrackID aTrackID) { // Base implementation, for streams that don't support direct track listeners. RefPtr<DirectMediaStreamTrackListener> listener = aListener; listener->NotifyDirectListenerInstalled( DirectMediaStreamTrackListener::InstallationResult::STREAM_NOT_SUPPORTED); } void MediaStream::AddDirectTrackListener(DirectMediaStreamTrackListener* aListener, TrackID aTrackID) { class Message : public ControlMessage { public: Message(MediaStream* aStream, DirectMediaStreamTrackListener* aListener, TrackID aTrackID) : ControlMessage(aStream), mListener(aListener), mTrackID(aTrackID) {} virtual void Run() { mStream->AddDirectTrackListenerImpl(mListener.forget(), mTrackID); } RefPtr<DirectMediaStreamTrackListener> mListener; TrackID mTrackID; }; GraphImpl()->AppendMessage(MakeUnique<Message>(this, aListener, aTrackID)); } void MediaStream::RemoveDirectTrackListenerImpl(DirectMediaStreamTrackListener* aListener, TrackID aTrackID) { // Base implementation, the listener was never added so nothing to do. RefPtr<DirectMediaStreamTrackListener> listener = aListener; } void MediaStream::RemoveDirectTrackListener(DirectMediaStreamTrackListener* aListener, TrackID aTrackID) { class Message : public ControlMessage { public: Message(MediaStream* aStream, DirectMediaStreamTrackListener* aListener, TrackID aTrackID) : ControlMessage(aStream), mListener(aListener), mTrackID(aTrackID) {} virtual void Run() { mStream->RemoveDirectTrackListenerImpl(mListener, mTrackID); } RefPtr<DirectMediaStreamTrackListener> mListener; TrackID mTrackID; }; GraphImpl()->AppendMessage(MakeUnique<Message>(this, aListener, aTrackID)); } void MediaStream::RunAfterPendingUpdates(already_AddRefed<nsIRunnable> aRunnable) { MOZ_ASSERT(NS_IsMainThread()); MediaStreamGraphImpl* graph = GraphImpl(); nsCOMPtr<nsIRunnable> runnable(aRunnable); // Special case when a non-realtime graph has not started, to ensure the // runnable will run in finite time. if (!(graph->mRealtime || graph->mNonRealtimeProcessing)) { runnable->Run(); return; } class Message : public ControlMessage { public: explicit Message(MediaStream* aStream, already_AddRefed<nsIRunnable> aRunnable) : ControlMessage(aStream) , mRunnable(aRunnable) {} void Run() override { mStream->Graph()-> DispatchToMainThreadAfterStreamStateUpdate(mRunnable.forget()); } void RunDuringShutdown() override { // Don't run mRunnable now as it may call AppendMessage() which would // assume that there are no remaining controlMessagesToRunDuringShutdown. MOZ_ASSERT(NS_IsMainThread()); NS_DispatchToCurrentThread(mRunnable); } private: nsCOMPtr<nsIRunnable> mRunnable; }; graph->AppendMessage(MakeUnique<Message>(this, runnable.forget())); } void MediaStream::SetTrackEnabledImpl(TrackID aTrackID, DisabledTrackMode aMode) { if (aMode == DisabledTrackMode::ENABLED) { for (int32_t i = mDisabledTracks.Length() - 1; i >= 0; --i) { if (aTrackID == mDisabledTracks[i].mTrackID) { mDisabledTracks.RemoveElementAt(i); return; } } } else { for (const DisabledTrack& t : mDisabledTracks) { if (aTrackID == t.mTrackID) { NS_ERROR("Changing disabled track mode for a track is not allowed"); return; } } mDisabledTracks.AppendElement(Move(DisabledTrack(aTrackID, aMode))); } } DisabledTrackMode MediaStream::GetDisabledTrackMode(TrackID aTrackID) { for (const DisabledTrack& t : mDisabledTracks) { if (t.mTrackID == aTrackID) { return t.mMode; } } return DisabledTrackMode::ENABLED; } void MediaStream::SetTrackEnabled(TrackID aTrackID, DisabledTrackMode aMode) { class Message : public ControlMessage { public: Message(MediaStream* aStream, TrackID aTrackID, DisabledTrackMode aMode) : ControlMessage(aStream), mTrackID(aTrackID), mMode(aMode) {} void Run() override { mStream->SetTrackEnabledImpl(mTrackID, mMode); } TrackID mTrackID; DisabledTrackMode mMode; }; GraphImpl()->AppendMessage(MakeUnique<Message>(this, aTrackID, aMode)); } void MediaStream::ApplyTrackDisabling(TrackID aTrackID, MediaSegment* aSegment, MediaSegment* aRawSegment) { DisabledTrackMode mode = GetDisabledTrackMode(aTrackID); if (mode == DisabledTrackMode::ENABLED) { return; } if (mode == DisabledTrackMode::SILENCE_BLACK) { aSegment->ReplaceWithDisabled(); if (aRawSegment) { aRawSegment->ReplaceWithDisabled(); } } else if (mode == DisabledTrackMode::SILENCE_FREEZE) { aSegment->ReplaceWithNull(); if (aRawSegment) { aRawSegment->ReplaceWithNull(); } } else { MOZ_CRASH("Unsupported mode"); } } void MediaStream::AddMainThreadListener(MainThreadMediaStreamListener* aListener) { MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(aListener); MOZ_ASSERT(!mMainThreadListeners.Contains(aListener)); mMainThreadListeners.AppendElement(aListener); // If it is not yet time to send the notification, then finish here. if (!mFinishedNotificationSent) { return; } class NotifyRunnable final : public Runnable { public: explicit NotifyRunnable(MediaStream* aStream) : mStream(aStream) {} NS_IMETHOD Run() override { MOZ_ASSERT(NS_IsMainThread()); mStream->NotifyMainThreadListeners(); return NS_OK; } private: ~NotifyRunnable() {} RefPtr<MediaStream> mStream; }; nsCOMPtr<nsIRunnable> runnable = new NotifyRunnable(this); Unused << NS_WARN_IF(NS_FAILED(NS_DispatchToMainThread(runnable.forget()))); } SourceMediaStream::SourceMediaStream() : MediaStream(), mMutex("mozilla::media::SourceMediaStream"), mUpdateKnownTracksTime(0), mPullEnabled(false), mUpdateFinished(false), mNeedsMixing(false) { } nsresult SourceMediaStream::OpenAudioInput(int aID, AudioDataListener *aListener) { if (GraphImpl()) { mInputListener = aListener; return GraphImpl()->OpenAudioInput(aID, aListener); } return NS_ERROR_FAILURE; } void SourceMediaStream::CloseAudioInput() { // Destroy() may have run already and cleared this if (GraphImpl() && mInputListener) { GraphImpl()->CloseAudioInput(mInputListener); } mInputListener = nullptr; } void SourceMediaStream::DestroyImpl() { CloseAudioInput(); GraphImpl()->AssertOnGraphThreadOrNotRunning(); for (int32_t i = mConsumers.Length() - 1; i >= 0; --i) { // Disconnect before we come under mMutex's lock since it can call back // through RemoveDirectTrackListenerImpl() and deadlock. mConsumers[i]->Disconnect(); } // Hold mMutex while mGraph is reset so that other threads holding mMutex // can null-check know that the graph will not destroyed. MutexAutoLock lock(mMutex); MediaStream::DestroyImpl(); } void SourceMediaStream::SetPullEnabled(bool aEnabled) { MutexAutoLock lock(mMutex); mPullEnabled = aEnabled; if (mPullEnabled && GraphImpl()) { GraphImpl()->EnsureNextIteration(); } } void SourceMediaStream::AddTrackInternal(TrackID aID, TrackRate aRate, StreamTime aStart, MediaSegment* aSegment, uint32_t aFlags) { MutexAutoLock lock(mMutex); nsTArray<TrackData> *track_data = (aFlags & ADDTRACK_QUEUED) ? &mPendingTracks : &mUpdateTracks; TrackData* data = track_data->AppendElement(); LIFECYCLE_LOG("AddTrackInternal: %lu/%lu", mPendingTracks.Length(), mUpdateTracks.Length()); data->mID = aID; data->mInputRate = aRate; data->mResamplerChannelCount = 0; data->mStart = aStart; data->mEndOfFlushedData = aStart; data->mCommands = TRACK_CREATE; data->mData = aSegment; ResampleAudioToGraphSampleRate(data, aSegment); if (!(aFlags & ADDTRACK_QUEUED) && GraphImpl()) { GraphImpl()->EnsureNextIteration(); } } void SourceMediaStream::AddAudioTrack(TrackID aID, TrackRate aRate, StreamTime aStart, AudioSegment* aSegment, uint32_t aFlags) { AddTrackInternal(aID, aRate, aStart, aSegment, aFlags); } void SourceMediaStream::FinishAddTracks() { MutexAutoLock lock(mMutex); mUpdateTracks.AppendElements(Move(mPendingTracks)); LIFECYCLE_LOG("FinishAddTracks: %lu/%lu", mPendingTracks.Length(), mUpdateTracks.Length()); if (GraphImpl()) { GraphImpl()->EnsureNextIteration(); } } void SourceMediaStream::ResampleAudioToGraphSampleRate(TrackData* aTrackData, MediaSegment* aSegment) { if (aSegment->GetType() != MediaSegment::AUDIO || aTrackData->mInputRate == GraphImpl()->GraphRate()) { return; } AudioSegment* segment = static_cast<AudioSegment*>(aSegment); int channels = segment->ChannelCount(); // If this segment is just silence, we delay instanciating the resampler. We // also need to recreate the resampler if the channel count changes. if (channels && aTrackData->mResamplerChannelCount != channels) { SpeexResamplerState* state = speex_resampler_init(channels, aTrackData->mInputRate, GraphImpl()->GraphRate(), SPEEX_RESAMPLER_QUALITY_MIN, nullptr); if (!state) { return; } aTrackData->mResampler.own(state); aTrackData->mResamplerChannelCount = channels; } segment->ResampleChunks(aTrackData->mResampler, aTrackData->mInputRate, GraphImpl()->GraphRate()); } void SourceMediaStream::AdvanceTimeVaryingValuesToCurrentTime(GraphTime aCurrentTime, GraphTime aBlockedTime) { MutexAutoLock lock(mMutex); mTracksStartTime += aBlockedTime; mStreamTracksStartTimeStamp += TimeDuration::FromSeconds(GraphImpl()->MediaTimeToSeconds(aBlockedTime)); mTracks.ForgetUpTo(aCurrentTime - mTracksStartTime); } bool SourceMediaStream::AppendToTrack(TrackID aID, MediaSegment* aSegment, MediaSegment *aRawSegment) { MutexAutoLock lock(mMutex); // ::EndAllTrackAndFinished() can end these before the sources notice bool appended = false; auto graph = GraphImpl(); if (!mFinished && graph) { TrackData *track = FindDataForTrack(aID); if (track) { // Data goes into mData, and on the next iteration of the MSG moves // into the track's segment after NotifyQueuedTrackChanges(). This adds // 0-10ms of delay before data gets to direct listeners. // Indirect listeners (via subsequent TrackUnion nodes) are synced to // playout time, and so can be delayed by buffering. // Apply track disabling before notifying any consumers directly // or inserting into the graph ApplyTrackDisabling(aID, aSegment, aRawSegment); ResampleAudioToGraphSampleRate(track, aSegment); // Must notify first, since AppendFrom() will empty out aSegment NotifyDirectConsumers(track, aRawSegment ? aRawSegment : aSegment); track->mData->AppendFrom(aSegment); // note: aSegment is now dead appended = true; GraphImpl()->EnsureNextIteration(); } else { aSegment->Clear(); } } return appended; } void SourceMediaStream::NotifyDirectConsumers(TrackData *aTrack, MediaSegment *aSegment) { mMutex.AssertCurrentThreadOwns(); MOZ_ASSERT(aTrack); for (uint32_t j = 0; j < mDirectListeners.Length(); ++j) { DirectMediaStreamListener* l = mDirectListeners[j]; StreamTime offset = 0; // FIX! need a separate StreamTime.... or the end of the internal buffer l->NotifyRealtimeData(static_cast<MediaStreamGraph*>(GraphImpl()), aTrack->mID, offset, aTrack->mCommands, *aSegment); } for (const TrackBound<DirectMediaStreamTrackListener>& source : mDirectTrackListeners) { if (aTrack->mID != source.mTrackID) { continue; } StreamTime offset = 0; // FIX! need a separate StreamTime.... or the end of the internal buffer source.mListener->NotifyRealtimeTrackDataAndApplyTrackDisabling(Graph(), offset, *aSegment); } } // These handle notifying all the listeners of an event void SourceMediaStream::NotifyListenersEventImpl(MediaStreamGraphEvent aEvent) { for (uint32_t j = 0; j < mListeners.Length(); ++j) { MediaStreamListener* l = mListeners[j]; l->NotifyEvent(GraphImpl(), aEvent); } } void SourceMediaStream::NotifyListenersEvent(MediaStreamGraphEvent aNewEvent) { class Message : public ControlMessage { public: Message(SourceMediaStream* aStream, MediaStreamGraphEvent aEvent) : ControlMessage(aStream), mEvent(aEvent) {} void Run() override { mStream->AsSourceStream()->NotifyListenersEventImpl(mEvent); } MediaStreamGraphEvent mEvent; }; GraphImpl()->AppendMessage(MakeUnique<Message>(this, aNewEvent)); } void SourceMediaStream::AddDirectListener(DirectMediaStreamListener* aListener) { bool wasEmpty; { MutexAutoLock lock(mMutex); wasEmpty = mDirectListeners.IsEmpty(); mDirectListeners.AppendElement(aListener); } if (wasEmpty) { // Async NotifyListenersEvent(MediaStreamGraphEvent::EVENT_HAS_DIRECT_LISTENERS); } } void SourceMediaStream::RemoveDirectListener(DirectMediaStreamListener* aListener) { bool isEmpty; { MutexAutoLock lock(mMutex); mDirectListeners.RemoveElement(aListener); isEmpty = mDirectListeners.IsEmpty(); } if (isEmpty) { // Async NotifyListenersEvent(MediaStreamGraphEvent::EVENT_HAS_NO_DIRECT_LISTENERS); } } void SourceMediaStream::AddDirectTrackListenerImpl(already_AddRefed<DirectMediaStreamTrackListener> aListener, TrackID aTrackID) { MOZ_ASSERT(IsTrackIDExplicit(aTrackID)); TrackData* data; bool found = false; bool isAudio = false; bool isVideo = false; RefPtr<DirectMediaStreamTrackListener> listener = aListener; STREAM_LOG(LogLevel::Debug, ("Adding direct track listener %p bound to track %d to source stream %p", listener.get(), aTrackID, this)); { MutexAutoLock lock(mMutex); data = FindDataForTrack(aTrackID); found = !!data; if (found) { isAudio = data->mData->GetType() == MediaSegment::AUDIO; isVideo = data->mData->GetType() == MediaSegment::VIDEO; } // The track might be removed from mUpdateTrack but still exist in // mTracks. auto streamTrack = FindTrack(aTrackID); bool foundTrack = !!streamTrack; if (foundTrack) { MediaStreamVideoSink* videoSink = listener->AsMediaStreamVideoSink(); // Re-send missed VideoSegment to new added MediaStreamVideoSink. if (streamTrack->GetType() == MediaSegment::VIDEO && videoSink) { VideoSegment videoSegment; if (mTracks.GetForgottenDuration() < streamTrack->GetSegment()->GetDuration()) { videoSegment.AppendSlice(*streamTrack->GetSegment(), mTracks.GetForgottenDuration(), streamTrack->GetSegment()->GetDuration()); } else { VideoSegment* streamTrackSegment = static_cast<VideoSegment*>(streamTrack->GetSegment()); VideoChunk* lastChunk = streamTrackSegment->GetLastChunk(); if (lastChunk) { StreamTime startTime = streamTrackSegment->GetDuration() - lastChunk->GetDuration(); videoSegment.AppendSlice(*streamTrackSegment, startTime, streamTrackSegment->GetDuration()); } } if (found) { videoSegment.AppendSlice(*data->mData, 0, data->mData->GetDuration()); } videoSink->SetCurrentFrames(videoSegment); } } if (found && (isAudio || isVideo)) { for (auto entry : mDirectTrackListeners) { if (entry.mListener == listener && (entry.mTrackID == TRACK_ANY || entry.mTrackID == aTrackID)) { listener->NotifyDirectListenerInstalled( DirectMediaStreamTrackListener::InstallationResult::ALREADY_EXISTS); return; } } TrackBound<DirectMediaStreamTrackListener>* sourceListener = mDirectTrackListeners.AppendElement(); sourceListener->mListener = listener; sourceListener->mTrackID = aTrackID; } } if (!found) { STREAM_LOG(LogLevel::Warning, ("Couldn't find source track for direct track listener %p", listener.get())); listener->NotifyDirectListenerInstalled( DirectMediaStreamTrackListener::InstallationResult::TRACK_NOT_FOUND_AT_SOURCE); return; } if (!isAudio && !isVideo) { STREAM_LOG(LogLevel::Warning, ("Source track for direct track listener %p is unknown", listener.get())); // It is not a video or audio track. MOZ_ASSERT(true); return; } STREAM_LOG(LogLevel::Debug, ("Added direct track listener %p", listener.get())); listener->NotifyDirectListenerInstalled( DirectMediaStreamTrackListener::InstallationResult::SUCCESS); } void SourceMediaStream::RemoveDirectTrackListenerImpl(DirectMediaStreamTrackListener* aListener, TrackID aTrackID) { MutexAutoLock lock(mMutex); for (int32_t i = mDirectTrackListeners.Length() - 1; i >= 0; --i) { const TrackBound<DirectMediaStreamTrackListener>& source = mDirectTrackListeners[i]; if (source.mListener == aListener && source.mTrackID == aTrackID) { aListener->NotifyDirectListenerUninstalled(); mDirectTrackListeners.RemoveElementAt(i); } } } StreamTime SourceMediaStream::GetEndOfAppendedData(TrackID aID) { MutexAutoLock lock(mMutex); TrackData *track = FindDataForTrack(aID); if (track) { return track->mEndOfFlushedData + track->mData->GetDuration(); } NS_ERROR("Track not found"); return 0; } void SourceMediaStream::EndTrack(TrackID aID) { MutexAutoLock lock(mMutex); TrackData *track = FindDataForTrack(aID); if (track) { track->mCommands |= TrackEventCommand::TRACK_EVENT_ENDED; } if (auto graph = GraphImpl()) { graph->EnsureNextIteration(); } } void SourceMediaStream::AdvanceKnownTracksTime(StreamTime aKnownTime) { MutexAutoLock lock(mMutex); MOZ_ASSERT(aKnownTime >= mUpdateKnownTracksTime); mUpdateKnownTracksTime = aKnownTime; if (auto graph = GraphImpl()) { graph->EnsureNextIteration(); } } void SourceMediaStream::FinishWithLockHeld() { mMutex.AssertCurrentThreadOwns(); mUpdateFinished = true; if (auto graph = GraphImpl()) { graph->EnsureNextIteration(); } } void SourceMediaStream::SetTrackEnabledImpl(TrackID aTrackID, DisabledTrackMode aMode) { { MutexAutoLock lock(mMutex); for (TrackBound<DirectMediaStreamTrackListener>& l: mDirectTrackListeners) { if (l.mTrackID != aTrackID) { continue; } DisabledTrackMode oldMode = GetDisabledTrackMode(aTrackID); bool oldEnabled = oldMode == DisabledTrackMode::ENABLED; if (!oldEnabled && aMode == DisabledTrackMode::ENABLED) { STREAM_LOG(LogLevel::Debug, ("SourceMediaStream %p track %d setting " "direct listener enabled", this, aTrackID)); l.mListener->DecreaseDisabled(oldMode); } else if (oldEnabled && aMode != DisabledTrackMode::ENABLED) { STREAM_LOG(LogLevel::Debug, ("SourceMediaStream %p track %d setting " "direct listener disabled", this, aTrackID)); l.mListener->IncreaseDisabled(aMode); } } } MediaStream::SetTrackEnabledImpl(aTrackID, aMode); } void SourceMediaStream::EndAllTrackAndFinish() { MutexAutoLock lock(mMutex); for (uint32_t i = 0; i < mUpdateTracks.Length(); ++i) { SourceMediaStream::TrackData* data = &mUpdateTracks[i]; data->mCommands |= TrackEventCommand::TRACK_EVENT_ENDED; } mPendingTracks.Clear(); FinishWithLockHeld(); // we will call NotifyEvent() to let GetUserMedia know } SourceMediaStream::~SourceMediaStream() { } void SourceMediaStream::RegisterForAudioMixing() { MutexAutoLock lock(mMutex); mNeedsMixing = true; } bool SourceMediaStream::NeedsMixing() { MutexAutoLock lock(mMutex); return mNeedsMixing; } bool SourceMediaStream::HasPendingAudioTrack() { MutexAutoLock lock(mMutex); bool audioTrackPresent = false; for (auto& data : mPendingTracks) { if (data.mData->GetType() == MediaSegment::AUDIO) { audioTrackPresent = true; break; } } return audioTrackPresent; } void MediaInputPort::Init() { STREAM_LOG(LogLevel::Debug, ("Adding MediaInputPort %p (from %p to %p) to the graph", this, mSource, mDest)); mSource->AddConsumer(this); mDest->AddInput(this); // mPortCount decremented via MediaInputPort::Destroy's message ++mDest->GraphImpl()->mPortCount; } void MediaInputPort::Disconnect() { GraphImpl()->AssertOnGraphThreadOrNotRunning(); NS_ASSERTION(!mSource == !mDest, "mSource must either both be null or both non-null"); if (!mSource) return; mSource->RemoveConsumer(this); mDest->RemoveInput(this); mSource = nullptr; mDest = nullptr; GraphImpl()->SetStreamOrderDirty(); } MediaInputPort::InputInterval MediaInputPort::GetNextInputInterval(GraphTime aTime) { InputInterval result = { GRAPH_TIME_MAX, GRAPH_TIME_MAX, false }; if (aTime >= mDest->mStartBlocking) { return result; } result.mStart = aTime; result.mEnd = mDest->mStartBlocking; result.mInputIsBlocked = aTime >= mSource->mStartBlocking; if (!result.mInputIsBlocked) { result.mEnd = std::min(result.mEnd, mSource->mStartBlocking); } return result; } void MediaInputPort::Destroy() { class Message : public ControlMessage { public: explicit Message(MediaInputPort* aPort) : ControlMessage(nullptr), mPort(aPort) {} void Run() override { mPort->Disconnect(); --mPort->GraphImpl()->mPortCount; mPort->SetGraphImpl(nullptr); NS_RELEASE(mPort); } void RunDuringShutdown() override { Run(); } MediaInputPort* mPort; }; GraphImpl()->AppendMessage(MakeUnique<Message>(this)); } MediaStreamGraphImpl* MediaInputPort::GraphImpl() { return mGraph; } MediaStreamGraph* MediaInputPort::Graph() { return mGraph; } void MediaInputPort::SetGraphImpl(MediaStreamGraphImpl* aGraph) { MOZ_ASSERT(!mGraph || !aGraph, "Should only be set once"); mGraph = aGraph; } void MediaInputPort::BlockSourceTrackIdImpl(TrackID aTrackId, BlockingMode aBlockingMode) { mBlockedTracks.AppendElement(Pair<TrackID, BlockingMode>(aTrackId, aBlockingMode)); } already_AddRefed<Pledge<bool>> MediaInputPort::BlockSourceTrackId(TrackID aTrackId, BlockingMode aBlockingMode) { class Message : public ControlMessage { public: explicit Message(MediaInputPort* aPort, TrackID aTrackId, BlockingMode aBlockingMode, already_AddRefed<nsIRunnable> aRunnable) : ControlMessage(aPort->GetDestination()), mPort(aPort), mTrackId(aTrackId), mBlockingMode(aBlockingMode), mRunnable(aRunnable) {} void Run() override { mPort->BlockSourceTrackIdImpl(mTrackId, mBlockingMode); if (mRunnable) { mStream->Graph()->DispatchToMainThreadAfterStreamStateUpdate(mRunnable.forget()); } } void RunDuringShutdown() override { Run(); } RefPtr<MediaInputPort> mPort; TrackID mTrackId; BlockingMode mBlockingMode; nsCOMPtr<nsIRunnable> mRunnable; }; MOZ_ASSERT(IsTrackIDExplicit(aTrackId), "Only explicit TrackID is allowed"); RefPtr<Pledge<bool>> pledge = new Pledge<bool>(); nsCOMPtr<nsIRunnable> runnable = NewRunnableFrom([pledge]() { MOZ_ASSERT(NS_IsMainThread()); pledge->Resolve(true); return NS_OK; }); GraphImpl()->AppendMessage(MakeUnique<Message>(this, aTrackId, aBlockingMode, runnable.forget())); return pledge.forget(); } already_AddRefed<MediaInputPort> ProcessedMediaStream::AllocateInputPort(MediaStream* aStream, TrackID aTrackID, TrackID aDestTrackID, uint16_t aInputNumber, uint16_t aOutputNumber, nsTArray<TrackID>* aBlockedTracks) { // This method creates two references to the MediaInputPort: one for // the main thread, and one for the MediaStreamGraph. class Message : public ControlMessage { public: explicit Message(MediaInputPort* aPort) : ControlMessage(aPort->GetDestination()), mPort(aPort) {} void Run() override { mPort->Init(); // The graph holds its reference implicitly mPort->GraphImpl()->SetStreamOrderDirty(); Unused << mPort.forget(); } void RunDuringShutdown() override { Run(); } RefPtr<MediaInputPort> mPort; }; MOZ_ASSERT(aStream->GraphImpl() == GraphImpl()); MOZ_ASSERT(aTrackID == TRACK_ANY || IsTrackIDExplicit(aTrackID), "Only TRACK_ANY and explicit ID are allowed for source track"); MOZ_ASSERT(aDestTrackID == TRACK_ANY || IsTrackIDExplicit(aDestTrackID), "Only TRACK_ANY and explicit ID are allowed for destination track"); MOZ_ASSERT(aTrackID != TRACK_ANY || aDestTrackID == TRACK_ANY, "Generic MediaInputPort cannot produce a single destination track"); RefPtr<MediaInputPort> port = new MediaInputPort(aStream, aTrackID, this, aDestTrackID, aInputNumber, aOutputNumber); if (aBlockedTracks) { for (TrackID trackID : *aBlockedTracks) { port->BlockSourceTrackIdImpl(trackID, BlockingMode::CREATION); } } port->SetGraphImpl(GraphImpl()); GraphImpl()->AppendMessage(MakeUnique<Message>(port)); return port.forget(); } void ProcessedMediaStream::Finish() { class Message : public ControlMessage { public: explicit Message(ProcessedMediaStream* aStream) : ControlMessage(aStream) {} void Run() override { mStream->GraphImpl()->FinishStream(mStream); } }; GraphImpl()->AppendMessage(MakeUnique<Message>(this)); } void ProcessedMediaStream::SetAutofinish(bool aAutofinish) { class Message : public ControlMessage { public: Message(ProcessedMediaStream* aStream, bool aAutofinish) : ControlMessage(aStream), mAutofinish(aAutofinish) {} void Run() override { static_cast<ProcessedMediaStream*>(mStream)->SetAutofinishImpl(mAutofinish); } bool mAutofinish; }; GraphImpl()->AppendMessage(MakeUnique<Message>(this, aAutofinish)); } void ProcessedMediaStream::DestroyImpl() { for (int32_t i = mInputs.Length() - 1; i >= 0; --i) { mInputs[i]->Disconnect(); } MediaStream::DestroyImpl(); // The stream order is only important if there are connections, in which // case MediaInputPort::Disconnect() called SetStreamOrderDirty(). // MediaStreamGraphImpl::RemoveStreamGraphThread() will also call // SetStreamOrderDirty(), for other reasons. } MediaStreamGraphImpl::MediaStreamGraphImpl(GraphDriverType aDriverRequested, TrackRate aSampleRate, dom::AudioChannel aChannel) : MediaStreamGraph(aSampleRate) , mPortCount(0) , mInputWanted(false) , mInputDeviceID(-1) , mOutputWanted(true) , mOutputDeviceID(-1) , mNeedAnotherIteration(false) , mGraphDriverAsleep(false) , mMonitor("MediaStreamGraphImpl") , mLifecycleState(LIFECYCLE_THREAD_NOT_STARTED) , mEndTime(GRAPH_TIME_MAX) , mForceShutDown(false) , mPostedRunInStableStateEvent(false) , mDetectedNotRunning(false) , mPostedRunInStableState(false) , mRealtime(aDriverRequested != OFFLINE_THREAD_DRIVER) , mNonRealtimeProcessing(false) , mStreamOrderDirty(false) , mLatencyLog(AsyncLatencyLogger::Get()) #ifdef MOZ_WEBRTC , mFarendObserverRef(nullptr) #endif , mSelfRef(this) #ifdef DEBUG , mCanRunMessagesSynchronously(false) #endif , mAudioChannel(aChannel) { if (mRealtime) { if (aDriverRequested == AUDIO_THREAD_DRIVER) { AudioCallbackDriver* driver = new AudioCallbackDriver(this); mDriver = driver; } else { mDriver = new SystemClockDriver(this); } } else { mDriver = new OfflineClockDriver(this, MEDIA_GRAPH_TARGET_PERIOD_MS); } mLastMainThreadUpdate = TimeStamp::Now(); RegisterWeakAsyncMemoryReporter(this); } void MediaStreamGraphImpl::Destroy() { // First unregister from memory reporting. UnregisterWeakMemoryReporter(this); // Clear the self reference which will destroy this instance if all // associated GraphDrivers are destroyed. mSelfRef = nullptr; } MediaStreamGraph* MediaStreamGraph::GetInstance(MediaStreamGraph::GraphDriverType aGraphDriverRequested, dom::AudioChannel aChannel) { NS_ASSERTION(NS_IsMainThread(), "Main thread only"); uint32_t channel = static_cast<uint32_t>(aChannel); MediaStreamGraphImpl* graph = nullptr; if (!gGraphs.Get(channel, &graph)) { if (!gMediaStreamGraphShutdownBlocker) { class Blocker : public media::ShutdownBlocker { public: Blocker() : media::ShutdownBlocker(NS_LITERAL_STRING( "MediaStreamGraph shutdown: blocking on msg thread")) {} NS_IMETHOD BlockShutdown(nsIAsyncShutdownClient* aProfileBeforeChange) override { // Distribute the global async shutdown blocker in a ticket. If there // are zero graphs then shutdown is unblocked when we go out of scope. RefPtr<MediaStreamGraphImpl::ShutdownTicket> ticket = new MediaStreamGraphImpl::ShutdownTicket(gMediaStreamGraphShutdownBlocker.get()); gMediaStreamGraphShutdownBlocker = nullptr; for (auto iter = gGraphs.Iter(); !iter.Done(); iter.Next()) { iter.UserData()->ForceShutDown(ticket); } return NS_OK; } }; gMediaStreamGraphShutdownBlocker = new Blocker(); nsCOMPtr<nsIAsyncShutdownClient> barrier = MediaStreamGraphImpl::GetShutdownBarrier(); nsresult rv = barrier-> AddBlocker(gMediaStreamGraphShutdownBlocker, NS_LITERAL_STRING(__FILE__), __LINE__, NS_LITERAL_STRING("MediaStreamGraph shutdown")); MOZ_RELEASE_ASSERT(NS_SUCCEEDED(rv)); } graph = new MediaStreamGraphImpl(aGraphDriverRequested, CubebUtils::PreferredSampleRate(), aChannel); gGraphs.Put(channel, graph); STREAM_LOG(LogLevel::Debug, ("Starting up MediaStreamGraph %p for channel %s", graph, AudioChannelValues::strings[channel].value)); } return graph; } MediaStreamGraph* MediaStreamGraph::CreateNonRealtimeInstance(TrackRate aSampleRate) { NS_ASSERTION(NS_IsMainThread(), "Main thread only"); MediaStreamGraphImpl* graph = new MediaStreamGraphImpl(OFFLINE_THREAD_DRIVER, aSampleRate, AudioChannel::Normal); STREAM_LOG(LogLevel::Debug, ("Starting up Offline MediaStreamGraph %p", graph)); return graph; } void MediaStreamGraph::DestroyNonRealtimeInstance(MediaStreamGraph* aGraph) { NS_ASSERTION(NS_IsMainThread(), "Main thread only"); MOZ_ASSERT(aGraph->IsNonRealtime(), "Should not destroy the global graph here"); MediaStreamGraphImpl* graph = static_cast<MediaStreamGraphImpl*>(aGraph); if (!graph->mNonRealtimeProcessing) { // Start the graph, but don't produce anything graph->StartNonRealtimeProcessing(0); } graph->ForceShutDown(nullptr); } NS_IMPL_ISUPPORTS(MediaStreamGraphImpl, nsIMemoryReporter, nsITimerCallback) NS_IMETHODIMP MediaStreamGraphImpl::CollectReports(nsIHandleReportCallback* aHandleReport, nsISupports* aData, bool aAnonymize) { if (mLifecycleState >= LIFECYCLE_WAITING_FOR_THREAD_SHUTDOWN) { // Shutting down, nothing to report. FinishCollectReports(aHandleReport, aData, nsTArray<AudioNodeSizes>()); return NS_OK; } class Message final : public ControlMessage { public: Message(MediaStreamGraphImpl *aGraph, nsIHandleReportCallback* aHandleReport, nsISupports *aHandlerData) : ControlMessage(nullptr) , mGraph(aGraph) , mHandleReport(aHandleReport) , mHandlerData(aHandlerData) {} void Run() override { mGraph->CollectSizesForMemoryReport(mHandleReport.forget(), mHandlerData.forget()); } void RunDuringShutdown() override { // Run this message during shutdown too, so that endReports is called. Run(); } MediaStreamGraphImpl *mGraph; // nsMemoryReporterManager keeps the callback and data alive only if it // does not time out. nsCOMPtr<nsIHandleReportCallback> mHandleReport; nsCOMPtr<nsISupports> mHandlerData; }; // When a non-realtime graph has not started, there is no thread yet, so // collect sizes on this thread. if (!(mRealtime || mNonRealtimeProcessing)) { CollectSizesForMemoryReport(do_AddRef(aHandleReport), do_AddRef(aData)); return NS_OK; } AppendMessage(MakeUnique<Message>(this, aHandleReport, aData)); return NS_OK; } void MediaStreamGraphImpl::CollectSizesForMemoryReport( already_AddRefed<nsIHandleReportCallback> aHandleReport, already_AddRefed<nsISupports> aHandlerData) { class FinishCollectRunnable final : public Runnable { public: explicit FinishCollectRunnable( already_AddRefed<nsIHandleReportCallback> aHandleReport, already_AddRefed<nsISupports> aHandlerData) : mHandleReport(aHandleReport) , mHandlerData(aHandlerData) {} NS_IMETHOD Run() override { MediaStreamGraphImpl::FinishCollectReports(mHandleReport, mHandlerData, Move(mAudioStreamSizes)); return NS_OK; } nsTArray<AudioNodeSizes> mAudioStreamSizes; private: ~FinishCollectRunnable() {} // Avoiding nsCOMPtr because NSCAP_ASSERT_NO_QUERY_NEEDED in its // constructor modifies the ref-count, which cannot be done off main // thread. RefPtr<nsIHandleReportCallback> mHandleReport; RefPtr<nsISupports> mHandlerData; }; RefPtr<FinishCollectRunnable> runnable = new FinishCollectRunnable(Move(aHandleReport), Move(aHandlerData)); auto audioStreamSizes = &runnable->mAudioStreamSizes; for (MediaStream* s : AllStreams()) { AudioNodeStream* stream = s->AsAudioNodeStream(); if (stream) { AudioNodeSizes* usage = audioStreamSizes->AppendElement(); stream->SizeOfAudioNodesIncludingThis(MallocSizeOf, *usage); } } NS_DispatchToMainThread(runnable.forget()); } void MediaStreamGraphImpl:: FinishCollectReports(nsIHandleReportCallback* aHandleReport, nsISupports* aData, const nsTArray<AudioNodeSizes>& aAudioStreamSizes) { MOZ_ASSERT(NS_IsMainThread()); nsCOMPtr<nsIMemoryReporterManager> manager = do_GetService("@mozilla.org/memory-reporter-manager;1"); if (!manager) return; #define REPORT(_path, _amount, _desc) \ aHandleReport->Callback(EmptyCString(), _path, KIND_HEAP, UNITS_BYTES, \ _amount, NS_LITERAL_CSTRING(_desc), aData); for (size_t i = 0; i < aAudioStreamSizes.Length(); i++) { const AudioNodeSizes& usage = aAudioStreamSizes[i]; const char* const nodeType = usage.mNodeType ? usage.mNodeType : "<unknown>"; nsPrintfCString enginePath("explicit/webaudio/audio-node/%s/engine-objects", nodeType); REPORT(enginePath, usage.mEngine, "Memory used by AudioNode engine objects (Web Audio)."); nsPrintfCString streamPath("explicit/webaudio/audio-node/%s/stream-objects", nodeType); REPORT(streamPath, usage.mStream, "Memory used by AudioNode stream objects (Web Audio)."); } size_t hrtfLoaders = WebCore::HRTFDatabaseLoader::sizeOfLoaders(MallocSizeOf); if (hrtfLoaders) { REPORT(NS_LITERAL_CSTRING( "explicit/webaudio/audio-node/PannerNode/hrtf-databases"), hrtfLoaders, "Memory used by PannerNode databases (Web Audio)."); } #undef REPORT manager->EndReport(); } SourceMediaStream* MediaStreamGraph::CreateSourceStream() { SourceMediaStream* stream = new SourceMediaStream(); AddStream(stream); return stream; } ProcessedMediaStream* MediaStreamGraph::CreateTrackUnionStream() { TrackUnionStream* stream = new TrackUnionStream(); AddStream(stream); return stream; } ProcessedMediaStream* MediaStreamGraph::CreateAudioCaptureStream(TrackID aTrackId) { AudioCaptureStream* stream = new AudioCaptureStream(aTrackId); AddStream(stream); return stream; } void MediaStreamGraph::AddStream(MediaStream* aStream) { NS_ADDREF(aStream); MediaStreamGraphImpl* graph = static_cast<MediaStreamGraphImpl*>(this); aStream->SetGraphImpl(graph); graph->AppendMessage(MakeUnique<CreateMessage>(aStream)); } class GraphStartedRunnable final : public Runnable { public: GraphStartedRunnable(AudioNodeStream* aStream, MediaStreamGraph* aGraph) : mStream(aStream) , mGraph(aGraph) { } NS_IMETHOD Run() override { mGraph->NotifyWhenGraphStarted(mStream); return NS_OK; } private: RefPtr<AudioNodeStream> mStream; MediaStreamGraph* mGraph; }; void MediaStreamGraph::NotifyWhenGraphStarted(AudioNodeStream* aStream) { MOZ_ASSERT(NS_IsMainThread()); class GraphStartedNotificationControlMessage : public ControlMessage { public: explicit GraphStartedNotificationControlMessage(AudioNodeStream* aStream) : ControlMessage(aStream) { } void Run() override { // This runs on the graph thread, so when this runs, and the current // driver is an AudioCallbackDriver, we know the audio hardware is // started. If not, we are going to switch soon, keep reposting this // ControlMessage. MediaStreamGraphImpl* graphImpl = mStream->GraphImpl(); if (graphImpl->CurrentDriver()->AsAudioCallbackDriver()) { nsCOMPtr<nsIRunnable> event = new dom::StateChangeTask( mStream->AsAudioNodeStream(), nullptr, AudioContextState::Running); NS_DispatchToMainThread(event.forget()); } else { nsCOMPtr<nsIRunnable> event = new GraphStartedRunnable( mStream->AsAudioNodeStream(), mStream->Graph()); NS_DispatchToMainThread(event.forget()); } } void RunDuringShutdown() override { } }; if (!aStream->IsDestroyed()) { MediaStreamGraphImpl* graphImpl = static_cast<MediaStreamGraphImpl*>(this); graphImpl->AppendMessage(MakeUnique<GraphStartedNotificationControlMessage>(aStream)); } } void MediaStreamGraphImpl::IncrementSuspendCount(MediaStream* aStream) { if (!aStream->IsSuspended()) { MOZ_ASSERT(mStreams.Contains(aStream)); mStreams.RemoveElement(aStream); mSuspendedStreams.AppendElement(aStream); SetStreamOrderDirty(); } aStream->IncrementSuspendCount(); } void MediaStreamGraphImpl::DecrementSuspendCount(MediaStream* aStream) { bool wasSuspended = aStream->IsSuspended(); aStream->DecrementSuspendCount(); if (wasSuspended && !aStream->IsSuspended()) { MOZ_ASSERT(mSuspendedStreams.Contains(aStream)); mSuspendedStreams.RemoveElement(aStream); mStreams.AppendElement(aStream); ProcessedMediaStream* ps = aStream->AsProcessedStream(); if (ps) { ps->mCycleMarker = NOT_VISITED; } SetStreamOrderDirty(); } } void MediaStreamGraphImpl::SuspendOrResumeStreams(AudioContextOperation aAudioContextOperation, const nsTArray<MediaStream*>& aStreamSet) { // For our purpose, Suspend and Close are equivalent: we want to remove the // streams from the set of streams that are going to be processed. for (MediaStream* stream : aStreamSet) { if (aAudioContextOperation == AudioContextOperation::Resume) { DecrementSuspendCount(stream); } else { IncrementSuspendCount(stream); } } STREAM_LOG(LogLevel::Debug, ("Moving streams between suspended and running" "state: mStreams: %d, mSuspendedStreams: %d\n", mStreams.Length(), mSuspendedStreams.Length())); #ifdef DEBUG // The intersection of the two arrays should be null. for (uint32_t i = 0; i < mStreams.Length(); i++) { for (uint32_t j = 0; j < mSuspendedStreams.Length(); j++) { MOZ_ASSERT( mStreams[i] != mSuspendedStreams[j], "The suspended stream set and running stream set are not disjoint."); } } #endif } void MediaStreamGraphImpl::AudioContextOperationCompleted(MediaStream* aStream, void* aPromise, AudioContextOperation aOperation) { // This can be called from the thread created to do cubeb operation, or the // MSG thread. The pointers passed back here are refcounted, so are still // alive. MonitorAutoLock lock(mMonitor); AudioContextState state; switch (aOperation) { case AudioContextOperation::Suspend: state = AudioContextState::Suspended; break; case AudioContextOperation::Resume: state = AudioContextState::Running; break; case AudioContextOperation::Close: state = AudioContextState::Closed; break; default: MOZ_CRASH("Not handled."); } nsCOMPtr<nsIRunnable> event = new dom::StateChangeTask( aStream->AsAudioNodeStream(), aPromise, state); NS_DispatchToMainThread(event.forget()); } void MediaStreamGraphImpl::ApplyAudioContextOperationImpl( MediaStream* aDestinationStream, const nsTArray<MediaStream*>& aStreams, AudioContextOperation aOperation, void* aPromise) { MOZ_ASSERT(CurrentDriver()->OnThread()); SuspendOrResumeStreams(aOperation, aStreams); bool switching = false; GraphDriver* nextDriver = nullptr; { MonitorAutoLock lock(mMonitor); switching = CurrentDriver()->Switching(); if (switching) { nextDriver = CurrentDriver()->NextDriver(); } } // If we have suspended the last AudioContext, and we don't have other // streams that have audio, this graph will automatically switch to a // SystemCallbackDriver, because it can't find a MediaStream that has an audio // track. When resuming, force switching to an AudioCallbackDriver (if we're // not already switching). It would have happened at the next iteration // anyways, but doing this now save some time. if (aOperation == AudioContextOperation::Resume) { if (!CurrentDriver()->AsAudioCallbackDriver()) { AudioCallbackDriver* driver; if (switching) { MOZ_ASSERT(nextDriver->AsAudioCallbackDriver()); driver = nextDriver->AsAudioCallbackDriver(); } else { driver = new AudioCallbackDriver(this); MonitorAutoLock lock(mMonitor); CurrentDriver()->SwitchAtNextIteration(driver); } driver->EnqueueStreamAndPromiseForOperation(aDestinationStream, aPromise, aOperation); } else { // We are resuming a context, but we are already using an // AudioCallbackDriver, we can resolve the promise now. AudioContextOperationCompleted(aDestinationStream, aPromise, aOperation); } } // Close, suspend: check if we are going to switch to a // SystemAudioCallbackDriver, and pass the promise to the AudioCallbackDriver // if that's the case, so it can notify the content. // This is the same logic as in UpdateStreamOrder, but it's simpler to have it // here as well so we don't have to store the Promise(s) on the Graph. if (aOperation != AudioContextOperation::Resume) { bool shouldAEC = false; bool audioTrackPresent = AudioTrackPresent(shouldAEC); if (!audioTrackPresent && CurrentDriver()->AsAudioCallbackDriver()) { CurrentDriver()->AsAudioCallbackDriver()-> EnqueueStreamAndPromiseForOperation(aDestinationStream, aPromise, aOperation); SystemClockDriver* driver; if (nextDriver) { MOZ_ASSERT(!nextDriver->AsAudioCallbackDriver()); } else { driver = new SystemClockDriver(this); MonitorAutoLock lock(mMonitor); CurrentDriver()->SwitchAtNextIteration(driver); } // We are closing or suspending an AudioContext, but we just got resumed. // Queue the operation on the next driver so that the ordering is // preserved. } else if (!audioTrackPresent && switching) { MOZ_ASSERT(nextDriver->AsAudioCallbackDriver()); nextDriver->AsAudioCallbackDriver()-> EnqueueStreamAndPromiseForOperation(aDestinationStream, aPromise, aOperation); } else { // We are closing or suspending an AudioContext, but something else is // using the audio stream, we can resolve the promise now. AudioContextOperationCompleted(aDestinationStream, aPromise, aOperation); } } } void MediaStreamGraph::ApplyAudioContextOperation(MediaStream* aDestinationStream, const nsTArray<MediaStream*>& aStreams, AudioContextOperation aOperation, void* aPromise) { class AudioContextOperationControlMessage : public ControlMessage { public: AudioContextOperationControlMessage(MediaStream* aDestinationStream, const nsTArray<MediaStream*>& aStreams, AudioContextOperation aOperation, void* aPromise) : ControlMessage(aDestinationStream) , mStreams(aStreams) , mAudioContextOperation(aOperation) , mPromise(aPromise) { } void Run() override { mStream->GraphImpl()->ApplyAudioContextOperationImpl(mStream, mStreams, mAudioContextOperation, mPromise); } void RunDuringShutdown() override { MOZ_ASSERT(false, "We should be reviving the graph?"); } private: // We don't need strong references here for the same reason ControlMessage // doesn't. nsTArray<MediaStream*> mStreams; AudioContextOperation mAudioContextOperation; void* mPromise; }; MediaStreamGraphImpl* graphImpl = static_cast<MediaStreamGraphImpl*>(this); graphImpl->AppendMessage( MakeUnique<AudioContextOperationControlMessage>(aDestinationStream, aStreams, aOperation, aPromise)); } bool MediaStreamGraph::IsNonRealtime() const { const MediaStreamGraphImpl* impl = static_cast<const MediaStreamGraphImpl*>(this); MediaStreamGraphImpl* graph; return !gGraphs.Get(uint32_t(impl->AudioChannel()), &graph) || graph != impl; } void MediaStreamGraph::StartNonRealtimeProcessing(uint32_t aTicksToProcess) { NS_ASSERTION(NS_IsMainThread(), "main thread only"); MediaStreamGraphImpl* graph = static_cast<MediaStreamGraphImpl*>(this); NS_ASSERTION(!graph->mRealtime, "non-realtime only"); if (graph->mNonRealtimeProcessing) return; graph->mEndTime = graph->RoundUpToNextAudioBlock(graph->mStateComputedTime + aTicksToProcess - 1); graph->mNonRealtimeProcessing = true; graph->EnsureRunInStableState(); } void ProcessedMediaStream::AddInput(MediaInputPort* aPort) { mInputs.AppendElement(aPort); GraphImpl()->SetStreamOrderDirty(); } void MediaStreamGraph::RegisterCaptureStreamForWindow( uint64_t aWindowId, ProcessedMediaStream* aCaptureStream) { MOZ_ASSERT(NS_IsMainThread()); MediaStreamGraphImpl* graphImpl = static_cast<MediaStreamGraphImpl*>(this); graphImpl->RegisterCaptureStreamForWindow(aWindowId, aCaptureStream); } void MediaStreamGraphImpl::RegisterCaptureStreamForWindow( uint64_t aWindowId, ProcessedMediaStream* aCaptureStream) { MOZ_ASSERT(NS_IsMainThread()); WindowAndStream winAndStream; winAndStream.mWindowId = aWindowId; winAndStream.mCaptureStreamSink = aCaptureStream; mWindowCaptureStreams.AppendElement(winAndStream); } void MediaStreamGraph::UnregisterCaptureStreamForWindow(uint64_t aWindowId) { MOZ_ASSERT(NS_IsMainThread()); MediaStreamGraphImpl* graphImpl = static_cast<MediaStreamGraphImpl*>(this); graphImpl->UnregisterCaptureStreamForWindow(aWindowId); } void MediaStreamGraphImpl::UnregisterCaptureStreamForWindow(uint64_t aWindowId) { MOZ_ASSERT(NS_IsMainThread()); for (int32_t i = mWindowCaptureStreams.Length() - 1; i >= 0; i--) { if (mWindowCaptureStreams[i].mWindowId == aWindowId) { mWindowCaptureStreams.RemoveElementAt(i); } } } already_AddRefed<MediaInputPort> MediaStreamGraph::ConnectToCaptureStream(uint64_t aWindowId, MediaStream* aMediaStream) { return aMediaStream->GraphImpl()->ConnectToCaptureStream(aWindowId, aMediaStream); } already_AddRefed<MediaInputPort> MediaStreamGraphImpl::ConnectToCaptureStream(uint64_t aWindowId, MediaStream* aMediaStream) { MOZ_ASSERT(NS_IsMainThread()); for (uint32_t i = 0; i < mWindowCaptureStreams.Length(); i++) { if (mWindowCaptureStreams[i].mWindowId == aWindowId) { ProcessedMediaStream* sink = mWindowCaptureStreams[i].mCaptureStreamSink; return sink->AllocateInputPort(aMediaStream); } } return nullptr; } } // namespace mozilla