/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this file, * You can obtain one at http://mozilla.org/MPL/2.0/. */ // Original author: ekr@rtfm.com #include "MediaPipeline.h" #ifndef USE_FAKE_MEDIA_STREAMS #include "MediaStreamGraphImpl.h" #endif #include #include "nspr.h" #include "srtp.h" #if !defined(MOZILLA_EXTERNAL_LINKAGE) #include "VideoSegment.h" #include "Layers.h" #include "LayersLogging.h" #include "ImageTypes.h" #include "ImageContainer.h" #include "DOMMediaStream.h" #include "MediaStreamTrack.h" #include "MediaStreamListener.h" #include "MediaStreamVideoSink.h" #include "VideoUtils.h" #include "VideoStreamTrack.h" #ifdef WEBRTC_GONK #include "GrallocImages.h" #include "mozilla/layers/GrallocTextureClient.h" #endif #endif #include "nsError.h" #include "AudioSegment.h" #include "MediaSegment.h" #include "MediaPipelineFilter.h" #include "databuffer.h" #include "transportflow.h" #include "transportlayer.h" #include "transportlayerdtls.h" #include "transportlayerice.h" #include "runnable_utils.h" #include "libyuv/convert.h" #include "mozilla/SharedThreadPool.h" #if !defined(MOZILLA_EXTERNAL_LINKAGE) #include "mozilla/PeerIdentity.h" #include "mozilla/TaskQueue.h" #endif #include "mozilla/gfx/Point.h" #include "mozilla/gfx/Types.h" #include "mozilla/UniquePtr.h" #include "mozilla/UniquePtrExtensions.h" #include "mozilla/Sprintf.h" #include "webrtc/common_types.h" #include "webrtc/common_video/interface/native_handle.h" #include "webrtc/common_video/libyuv/include/webrtc_libyuv.h" #include "webrtc/video_engine/include/vie_errors.h" #include "logging.h" // Max size given stereo is 480*2*2 = 1920 (10ms of 16-bits stereo audio at // 48KHz) #define AUDIO_SAMPLE_BUFFER_MAX 480*2*2 static_assert((WEBRTC_DEFAULT_SAMPLE_RATE/100)*sizeof(uint16_t) * 2 <= AUDIO_SAMPLE_BUFFER_MAX, "AUDIO_SAMPLE_BUFFER_MAX is not large enough"); using namespace mozilla; using namespace mozilla::dom; using namespace mozilla::gfx; using namespace mozilla::layers; // Logging context MOZ_MTLOG_MODULE("mediapipeline") namespace mozilla { #if !defined(MOZILLA_EXTERNAL_LINKAGE) class VideoConverterListener { public: NS_INLINE_DECL_THREADSAFE_REFCOUNTING(VideoConverterListener) virtual void OnVideoFrameConverted(unsigned char* aVideoFrame, unsigned int aVideoFrameLength, unsigned short aWidth, unsigned short aHeight, VideoType aVideoType, uint64_t aCaptureTime) = 0; virtual void OnVideoFrameConverted(webrtc::I420VideoFrame& aVideoFrame) = 0; protected: virtual ~VideoConverterListener() {} }; // I420 buffer size macros #define YSIZE(x,y) ((x)*(y)) #define CRSIZE(x,y) ((((x)+1) >> 1) * (((y)+1) >> 1)) #define I420SIZE(x,y) (YSIZE((x),(y)) + 2 * CRSIZE((x),(y))) // An async video frame format converter. // // Input is typically a MediaStream(Track)Listener driven by MediaStreamGraph. // // We keep track of the size of the TaskQueue so we can drop frames if // conversion is taking too long. // // Output is passed through to all added VideoConverterListeners on a TaskQueue // thread whenever a frame is converted. class VideoFrameConverter { public: NS_INLINE_DECL_THREADSAFE_REFCOUNTING(VideoFrameConverter) VideoFrameConverter() : mLength(0) , last_img_(-1) // -1 is not a guaranteed invalid serial. See bug 1262134. , disabled_frame_sent_(false) #ifdef DEBUG , mThrottleCount(0) , mThrottleRecord(0) #endif , mMutex("VideoFrameConverter") { MOZ_COUNT_CTOR(VideoFrameConverter); RefPtr pool = SharedThreadPool::Get(NS_LITERAL_CSTRING("VideoFrameConverter")); mTaskQueue = MakeAndAddRef(pool.forget()); } void QueueVideoChunk(VideoChunk& aChunk, bool aForceBlack) { if (aChunk.IsNull()) { return; } // We get passed duplicate frames every ~10ms even with no frame change. int32_t serial = aChunk.mFrame.GetImage()->GetSerial(); if (serial == last_img_) { return; } last_img_ = serial; // A throttling limit of 1 allows us to convert 2 frames concurrently. // It's short enough to not build up too significant a delay, while // giving us a margin to not cause some machines to drop every other frame. const int32_t queueThrottlingLimit = 1; if (mLength > queueThrottlingLimit) { MOZ_MTLOG(ML_DEBUG, "VideoFrameConverter " << this << " queue is full." << " Throttling by throwing away a frame."); #ifdef DEBUG ++mThrottleCount; mThrottleRecord = std::max(mThrottleCount, mThrottleRecord); #endif return; } #ifdef DEBUG if (mThrottleCount > 0) { auto level = ML_DEBUG; if (mThrottleCount > 5) { // Log at a higher level when we have large drops. level = ML_INFO; } MOZ_MTLOG(level, "VideoFrameConverter " << this << " stopped" << " throttling after throwing away " << mThrottleCount << " frames. Longest throttle so far was " << mThrottleRecord << " frames."); mThrottleCount = 0; } #endif bool forceBlack = aForceBlack || aChunk.mFrame.GetForceBlack(); if (forceBlack) { // Reset the last-img check. // -1 is not a guaranteed invalid serial. See bug 1262134. last_img_ = -1; if (disabled_frame_sent_) { // After disabling we just pass one black frame to the encoder. // Allocating and setting it to black steals some performance // that can be avoided. We don't handle resolution changes while // disabled for now. return; } disabled_frame_sent_ = true; } else { disabled_frame_sent_ = false; } ++mLength; // Atomic nsCOMPtr runnable = NewRunnableMethod, bool>( this, &VideoFrameConverter::ProcessVideoFrame, aChunk.mFrame.GetImage(), forceBlack); mTaskQueue->Dispatch(runnable.forget()); } void AddListener(VideoConverterListener* aListener) { MutexAutoLock lock(mMutex); MOZ_ASSERT(!mListeners.Contains(aListener)); mListeners.AppendElement(aListener); } bool RemoveListener(VideoConverterListener* aListener) { MutexAutoLock lock(mMutex); return mListeners.RemoveElement(aListener); } void Shutdown() { mTaskQueue->BeginShutdown(); mTaskQueue->AwaitShutdownAndIdle(); } protected: virtual ~VideoFrameConverter() { MOZ_COUNT_DTOR(VideoFrameConverter); } void VideoFrameConverted(unsigned char* aVideoFrame, unsigned int aVideoFrameLength, unsigned short aWidth, unsigned short aHeight, VideoType aVideoType, uint64_t aCaptureTime) { MutexAutoLock lock(mMutex); for (RefPtr& listener : mListeners) { listener->OnVideoFrameConverted(aVideoFrame, aVideoFrameLength, aWidth, aHeight, aVideoType, aCaptureTime); } } void VideoFrameConverted(webrtc::I420VideoFrame& aVideoFrame) { MutexAutoLock lock(mMutex); for (RefPtr& listener : mListeners) { listener->OnVideoFrameConverted(aVideoFrame); } } void ProcessVideoFrame(Image* aImage, bool aForceBlack) { --mLength; // Atomic MOZ_ASSERT(mLength >= 0); if (aForceBlack) { IntSize size = aImage->GetSize(); uint32_t yPlaneLen = YSIZE(size.width, size.height); uint32_t cbcrPlaneLen = 2 * CRSIZE(size.width, size.height); uint32_t length = yPlaneLen + cbcrPlaneLen; // Send a black image. auto pixelData = MakeUniqueFallible(length); if (pixelData) { // YCrCb black = 0x10 0x80 0x80 memset(pixelData.get(), 0x10, yPlaneLen); // Fill Cb/Cr planes memset(pixelData.get() + yPlaneLen, 0x80, cbcrPlaneLen); MOZ_MTLOG(ML_DEBUG, "Sending a black video frame"); VideoFrameConverted(pixelData.get(), length, size.width, size.height, mozilla::kVideoI420, 0); } return; } ImageFormat format = aImage->GetFormat(); #ifdef WEBRTC_GONK GrallocImage* nativeImage = aImage->AsGrallocImage(); if (nativeImage) { android::sp graphicBuffer = nativeImage->GetGraphicBuffer(); int pixelFormat = graphicBuffer->getPixelFormat(); /* PixelFormat is an enum == int */ mozilla::VideoType destFormat; switch (pixelFormat) { case HAL_PIXEL_FORMAT_YV12: // all android must support this destFormat = mozilla::kVideoYV12; break; case GrallocImage::HAL_PIXEL_FORMAT_YCbCr_420_SP: destFormat = mozilla::kVideoNV21; break; case GrallocImage::HAL_PIXEL_FORMAT_YCbCr_420_P: destFormat = mozilla::kVideoI420; break; default: // XXX Bug NNNNNNN // use http://dxr.mozilla.org/mozilla-central/source/content/media/omx/I420ColorConverterHelper.cpp // to convert unknown types (OEM-specific) to I420 MOZ_MTLOG(ML_ERROR, "Un-handled GRALLOC buffer type:" << pixelFormat); MOZ_CRASH(); } void *basePtr; graphicBuffer->lock(android::GraphicBuffer::USAGE_SW_READ_MASK, &basePtr); uint32_t width = graphicBuffer->getWidth(); uint32_t height = graphicBuffer->getHeight(); // XXX gralloc buffer's width and stride could be different depends on implementations. if (destFormat != mozilla::kVideoI420) { unsigned char *video_frame = static_cast(basePtr); webrtc::I420VideoFrame i420_frame; int stride_y = width; int stride_uv = (width + 1) / 2; int target_width = width; int target_height = height; if (i420_frame.CreateEmptyFrame(target_width, abs(target_height), stride_y, stride_uv, stride_uv) < 0) { MOZ_ASSERT(false, "Can't allocate empty i420frame"); return; } webrtc::VideoType commonVideoType = webrtc::RawVideoTypeToCommonVideoVideoType( static_cast((int)destFormat)); if (ConvertToI420(commonVideoType, video_frame, 0, 0, width, height, I420SIZE(width, height), webrtc::kVideoRotation_0, &i420_frame)) { MOZ_ASSERT(false, "Can't convert video type for sending to I420"); return; } i420_frame.set_ntp_time_ms(0); VideoFrameConverted(i420_frame); } else { VideoFrameConverted(static_cast(basePtr), I420SIZE(width, height), width, height, destFormat, 0); } graphicBuffer->unlock(); return; } else #endif if (format == ImageFormat::PLANAR_YCBCR) { // Cast away constness b/c some of the accessors are non-const PlanarYCbCrImage* yuv = const_cast( static_cast(aImage)); const PlanarYCbCrData *data = yuv->GetData(); if (data) { uint8_t *y = data->mYChannel; uint8_t *cb = data->mCbChannel; uint8_t *cr = data->mCrChannel; int32_t yStride = data->mYStride; int32_t cbCrStride = data->mCbCrStride; uint32_t width = yuv->GetSize().width; uint32_t height = yuv->GetSize().height; webrtc::I420VideoFrame i420_frame; int rv = i420_frame.CreateFrame(y, cb, cr, width, height, yStride, cbCrStride, cbCrStride, webrtc::kVideoRotation_0); if (rv != 0) { NS_ERROR("Creating an I420 frame failed"); return; } MOZ_MTLOG(ML_DEBUG, "Sending an I420 video frame"); VideoFrameConverted(i420_frame); return; } } RefPtr surf = aImage->GetAsSourceSurface(); if (!surf) { MOZ_MTLOG(ML_ERROR, "Getting surface from " << Stringify(format) << " image failed"); return; } RefPtr data = surf->GetDataSurface(); if (!data) { MOZ_MTLOG(ML_ERROR, "Getting data surface from " << Stringify(format) << " image with " << Stringify(surf->GetType()) << "(" << Stringify(surf->GetFormat()) << ") surface failed"); return; } IntSize size = aImage->GetSize(); int half_width = (size.width + 1) >> 1; int half_height = (size.height + 1) >> 1; int c_size = half_width * half_height; int buffer_size = YSIZE(size.width, size.height) + 2 * c_size; auto yuv_scoped = MakeUniqueFallible(buffer_size); if (!yuv_scoped) { return; } uint8* yuv = yuv_scoped.get(); DataSourceSurface::ScopedMap map(data, DataSourceSurface::READ); if (!map.IsMapped()) { MOZ_MTLOG(ML_ERROR, "Reading DataSourceSurface from " << Stringify(format) << " image with " << Stringify(surf->GetType()) << "(" << Stringify(surf->GetFormat()) << ") surface failed"); return; } int rv; int cb_offset = YSIZE(size.width, size.height); int cr_offset = cb_offset + c_size; switch (surf->GetFormat()) { case SurfaceFormat::B8G8R8A8: case SurfaceFormat::B8G8R8X8: rv = libyuv::ARGBToI420(static_cast(map.GetData()), map.GetStride(), yuv, size.width, yuv + cb_offset, half_width, yuv + cr_offset, half_width, size.width, size.height); break; case SurfaceFormat::R5G6B5_UINT16: rv = libyuv::RGB565ToI420(static_cast(map.GetData()), map.GetStride(), yuv, size.width, yuv + cb_offset, half_width, yuv + cr_offset, half_width, size.width, size.height); break; default: MOZ_MTLOG(ML_ERROR, "Unsupported RGB video format" << Stringify(surf->GetFormat())); MOZ_ASSERT(PR_FALSE); return; } if (rv != 0) { MOZ_MTLOG(ML_ERROR, Stringify(surf->GetFormat()) << " to I420 conversion failed"); return; } MOZ_MTLOG(ML_DEBUG, "Sending an I420 video frame converted from " << Stringify(surf->GetFormat())); VideoFrameConverted(yuv, buffer_size, size.width, size.height, mozilla::kVideoI420, 0); } Atomic mLength; RefPtr mTaskQueue; // Written and read from the queueing thread (normally MSG). int32_t last_img_; // serial number of last Image bool disabled_frame_sent_; // If a black frame has been sent after disabling. #ifdef DEBUG uint32_t mThrottleCount; uint32_t mThrottleRecord; #endif // mMutex guards the below variables. Mutex mMutex; nsTArray> mListeners; }; #endif // An async inserter for audio data, to avoid running audio codec encoders // on the MSG/input audio thread. Basically just bounces all the audio // data to a single audio processing/input queue. We could if we wanted to // use multiple threads and a TaskQueue. class AudioProxyThread { public: NS_INLINE_DECL_THREADSAFE_REFCOUNTING(AudioProxyThread) explicit AudioProxyThread(AudioSessionConduit *aConduit) : mConduit(aConduit) { MOZ_ASSERT(mConduit); MOZ_COUNT_CTOR(AudioProxyThread); #if !defined(MOZILLA_EXTERNAL_LINKAGE) // Use only 1 thread; also forces FIFO operation // We could use multiple threads, but that may be dicier with the webrtc.org // code. If so we'd need to use TaskQueues like the videoframe converter RefPtr pool = SharedThreadPool::Get(NS_LITERAL_CSTRING("AudioProxy"), 1); mThread = pool.get(); #else nsCOMPtr thread; if (!NS_WARN_IF(NS_FAILED(NS_NewNamedThread("AudioProxy", getter_AddRefs(thread))))) { mThread = thread; } #endif } // called on mThread void InternalProcessAudioChunk( TrackRate rate, AudioChunk& chunk, bool enabled) { // Convert to interleaved, 16-bits integer audio, with a maximum of two // channels (since the WebRTC.org code below makes the assumption that the // input audio is either mono or stereo). uint32_t outputChannels = chunk.ChannelCount() == 1 ? 1 : 2; const int16_t* samples = nullptr; UniquePtr convertedSamples; // We take advantage of the fact that the common case (microphone directly to // PeerConnection, that is, a normal call), the samples are already 16-bits // mono, so the representation in interleaved and planar is the same, and we // can just use that. if (enabled && outputChannels == 1 && chunk.mBufferFormat == AUDIO_FORMAT_S16) { samples = chunk.ChannelData().Elements()[0]; } else { convertedSamples = MakeUnique(chunk.mDuration * outputChannels); if (!enabled || chunk.mBufferFormat == AUDIO_FORMAT_SILENCE) { PodZero(convertedSamples.get(), chunk.mDuration * outputChannels); } else if (chunk.mBufferFormat == AUDIO_FORMAT_FLOAT32) { DownmixAndInterleave(chunk.ChannelData(), chunk.mDuration, chunk.mVolume, outputChannels, convertedSamples.get()); } else if (chunk.mBufferFormat == AUDIO_FORMAT_S16) { DownmixAndInterleave(chunk.ChannelData(), chunk.mDuration, chunk.mVolume, outputChannels, convertedSamples.get()); } samples = convertedSamples.get(); } MOZ_ASSERT(!(rate%100)); // rate should be a multiple of 100 // Check if the rate or the number of channels has changed since the last time // we came through. I realize it may be overkill to check if the rate has // changed, but I believe it is possible (e.g. if we change sources) and it // costs us very little to handle this case. uint32_t audio_10ms = rate / 100; if (!packetizer_ || packetizer_->PacketSize() != audio_10ms || packetizer_->Channels() != outputChannels) { // It's ok to drop the audio still in the packetizer here. packetizer_ = new AudioPacketizer(audio_10ms, outputChannels); } packetizer_->Input(samples, chunk.mDuration); while (packetizer_->PacketsAvailable()) { uint32_t samplesPerPacket = packetizer_->PacketSize() * packetizer_->Channels(); // We know that webrtc.org's code going to copy the samples down the line, // so we can just use a stack buffer here instead of malloc-ing. int16_t packet[AUDIO_SAMPLE_BUFFER_MAX]; packetizer_->Output(packet); mConduit->SendAudioFrame(packet, samplesPerPacket, rate, 0); } } void QueueAudioChunk(TrackRate rate, AudioChunk& chunk, bool enabled) { RUN_ON_THREAD(mThread, WrapRunnable(RefPtr(this), &AudioProxyThread::InternalProcessAudioChunk, rate, chunk, enabled), NS_DISPATCH_NORMAL); } protected: virtual ~AudioProxyThread() { // Conduits must be released on MainThread, and we might have the last reference // We don't need to worry about runnables still trying to access the conduit, since // the runnables hold a ref to AudioProxyThread. NS_ReleaseOnMainThread(mConduit.forget()); MOZ_COUNT_DTOR(AudioProxyThread); } RefPtr mConduit; nsCOMPtr mThread; // Only accessed on mThread nsAutoPtr> packetizer_; }; static char kDTLSExporterLabel[] = "EXTRACTOR-dtls_srtp"; MediaPipeline::MediaPipeline(const std::string& pc, Direction direction, nsCOMPtr main_thread, nsCOMPtr sts_thread, const std::string& track_id, int level, RefPtr conduit, RefPtr rtp_transport, RefPtr rtcp_transport, nsAutoPtr filter) : direction_(direction), track_id_(track_id), level_(level), conduit_(conduit), rtp_(rtp_transport, rtcp_transport ? RTP : MUX), rtcp_(rtcp_transport ? rtcp_transport : rtp_transport, rtcp_transport ? RTCP : MUX), main_thread_(main_thread), sts_thread_(sts_thread), rtp_packets_sent_(0), rtcp_packets_sent_(0), rtp_packets_received_(0), rtcp_packets_received_(0), rtp_bytes_sent_(0), rtp_bytes_received_(0), pc_(pc), description_(), filter_(filter), rtp_parser_(webrtc::RtpHeaderParser::Create()) { // To indicate rtcp-mux rtcp_transport should be nullptr. // Therefore it's an error to send in the same flow for // both rtp and rtcp. MOZ_ASSERT(rtp_transport != rtcp_transport); // PipelineTransport() will access this->sts_thread_; moved here for safety transport_ = new PipelineTransport(this); } MediaPipeline::~MediaPipeline() { ASSERT_ON_THREAD(main_thread_); MOZ_MTLOG(ML_INFO, "Destroying MediaPipeline: " << description_); } nsresult MediaPipeline::Init() { ASSERT_ON_THREAD(main_thread_); if (direction_ == RECEIVE) { conduit_->SetReceiverTransport(transport_); } else { conduit_->SetTransmitterTransport(transport_); } RUN_ON_THREAD(sts_thread_, WrapRunnable( RefPtr(this), &MediaPipeline::Init_s), NS_DISPATCH_NORMAL); return NS_OK; } nsresult MediaPipeline::Init_s() { ASSERT_ON_THREAD(sts_thread_); return AttachTransport_s(); } // Disconnect us from the transport so that we can cleanly destruct the // pipeline on the main thread. ShutdownMedia_m() must have already been // called void MediaPipeline::DetachTransport_s() { ASSERT_ON_THREAD(sts_thread_); disconnect_all(); transport_->Detach(); rtp_.Detach(); rtcp_.Detach(); } nsresult MediaPipeline::AttachTransport_s() { ASSERT_ON_THREAD(sts_thread_); nsresult res; MOZ_ASSERT(rtp_.transport_); MOZ_ASSERT(rtcp_.transport_); res = ConnectTransport_s(rtp_); if (NS_FAILED(res)) { return res; } if (rtcp_.transport_ != rtp_.transport_) { res = ConnectTransport_s(rtcp_); if (NS_FAILED(res)) { return res; } } transport_->Attach(this); return NS_OK; } void MediaPipeline::UpdateTransport_m(int level, RefPtr rtp_transport, RefPtr rtcp_transport, nsAutoPtr filter) { RUN_ON_THREAD(sts_thread_, WrapRunnable( this, &MediaPipeline::UpdateTransport_s, level, rtp_transport, rtcp_transport, filter), NS_DISPATCH_NORMAL); } void MediaPipeline::UpdateTransport_s(int level, RefPtr rtp_transport, RefPtr rtcp_transport, nsAutoPtr filter) { bool rtcp_mux = false; if (!rtcp_transport) { rtcp_transport = rtp_transport; rtcp_mux = true; } if ((rtp_transport != rtp_.transport_) || (rtcp_transport != rtcp_.transport_)) { DetachTransport_s(); rtp_ = TransportInfo(rtp_transport, rtcp_mux ? MUX : RTP); rtcp_ = TransportInfo(rtcp_transport, rtcp_mux ? MUX : RTCP); AttachTransport_s(); } level_ = level; if (filter_ && filter) { // Use the new filter, but don't forget any remote SSRCs that we've learned // by receiving traffic. filter_->Update(*filter); } else { filter_ = filter; } } void MediaPipeline::SelectSsrc_m(size_t ssrc_index) { RUN_ON_THREAD(sts_thread_, WrapRunnable( this, &MediaPipeline::SelectSsrc_s, ssrc_index), NS_DISPATCH_NORMAL); } void MediaPipeline::SelectSsrc_s(size_t ssrc_index) { filter_ = new MediaPipelineFilter; if (ssrc_index < ssrcs_received_.size()) { filter_->AddRemoteSSRC(ssrcs_received_[ssrc_index]); } else { MOZ_MTLOG(ML_WARNING, "SelectSsrc called with " << ssrc_index << " but we " << "have only seen " << ssrcs_received_.size() << " ssrcs"); } } void MediaPipeline::StateChange(TransportFlow *flow, TransportLayer::State state) { TransportInfo* info = GetTransportInfo_s(flow); MOZ_ASSERT(info); if (state == TransportLayer::TS_OPEN) { MOZ_MTLOG(ML_INFO, "Flow is ready"); TransportReady_s(*info); } else if (state == TransportLayer::TS_CLOSED || state == TransportLayer::TS_ERROR) { TransportFailed_s(*info); } } static bool MakeRtpTypeToStringArray(const char** array) { static const char* RTP_str = "RTP"; static const char* RTCP_str = "RTCP"; static const char* MUX_str = "RTP/RTCP mux"; array[MediaPipeline::RTP] = RTP_str; array[MediaPipeline::RTCP] = RTCP_str; array[MediaPipeline::MUX] = MUX_str; return true; } static const char* ToString(MediaPipeline::RtpType type) { static const char* array[(int)MediaPipeline::MAX_RTP_TYPE] = {nullptr}; // Dummy variable to cause init to happen only on first call static bool dummy = MakeRtpTypeToStringArray(array); (void)dummy; return array[type]; } nsresult MediaPipeline::TransportReady_s(TransportInfo &info) { MOZ_ASSERT(!description_.empty()); // TODO(ekr@rtfm.com): implement some kind of notification on // failure. bug 852665. if (info.state_ != MP_CONNECTING) { MOZ_MTLOG(ML_ERROR, "Transport ready for flow in wrong state:" << description_ << ": " << ToString(info.type_)); return NS_ERROR_FAILURE; } MOZ_MTLOG(ML_INFO, "Transport ready for pipeline " << static_cast(this) << " flow " << description_ << ": " << ToString(info.type_)); // TODO(bcampen@mozilla.com): Should we disconnect from the flow on failure? nsresult res; // Now instantiate the SRTP objects TransportLayerDtls *dtls = static_cast( info.transport_->GetLayer(TransportLayerDtls::ID())); MOZ_ASSERT(dtls); // DTLS is mandatory uint16_t cipher_suite; res = dtls->GetSrtpCipher(&cipher_suite); if (NS_FAILED(res)) { MOZ_MTLOG(ML_ERROR, "Failed to negotiate DTLS-SRTP. This is an error"); info.state_ = MP_CLOSED; UpdateRtcpMuxState(info); return res; } // SRTP Key Exporter as per RFC 5764 S 4.2 unsigned char srtp_block[SRTP_TOTAL_KEY_LENGTH * 2]; res = dtls->ExportKeyingMaterial(kDTLSExporterLabel, false, "", srtp_block, sizeof(srtp_block)); if (NS_FAILED(res)) { MOZ_MTLOG(ML_ERROR, "Failed to compute DTLS-SRTP keys. This is an error"); info.state_ = MP_CLOSED; UpdateRtcpMuxState(info); MOZ_CRASH(); // TODO: Remove once we have enough field experience to // know it doesn't happen. bug 798797. Note that the // code after this never executes. return res; } // Slice and dice as per RFC 5764 S 4.2 unsigned char client_write_key[SRTP_TOTAL_KEY_LENGTH]; unsigned char server_write_key[SRTP_TOTAL_KEY_LENGTH]; int offset = 0; memcpy(client_write_key, srtp_block + offset, SRTP_MASTER_KEY_LENGTH); offset += SRTP_MASTER_KEY_LENGTH; memcpy(server_write_key, srtp_block + offset, SRTP_MASTER_KEY_LENGTH); offset += SRTP_MASTER_KEY_LENGTH; memcpy(client_write_key + SRTP_MASTER_KEY_LENGTH, srtp_block + offset, SRTP_MASTER_SALT_LENGTH); offset += SRTP_MASTER_SALT_LENGTH; memcpy(server_write_key + SRTP_MASTER_KEY_LENGTH, srtp_block + offset, SRTP_MASTER_SALT_LENGTH); offset += SRTP_MASTER_SALT_LENGTH; MOZ_ASSERT(offset == sizeof(srtp_block)); unsigned char *write_key; unsigned char *read_key; if (dtls->role() == TransportLayerDtls::CLIENT) { write_key = client_write_key; read_key = server_write_key; } else { write_key = server_write_key; read_key = client_write_key; } MOZ_ASSERT(!info.send_srtp_ && !info.recv_srtp_); info.send_srtp_ = SrtpFlow::Create(cipher_suite, false, write_key, SRTP_TOTAL_KEY_LENGTH); info.recv_srtp_ = SrtpFlow::Create(cipher_suite, true, read_key, SRTP_TOTAL_KEY_LENGTH); if (!info.send_srtp_ || !info.recv_srtp_) { MOZ_MTLOG(ML_ERROR, "Couldn't create SRTP flow for " << ToString(info.type_)); info.state_ = MP_CLOSED; UpdateRtcpMuxState(info); return NS_ERROR_FAILURE; } MOZ_MTLOG(ML_INFO, "Listening for " << ToString(info.type_) << " packets received on " << static_cast(dtls->downward())); switch (info.type_) { case RTP: dtls->downward()->SignalPacketReceived.connect( this, &MediaPipeline::RtpPacketReceived); break; case RTCP: dtls->downward()->SignalPacketReceived.connect( this, &MediaPipeline::RtcpPacketReceived); break; case MUX: dtls->downward()->SignalPacketReceived.connect( this, &MediaPipeline::PacketReceived); break; default: MOZ_CRASH(); } info.state_ = MP_OPEN; UpdateRtcpMuxState(info); return NS_OK; } nsresult MediaPipeline::TransportFailed_s(TransportInfo &info) { ASSERT_ON_THREAD(sts_thread_); info.state_ = MP_CLOSED; UpdateRtcpMuxState(info); MOZ_MTLOG(ML_INFO, "Transport closed for flow " << ToString(info.type_)); NS_WARNING( "MediaPipeline Transport failed. This is not properly cleaned up yet"); // TODO(ekr@rtfm.com): SECURITY: Figure out how to clean up if the // connection was good and now it is bad. // TODO(ekr@rtfm.com): Report up so that the PC knows we // have experienced an error. return NS_OK; } void MediaPipeline::UpdateRtcpMuxState(TransportInfo &info) { if (info.type_ == MUX) { if (info.transport_ == rtcp_.transport_) { rtcp_.state_ = info.state_; if (!rtcp_.send_srtp_) { rtcp_.send_srtp_ = info.send_srtp_; rtcp_.recv_srtp_ = info.recv_srtp_; } } } } nsresult MediaPipeline::SendPacket(TransportFlow *flow, const void *data, int len) { ASSERT_ON_THREAD(sts_thread_); // Note that we bypass the DTLS layer here TransportLayerDtls *dtls = static_cast( flow->GetLayer(TransportLayerDtls::ID())); MOZ_ASSERT(dtls); TransportResult res = dtls->downward()-> SendPacket(static_cast(data), len); if (res != len) { // Ignore blocking indications if (res == TE_WOULDBLOCK) return NS_OK; MOZ_MTLOG(ML_ERROR, "Failed write on stream " << description_); return NS_BASE_STREAM_CLOSED; } return NS_OK; } void MediaPipeline::increment_rtp_packets_sent(int32_t bytes) { ++rtp_packets_sent_; rtp_bytes_sent_ += bytes; if (!(rtp_packets_sent_ % 100)) { MOZ_MTLOG(ML_INFO, "RTP sent packet count for " << description_ << " Pipeline " << static_cast(this) << " Flow : " << static_cast(rtp_.transport_) << ": " << rtp_packets_sent_ << " (" << rtp_bytes_sent_ << " bytes)"); } } void MediaPipeline::increment_rtcp_packets_sent() { ++rtcp_packets_sent_; if (!(rtcp_packets_sent_ % 100)) { MOZ_MTLOG(ML_INFO, "RTCP sent packet count for " << description_ << " Pipeline " << static_cast(this) << " Flow : " << static_cast(rtcp_.transport_) << ": " << rtcp_packets_sent_); } } void MediaPipeline::increment_rtp_packets_received(int32_t bytes) { ++rtp_packets_received_; rtp_bytes_received_ += bytes; if (!(rtp_packets_received_ % 100)) { MOZ_MTLOG(ML_INFO, "RTP received packet count for " << description_ << " Pipeline " << static_cast(this) << " Flow : " << static_cast(rtp_.transport_) << ": " << rtp_packets_received_ << " (" << rtp_bytes_received_ << " bytes)"); } } void MediaPipeline::increment_rtcp_packets_received() { ++rtcp_packets_received_; if (!(rtcp_packets_received_ % 100)) { MOZ_MTLOG(ML_INFO, "RTCP received packet count for " << description_ << " Pipeline " << static_cast(this) << " Flow : " << static_cast(rtcp_.transport_) << ": " << rtcp_packets_received_); } } void MediaPipeline::RtpPacketReceived(TransportLayer *layer, const unsigned char *data, size_t len) { if (!transport_->pipeline()) { MOZ_MTLOG(ML_ERROR, "Discarding incoming packet; transport disconnected"); return; } if (!conduit_) { MOZ_MTLOG(ML_DEBUG, "Discarding incoming packet; media disconnected"); return; } if (rtp_.state_ != MP_OPEN) { MOZ_MTLOG(ML_ERROR, "Discarding incoming packet; pipeline not open"); return; } if (rtp_.transport_->state() != TransportLayer::TS_OPEN) { MOZ_MTLOG(ML_ERROR, "Discarding incoming packet; transport not open"); return; } // This should never happen. MOZ_ASSERT(rtp_.recv_srtp_); if (direction_ == TRANSMIT) { return; } if (!len) { return; } // Filter out everything but RTP/RTCP if (data[0] < 128 || data[0] > 191) { return; } webrtc::RTPHeader header; if (!rtp_parser_->Parse(data, len, &header)) { return; } if (std::find(ssrcs_received_.begin(), ssrcs_received_.end(), header.ssrc) == ssrcs_received_.end()) { ssrcs_received_.push_back(header.ssrc); } if (filter_ && !filter_->Filter(header)) { return; } // Make a copy rather than cast away constness auto inner_data = MakeUnique(len); memcpy(inner_data.get(), data, len); int out_len = 0; nsresult res = rtp_.recv_srtp_->UnprotectRtp(inner_data.get(), len, len, &out_len); if (!NS_SUCCEEDED(res)) { char tmp[16]; SprintfLiteral(tmp, "%.2x %.2x %.2x %.2x", inner_data[0], inner_data[1], inner_data[2], inner_data[3]); MOZ_MTLOG(ML_NOTICE, "Error unprotecting RTP in " << description_ << "len= " << len << "[" << tmp << "...]"); return; } MOZ_MTLOG(ML_DEBUG, description_ << " received RTP packet."); increment_rtp_packets_received(out_len); (void)conduit_->ReceivedRTPPacket(inner_data.get(), out_len); // Ignore error codes } void MediaPipeline::RtcpPacketReceived(TransportLayer *layer, const unsigned char *data, size_t len) { if (!transport_->pipeline()) { MOZ_MTLOG(ML_DEBUG, "Discarding incoming packet; transport disconnected"); return; } if (!conduit_) { MOZ_MTLOG(ML_DEBUG, "Discarding incoming packet; media disconnected"); return; } if (rtcp_.state_ != MP_OPEN) { MOZ_MTLOG(ML_DEBUG, "Discarding incoming packet; pipeline not open"); return; } if (rtcp_.transport_->state() != TransportLayer::TS_OPEN) { MOZ_MTLOG(ML_ERROR, "Discarding incoming packet; transport not open"); return; } if (!len) { return; } // Filter out everything but RTP/RTCP if (data[0] < 128 || data[0] > 191) { return; } // We do not filter RTCP for send pipelines, since the webrtc.org code for // senders already has logic to ignore RRs that do not apply. // TODO bug 1279153: remove SR check for reduced size RTCP if (filter_ && direction_ == RECEIVE) { if (!filter_->FilterSenderReport(data, len)) { MOZ_MTLOG(ML_NOTICE, "Dropping incoming RTCP packet; filtered out"); return; } } // Make a copy rather than cast away constness auto inner_data = MakeUnique(len); memcpy(inner_data.get(), data, len); int out_len; nsresult res = rtcp_.recv_srtp_->UnprotectRtcp(inner_data.get(), len, len, &out_len); if (!NS_SUCCEEDED(res)) return; MOZ_MTLOG(ML_DEBUG, description_ << " received RTCP packet."); increment_rtcp_packets_received(); MOZ_ASSERT(rtcp_.recv_srtp_); // This should never happen (void)conduit_->ReceivedRTCPPacket(inner_data.get(), out_len); // Ignore error codes } bool MediaPipeline::IsRtp(const unsigned char *data, size_t len) { if (len < 2) return false; // Check if this is a RTCP packet. Logic based on the types listed in // media/webrtc/trunk/src/modules/rtp_rtcp/source/rtp_utility.cc // Anything outside this range is RTP. if ((data[1] < 192) || (data[1] > 207)) return true; if (data[1] == 192) // FIR return false; if (data[1] == 193) // NACK, but could also be RTP. This makes us sad return true; // but it's how webrtc.org behaves. if (data[1] == 194) return true; if (data[1] == 195) // IJ. return false; if ((data[1] > 195) && (data[1] < 200)) // the > 195 is redundant return true; if ((data[1] >= 200) && (data[1] <= 207)) // SR, RR, SDES, BYE, return false; // APP, RTPFB, PSFB, XR MOZ_ASSERT(false); // Not reached, belt and suspenders. return true; } void MediaPipeline::PacketReceived(TransportLayer *layer, const unsigned char *data, size_t len) { if (!transport_->pipeline()) { MOZ_MTLOG(ML_DEBUG, "Discarding incoming packet; transport disconnected"); return; } if (IsRtp(data, len)) { RtpPacketReceived(layer, data, len); } else { RtcpPacketReceived(layer, data, len); } } class MediaPipelineTransmit::PipelineListener : public DirectMediaStreamTrackListener { friend class MediaPipelineTransmit; public: explicit PipelineListener(const RefPtr& conduit) : conduit_(conduit), track_id_(TRACK_INVALID), mMutex("MediaPipelineTransmit::PipelineListener"), track_id_external_(TRACK_INVALID), active_(false), enabled_(false), direct_connect_(false) { } ~PipelineListener() { if (!NS_IsMainThread()) { // release conduit on mainthread. Must use forget()! nsresult rv = NS_DispatchToMainThread(new ConduitDeleteEvent(conduit_.forget())); MOZ_ASSERT(!NS_FAILED(rv),"Could not dispatch conduit shutdown to main"); if (NS_FAILED(rv)) { MOZ_CRASH(); } } else { conduit_ = nullptr; } #if !defined(MOZILLA_EXTERNAL_LINKAGE) if (converter_) { converter_->Shutdown(); } #endif } // Dispatches setting the internal TrackID to TRACK_INVALID to the media // graph thread to keep it in sync with other MediaStreamGraph operations // like RemoveListener() and AddListener(). The TrackID will be updated on // the next NewData() callback. void UnsetTrackId(MediaStreamGraphImpl* graph); void SetActive(bool active) { active_ = active; } void SetEnabled(bool enabled) { enabled_ = enabled; } // These are needed since nested classes don't have access to any particular // instance of the parent void SetAudioProxy(const RefPtr& proxy) { audio_processing_ = proxy; } #if !defined(MOZILLA_EXTERNAL_LINKAGE) void SetVideoFrameConverter(const RefPtr& converter) { converter_ = converter; } void OnVideoFrameConverted(unsigned char* aVideoFrame, unsigned int aVideoFrameLength, unsigned short aWidth, unsigned short aHeight, VideoType aVideoType, uint64_t aCaptureTime) { MOZ_ASSERT(conduit_->type() == MediaSessionConduit::VIDEO); static_cast(conduit_.get())->SendVideoFrame( aVideoFrame, aVideoFrameLength, aWidth, aHeight, aVideoType, aCaptureTime); } void OnVideoFrameConverted(webrtc::I420VideoFrame& aVideoFrame) { MOZ_ASSERT(conduit_->type() == MediaSessionConduit::VIDEO); static_cast(conduit_.get())->SendVideoFrame(aVideoFrame); } #endif // Implement MediaStreamTrackListener void NotifyQueuedChanges(MediaStreamGraph* aGraph, StreamTime aTrackOffset, const MediaSegment& aQueuedMedia) override; // Implement DirectMediaStreamTrackListener void NotifyRealtimeTrackData(MediaStreamGraph* aGraph, StreamTime aTrackOffset, const MediaSegment& aMedia) override; void NotifyDirectListenerInstalled(InstallationResult aResult) override; void NotifyDirectListenerUninstalled() override; private: void UnsetTrackIdImpl() { MutexAutoLock lock(mMutex); track_id_ = track_id_external_ = TRACK_INVALID; } void NewData(MediaStreamGraph* graph, StreamTime offset, const MediaSegment& media); RefPtr conduit_; RefPtr audio_processing_; #if !defined(MOZILLA_EXTERNAL_LINKAGE) RefPtr converter_; #endif // May be TRACK_INVALID until we see data from the track TrackID track_id_; // this is the current TrackID this listener is attached to Mutex mMutex; // protected by mMutex // May be TRACK_INVALID until we see data from the track TrackID track_id_external_; // this is queried from other threads // active is true if there is a transport to send on mozilla::Atomic active_; // enabled is true if the media access control permits sending // actual content; when false you get black/silence mozilla::Atomic enabled_; // Written and read on the MediaStreamGraph thread bool direct_connect_; }; #if !defined(MOZILLA_EXTERNAL_LINKAGE) // Implements VideoConverterListener for MediaPipeline. // // We pass converted frames on to MediaPipelineTransmit::PipelineListener // where they are further forwarded to VideoConduit. // MediaPipelineTransmit calls Detach() during shutdown to ensure there is // no cyclic dependencies between us and PipelineListener. class MediaPipelineTransmit::VideoFrameFeeder : public VideoConverterListener { public: explicit VideoFrameFeeder(const RefPtr& listener) : listener_(listener), mutex_("VideoFrameFeeder") { MOZ_COUNT_CTOR(VideoFrameFeeder); } void Detach() { MutexAutoLock lock(mutex_); listener_ = nullptr; } void OnVideoFrameConverted(unsigned char* aVideoFrame, unsigned int aVideoFrameLength, unsigned short aWidth, unsigned short aHeight, VideoType aVideoType, uint64_t aCaptureTime) override { MutexAutoLock lock(mutex_); if (!listener_) { return; } listener_->OnVideoFrameConverted(aVideoFrame, aVideoFrameLength, aWidth, aHeight, aVideoType, aCaptureTime); } void OnVideoFrameConverted(webrtc::I420VideoFrame& aVideoFrame) override { MutexAutoLock lock(mutex_); if (!listener_) { return; } listener_->OnVideoFrameConverted(aVideoFrame); } protected: virtual ~VideoFrameFeeder() { MOZ_COUNT_DTOR(VideoFrameFeeder); } RefPtr listener_; Mutex mutex_; }; #endif class MediaPipelineTransmit::PipelineVideoSink : public MediaStreamVideoSink { public: explicit PipelineVideoSink(const RefPtr& conduit, MediaPipelineTransmit::PipelineListener* listener) : conduit_(conduit) , pipelineListener_(listener) { } virtual void SetCurrentFrames(const VideoSegment& aSegment) override; virtual void ClearFrames() override {} private: ~PipelineVideoSink() { // release conduit on mainthread. Must use forget()! nsresult rv = NS_DispatchToMainThread(new ConduitDeleteEvent(conduit_.forget())); MOZ_ASSERT(!NS_FAILED(rv),"Could not dispatch conduit shutdown to main"); if (NS_FAILED(rv)) { MOZ_CRASH(); } } RefPtr conduit_; MediaPipelineTransmit::PipelineListener* pipelineListener_; }; MediaPipelineTransmit::MediaPipelineTransmit( const std::string& pc, nsCOMPtr main_thread, nsCOMPtr sts_thread, dom::MediaStreamTrack* domtrack, const std::string& track_id, int level, RefPtr conduit, RefPtr rtp_transport, RefPtr rtcp_transport, nsAutoPtr filter) : MediaPipeline(pc, TRANSMIT, main_thread, sts_thread, track_id, level, conduit, rtp_transport, rtcp_transport, filter), listener_(new PipelineListener(conduit)), video_sink_(new PipelineVideoSink(conduit, listener_)), domtrack_(domtrack) { if (!IsVideo()) { audio_processing_ = MakeAndAddRef(static_cast(conduit.get())); listener_->SetAudioProxy(audio_processing_); } #if !defined(MOZILLA_EXTERNAL_LINKAGE) else { // Video // For video we send frames to an async VideoFrameConverter that calls // back to a VideoFrameFeeder that feeds I420 frames to VideoConduit. feeder_ = MakeAndAddRef(listener_); converter_ = MakeAndAddRef(); converter_->AddListener(feeder_); listener_->SetVideoFrameConverter(converter_); } #endif } MediaPipelineTransmit::~MediaPipelineTransmit() { #if !defined(MOZILLA_EXTERNAL_LINKAGE) if (feeder_) { feeder_->Detach(); } #endif } nsresult MediaPipelineTransmit::Init() { AttachToTrack(track_id_); return MediaPipeline::Init(); } void MediaPipelineTransmit::AttachToTrack(const std::string& track_id) { ASSERT_ON_THREAD(main_thread_); description_ = pc_ + "| "; description_ += conduit_->type() == MediaSessionConduit::AUDIO ? "Transmit audio[" : "Transmit video["; description_ += track_id; description_ += "]"; // TODO(ekr@rtfm.com): Check for errors MOZ_MTLOG(ML_DEBUG, "Attaching pipeline to track " << static_cast(domtrack_) << " conduit type=" << (conduit_->type() == MediaSessionConduit::AUDIO ?"audio":"video")); // Register the Listener directly with the source if we can. // We also register it as a non-direct listener so we fall back to that // if installing the direct listener fails. As a direct listener we get access // to direct unqueued (and not resampled) data. domtrack_->AddDirectListener(listener_); domtrack_->AddListener(listener_); #if !defined(MOZILLA_EXTERNAL_LINKAGE) domtrack_->AddDirectListener(video_sink_); #endif #ifndef MOZILLA_INTERNAL_API // this enables the unit tests that can't fiddle with principals and the like listener_->SetEnabled(true); #endif } bool MediaPipelineTransmit::IsVideo() const { return !!domtrack_->AsVideoStreamTrack(); } #if !defined(MOZILLA_EXTERNAL_LINKAGE) void MediaPipelineTransmit::UpdateSinkIdentity_m(MediaStreamTrack* track, nsIPrincipal* principal, const PeerIdentity* sinkIdentity) { ASSERT_ON_THREAD(main_thread_); if (track != nullptr && track != domtrack_) { // If a track is specified, then it might not be for this pipeline, // since we receive notifications for all tracks on the PC. // nullptr means that the PeerIdentity has changed and shall be applied // to all tracks of the PC. return; } bool enableTrack = principal->Subsumes(domtrack_->GetPrincipal()); if (!enableTrack) { // first try didn't work, but there's a chance that this is still available // if our track is bound to a peerIdentity, and the peer connection (our // sink) is bound to the same identity, then we can enable the track. const PeerIdentity* trackIdentity = domtrack_->GetPeerIdentity(); if (sinkIdentity && trackIdentity) { enableTrack = (*sinkIdentity == *trackIdentity); } } listener_->SetEnabled(enableTrack); } #endif void MediaPipelineTransmit::DetachMedia() { ASSERT_ON_THREAD(main_thread_); if (domtrack_) { domtrack_->RemoveDirectListener(listener_); domtrack_->RemoveListener(listener_); domtrack_->RemoveDirectListener(video_sink_); domtrack_ = nullptr; } // Let the listener be destroyed with the pipeline (or later). } nsresult MediaPipelineTransmit::TransportReady_s(TransportInfo &info) { ASSERT_ON_THREAD(sts_thread_); // Call base ready function. MediaPipeline::TransportReady_s(info); // Should not be set for a transmitter if (&info == &rtp_) { listener_->SetActive(true); } return NS_OK; } nsresult MediaPipelineTransmit::ReplaceTrack(MediaStreamTrack& domtrack) { // MainThread, checked in calls we make #if !defined(MOZILLA_EXTERNAL_LINKAGE) nsString nsTrackId; domtrack.GetId(nsTrackId); std::string track_id(NS_ConvertUTF16toUTF8(nsTrackId).get()); #else std::string track_id = domtrack.GetId(); #endif MOZ_MTLOG(ML_DEBUG, "Reattaching pipeline " << description_ << " to track " << static_cast(&domtrack) << " track " << track_id << " conduit type=" << (conduit_->type() == MediaSessionConduit::AUDIO ?"audio":"video")); DetachMedia(); domtrack_ = &domtrack; // Detach clears it // Unsets the track id after RemoveListener() takes effect. listener_->UnsetTrackId(domtrack_->GraphImpl()); track_id_ = track_id; AttachToTrack(track_id); return NS_OK; } void MediaPipeline::DisconnectTransport_s(TransportInfo &info) { MOZ_ASSERT(info.transport_); ASSERT_ON_THREAD(sts_thread_); info.transport_->SignalStateChange.disconnect(this); // We do this even if we're a transmitter, since we are still possibly // registered to receive RTCP. TransportLayerDtls *dtls = static_cast( info.transport_->GetLayer(TransportLayerDtls::ID())); MOZ_ASSERT(dtls); // DTLS is mandatory MOZ_ASSERT(dtls->downward()); dtls->downward()->SignalPacketReceived.disconnect(this); } nsresult MediaPipeline::ConnectTransport_s(TransportInfo &info) { MOZ_ASSERT(info.transport_); ASSERT_ON_THREAD(sts_thread_); // Look to see if the transport is ready if (info.transport_->state() == TransportLayer::TS_OPEN) { nsresult res = TransportReady_s(info); if (NS_FAILED(res)) { MOZ_MTLOG(ML_ERROR, "Error calling TransportReady(); res=" << static_cast(res) << " in " << __FUNCTION__); return res; } } else if (info.transport_->state() == TransportLayer::TS_ERROR) { MOZ_MTLOG(ML_ERROR, ToString(info.type_) << "transport is already in error state"); TransportFailed_s(info); return NS_ERROR_FAILURE; } info.transport_->SignalStateChange.connect(this, &MediaPipeline::StateChange); return NS_OK; } MediaPipeline::TransportInfo* MediaPipeline::GetTransportInfo_s( TransportFlow *flow) { ASSERT_ON_THREAD(sts_thread_); if (flow == rtp_.transport_) { return &rtp_; } if (flow == rtcp_.transport_) { return &rtcp_; } return nullptr; } nsresult MediaPipeline::PipelineTransport::SendRtpPacket( const void *data, int len) { nsAutoPtr buf(new DataBuffer(static_cast(data), len, len + SRTP_MAX_EXPANSION)); RUN_ON_THREAD(sts_thread_, WrapRunnable( RefPtr(this), &MediaPipeline::PipelineTransport::SendRtpRtcpPacket_s, buf, true), NS_DISPATCH_NORMAL); return NS_OK; } nsresult MediaPipeline::PipelineTransport::SendRtpRtcpPacket_s( nsAutoPtr data, bool is_rtp) { ASSERT_ON_THREAD(sts_thread_); if (!pipeline_) { return NS_OK; // Detached } TransportInfo& transport = is_rtp ? pipeline_->rtp_ : pipeline_->rtcp_; if (!transport.send_srtp_) { MOZ_MTLOG(ML_DEBUG, "Couldn't write RTP/RTCP packet; SRTP not set up yet"); return NS_OK; } MOZ_ASSERT(transport.transport_); NS_ENSURE_TRUE(transport.transport_, NS_ERROR_NULL_POINTER); // libsrtp enciphers in place, so we need a big enough buffer. MOZ_ASSERT(data->capacity() >= data->len() + SRTP_MAX_EXPANSION); int out_len; nsresult res; if (is_rtp) { res = transport.send_srtp_->ProtectRtp(data->data(), data->len(), data->capacity(), &out_len); } else { res = transport.send_srtp_->ProtectRtcp(data->data(), data->len(), data->capacity(), &out_len); } if (!NS_SUCCEEDED(res)) { return res; } // paranoia; don't have uninitialized bytes included in data->len() data->SetLength(out_len); MOZ_MTLOG(ML_DEBUG, pipeline_->description_ << " sending " << (is_rtp ? "RTP" : "RTCP") << " packet"); if (is_rtp) { pipeline_->increment_rtp_packets_sent(out_len); } else { pipeline_->increment_rtcp_packets_sent(); } return pipeline_->SendPacket(transport.transport_, data->data(), out_len); } nsresult MediaPipeline::PipelineTransport::SendRtcpPacket( const void *data, int len) { nsAutoPtr buf(new DataBuffer(static_cast(data), len, len + SRTP_MAX_EXPANSION)); RUN_ON_THREAD(sts_thread_, WrapRunnable( RefPtr(this), &MediaPipeline::PipelineTransport::SendRtpRtcpPacket_s, buf, false), NS_DISPATCH_NORMAL); return NS_OK; } void MediaPipelineTransmit::PipelineListener:: UnsetTrackId(MediaStreamGraphImpl* graph) { #ifndef USE_FAKE_MEDIA_STREAMS class Message : public ControlMessage { public: explicit Message(PipelineListener* listener) : ControlMessage(nullptr), listener_(listener) {} virtual void Run() override { listener_->UnsetTrackIdImpl(); } RefPtr listener_; }; graph->AppendMessage(MakeUnique(this)); #else UnsetTrackIdImpl(); #endif } // Called if we're attached with AddDirectListener() void MediaPipelineTransmit::PipelineListener:: NotifyRealtimeTrackData(MediaStreamGraph* graph, StreamTime offset, const MediaSegment& media) { MOZ_MTLOG(ML_DEBUG, "MediaPipeline::NotifyRealtimeTrackData() listener=" << this << ", offset=" << offset << ", duration=" << media.GetDuration()); NewData(graph, offset, media); } void MediaPipelineTransmit::PipelineListener:: NotifyQueuedChanges(MediaStreamGraph* graph, StreamTime offset, const MediaSegment& queued_media) { MOZ_MTLOG(ML_DEBUG, "MediaPipeline::NotifyQueuedChanges()"); // ignore non-direct data if we're also getting direct data if (!direct_connect_) { NewData(graph, offset, queued_media); } } void MediaPipelineTransmit::PipelineListener:: NotifyDirectListenerInstalled(InstallationResult aResult) { MOZ_MTLOG(ML_INFO, "MediaPipeline::NotifyDirectListenerInstalled() listener= " << this << ", result=" << static_cast(aResult)); direct_connect_ = InstallationResult::SUCCESS == aResult; } void MediaPipelineTransmit::PipelineListener:: NotifyDirectListenerUninstalled() { MOZ_MTLOG(ML_INFO, "MediaPipeline::NotifyDirectListenerUninstalled() listener=" << this); direct_connect_ = false; } void MediaPipelineTransmit::PipelineListener:: NewData(MediaStreamGraph* graph, StreamTime offset, const MediaSegment& media) { if (!active_) { MOZ_MTLOG(ML_DEBUG, "Discarding packets because transport not ready"); return; } if (conduit_->type() != (media.GetType() == MediaSegment::AUDIO ? MediaSessionConduit::AUDIO : MediaSessionConduit::VIDEO)) { MOZ_ASSERT(false, "The media type should always be correct since the " "listener is locked to a specific track"); return; } // TODO(ekr@rtfm.com): For now assume that we have only one // track type and it's destined for us // See bug 784517 if (media.GetType() == MediaSegment::AUDIO) { AudioSegment* audio = const_cast( static_cast(&media)); AudioSegment::ChunkIterator iter(*audio); while(!iter.IsEnded()) { TrackRate rate; #ifdef USE_FAKE_MEDIA_STREAMS rate = Fake_MediaStream::GraphRate(); #else rate = graph->GraphRate(); #endif audio_processing_->QueueAudioChunk(rate, *iter, enabled_); iter.Next(); } } else { // Ignore } } void MediaPipelineTransmit::PipelineVideoSink:: SetCurrentFrames(const VideoSegment& aSegment) { MOZ_ASSERT(pipelineListener_); if (!pipelineListener_->active_) { MOZ_MTLOG(ML_DEBUG, "Discarding packets because transport not ready"); return; } if (conduit_->type() != MediaSessionConduit::VIDEO) { // Ignore data of wrong kind in case we have a muxed stream return; } #if !defined(MOZILLA_EXTERNAL_LINKAGE) VideoSegment* video = const_cast(&aSegment); VideoSegment::ChunkIterator iter(*video); while(!iter.IsEnded()) { pipelineListener_->converter_->QueueVideoChunk(*iter, !pipelineListener_->enabled_); iter.Next(); } #endif } class TrackAddedCallback { public: virtual void TrackAdded(TrackTicks current_ticks) = 0; NS_INLINE_DECL_THREADSAFE_REFCOUNTING(TrackAddedCallback); protected: virtual ~TrackAddedCallback() {} }; class GenericReceiveListener; class GenericReceiveCallback : public TrackAddedCallback { public: explicit GenericReceiveCallback(GenericReceiveListener* listener) : listener_(listener) {} void TrackAdded(TrackTicks time); private: RefPtr listener_; }; // Add a listener on the MSG thread using the MSG command queue static void AddListener(MediaStream* source, MediaStreamListener* listener) { #if !defined(MOZILLA_EXTERNAL_LINKAGE) class Message : public ControlMessage { public: Message(MediaStream* stream, MediaStreamListener* listener) : ControlMessage(stream), listener_(listener) {} virtual void Run() override { mStream->AddListenerImpl(listener_.forget()); } private: RefPtr listener_; }; MOZ_ASSERT(listener); source->GraphImpl()->AppendMessage(MakeUnique(source, listener)); #else source->AddListener(listener); #endif } class GenericReceiveListener : public MediaStreamListener { public: GenericReceiveListener(SourceMediaStream *source, TrackID track_id) : source_(source), track_id_(track_id), played_ticks_(0), principal_handle_(PRINCIPAL_HANDLE_NONE) {} virtual ~GenericReceiveListener() {} void AddSelf() { AddListener(source_, this); } void EndTrack() { source_->EndTrack(track_id_); } #ifndef USE_FAKE_MEDIA_STREAMS // Must be called on the main thread void SetPrincipalHandle_m(const PrincipalHandle& principal_handle) { class Message : public ControlMessage { public: Message(GenericReceiveListener* listener, MediaStream* stream, const PrincipalHandle& principal_handle) : ControlMessage(stream), listener_(listener), principal_handle_(principal_handle) {} void Run() override { listener_->SetPrincipalHandle_msg(principal_handle_); } RefPtr listener_; PrincipalHandle principal_handle_; }; source_->GraphImpl()->AppendMessage(MakeUnique(this, source_, principal_handle)); } // Must be called on the MediaStreamGraph thread void SetPrincipalHandle_msg(const PrincipalHandle& principal_handle) { principal_handle_ = principal_handle; } #endif // USE_FAKE_MEDIA_STREAMS protected: SourceMediaStream *source_; const TrackID track_id_; TrackTicks played_ticks_; PrincipalHandle principal_handle_; }; MediaPipelineReceive::MediaPipelineReceive( const std::string& pc, nsCOMPtr main_thread, nsCOMPtr sts_thread, SourceMediaStream *stream, const std::string& track_id, int level, RefPtr conduit, RefPtr rtp_transport, RefPtr rtcp_transport, nsAutoPtr filter) : MediaPipeline(pc, RECEIVE, main_thread, sts_thread, track_id, level, conduit, rtp_transport, rtcp_transport, filter), stream_(stream), segments_added_(0) { MOZ_ASSERT(stream_); } MediaPipelineReceive::~MediaPipelineReceive() { MOZ_ASSERT(!stream_); // Check that we have shut down already. } class MediaPipelineReceiveAudio::PipelineListener : public GenericReceiveListener { public: PipelineListener(SourceMediaStream * source, TrackID track_id, const RefPtr& conduit) : GenericReceiveListener(source, track_id), conduit_(conduit) { } ~PipelineListener() { if (!NS_IsMainThread()) { // release conduit on mainthread. Must use forget()! nsresult rv = NS_DispatchToMainThread(new ConduitDeleteEvent(conduit_.forget())); MOZ_ASSERT(!NS_FAILED(rv),"Could not dispatch conduit shutdown to main"); if (NS_FAILED(rv)) { MOZ_CRASH(); } } else { conduit_ = nullptr; } } // Implement MediaStreamListener void NotifyPull(MediaStreamGraph* graph, StreamTime desired_time) override { MOZ_ASSERT(source_); if (!source_) { MOZ_MTLOG(ML_ERROR, "NotifyPull() called from a non-SourceMediaStream"); return; } // This comparison is done in total time to avoid accumulated roundoff errors. while (source_->TicksToTimeRoundDown(WEBRTC_DEFAULT_SAMPLE_RATE, played_ticks_) < desired_time) { int16_t scratch_buffer[AUDIO_SAMPLE_BUFFER_MAX]; int samples_length; // This fetches 10ms of data, either mono or stereo MediaConduitErrorCode err = static_cast(conduit_.get())->GetAudioFrame( scratch_buffer, WEBRTC_DEFAULT_SAMPLE_RATE, 0, // TODO(ekr@rtfm.com): better estimate of "capture" (really playout) delay samples_length); if (err != kMediaConduitNoError) { // Insert silence on conduit/GIPS failure (extremely unlikely) MOZ_MTLOG(ML_ERROR, "Audio conduit failed (" << err << ") to return data @ " << played_ticks_ << " (desired " << desired_time << " -> " << source_->StreamTimeToSeconds(desired_time) << ")"); // if this is not enough we'll loop and provide more samples_length = WEBRTC_DEFAULT_SAMPLE_RATE/100; PodArrayZero(scratch_buffer); } MOZ_ASSERT(samples_length * sizeof(uint16_t) < AUDIO_SAMPLE_BUFFER_MAX); MOZ_MTLOG(ML_DEBUG, "Audio conduit returned buffer of length " << samples_length); RefPtr samples = SharedBuffer::Create(samples_length * sizeof(uint16_t)); int16_t *samples_data = static_cast(samples->Data()); AudioSegment segment; // We derive the number of channels of the stream from the number of samples // the AudioConduit gives us, considering it gives us packets of 10ms and we // know the rate. uint32_t channelCount = samples_length / (WEBRTC_DEFAULT_SAMPLE_RATE / 100); AutoTArray channels; AutoTArray outputChannels; size_t frames = samples_length / channelCount; channels.SetLength(channelCount); size_t offset = 0; for (size_t i = 0; i < channelCount; i++) { channels[i] = samples_data + offset; offset += frames; } DeinterleaveAndConvertBuffer(scratch_buffer, frames, channelCount, channels.Elements()); outputChannels.AppendElements(channels); segment.AppendFrames(samples.forget(), outputChannels, frames, principal_handle_); // Handle track not actually added yet or removed/finished if (source_->AppendToTrack(track_id_, &segment)) { played_ticks_ += frames; } else { MOZ_MTLOG(ML_ERROR, "AppendToTrack failed"); // we can't un-read the data, but that's ok since we don't want to // buffer - but don't i-loop! return; } } } private: RefPtr conduit_; }; MediaPipelineReceiveAudio::MediaPipelineReceiveAudio( const std::string& pc, nsCOMPtr main_thread, nsCOMPtr sts_thread, SourceMediaStream* stream, const std::string& media_stream_track_id, TrackID numeric_track_id, int level, RefPtr conduit, RefPtr rtp_transport, RefPtr rtcp_transport, nsAutoPtr filter) : MediaPipelineReceive(pc, main_thread, sts_thread, stream, media_stream_track_id, level, conduit, rtp_transport, rtcp_transport, filter), listener_(new PipelineListener(stream, numeric_track_id, conduit)) {} void MediaPipelineReceiveAudio::DetachMedia() { ASSERT_ON_THREAD(main_thread_); if (stream_ && listener_) { listener_->EndTrack(); stream_->RemoveListener(listener_); stream_ = nullptr; } } nsresult MediaPipelineReceiveAudio::Init() { ASSERT_ON_THREAD(main_thread_); MOZ_MTLOG(ML_DEBUG, __FUNCTION__); description_ = pc_ + "| Receive audio["; description_ += track_id_; description_ += "]"; listener_->AddSelf(); return MediaPipelineReceive::Init(); } #ifndef USE_FAKE_MEDIA_STREAMS void MediaPipelineReceiveAudio::SetPrincipalHandle_m(const PrincipalHandle& principal_handle) { listener_->SetPrincipalHandle_m(principal_handle); } #endif // USE_FAKE_MEDIA_STREAMS class MediaPipelineReceiveVideo::PipelineListener : public GenericReceiveListener { public: PipelineListener(SourceMediaStream * source, TrackID track_id) : GenericReceiveListener(source, track_id), width_(0), height_(0), #if defined(MOZILLA_INTERNAL_API) image_container_(), image_(), #endif monitor_("Video PipelineListener") { #if !defined(MOZILLA_EXTERNAL_LINKAGE) image_container_ = LayerManager::CreateImageContainer(ImageContainer::ASYNCHRONOUS); #endif } // Implement MediaStreamListener void NotifyPull(MediaStreamGraph* graph, StreamTime desired_time) override { #if defined(MOZILLA_INTERNAL_API) ReentrantMonitorAutoEnter enter(monitor_); RefPtr image = image_; StreamTime delta = desired_time - played_ticks_; // Don't append if we've already provided a frame that supposedly // goes past the current aDesiredTime Doing so means a negative // delta and thus messes up handling of the graph if (delta > 0) { VideoSegment segment; segment.AppendFrame(image.forget(), delta, IntSize(width_, height_), principal_handle_); // Handle track not actually added yet or removed/finished if (source_->AppendToTrack(track_id_, &segment)) { played_ticks_ = desired_time; } else { MOZ_MTLOG(ML_ERROR, "AppendToTrack failed"); return; } } #endif } // Accessors for external writes from the renderer void FrameSizeChange(unsigned int width, unsigned int height, unsigned int number_of_streams) { ReentrantMonitorAutoEnter enter(monitor_); width_ = width; height_ = height; } void RenderVideoFrame(const unsigned char* buffer, size_t buffer_size, uint32_t time_stamp, int64_t render_time, const RefPtr& video_image) { RenderVideoFrame(buffer, buffer_size, width_, (width_ + 1) >> 1, time_stamp, render_time, video_image); } void RenderVideoFrame(const unsigned char* buffer, size_t buffer_size, uint32_t y_stride, uint32_t cbcr_stride, uint32_t time_stamp, int64_t render_time, const RefPtr& video_image) { #ifdef MOZILLA_INTERNAL_API ReentrantMonitorAutoEnter enter(monitor_); #endif // MOZILLA_INTERNAL_API #if defined(MOZILLA_INTERNAL_API) if (buffer) { // Create a video frame using |buffer|. #ifdef MOZ_WIDGET_GONK RefPtr yuvImage = new GrallocImage(); #else RefPtr yuvImage = image_container_->CreatePlanarYCbCrImage(); #endif uint8_t* frame = const_cast(static_cast (buffer)); PlanarYCbCrData yuvData; yuvData.mYChannel = frame; yuvData.mYSize = IntSize(y_stride, height_); yuvData.mYStride = y_stride; yuvData.mCbCrStride = cbcr_stride; yuvData.mCbChannel = frame + height_ * yuvData.mYStride; yuvData.mCrChannel = yuvData.mCbChannel + ((height_ + 1) >> 1) * yuvData.mCbCrStride; yuvData.mCbCrSize = IntSize(yuvData.mCbCrStride, (height_ + 1) >> 1); yuvData.mPicX = 0; yuvData.mPicY = 0; yuvData.mPicSize = IntSize(width_, height_); yuvData.mStereoMode = StereoMode::MONO; if (!yuvImage->CopyData(yuvData)) { MOZ_ASSERT(false); return; } image_ = yuvImage; } #ifdef WEBRTC_GONK else { // Decoder produced video frame that can be appended to the track directly. MOZ_ASSERT(video_image); image_ = video_image; } #endif // WEBRTC_GONK #endif // MOZILLA_INTERNAL_API } private: int width_; int height_; #if defined(MOZILLA_INTERNAL_API) RefPtr image_container_; RefPtr image_; #endif mozilla::ReentrantMonitor monitor_; // Monitor for processing WebRTC frames. // Protects image_ against: // - Writing from the GIPS thread // - Reading from the MSG thread }; class MediaPipelineReceiveVideo::PipelineRenderer : public VideoRenderer { public: explicit PipelineRenderer(MediaPipelineReceiveVideo *pipeline) : pipeline_(pipeline) {} void Detach() { pipeline_ = nullptr; } // Implement VideoRenderer void FrameSizeChange(unsigned int width, unsigned int height, unsigned int number_of_streams) override { pipeline_->listener_->FrameSizeChange(width, height, number_of_streams); } void RenderVideoFrame(const unsigned char* buffer, size_t buffer_size, uint32_t time_stamp, int64_t render_time, const ImageHandle& handle) override { pipeline_->listener_->RenderVideoFrame(buffer, buffer_size, time_stamp, render_time, handle.GetImage()); } void RenderVideoFrame(const unsigned char* buffer, size_t buffer_size, uint32_t y_stride, uint32_t cbcr_stride, uint32_t time_stamp, int64_t render_time, const ImageHandle& handle) override { pipeline_->listener_->RenderVideoFrame(buffer, buffer_size, y_stride, cbcr_stride, time_stamp, render_time, handle.GetImage()); } private: MediaPipelineReceiveVideo *pipeline_; // Raw pointer to avoid cycles }; MediaPipelineReceiveVideo::MediaPipelineReceiveVideo( const std::string& pc, nsCOMPtr main_thread, nsCOMPtr sts_thread, SourceMediaStream *stream, const std::string& media_stream_track_id, TrackID numeric_track_id, int level, RefPtr conduit, RefPtr rtp_transport, RefPtr rtcp_transport, nsAutoPtr filter) : MediaPipelineReceive(pc, main_thread, sts_thread, stream, media_stream_track_id, level, conduit, rtp_transport, rtcp_transport, filter), renderer_(new PipelineRenderer(this)), listener_(new PipelineListener(stream, numeric_track_id)) {} void MediaPipelineReceiveVideo::DetachMedia() { ASSERT_ON_THREAD(main_thread_); // stop generating video and thus stop invoking the PipelineRenderer // and PipelineListener - the renderer has a raw ptr to the Pipeline to // avoid cycles, and the render callbacks are invoked from a different // thread so simple null-checks would cause TSAN bugs without locks. static_cast(conduit_.get())->DetachRenderer(); if (stream_ && listener_) { listener_->EndTrack(); stream_->RemoveListener(listener_); stream_ = nullptr; } } nsresult MediaPipelineReceiveVideo::Init() { ASSERT_ON_THREAD(main_thread_); MOZ_MTLOG(ML_DEBUG, __FUNCTION__); description_ = pc_ + "| Receive video["; description_ += track_id_; description_ += "]"; #if defined(MOZILLA_INTERNAL_API) listener_->AddSelf(); #endif // Always happens before we can DetachMedia() static_cast(conduit_.get())-> AttachRenderer(renderer_); return MediaPipelineReceive::Init(); } #ifndef USE_FAKE_MEDIA_STREAMS void MediaPipelineReceiveVideo::SetPrincipalHandle_m(const PrincipalHandle& principal_handle) { listener_->SetPrincipalHandle_m(principal_handle); } #endif // USE_FAKE_MEDIA_STREAMS } // end namespace