diff options
Diffstat (limited to 'dom/media/MediaCache.cpp')
-rw-r--r-- | dom/media/MediaCache.cpp | 2472 |
1 files changed, 2472 insertions, 0 deletions
diff --git a/dom/media/MediaCache.cpp b/dom/media/MediaCache.cpp new file mode 100644 index 000000000..37399f851 --- /dev/null +++ b/dom/media/MediaCache.cpp @@ -0,0 +1,2472 @@ +/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* vim:set ts=2 sw=2 sts=2 et cindent: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "mozilla/ReentrantMonitor.h" + +#include "MediaCache.h" +#include "prio.h" +#include "nsContentUtils.h" +#include "nsThreadUtils.h" +#include "MediaResource.h" +#include "mozilla/Logging.h" +#include "mozilla/Preferences.h" +#include "FileBlockCache.h" +#include "nsAnonymousTemporaryFile.h" +#include "nsIObserverService.h" +#include "nsISeekableStream.h" +#include "nsIPrincipal.h" +#include "mozilla/Attributes.h" +#include "mozilla/Services.h" +#include <algorithm> + +namespace mozilla { + +LazyLogModule gMediaCacheLog("MediaCache"); +#define CACHE_LOG(type, msg) MOZ_LOG(gMediaCacheLog, type, msg) + +// Readahead blocks for non-seekable streams will be limited to this +// fraction of the cache space. We don't normally evict such blocks +// because replacing them requires a seek, but we need to make sure +// they don't monopolize the cache. +static const double NONSEEKABLE_READAHEAD_MAX = 0.5; + +// Data N seconds before the current playback position is given the same priority +// as data REPLAY_PENALTY_FACTOR*N seconds ahead of the current playback +// position. REPLAY_PENALTY_FACTOR is greater than 1 to reflect that +// data in the past is less likely to be played again than data in the future. +// We want to give data just behind the current playback position reasonably +// high priority in case codecs need to retrieve that data (e.g. because +// tracks haven't been muxed well or are being decoded at uneven rates). +// 1/REPLAY_PENALTY_FACTOR as much data will be kept behind the +// current playback position as will be kept ahead of the current playback +// position. +static const uint32_t REPLAY_PENALTY_FACTOR = 3; + +// When looking for a reusable block, scan forward this many blocks +// from the desired "best" block location to look for free blocks, +// before we resort to scanning the whole cache. The idea is to try to +// store runs of stream blocks close-to-consecutively in the cache if we +// can. +static const uint32_t FREE_BLOCK_SCAN_LIMIT = 16; + +#ifdef DEBUG +// Turn this on to do very expensive cache state validation +// #define DEBUG_VERIFY_CACHE +#endif + +// There is at most one media cache (although that could quite easily be +// relaxed if we wanted to manage multiple caches with independent +// size limits). +static MediaCache* gMediaCache; + +class MediaCacheFlusher final : public nsIObserver, + public nsSupportsWeakReference +{ + MediaCacheFlusher() {} + ~MediaCacheFlusher(); +public: + NS_DECL_ISUPPORTS + NS_DECL_NSIOBSERVER + + static void Init(); +}; + +static MediaCacheFlusher* gMediaCacheFlusher; + +NS_IMPL_ISUPPORTS(MediaCacheFlusher, nsIObserver, nsISupportsWeakReference) + +MediaCacheFlusher::~MediaCacheFlusher() +{ + gMediaCacheFlusher = nullptr; +} + +void MediaCacheFlusher::Init() +{ + if (gMediaCacheFlusher) { + return; + } + + gMediaCacheFlusher = new MediaCacheFlusher(); + NS_ADDREF(gMediaCacheFlusher); + + nsCOMPtr<nsIObserverService> observerService = + mozilla::services::GetObserverService(); + if (observerService) { + observerService->AddObserver(gMediaCacheFlusher, "last-pb-context-exited", true); + observerService->AddObserver(gMediaCacheFlusher, "cacheservice:empty-cache", true); + } +} + +class MediaCache { +public: + friend class MediaCacheStream::BlockList; + typedef MediaCacheStream::BlockList BlockList; + static const int64_t BLOCK_SIZE = MediaCacheStream::BLOCK_SIZE; + + MediaCache() : mNextResourceID(1), + mReentrantMonitor("MediaCache.mReentrantMonitor"), + mUpdateQueued(false) +#ifdef DEBUG + , mInUpdate(false) +#endif + { + MOZ_COUNT_CTOR(MediaCache); + } + ~MediaCache() { + NS_ASSERTION(mStreams.IsEmpty(), "Stream(s) still open!"); + Truncate(); + NS_ASSERTION(mIndex.Length() == 0, "Blocks leaked?"); + if (mFileCache) { + mFileCache->Close(); + mFileCache = nullptr; + } + MOZ_COUNT_DTOR(MediaCache); + } + + // Main thread only. Creates the backing cache file. If this fails, + // then the cache is still in a semi-valid state; mFD will be null, + // so all I/O on the cache file will fail. + nsresult Init(); + // Shut down the global cache if it's no longer needed. We shut down + // the cache as soon as there are no streams. This means that during + // normal operation we are likely to start up the cache and shut it down + // many times, but that's OK since starting it up is cheap and + // shutting it down cleans things up and releases disk space. + static void MaybeShutdown(); + + // Brutally flush the cache contents. Main thread only. + static void Flush(); + void FlushInternal(); + + // Cache-file access methods. These are the lowest-level cache methods. + // mReentrantMonitor must be held; these can be called on any thread. + // This can return partial reads. + nsresult ReadCacheFile(int64_t aOffset, void* aData, int32_t aLength, + int32_t* aBytes); + // This will fail if all aLength bytes are not read + nsresult ReadCacheFileAllBytes(int64_t aOffset, void* aData, int32_t aLength); + + int64_t AllocateResourceID() + { + mReentrantMonitor.AssertCurrentThreadIn(); + return mNextResourceID++; + } + + // mReentrantMonitor must be held, called on main thread. + // These methods are used by the stream to set up and tear down streams, + // and to handle reads and writes. + // Add aStream to the list of streams. + void OpenStream(MediaCacheStream* aStream); + // Remove aStream from the list of streams. + void ReleaseStream(MediaCacheStream* aStream); + // Free all blocks belonging to aStream. + void ReleaseStreamBlocks(MediaCacheStream* aStream); + // Find a cache entry for this data, and write the data into it + void AllocateAndWriteBlock(MediaCacheStream* aStream, const void* aData, + MediaCacheStream::ReadMode aMode); + + // mReentrantMonitor must be held; can be called on any thread + // Notify the cache that a seek has been requested. Some blocks may + // need to change their class between PLAYED_BLOCK and READAHEAD_BLOCK. + // This does not trigger channel seeks directly, the next Update() + // will do that if necessary. The caller will call QueueUpdate(). + void NoteSeek(MediaCacheStream* aStream, int64_t aOldOffset); + // Notify the cache that a block has been read from. This is used + // to update last-use times. The block may not actually have a + // cache entry yet since Read can read data from a stream's + // in-memory mPartialBlockBuffer while the block is only partly full, + // and thus hasn't yet been committed to the cache. The caller will + // call QueueUpdate(). + void NoteBlockUsage(MediaCacheStream* aStream, int32_t aBlockIndex, + int64_t aStreamOffset, + MediaCacheStream::ReadMode aMode, TimeStamp aNow); + // Mark aStream as having the block, adding it as an owner. + void AddBlockOwnerAsReadahead(int32_t aBlockIndex, MediaCacheStream* aStream, + int32_t aStreamBlockIndex); + + // This queues a call to Update() on the main thread. + void QueueUpdate(); + + // Notify all streams for the resource ID that the suspended status changed + // at the end of MediaCache::Update. + void QueueSuspendedStatusUpdate(int64_t aResourceID); + + // Updates the cache state asynchronously on the main thread: + // -- try to trim the cache back to its desired size, if necessary + // -- suspend channels that are going to read data that's lower priority + // than anything currently cached + // -- resume channels that are going to read data that's higher priority + // than something currently cached + // -- seek channels that need to seek to a new location + void Update(); + +#ifdef DEBUG_VERIFY_CACHE + // Verify invariants, especially block list invariants + void Verify(); +#else + void Verify() {} +#endif + + ReentrantMonitor& GetReentrantMonitor() { return mReentrantMonitor; } + + /** + * An iterator that makes it easy to iterate through all streams that + * have a given resource ID and are not closed. + * Can be used on the main thread or while holding the media cache lock. + */ + class ResourceStreamIterator { + public: + explicit ResourceStreamIterator(int64_t aResourceID) : + mResourceID(aResourceID), mNext(0) {} + MediaCacheStream* Next() + { + while (mNext < gMediaCache->mStreams.Length()) { + MediaCacheStream* stream = gMediaCache->mStreams[mNext]; + ++mNext; + if (stream->GetResourceID() == mResourceID && !stream->IsClosed()) + return stream; + } + return nullptr; + } + private: + int64_t mResourceID; + uint32_t mNext; + }; + +protected: + // Find a free or reusable block and return its index. If there are no + // free blocks and no reusable blocks, add a new block to the cache + // and return it. Can return -1 on OOM. + int32_t FindBlockForIncomingData(TimeStamp aNow, MediaCacheStream* aStream); + // Find a reusable block --- a free block, if there is one, otherwise + // the reusable block with the latest predicted-next-use, or -1 if + // there aren't any freeable blocks. Only block indices less than + // aMaxSearchBlockIndex are considered. If aForStream is non-null, + // then aForStream and aForStreamBlock indicate what media data will + // be placed; FindReusableBlock will favour returning free blocks + // near other blocks for that point in the stream. + int32_t FindReusableBlock(TimeStamp aNow, + MediaCacheStream* aForStream, + int32_t aForStreamBlock, + int32_t aMaxSearchBlockIndex); + bool BlockIsReusable(int32_t aBlockIndex); + // Given a list of blocks sorted with the most reusable blocks at the + // end, find the last block whose stream is not pinned (if any) + // and whose cache entry index is less than aBlockIndexLimit + // and append it to aResult. + void AppendMostReusableBlock(BlockList* aBlockList, + nsTArray<uint32_t>* aResult, + int32_t aBlockIndexLimit); + + enum BlockClass { + // block belongs to mMetadataBlockList because data has been consumed + // from it in "metadata mode" --- in particular blocks read during + // Ogg seeks go into this class. These blocks may have played data + // in them too. + METADATA_BLOCK, + // block belongs to mPlayedBlockList because its offset is + // less than the stream's current reader position + PLAYED_BLOCK, + // block belongs to the stream's mReadaheadBlockList because its + // offset is greater than or equal to the stream's current + // reader position + READAHEAD_BLOCK + }; + + struct BlockOwner { + constexpr BlockOwner() {} + + // The stream that owns this block, or null if the block is free. + MediaCacheStream* mStream = nullptr; + // The block index in the stream. Valid only if mStream is non-null. + // Initialized to an insane value to highlight misuse. + uint32_t mStreamBlock = UINT32_MAX; + // Time at which this block was last used. Valid only if + // mClass is METADATA_BLOCK or PLAYED_BLOCK. + TimeStamp mLastUseTime; + BlockClass mClass = READAHEAD_BLOCK; + }; + + struct Block { + // Free blocks have an empty mOwners array + nsTArray<BlockOwner> mOwners; + }; + + // Get the BlockList that the block should belong to given its + // current owner + BlockList* GetListForBlock(BlockOwner* aBlock); + // Get the BlockOwner for the given block index and owning stream + // (returns null if the stream does not own the block) + BlockOwner* GetBlockOwner(int32_t aBlockIndex, MediaCacheStream* aStream); + // Returns true iff the block is free + bool IsBlockFree(int32_t aBlockIndex) + { return mIndex[aBlockIndex].mOwners.IsEmpty(); } + // Add the block to the free list and mark its streams as not having + // the block in cache + void FreeBlock(int32_t aBlock); + // Mark aStream as not having the block, removing it as an owner. If + // the block has no more owners it's added to the free list. + void RemoveBlockOwner(int32_t aBlockIndex, MediaCacheStream* aStream); + // Swap all metadata associated with the two blocks. The caller + // is responsible for swapping up any cache file state. + void SwapBlocks(int32_t aBlockIndex1, int32_t aBlockIndex2); + // Insert the block into the readahead block list for the stream + // at the right point in the list. + void InsertReadaheadBlock(BlockOwner* aBlockOwner, int32_t aBlockIndex); + + // Guess the duration until block aBlock will be next used + TimeDuration PredictNextUse(TimeStamp aNow, int32_t aBlock); + // Guess the duration until the next incoming data on aStream will be used + TimeDuration PredictNextUseForIncomingData(MediaCacheStream* aStream); + + // Truncate the file and index array if there are free blocks at the + // end + void Truncate(); + + // This member is main-thread only. It's used to allocate unique + // resource IDs to streams. + int64_t mNextResourceID; + + // The monitor protects all the data members here. Also, off-main-thread + // readers that need to block will Wait() on this monitor. When new + // data becomes available in the cache, we NotifyAll() on this monitor. + ReentrantMonitor mReentrantMonitor; + // This is only written while on the main thread and the monitor is held. + // Thus, it can be safely read from the main thread or while holding the monitor. + nsTArray<MediaCacheStream*> mStreams; + // The Blocks describing the cache entries. + nsTArray<Block> mIndex; + // Writer which performs IO, asynchronously writing cache blocks. + RefPtr<FileBlockCache> mFileCache; + // The list of free blocks; they are not ordered. + BlockList mFreeBlocks; + // True if an event to run Update() has been queued but not processed + bool mUpdateQueued; +#ifdef DEBUG + bool mInUpdate; +#endif + // A list of resource IDs to notify about the change in suspended status. + nsTArray<int64_t> mSuspendedStatusToNotify; +}; + +NS_IMETHODIMP +MediaCacheFlusher::Observe(nsISupports *aSubject, char const *aTopic, char16_t const *aData) +{ + if (strcmp(aTopic, "last-pb-context-exited") == 0) { + MediaCache::Flush(); + } + if (strcmp(aTopic, "cacheservice:empty-cache") == 0) { + MediaCache::Flush(); + } + return NS_OK; +} + +MediaCacheStream::MediaCacheStream(ChannelMediaResource* aClient) + : mClient(aClient), + mInitialized(false), + mHasHadUpdate(false), + mClosed(false), + mDidNotifyDataEnded(false), + mResourceID(0), + mIsTransportSeekable(false), + mCacheSuspended(false), + mChannelEnded(false), + mChannelOffset(0), + mStreamLength(-1), + mStreamOffset(0), + mPlaybackBytesPerSecond(10000), + mPinCount(0), + mCurrentMode(MODE_PLAYBACK), + mMetadataInPartialBlockBuffer(false), + mPartialBlockBuffer(MakeUnique<int64_t[]>(BLOCK_SIZE/sizeof(int64_t))) +{ +} + +size_t MediaCacheStream::SizeOfExcludingThis( + MallocSizeOf aMallocSizeOf) const +{ + // Looks like these are not owned: + // - mClient + // - mPrincipal + size_t size = mBlocks.ShallowSizeOfExcludingThis(aMallocSizeOf); + size += mReadaheadBlocks.SizeOfExcludingThis(aMallocSizeOf); + size += mMetadataBlocks.SizeOfExcludingThis(aMallocSizeOf); + size += mPlayedBlocks.SizeOfExcludingThis(aMallocSizeOf); + size += aMallocSizeOf(mPartialBlockBuffer.get()); + + return size; +} + +size_t MediaCacheStream::BlockList::SizeOfExcludingThis( + MallocSizeOf aMallocSizeOf) const +{ + return mEntries.ShallowSizeOfExcludingThis(aMallocSizeOf); +} + +void MediaCacheStream::BlockList::AddFirstBlock(int32_t aBlock) +{ + NS_ASSERTION(!mEntries.GetEntry(aBlock), "Block already in list"); + Entry* entry = mEntries.PutEntry(aBlock); + + if (mFirstBlock < 0) { + entry->mNextBlock = entry->mPrevBlock = aBlock; + } else { + entry->mNextBlock = mFirstBlock; + entry->mPrevBlock = mEntries.GetEntry(mFirstBlock)->mPrevBlock; + mEntries.GetEntry(entry->mNextBlock)->mPrevBlock = aBlock; + mEntries.GetEntry(entry->mPrevBlock)->mNextBlock = aBlock; + } + mFirstBlock = aBlock; + ++mCount; +} + +void MediaCacheStream::BlockList::AddAfter(int32_t aBlock, int32_t aBefore) +{ + NS_ASSERTION(!mEntries.GetEntry(aBlock), "Block already in list"); + Entry* entry = mEntries.PutEntry(aBlock); + + Entry* addAfter = mEntries.GetEntry(aBefore); + NS_ASSERTION(addAfter, "aBefore not in list"); + + entry->mNextBlock = addAfter->mNextBlock; + entry->mPrevBlock = aBefore; + mEntries.GetEntry(entry->mNextBlock)->mPrevBlock = aBlock; + mEntries.GetEntry(entry->mPrevBlock)->mNextBlock = aBlock; + ++mCount; +} + +void MediaCacheStream::BlockList::RemoveBlock(int32_t aBlock) +{ + Entry* entry = mEntries.GetEntry(aBlock); + NS_ASSERTION(entry, "Block not in list"); + + if (entry->mNextBlock == aBlock) { + NS_ASSERTION(entry->mPrevBlock == aBlock, "Linked list inconsistency"); + NS_ASSERTION(mFirstBlock == aBlock, "Linked list inconsistency"); + mFirstBlock = -1; + } else { + if (mFirstBlock == aBlock) { + mFirstBlock = entry->mNextBlock; + } + mEntries.GetEntry(entry->mNextBlock)->mPrevBlock = entry->mPrevBlock; + mEntries.GetEntry(entry->mPrevBlock)->mNextBlock = entry->mNextBlock; + } + mEntries.RemoveEntry(entry); + --mCount; +} + +int32_t MediaCacheStream::BlockList::GetLastBlock() const +{ + if (mFirstBlock < 0) + return -1; + return mEntries.GetEntry(mFirstBlock)->mPrevBlock; +} + +int32_t MediaCacheStream::BlockList::GetNextBlock(int32_t aBlock) const +{ + int32_t block = mEntries.GetEntry(aBlock)->mNextBlock; + if (block == mFirstBlock) + return -1; + return block; +} + +int32_t MediaCacheStream::BlockList::GetPrevBlock(int32_t aBlock) const +{ + if (aBlock == mFirstBlock) + return -1; + return mEntries.GetEntry(aBlock)->mPrevBlock; +} + +#ifdef DEBUG +void MediaCacheStream::BlockList::Verify() +{ + int32_t count = 0; + if (mFirstBlock >= 0) { + int32_t block = mFirstBlock; + do { + Entry* entry = mEntries.GetEntry(block); + NS_ASSERTION(mEntries.GetEntry(entry->mNextBlock)->mPrevBlock == block, + "Bad prev link"); + NS_ASSERTION(mEntries.GetEntry(entry->mPrevBlock)->mNextBlock == block, + "Bad next link"); + block = entry->mNextBlock; + ++count; + } while (block != mFirstBlock); + } + NS_ASSERTION(count == mCount, "Bad count"); +} +#endif + +static void UpdateSwappedBlockIndex(int32_t* aBlockIndex, + int32_t aBlock1Index, int32_t aBlock2Index) +{ + int32_t index = *aBlockIndex; + if (index == aBlock1Index) { + *aBlockIndex = aBlock2Index; + } else if (index == aBlock2Index) { + *aBlockIndex = aBlock1Index; + } +} + +void +MediaCacheStream::BlockList::NotifyBlockSwapped(int32_t aBlockIndex1, + int32_t aBlockIndex2) +{ + Entry* e1 = mEntries.GetEntry(aBlockIndex1); + Entry* e2 = mEntries.GetEntry(aBlockIndex2); + int32_t e1Prev = -1, e1Next = -1, e2Prev = -1, e2Next = -1; + + // Fix mFirstBlock + UpdateSwappedBlockIndex(&mFirstBlock, aBlockIndex1, aBlockIndex2); + + // Fix mNextBlock/mPrevBlock links. First capture previous/next links + // so we don't get confused due to aliasing. + if (e1) { + e1Prev = e1->mPrevBlock; + e1Next = e1->mNextBlock; + } + if (e2) { + e2Prev = e2->mPrevBlock; + e2Next = e2->mNextBlock; + } + // Update the entries. + if (e1) { + mEntries.GetEntry(e1Prev)->mNextBlock = aBlockIndex2; + mEntries.GetEntry(e1Next)->mPrevBlock = aBlockIndex2; + } + if (e2) { + mEntries.GetEntry(e2Prev)->mNextBlock = aBlockIndex1; + mEntries.GetEntry(e2Next)->mPrevBlock = aBlockIndex1; + } + + // Fix hashtable keys. First remove stale entries. + if (e1) { + e1Prev = e1->mPrevBlock; + e1Next = e1->mNextBlock; + mEntries.RemoveEntry(aBlockIndex1); + // Refresh pointer after hashtable mutation. + e2 = mEntries.GetEntry(aBlockIndex2); + } + if (e2) { + e2Prev = e2->mPrevBlock; + e2Next = e2->mNextBlock; + mEntries.RemoveEntry(aBlockIndex2); + } + // Put new entries back. + if (e1) { + e1 = mEntries.PutEntry(aBlockIndex2); + e1->mNextBlock = e1Next; + e1->mPrevBlock = e1Prev; + } + if (e2) { + e2 = mEntries.PutEntry(aBlockIndex1); + e2->mNextBlock = e2Next; + e2->mPrevBlock = e2Prev; + } +} + +nsresult +MediaCache::Init() +{ + NS_ASSERTION(NS_IsMainThread(), "Only call on main thread"); + NS_ASSERTION(!mFileCache, "Cache file already open?"); + + PRFileDesc* fileDesc = nullptr; + nsresult rv = NS_OpenAnonymousTemporaryFile(&fileDesc); + NS_ENSURE_SUCCESS(rv,rv); + + mFileCache = new FileBlockCache(); + rv = mFileCache->Open(fileDesc); + NS_ENSURE_SUCCESS(rv,rv); + + MediaCacheFlusher::Init(); + + return NS_OK; +} + +void +MediaCache::Flush() +{ + NS_ASSERTION(NS_IsMainThread(), "Only call on main thread"); + + if (!gMediaCache) + return; + + gMediaCache->FlushInternal(); +} + +void +MediaCache::FlushInternal() +{ + ReentrantMonitorAutoEnter mon(mReentrantMonitor); + + for (uint32_t blockIndex = 0; blockIndex < mIndex.Length(); ++blockIndex) { + FreeBlock(blockIndex); + } + + // Truncate file, close it, and reopen + Truncate(); + NS_ASSERTION(mIndex.Length() == 0, "Blocks leaked?"); + if (mFileCache) { + mFileCache->Close(); + mFileCache = nullptr; + } + Init(); +} + +void +MediaCache::MaybeShutdown() +{ + NS_ASSERTION(NS_IsMainThread(), + "MediaCache::MaybeShutdown called on non-main thread"); + if (!gMediaCache->mStreams.IsEmpty()) { + // Don't shut down yet, streams are still alive + return; + } + + // Since we're on the main thread, no-one is going to add a new stream + // while we shut down. + // This function is static so we don't have to delete 'this'. + delete gMediaCache; + gMediaCache = nullptr; + NS_IF_RELEASE(gMediaCacheFlusher); +} + +static void +InitMediaCache() +{ + if (gMediaCache) + return; + + gMediaCache = new MediaCache(); + nsresult rv = gMediaCache->Init(); + if (NS_FAILED(rv)) { + delete gMediaCache; + gMediaCache = nullptr; + } +} + +nsresult +MediaCache::ReadCacheFile(int64_t aOffset, void* aData, int32_t aLength, + int32_t* aBytes) +{ + mReentrantMonitor.AssertCurrentThreadIn(); + + if (!mFileCache) + return NS_ERROR_FAILURE; + + return mFileCache->Read(aOffset, reinterpret_cast<uint8_t*>(aData), aLength, aBytes); +} + +nsresult +MediaCache::ReadCacheFileAllBytes(int64_t aOffset, void* aData, int32_t aLength) +{ + mReentrantMonitor.AssertCurrentThreadIn(); + + int64_t offset = aOffset; + int32_t count = aLength; + // Cast to char* so we can do byte-wise pointer arithmetic + char* data = static_cast<char*>(aData); + while (count > 0) { + int32_t bytes; + nsresult rv = ReadCacheFile(offset, data, count, &bytes); + if (NS_FAILED(rv)) + return rv; + if (bytes == 0) + return NS_ERROR_FAILURE; + count -= bytes; + data += bytes; + offset += bytes; + } + return NS_OK; +} + +static int32_t GetMaxBlocks() +{ + // We look up the cache size every time. This means dynamic changes + // to the pref are applied. + // Cache size is in KB + int32_t cacheSize = Preferences::GetInt("media.cache_size", 500*1024); + int64_t maxBlocks = static_cast<int64_t>(cacheSize)*1024/MediaCache::BLOCK_SIZE; + maxBlocks = std::max<int64_t>(maxBlocks, 1); + return int32_t(std::min<int64_t>(maxBlocks, INT32_MAX)); +} + +int32_t +MediaCache::FindBlockForIncomingData(TimeStamp aNow, + MediaCacheStream* aStream) +{ + mReentrantMonitor.AssertCurrentThreadIn(); + + int32_t blockIndex = FindReusableBlock(aNow, aStream, + aStream->mChannelOffset/BLOCK_SIZE, INT32_MAX); + + if (blockIndex < 0 || !IsBlockFree(blockIndex)) { + // The block returned is already allocated. + // Don't reuse it if a) there's room to expand the cache or + // b) the data we're going to store in the free block is not higher + // priority than the data already stored in the free block. + // The latter can lead us to go over the cache limit a bit. + if ((mIndex.Length() < uint32_t(GetMaxBlocks()) || blockIndex < 0 || + PredictNextUseForIncomingData(aStream) >= PredictNextUse(aNow, blockIndex))) { + blockIndex = mIndex.Length(); + if (!mIndex.AppendElement()) + return -1; + mFreeBlocks.AddFirstBlock(blockIndex); + return blockIndex; + } + } + + return blockIndex; +} + +bool +MediaCache::BlockIsReusable(int32_t aBlockIndex) +{ + Block* block = &mIndex[aBlockIndex]; + for (uint32_t i = 0; i < block->mOwners.Length(); ++i) { + MediaCacheStream* stream = block->mOwners[i].mStream; + if (stream->mPinCount > 0 || + stream->mStreamOffset/BLOCK_SIZE == block->mOwners[i].mStreamBlock) { + return false; + } + } + return true; +} + +void +MediaCache::AppendMostReusableBlock(BlockList* aBlockList, + nsTArray<uint32_t>* aResult, + int32_t aBlockIndexLimit) +{ + mReentrantMonitor.AssertCurrentThreadIn(); + + int32_t blockIndex = aBlockList->GetLastBlock(); + if (blockIndex < 0) + return; + do { + // Don't consider blocks for pinned streams, or blocks that are + // beyond the specified limit, or a block that contains a stream's + // current read position (such a block contains both played data + // and readahead data) + if (blockIndex < aBlockIndexLimit && BlockIsReusable(blockIndex)) { + aResult->AppendElement(blockIndex); + return; + } + blockIndex = aBlockList->GetPrevBlock(blockIndex); + } while (blockIndex >= 0); +} + +int32_t +MediaCache::FindReusableBlock(TimeStamp aNow, + MediaCacheStream* aForStream, + int32_t aForStreamBlock, + int32_t aMaxSearchBlockIndex) +{ + mReentrantMonitor.AssertCurrentThreadIn(); + + uint32_t length = std::min(uint32_t(aMaxSearchBlockIndex), uint32_t(mIndex.Length())); + + if (aForStream && aForStreamBlock > 0 && + uint32_t(aForStreamBlock) <= aForStream->mBlocks.Length()) { + int32_t prevCacheBlock = aForStream->mBlocks[aForStreamBlock - 1]; + if (prevCacheBlock >= 0) { + uint32_t freeBlockScanEnd = + std::min(length, prevCacheBlock + FREE_BLOCK_SCAN_LIMIT); + for (uint32_t i = prevCacheBlock; i < freeBlockScanEnd; ++i) { + if (IsBlockFree(i)) + return i; + } + } + } + + if (!mFreeBlocks.IsEmpty()) { + int32_t blockIndex = mFreeBlocks.GetFirstBlock(); + do { + if (blockIndex < aMaxSearchBlockIndex) + return blockIndex; + blockIndex = mFreeBlocks.GetNextBlock(blockIndex); + } while (blockIndex >= 0); + } + + // Build a list of the blocks we should consider for the "latest + // predicted time of next use". We can exploit the fact that the block + // linked lists are ordered by increasing time of next use. This is + // actually the whole point of having the linked lists. + AutoTArray<uint32_t,8> candidates; + for (uint32_t i = 0; i < mStreams.Length(); ++i) { + MediaCacheStream* stream = mStreams[i]; + if (stream->mPinCount > 0) { + // No point in even looking at this stream's blocks + continue; + } + + AppendMostReusableBlock(&stream->mMetadataBlocks, &candidates, length); + AppendMostReusableBlock(&stream->mPlayedBlocks, &candidates, length); + + // Don't consider readahead blocks in non-seekable streams. If we + // remove the block we won't be able to seek back to read it later. + if (stream->mIsTransportSeekable) { + AppendMostReusableBlock(&stream->mReadaheadBlocks, &candidates, length); + } + } + + TimeDuration latestUse; + int32_t latestUseBlock = -1; + for (uint32_t i = 0; i < candidates.Length(); ++i) { + TimeDuration nextUse = PredictNextUse(aNow, candidates[i]); + if (nextUse > latestUse) { + latestUse = nextUse; + latestUseBlock = candidates[i]; + } + } + + return latestUseBlock; +} + +MediaCache::BlockList* +MediaCache::GetListForBlock(BlockOwner* aBlock) +{ + switch (aBlock->mClass) { + case METADATA_BLOCK: + NS_ASSERTION(aBlock->mStream, "Metadata block has no stream?"); + return &aBlock->mStream->mMetadataBlocks; + case PLAYED_BLOCK: + NS_ASSERTION(aBlock->mStream, "Metadata block has no stream?"); + return &aBlock->mStream->mPlayedBlocks; + case READAHEAD_BLOCK: + NS_ASSERTION(aBlock->mStream, "Readahead block has no stream?"); + return &aBlock->mStream->mReadaheadBlocks; + default: + NS_ERROR("Invalid block class"); + return nullptr; + } +} + +MediaCache::BlockOwner* +MediaCache::GetBlockOwner(int32_t aBlockIndex, MediaCacheStream* aStream) +{ + Block* block = &mIndex[aBlockIndex]; + for (uint32_t i = 0; i < block->mOwners.Length(); ++i) { + if (block->mOwners[i].mStream == aStream) + return &block->mOwners[i]; + } + return nullptr; +} + +void +MediaCache::SwapBlocks(int32_t aBlockIndex1, int32_t aBlockIndex2) +{ + mReentrantMonitor.AssertCurrentThreadIn(); + + Block* block1 = &mIndex[aBlockIndex1]; + Block* block2 = &mIndex[aBlockIndex2]; + + block1->mOwners.SwapElements(block2->mOwners); + + // Now all references to block1 have to be replaced with block2 and + // vice versa. + // First update stream references to blocks via mBlocks. + const Block* blocks[] = { block1, block2 }; + int32_t blockIndices[] = { aBlockIndex1, aBlockIndex2 }; + for (int32_t i = 0; i < 2; ++i) { + for (uint32_t j = 0; j < blocks[i]->mOwners.Length(); ++j) { + const BlockOwner* b = &blocks[i]->mOwners[j]; + b->mStream->mBlocks[b->mStreamBlock] = blockIndices[i]; + } + } + + // Now update references to blocks in block lists. + mFreeBlocks.NotifyBlockSwapped(aBlockIndex1, aBlockIndex2); + + nsTHashtable<nsPtrHashKey<MediaCacheStream> > visitedStreams; + + for (int32_t i = 0; i < 2; ++i) { + for (uint32_t j = 0; j < blocks[i]->mOwners.Length(); ++j) { + MediaCacheStream* stream = blocks[i]->mOwners[j].mStream; + // Make sure that we don't update the same stream twice --- that + // would result in swapping the block references back again! + if (visitedStreams.GetEntry(stream)) + continue; + visitedStreams.PutEntry(stream); + stream->mReadaheadBlocks.NotifyBlockSwapped(aBlockIndex1, aBlockIndex2); + stream->mPlayedBlocks.NotifyBlockSwapped(aBlockIndex1, aBlockIndex2); + stream->mMetadataBlocks.NotifyBlockSwapped(aBlockIndex1, aBlockIndex2); + } + } + + Verify(); +} + +void +MediaCache::RemoveBlockOwner(int32_t aBlockIndex, MediaCacheStream* aStream) +{ + Block* block = &mIndex[aBlockIndex]; + for (uint32_t i = 0; i < block->mOwners.Length(); ++i) { + BlockOwner* bo = &block->mOwners[i]; + if (bo->mStream == aStream) { + GetListForBlock(bo)->RemoveBlock(aBlockIndex); + bo->mStream->mBlocks[bo->mStreamBlock] = -1; + block->mOwners.RemoveElementAt(i); + if (block->mOwners.IsEmpty()) { + mFreeBlocks.AddFirstBlock(aBlockIndex); + } + return; + } + } +} + +void +MediaCache::AddBlockOwnerAsReadahead(int32_t aBlockIndex, + MediaCacheStream* aStream, + int32_t aStreamBlockIndex) +{ + Block* block = &mIndex[aBlockIndex]; + if (block->mOwners.IsEmpty()) { + mFreeBlocks.RemoveBlock(aBlockIndex); + } + BlockOwner* bo = block->mOwners.AppendElement(); + bo->mStream = aStream; + bo->mStreamBlock = aStreamBlockIndex; + aStream->mBlocks[aStreamBlockIndex] = aBlockIndex; + bo->mClass = READAHEAD_BLOCK; + InsertReadaheadBlock(bo, aBlockIndex); +} + +void +MediaCache::FreeBlock(int32_t aBlock) +{ + mReentrantMonitor.AssertCurrentThreadIn(); + + Block* block = &mIndex[aBlock]; + if (block->mOwners.IsEmpty()) { + // already free + return; + } + + CACHE_LOG(LogLevel::Debug, ("Released block %d", aBlock)); + + for (uint32_t i = 0; i < block->mOwners.Length(); ++i) { + BlockOwner* bo = &block->mOwners[i]; + GetListForBlock(bo)->RemoveBlock(aBlock); + bo->mStream->mBlocks[bo->mStreamBlock] = -1; + } + block->mOwners.Clear(); + mFreeBlocks.AddFirstBlock(aBlock); + Verify(); +} + +TimeDuration +MediaCache::PredictNextUse(TimeStamp aNow, int32_t aBlock) +{ + mReentrantMonitor.AssertCurrentThreadIn(); + NS_ASSERTION(!IsBlockFree(aBlock), "aBlock is free"); + + Block* block = &mIndex[aBlock]; + // Blocks can be belong to multiple streams. The predicted next use + // time is the earliest time predicted by any of the streams. + TimeDuration result; + for (uint32_t i = 0; i < block->mOwners.Length(); ++i) { + BlockOwner* bo = &block->mOwners[i]; + TimeDuration prediction; + switch (bo->mClass) { + case METADATA_BLOCK: + // This block should be managed in LRU mode. For metadata we predict + // that the time until the next use is the time since the last use. + prediction = aNow - bo->mLastUseTime; + break; + case PLAYED_BLOCK: { + // This block should be managed in LRU mode, and we should impose + // a "replay delay" to reflect the likelihood of replay happening + NS_ASSERTION(static_cast<int64_t>(bo->mStreamBlock)*BLOCK_SIZE < + bo->mStream->mStreamOffset, + "Played block after the current stream position?"); + int64_t bytesBehind = + bo->mStream->mStreamOffset - static_cast<int64_t>(bo->mStreamBlock)*BLOCK_SIZE; + int64_t millisecondsBehind = + bytesBehind*1000/bo->mStream->mPlaybackBytesPerSecond; + prediction = TimeDuration::FromMilliseconds( + std::min<int64_t>(millisecondsBehind*REPLAY_PENALTY_FACTOR, INT32_MAX)); + break; + } + case READAHEAD_BLOCK: { + int64_t bytesAhead = + static_cast<int64_t>(bo->mStreamBlock)*BLOCK_SIZE - bo->mStream->mStreamOffset; + NS_ASSERTION(bytesAhead >= 0, + "Readahead block before the current stream position?"); + int64_t millisecondsAhead = + bytesAhead*1000/bo->mStream->mPlaybackBytesPerSecond; + prediction = TimeDuration::FromMilliseconds( + std::min<int64_t>(millisecondsAhead, INT32_MAX)); + break; + } + default: + NS_ERROR("Invalid class for predicting next use"); + return TimeDuration(0); + } + if (i == 0 || prediction < result) { + result = prediction; + } + } + return result; +} + +TimeDuration +MediaCache::PredictNextUseForIncomingData(MediaCacheStream* aStream) +{ + mReentrantMonitor.AssertCurrentThreadIn(); + + int64_t bytesAhead = aStream->mChannelOffset - aStream->mStreamOffset; + if (bytesAhead <= -BLOCK_SIZE) { + // Hmm, no idea when data behind us will be used. Guess 24 hours. + return TimeDuration::FromSeconds(24*60*60); + } + if (bytesAhead <= 0) + return TimeDuration(0); + int64_t millisecondsAhead = bytesAhead*1000/aStream->mPlaybackBytesPerSecond; + return TimeDuration::FromMilliseconds( + std::min<int64_t>(millisecondsAhead, INT32_MAX)); +} + +enum StreamAction { NONE, SEEK, SEEK_AND_RESUME, RESUME, SUSPEND }; + +void +MediaCache::Update() +{ + NS_ASSERTION(NS_IsMainThread(), "Only call on main thread"); + + // The action to use for each stream. We store these so we can make + // decisions while holding the cache lock but implement those decisions + // without holding the cache lock, since we need to call out to + // stream, decoder and element code. + AutoTArray<StreamAction,10> actions; + + { + ReentrantMonitorAutoEnter mon(mReentrantMonitor); + mUpdateQueued = false; +#ifdef DEBUG + mInUpdate = true; +#endif + + int32_t maxBlocks = GetMaxBlocks(); + TimeStamp now = TimeStamp::Now(); + + int32_t freeBlockCount = mFreeBlocks.GetCount(); + TimeDuration latestPredictedUseForOverflow = 0; + if (mIndex.Length() > uint32_t(maxBlocks)) { + // Try to trim back the cache to its desired maximum size. The cache may + // have overflowed simply due to data being received when we have + // no blocks in the main part of the cache that are free or lower + // priority than the new data. The cache can also be overflowing because + // the media.cache_size preference was reduced. + // First, figure out what the least valuable block in the cache overflow + // is. We don't want to replace any blocks in the main part of the + // cache whose expected time of next use is earlier or equal to that. + // If we allow that, we can effectively end up discarding overflowing + // blocks (by moving an overflowing block to the main part of the cache, + // and then overwriting it with another overflowing block), and we try + // to avoid that since it requires HTTP seeks. + // We also use this loop to eliminate overflowing blocks from + // freeBlockCount. + for (int32_t blockIndex = mIndex.Length() - 1; blockIndex >= maxBlocks; + --blockIndex) { + if (IsBlockFree(blockIndex)) { + // Don't count overflowing free blocks in our free block count + --freeBlockCount; + continue; + } + TimeDuration predictedUse = PredictNextUse(now, blockIndex); + latestPredictedUseForOverflow = std::max(latestPredictedUseForOverflow, predictedUse); + } + } else { + freeBlockCount += maxBlocks - mIndex.Length(); + } + + // Now try to move overflowing blocks to the main part of the cache. + for (int32_t blockIndex = mIndex.Length() - 1; blockIndex >= maxBlocks; + --blockIndex) { + if (IsBlockFree(blockIndex)) + continue; + + Block* block = &mIndex[blockIndex]; + // Try to relocate the block close to other blocks for the first stream. + // There is no point in trying to make it close to other blocks in + // *all* the streams it might belong to. + int32_t destinationBlockIndex = + FindReusableBlock(now, block->mOwners[0].mStream, + block->mOwners[0].mStreamBlock, maxBlocks); + if (destinationBlockIndex < 0) { + // Nowhere to place this overflow block. We won't be able to + // place any more overflow blocks. + break; + } + + if (IsBlockFree(destinationBlockIndex) || + PredictNextUse(now, destinationBlockIndex) > latestPredictedUseForOverflow) { + // Reuse blocks in the main part of the cache that are less useful than + // the least useful overflow blocks + + nsresult rv = mFileCache->MoveBlock(blockIndex, destinationBlockIndex); + + if (NS_SUCCEEDED(rv)) { + // We successfully copied the file data. + CACHE_LOG(LogLevel::Debug, ("Swapping blocks %d and %d (trimming cache)", + blockIndex, destinationBlockIndex)); + // Swapping the block metadata here lets us maintain the + // correct positions in the linked lists + SwapBlocks(blockIndex, destinationBlockIndex); + //Free the overflowing block even if the copy failed. + CACHE_LOG(LogLevel::Debug, ("Released block %d (trimming cache)", blockIndex)); + FreeBlock(blockIndex); + } + } else { + CACHE_LOG(LogLevel::Debug, ("Could not trim cache block %d (destination %d, predicted next use %f, latest predicted use for overflow %f", + blockIndex, destinationBlockIndex, + PredictNextUse(now, destinationBlockIndex).ToSeconds(), + latestPredictedUseForOverflow.ToSeconds())); + } + } + // Try chopping back the array of cache entries and the cache file. + Truncate(); + + // Count the blocks allocated for readahead of non-seekable streams + // (these blocks can't be freed but we don't want them to monopolize the + // cache) + int32_t nonSeekableReadaheadBlockCount = 0; + for (uint32_t i = 0; i < mStreams.Length(); ++i) { + MediaCacheStream* stream = mStreams[i]; + if (!stream->mIsTransportSeekable) { + nonSeekableReadaheadBlockCount += stream->mReadaheadBlocks.GetCount(); + } + } + + // If freeBlockCount is zero, then compute the latest of + // the predicted next-uses for all blocks + TimeDuration latestNextUse; + if (freeBlockCount == 0) { + int32_t reusableBlock = FindReusableBlock(now, nullptr, 0, maxBlocks); + if (reusableBlock >= 0) { + latestNextUse = PredictNextUse(now, reusableBlock); + } + } + + int32_t resumeThreshold = Preferences::GetInt("media.cache_resume_threshold", 10); + int32_t readaheadLimit = Preferences::GetInt("media.cache_readahead_limit", 30); + + for (uint32_t i = 0; i < mStreams.Length(); ++i) { + actions.AppendElement(NONE); + + MediaCacheStream* stream = mStreams[i]; + if (stream->mClosed) { + CACHE_LOG(LogLevel::Debug, ("Stream %p closed", stream)); + continue; + } + + // Figure out where we should be reading from. It's the first + // uncached byte after the current mStreamOffset. + int64_t dataOffset = stream->GetCachedDataEndInternal(stream->mStreamOffset); + MOZ_ASSERT(dataOffset >= 0); + + // Compute where we'd actually seek to to read at readOffset + int64_t desiredOffset = dataOffset; + if (stream->mIsTransportSeekable) { + if (desiredOffset > stream->mChannelOffset && + desiredOffset <= stream->mChannelOffset + SEEK_VS_READ_THRESHOLD) { + // Assume it's more efficient to just keep reading up to the + // desired position instead of trying to seek + desiredOffset = stream->mChannelOffset; + } + } else { + // We can't seek directly to the desired offset... + if (stream->mChannelOffset > desiredOffset) { + // Reading forward won't get us anywhere, we need to go backwards. + // Seek back to 0 (the client will reopen the stream) and then + // read forward. + NS_WARNING("Can't seek backwards, so seeking to 0"); + desiredOffset = 0; + // Flush cached blocks out, since if this is a live stream + // the cached data may be completely different next time we + // read it. We have to assume that live streams don't + // advertise themselves as being seekable... + ReleaseStreamBlocks(stream); + } else { + // otherwise reading forward is looking good, so just stay where we + // are and don't trigger a channel seek! + desiredOffset = stream->mChannelOffset; + } + } + + // Figure out if we should be reading data now or not. It's amazing + // how complex this is, but each decision is simple enough. + bool enableReading; + if (stream->mStreamLength >= 0 && dataOffset >= stream->mStreamLength) { + // We want data at the end of the stream, where there's nothing to + // read. We don't want to try to read if we're suspended, because that + // might create a new channel and seek unnecessarily (and incorrectly, + // since HTTP doesn't allow seeking to the actual EOF), and we don't want + // to suspend if we're not suspended and already reading at the end of + // the stream, since there just might be more data than the server + // advertised with Content-Length, and we may as well keep reading. + // But we don't want to seek to the end of the stream if we're not + // already there. + CACHE_LOG(LogLevel::Debug, ("Stream %p at end of stream", stream)); + enableReading = !stream->mCacheSuspended && + stream->mStreamLength == stream->mChannelOffset; + } else if (desiredOffset < stream->mStreamOffset) { + // We're reading to try to catch up to where the current stream + // reader wants to be. Better not stop. + CACHE_LOG(LogLevel::Debug, ("Stream %p catching up", stream)); + enableReading = true; + } else if (desiredOffset < stream->mStreamOffset + BLOCK_SIZE) { + // The stream reader is waiting for us, or nearly so. Better feed it. + CACHE_LOG(LogLevel::Debug, ("Stream %p feeding reader", stream)); + enableReading = true; + } else if (!stream->mIsTransportSeekable && + nonSeekableReadaheadBlockCount >= maxBlocks*NONSEEKABLE_READAHEAD_MAX) { + // This stream is not seekable and there are already too many blocks + // being cached for readahead for nonseekable streams (which we can't + // free). So stop reading ahead now. + CACHE_LOG(LogLevel::Debug, ("Stream %p throttling non-seekable readahead", stream)); + enableReading = false; + } else if (mIndex.Length() > uint32_t(maxBlocks)) { + // We're in the process of bringing the cache size back to the + // desired limit, so don't bring in more data yet + CACHE_LOG(LogLevel::Debug, ("Stream %p throttling to reduce cache size", stream)); + enableReading = false; + } else { + TimeDuration predictedNewDataUse = PredictNextUseForIncomingData(stream); + + if (stream->mCacheSuspended && + predictedNewDataUse.ToSeconds() > resumeThreshold) { + // Don't need data for a while, so don't bother waking up the stream + CACHE_LOG(LogLevel::Debug, ("Stream %p avoiding wakeup since more data is not needed", stream)); + enableReading = false; + } else if (predictedNewDataUse.ToSeconds() > readaheadLimit) { + // Don't read ahead more than this much + CACHE_LOG(LogLevel::Debug, ("Stream %p throttling to avoid reading ahead too far", stream)); + enableReading = false; + } else if (freeBlockCount > 0) { + // Free blocks in the cache, so keep reading + CACHE_LOG(LogLevel::Debug, ("Stream %p reading since there are free blocks", stream)); + enableReading = true; + } else if (latestNextUse <= TimeDuration(0)) { + // No reusable blocks, so can't read anything + CACHE_LOG(LogLevel::Debug, ("Stream %p throttling due to no reusable blocks", stream)); + enableReading = false; + } else { + // Read ahead if the data we expect to read is more valuable than + // the least valuable block in the main part of the cache + CACHE_LOG(LogLevel::Debug, ("Stream %p predict next data in %f, current worst block is %f", + stream, predictedNewDataUse.ToSeconds(), latestNextUse.ToSeconds())); + enableReading = predictedNewDataUse < latestNextUse; + } + } + + if (enableReading) { + for (uint32_t j = 0; j < i; ++j) { + MediaCacheStream* other = mStreams[j]; + if (other->mResourceID == stream->mResourceID && + !other->mClosed && !other->mClient->IsSuspended() && + other->mChannelOffset/BLOCK_SIZE == desiredOffset/BLOCK_SIZE) { + // This block is already going to be read by the other stream. + // So don't try to read it from this stream as well. + enableReading = false; + CACHE_LOG(LogLevel::Debug, ("Stream %p waiting on same block (%lld) from stream %p", + stream, desiredOffset/BLOCK_SIZE, other)); + break; + } + } + } + + if (stream->mChannelOffset != desiredOffset && enableReading) { + // We need to seek now. + NS_ASSERTION(stream->mIsTransportSeekable || desiredOffset == 0, + "Trying to seek in a non-seekable stream!"); + // Round seek offset down to the start of the block. This is essential + // because we don't want to think we have part of a block already + // in mPartialBlockBuffer. + stream->mChannelOffset = (desiredOffset/BLOCK_SIZE)*BLOCK_SIZE; + actions[i] = stream->mCacheSuspended ? SEEK_AND_RESUME : SEEK; + } else if (enableReading && stream->mCacheSuspended) { + actions[i] = RESUME; + } else if (!enableReading && !stream->mCacheSuspended) { + actions[i] = SUSPEND; + } + } +#ifdef DEBUG + mInUpdate = false; +#endif + } + + // Update the channel state without holding our cache lock. While we're + // doing this, decoder threads may be running and seeking, reading or changing + // other cache state. That's OK, they'll trigger new Update events and we'll + // get back here and revise our decisions. The important thing here is that + // performing these actions only depends on mChannelOffset and + // the action, which can only be written by the main thread (i.e., this + // thread), so we don't have races here. + + // First, update the mCacheSuspended/mCacheEnded flags so that they're all correct + // when we fire our CacheClient commands below. Those commands can rely on these flags + // being set correctly for all streams. + for (uint32_t i = 0; i < mStreams.Length(); ++i) { + MediaCacheStream* stream = mStreams[i]; + switch (actions[i]) { + case SEEK: + case SEEK_AND_RESUME: + stream->mCacheSuspended = false; + stream->mChannelEnded = false; + break; + case RESUME: + stream->mCacheSuspended = false; + break; + case SUSPEND: + stream->mCacheSuspended = true; + break; + default: + break; + } + stream->mHasHadUpdate = true; + } + + for (uint32_t i = 0; i < mStreams.Length(); ++i) { + MediaCacheStream* stream = mStreams[i]; + nsresult rv; + switch (actions[i]) { + case SEEK: + case SEEK_AND_RESUME: + CACHE_LOG(LogLevel::Debug, ("Stream %p CacheSeek to %lld (resume=%d)", stream, + (long long)stream->mChannelOffset, actions[i] == SEEK_AND_RESUME)); + rv = stream->mClient->CacheClientSeek(stream->mChannelOffset, + actions[i] == SEEK_AND_RESUME); + break; + case RESUME: + CACHE_LOG(LogLevel::Debug, ("Stream %p Resumed", stream)); + rv = stream->mClient->CacheClientResume(); + QueueSuspendedStatusUpdate(stream->mResourceID); + break; + case SUSPEND: + CACHE_LOG(LogLevel::Debug, ("Stream %p Suspended", stream)); + rv = stream->mClient->CacheClientSuspend(); + QueueSuspendedStatusUpdate(stream->mResourceID); + break; + default: + rv = NS_OK; + break; + } + + if (NS_FAILED(rv)) { + // Close the streams that failed due to error. This will cause all + // client Read and Seek operations on those streams to fail. Blocked + // Reads will also be woken up. + ReentrantMonitorAutoEnter mon(mReentrantMonitor); + stream->CloseInternal(mon); + } + } + + // Notify streams about the suspended status changes. + for (uint32_t i = 0; i < mSuspendedStatusToNotify.Length(); ++i) { + MediaCache::ResourceStreamIterator iter(mSuspendedStatusToNotify[i]); + while (MediaCacheStream* stream = iter.Next()) { + stream->mClient->CacheClientNotifySuspendedStatusChanged(); + } + } + mSuspendedStatusToNotify.Clear(); +} + +class UpdateEvent : public Runnable +{ +public: + NS_IMETHOD Run() override + { + if (gMediaCache) { + gMediaCache->Update(); + } + return NS_OK; + } +}; + +void +MediaCache::QueueUpdate() +{ + mReentrantMonitor.AssertCurrentThreadIn(); + + // Queuing an update while we're in an update raises a high risk of + // triggering endless events + NS_ASSERTION(!mInUpdate, + "Queuing an update while we're in an update"); + if (mUpdateQueued) + return; + mUpdateQueued = true; + // XXX MediaCache does updates when decoders are still running at + // shutdown and get freed in the final cycle-collector cleanup. So + // don't leak a runnable in that case. + nsCOMPtr<nsIThread> mainThread = do_GetMainThread(); + if (mainThread) { + nsCOMPtr<nsIRunnable> event = new UpdateEvent(); + mainThread->Dispatch(event.forget(), NS_DISPATCH_NORMAL); + } +} + +void +MediaCache::QueueSuspendedStatusUpdate(int64_t aResourceID) +{ + NS_ASSERTION(NS_IsMainThread(), "Only call on main thread"); + if (!mSuspendedStatusToNotify.Contains(aResourceID)) { + mSuspendedStatusToNotify.AppendElement(aResourceID); + } +} + +#ifdef DEBUG_VERIFY_CACHE +void +MediaCache::Verify() +{ + mReentrantMonitor.AssertCurrentThreadIn(); + + mFreeBlocks.Verify(); + for (uint32_t i = 0; i < mStreams.Length(); ++i) { + MediaCacheStream* stream = mStreams[i]; + stream->mReadaheadBlocks.Verify(); + stream->mPlayedBlocks.Verify(); + stream->mMetadataBlocks.Verify(); + + // Verify that the readahead blocks are listed in stream block order + int32_t block = stream->mReadaheadBlocks.GetFirstBlock(); + int32_t lastStreamBlock = -1; + while (block >= 0) { + uint32_t j = 0; + while (mIndex[block].mOwners[j].mStream != stream) { + ++j; + } + int32_t nextStreamBlock = + int32_t(mIndex[block].mOwners[j].mStreamBlock); + NS_ASSERTION(lastStreamBlock < nextStreamBlock, + "Blocks not increasing in readahead stream"); + lastStreamBlock = nextStreamBlock; + block = stream->mReadaheadBlocks.GetNextBlock(block); + } + } +} +#endif + +void +MediaCache::InsertReadaheadBlock(BlockOwner* aBlockOwner, + int32_t aBlockIndex) +{ + mReentrantMonitor.AssertCurrentThreadIn(); + + // Find the last block whose stream block is before aBlockIndex's + // stream block, and insert after it + MediaCacheStream* stream = aBlockOwner->mStream; + int32_t readaheadIndex = stream->mReadaheadBlocks.GetLastBlock(); + while (readaheadIndex >= 0) { + BlockOwner* bo = GetBlockOwner(readaheadIndex, stream); + NS_ASSERTION(bo, "stream must own its blocks"); + if (bo->mStreamBlock < aBlockOwner->mStreamBlock) { + stream->mReadaheadBlocks.AddAfter(aBlockIndex, readaheadIndex); + return; + } + NS_ASSERTION(bo->mStreamBlock > aBlockOwner->mStreamBlock, + "Duplicated blocks??"); + readaheadIndex = stream->mReadaheadBlocks.GetPrevBlock(readaheadIndex); + } + + stream->mReadaheadBlocks.AddFirstBlock(aBlockIndex); + Verify(); +} + +void +MediaCache::AllocateAndWriteBlock(MediaCacheStream* aStream, const void* aData, + MediaCacheStream::ReadMode aMode) +{ + mReentrantMonitor.AssertCurrentThreadIn(); + + int32_t streamBlockIndex = aStream->mChannelOffset/BLOCK_SIZE; + + // Remove all cached copies of this block + ResourceStreamIterator iter(aStream->mResourceID); + while (MediaCacheStream* stream = iter.Next()) { + while (streamBlockIndex >= int32_t(stream->mBlocks.Length())) { + stream->mBlocks.AppendElement(-1); + } + if (stream->mBlocks[streamBlockIndex] >= 0) { + // We no longer want to own this block + int32_t globalBlockIndex = stream->mBlocks[streamBlockIndex]; + CACHE_LOG(LogLevel::Debug, ("Released block %d from stream %p block %d(%lld)", + globalBlockIndex, stream, streamBlockIndex, (long long)streamBlockIndex*BLOCK_SIZE)); + RemoveBlockOwner(globalBlockIndex, stream); + } + } + + // Extend the mBlocks array as necessary + + TimeStamp now = TimeStamp::Now(); + int32_t blockIndex = FindBlockForIncomingData(now, aStream); + if (blockIndex >= 0) { + FreeBlock(blockIndex); + + Block* block = &mIndex[blockIndex]; + CACHE_LOG(LogLevel::Debug, ("Allocated block %d to stream %p block %d(%lld)", + blockIndex, aStream, streamBlockIndex, (long long)streamBlockIndex*BLOCK_SIZE)); + + mFreeBlocks.RemoveBlock(blockIndex); + + // Tell each stream using this resource about the new block. + ResourceStreamIterator iter(aStream->mResourceID); + while (MediaCacheStream* stream = iter.Next()) { + BlockOwner* bo = block->mOwners.AppendElement(); + if (!bo) + return; + + bo->mStream = stream; + bo->mStreamBlock = streamBlockIndex; + bo->mLastUseTime = now; + stream->mBlocks[streamBlockIndex] = blockIndex; + if (streamBlockIndex*BLOCK_SIZE < stream->mStreamOffset) { + bo->mClass = aMode == MediaCacheStream::MODE_PLAYBACK + ? PLAYED_BLOCK : METADATA_BLOCK; + // This must be the most-recently-used block, since we + // marked it as used now (which may be slightly bogus, but we'll + // treat it as used for simplicity). + GetListForBlock(bo)->AddFirstBlock(blockIndex); + Verify(); + } else { + // This may not be the latest readahead block, although it usually + // will be. We may have to scan for the right place to insert + // the block in the list. + bo->mClass = READAHEAD_BLOCK; + InsertReadaheadBlock(bo, blockIndex); + } + } + + nsresult rv = mFileCache->WriteBlock(blockIndex, reinterpret_cast<const uint8_t*>(aData)); + if (NS_FAILED(rv)) { + CACHE_LOG(LogLevel::Debug, ("Released block %d from stream %p block %d(%lld)", + blockIndex, aStream, streamBlockIndex, (long long)streamBlockIndex*BLOCK_SIZE)); + FreeBlock(blockIndex); + } + } + + // Queue an Update since the cache state has changed (for example + // we might want to stop loading because the cache is full) + QueueUpdate(); +} + +void +MediaCache::OpenStream(MediaCacheStream* aStream) +{ + NS_ASSERTION(NS_IsMainThread(), "Only call on main thread"); + + ReentrantMonitorAutoEnter mon(mReentrantMonitor); + CACHE_LOG(LogLevel::Debug, ("Stream %p opened", aStream)); + mStreams.AppendElement(aStream); + aStream->mResourceID = AllocateResourceID(); + + // Queue an update since a new stream has been opened. + gMediaCache->QueueUpdate(); +} + +void +MediaCache::ReleaseStream(MediaCacheStream* aStream) +{ + NS_ASSERTION(NS_IsMainThread(), "Only call on main thread"); + + ReentrantMonitorAutoEnter mon(mReentrantMonitor); + CACHE_LOG(LogLevel::Debug, ("Stream %p closed", aStream)); + mStreams.RemoveElement(aStream); + + // Update MediaCache again for |mStreams| is changed. + // We need to re-run Update() to ensure streams reading from the same resource + // as the removed stream get a chance to continue reading. + gMediaCache->QueueUpdate(); +} + +void +MediaCache::ReleaseStreamBlocks(MediaCacheStream* aStream) +{ + mReentrantMonitor.AssertCurrentThreadIn(); + + // XXX scanning the entire stream doesn't seem great, if not much of it + // is cached, but the only easy alternative is to scan the entire cache + // which isn't better + uint32_t length = aStream->mBlocks.Length(); + for (uint32_t i = 0; i < length; ++i) { + int32_t blockIndex = aStream->mBlocks[i]; + if (blockIndex >= 0) { + CACHE_LOG(LogLevel::Debug, ("Released block %d from stream %p block %d(%lld)", + blockIndex, aStream, i, (long long)i*BLOCK_SIZE)); + RemoveBlockOwner(blockIndex, aStream); + } + } +} + +void +MediaCache::Truncate() +{ + uint32_t end; + for (end = mIndex.Length(); end > 0; --end) { + if (!IsBlockFree(end - 1)) + break; + mFreeBlocks.RemoveBlock(end - 1); + } + + if (end < mIndex.Length()) { + mIndex.TruncateLength(end); + // XXX We could truncate the cache file here, but we don't seem + // to have a cross-platform API for doing that. At least when all + // streams are closed we shut down the cache, which erases the + // file at that point. + } +} + +void +MediaCache::NoteBlockUsage(MediaCacheStream* aStream, int32_t aBlockIndex, + int64_t aStreamOffset, + MediaCacheStream::ReadMode aMode, TimeStamp aNow) +{ + mReentrantMonitor.AssertCurrentThreadIn(); + + if (aBlockIndex < 0) { + // this block is not in the cache yet + return; + } + + BlockOwner* bo = GetBlockOwner(aBlockIndex, aStream); + if (!bo) { + // this block is not in the cache yet + return; + } + + // The following check has to be <= because the stream offset has + // not yet been updated for the data read from this block + NS_ASSERTION(bo->mStreamBlock*BLOCK_SIZE <= aStreamOffset, + "Using a block that's behind the read position?"); + + GetListForBlock(bo)->RemoveBlock(aBlockIndex); + bo->mClass = + (aMode == MediaCacheStream::MODE_METADATA || bo->mClass == METADATA_BLOCK) + ? METADATA_BLOCK : PLAYED_BLOCK; + // Since this is just being used now, it can definitely be at the front + // of mMetadataBlocks or mPlayedBlocks + GetListForBlock(bo)->AddFirstBlock(aBlockIndex); + bo->mLastUseTime = aNow; + Verify(); +} + +void +MediaCache::NoteSeek(MediaCacheStream* aStream, int64_t aOldOffset) +{ + mReentrantMonitor.AssertCurrentThreadIn(); + + if (aOldOffset < aStream->mStreamOffset) { + // We seeked forward. Convert blocks from readahead to played. + // Any readahead block that intersects the seeked-over range must + // be converted. + int32_t blockIndex = aOldOffset/BLOCK_SIZE; + int32_t endIndex = + std::min<int64_t>((aStream->mStreamOffset + BLOCK_SIZE - 1)/BLOCK_SIZE, + aStream->mBlocks.Length()); + TimeStamp now = TimeStamp::Now(); + while (blockIndex < endIndex) { + int32_t cacheBlockIndex = aStream->mBlocks[blockIndex]; + if (cacheBlockIndex >= 0) { + // Marking the block used may not be exactly what we want but + // it's simple + NoteBlockUsage(aStream, cacheBlockIndex, aStream->mStreamOffset, + MediaCacheStream::MODE_PLAYBACK, now); + } + ++blockIndex; + } + } else { + // We seeked backward. Convert from played to readahead. + // Any played block that is entirely after the start of the seeked-over + // range must be converted. + int32_t blockIndex = + (aStream->mStreamOffset + BLOCK_SIZE - 1)/BLOCK_SIZE; + int32_t endIndex = + std::min<int64_t>((aOldOffset + BLOCK_SIZE - 1)/BLOCK_SIZE, + aStream->mBlocks.Length()); + while (blockIndex < endIndex) { + MOZ_ASSERT(endIndex > 0); + int32_t cacheBlockIndex = aStream->mBlocks[endIndex - 1]; + if (cacheBlockIndex >= 0) { + BlockOwner* bo = GetBlockOwner(cacheBlockIndex, aStream); + NS_ASSERTION(bo, "Stream doesn't own its blocks?"); + if (bo->mClass == PLAYED_BLOCK) { + aStream->mPlayedBlocks.RemoveBlock(cacheBlockIndex); + bo->mClass = READAHEAD_BLOCK; + // Adding this as the first block is sure to be OK since + // this must currently be the earliest readahead block + // (that's why we're proceeding backwards from the end of + // the seeked range to the start) + aStream->mReadaheadBlocks.AddFirstBlock(cacheBlockIndex); + Verify(); + } + } + --endIndex; + } + } +} + +void +MediaCacheStream::NotifyDataLength(int64_t aLength) +{ + NS_ASSERTION(NS_IsMainThread(), "Only call on main thread"); + + ReentrantMonitorAutoEnter mon(gMediaCache->GetReentrantMonitor()); + mStreamLength = aLength; +} + +void +MediaCacheStream::NotifyDataStarted(int64_t aOffset) +{ + NS_ASSERTION(NS_IsMainThread(), "Only call on main thread"); + + ReentrantMonitorAutoEnter mon(gMediaCache->GetReentrantMonitor()); + NS_WARNING_ASSERTION(aOffset == mChannelOffset, + "Server is giving us unexpected offset"); + MOZ_ASSERT(aOffset >= 0); + mChannelOffset = aOffset; + if (mStreamLength >= 0) { + // If we started reading at a certain offset, then for sure + // the stream is at least that long. + mStreamLength = std::max(mStreamLength, mChannelOffset); + } +} + +bool +MediaCacheStream::UpdatePrincipal(nsIPrincipal* aPrincipal) +{ + return nsContentUtils::CombineResourcePrincipals(&mPrincipal, aPrincipal); +} + +void +MediaCacheStream::NotifyDataReceived(int64_t aSize, const char* aData, + nsIPrincipal* aPrincipal) +{ + NS_ASSERTION(NS_IsMainThread(), "Only call on main thread"); + + // Update principals before putting the data in the cache. This is important, + // we want to make sure all principals are updated before any consumer + // can see the new data. + // We do this without holding the cache monitor, in case the client wants + // to do something that takes a lock. + { + MediaCache::ResourceStreamIterator iter(mResourceID); + while (MediaCacheStream* stream = iter.Next()) { + if (stream->UpdatePrincipal(aPrincipal)) { + stream->mClient->CacheClientNotifyPrincipalChanged(); + } + } + } + + ReentrantMonitorAutoEnter mon(gMediaCache->GetReentrantMonitor()); + int64_t size = aSize; + const char* data = aData; + + CACHE_LOG(LogLevel::Debug, ("Stream %p DataReceived at %lld count=%lld", + this, (long long)mChannelOffset, (long long)aSize)); + + // We process the data one block (or part of a block) at a time + while (size > 0) { + uint32_t blockIndex = mChannelOffset/BLOCK_SIZE; + int32_t blockOffset = int32_t(mChannelOffset - blockIndex*BLOCK_SIZE); + int32_t chunkSize = std::min<int64_t>(BLOCK_SIZE - blockOffset, size); + + // This gets set to something non-null if we have a whole block + // of data to write to the cache + const char* blockDataToStore = nullptr; + ReadMode mode = MODE_PLAYBACK; + if (blockOffset == 0 && chunkSize == BLOCK_SIZE) { + // We received a whole block, so avoid a useless copy through + // mPartialBlockBuffer + blockDataToStore = data; + } else { + if (blockOffset == 0) { + // We've just started filling this buffer so now is a good time + // to clear this flag. + mMetadataInPartialBlockBuffer = false; + } + memcpy(reinterpret_cast<char*>(mPartialBlockBuffer.get()) + blockOffset, + data, chunkSize); + + if (blockOffset + chunkSize == BLOCK_SIZE) { + // We completed a block, so lets write it out. + blockDataToStore = reinterpret_cast<char*>(mPartialBlockBuffer.get()); + if (mMetadataInPartialBlockBuffer) { + mode = MODE_METADATA; + } + } + } + + if (blockDataToStore) { + gMediaCache->AllocateAndWriteBlock(this, blockDataToStore, mode); + } + + mChannelOffset += chunkSize; + size -= chunkSize; + data += chunkSize; + } + + MediaCache::ResourceStreamIterator iter(mResourceID); + while (MediaCacheStream* stream = iter.Next()) { + if (stream->mStreamLength >= 0) { + // The stream is at least as long as what we've read + stream->mStreamLength = std::max(stream->mStreamLength, mChannelOffset); + } + stream->mClient->CacheClientNotifyDataReceived(); + } + + // Notify in case there's a waiting reader + // XXX it would be fairly easy to optimize things a lot more to + // avoid waking up reader threads unnecessarily + mon.NotifyAll(); +} + +void +MediaCacheStream::FlushPartialBlockInternal(bool aNotifyAll, + ReentrantMonitorAutoEnter& aReentrantMonitor) +{ + NS_ASSERTION(NS_IsMainThread(), "Only call on main thread"); + + int32_t blockOffset = int32_t(mChannelOffset%BLOCK_SIZE); + if (blockOffset > 0) { + CACHE_LOG(LogLevel::Debug, + ("Stream %p writing partial block: [%d] bytes; " + "mStreamOffset [%lld] mChannelOffset[%lld] mStreamLength [%lld] " + "notifying: [%s]", + this, blockOffset, mStreamOffset, mChannelOffset, mStreamLength, + aNotifyAll ? "yes" : "no")); + + // Write back the partial block + memset(reinterpret_cast<char*>(mPartialBlockBuffer.get()) + blockOffset, 0, + BLOCK_SIZE - blockOffset); + gMediaCache->AllocateAndWriteBlock(this, mPartialBlockBuffer.get(), + mMetadataInPartialBlockBuffer ? MODE_METADATA : MODE_PLAYBACK); + } + + // |mChannelOffset == 0| means download ends with no bytes received. + // We should also wake up those readers who are waiting for data + // that will never come. + if ((blockOffset > 0 || mChannelOffset == 0) && aNotifyAll) { + // Wake up readers who may be waiting for this data + aReentrantMonitor.NotifyAll(); + } +} + +void +MediaCacheStream::FlushPartialBlock() +{ + NS_ASSERTION(NS_IsMainThread(), "Only call on main thread"); + + ReentrantMonitorAutoEnter mon(gMediaCache->GetReentrantMonitor()); + + // Write the current partial block to memory. + // Note: This writes a full block, so if data is not at the end of the + // stream, the decoder must subsequently choose correct start and end offsets + // for reading/seeking. + FlushPartialBlockInternal(false, mon); + + gMediaCache->QueueUpdate(); +} + +void +MediaCacheStream::NotifyDataEnded(nsresult aStatus) +{ + NS_ASSERTION(NS_IsMainThread(), "Only call on main thread"); + + ReentrantMonitorAutoEnter mon(gMediaCache->GetReentrantMonitor()); + + if (NS_FAILED(aStatus)) { + // Disconnect from other streams sharing our resource, since they + // should continue trying to load. Our load might have been deliberately + // canceled and that shouldn't affect other streams. + mResourceID = gMediaCache->AllocateResourceID(); + } + + // It is prudent to update channel/cache status before calling + // CacheClientNotifyDataEnded() which will read |mChannelEnded|. + FlushPartialBlockInternal(true, mon); + mChannelEnded = true; + gMediaCache->QueueUpdate(); + + MediaCache::ResourceStreamIterator iter(mResourceID); + while (MediaCacheStream* stream = iter.Next()) { + if (NS_SUCCEEDED(aStatus)) { + // We read the whole stream, so remember the true length + stream->mStreamLength = mChannelOffset; + } + if (!stream->mDidNotifyDataEnded) { + stream->mDidNotifyDataEnded = true; + stream->mNotifyDataEndedStatus = aStatus; + stream->mClient->CacheClientNotifyDataEnded(aStatus); + } + } +} + +void +MediaCacheStream::NotifyChannelRecreated() +{ + NS_ASSERTION(NS_IsMainThread(), "Only call on main thread"); + ReentrantMonitorAutoEnter mon(gMediaCache->GetReentrantMonitor()); + mChannelEnded = false; + mDidNotifyDataEnded = false; +} + +MediaCacheStream::~MediaCacheStream() +{ + NS_ASSERTION(NS_IsMainThread(), "Only call on main thread"); + NS_ASSERTION(!mPinCount, "Unbalanced Pin"); + + if (gMediaCache) { + NS_ASSERTION(mClosed, "Stream was not closed"); + gMediaCache->ReleaseStream(this); + MediaCache::MaybeShutdown(); + } +} + +void +MediaCacheStream::SetTransportSeekable(bool aIsTransportSeekable) +{ + ReentrantMonitorAutoEnter mon(gMediaCache->GetReentrantMonitor()); + NS_ASSERTION(mIsTransportSeekable || aIsTransportSeekable || + mChannelOffset == 0, "channel offset must be zero when we become non-seekable"); + mIsTransportSeekable = aIsTransportSeekable; + // Queue an Update since we may change our strategy for dealing + // with this stream + gMediaCache->QueueUpdate(); +} + +bool +MediaCacheStream::IsTransportSeekable() +{ + ReentrantMonitorAutoEnter mon(gMediaCache->GetReentrantMonitor()); + return mIsTransportSeekable; +} + +bool +MediaCacheStream::AreAllStreamsForResourceSuspended() +{ + ReentrantMonitorAutoEnter mon(gMediaCache->GetReentrantMonitor()); + MediaCache::ResourceStreamIterator iter(mResourceID); + // Look for a stream that's able to read the data we need + int64_t dataOffset = -1; + while (MediaCacheStream* stream = iter.Next()) { + if (stream->mCacheSuspended || stream->mChannelEnded || stream->mClosed) { + continue; + } + if (dataOffset < 0) { + dataOffset = GetCachedDataEndInternal(mStreamOffset); + } + // Ignore streams that are reading beyond the data we need + if (stream->mChannelOffset > dataOffset) { + continue; + } + return false; + } + + return true; +} + +void +MediaCacheStream::Close() +{ + NS_ASSERTION(NS_IsMainThread(), "Only call on main thread"); + + if (!mInitialized) + return; + + ReentrantMonitorAutoEnter mon(gMediaCache->GetReentrantMonitor()); + CloseInternal(mon); + // Queue an Update since we may have created more free space. Don't do + // it from CloseInternal since that gets called by Update() itself + // sometimes, and we try to not to queue updates from Update(). + gMediaCache->QueueUpdate(); +} + +void +MediaCacheStream::EnsureCacheUpdate() +{ + if (mHasHadUpdate) + return; + gMediaCache->Update(); +} + +void +MediaCacheStream::CloseInternal(ReentrantMonitorAutoEnter& aReentrantMonitor) +{ + NS_ASSERTION(NS_IsMainThread(), "Only call on main thread"); + + if (mClosed) + return; + mClosed = true; + // Closing a stream will change the return value of + // MediaCacheStream::AreAllStreamsForResourceSuspended as well as + // ChannelMediaResource::IsSuspendedByCache. Let's notify it. + gMediaCache->QueueSuspendedStatusUpdate(mResourceID); + gMediaCache->ReleaseStreamBlocks(this); + // Wake up any blocked readers + aReentrantMonitor.NotifyAll(); +} + +void +MediaCacheStream::Pin() +{ + ReentrantMonitorAutoEnter mon(gMediaCache->GetReentrantMonitor()); + ++mPinCount; + // Queue an Update since we may no longer want to read more into the + // cache, if this stream's block have become non-evictable + gMediaCache->QueueUpdate(); +} + +void +MediaCacheStream::Unpin() +{ + ReentrantMonitorAutoEnter mon(gMediaCache->GetReentrantMonitor()); + NS_ASSERTION(mPinCount > 0, "Unbalanced Unpin"); + --mPinCount; + // Queue an Update since we may be able to read more into the + // cache, if this stream's block have become evictable + gMediaCache->QueueUpdate(); +} + +int64_t +MediaCacheStream::GetLength() +{ + ReentrantMonitorAutoEnter mon(gMediaCache->GetReentrantMonitor()); + return mStreamLength; +} + +int64_t +MediaCacheStream::GetNextCachedData(int64_t aOffset) +{ + ReentrantMonitorAutoEnter mon(gMediaCache->GetReentrantMonitor()); + return GetNextCachedDataInternal(aOffset); +} + +int64_t +MediaCacheStream::GetCachedDataEnd(int64_t aOffset) +{ + ReentrantMonitorAutoEnter mon(gMediaCache->GetReentrantMonitor()); + return GetCachedDataEndInternal(aOffset); +} + +bool +MediaCacheStream::IsDataCachedToEndOfStream(int64_t aOffset) +{ + ReentrantMonitorAutoEnter mon(gMediaCache->GetReentrantMonitor()); + if (mStreamLength < 0) + return false; + return GetCachedDataEndInternal(aOffset) >= mStreamLength; +} + +int64_t +MediaCacheStream::GetCachedDataEndInternal(int64_t aOffset) +{ + gMediaCache->GetReentrantMonitor().AssertCurrentThreadIn(); + uint32_t startBlockIndex = aOffset/BLOCK_SIZE; + uint32_t blockIndex = startBlockIndex; + while (blockIndex < mBlocks.Length() && mBlocks[blockIndex] != -1) { + ++blockIndex; + } + int64_t result = blockIndex*BLOCK_SIZE; + if (blockIndex == mChannelOffset/BLOCK_SIZE) { + // The block containing mChannelOffset may be partially read but not + // yet committed to the main cache + result = mChannelOffset; + } + if (mStreamLength >= 0) { + // The last block in the cache may only be partially valid, so limit + // the cached range to the stream length + result = std::min(result, mStreamLength); + } + return std::max(result, aOffset); +} + +int64_t +MediaCacheStream::GetNextCachedDataInternal(int64_t aOffset) +{ + gMediaCache->GetReentrantMonitor().AssertCurrentThreadIn(); + if (aOffset == mStreamLength) + return -1; + + uint32_t startBlockIndex = aOffset/BLOCK_SIZE; + uint32_t channelBlockIndex = mChannelOffset/BLOCK_SIZE; + + if (startBlockIndex == channelBlockIndex && + aOffset < mChannelOffset) { + // The block containing mChannelOffset is partially read, but not + // yet committed to the main cache. aOffset lies in the partially + // read portion, thus it is effectively cached. + return aOffset; + } + + if (startBlockIndex >= mBlocks.Length()) + return -1; + + // Is the current block cached? + if (mBlocks[startBlockIndex] != -1) + return aOffset; + + // Count the number of uncached blocks + bool hasPartialBlock = (mChannelOffset % BLOCK_SIZE) != 0; + uint32_t blockIndex = startBlockIndex + 1; + while (true) { + if ((hasPartialBlock && blockIndex == channelBlockIndex) || + (blockIndex < mBlocks.Length() && mBlocks[blockIndex] != -1)) { + // We at the incoming channel block, which has has data in it, + // or are we at a cached block. Return index of block start. + return blockIndex * BLOCK_SIZE; + } + + // No more cached blocks? + if (blockIndex >= mBlocks.Length()) + return -1; + + ++blockIndex; + } + + NS_NOTREACHED("Should return in loop"); + return -1; +} + +void +MediaCacheStream::SetReadMode(ReadMode aMode) +{ + ReentrantMonitorAutoEnter mon(gMediaCache->GetReentrantMonitor()); + if (aMode == mCurrentMode) + return; + mCurrentMode = aMode; + gMediaCache->QueueUpdate(); +} + +void +MediaCacheStream::SetPlaybackRate(uint32_t aBytesPerSecond) +{ + NS_ASSERTION(aBytesPerSecond > 0, "Zero playback rate not allowed"); + ReentrantMonitorAutoEnter mon(gMediaCache->GetReentrantMonitor()); + if (aBytesPerSecond == mPlaybackBytesPerSecond) + return; + mPlaybackBytesPerSecond = aBytesPerSecond; + gMediaCache->QueueUpdate(); +} + +nsresult +MediaCacheStream::Seek(int32_t aWhence, int64_t aOffset) +{ + NS_ASSERTION(!NS_IsMainThread(), "Don't call on main thread"); + + ReentrantMonitorAutoEnter mon(gMediaCache->GetReentrantMonitor()); + if (mClosed) + return NS_ERROR_FAILURE; + + int64_t oldOffset = mStreamOffset; + int64_t newOffset = mStreamOffset; + switch (aWhence) { + case PR_SEEK_END: + if (mStreamLength < 0) + return NS_ERROR_FAILURE; + newOffset = mStreamLength + aOffset; + break; + case PR_SEEK_CUR: + newOffset += aOffset; + break; + case PR_SEEK_SET: + newOffset = aOffset; + break; + default: + NS_ERROR("Unknown whence"); + return NS_ERROR_FAILURE; + } + + if (newOffset < 0) + return NS_ERROR_FAILURE; + mStreamOffset = newOffset; + + CACHE_LOG(LogLevel::Debug, ("Stream %p Seek to %lld", this, (long long)mStreamOffset)); + gMediaCache->NoteSeek(this, oldOffset); + + gMediaCache->QueueUpdate(); + return NS_OK; +} + +int64_t +MediaCacheStream::Tell() +{ + ReentrantMonitorAutoEnter mon(gMediaCache->GetReentrantMonitor()); + return mStreamOffset; +} + +nsresult +MediaCacheStream::Read(char* aBuffer, uint32_t aCount, uint32_t* aBytes) +{ + NS_ASSERTION(!NS_IsMainThread(), "Don't call on main thread"); + + ReentrantMonitorAutoEnter mon(gMediaCache->GetReentrantMonitor()); + if (mClosed) + return NS_ERROR_FAILURE; + + // Cache the offset in case it is changed again when we are waiting for the + // monitor to be notified to avoid reading at the wrong position. + auto streamOffset = mStreamOffset; + + uint32_t count = 0; + // Read one block (or part of a block) at a time + while (count < aCount) { + uint32_t streamBlock = uint32_t(streamOffset/BLOCK_SIZE); + uint32_t offsetInStreamBlock = uint32_t(streamOffset - streamBlock*BLOCK_SIZE); + int64_t size = std::min<int64_t>(aCount - count, BLOCK_SIZE - offsetInStreamBlock); + + if (mStreamLength >= 0) { + // Don't try to read beyond the end of the stream + int64_t bytesRemaining = mStreamLength - streamOffset; + if (bytesRemaining <= 0) { + // Get out of here and return NS_OK + break; + } + size = std::min(size, bytesRemaining); + // Clamp size until 64-bit file size issues are fixed. + size = std::min(size, int64_t(INT32_MAX)); + } + + int32_t cacheBlock = streamBlock < mBlocks.Length() ? mBlocks[streamBlock] : -1; + if (cacheBlock < 0) { + // We don't have a complete cached block here. + + if (count > 0) { + // Some data has been read, so return what we've got instead of + // blocking or trying to find a stream with a partial block. + break; + } + + // See if the data is available in the partial cache block of any + // stream reading this resource. We need to do this in case there is + // another stream with this resource that has all the data to the end of + // the stream but the data doesn't end on a block boundary. + MediaCacheStream* streamWithPartialBlock = nullptr; + MediaCache::ResourceStreamIterator iter(mResourceID); + while (MediaCacheStream* stream = iter.Next()) { + if (uint32_t(stream->mChannelOffset/BLOCK_SIZE) == streamBlock && + streamOffset < stream->mChannelOffset) { + streamWithPartialBlock = stream; + break; + } + } + if (streamWithPartialBlock) { + // We can just use the data in mPartialBlockBuffer. In fact we should + // use it rather than waiting for the block to fill and land in + // the cache. + int64_t bytes = std::min<int64_t>(size, streamWithPartialBlock->mChannelOffset - streamOffset); + // Clamp bytes until 64-bit file size issues are fixed. + bytes = std::min(bytes, int64_t(INT32_MAX)); + MOZ_ASSERT(bytes >= 0 && bytes <= aCount, "Bytes out of range."); + memcpy(aBuffer, + reinterpret_cast<char*>(streamWithPartialBlock->mPartialBlockBuffer.get()) + offsetInStreamBlock, bytes); + if (mCurrentMode == MODE_METADATA) { + streamWithPartialBlock->mMetadataInPartialBlockBuffer = true; + } + streamOffset += bytes; + count = bytes; + break; + } + + // No data has been read yet, so block + mon.Wait(); + if (mClosed) { + // We may have successfully read some data, but let's just throw + // that out. + return NS_ERROR_FAILURE; + } + continue; + } + + gMediaCache->NoteBlockUsage(this, cacheBlock, streamOffset, mCurrentMode, TimeStamp::Now()); + + int64_t offset = cacheBlock*BLOCK_SIZE + offsetInStreamBlock; + int32_t bytes; + MOZ_ASSERT(size >= 0 && size <= INT32_MAX, "Size out of range."); + nsresult rv = gMediaCache->ReadCacheFile(offset, aBuffer + count, int32_t(size), &bytes); + if (NS_FAILED(rv)) { + if (count == 0) + return rv; + // If we did successfully read some data, may as well return it + break; + } + streamOffset += bytes; + count += bytes; + } + + if (count > 0) { + // Some data was read, so queue an update since block priorities may + // have changed + gMediaCache->QueueUpdate(); + } + CACHE_LOG(LogLevel::Debug, ("Stream %p Read at %lld count=%d", this, streamOffset-count, count)); + *aBytes = count; + mStreamOffset = streamOffset; + return NS_OK; +} + +nsresult +MediaCacheStream::ReadAt(int64_t aOffset, char* aBuffer, + uint32_t aCount, uint32_t* aBytes) +{ + NS_ASSERTION(!NS_IsMainThread(), "Don't call on main thread"); + + ReentrantMonitorAutoEnter mon(gMediaCache->GetReentrantMonitor()); + nsresult rv = Seek(nsISeekableStream::NS_SEEK_SET, aOffset); + if (NS_FAILED(rv)) return rv; + return Read(aBuffer, aCount, aBytes); +} + +nsresult +MediaCacheStream::ReadFromCache(char* aBuffer, int64_t aOffset, int64_t aCount) +{ + ReentrantMonitorAutoEnter mon(gMediaCache->GetReentrantMonitor()); + if (mClosed) + return NS_ERROR_FAILURE; + + // Read one block (or part of a block) at a time + uint32_t count = 0; + int64_t streamOffset = aOffset; + while (count < aCount) { + uint32_t streamBlock = uint32_t(streamOffset/BLOCK_SIZE); + uint32_t offsetInStreamBlock = + uint32_t(streamOffset - streamBlock*BLOCK_SIZE); + int64_t size = std::min<int64_t>(aCount - count, BLOCK_SIZE - offsetInStreamBlock); + + if (mStreamLength >= 0) { + // Don't try to read beyond the end of the stream + int64_t bytesRemaining = mStreamLength - streamOffset; + if (bytesRemaining <= 0) { + return NS_ERROR_FAILURE; + } + size = std::min(size, bytesRemaining); + // Clamp size until 64-bit file size issues are fixed. + size = std::min(size, int64_t(INT32_MAX)); + } + + int32_t bytes; + uint32_t channelBlock = uint32_t(mChannelOffset/BLOCK_SIZE); + int32_t cacheBlock = streamBlock < mBlocks.Length() ? mBlocks[streamBlock] : -1; + if (channelBlock == streamBlock && streamOffset < mChannelOffset) { + // We can just use the data in mPartialBlockBuffer. In fact we should + // use it rather than waiting for the block to fill and land in + // the cache. + // Clamp bytes until 64-bit file size issues are fixed. + int64_t toCopy = std::min<int64_t>(size, mChannelOffset - streamOffset); + bytes = std::min(toCopy, int64_t(INT32_MAX)); + MOZ_ASSERT(bytes >= 0 && bytes <= toCopy, "Bytes out of range."); + memcpy(aBuffer + count, + reinterpret_cast<char*>(mPartialBlockBuffer.get()) + offsetInStreamBlock, bytes); + } else { + if (cacheBlock < 0) { + // We expect all blocks to be cached! Fail! + return NS_ERROR_FAILURE; + } + int64_t offset = cacheBlock*BLOCK_SIZE + offsetInStreamBlock; + MOZ_ASSERT(size >= 0 && size <= INT32_MAX, "Size out of range."); + nsresult rv = gMediaCache->ReadCacheFile(offset, aBuffer + count, int32_t(size), &bytes); + if (NS_FAILED(rv)) { + return rv; + } + } + streamOffset += bytes; + count += bytes; + } + + return NS_OK; +} + +nsresult +MediaCacheStream::Init() +{ + NS_ASSERTION(NS_IsMainThread(), "Only call on main thread"); + + if (mInitialized) + return NS_OK; + + InitMediaCache(); + if (!gMediaCache) + return NS_ERROR_FAILURE; + gMediaCache->OpenStream(this); + mInitialized = true; + return NS_OK; +} + +nsresult +MediaCacheStream::InitAsClone(MediaCacheStream* aOriginal) +{ + if (!aOriginal->IsAvailableForSharing()) + return NS_ERROR_FAILURE; + + if (mInitialized) + return NS_OK; + + nsresult rv = Init(); + if (NS_FAILED(rv)) + return rv; + mResourceID = aOriginal->mResourceID; + + // Grab cache blocks from aOriginal as readahead blocks for our stream + ReentrantMonitorAutoEnter mon(gMediaCache->GetReentrantMonitor()); + + mPrincipal = aOriginal->mPrincipal; + mStreamLength = aOriginal->mStreamLength; + mIsTransportSeekable = aOriginal->mIsTransportSeekable; + + // Cloned streams are initially suspended, since there is no channel open + // initially for a clone. + mCacheSuspended = true; + mChannelEnded = true; + + if (aOriginal->mDidNotifyDataEnded) { + mNotifyDataEndedStatus = aOriginal->mNotifyDataEndedStatus; + mDidNotifyDataEnded = true; + mClient->CacheClientNotifyDataEnded(mNotifyDataEndedStatus); + } + + for (uint32_t i = 0; i < aOriginal->mBlocks.Length(); ++i) { + int32_t cacheBlockIndex = aOriginal->mBlocks[i]; + if (cacheBlockIndex < 0) + continue; + + while (i >= mBlocks.Length()) { + mBlocks.AppendElement(-1); + } + // Every block is a readahead block for the clone because the clone's initial + // stream offset is zero + gMediaCache->AddBlockOwnerAsReadahead(cacheBlockIndex, this, i); + } + + return NS_OK; +} + +nsresult MediaCacheStream::GetCachedRanges(MediaByteRangeSet& aRanges) +{ + // Take the monitor, so that the cached data ranges can't grow while we're + // trying to loop over them. + ReentrantMonitorAutoEnter mon(gMediaCache->GetReentrantMonitor()); + + // We must be pinned while running this, otherwise the cached data ranges may + // shrink while we're trying to loop over them. + NS_ASSERTION(mPinCount > 0, "Must be pinned"); + + int64_t startOffset = GetNextCachedDataInternal(0); + while (startOffset >= 0) { + int64_t endOffset = GetCachedDataEndInternal(startOffset); + NS_ASSERTION(startOffset < endOffset, "Buffered range must end after its start"); + // Bytes [startOffset..endOffset] are cached. + aRanges += MediaByteRange(startOffset, endOffset); + startOffset = GetNextCachedDataInternal(endOffset); + NS_ASSERTION(startOffset == -1 || startOffset > endOffset, + "Must have advanced to start of next range, or hit end of stream"); + } + return NS_OK; +} + +} // namespace mozilla + |