summaryrefslogtreecommitdiffstats
path: root/netwerk/sctp/datachannel/DataChannel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'netwerk/sctp/datachannel/DataChannel.cpp')
-rw-r--r--netwerk/sctp/datachannel/DataChannel.cpp2661
1 files changed, 2661 insertions, 0 deletions
diff --git a/netwerk/sctp/datachannel/DataChannel.cpp b/netwerk/sctp/datachannel/DataChannel.cpp
new file mode 100644
index 000000000..bf7790f51
--- /dev/null
+++ b/netwerk/sctp/datachannel/DataChannel.cpp
@@ -0,0 +1,2661 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim: set ts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this file,
+ * You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#include <algorithm>
+#include <stdio.h>
+#include <stdlib.h>
+#if !defined(__Userspace_os_Windows)
+#include <arpa/inet.h>
+#endif
+// usrsctp.h expects to have errno definitions prior to its inclusion.
+#include <errno.h>
+
+#define SCTP_DEBUG 1
+#define SCTP_STDINT_INCLUDE <stdint.h>
+
+#ifdef _MSC_VER
+// Disable "warning C4200: nonstandard extension used : zero-sized array in
+// struct/union"
+// ...which the third-party file usrsctp.h runs afoul of.
+#pragma warning(push)
+#pragma warning(disable:4200)
+#endif
+
+#include "usrsctp.h"
+
+#ifdef _MSC_VER
+#pragma warning(pop)
+#endif
+
+#include "DataChannelLog.h"
+
+#include "nsServiceManagerUtils.h"
+#include "nsIObserverService.h"
+#include "nsIObserver.h"
+#include "mozilla/Services.h"
+#include "mozilla/Sprintf.h"
+#include "nsProxyRelease.h"
+#include "nsThread.h"
+#include "nsThreadUtils.h"
+#include "nsAutoPtr.h"
+#include "nsNetUtil.h"
+#include "nsNetCID.h"
+#include "mozilla/StaticPtr.h"
+#include "mozilla/Unused.h"
+#ifdef MOZ_PEERCONNECTION
+#include "mtransport/runnable_utils.h"
+#endif
+
+#define DATACHANNEL_LOG(args) LOG(args)
+#include "DataChannel.h"
+#include "DataChannelProtocol.h"
+
+// Let us turn on and off important assertions in non-debug builds
+#ifdef DEBUG
+#define ASSERT_WEBRTC(x) MOZ_ASSERT((x))
+#elif defined(MOZ_WEBRTC_ASSERT_ALWAYS)
+#define ASSERT_WEBRTC(x) do { if (!(x)) { MOZ_CRASH(); } } while (0)
+#endif
+
+static bool sctp_initialized;
+
+namespace mozilla {
+
+LazyLogModule gDataChannelLog("DataChannel");
+static LazyLogModule gSCTPLog("SCTP");
+
+class DataChannelShutdown : public nsIObserver
+{
+public:
+ // This needs to be tied to some form object that is guaranteed to be
+ // around (singleton likely) unless we want to shutdown sctp whenever
+ // we're not using it (and in which case we'd keep a refcnt'd object
+ // ref'd by each DataChannelConnection to release the SCTP usrlib via
+ // sctp_finish). Right now, the single instance of this class is
+ // owned by the observer service.
+
+ NS_DECL_ISUPPORTS
+
+ DataChannelShutdown() {}
+
+ void Init()
+ {
+ nsCOMPtr<nsIObserverService> observerService =
+ mozilla::services::GetObserverService();
+ if (!observerService)
+ return;
+
+ nsresult rv = observerService->AddObserver(this,
+ "xpcom-will-shutdown",
+ false);
+ MOZ_ASSERT(rv == NS_OK);
+ (void) rv;
+ }
+
+private:
+ // The only instance of DataChannelShutdown is owned by the observer
+ // service, so there is no need to call RemoveObserver here.
+ virtual ~DataChannelShutdown() = default;
+
+public:
+ NS_IMETHOD Observe(nsISupports* aSubject, const char* aTopic,
+ const char16_t* aData) override {
+ if (strcmp(aTopic, "xpcom-will-shutdown") == 0) {
+ LOG(("Shutting down SCTP"));
+ if (sctp_initialized) {
+ usrsctp_finish();
+ sctp_initialized = false;
+ }
+ nsCOMPtr<nsIObserverService> observerService =
+ mozilla::services::GetObserverService();
+ if (!observerService)
+ return NS_ERROR_FAILURE;
+
+ nsresult rv = observerService->RemoveObserver(this,
+ "xpcom-will-shutdown");
+ MOZ_ASSERT(rv == NS_OK);
+ (void) rv;
+ }
+ return NS_OK;
+ }
+};
+
+NS_IMPL_ISUPPORTS(DataChannelShutdown, nsIObserver);
+
+BufferedMsg::BufferedMsg(struct sctp_sendv_spa &spa, const char *data,
+ size_t length) : mLength(length)
+{
+ mSpa = new sctp_sendv_spa;
+ *mSpa = spa;
+ auto *tmp = new char[length]; // infallible malloc!
+ memcpy(tmp, data, length);
+ mData = tmp;
+}
+
+BufferedMsg::~BufferedMsg()
+{
+ delete mSpa;
+ delete mData;
+}
+
+static int
+receive_cb(struct socket* sock, union sctp_sockstore addr,
+ void *data, size_t datalen,
+ struct sctp_rcvinfo rcv, int flags, void *ulp_info)
+{
+ DataChannelConnection *connection = static_cast<DataChannelConnection*>(ulp_info);
+ return connection->ReceiveCallback(sock, data, datalen, rcv, flags);
+}
+
+static
+DataChannelConnection *
+GetConnectionFromSocket(struct socket* sock)
+{
+ struct sockaddr *addrs = nullptr;
+ int naddrs = usrsctp_getladdrs(sock, 0, &addrs);
+ if (naddrs <= 0 || addrs[0].sa_family != AF_CONN) {
+ return nullptr;
+ }
+ // usrsctp_getladdrs() returns the addresses bound to this socket, which
+ // contains the SctpDataMediaChannel* as sconn_addr. Read the pointer,
+ // then free the list of addresses once we have the pointer. We only open
+ // AF_CONN sockets, and they should all have the sconn_addr set to the
+ // pointer that created them, so [0] is as good as any other.
+ struct sockaddr_conn *sconn = reinterpret_cast<struct sockaddr_conn *>(&addrs[0]);
+ DataChannelConnection *connection =
+ reinterpret_cast<DataChannelConnection *>(sconn->sconn_addr);
+ usrsctp_freeladdrs(addrs);
+
+ return connection;
+}
+
+// called when the buffer empties to the threshold value
+static int
+threshold_event(struct socket* sock, uint32_t sb_free)
+{
+ DataChannelConnection *connection = GetConnectionFromSocket(sock);
+ if (connection) {
+ LOG(("SendDeferred()"));
+ connection->SendDeferredMessages();
+ } else {
+ LOG(("Can't find connection for socket %p", sock));
+ }
+ return 0;
+}
+
+static void
+debug_printf(const char *format, ...)
+{
+ va_list ap;
+ char buffer[1024];
+
+ if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
+ va_start(ap, format);
+#ifdef _WIN32
+ if (vsnprintf_s(buffer, sizeof(buffer), _TRUNCATE, format, ap) > 0) {
+#else
+ if (VsprintfLiteral(buffer, format, ap) > 0) {
+#endif
+ PR_LogPrint("%s", buffer);
+ }
+ va_end(ap);
+ }
+}
+
+DataChannelConnection::DataChannelConnection(DataConnectionListener *listener) :
+ mLock("netwerk::sctp::DataChannelConnection")
+{
+ mState = CLOSED;
+ mSocket = nullptr;
+ mMasterSocket = nullptr;
+ mListener = listener;
+ mLocalPort = 0;
+ mRemotePort = 0;
+ LOG(("Constructor DataChannelConnection=%p, listener=%p", this, mListener.get()));
+ mInternalIOThread = nullptr;
+}
+
+DataChannelConnection::~DataChannelConnection()
+{
+ LOG(("Deleting DataChannelConnection %p", (void *) this));
+ // This may die on the MainThread, or on the STS thread
+ ASSERT_WEBRTC(mState == CLOSED);
+ MOZ_ASSERT(!mMasterSocket);
+ MOZ_ASSERT(mPending.GetSize() == 0);
+
+ // Already disconnected from sigslot/mTransportFlow
+ // TransportFlows must be released from the STS thread
+ if (!IsSTSThread()) {
+ ASSERT_WEBRTC(NS_IsMainThread());
+ if (mTransportFlow) {
+ ASSERT_WEBRTC(mSTS);
+ NS_ProxyRelease(mSTS, mTransportFlow.forget());
+ }
+
+ if (mInternalIOThread) {
+ // Avoid spinning the event thread from here (which if we're mainthread
+ // is in the event loop already)
+ NS_DispatchToMainThread(WrapRunnable(nsCOMPtr<nsIThread>(mInternalIOThread),
+ &nsIThread::Shutdown),
+ NS_DISPATCH_NORMAL);
+ }
+ } else {
+ // on STS, safe to call shutdown
+ if (mInternalIOThread) {
+ mInternalIOThread->Shutdown();
+ }
+ }
+}
+
+void
+DataChannelConnection::Destroy()
+{
+ // Though it's probably ok to do this and close the sockets;
+ // if we really want it to do true clean shutdowns it can
+ // create a dependant Internal object that would remain around
+ // until the network shut down the association or timed out.
+ LOG(("Destroying DataChannelConnection %p", (void *) this));
+ ASSERT_WEBRTC(NS_IsMainThread());
+ CloseAll();
+
+ MutexAutoLock lock(mLock);
+ // If we had a pending reset, we aren't waiting for it - clear the list so
+ // we can deregister this DataChannelConnection without leaking.
+ ClearResets();
+
+ MOZ_ASSERT(mSTS);
+ ASSERT_WEBRTC(NS_IsMainThread());
+ // Must do this in Destroy() since we may then delete this object.
+ // Do this before dispatching to create a consistent ordering of calls to
+ // the SCTP stack.
+ if (mUsingDtls) {
+ usrsctp_deregister_address(static_cast<void *>(this));
+ LOG(("Deregistered %p from the SCTP stack.", static_cast<void *>(this)));
+ }
+
+ // Finish Destroy on STS thread to avoid bug 876167 - once that's fixed,
+ // the usrsctp_close() calls can move back here (and just proxy the
+ // disconnect_all())
+ RUN_ON_THREAD(mSTS, WrapRunnable(RefPtr<DataChannelConnection>(this),
+ &DataChannelConnection::DestroyOnSTS,
+ mSocket, mMasterSocket),
+ NS_DISPATCH_NORMAL);
+
+ // These will be released on STS
+ mSocket = nullptr;
+ mMasterSocket = nullptr; // also a flag that we've Destroyed this connection
+
+ // We can't get any more new callbacks from the SCTP library
+ // All existing callbacks have refs to DataChannelConnection
+
+ // nsDOMDataChannel objects have refs to DataChannels that have refs to us
+}
+
+void DataChannelConnection::DestroyOnSTS(struct socket *aMasterSocket,
+ struct socket *aSocket)
+{
+ if (aSocket && aSocket != aMasterSocket)
+ usrsctp_close(aSocket);
+ if (aMasterSocket)
+ usrsctp_close(aMasterSocket);
+
+ disconnect_all();
+}
+
+bool
+DataChannelConnection::Init(unsigned short aPort, uint16_t aNumStreams, bool aUsingDtls)
+{
+ struct sctp_initmsg initmsg;
+ struct sctp_udpencaps encaps;
+ struct sctp_assoc_value av;
+ struct sctp_event event;
+ socklen_t len;
+
+ uint16_t event_types[] = {SCTP_ASSOC_CHANGE,
+ SCTP_PEER_ADDR_CHANGE,
+ SCTP_REMOTE_ERROR,
+ SCTP_SHUTDOWN_EVENT,
+ SCTP_ADAPTATION_INDICATION,
+ SCTP_SEND_FAILED_EVENT,
+ SCTP_STREAM_RESET_EVENT,
+ SCTP_STREAM_CHANGE_EVENT};
+ {
+ ASSERT_WEBRTC(NS_IsMainThread());
+
+ // MutexAutoLock lock(mLock); Not needed since we're on mainthread always
+ if (!sctp_initialized) {
+ if (aUsingDtls) {
+ LOG(("sctp_init(DTLS)"));
+#ifdef MOZ_PEERCONNECTION
+ usrsctp_init(0,
+ DataChannelConnection::SctpDtlsOutput,
+ debug_printf
+ );
+#else
+ NS_ASSERTION(!aUsingDtls, "Trying to use SCTP/DTLS without mtransport");
+#endif
+ } else {
+ LOG(("sctp_init(%u)", aPort));
+ usrsctp_init(aPort,
+ nullptr,
+ debug_printf
+ );
+ }
+
+ // Set logging to SCTP:LogLevel::Debug to get SCTP debugs
+ if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
+ usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_ALL);
+ }
+
+ usrsctp_sysctl_set_sctp_blackhole(2);
+ // ECN is currently not supported by the Firefox code
+ usrsctp_sysctl_set_sctp_ecn_enable(0);
+ sctp_initialized = true;
+
+ RefPtr<DataChannelShutdown> shutdown = new DataChannelShutdown();
+ shutdown->Init();
+ }
+ }
+
+ // XXX FIX! make this a global we get once
+ // Find the STS thread
+ nsresult rv;
+ mSTS = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
+ MOZ_ASSERT(NS_SUCCEEDED(rv));
+
+ // Open sctp with a callback
+ if ((mMasterSocket = usrsctp_socket(
+ aUsingDtls ? AF_CONN : AF_INET,
+ SOCK_STREAM, IPPROTO_SCTP, receive_cb, threshold_event,
+ usrsctp_sysctl_get_sctp_sendspace() / 2, this)) == nullptr) {
+ return false;
+ }
+
+ // Make non-blocking for bind/connect. SCTP over UDP defaults to non-blocking
+ // in associations for normal IO
+ if (usrsctp_set_non_blocking(mMasterSocket, 1) < 0) {
+ LOG(("Couldn't set non_blocking on SCTP socket"));
+ // We can't handle connect() safely if it will block, not that this will
+ // even happen.
+ goto error_cleanup;
+ }
+
+ // Make sure when we close the socket, make sure it doesn't call us back again!
+ // This would cause it try to use an invalid DataChannelConnection pointer
+ struct linger l;
+ l.l_onoff = 1;
+ l.l_linger = 0;
+ if (usrsctp_setsockopt(mMasterSocket, SOL_SOCKET, SO_LINGER,
+ (const void *)&l, (socklen_t)sizeof(struct linger)) < 0) {
+ LOG(("Couldn't set SO_LINGER on SCTP socket"));
+ // unsafe to allow it to continue if this fails
+ goto error_cleanup;
+ }
+
+ // XXX Consider disabling this when we add proper SDP negotiation.
+ // We may want to leave enabled for supporting 'cloning' of SDP offers, which
+ // implies re-use of the same pseudo-port number, or forcing a renegotiation.
+ {
+ uint32_t on = 1;
+ if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REUSE_PORT,
+ (const void *)&on, (socklen_t)sizeof(on)) < 0) {
+ LOG(("Couldn't set SCTP_REUSE_PORT on SCTP socket"));
+ }
+ if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_NODELAY,
+ (const void *)&on, (socklen_t)sizeof(on)) < 0) {
+ LOG(("Couldn't set SCTP_NODELAY on SCTP socket"));
+ }
+ }
+
+ if (!aUsingDtls) {
+ memset(&encaps, 0, sizeof(encaps));
+ encaps.sue_address.ss_family = AF_INET;
+ encaps.sue_port = htons(aPort);
+ if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REMOTE_UDP_ENCAPS_PORT,
+ (const void*)&encaps,
+ (socklen_t)sizeof(struct sctp_udpencaps)) < 0) {
+ LOG(("*** failed encaps errno %d", errno));
+ goto error_cleanup;
+ }
+ LOG(("SCTP encapsulation local port %d", aPort));
+ }
+
+ av.assoc_id = SCTP_ALL_ASSOC;
+ av.assoc_value = SCTP_ENABLE_RESET_STREAM_REQ | SCTP_ENABLE_CHANGE_ASSOC_REQ;
+ if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET, &av,
+ (socklen_t)sizeof(struct sctp_assoc_value)) < 0) {
+ LOG(("*** failed enable stream reset errno %d", errno));
+ goto error_cleanup;
+ }
+
+ /* Enable the events of interest. */
+ memset(&event, 0, sizeof(event));
+ event.se_assoc_id = SCTP_ALL_ASSOC;
+ event.se_on = 1;
+ for (unsigned short event_type : event_types) {
+ event.se_type = event_type;
+ if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_EVENT, &event, sizeof(event)) < 0) {
+ LOG(("*** failed setsockopt SCTP_EVENT errno %d", errno));
+ goto error_cleanup;
+ }
+ }
+
+ // Update number of streams
+ mStreams.AppendElements(aNumStreams);
+ for (uint32_t i = 0; i < aNumStreams; ++i) {
+ mStreams[i] = nullptr;
+ }
+ memset(&initmsg, 0, sizeof(initmsg));
+ len = sizeof(initmsg);
+ if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 0) {
+ LOG(("*** failed getsockopt SCTP_INITMSG"));
+ goto error_cleanup;
+ }
+ LOG(("Setting number of SCTP streams to %u, was %u/%u", aNumStreams,
+ initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams));
+ initmsg.sinit_num_ostreams = aNumStreams;
+ initmsg.sinit_max_instreams = MAX_NUM_STREAMS;
+ if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
+ (socklen_t)sizeof(initmsg)) < 0) {
+ LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno));
+ goto error_cleanup;
+ }
+
+ mSocket = nullptr;
+ if (aUsingDtls) {
+ mUsingDtls = true;
+ usrsctp_register_address(static_cast<void *>(this));
+ LOG(("Registered %p within the SCTP stack.", static_cast<void *>(this)));
+ } else {
+ mUsingDtls = false;
+ }
+ return true;
+
+error_cleanup:
+ usrsctp_close(mMasterSocket);
+ mMasterSocket = nullptr;
+ mUsingDtls = false;
+ return false;
+}
+
+#ifdef MOZ_PEERCONNECTION
+void
+DataChannelConnection::SetEvenOdd()
+{
+ ASSERT_WEBRTC(IsSTSThread());
+
+ TransportLayerDtls *dtls = static_cast<TransportLayerDtls *>(
+ mTransportFlow->GetLayer(TransportLayerDtls::ID()));
+ MOZ_ASSERT(dtls); // DTLS is mandatory
+ mAllocateEven = (dtls->role() == TransportLayerDtls::CLIENT);
+}
+
+bool
+DataChannelConnection::ConnectViaTransportFlow(TransportFlow *aFlow, uint16_t localport, uint16_t remoteport)
+{
+ LOG(("Connect DTLS local %u, remote %u", localport, remoteport));
+
+ NS_PRECONDITION(mMasterSocket, "SCTP wasn't initialized before ConnectViaTransportFlow!");
+ NS_ENSURE_TRUE(aFlow, false);
+
+ mTransportFlow = aFlow;
+ mLocalPort = localport;
+ mRemotePort = remoteport;
+ mState = CONNECTING;
+
+ RUN_ON_THREAD(mSTS, WrapRunnable(RefPtr<DataChannelConnection>(this),
+ &DataChannelConnection::SetSignals),
+ NS_DISPATCH_NORMAL);
+ return true;
+}
+
+void
+DataChannelConnection::SetSignals()
+{
+ ASSERT_WEBRTC(IsSTSThread());
+ ASSERT_WEBRTC(mTransportFlow);
+ LOG(("Setting transport signals, state: %d", mTransportFlow->state()));
+ mTransportFlow->SignalPacketReceived.connect(this, &DataChannelConnection::SctpDtlsInput);
+ // SignalStateChange() doesn't call you with the initial state
+ mTransportFlow->SignalStateChange.connect(this, &DataChannelConnection::CompleteConnect);
+ CompleteConnect(mTransportFlow, mTransportFlow->state());
+}
+
+void
+DataChannelConnection::CompleteConnect(TransportFlow *flow, TransportLayer::State state)
+{
+ LOG(("Data transport state: %d", state));
+ MutexAutoLock lock(mLock);
+ ASSERT_WEBRTC(IsSTSThread());
+ // We should abort connection on TS_ERROR.
+ // Note however that the association will also fail (perhaps with a delay) and
+ // notify us in that way
+ if (state != TransportLayer::TS_OPEN || !mMasterSocket)
+ return;
+
+ struct sockaddr_conn addr;
+ memset(&addr, 0, sizeof(addr));
+ addr.sconn_family = AF_CONN;
+#if defined(__Userspace_os_Darwin)
+ addr.sconn_len = sizeof(addr);
+#endif
+ addr.sconn_port = htons(mLocalPort);
+ addr.sconn_addr = static_cast<void *>(this);
+
+ LOG(("Calling usrsctp_bind"));
+ int r = usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr),
+ sizeof(addr));
+ if (r < 0) {
+ LOG(("usrsctp_bind failed: %d", r));
+ } else {
+ // This is the remote addr
+ addr.sconn_port = htons(mRemotePort);
+ LOG(("Calling usrsctp_connect"));
+ r = usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr),
+ sizeof(addr));
+ if (r >= 0 || errno == EINPROGRESS) {
+ struct sctp_paddrparams paddrparams;
+ socklen_t opt_len;
+
+ memset(&paddrparams, 0, sizeof(struct sctp_paddrparams));
+ memcpy(&paddrparams.spp_address, &addr, sizeof(struct sockaddr_conn));
+ opt_len = (socklen_t)sizeof(struct sctp_paddrparams);
+ r = usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS,
+ &paddrparams, &opt_len);
+ if (r < 0) {
+ LOG(("usrsctp_getsockopt failed: %d", r));
+ } else {
+ // draft-ietf-rtcweb-data-channel-13 section 5: max initial MTU IPV4 1200, IPV6 1280
+ paddrparams.spp_pathmtu = 1200; // safe for either
+ paddrparams.spp_flags &= ~SPP_PMTUD_ENABLE;
+ paddrparams.spp_flags |= SPP_PMTUD_DISABLE;
+ opt_len = (socklen_t)sizeof(struct sctp_paddrparams);
+ r = usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS,
+ &paddrparams, opt_len);
+ if (r < 0) {
+ LOG(("usrsctp_getsockopt failed: %d", r));
+ } else {
+ LOG(("usrsctp: PMTUD disabled, MTU set to %u", paddrparams.spp_pathmtu));
+ }
+ }
+ }
+ if (r < 0) {
+ if (errno == EINPROGRESS) {
+ // non-blocking
+ return;
+ } else {
+ LOG(("usrsctp_connect failed: %d", errno));
+ mState = CLOSED;
+ }
+ } else {
+ // We set Even/Odd and fire ON_CONNECTION via SCTP_COMM_UP when we get that
+ // This also avoids issues with calling TransportFlow stuff on Mainthread
+ return;
+ }
+ }
+ // Note: currently this doesn't actually notify the application
+ NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::ON_CONNECTION,
+ this)));
+ return;
+}
+
+// Process any pending Opens
+void
+DataChannelConnection::ProcessQueuedOpens()
+{
+ // The nsDeque holds channels with an AddRef applied. Another reference
+ // (may) be held by the DOMDataChannel, unless it's been GC'd. No other
+ // references should exist.
+
+ // Can't copy nsDeque's. Move into temp array since any that fail will
+ // go back to mPending
+ nsDeque temp;
+ DataChannel *temp_channel; // really already_AddRefed<>
+ while (nullptr != (temp_channel = static_cast<DataChannel *>(mPending.PopFront()))) {
+ temp.Push(static_cast<void *>(temp_channel));
+ }
+
+ RefPtr<DataChannel> channel;
+ // All these entries have an AddRef(); make that explicit now via the dont_AddRef()
+ while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(temp.PopFront())))) {
+ if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
+ LOG(("Processing queued open for %p (%u)", channel.get(), channel->mStream));
+ channel->mFlags &= ~DATA_CHANNEL_FLAGS_FINISH_OPEN;
+ // OpenFinish returns a reference itself, so we need to take it can Release it
+ channel = OpenFinish(channel.forget()); // may reset the flag and re-push
+ } else {
+ NS_ASSERTION(false, "How did a DataChannel get queued without the FINISH_OPEN flag?");
+ }
+ }
+
+}
+void
+DataChannelConnection::SctpDtlsInput(TransportFlow *flow,
+ const unsigned char *data, size_t len)
+{
+ if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
+ char *buf;
+
+ if ((buf = usrsctp_dumppacket((void *)data, len, SCTP_DUMP_INBOUND)) != nullptr) {
+ PR_LogPrint("%s", buf);
+ usrsctp_freedumpbuffer(buf);
+ }
+ }
+ // Pass the data to SCTP
+ usrsctp_conninput(static_cast<void *>(this), data, len, 0);
+}
+
+int
+DataChannelConnection::SendPacket(unsigned char data[], size_t len, bool release)
+{
+ //LOG(("%p: SCTP/DTLS sent %ld bytes", this, len));
+ int res = mTransportFlow->SendPacket(data, len) < 0 ? 1 : 0;
+ if (release)
+ delete [] data;
+ return res;
+}
+
+/* static */
+int
+DataChannelConnection::SctpDtlsOutput(void *addr, void *buffer, size_t length,
+ uint8_t tos, uint8_t set_df)
+{
+ DataChannelConnection *peer = static_cast<DataChannelConnection *>(addr);
+ int res;
+
+ if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
+ char *buf;
+
+ if ((buf = usrsctp_dumppacket(buffer, length, SCTP_DUMP_OUTBOUND)) != nullptr) {
+ PR_LogPrint("%s", buf);
+ usrsctp_freedumpbuffer(buf);
+ }
+ }
+ // We're async proxying even if on the STSThread because this is called
+ // with internal SCTP locks held in some cases (such as in usrsctp_connect()).
+ // SCTP has an option for Apple, on IP connections only, to release at least
+ // one of the locks before calling a packet output routine; with changes to
+ // the underlying SCTP stack this might remove the need to use an async proxy.
+ if ((false /*peer->IsSTSThread()*/)) {
+ res = peer->SendPacket(static_cast<unsigned char *>(buffer), length, false);
+ } else {
+ auto *data = new unsigned char[length];
+ memcpy(data, buffer, length);
+ // Commented out since we have to Dispatch SendPacket to avoid deadlock"
+ // res = -1;
+
+ // XXX It might be worthwhile to add an assertion against the thread
+ // somehow getting into the DataChannel/SCTP code again, as
+ // DISPATCH_SYNC is not fully blocking. This may be tricky, as it
+ // needs to be a per-thread check, not a global.
+ peer->mSTS->Dispatch(WrapRunnable(
+ RefPtr<DataChannelConnection>(peer),
+ &DataChannelConnection::SendPacket, data, length, true),
+ NS_DISPATCH_NORMAL);
+ res = 0; // cheat! Packets can always be dropped later anyways
+ }
+ return res;
+}
+#endif
+
+#ifdef ALLOW_DIRECT_SCTP_LISTEN_CONNECT
+// listen for incoming associations
+// Blocks! - Don't call this from main thread!
+
+#error This code will not work as-is since SetEvenOdd() runs on Mainthread
+
+bool
+DataChannelConnection::Listen(unsigned short port)
+{
+ struct sockaddr_in addr;
+ socklen_t addr_len;
+
+ NS_WARNING_ASSERTION(!NS_IsMainThread(),
+ "Blocks, do not call from main thread!!!");
+
+ /* Acting as the 'server' */
+ memset((void *)&addr, 0, sizeof(addr));
+#ifdef HAVE_SIN_LEN
+ addr.sin_len = sizeof(struct sockaddr_in);
+#endif
+ addr.sin_family = AF_INET;
+ addr.sin_port = htons(port);
+ addr.sin_addr.s_addr = htonl(INADDR_ANY);
+ LOG(("Waiting for connections on port %u", ntohs(addr.sin_port)));
+ mState = CONNECTING;
+ if (usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr), sizeof(struct sockaddr_in)) < 0) {
+ LOG(("***Failed userspace_bind"));
+ return false;
+ }
+ if (usrsctp_listen(mMasterSocket, 1) < 0) {
+ LOG(("***Failed userspace_listen"));
+ return false;
+ }
+
+ LOG(("Accepting connection"));
+ addr_len = 0;
+ if ((mSocket = usrsctp_accept(mMasterSocket, nullptr, &addr_len)) == nullptr) {
+ LOG(("***Failed accept"));
+ return false;
+ }
+ mState = OPEN;
+
+ struct linger l;
+ l.l_onoff = 1;
+ l.l_linger = 0;
+ if (usrsctp_setsockopt(mSocket, SOL_SOCKET, SO_LINGER,
+ (const void *)&l, (socklen_t)sizeof(struct linger)) < 0) {
+ LOG(("Couldn't set SO_LINGER on SCTP socket"));
+ }
+
+ SetEvenOdd();
+
+ // Notify Connection open
+ // XXX We need to make sure connection sticks around until the message is delivered
+ LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
+ NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::ON_CONNECTION,
+ this, (DataChannel *) nullptr)));
+ return true;
+}
+
+// Blocks! - Don't call this from main thread!
+bool
+DataChannelConnection::Connect(const char *addr, unsigned short port)
+{
+ struct sockaddr_in addr4;
+ struct sockaddr_in6 addr6;
+
+ NS_WARNING_ASSERTION(!NS_IsMainThread(),
+ "Blocks, do not call from main thread!!!");
+
+ /* Acting as the connector */
+ LOG(("Connecting to %s, port %u", addr, port));
+ memset((void *)&addr4, 0, sizeof(struct sockaddr_in));
+ memset((void *)&addr6, 0, sizeof(struct sockaddr_in6));
+#ifdef HAVE_SIN_LEN
+ addr4.sin_len = sizeof(struct sockaddr_in);
+#endif
+#ifdef HAVE_SIN6_LEN
+ addr6.sin6_len = sizeof(struct sockaddr_in6);
+#endif
+ addr4.sin_family = AF_INET;
+ addr6.sin6_family = AF_INET6;
+ addr4.sin_port = htons(port);
+ addr6.sin6_port = htons(port);
+ mState = CONNECTING;
+
+#if !defined(__Userspace_os_Windows)
+ if (inet_pton(AF_INET6, addr, &addr6.sin6_addr) == 1) {
+ if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr6), sizeof(struct sockaddr_in6)) < 0) {
+ LOG(("*** Failed userspace_connect"));
+ return false;
+ }
+ } else if (inet_pton(AF_INET, addr, &addr4.sin_addr) == 1) {
+ if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr4), sizeof(struct sockaddr_in)) < 0) {
+ LOG(("*** Failed userspace_connect"));
+ return false;
+ }
+ } else {
+ LOG(("*** Illegal destination address."));
+ }
+#else
+ {
+ struct sockaddr_storage ss;
+ int sslen = sizeof(ss);
+
+ if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET6, nullptr, (struct sockaddr*)&ss, &sslen)) {
+ addr6.sin6_addr = (reinterpret_cast<struct sockaddr_in6 *>(&ss))->sin6_addr;
+ if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr6), sizeof(struct sockaddr_in6)) < 0) {
+ LOG(("*** Failed userspace_connect"));
+ return false;
+ }
+ } else if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET, nullptr, (struct sockaddr*)&ss, &sslen)) {
+ addr4.sin_addr = (reinterpret_cast<struct sockaddr_in *>(&ss))->sin_addr;
+ if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr4), sizeof(struct sockaddr_in)) < 0) {
+ LOG(("*** Failed userspace_connect"));
+ return false;
+ }
+ } else {
+ LOG(("*** Illegal destination address."));
+ }
+ }
+#endif
+
+ mSocket = mMasterSocket;
+
+ LOG(("connect() succeeded! Entering connected mode"));
+ mState = OPEN;
+
+ SetEvenOdd();
+
+ // Notify Connection open
+ // XXX We need to make sure connection sticks around until the message is delivered
+ LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
+ NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::ON_CONNECTION,
+ this, (DataChannel *) nullptr)));
+ return true;
+}
+#endif
+
+DataChannel *
+DataChannelConnection::FindChannelByStream(uint16_t stream)
+{
+ return mStreams.SafeElementAt(stream);
+}
+
+uint16_t
+DataChannelConnection::FindFreeStream()
+{
+ uint32_t i, j, limit;
+
+ limit = mStreams.Length();
+ if (limit > MAX_NUM_STREAMS)
+ limit = MAX_NUM_STREAMS;
+
+ for (i = (mAllocateEven ? 0 : 1); i < limit; i += 2) {
+ if (!mStreams[i]) {
+ // Verify it's not still in the process of closing
+ for (j = 0; j < mStreamsResetting.Length(); ++j) {
+ if (mStreamsResetting[j] == i) {
+ break;
+ }
+ }
+ if (j == mStreamsResetting.Length())
+ break;
+ }
+ }
+ if (i >= limit) {
+ return INVALID_STREAM;
+ }
+ return i;
+}
+
+bool
+DataChannelConnection::RequestMoreStreams(int32_t aNeeded)
+{
+ struct sctp_status status;
+ struct sctp_add_streams sas;
+ uint32_t outStreamsNeeded;
+ socklen_t len;
+
+ if (aNeeded + mStreams.Length() > MAX_NUM_STREAMS) {
+ aNeeded = MAX_NUM_STREAMS - mStreams.Length();
+ }
+ if (aNeeded <= 0) {
+ return false;
+ }
+
+ len = (socklen_t)sizeof(struct sctp_status);
+ if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_STATUS, &status, &len) < 0) {
+ LOG(("***failed: getsockopt SCTP_STATUS"));
+ return false;
+ }
+ outStreamsNeeded = aNeeded; // number to add
+
+ // Note: if multiple channel opens happen when we don't have enough space,
+ // we'll call RequestMoreStreams() multiple times
+ memset(&sas, 0, sizeof(sas));
+ sas.sas_instrms = 0;
+ sas.sas_outstrms = (uint16_t)outStreamsNeeded; /* XXX error handling */
+ // Doesn't block, we get an event when it succeeds or fails
+ if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ADD_STREAMS, &sas,
+ (socklen_t) sizeof(struct sctp_add_streams)) < 0) {
+ if (errno == EALREADY) {
+ LOG(("Already have %u output streams", outStreamsNeeded));
+ return true;
+ }
+
+ LOG(("***failed: setsockopt ADD errno=%d", errno));
+ return false;
+ }
+ LOG(("Requested %u more streams", outStreamsNeeded));
+ // We add to mStreams when we get a SCTP_STREAM_CHANGE_EVENT and the
+ // values are larger than mStreams.Length()
+ return true;
+}
+
+int32_t
+DataChannelConnection::SendControlMessage(void *msg, uint32_t len, uint16_t stream)
+{
+ struct sctp_sndinfo sndinfo;
+
+ // Note: Main-thread IO, but doesn't block
+ memset(&sndinfo, 0, sizeof(struct sctp_sndinfo));
+ sndinfo.snd_sid = stream;
+ sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL);
+ if (usrsctp_sendv(mSocket, msg, len, nullptr, 0,
+ &sndinfo, (socklen_t)sizeof(struct sctp_sndinfo),
+ SCTP_SENDV_SNDINFO, 0) < 0) {
+ //LOG(("***failed: sctp_sendv")); don't log because errno is a return!
+ return (0);
+ }
+ return (1);
+}
+
+int32_t
+DataChannelConnection::SendOpenAckMessage(uint16_t stream)
+{
+ struct rtcweb_datachannel_ack ack;
+
+ memset(&ack, 0, sizeof(struct rtcweb_datachannel_ack));
+ ack.msg_type = DATA_CHANNEL_ACK;
+
+ return SendControlMessage(&ack, sizeof(ack), stream);
+}
+
+int32_t
+DataChannelConnection::SendOpenRequestMessage(const nsACString& label,
+ const nsACString& protocol,
+ uint16_t stream, bool unordered,
+ uint16_t prPolicy, uint32_t prValue)
+{
+ const int label_len = label.Length(); // not including nul
+ const int proto_len = protocol.Length(); // not including nul
+ // careful - request struct include one char for the label
+ const int req_size = sizeof(struct rtcweb_datachannel_open_request) - 1 +
+ label_len + proto_len;
+ struct rtcweb_datachannel_open_request *req =
+ (struct rtcweb_datachannel_open_request*) moz_xmalloc(req_size);
+
+ memset(req, 0, req_size);
+ req->msg_type = DATA_CHANNEL_OPEN_REQUEST;
+ switch (prPolicy) {
+ case SCTP_PR_SCTP_NONE:
+ req->channel_type = DATA_CHANNEL_RELIABLE;
+ break;
+ case SCTP_PR_SCTP_TTL:
+ req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_TIMED;
+ break;
+ case SCTP_PR_SCTP_RTX:
+ req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT;
+ break;
+ default:
+ // FIX! need to set errno! Or make all these SendXxxx() funcs return 0 or errno!
+ free(req);
+ return (0);
+ }
+ if (unordered) {
+ // Per the current types, all differ by 0x80 between ordered and unordered
+ req->channel_type |= 0x80; // NOTE: be careful if new types are added in the future
+ }
+
+ req->reliability_param = htonl(prValue);
+ req->priority = htons(0); /* XXX: add support */
+ req->label_length = htons(label_len);
+ req->protocol_length = htons(proto_len);
+ memcpy(&req->label[0], PromiseFlatCString(label).get(), label_len);
+ memcpy(&req->label[label_len], PromiseFlatCString(protocol).get(), proto_len);
+
+ int32_t result = SendControlMessage(req, req_size, stream);
+
+ free(req);
+ return result;
+}
+
+// XXX This should use a separate thread (outbound queue) which should
+// select() to know when to *try* to send data to the socket again.
+// Alternatively, it can use a timeout, but that's guaranteed to be wrong
+// (just not sure in what direction). We could re-implement NSPR's
+// PR_POLL_WRITE/etc handling... with a lot of work.
+
+// Better yet, use the SCTP stack's notifications on buffer state to avoid
+// filling the SCTP's buffers.
+
+// returns if we're still blocked or not
+bool
+DataChannelConnection::SendDeferredMessages()
+{
+ uint32_t i;
+ RefPtr<DataChannel> channel; // we may null out the refs to this
+ bool still_blocked = false;
+
+ // This may block while something is modifying channels, but should not block for IO
+ MutexAutoLock lock(mLock);
+
+ // XXX For total fairness, on a still_blocked we'd start next time at the
+ // same index. Sorry, not going to bother for now.
+ for (i = 0; i < mStreams.Length(); ++i) {
+ channel = mStreams[i];
+ if (!channel)
+ continue;
+
+ // Only one of these should be set....
+ if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_REQ) {
+ if (SendOpenRequestMessage(channel->mLabel, channel->mProtocol,
+ channel->mStream,
+ channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED,
+ channel->mPrPolicy, channel->mPrValue)) {
+ channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_REQ;
+
+ channel->mState = OPEN;
+ channel->mReady = true;
+ LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
+ NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
+ channel)));
+ } else {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ still_blocked = true;
+ } else {
+ // Close the channel, inform the user
+ mStreams[channel->mStream] = nullptr;
+ channel->mState = CLOSED;
+ // Don't need to reset; we didn't open it
+ NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
+ channel)));
+ }
+ }
+ }
+ if (still_blocked)
+ break;
+
+ if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_ACK) {
+ if (SendOpenAckMessage(channel->mStream)) {
+ channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_ACK;
+ } else {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ still_blocked = true;
+ } else {
+ // Close the channel, inform the user
+ CloseInt(channel);
+ // XXX send error via DataChannelOnMessageAvailable (bug 843625)
+ }
+ }
+ }
+ if (still_blocked)
+ break;
+
+ if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_DATA) {
+ bool failed_send = false;
+ int32_t result;
+
+ if (channel->mState == CLOSED || channel->mState == CLOSING) {
+ channel->mBufferedData.Clear();
+ }
+
+ uint32_t buffered_amount = channel->GetBufferedAmountLocked();
+ uint32_t threshold = channel->GetBufferedAmountLowThreshold();
+ bool was_over_threshold = buffered_amount >= threshold;
+
+ while (!channel->mBufferedData.IsEmpty() &&
+ !failed_send) {
+ struct sctp_sendv_spa *spa = channel->mBufferedData[0]->mSpa;
+ const char *data = channel->mBufferedData[0]->mData;
+ size_t len = channel->mBufferedData[0]->mLength;
+
+ // SCTP will return EMSGSIZE if the message is bigger than the buffer
+ // size (or EAGAIN if there isn't space)
+ if ((result = usrsctp_sendv(mSocket, data, len,
+ nullptr, 0,
+ (void *)spa, (socklen_t)sizeof(struct sctp_sendv_spa),
+ SCTP_SENDV_SPA,
+ 0)) < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ // leave queued for resend
+ failed_send = true;
+ LOG(("queue full again when resending %d bytes (%d)", len, result));
+ } else {
+ LOG(("error %d re-sending string", errno));
+ failed_send = true;
+ }
+ } else {
+ LOG(("Resent buffer of %d bytes (%d)", len, result));
+ // In theory this could underflow if >4GB was buffered and re
+ // truncated in GetBufferedAmount(), but this won't cause any problems.
+ buffered_amount -= channel->mBufferedData[0]->mLength;
+ channel->mBufferedData.RemoveElementAt(0);
+ // can never fire with default threshold of 0
+ if (was_over_threshold && buffered_amount < threshold) {
+ LOG(("%s: sending BUFFER_LOW_THRESHOLD for %s/%s: %u", __FUNCTION__,
+ channel->mLabel.get(), channel->mProtocol.get(), channel->mStream));
+ NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::BUFFER_LOW_THRESHOLD,
+ this, channel)));
+ was_over_threshold = false;
+ }
+ if (buffered_amount == 0) {
+ // buffered-to-not-buffered transition; tell the DOM code in case this makes it
+ // available for GC
+ LOG(("%s: sending NO_LONGER_BUFFERED for %s/%s: %u", __FUNCTION__,
+ channel->mLabel.get(), channel->mProtocol.get(), channel->mStream));
+ NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::NO_LONGER_BUFFERED,
+ this, channel)));
+ }
+ }
+ }
+ if (channel->mBufferedData.IsEmpty())
+ channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_DATA;
+ else
+ still_blocked = true;
+ }
+ if (still_blocked)
+ break;
+ }
+
+ return still_blocked;
+}
+
+void
+DataChannelConnection::HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req,
+ size_t length,
+ uint16_t stream)
+{
+ RefPtr<DataChannel> channel;
+ uint32_t prValue;
+ uint16_t prPolicy;
+ uint32_t flags;
+
+ mLock.AssertCurrentThreadOwns();
+
+ if (length != (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)) {
+ LOG(("%s: Inconsistent length: %u, should be %u", __FUNCTION__, length,
+ (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)));
+ if (length < (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length))
+ return;
+ }
+
+ LOG(("%s: length %u, sizeof(*req) = %u", __FUNCTION__, length, sizeof(*req)));
+
+ switch (req->channel_type) {
+ case DATA_CHANNEL_RELIABLE:
+ case DATA_CHANNEL_RELIABLE_UNORDERED:
+ prPolicy = SCTP_PR_SCTP_NONE;
+ break;
+ case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
+ case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT_UNORDERED:
+ prPolicy = SCTP_PR_SCTP_RTX;
+ break;
+ case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
+ case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED_UNORDERED:
+ prPolicy = SCTP_PR_SCTP_TTL;
+ break;
+ default:
+ LOG(("Unknown channel type", req->channel_type));
+ /* XXX error handling */
+ return;
+ }
+ prValue = ntohl(req->reliability_param);
+ flags = (req->channel_type & 0x80) ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0;
+
+ if ((channel = FindChannelByStream(stream))) {
+ if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) {
+ LOG(("ERROR: HandleOpenRequestMessage: channel for stream %u is in state %d instead of CLOSED.",
+ stream, channel->mState));
+ /* XXX: some error handling */
+ } else {
+ LOG(("Open for externally negotiated channel %u", stream));
+ // XXX should also check protocol, maybe label
+ if (prPolicy != channel->mPrPolicy ||
+ prValue != channel->mPrValue ||
+ flags != (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED))
+ {
+ LOG(("WARNING: external negotiation mismatch with OpenRequest:"
+ "channel %u, policy %u/%u, value %u/%u, flags %x/%x",
+ stream, prPolicy, channel->mPrPolicy,
+ prValue, channel->mPrValue, flags, channel->mFlags));
+ }
+ }
+ return;
+ }
+ if (stream >= mStreams.Length()) {
+ LOG(("%s: stream %u out of bounds (%u)", __FUNCTION__, stream, mStreams.Length()));
+ return;
+ }
+
+ nsCString label(nsDependentCSubstring(&req->label[0], ntohs(req->label_length)));
+ nsCString protocol(nsDependentCSubstring(&req->label[ntohs(req->label_length)],
+ ntohs(req->protocol_length)));
+
+ channel = new DataChannel(this,
+ stream,
+ DataChannel::CONNECTING,
+ label,
+ protocol,
+ prPolicy, prValue,
+ flags,
+ nullptr, nullptr);
+ mStreams[stream] = channel;
+
+ channel->mState = DataChannel::WAITING_TO_OPEN;
+
+ LOG(("%s: sending ON_CHANNEL_CREATED for %s/%s: %u (state %u)", __FUNCTION__,
+ channel->mLabel.get(), channel->mProtocol.get(), stream, channel->mState));
+ NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::ON_CHANNEL_CREATED,
+ this, channel)));
+
+ LOG(("%s: deferring sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
+
+ if (!SendOpenAckMessage(stream)) {
+ // XXX Only on EAGAIN!? And if not, then close the channel??
+ channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_ACK;
+ // Note: we're locked, so there's no danger of a race with the
+ // buffer-threshold callback
+ }
+
+ // Now process any queued data messages for the channel (which will
+ // themselves likely get queued until we leave WAITING_TO_OPEN, plus any
+ // more that come in before that happens)
+ DeliverQueuedData(stream);
+}
+
+// NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK.
+// That would make this code moot. Keep it for now for backwards compatibility.
+void
+DataChannelConnection::DeliverQueuedData(uint16_t stream)
+{
+ mLock.AssertCurrentThreadOwns();
+
+ uint32_t i = 0;
+ while (i < mQueuedData.Length()) {
+ // Careful! we may modify the array length from within the loop!
+ if (mQueuedData[i]->mStream == stream) {
+ LOG(("Delivering queued data for stream %u, length %u",
+ stream, (unsigned int) mQueuedData[i]->mLength));
+ // Deliver the queued data
+ HandleDataMessage(mQueuedData[i]->mPpid,
+ mQueuedData[i]->mData, mQueuedData[i]->mLength,
+ mQueuedData[i]->mStream);
+ mQueuedData.RemoveElementAt(i);
+ continue; // don't bump index since we removed the element
+ }
+ i++;
+ }
+}
+
+void
+DataChannelConnection::HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack,
+ size_t length, uint16_t stream)
+{
+ DataChannel *channel;
+
+ mLock.AssertCurrentThreadOwns();
+
+ channel = FindChannelByStream(stream);
+ NS_ENSURE_TRUE_VOID(channel);
+
+ LOG(("OpenAck received for stream %u, waiting=%d", stream,
+ (channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK) ? 1 : 0));
+
+ channel->mFlags &= ~DATA_CHANNEL_FLAGS_WAITING_ACK;
+}
+
+void
+DataChannelConnection::HandleUnknownMessage(uint32_t ppid, size_t length, uint16_t stream)
+{
+ /* XXX: Send an error message? */
+ LOG(("unknown DataChannel message received: %u, len %ld on stream %lu", ppid, length, stream));
+ // XXX Log to JS error console if possible
+}
+
+void
+DataChannelConnection::HandleDataMessage(uint32_t ppid,
+ const void *data, size_t length,
+ uint16_t stream)
+{
+ DataChannel *channel;
+ const char *buffer = (const char *) data;
+
+ mLock.AssertCurrentThreadOwns();
+
+ channel = FindChannelByStream(stream);
+
+ // XXX A closed channel may trip this... check
+ // NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK.
+ // That would make this code moot. Keep it for now for backwards compatibility.
+ if (!channel) {
+ // In the updated 0-RTT open case, the sender can send data immediately
+ // after Open, and doesn't set the in-order bit (since we don't have a
+ // response or ack). Also, with external negotiation, data can come in
+ // before we're told about the external negotiation. We need to buffer
+ // data until either a) Open comes in, if the ordering get messed up,
+ // or b) the app tells us this channel was externally negotiated. When
+ // these occur, we deliver the data.
+
+ // Since this is rare and non-performance, keep a single list of queued
+ // data messages to deliver once the channel opens.
+ LOG(("Queuing data for stream %u, length %u", stream, length));
+ // Copies data
+ mQueuedData.AppendElement(new QueuedDataMessage(stream, ppid, data, length));
+ return;
+ }
+
+ // XXX should this be a simple if, no warnings/debugbreaks?
+ NS_ENSURE_TRUE_VOID(channel->mState != CLOSED);
+
+ {
+ nsAutoCString recvData(buffer, length); // copies (<64) or allocates
+ bool is_binary = true;
+
+ if (ppid == DATA_CHANNEL_PPID_DOMSTRING ||
+ ppid == DATA_CHANNEL_PPID_DOMSTRING_LAST) {
+ is_binary = false;
+ }
+ if (is_binary != channel->mIsRecvBinary && !channel->mRecvBuffer.IsEmpty()) {
+ NS_WARNING("DataChannel message aborted by fragment type change!");
+ channel->mRecvBuffer.Truncate(0);
+ }
+ channel->mIsRecvBinary = is_binary;
+
+ switch (ppid) {
+ case DATA_CHANNEL_PPID_DOMSTRING:
+ case DATA_CHANNEL_PPID_BINARY:
+ channel->mRecvBuffer += recvData;
+ LOG(("DataChannel: Partial %s message of length %lu (total %u) on channel id %u",
+ is_binary ? "binary" : "string", length, channel->mRecvBuffer.Length(),
+ channel->mStream));
+ return; // Not ready to notify application
+
+ case DATA_CHANNEL_PPID_DOMSTRING_LAST:
+ LOG(("DataChannel: String message received of length %lu on channel %u",
+ length, channel->mStream));
+ if (!channel->mRecvBuffer.IsEmpty()) {
+ channel->mRecvBuffer += recvData;
+ LOG(("%s: sending ON_DATA (string fragmented) for %p", __FUNCTION__, channel));
+ channel->SendOrQueue(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::ON_DATA, this,
+ channel, channel->mRecvBuffer, -1));
+ channel->mRecvBuffer.Truncate(0);
+ return;
+ }
+ // else send using recvData normally
+ length = -1; // Flag for DOMString
+
+ // WebSockets checks IsUTF8() here; we can try to deliver it
+ break;
+
+ case DATA_CHANNEL_PPID_BINARY_LAST:
+ LOG(("DataChannel: Received binary message of length %lu on channel id %u",
+ length, channel->mStream));
+ if (!channel->mRecvBuffer.IsEmpty()) {
+ channel->mRecvBuffer += recvData;
+ LOG(("%s: sending ON_DATA (binary fragmented) for %p", __FUNCTION__, channel));
+ channel->SendOrQueue(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::ON_DATA, this,
+ channel, channel->mRecvBuffer,
+ channel->mRecvBuffer.Length()));
+ channel->mRecvBuffer.Truncate(0);
+ return;
+ }
+ // else send using recvData normally
+ break;
+
+ default:
+ NS_ERROR("Unknown data PPID");
+ return;
+ }
+ /* Notify onmessage */
+ LOG(("%s: sending ON_DATA for %p", __FUNCTION__, channel));
+ channel->SendOrQueue(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::ON_DATA, this,
+ channel, recvData, length));
+ }
+}
+
+// Called with mLock locked!
+void
+DataChannelConnection::HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t stream)
+{
+ const struct rtcweb_datachannel_open_request *req;
+ const struct rtcweb_datachannel_ack *ack;
+
+ mLock.AssertCurrentThreadOwns();
+
+ switch (ppid) {
+ case DATA_CHANNEL_PPID_CONTROL:
+ req = static_cast<const struct rtcweb_datachannel_open_request *>(buffer);
+
+ NS_ENSURE_TRUE_VOID(length >= sizeof(*ack)); // smallest message
+ switch (req->msg_type) {
+ case DATA_CHANNEL_OPEN_REQUEST:
+ // structure includes a possibly-unused char label[1] (in a packed structure)
+ NS_ENSURE_TRUE_VOID(length >= sizeof(*req) - 1);
+
+ HandleOpenRequestMessage(req, length, stream);
+ break;
+ case DATA_CHANNEL_ACK:
+ // >= sizeof(*ack) checked above
+
+ ack = static_cast<const struct rtcweb_datachannel_ack *>(buffer);
+ HandleOpenAckMessage(ack, length, stream);
+ break;
+ default:
+ HandleUnknownMessage(ppid, length, stream);
+ break;
+ }
+ break;
+ case DATA_CHANNEL_PPID_DOMSTRING:
+ case DATA_CHANNEL_PPID_DOMSTRING_LAST:
+ case DATA_CHANNEL_PPID_BINARY:
+ case DATA_CHANNEL_PPID_BINARY_LAST:
+ HandleDataMessage(ppid, buffer, length, stream);
+ break;
+ default:
+ LOG(("Message of length %lu, PPID %u on stream %u received.",
+ length, ppid, stream));
+ break;
+ }
+}
+
+void
+DataChannelConnection::HandleAssociationChangeEvent(const struct sctp_assoc_change *sac)
+{
+ uint32_t i, n;
+
+ switch (sac->sac_state) {
+ case SCTP_COMM_UP:
+ LOG(("Association change: SCTP_COMM_UP"));
+ if (mState == CONNECTING) {
+ mSocket = mMasterSocket;
+ mState = OPEN;
+
+ SetEvenOdd();
+
+ NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::ON_CONNECTION,
+ this)));
+ LOG(("DTLS connect() succeeded! Entering connected mode"));
+
+ // Open any streams pending...
+ ProcessQueuedOpens();
+
+ } else if (mState == OPEN) {
+ LOG(("DataConnection Already OPEN"));
+ } else {
+ LOG(("Unexpected state: %d", mState));
+ }
+ break;
+ case SCTP_COMM_LOST:
+ LOG(("Association change: SCTP_COMM_LOST"));
+ // This association is toast, so also close all the channels -- from mainthread!
+ NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::ON_DISCONNECTED,
+ this)));
+ break;
+ case SCTP_RESTART:
+ LOG(("Association change: SCTP_RESTART"));
+ break;
+ case SCTP_SHUTDOWN_COMP:
+ LOG(("Association change: SCTP_SHUTDOWN_COMP"));
+ NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::ON_DISCONNECTED,
+ this)));
+ break;
+ case SCTP_CANT_STR_ASSOC:
+ LOG(("Association change: SCTP_CANT_STR_ASSOC"));
+ break;
+ default:
+ LOG(("Association change: UNKNOWN"));
+ break;
+ }
+ LOG(("Association change: streams (in/out) = (%u/%u)",
+ sac->sac_inbound_streams, sac->sac_outbound_streams));
+
+ NS_ENSURE_TRUE_VOID(sac);
+ n = sac->sac_length - sizeof(*sac);
+ if (((sac->sac_state == SCTP_COMM_UP) ||
+ (sac->sac_state == SCTP_RESTART)) && (n > 0)) {
+ for (i = 0; i < n; ++i) {
+ switch (sac->sac_info[i]) {
+ case SCTP_ASSOC_SUPPORTS_PR:
+ LOG(("Supports: PR"));
+ break;
+ case SCTP_ASSOC_SUPPORTS_AUTH:
+ LOG(("Supports: AUTH"));
+ break;
+ case SCTP_ASSOC_SUPPORTS_ASCONF:
+ LOG(("Supports: ASCONF"));
+ break;
+ case SCTP_ASSOC_SUPPORTS_MULTIBUF:
+ LOG(("Supports: MULTIBUF"));
+ break;
+ case SCTP_ASSOC_SUPPORTS_RE_CONFIG:
+ LOG(("Supports: RE-CONFIG"));
+ break;
+ default:
+ LOG(("Supports: UNKNOWN(0x%02x)", sac->sac_info[i]));
+ break;
+ }
+ }
+ } else if (((sac->sac_state == SCTP_COMM_LOST) ||
+ (sac->sac_state == SCTP_CANT_STR_ASSOC)) && (n > 0)) {
+ LOG(("Association: ABORT ="));
+ for (i = 0; i < n; ++i) {
+ LOG((" 0x%02x", sac->sac_info[i]));
+ }
+ }
+ if ((sac->sac_state == SCTP_CANT_STR_ASSOC) ||
+ (sac->sac_state == SCTP_SHUTDOWN_COMP) ||
+ (sac->sac_state == SCTP_COMM_LOST)) {
+ return;
+ }
+}
+
+void
+DataChannelConnection::HandlePeerAddressChangeEvent(const struct sctp_paddr_change *spc)
+{
+ const char *addr = "";
+#if !defined(__Userspace_os_Windows)
+ char addr_buf[INET6_ADDRSTRLEN];
+ struct sockaddr_in *sin;
+ struct sockaddr_in6 *sin6;
+#endif
+
+ switch (spc->spc_aaddr.ss_family) {
+ case AF_INET:
+#if !defined(__Userspace_os_Windows)
+ sin = (struct sockaddr_in *)&spc->spc_aaddr;
+ addr = inet_ntop(AF_INET, &sin->sin_addr, addr_buf, INET6_ADDRSTRLEN);
+#endif
+ break;
+ case AF_INET6:
+#if !defined(__Userspace_os_Windows)
+ sin6 = (struct sockaddr_in6 *)&spc->spc_aaddr;
+ addr = inet_ntop(AF_INET6, &sin6->sin6_addr, addr_buf, INET6_ADDRSTRLEN);
+#endif
+ break;
+ case AF_CONN:
+ addr = "DTLS connection";
+ break;
+ default:
+ break;
+ }
+ LOG(("Peer address %s is now ", addr));
+ switch (spc->spc_state) {
+ case SCTP_ADDR_AVAILABLE:
+ LOG(("SCTP_ADDR_AVAILABLE"));
+ break;
+ case SCTP_ADDR_UNREACHABLE:
+ LOG(("SCTP_ADDR_UNREACHABLE"));
+ break;
+ case SCTP_ADDR_REMOVED:
+ LOG(("SCTP_ADDR_REMOVED"));
+ break;
+ case SCTP_ADDR_ADDED:
+ LOG(("SCTP_ADDR_ADDED"));
+ break;
+ case SCTP_ADDR_MADE_PRIM:
+ LOG(("SCTP_ADDR_MADE_PRIM"));
+ break;
+ case SCTP_ADDR_CONFIRMED:
+ LOG(("SCTP_ADDR_CONFIRMED"));
+ break;
+ default:
+ LOG(("UNKNOWN"));
+ break;
+ }
+ LOG((" (error = 0x%08x).\n", spc->spc_error));
+}
+
+void
+DataChannelConnection::HandleRemoteErrorEvent(const struct sctp_remote_error *sre)
+{
+ size_t i, n;
+
+ n = sre->sre_length - sizeof(struct sctp_remote_error);
+ LOG(("Remote Error (error = 0x%04x): ", sre->sre_error));
+ for (i = 0; i < n; ++i) {
+ LOG((" 0x%02x", sre-> sre_data[i]));
+ }
+}
+
+void
+DataChannelConnection::HandleShutdownEvent(const struct sctp_shutdown_event *sse)
+{
+ LOG(("Shutdown event."));
+ /* XXX: notify all channels. */
+ // Attempts to actually send anything will fail
+}
+
+void
+DataChannelConnection::HandleAdaptationIndication(const struct sctp_adaptation_event *sai)
+{
+ LOG(("Adaptation indication: %x.", sai-> sai_adaptation_ind));
+}
+
+void
+DataChannelConnection::HandleSendFailedEvent(const struct sctp_send_failed_event *ssfe)
+{
+ size_t i, n;
+
+ if (ssfe->ssfe_flags & SCTP_DATA_UNSENT) {
+ LOG(("Unsent "));
+ }
+ if (ssfe->ssfe_flags & SCTP_DATA_SENT) {
+ LOG(("Sent "));
+ }
+ if (ssfe->ssfe_flags & ~(SCTP_DATA_SENT | SCTP_DATA_UNSENT)) {
+ LOG(("(flags = %x) ", ssfe->ssfe_flags));
+ }
+ LOG(("message with PPID = %u, SID = %d, flags: 0x%04x due to error = 0x%08x",
+ ntohl(ssfe->ssfe_info.snd_ppid), ssfe->ssfe_info.snd_sid,
+ ssfe->ssfe_info.snd_flags, ssfe->ssfe_error));
+ n = ssfe->ssfe_length - sizeof(struct sctp_send_failed_event);
+ for (i = 0; i < n; ++i) {
+ LOG((" 0x%02x", ssfe->ssfe_data[i]));
+ }
+}
+
+void
+DataChannelConnection::ClearResets()
+{
+ // Clear all pending resets
+ if (!mStreamsResetting.IsEmpty()) {
+ LOG(("Clearing resets for %d streams", mStreamsResetting.Length()));
+ }
+
+ for (uint32_t i = 0; i < mStreamsResetting.Length(); ++i) {
+ RefPtr<DataChannel> channel;
+ channel = FindChannelByStream(mStreamsResetting[i]);
+ if (channel) {
+ LOG(("Forgetting channel %u (%p) with pending reset",channel->mStream, channel.get()));
+ mStreams[channel->mStream] = nullptr;
+ }
+ }
+ mStreamsResetting.Clear();
+}
+
+void
+DataChannelConnection::ResetOutgoingStream(uint16_t stream)
+{
+ uint32_t i;
+
+ mLock.AssertCurrentThreadOwns();
+ LOG(("Connection %p: Resetting outgoing stream %u",
+ (void *) this, stream));
+ // Rarely has more than a couple items and only for a short time
+ for (i = 0; i < mStreamsResetting.Length(); ++i) {
+ if (mStreamsResetting[i] == stream) {
+ return;
+ }
+ }
+ mStreamsResetting.AppendElement(stream);
+}
+
+void
+DataChannelConnection::SendOutgoingStreamReset()
+{
+ struct sctp_reset_streams *srs;
+ uint32_t i;
+ size_t len;
+
+ LOG(("Connection %p: Sending outgoing stream reset for %d streams",
+ (void *) this, mStreamsResetting.Length()));
+ mLock.AssertCurrentThreadOwns();
+ if (mStreamsResetting.IsEmpty()) {
+ LOG(("No streams to reset"));
+ return;
+ }
+ len = sizeof(sctp_assoc_t) + (2 + mStreamsResetting.Length()) * sizeof(uint16_t);
+ srs = static_cast<struct sctp_reset_streams *> (moz_xmalloc(len)); // infallible malloc
+ memset(srs, 0, len);
+ srs->srs_flags = SCTP_STREAM_RESET_OUTGOING;
+ srs->srs_number_streams = mStreamsResetting.Length();
+ for (i = 0; i < mStreamsResetting.Length(); ++i) {
+ srs->srs_stream_list[i] = mStreamsResetting[i];
+ }
+ if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_RESET_STREAMS, srs, (socklen_t)len) < 0) {
+ LOG(("***failed: setsockopt RESET, errno %d", errno));
+ // if errno == EALREADY, this is normal - we can't send another reset
+ // with one pending.
+ // When we get an incoming reset (which may be a response to our
+ // outstanding one), see if we have any pending outgoing resets and
+ // send them
+ } else {
+ mStreamsResetting.Clear();
+ }
+ free(srs);
+}
+
+void
+DataChannelConnection::HandleStreamResetEvent(const struct sctp_stream_reset_event *strrst)
+{
+ uint32_t n, i;
+ RefPtr<DataChannel> channel; // since we may null out the ref to the channel
+
+ if (!(strrst->strreset_flags & SCTP_STREAM_RESET_DENIED) &&
+ !(strrst->strreset_flags & SCTP_STREAM_RESET_FAILED)) {
+ n = (strrst->strreset_length - sizeof(struct sctp_stream_reset_event)) / sizeof(uint16_t);
+ for (i = 0; i < n; ++i) {
+ if (strrst->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) {
+ channel = FindChannelByStream(strrst->strreset_stream_list[i]);
+ if (channel) {
+ // The other side closed the channel
+ // We could be in three states:
+ // 1. Normal state (input and output streams (OPEN)
+ // Notify application, send a RESET in response on our
+ // outbound channel. Go to CLOSED
+ // 2. We sent our own reset (CLOSING); either they crossed on the
+ // wire, or this is a response to our Reset.
+ // Go to CLOSED
+ // 3. We've sent a open but haven't gotten a response yet (CONNECTING)
+ // I believe this is impossible, as we don't have an input stream yet.
+
+ LOG(("Incoming: Channel %u closed, state %d",
+ channel->mStream, channel->mState));
+ ASSERT_WEBRTC(channel->mState == DataChannel::OPEN ||
+ channel->mState == DataChannel::CLOSING ||
+ channel->mState == DataChannel::CONNECTING ||
+ channel->mState == DataChannel::WAITING_TO_OPEN);
+ if (channel->mState == DataChannel::OPEN ||
+ channel->mState == DataChannel::WAITING_TO_OPEN) {
+ // Mark the stream for reset (the reset is sent below)
+ ResetOutgoingStream(channel->mStream);
+ }
+ mStreams[channel->mStream] = nullptr;
+
+ LOG(("Disconnected DataChannel %p from connection %p",
+ (void *) channel.get(), (void *) channel->mConnection.get()));
+ // This sends ON_CHANNEL_CLOSED to mainthread
+ channel->StreamClosedLocked();
+ } else {
+ LOG(("Can't find incoming channel %d",i));
+ }
+ }
+ }
+ }
+
+ // Process any pending resets now:
+ if (!mStreamsResetting.IsEmpty()) {
+ LOG(("Sending %d pending resets", mStreamsResetting.Length()));
+ SendOutgoingStreamReset();
+ }
+}
+
+void
+DataChannelConnection::HandleStreamChangeEvent(const struct sctp_stream_change_event *strchg)
+{
+ uint16_t stream;
+ RefPtr<DataChannel> channel;
+
+ if (strchg->strchange_flags == SCTP_STREAM_CHANGE_DENIED) {
+ LOG(("*** Failed increasing number of streams from %u (%u/%u)",
+ mStreams.Length(),
+ strchg->strchange_instrms,
+ strchg->strchange_outstrms));
+ // XXX FIX! notify pending opens of failure
+ return;
+ } else {
+ if (strchg->strchange_instrms > mStreams.Length()) {
+ LOG(("Other side increased streams from %u to %u",
+ mStreams.Length(), strchg->strchange_instrms));
+ }
+ if (strchg->strchange_outstrms > mStreams.Length() ||
+ strchg->strchange_instrms > mStreams.Length()) {
+ uint16_t old_len = mStreams.Length();
+ uint16_t new_len = std::max(strchg->strchange_outstrms,
+ strchg->strchange_instrms);
+ LOG(("Increasing number of streams from %u to %u - adding %u (in: %u)",
+ old_len, new_len, new_len - old_len,
+ strchg->strchange_instrms));
+ // make sure both are the same length
+ mStreams.AppendElements(new_len - old_len);
+ LOG(("New length = %d (was %d)", mStreams.Length(), old_len));
+ for (size_t i = old_len; i < mStreams.Length(); ++i) {
+ mStreams[i] = nullptr;
+ }
+ // Re-process any channels waiting for streams.
+ // Linear search, but we don't increase channels often and
+ // the array would only get long in case of an app error normally
+
+ // Make sure we request enough streams if there's a big jump in streams
+ // Could make a more complex API for OpenXxxFinish() and avoid this loop
+ size_t num_needed = mPending.GetSize();
+ LOG(("%d of %d new streams already needed", num_needed,
+ new_len - old_len));
+ num_needed -= (new_len - old_len); // number we added
+ if (num_needed > 0) {
+ if (num_needed < 16)
+ num_needed = 16;
+ LOG(("Not enough new streams, asking for %d more", num_needed));
+ RequestMoreStreams(num_needed);
+ } else if (strchg->strchange_outstrms < strchg->strchange_instrms) {
+ LOG(("Requesting %d output streams to match partner",
+ strchg->strchange_instrms - strchg->strchange_outstrms));
+ RequestMoreStreams(strchg->strchange_instrms - strchg->strchange_outstrms);
+ }
+
+ ProcessQueuedOpens();
+ }
+ // else probably not a change in # of streams
+ }
+
+ for (uint32_t i = 0; i < mStreams.Length(); ++i) {
+ channel = mStreams[i];
+ if (!channel)
+ continue;
+
+ if ((channel->mState == CONNECTING) &&
+ (channel->mStream == INVALID_STREAM)) {
+ if ((strchg->strchange_flags & SCTP_STREAM_CHANGE_DENIED) ||
+ (strchg->strchange_flags & SCTP_STREAM_CHANGE_FAILED)) {
+ /* XXX: Signal to the other end. */
+ channel->mState = CLOSED;
+ NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
+ channel)));
+ // maybe fire onError (bug 843625)
+ } else {
+ stream = FindFreeStream();
+ if (stream != INVALID_STREAM) {
+ channel->mStream = stream;
+ mStreams[stream] = channel;
+ channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ;
+ // Note: we're locked, so there's no danger of a race with the
+ // buffer-threshold callback
+ } else {
+ /* We will not find more ... */
+ break;
+ }
+ }
+ }
+ }
+}
+
+// Called with mLock locked!
+void
+DataChannelConnection::HandleNotification(const union sctp_notification *notif, size_t n)
+{
+ mLock.AssertCurrentThreadOwns();
+ if (notif->sn_header.sn_length != (uint32_t)n) {
+ return;
+ }
+ switch (notif->sn_header.sn_type) {
+ case SCTP_ASSOC_CHANGE:
+ HandleAssociationChangeEvent(&(notif->sn_assoc_change));
+ break;
+ case SCTP_PEER_ADDR_CHANGE:
+ HandlePeerAddressChangeEvent(&(notif->sn_paddr_change));
+ break;
+ case SCTP_REMOTE_ERROR:
+ HandleRemoteErrorEvent(&(notif->sn_remote_error));
+ break;
+ case SCTP_SHUTDOWN_EVENT:
+ HandleShutdownEvent(&(notif->sn_shutdown_event));
+ break;
+ case SCTP_ADAPTATION_INDICATION:
+ HandleAdaptationIndication(&(notif->sn_adaptation_event));
+ break;
+ case SCTP_PARTIAL_DELIVERY_EVENT:
+ LOG(("SCTP_PARTIAL_DELIVERY_EVENT"));
+ break;
+ case SCTP_AUTHENTICATION_EVENT:
+ LOG(("SCTP_AUTHENTICATION_EVENT"));
+ break;
+ case SCTP_SENDER_DRY_EVENT:
+ //LOG(("SCTP_SENDER_DRY_EVENT"));
+ break;
+ case SCTP_NOTIFICATIONS_STOPPED_EVENT:
+ LOG(("SCTP_NOTIFICATIONS_STOPPED_EVENT"));
+ break;
+ case SCTP_SEND_FAILED_EVENT:
+ HandleSendFailedEvent(&(notif->sn_send_failed_event));
+ break;
+ case SCTP_STREAM_RESET_EVENT:
+ HandleStreamResetEvent(&(notif->sn_strreset_event));
+ break;
+ case SCTP_ASSOC_RESET_EVENT:
+ LOG(("SCTP_ASSOC_RESET_EVENT"));
+ break;
+ case SCTP_STREAM_CHANGE_EVENT:
+ HandleStreamChangeEvent(&(notif->sn_strchange_event));
+ break;
+ default:
+ LOG(("unknown SCTP event: %u", (uint32_t)notif->sn_header.sn_type));
+ break;
+ }
+ }
+
+int
+DataChannelConnection::ReceiveCallback(struct socket* sock, void *data, size_t datalen,
+ struct sctp_rcvinfo rcv, int32_t flags)
+{
+ ASSERT_WEBRTC(!NS_IsMainThread());
+
+ if (!data) {
+ usrsctp_close(sock); // SCTP has finished shutting down
+ } else {
+ MutexAutoLock lock(mLock);
+ if (flags & MSG_NOTIFICATION) {
+ HandleNotification(static_cast<union sctp_notification *>(data), datalen);
+ } else {
+ HandleMessage(data, datalen, ntohl(rcv.rcv_ppid), rcv.rcv_sid);
+ }
+ }
+ // sctp allocates 'data' with malloc(), and expects the receiver to free
+ // it (presumably with free).
+ // XXX future optimization: try to deliver messages without an internal
+ // alloc/copy, and if so delay the free until later.
+ free(data);
+ // usrsctp defines the callback as returning an int, but doesn't use it
+ return 1;
+}
+
+already_AddRefed<DataChannel>
+DataChannelConnection::Open(const nsACString& label, const nsACString& protocol,
+ Type type, bool inOrder,
+ uint32_t prValue, DataChannelListener *aListener,
+ nsISupports *aContext, bool aExternalNegotiated,
+ uint16_t aStream)
+{
+ // aStream == INVALID_STREAM to have the protocol allocate
+ uint16_t prPolicy = SCTP_PR_SCTP_NONE;
+ uint32_t flags;
+
+ LOG(("DC Open: label %s/%s, type %u, inorder %d, prValue %u, listener %p, context %p, external: %s, stream %u",
+ PromiseFlatCString(label).get(), PromiseFlatCString(protocol).get(),
+ type, inOrder, prValue, aListener, aContext,
+ aExternalNegotiated ? "true" : "false", aStream));
+ switch (type) {
+ case DATA_CHANNEL_RELIABLE:
+ prPolicy = SCTP_PR_SCTP_NONE;
+ break;
+ case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
+ prPolicy = SCTP_PR_SCTP_RTX;
+ break;
+ case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
+ prPolicy = SCTP_PR_SCTP_TTL;
+ break;
+ }
+ if ((prPolicy == SCTP_PR_SCTP_NONE) && (prValue != 0)) {
+ return nullptr;
+ }
+
+ // Don't look past currently-negotiated streams
+ if (aStream != INVALID_STREAM && aStream < mStreams.Length() && mStreams[aStream]) {
+ LOG(("ERROR: external negotiation of already-open channel %u", aStream));
+ // XXX How do we indicate this up to the application? Probably the
+ // caller's job, but we may need to return an error code.
+ return nullptr;
+ }
+
+ flags = !inOrder ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0;
+ RefPtr<DataChannel> channel(new DataChannel(this,
+ aStream,
+ DataChannel::CONNECTING,
+ label, protocol,
+ type, prValue,
+ flags,
+ aListener, aContext));
+ if (aExternalNegotiated) {
+ channel->mFlags |= DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED;
+ }
+
+ MutexAutoLock lock(mLock); // OpenFinish assumes this
+ return OpenFinish(channel.forget());
+}
+
+// Separate routine so we can also call it to finish up from pending opens
+already_AddRefed<DataChannel>
+DataChannelConnection::OpenFinish(already_AddRefed<DataChannel>&& aChannel)
+{
+ RefPtr<DataChannel> channel(aChannel); // takes the reference passed in
+ // Normally 1 reference if called from ::Open(), or 2 if called from
+ // ProcessQueuedOpens() unless the DOMDataChannel was gc'd
+ uint16_t stream = channel->mStream;
+ bool queue = false;
+
+ mLock.AssertCurrentThreadOwns();
+
+ // Cases we care about:
+ // Pre-negotiated:
+ // Not Open:
+ // Doesn't fit:
+ // -> change initial ask or renegotiate after open
+ // -> queue open
+ // Open:
+ // Doesn't fit:
+ // -> RequestMoreStreams && queue
+ // Does fit:
+ // -> open
+ // Not negotiated:
+ // Not Open:
+ // -> queue open
+ // Open:
+ // -> Try to get a stream
+ // Doesn't fit:
+ // -> RequestMoreStreams && queue
+ // Does fit:
+ // -> open
+ // So the Open cases are basically the same
+ // Not Open cases are simply queue for non-negotiated, and
+ // either change the initial ask or possibly renegotiate after open.
+
+ if (mState == OPEN) {
+ if (stream == INVALID_STREAM) {
+ stream = FindFreeStream(); // may be INVALID_STREAM if we need more
+ }
+ if (stream == INVALID_STREAM || stream >= mStreams.Length()) {
+ // RequestMoreStreams() limits to MAX_NUM_STREAMS -- allocate extra streams
+ // to avoid going back immediately for more if the ask to N, N+1, etc
+ int32_t more_needed = (stream == INVALID_STREAM) ? 16 :
+ (stream-((int32_t)mStreams.Length())) + 16;
+ if (!RequestMoreStreams(more_needed)) {
+ // Something bad happened... we're done
+ goto request_error_cleanup;
+ }
+ queue = true;
+ }
+ } else {
+ // not OPEN
+ if (stream != INVALID_STREAM && stream >= mStreams.Length() &&
+ mState == CLOSED) {
+ // Update number of streams for init message
+ struct sctp_initmsg initmsg;
+ socklen_t len = sizeof(initmsg);
+ int32_t total_needed = stream+16;
+
+ memset(&initmsg, 0, sizeof(initmsg));
+ if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 0) {
+ LOG(("*** failed getsockopt SCTP_INITMSG"));
+ goto request_error_cleanup;
+ }
+ LOG(("Setting number of SCTP streams to %u, was %u/%u", total_needed,
+ initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams));
+ initmsg.sinit_num_ostreams = total_needed;
+ initmsg.sinit_max_instreams = MAX_NUM_STREAMS;
+ if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
+ (socklen_t)sizeof(initmsg)) < 0) {
+ LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno));
+ goto request_error_cleanup;
+ }
+
+ int32_t old_len = mStreams.Length();
+ mStreams.AppendElements(total_needed - old_len);
+ for (int32_t i = old_len; i < total_needed; ++i) {
+ mStreams[i] = nullptr;
+ }
+ }
+ // else if state is CONNECTING, we'll just re-negotiate when OpenFinish
+ // is called, if needed
+ queue = true;
+ }
+ if (queue) {
+ LOG(("Queuing channel %p (%u) to finish open", channel.get(), stream));
+ // Also serves to mark we told the app
+ channel->mFlags |= DATA_CHANNEL_FLAGS_FINISH_OPEN;
+ // we need a ref for the nsDeQue and one to return
+ DataChannel* rawChannel = channel;
+ rawChannel->AddRef();
+ mPending.Push(rawChannel);
+ return channel.forget();
+ }
+
+ MOZ_ASSERT(stream != INVALID_STREAM);
+ // just allocated (& OPEN), or externally negotiated
+ mStreams[stream] = channel; // holds a reference
+ channel->mStream = stream;
+
+#ifdef TEST_QUEUED_DATA
+ // It's painful to write a test for this...
+ channel->mState = OPEN;
+ channel->mReady = true;
+ SendMsgInternal(channel, "Help me!", 8, DATA_CHANNEL_PPID_DOMSTRING_LAST);
+#endif
+
+ if (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) {
+ // Don't send unordered until this gets cleared
+ channel->mFlags |= DATA_CHANNEL_FLAGS_WAITING_ACK;
+ }
+
+ if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) {
+ if (!SendOpenRequestMessage(channel->mLabel, channel->mProtocol,
+ stream,
+ !!(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED),
+ channel->mPrPolicy, channel->mPrValue)) {
+ LOG(("SendOpenRequest failed, errno = %d", errno));
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ;
+ // Note: we're locked, so there's no danger of a race with the
+ // buffer-threshold callback
+ return channel.forget();
+ } else {
+ if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
+ // We already returned the channel to the app.
+ NS_ERROR("Failed to send open request");
+ NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
+ channel)));
+ }
+ // If we haven't returned the channel yet, it will get destroyed when we exit
+ // this function.
+ mStreams[stream] = nullptr;
+ channel->mStream = INVALID_STREAM;
+ // we'll be destroying the channel
+ channel->mState = CLOSED;
+ return nullptr;
+ }
+ /* NOTREACHED */
+ }
+ }
+ // Either externally negotiated or we sent Open
+ channel->mState = OPEN;
+ channel->mReady = true;
+ // FIX? Move into DOMDataChannel? I don't think we can send it yet here
+ LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
+ NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
+ channel)));
+
+ return channel.forget();
+
+request_error_cleanup:
+ channel->mState = CLOSED;
+ if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
+ // We already returned the channel to the app.
+ NS_ERROR("Failed to request more streams");
+ NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
+ channel)));
+ return channel.forget();
+ }
+ // we'll be destroying the channel, but it never really got set up
+ // Alternative would be to RUN_ON_THREAD(channel.forget(),::Destroy,...) and
+ // Dispatch it to ourselves
+ return nullptr;
+}
+
+int32_t
+DataChannelConnection::SendMsgInternal(DataChannel *channel, const char *data,
+ size_t length, uint32_t ppid)
+{
+ uint16_t flags;
+ struct sctp_sendv_spa spa;
+ int32_t result;
+
+ NS_ENSURE_TRUE(channel->mState == OPEN || channel->mState == CONNECTING, 0);
+ NS_WARNING_ASSERTION(length > 0, "Length is 0?!");
+
+ // To avoid problems where an in-order OPEN is lost and an
+ // out-of-order data message "beats" it, require data to be in-order
+ // until we get an ACK.
+ if ((channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) &&
+ !(channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK)) {
+ flags = SCTP_UNORDERED;
+ } else {
+ flags = 0;
+ }
+
+ spa.sendv_sndinfo.snd_ppid = htonl(ppid);
+ spa.sendv_sndinfo.snd_sid = channel->mStream;
+ spa.sendv_sndinfo.snd_flags = flags;
+ spa.sendv_sndinfo.snd_context = 0;
+ spa.sendv_sndinfo.snd_assoc_id = 0;
+ spa.sendv_flags = SCTP_SEND_SNDINFO_VALID;
+
+ if (channel->mPrPolicy != SCTP_PR_SCTP_NONE) {
+ spa.sendv_prinfo.pr_policy = channel->mPrPolicy;
+ spa.sendv_prinfo.pr_value = channel->mPrValue;
+ spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
+ }
+
+ // Note: Main-thread IO, but doesn't block!
+ // XXX FIX! to deal with heavy overruns of JS trying to pass data in
+ // (more than the buffersize) queue data onto another thread to do the
+ // actual sends. See netwerk/protocol/websocket/WebSocketChannel.cpp
+
+ // SCTP will return EMSGSIZE if the message is bigger than the buffer
+ // size (or EAGAIN if there isn't space)
+
+ // Avoid a race between buffer-full-failure (where we have to add the
+ // packet to the buffered-data queue) and the buffer-now-only-half-full
+ // callback, which happens on a different thread. Otherwise we might
+ // fail here, then before we add it to the queue get the half-full
+ // callback, find nothing to do, then on this thread add it to the
+ // queue - which would sit there. Also, if we later send more data, it
+ // would arrive ahead of the buffered message, but if the buffer ever
+ // got to 1/2 full, the message would get sent - but at a semi-random
+ // time, after other data it was supposed to be in front of.
+
+ // Must lock before empty check for similar reasons!
+ MutexAutoLock lock(mLock);
+ if (channel->mBufferedData.IsEmpty()) {
+ result = usrsctp_sendv(mSocket, data, length,
+ nullptr, 0,
+ (void *)&spa, (socklen_t)sizeof(struct sctp_sendv_spa),
+ SCTP_SENDV_SPA, 0);
+ LOG(("Sent buffer (len=%u), result=%d", length, result));
+ } else {
+ // Fake EAGAIN if we're already buffering data
+ result = -1;
+ errno = EAGAIN;
+ }
+ if (result < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+
+ // queue data for resend! And queue any further data for the stream until it is...
+ auto *buffered = new BufferedMsg(spa, data, length); // infallible malloc
+ channel->mBufferedData.AppendElement(buffered); // owned by mBufferedData array
+ channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_DATA;
+ LOG(("Queued %u buffers (len=%u)", channel->mBufferedData.Length(), length));
+ return 0;
+ }
+ LOG(("error %d sending string", errno));
+ }
+ return result;
+}
+
+// Handles fragmenting binary messages
+int32_t
+DataChannelConnection::SendBinary(DataChannel *channel, const char *data,
+ size_t len,
+ uint32_t ppid_partial, uint32_t ppid_final)
+{
+ // Since there's a limit on network buffer size and no limits on message
+ // size, and we don't want to use EOR mode (multiple writes for a
+ // message, but all other streams are blocked until you finish sending
+ // this message), we need to add application-level fragmentation of large
+ // messages. On a reliable channel, these can be simply rebuilt into a
+ // large message. On an unreliable channel, we can't and don't know how
+ // long to wait, and there are no retransmissions, and no easy way to
+ // tell the user "this part is missing", so on unreliable channels we
+ // need to return an error if sending more bytes than the network buffers
+ // can hold, and perhaps a lower number.
+
+ // We *really* don't want to do this from main thread! - and SendMsgInternal
+ // avoids blocking.
+ // This MUST be reliable and in-order for the reassembly to work
+ if (len > DATA_CHANNEL_MAX_BINARY_FRAGMENT &&
+ channel->mPrPolicy == DATA_CHANNEL_RELIABLE &&
+ !(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED)) {
+ int32_t sent=0;
+ uint32_t origlen = len;
+ LOG(("Sending binary message length %u in chunks", len));
+ // XXX check flags for out-of-order, or force in-order for large binary messages
+ while (len > 0) {
+ size_t sendlen = std::min<size_t>(len, DATA_CHANNEL_MAX_BINARY_FRAGMENT);
+ uint32_t ppid;
+ len -= sendlen;
+ ppid = len > 0 ? ppid_partial : ppid_final;
+ LOG(("Send chunk of %u bytes, ppid %u", sendlen, ppid));
+ // Note that these might end up being deferred and queued.
+ sent += SendMsgInternal(channel, data, sendlen, ppid);
+ data += sendlen;
+ }
+ LOG(("Sent %d buffers for %u bytes, %d sent immediately, %d buffers queued",
+ (origlen+DATA_CHANNEL_MAX_BINARY_FRAGMENT-1)/DATA_CHANNEL_MAX_BINARY_FRAGMENT,
+ origlen, sent,
+ channel->mBufferedData.Length()));
+ return sent;
+ }
+ NS_WARNING_ASSERTION(len <= DATA_CHANNEL_MAX_BINARY_FRAGMENT,
+ "Sending too-large data on unreliable channel!");
+
+ // This will fail if the message is too large (default 256K)
+ return SendMsgInternal(channel, data, len, ppid_final);
+}
+
+class ReadBlobRunnable : public Runnable {
+public:
+ ReadBlobRunnable(DataChannelConnection* aConnection, uint16_t aStream,
+ nsIInputStream* aBlob) :
+ mConnection(aConnection),
+ mStream(aStream),
+ mBlob(aBlob)
+ {}
+
+ NS_IMETHOD Run() override {
+ // ReadBlob() is responsible to releasing the reference
+ DataChannelConnection *self = mConnection;
+ self->ReadBlob(mConnection.forget(), mStream, mBlob);
+ return NS_OK;
+ }
+
+private:
+ // Make sure the Connection doesn't die while there are jobs outstanding.
+ // Let it die (if released by PeerConnectionImpl while we're running)
+ // when we send our runnable back to MainThread. Then ~DataChannelConnection
+ // can send the IOThread to MainThread to die in a runnable, avoiding
+ // unsafe event loop recursion. Evil.
+ RefPtr<DataChannelConnection> mConnection;
+ uint16_t mStream;
+ // Use RefCount for preventing the object is deleted when SendBlob returns.
+ RefPtr<nsIInputStream> mBlob;
+};
+
+int32_t
+DataChannelConnection::SendBlob(uint16_t stream, nsIInputStream *aBlob)
+{
+ DataChannel *channel = mStreams[stream];
+ NS_ENSURE_TRUE(channel, 0);
+ // Spawn a thread to send the data
+ if (!mInternalIOThread) {
+ nsresult res = NS_NewThread(getter_AddRefs(mInternalIOThread));
+ if (NS_FAILED(res)) {
+ return -1;
+ }
+ }
+
+ mInternalIOThread->Dispatch(do_AddRef(new ReadBlobRunnable(this, stream, aBlob)), NS_DISPATCH_NORMAL);
+ return 0;
+}
+
+class DataChannelBlobSendRunnable : public Runnable
+{
+public:
+ DataChannelBlobSendRunnable(already_AddRefed<DataChannelConnection>& aConnection,
+ uint16_t aStream)
+ : mConnection(aConnection)
+ , mStream(aStream) {}
+
+ ~DataChannelBlobSendRunnable() override
+ {
+ if (!NS_IsMainThread() && mConnection) {
+ MOZ_ASSERT(false);
+ // explicitly leak the connection if destroyed off mainthread
+ Unused << mConnection.forget().take();
+ }
+ }
+
+ NS_IMETHOD Run() override
+ {
+ ASSERT_WEBRTC(NS_IsMainThread());
+
+ mConnection->SendBinaryMsg(mStream, mData);
+ mConnection = nullptr;
+ return NS_OK;
+ }
+
+ // explicitly public so we can avoid allocating twice and copying
+ nsCString mData;
+
+private:
+ // Note: we can be destroyed off the target thread, so be careful not to let this
+ // get Released()ed on the temp thread!
+ RefPtr<DataChannelConnection> mConnection;
+ uint16_t mStream;
+};
+
+void
+DataChannelConnection::ReadBlob(already_AddRefed<DataChannelConnection> aThis,
+ uint16_t aStream, nsIInputStream* aBlob)
+{
+ // NOTE: 'aThis' has been forgotten by the caller to avoid releasing
+ // it off mainthread; if PeerConnectionImpl has released then we want
+ // ~DataChannelConnection() to run on MainThread
+
+ // XXX to do this safely, we must enqueue these atomically onto the
+ // output socket. We need a sender thread(s?) to enqueue data into the
+ // socket and to avoid main-thread IO that might block. Even on a
+ // background thread, we may not want to block on one stream's data.
+ // I.e. run non-blocking and service multiple channels.
+
+ // For now as a hack, send as a single blast of queued packets which may
+ // be deferred until buffer space is available.
+ uint64_t len;
+ nsCOMPtr<nsIThread> mainThread;
+ NS_GetMainThread(getter_AddRefs(mainThread));
+
+ // Must not let Dispatching it cause the DataChannelConnection to get
+ // released on the wrong thread. Using WrapRunnable(RefPtr<DataChannelConnection>(aThis),...
+ // will occasionally cause aThis to get released on this thread. Also, an explicit Runnable
+ // lets us avoid copying the blob data an extra time.
+ RefPtr<DataChannelBlobSendRunnable> runnable = new DataChannelBlobSendRunnable(aThis,
+ aStream);
+ // avoid copying the blob data by passing the mData from the runnable
+ if (NS_FAILED(aBlob->Available(&len)) ||
+ NS_FAILED(NS_ReadInputStreamToString(aBlob, runnable->mData, len))) {
+ // Bug 966602: Doesn't return an error to the caller via onerror.
+ // We must release DataChannelConnection on MainThread to avoid issues (bug 876167)
+ // aThis is now owned by the runnable; release it there
+ NS_ProxyRelease(mainThread, runnable.forget());
+ return;
+ }
+ aBlob->Close();
+ NS_DispatchToMainThread(runnable, NS_DISPATCH_NORMAL);
+}
+
+void
+DataChannelConnection::GetStreamIds(std::vector<uint16_t>* aStreamList)
+{
+ ASSERT_WEBRTC(NS_IsMainThread());
+ for (uint32_t i = 0; i < mStreams.Length(); ++i) {
+ if (mStreams[i]) {
+ aStreamList->push_back(mStreams[i]->mStream);
+ }
+ }
+}
+
+int32_t
+DataChannelConnection::SendMsgCommon(uint16_t stream, const nsACString &aMsg,
+ bool isBinary)
+{
+ ASSERT_WEBRTC(NS_IsMainThread());
+ // We really could allow this from other threads, so long as we deal with
+ // asynchronosity issues with channels closing, in particular access to
+ // mStreams, and issues with the association closing (access to mSocket).
+
+ const char *data = aMsg.BeginReading();
+ uint32_t len = aMsg.Length();
+ DataChannel *channel;
+
+ LOG(("Sending %sto stream %u: %u bytes", isBinary ? "binary " : "", stream, len));
+ // XXX if we want more efficiency, translate flags once at open time
+ channel = mStreams[stream];
+ NS_ENSURE_TRUE(channel, 0);
+
+ if (isBinary)
+ return SendBinary(channel, data, len,
+ DATA_CHANNEL_PPID_BINARY, DATA_CHANNEL_PPID_BINARY_LAST);
+ return SendBinary(channel, data, len,
+ DATA_CHANNEL_PPID_DOMSTRING, DATA_CHANNEL_PPID_DOMSTRING_LAST);
+}
+
+void
+DataChannelConnection::Close(DataChannel *aChannel)
+{
+ MutexAutoLock lock(mLock);
+ CloseInt(aChannel);
+}
+
+// So we can call Close() with the lock already held
+// Called from someone who holds a ref via ::Close(), or from ~DataChannel
+void
+DataChannelConnection::CloseInt(DataChannel *aChannel)
+{
+ MOZ_ASSERT(aChannel);
+ RefPtr<DataChannel> channel(aChannel); // make sure it doesn't go away on us
+
+ mLock.AssertCurrentThreadOwns();
+ LOG(("Connection %p/Channel %p: Closing stream %u",
+ channel->mConnection.get(), channel.get(), channel->mStream));
+ // re-test since it may have closed before the lock was grabbed
+ if (aChannel->mState == CLOSED || aChannel->mState == CLOSING) {
+ LOG(("Channel already closing/closed (%u)", aChannel->mState));
+ if (mState == CLOSED && channel->mStream != INVALID_STREAM) {
+ // called from CloseAll()
+ // we're not going to hang around waiting any more
+ mStreams[channel->mStream] = nullptr;
+ }
+ return;
+ }
+ aChannel->mBufferedData.Clear();
+ if (channel->mStream != INVALID_STREAM) {
+ ResetOutgoingStream(channel->mStream);
+ if (mState == CLOSED) { // called from CloseAll()
+ // Let resets accumulate then send all at once in CloseAll()
+ // we're not going to hang around waiting
+ mStreams[channel->mStream] = nullptr;
+ } else {
+ SendOutgoingStreamReset();
+ }
+ }
+ aChannel->mState = CLOSING;
+ if (mState == CLOSED) {
+ // we're not going to hang around waiting
+ channel->StreamClosedLocked();
+ }
+ // At this point when we leave here, the object is a zombie held alive only by the DOM object
+}
+
+void DataChannelConnection::CloseAll()
+{
+ LOG(("Closing all channels (connection %p)", (void*) this));
+ // Don't need to lock here
+
+ // Make sure no more channels will be opened
+ {
+ MutexAutoLock lock(mLock);
+ mState = CLOSED;
+ }
+
+ // Close current channels
+ // If there are runnables, they hold a strong ref and keep the channel
+ // and/or connection alive (even if in a CLOSED state)
+ bool closed_some = false;
+ for (uint32_t i = 0; i < mStreams.Length(); ++i) {
+ if (mStreams[i]) {
+ mStreams[i]->Close();
+ closed_some = true;
+ }
+ }
+
+ // Clean up any pending opens for channels
+ RefPtr<DataChannel> channel;
+ while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(mPending.PopFront())))) {
+ LOG(("closing pending channel %p, stream %u", channel.get(), channel->mStream));
+ channel->Close(); // also releases the ref on each iteration
+ closed_some = true;
+ }
+ // It's more efficient to let the Resets queue in shutdown and then
+ // SendOutgoingStreamReset() here.
+ if (closed_some) {
+ MutexAutoLock lock(mLock);
+ SendOutgoingStreamReset();
+ }
+}
+
+DataChannel::~DataChannel()
+{
+ // NS_ASSERTION since this is more "I think I caught all the cases that
+ // can cause this" than a true kill-the-program assertion. If this is
+ // wrong, nothing bad happens. A worst it's a leak.
+ NS_ASSERTION(mState == CLOSED || mState == CLOSING, "unexpected state in ~DataChannel");
+}
+
+void
+DataChannel::Close()
+{
+ if (mConnection) {
+ // ensure we don't get deleted
+ RefPtr<DataChannelConnection> connection(mConnection);
+ connection->Close(this);
+ }
+}
+
+// Used when disconnecting from the DataChannelConnection
+void
+DataChannel::StreamClosedLocked()
+{
+ mConnection->mLock.AssertCurrentThreadOwns();
+ ENSURE_DATACONNECTION;
+
+ LOG(("Destroying Data channel %u", mStream));
+ MOZ_ASSERT_IF(mStream != INVALID_STREAM,
+ !mConnection->FindChannelByStream(mStream));
+ mStream = INVALID_STREAM;
+ mState = CLOSED;
+ NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED,
+ mConnection, this)));
+ // We leave mConnection live until the DOM releases us, to avoid races
+}
+
+void
+DataChannel::ReleaseConnection()
+{
+ ASSERT_WEBRTC(NS_IsMainThread());
+ mConnection = nullptr;
+}
+
+void
+DataChannel::SetListener(DataChannelListener *aListener, nsISupports *aContext)
+{
+ MutexAutoLock mLock(mListenerLock);
+ mContext = aContext;
+ mListener = aListener;
+}
+
+// May be called from another (i.e. Main) thread!
+void
+DataChannel::AppReady()
+{
+ ENSURE_DATACONNECTION;
+
+ MutexAutoLock lock(mConnection->mLock);
+
+ mReady = true;
+ if (mState == WAITING_TO_OPEN) {
+ mState = OPEN;
+ NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, mConnection,
+ this)));
+ for (uint32_t i = 0; i < mQueuedMessages.Length(); ++i) {
+ nsCOMPtr<nsIRunnable> runnable = mQueuedMessages[i];
+ MOZ_ASSERT(runnable);
+ NS_DispatchToMainThread(runnable);
+ }
+ } else {
+ NS_ASSERTION(mQueuedMessages.IsEmpty(), "Shouldn't have queued messages if not WAITING_TO_OPEN");
+ }
+ mQueuedMessages.Clear();
+ mQueuedMessages.Compact();
+ // We never use it again... We could even allocate the array in the odd
+ // cases we need it.
+}
+
+uint32_t
+DataChannel::GetBufferedAmountLocked() const
+{
+ size_t buffered = 0;
+
+ for (auto& buffer : mBufferedData) {
+ buffered += buffer->mLength;
+ }
+ // XXX Note: per Michael Tuexen, there's no way to currently get the buffered
+ // amount from the SCTP stack for a single stream. It is on their to-do
+ // list, and once we import a stack with support for that, we'll need to
+ // add it to what we buffer. Also we'll need to ask for notification of a per-
+ // stream buffer-low event and merge that into the handling of buffer-low
+ // (the equivalent to TCP_NOTSENT_LOWAT on TCP sockets)
+
+ if (buffered > UINT32_MAX) { // paranoia - >4GB buffered is very very unlikely
+ buffered = UINT32_MAX;
+ }
+ return buffered;
+}
+
+uint32_t
+DataChannel::GetBufferedAmountLowThreshold()
+{
+ return mBufferedThreshold;
+}
+
+// Never fire immediately, as it's defined to fire on transitions, not state
+void
+DataChannel::SetBufferedAmountLowThreshold(uint32_t aThreshold)
+{
+ mBufferedThreshold = aThreshold;
+}
+
+// Called with mLock locked!
+void
+DataChannel::SendOrQueue(DataChannelOnMessageAvailable *aMessage)
+{
+ if (!mReady &&
+ (mState == CONNECTING || mState == WAITING_TO_OPEN)) {
+ mQueuedMessages.AppendElement(aMessage);
+ } else {
+ NS_DispatchToMainThread(aMessage);
+ }
+}
+
+} // namespace mozilla