diff options
Diffstat (limited to 'ipc/unixsocket')
-rw-r--r-- | ipc/unixsocket/ConnectionOrientedSocket.cpp | 202 | ||||
-rw-r--r-- | ipc/unixsocket/ConnectionOrientedSocket.h | 122 | ||||
-rw-r--r-- | ipc/unixsocket/DataSocket.cpp | 136 | ||||
-rw-r--r-- | ipc/unixsocket/DataSocket.h | 145 | ||||
-rw-r--r-- | ipc/unixsocket/ListenSocket.cpp | 432 | ||||
-rw-r--r-- | ipc/unixsocket/ListenSocket.h | 93 | ||||
-rw-r--r-- | ipc/unixsocket/ListenSocketConsumer.cpp | 20 | ||||
-rw-r--r-- | ipc/unixsocket/ListenSocketConsumer.h | 46 | ||||
-rw-r--r-- | ipc/unixsocket/SocketBase.cpp | 449 | ||||
-rw-r--r-- | ipc/unixsocket/SocketBase.h | 585 | ||||
-rw-r--r-- | ipc/unixsocket/StreamSocket.cpp | 482 | ||||
-rw-r--r-- | ipc/unixsocket/StreamSocket.h | 95 | ||||
-rw-r--r-- | ipc/unixsocket/StreamSocketConsumer.cpp | 20 | ||||
-rw-r--r-- | ipc/unixsocket/StreamSocketConsumer.h | 60 | ||||
-rw-r--r-- | ipc/unixsocket/UnixSocketConnector.cpp | 38 | ||||
-rw-r--r-- | ipc/unixsocket/UnixSocketConnector.h | 102 | ||||
-rw-r--r-- | ipc/unixsocket/moz.build | 31 |
17 files changed, 3058 insertions, 0 deletions
diff --git a/ipc/unixsocket/ConnectionOrientedSocket.cpp b/ipc/unixsocket/ConnectionOrientedSocket.cpp new file mode 100644 index 000000000..abcfd1734 --- /dev/null +++ b/ipc/unixsocket/ConnectionOrientedSocket.cpp @@ -0,0 +1,202 @@ +/* -*- Mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; tab-width: 40 -*- */ +/* vim: set ts=2 et sw=2 tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "ConnectionOrientedSocket.h" +#include "nsISupportsImpl.h" // for MOZ_COUNT_CTOR, MOZ_COUNT_DTOR +#include "UnixSocketConnector.h" + +namespace mozilla { +namespace ipc { + +// +// ConnectionOrientedSocketIO +// + +ConnectionOrientedSocketIO::ConnectionOrientedSocketIO( + MessageLoop* aConsumerLoop, + MessageLoop* aIOLoop, + int aFd, ConnectionStatus aConnectionStatus, + UnixSocketConnector* aConnector) + : DataSocketIO(aConsumerLoop) + , UnixSocketWatcher(aIOLoop, aFd, aConnectionStatus) + , mConnector(aConnector) + , mPeerAddressLength(0) +{ + MOZ_ASSERT(mConnector); + + MOZ_COUNT_CTOR_INHERITED(ConnectionOrientedSocketIO, DataSocketIO); +} + +ConnectionOrientedSocketIO::ConnectionOrientedSocketIO( + MessageLoop* aConsumerLoop, + MessageLoop* aIOLoop, + UnixSocketConnector* aConnector) + : DataSocketIO(aConsumerLoop) + , UnixSocketWatcher(aIOLoop) + , mConnector(aConnector) + , mPeerAddressLength(0) +{ + MOZ_ASSERT(mConnector); + + MOZ_COUNT_CTOR_INHERITED(ConnectionOrientedSocketIO, DataSocketIO); +} + +ConnectionOrientedSocketIO::~ConnectionOrientedSocketIO() +{ + MOZ_COUNT_DTOR_INHERITED(ConnectionOrientedSocketIO, DataSocketIO); +} + +nsresult +ConnectionOrientedSocketIO::Accept(int aFd, + const struct sockaddr* aPeerAddress, + socklen_t aPeerAddressLength) +{ + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); + MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTING); + + SetSocket(aFd, SOCKET_IS_CONNECTED); + + // Address setup + mPeerAddressLength = aPeerAddressLength; + memcpy(&mPeerAddress, aPeerAddress, mPeerAddressLength); + + // Signal success and start data transfer + OnConnected(); + + return NS_OK; +} + +nsresult +ConnectionOrientedSocketIO::Connect() +{ + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); + MOZ_ASSERT(!IsOpen()); + + struct sockaddr* peerAddress = + reinterpret_cast<struct sockaddr*>(&mPeerAddress); + mPeerAddressLength = sizeof(mPeerAddress); + + int fd; + nsresult rv = mConnector->CreateStreamSocket(peerAddress, + &mPeerAddressLength, + fd); + if (NS_FAILED(rv)) { + // Tell the consumer thread we've errored + GetConsumerThread()->PostTask( + MakeAndAddRef<SocketEventTask>(this, SocketEventTask::CONNECT_ERROR)); + return NS_ERROR_FAILURE; + } + + SetFd(fd); + + // calls OnConnected() on success, or OnError() otherwise + rv = UnixSocketWatcher::Connect(peerAddress, mPeerAddressLength); + + if (NS_FAILED(rv)) { + return rv; + } + + return NS_OK; +} + +void +ConnectionOrientedSocketIO::Send(UnixSocketIOBuffer* aBuffer) +{ + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); + + EnqueueData(aBuffer); + AddWatchers(WRITE_WATCHER, false); +} + +// |UnixSocketWatcher| + +void +ConnectionOrientedSocketIO::OnSocketCanReceiveWithoutBlocking() +{ + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); + MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984 + + ssize_t res = ReceiveData(GetFd()); + if (res < 0) { + /* I/O error */ + RemoveWatchers(READ_WATCHER|WRITE_WATCHER); + } else if (!res) { + /* EOF or peer shutdown */ + RemoveWatchers(READ_WATCHER); + } +} + +void +ConnectionOrientedSocketIO::OnSocketCanSendWithoutBlocking() +{ + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); + MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984 + MOZ_ASSERT(!IsShutdownOnIOThread()); + + nsresult rv = SendPendingData(GetFd()); + if (NS_FAILED(rv)) { + return; + } + + if (HasPendingData()) { + AddWatchers(WRITE_WATCHER, false); + } +} + +void +ConnectionOrientedSocketIO::OnConnected() +{ + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); + MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); + + GetConsumerThread()->PostTask( + MakeAndAddRef<SocketEventTask>(this, SocketEventTask::CONNECT_SUCCESS)); + + AddWatchers(READ_WATCHER, true); + if (HasPendingData()) { + AddWatchers(WRITE_WATCHER, false); + } +} + +void +ConnectionOrientedSocketIO::OnListening() +{ + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); + + NS_NOTREACHED("Invalid call to |ConnectionOrientedSocketIO::OnListening|"); +} + +void +ConnectionOrientedSocketIO::OnError(const char* aFunction, int aErrno) +{ + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); + + UnixFdWatcher::OnError(aFunction, aErrno); + + // Clean up watchers, status, fd + Close(); + + // Tell the consumer thread we've errored + GetConsumerThread()->PostTask( + MakeAndAddRef<SocketEventTask>(this, SocketEventTask::CONNECT_ERROR)); +} + +// +// ConnectionOrientedSocket +// + +ConnectionOrientedSocket::ConnectionOrientedSocket() +{ + MOZ_COUNT_CTOR_INHERITED(ConnectionOrientedSocket, DataSocket); +} + +ConnectionOrientedSocket::~ConnectionOrientedSocket() +{ + MOZ_COUNT_DTOR_INHERITED(ConnectionOrientedSocket, DataSocket); +} + +} +} diff --git a/ipc/unixsocket/ConnectionOrientedSocket.h b/ipc/unixsocket/ConnectionOrientedSocket.h new file mode 100644 index 000000000..a1236ec4a --- /dev/null +++ b/ipc/unixsocket/ConnectionOrientedSocket.h @@ -0,0 +1,122 @@ +/* -*- Mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; tab-width: 40 -*- */ +/* vim: set ts=2 et sw=2 tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#ifndef mozilla_ipc_ConnectionOrientedSocket_h +#define mozilla_ipc_ConnectionOrientedSocket_h + +#include <sys/socket.h> +#include "DataSocket.h" +#include "mozilla/ipc/UnixSocketWatcher.h" + +class MessageLoop; + +namespace mozilla { +namespace ipc { + +class UnixSocketConnector; + +/* + * |ConnectionOrientedSocketIO| and |ConnectionOrientedSocket| define + * interfaces for implementing stream sockets on I/O and consumer thread. + * |ListenSocket| uses these classes to handle accepted sockets. + */ + +class ConnectionOrientedSocketIO + : public DataSocketIO + , public UnixSocketWatcher +{ +public: + virtual ~ConnectionOrientedSocketIO(); + + nsresult Accept(int aFd, + const struct sockaddr* aAddress, + socklen_t aAddressLength); + + nsresult Connect(); + + void Send(UnixSocketIOBuffer* aBuffer); + + // Methods for |UnixSocketWatcher| + // + + void OnSocketCanReceiveWithoutBlocking() final; + void OnSocketCanSendWithoutBlocking() final; + + void OnListening() final; + void OnConnected() final; + void OnError(const char* aFunction, int aErrno) final; + +protected: + /** + * Constructs an instance of |ConnectionOrientedSocketIO| + * + * @param aConsumerLoop The socket's consumer thread. + * @param aIOLoop The socket's I/O loop. + * @param aFd The socket file descriptor. + * @param aConnectionStatus The connection status for |aFd|. + * @param aConnector Connector object for socket-type-specific methods. + */ + ConnectionOrientedSocketIO(MessageLoop* aConsumerLoop, + MessageLoop* aIOLoop, + int aFd, ConnectionStatus aConnectionStatus, + UnixSocketConnector* aConnector); + + /** + * Constructs an instance of |ConnectionOrientedSocketIO| + * + * @param aConsumerLoop The socket's consumer thread. + * @param aIOLoop The socket's I/O loop. + * @param aConnector Connector object for socket-type-specific methods. + */ + ConnectionOrientedSocketIO(MessageLoop* aConsumerLoop, + MessageLoop* aIOLoop, + UnixSocketConnector* aConnector); + +private: + /** + * Connector object used to create the connection we are currently using. + */ + UniquePtr<UnixSocketConnector> mConnector; + + /** + * Number of valid bytes in |mPeerAddress|. + */ + socklen_t mPeerAddressLength; + + /** + * Address of the socket's current peer. + */ + struct sockaddr_storage mPeerAddress; +}; + +class ConnectionOrientedSocket : public DataSocket +{ +public: + /** + * Prepares an instance of |ConnectionOrientedSocket| in DISCONNECTED + * state for accepting a connection. Consumer-thread only. + * + * @param aConnector The new connector object, owned by the + * connection-oriented socket. + * @param aConsumerLoop The socket's consumer thread. + * @param aIOLoop The socket's I/O thread. + * @param[out] aIO, Returns an instance of |ConnectionOrientedSocketIO|. + * @return NS_OK on success, or an XPCOM error code otherwise. + */ + virtual nsresult PrepareAccept(UnixSocketConnector* aConnector, + MessageLoop* aConsumerLoop, + MessageLoop* aIOLoop, + ConnectionOrientedSocketIO*& aIO) = 0; + +protected: + ConnectionOrientedSocket(); + virtual ~ConnectionOrientedSocket(); +}; + +} +} + +#endif // mozilla_ipc_ConnectionOrientedSocket diff --git a/ipc/unixsocket/DataSocket.cpp b/ipc/unixsocket/DataSocket.cpp new file mode 100644 index 000000000..057c59203 --- /dev/null +++ b/ipc/unixsocket/DataSocket.cpp @@ -0,0 +1,136 @@ +/* -*- Mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; tab-width: 40 -*- */ +/* vim: set ts=2 et sw=2 tw=80: */ + +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#include "DataSocket.h" +#ifdef MOZ_TASK_TRACER +#include "GeckoTaskTracer.h" +#endif +#include "nsISupportsImpl.h" // for MOZ_COUNT_CTOR, MOZ_COUNT_DTOR + +#ifdef MOZ_TASK_TRACER +using namespace mozilla::tasktracer; +#endif + +namespace mozilla { +namespace ipc { + +// +// DataSocketIO +// + +DataSocketIO::~DataSocketIO() +{ + MOZ_COUNT_DTOR_INHERITED(DataSocketIO, SocketIOBase); +} + +void +DataSocketIO::EnqueueData(UnixSocketIOBuffer* aBuffer) +{ + if (!aBuffer->GetSize()) { + delete aBuffer; // delete empty data immediately + return; + } + mOutgoingQ.AppendElement(aBuffer); +} + +bool +DataSocketIO::HasPendingData() const +{ + return !mOutgoingQ.IsEmpty(); +} + +ssize_t +DataSocketIO::ReceiveData(int aFd) +{ + MOZ_ASSERT(aFd >= 0); + + UnixSocketIOBuffer* incoming; + nsresult rv = QueryReceiveBuffer(&incoming); + if (NS_FAILED(rv)) { + /* an error occured */ + GetConsumerThread()->PostTask( + MakeAndAddRef<SocketRequestClosingTask>(this)); + return -1; + } + + ssize_t res = incoming->Receive(aFd); + if (res < 0) { + /* an I/O error occured */ + DiscardBuffer(); + GetConsumerThread()->PostTask( + MakeAndAddRef<SocketRequestClosingTask>(this)); + return -1; + } else if (!res) { + /* EOF or peer shut down sending */ + DiscardBuffer(); + GetConsumerThread()->PostTask( + MakeAndAddRef<SocketRequestClosingTask>(this)); + return 0; + } + +#ifdef MOZ_TASK_TRACER + /* Make unix socket creation events to be the source events of TaskTracer, + * and originate the rest correlation tasks from here. + */ + AutoSourceEvent taskTracerEvent(SourceEventType::Unixsocket); +#endif + + ConsumeBuffer(); + + return res; +} + +nsresult +DataSocketIO::SendPendingData(int aFd) +{ + MOZ_ASSERT(aFd >= 0); + + while (HasPendingData()) { + UnixSocketIOBuffer* outgoing = mOutgoingQ.ElementAt(0); + + ssize_t res = outgoing->Send(aFd); + if (res < 0) { + /* an I/O error occured */ + GetConsumerThread()->PostTask( + MakeAndAddRef<SocketRequestClosingTask>(this)); + return NS_ERROR_FAILURE; + } else if (!res && outgoing->GetSize()) { + /* I/O is currently blocked; try again later */ + return NS_OK; + } + if (!outgoing->GetSize()) { + mOutgoingQ.RemoveElementAt(0); + delete outgoing; + } + } + + return NS_OK; +} + +DataSocketIO::DataSocketIO(MessageLoop* aConsumerLoop) + : SocketIOBase(aConsumerLoop) +{ + MOZ_COUNT_CTOR_INHERITED(DataSocketIO, SocketIOBase); +} + +// +// DataSocket +// + +DataSocket::DataSocket() +{ + MOZ_COUNT_CTOR_INHERITED(DataSocket, SocketBase); +} + +DataSocket::~DataSocket() +{ + MOZ_COUNT_DTOR_INHERITED(DataSocket, SocketBase); +} + +} +} diff --git a/ipc/unixsocket/DataSocket.h b/ipc/unixsocket/DataSocket.h new file mode 100644 index 000000000..454333461 --- /dev/null +++ b/ipc/unixsocket/DataSocket.h @@ -0,0 +1,145 @@ +/* -*- Mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; tab-width: 40 -*- */ +/* vim: set ts=2 et sw=2 tw=80: */ + +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#ifndef mozilla_ipc_datasocket_h +#define mozilla_ipc_datasocket_h + +#include "mozilla/ipc/SocketBase.h" +#include "nsTArray.h" + +namespace mozilla { +namespace ipc { + +// +// DataSocket +// + +/** + * |DataSocket| represents a socket that can send or receive data. This + * can be a stream-based socket, a datagram-based socket, or any other + * socket that transfers data. + */ +class DataSocket : public SocketBase +{ +public: + virtual ~DataSocket(); + + /** + * Queue data to be sent to the socket on the IO thread. Can only be called on + * originating thread. + * + * @param aBuffer Data to be sent to socket + */ + virtual void SendSocketData(UnixSocketIOBuffer* aBuffer) = 0; + +protected: + DataSocket(); +}; + +// +// DataSocketIO +// + +/** + * |DataSocketIO| is a base class for Socket I/O classes that + * transfer data on the I/O thread. It provides methods for the + * most common read and write scenarios. + */ +class DataSocketIO : public SocketIOBase +{ +public: + virtual ~DataSocketIO(); + + /** + * Allocates a buffer for receiving data from the socket. The method + * shall return the buffer in the arguments. The buffer is owned by the + * I/O class. |DataSocketIO| will never ask for more than one buffer + * at a time, so I/O classes can handout the same buffer on each invokation + * of this method. I/O-thread only. + * + * @param[out] aBuffer returns a pointer to the I/O buffer + * @return NS_OK on success, or an error code otherwise + */ + virtual nsresult QueryReceiveBuffer(UnixSocketIOBuffer** aBuffer) = 0; + + /** + * Marks the current socket buffer to by consumed by the I/O class. The + * class is resonsible for releasing the buffer afterwards. I/O-thread + * only. + * + * @param aIndex the socket's index + * @param[out] aBuffer the receive buffer + * @param[out] aSize the receive buffer's size + */ + virtual void ConsumeBuffer() = 0; + + /** + * Marks the current socket buffer to be discarded. The I/O class is + * resonsible for releasing the buffer's memory. I/O-thread only. + * + * @param aIndex the socket's index + */ + virtual void DiscardBuffer() = 0; + + void EnqueueData(UnixSocketIOBuffer* aBuffer); + bool HasPendingData() const; + + ssize_t ReceiveData(int aFd); + + nsresult SendPendingData(int aFd); + +protected: + DataSocketIO(MessageLoop* aConsumerLoop); + +private: + /** + * Raw data queue. Must be pushed/popped from I/O thread only. + */ + nsTArray<UnixSocketIOBuffer*> mOutgoingQ; +}; + +// +// Tasks +// + +/* |SocketIOSendTask| transfers an instance of |Tdata|, such as + * |UnixSocketRawData|, to the I/O thread and queues it up for + * sending the contained data. + */ +template<typename Tio, typename Tdata> +class SocketIOSendTask final : public SocketIOTask<Tio> +{ +public: + SocketIOSendTask(Tio* aIO, Tdata* aData) + : SocketIOTask<Tio>(aIO) + , mData(aData) + { + MOZ_ASSERT(aData); + } + + NS_IMETHOD Run() override + { + MOZ_ASSERT(!SocketIOTask<Tio>::IsCanceled()); + + Tio* io = SocketIOTask<Tio>::GetIO(); + MOZ_ASSERT(!io->IsConsumerThread()); + MOZ_ASSERT(!io->IsShutdownOnIOThread()); + + io->Send(mData); + + return NS_OK; + } + +private: + Tdata* mData; +}; + +} +} + +#endif diff --git a/ipc/unixsocket/ListenSocket.cpp b/ipc/unixsocket/ListenSocket.cpp new file mode 100644 index 000000000..c05a4d701 --- /dev/null +++ b/ipc/unixsocket/ListenSocket.cpp @@ -0,0 +1,432 @@ +/* -*- Mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; tab-width: 40 -*- */ +/* vim: set ts=2 et sw=2 tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "ListenSocket.h" +#include <fcntl.h> +#include "ConnectionOrientedSocket.h" +#include "DataSocket.h" +#include "ListenSocketConsumer.h" +#include "mozilla/DebugOnly.h" +#include "mozilla/RefPtr.h" +#include "mozilla/Unused.h" +#include "nsISupportsImpl.h" // for MOZ_COUNT_CTOR, MOZ_COUNT_DTOR +#include "nsXULAppAPI.h" +#include "UnixSocketConnector.h" + +namespace mozilla { +namespace ipc { + +// +// ListenSocketIO +// + +class ListenSocketIO final + : public UnixSocketWatcher + , public SocketIOBase +{ +public: + class ListenTask; + + ListenSocketIO(MessageLoop* aConsumerLoop, + MessageLoop* aIOLoop, + ListenSocket* aListenSocket, + UnixSocketConnector* aConnector); + ~ListenSocketIO(); + + UnixSocketConnector* GetConnector() const; + + // Task callback methods + // + + /** + * Run bind/listen to prepare for further runs of accept() + */ + void Listen(ConnectionOrientedSocketIO* aCOSocketIO); + + // I/O callback methods + // + + void OnConnected() override; + void OnError(const char* aFunction, int aErrno) override; + void OnListening() override; + void OnSocketCanAcceptWithoutBlocking() override; + + // Methods for |SocketIOBase| + // + + SocketBase* GetSocketBase() override; + + bool IsShutdownOnConsumerThread() const override; + bool IsShutdownOnIOThread() const override; + + void ShutdownOnConsumerThread() override; + void ShutdownOnIOThread() override; + +private: + void FireSocketError(); + + /** + * Consumer pointer. Non-thread-safe pointer, so should only be manipulated + * directly from consumer thread. All non-consumer-thread accesses should + * happen with mIO as container. + */ + ListenSocket* mListenSocket; + + /** + * Connector object used to create the connection we are currently using. + */ + UniquePtr<UnixSocketConnector> mConnector; + + /** + * If true, do not requeue whatever task we're running + */ + bool mShuttingDownOnIOThread; + + /** + * Number of valid bytes in |mAddress| + */ + socklen_t mAddressLength; + + /** + * Address structure of the socket currently in use + */ + struct sockaddr_storage mAddress; + + ConnectionOrientedSocketIO* mCOSocketIO; +}; + +ListenSocketIO::ListenSocketIO(MessageLoop* aConsumerLoop, + MessageLoop* aIOLoop, + ListenSocket* aListenSocket, + UnixSocketConnector* aConnector) + : UnixSocketWatcher(aIOLoop) + , SocketIOBase(aConsumerLoop) + , mListenSocket(aListenSocket) + , mConnector(aConnector) + , mShuttingDownOnIOThread(false) + , mAddressLength(0) + , mCOSocketIO(nullptr) +{ + MOZ_ASSERT(mListenSocket); + MOZ_ASSERT(mConnector); + + MOZ_COUNT_CTOR_INHERITED(ListenSocketIO, SocketIOBase); +} + +ListenSocketIO::~ListenSocketIO() +{ + MOZ_ASSERT(IsConsumerThread()); + MOZ_ASSERT(IsShutdownOnConsumerThread()); + + MOZ_COUNT_DTOR_INHERITED(ListenSocketIO, SocketIOBase); +} + +UnixSocketConnector* +ListenSocketIO::GetConnector() const +{ + return mConnector.get(); +} + +void +ListenSocketIO::Listen(ConnectionOrientedSocketIO* aCOSocketIO) +{ + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); + MOZ_ASSERT(mConnector); + MOZ_ASSERT(aCOSocketIO); + + struct sockaddr* address = reinterpret_cast<struct sockaddr*>(&mAddress); + mAddressLength = sizeof(mAddress); + + if (!IsOpen()) { + int fd; + nsresult rv = mConnector->CreateListenSocket(address, &mAddressLength, + fd); + if (NS_FAILED(rv)) { + FireSocketError(); + return; + } + SetFd(fd); + } + + mCOSocketIO = aCOSocketIO; + + // calls OnListening on success, or OnError otherwise + DebugOnly<nsresult> rv = UnixSocketWatcher::Listen(address, mAddressLength); + NS_WARNING_ASSERTION(NS_SUCCEEDED(rv), "Listen failed"); +} + +void +ListenSocketIO::OnConnected() +{ + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); + + NS_NOTREACHED("Invalid call to |ListenSocketIO::OnConnected|"); +} + +void +ListenSocketIO::OnListening() +{ + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); + MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_LISTENING); + + AddWatchers(READ_WATCHER, true); + + /* We signal a successful 'connection' to a local address for listening. */ + GetConsumerThread()->PostTask( + MakeAndAddRef<SocketEventTask>(this, SocketEventTask::CONNECT_SUCCESS)); +} + +void +ListenSocketIO::OnError(const char* aFunction, int aErrno) +{ + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); + + UnixFdWatcher::OnError(aFunction, aErrno); + FireSocketError(); +} + +void +ListenSocketIO::FireSocketError() +{ + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); + + // Clean up watchers, statuses, fds + Close(); + + // Tell the consumer thread we've errored + GetConsumerThread()->PostTask( + MakeAndAddRef<SocketEventTask>(this, SocketEventTask::CONNECT_ERROR)); +} + +void +ListenSocketIO::OnSocketCanAcceptWithoutBlocking() +{ + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); + MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_LISTENING); + MOZ_ASSERT(mCOSocketIO); + + RemoveWatchers(READ_WATCHER|WRITE_WATCHER); + + struct sockaddr_storage storage; + socklen_t addressLength = sizeof(storage); + + int fd; + nsresult rv = mConnector->AcceptStreamSocket( + GetFd(), + reinterpret_cast<struct sockaddr*>(&storage), &addressLength, + fd); + if (NS_FAILED(rv)) { + FireSocketError(); + return; + } + + mCOSocketIO->Accept(fd, + reinterpret_cast<struct sockaddr*>(&storage), + addressLength); +} + +// |SocketIOBase| + +SocketBase* +ListenSocketIO::GetSocketBase() +{ + return mListenSocket; +} + +bool +ListenSocketIO::IsShutdownOnConsumerThread() const +{ + MOZ_ASSERT(IsConsumerThread()); + + return mListenSocket == nullptr; +} + +bool +ListenSocketIO::IsShutdownOnIOThread() const +{ + return mShuttingDownOnIOThread; +} + +void +ListenSocketIO::ShutdownOnConsumerThread() +{ + MOZ_ASSERT(IsConsumerThread()); + MOZ_ASSERT(!IsShutdownOnConsumerThread()); + + mListenSocket = nullptr; +} + +void +ListenSocketIO::ShutdownOnIOThread() +{ + MOZ_ASSERT(!IsConsumerThread()); + MOZ_ASSERT(!mShuttingDownOnIOThread); + + Close(); // will also remove fd from I/O loop + mShuttingDownOnIOThread = true; +} + +// +// Socket tasks +// + +class ListenSocketIO::ListenTask final : public SocketIOTask<ListenSocketIO> +{ +public: + ListenTask(ListenSocketIO* aIO, ConnectionOrientedSocketIO* aCOSocketIO) + : SocketIOTask<ListenSocketIO>(aIO) + , mCOSocketIO(aCOSocketIO) + { + MOZ_ASSERT(mCOSocketIO); + + MOZ_COUNT_CTOR(ListenTask); + } + + ~ListenTask() + { + MOZ_COUNT_DTOR(ListenTask); + } + + NS_IMETHOD Run() override + { + MOZ_ASSERT(!GetIO()->IsConsumerThread()); + + if (!IsCanceled()) { + GetIO()->Listen(mCOSocketIO); + } + return NS_OK; + } + +private: + ConnectionOrientedSocketIO* mCOSocketIO; +}; + +// +// ListenSocket +// + +ListenSocket::ListenSocket(ListenSocketConsumer* aConsumer, int aIndex) + : mIO(nullptr) + , mConsumer(aConsumer) + , mIndex(aIndex) +{ + MOZ_ASSERT(mConsumer); + + MOZ_COUNT_CTOR_INHERITED(ListenSocket, SocketBase); +} + +ListenSocket::~ListenSocket() +{ + MOZ_ASSERT(!mIO); + + MOZ_COUNT_DTOR_INHERITED(ListenSocket, SocketBase); +} + +nsresult +ListenSocket::Listen(UnixSocketConnector* aConnector, + MessageLoop* aConsumerLoop, + MessageLoop* aIOLoop, + ConnectionOrientedSocket* aCOSocket) +{ + MOZ_ASSERT(!mIO); + + mIO = new ListenSocketIO(aConsumerLoop, aIOLoop, this, aConnector); + + // Prepared I/O object, now start listening. + nsresult rv = Listen(aCOSocket); + if (NS_FAILED(rv)) { + delete mIO; + mIO = nullptr; + return rv; + } + + return NS_OK; +} + +nsresult +ListenSocket::Listen(UnixSocketConnector* aConnector, + ConnectionOrientedSocket* aCOSocket) +{ + return Listen(aConnector, MessageLoop::current(), XRE_GetIOMessageLoop(), + aCOSocket); +} + +nsresult +ListenSocket::Listen(ConnectionOrientedSocket* aCOSocket) +{ + MOZ_ASSERT(aCOSocket); + MOZ_ASSERT(mIO); + + // We first prepare the connection-oriented socket with a + // socket connector and a socket I/O class. + + UniquePtr<UnixSocketConnector> connector; + nsresult rv = mIO->GetConnector()->Duplicate(connector); + if (NS_FAILED(rv)) { + return rv; + } + + ConnectionOrientedSocketIO* io; + rv = aCOSocket->PrepareAccept(connector.get(), + mIO->GetConsumerThread(), mIO->GetIOLoop(), + io); + if (NS_FAILED(rv)) { + return rv; + } + + Unused << connector.release(); // now owned by |io| + + // Then we start listening for connection requests. + + SetConnectionStatus(SOCKET_LISTENING); + + mIO->GetIOLoop()->PostTask( + MakeAndAddRef<ListenSocketIO::ListenTask>(mIO, io)); + + return NS_OK; +} + +// |SocketBase| + +void +ListenSocket::Close() +{ + if (!mIO) { + return; + } + + MOZ_ASSERT(mIO->IsConsumerThread()); + + // From this point on, we consider mIO as being deleted. We sever + // the relationship here so any future calls to listen or connect + // will create a new implementation. + mIO->ShutdownOnConsumerThread(); + mIO->GetIOLoop()->PostTask(MakeAndAddRef<SocketIOShutdownTask>(mIO)); + mIO = nullptr; + + NotifyDisconnect(); +} + +void +ListenSocket::OnConnectSuccess() +{ + mConsumer->OnConnectSuccess(mIndex); +} + +void +ListenSocket::OnConnectError() +{ + mConsumer->OnConnectError(mIndex); +} + +void +ListenSocket::OnDisconnect() +{ + mConsumer->OnDisconnect(mIndex); +} + +} // namespace ipc +} // namespace mozilla diff --git a/ipc/unixsocket/ListenSocket.h b/ipc/unixsocket/ListenSocket.h new file mode 100644 index 000000000..9bde4602d --- /dev/null +++ b/ipc/unixsocket/ListenSocket.h @@ -0,0 +1,93 @@ +/* -*- Mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; tab-width: 40 -*- */ +/* vim: set ts=2 et sw=2 tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#ifndef mozilla_ipc_ListenSocket_h +#define mozilla_ipc_ListenSocket_h + +#include "mozilla/ipc/SocketBase.h" +#include "nsString.h" + +class MessageLoop; + +namespace mozilla { +namespace ipc { + +class ConnectionOrientedSocket; +class ListenSocketConsumer; +class ListenSocketIO; +class UnixSocketConnector; + +class ListenSocket final : public SocketBase +{ +public: + /** + * Constructs an instance of |ListenSocket|. + * + * @param aConsumer The consumer for the socket. + * @param aIndex An arbitrary index. + */ + ListenSocket(ListenSocketConsumer* aConsumer, int aIndex); + + /** + * Starts a task on the socket that will try to accept a new connection + * in a non-blocking manner. + * + * @param aConnector Connector object for socket-type-specific functions + * @param aConsumerLoop The socket's consumer thread. + * @param aIOLoop The socket's I/O thread. + * @param aCOSocket The connection-oriented socket for handling the + * accepted connection. + * @return NS_OK on success, or an XPCOM error code otherwise. + */ + nsresult Listen(UnixSocketConnector* aConnector, + MessageLoop* aConsumerLoop, + MessageLoop* aIOLoop, + ConnectionOrientedSocket* aCOSocket); + + /** + * Starts a task on the socket that will try to accept a new connection + * in a non-blocking manner. + * + * @param aConnector Connector object for socket-type-specific functions + * @param aCOSocket The connection-oriented socket for handling the + * accepted connection. + * @return NS_OK on success, or an XPCOM error code otherwise. + */ + nsresult Listen(UnixSocketConnector* aConnector, + ConnectionOrientedSocket* aCOSocket); + + /** + * Starts a task on the socket that will try to accept a new connection + * in a non-blocking manner. This method re-uses a previously created + * listen socket. + * + * @param aCOSocket The connection-oriented socket for handling the + * accepted connection. + * @return NS_OK on success, or an XPCOM error code otherwise. + */ + nsresult Listen(ConnectionOrientedSocket* aCOSocket); + + // Methods for |SocketBase| + // + + void Close() override; + void OnConnectSuccess() override; + void OnConnectError() override; + void OnDisconnect() override; + +protected: + virtual ~ListenSocket(); + +private: + ListenSocketIO* mIO; + ListenSocketConsumer* mConsumer; + int mIndex; +}; + +} // namespace ipc +} // namepsace mozilla + +#endif // mozilla_ipc_ListenSocket_h diff --git a/ipc/unixsocket/ListenSocketConsumer.cpp b/ipc/unixsocket/ListenSocketConsumer.cpp new file mode 100644 index 000000000..f4de0c42e --- /dev/null +++ b/ipc/unixsocket/ListenSocketConsumer.cpp @@ -0,0 +1,20 @@ +/* -*- Mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; tab-width: 40 -*- */ +/* vim: set ts=2 et sw=2 tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "ListenSocketConsumer.h" + +namespace mozilla { +namespace ipc { + +// +// ListenSocketConsumer +// + +ListenSocketConsumer::~ListenSocketConsumer() +{ } + +} +} diff --git a/ipc/unixsocket/ListenSocketConsumer.h b/ipc/unixsocket/ListenSocketConsumer.h new file mode 100644 index 000000000..8c9af3019 --- /dev/null +++ b/ipc/unixsocket/ListenSocketConsumer.h @@ -0,0 +1,46 @@ +/* -*- Mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; tab-width: 40 -*- */ +/* vim: set ts=2 et sw=2 tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#ifndef mozilla_ipc_listensocketconsumer_h +#define mozilla_ipc_listensocketconsumer_h + +namespace mozilla { +namespace ipc { + +/** + * |ListenSocketConsumer| handles socket events. + */ +class ListenSocketConsumer +{ +public: + virtual ~ListenSocketConsumer(); + + /** + * Callback for socket success. Consumer-thread only. + * + * @param aIndex The index that has been given to the listening socket. + */ + virtual void OnConnectSuccess(int aIndex) = 0; + + /** + * Callback for socket errors. Consumer-thread only. + * + * @param aIndex The index that has been given to the listening socket. + */ + virtual void OnConnectError(int aIndex) = 0; + + /** + * Callback for socket disconnect. Consumer-thread only. + * + * @param aIndex The index that has been given to the listeing socket. + */ + virtual void OnDisconnect(int aIndex) = 0; +}; + +} +} + +#endif diff --git a/ipc/unixsocket/SocketBase.cpp b/ipc/unixsocket/SocketBase.cpp new file mode 100644 index 000000000..b11729652 --- /dev/null +++ b/ipc/unixsocket/SocketBase.cpp @@ -0,0 +1,449 @@ +/* -*- Mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; tab-width: 40 -*- */ +/* vim: set ts=2 et sw=2 tw=80: */ + +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#include "SocketBase.h" +#include <errno.h> +#include <string.h> +#include <unistd.h> +#include "nsISupportsImpl.h" // for MOZ_COUNT_CTOR, MOZ_COUNT_DTOR + +namespace mozilla { +namespace ipc { + +// +// UnixSocketIOBuffer +// + +UnixSocketBuffer::UnixSocketBuffer() + : mSize(0) + , mOffset(0) + , mAvailableSpace(0) + , mData(nullptr) +{ + MOZ_COUNT_CTOR(UnixSocketBuffer); +} + +UnixSocketBuffer::~UnixSocketBuffer() +{ + MOZ_COUNT_DTOR(UnixSocketBuffer); + + // Make sure that the caller released the buffer's memory. + MOZ_ASSERT(!GetBuffer()); +} + +const uint8_t* +UnixSocketBuffer::Consume(size_t aLen) +{ + if (NS_WARN_IF(GetSize() < aLen)) { + return nullptr; + } + uint8_t* data = mData + mOffset; + mOffset += aLen; + return data; +} + +nsresult +UnixSocketBuffer::Read(void* aValue, size_t aLen) +{ + const uint8_t* data = Consume(aLen); + if (!data) { + return NS_ERROR_OUT_OF_MEMORY; + } + memcpy(aValue, data, aLen); + return NS_OK; +} + +uint8_t* +UnixSocketBuffer::Append(size_t aLen) +{ + if (((mAvailableSpace - mSize) < aLen)) { + size_t availableSpace = mAvailableSpace + std::max(mAvailableSpace, aLen); + uint8_t* data = new uint8_t[availableSpace]; + memcpy(data, mData, mSize); + mData = data; + mAvailableSpace = availableSpace; + } + uint8_t* data = mData + mSize; + mSize += aLen; + return data; +} + +nsresult +UnixSocketBuffer::Write(const void* aValue, size_t aLen) +{ + uint8_t* data = Append(aLen); + if (!data) { + return NS_ERROR_OUT_OF_MEMORY; + } + memcpy(data, aValue, aLen); + return NS_OK; +} + +void +UnixSocketBuffer::CleanupLeadingSpace() +{ + if (GetLeadingSpace()) { + if (GetSize() <= GetLeadingSpace()) { + memcpy(mData, GetData(), GetSize()); + } else { + memmove(mData, GetData(), GetSize()); + } + mOffset = 0; + } +} + +// +// UnixSocketIOBuffer +// + +UnixSocketIOBuffer::UnixSocketIOBuffer() +{ + MOZ_COUNT_CTOR_INHERITED(UnixSocketIOBuffer, UnixSocketBuffer); +} + +UnixSocketIOBuffer::~UnixSocketIOBuffer() +{ + MOZ_COUNT_DTOR_INHERITED(UnixSocketIOBuffer, UnixSocketBuffer); +} + +// +// UnixSocketRawData +// + +UnixSocketRawData::UnixSocketRawData(const void* aData, size_t aSize) +{ + MOZ_ASSERT(aData || !aSize); + + MOZ_COUNT_CTOR_INHERITED(UnixSocketRawData, UnixSocketIOBuffer); + + ResetBuffer(static_cast<uint8_t*>(memcpy(new uint8_t[aSize], aData, aSize)), + 0, aSize, aSize); +} + +UnixSocketRawData::UnixSocketRawData(UniquePtr<uint8_t[]> aData, size_t aSize) +{ + MOZ_ASSERT(aData || !aSize); + + MOZ_COUNT_CTOR_INHERITED(UnixSocketRawData, UnixSocketIOBuffer); + + ResetBuffer(aData.release(), 0, aSize, aSize); +} + +UnixSocketRawData::UnixSocketRawData(size_t aSize) +{ + MOZ_COUNT_CTOR_INHERITED(UnixSocketRawData, UnixSocketIOBuffer); + + ResetBuffer(new uint8_t[aSize], 0, 0, aSize); +} + +UnixSocketRawData::~UnixSocketRawData() +{ + MOZ_COUNT_DTOR_INHERITED(UnixSocketRawData, UnixSocketIOBuffer); + + UniquePtr<uint8_t[]> data(GetBuffer()); + ResetBuffer(nullptr, 0, 0, 0); +} + +ssize_t +UnixSocketRawData::Receive(int aFd) +{ + if (!GetTrailingSpace()) { + if (!GetLeadingSpace()) { + return -1; /* buffer is full */ + } + /* free up space at the end of data buffer */ + CleanupLeadingSpace(); + } + + ssize_t res = + TEMP_FAILURE_RETRY(read(aFd, GetTrailingBytes(), GetTrailingSpace())); + + if (res < 0) { + /* I/O error */ + return -1; + } else if (!res) { + /* EOF or peer shutdown sending */ + return 0; + } + + Append(res); /* mark read data as 'valid' */ + + return res; +} + +ssize_t +UnixSocketRawData::Send(int aFd) +{ + if (!GetSize()) { + return 0; + } + + ssize_t res = TEMP_FAILURE_RETRY(write(aFd, GetData(), GetSize())); + + if (res < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return 0; /* socket is blocked; try again later */ + } + return -1; + } else if (!res) { + /* nothing written */ + return 0; + } + + Consume(res); + + return res; +} + +// +// SocketBase +// + +SocketConnectionStatus +SocketBase::GetConnectionStatus() const +{ + return mConnectionStatus; +} + +int +SocketBase::GetSuggestedConnectDelayMs() const +{ + return mConnectDelayMs; +} + +void +SocketBase::NotifySuccess() +{ + mConnectionStatus = SOCKET_CONNECTED; + mConnectTimestamp = PR_IntervalNow(); + OnConnectSuccess(); +} + +void +SocketBase::NotifyError() +{ + mConnectionStatus = SOCKET_DISCONNECTED; + mConnectDelayMs = CalculateConnectDelayMs(); + mConnectTimestamp = 0; + OnConnectError(); +} + +void +SocketBase::NotifyDisconnect() +{ + mConnectionStatus = SOCKET_DISCONNECTED; + mConnectDelayMs = CalculateConnectDelayMs(); + mConnectTimestamp = 0; + OnDisconnect(); +} + +uint32_t +SocketBase::CalculateConnectDelayMs() const +{ + uint32_t connectDelayMs = mConnectDelayMs; + + if (mConnectTimestamp && (PR_IntervalNow()-mConnectTimestamp) > connectDelayMs) { + // reset delay if connection has been opened for a while, or... + connectDelayMs = 0; + } else if (!connectDelayMs) { + // ...start with a delay of ~1 sec, or... + connectDelayMs = 1<<10; + } else if (connectDelayMs < (1<<16)) { + // ...otherwise increase delay by a factor of 2 + connectDelayMs <<= 1; + } + return connectDelayMs; +} + +SocketBase::SocketBase() +: mConnectionStatus(SOCKET_DISCONNECTED) +, mConnectTimestamp(0) +, mConnectDelayMs(0) +{ + MOZ_COUNT_CTOR(SocketBase); +} + +SocketBase::~SocketBase() +{ + MOZ_ASSERT(mConnectionStatus == SOCKET_DISCONNECTED); + + MOZ_COUNT_DTOR(SocketBase); +} + +void +SocketBase::SetConnectionStatus(SocketConnectionStatus aConnectionStatus) +{ + mConnectionStatus = aConnectionStatus; +} + +// +// SocketIOBase +// + +SocketIOBase::SocketIOBase(MessageLoop* aConsumerLoop) + : mConsumerLoop(aConsumerLoop) +{ + MOZ_ASSERT(mConsumerLoop); + + MOZ_COUNT_CTOR(SocketIOBase); +} + +SocketIOBase::~SocketIOBase() +{ + MOZ_COUNT_DTOR(SocketIOBase); +} + +MessageLoop* +SocketIOBase::GetConsumerThread() const +{ + return mConsumerLoop; +} + +bool +SocketIOBase::IsConsumerThread() const +{ + return GetConsumerThread() == MessageLoop::current(); +} + +// +// SocketEventTask +// + +SocketEventTask::SocketEventTask(SocketIOBase* aIO, SocketEvent aEvent) + : SocketTask<SocketIOBase>(aIO) + , mEvent(aEvent) +{ + MOZ_COUNT_CTOR(SocketEventTask); +} + +SocketEventTask::~SocketEventTask() +{ + MOZ_COUNT_DTOR(SocketEventTask); +} + +NS_IMETHODIMP +SocketEventTask::Run() +{ + SocketIOBase* io = SocketTask<SocketIOBase>::GetIO(); + + MOZ_ASSERT(io->IsConsumerThread()); + + if (NS_WARN_IF(io->IsShutdownOnConsumerThread())) { + // Since we've already explicitly closed and the close + // happened before this, this isn't really an error. + return NS_OK; + } + + SocketBase* socketBase = io->GetSocketBase(); + MOZ_ASSERT(socketBase); + + if (mEvent == CONNECT_SUCCESS) { + socketBase->NotifySuccess(); + } else if (mEvent == CONNECT_ERROR) { + socketBase->NotifyError(); + } else if (mEvent == DISCONNECT) { + socketBase->NotifyDisconnect(); + } + + return NS_OK; +} + +// +// SocketRequestClosingTask +// + +SocketRequestClosingTask::SocketRequestClosingTask(SocketIOBase* aIO) + : SocketTask<SocketIOBase>(aIO) +{ + MOZ_COUNT_CTOR(SocketRequestClosingTask); +} + +SocketRequestClosingTask::~SocketRequestClosingTask() +{ + MOZ_COUNT_DTOR(SocketRequestClosingTask); +} + +NS_IMETHODIMP +SocketRequestClosingTask::Run() +{ + SocketIOBase* io = SocketTask<SocketIOBase>::GetIO(); + + MOZ_ASSERT(io->IsConsumerThread()); + + if (NS_WARN_IF(io->IsShutdownOnConsumerThread())) { + // Since we've already explicitly closed and the close + // happened before this, this isn't really an error. + return NS_OK; + } + + SocketBase* socketBase = io->GetSocketBase(); + MOZ_ASSERT(socketBase); + + socketBase->Close(); + + return NS_OK; +} + +// +// SocketDeleteInstanceTask +// + +SocketDeleteInstanceTask::SocketDeleteInstanceTask(SocketIOBase* aIO) + : mIO(aIO) +{ + MOZ_COUNT_CTOR(SocketDeleteInstanceTask); +} + +SocketDeleteInstanceTask::~SocketDeleteInstanceTask() +{ + MOZ_COUNT_DTOR(SocketDeleteInstanceTask); +} + +NS_IMETHODIMP +SocketDeleteInstanceTask::Run() +{ + mIO.reset(); // delete instance + return NS_OK; +} + +// +// SocketIOShutdownTask +// + +SocketIOShutdownTask::SocketIOShutdownTask(SocketIOBase* aIO) + : SocketIOTask<SocketIOBase>(aIO) +{ + MOZ_COUNT_CTOR(SocketIOShutdownTask); +} + +SocketIOShutdownTask::~SocketIOShutdownTask() +{ + MOZ_COUNT_DTOR(SocketIOShutdownTask); +} + +NS_IMETHODIMP +SocketIOShutdownTask::Run() +{ + SocketIOBase* io = SocketIOTask<SocketIOBase>::GetIO(); + + MOZ_ASSERT(!io->IsConsumerThread()); + MOZ_ASSERT(!io->IsShutdownOnIOThread()); + + // At this point, there should be no new events on the I/O thread + // after this one with the possible exception of an accept task, + // which ShutdownOnIOThread will cancel for us. We are now fully + // shut down, so we can send a message to the consumer thread to + // delete |io| safely knowing that it's not reference any longer. + io->ShutdownOnIOThread(); + io->GetConsumerThread()->PostTask( + MakeAndAddRef<SocketDeleteInstanceTask>(io)); + return NS_OK; +} + +} +} diff --git a/ipc/unixsocket/SocketBase.h b/ipc/unixsocket/SocketBase.h new file mode 100644 index 000000000..191567fdb --- /dev/null +++ b/ipc/unixsocket/SocketBase.h @@ -0,0 +1,585 @@ +/* -*- Mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; tab-width: 40 -*- */ +/* vim: set ts=2 et sw=2 tw=80: */ + +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#ifndef mozilla_ipc_SocketBase_h +#define mozilla_ipc_SocketBase_h + +#include "base/message_loop.h" +#include "mozilla/UniquePtr.h" + +namespace mozilla { +namespace ipc { + +// +// UnixSocketBuffer +// + +/** + * |UnixSocketBuffer| implements a FIFO buffer that stores raw socket + * data, either for sending on a socket or received from a socket. + */ +class UnixSocketBuffer +{ +public: + virtual ~UnixSocketBuffer(); + + const uint8_t* GetData() const + { + return mData + mOffset; + } + + size_t GetSize() const + { + return mSize - mOffset; + } + + const uint8_t* Consume(size_t aLen); + + nsresult Read(void* aValue, size_t aLen); + + nsresult Read(int8_t& aValue) + { + return Read(&aValue, sizeof(aValue)); + } + + nsresult Read(uint8_t& aValue) + { + return Read(&aValue, sizeof(aValue)); + } + + nsresult Read(int16_t& aValue) + { + return Read(&aValue, sizeof(aValue)); + } + + nsresult Read(uint16_t& aValue) + { + return Read(&aValue, sizeof(aValue)); + } + + nsresult Read(int32_t& aValue) + { + return Read(&aValue, sizeof(aValue)); + } + + nsresult Read(uint32_t& aValue) + { + return Read(&aValue, sizeof(aValue)); + } + + nsresult Read(int64_t& aValue) + { + return Read(&aValue, sizeof(aValue)); + } + + nsresult Read(uint64_t& aValue) + { + return Read(&aValue, sizeof(aValue)); + } + + nsresult Read(float& aValue) + { + return Read(&aValue, sizeof(aValue)); + } + + nsresult Read(double& aValue) + { + return Read(&aValue, sizeof(aValue)); + } + + uint8_t* Append(size_t aLen); + + nsresult Write(const void* aValue, size_t aLen); + + nsresult Write(int8_t aValue) + { + return Write(&aValue, sizeof(aValue)); + } + + nsresult Write(uint8_t aValue) + { + return Write(&aValue, sizeof(aValue)); + } + + nsresult Write(int16_t aValue) + { + return Write(&aValue, sizeof(aValue)); + } + + nsresult Write(uint16_t aValue) + { + return Write(&aValue, sizeof(aValue)); + } + + nsresult Write(int32_t aValue) + { + return Write(&aValue, sizeof(aValue)); + } + + nsresult Write(uint32_t aValue) + { + return Write(&aValue, sizeof(aValue)); + } + + nsresult Write(int64_t aValue) + { + return Write(&aValue, sizeof(aValue)); + } + + nsresult Write(uint64_t aValue) + { + return Write(&aValue, sizeof(aValue)); + } + + nsresult Write(float aValue) + { + return Write(&aValue, sizeof(aValue)); + } + + nsresult Write(double aValue) + { + return Write(&aValue, sizeof(aValue)); + } + +protected: + UnixSocketBuffer(); + + /** + * Sets the raw memory. The caller is responsible for freeing + * this memory. + * + * @param aData A pointer to the buffer's raw memory. + * @param aOffset The start of valid bytes in |aData|. + * @param aSize The number of valid bytes in |aData|. + * @param aAvailableSpace The number of bytes in |aData|. + */ + void ResetBuffer(uint8_t* aData, + size_t aOffset, size_t aSize, size_t aAvailableSpace) + { + MOZ_ASSERT(aData || !aAvailableSpace); + MOZ_ASSERT((aOffset + aSize) <= aAvailableSpace); + + mOffset = aOffset; + mSize = aSize; + mAvailableSpace = aAvailableSpace; + mData = aData; + } + + /** + * Retrieves the memory buffer. + * + * @return A pointer to the buffer's raw memory. + */ + uint8_t* GetBuffer() + { + return mData; + } + + size_t GetLeadingSpace() const + { + return mOffset; + } + + size_t GetTrailingSpace() const + { + return mAvailableSpace - mSize; + } + + size_t GetAvailableSpace() const + { + return mAvailableSpace; + } + + void* GetTrailingBytes() + { + return mData + mSize; + } + + uint8_t* GetData(size_t aOffset) + { + MOZ_ASSERT(aOffset <= mSize); + + return mData + aOffset; + } + + void SetRange(size_t aOffset, size_t aSize) + { + MOZ_ASSERT((aOffset + aSize) <= mAvailableSpace); + + mOffset = aOffset; + mSize = mOffset + aSize; + } + + void CleanupLeadingSpace(); + +private: + size_t mSize; + size_t mOffset; + size_t mAvailableSpace; + uint8_t* mData; +}; + +// +// UnixSocketIOBuffer +// + +/** + * |UnixSocketIOBuffer| is a |UnixSocketBuffer| that supports being + * received on a socket or being send on a socket. Network protocols + * might differ in their exact usage of Unix socket functions and + * |UnixSocketIOBuffer| provides a protocol-neutral interface. + */ +class UnixSocketIOBuffer : public UnixSocketBuffer +{ +public: + UnixSocketIOBuffer(); + virtual ~UnixSocketIOBuffer(); + + /** + * Receives data from aFd at the end of the buffer. The returned value + * is the number of newly received bytes, or 0 if the peer shut down + * its connection, or a negative value on errors. + */ + virtual ssize_t Receive(int aFd) = 0; + + /** + * Sends data to aFd from the beginning of the buffer. The returned value + * is the number of bytes written, or a negative value on error. + */ + virtual ssize_t Send(int aFd) = 0; +}; + +// +// UnixSocketRawData +// + +class UnixSocketRawData final : public UnixSocketIOBuffer +{ +public: + /** + * This constructor copies aData of aSize bytes length into the + * new instance of |UnixSocketRawData|. + * + * @param aData The buffer to copy. + * @param aSize The number of bytes in |aData|. + */ + UnixSocketRawData(const void* aData, size_t aSize); + + /** + * This constructor takes ownership of the data in aData. The + * data is assumed to be aSize bytes in length. + * + * @param aData The buffer to take ownership of. + * @param aSize The number of bytes in |aData|. + */ + UnixSocketRawData(UniquePtr<uint8_t[]> aData, size_t aSize); + + /** + * This constructor reserves aSize bytes of space. Currently + * it's only possible to fill this buffer by calling |Receive|. + * + * @param aSize The number of bytes to allocate. + */ + UnixSocketRawData(size_t aSize); + + /** + * The destructor releases the buffer's raw memory. + */ + ~UnixSocketRawData(); + + /** + * Receives data from aFd at the end of the buffer. The returned value + * is the number of newly received bytes, or 0 if the peer shut down + * its connection, or a negative value on errors. + */ + ssize_t Receive(int aFd) override; + + /** + * Sends data to aFd from the beginning of the buffer. The returned value + * is the number of bytes written, or a negative value on error. + */ + ssize_t Send(int aFd) override; +}; + +enum SocketConnectionStatus { + SOCKET_DISCONNECTED = 0, + SOCKET_LISTENING = 1, + SOCKET_CONNECTING = 2, + SOCKET_CONNECTED = 3 +}; + +// +// SocketBase +// + +class SocketBase +{ +public: + NS_INLINE_DECL_THREADSAFE_REFCOUNTING(SocketBase) + + SocketConnectionStatus GetConnectionStatus() const; + + int GetSuggestedConnectDelayMs() const; + + /** + * Queues the internal representation of socket for deletion. Can be called + * from consumer thread. + */ + virtual void Close() = 0; + + /** + * Callback for socket connect/accept success. Called after connect/accept has + * finished. Will be run on consumer thread before any reads take place. + */ + virtual void OnConnectSuccess() = 0; + + /** + * Callback for socket connect/accept error. Will be run on consumer thread. + */ + virtual void OnConnectError() = 0; + + /** + * Callback for socket disconnect. Will be run on consumer thread. + */ + virtual void OnDisconnect() = 0; + + /** + * Called by implementation to notify consumer of success. + */ + void NotifySuccess(); + + /** + * Called by implementation to notify consumer of error. + */ + void NotifyError(); + + /** + * Called by implementation to notify consumer of disconnect. + */ + void NotifyDisconnect(); + +protected: + SocketBase(); + virtual ~SocketBase(); + + void SetConnectionStatus(SocketConnectionStatus aConnectionStatus); + +private: + uint32_t CalculateConnectDelayMs() const; + + SocketConnectionStatus mConnectionStatus; + PRIntervalTime mConnectTimestamp; + uint32_t mConnectDelayMs; +}; + +// +// SocketIOBase +// + +/** + * |SocketIOBase| is a base class for Socket I/O classes that + * perform operations on the I/O thread. + */ +class SocketIOBase +{ +public: + virtual ~SocketIOBase(); + + /** + * Implemented by socket I/O classes to return the current instance of + * |SocketBase|. + * + * @return The current instance of |SocketBase| + */ + virtual SocketBase* GetSocketBase() = 0; + + /** + * Implemented by socket I/O classes to signal that the socket I/O class has + * been shut down. + * + * @return True if the socket I/O class has been shut down, false otherwise. + */ + virtual bool IsShutdownOnIOThread() const = 0; + + /** + * Implemented by socket I/O classes to signal that socket class has + * been shut down. + * + * @return True if the socket class has been shut down, false otherwise. + */ + virtual bool IsShutdownOnConsumerThread() const = 0; + + /** + * Signals to the socket I/O classes that it has been shut down. + */ + virtual void ShutdownOnIOThread() = 0; + + /** + * Signals to the socket I/O classes that the socket class has been + * shut down. + */ + virtual void ShutdownOnConsumerThread() = 0; + + /** + * Returns the consumer thread. + * + * @return A pointer to the consumer thread. + */ + MessageLoop* GetConsumerThread() const; + + /** + * @return True if the current thread is the consumer thread, or false + * otherwise. + */ + bool IsConsumerThread() const; + +protected: + SocketIOBase(MessageLoop* aConsumerLoop); + +private: + MessageLoop* mConsumerLoop; +}; + +// +// Socket tasks +// + +/* |SocketTask| is a task for sending a message from + * the I/O thread to the consumer thread. + */ +template <typename T> +class SocketTask : public CancelableRunnable +{ +public: + virtual ~SocketTask() + { } + + T* GetIO() const + { + return mIO; + } + +protected: + SocketTask(T* aIO) + : mIO(aIO) + { + MOZ_ASSERT(aIO); + } + +private: + T* mIO; +}; + +/** + * |SocketEventTask| reports the connection state on the + * I/O thread back to the consumer thread. + */ +class SocketEventTask final : public SocketTask<SocketIOBase> +{ +public: + enum SocketEvent { + CONNECT_SUCCESS, + CONNECT_ERROR, + DISCONNECT + }; + + SocketEventTask(SocketIOBase* aIO, SocketEvent aEvent); + ~SocketEventTask(); + + NS_IMETHOD Run() override; + +private: + SocketEvent mEvent; +}; + +/** + * |SocketRequestClosingTask| closes an instance of |SocketBase| + * on the consumer thread. + */ +class SocketRequestClosingTask final : public SocketTask<SocketIOBase> +{ +public: + SocketRequestClosingTask(SocketIOBase* aIO); + ~SocketRequestClosingTask(); + + NS_IMETHOD Run() override; +}; + +/** + * |SocketDeleteInstanceTask| deletes an object on the consumer thread. + */ +class SocketDeleteInstanceTask final : public Runnable +{ +public: + SocketDeleteInstanceTask(SocketIOBase* aIO); + ~SocketDeleteInstanceTask(); + + NS_IMETHOD Run() override; + +private: + UniquePtr<SocketIOBase> mIO; +}; + +// +// Socket I/O tasks +// + +/* |SocketIOTask| holds a reference to a Socket I/O object. It's + * supposed to run on the I/O thread. + */ +template<typename Tio> +class SocketIOTask : public CancelableRunnable +{ +public: + virtual ~SocketIOTask() + { } + + Tio* GetIO() const + { + return mIO; + } + + nsresult Cancel() override + { + mIO = nullptr; + return NS_OK; + } + + bool IsCanceled() const + { + return !mIO; + } + +protected: + SocketIOTask(Tio* aIO) + : mIO(aIO) + { + MOZ_ASSERT(mIO); + } + +private: + Tio* mIO; +}; + +/** + * |SocketIOShutdownTask| signals shutdown to the socket I/O class on + * the I/O thread and sends it to the consumer thread for destruction. + */ +class SocketIOShutdownTask final : public SocketIOTask<SocketIOBase> +{ +public: + SocketIOShutdownTask(SocketIOBase* aIO); + ~SocketIOShutdownTask(); + + NS_IMETHOD Run() override; +}; + +} +} + +#endif diff --git a/ipc/unixsocket/StreamSocket.cpp b/ipc/unixsocket/StreamSocket.cpp new file mode 100644 index 000000000..04c37e554 --- /dev/null +++ b/ipc/unixsocket/StreamSocket.cpp @@ -0,0 +1,482 @@ +/* -*- Mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; tab-width: 40 -*- */ +/* vim: set ts=2 et sw=2 tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "StreamSocket.h" +#include <fcntl.h> +#include "mozilla/RefPtr.h" +#include "nsISupportsImpl.h" // for MOZ_COUNT_CTOR, MOZ_COUNT_DTOR +#include "nsXULAppAPI.h" +#include "StreamSocketConsumer.h" +#include "UnixSocketConnector.h" + +static const size_t MAX_READ_SIZE = 1 << 16; + +namespace mozilla { +namespace ipc { + +// +// StreamSocketIO +// + +class StreamSocketIO final : public ConnectionOrientedSocketIO +{ +public: + class ConnectTask; + class DelayedConnectTask; + class ReceiveTask; + + StreamSocketIO(MessageLoop* aConsumerLoop, + MessageLoop* aIOLoop, + StreamSocket* aStreamSocket, + UnixSocketConnector* aConnector); + StreamSocketIO(MessageLoop* aConsumerLoop, + MessageLoop* aIOLoop, + int aFd, ConnectionStatus aConnectionStatus, + StreamSocket* aStreamSocket, + UnixSocketConnector* aConnector); + ~StreamSocketIO(); + + StreamSocket* GetStreamSocket(); + DataSocket* GetDataSocket(); + + // Delayed-task handling + // + + void SetDelayedConnectTask(CancelableRunnable* aTask); + void ClearDelayedConnectTask(); + void CancelDelayedConnectTask(); + + // Methods for |DataSocket| + // + + nsresult QueryReceiveBuffer(UnixSocketIOBuffer** aBuffer) override; + void ConsumeBuffer() override; + void DiscardBuffer() override; + + // Methods for |SocketIOBase| + // + + SocketBase* GetSocketBase() override; + + bool IsShutdownOnConsumerThread() const override; + bool IsShutdownOnIOThread() const override; + + void ShutdownOnConsumerThread() override; + void ShutdownOnIOThread() override; + +private: + /** + * Consumer pointer. Non-thread-safe pointer, so should only be manipulated + * directly from consumer thread. All non-consumer-thread accesses should + * happen with mIO as container. + */ + StreamSocket* mStreamSocket; + + /** + * If true, do not requeue whatever task we're running + */ + bool mShuttingDownOnIOThread; + + /** + * Task member for delayed connect task. Should only be access on consumer + * thread. + */ + CancelableRunnable* mDelayedConnectTask; + + /** + * I/O buffer for received data + */ + UniquePtr<UnixSocketRawData> mBuffer; +}; + +StreamSocketIO::StreamSocketIO(MessageLoop* aConsumerLoop, + MessageLoop* aIOLoop, + StreamSocket* aStreamSocket, + UnixSocketConnector* aConnector) + : ConnectionOrientedSocketIO(aConsumerLoop, aIOLoop, aConnector) + , mStreamSocket(aStreamSocket) + , mShuttingDownOnIOThread(false) + , mDelayedConnectTask(nullptr) +{ + MOZ_ASSERT(mStreamSocket); + + MOZ_COUNT_CTOR_INHERITED(StreamSocketIO, ConnectionOrientedSocketIO); +} + +StreamSocketIO::StreamSocketIO(MessageLoop* aConsumerLoop, + MessageLoop* aIOLoop, + int aFd, ConnectionStatus aConnectionStatus, + StreamSocket* aStreamSocket, + UnixSocketConnector* aConnector) + : ConnectionOrientedSocketIO(aConsumerLoop, + aIOLoop, + aFd, + aConnectionStatus, + aConnector) + , mStreamSocket(aStreamSocket) + , mShuttingDownOnIOThread(false) + , mDelayedConnectTask(nullptr) +{ + MOZ_ASSERT(mStreamSocket); + + MOZ_COUNT_CTOR_INHERITED(StreamSocketIO, ConnectionOrientedSocketIO); +} + +StreamSocketIO::~StreamSocketIO() +{ + MOZ_ASSERT(IsConsumerThread()); + MOZ_ASSERT(IsShutdownOnConsumerThread()); + + MOZ_COUNT_DTOR_INHERITED(StreamSocketIO, ConnectionOrientedSocketIO); +} + +StreamSocket* +StreamSocketIO::GetStreamSocket() +{ + return mStreamSocket; +} + +DataSocket* +StreamSocketIO::GetDataSocket() +{ + return GetStreamSocket(); +} + +void +StreamSocketIO::SetDelayedConnectTask(CancelableRunnable* aTask) +{ + MOZ_ASSERT(IsConsumerThread()); + + mDelayedConnectTask = aTask; +} + +void +StreamSocketIO::ClearDelayedConnectTask() +{ + MOZ_ASSERT(IsConsumerThread()); + + mDelayedConnectTask = nullptr; +} + +void +StreamSocketIO::CancelDelayedConnectTask() +{ + MOZ_ASSERT(IsConsumerThread()); + + if (!mDelayedConnectTask) { + return; + } + + mDelayedConnectTask->Cancel(); + ClearDelayedConnectTask(); +} + +// |DataSocketIO| + +nsresult +StreamSocketIO::QueryReceiveBuffer(UnixSocketIOBuffer** aBuffer) +{ + MOZ_ASSERT(aBuffer); + + if (!mBuffer) { + mBuffer = MakeUnique<UnixSocketRawData>(MAX_READ_SIZE); + } + *aBuffer = mBuffer.get(); + + return NS_OK; +} + +/** + * |ReceiveTask| transfers data received on the I/O thread + * to an instance of |StreamSocket| on the consumer thread. + */ +class StreamSocketIO::ReceiveTask final : public SocketTask<StreamSocketIO> +{ +public: + ReceiveTask(StreamSocketIO* aIO, UnixSocketBuffer* aBuffer) + : SocketTask<StreamSocketIO>(aIO) + , mBuffer(aBuffer) + { + MOZ_COUNT_CTOR(ReceiveTask); + } + + ~ReceiveTask() + { + MOZ_COUNT_DTOR(ReceiveTask); + } + + NS_IMETHOD Run() override + { + StreamSocketIO* io = SocketTask<StreamSocketIO>::GetIO(); + + MOZ_ASSERT(io->IsConsumerThread()); + + if (NS_WARN_IF(io->IsShutdownOnConsumerThread())) { + // Since we've already explicitly closed and the close + // happened before this, this isn't really an error. + return NS_OK; + } + + StreamSocket* streamSocket = io->GetStreamSocket(); + MOZ_ASSERT(streamSocket); + + streamSocket->ReceiveSocketData(mBuffer); + + return NS_OK; + } + +private: + UniquePtr<UnixSocketBuffer> mBuffer; +}; + +void +StreamSocketIO::ConsumeBuffer() +{ + GetConsumerThread()->PostTask( + MakeAndAddRef<ReceiveTask>(this, mBuffer.release())); +} + +void +StreamSocketIO::DiscardBuffer() +{ + // Nothing to do. +} + +// |SocketIOBase| + +SocketBase* +StreamSocketIO::GetSocketBase() +{ + return GetDataSocket(); +} + +bool +StreamSocketIO::IsShutdownOnConsumerThread() const +{ + MOZ_ASSERT(IsConsumerThread()); + + return mStreamSocket == nullptr; +} + +bool +StreamSocketIO::IsShutdownOnIOThread() const +{ + return mShuttingDownOnIOThread; +} + +void +StreamSocketIO::ShutdownOnConsumerThread() +{ + MOZ_ASSERT(IsConsumerThread()); + MOZ_ASSERT(!IsShutdownOnConsumerThread()); + + mStreamSocket = nullptr; +} + +void +StreamSocketIO::ShutdownOnIOThread() +{ + MOZ_ASSERT(!IsConsumerThread()); + MOZ_ASSERT(!mShuttingDownOnIOThread); + + Close(); // will also remove fd from I/O loop + mShuttingDownOnIOThread = true; +} + +// +// Socket tasks +// + +class StreamSocketIO::ConnectTask final : public SocketIOTask<StreamSocketIO> +{ +public: + ConnectTask(StreamSocketIO* aIO) + : SocketIOTask<StreamSocketIO>(aIO) + { + MOZ_COUNT_CTOR(ReceiveTask); + } + + ~ConnectTask() + { + MOZ_COUNT_DTOR(ReceiveTask); + } + + NS_IMETHOD Run() override + { + MOZ_ASSERT(!GetIO()->IsConsumerThread()); + MOZ_ASSERT(!IsCanceled()); + + GetIO()->Connect(); + + return NS_OK; + } +}; + +class StreamSocketIO::DelayedConnectTask final + : public SocketIOTask<StreamSocketIO> +{ +public: + DelayedConnectTask(StreamSocketIO* aIO) + : SocketIOTask<StreamSocketIO>(aIO) + { + MOZ_COUNT_CTOR(DelayedConnectTask); + } + + ~DelayedConnectTask() + { + MOZ_COUNT_DTOR(DelayedConnectTask); + } + + NS_IMETHOD Run() override + { + MOZ_ASSERT(GetIO()->IsConsumerThread()); + + if (IsCanceled()) { + return NS_OK; + } + + StreamSocketIO* io = GetIO(); + if (io->IsShutdownOnConsumerThread()) { + return NS_OK; + } + + io->ClearDelayedConnectTask(); + io->GetIOLoop()->PostTask(MakeAndAddRef<ConnectTask>(io)); + + return NS_OK; + } +}; + +// +// StreamSocket +// + +StreamSocket::StreamSocket(StreamSocketConsumer* aConsumer, int aIndex) + : mIO(nullptr) + , mConsumer(aConsumer) + , mIndex(aIndex) +{ + MOZ_ASSERT(mConsumer); + + MOZ_COUNT_CTOR_INHERITED(StreamSocket, ConnectionOrientedSocket); +} + +StreamSocket::~StreamSocket() +{ + MOZ_ASSERT(!mIO); + + MOZ_COUNT_DTOR_INHERITED(StreamSocket, ConnectionOrientedSocket); +} + +void +StreamSocket::ReceiveSocketData(UniquePtr<UnixSocketBuffer>& aBuffer) +{ + mConsumer->ReceiveSocketData(mIndex, aBuffer); +} + +nsresult +StreamSocket::Connect(UnixSocketConnector* aConnector, int aDelayMs, + MessageLoop* aConsumerLoop, MessageLoop* aIOLoop) +{ + MOZ_ASSERT(!mIO); + + mIO = new StreamSocketIO(aConsumerLoop, aIOLoop, this, aConnector); + SetConnectionStatus(SOCKET_CONNECTING); + + if (aDelayMs > 0) { + RefPtr<StreamSocketIO::DelayedConnectTask> connectTask = + MakeAndAddRef<StreamSocketIO::DelayedConnectTask>(mIO); + mIO->SetDelayedConnectTask(connectTask); + MessageLoop::current()->PostDelayedTask(connectTask.forget(), aDelayMs); + } else { + aIOLoop->PostTask(MakeAndAddRef<StreamSocketIO::ConnectTask>(mIO)); + } + + return NS_OK; +} + +nsresult +StreamSocket::Connect(UnixSocketConnector* aConnector, int aDelayMs) +{ + return Connect(aConnector, aDelayMs, + MessageLoop::current(), XRE_GetIOMessageLoop()); +} + +// |ConnectionOrientedSocket| + +nsresult +StreamSocket::PrepareAccept(UnixSocketConnector* aConnector, + MessageLoop* aConsumerLoop, + MessageLoop* aIOLoop, + ConnectionOrientedSocketIO*& aIO) +{ + MOZ_ASSERT(!mIO); + MOZ_ASSERT(aConnector); + + SetConnectionStatus(SOCKET_CONNECTING); + + mIO = new StreamSocketIO(aConsumerLoop, aIOLoop, + -1, UnixSocketWatcher::SOCKET_IS_CONNECTING, + this, aConnector); + aIO = mIO; + + return NS_OK; +} + +// |DataSocket| + +void +StreamSocket::SendSocketData(UnixSocketIOBuffer* aBuffer) +{ + MOZ_ASSERT(mIO); + MOZ_ASSERT(mIO->IsConsumerThread()); + MOZ_ASSERT(!mIO->IsShutdownOnConsumerThread()); + + mIO->GetIOLoop()->PostTask( + MakeAndAddRef<SocketIOSendTask<StreamSocketIO, UnixSocketIOBuffer>>( + mIO, aBuffer)); +} + +// |SocketBase| + +void +StreamSocket::Close() +{ + MOZ_ASSERT(mIO); + MOZ_ASSERT(mIO->IsConsumerThread()); + + mIO->CancelDelayedConnectTask(); + + // From this point on, we consider |mIO| as being deleted. We sever + // the relationship here so any future calls to |Connect| will create + // a new I/O object. + mIO->ShutdownOnConsumerThread(); + mIO->GetIOLoop()->PostTask(MakeAndAddRef<SocketIOShutdownTask>(mIO)); + mIO = nullptr; + + NotifyDisconnect(); +} + +void +StreamSocket::OnConnectSuccess() +{ + mConsumer->OnConnectSuccess(mIndex); +} + +void +StreamSocket::OnConnectError() +{ + mConsumer->OnConnectError(mIndex); +} + +void +StreamSocket::OnDisconnect() +{ + mConsumer->OnDisconnect(mIndex); +} + +} // namespace ipc +} // namespace mozilla diff --git a/ipc/unixsocket/StreamSocket.h b/ipc/unixsocket/StreamSocket.h new file mode 100644 index 000000000..cb732389f --- /dev/null +++ b/ipc/unixsocket/StreamSocket.h @@ -0,0 +1,95 @@ +/* -*- Mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; tab-width: 40 -*- */ +/* vim: set ts=2 et sw=2 tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#ifndef mozilla_ipc_streamsocket_h +#define mozilla_ipc_streamsocket_h + +#include "ConnectionOrientedSocket.h" + +class MessageLoop; + +namespace mozilla { +namespace ipc { + +class StreamSocketConsumer; +class StreamSocketIO; +class UnixSocketConnector; + +class StreamSocket final : public ConnectionOrientedSocket +{ +public: + /** + * Constructs an instance of |StreamSocket|. + * + * @param aConsumer The consumer for the socket. + * @param aIndex An arbitrary index. + */ + StreamSocket(StreamSocketConsumer* aConsumer, int aIndex); + + /** + * Method to be called whenever data is received. Consumer-thread only. + * + * @param aBuffer Data received from the socket. + */ + void ReceiveSocketData(UniquePtr<UnixSocketBuffer>& aBuffer); + + /** + * Starts a task on the socket that will try to connect to a socket in a + * non-blocking manner. + * + * @param aConnector Connector object for socket type specific functions + * @param aDelayMs Time delay in milliseconds. + * @param aConsumerLoop The socket's consumer thread. + * @param aIOLoop The socket's I/O thread. + * @return NS_OK on success, or an XPCOM error code otherwise. + */ + nsresult Connect(UnixSocketConnector* aConnector, int aDelayMs, + MessageLoop* aConsumerLoop, MessageLoop* aIOLoop); + + /** + * Starts a task on the socket that will try to connect to a socket in a + * non-blocking manner. + * + * @param aConnector Connector object for socket type specific functions + * @param aDelayMs Time delay in milliseconds. + * @return NS_OK on success, or an XPCOM error code otherwise. + */ + nsresult Connect(UnixSocketConnector* aConnector, int aDelayMs = 0); + + // Methods for |ConnectionOrientedSocket| + // + + nsresult PrepareAccept(UnixSocketConnector* aConnector, + MessageLoop* aConsumerLoop, + MessageLoop* aIOLoop, + ConnectionOrientedSocketIO*& aIO) override; + + // Methods for |DataSocket| + // + + void SendSocketData(UnixSocketIOBuffer* aBuffer) override; + + // Methods for |SocketBase| + // + + void Close() override; + void OnConnectSuccess() override; + void OnConnectError() override; + void OnDisconnect() override; + +protected: + virtual ~StreamSocket(); + +private: + StreamSocketIO* mIO; + StreamSocketConsumer* mConsumer; + int mIndex; +}; + +} // namespace ipc +} // namepsace mozilla + +#endif // mozilla_ipc_streamsocket_h diff --git a/ipc/unixsocket/StreamSocketConsumer.cpp b/ipc/unixsocket/StreamSocketConsumer.cpp new file mode 100644 index 000000000..bac9c1fe7 --- /dev/null +++ b/ipc/unixsocket/StreamSocketConsumer.cpp @@ -0,0 +1,20 @@ +/* -*- Mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; tab-width: 40 -*- */ +/* vim: set ts=2 et sw=2 tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "StreamSocketConsumer.h" + +namespace mozilla { +namespace ipc { + +// +// StreamSocketConsumer +// + +StreamSocketConsumer::~StreamSocketConsumer() +{ } + +} +} diff --git a/ipc/unixsocket/StreamSocketConsumer.h b/ipc/unixsocket/StreamSocketConsumer.h new file mode 100644 index 000000000..d1f8c3d5b --- /dev/null +++ b/ipc/unixsocket/StreamSocketConsumer.h @@ -0,0 +1,60 @@ +/* -*- Mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; tab-width: 40 -*- */ +/* vim: set ts=2 et sw=2 tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#ifndef mozilla_ipc_streamsocketconsumer_h +#define mozilla_ipc_streamsocketconsumer_h + +#include "mozilla/UniquePtr.h" + +namespace mozilla { +namespace ipc { + +class UnixSocketBuffer; + +/** + * |StreamSocketConsumer| handles socket events and received data. + */ +class StreamSocketConsumer +{ +public: + /** + * Method to be called whenever data is received. Consumer-thread only. + * + * @param aIndex The index that has been given to the stream socket. + * @param aBuffer Data received from the socket. + */ + virtual void ReceiveSocketData(int aIndex, + UniquePtr<UnixSocketBuffer>& aBuffer) = 0; + + /** + * Callback for socket success. Consumer-thread only. + * + * @param aIndex The index that has been given to the stream socket. + */ + virtual void OnConnectSuccess(int aIndex) = 0; + + /** + * Callback for socket errors. Consumer-thread only. + * + * @param aIndex The index that has been given to the stream socket. + */ + virtual void OnConnectError(int aIndex) = 0; + + /** + * Callback for socket disconnect. Consumer-thread only. + * + * @param aIndex The index that has been given to the stream socket. + */ + virtual void OnDisconnect(int aIndex) = 0; + +protected: + virtual ~StreamSocketConsumer(); +}; + +} +} + +#endif diff --git a/ipc/unixsocket/UnixSocketConnector.cpp b/ipc/unixsocket/UnixSocketConnector.cpp new file mode 100644 index 000000000..eb38ff6de --- /dev/null +++ b/ipc/unixsocket/UnixSocketConnector.cpp @@ -0,0 +1,38 @@ +/* -*- Mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; tab-width: 40 -*- */ +/* vim: set ts=2 et sw=2 tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "UnixSocketConnector.h" +#include "nsISupportsImpl.h" // for MOZ_COUNT_CTOR, MOZ_COUNT_DTOR + +namespace mozilla { +namespace ipc { + +UnixSocketConnector::UnixSocketConnector() +{ + MOZ_COUNT_CTOR(UnixSocketConnector); +} + +UnixSocketConnector::~UnixSocketConnector() +{ + MOZ_COUNT_DTOR(UnixSocketConnector); +} + +nsresult +UnixSocketConnector::Duplicate(UniquePtr<UnixSocketConnector>& aConnector) +{ + UnixSocketConnector* connectorPtr; + auto rv = Duplicate(connectorPtr); + if (NS_FAILED(rv)) { + return rv; + } + + aConnector = Move(UniquePtr<UnixSocketConnector>(connectorPtr)); + + return NS_OK; +} + +} +} diff --git a/ipc/unixsocket/UnixSocketConnector.h b/ipc/unixsocket/UnixSocketConnector.h new file mode 100644 index 000000000..21d2c7860 --- /dev/null +++ b/ipc/unixsocket/UnixSocketConnector.h @@ -0,0 +1,102 @@ +/* -*- Mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; tab-width: 40 -*- */ +/* vim: set ts=2 et sw=2 tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#ifndef mozilla_ipc_unixsocketconnector_h +#define mozilla_ipc_unixsocketconnector_h + +#include <sys/socket.h> +#include "mozilla/ipc/UnixSocketWatcher.h" +#include "nsString.h" + +namespace mozilla { +namespace ipc { + +/** + * |UnixSocketConnector| defines the socket creation and connection/listening + * functions for |UnixSocketConsumer|, et al. Due to the fact that socket setup + * can vary between protocols (Unix sockets, TCP sockets, Bluetooth sockets, etc), + * this allows the user to create whatever connection mechanism they need while + * still depending on libevent for non-blocking communication handling. + */ +class UnixSocketConnector +{ +public: + virtual ~UnixSocketConnector(); + + /** + * Converts an address to a human-readable string. + * + * @param aAddress A socket address + * @param aAddressLength The number of valid bytes in |aAddress| + * @param[out] aAddressString The resulting string + * @return NS_OK on success, or an XPCOM error code otherwise. + */ + virtual nsresult ConvertAddressToString(const struct sockaddr& aAddress, + socklen_t aAddressLength, + nsACString& aAddressString) = 0; + + /** + * Creates a listening socket. I/O thread only. + * + * @param[out] aAddress The listening socket's address + * @param[out] aAddressLength The number of valid bytes in |aAddress| + * @param[out] aListenFd The socket's file descriptor + * @return NS_OK on success, or an XPCOM error code otherwise. + */ + virtual nsresult CreateListenSocket(struct sockaddr* aAddress, + socklen_t* aAddressLength, + int& aListenFd) = 0; + + /** + * Accepts a stream socket from a listening socket. I/O thread only. + * + * @param aListenFd The listening socket + * @param[out] aAddress Returns the stream socket's address + * @param[out] aAddressLength Returns the number of valid bytes in |aAddress| + * @param[out] aStreamFd The stream socket's file descriptor + * @return NS_OK on success, or an XPCOM error code otherwise. + */ + virtual nsresult AcceptStreamSocket(int aListenFd, + struct sockaddr* aAddress, + socklen_t* aAddressLen, + int& aStreamFd) = 0; + + /** + * Creates a stream socket. I/O thread only. + * + * @param[in|out] aAddress The stream socket's address + * @param[in|out] aAddressLength The number of valid bytes in |aAddress| + * @param[out] aStreamFd The socket's file descriptor + * @return NS_OK on success, or an XPCOM error code otherwise. + */ + virtual nsresult CreateStreamSocket(struct sockaddr* aAddress, + socklen_t* aAddressLength, + int& aStreamFd) = 0; + + /** + * Copies the instance of |UnixSocketConnector|. I/O thread only. + * + * @param[in] aConnector Returns a new instance of the connector class + * @return NS_OK on success, or an XPCOM error code otherwise + */ + virtual nsresult Duplicate(UnixSocketConnector*& aConnector) = 0; + + /** + * Copies the instance of |UnixSocketConnector|. I/O thread only. + * + * @param[in] aConnector Returns a new instance of the connector class + * @return NS_OK on success, or an XPCOM error code otherwise + */ + nsresult Duplicate(UniquePtr<UnixSocketConnector>& aConnector); + +protected: + UnixSocketConnector(); +}; + +} +} + +#endif diff --git a/ipc/unixsocket/moz.build b/ipc/unixsocket/moz.build new file mode 100644 index 000000000..523aac3cb --- /dev/null +++ b/ipc/unixsocket/moz.build @@ -0,0 +1,31 @@ +# -*- Mode: python; indent-tabs-mode: nil; tab-width: 40 -*- +# vim: set filetype=python: +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +EXPORTS.mozilla.ipc += [ + 'ConnectionOrientedSocket.h', + 'DataSocket.h', + 'ListenSocket.h', + 'ListenSocketConsumer.h', + 'SocketBase.h', + 'StreamSocket.h', + 'StreamSocketConsumer.h', + 'UnixSocketConnector.h' +] + +SOURCES += [ + 'ConnectionOrientedSocket.cpp', + 'DataSocket.cpp', + 'ListenSocket.cpp', + 'ListenSocketConsumer.cpp', + 'SocketBase.cpp', + 'StreamSocket.cpp', + 'StreamSocketConsumer.cpp', + 'UnixSocketConnector.cpp' +] + +include('/ipc/chromium/chromium-config.mozbuild') + +FINAL_LIBRARY = 'xul' |