diff options
Diffstat (limited to 'media/mtransport/nr_socket_prsock.cpp')
-rw-r--r-- | media/mtransport/nr_socket_prsock.cpp | 2299 |
1 files changed, 2299 insertions, 0 deletions
diff --git a/media/mtransport/nr_socket_prsock.cpp b/media/mtransport/nr_socket_prsock.cpp new file mode 100644 index 000000000..f22229df8 --- /dev/null +++ b/media/mtransport/nr_socket_prsock.cpp @@ -0,0 +1,2299 @@ +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* vim: set ts=2 et sw=2 tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this file, + * You can obtain one at http://mozilla.org/MPL/2.0/. */ +/* +Modified version of nr_socket_local, adapted for NSPR +*/ + +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this file, + * You can obtain one at http://mozilla.org/MPL/2.0/. */ + +/* +Original code from nICEr and nrappkit. + +nICEr copyright: + +Copyright (c) 2007, Adobe Systems, Incorporated +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + +* Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + +* Neither the name of Adobe Systems, Network Resonance nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + +nrappkit copyright: + + Copyright (C) 2001-2003, Network Resonance, Inc. + Copyright (C) 2006, Network Resonance, Inc. + All Rights Reserved + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions + are met: + + 1. Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + 2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + 3. Neither the name of Network Resonance, Inc. nor the name of any + contributors to this software may be used to endorse or promote + products derived from this software without specific prior written + permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS'' + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + POSSIBILITY OF SUCH DAMAGE. + + + ekr@rtfm.com Thu Dec 20 20:14:49 2001 +*/ + +#include <csi_platform.h> +#include <stdio.h> +#include <string.h> +#include <sys/types.h> +#include <assert.h> +#include <errno.h> +#include <string> + +#include "nspr.h" +#include "prerror.h" +#include "prio.h" +#include "prnetdb.h" + +#include "mozilla/net/DNS.h" +#include "nsCOMPtr.h" +#include "nsASocketHandler.h" +#include "nsISocketTransportService.h" +#include "nsNetCID.h" +#include "nsISupportsImpl.h" +#include "nsServiceManagerUtils.h" +#include "nsComponentManagerUtils.h" +#include "nsXPCOM.h" +#include "nsXULAppAPI.h" +#include "runnable_utils.h" +#include "mozilla/SyncRunnable.h" +#include "nsTArray.h" +#include "mozilla/dom/TCPSocketBinding.h" +#include "nsITCPSocketCallback.h" +#include "nsIPrefService.h" +#include "nsIPrefBranch.h" +#include "nsISocketFilter.h" + +#ifdef XP_WIN +#include "mozilla/WindowsVersion.h" +#endif + +#if defined(MOZILLA_INTERNAL_API) +// csi_platform.h deep in nrappkit defines LOG_INFO and LOG_WARNING +#ifdef LOG_INFO +#define LOG_TEMP_INFO LOG_INFO +#undef LOG_INFO +#endif +#ifdef LOG_WARNING +#define LOG_TEMP_WARNING LOG_WARNING +#undef LOG_WARNING +#endif +#if defined(LOG_DEBUG) +#define LOG_TEMP_DEBUG LOG_DEBUG +#undef LOG_DEBUG +#endif +#undef strlcpy + +#include "mozilla/dom/network/TCPSocketChild.h" + +#ifdef LOG_TEMP_INFO +#define LOG_INFO LOG_TEMP_INFO +#endif +#ifdef LOG_TEMP_WARNING +#define LOG_WARNING LOG_TEMP_WARNING +#endif + +#ifdef LOG_TEMP_DEBUG +#define LOG_DEBUG LOG_TEMP_DEBUG +#endif +#ifdef XP_WIN +#ifdef LOG_DEBUG +#undef LOG_DEBUG +#endif +// cloned from csi_platform.h. Win32 doesn't like how we hide symbols +#define LOG_DEBUG 7 +#endif +#endif + + +extern "C" { +#include "nr_api.h" +#include "async_wait.h" +#include "nr_socket.h" +#include "nr_socket_local.h" +#include "stun_hint.h" +} +#include "nr_socket_prsock.h" +#include "simpletokenbucket.h" +#include "test_nr_socket.h" + +// Implement the nsISupports ref counting +namespace mozilla { + +#if defined(MOZILLA_INTERNAL_API) +class SingletonThreadHolder final +{ +private: + ~SingletonThreadHolder() + { + r_log(LOG_GENERIC,LOG_DEBUG,"Deleting SingletonThreadHolder"); + MOZ_ASSERT(!mThread, "SingletonThreads should be Released and shut down before exit!"); + if (mThread) { + mThread->Shutdown(); + mThread = nullptr; + } + } + + DISALLOW_COPY_ASSIGN(SingletonThreadHolder); + +public: + // Must be threadsafe for StaticRefPtr/ClearOnShutdown + NS_INLINE_DECL_THREADSAFE_REFCOUNTING(SingletonThreadHolder) + + explicit SingletonThreadHolder(const nsCSubstring& aName) + : mName(aName) + { + mParentThread = NS_GetCurrentThread(); + } + + nsIThread* GetThread() { + return mThread; + } + + /* + * Keep track of how many instances are using a SingletonThreadHolder. + * When no one is using it, shut it down + */ + MozExternalRefCountType AddUse() + { + MOZ_ASSERT(mParentThread == NS_GetCurrentThread()); + MOZ_ASSERT(int32_t(mUseCount) >= 0, "illegal refcnt"); + nsrefcnt count = ++mUseCount; + if (count == 1) { + // idle -> in-use + nsresult rv = NS_NewThread(getter_AddRefs(mThread)); + MOZ_RELEASE_ASSERT(NS_SUCCEEDED(rv) && mThread, + "Should successfully create mtransport I/O thread"); + NS_SetThreadName(mThread, mName); + r_log(LOG_GENERIC,LOG_DEBUG,"Created wrapped SingletonThread %p", + mThread.get()); + } + r_log(LOG_GENERIC,LOG_DEBUG,"AddUse: %lu", (unsigned long) count); + return count; + } + + MozExternalRefCountType ReleaseUse() + { + MOZ_ASSERT(mParentThread == NS_GetCurrentThread()); + nsrefcnt count = --mUseCount; + MOZ_ASSERT(int32_t(mUseCount) >= 0, "illegal refcnt"); + if (count == 0) { + // in-use -> idle -- no one forcing it to remain instantiated + r_log(LOG_GENERIC,LOG_DEBUG,"Shutting down wrapped SingletonThread %p", + mThread.get()); + mThread->Shutdown(); + mThread = nullptr; + // It'd be nice to use a timer instead... But be careful of + // xpcom-shutdown-threads in that case + } + r_log(LOG_GENERIC,LOG_DEBUG,"ReleaseUse: %lu", (unsigned long) count); + return count; + } + +private: + nsCString mName; + nsAutoRefCnt mUseCount; + nsCOMPtr<nsIThread> mParentThread; + nsCOMPtr<nsIThread> mThread; +}; + +static StaticRefPtr<SingletonThreadHolder> sThread; + +static void ClearSingletonOnShutdown() +{ + ClearOnShutdown(&sThread); +} +#endif + +static nsIThread* GetIOThreadAndAddUse_s() +{ + // Always runs on STS thread! +#if defined(MOZILLA_INTERNAL_API) + // We need to safely release this on shutdown to avoid leaks + if (!sThread) { + sThread = new SingletonThreadHolder(NS_LITERAL_CSTRING("mtransport")); + NS_DispatchToMainThread(mozilla::WrapRunnableNM(&ClearSingletonOnShutdown)); + } + // Mark that we're using the shared thread and need it to stick around + sThread->AddUse(); + return sThread->GetThread(); +#else + static nsCOMPtr<nsIThread> sThread; + if (!sThread) { + (void) NS_NewNamedThread("mtransport", getter_AddRefs(sThread)); + } + return sThread; +#endif +} + +NrSocketIpc::NrSocketIpc(nsIEventTarget *aThread) + : io_thread_(aThread) +{} + +static TimeStamp nr_socket_short_term_violation_time; +static TimeStamp nr_socket_long_term_violation_time; + +TimeStamp NrSocketBase::short_term_violation_time() { + return nr_socket_short_term_violation_time; +} + +TimeStamp NrSocketBase::long_term_violation_time() { + return nr_socket_long_term_violation_time; +} + +// NrSocketBase implementation +// async_event APIs +int NrSocketBase::async_wait(int how, NR_async_cb cb, void *cb_arg, + char *function, int line) { + uint16_t flag; + + switch (how) { + case NR_ASYNC_WAIT_READ: + flag = PR_POLL_READ; + break; + case NR_ASYNC_WAIT_WRITE: + flag = PR_POLL_WRITE; + break; + default: + return R_BAD_ARGS; + } + + cbs_[how] = cb; + cb_args_[how] = cb_arg; + poll_flags_ |= flag; + + return 0; +} + +int NrSocketBase::cancel(int how) { + uint16_t flag; + + switch (how) { + case NR_ASYNC_WAIT_READ: + flag = PR_POLL_READ; + break; + case NR_ASYNC_WAIT_WRITE: + flag = PR_POLL_WRITE; + break; + default: + return R_BAD_ARGS; + } + + poll_flags_ &= ~flag; + + return 0; +} + +void NrSocketBase::fire_callback(int how) { + // This can't happen unless we are armed because we only set + // the flags if we are armed + MOZ_ASSERT(cbs_[how]); + + // Now cancel so that we need to be re-armed. Note that + // the re-arming probably happens in the callback we are + // about to fire. + cancel(how); + + cbs_[how](this, how, cb_args_[how]); +} + +// NrSocket implementation +NS_IMPL_ISUPPORTS0(NrSocket) + + +// The nsASocket callbacks +void NrSocket::OnSocketReady(PRFileDesc *fd, int16_t outflags) { + if (outflags & PR_POLL_READ & poll_flags()) + fire_callback(NR_ASYNC_WAIT_READ); + if (outflags & PR_POLL_WRITE & poll_flags()) + fire_callback(NR_ASYNC_WAIT_WRITE); + if (outflags & (PR_POLL_ERR | PR_POLL_NVAL | PR_POLL_HUP)) + // TODO: Bug 946423: how do we notify the upper layers about this? + close(); +} + +void NrSocket::OnSocketDetached(PRFileDesc *fd) { + r_log(LOG_GENERIC, LOG_DEBUG, "Socket %p detached", fd); +} + +void NrSocket::IsLocal(bool *aIsLocal) { + // TODO(jesup): better check? Does it matter? (likely no) + *aIsLocal = false; +} + +// async_event APIs +int NrSocket::async_wait(int how, NR_async_cb cb, void *cb_arg, + char *function, int line) { + int r = NrSocketBase::async_wait(how, cb, cb_arg, function, line); + + if (!r) { + mPollFlags = poll_flags(); + } + + return r; +} + +int NrSocket::cancel(int how) { + int r = NrSocketBase::cancel(how); + + if (!r) { + mPollFlags = poll_flags(); + } + + return r; +} + +// Helper functions for addresses +static int nr_transport_addr_to_praddr(nr_transport_addr *addr, + PRNetAddr *naddr) + { + int _status; + + memset(naddr, 0, sizeof(*naddr)); + + switch(addr->protocol){ + case IPPROTO_TCP: + break; + case IPPROTO_UDP: + break; + default: + ABORT(R_BAD_ARGS); + } + + switch(addr->ip_version){ + case NR_IPV4: + naddr->inet.family = PR_AF_INET; + naddr->inet.port = addr->u.addr4.sin_port; + naddr->inet.ip = addr->u.addr4.sin_addr.s_addr; + break; + case NR_IPV6: + naddr->ipv6.family = PR_AF_INET6; + naddr->ipv6.port = addr->u.addr6.sin6_port; + naddr->ipv6.flowinfo = addr->u.addr6.sin6_flowinfo; + memcpy(&naddr->ipv6.ip, &addr->u.addr6.sin6_addr, sizeof(in6_addr)); + naddr->ipv6.scope_id = addr->u.addr6.sin6_scope_id; + break; + default: + ABORT(R_BAD_ARGS); + } + + _status = 0; + abort: + return(_status); + } + +//XXX schien@mozilla.com: copy from PRNetAddrToNetAddr, +// should be removed after fix the link error in signaling_unittests +static int praddr_to_netaddr(const PRNetAddr *prAddr, net::NetAddr *addr) +{ + int _status; + + switch (prAddr->raw.family) { + case PR_AF_INET: + addr->inet.family = AF_INET; + addr->inet.port = prAddr->inet.port; + addr->inet.ip = prAddr->inet.ip; + break; + case PR_AF_INET6: + addr->inet6.family = AF_INET6; + addr->inet6.port = prAddr->ipv6.port; + addr->inet6.flowinfo = prAddr->ipv6.flowinfo; + memcpy(&addr->inet6.ip, &prAddr->ipv6.ip, sizeof(addr->inet6.ip.u8)); + addr->inet6.scope_id = prAddr->ipv6.scope_id; + break; + default: + MOZ_ASSERT(false); + ABORT(R_BAD_ARGS); + } + + _status = 0; +abort: + return(_status); +} + +static int nr_transport_addr_to_netaddr(nr_transport_addr *addr, + net::NetAddr *naddr) +{ + int r, _status; + PRNetAddr praddr; + + if((r = nr_transport_addr_to_praddr(addr, &praddr))) { + ABORT(r); + } + + if((r = praddr_to_netaddr(&praddr, naddr))) { + ABORT(r); + } + + _status = 0; +abort: + return(_status); +} + +int nr_netaddr_to_transport_addr(const net::NetAddr *netaddr, + nr_transport_addr *addr, int protocol) + { + int _status; + int r; + + switch(netaddr->raw.family) { + case AF_INET: + if ((r = nr_ip4_port_to_transport_addr(ntohl(netaddr->inet.ip), + ntohs(netaddr->inet.port), + protocol, addr))) + ABORT(r); + break; + case AF_INET6: + if ((r = nr_ip6_port_to_transport_addr((in6_addr *)&netaddr->inet6.ip.u8, + ntohs(netaddr->inet6.port), + protocol, addr))) + ABORT(r); + break; + default: + MOZ_ASSERT(false); + ABORT(R_BAD_ARGS); + } + _status = 0; + abort: + return(_status); + } + +int nr_praddr_to_transport_addr(const PRNetAddr *praddr, + nr_transport_addr *addr, int protocol, + int keep) + { + int _status; + int r; + struct sockaddr_in ip4; + struct sockaddr_in6 ip6; + + switch(praddr->raw.family) { + case PR_AF_INET: + ip4.sin_family = PF_INET; + ip4.sin_addr.s_addr = praddr->inet.ip; + ip4.sin_port = praddr->inet.port; + if ((r = nr_sockaddr_to_transport_addr((sockaddr *)&ip4, + protocol, keep, + addr))) + ABORT(r); + break; + case PR_AF_INET6: + ip6.sin6_family = PF_INET6; + ip6.sin6_port = praddr->ipv6.port; + ip6.sin6_flowinfo = praddr->ipv6.flowinfo; + memcpy(&ip6.sin6_addr, &praddr->ipv6.ip, sizeof(in6_addr)); + ip6.sin6_scope_id = praddr->ipv6.scope_id; + if ((r = nr_sockaddr_to_transport_addr((sockaddr *)&ip6,protocol,keep,addr))) + ABORT(r); + break; + default: + MOZ_ASSERT(false); + ABORT(R_BAD_ARGS); + } + + _status = 0; + abort: + return(_status); + } + +/* + * nr_transport_addr_get_addrstring_and_port + * convert nr_transport_addr to IP address string and port number + */ +int nr_transport_addr_get_addrstring_and_port(nr_transport_addr *addr, + nsACString *host, int32_t *port) { + int r, _status; + char addr_string[64]; + + // We cannot directly use |nr_transport_addr.as_string| because it contains + // more than ip address, therefore, we need to explicity convert it + // from |nr_transport_addr_get_addrstring|. + if ((r=nr_transport_addr_get_addrstring(addr, addr_string, sizeof(addr_string)))) { + ABORT(r); + } + + if ((r=nr_transport_addr_get_port(addr, port))) { + ABORT(r); + } + + *host = addr_string; + + _status = 0; +abort: + return(_status); +} + +// nr_socket APIs (as member functions) +int NrSocket::create(nr_transport_addr *addr) { + int r,_status; + + PRStatus status; + PRNetAddr naddr; + + nsresult rv; + nsCOMPtr<nsISocketTransportService> stservice = + do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv); + + if (!NS_SUCCEEDED(rv)) { + ABORT(R_INTERNAL); + } + + if((r=nr_transport_addr_to_praddr(addr, &naddr))) + ABORT(r); + + switch (addr->protocol) { + case IPPROTO_UDP: + if (!(fd_ = PR_OpenUDPSocket(naddr.raw.family))) { + r_log(LOG_GENERIC,LOG_CRIT,"Couldn't create UDP socket, " + "family=%d, err=%d", naddr.raw.family, PR_GetError()); + ABORT(R_INTERNAL); + } +#ifdef XP_WIN + if (!mozilla::IsWin8OrLater()) { + // Increase default send and receive buffer sizes on <= Win7 to be able to + // receive and send an unpaced HD (>= 720p = 1280x720 - I Frame ~ 21K size) + // stream without losing packets. + // Manual testing showed that 100K buffer size was not enough and the + // packet loss dis-appeared with 256K buffer size. + // See bug 1252769 for future improvements of this. + PRSize min_buffer_size = 256 * 1024; + PRSocketOptionData opt_rcvbuf; + opt_rcvbuf.option = PR_SockOpt_RecvBufferSize; + if ((status = PR_GetSocketOption(fd_, &opt_rcvbuf)) == PR_SUCCESS) { + if (opt_rcvbuf.value.recv_buffer_size < min_buffer_size) { + opt_rcvbuf.value.recv_buffer_size = min_buffer_size; + if ((status = PR_SetSocketOption(fd_, &opt_rcvbuf)) != PR_SUCCESS) { + r_log(LOG_GENERIC, LOG_CRIT, + "Couldn't set socket receive buffer size: %d", status); + } + } else { + r_log(LOG_GENERIC, LOG_INFO, + "Socket receive buffer size is already: %d", + opt_rcvbuf.value.recv_buffer_size); + } + } else { + r_log(LOG_GENERIC, LOG_CRIT, + "Couldn't get socket receive buffer size: %d", status); + } + PRSocketOptionData opt_sndbuf; + opt_sndbuf.option = PR_SockOpt_SendBufferSize; + if ((status = PR_GetSocketOption(fd_, &opt_sndbuf)) == PR_SUCCESS) { + if (opt_sndbuf.value.recv_buffer_size < min_buffer_size) { + opt_sndbuf.value.recv_buffer_size = min_buffer_size; + if ((status = PR_SetSocketOption(fd_, &opt_sndbuf)) != PR_SUCCESS) { + r_log(LOG_GENERIC, LOG_CRIT, + "Couldn't set socket send buffer size: %d", status); + } + } else { + r_log(LOG_GENERIC, LOG_INFO, + "Socket send buffer size is already: %d", + opt_sndbuf.value.recv_buffer_size); + } + } else { + r_log(LOG_GENERIC, LOG_CRIT, + "Couldn't get socket send buffer size: %d", status); + } + } +#endif + break; + case IPPROTO_TCP: + if (!(fd_ = PR_OpenTCPSocket(naddr.raw.family))) { + r_log(LOG_GENERIC,LOG_CRIT,"Couldn't create TCP socket, " + "family=%d, err=%d", naddr.raw.family, PR_GetError()); + ABORT(R_INTERNAL); + } + // Set ReuseAddr for TCP sockets to enable having several + // sockets bound to same local IP and port + PRSocketOptionData opt_reuseaddr; + opt_reuseaddr.option = PR_SockOpt_Reuseaddr; + opt_reuseaddr.value.reuse_addr = PR_TRUE; + status = PR_SetSocketOption(fd_, &opt_reuseaddr); + if (status != PR_SUCCESS) { + r_log(LOG_GENERIC, LOG_CRIT, + "Couldn't set reuse addr socket option: %d", status); + ABORT(R_INTERNAL); + } + // And also set ReusePort for platforms supporting this socket option + PRSocketOptionData opt_reuseport; + opt_reuseport.option = PR_SockOpt_Reuseport; + opt_reuseport.value.reuse_port = PR_TRUE; + status = PR_SetSocketOption(fd_, &opt_reuseport); + if (status != PR_SUCCESS) { + if (PR_GetError() != PR_OPERATION_NOT_SUPPORTED_ERROR) { + r_log(LOG_GENERIC, LOG_CRIT, + "Couldn't set reuse port socket option: %d", status); + ABORT(R_INTERNAL); + } + } + // Try to speedup packet delivery by disabling TCP Nagle + PRSocketOptionData opt_nodelay; + opt_nodelay.option = PR_SockOpt_NoDelay; + opt_nodelay.value.no_delay = PR_TRUE; + status = PR_SetSocketOption(fd_, &opt_nodelay); + if (status != PR_SUCCESS) { + r_log(LOG_GENERIC, LOG_WARNING, + "Couldn't set Nodelay socket option: %d", status); + } + break; + default: + ABORT(R_INTERNAL); + } + + status = PR_Bind(fd_, &naddr); + if (status != PR_SUCCESS) { + r_log(LOG_GENERIC,LOG_CRIT,"Couldn't bind socket to address %s", + addr->as_string); + ABORT(R_INTERNAL); + } + + r_log(LOG_GENERIC,LOG_DEBUG,"Creating socket %p with addr %s", + fd_, addr->as_string); + nr_transport_addr_copy(&my_addr_,addr); + + /* If we have a wildcard port, patch up the addr */ + if(nr_transport_addr_is_wildcard(addr)){ + status = PR_GetSockName(fd_, &naddr); + if (status != PR_SUCCESS){ + r_log(LOG_GENERIC, LOG_CRIT, "Couldn't get sock name for socket"); + ABORT(R_INTERNAL); + } + + if((r=nr_praddr_to_transport_addr(&naddr,&my_addr_,addr->protocol,1))) + ABORT(r); + } + + + // Set nonblocking + PRSocketOptionData opt_nonblock; + opt_nonblock.option = PR_SockOpt_Nonblocking; + opt_nonblock.value.non_blocking = PR_TRUE; + status = PR_SetSocketOption(fd_, &opt_nonblock); + if (status != PR_SUCCESS) { + r_log(LOG_GENERIC, LOG_CRIT, "Couldn't make socket nonblocking"); + ABORT(R_INTERNAL); + } + + // Remember our thread. + ststhread_ = do_QueryInterface(stservice, &rv); + if (!NS_SUCCEEDED(rv)) + ABORT(R_INTERNAL); + + // Finally, register with the STS + rv = stservice->AttachSocket(fd_, this); + if (!NS_SUCCEEDED(rv)) { + r_log(LOG_GENERIC, LOG_CRIT, "Couldn't attach socket to STS, rv=%u", + static_cast<unsigned>(rv)); + ABORT(R_INTERNAL); + } + + _status = 0; + +abort: + return(_status); +} + +static int ShouldDrop(size_t len) { + // Global rate limiting for stun requests, to mitigate the ice hammer DoS + // (see http://tools.ietf.org/html/draft-thomson-mmusic-ice-webrtc) + + // Tolerate rate of 8k/sec, for one second. + static SimpleTokenBucket burst(16384*1, 16384); + // Tolerate rate of 7.2k/sec over twenty seconds. + static SimpleTokenBucket sustained(7372*20, 7372); + + // Check number of tokens in each bucket. + if (burst.getTokens(UINT32_MAX) < len) { + r_log(LOG_GENERIC, LOG_ERR, + "Short term global rate limit for STUN requests exceeded."); +#ifdef MOZILLA_INTERNAL_API + nr_socket_short_term_violation_time = TimeStamp::Now(); +#endif + +// Bug 1013007 +#if !EARLY_BETA_OR_EARLIER + return R_WOULDBLOCK; +#else + MOZ_ASSERT(false, + "Short term global rate limit for STUN requests exceeded. Go " + "bug bcampen@mozilla.com if you weren't intentionally " + "spamming ICE candidates, or don't know what that means."); +#endif + } + + if (sustained.getTokens(UINT32_MAX) < len) { + r_log(LOG_GENERIC, LOG_ERR, + "Long term global rate limit for STUN requests exceeded."); +#ifdef MOZILLA_INTERNAL_API + nr_socket_long_term_violation_time = TimeStamp::Now(); +#endif +// Bug 1013007 +#if !EARLY_BETA_OR_EARLIER + return R_WOULDBLOCK; +#else + MOZ_ASSERT(false, + "Long term global rate limit for STUN requests exceeded. Go " + "bug bcampen@mozilla.com if you weren't intentionally " + "spamming ICE candidates, or don't know what that means."); +#endif + } + + // Take len tokens from both buckets. + // (not threadsafe, but no problem since this is only called from STS) + burst.getTokens(len); + sustained.getTokens(len); + return 0; +} + +// This should be called on the STS thread. +int NrSocket::sendto(const void *msg, size_t len, + int flags, nr_transport_addr *to) { + ASSERT_ON_THREAD(ststhread_); + int r,_status; + PRNetAddr naddr; + int32_t status; + + if ((r=nr_transport_addr_to_praddr(to, &naddr))) + ABORT(r); + + if(fd_==nullptr) + ABORT(R_EOD); + + if (nr_is_stun_request_message((UCHAR*)msg, len) && ShouldDrop(len)) { + ABORT(R_WOULDBLOCK); + } + + // TODO: Convert flags? + status = PR_SendTo(fd_, msg, len, flags, &naddr, PR_INTERVAL_NO_WAIT); + if (status < 0 || (size_t)status != len) { + if (PR_GetError() == PR_WOULD_BLOCK_ERROR) + ABORT(R_WOULDBLOCK); + + r_log(LOG_GENERIC, LOG_INFO, "Error in sendto %s: %d", + to->as_string, PR_GetError()); + ABORT(R_IO_ERROR); + } + + _status = 0; +abort: + return(_status); +} + +int NrSocket::recvfrom(void * buf, size_t maxlen, + size_t *len, int flags, + nr_transport_addr *from) { + ASSERT_ON_THREAD(ststhread_); + int r,_status; + PRNetAddr nfrom; + int32_t status; + + status = PR_RecvFrom(fd_, buf, maxlen, flags, &nfrom, PR_INTERVAL_NO_WAIT); + if (status <= 0) { + if (PR_GetError() == PR_WOULD_BLOCK_ERROR) + ABORT(R_WOULDBLOCK); + r_log(LOG_GENERIC, LOG_INFO, "Error in recvfrom: %d", (int)PR_GetError()); + ABORT(R_IO_ERROR); + } + *len = status; + + if((r=nr_praddr_to_transport_addr(&nfrom,from,my_addr_.protocol,0))) + ABORT(r); + + //r_log(LOG_GENERIC,LOG_DEBUG,"Read %d bytes from %s",*len,addr->as_string); + + _status = 0; +abort: + return(_status); +} + +int NrSocket::getaddr(nr_transport_addr *addrp) { + ASSERT_ON_THREAD(ststhread_); + return nr_transport_addr_copy(addrp, &my_addr_); +} + +// Close the socket so that the STS will detach and then kill it +void NrSocket::close() { + ASSERT_ON_THREAD(ststhread_); + mCondition = NS_BASE_STREAM_CLOSED; +} + + +int NrSocket::connect(nr_transport_addr *addr) { + ASSERT_ON_THREAD(ststhread_); + int r,_status; + PRNetAddr naddr; + int32_t connect_status, getsockname_status; + + if ((r=nr_transport_addr_to_praddr(addr, &naddr))) + ABORT(r); + + if(!fd_) + ABORT(R_EOD); + + // Note: this just means we tried to connect, not that we + // are actually live. + connect_invoked_ = true; + connect_status = PR_Connect(fd_, &naddr, PR_INTERVAL_NO_WAIT); + if (connect_status != PR_SUCCESS) { + if (PR_GetError() != PR_IN_PROGRESS_ERROR) + ABORT(R_IO_ERROR); + } + + // If our local address is wildcard, then fill in the + // address now. + if(nr_transport_addr_is_wildcard(&my_addr_)){ + getsockname_status = PR_GetSockName(fd_, &naddr); + if (getsockname_status != PR_SUCCESS){ + r_log(LOG_GENERIC, LOG_CRIT, "Couldn't get sock name for socket"); + ABORT(R_INTERNAL); + } + + if((r=nr_praddr_to_transport_addr(&naddr,&my_addr_,addr->protocol,1))) + ABORT(r); + } + + // Now return the WOULDBLOCK if needed. + if (connect_status != PR_SUCCESS) { + ABORT(R_WOULDBLOCK); + } + + _status = 0; +abort: + return(_status); +} + + +int NrSocket::write(const void *msg, size_t len, size_t *written) { + ASSERT_ON_THREAD(ststhread_); + int _status; + int32_t status; + + if (!connect_invoked_) + ABORT(R_FAILED); + + status = PR_Write(fd_, msg, len); + if (status < 0) { + if (PR_GetError() == PR_WOULD_BLOCK_ERROR) + ABORT(R_WOULDBLOCK); + r_log(LOG_GENERIC, LOG_INFO, "Error in write"); + ABORT(R_IO_ERROR); + } + + *written = status; + + _status = 0; +abort: + return _status; +} + +int NrSocket::read(void* buf, size_t maxlen, size_t *len) { + ASSERT_ON_THREAD(ststhread_); + int _status; + int32_t status; + + if (!connect_invoked_) + ABORT(R_FAILED); + + status = PR_Read(fd_, buf, maxlen); + if (status < 0) { + if (PR_GetError() == PR_WOULD_BLOCK_ERROR) + ABORT(R_WOULDBLOCK); + r_log(LOG_GENERIC, LOG_INFO, "Error in read"); + ABORT(R_IO_ERROR); + } + if (status == 0) + ABORT(R_EOD); + + *len = (size_t)status; // Guaranteed to be > 0 + _status = 0; +abort: + return(_status); +} + +int NrSocket::listen(int backlog) { + ASSERT_ON_THREAD(ststhread_); + int32_t status; + int _status; + + assert(fd_); + status = PR_Listen(fd_, backlog); + if (status != PR_SUCCESS) { + r_log(LOG_GENERIC, LOG_CRIT, "%s: PR_GetError() == %d", + __FUNCTION__, PR_GetError()); + ABORT(R_IO_ERROR); + } + + _status = 0; +abort: + return(_status); +} + +int NrSocket::accept(nr_transport_addr *addrp, nr_socket **sockp) { + ASSERT_ON_THREAD(ststhread_); + int _status, r; + PRStatus status; + PRFileDesc *prfd; + PRNetAddr nfrom; + NrSocket *sock=nullptr; + nsresult rv; + PRSocketOptionData opt_nonblock, opt_nodelay; + nsCOMPtr<nsISocketTransportService> stservice = + do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv); + + if (NS_FAILED(rv)) { + ABORT(R_INTERNAL); + } + + if(!fd_) + ABORT(R_EOD); + + prfd = PR_Accept(fd_, &nfrom, PR_INTERVAL_NO_WAIT); + + if (!prfd) { + if (PR_GetError() == PR_WOULD_BLOCK_ERROR) + ABORT(R_WOULDBLOCK); + + ABORT(R_IO_ERROR); + } + + sock = new NrSocket(); + + sock->fd_=prfd; + nr_transport_addr_copy(&sock->my_addr_, &my_addr_); + + if((r=nr_praddr_to_transport_addr(&nfrom, addrp, my_addr_.protocol, 0))) + ABORT(r); + + // Set nonblocking + opt_nonblock.option = PR_SockOpt_Nonblocking; + opt_nonblock.value.non_blocking = PR_TRUE; + status = PR_SetSocketOption(prfd, &opt_nonblock); + if (status != PR_SUCCESS) { + r_log(LOG_GENERIC, LOG_CRIT, + "Failed to make accepted socket nonblocking: %d", status); + ABORT(R_INTERNAL); + } + // Disable TCP Nagle + opt_nodelay.option = PR_SockOpt_NoDelay; + opt_nodelay.value.no_delay = PR_TRUE; + status = PR_SetSocketOption(prfd, &opt_nodelay); + if (status != PR_SUCCESS) { + r_log(LOG_GENERIC, LOG_WARNING, + "Failed to set Nodelay on accepted socket: %d", status); + } + + // Should fail only with OOM + if ((r=nr_socket_create_int(static_cast<void *>(sock), sock->vtbl(), sockp))) + ABORT(r); + + // Remember our thread. + sock->ststhread_ = do_QueryInterface(stservice, &rv); + if (NS_FAILED(rv)) + ABORT(R_INTERNAL); + + // Finally, register with the STS + rv = stservice->AttachSocket(prfd, sock); + if (NS_FAILED(rv)) { + ABORT(R_INTERNAL); + } + + sock->connect_invoked_ = true; + + // Add a reference so that we can delete it in destroy() + sock->AddRef(); + _status = 0; +abort: + if (_status) { + delete sock; + } + + return(_status); +} + +NS_IMPL_ISUPPORTS(NrUdpSocketIpcProxy, nsIUDPSocketInternal) + +nsresult +NrUdpSocketIpcProxy::Init(const RefPtr<NrUdpSocketIpc>& socket) +{ + nsresult rv; + sts_thread_ = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv); + if (NS_FAILED(rv)) { + MOZ_ASSERT(false, "Failed to get STS thread"); + return rv; + } + + socket_ = socket; + return NS_OK; +} + +NrUdpSocketIpcProxy::~NrUdpSocketIpcProxy() +{ + // Send our ref to STS to be released + RUN_ON_THREAD(sts_thread_, + mozilla::WrapRelease(socket_.forget()), + NS_DISPATCH_NORMAL); +} + +// IUDPSocketInternal interfaces +// callback while error happened in UDP socket operation +NS_IMETHODIMP NrUdpSocketIpcProxy::CallListenerError(const nsACString &message, + const nsACString &filename, + uint32_t line_number) { + return socket_->CallListenerError(message, filename, line_number); +} + +// callback while receiving UDP packet +NS_IMETHODIMP NrUdpSocketIpcProxy::CallListenerReceivedData(const nsACString &host, + uint16_t port, + const uint8_t *data, + uint32_t data_length) { + return socket_->CallListenerReceivedData(host, port, data, data_length); +} + +// callback while UDP socket is opened +NS_IMETHODIMP NrUdpSocketIpcProxy::CallListenerOpened() { + return socket_->CallListenerOpened(); +} + +// callback while UDP socket is connected +NS_IMETHODIMP NrUdpSocketIpcProxy::CallListenerConnected() { + return socket_->CallListenerConnected(); +} + +// callback while UDP socket is closed +NS_IMETHODIMP NrUdpSocketIpcProxy::CallListenerClosed() { + return socket_->CallListenerClosed(); +} + +// NrUdpSocketIpc Implementation +NrUdpSocketIpc::NrUdpSocketIpc() + : NrSocketIpc(GetIOThreadAndAddUse_s()), + monitor_("NrUdpSocketIpc"), + err_(false), + state_(NR_INIT) { +} + +NrUdpSocketIpc::~NrUdpSocketIpc() +{ + // also guarantees socket_child_ is released from the io_thread, and + // tells the SingletonThreadHolder we're done with it + +#if defined(MOZILLA_INTERNAL_API) + // close(), but transfer the socket_child_ reference to die as well + RUN_ON_THREAD(io_thread_, + mozilla::WrapRunnableNM(&NrUdpSocketIpc::release_child_i, + socket_child_.forget().take(), + sts_thread_), + NS_DISPATCH_NORMAL); +#endif +} + +// IUDPSocketInternal interfaces +// callback while error happened in UDP socket operation +NS_IMETHODIMP NrUdpSocketIpc::CallListenerError(const nsACString &message, + const nsACString &filename, + uint32_t line_number) { + ASSERT_ON_THREAD(io_thread_); + + r_log(LOG_GENERIC, LOG_ERR, "UDP socket error:%s at %s:%d this=%p", + message.BeginReading(), filename.BeginReading(), line_number, (void*) this ); + + ReentrantMonitorAutoEnter mon(monitor_); + err_ = true; + monitor_.NotifyAll(); + + return NS_OK; +} + +// callback while receiving UDP packet +NS_IMETHODIMP NrUdpSocketIpc::CallListenerReceivedData(const nsACString &host, + uint16_t port, + const uint8_t *data, + uint32_t data_length) { + ASSERT_ON_THREAD(io_thread_); + + PRNetAddr addr; + memset(&addr, 0, sizeof(addr)); + + { + ReentrantMonitorAutoEnter mon(monitor_); + + if (PR_SUCCESS != PR_StringToNetAddr(host.BeginReading(), &addr)) { + err_ = true; + MOZ_ASSERT(false, "Failed to convert remote host to PRNetAddr"); + return NS_OK; + } + + // Use PR_IpAddrNull to avoid address being reset to 0. + if (PR_SUCCESS != PR_SetNetAddr(PR_IpAddrNull, addr.raw.family, port, &addr)) { + err_ = true; + MOZ_ASSERT(false, "Failed to set port in PRNetAddr"); + return NS_OK; + } + } + + nsAutoPtr<DataBuffer> buf(new DataBuffer(data, data_length)); + RefPtr<nr_udp_message> msg(new nr_udp_message(addr, buf)); + + RUN_ON_THREAD(sts_thread_, + mozilla::WrapRunnable(RefPtr<NrUdpSocketIpc>(this), + &NrUdpSocketIpc::recv_callback_s, + msg), + NS_DISPATCH_NORMAL); + return NS_OK; +} + +nsresult NrUdpSocketIpc::SetAddress() { + uint16_t port; + if (NS_FAILED(socket_child_->GetLocalPort(&port))) { + err_ = true; + MOZ_ASSERT(false, "Failed to get local port"); + return NS_OK; + } + + nsAutoCString address; + if(NS_FAILED(socket_child_->GetLocalAddress(address))) { + err_ = true; + MOZ_ASSERT(false, "Failed to get local address"); + return NS_OK; + } + + PRNetAddr praddr; + if (PR_SUCCESS != PR_InitializeNetAddr(PR_IpAddrAny, port, &praddr)) { + err_ = true; + MOZ_ASSERT(false, "Failed to set port in PRNetAddr"); + return NS_OK; + } + + if (PR_SUCCESS != PR_StringToNetAddr(address.BeginReading(), &praddr)) { + err_ = true; + MOZ_ASSERT(false, "Failed to convert local host to PRNetAddr"); + return NS_OK; + } + + nr_transport_addr expected_addr; + if(nr_transport_addr_copy(&expected_addr, &my_addr_)) { + err_ = true; + MOZ_ASSERT(false, "Failed to copy my_addr_"); + } + + if (nr_praddr_to_transport_addr(&praddr, &my_addr_, IPPROTO_UDP, 1)) { + err_ = true; + MOZ_ASSERT(false, "Failed to copy local host to my_addr_"); + } + + if (!nr_transport_addr_is_wildcard(&expected_addr) && + nr_transport_addr_cmp(&expected_addr, &my_addr_, + NR_TRANSPORT_ADDR_CMP_MODE_ADDR)) { + err_ = true; + MOZ_ASSERT(false, "Address of opened socket is not expected"); + } + + return NS_OK; +} + +// callback while UDP socket is opened +NS_IMETHODIMP NrUdpSocketIpc::CallListenerOpened() { + ASSERT_ON_THREAD(io_thread_); + ReentrantMonitorAutoEnter mon(monitor_); + + r_log(LOG_GENERIC, LOG_DEBUG, "UDP socket opened this=%p", (void*) this); + nsresult rv = SetAddress(); + if (NS_FAILED(rv)) { + return rv; + } + + mon.NotifyAll(); + + return NS_OK; +} + +// callback while UDP socket is connected +NS_IMETHODIMP NrUdpSocketIpc::CallListenerConnected() { + ASSERT_ON_THREAD(io_thread_); + + ReentrantMonitorAutoEnter mon(monitor_); + + r_log(LOG_GENERIC, LOG_DEBUG, "UDP socket connected this=%p", (void*) this); + MOZ_ASSERT(state_ == NR_CONNECTED); + + nsresult rv = SetAddress(); + if (NS_FAILED(rv)) { + mon.NotifyAll(); + return rv; + } + + r_log(LOG_GENERIC, LOG_INFO, "Exit UDP socket connected"); + mon.NotifyAll(); + + return NS_OK; +} + +// callback while UDP socket is closed +NS_IMETHODIMP NrUdpSocketIpc::CallListenerClosed() { + ASSERT_ON_THREAD(io_thread_); + + ReentrantMonitorAutoEnter mon(monitor_); + + r_log(LOG_GENERIC, LOG_DEBUG, "UDP socket closed this=%p", (void*) this); + MOZ_ASSERT(state_ == NR_CONNECTED || state_ == NR_CLOSING); + state_ = NR_CLOSED; + + return NS_OK; +} + +// +// NrSocketBase methods. +// +int NrUdpSocketIpc::create(nr_transport_addr *addr) { + ASSERT_ON_THREAD(sts_thread_); + + int r, _status; + nsresult rv; + int32_t port; + nsCString host; + + ReentrantMonitorAutoEnter mon(monitor_); + + if (state_ != NR_INIT) { + ABORT(R_INTERNAL); + } + + sts_thread_ = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv); + if (NS_FAILED(rv)) { + MOZ_ASSERT(false, "Failed to get STS thread"); + ABORT(R_INTERNAL); + } + + if ((r=nr_transport_addr_get_addrstring_and_port(addr, &host, &port))) { + ABORT(r); + } + + // wildcard address will be resolved at NrUdpSocketIpc::CallListenerVoid + if ((r=nr_transport_addr_copy(&my_addr_, addr))) { + ABORT(r); + } + + state_ = NR_CONNECTING; + + RUN_ON_THREAD(io_thread_, + mozilla::WrapRunnable(RefPtr<NrUdpSocketIpc>(this), + &NrUdpSocketIpc::create_i, + host, static_cast<uint16_t>(port)), + NS_DISPATCH_NORMAL); + + // Wait until socket creation complete. + mon.Wait(); + + if (err_) { + close(); + ABORT(R_INTERNAL); + } + + state_ = NR_CONNECTED; + + _status = 0; +abort: + return(_status); +} + +int NrUdpSocketIpc::sendto(const void *msg, size_t len, int flags, + nr_transport_addr *to) { + ASSERT_ON_THREAD(sts_thread_); + + ReentrantMonitorAutoEnter mon(monitor_); + + //If send err happened before, simply return the error. + if (err_) { + return R_IO_ERROR; + } + + if (state_ != NR_CONNECTED) { + return R_INTERNAL; + } + + int r; + net::NetAddr addr; + if ((r=nr_transport_addr_to_netaddr(to, &addr))) { + return r; + } + + if (nr_is_stun_request_message((UCHAR*)msg, len) && ShouldDrop(len)) { + return R_WOULDBLOCK; + } + + nsAutoPtr<DataBuffer> buf(new DataBuffer(static_cast<const uint8_t*>(msg), len)); + + RUN_ON_THREAD(io_thread_, + mozilla::WrapRunnable(RefPtr<NrUdpSocketIpc>(this), + &NrUdpSocketIpc::sendto_i, + addr, buf), + NS_DISPATCH_NORMAL); + return 0; +} + +void NrUdpSocketIpc::close() { + r_log(LOG_GENERIC, LOG_DEBUG, "NrUdpSocketIpc::close()"); + + ASSERT_ON_THREAD(sts_thread_); + + ReentrantMonitorAutoEnter mon(monitor_); + state_ = NR_CLOSING; + + RUN_ON_THREAD(io_thread_, + mozilla::WrapRunnable(RefPtr<NrUdpSocketIpc>(this), + &NrUdpSocketIpc::close_i), + NS_DISPATCH_NORMAL); + + //remove all enqueued messages + std::queue<RefPtr<nr_udp_message> > empty; + std::swap(received_msgs_, empty); +} + +int NrUdpSocketIpc::recvfrom(void *buf, size_t maxlen, size_t *len, int flags, + nr_transport_addr *from) { + ASSERT_ON_THREAD(sts_thread_); + + ReentrantMonitorAutoEnter mon(monitor_); + + int r, _status; + uint32_t consumed_len; + + *len = 0; + + if (state_ != NR_CONNECTED) { + ABORT(R_INTERNAL); + } + + if (received_msgs_.empty()) { + ABORT(R_WOULDBLOCK); + } + + { + RefPtr<nr_udp_message> msg(received_msgs_.front()); + + received_msgs_.pop(); + + if ((r=nr_praddr_to_transport_addr(&msg->from, from, IPPROTO_UDP, 0))) { + err_ = true; + MOZ_ASSERT(false, "Get bogus address for received UDP packet"); + ABORT(r); + } + + consumed_len = std::min(maxlen, msg->data->len()); + if (consumed_len < msg->data->len()) { + r_log(LOG_GENERIC, LOG_DEBUG, "Partial received UDP packet will be discard"); + } + + memcpy(buf, msg->data->data(), consumed_len); + *len = consumed_len; + } + + _status = 0; +abort: + return(_status); +} + +int NrUdpSocketIpc::getaddr(nr_transport_addr *addrp) { + ASSERT_ON_THREAD(sts_thread_); + + ReentrantMonitorAutoEnter mon(monitor_); + + if (state_ != NR_CONNECTED) { + return R_INTERNAL; + } + + return nr_transport_addr_copy(addrp, &my_addr_); +} + +int NrUdpSocketIpc::connect(nr_transport_addr *addr) { + int r,_status; + int32_t port; + nsCString host; + + ReentrantMonitorAutoEnter mon(monitor_); + r_log(LOG_GENERIC, LOG_DEBUG, "NrUdpSocketIpc::connect(%s) this=%p", addr->as_string, + (void*) this); + + if ((r=nr_transport_addr_get_addrstring_and_port(addr, &host, &port))) { + ABORT(r); + } + + RUN_ON_THREAD(io_thread_, + mozilla::WrapRunnable(RefPtr<NrUdpSocketIpc>(this), + &NrUdpSocketIpc::connect_i, + host, static_cast<uint16_t>(port)), + NS_DISPATCH_NORMAL); + + // Wait until connect() completes. + mon.Wait(); + + r_log(LOG_GENERIC, LOG_DEBUG, "NrUdpSocketIpc::connect this=%p completed err_ = %s", + (void*) this, err_ ? "true" : "false"); + + if (err_) { + ABORT(R_INTERNAL); + } + + _status = 0; +abort: + return _status; +} + +int NrUdpSocketIpc::write(const void *msg, size_t len, size_t *written) { + MOZ_ASSERT(false); + return R_INTERNAL; +} + +int NrUdpSocketIpc::read(void* buf, size_t maxlen, size_t *len) { + MOZ_ASSERT(false); + return R_INTERNAL; +} + +int NrUdpSocketIpc::listen(int backlog) { + MOZ_ASSERT(false); + return R_INTERNAL; +} + +int NrUdpSocketIpc::accept(nr_transport_addr *addrp, nr_socket **sockp) { + MOZ_ASSERT(false); + return R_INTERNAL; +} + +// IO thread executors +void NrUdpSocketIpc::create_i(const nsACString &host, const uint16_t port) { + ASSERT_ON_THREAD(io_thread_); + + uint32_t minBuffSize = 0; + nsresult rv; + nsCOMPtr<nsIUDPSocketChild> socketChild = do_CreateInstance("@mozilla.org/udp-socket-child;1", &rv); + if (NS_FAILED(rv)) { + ReentrantMonitorAutoEnter mon(monitor_); + err_ = true; + MOZ_ASSERT(false, "Failed to create UDPSocketChild"); + return; + } + + // This can spin the event loop; don't do that with the monitor held + socketChild->SetBackgroundSpinsEvents(); + + ReentrantMonitorAutoEnter mon(monitor_); + if (!socket_child_) { + socket_child_ = socketChild; + socket_child_->SetFilterName(nsCString(NS_NETWORK_SOCKET_FILTER_HANDLER_STUN_SUFFIX)); + } else { + socketChild = nullptr; + } + + RefPtr<NrUdpSocketIpcProxy> proxy(new NrUdpSocketIpcProxy); + rv = proxy->Init(this); + if (NS_FAILED(rv)) { + err_ = true; + mon.NotifyAll(); + return; + } + +#ifdef XP_WIN + if (!mozilla::IsWin8OrLater()) { + // Increase default receive and send buffer size on <= Win7 to be able to + // receive and send an unpaced HD (>= 720p = 1280x720 - I Frame ~ 21K size) + // stream without losing packets. + // Manual testing showed that 100K buffer size was not enough and the + // packet loss dis-appeared with 256K buffer size. + // See bug 1252769 for future improvements of this. + minBuffSize = 256 * 1024; + } +#endif + // XXX bug 1126232 - don't use null Principal! + if (NS_FAILED(socket_child_->Bind(proxy, nullptr, host, port, + /* reuse = */ false, + /* loopback = */ false, + /* recv buffer size */ minBuffSize, + /* send buffer size */ minBuffSize))) { + err_ = true; + MOZ_ASSERT(false, "Failed to create UDP socket"); + mon.NotifyAll(); + return; + } +} + +void NrUdpSocketIpc::connect_i(const nsACString &host, const uint16_t port) { + ASSERT_ON_THREAD(io_thread_); + nsresult rv; + ReentrantMonitorAutoEnter mon(monitor_); + + RefPtr<NrUdpSocketIpcProxy> proxy(new NrUdpSocketIpcProxy); + rv = proxy->Init(this); + if (NS_FAILED(rv)) { + err_ = true; + mon.NotifyAll(); + return; + } + + if (NS_FAILED(socket_child_->Connect(proxy, host, port))) { + err_ = true; + MOZ_ASSERT(false, "Failed to connect UDP socket"); + mon.NotifyAll(); + return; + } +} + + +void NrUdpSocketIpc::sendto_i(const net::NetAddr &addr, nsAutoPtr<DataBuffer> buf) { + ASSERT_ON_THREAD(io_thread_); + + ReentrantMonitorAutoEnter mon(monitor_); + + if (!socket_child_) { + MOZ_ASSERT(false); + err_ = true; + return; + } + if (NS_FAILED(socket_child_->SendWithAddress(&addr, + buf->data(), + buf->len()))) { + err_ = true; + } +} + +void NrUdpSocketIpc::close_i() { + ASSERT_ON_THREAD(io_thread_); + + if (socket_child_) { + socket_child_->Close(); + socket_child_ = nullptr; + } +} + +#if defined(MOZILLA_INTERNAL_API) +// close(), but transfer the socket_child_ reference to die as well +// static +void NrUdpSocketIpc::release_child_i(nsIUDPSocketChild* aChild, + nsCOMPtr<nsIEventTarget> sts_thread) { + RefPtr<nsIUDPSocketChild> socket_child_ref = + already_AddRefed<nsIUDPSocketChild>(aChild); + if (socket_child_ref) { + socket_child_ref->Close(); + } + // Tell SingletonThreadHolder we're done with it + RUN_ON_THREAD(sts_thread, + mozilla::WrapRunnableNM(&NrUdpSocketIpc::release_use_s), + NS_DISPATCH_NORMAL); +} + +void NrUdpSocketIpc::release_use_s() { + sThread->ReleaseUse(); +} +#endif + +void NrUdpSocketIpc::recv_callback_s(RefPtr<nr_udp_message> msg) { + ASSERT_ON_THREAD(sts_thread_); + + { + ReentrantMonitorAutoEnter mon(monitor_); + if (state_ != NR_CONNECTED) { + return; + } + } + + //enqueue received message + received_msgs_.push(msg); + + if ((poll_flags() & PR_POLL_READ)) { + fire_callback(NR_ASYNC_WAIT_READ); + } +} + +#if defined(MOZILLA_INTERNAL_API) +// TCPSocket. +class NrTcpSocketIpc::TcpSocketReadyRunner: public Runnable +{ +public: + explicit TcpSocketReadyRunner(NrTcpSocketIpc *sck) + : socket_(sck) {} + + NS_IMETHOD Run() override { + socket_->maybe_post_socket_ready(); + return NS_OK; + } + +private: + RefPtr<NrTcpSocketIpc> socket_; +}; + + +NS_IMPL_ISUPPORTS(NrTcpSocketIpc, + nsITCPSocketCallback) + +NrTcpSocketIpc::NrTcpSocketIpc(nsIThread* aThread) + : NrSocketIpc(static_cast<nsIEventTarget*>(aThread)), + mirror_state_(NR_INIT), + state_(NR_INIT), + buffered_bytes_(0), + tracking_number_(0) { +} + +NrTcpSocketIpc::~NrTcpSocketIpc() +{ + // also guarantees socket_child_ is released from the io_thread + + // close(), but transfer the socket_child_ reference to die as well + RUN_ON_THREAD(io_thread_, + mozilla::WrapRunnableNM(&NrTcpSocketIpc::release_child_i, + socket_child_.forget().take(), + sts_thread_), + NS_DISPATCH_NORMAL); +} + +// +// nsITCPSocketCallback methods +// +NS_IMETHODIMP NrTcpSocketIpc::UpdateReadyState(uint32_t aReadyState) { + NrSocketIpcState temp = NR_INIT; + switch (static_cast<dom::TCPReadyState>(aReadyState)) { + case dom::TCPReadyState::Connecting: + temp = NR_CONNECTING; + break; + case dom::TCPReadyState::Open: + temp = NR_CONNECTED; + break; + case dom::TCPReadyState::Closing: + temp = NR_CLOSING; + break; + case dom::TCPReadyState::Closed: + temp = NR_CLOSED; + break; + default: + MOZ_ASSERT(false, "Invalid ReadyState"); + return NS_OK; + } + if (mirror_state_ != temp) { + mirror_state_ = temp; + RUN_ON_THREAD(sts_thread_, + mozilla::WrapRunnable(RefPtr<NrTcpSocketIpc>(this), + &NrTcpSocketIpc::update_state_s, + temp), + NS_DISPATCH_NORMAL); + } + return NS_OK; +} + +NS_IMETHODIMP NrTcpSocketIpc::UpdateBufferedAmount(uint32_t buffered_amount, + uint32_t tracking_number) { + RUN_ON_THREAD(sts_thread_, + mozilla::WrapRunnable(RefPtr<NrTcpSocketIpc>(this), + &NrTcpSocketIpc::message_sent_s, + buffered_amount, + tracking_number), + NS_DISPATCH_NORMAL); + + return NS_OK; +} + +NS_IMETHODIMP NrTcpSocketIpc::FireDataArrayEvent(const nsAString& aType, + const InfallibleTArray<uint8_t>& buffer) { + // Called when we received data. + uint8_t *buf = const_cast<uint8_t*>(buffer.Elements()); + + nsAutoPtr<DataBuffer> data_buf(new DataBuffer(buf, buffer.Length())); + RefPtr<nr_tcp_message> msg = new nr_tcp_message(data_buf); + + RUN_ON_THREAD(sts_thread_, + mozilla::WrapRunnable(RefPtr<NrTcpSocketIpc>(this), + &NrTcpSocketIpc::recv_message_s, + msg), + NS_DISPATCH_NORMAL); + return NS_OK; +} + +NS_IMETHODIMP NrTcpSocketIpc::FireErrorEvent(const nsAString &type, + const nsAString &name) { + r_log(LOG_GENERIC, LOG_ERR, + "Error from TCPSocketChild: type: %s, name: %s", + NS_LossyConvertUTF16toASCII(type).get(), NS_LossyConvertUTF16toASCII(name).get()); + socket_child_ = nullptr; + + mirror_state_ = NR_CLOSED; + RUN_ON_THREAD(sts_thread_, + mozilla::WrapRunnable(RefPtr<NrTcpSocketIpc>(this), + &NrTcpSocketIpc::update_state_s, + NR_CLOSED), + NS_DISPATCH_NORMAL); + + return NS_OK; +} + +// methods of nsITCPSocketCallback that we are not going to implement. + +NS_IMETHODIMP NrTcpSocketIpc::FireDataStringEvent(const nsAString &type, + const nsACString &data) { + return NS_ERROR_NOT_IMPLEMENTED; +} + +NS_IMETHODIMP NrTcpSocketIpc::FireEvent(const nsAString &type) { + // XXX support type.mData == 'close' at least + return NS_ERROR_NOT_IMPLEMENTED; +} + +// +// NrSocketBase methods. +// +int NrTcpSocketIpc::create(nr_transport_addr *addr) { + int r, _status; + nsresult rv; + int32_t port; + nsCString host; + + if (state_ != NR_INIT) { + ABORT(R_INTERNAL); + } + + sts_thread_ = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv); + if (NS_FAILED(rv)) { + MOZ_ASSERT(false, "Failed to get STS thread"); + ABORT(R_INTERNAL); + } + + // Sanity check + if ((r=nr_transport_addr_get_addrstring_and_port(addr, &host, &port))) { + ABORT(r); + } + + if ((r=nr_transport_addr_copy(&my_addr_, addr))) { + ABORT(r); + } + + _status = 0; +abort: + return(_status); +} + +int NrTcpSocketIpc::sendto(const void *msg, size_t len, + int flags, nr_transport_addr *to) { + MOZ_ASSERT(false); + return R_INTERNAL; +} + +int NrTcpSocketIpc::recvfrom(void * buf, size_t maxlen, + size_t *len, int flags, + nr_transport_addr *from) { + MOZ_ASSERT(false); + return R_INTERNAL; +} + +int NrTcpSocketIpc::getaddr(nr_transport_addr *addrp) { + ASSERT_ON_THREAD(sts_thread_); + return nr_transport_addr_copy(addrp, &my_addr_); +} + +void NrTcpSocketIpc::close() { + ASSERT_ON_THREAD(sts_thread_); + + if (state_ == NR_CLOSED || state_ == NR_CLOSING) { + return; + } + + state_ = NR_CLOSING; + + RUN_ON_THREAD(io_thread_, + mozilla::WrapRunnable(RefPtr<NrTcpSocketIpc>(this), + &NrTcpSocketIpc::close_i), + NS_DISPATCH_NORMAL); + + //remove all enqueued messages + std::queue<RefPtr<nr_tcp_message>> empty; + std::swap(msg_queue_, empty); +} + +int NrTcpSocketIpc::connect(nr_transport_addr *addr) { + nsCString remote_addr, local_addr; + int32_t remote_port, local_port; + int r, _status; + if ((r=nr_transport_addr_get_addrstring_and_port(addr, + &remote_addr, + &remote_port))) { + ABORT(r); + } + + if ((r=nr_transport_addr_get_addrstring_and_port(&my_addr_, + &local_addr, + &local_port))) { + MOZ_ASSERT(false); // shouldn't fail as it was sanity-checked in ::create() + ABORT(r); + } + + state_ = mirror_state_ = NR_CONNECTING; + RUN_ON_THREAD(io_thread_, + mozilla::WrapRunnable(RefPtr<NrTcpSocketIpc>(this), + &NrTcpSocketIpc::connect_i, + remote_addr, + static_cast<uint16_t>(remote_port), + local_addr, + static_cast<uint16_t>(local_port)), + NS_DISPATCH_NORMAL); + + // Make caller wait for ready to write. + _status = R_WOULDBLOCK; + abort: + return _status; +} + +int NrTcpSocketIpc::write(const void *msg, size_t len, size_t *written) { + ASSERT_ON_THREAD(sts_thread_); + int _status = 0; + if (state_ != NR_CONNECTED) { + ABORT(R_FAILED); + } + + if (buffered_bytes_ + len >= nsITCPSocketCallback::BUFFER_SIZE) { + ABORT(R_WOULDBLOCK); + } + + buffered_bytes_ += len; + { + InfallibleTArray<uint8_t>* arr = new InfallibleTArray<uint8_t>(); + arr->AppendElements(static_cast<const uint8_t*>(msg), len); + // keep track of un-acknowleged writes by tracking number. + writes_in_flight_.push_back(len); + RUN_ON_THREAD(io_thread_, + mozilla::WrapRunnable(RefPtr<NrTcpSocketIpc>(this), + &NrTcpSocketIpc::write_i, + nsAutoPtr<InfallibleTArray<uint8_t>>(arr), + ++tracking_number_), + NS_DISPATCH_NORMAL); + } + *written = len; + abort: + return _status; +} + +int NrTcpSocketIpc::read(void* buf, size_t maxlen, size_t *len) { + int _status = 0; + if (state_ != NR_CONNECTED) { + ABORT(R_FAILED); + } + + if (msg_queue_.size() == 0) { + ABORT(R_WOULDBLOCK); + } + + { + RefPtr<nr_tcp_message> msg(msg_queue_.front()); + size_t consumed_len = std::min(maxlen, msg->unread_bytes()); + memcpy(buf, msg->reading_pointer(), consumed_len); + if (consumed_len < msg->unread_bytes()) { + // There is still something left in buffer. + msg->read_bytes += consumed_len; + } else { + msg_queue_.pop(); + } + *len = consumed_len; + } + + abort: + return _status; +} + +int NrTcpSocketIpc::listen(int backlog) { + return R_INTERNAL; +} + +int NrTcpSocketIpc::accept(nr_transport_addr *addrp, nr_socket **sockp) { + return R_INTERNAL; +} + +void NrTcpSocketIpc::connect_i(const nsACString &remote_addr, + uint16_t remote_port, + const nsACString &local_addr, + uint16_t local_port) { + ASSERT_ON_THREAD(io_thread_); + mirror_state_ = NR_CONNECTING; + + dom::TCPSocketChild* child = new dom::TCPSocketChild(NS_ConvertUTF8toUTF16(remote_addr), remote_port); + socket_child_ = child; + + // Bug 1285330: put filtering back in here + + // XXX remove remote! + socket_child_->SendWindowlessOpenBind(this, + remote_addr, remote_port, + local_addr, local_port, + /* use ssl */ false); +} + +void NrTcpSocketIpc::write_i(nsAutoPtr<InfallibleTArray<uint8_t>> arr, + uint32_t tracking_number) { + ASSERT_ON_THREAD(io_thread_); + if (!socket_child_) { + return; + } + socket_child_->SendSendArray(*arr, tracking_number); +} + +void NrTcpSocketIpc::close_i() { + ASSERT_ON_THREAD(io_thread_); + mirror_state_ = NR_CLOSING; + if (!socket_child_) { + return; + } + socket_child_->SendClose(); +} + +// close(), but transfer the socket_child_ reference to die as well +// static +void NrTcpSocketIpc::release_child_i(dom::TCPSocketChild* aChild, + nsCOMPtr<nsIEventTarget> sts_thread) { + RefPtr<dom::TCPSocketChild> socket_child_ref = + already_AddRefed<dom::TCPSocketChild>(aChild); + if (socket_child_ref) { + socket_child_ref->SendClose(); + } + // io_thread_ is MainThread, so no use to release +} + +void NrTcpSocketIpc::message_sent_s(uint32_t buffered_amount, + uint32_t tracking_number) { + ASSERT_ON_THREAD(sts_thread_); + + size_t num_unacked_writes = tracking_number_ - tracking_number; + while (writes_in_flight_.size() > num_unacked_writes) { + writes_in_flight_.pop_front(); + } + + for (size_t unacked_write_len : writes_in_flight_) { + buffered_amount += unacked_write_len; + } + + r_log(LOG_GENERIC, LOG_ERR, + "UpdateBufferedAmount: (tracking %u): %u, waiting: %s", + tracking_number, buffered_amount, + (poll_flags() & PR_POLL_WRITE) ? "yes" : "no"); + + buffered_bytes_ = buffered_amount; + maybe_post_socket_ready(); +} + +void NrTcpSocketIpc::recv_message_s(nr_tcp_message *msg) { + ASSERT_ON_THREAD(sts_thread_); + msg_queue_.push(msg); + maybe_post_socket_ready(); +} + +void NrTcpSocketIpc::update_state_s(NrSocketIpcState next_state) { + ASSERT_ON_THREAD(sts_thread_); + // only allow valid transitions + switch (state_) { + case NR_CONNECTING: + if (next_state == NR_CONNECTED) { + state_ = NR_CONNECTED; + maybe_post_socket_ready(); + } else { + state_ = next_state; // all states are valid from CONNECTING + } + break; + case NR_CONNECTED: + if (next_state != NR_CONNECTING) { + state_ = next_state; + } + break; + case NR_CLOSING: + if (next_state == NR_CLOSED) { + state_ = next_state; + } + break; + case NR_CLOSED: + break; + default: + MOZ_CRASH("update_state_s while in illegal state"); + } +} + +void NrTcpSocketIpc::maybe_post_socket_ready() { + bool has_event = false; + if (state_ == NR_CONNECTED) { + if (poll_flags() & PR_POLL_WRITE) { + // This effectively polls via the event loop until the + // NR_ASYNC_WAIT_WRITE is no longer armed. + if (buffered_bytes_ < nsITCPSocketCallback::BUFFER_SIZE) { + r_log(LOG_GENERIC, LOG_INFO, "Firing write callback (%u)", + (uint32_t)buffered_bytes_); + fire_callback(NR_ASYNC_WAIT_WRITE); + has_event = true; + } + } + if (poll_flags() & PR_POLL_READ) { + if (msg_queue_.size()) { + r_log(LOG_GENERIC, LOG_INFO, "Firing read callback (%u)", + (uint32_t)msg_queue_.size()); + fire_callback(NR_ASYNC_WAIT_READ); + has_event = true; + } + } + } + + // If any event has been posted, we post a runnable to see + // if the events have to be posted again. + if (has_event) { + RefPtr<TcpSocketReadyRunner> runnable = new TcpSocketReadyRunner(this); + NS_DispatchToCurrentThread(runnable); + } +} +#endif + +} // close namespace + + +using namespace mozilla; + +// Bridge to the nr_socket interface +static int nr_socket_local_destroy(void **objp); +static int nr_socket_local_sendto(void *obj,const void *msg, size_t len, + int flags, nr_transport_addr *to); +static int nr_socket_local_recvfrom(void *obj,void * restrict buf, + size_t maxlen, size_t *len, int flags, nr_transport_addr *from); +static int nr_socket_local_getfd(void *obj, NR_SOCKET *fd); +static int nr_socket_local_getaddr(void *obj, nr_transport_addr *addrp); +static int nr_socket_local_close(void *obj); +static int nr_socket_local_connect(void *sock, nr_transport_addr *addr); +static int nr_socket_local_write(void *obj,const void *msg, size_t len, + size_t *written); +static int nr_socket_local_read(void *obj,void * restrict buf, size_t maxlen, + size_t *len); +static int nr_socket_local_listen(void *obj, int backlog); +static int nr_socket_local_accept(void *obj, nr_transport_addr *addrp, + nr_socket **sockp); + +static nr_socket_vtbl nr_socket_local_vtbl={ + 2, + nr_socket_local_destroy, + nr_socket_local_sendto, + nr_socket_local_recvfrom, + nr_socket_local_getfd, + nr_socket_local_getaddr, + nr_socket_local_connect, + nr_socket_local_write, + nr_socket_local_read, + nr_socket_local_close, + nr_socket_local_listen, + nr_socket_local_accept +}; + +/* static */ +int +NrSocketBase::CreateSocket(nr_transport_addr *addr, RefPtr<NrSocketBase> *sock) +{ + int r, _status; + + // create IPC bridge for content process + if (XRE_IsParentProcess()) { + *sock = new NrSocket(); + } else { + switch (addr->protocol) { + case IPPROTO_UDP: + *sock = new NrUdpSocketIpc(); + break; + case IPPROTO_TCP: +#if defined(MOZILLA_INTERNAL_API) + { + nsCOMPtr<nsIThread> main_thread; + NS_GetMainThread(getter_AddRefs(main_thread)); + *sock = new NrTcpSocketIpc(main_thread.get()); + } +#else + ABORT(R_REJECTED); +#endif + break; + } + } + + r = (*sock)->create(addr); + if (r) + ABORT(r); + + _status = 0; +abort: + if (_status) { + *sock = nullptr; + } + return _status; +} + +int nr_socket_local_create(void *obj, nr_transport_addr *addr, nr_socket **sockp) { + RefPtr<NrSocketBase> sock; + int r, _status; + + r = NrSocketBase::CreateSocket(addr, &sock); + if (r) { + ABORT(r); + } + + r = nr_socket_create_int(static_cast<void *>(sock), + sock->vtbl(), sockp); + if (r) + ABORT(r); + + _status = 0; + + { + // We will release this reference in destroy(), not exactly the normal + // ownership model, but it is what it is. + NrSocketBase* dummy = sock.forget().take(); + (void)dummy; + } + +abort: + return _status; +} + + +static int nr_socket_local_destroy(void **objp) { + if(!objp || !*objp) + return 0; + + NrSocketBase *sock = static_cast<NrSocketBase *>(*objp); + *objp = 0; + + sock->close(); // Signal STS that we want not to listen + sock->Release(); // Decrement the ref count + + return 0; +} + +static int nr_socket_local_sendto(void *obj,const void *msg, size_t len, + int flags, nr_transport_addr *addr) { + NrSocketBase *sock = static_cast<NrSocketBase *>(obj); + + return sock->sendto(msg, len, flags, addr); +} + +static int nr_socket_local_recvfrom(void *obj,void * restrict buf, + size_t maxlen, size_t *len, int flags, + nr_transport_addr *addr) { + NrSocketBase *sock = static_cast<NrSocketBase *>(obj); + + return sock->recvfrom(buf, maxlen, len, flags, addr); +} + +static int nr_socket_local_getfd(void *obj, NR_SOCKET *fd) { + NrSocketBase *sock = static_cast<NrSocketBase *>(obj); + + *fd = sock; + + return 0; +} + +static int nr_socket_local_getaddr(void *obj, nr_transport_addr *addrp) { + NrSocketBase *sock = static_cast<NrSocketBase *>(obj); + + return sock->getaddr(addrp); +} + + +static int nr_socket_local_close(void *obj) { + NrSocketBase *sock = static_cast<NrSocketBase *>(obj); + + sock->close(); + + return 0; +} + +static int nr_socket_local_write(void *obj, const void *msg, size_t len, + size_t *written) { + NrSocketBase *sock = static_cast<NrSocketBase *>(obj); + + return sock->write(msg, len, written); +} + +static int nr_socket_local_read(void *obj, void * restrict buf, size_t maxlen, + size_t *len) { + NrSocketBase *sock = static_cast<NrSocketBase *>(obj); + + return sock->read(buf, maxlen, len); +} + +static int nr_socket_local_connect(void *obj, nr_transport_addr *addr) { + NrSocketBase *sock = static_cast<NrSocketBase *>(obj); + + return sock->connect(addr); +} + +static int nr_socket_local_listen(void *obj, int backlog) { + NrSocketBase *sock = static_cast<NrSocketBase *>(obj); + + return sock->listen(backlog); +} + +static int nr_socket_local_accept(void *obj, nr_transport_addr *addrp, + nr_socket **sockp) { + NrSocketBase *sock = static_cast<NrSocketBase *>(obj); + + return sock->accept(addrp, sockp); +} + +// Implement async api +int NR_async_wait(NR_SOCKET sock, int how, NR_async_cb cb,void *cb_arg, + char *function,int line) { + NrSocketBase *s = static_cast<NrSocketBase *>(sock); + + return s->async_wait(how, cb, cb_arg, function, line); +} + +int NR_async_cancel(NR_SOCKET sock,int how) { + NrSocketBase *s = static_cast<NrSocketBase *>(sock); + + return s->cancel(how); +} + +nr_socket_vtbl* NrSocketBase::vtbl() { + return &nr_socket_local_vtbl; +} |