diff options
Diffstat (limited to 'netwerk/sctp/datachannel/DataChannel.cpp')
-rw-r--r-- | netwerk/sctp/datachannel/DataChannel.cpp | 2661 |
1 files changed, 2661 insertions, 0 deletions
diff --git a/netwerk/sctp/datachannel/DataChannel.cpp b/netwerk/sctp/datachannel/DataChannel.cpp new file mode 100644 index 000000000..bf7790f51 --- /dev/null +++ b/netwerk/sctp/datachannel/DataChannel.cpp @@ -0,0 +1,2661 @@ +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* vim: set ts=2 et sw=2 tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this file, + * You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include <algorithm> +#include <stdio.h> +#include <stdlib.h> +#if !defined(__Userspace_os_Windows) +#include <arpa/inet.h> +#endif +// usrsctp.h expects to have errno definitions prior to its inclusion. +#include <errno.h> + +#define SCTP_DEBUG 1 +#define SCTP_STDINT_INCLUDE <stdint.h> + +#ifdef _MSC_VER +// Disable "warning C4200: nonstandard extension used : zero-sized array in +// struct/union" +// ...which the third-party file usrsctp.h runs afoul of. +#pragma warning(push) +#pragma warning(disable:4200) +#endif + +#include "usrsctp.h" + +#ifdef _MSC_VER +#pragma warning(pop) +#endif + +#include "DataChannelLog.h" + +#include "nsServiceManagerUtils.h" +#include "nsIObserverService.h" +#include "nsIObserver.h" +#include "mozilla/Services.h" +#include "mozilla/Sprintf.h" +#include "nsProxyRelease.h" +#include "nsThread.h" +#include "nsThreadUtils.h" +#include "nsAutoPtr.h" +#include "nsNetUtil.h" +#include "nsNetCID.h" +#include "mozilla/StaticPtr.h" +#include "mozilla/Unused.h" +#ifdef MOZ_PEERCONNECTION +#include "mtransport/runnable_utils.h" +#endif + +#define DATACHANNEL_LOG(args) LOG(args) +#include "DataChannel.h" +#include "DataChannelProtocol.h" + +// Let us turn on and off important assertions in non-debug builds +#ifdef DEBUG +#define ASSERT_WEBRTC(x) MOZ_ASSERT((x)) +#elif defined(MOZ_WEBRTC_ASSERT_ALWAYS) +#define ASSERT_WEBRTC(x) do { if (!(x)) { MOZ_CRASH(); } } while (0) +#endif + +static bool sctp_initialized; + +namespace mozilla { + +LazyLogModule gDataChannelLog("DataChannel"); +static LazyLogModule gSCTPLog("SCTP"); + +class DataChannelShutdown : public nsIObserver +{ +public: + // This needs to be tied to some form object that is guaranteed to be + // around (singleton likely) unless we want to shutdown sctp whenever + // we're not using it (and in which case we'd keep a refcnt'd object + // ref'd by each DataChannelConnection to release the SCTP usrlib via + // sctp_finish). Right now, the single instance of this class is + // owned by the observer service. + + NS_DECL_ISUPPORTS + + DataChannelShutdown() {} + + void Init() + { + nsCOMPtr<nsIObserverService> observerService = + mozilla::services::GetObserverService(); + if (!observerService) + return; + + nsresult rv = observerService->AddObserver(this, + "xpcom-will-shutdown", + false); + MOZ_ASSERT(rv == NS_OK); + (void) rv; + } + +private: + // The only instance of DataChannelShutdown is owned by the observer + // service, so there is no need to call RemoveObserver here. + virtual ~DataChannelShutdown() = default; + +public: + NS_IMETHOD Observe(nsISupports* aSubject, const char* aTopic, + const char16_t* aData) override { + if (strcmp(aTopic, "xpcom-will-shutdown") == 0) { + LOG(("Shutting down SCTP")); + if (sctp_initialized) { + usrsctp_finish(); + sctp_initialized = false; + } + nsCOMPtr<nsIObserverService> observerService = + mozilla::services::GetObserverService(); + if (!observerService) + return NS_ERROR_FAILURE; + + nsresult rv = observerService->RemoveObserver(this, + "xpcom-will-shutdown"); + MOZ_ASSERT(rv == NS_OK); + (void) rv; + } + return NS_OK; + } +}; + +NS_IMPL_ISUPPORTS(DataChannelShutdown, nsIObserver); + +BufferedMsg::BufferedMsg(struct sctp_sendv_spa &spa, const char *data, + size_t length) : mLength(length) +{ + mSpa = new sctp_sendv_spa; + *mSpa = spa; + auto *tmp = new char[length]; // infallible malloc! + memcpy(tmp, data, length); + mData = tmp; +} + +BufferedMsg::~BufferedMsg() +{ + delete mSpa; + delete mData; +} + +static int +receive_cb(struct socket* sock, union sctp_sockstore addr, + void *data, size_t datalen, + struct sctp_rcvinfo rcv, int flags, void *ulp_info) +{ + DataChannelConnection *connection = static_cast<DataChannelConnection*>(ulp_info); + return connection->ReceiveCallback(sock, data, datalen, rcv, flags); +} + +static +DataChannelConnection * +GetConnectionFromSocket(struct socket* sock) +{ + struct sockaddr *addrs = nullptr; + int naddrs = usrsctp_getladdrs(sock, 0, &addrs); + if (naddrs <= 0 || addrs[0].sa_family != AF_CONN) { + return nullptr; + } + // usrsctp_getladdrs() returns the addresses bound to this socket, which + // contains the SctpDataMediaChannel* as sconn_addr. Read the pointer, + // then free the list of addresses once we have the pointer. We only open + // AF_CONN sockets, and they should all have the sconn_addr set to the + // pointer that created them, so [0] is as good as any other. + struct sockaddr_conn *sconn = reinterpret_cast<struct sockaddr_conn *>(&addrs[0]); + DataChannelConnection *connection = + reinterpret_cast<DataChannelConnection *>(sconn->sconn_addr); + usrsctp_freeladdrs(addrs); + + return connection; +} + +// called when the buffer empties to the threshold value +static int +threshold_event(struct socket* sock, uint32_t sb_free) +{ + DataChannelConnection *connection = GetConnectionFromSocket(sock); + if (connection) { + LOG(("SendDeferred()")); + connection->SendDeferredMessages(); + } else { + LOG(("Can't find connection for socket %p", sock)); + } + return 0; +} + +static void +debug_printf(const char *format, ...) +{ + va_list ap; + char buffer[1024]; + + if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) { + va_start(ap, format); +#ifdef _WIN32 + if (vsnprintf_s(buffer, sizeof(buffer), _TRUNCATE, format, ap) > 0) { +#else + if (VsprintfLiteral(buffer, format, ap) > 0) { +#endif + PR_LogPrint("%s", buffer); + } + va_end(ap); + } +} + +DataChannelConnection::DataChannelConnection(DataConnectionListener *listener) : + mLock("netwerk::sctp::DataChannelConnection") +{ + mState = CLOSED; + mSocket = nullptr; + mMasterSocket = nullptr; + mListener = listener; + mLocalPort = 0; + mRemotePort = 0; + LOG(("Constructor DataChannelConnection=%p, listener=%p", this, mListener.get())); + mInternalIOThread = nullptr; +} + +DataChannelConnection::~DataChannelConnection() +{ + LOG(("Deleting DataChannelConnection %p", (void *) this)); + // This may die on the MainThread, or on the STS thread + ASSERT_WEBRTC(mState == CLOSED); + MOZ_ASSERT(!mMasterSocket); + MOZ_ASSERT(mPending.GetSize() == 0); + + // Already disconnected from sigslot/mTransportFlow + // TransportFlows must be released from the STS thread + if (!IsSTSThread()) { + ASSERT_WEBRTC(NS_IsMainThread()); + if (mTransportFlow) { + ASSERT_WEBRTC(mSTS); + NS_ProxyRelease(mSTS, mTransportFlow.forget()); + } + + if (mInternalIOThread) { + // Avoid spinning the event thread from here (which if we're mainthread + // is in the event loop already) + NS_DispatchToMainThread(WrapRunnable(nsCOMPtr<nsIThread>(mInternalIOThread), + &nsIThread::Shutdown), + NS_DISPATCH_NORMAL); + } + } else { + // on STS, safe to call shutdown + if (mInternalIOThread) { + mInternalIOThread->Shutdown(); + } + } +} + +void +DataChannelConnection::Destroy() +{ + // Though it's probably ok to do this and close the sockets; + // if we really want it to do true clean shutdowns it can + // create a dependant Internal object that would remain around + // until the network shut down the association or timed out. + LOG(("Destroying DataChannelConnection %p", (void *) this)); + ASSERT_WEBRTC(NS_IsMainThread()); + CloseAll(); + + MutexAutoLock lock(mLock); + // If we had a pending reset, we aren't waiting for it - clear the list so + // we can deregister this DataChannelConnection without leaking. + ClearResets(); + + MOZ_ASSERT(mSTS); + ASSERT_WEBRTC(NS_IsMainThread()); + // Must do this in Destroy() since we may then delete this object. + // Do this before dispatching to create a consistent ordering of calls to + // the SCTP stack. + if (mUsingDtls) { + usrsctp_deregister_address(static_cast<void *>(this)); + LOG(("Deregistered %p from the SCTP stack.", static_cast<void *>(this))); + } + + // Finish Destroy on STS thread to avoid bug 876167 - once that's fixed, + // the usrsctp_close() calls can move back here (and just proxy the + // disconnect_all()) + RUN_ON_THREAD(mSTS, WrapRunnable(RefPtr<DataChannelConnection>(this), + &DataChannelConnection::DestroyOnSTS, + mSocket, mMasterSocket), + NS_DISPATCH_NORMAL); + + // These will be released on STS + mSocket = nullptr; + mMasterSocket = nullptr; // also a flag that we've Destroyed this connection + + // We can't get any more new callbacks from the SCTP library + // All existing callbacks have refs to DataChannelConnection + + // nsDOMDataChannel objects have refs to DataChannels that have refs to us +} + +void DataChannelConnection::DestroyOnSTS(struct socket *aMasterSocket, + struct socket *aSocket) +{ + if (aSocket && aSocket != aMasterSocket) + usrsctp_close(aSocket); + if (aMasterSocket) + usrsctp_close(aMasterSocket); + + disconnect_all(); +} + +bool +DataChannelConnection::Init(unsigned short aPort, uint16_t aNumStreams, bool aUsingDtls) +{ + struct sctp_initmsg initmsg; + struct sctp_udpencaps encaps; + struct sctp_assoc_value av; + struct sctp_event event; + socklen_t len; + + uint16_t event_types[] = {SCTP_ASSOC_CHANGE, + SCTP_PEER_ADDR_CHANGE, + SCTP_REMOTE_ERROR, + SCTP_SHUTDOWN_EVENT, + SCTP_ADAPTATION_INDICATION, + SCTP_SEND_FAILED_EVENT, + SCTP_STREAM_RESET_EVENT, + SCTP_STREAM_CHANGE_EVENT}; + { + ASSERT_WEBRTC(NS_IsMainThread()); + + // MutexAutoLock lock(mLock); Not needed since we're on mainthread always + if (!sctp_initialized) { + if (aUsingDtls) { + LOG(("sctp_init(DTLS)")); +#ifdef MOZ_PEERCONNECTION + usrsctp_init(0, + DataChannelConnection::SctpDtlsOutput, + debug_printf + ); +#else + NS_ASSERTION(!aUsingDtls, "Trying to use SCTP/DTLS without mtransport"); +#endif + } else { + LOG(("sctp_init(%u)", aPort)); + usrsctp_init(aPort, + nullptr, + debug_printf + ); + } + + // Set logging to SCTP:LogLevel::Debug to get SCTP debugs + if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) { + usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_ALL); + } + + usrsctp_sysctl_set_sctp_blackhole(2); + // ECN is currently not supported by the Firefox code + usrsctp_sysctl_set_sctp_ecn_enable(0); + sctp_initialized = true; + + RefPtr<DataChannelShutdown> shutdown = new DataChannelShutdown(); + shutdown->Init(); + } + } + + // XXX FIX! make this a global we get once + // Find the STS thread + nsresult rv; + mSTS = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv); + MOZ_ASSERT(NS_SUCCEEDED(rv)); + + // Open sctp with a callback + if ((mMasterSocket = usrsctp_socket( + aUsingDtls ? AF_CONN : AF_INET, + SOCK_STREAM, IPPROTO_SCTP, receive_cb, threshold_event, + usrsctp_sysctl_get_sctp_sendspace() / 2, this)) == nullptr) { + return false; + } + + // Make non-blocking for bind/connect. SCTP over UDP defaults to non-blocking + // in associations for normal IO + if (usrsctp_set_non_blocking(mMasterSocket, 1) < 0) { + LOG(("Couldn't set non_blocking on SCTP socket")); + // We can't handle connect() safely if it will block, not that this will + // even happen. + goto error_cleanup; + } + + // Make sure when we close the socket, make sure it doesn't call us back again! + // This would cause it try to use an invalid DataChannelConnection pointer + struct linger l; + l.l_onoff = 1; + l.l_linger = 0; + if (usrsctp_setsockopt(mMasterSocket, SOL_SOCKET, SO_LINGER, + (const void *)&l, (socklen_t)sizeof(struct linger)) < 0) { + LOG(("Couldn't set SO_LINGER on SCTP socket")); + // unsafe to allow it to continue if this fails + goto error_cleanup; + } + + // XXX Consider disabling this when we add proper SDP negotiation. + // We may want to leave enabled for supporting 'cloning' of SDP offers, which + // implies re-use of the same pseudo-port number, or forcing a renegotiation. + { + uint32_t on = 1; + if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REUSE_PORT, + (const void *)&on, (socklen_t)sizeof(on)) < 0) { + LOG(("Couldn't set SCTP_REUSE_PORT on SCTP socket")); + } + if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_NODELAY, + (const void *)&on, (socklen_t)sizeof(on)) < 0) { + LOG(("Couldn't set SCTP_NODELAY on SCTP socket")); + } + } + + if (!aUsingDtls) { + memset(&encaps, 0, sizeof(encaps)); + encaps.sue_address.ss_family = AF_INET; + encaps.sue_port = htons(aPort); + if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REMOTE_UDP_ENCAPS_PORT, + (const void*)&encaps, + (socklen_t)sizeof(struct sctp_udpencaps)) < 0) { + LOG(("*** failed encaps errno %d", errno)); + goto error_cleanup; + } + LOG(("SCTP encapsulation local port %d", aPort)); + } + + av.assoc_id = SCTP_ALL_ASSOC; + av.assoc_value = SCTP_ENABLE_RESET_STREAM_REQ | SCTP_ENABLE_CHANGE_ASSOC_REQ; + if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET, &av, + (socklen_t)sizeof(struct sctp_assoc_value)) < 0) { + LOG(("*** failed enable stream reset errno %d", errno)); + goto error_cleanup; + } + + /* Enable the events of interest. */ + memset(&event, 0, sizeof(event)); + event.se_assoc_id = SCTP_ALL_ASSOC; + event.se_on = 1; + for (unsigned short event_type : event_types) { + event.se_type = event_type; + if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_EVENT, &event, sizeof(event)) < 0) { + LOG(("*** failed setsockopt SCTP_EVENT errno %d", errno)); + goto error_cleanup; + } + } + + // Update number of streams + mStreams.AppendElements(aNumStreams); + for (uint32_t i = 0; i < aNumStreams; ++i) { + mStreams[i] = nullptr; + } + memset(&initmsg, 0, sizeof(initmsg)); + len = sizeof(initmsg); + if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 0) { + LOG(("*** failed getsockopt SCTP_INITMSG")); + goto error_cleanup; + } + LOG(("Setting number of SCTP streams to %u, was %u/%u", aNumStreams, + initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams)); + initmsg.sinit_num_ostreams = aNumStreams; + initmsg.sinit_max_instreams = MAX_NUM_STREAMS; + if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, + (socklen_t)sizeof(initmsg)) < 0) { + LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno)); + goto error_cleanup; + } + + mSocket = nullptr; + if (aUsingDtls) { + mUsingDtls = true; + usrsctp_register_address(static_cast<void *>(this)); + LOG(("Registered %p within the SCTP stack.", static_cast<void *>(this))); + } else { + mUsingDtls = false; + } + return true; + +error_cleanup: + usrsctp_close(mMasterSocket); + mMasterSocket = nullptr; + mUsingDtls = false; + return false; +} + +#ifdef MOZ_PEERCONNECTION +void +DataChannelConnection::SetEvenOdd() +{ + ASSERT_WEBRTC(IsSTSThread()); + + TransportLayerDtls *dtls = static_cast<TransportLayerDtls *>( + mTransportFlow->GetLayer(TransportLayerDtls::ID())); + MOZ_ASSERT(dtls); // DTLS is mandatory + mAllocateEven = (dtls->role() == TransportLayerDtls::CLIENT); +} + +bool +DataChannelConnection::ConnectViaTransportFlow(TransportFlow *aFlow, uint16_t localport, uint16_t remoteport) +{ + LOG(("Connect DTLS local %u, remote %u", localport, remoteport)); + + NS_PRECONDITION(mMasterSocket, "SCTP wasn't initialized before ConnectViaTransportFlow!"); + NS_ENSURE_TRUE(aFlow, false); + + mTransportFlow = aFlow; + mLocalPort = localport; + mRemotePort = remoteport; + mState = CONNECTING; + + RUN_ON_THREAD(mSTS, WrapRunnable(RefPtr<DataChannelConnection>(this), + &DataChannelConnection::SetSignals), + NS_DISPATCH_NORMAL); + return true; +} + +void +DataChannelConnection::SetSignals() +{ + ASSERT_WEBRTC(IsSTSThread()); + ASSERT_WEBRTC(mTransportFlow); + LOG(("Setting transport signals, state: %d", mTransportFlow->state())); + mTransportFlow->SignalPacketReceived.connect(this, &DataChannelConnection::SctpDtlsInput); + // SignalStateChange() doesn't call you with the initial state + mTransportFlow->SignalStateChange.connect(this, &DataChannelConnection::CompleteConnect); + CompleteConnect(mTransportFlow, mTransportFlow->state()); +} + +void +DataChannelConnection::CompleteConnect(TransportFlow *flow, TransportLayer::State state) +{ + LOG(("Data transport state: %d", state)); + MutexAutoLock lock(mLock); + ASSERT_WEBRTC(IsSTSThread()); + // We should abort connection on TS_ERROR. + // Note however that the association will also fail (perhaps with a delay) and + // notify us in that way + if (state != TransportLayer::TS_OPEN || !mMasterSocket) + return; + + struct sockaddr_conn addr; + memset(&addr, 0, sizeof(addr)); + addr.sconn_family = AF_CONN; +#if defined(__Userspace_os_Darwin) + addr.sconn_len = sizeof(addr); +#endif + addr.sconn_port = htons(mLocalPort); + addr.sconn_addr = static_cast<void *>(this); + + LOG(("Calling usrsctp_bind")); + int r = usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr), + sizeof(addr)); + if (r < 0) { + LOG(("usrsctp_bind failed: %d", r)); + } else { + // This is the remote addr + addr.sconn_port = htons(mRemotePort); + LOG(("Calling usrsctp_connect")); + r = usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr), + sizeof(addr)); + if (r >= 0 || errno == EINPROGRESS) { + struct sctp_paddrparams paddrparams; + socklen_t opt_len; + + memset(&paddrparams, 0, sizeof(struct sctp_paddrparams)); + memcpy(&paddrparams.spp_address, &addr, sizeof(struct sockaddr_conn)); + opt_len = (socklen_t)sizeof(struct sctp_paddrparams); + r = usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS, + &paddrparams, &opt_len); + if (r < 0) { + LOG(("usrsctp_getsockopt failed: %d", r)); + } else { + // draft-ietf-rtcweb-data-channel-13 section 5: max initial MTU IPV4 1200, IPV6 1280 + paddrparams.spp_pathmtu = 1200; // safe for either + paddrparams.spp_flags &= ~SPP_PMTUD_ENABLE; + paddrparams.spp_flags |= SPP_PMTUD_DISABLE; + opt_len = (socklen_t)sizeof(struct sctp_paddrparams); + r = usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS, + &paddrparams, opt_len); + if (r < 0) { + LOG(("usrsctp_getsockopt failed: %d", r)); + } else { + LOG(("usrsctp: PMTUD disabled, MTU set to %u", paddrparams.spp_pathmtu)); + } + } + } + if (r < 0) { + if (errno == EINPROGRESS) { + // non-blocking + return; + } else { + LOG(("usrsctp_connect failed: %d", errno)); + mState = CLOSED; + } + } else { + // We set Even/Odd and fire ON_CONNECTION via SCTP_COMM_UP when we get that + // This also avoids issues with calling TransportFlow stuff on Mainthread + return; + } + } + // Note: currently this doesn't actually notify the application + NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::ON_CONNECTION, + this))); + return; +} + +// Process any pending Opens +void +DataChannelConnection::ProcessQueuedOpens() +{ + // The nsDeque holds channels with an AddRef applied. Another reference + // (may) be held by the DOMDataChannel, unless it's been GC'd. No other + // references should exist. + + // Can't copy nsDeque's. Move into temp array since any that fail will + // go back to mPending + nsDeque temp; + DataChannel *temp_channel; // really already_AddRefed<> + while (nullptr != (temp_channel = static_cast<DataChannel *>(mPending.PopFront()))) { + temp.Push(static_cast<void *>(temp_channel)); + } + + RefPtr<DataChannel> channel; + // All these entries have an AddRef(); make that explicit now via the dont_AddRef() + while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(temp.PopFront())))) { + if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) { + LOG(("Processing queued open for %p (%u)", channel.get(), channel->mStream)); + channel->mFlags &= ~DATA_CHANNEL_FLAGS_FINISH_OPEN; + // OpenFinish returns a reference itself, so we need to take it can Release it + channel = OpenFinish(channel.forget()); // may reset the flag and re-push + } else { + NS_ASSERTION(false, "How did a DataChannel get queued without the FINISH_OPEN flag?"); + } + } + +} +void +DataChannelConnection::SctpDtlsInput(TransportFlow *flow, + const unsigned char *data, size_t len) +{ + if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) { + char *buf; + + if ((buf = usrsctp_dumppacket((void *)data, len, SCTP_DUMP_INBOUND)) != nullptr) { + PR_LogPrint("%s", buf); + usrsctp_freedumpbuffer(buf); + } + } + // Pass the data to SCTP + usrsctp_conninput(static_cast<void *>(this), data, len, 0); +} + +int +DataChannelConnection::SendPacket(unsigned char data[], size_t len, bool release) +{ + //LOG(("%p: SCTP/DTLS sent %ld bytes", this, len)); + int res = mTransportFlow->SendPacket(data, len) < 0 ? 1 : 0; + if (release) + delete [] data; + return res; +} + +/* static */ +int +DataChannelConnection::SctpDtlsOutput(void *addr, void *buffer, size_t length, + uint8_t tos, uint8_t set_df) +{ + DataChannelConnection *peer = static_cast<DataChannelConnection *>(addr); + int res; + + if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) { + char *buf; + + if ((buf = usrsctp_dumppacket(buffer, length, SCTP_DUMP_OUTBOUND)) != nullptr) { + PR_LogPrint("%s", buf); + usrsctp_freedumpbuffer(buf); + } + } + // We're async proxying even if on the STSThread because this is called + // with internal SCTP locks held in some cases (such as in usrsctp_connect()). + // SCTP has an option for Apple, on IP connections only, to release at least + // one of the locks before calling a packet output routine; with changes to + // the underlying SCTP stack this might remove the need to use an async proxy. + if ((false /*peer->IsSTSThread()*/)) { + res = peer->SendPacket(static_cast<unsigned char *>(buffer), length, false); + } else { + auto *data = new unsigned char[length]; + memcpy(data, buffer, length); + // Commented out since we have to Dispatch SendPacket to avoid deadlock" + // res = -1; + + // XXX It might be worthwhile to add an assertion against the thread + // somehow getting into the DataChannel/SCTP code again, as + // DISPATCH_SYNC is not fully blocking. This may be tricky, as it + // needs to be a per-thread check, not a global. + peer->mSTS->Dispatch(WrapRunnable( + RefPtr<DataChannelConnection>(peer), + &DataChannelConnection::SendPacket, data, length, true), + NS_DISPATCH_NORMAL); + res = 0; // cheat! Packets can always be dropped later anyways + } + return res; +} +#endif + +#ifdef ALLOW_DIRECT_SCTP_LISTEN_CONNECT +// listen for incoming associations +// Blocks! - Don't call this from main thread! + +#error This code will not work as-is since SetEvenOdd() runs on Mainthread + +bool +DataChannelConnection::Listen(unsigned short port) +{ + struct sockaddr_in addr; + socklen_t addr_len; + + NS_WARNING_ASSERTION(!NS_IsMainThread(), + "Blocks, do not call from main thread!!!"); + + /* Acting as the 'server' */ + memset((void *)&addr, 0, sizeof(addr)); +#ifdef HAVE_SIN_LEN + addr.sin_len = sizeof(struct sockaddr_in); +#endif + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + addr.sin_addr.s_addr = htonl(INADDR_ANY); + LOG(("Waiting for connections on port %u", ntohs(addr.sin_port))); + mState = CONNECTING; + if (usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr), sizeof(struct sockaddr_in)) < 0) { + LOG(("***Failed userspace_bind")); + return false; + } + if (usrsctp_listen(mMasterSocket, 1) < 0) { + LOG(("***Failed userspace_listen")); + return false; + } + + LOG(("Accepting connection")); + addr_len = 0; + if ((mSocket = usrsctp_accept(mMasterSocket, nullptr, &addr_len)) == nullptr) { + LOG(("***Failed accept")); + return false; + } + mState = OPEN; + + struct linger l; + l.l_onoff = 1; + l.l_linger = 0; + if (usrsctp_setsockopt(mSocket, SOL_SOCKET, SO_LINGER, + (const void *)&l, (socklen_t)sizeof(struct linger)) < 0) { + LOG(("Couldn't set SO_LINGER on SCTP socket")); + } + + SetEvenOdd(); + + // Notify Connection open + // XXX We need to make sure connection sticks around until the message is delivered + LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this)); + NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::ON_CONNECTION, + this, (DataChannel *) nullptr))); + return true; +} + +// Blocks! - Don't call this from main thread! +bool +DataChannelConnection::Connect(const char *addr, unsigned short port) +{ + struct sockaddr_in addr4; + struct sockaddr_in6 addr6; + + NS_WARNING_ASSERTION(!NS_IsMainThread(), + "Blocks, do not call from main thread!!!"); + + /* Acting as the connector */ + LOG(("Connecting to %s, port %u", addr, port)); + memset((void *)&addr4, 0, sizeof(struct sockaddr_in)); + memset((void *)&addr6, 0, sizeof(struct sockaddr_in6)); +#ifdef HAVE_SIN_LEN + addr4.sin_len = sizeof(struct sockaddr_in); +#endif +#ifdef HAVE_SIN6_LEN + addr6.sin6_len = sizeof(struct sockaddr_in6); +#endif + addr4.sin_family = AF_INET; + addr6.sin6_family = AF_INET6; + addr4.sin_port = htons(port); + addr6.sin6_port = htons(port); + mState = CONNECTING; + +#if !defined(__Userspace_os_Windows) + if (inet_pton(AF_INET6, addr, &addr6.sin6_addr) == 1) { + if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr6), sizeof(struct sockaddr_in6)) < 0) { + LOG(("*** Failed userspace_connect")); + return false; + } + } else if (inet_pton(AF_INET, addr, &addr4.sin_addr) == 1) { + if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr4), sizeof(struct sockaddr_in)) < 0) { + LOG(("*** Failed userspace_connect")); + return false; + } + } else { + LOG(("*** Illegal destination address.")); + } +#else + { + struct sockaddr_storage ss; + int sslen = sizeof(ss); + + if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET6, nullptr, (struct sockaddr*)&ss, &sslen)) { + addr6.sin6_addr = (reinterpret_cast<struct sockaddr_in6 *>(&ss))->sin6_addr; + if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr6), sizeof(struct sockaddr_in6)) < 0) { + LOG(("*** Failed userspace_connect")); + return false; + } + } else if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET, nullptr, (struct sockaddr*)&ss, &sslen)) { + addr4.sin_addr = (reinterpret_cast<struct sockaddr_in *>(&ss))->sin_addr; + if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr4), sizeof(struct sockaddr_in)) < 0) { + LOG(("*** Failed userspace_connect")); + return false; + } + } else { + LOG(("*** Illegal destination address.")); + } + } +#endif + + mSocket = mMasterSocket; + + LOG(("connect() succeeded! Entering connected mode")); + mState = OPEN; + + SetEvenOdd(); + + // Notify Connection open + // XXX We need to make sure connection sticks around until the message is delivered + LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this)); + NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::ON_CONNECTION, + this, (DataChannel *) nullptr))); + return true; +} +#endif + +DataChannel * +DataChannelConnection::FindChannelByStream(uint16_t stream) +{ + return mStreams.SafeElementAt(stream); +} + +uint16_t +DataChannelConnection::FindFreeStream() +{ + uint32_t i, j, limit; + + limit = mStreams.Length(); + if (limit > MAX_NUM_STREAMS) + limit = MAX_NUM_STREAMS; + + for (i = (mAllocateEven ? 0 : 1); i < limit; i += 2) { + if (!mStreams[i]) { + // Verify it's not still in the process of closing + for (j = 0; j < mStreamsResetting.Length(); ++j) { + if (mStreamsResetting[j] == i) { + break; + } + } + if (j == mStreamsResetting.Length()) + break; + } + } + if (i >= limit) { + return INVALID_STREAM; + } + return i; +} + +bool +DataChannelConnection::RequestMoreStreams(int32_t aNeeded) +{ + struct sctp_status status; + struct sctp_add_streams sas; + uint32_t outStreamsNeeded; + socklen_t len; + + if (aNeeded + mStreams.Length() > MAX_NUM_STREAMS) { + aNeeded = MAX_NUM_STREAMS - mStreams.Length(); + } + if (aNeeded <= 0) { + return false; + } + + len = (socklen_t)sizeof(struct sctp_status); + if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_STATUS, &status, &len) < 0) { + LOG(("***failed: getsockopt SCTP_STATUS")); + return false; + } + outStreamsNeeded = aNeeded; // number to add + + // Note: if multiple channel opens happen when we don't have enough space, + // we'll call RequestMoreStreams() multiple times + memset(&sas, 0, sizeof(sas)); + sas.sas_instrms = 0; + sas.sas_outstrms = (uint16_t)outStreamsNeeded; /* XXX error handling */ + // Doesn't block, we get an event when it succeeds or fails + if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ADD_STREAMS, &sas, + (socklen_t) sizeof(struct sctp_add_streams)) < 0) { + if (errno == EALREADY) { + LOG(("Already have %u output streams", outStreamsNeeded)); + return true; + } + + LOG(("***failed: setsockopt ADD errno=%d", errno)); + return false; + } + LOG(("Requested %u more streams", outStreamsNeeded)); + // We add to mStreams when we get a SCTP_STREAM_CHANGE_EVENT and the + // values are larger than mStreams.Length() + return true; +} + +int32_t +DataChannelConnection::SendControlMessage(void *msg, uint32_t len, uint16_t stream) +{ + struct sctp_sndinfo sndinfo; + + // Note: Main-thread IO, but doesn't block + memset(&sndinfo, 0, sizeof(struct sctp_sndinfo)); + sndinfo.snd_sid = stream; + sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL); + if (usrsctp_sendv(mSocket, msg, len, nullptr, 0, + &sndinfo, (socklen_t)sizeof(struct sctp_sndinfo), + SCTP_SENDV_SNDINFO, 0) < 0) { + //LOG(("***failed: sctp_sendv")); don't log because errno is a return! + return (0); + } + return (1); +} + +int32_t +DataChannelConnection::SendOpenAckMessage(uint16_t stream) +{ + struct rtcweb_datachannel_ack ack; + + memset(&ack, 0, sizeof(struct rtcweb_datachannel_ack)); + ack.msg_type = DATA_CHANNEL_ACK; + + return SendControlMessage(&ack, sizeof(ack), stream); +} + +int32_t +DataChannelConnection::SendOpenRequestMessage(const nsACString& label, + const nsACString& protocol, + uint16_t stream, bool unordered, + uint16_t prPolicy, uint32_t prValue) +{ + const int label_len = label.Length(); // not including nul + const int proto_len = protocol.Length(); // not including nul + // careful - request struct include one char for the label + const int req_size = sizeof(struct rtcweb_datachannel_open_request) - 1 + + label_len + proto_len; + struct rtcweb_datachannel_open_request *req = + (struct rtcweb_datachannel_open_request*) moz_xmalloc(req_size); + + memset(req, 0, req_size); + req->msg_type = DATA_CHANNEL_OPEN_REQUEST; + switch (prPolicy) { + case SCTP_PR_SCTP_NONE: + req->channel_type = DATA_CHANNEL_RELIABLE; + break; + case SCTP_PR_SCTP_TTL: + req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_TIMED; + break; + case SCTP_PR_SCTP_RTX: + req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT; + break; + default: + // FIX! need to set errno! Or make all these SendXxxx() funcs return 0 or errno! + free(req); + return (0); + } + if (unordered) { + // Per the current types, all differ by 0x80 between ordered and unordered + req->channel_type |= 0x80; // NOTE: be careful if new types are added in the future + } + + req->reliability_param = htonl(prValue); + req->priority = htons(0); /* XXX: add support */ + req->label_length = htons(label_len); + req->protocol_length = htons(proto_len); + memcpy(&req->label[0], PromiseFlatCString(label).get(), label_len); + memcpy(&req->label[label_len], PromiseFlatCString(protocol).get(), proto_len); + + int32_t result = SendControlMessage(req, req_size, stream); + + free(req); + return result; +} + +// XXX This should use a separate thread (outbound queue) which should +// select() to know when to *try* to send data to the socket again. +// Alternatively, it can use a timeout, but that's guaranteed to be wrong +// (just not sure in what direction). We could re-implement NSPR's +// PR_POLL_WRITE/etc handling... with a lot of work. + +// Better yet, use the SCTP stack's notifications on buffer state to avoid +// filling the SCTP's buffers. + +// returns if we're still blocked or not +bool +DataChannelConnection::SendDeferredMessages() +{ + uint32_t i; + RefPtr<DataChannel> channel; // we may null out the refs to this + bool still_blocked = false; + + // This may block while something is modifying channels, but should not block for IO + MutexAutoLock lock(mLock); + + // XXX For total fairness, on a still_blocked we'd start next time at the + // same index. Sorry, not going to bother for now. + for (i = 0; i < mStreams.Length(); ++i) { + channel = mStreams[i]; + if (!channel) + continue; + + // Only one of these should be set.... + if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_REQ) { + if (SendOpenRequestMessage(channel->mLabel, channel->mProtocol, + channel->mStream, + channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED, + channel->mPrPolicy, channel->mPrValue)) { + channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_REQ; + + channel->mState = OPEN; + channel->mReady = true; + LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get())); + NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this, + channel))); + } else { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + still_blocked = true; + } else { + // Close the channel, inform the user + mStreams[channel->mStream] = nullptr; + channel->mState = CLOSED; + // Don't need to reset; we didn't open it + NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this, + channel))); + } + } + } + if (still_blocked) + break; + + if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_ACK) { + if (SendOpenAckMessage(channel->mStream)) { + channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_ACK; + } else { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + still_blocked = true; + } else { + // Close the channel, inform the user + CloseInt(channel); + // XXX send error via DataChannelOnMessageAvailable (bug 843625) + } + } + } + if (still_blocked) + break; + + if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_DATA) { + bool failed_send = false; + int32_t result; + + if (channel->mState == CLOSED || channel->mState == CLOSING) { + channel->mBufferedData.Clear(); + } + + uint32_t buffered_amount = channel->GetBufferedAmountLocked(); + uint32_t threshold = channel->GetBufferedAmountLowThreshold(); + bool was_over_threshold = buffered_amount >= threshold; + + while (!channel->mBufferedData.IsEmpty() && + !failed_send) { + struct sctp_sendv_spa *spa = channel->mBufferedData[0]->mSpa; + const char *data = channel->mBufferedData[0]->mData; + size_t len = channel->mBufferedData[0]->mLength; + + // SCTP will return EMSGSIZE if the message is bigger than the buffer + // size (or EAGAIN if there isn't space) + if ((result = usrsctp_sendv(mSocket, data, len, + nullptr, 0, + (void *)spa, (socklen_t)sizeof(struct sctp_sendv_spa), + SCTP_SENDV_SPA, + 0)) < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + // leave queued for resend + failed_send = true; + LOG(("queue full again when resending %d bytes (%d)", len, result)); + } else { + LOG(("error %d re-sending string", errno)); + failed_send = true; + } + } else { + LOG(("Resent buffer of %d bytes (%d)", len, result)); + // In theory this could underflow if >4GB was buffered and re + // truncated in GetBufferedAmount(), but this won't cause any problems. + buffered_amount -= channel->mBufferedData[0]->mLength; + channel->mBufferedData.RemoveElementAt(0); + // can never fire with default threshold of 0 + if (was_over_threshold && buffered_amount < threshold) { + LOG(("%s: sending BUFFER_LOW_THRESHOLD for %s/%s: %u", __FUNCTION__, + channel->mLabel.get(), channel->mProtocol.get(), channel->mStream)); + NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::BUFFER_LOW_THRESHOLD, + this, channel))); + was_over_threshold = false; + } + if (buffered_amount == 0) { + // buffered-to-not-buffered transition; tell the DOM code in case this makes it + // available for GC + LOG(("%s: sending NO_LONGER_BUFFERED for %s/%s: %u", __FUNCTION__, + channel->mLabel.get(), channel->mProtocol.get(), channel->mStream)); + NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::NO_LONGER_BUFFERED, + this, channel))); + } + } + } + if (channel->mBufferedData.IsEmpty()) + channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_DATA; + else + still_blocked = true; + } + if (still_blocked) + break; + } + + return still_blocked; +} + +void +DataChannelConnection::HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req, + size_t length, + uint16_t stream) +{ + RefPtr<DataChannel> channel; + uint32_t prValue; + uint16_t prPolicy; + uint32_t flags; + + mLock.AssertCurrentThreadOwns(); + + if (length != (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)) { + LOG(("%s: Inconsistent length: %u, should be %u", __FUNCTION__, length, + (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length))); + if (length < (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)) + return; + } + + LOG(("%s: length %u, sizeof(*req) = %u", __FUNCTION__, length, sizeof(*req))); + + switch (req->channel_type) { + case DATA_CHANNEL_RELIABLE: + case DATA_CHANNEL_RELIABLE_UNORDERED: + prPolicy = SCTP_PR_SCTP_NONE; + break; + case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT: + case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT_UNORDERED: + prPolicy = SCTP_PR_SCTP_RTX; + break; + case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED: + case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED_UNORDERED: + prPolicy = SCTP_PR_SCTP_TTL; + break; + default: + LOG(("Unknown channel type", req->channel_type)); + /* XXX error handling */ + return; + } + prValue = ntohl(req->reliability_param); + flags = (req->channel_type & 0x80) ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0; + + if ((channel = FindChannelByStream(stream))) { + if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) { + LOG(("ERROR: HandleOpenRequestMessage: channel for stream %u is in state %d instead of CLOSED.", + stream, channel->mState)); + /* XXX: some error handling */ + } else { + LOG(("Open for externally negotiated channel %u", stream)); + // XXX should also check protocol, maybe label + if (prPolicy != channel->mPrPolicy || + prValue != channel->mPrValue || + flags != (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED)) + { + LOG(("WARNING: external negotiation mismatch with OpenRequest:" + "channel %u, policy %u/%u, value %u/%u, flags %x/%x", + stream, prPolicy, channel->mPrPolicy, + prValue, channel->mPrValue, flags, channel->mFlags)); + } + } + return; + } + if (stream >= mStreams.Length()) { + LOG(("%s: stream %u out of bounds (%u)", __FUNCTION__, stream, mStreams.Length())); + return; + } + + nsCString label(nsDependentCSubstring(&req->label[0], ntohs(req->label_length))); + nsCString protocol(nsDependentCSubstring(&req->label[ntohs(req->label_length)], + ntohs(req->protocol_length))); + + channel = new DataChannel(this, + stream, + DataChannel::CONNECTING, + label, + protocol, + prPolicy, prValue, + flags, + nullptr, nullptr); + mStreams[stream] = channel; + + channel->mState = DataChannel::WAITING_TO_OPEN; + + LOG(("%s: sending ON_CHANNEL_CREATED for %s/%s: %u (state %u)", __FUNCTION__, + channel->mLabel.get(), channel->mProtocol.get(), stream, channel->mState)); + NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::ON_CHANNEL_CREATED, + this, channel))); + + LOG(("%s: deferring sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get())); + + if (!SendOpenAckMessage(stream)) { + // XXX Only on EAGAIN!? And if not, then close the channel?? + channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_ACK; + // Note: we're locked, so there's no danger of a race with the + // buffer-threshold callback + } + + // Now process any queued data messages for the channel (which will + // themselves likely get queued until we leave WAITING_TO_OPEN, plus any + // more that come in before that happens) + DeliverQueuedData(stream); +} + +// NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK. +// That would make this code moot. Keep it for now for backwards compatibility. +void +DataChannelConnection::DeliverQueuedData(uint16_t stream) +{ + mLock.AssertCurrentThreadOwns(); + + uint32_t i = 0; + while (i < mQueuedData.Length()) { + // Careful! we may modify the array length from within the loop! + if (mQueuedData[i]->mStream == stream) { + LOG(("Delivering queued data for stream %u, length %u", + stream, (unsigned int) mQueuedData[i]->mLength)); + // Deliver the queued data + HandleDataMessage(mQueuedData[i]->mPpid, + mQueuedData[i]->mData, mQueuedData[i]->mLength, + mQueuedData[i]->mStream); + mQueuedData.RemoveElementAt(i); + continue; // don't bump index since we removed the element + } + i++; + } +} + +void +DataChannelConnection::HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack, + size_t length, uint16_t stream) +{ + DataChannel *channel; + + mLock.AssertCurrentThreadOwns(); + + channel = FindChannelByStream(stream); + NS_ENSURE_TRUE_VOID(channel); + + LOG(("OpenAck received for stream %u, waiting=%d", stream, + (channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK) ? 1 : 0)); + + channel->mFlags &= ~DATA_CHANNEL_FLAGS_WAITING_ACK; +} + +void +DataChannelConnection::HandleUnknownMessage(uint32_t ppid, size_t length, uint16_t stream) +{ + /* XXX: Send an error message? */ + LOG(("unknown DataChannel message received: %u, len %ld on stream %lu", ppid, length, stream)); + // XXX Log to JS error console if possible +} + +void +DataChannelConnection::HandleDataMessage(uint32_t ppid, + const void *data, size_t length, + uint16_t stream) +{ + DataChannel *channel; + const char *buffer = (const char *) data; + + mLock.AssertCurrentThreadOwns(); + + channel = FindChannelByStream(stream); + + // XXX A closed channel may trip this... check + // NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK. + // That would make this code moot. Keep it for now for backwards compatibility. + if (!channel) { + // In the updated 0-RTT open case, the sender can send data immediately + // after Open, and doesn't set the in-order bit (since we don't have a + // response or ack). Also, with external negotiation, data can come in + // before we're told about the external negotiation. We need to buffer + // data until either a) Open comes in, if the ordering get messed up, + // or b) the app tells us this channel was externally negotiated. When + // these occur, we deliver the data. + + // Since this is rare and non-performance, keep a single list of queued + // data messages to deliver once the channel opens. + LOG(("Queuing data for stream %u, length %u", stream, length)); + // Copies data + mQueuedData.AppendElement(new QueuedDataMessage(stream, ppid, data, length)); + return; + } + + // XXX should this be a simple if, no warnings/debugbreaks? + NS_ENSURE_TRUE_VOID(channel->mState != CLOSED); + + { + nsAutoCString recvData(buffer, length); // copies (<64) or allocates + bool is_binary = true; + + if (ppid == DATA_CHANNEL_PPID_DOMSTRING || + ppid == DATA_CHANNEL_PPID_DOMSTRING_LAST) { + is_binary = false; + } + if (is_binary != channel->mIsRecvBinary && !channel->mRecvBuffer.IsEmpty()) { + NS_WARNING("DataChannel message aborted by fragment type change!"); + channel->mRecvBuffer.Truncate(0); + } + channel->mIsRecvBinary = is_binary; + + switch (ppid) { + case DATA_CHANNEL_PPID_DOMSTRING: + case DATA_CHANNEL_PPID_BINARY: + channel->mRecvBuffer += recvData; + LOG(("DataChannel: Partial %s message of length %lu (total %u) on channel id %u", + is_binary ? "binary" : "string", length, channel->mRecvBuffer.Length(), + channel->mStream)); + return; // Not ready to notify application + + case DATA_CHANNEL_PPID_DOMSTRING_LAST: + LOG(("DataChannel: String message received of length %lu on channel %u", + length, channel->mStream)); + if (!channel->mRecvBuffer.IsEmpty()) { + channel->mRecvBuffer += recvData; + LOG(("%s: sending ON_DATA (string fragmented) for %p", __FUNCTION__, channel)); + channel->SendOrQueue(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::ON_DATA, this, + channel, channel->mRecvBuffer, -1)); + channel->mRecvBuffer.Truncate(0); + return; + } + // else send using recvData normally + length = -1; // Flag for DOMString + + // WebSockets checks IsUTF8() here; we can try to deliver it + break; + + case DATA_CHANNEL_PPID_BINARY_LAST: + LOG(("DataChannel: Received binary message of length %lu on channel id %u", + length, channel->mStream)); + if (!channel->mRecvBuffer.IsEmpty()) { + channel->mRecvBuffer += recvData; + LOG(("%s: sending ON_DATA (binary fragmented) for %p", __FUNCTION__, channel)); + channel->SendOrQueue(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::ON_DATA, this, + channel, channel->mRecvBuffer, + channel->mRecvBuffer.Length())); + channel->mRecvBuffer.Truncate(0); + return; + } + // else send using recvData normally + break; + + default: + NS_ERROR("Unknown data PPID"); + return; + } + /* Notify onmessage */ + LOG(("%s: sending ON_DATA for %p", __FUNCTION__, channel)); + channel->SendOrQueue(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::ON_DATA, this, + channel, recvData, length)); + } +} + +// Called with mLock locked! +void +DataChannelConnection::HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t stream) +{ + const struct rtcweb_datachannel_open_request *req; + const struct rtcweb_datachannel_ack *ack; + + mLock.AssertCurrentThreadOwns(); + + switch (ppid) { + case DATA_CHANNEL_PPID_CONTROL: + req = static_cast<const struct rtcweb_datachannel_open_request *>(buffer); + + NS_ENSURE_TRUE_VOID(length >= sizeof(*ack)); // smallest message + switch (req->msg_type) { + case DATA_CHANNEL_OPEN_REQUEST: + // structure includes a possibly-unused char label[1] (in a packed structure) + NS_ENSURE_TRUE_VOID(length >= sizeof(*req) - 1); + + HandleOpenRequestMessage(req, length, stream); + break; + case DATA_CHANNEL_ACK: + // >= sizeof(*ack) checked above + + ack = static_cast<const struct rtcweb_datachannel_ack *>(buffer); + HandleOpenAckMessage(ack, length, stream); + break; + default: + HandleUnknownMessage(ppid, length, stream); + break; + } + break; + case DATA_CHANNEL_PPID_DOMSTRING: + case DATA_CHANNEL_PPID_DOMSTRING_LAST: + case DATA_CHANNEL_PPID_BINARY: + case DATA_CHANNEL_PPID_BINARY_LAST: + HandleDataMessage(ppid, buffer, length, stream); + break; + default: + LOG(("Message of length %lu, PPID %u on stream %u received.", + length, ppid, stream)); + break; + } +} + +void +DataChannelConnection::HandleAssociationChangeEvent(const struct sctp_assoc_change *sac) +{ + uint32_t i, n; + + switch (sac->sac_state) { + case SCTP_COMM_UP: + LOG(("Association change: SCTP_COMM_UP")); + if (mState == CONNECTING) { + mSocket = mMasterSocket; + mState = OPEN; + + SetEvenOdd(); + + NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::ON_CONNECTION, + this))); + LOG(("DTLS connect() succeeded! Entering connected mode")); + + // Open any streams pending... + ProcessQueuedOpens(); + + } else if (mState == OPEN) { + LOG(("DataConnection Already OPEN")); + } else { + LOG(("Unexpected state: %d", mState)); + } + break; + case SCTP_COMM_LOST: + LOG(("Association change: SCTP_COMM_LOST")); + // This association is toast, so also close all the channels -- from mainthread! + NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::ON_DISCONNECTED, + this))); + break; + case SCTP_RESTART: + LOG(("Association change: SCTP_RESTART")); + break; + case SCTP_SHUTDOWN_COMP: + LOG(("Association change: SCTP_SHUTDOWN_COMP")); + NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::ON_DISCONNECTED, + this))); + break; + case SCTP_CANT_STR_ASSOC: + LOG(("Association change: SCTP_CANT_STR_ASSOC")); + break; + default: + LOG(("Association change: UNKNOWN")); + break; + } + LOG(("Association change: streams (in/out) = (%u/%u)", + sac->sac_inbound_streams, sac->sac_outbound_streams)); + + NS_ENSURE_TRUE_VOID(sac); + n = sac->sac_length - sizeof(*sac); + if (((sac->sac_state == SCTP_COMM_UP) || + (sac->sac_state == SCTP_RESTART)) && (n > 0)) { + for (i = 0; i < n; ++i) { + switch (sac->sac_info[i]) { + case SCTP_ASSOC_SUPPORTS_PR: + LOG(("Supports: PR")); + break; + case SCTP_ASSOC_SUPPORTS_AUTH: + LOG(("Supports: AUTH")); + break; + case SCTP_ASSOC_SUPPORTS_ASCONF: + LOG(("Supports: ASCONF")); + break; + case SCTP_ASSOC_SUPPORTS_MULTIBUF: + LOG(("Supports: MULTIBUF")); + break; + case SCTP_ASSOC_SUPPORTS_RE_CONFIG: + LOG(("Supports: RE-CONFIG")); + break; + default: + LOG(("Supports: UNKNOWN(0x%02x)", sac->sac_info[i])); + break; + } + } + } else if (((sac->sac_state == SCTP_COMM_LOST) || + (sac->sac_state == SCTP_CANT_STR_ASSOC)) && (n > 0)) { + LOG(("Association: ABORT =")); + for (i = 0; i < n; ++i) { + LOG((" 0x%02x", sac->sac_info[i])); + } + } + if ((sac->sac_state == SCTP_CANT_STR_ASSOC) || + (sac->sac_state == SCTP_SHUTDOWN_COMP) || + (sac->sac_state == SCTP_COMM_LOST)) { + return; + } +} + +void +DataChannelConnection::HandlePeerAddressChangeEvent(const struct sctp_paddr_change *spc) +{ + const char *addr = ""; +#if !defined(__Userspace_os_Windows) + char addr_buf[INET6_ADDRSTRLEN]; + struct sockaddr_in *sin; + struct sockaddr_in6 *sin6; +#endif + + switch (spc->spc_aaddr.ss_family) { + case AF_INET: +#if !defined(__Userspace_os_Windows) + sin = (struct sockaddr_in *)&spc->spc_aaddr; + addr = inet_ntop(AF_INET, &sin->sin_addr, addr_buf, INET6_ADDRSTRLEN); +#endif + break; + case AF_INET6: +#if !defined(__Userspace_os_Windows) + sin6 = (struct sockaddr_in6 *)&spc->spc_aaddr; + addr = inet_ntop(AF_INET6, &sin6->sin6_addr, addr_buf, INET6_ADDRSTRLEN); +#endif + break; + case AF_CONN: + addr = "DTLS connection"; + break; + default: + break; + } + LOG(("Peer address %s is now ", addr)); + switch (spc->spc_state) { + case SCTP_ADDR_AVAILABLE: + LOG(("SCTP_ADDR_AVAILABLE")); + break; + case SCTP_ADDR_UNREACHABLE: + LOG(("SCTP_ADDR_UNREACHABLE")); + break; + case SCTP_ADDR_REMOVED: + LOG(("SCTP_ADDR_REMOVED")); + break; + case SCTP_ADDR_ADDED: + LOG(("SCTP_ADDR_ADDED")); + break; + case SCTP_ADDR_MADE_PRIM: + LOG(("SCTP_ADDR_MADE_PRIM")); + break; + case SCTP_ADDR_CONFIRMED: + LOG(("SCTP_ADDR_CONFIRMED")); + break; + default: + LOG(("UNKNOWN")); + break; + } + LOG((" (error = 0x%08x).\n", spc->spc_error)); +} + +void +DataChannelConnection::HandleRemoteErrorEvent(const struct sctp_remote_error *sre) +{ + size_t i, n; + + n = sre->sre_length - sizeof(struct sctp_remote_error); + LOG(("Remote Error (error = 0x%04x): ", sre->sre_error)); + for (i = 0; i < n; ++i) { + LOG((" 0x%02x", sre-> sre_data[i])); + } +} + +void +DataChannelConnection::HandleShutdownEvent(const struct sctp_shutdown_event *sse) +{ + LOG(("Shutdown event.")); + /* XXX: notify all channels. */ + // Attempts to actually send anything will fail +} + +void +DataChannelConnection::HandleAdaptationIndication(const struct sctp_adaptation_event *sai) +{ + LOG(("Adaptation indication: %x.", sai-> sai_adaptation_ind)); +} + +void +DataChannelConnection::HandleSendFailedEvent(const struct sctp_send_failed_event *ssfe) +{ + size_t i, n; + + if (ssfe->ssfe_flags & SCTP_DATA_UNSENT) { + LOG(("Unsent ")); + } + if (ssfe->ssfe_flags & SCTP_DATA_SENT) { + LOG(("Sent ")); + } + if (ssfe->ssfe_flags & ~(SCTP_DATA_SENT | SCTP_DATA_UNSENT)) { + LOG(("(flags = %x) ", ssfe->ssfe_flags)); + } + LOG(("message with PPID = %u, SID = %d, flags: 0x%04x due to error = 0x%08x", + ntohl(ssfe->ssfe_info.snd_ppid), ssfe->ssfe_info.snd_sid, + ssfe->ssfe_info.snd_flags, ssfe->ssfe_error)); + n = ssfe->ssfe_length - sizeof(struct sctp_send_failed_event); + for (i = 0; i < n; ++i) { + LOG((" 0x%02x", ssfe->ssfe_data[i])); + } +} + +void +DataChannelConnection::ClearResets() +{ + // Clear all pending resets + if (!mStreamsResetting.IsEmpty()) { + LOG(("Clearing resets for %d streams", mStreamsResetting.Length())); + } + + for (uint32_t i = 0; i < mStreamsResetting.Length(); ++i) { + RefPtr<DataChannel> channel; + channel = FindChannelByStream(mStreamsResetting[i]); + if (channel) { + LOG(("Forgetting channel %u (%p) with pending reset",channel->mStream, channel.get())); + mStreams[channel->mStream] = nullptr; + } + } + mStreamsResetting.Clear(); +} + +void +DataChannelConnection::ResetOutgoingStream(uint16_t stream) +{ + uint32_t i; + + mLock.AssertCurrentThreadOwns(); + LOG(("Connection %p: Resetting outgoing stream %u", + (void *) this, stream)); + // Rarely has more than a couple items and only for a short time + for (i = 0; i < mStreamsResetting.Length(); ++i) { + if (mStreamsResetting[i] == stream) { + return; + } + } + mStreamsResetting.AppendElement(stream); +} + +void +DataChannelConnection::SendOutgoingStreamReset() +{ + struct sctp_reset_streams *srs; + uint32_t i; + size_t len; + + LOG(("Connection %p: Sending outgoing stream reset for %d streams", + (void *) this, mStreamsResetting.Length())); + mLock.AssertCurrentThreadOwns(); + if (mStreamsResetting.IsEmpty()) { + LOG(("No streams to reset")); + return; + } + len = sizeof(sctp_assoc_t) + (2 + mStreamsResetting.Length()) * sizeof(uint16_t); + srs = static_cast<struct sctp_reset_streams *> (moz_xmalloc(len)); // infallible malloc + memset(srs, 0, len); + srs->srs_flags = SCTP_STREAM_RESET_OUTGOING; + srs->srs_number_streams = mStreamsResetting.Length(); + for (i = 0; i < mStreamsResetting.Length(); ++i) { + srs->srs_stream_list[i] = mStreamsResetting[i]; + } + if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_RESET_STREAMS, srs, (socklen_t)len) < 0) { + LOG(("***failed: setsockopt RESET, errno %d", errno)); + // if errno == EALREADY, this is normal - we can't send another reset + // with one pending. + // When we get an incoming reset (which may be a response to our + // outstanding one), see if we have any pending outgoing resets and + // send them + } else { + mStreamsResetting.Clear(); + } + free(srs); +} + +void +DataChannelConnection::HandleStreamResetEvent(const struct sctp_stream_reset_event *strrst) +{ + uint32_t n, i; + RefPtr<DataChannel> channel; // since we may null out the ref to the channel + + if (!(strrst->strreset_flags & SCTP_STREAM_RESET_DENIED) && + !(strrst->strreset_flags & SCTP_STREAM_RESET_FAILED)) { + n = (strrst->strreset_length - sizeof(struct sctp_stream_reset_event)) / sizeof(uint16_t); + for (i = 0; i < n; ++i) { + if (strrst->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) { + channel = FindChannelByStream(strrst->strreset_stream_list[i]); + if (channel) { + // The other side closed the channel + // We could be in three states: + // 1. Normal state (input and output streams (OPEN) + // Notify application, send a RESET in response on our + // outbound channel. Go to CLOSED + // 2. We sent our own reset (CLOSING); either they crossed on the + // wire, or this is a response to our Reset. + // Go to CLOSED + // 3. We've sent a open but haven't gotten a response yet (CONNECTING) + // I believe this is impossible, as we don't have an input stream yet. + + LOG(("Incoming: Channel %u closed, state %d", + channel->mStream, channel->mState)); + ASSERT_WEBRTC(channel->mState == DataChannel::OPEN || + channel->mState == DataChannel::CLOSING || + channel->mState == DataChannel::CONNECTING || + channel->mState == DataChannel::WAITING_TO_OPEN); + if (channel->mState == DataChannel::OPEN || + channel->mState == DataChannel::WAITING_TO_OPEN) { + // Mark the stream for reset (the reset is sent below) + ResetOutgoingStream(channel->mStream); + } + mStreams[channel->mStream] = nullptr; + + LOG(("Disconnected DataChannel %p from connection %p", + (void *) channel.get(), (void *) channel->mConnection.get())); + // This sends ON_CHANNEL_CLOSED to mainthread + channel->StreamClosedLocked(); + } else { + LOG(("Can't find incoming channel %d",i)); + } + } + } + } + + // Process any pending resets now: + if (!mStreamsResetting.IsEmpty()) { + LOG(("Sending %d pending resets", mStreamsResetting.Length())); + SendOutgoingStreamReset(); + } +} + +void +DataChannelConnection::HandleStreamChangeEvent(const struct sctp_stream_change_event *strchg) +{ + uint16_t stream; + RefPtr<DataChannel> channel; + + if (strchg->strchange_flags == SCTP_STREAM_CHANGE_DENIED) { + LOG(("*** Failed increasing number of streams from %u (%u/%u)", + mStreams.Length(), + strchg->strchange_instrms, + strchg->strchange_outstrms)); + // XXX FIX! notify pending opens of failure + return; + } else { + if (strchg->strchange_instrms > mStreams.Length()) { + LOG(("Other side increased streams from %u to %u", + mStreams.Length(), strchg->strchange_instrms)); + } + if (strchg->strchange_outstrms > mStreams.Length() || + strchg->strchange_instrms > mStreams.Length()) { + uint16_t old_len = mStreams.Length(); + uint16_t new_len = std::max(strchg->strchange_outstrms, + strchg->strchange_instrms); + LOG(("Increasing number of streams from %u to %u - adding %u (in: %u)", + old_len, new_len, new_len - old_len, + strchg->strchange_instrms)); + // make sure both are the same length + mStreams.AppendElements(new_len - old_len); + LOG(("New length = %d (was %d)", mStreams.Length(), old_len)); + for (size_t i = old_len; i < mStreams.Length(); ++i) { + mStreams[i] = nullptr; + } + // Re-process any channels waiting for streams. + // Linear search, but we don't increase channels often and + // the array would only get long in case of an app error normally + + // Make sure we request enough streams if there's a big jump in streams + // Could make a more complex API for OpenXxxFinish() and avoid this loop + size_t num_needed = mPending.GetSize(); + LOG(("%d of %d new streams already needed", num_needed, + new_len - old_len)); + num_needed -= (new_len - old_len); // number we added + if (num_needed > 0) { + if (num_needed < 16) + num_needed = 16; + LOG(("Not enough new streams, asking for %d more", num_needed)); + RequestMoreStreams(num_needed); + } else if (strchg->strchange_outstrms < strchg->strchange_instrms) { + LOG(("Requesting %d output streams to match partner", + strchg->strchange_instrms - strchg->strchange_outstrms)); + RequestMoreStreams(strchg->strchange_instrms - strchg->strchange_outstrms); + } + + ProcessQueuedOpens(); + } + // else probably not a change in # of streams + } + + for (uint32_t i = 0; i < mStreams.Length(); ++i) { + channel = mStreams[i]; + if (!channel) + continue; + + if ((channel->mState == CONNECTING) && + (channel->mStream == INVALID_STREAM)) { + if ((strchg->strchange_flags & SCTP_STREAM_CHANGE_DENIED) || + (strchg->strchange_flags & SCTP_STREAM_CHANGE_FAILED)) { + /* XXX: Signal to the other end. */ + channel->mState = CLOSED; + NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this, + channel))); + // maybe fire onError (bug 843625) + } else { + stream = FindFreeStream(); + if (stream != INVALID_STREAM) { + channel->mStream = stream; + mStreams[stream] = channel; + channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ; + // Note: we're locked, so there's no danger of a race with the + // buffer-threshold callback + } else { + /* We will not find more ... */ + break; + } + } + } + } +} + +// Called with mLock locked! +void +DataChannelConnection::HandleNotification(const union sctp_notification *notif, size_t n) +{ + mLock.AssertCurrentThreadOwns(); + if (notif->sn_header.sn_length != (uint32_t)n) { + return; + } + switch (notif->sn_header.sn_type) { + case SCTP_ASSOC_CHANGE: + HandleAssociationChangeEvent(&(notif->sn_assoc_change)); + break; + case SCTP_PEER_ADDR_CHANGE: + HandlePeerAddressChangeEvent(&(notif->sn_paddr_change)); + break; + case SCTP_REMOTE_ERROR: + HandleRemoteErrorEvent(&(notif->sn_remote_error)); + break; + case SCTP_SHUTDOWN_EVENT: + HandleShutdownEvent(&(notif->sn_shutdown_event)); + break; + case SCTP_ADAPTATION_INDICATION: + HandleAdaptationIndication(&(notif->sn_adaptation_event)); + break; + case SCTP_PARTIAL_DELIVERY_EVENT: + LOG(("SCTP_PARTIAL_DELIVERY_EVENT")); + break; + case SCTP_AUTHENTICATION_EVENT: + LOG(("SCTP_AUTHENTICATION_EVENT")); + break; + case SCTP_SENDER_DRY_EVENT: + //LOG(("SCTP_SENDER_DRY_EVENT")); + break; + case SCTP_NOTIFICATIONS_STOPPED_EVENT: + LOG(("SCTP_NOTIFICATIONS_STOPPED_EVENT")); + break; + case SCTP_SEND_FAILED_EVENT: + HandleSendFailedEvent(&(notif->sn_send_failed_event)); + break; + case SCTP_STREAM_RESET_EVENT: + HandleStreamResetEvent(&(notif->sn_strreset_event)); + break; + case SCTP_ASSOC_RESET_EVENT: + LOG(("SCTP_ASSOC_RESET_EVENT")); + break; + case SCTP_STREAM_CHANGE_EVENT: + HandleStreamChangeEvent(&(notif->sn_strchange_event)); + break; + default: + LOG(("unknown SCTP event: %u", (uint32_t)notif->sn_header.sn_type)); + break; + } + } + +int +DataChannelConnection::ReceiveCallback(struct socket* sock, void *data, size_t datalen, + struct sctp_rcvinfo rcv, int32_t flags) +{ + ASSERT_WEBRTC(!NS_IsMainThread()); + + if (!data) { + usrsctp_close(sock); // SCTP has finished shutting down + } else { + MutexAutoLock lock(mLock); + if (flags & MSG_NOTIFICATION) { + HandleNotification(static_cast<union sctp_notification *>(data), datalen); + } else { + HandleMessage(data, datalen, ntohl(rcv.rcv_ppid), rcv.rcv_sid); + } + } + // sctp allocates 'data' with malloc(), and expects the receiver to free + // it (presumably with free). + // XXX future optimization: try to deliver messages without an internal + // alloc/copy, and if so delay the free until later. + free(data); + // usrsctp defines the callback as returning an int, but doesn't use it + return 1; +} + +already_AddRefed<DataChannel> +DataChannelConnection::Open(const nsACString& label, const nsACString& protocol, + Type type, bool inOrder, + uint32_t prValue, DataChannelListener *aListener, + nsISupports *aContext, bool aExternalNegotiated, + uint16_t aStream) +{ + // aStream == INVALID_STREAM to have the protocol allocate + uint16_t prPolicy = SCTP_PR_SCTP_NONE; + uint32_t flags; + + LOG(("DC Open: label %s/%s, type %u, inorder %d, prValue %u, listener %p, context %p, external: %s, stream %u", + PromiseFlatCString(label).get(), PromiseFlatCString(protocol).get(), + type, inOrder, prValue, aListener, aContext, + aExternalNegotiated ? "true" : "false", aStream)); + switch (type) { + case DATA_CHANNEL_RELIABLE: + prPolicy = SCTP_PR_SCTP_NONE; + break; + case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT: + prPolicy = SCTP_PR_SCTP_RTX; + break; + case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED: + prPolicy = SCTP_PR_SCTP_TTL; + break; + } + if ((prPolicy == SCTP_PR_SCTP_NONE) && (prValue != 0)) { + return nullptr; + } + + // Don't look past currently-negotiated streams + if (aStream != INVALID_STREAM && aStream < mStreams.Length() && mStreams[aStream]) { + LOG(("ERROR: external negotiation of already-open channel %u", aStream)); + // XXX How do we indicate this up to the application? Probably the + // caller's job, but we may need to return an error code. + return nullptr; + } + + flags = !inOrder ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0; + RefPtr<DataChannel> channel(new DataChannel(this, + aStream, + DataChannel::CONNECTING, + label, protocol, + type, prValue, + flags, + aListener, aContext)); + if (aExternalNegotiated) { + channel->mFlags |= DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED; + } + + MutexAutoLock lock(mLock); // OpenFinish assumes this + return OpenFinish(channel.forget()); +} + +// Separate routine so we can also call it to finish up from pending opens +already_AddRefed<DataChannel> +DataChannelConnection::OpenFinish(already_AddRefed<DataChannel>&& aChannel) +{ + RefPtr<DataChannel> channel(aChannel); // takes the reference passed in + // Normally 1 reference if called from ::Open(), or 2 if called from + // ProcessQueuedOpens() unless the DOMDataChannel was gc'd + uint16_t stream = channel->mStream; + bool queue = false; + + mLock.AssertCurrentThreadOwns(); + + // Cases we care about: + // Pre-negotiated: + // Not Open: + // Doesn't fit: + // -> change initial ask or renegotiate after open + // -> queue open + // Open: + // Doesn't fit: + // -> RequestMoreStreams && queue + // Does fit: + // -> open + // Not negotiated: + // Not Open: + // -> queue open + // Open: + // -> Try to get a stream + // Doesn't fit: + // -> RequestMoreStreams && queue + // Does fit: + // -> open + // So the Open cases are basically the same + // Not Open cases are simply queue for non-negotiated, and + // either change the initial ask or possibly renegotiate after open. + + if (mState == OPEN) { + if (stream == INVALID_STREAM) { + stream = FindFreeStream(); // may be INVALID_STREAM if we need more + } + if (stream == INVALID_STREAM || stream >= mStreams.Length()) { + // RequestMoreStreams() limits to MAX_NUM_STREAMS -- allocate extra streams + // to avoid going back immediately for more if the ask to N, N+1, etc + int32_t more_needed = (stream == INVALID_STREAM) ? 16 : + (stream-((int32_t)mStreams.Length())) + 16; + if (!RequestMoreStreams(more_needed)) { + // Something bad happened... we're done + goto request_error_cleanup; + } + queue = true; + } + } else { + // not OPEN + if (stream != INVALID_STREAM && stream >= mStreams.Length() && + mState == CLOSED) { + // Update number of streams for init message + struct sctp_initmsg initmsg; + socklen_t len = sizeof(initmsg); + int32_t total_needed = stream+16; + + memset(&initmsg, 0, sizeof(initmsg)); + if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 0) { + LOG(("*** failed getsockopt SCTP_INITMSG")); + goto request_error_cleanup; + } + LOG(("Setting number of SCTP streams to %u, was %u/%u", total_needed, + initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams)); + initmsg.sinit_num_ostreams = total_needed; + initmsg.sinit_max_instreams = MAX_NUM_STREAMS; + if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, + (socklen_t)sizeof(initmsg)) < 0) { + LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno)); + goto request_error_cleanup; + } + + int32_t old_len = mStreams.Length(); + mStreams.AppendElements(total_needed - old_len); + for (int32_t i = old_len; i < total_needed; ++i) { + mStreams[i] = nullptr; + } + } + // else if state is CONNECTING, we'll just re-negotiate when OpenFinish + // is called, if needed + queue = true; + } + if (queue) { + LOG(("Queuing channel %p (%u) to finish open", channel.get(), stream)); + // Also serves to mark we told the app + channel->mFlags |= DATA_CHANNEL_FLAGS_FINISH_OPEN; + // we need a ref for the nsDeQue and one to return + DataChannel* rawChannel = channel; + rawChannel->AddRef(); + mPending.Push(rawChannel); + return channel.forget(); + } + + MOZ_ASSERT(stream != INVALID_STREAM); + // just allocated (& OPEN), or externally negotiated + mStreams[stream] = channel; // holds a reference + channel->mStream = stream; + +#ifdef TEST_QUEUED_DATA + // It's painful to write a test for this... + channel->mState = OPEN; + channel->mReady = true; + SendMsgInternal(channel, "Help me!", 8, DATA_CHANNEL_PPID_DOMSTRING_LAST); +#endif + + if (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) { + // Don't send unordered until this gets cleared + channel->mFlags |= DATA_CHANNEL_FLAGS_WAITING_ACK; + } + + if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) { + if (!SendOpenRequestMessage(channel->mLabel, channel->mProtocol, + stream, + !!(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED), + channel->mPrPolicy, channel->mPrValue)) { + LOG(("SendOpenRequest failed, errno = %d", errno)); + if (errno == EAGAIN || errno == EWOULDBLOCK) { + channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ; + // Note: we're locked, so there's no danger of a race with the + // buffer-threshold callback + return channel.forget(); + } else { + if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) { + // We already returned the channel to the app. + NS_ERROR("Failed to send open request"); + NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this, + channel))); + } + // If we haven't returned the channel yet, it will get destroyed when we exit + // this function. + mStreams[stream] = nullptr; + channel->mStream = INVALID_STREAM; + // we'll be destroying the channel + channel->mState = CLOSED; + return nullptr; + } + /* NOTREACHED */ + } + } + // Either externally negotiated or we sent Open + channel->mState = OPEN; + channel->mReady = true; + // FIX? Move into DOMDataChannel? I don't think we can send it yet here + LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get())); + NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this, + channel))); + + return channel.forget(); + +request_error_cleanup: + channel->mState = CLOSED; + if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) { + // We already returned the channel to the app. + NS_ERROR("Failed to request more streams"); + NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this, + channel))); + return channel.forget(); + } + // we'll be destroying the channel, but it never really got set up + // Alternative would be to RUN_ON_THREAD(channel.forget(),::Destroy,...) and + // Dispatch it to ourselves + return nullptr; +} + +int32_t +DataChannelConnection::SendMsgInternal(DataChannel *channel, const char *data, + size_t length, uint32_t ppid) +{ + uint16_t flags; + struct sctp_sendv_spa spa; + int32_t result; + + NS_ENSURE_TRUE(channel->mState == OPEN || channel->mState == CONNECTING, 0); + NS_WARNING_ASSERTION(length > 0, "Length is 0?!"); + + // To avoid problems where an in-order OPEN is lost and an + // out-of-order data message "beats" it, require data to be in-order + // until we get an ACK. + if ((channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) && + !(channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK)) { + flags = SCTP_UNORDERED; + } else { + flags = 0; + } + + spa.sendv_sndinfo.snd_ppid = htonl(ppid); + spa.sendv_sndinfo.snd_sid = channel->mStream; + spa.sendv_sndinfo.snd_flags = flags; + spa.sendv_sndinfo.snd_context = 0; + spa.sendv_sndinfo.snd_assoc_id = 0; + spa.sendv_flags = SCTP_SEND_SNDINFO_VALID; + + if (channel->mPrPolicy != SCTP_PR_SCTP_NONE) { + spa.sendv_prinfo.pr_policy = channel->mPrPolicy; + spa.sendv_prinfo.pr_value = channel->mPrValue; + spa.sendv_flags |= SCTP_SEND_PRINFO_VALID; + } + + // Note: Main-thread IO, but doesn't block! + // XXX FIX! to deal with heavy overruns of JS trying to pass data in + // (more than the buffersize) queue data onto another thread to do the + // actual sends. See netwerk/protocol/websocket/WebSocketChannel.cpp + + // SCTP will return EMSGSIZE if the message is bigger than the buffer + // size (or EAGAIN if there isn't space) + + // Avoid a race between buffer-full-failure (where we have to add the + // packet to the buffered-data queue) and the buffer-now-only-half-full + // callback, which happens on a different thread. Otherwise we might + // fail here, then before we add it to the queue get the half-full + // callback, find nothing to do, then on this thread add it to the + // queue - which would sit there. Also, if we later send more data, it + // would arrive ahead of the buffered message, but if the buffer ever + // got to 1/2 full, the message would get sent - but at a semi-random + // time, after other data it was supposed to be in front of. + + // Must lock before empty check for similar reasons! + MutexAutoLock lock(mLock); + if (channel->mBufferedData.IsEmpty()) { + result = usrsctp_sendv(mSocket, data, length, + nullptr, 0, + (void *)&spa, (socklen_t)sizeof(struct sctp_sendv_spa), + SCTP_SENDV_SPA, 0); + LOG(("Sent buffer (len=%u), result=%d", length, result)); + } else { + // Fake EAGAIN if we're already buffering data + result = -1; + errno = EAGAIN; + } + if (result < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + + // queue data for resend! And queue any further data for the stream until it is... + auto *buffered = new BufferedMsg(spa, data, length); // infallible malloc + channel->mBufferedData.AppendElement(buffered); // owned by mBufferedData array + channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_DATA; + LOG(("Queued %u buffers (len=%u)", channel->mBufferedData.Length(), length)); + return 0; + } + LOG(("error %d sending string", errno)); + } + return result; +} + +// Handles fragmenting binary messages +int32_t +DataChannelConnection::SendBinary(DataChannel *channel, const char *data, + size_t len, + uint32_t ppid_partial, uint32_t ppid_final) +{ + // Since there's a limit on network buffer size and no limits on message + // size, and we don't want to use EOR mode (multiple writes for a + // message, but all other streams are blocked until you finish sending + // this message), we need to add application-level fragmentation of large + // messages. On a reliable channel, these can be simply rebuilt into a + // large message. On an unreliable channel, we can't and don't know how + // long to wait, and there are no retransmissions, and no easy way to + // tell the user "this part is missing", so on unreliable channels we + // need to return an error if sending more bytes than the network buffers + // can hold, and perhaps a lower number. + + // We *really* don't want to do this from main thread! - and SendMsgInternal + // avoids blocking. + // This MUST be reliable and in-order for the reassembly to work + if (len > DATA_CHANNEL_MAX_BINARY_FRAGMENT && + channel->mPrPolicy == DATA_CHANNEL_RELIABLE && + !(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED)) { + int32_t sent=0; + uint32_t origlen = len; + LOG(("Sending binary message length %u in chunks", len)); + // XXX check flags for out-of-order, or force in-order for large binary messages + while (len > 0) { + size_t sendlen = std::min<size_t>(len, DATA_CHANNEL_MAX_BINARY_FRAGMENT); + uint32_t ppid; + len -= sendlen; + ppid = len > 0 ? ppid_partial : ppid_final; + LOG(("Send chunk of %u bytes, ppid %u", sendlen, ppid)); + // Note that these might end up being deferred and queued. + sent += SendMsgInternal(channel, data, sendlen, ppid); + data += sendlen; + } + LOG(("Sent %d buffers for %u bytes, %d sent immediately, %d buffers queued", + (origlen+DATA_CHANNEL_MAX_BINARY_FRAGMENT-1)/DATA_CHANNEL_MAX_BINARY_FRAGMENT, + origlen, sent, + channel->mBufferedData.Length())); + return sent; + } + NS_WARNING_ASSERTION(len <= DATA_CHANNEL_MAX_BINARY_FRAGMENT, + "Sending too-large data on unreliable channel!"); + + // This will fail if the message is too large (default 256K) + return SendMsgInternal(channel, data, len, ppid_final); +} + +class ReadBlobRunnable : public Runnable { +public: + ReadBlobRunnable(DataChannelConnection* aConnection, uint16_t aStream, + nsIInputStream* aBlob) : + mConnection(aConnection), + mStream(aStream), + mBlob(aBlob) + {} + + NS_IMETHOD Run() override { + // ReadBlob() is responsible to releasing the reference + DataChannelConnection *self = mConnection; + self->ReadBlob(mConnection.forget(), mStream, mBlob); + return NS_OK; + } + +private: + // Make sure the Connection doesn't die while there are jobs outstanding. + // Let it die (if released by PeerConnectionImpl while we're running) + // when we send our runnable back to MainThread. Then ~DataChannelConnection + // can send the IOThread to MainThread to die in a runnable, avoiding + // unsafe event loop recursion. Evil. + RefPtr<DataChannelConnection> mConnection; + uint16_t mStream; + // Use RefCount for preventing the object is deleted when SendBlob returns. + RefPtr<nsIInputStream> mBlob; +}; + +int32_t +DataChannelConnection::SendBlob(uint16_t stream, nsIInputStream *aBlob) +{ + DataChannel *channel = mStreams[stream]; + NS_ENSURE_TRUE(channel, 0); + // Spawn a thread to send the data + if (!mInternalIOThread) { + nsresult res = NS_NewThread(getter_AddRefs(mInternalIOThread)); + if (NS_FAILED(res)) { + return -1; + } + } + + mInternalIOThread->Dispatch(do_AddRef(new ReadBlobRunnable(this, stream, aBlob)), NS_DISPATCH_NORMAL); + return 0; +} + +class DataChannelBlobSendRunnable : public Runnable +{ +public: + DataChannelBlobSendRunnable(already_AddRefed<DataChannelConnection>& aConnection, + uint16_t aStream) + : mConnection(aConnection) + , mStream(aStream) {} + + ~DataChannelBlobSendRunnable() override + { + if (!NS_IsMainThread() && mConnection) { + MOZ_ASSERT(false); + // explicitly leak the connection if destroyed off mainthread + Unused << mConnection.forget().take(); + } + } + + NS_IMETHOD Run() override + { + ASSERT_WEBRTC(NS_IsMainThread()); + + mConnection->SendBinaryMsg(mStream, mData); + mConnection = nullptr; + return NS_OK; + } + + // explicitly public so we can avoid allocating twice and copying + nsCString mData; + +private: + // Note: we can be destroyed off the target thread, so be careful not to let this + // get Released()ed on the temp thread! + RefPtr<DataChannelConnection> mConnection; + uint16_t mStream; +}; + +void +DataChannelConnection::ReadBlob(already_AddRefed<DataChannelConnection> aThis, + uint16_t aStream, nsIInputStream* aBlob) +{ + // NOTE: 'aThis' has been forgotten by the caller to avoid releasing + // it off mainthread; if PeerConnectionImpl has released then we want + // ~DataChannelConnection() to run on MainThread + + // XXX to do this safely, we must enqueue these atomically onto the + // output socket. We need a sender thread(s?) to enqueue data into the + // socket and to avoid main-thread IO that might block. Even on a + // background thread, we may not want to block on one stream's data. + // I.e. run non-blocking and service multiple channels. + + // For now as a hack, send as a single blast of queued packets which may + // be deferred until buffer space is available. + uint64_t len; + nsCOMPtr<nsIThread> mainThread; + NS_GetMainThread(getter_AddRefs(mainThread)); + + // Must not let Dispatching it cause the DataChannelConnection to get + // released on the wrong thread. Using WrapRunnable(RefPtr<DataChannelConnection>(aThis),... + // will occasionally cause aThis to get released on this thread. Also, an explicit Runnable + // lets us avoid copying the blob data an extra time. + RefPtr<DataChannelBlobSendRunnable> runnable = new DataChannelBlobSendRunnable(aThis, + aStream); + // avoid copying the blob data by passing the mData from the runnable + if (NS_FAILED(aBlob->Available(&len)) || + NS_FAILED(NS_ReadInputStreamToString(aBlob, runnable->mData, len))) { + // Bug 966602: Doesn't return an error to the caller via onerror. + // We must release DataChannelConnection on MainThread to avoid issues (bug 876167) + // aThis is now owned by the runnable; release it there + NS_ProxyRelease(mainThread, runnable.forget()); + return; + } + aBlob->Close(); + NS_DispatchToMainThread(runnable, NS_DISPATCH_NORMAL); +} + +void +DataChannelConnection::GetStreamIds(std::vector<uint16_t>* aStreamList) +{ + ASSERT_WEBRTC(NS_IsMainThread()); + for (uint32_t i = 0; i < mStreams.Length(); ++i) { + if (mStreams[i]) { + aStreamList->push_back(mStreams[i]->mStream); + } + } +} + +int32_t +DataChannelConnection::SendMsgCommon(uint16_t stream, const nsACString &aMsg, + bool isBinary) +{ + ASSERT_WEBRTC(NS_IsMainThread()); + // We really could allow this from other threads, so long as we deal with + // asynchronosity issues with channels closing, in particular access to + // mStreams, and issues with the association closing (access to mSocket). + + const char *data = aMsg.BeginReading(); + uint32_t len = aMsg.Length(); + DataChannel *channel; + + LOG(("Sending %sto stream %u: %u bytes", isBinary ? "binary " : "", stream, len)); + // XXX if we want more efficiency, translate flags once at open time + channel = mStreams[stream]; + NS_ENSURE_TRUE(channel, 0); + + if (isBinary) + return SendBinary(channel, data, len, + DATA_CHANNEL_PPID_BINARY, DATA_CHANNEL_PPID_BINARY_LAST); + return SendBinary(channel, data, len, + DATA_CHANNEL_PPID_DOMSTRING, DATA_CHANNEL_PPID_DOMSTRING_LAST); +} + +void +DataChannelConnection::Close(DataChannel *aChannel) +{ + MutexAutoLock lock(mLock); + CloseInt(aChannel); +} + +// So we can call Close() with the lock already held +// Called from someone who holds a ref via ::Close(), or from ~DataChannel +void +DataChannelConnection::CloseInt(DataChannel *aChannel) +{ + MOZ_ASSERT(aChannel); + RefPtr<DataChannel> channel(aChannel); // make sure it doesn't go away on us + + mLock.AssertCurrentThreadOwns(); + LOG(("Connection %p/Channel %p: Closing stream %u", + channel->mConnection.get(), channel.get(), channel->mStream)); + // re-test since it may have closed before the lock was grabbed + if (aChannel->mState == CLOSED || aChannel->mState == CLOSING) { + LOG(("Channel already closing/closed (%u)", aChannel->mState)); + if (mState == CLOSED && channel->mStream != INVALID_STREAM) { + // called from CloseAll() + // we're not going to hang around waiting any more + mStreams[channel->mStream] = nullptr; + } + return; + } + aChannel->mBufferedData.Clear(); + if (channel->mStream != INVALID_STREAM) { + ResetOutgoingStream(channel->mStream); + if (mState == CLOSED) { // called from CloseAll() + // Let resets accumulate then send all at once in CloseAll() + // we're not going to hang around waiting + mStreams[channel->mStream] = nullptr; + } else { + SendOutgoingStreamReset(); + } + } + aChannel->mState = CLOSING; + if (mState == CLOSED) { + // we're not going to hang around waiting + channel->StreamClosedLocked(); + } + // At this point when we leave here, the object is a zombie held alive only by the DOM object +} + +void DataChannelConnection::CloseAll() +{ + LOG(("Closing all channels (connection %p)", (void*) this)); + // Don't need to lock here + + // Make sure no more channels will be opened + { + MutexAutoLock lock(mLock); + mState = CLOSED; + } + + // Close current channels + // If there are runnables, they hold a strong ref and keep the channel + // and/or connection alive (even if in a CLOSED state) + bool closed_some = false; + for (uint32_t i = 0; i < mStreams.Length(); ++i) { + if (mStreams[i]) { + mStreams[i]->Close(); + closed_some = true; + } + } + + // Clean up any pending opens for channels + RefPtr<DataChannel> channel; + while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(mPending.PopFront())))) { + LOG(("closing pending channel %p, stream %u", channel.get(), channel->mStream)); + channel->Close(); // also releases the ref on each iteration + closed_some = true; + } + // It's more efficient to let the Resets queue in shutdown and then + // SendOutgoingStreamReset() here. + if (closed_some) { + MutexAutoLock lock(mLock); + SendOutgoingStreamReset(); + } +} + +DataChannel::~DataChannel() +{ + // NS_ASSERTION since this is more "I think I caught all the cases that + // can cause this" than a true kill-the-program assertion. If this is + // wrong, nothing bad happens. A worst it's a leak. + NS_ASSERTION(mState == CLOSED || mState == CLOSING, "unexpected state in ~DataChannel"); +} + +void +DataChannel::Close() +{ + if (mConnection) { + // ensure we don't get deleted + RefPtr<DataChannelConnection> connection(mConnection); + connection->Close(this); + } +} + +// Used when disconnecting from the DataChannelConnection +void +DataChannel::StreamClosedLocked() +{ + mConnection->mLock.AssertCurrentThreadOwns(); + ENSURE_DATACONNECTION; + + LOG(("Destroying Data channel %u", mStream)); + MOZ_ASSERT_IF(mStream != INVALID_STREAM, + !mConnection->FindChannelByStream(mStream)); + mStream = INVALID_STREAM; + mState = CLOSED; + NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, + mConnection, this))); + // We leave mConnection live until the DOM releases us, to avoid races +} + +void +DataChannel::ReleaseConnection() +{ + ASSERT_WEBRTC(NS_IsMainThread()); + mConnection = nullptr; +} + +void +DataChannel::SetListener(DataChannelListener *aListener, nsISupports *aContext) +{ + MutexAutoLock mLock(mListenerLock); + mContext = aContext; + mListener = aListener; +} + +// May be called from another (i.e. Main) thread! +void +DataChannel::AppReady() +{ + ENSURE_DATACONNECTION; + + MutexAutoLock lock(mConnection->mLock); + + mReady = true; + if (mState == WAITING_TO_OPEN) { + mState = OPEN; + NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, mConnection, + this))); + for (uint32_t i = 0; i < mQueuedMessages.Length(); ++i) { + nsCOMPtr<nsIRunnable> runnable = mQueuedMessages[i]; + MOZ_ASSERT(runnable); + NS_DispatchToMainThread(runnable); + } + } else { + NS_ASSERTION(mQueuedMessages.IsEmpty(), "Shouldn't have queued messages if not WAITING_TO_OPEN"); + } + mQueuedMessages.Clear(); + mQueuedMessages.Compact(); + // We never use it again... We could even allocate the array in the odd + // cases we need it. +} + +uint32_t +DataChannel::GetBufferedAmountLocked() const +{ + size_t buffered = 0; + + for (auto& buffer : mBufferedData) { + buffered += buffer->mLength; + } + // XXX Note: per Michael Tuexen, there's no way to currently get the buffered + // amount from the SCTP stack for a single stream. It is on their to-do + // list, and once we import a stack with support for that, we'll need to + // add it to what we buffer. Also we'll need to ask for notification of a per- + // stream buffer-low event and merge that into the handling of buffer-low + // (the equivalent to TCP_NOTSENT_LOWAT on TCP sockets) + + if (buffered > UINT32_MAX) { // paranoia - >4GB buffered is very very unlikely + buffered = UINT32_MAX; + } + return buffered; +} + +uint32_t +DataChannel::GetBufferedAmountLowThreshold() +{ + return mBufferedThreshold; +} + +// Never fire immediately, as it's defined to fire on transitions, not state +void +DataChannel::SetBufferedAmountLowThreshold(uint32_t aThreshold) +{ + mBufferedThreshold = aThreshold; +} + +// Called with mLock locked! +void +DataChannel::SendOrQueue(DataChannelOnMessageAvailable *aMessage) +{ + if (!mReady && + (mState == CONNECTING || mState == WAITING_TO_OPEN)) { + mQueuedMessages.AppendElement(aMessage); + } else { + NS_DispatchToMainThread(aMessage); + } +} + +} // namespace mozilla |