summaryrefslogtreecommitdiffstats
path: root/xpcom/threads/StateMirroring.h
blob: 87d94ba74727d7489df108310acfcadeed529b95 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

#if !defined(StateMirroring_h_)
#define StateMirroring_h_

#include "mozilla/Maybe.h"
#include "mozilla/MozPromise.h"
#include "mozilla/StateWatching.h"
#include "mozilla/TaskDispatcher.h"
#include "mozilla/UniquePtr.h"
#include "mozilla/Unused.h"

#include "mozilla/Logging.h"
#include "nsISupportsImpl.h"

/*
 * The state-mirroring machinery allows pieces of interesting state to be
 * observed on multiple threads without locking. The basic strategy is to track
 * changes in a canonical value and post updates to other threads that hold
 * mirrors for that value.
 *
 * One problem with the naive implementation of such a system is that some pieces
 * of state need to be updated atomically, and certain other operations need to
 * wait for these atomic updates to complete before executing. The state-mirroring
 * machinery solves this problem by requiring that its owner thread uses tail
 * dispatch, and posting state update events (which should always be run first by
 * TaskDispatcher implementations) to that tail dispatcher. This ensures that
 * state changes are always atomic from the perspective of observing threads.
 *
 * Given that state-mirroring is an automatic background process, we try to avoid
 * burdening the caller with worrying too much about teardown. To that end, we
 * don't assert dispatch success for any of the notifications, and assume that
 * any canonical or mirror owned by a thread for whom dispatch fails will soon
 * be disconnected by its holder anyway.
 *
 * Given that semantics may change and comments tend to go out of date, we
 * deliberately don't provide usage examples here. Grep around to find them.
 */

namespace mozilla {

// The Impl classes of Mirror<T> and Canonical<T> inherit WatchTarget, so we
// piggy-back on the logging that WatchTarget already does. Given that, it
// makes sense to share the same log module.
//
// The body is wrapped in do { } while (0) so that the macro expands to exactly
// one statement; that keeps it safe inside an unbraced if/else.
#define MIRROR_LOG(x, ...) \
  do { \
    MOZ_ASSERT(gStateWatchingLog); \
    MOZ_LOG(gStateWatchingLog, LogLevel::Debug, (x, ##__VA_ARGS__)); \
  } while (0)

template<typename T> class AbstractMirror;

/*
 * AbstractCanonical is a superclass from which all Canonical values must
 * inherit. It serves as the interface of operations which may be performed (via
 * asynchronous dispatch) by other threads, in particular by the corresponding
 * Mirror value.
 */
template<typename T>
class AbstractCanonical
{
public:
  NS_INLINE_DECL_THREADSAFE_REFCOUNTING(AbstractCanonical)

  // aThread is the thread that owns this canonical; a strong reference to it
  // is held for the lifetime of the object. The constructor is explicit so an
  // AbstractThread* is never silently treated as a canonical.
  explicit AbstractCanonical(AbstractThread* aThread) : mOwnerThread(aThread) {}

  // Registers aMirror so that it receives value updates.
  virtual void AddMirror(AbstractMirror<T>* aMirror) = 0;

  // Unregisters a mirror previously passed to AddMirror().
  virtual void RemoveMirror(AbstractMirror<T>* aMirror) = 0;

  // Returns the owning thread (non-owning pointer).
  AbstractThread* OwnerThread() const { return mOwnerThread; }

protected:
  // Protected: instances are destroyed through their reference count rather
  // than by deleting them directly.
  virtual ~AbstractCanonical() {}
  RefPtr<AbstractThread> mOwnerThread;
};

/*
 * AbstractMirror is a superclass from which all Mirror values must
 * inherit. It serves as the interface of operations which may be performed (via
 * asynchronous dispatch) by other threads, in particular by the corresponding
 * Canonical value.
 */
template<typename T>
class AbstractMirror
{
public:
  NS_INLINE_DECL_THREADSAFE_REFCOUNTING(AbstractMirror)

  // aThread is the thread that owns this mirror; a strong reference to it is
  // held for the lifetime of the object. The constructor is explicit so an
  // AbstractThread* is never silently treated as a mirror.
  explicit AbstractMirror(AbstractThread* aThread) : mOwnerThread(aThread) {}

  // Delivers a new value from the canonical to this mirror.
  virtual void UpdateValue(const T& aNewValue) = 0;

  // Tells the mirror that its canonical has dropped the connection.
  virtual void NotifyDisconnected() = 0;

  // Returns the owning thread (non-owning pointer).
  AbstractThread* OwnerThread() const { return mOwnerThread; }

protected:
  // Protected: instances are destroyed through their reference count rather
  // than by deleting them directly.
  virtual ~AbstractMirror() {}
  RefPtr<AbstractThread> mOwnerThread;
};

/*
 * Canonical<T> is a wrapper class that allows a given value to be mirrored by
 * other threads. It maintains a list of active mirrors, and queues updates for
 * them when the internal value changes. Updates are not sent synchronously
 * from Set(): the first change schedules Impl::DoNotify through
 * AbstractThread::DispatchDirectTask, and DoNotify only notifies the mirrors
 * if the value still differs from what it was before the first change.
 * The Impl is also a WatchTarget, and may be set up to trigger other routines
 * (on the same thread) when the canonical value changes.
 *
 * Canonical<T> is intended to be used as a member variable, so it doesn't
 * actually inherit AbstractCanonical<T> (a refcounted type). Rather, it
 * contains an inner class called |Impl| that implements most of the
 * interesting logic.
 */
template<typename T>
class Canonical
{
public:
  // aThread owns the canonical value, aInitialValue seeds it, and aName is
  // handed to WatchTarget (it shows up in the log output).
  Canonical(AbstractThread* aThread, const T& aInitialValue, const char* aName)
    : mImpl(new Impl(aThread, aInitialValue, aName))
  {
  }

  ~Canonical() {}

private:
  class Impl : public AbstractCanonical<T>, public WatchTarget
  {
  public:
    using AbstractCanonical<T>::OwnerThread;

    Impl(AbstractThread* aThread, const T& aInitialValue, const char* aName)
      : AbstractCanonical<T>(aThread), WatchTarget(aName), mValue(aInitialValue)
    {
      MIRROR_LOG("%s [%p] initialized", mName, this);
      MOZ_ASSERT(aThread->SupportsTailDispatch(), "Can't get coherency without tail dispatch");
    }

    // Registers aMirror and immediately dispatches the current value to it.
    // Must run on the owner thread; aMirror must not already be registered.
    void AddMirror(AbstractMirror<T>* aMirror) override
    {
      MIRROR_LOG("%s [%p] adding mirror %p", mName, this, aMirror);
      MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn());
      MOZ_ASSERT(!mMirrors.Contains(aMirror));
      mMirrors.AppendElement(aMirror);
      aMirror->OwnerThread()->DispatchStateChange(MakeNotifier(aMirror));
    }

    // Unregisters aMirror. Must run on the owner thread; aMirror must
    // currently be registered.
    void RemoveMirror(AbstractMirror<T>* aMirror) override
    {
      MIRROR_LOG("%s [%p] removing mirror %p", mName, this, aMirror);
      MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn());
      MOZ_ASSERT(mMirrors.Contains(aMirror));
      mMirrors.RemoveElement(aMirror);
    }

    // Dispatches NotifyDisconnected to every registered mirror on that
    // mirror's owner thread, then forgets all of them. Dispatch failure is
    // deliberately not asserted.
    void DisconnectAll()
    {
      MIRROR_LOG("%s [%p] Disconnecting all mirrors", mName, this);
      for (size_t i = 0; i < mMirrors.Length(); ++i) {
        mMirrors[i]->OwnerThread()->Dispatch(NewRunnableMethod(mMirrors[i],
                                                               &AbstractMirror<T>::NotifyDisconnected),
                                             AbstractThread::DontAssertDispatchSuccess);
      }
      mMirrors.Clear();
    }

    // Read access to the current value; owner thread only. This is const so
    // that a const Impl (e.g. the argument of the copy-assignment operator
    // below) can be converted to a T as well.
    operator const T&() const
    {
      MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn());
      return mValue;
    }

    // Updates the value. Does nothing when aNewValue equals the current
    // value. Otherwise notifies same-thread watchers, stores the new value
    // and, unless one is already pending, schedules DoNotify. Owner thread
    // only.
    void Set(const T& aNewValue)
    {
      MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn());

      if (aNewValue == mValue) {
        return;
      }

      // Notify same-thread watchers. The state watching machinery will make sure
      // that notifications run at the right time.
      NotifyWatchers();

      // Check if we've already got a pending update. If so we won't schedule another
      // one.
      const bool alreadyNotifying = mInitialValue.isSome();

      // Stash the initial value if needed, then update to the new value.
      if (!alreadyNotifying) {
        mInitialValue.emplace(mValue);
      }
      mValue = aNewValue;

      // We wait until things have stabilized before sending state updates so that
      // we can avoid sending multiple updates, and possibly avoid sending any
      // updates at all if the value ends up where it started.
      if (!alreadyNotifying) {
        AbstractThread::DispatchDirectTask(NewRunnableMethod(this, &Impl::DoNotify));
      }
    }

    // Assignment copies the value only (via Set); mirrors are not copied.
    Impl& operator=(const T& aNewValue) { Set(aNewValue); return *this; }
    Impl& operator=(const Impl& aOther) { Set(aOther); return *this; }
    Impl(const Impl& aOther) = delete;

  protected:
    // All mirrors must have been removed or disconnected by now.
    ~Impl() { MOZ_DIAGNOSTIC_ASSERT(mMirrors.IsEmpty()); }

  private:
    // Runs once per batch of Set() calls. Clears the stashed pre-change value
    // and, if the value really ended up different, dispatches it to every
    // registered mirror.
    void DoNotify()
    {
      MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn());
      MOZ_ASSERT(mInitialValue.isSome());
      bool same = mInitialValue.ref() == mValue;
      mInitialValue.reset();

      if (same) {
        MIRROR_LOG("%s [%p] unchanged - not sending update", mName, this);
        return;
      }

      for (size_t i = 0; i < mMirrors.Length(); ++i) {
        mMirrors[i]->OwnerThread()->DispatchStateChange(MakeNotifier(mMirrors[i]));
      }
    }

    // Builds a runnable that calls aMirror->UpdateValue() with the value held
    // at the time of this call.
    already_AddRefed<nsIRunnable> MakeNotifier(AbstractMirror<T>* aMirror)
    {
      return NewRunnableMethod<T>(aMirror, &AbstractMirror<T>::UpdateValue, mValue);
    }

    // The current canonical value.
    T mValue;
    // Value held before the first not-yet-notified Set(); Some() exactly while
    // a DoNotify task is pending.
    Maybe<T> mInitialValue;
    // Mirrors currently registered through AddMirror().
    nsTArray<RefPtr<AbstractMirror<T>>> mMirrors;
  };
public:

  // NB: Because mirror-initiated disconnection can race with canonical-
  // initiated disconnection, a canonical should never be reinitialized.
  // Forward control operations to the Impl.
  void DisconnectAll() { mImpl->DisconnectAll(); }

  // Access to the Impl.
  operator Impl&() { return *mImpl; }
  Impl* operator&() { return mImpl; }

  // Access to the T.
  const T& Ref() const { return *mImpl; }
  operator const T&() const { return Ref(); }
  void Set(const T& aNewValue) { mImpl->Set(aNewValue); }
  Canonical& operator=(const T& aNewValue) { Set(aNewValue); return *this; }
  Canonical& operator=(const Canonical& aOther) { Set(aOther); return *this; }
  Canonical(const Canonical& aOther) = delete;

private:
  RefPtr<Impl> mImpl;
};

/*
 * Mirror<T> keeps a local copy of a value whose authoritative version lives in
 * a Canonical<T> owned by another thread. Once connected, it is registered with
 * that canonical and receives new values through UpdateValue(). The Impl is
 * also a WatchTarget, so same-thread routines can be triggered whenever the
 * mirrored value changes.
 *
 * Like Canonical<T>, Mirror<T> is meant to live as a member variable and so
 * does not itself derive from the refcounted AbstractMirror<T>; the inner
 * |Impl| class does, and carries the interesting logic.
 */
template<typename T>
class Mirror
{
public:
  // aThread owns the mirror, aInitialValue is what the mirror reports until
  // the first update arrives, and aName is handed to WatchTarget.
  Mirror(AbstractThread* aThread, const T& aInitialValue, const char* aName)
    : mImpl(new Impl(aThread, aInitialValue, aName))
  {
  }

  ~Mirror()
  {
    // A Mirror<T> embedded in a complex object may be torn down on a thread
    // other than its owner, or late in shutdown during CC. Disconnection is
    // therefore the holder's job, and must already have happened here.
    MOZ_DIAGNOSTIC_ASSERT(!mImpl->IsConnected());
  }

private:
  class Impl : public AbstractMirror<T>, public WatchTarget
  {
  public:
    using AbstractMirror<T>::OwnerThread;

    Impl(AbstractThread* aThread, const T& aInitialValue, const char* aName)
      : AbstractMirror<T>(aThread)
      , WatchTarget(aName)
      , mValue(aInitialValue)
    {
      MIRROR_LOG("%s [%p] initialized", mName, this);
      MOZ_ASSERT(aThread->SupportsTailDispatch(), "Can't get coherency without tail dispatch");
    }

    // Read access to the mirrored value; owner thread only.
    operator const T&()
    {
      MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn());
      return mValue;
    }

    // Stores aNewValue and notifies watchers, but only when it differs from
    // the value currently held. Owner thread only.
    void UpdateValue(const T& aNewValue) override
    {
      MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn());
      if (mValue != aNewValue) {
        mValue = aNewValue;
        WatchTarget::NotifyWatchers();
      }
    }

    // Drops the reference to the canonical after the canonical side has
    // severed the connection. Owner thread only.
    void NotifyDisconnected() override
    {
      MIRROR_LOG("%s [%p] Notifed of disconnection from %p", mName, this, mCanonical.get());
      MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn());
      mCanonical = nullptr;
    }

    // True while a canonical is recorded for this mirror.
    bool IsConnected() const { return mCanonical.get() != nullptr; }

    // Starts mirroring aCanonical. Registration is asynchronous: AddMirror is
    // dispatched to the canonical's owner thread (dispatch failure is not
    // asserted), while mCanonical is recorded right away. Owner thread only;
    // the mirror must not already be connected.
    void Connect(AbstractCanonical<T>* aCanonical)
    {
      MIRROR_LOG("%s [%p] Connecting to %p", mName, this, aCanonical);
      MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn());
      MOZ_ASSERT(!IsConnected());
      MOZ_ASSERT(OwnerThread()->RequiresTailDispatch(aCanonical->OwnerThread()), "Can't get coherency without tail dispatch");

      nsCOMPtr<nsIRunnable> registration =
        NewRunnableMethod<StorensRefPtrPassByPtr<AbstractMirror<T>>>
          (aCanonical, &AbstractCanonical<T>::AddMirror, this);
      aCanonical->OwnerThread()->Dispatch(registration.forget(),
                                          AbstractThread::DontAssertDispatchSuccess);
      mCanonical = aCanonical;
    }

    // Stops mirroring, if a canonical is recorded: RemoveMirror is dispatched
    // to the canonical's owner thread (dispatch failure is not asserted) and
    // the local reference is cleared. A no-op otherwise. Owner thread only.
    void DisconnectIfConnected()
    {
      MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn());
      if (IsConnected()) {
        MIRROR_LOG("%s [%p] Disconnecting from %p", mName, this, mCanonical.get());
        nsCOMPtr<nsIRunnable> removal =
          NewRunnableMethod<StorensRefPtrPassByPtr<AbstractMirror<T>>>
            (mCanonical, &AbstractCanonical<T>::RemoveMirror, this);
        mCanonical->OwnerThread()->Dispatch(removal.forget(),
                                            AbstractThread::DontAssertDispatchSuccess);
        mCanonical = nullptr;
      }
    }

  protected:
    // The mirror must have been disconnected before its last reference goes.
    ~Impl() { MOZ_DIAGNOSTIC_ASSERT(!IsConnected()); }

  private:
    // Most recently received value (or the initial value).
    T mValue;
    // The canonical being mirrored; null while disconnected.
    RefPtr<AbstractCanonical<T>> mCanonical;
  };

public:
  // Control operations, forwarded to the Impl.
  void Connect(AbstractCanonical<T>* aCanonical) { mImpl->Connect(aCanonical); }
  void DisconnectIfConnected() { mImpl->DisconnectIfConnected(); }

  // Access to the Impl itself.
  operator Impl&() { return *mImpl; }
  Impl* operator&() { return mImpl; }

  // Access to the mirrored T.
  const T& Ref() const { return *mImpl; }
  operator const T&() const { return Ref(); }

private:
  RefPtr<Impl> mImpl;
};

#undef MIRROR_LOG

} // namespace mozilla

#endif