diff options
Diffstat (limited to 'ipc/glue/MessageChannel.cpp')
-rw-r--r-- | ipc/glue/MessageChannel.cpp | 2560 |
1 files changed, 2560 insertions, 0 deletions
diff --git a/ipc/glue/MessageChannel.cpp b/ipc/glue/MessageChannel.cpp new file mode 100644 index 000000000..70e2387d5 --- /dev/null +++ b/ipc/glue/MessageChannel.cpp @@ -0,0 +1,2560 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- + * vim: sw=4 ts=4 et : + */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "mozilla/ipc/MessageChannel.h" +#include "mozilla/ipc/ProtocolUtils.h" + +#include "mozilla/dom/ScriptSettings.h" + +#include "mozilla/Assertions.h" +#include "mozilla/DebugOnly.h" +#include "mozilla/Move.h" +#include "mozilla/SizePrintfMacros.h" +#include "mozilla/Sprintf.h" +#include "mozilla/Telemetry.h" +#include "mozilla/Logging.h" +#include "nsAutoPtr.h" +#include "nsDebug.h" +#include "nsISupportsImpl.h" +#include "nsContentUtils.h" + +using mozilla::Move; + +// Undo the damage done by mozzconf.h +#undef compress + +// Logging seems to be somewhat broken on b2g. +#ifdef MOZ_B2G +#define IPC_LOG(...) +#else +static mozilla::LazyLogModule sLogModule("ipc"); +#define IPC_LOG(...) MOZ_LOG(sLogModule, LogLevel::Debug, (__VA_ARGS__)) +#endif + +/* + * IPC design: + * + * There are three kinds of messages: async, sync, and intr. Sync and intr + * messages are blocking. + * + * Terminology: To dispatch a message Foo is to run the RecvFoo code for + * it. This is also called "handling" the message. + * + * Sync and async messages can sometimes "nest" inside other sync messages + * (i.e., while waiting for the sync reply, we can dispatch the inner + * message). Intr messages cannot nest. The three possible nesting levels are + * NOT_NESTED, NESTED_INSIDE_SYNC, and NESTED_INSIDE_CPOW. The intended uses + * are: + * NOT_NESTED - most messages. + * NESTED_INSIDE_SYNC - CPOW-related messages, which are always sync + * and can go in either direction. + * NESTED_INSIDE_CPOW - messages where we don't want to dispatch + * incoming CPOWs while waiting for the response. + * These nesting levels are ordered: NOT_NESTED, NESTED_INSIDE_SYNC, + * NESTED_INSIDE_CPOW. Async messages cannot be NESTED_INSIDE_SYNC but they can + * be NESTED_INSIDE_CPOW. + * + * To avoid jank, the parent process is not allowed to send NOT_NESTED sync messages. + * When a process is waiting for a response to a sync message + * M0, it will dispatch an incoming message M if: + * 1. M has a higher nesting level than M0, or + * 2. if M has the same nesting level as M0 and we're in the child, or + * 3. if M has the same nesting level as M0 and it was sent by the other side + * while dispatching M0. + * The idea is that messages with higher nesting should take precendence. The + * purpose of rule 2 is to handle a race where both processes send to each other + * simultaneously. In this case, we resolve the race in favor of the parent (so + * the child dispatches first). + * + * Messages satisfy the following properties: + * A. When waiting for a response to a sync message, we won't dispatch any + * messages of nesting level. + * B. Messages of the same nesting level will be dispatched roughly in the + * order they were sent. The exception is when the parent and child send + * sync messages to each other simulataneously. In this case, the parent's + * message is dispatched first. While it is dispatched, the child may send + * further nested messages, and these messages may be dispatched before the + * child's original message. We can consider ordering to be preserved here + * because we pretend that the child's original message wasn't sent until + * after the parent's message is finished being dispatched. + * + * When waiting for a sync message reply, we dispatch an async message only if + * it is NESTED_INSIDE_CPOW. Normally NESTED_INSIDE_CPOW async + * messages are sent only from the child. However, the parent can send + * NESTED_INSIDE_CPOW async messages when it is creating a bridged protocol. + * + * Intr messages are blocking and can nest, but they don't participate in the + * nesting levels. While waiting for an intr response, all incoming messages are + * dispatched until a response is received. When two intr messages race with + * each other, a similar scheme is used to ensure that one side wins. The + * winning side is chosen based on the message type. + * + * Intr messages differ from sync messages in that, while sending an intr + * message, we may dispatch an async message. This causes some additional + * complexity. One issue is that replies can be received out of order. It's also + * more difficult to determine whether one message is nested inside + * another. Consequently, intr handling uses mOutOfTurnReplies and + * mRemoteStackDepthGuess, which are not needed for sync messages. + */ + +using namespace mozilla; +using namespace mozilla::ipc; +using namespace std; + +using mozilla::dom::AutoNoJSAPI; +using mozilla::dom::ScriptSettingsInitialized; +using mozilla::MonitorAutoLock; +using mozilla::MonitorAutoUnlock; + +#define IPC_ASSERT(_cond, ...) \ + do { \ + if (!(_cond)) \ + DebugAbort(__FILE__, __LINE__, #_cond,## __VA_ARGS__); \ + } while (0) + +static MessageChannel* gParentProcessBlocker; + +namespace mozilla { +namespace ipc { + +static const uint32_t kMinTelemetryMessageSize = 8192; + +const int32_t MessageChannel::kNoTimeout = INT32_MIN; + +// static +bool MessageChannel::sIsPumpingMessages = false; + +enum Direction +{ + IN_MESSAGE, + OUT_MESSAGE +}; + +class MessageChannel::InterruptFrame +{ +private: + enum Semantics + { + INTR_SEMS, + SYNC_SEMS, + ASYNC_SEMS + }; + +public: + InterruptFrame(Direction direction, const Message* msg) + : mMessageName(msg->name()), + mMessageRoutingId(msg->routing_id()), + mMesageSemantics(msg->is_interrupt() ? INTR_SEMS : + msg->is_sync() ? SYNC_SEMS : + ASYNC_SEMS), + mDirection(direction), + mMoved(false) + { + MOZ_RELEASE_ASSERT(mMessageName); + } + + InterruptFrame(InterruptFrame&& aOther) + { + MOZ_RELEASE_ASSERT(aOther.mMessageName); + mMessageName = aOther.mMessageName; + aOther.mMessageName = nullptr; + mMoved = aOther.mMoved; + aOther.mMoved = true; + + mMessageRoutingId = aOther.mMessageRoutingId; + mMesageSemantics = aOther.mMesageSemantics; + mDirection = aOther.mDirection; + } + + ~InterruptFrame() + { + MOZ_RELEASE_ASSERT(mMessageName || mMoved); + } + + InterruptFrame& operator=(InterruptFrame&& aOther) + { + MOZ_RELEASE_ASSERT(&aOther != this); + this->~InterruptFrame(); + new (this) InterruptFrame(Move(aOther)); + return *this; + } + + bool IsInterruptIncall() const + { + return INTR_SEMS == mMesageSemantics && IN_MESSAGE == mDirection; + } + + bool IsInterruptOutcall() const + { + return INTR_SEMS == mMesageSemantics && OUT_MESSAGE == mDirection; + } + + bool IsOutgoingSync() const { + return (mMesageSemantics == INTR_SEMS || mMesageSemantics == SYNC_SEMS) && + mDirection == OUT_MESSAGE; + } + + void Describe(int32_t* id, const char** dir, const char** sems, + const char** name) const + { + *id = mMessageRoutingId; + *dir = (IN_MESSAGE == mDirection) ? "in" : "out"; + *sems = (INTR_SEMS == mMesageSemantics) ? "intr" : + (SYNC_SEMS == mMesageSemantics) ? "sync" : + "async"; + *name = mMessageName; + } + + int32_t GetRoutingId() const + { + return mMessageRoutingId; + } + +private: + const char* mMessageName; + int32_t mMessageRoutingId; + Semantics mMesageSemantics; + Direction mDirection; + bool mMoved; + + // Disable harmful methods. + InterruptFrame(const InterruptFrame& aOther) = delete; + InterruptFrame& operator=(const InterruptFrame&) = delete; +}; + +class MOZ_STACK_CLASS MessageChannel::CxxStackFrame +{ +public: + CxxStackFrame(MessageChannel& that, Direction direction, const Message* msg) + : mThat(that) + { + mThat.AssertWorkerThread(); + + if (mThat.mCxxStackFrames.empty()) + mThat.EnteredCxxStack(); + + if (!mThat.mCxxStackFrames.append(InterruptFrame(direction, msg))) + MOZ_CRASH(); + + const InterruptFrame& frame = mThat.mCxxStackFrames.back(); + + if (frame.IsInterruptIncall()) + mThat.EnteredCall(); + + if (frame.IsOutgoingSync()) + mThat.EnteredSyncSend(); + + mThat.mSawInterruptOutMsg |= frame.IsInterruptOutcall(); + } + + ~CxxStackFrame() { + mThat.AssertWorkerThread(); + + MOZ_RELEASE_ASSERT(!mThat.mCxxStackFrames.empty()); + + const InterruptFrame& frame = mThat.mCxxStackFrames.back(); + bool exitingSync = frame.IsOutgoingSync(); + bool exitingCall = frame.IsInterruptIncall(); + mThat.mCxxStackFrames.shrinkBy(1); + + bool exitingStack = mThat.mCxxStackFrames.empty(); + + // According how lifetime is declared, mListener on MessageChannel + // lives longer than MessageChannel itself. Hence is expected to + // be alive. There is nothing to even assert here, there is no place + // we would be nullifying mListener on MessageChannel. + + if (exitingCall) + mThat.ExitedCall(); + + if (exitingSync) + mThat.ExitedSyncSend(); + + if (exitingStack) + mThat.ExitedCxxStack(); + } +private: + MessageChannel& mThat; + + // Disable harmful methods. + CxxStackFrame() = delete; + CxxStackFrame(const CxxStackFrame&) = delete; + CxxStackFrame& operator=(const CxxStackFrame&) = delete; +}; + +class AutoEnterTransaction +{ +public: + explicit AutoEnterTransaction(MessageChannel *aChan, + int32_t aMsgSeqno, + int32_t aTransactionID, + int aNestedLevel) + : mChan(aChan), + mActive(true), + mOutgoing(true), + mNestedLevel(aNestedLevel), + mSeqno(aMsgSeqno), + mTransaction(aTransactionID), + mNext(mChan->mTransactionStack) + { + mChan->mMonitor->AssertCurrentThreadOwns(); + mChan->mTransactionStack = this; + } + + explicit AutoEnterTransaction(MessageChannel *aChan, const IPC::Message &aMessage) + : mChan(aChan), + mActive(true), + mOutgoing(false), + mNestedLevel(aMessage.nested_level()), + mSeqno(aMessage.seqno()), + mTransaction(aMessage.transaction_id()), + mNext(mChan->mTransactionStack) + { + mChan->mMonitor->AssertCurrentThreadOwns(); + + if (!aMessage.is_sync()) { + mActive = false; + return; + } + + mChan->mTransactionStack = this; + } + + ~AutoEnterTransaction() { + mChan->mMonitor->AssertCurrentThreadOwns(); + if (mActive) { + mChan->mTransactionStack = mNext; + } + } + + void Cancel() { + AutoEnterTransaction *cur = mChan->mTransactionStack; + MOZ_RELEASE_ASSERT(cur == this); + while (cur && cur->mNestedLevel != IPC::Message::NOT_NESTED) { + // Note that, in the following situation, we will cancel multiple + // transactions: + // 1. Parent sends NESTED_INSIDE_SYNC message P1 to child. + // 2. Child sends NESTED_INSIDE_SYNC message C1 to child. + // 3. Child dispatches P1, parent blocks. + // 4. Child cancels. + // In this case, both P1 and C1 are cancelled. The parent will + // remove C1 from its queue when it gets the cancellation message. + MOZ_RELEASE_ASSERT(cur->mActive); + cur->mActive = false; + cur = cur->mNext; + } + + mChan->mTransactionStack = cur; + + MOZ_RELEASE_ASSERT(IsComplete()); + } + + bool AwaitingSyncReply() const { + MOZ_RELEASE_ASSERT(mActive); + if (mOutgoing) { + return true; + } + return mNext ? mNext->AwaitingSyncReply() : false; + } + + int AwaitingSyncReplyNestedLevel() const { + MOZ_RELEASE_ASSERT(mActive); + if (mOutgoing) { + return mNestedLevel; + } + return mNext ? mNext->AwaitingSyncReplyNestedLevel() : 0; + } + + bool DispatchingSyncMessage() const { + MOZ_RELEASE_ASSERT(mActive); + if (!mOutgoing) { + return true; + } + return mNext ? mNext->DispatchingSyncMessage() : false; + } + + int DispatchingSyncMessageNestedLevel() const { + MOZ_RELEASE_ASSERT(mActive); + if (!mOutgoing) { + return mNestedLevel; + } + return mNext ? mNext->DispatchingSyncMessageNestedLevel() : 0; + } + + int NestedLevel() const { + MOZ_RELEASE_ASSERT(mActive); + return mNestedLevel; + } + + int32_t SequenceNumber() const { + MOZ_RELEASE_ASSERT(mActive); + return mSeqno; + } + + int32_t TransactionID() const { + MOZ_RELEASE_ASSERT(mActive); + return mTransaction; + } + + void ReceivedReply(IPC::Message&& aMessage) { + MOZ_RELEASE_ASSERT(aMessage.seqno() == mSeqno); + MOZ_RELEASE_ASSERT(aMessage.transaction_id() == mTransaction); + MOZ_RELEASE_ASSERT(!mReply); + IPC_LOG("Reply received on worker thread: seqno=%d", mSeqno); + mReply = new IPC::Message(Move(aMessage)); + MOZ_RELEASE_ASSERT(IsComplete()); + } + + void HandleReply(IPC::Message&& aMessage) { + AutoEnterTransaction *cur = mChan->mTransactionStack; + MOZ_RELEASE_ASSERT(cur == this); + while (cur) { + MOZ_RELEASE_ASSERT(cur->mActive); + if (aMessage.seqno() == cur->mSeqno) { + cur->ReceivedReply(Move(aMessage)); + break; + } + cur = cur->mNext; + MOZ_RELEASE_ASSERT(cur); + } + } + + bool IsComplete() { + return !mActive || mReply; + } + + bool IsOutgoing() { + return mOutgoing; + } + + bool IsCanceled() { + return !mActive; + } + + bool IsBottom() const { + return !mNext; + } + + bool IsError() { + MOZ_RELEASE_ASSERT(mReply); + return mReply->is_reply_error(); + } + + nsAutoPtr<IPC::Message> GetReply() { + return Move(mReply); + } + +private: + MessageChannel *mChan; + + // Active is true if this transaction is on the mChan->mTransactionStack + // stack. Generally we're not on the stack if the transaction was canceled + // or if it was for a message that doesn't require transactions (an async + // message). + bool mActive; + + // Is this stack frame for an outgoing message? + bool mOutgoing; + + // Properties of the message being sent/received. + int mNestedLevel; + int32_t mSeqno; + int32_t mTransaction; + + // Next item in mChan->mTransactionStack. + AutoEnterTransaction *mNext; + + // Pointer the a reply received for this message, if one was received. + nsAutoPtr<IPC::Message> mReply; +}; + +MessageChannel::MessageChannel(IToplevelProtocol *aListener) + : mListener(aListener), + mChannelState(ChannelClosed), + mSide(UnknownSide), + mLink(nullptr), + mWorkerLoop(nullptr), + mChannelErrorTask(nullptr), + mWorkerLoopID(-1), + mTimeoutMs(kNoTimeout), + mInTimeoutSecondHalf(false), + mNextSeqno(0), + mLastSendError(SyncSendError::SendSuccess), + mDispatchingAsyncMessage(false), + mDispatchingAsyncMessageNestedLevel(0), + mTransactionStack(nullptr), + mTimedOutMessageSeqno(0), + mTimedOutMessageNestedLevel(0), + mRemoteStackDepthGuess(0), + mSawInterruptOutMsg(false), + mIsWaitingForIncoming(false), + mAbortOnError(false), + mNotifiedChannelDone(false), + mFlags(REQUIRE_DEFAULT), + mPeerPidSet(false), + mPeerPid(-1) +{ + MOZ_COUNT_CTOR(ipc::MessageChannel); + +#ifdef OS_WIN + mTopFrame = nullptr; + mIsSyncWaitingOnNonMainThread = false; +#endif + + mOnChannelConnectedTask = + NewNonOwningCancelableRunnableMethod(this, &MessageChannel::DispatchOnChannelConnected); + +#ifdef OS_WIN + mEvent = CreateEventW(nullptr, TRUE, FALSE, nullptr); + MOZ_RELEASE_ASSERT(mEvent, "CreateEvent failed! Nothing is going to work!"); +#endif +} + +MessageChannel::~MessageChannel() +{ + MOZ_COUNT_DTOR(ipc::MessageChannel); + IPC_ASSERT(mCxxStackFrames.empty(), "mismatched CxxStackFrame ctor/dtors"); +#ifdef OS_WIN + if (mEvent) { + BOOL ok = CloseHandle(mEvent); + mEvent = nullptr; + + if (!ok) { + gfxDevCrash(mozilla::gfx::LogReason::MessageChannelCloseFailure) << + "MessageChannel failed to close. GetLastError: " << + GetLastError(); + } + MOZ_RELEASE_ASSERT(ok); + } else { + gfxDevCrash(mozilla::gfx::LogReason::MessageChannelCloseFailure) << + "MessageChannel destructor ran without an mEvent Handle"; + } +#endif + Clear(); +} + +// This function returns the current transaction ID. Since the notion of a +// "current transaction" can be hard to define when messages race with each +// other and one gets canceled and the other doesn't, we require that this +// function is only called when the current transaction is known to be for a +// NESTED_INSIDE_SYNC message. In that case, we know for sure what the caller is +// looking for. +int32_t +MessageChannel::CurrentNestedInsideSyncTransaction() const +{ + mMonitor->AssertCurrentThreadOwns(); + if (!mTransactionStack) { + return 0; + } + MOZ_RELEASE_ASSERT(mTransactionStack->NestedLevel() == IPC::Message::NESTED_INSIDE_SYNC); + return mTransactionStack->TransactionID(); +} + +bool +MessageChannel::AwaitingSyncReply() const +{ + mMonitor->AssertCurrentThreadOwns(); + return mTransactionStack ? mTransactionStack->AwaitingSyncReply() : false; +} + +int +MessageChannel::AwaitingSyncReplyNestedLevel() const +{ + mMonitor->AssertCurrentThreadOwns(); + return mTransactionStack ? mTransactionStack->AwaitingSyncReplyNestedLevel() : 0; +} + +bool +MessageChannel::DispatchingSyncMessage() const +{ + mMonitor->AssertCurrentThreadOwns(); + return mTransactionStack ? mTransactionStack->DispatchingSyncMessage() : false; +} + +int +MessageChannel::DispatchingSyncMessageNestedLevel() const +{ + mMonitor->AssertCurrentThreadOwns(); + return mTransactionStack ? mTransactionStack->DispatchingSyncMessageNestedLevel() : 0; +} + +static void +PrintErrorMessage(Side side, const char* channelName, const char* msg) +{ + const char *from = (side == ChildSide) + ? "Child" + : ((side == ParentSide) ? "Parent" : "Unknown"); + printf_stderr("\n###!!! [%s][%s] Error: %s\n\n", from, channelName, msg); +} + +bool +MessageChannel::Connected() const +{ + mMonitor->AssertCurrentThreadOwns(); + + // The transport layer allows us to send messages before + // receiving the "connected" ack from the remote side. + return (ChannelOpening == mChannelState || ChannelConnected == mChannelState); +} + +bool +MessageChannel::CanSend() const +{ + if (!mMonitor) { + return false; + } + MonitorAutoLock lock(*mMonitor); + return Connected(); +} + +void +MessageChannel::Clear() +{ + // Don't clear mWorkerLoopID; we use it in AssertLinkThread() and + // AssertWorkerThread(). + // + // Also don't clear mListener. If we clear it, then sending a message + // through this channel after it's Clear()'ed can cause this process to + // crash. + // + // In practice, mListener owns the channel, so the channel gets deleted + // before mListener. But just to be safe, mListener is a weak pointer. + + if (gParentProcessBlocker == this) { + gParentProcessBlocker = nullptr; + } + + mWorkerLoop = nullptr; + delete mLink; + mLink = nullptr; + + mOnChannelConnectedTask->Cancel(); + + if (mChannelErrorTask) { + mChannelErrorTask->Cancel(); + mChannelErrorTask = nullptr; + } + + // Free up any memory used by pending messages. + for (RefPtr<MessageTask> task : mPending) { + task->Clear(); + } + mPending.clear(); + + mOutOfTurnReplies.clear(); + while (!mDeferred.empty()) { + mDeferred.pop(); + } +} + +bool +MessageChannel::Open(Transport* aTransport, MessageLoop* aIOLoop, Side aSide) +{ + NS_PRECONDITION(!mLink, "Open() called > once"); + + mMonitor = new RefCountedMonitor(); + mWorkerLoop = MessageLoop::current(); + mWorkerLoopID = mWorkerLoop->id(); + + ProcessLink *link = new ProcessLink(this); + link->Open(aTransport, aIOLoop, aSide); // :TODO: n.b.: sets mChild + mLink = link; + return true; +} + +bool +MessageChannel::Open(MessageChannel *aTargetChan, MessageLoop *aTargetLoop, Side aSide) +{ + // Opens a connection to another thread in the same process. + + // This handshake proceeds as follows: + // - Let A be the thread initiating the process (either child or parent) + // and B be the other thread. + // - A spawns thread for B, obtaining B's message loop + // - A creates ProtocolChild and ProtocolParent instances. + // Let PA be the one appropriate to A and PB the side for B. + // - A invokes PA->Open(PB, ...): + // - set state to mChannelOpening + // - this will place a work item in B's worker loop (see next bullet) + // and then spins until PB->mChannelState becomes mChannelConnected + // - meanwhile, on PB's worker loop, the work item is removed and: + // - invokes PB->SlaveOpen(PA, ...): + // - sets its state and that of PA to Connected + NS_PRECONDITION(aTargetChan, "Need a target channel"); + NS_PRECONDITION(ChannelClosed == mChannelState, "Not currently closed"); + + CommonThreadOpenInit(aTargetChan, aSide); + + Side oppSide = UnknownSide; + switch(aSide) { + case ChildSide: oppSide = ParentSide; break; + case ParentSide: oppSide = ChildSide; break; + case UnknownSide: break; + } + + mMonitor = new RefCountedMonitor(); + + MonitorAutoLock lock(*mMonitor); + mChannelState = ChannelOpening; + aTargetLoop->PostTask(NewNonOwningRunnableMethod + <MessageChannel*, Side>(aTargetChan, + &MessageChannel::OnOpenAsSlave, + this, oppSide)); + + while (ChannelOpening == mChannelState) + mMonitor->Wait(); + MOZ_RELEASE_ASSERT(ChannelConnected == mChannelState, "not connected when awoken"); + return (ChannelConnected == mChannelState); +} + +void +MessageChannel::OnOpenAsSlave(MessageChannel *aTargetChan, Side aSide) +{ + // Invoked when the other side has begun the open. + NS_PRECONDITION(ChannelClosed == mChannelState, + "Not currently closed"); + NS_PRECONDITION(ChannelOpening == aTargetChan->mChannelState, + "Target channel not in the process of opening"); + + CommonThreadOpenInit(aTargetChan, aSide); + mMonitor = aTargetChan->mMonitor; + + MonitorAutoLock lock(*mMonitor); + MOZ_RELEASE_ASSERT(ChannelOpening == aTargetChan->mChannelState, + "Target channel not in the process of opening"); + mChannelState = ChannelConnected; + aTargetChan->mChannelState = ChannelConnected; + aTargetChan->mMonitor->Notify(); +} + +void +MessageChannel::CommonThreadOpenInit(MessageChannel *aTargetChan, Side aSide) +{ + mWorkerLoop = MessageLoop::current(); + mWorkerLoopID = mWorkerLoop->id(); + mLink = new ThreadLink(this, aTargetChan); + mSide = aSide; +} + +bool +MessageChannel::Echo(Message* aMsg) +{ + nsAutoPtr<Message> msg(aMsg); + AssertWorkerThread(); + mMonitor->AssertNotCurrentThreadOwns(); + if (MSG_ROUTING_NONE == msg->routing_id()) { + ReportMessageRouteError("MessageChannel::Echo"); + return false; + } + + MonitorAutoLock lock(*mMonitor); + + if (!Connected()) { + ReportConnectionError("MessageChannel", msg); + return false; + } + + mLink->EchoMessage(msg.forget()); + return true; +} + +bool +MessageChannel::Send(Message* aMsg) +{ + if (aMsg->size() >= kMinTelemetryMessageSize) { + Telemetry::Accumulate(Telemetry::IPC_MESSAGE_SIZE, + nsDependentCString(aMsg->name()), aMsg->size()); + } + + MOZ_RELEASE_ASSERT(!aMsg->is_sync()); + MOZ_RELEASE_ASSERT(aMsg->nested_level() != IPC::Message::NESTED_INSIDE_SYNC); + + CxxStackFrame frame(*this, OUT_MESSAGE, aMsg); + + nsAutoPtr<Message> msg(aMsg); + AssertWorkerThread(); + mMonitor->AssertNotCurrentThreadOwns(); + if (MSG_ROUTING_NONE == msg->routing_id()) { + ReportMessageRouteError("MessageChannel::Send"); + return false; + } + + MonitorAutoLock lock(*mMonitor); + if (!Connected()) { + ReportConnectionError("MessageChannel", msg); + return false; + } + mLink->SendMessage(msg.forget()); + return true; +} + +class CancelMessage : public IPC::Message +{ +public: + explicit CancelMessage(int transaction) : + IPC::Message(MSG_ROUTING_NONE, CANCEL_MESSAGE_TYPE) + { + set_transaction_id(transaction); + } + static bool Read(const Message* msg) { + return true; + } + void Log(const std::string& aPrefix, FILE* aOutf) const { + fputs("(special `Cancel' message)", aOutf); + } +}; + +bool +MessageChannel::MaybeInterceptSpecialIOMessage(const Message& aMsg) +{ + AssertLinkThread(); + mMonitor->AssertCurrentThreadOwns(); + + if (MSG_ROUTING_NONE == aMsg.routing_id()) { + if (GOODBYE_MESSAGE_TYPE == aMsg.type()) { + // :TODO: Sort out Close() on this side racing with Close() on the + // other side + mChannelState = ChannelClosing; + if (LoggingEnabled()) { + printf("NOTE: %s process received `Goodbye', closing down\n", + (mSide == ChildSide) ? "child" : "parent"); + } + return true; + } else if (CANCEL_MESSAGE_TYPE == aMsg.type()) { + IPC_LOG("Cancel from message"); + CancelTransaction(aMsg.transaction_id()); + NotifyWorkerThread(); + return true; + } + } + return false; +} + +bool +MessageChannel::ShouldDeferMessage(const Message& aMsg) +{ + // Never defer messages that have the highest nested level, even async + // ones. This is safe because only the child can send these messages, so + // they can never nest. + if (aMsg.nested_level() == IPC::Message::NESTED_INSIDE_CPOW) + return false; + + // Unless they're NESTED_INSIDE_CPOW, we always defer async messages. + // Note that we never send an async NESTED_INSIDE_SYNC message. + if (!aMsg.is_sync()) { + MOZ_RELEASE_ASSERT(aMsg.nested_level() == IPC::Message::NOT_NESTED); + return true; + } + + int msgNestedLevel = aMsg.nested_level(); + int waitingNestedLevel = AwaitingSyncReplyNestedLevel(); + + // Always defer if the nested level of the incoming message is less than the + // nested level of the message we're awaiting. + if (msgNestedLevel < waitingNestedLevel) + return true; + + // Never defer if the message has strictly greater nested level. + if (msgNestedLevel > waitingNestedLevel) + return false; + + // When both sides send sync messages of the same nested level, we resolve the + // race by dispatching in the child and deferring the incoming message in + // the parent. However, the parent still needs to dispatch nested sync + // messages. + // + // Deferring in the parent only sort of breaks message ordering. When the + // child's message comes in, we can pretend the child hasn't quite + // finished sending it yet. Since the message is sync, we know that the + // child hasn't moved on yet. + return mSide == ParentSide && aMsg.transaction_id() != CurrentNestedInsideSyncTransaction(); +} + +void +MessageChannel::OnMessageReceivedFromLink(Message&& aMsg) +{ + AssertLinkThread(); + mMonitor->AssertCurrentThreadOwns(); + + if (MaybeInterceptSpecialIOMessage(aMsg)) + return; + + // Regardless of the Interrupt stack, if we're awaiting a sync reply, + // we know that it needs to be immediately handled to unblock us. + if (aMsg.is_sync() && aMsg.is_reply()) { + IPC_LOG("Received reply seqno=%d xid=%d", aMsg.seqno(), aMsg.transaction_id()); + + if (aMsg.seqno() == mTimedOutMessageSeqno) { + // Drop the message, but allow future sync messages to be sent. + IPC_LOG("Received reply to timedout message; igoring; xid=%d", mTimedOutMessageSeqno); + EndTimeout(); + return; + } + + MOZ_RELEASE_ASSERT(AwaitingSyncReply()); + MOZ_RELEASE_ASSERT(!mTimedOutMessageSeqno); + + mTransactionStack->HandleReply(Move(aMsg)); + NotifyWorkerThread(); + return; + } + + // Nested messages cannot be compressed. + MOZ_RELEASE_ASSERT(aMsg.compress_type() == IPC::Message::COMPRESSION_NONE || + aMsg.nested_level() == IPC::Message::NOT_NESTED); + + bool reuseTask = false; + if (aMsg.compress_type() == IPC::Message::COMPRESSION_ENABLED) { + bool compress = (!mPending.isEmpty() && + mPending.getLast()->Msg().type() == aMsg.type() && + mPending.getLast()->Msg().routing_id() == aMsg.routing_id()); + if (compress) { + // This message type has compression enabled, and the back of the + // queue was the same message type and routed to the same destination. + // Replace it with the newer message. + MOZ_RELEASE_ASSERT(mPending.getLast()->Msg().compress_type() == + IPC::Message::COMPRESSION_ENABLED); + mPending.getLast()->Msg() = Move(aMsg); + + reuseTask = true; + } + } else if (aMsg.compress_type() == IPC::Message::COMPRESSION_ALL && !mPending.isEmpty()) { + for (RefPtr<MessageTask> p = mPending.getLast(); p; p = p->getPrevious()) { + if (p->Msg().type() == aMsg.type() && + p->Msg().routing_id() == aMsg.routing_id()) + { + // This message type has compression enabled, and the queue + // holds a message with the same message type and routed to the + // same destination. Erase it. Note that, since we always + // compress these redundancies, There Can Be Only One. + MOZ_RELEASE_ASSERT(p->Msg().compress_type() == IPC::Message::COMPRESSION_ALL); + p->remove(); + break; + } + } + } + + bool wakeUpSyncSend = AwaitingSyncReply() && !ShouldDeferMessage(aMsg); + + bool shouldWakeUp = AwaitingInterruptReply() || + wakeUpSyncSend || + AwaitingIncomingMessage(); + + // Although we usually don't need to post a message task if + // shouldWakeUp is true, it's easier to post anyway than to have to + // guarantee that every Send call processes everything it's supposed to + // before returning. + bool shouldPostTask = !shouldWakeUp || wakeUpSyncSend; + + IPC_LOG("Receive on link thread; seqno=%d, xid=%d, shouldWakeUp=%d", + aMsg.seqno(), aMsg.transaction_id(), shouldWakeUp); + + if (reuseTask) { + return; + } + + // There are three cases we're concerned about, relating to the state of the + // main thread: + // + // (1) We are waiting on a sync reply - main thread is blocked on the + // IPC monitor. + // - If the message is NESTED_INSIDE_SYNC, we wake up the main thread to + // deliver the message depending on ShouldDeferMessage. Otherwise, we + // leave it in the mPending queue, posting a task to the main event + // loop, where it will be processed once the synchronous reply has been + // received. + // + // (2) We are waiting on an Interrupt reply - main thread is blocked on the + // IPC monitor. + // - Always notify and wake up the main thread. + // + // (3) We are not waiting on a reply. + // - We post a task to the main event loop. + // + // Note that, we may notify the main thread even though the monitor is not + // blocked. This is okay, since we always check for pending events before + // blocking again. + + RefPtr<MessageTask> task = new MessageTask(this, Move(aMsg)); + mPending.insertBack(task); + + if (shouldWakeUp) { + NotifyWorkerThread(); + } + + if (shouldPostTask) { + task->Post(); + } +} + +void +MessageChannel::PeekMessages(mozilla::function<bool(const Message& aMsg)> aInvoke) +{ + // FIXME: We shouldn't be holding the lock for aInvoke! + MonitorAutoLock lock(*mMonitor); + + for (RefPtr<MessageTask> it : mPending) { + const Message &msg = it->Msg(); + if (!aInvoke(msg)) { + break; + } + } +} + +void +MessageChannel::ProcessPendingRequests(AutoEnterTransaction& aTransaction) +{ + mMonitor->AssertCurrentThreadOwns(); + + IPC_LOG("ProcessPendingRequests for seqno=%d, xid=%d", + aTransaction.SequenceNumber(), aTransaction.TransactionID()); + + // Loop until there aren't any more nested messages to process. + for (;;) { + // If we canceled during ProcessPendingRequest, then we need to leave + // immediately because the results of ShouldDeferMessage will be + // operating with weird state (as if no Send is in progress). That could + // cause even NOT_NESTED sync messages to be processed (but not + // NOT_NESTED async messages), which would break message ordering. + if (aTransaction.IsCanceled()) { + return; + } + + mozilla::Vector<Message> toProcess; + + for (RefPtr<MessageTask> p = mPending.getFirst(); p; ) { + Message &msg = p->Msg(); + + MOZ_RELEASE_ASSERT(!aTransaction.IsCanceled(), + "Calling ShouldDeferMessage when cancelled"); + bool defer = ShouldDeferMessage(msg); + + // Only log the interesting messages. + if (msg.is_sync() || msg.nested_level() == IPC::Message::NESTED_INSIDE_CPOW) { + IPC_LOG("ShouldDeferMessage(seqno=%d) = %d", msg.seqno(), defer); + } + + if (!defer) { + if (!toProcess.append(Move(msg))) + MOZ_CRASH(); + + p = p->removeAndGetNext(); + continue; + } + p = p->getNext(); + } + + if (toProcess.empty()) { + break; + } + + // Processing these messages could result in more messages, so we + // loop around to check for more afterwards. + + for (auto it = toProcess.begin(); it != toProcess.end(); it++) { + ProcessPendingRequest(Move(*it)); + } + } +} + +bool +MessageChannel::Send(Message* aMsg, Message* aReply) +{ + if (aMsg->size() >= kMinTelemetryMessageSize) { + Telemetry::Accumulate(Telemetry::IPC_MESSAGE_SIZE, + nsDependentCString(aMsg->name()), aMsg->size()); + } + + nsAutoPtr<Message> msg(aMsg); + + // Sanity checks. + AssertWorkerThread(); + mMonitor->AssertNotCurrentThreadOwns(); + +#ifdef OS_WIN + SyncStackFrame frame(this, false); + NeuteredWindowRegion neuteredRgn(mFlags & REQUIRE_DEFERRED_MESSAGE_PROTECTION); +#endif + + CxxStackFrame f(*this, OUT_MESSAGE, msg); + + MonitorAutoLock lock(*mMonitor); + + if (mTimedOutMessageSeqno) { + // Don't bother sending another sync message if a previous one timed out + // and we haven't received a reply for it. Once the original timed-out + // message receives a reply, we'll be able to send more sync messages + // again. + IPC_LOG("Send() failed due to previous timeout"); + mLastSendError = SyncSendError::PreviousTimeout; + return false; + } + + if (DispatchingSyncMessageNestedLevel() == IPC::Message::NOT_NESTED && + msg->nested_level() > IPC::Message::NOT_NESTED) + { + // Don't allow sending CPOWs while we're dispatching a sync message. + // If you want to do that, use sendRpcMessage instead. + IPC_LOG("Nested level forbids send"); + mLastSendError = SyncSendError::SendingCPOWWhileDispatchingSync; + return false; + } + + if (DispatchingSyncMessageNestedLevel() == IPC::Message::NESTED_INSIDE_CPOW || + DispatchingAsyncMessageNestedLevel() == IPC::Message::NESTED_INSIDE_CPOW) + { + // Generally only the parent dispatches urgent messages. And the only + // sync messages it can send are NESTED_INSIDE_SYNC. Mainly we want to ensure + // here that we don't return false for non-CPOW messages. + MOZ_RELEASE_ASSERT(msg->nested_level() == IPC::Message::NESTED_INSIDE_SYNC); + IPC_LOG("Sending while dispatching urgent message"); + mLastSendError = SyncSendError::SendingCPOWWhileDispatchingUrgent; + return false; + } + + if (msg->nested_level() < DispatchingSyncMessageNestedLevel() || + msg->nested_level() < AwaitingSyncReplyNestedLevel()) + { + MOZ_RELEASE_ASSERT(DispatchingSyncMessage() || DispatchingAsyncMessage()); + IPC_LOG("Cancel from Send"); + CancelMessage *cancel = new CancelMessage(CurrentNestedInsideSyncTransaction()); + CancelTransaction(CurrentNestedInsideSyncTransaction()); + mLink->SendMessage(cancel); + } + + IPC_ASSERT(msg->is_sync(), "can only Send() sync messages here"); + + IPC_ASSERT(msg->nested_level() >= DispatchingSyncMessageNestedLevel(), + "can't send sync message of a lesser nested level than what's being dispatched"); + IPC_ASSERT(AwaitingSyncReplyNestedLevel() <= msg->nested_level(), + "nested sync message sends must be of increasing nested level"); + IPC_ASSERT(DispatchingSyncMessageNestedLevel() != IPC::Message::NESTED_INSIDE_CPOW, + "not allowed to send messages while dispatching urgent messages"); + + IPC_ASSERT(DispatchingAsyncMessageNestedLevel() != IPC::Message::NESTED_INSIDE_CPOW, + "not allowed to send messages while dispatching urgent messages"); + + if (!Connected()) { + ReportConnectionError("MessageChannel::SendAndWait", msg); + mLastSendError = SyncSendError::NotConnectedBeforeSend; + return false; + } + + msg->set_seqno(NextSeqno()); + + int32_t seqno = msg->seqno(); + int nestedLevel = msg->nested_level(); + msgid_t replyType = msg->type() + 1; + + AutoEnterTransaction *stackTop = mTransactionStack; + + // If the most recent message on the stack is NESTED_INSIDE_SYNC, then our + // message should nest inside that and we use the same transaction + // ID. Otherwise we need a new transaction ID (so we use the seqno of the + // message we're sending). + bool nest = stackTop && stackTop->NestedLevel() == IPC::Message::NESTED_INSIDE_SYNC; + int32_t transaction = nest ? stackTop->TransactionID() : seqno; + msg->set_transaction_id(transaction); + + bool handleWindowsMessages = mListener->HandleWindowsMessages(*aMsg); + AutoEnterTransaction transact(this, seqno, transaction, nestedLevel); + + IPC_LOG("Send seqno=%d, xid=%d", seqno, transaction); + + // msg will be destroyed soon, but name() is not owned by msg. + const char* msgName = msg->name(); + + mLink->SendMessage(msg.forget()); + + while (true) { + MOZ_RELEASE_ASSERT(!transact.IsCanceled()); + ProcessPendingRequests(transact); + if (transact.IsComplete()) { + break; + } + if (!Connected()) { + ReportConnectionError("MessageChannel::Send"); + mLastSendError = SyncSendError::DisconnectedDuringSend; + return false; + } + + MOZ_RELEASE_ASSERT(!mTimedOutMessageSeqno); + MOZ_RELEASE_ASSERT(!transact.IsComplete()); + MOZ_RELEASE_ASSERT(mTransactionStack == &transact); + + bool maybeTimedOut = !WaitForSyncNotify(handleWindowsMessages); + + if (mListener->NeedArtificialSleep()) { + MonitorAutoUnlock unlock(*mMonitor); + mListener->ArtificialSleep(); + } + + if (!Connected()) { + ReportConnectionError("MessageChannel::SendAndWait"); + mLastSendError = SyncSendError::DisconnectedDuringSend; + return false; + } + + if (transact.IsCanceled()) { + break; + } + + MOZ_RELEASE_ASSERT(mTransactionStack == &transact); + + // We only time out a message if it initiated a new transaction (i.e., + // if neither side has any other message Sends on the stack). + bool canTimeOut = transact.IsBottom(); + if (maybeTimedOut && canTimeOut && !ShouldContinueFromTimeout()) { + // Since ShouldContinueFromTimeout drops the lock, we need to + // re-check all our conditions here. We shouldn't time out if any of + // these things happen because there won't be a reply to the timed + // out message in these cases. + if (transact.IsComplete()) { + break; + } + + IPC_LOG("Timing out Send: xid=%d", transaction); + + mTimedOutMessageSeqno = seqno; + mTimedOutMessageNestedLevel = nestedLevel; + mLastSendError = SyncSendError::TimedOut; + return false; + } + + if (transact.IsCanceled()) { + break; + } + } + + if (transact.IsCanceled()) { + IPC_LOG("Other side canceled seqno=%d, xid=%d", seqno, transaction); + mLastSendError = SyncSendError::CancelledAfterSend; + return false; + } + + if (transact.IsError()) { + IPC_LOG("Error: seqno=%d, xid=%d", seqno, transaction); + mLastSendError = SyncSendError::ReplyError; + return false; + } + + IPC_LOG("Got reply: seqno=%d, xid=%d", seqno, transaction); + + nsAutoPtr<Message> reply = transact.GetReply(); + + MOZ_RELEASE_ASSERT(reply); + MOZ_RELEASE_ASSERT(reply->is_reply(), "expected reply"); + MOZ_RELEASE_ASSERT(!reply->is_reply_error()); + MOZ_RELEASE_ASSERT(reply->seqno() == seqno); + MOZ_RELEASE_ASSERT(reply->type() == replyType, "wrong reply type"); + MOZ_RELEASE_ASSERT(reply->is_sync()); + + *aReply = Move(*reply); + if (aReply->size() >= kMinTelemetryMessageSize) { + Telemetry::Accumulate(Telemetry::IPC_REPLY_SIZE, + nsDependentCString(msgName), aReply->size()); + } + return true; +} + +bool +MessageChannel::Call(Message* aMsg, Message* aReply) +{ + nsAutoPtr<Message> msg(aMsg); + AssertWorkerThread(); + mMonitor->AssertNotCurrentThreadOwns(); + +#ifdef OS_WIN + SyncStackFrame frame(this, true); +#endif + + // This must come before MonitorAutoLock, as its destructor acquires the + // monitor lock. + CxxStackFrame cxxframe(*this, OUT_MESSAGE, msg); + + MonitorAutoLock lock(*mMonitor); + if (!Connected()) { + ReportConnectionError("MessageChannel::Call", msg); + return false; + } + + // Sanity checks. + IPC_ASSERT(!AwaitingSyncReply(), + "cannot issue Interrupt call while blocked on sync request"); + IPC_ASSERT(!DispatchingSyncMessage(), + "violation of sync handler invariant"); + IPC_ASSERT(msg->is_interrupt(), "can only Call() Interrupt messages here"); + + msg->set_seqno(NextSeqno()); + msg->set_interrupt_remote_stack_depth_guess(mRemoteStackDepthGuess); + msg->set_interrupt_local_stack_depth(1 + InterruptStackDepth()); + mInterruptStack.push(MessageInfo(*msg)); + mLink->SendMessage(msg.forget()); + + while (true) { + // if a handler invoked by *Dispatch*() spun a nested event + // loop, and the connection was broken during that loop, we + // might have already processed the OnError event. if so, + // trying another loop iteration will be futile because + // channel state will have been cleared + if (!Connected()) { + ReportConnectionError("MessageChannel::Call"); + return false; + } + +#ifdef OS_WIN + // We need to limit the scoped of neuteredRgn to this spot in the code. + // Window neutering can't be enabled during some plugin calls because + // we then risk the neutered window procedure being subclassed by a + // plugin. + { + NeuteredWindowRegion neuteredRgn(mFlags & REQUIRE_DEFERRED_MESSAGE_PROTECTION); + /* We should pump messages at this point to ensure that the IPC peer + does not become deadlocked on a pending inter-thread SendMessage() */ + neuteredRgn.PumpOnce(); + } +#endif + + // Now might be the time to process a message deferred because of race + // resolution. + MaybeUndeferIncall(); + + // Wait for an event to occur. + while (!InterruptEventOccurred()) { + bool maybeTimedOut = !WaitForInterruptNotify(); + + // We might have received a "subtly deferred" message in a nested + // loop that it's now time to process. + if (InterruptEventOccurred() || + (!maybeTimedOut && (!mDeferred.empty() || !mOutOfTurnReplies.empty()))) + { + break; + } + + if (maybeTimedOut && !ShouldContinueFromTimeout()) + return false; + } + + Message recvd; + MessageMap::iterator it; + + if ((it = mOutOfTurnReplies.find(mInterruptStack.top().seqno())) + != mOutOfTurnReplies.end()) + { + recvd = Move(it->second); + mOutOfTurnReplies.erase(it); + } else if (!mPending.isEmpty()) { + RefPtr<MessageTask> task = mPending.popFirst(); + recvd = Move(task->Msg()); + } else { + // because of subtleties with nested event loops, it's possible + // that we got here and nothing happened. or, we might have a + // deferred in-call that needs to be processed. either way, we + // won't break the inner while loop again until something new + // happens. + continue; + } + + // If the message is not Interrupt, we can dispatch it as normal. + if (!recvd.is_interrupt()) { + DispatchMessage(Move(recvd)); + if (!Connected()) { + ReportConnectionError("MessageChannel::DispatchMessage"); + return false; + } + continue; + } + + // If the message is an Interrupt reply, either process it as a reply to our + // call, or add it to the list of out-of-turn replies we've received. + if (recvd.is_reply()) { + IPC_ASSERT(!mInterruptStack.empty(), "invalid Interrupt stack"); + + // If this is not a reply the call we've initiated, add it to our + // out-of-turn replies and keep polling for events. + { + const MessageInfo &outcall = mInterruptStack.top(); + + // Note, In the parent, sequence numbers increase from 0, and + // in the child, they decrease from 0. + if ((mSide == ChildSide && recvd.seqno() > outcall.seqno()) || + (mSide != ChildSide && recvd.seqno() < outcall.seqno())) + { + mOutOfTurnReplies[recvd.seqno()] = Move(recvd); + continue; + } + + IPC_ASSERT(recvd.is_reply_error() || + (recvd.type() == (outcall.type() + 1) && + recvd.seqno() == outcall.seqno()), + "somebody's misbehavin'", true); + } + + // We received a reply to our most recent outstanding call. Pop + // this frame and return the reply. + mInterruptStack.pop(); + + bool is_reply_error = recvd.is_reply_error(); + if (!is_reply_error) { + *aReply = Move(recvd); + } + + // If we have no more pending out calls waiting on replies, then + // the reply queue should be empty. + IPC_ASSERT(!mInterruptStack.empty() || mOutOfTurnReplies.empty(), + "still have pending replies with no pending out-calls", + true); + + return !is_reply_error; + } + + // Dispatch an Interrupt in-call. Snapshot the current stack depth while we + // own the monitor. + size_t stackDepth = InterruptStackDepth(); + { + MonitorAutoUnlock unlock(*mMonitor); + + CxxStackFrame frame(*this, IN_MESSAGE, &recvd); + DispatchInterruptMessage(Move(recvd), stackDepth); + } + if (!Connected()) { + ReportConnectionError("MessageChannel::DispatchInterruptMessage"); + return false; + } + } + + return true; +} + +bool +MessageChannel::WaitForIncomingMessage() +{ +#ifdef OS_WIN + SyncStackFrame frame(this, true); + NeuteredWindowRegion neuteredRgn(mFlags & REQUIRE_DEFERRED_MESSAGE_PROTECTION); +#endif + + MonitorAutoLock lock(*mMonitor); + AutoEnterWaitForIncoming waitingForIncoming(*this); + if (mChannelState != ChannelConnected) { + return false; + } + if (!HasPendingEvents()) { + return WaitForInterruptNotify(); + } + + MOZ_RELEASE_ASSERT(!mPending.isEmpty()); + RefPtr<MessageTask> task = mPending.getFirst(); + RunMessage(*task); + return true; +} + +bool +MessageChannel::HasPendingEvents() +{ + AssertWorkerThread(); + mMonitor->AssertCurrentThreadOwns(); + return Connected() && !mPending.isEmpty(); +} + +bool +MessageChannel::InterruptEventOccurred() +{ + AssertWorkerThread(); + mMonitor->AssertCurrentThreadOwns(); + IPC_ASSERT(InterruptStackDepth() > 0, "not in wait loop"); + + return (!Connected() || + !mPending.isEmpty() || + (!mOutOfTurnReplies.empty() && + mOutOfTurnReplies.find(mInterruptStack.top().seqno()) != + mOutOfTurnReplies.end())); +} + +bool +MessageChannel::ProcessPendingRequest(Message &&aUrgent) +{ + AssertWorkerThread(); + mMonitor->AssertCurrentThreadOwns(); + + IPC_LOG("Process pending: seqno=%d, xid=%d", aUrgent.seqno(), aUrgent.transaction_id()); + + DispatchMessage(Move(aUrgent)); + if (!Connected()) { + ReportConnectionError("MessageChannel::ProcessPendingRequest"); + return false; + } + + return true; +} + +bool +MessageChannel::ShouldRunMessage(const Message& aMsg) +{ + if (!mTimedOutMessageSeqno) { + return true; + } + + // If we've timed out a message and we're awaiting the reply to the timed + // out message, we have to be careful what messages we process. Here's what + // can go wrong: + // 1. child sends a NOT_NESTED sync message S + // 2. parent sends a NESTED_INSIDE_SYNC sync message H at the same time + // 3. parent times out H + // 4. child starts processing H and sends a NESTED_INSIDE_SYNC message H' nested + // within the same transaction + // 5. parent dispatches S and sends reply + // 6. child asserts because it instead expected a reply to H'. + // + // To solve this, we refuse to process S in the parent until we get a reply + // to H. More generally, let the timed out message be M. We don't process a + // message unless the child would need the response to that message in order + // to process M. Those messages are the ones that have a higher nested level + // than M or that are part of the same transaction as M. + if (aMsg.nested_level() < mTimedOutMessageNestedLevel || + (aMsg.nested_level() == mTimedOutMessageNestedLevel + && aMsg.transaction_id() != mTimedOutMessageSeqno)) + { + return false; + } + + return true; +} + +void +MessageChannel::RunMessage(MessageTask& aTask) +{ + AssertWorkerThread(); + mMonitor->AssertCurrentThreadOwns(); + + Message& msg = aTask.Msg(); + + if (!Connected()) { + ReportConnectionError("RunMessage"); + return; + } + + // Check that we're going to run the first message that's valid to run. +#ifdef DEBUG + for (RefPtr<MessageTask> task : mPending) { + if (task == &aTask) { + break; + } + + MOZ_ASSERT(!ShouldRunMessage(task->Msg()) || + aTask.Msg().priority() != task->Msg().priority()); + + } +#endif + + if (!mDeferred.empty()) { + MaybeUndeferIncall(); + } + + if (!ShouldRunMessage(msg)) { + return; + } + + MOZ_RELEASE_ASSERT(aTask.isInList()); + aTask.remove(); + + if (IsOnCxxStack() && msg.is_interrupt() && msg.is_reply()) { + // We probably just received a reply in a nested loop for an + // Interrupt call sent before entering that loop. + mOutOfTurnReplies[msg.seqno()] = Move(msg); + return; + } + + DispatchMessage(Move(msg)); +} + +NS_IMPL_ISUPPORTS_INHERITED(MessageChannel::MessageTask, CancelableRunnable, nsIRunnablePriority) + +nsresult +MessageChannel::MessageTask::Run() +{ + if (!mChannel) { + return NS_OK; + } + + mChannel->AssertWorkerThread(); + mChannel->mMonitor->AssertNotCurrentThreadOwns(); + + MonitorAutoLock lock(*mChannel->mMonitor); + + // In case we choose not to run this message, we may need to be able to Post + // it again. + mScheduled = false; + + if (!isInList()) { + return NS_OK; + } + + mChannel->RunMessage(*this); + return NS_OK; +} + +// Warning: This method removes the receiver from whatever list it might be in. +nsresult +MessageChannel::MessageTask::Cancel() +{ + if (!mChannel) { + return NS_OK; + } + + mChannel->AssertWorkerThread(); + mChannel->mMonitor->AssertNotCurrentThreadOwns(); + + MonitorAutoLock lock(*mChannel->mMonitor); + + if (!isInList()) { + return NS_OK; + } + remove(); + + return NS_OK; +} + +void +MessageChannel::MessageTask::Post() +{ + MOZ_RELEASE_ASSERT(!mScheduled); + MOZ_RELEASE_ASSERT(isInList()); + + mScheduled = true; + + RefPtr<MessageTask> self = this; + mChannel->mWorkerLoop->PostTask(self.forget()); +} + +void +MessageChannel::MessageTask::Clear() +{ + mChannel->AssertWorkerThread(); + + mChannel = nullptr; +} + +NS_IMETHODIMP +MessageChannel::MessageTask::GetPriority(uint32_t* aPriority) +{ + *aPriority = mMessage.priority() == Message::HIGH_PRIORITY ? + PRIORITY_HIGH : PRIORITY_NORMAL; + return NS_OK; +} + +void +MessageChannel::DispatchMessage(Message &&aMsg) +{ + AssertWorkerThread(); + mMonitor->AssertCurrentThreadOwns(); + + Maybe<AutoNoJSAPI> nojsapi; + if (ScriptSettingsInitialized() && NS_IsMainThread()) + nojsapi.emplace(); + + nsAutoPtr<Message> reply; + + IPC_LOG("DispatchMessage: seqno=%d, xid=%d", aMsg.seqno(), aMsg.transaction_id()); + + { + AutoEnterTransaction transaction(this, aMsg); + + int id = aMsg.transaction_id(); + MOZ_RELEASE_ASSERT(!aMsg.is_sync() || id == transaction.TransactionID()); + + { + MonitorAutoUnlock unlock(*mMonitor); + CxxStackFrame frame(*this, IN_MESSAGE, &aMsg); + + mListener->ArtificialSleep(); + + if (aMsg.is_sync()) + DispatchSyncMessage(aMsg, *getter_Transfers(reply)); + else if (aMsg.is_interrupt()) + DispatchInterruptMessage(Move(aMsg), 0); + else + DispatchAsyncMessage(aMsg); + + mListener->ArtificialSleep(); + } + + if (reply && transaction.IsCanceled()) { + // The transaction has been canceled. Don't send a reply. + IPC_LOG("Nulling out reply due to cancellation, seqno=%d, xid=%d", aMsg.seqno(), id); + reply = nullptr; + } + } + + if (reply && ChannelConnected == mChannelState) { + IPC_LOG("Sending reply seqno=%d, xid=%d", aMsg.seqno(), aMsg.transaction_id()); + mLink->SendMessage(reply.forget()); + } +} + +void +MessageChannel::DispatchSyncMessage(const Message& aMsg, Message*& aReply) +{ + AssertWorkerThread(); + + int nestedLevel = aMsg.nested_level(); + + MOZ_RELEASE_ASSERT(nestedLevel == IPC::Message::NOT_NESTED || NS_IsMainThread()); + + MessageChannel* dummy; + MessageChannel*& blockingVar = mSide == ChildSide && NS_IsMainThread() ? gParentProcessBlocker : dummy; + + Result rv; + { + AutoSetValue<MessageChannel*> blocked(blockingVar, this); + rv = mListener->OnMessageReceived(aMsg, aReply); + } + + if (!MaybeHandleError(rv, aMsg, "DispatchSyncMessage")) { + aReply = new Message(); + aReply->set_sync(); + aReply->set_nested_level(aMsg.nested_level()); + aReply->set_reply(); + aReply->set_reply_error(); + } + aReply->set_seqno(aMsg.seqno()); + aReply->set_transaction_id(aMsg.transaction_id()); +} + +void +MessageChannel::DispatchAsyncMessage(const Message& aMsg) +{ + AssertWorkerThread(); + MOZ_RELEASE_ASSERT(!aMsg.is_interrupt() && !aMsg.is_sync()); + + if (aMsg.routing_id() == MSG_ROUTING_NONE) { + NS_RUNTIMEABORT("unhandled special message!"); + } + + Result rv; + { + int nestedLevel = aMsg.nested_level(); + AutoSetValue<bool> async(mDispatchingAsyncMessage, true); + AutoSetValue<int> nestedLevelSet(mDispatchingAsyncMessageNestedLevel, nestedLevel); + rv = mListener->OnMessageReceived(aMsg); + } + MaybeHandleError(rv, aMsg, "DispatchAsyncMessage"); +} + +bool +MessageChannel::ShouldDeferInterruptMessage(const Message& aMsg, size_t aStackDepth) +{ + AssertWorkerThread(); + + // We may or may not own the lock in this function, so don't access any + // channel state. + + IPC_ASSERT(aMsg.is_interrupt() && !aMsg.is_reply(), "wrong message type"); + + // Race detection: see the long comment near mRemoteStackDepthGuess in + // MessageChannel.h. "Remote" stack depth means our side, and "local" means + // the other side. + if (aMsg.interrupt_remote_stack_depth_guess() == RemoteViewOfStackDepth(aStackDepth)) { + return false; + } + + // Interrupt in-calls have raced. The winner, if there is one, gets to defer + // processing of the other side's in-call. + bool defer; + const MessageInfo parentMsgInfo = + (mSide == ChildSide) ? MessageInfo(aMsg) : mInterruptStack.top(); + const MessageInfo childMsgInfo = + (mSide == ChildSide) ? mInterruptStack.top() : MessageInfo(aMsg); + switch (mListener->MediateInterruptRace(parentMsgInfo, childMsgInfo)) + { + case RIPChildWins: + defer = (mSide == ChildSide); + break; + case RIPParentWins: + defer = (mSide != ChildSide); + break; + case RIPError: + MOZ_CRASH("NYI: 'Error' Interrupt race policy"); + default: + MOZ_CRASH("not reached"); + } + + return defer; +} + +void +MessageChannel::DispatchInterruptMessage(Message&& aMsg, size_t stackDepth) +{ + AssertWorkerThread(); + mMonitor->AssertNotCurrentThreadOwns(); + + IPC_ASSERT(aMsg.is_interrupt() && !aMsg.is_reply(), "wrong message type"); + + if (ShouldDeferInterruptMessage(aMsg, stackDepth)) { + // We now know the other side's stack has one more frame + // than we thought. + ++mRemoteStackDepthGuess; // decremented in MaybeProcessDeferred() + mDeferred.push(Move(aMsg)); + return; + } + + // If we "lost" a race and need to process the other side's in-call, we + // don't need to fix up the mRemoteStackDepthGuess here, because we're just + // about to increment it, which will make it correct again. + +#ifdef OS_WIN + SyncStackFrame frame(this, true); +#endif + + nsAutoPtr<Message> reply; + + ++mRemoteStackDepthGuess; + Result rv = mListener->OnCallReceived(aMsg, *getter_Transfers(reply)); + --mRemoteStackDepthGuess; + + if (!MaybeHandleError(rv, aMsg, "DispatchInterruptMessage")) { + reply = new Message(); + reply->set_interrupt(); + reply->set_reply(); + reply->set_reply_error(); + } + reply->set_seqno(aMsg.seqno()); + + MonitorAutoLock lock(*mMonitor); + if (ChannelConnected == mChannelState) { + mLink->SendMessage(reply.forget()); + } +} + +void +MessageChannel::MaybeUndeferIncall() +{ + AssertWorkerThread(); + mMonitor->AssertCurrentThreadOwns(); + + if (mDeferred.empty()) + return; + + size_t stackDepth = InterruptStackDepth(); + + Message& deferred = mDeferred.top(); + + // the other side can only *under*-estimate our actual stack depth + IPC_ASSERT(deferred.interrupt_remote_stack_depth_guess() <= stackDepth, + "fatal logic error"); + + if (ShouldDeferInterruptMessage(deferred, stackDepth)) { + return; + } + + // maybe time to process this message + Message call(Move(deferred)); + mDeferred.pop(); + + // fix up fudge factor we added to account for race + IPC_ASSERT(0 < mRemoteStackDepthGuess, "fatal logic error"); + --mRemoteStackDepthGuess; + + MOZ_RELEASE_ASSERT(call.nested_level() == IPC::Message::NOT_NESTED); + RefPtr<MessageTask> task = new MessageTask(this, Move(call)); + mPending.insertBack(task); + task->Post(); +} + +void +MessageChannel::EnteredCxxStack() +{ + mListener->EnteredCxxStack(); +} + +void +MessageChannel::ExitedCxxStack() +{ + mListener->ExitedCxxStack(); + if (mSawInterruptOutMsg) { + MonitorAutoLock lock(*mMonitor); + // see long comment in OnMaybeDequeueOne() + EnqueuePendingMessages(); + mSawInterruptOutMsg = false; + } +} + +void +MessageChannel::EnteredCall() +{ + mListener->EnteredCall(); +} + +void +MessageChannel::ExitedCall() +{ + mListener->ExitedCall(); +} + +void +MessageChannel::EnteredSyncSend() +{ + mListener->OnEnteredSyncSend(); +} + +void +MessageChannel::ExitedSyncSend() +{ + mListener->OnExitedSyncSend(); +} + +void +MessageChannel::EnqueuePendingMessages() +{ + AssertWorkerThread(); + mMonitor->AssertCurrentThreadOwns(); + + MaybeUndeferIncall(); + + // XXX performance tuning knob: could process all or k pending + // messages here, rather than enqueuing for later processing + + RepostAllMessages(); +} + +static inline bool +IsTimeoutExpired(PRIntervalTime aStart, PRIntervalTime aTimeout) +{ + return (aTimeout != PR_INTERVAL_NO_TIMEOUT) && + (aTimeout <= (PR_IntervalNow() - aStart)); +} + +bool +MessageChannel::WaitResponse(bool aWaitTimedOut) +{ + if (aWaitTimedOut) { + if (mInTimeoutSecondHalf) { + // We've really timed out this time. + return false; + } + // Try a second time. + mInTimeoutSecondHalf = true; + } else { + mInTimeoutSecondHalf = false; + } + return true; +} + +#ifndef OS_WIN +bool +MessageChannel::WaitForSyncNotify(bool /* aHandleWindowsMessages */) +{ +#ifdef DEBUG + // WARNING: We don't release the lock here. We can't because the link thread + // could signal at this time and we would miss it. Instead we require + // ArtificialTimeout() to be extremely simple. + if (mListener->ArtificialTimeout()) { + return false; + } +#endif + + PRIntervalTime timeout = (kNoTimeout == mTimeoutMs) ? + PR_INTERVAL_NO_TIMEOUT : + PR_MillisecondsToInterval(mTimeoutMs); + // XXX could optimize away this syscall for "no timeout" case if desired + PRIntervalTime waitStart = PR_IntervalNow(); + + mMonitor->Wait(timeout); + + // If the timeout didn't expire, we know we received an event. The + // converse is not true. + return WaitResponse(IsTimeoutExpired(waitStart, timeout)); +} + +bool +MessageChannel::WaitForInterruptNotify() +{ + return WaitForSyncNotify(true); +} + +void +MessageChannel::NotifyWorkerThread() +{ + mMonitor->Notify(); +} +#endif + +bool +MessageChannel::ShouldContinueFromTimeout() +{ + AssertWorkerThread(); + mMonitor->AssertCurrentThreadOwns(); + + bool cont; + { + MonitorAutoUnlock unlock(*mMonitor); + cont = mListener->ShouldContinueFromReplyTimeout(); + mListener->ArtificialSleep(); + } + + static enum { UNKNOWN, NOT_DEBUGGING, DEBUGGING } sDebuggingChildren = UNKNOWN; + + if (sDebuggingChildren == UNKNOWN) { + sDebuggingChildren = getenv("MOZ_DEBUG_CHILD_PROCESS") ? DEBUGGING : NOT_DEBUGGING; + } + if (sDebuggingChildren == DEBUGGING) { + return true; + } + + return cont; +} + +void +MessageChannel::SetReplyTimeoutMs(int32_t aTimeoutMs) +{ + // Set channel timeout value. Since this is broken up into + // two period, the minimum timeout value is 2ms. + AssertWorkerThread(); + mTimeoutMs = (aTimeoutMs <= 0) + ? kNoTimeout + : (int32_t)ceil((double)aTimeoutMs / 2.0); +} + +void +MessageChannel::OnChannelConnected(int32_t peer_id) +{ + MOZ_RELEASE_ASSERT(!mPeerPidSet); + mPeerPidSet = true; + mPeerPid = peer_id; + RefPtr<CancelableRunnable> task = mOnChannelConnectedTask; + mWorkerLoop->PostTask(task.forget()); +} + +void +MessageChannel::DispatchOnChannelConnected() +{ + AssertWorkerThread(); + MOZ_RELEASE_ASSERT(mPeerPidSet); + mListener->OnChannelConnected(mPeerPid); +} + +void +MessageChannel::ReportMessageRouteError(const char* channelName) const +{ + PrintErrorMessage(mSide, channelName, "Need a route"); + mListener->ProcessingError(MsgRouteError, "MsgRouteError"); +} + +void +MessageChannel::ReportConnectionError(const char* aChannelName, Message* aMsg) const +{ + AssertWorkerThread(); + mMonitor->AssertCurrentThreadOwns(); + + const char* errorMsg = nullptr; + switch (mChannelState) { + case ChannelClosed: + errorMsg = "Closed channel: cannot send/recv"; + break; + case ChannelOpening: + errorMsg = "Opening channel: not yet ready for send/recv"; + break; + case ChannelTimeout: + errorMsg = "Channel timeout: cannot send/recv"; + break; + case ChannelClosing: + errorMsg = "Channel closing: too late to send/recv, messages will be lost"; + break; + case ChannelError: + errorMsg = "Channel error: cannot send/recv"; + break; + + default: + NS_RUNTIMEABORT("unreached"); + } + + if (aMsg) { + char reason[512]; + SprintfLiteral(reason,"(msgtype=0x%X,name=%s) %s", + aMsg->type(), aMsg->name(), errorMsg); + + PrintErrorMessage(mSide, aChannelName, reason); + } else { + PrintErrorMessage(mSide, aChannelName, errorMsg); + } + + MonitorAutoUnlock unlock(*mMonitor); + mListener->ProcessingError(MsgDropped, errorMsg); +} + +bool +MessageChannel::MaybeHandleError(Result code, const Message& aMsg, const char* channelName) +{ + if (MsgProcessed == code) + return true; + + const char* errorMsg = nullptr; + switch (code) { + case MsgNotKnown: + errorMsg = "Unknown message: not processed"; + break; + case MsgNotAllowed: + errorMsg = "Message not allowed: cannot be sent/recvd in this state"; + break; + case MsgPayloadError: + errorMsg = "Payload error: message could not be deserialized"; + break; + case MsgProcessingError: + errorMsg = "Processing error: message was deserialized, but the handler returned false (indicating failure)"; + break; + case MsgRouteError: + errorMsg = "Route error: message sent to unknown actor ID"; + break; + case MsgValueError: + errorMsg = "Value error: message was deserialized, but contained an illegal value"; + break; + + default: + NS_RUNTIMEABORT("unknown Result code"); + return false; + } + + char reason[512]; + const char* msgname = StringFromIPCMessageType(aMsg.type()); + if (msgname[0] == '?') { + SprintfLiteral(reason,"(msgtype=0x%X) %s", aMsg.type(), errorMsg); + } else { + SprintfLiteral(reason,"%s %s", msgname, errorMsg); + } + + PrintErrorMessage(mSide, channelName, reason); + + mListener->ProcessingError(code, reason); + + return false; +} + +void +MessageChannel::OnChannelErrorFromLink() +{ + AssertLinkThread(); + mMonitor->AssertCurrentThreadOwns(); + + IPC_LOG("OnChannelErrorFromLink"); + + if (InterruptStackDepth() > 0) + NotifyWorkerThread(); + + if (AwaitingSyncReply() || AwaitingIncomingMessage()) + NotifyWorkerThread(); + + if (ChannelClosing != mChannelState) { + if (mAbortOnError) { + NS_RUNTIMEABORT("Aborting on channel error."); + } + mChannelState = ChannelError; + mMonitor->Notify(); + } + + PostErrorNotifyTask(); +} + +void +MessageChannel::NotifyMaybeChannelError() +{ + mMonitor->AssertNotCurrentThreadOwns(); + + // TODO sort out Close() on this side racing with Close() on the other side + if (ChannelClosing == mChannelState) { + // the channel closed, but we received a "Goodbye" message warning us + // about it. no worries + mChannelState = ChannelClosed; + NotifyChannelClosed(); + return; + } + + Clear(); + + // Oops, error! Let the listener know about it. + mChannelState = ChannelError; + + // IPDL assumes these notifications do not fire twice, so we do not let + // that happen. + if (mNotifiedChannelDone) { + return; + } + mNotifiedChannelDone = true; + + // After this, the channel may be deleted. Based on the premise that + // mListener owns this channel, any calls back to this class that may + // work with mListener should still work on living objects. + mListener->OnChannelError(); +} + +void +MessageChannel::OnNotifyMaybeChannelError() +{ + AssertWorkerThread(); + mMonitor->AssertNotCurrentThreadOwns(); + + mChannelErrorTask = nullptr; + + // OnChannelError holds mMonitor when it posts this task and this + // task cannot be allowed to run until OnChannelError has + // exited. We enforce that order by grabbing the mutex here which + // should only continue once OnChannelError has completed. + { + MonitorAutoLock lock(*mMonitor); + // nothing to do here + } + + if (IsOnCxxStack()) { + mChannelErrorTask = + NewNonOwningCancelableRunnableMethod(this, &MessageChannel::OnNotifyMaybeChannelError); + RefPtr<Runnable> task = mChannelErrorTask; + // 10 ms delay is completely arbitrary + mWorkerLoop->PostDelayedTask(task.forget(), 10); + return; + } + + NotifyMaybeChannelError(); +} + +void +MessageChannel::PostErrorNotifyTask() +{ + mMonitor->AssertCurrentThreadOwns(); + + if (mChannelErrorTask) + return; + + // This must be the last code that runs on this thread! + mChannelErrorTask = + NewNonOwningCancelableRunnableMethod(this, &MessageChannel::OnNotifyMaybeChannelError); + RefPtr<Runnable> task = mChannelErrorTask; + mWorkerLoop->PostTask(task.forget()); +} + +// Special async message. +class GoodbyeMessage : public IPC::Message +{ +public: + GoodbyeMessage() : + IPC::Message(MSG_ROUTING_NONE, GOODBYE_MESSAGE_TYPE) + { + } + static bool Read(const Message* msg) { + return true; + } + void Log(const std::string& aPrefix, FILE* aOutf) const { + fputs("(special `Goodbye' message)", aOutf); + } +}; + +void +MessageChannel::SynchronouslyClose() +{ + AssertWorkerThread(); + mMonitor->AssertCurrentThreadOwns(); + mLink->SendClose(); + while (ChannelClosed != mChannelState) + mMonitor->Wait(); +} + +void +MessageChannel::CloseWithError() +{ + AssertWorkerThread(); + + MonitorAutoLock lock(*mMonitor); + if (ChannelConnected != mChannelState) { + return; + } + SynchronouslyClose(); + mChannelState = ChannelError; + PostErrorNotifyTask(); +} + +void +MessageChannel::CloseWithTimeout() +{ + AssertWorkerThread(); + + MonitorAutoLock lock(*mMonitor); + if (ChannelConnected != mChannelState) { + return; + } + SynchronouslyClose(); + mChannelState = ChannelTimeout; +} + +void +MessageChannel::Close() +{ + AssertWorkerThread(); + + { + MonitorAutoLock lock(*mMonitor); + + if (ChannelError == mChannelState || ChannelTimeout == mChannelState) { + // See bug 538586: if the listener gets deleted while the + // IO thread's NotifyChannelError event is still enqueued + // and subsequently deletes us, then the error event will + // also be deleted and the listener will never be notified + // of the channel error. + if (mListener) { + MonitorAutoUnlock unlock(*mMonitor); + NotifyMaybeChannelError(); + } + return; + } + + if (ChannelOpening == mChannelState) { + // SynchronouslyClose() waits for an ack from the other side, so + // the opening sequence should complete before this returns. + SynchronouslyClose(); + mChannelState = ChannelError; + NotifyMaybeChannelError(); + return; + } + + if (ChannelClosed == mChannelState) { + // XXX be strict about this until there's a compelling reason + // to relax + NS_RUNTIMEABORT("Close() called on closed channel!"); + } + + // Notify the other side that we're about to close our socket. If we've + // already received a Goodbye from the other side (and our state is + // ChannelClosing), there's no reason to send one. + if (ChannelConnected == mChannelState) { + mLink->SendMessage(new GoodbyeMessage()); + } + SynchronouslyClose(); + } + + NotifyChannelClosed(); +} + +void +MessageChannel::NotifyChannelClosed() +{ + mMonitor->AssertNotCurrentThreadOwns(); + + if (ChannelClosed != mChannelState) + NS_RUNTIMEABORT("channel should have been closed!"); + + Clear(); + + // IPDL assumes these notifications do not fire twice, so we do not let + // that happen. + if (mNotifiedChannelDone) { + return; + } + mNotifiedChannelDone = true; + + // OK, the IO thread just closed the channel normally. Let the + // listener know about it. After this point the channel may be + // deleted. + mListener->OnChannelClose(); +} + +void +MessageChannel::DebugAbort(const char* file, int line, const char* cond, + const char* why, + bool reply) +{ + printf_stderr("###!!! [MessageChannel][%s][%s:%d] " + "Assertion (%s) failed. %s %s\n", + mSide == ChildSide ? "Child" : "Parent", + file, line, cond, + why, + reply ? "(reply)" : ""); + // technically we need the mutex for this, but we're dying anyway + DumpInterruptStack(" "); + printf_stderr(" remote Interrupt stack guess: %" PRIuSIZE "\n", + mRemoteStackDepthGuess); + printf_stderr(" deferred stack size: %" PRIuSIZE "\n", + mDeferred.size()); + printf_stderr(" out-of-turn Interrupt replies stack size: %" PRIuSIZE "\n", + mOutOfTurnReplies.size()); + + MessageQueue pending = Move(mPending); + while (!pending.isEmpty()) { + printf_stderr(" [ %s%s ]\n", + pending.getFirst()->Msg().is_interrupt() ? "intr" : + (pending.getFirst()->Msg().is_sync() ? "sync" : "async"), + pending.getFirst()->Msg().is_reply() ? "reply" : ""); + pending.popFirst(); + } + + NS_RUNTIMEABORT(why); +} + +void +MessageChannel::DumpInterruptStack(const char* const pfx) const +{ + NS_WARNING_ASSERTION( + MessageLoop::current() != mWorkerLoop, + "The worker thread had better be paused in a debugger!"); + + printf_stderr("%sMessageChannel 'backtrace':\n", pfx); + + // print a python-style backtrace, first frame to last + for (uint32_t i = 0; i < mCxxStackFrames.length(); ++i) { + int32_t id; + const char* dir; + const char* sems; + const char* name; + mCxxStackFrames[i].Describe(&id, &dir, &sems, &name); + + printf_stderr("%s[(%u) %s %s %s(actor=%d) ]\n", pfx, + i, dir, sems, name, id); + } +} + +int32_t +MessageChannel::GetTopmostMessageRoutingId() const +{ + MOZ_RELEASE_ASSERT(MessageLoop::current() == mWorkerLoop); + if (mCxxStackFrames.empty()) { + return MSG_ROUTING_NONE; + } + const InterruptFrame& frame = mCxxStackFrames.back(); + return frame.GetRoutingId(); +} + +void +MessageChannel::EndTimeout() +{ + mMonitor->AssertCurrentThreadOwns(); + + IPC_LOG("Ending timeout of seqno=%d", mTimedOutMessageSeqno); + mTimedOutMessageSeqno = 0; + mTimedOutMessageNestedLevel = 0; + + RepostAllMessages(); +} + +void +MessageChannel::RepostAllMessages() +{ + bool needRepost = false; + for (RefPtr<MessageTask> task : mPending) { + if (!task->IsScheduled()) { + needRepost = true; + } + } + if (!needRepost) { + // If everything is already scheduled to run, do nothing. + return; + } + + // In some cases we may have deferred dispatch of some messages in the + // queue. Now we want to run them again. However, we can't just re-post + // those messages since the messages after them in mPending would then be + // before them in the event queue. So instead we cancel everything and + // re-post all messages in the correct order. + MessageQueue queue = Move(mPending); + while (RefPtr<MessageTask> task = queue.popFirst()) { + RefPtr<MessageTask> newTask = new MessageTask(this, Move(task->Msg())); + mPending.insertBack(newTask); + newTask->Post(); + } +} + +void +MessageChannel::CancelTransaction(int transaction) +{ + mMonitor->AssertCurrentThreadOwns(); + + // When we cancel a transaction, we need to behave as if there's no longer + // any IPC on the stack. Anything we were dispatching or sending will get + // canceled. Consequently, we have to update the state variables below. + // + // We also need to ensure that when any IPC functions on the stack return, + // they don't reset these values using an RAII class like AutoSetValue. To + // avoid that, these RAII classes check if the variable they set has been + // tampered with (by us). If so, they don't reset the variable to the old + // value. + + IPC_LOG("CancelTransaction: xid=%d", transaction); + + // An unusual case: We timed out a transaction which the other side then + // cancelled. In this case we just leave the timedout state and try to + // forget this ever happened. + if (transaction == mTimedOutMessageSeqno) { + IPC_LOG("Cancelled timed out message %d", mTimedOutMessageSeqno); + EndTimeout(); + + // Normally mCurrentTransaction == 0 here. But it can be non-zero if: + // 1. Parent sends NESTED_INSIDE_SYNC message H. + // 2. Parent times out H. + // 3. Child dispatches H and sends nested message H' (same transaction). + // 4. Parent dispatches H' and cancels. + MOZ_RELEASE_ASSERT(!mTransactionStack || mTransactionStack->TransactionID() == transaction); + if (mTransactionStack) { + mTransactionStack->Cancel(); + } + } else { + MOZ_RELEASE_ASSERT(mTransactionStack->TransactionID() == transaction); + mTransactionStack->Cancel(); + } + + bool foundSync = false; + for (RefPtr<MessageTask> p = mPending.getFirst(); p; ) { + Message &msg = p->Msg(); + + // If there was a race between the parent and the child, then we may + // have a queued sync message. We want to drop this message from the + // queue since if will get cancelled along with the transaction being + // cancelled. This happens if the message in the queue is NESTED_INSIDE_SYNC. + if (msg.is_sync() && msg.nested_level() != IPC::Message::NOT_NESTED) { + MOZ_RELEASE_ASSERT(!foundSync); + MOZ_RELEASE_ASSERT(msg.transaction_id() != transaction); + IPC_LOG("Removing msg from queue seqno=%d xid=%d", msg.seqno(), msg.transaction_id()); + foundSync = true; + p = p->removeAndGetNext(); + continue; + } + + p = p->getNext(); + } +} + +bool +MessageChannel::IsInTransaction() const +{ + MonitorAutoLock lock(*mMonitor); + return !!mTransactionStack; +} + +void +MessageChannel::CancelCurrentTransaction() +{ + MonitorAutoLock lock(*mMonitor); + if (DispatchingSyncMessageNestedLevel() >= IPC::Message::NESTED_INSIDE_SYNC) { + if (DispatchingSyncMessageNestedLevel() == IPC::Message::NESTED_INSIDE_CPOW || + DispatchingAsyncMessageNestedLevel() == IPC::Message::NESTED_INSIDE_CPOW) + { + mListener->IntentionalCrash(); + } + + IPC_LOG("Cancel requested: current xid=%d", CurrentNestedInsideSyncTransaction()); + MOZ_RELEASE_ASSERT(DispatchingSyncMessage()); + CancelMessage *cancel = new CancelMessage(CurrentNestedInsideSyncTransaction()); + CancelTransaction(CurrentNestedInsideSyncTransaction()); + mLink->SendMessage(cancel); + } +} + +void +CancelCPOWs() +{ + if (gParentProcessBlocker) { + mozilla::Telemetry::Accumulate(mozilla::Telemetry::IPC_TRANSACTION_CANCEL, true); + gParentProcessBlocker->CancelCurrentTransaction(); + } +} + +} // namespace ipc +} // namespace mozilla |