summaryrefslogtreecommitdiffstats
path: root/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/ServerSyncStage.java
blob: 733e69da5ff20b1e4047bbb6c6d79b028d219f42 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

package org.mozilla.gecko.sync.stage;

import android.content.Context;

import org.mozilla.gecko.background.common.log.Logger;
import org.mozilla.gecko.sync.EngineSettings;
import org.mozilla.gecko.sync.GlobalSession;
import org.mozilla.gecko.sync.HTTPFailureException;
import org.mozilla.gecko.sync.MetaGlobalException;
import org.mozilla.gecko.sync.NoCollectionKeysSetException;
import org.mozilla.gecko.sync.NonObjectJSONException;
import org.mozilla.gecko.sync.SynchronizerConfiguration;
import org.mozilla.gecko.sync.Utils;
import org.mozilla.gecko.sync.crypto.KeyBundle;
import org.mozilla.gecko.sync.delegates.WipeServerDelegate;
import org.mozilla.gecko.sync.middleware.Crypto5MiddlewareRepository;
import org.mozilla.gecko.sync.net.AuthHeaderProvider;
import org.mozilla.gecko.sync.net.BaseResource;
import org.mozilla.gecko.sync.net.SyncStorageRequest;
import org.mozilla.gecko.sync.net.SyncStorageRequestDelegate;
import org.mozilla.gecko.sync.net.SyncStorageResponse;
import org.mozilla.gecko.sync.repositories.InactiveSessionException;
import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException;
import org.mozilla.gecko.sync.repositories.RecordFactory;
import org.mozilla.gecko.sync.repositories.Repository;
import org.mozilla.gecko.sync.repositories.RepositorySession;
import org.mozilla.gecko.sync.repositories.RepositorySessionBundle;
import org.mozilla.gecko.sync.repositories.Server11Repository;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionBeginDelegate;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionCreationDelegate;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFinishDelegate;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionWipeDelegate;
import org.mozilla.gecko.sync.synchronizer.ServerLocalSynchronizer;
import org.mozilla.gecko.sync.synchronizer.Synchronizer;
import org.mozilla.gecko.sync.synchronizer.SynchronizerDelegate;
import org.mozilla.gecko.sync.synchronizer.SynchronizerSession;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.concurrent.ExecutorService;

/**
 * Fetch from a server collection into a local repository, encrypting
 * and decrypting along the way.
 *
 * @author rnewman
 *
 */
public abstract class ServerSyncStage extends AbstractSessionManagingSyncStage implements SynchronizerDelegate {

  // Tag passed to Logger for every message emitted by this class.
  protected static final String LOG_TAG = "ServerSyncStage";

  // System.currentTimeMillis() captured at the start of execute(); -1 until then.
  protected long stageStartTimestamp = -1;
  // System.currentTimeMillis() captured when the synchronizer reports success or
  // failure (onSynchronized / onSynchronizeFailed); -1 until then.
  protected long stageCompleteTimestamp = -1;

  /**
   * Decide whether this stage should run during the current sync.
   * <p>
   * A stage can be switched off in three ways: by the server's meta/global
   * record, by the user in Sync Settings, or locally just for this sync.
   * Subclasses may override this.
   *
   * @return true if this stage should be executed.
   * @throws MetaGlobalException
   *           if the remote-enabled check or the user-selection check raises
   *           one; execute() catches the subclasses to trigger resets and wipes.
   */
  protected boolean isEnabled() throws MetaGlobalException {
    // Failure to load settings is tolerated: null is handed on to the session.
    EngineSettings settings;
    try {
      settings = getEngineSettings();
    } catch (Exception e) {
      Logger.warn(LOG_TAG, "Unable to get engine settings for " + this + ": fetching config failed.", e);
      settings = null;
    }

    final boolean remotelyEnabled = session.isEngineRemotelyEnabled(this.getEngineName(), settings);

    // The user's manual selection is checked before acting on the server state,
    // so that a mismatch is reported even when the server says "disabled".
    checkAndUpdateUserSelectedEngines(remotelyEnabled);

    if (remotelyEnabled) {
      // For ServerSyncStage, stage name == engine name.
      final boolean locallyEnabled = session.isEngineLocallyEnabled(this.getEngineName());
      if (!locallyEnabled) {
        Logger.debug(LOG_TAG, "Stage " + this.getEngineName() + " disabled just for this sync.");
      }
      return locallyEnabled;
    }

    Logger.debug(LOG_TAG, "Stage " + this.getEngineName() + " disabled by server meta/global.");
    return false;
  }

  /**
   * Compares meta/global engine state to user selected engines from Sync
   * Settings and throws an exception if they don't match and meta/global needs
   * to be updated.
   * <p>
   * An engine with no entry in the selection map, or with a null entry, is
   * treated as "no explicit user choice" and never triggers the exception.
   *
   * @param enabledInMetaGlobal
   *          boolean of engine sync state in meta/global
   * @throws MetaGlobalException
   *           if engine sync state has been changed in Sync Settings, with new
   *           engine sync state.
   */
  protected void checkAndUpdateUserSelectedEngines(boolean enabledInMetaGlobal) throws MetaGlobalException {
    final Map<String, Boolean> selectedEngines = session.config.userSelectedEngines;
    final String thisEngine = this.getEngineName();
    if (selectedEngines == null) {
      return;
    }

    // One lookup instead of containsKey + get. Keeping the boxed value lets a
    // null mapping be detected instead of failing with a NullPointerException
    // during auto-unboxing.
    final Boolean enabledInSelection = selectedEngines.get(thisEngine);
    if (enabledInSelection == null) {
      return;
    }

    if (enabledInMetaGlobal != enabledInSelection.booleanValue()) {
      // Engine enable state has been changed by the user.
      Logger.debug(LOG_TAG, "Engine state has been changed by user. Throwing exception.");
      throw new MetaGlobalException.MetaGlobalEngineStateChangedException(enabledInSelection.booleanValue());
    }
  }

  /**
   * Build the engine settings (syncID and storage version) describing this
   * stage's local state.
   *
   * @return settings carrying the persisted syncID (null when no configuration
   *         is available) and the storage version, with 0 substituted for a
   *         null version.
   * @throws NonObjectJSONException if loading the persisted configuration fails.
   * @throws IOException if loading the persisted configuration fails.
   */
  protected EngineSettings getEngineSettings() throws NonObjectJSONException, IOException {
    Integer storageVersion = getStorageVersion();
    if (storageVersion == null) {
      Logger.warn(LOG_TAG, "null storage version for " + this + "; using version 0.");
      storageVersion = 0;
    }

    final SynchronizerConfiguration persisted = this.getConfig();
    final String persistedSyncID = (persisted == null) ? null : persisted.syncID;
    return new EngineSettings(persistedSyncID, storageVersion);
  }

  /** @return the server collection name; used for the collection URI, the collection key lookup and the config bundle prefix. */
  protected abstract String getCollection();
  /** @return the engine name; used for meta/global lookups, user engine selection and log messages. */
  protected abstract String getEngineName();
  /** @return the local repository that is synchronized against the server and wiped by wipeLocal(). */
  protected abstract Repository getLocalRepository();
  /** @return the record factory installed on the crypto middleware repository in wrappedServerRepo(). */
  protected abstract RecordFactory getRecordFactory();

  /**
   * Create the repository that represents this stage's collection on the
   * server. Subclasses may override this to supply a different remote.
   *
   * @return a new Server11Repository built from the session's storage URL,
   *         auth header provider, info/collections and info/configuration.
   * @throws URISyntaxException if the repository cannot be constructed from the storage URL.
   */
  protected Repository getRemoteRepository() throws URISyntaxException {
    return new Server11Repository(
        getCollection(),
        session.config.storageURL(),
        session.getAuthHeaderProvider(),
        session.config.infoCollections,
        session.config.infoConfiguration);
  }

  /**
   * Wrap the remote repository in a Crypto5MiddlewareRepository keyed with
   * this collection's key bundle, and install this stage's record factory.
   *
   * @return the crypto-wrapped remote repository.
   * @throws NoCollectionKeysSetException if the session has no key bundle for the collection.
   * @throws URISyntaxException if the remote repository cannot be created.
   */
  protected Repository wrappedServerRepo() throws NoCollectionKeysSetException, URISyntaxException {
    // The key lookup happens first, so a missing key is reported before any
    // remote repository is built.
    final KeyBundle bundleForCollection = session.keyBundleForCollection(this.getCollection());
    final Repository plainRemote = getRemoteRepository();

    final Crypto5MiddlewareRepository wrapped = new Crypto5MiddlewareRepository(plainRemote, bundleForCollection);
    wrapped.recordFactory = getRecordFactory();
    return wrapped;
  }

  /**
   * @return the prefix selecting this stage's branch of the session
   *         configuration: the collection name followed by a dot.
   */
  protected String bundlePrefix() {
    final String collection = this.getCollection();
    return collection + ".";
  }

  /**
   * Load this stage's synchronizer configuration from its branch of the
   * session configuration.
   *
   * @return a new SynchronizerConfiguration read from the branch.
   * @throws NonObjectJSONException if the persisted configuration cannot be read.
   * @throws IOException if the persisted configuration cannot be read.
   */
  protected SynchronizerConfiguration getConfig() throws NonObjectJSONException, IOException {
    final String prefix = bundlePrefix();
    return new SynchronizerConfiguration(session.config.getBranch(prefix));
  }

  /**
   * Write the given synchronizer configuration to this stage's branch of the
   * session configuration.
   *
   * @param synchronizerConfiguration the configuration to persist.
   */
  protected void persistConfig(SynchronizerConfiguration synchronizerConfiguration) {
    final String prefix = bundlePrefix();
    synchronizerConfiguration.persist(session.config.getBranch(prefix));
  }

  /**
   * Create a synchronizer with the crypto-wrapped server repository as
   * repository A and the local repository as repository B, loaded with the
   * persisted configuration.
   *
   * @param session not read by this implementation; the repositories are
   *          built from the stage's own session field.
   * @return the configured synchronizer.
   * @throws NoCollectionKeysSetException if no key bundle exists for the collection.
   * @throws URISyntaxException if the remote repository cannot be created.
   * @throws NonObjectJSONException if the persisted configuration cannot be read.
   * @throws IOException if the persisted configuration cannot be read.
   */
  public Synchronizer getConfiguredSynchronizer(GlobalSession session) throws NoCollectionKeysSetException, URISyntaxException, NonObjectJSONException, IOException {
    final Repository serverSide = wrappedServerRepo();

    final Synchronizer configured = new ServerLocalSynchronizer();
    configured.repositoryA = serverSide;
    configured.repositoryB = this.getLocalRepository();

    final SynchronizerConfiguration persisted = getConfig();
    configured.load(persisted);
    return configured;
  }

  /**
   * Reset both persisted timestamps, leaving the stored syncID untouched.
   */
  @Override
  protected void resetLocal() {
    // A null syncID tells resetLocalWithSyncID to keep the existing one.
    final String keepExistingSyncID = null;
    resetLocalWithSyncID(keepExistingSyncID);
  }

  /**
   * Zero the local and remote timestamps in the persisted configuration and,
   * optionally, replace the stored syncID. If the configuration cannot be
   * loaded, a warning is logged and nothing is changed.
   *
   * @param syncID if non-null, new syncID to persist.
   */
  protected void resetLocalWithSyncID(String syncID) {
    final SynchronizerConfiguration persisted;
    try {
      persisted = this.getConfig();
    } catch (Exception e) {
      Logger.warn(LOG_TAG, "Unable to reset " + this + ": fetching config failed.", e);
      return;
    }

    final boolean replaceSyncID = (syncID != null);
    if (replaceSyncID) {
      persisted.syncID = syncID;
      Logger.info(LOG_TAG, "Setting syncID for " + this + " to '" + syncID + "'.");
    }

    // Clear both timestamps.
    persisted.localBundle.setTimestamp(0L);
    persisted.remoteBundle.setTimestamp(0L);

    persistConfig(persisted);
    Logger.info(LOG_TAG, "Reset timestamps for " + this);
  }

  // Not thread-safe. Use with caution.
  private class WipeWaiter {
    public boolean sessionSucceeded = true;
    public boolean wipeSucceeded = true;
    public Exception error;

    public void notify(Exception e, boolean sessionSucceeded) {
      this.sessionSucceeded = sessionSucceeded;
      this.wipeSucceeded = false;
      this.error = e;
      this.notify();
    }
  }

  /**
   * Synchronously wipe this stage by instantiating a local repository session
   * and wiping that.
   * <p>
   * The wipe runs on a freshly started thread; the calling thread blocks until
   * one of the repository callbacks reports completion or failure.
   * <p>
   * Logs and re-throws an exception on failure. If the calling thread is
   * interrupted while waiting, the interrupt status is restored and the
   * InterruptedException is thrown, rather than reporting a successful wipe.
   *
   * @throws Exception the failure reported by the repository session, or
   *           InterruptedException if the wait was interrupted.
   */
  @Override
  protected void wipeLocal() throws Exception {
    // Reset, then clear data.
    this.resetLocal();

    final WipeWaiter monitor = new WipeWaiter();
    // Completion flag, read and written only while holding the monitor's lock.
    // It lets the waiting loop below tell a real completion from a spurious
    // wakeup.
    final boolean[] finished = { false };
    final Context context = session.getContext();
    final Repository r = this.getLocalRepository();

    final Runnable doWipe = new Runnable() {
      @Override
      public void run() {
        r.createSession(new RepositorySessionCreationDelegate() {

          @Override
          public void onSessionCreated(final RepositorySession session) {
            try {
              session.begin(new RepositorySessionBeginDelegate() {

                @Override
                public void onBeginSucceeded(final RepositorySession session) {
                  session.wipe(new RepositorySessionWipeDelegate() {
                    @Override
                    public void onWipeSucceeded() {
                      try {
                        session.finish(new RepositorySessionFinishDelegate() {

                          @Override
                          public void onFinishSucceeded(RepositorySession session,
                                                        RepositorySessionBundle bundle) {
                            // Hurrah.
                            synchronized (monitor) {
                              finished[0] = true;
                              monitor.notify();
                            }
                          }

                          @Override
                          public void onFinishFailed(Exception ex) {
                            // Assume that no finish => no wipe.
                            synchronized (monitor) {
                              finished[0] = true;
                              monitor.notify(ex, true);
                            }
                          }

                          @Override
                          public RepositorySessionFinishDelegate deferredFinishDelegate(ExecutorService executor) {
                            return this;
                          }
                        });
                      } catch (InactiveSessionException e) {
                        // Cannot happen. Call for safety.
                        synchronized (monitor) {
                          finished[0] = true;
                          monitor.notify(e, true);
                        }
                      }
                    }

                    @Override
                    public void onWipeFailed(Exception ex) {
                      session.abort();
                      synchronized (monitor) {
                        finished[0] = true;
                        monitor.notify(ex, true);
                      }
                    }

                    @Override
                    public RepositorySessionWipeDelegate deferredWipeDelegate(ExecutorService executor) {
                      return this;
                    }
                  });
                }

                @Override
                public void onBeginFailed(Exception ex) {
                  session.abort();
                  synchronized (monitor) {
                    finished[0] = true;
                    monitor.notify(ex, true);
                  }
                }

                @Override
                public RepositorySessionBeginDelegate deferredBeginDelegate(ExecutorService executor) {
                  return this;
                }
              });
            } catch (InvalidSessionTransitionException e) {
              session.abort();
              synchronized (monitor) {
                finished[0] = true;
                monitor.notify(e, true);
              }
            }
          }

          @Override
          public void onSessionCreateFailed(Exception ex) {
            synchronized (monitor) {
              finished[0] = true;
              monitor.notify(ex, false);
            }
          }

          @Override
          public RepositorySessionCreationDelegate deferredCreationDelegate() {
            return this;
          }
        }, context);
      }
    };

    final Thread wiping = new Thread(doWipe);
    synchronized (monitor) {
      // The thread is started while the monitor is held, so no callback can
      // signal before this thread is waiting.
      wiping.start();
      try {
        // Loop on the flag: Object.wait() may return without a notify.
        while (!finished[0]) {
          monitor.wait();
        }
      } catch (InterruptedException e) {
        Logger.error(LOG_TAG, "Wipe interrupted.");
        // The outcome of the wipe is unknown here, so do not report success.
        // Restore the interrupt status for callers that catch Exception.
        Thread.currentThread().interrupt();
        throw e;
      }
    }

    if (!monitor.sessionSucceeded) {
      Logger.error(LOG_TAG, "Failed to create session for wipe.");
      throw monitor.error;
    }

    if (!monitor.wipeSucceeded) {
      Logger.error(LOG_TAG, "Failed to wipe session.");
      throw monitor.error;
    }

    Logger.info(LOG_TAG, "Wiping stage complete.");
  }

  /**
   * Asynchronously wipe this stage's collection on the server by issuing a
   * DELETE against the collection URI. The outcome is reported through
   * {@code wipeDelegate}; on success the local timestamps are reset first.
   *
   * @param authHeaderProvider provider of the auth header for the request.
   * @param wipeDelegate receives onWiped with the response's normalized
   *          timestamp, or onWipeFailed with the cause.
   */
  protected void wipeServer(final AuthHeaderProvider authHeaderProvider, final WipeServerDelegate wipeDelegate) {
    final SyncStorageRequest deleteRequest;
    try {
      deleteRequest = new SyncStorageRequest(session.config.collectionURI(getCollection()));
    } catch (URISyntaxException badURI) {
      Logger.warn(LOG_TAG, "Invalid URI in wipeServer.");
      wipeDelegate.onWipeFailed(badURI);
      return;
    }

    deleteRequest.delegate = new SyncStorageRequestDelegate() {

      @Override
      public AuthHeaderProvider getAuthHeaderProvider() {
        return authHeaderProvider;
      }

      @Override
      public String ifUnmodifiedSince() {
        // No precondition on the request.
        return null;
      }

      @Override
      public void handleRequestSuccess(SyncStorageResponse response) {
        BaseResource.consumeEntity(response);
        resetLocal();
        final long wipedAt = response.normalizedWeaveTimestamp();
        wipeDelegate.onWiped(wipedAt);
      }

      @Override
      public void handleRequestError(Exception ex) {
        Logger.warn(LOG_TAG, "Got exception in wipeServer.", ex);
        wipeDelegate.onWipeFailed(ex);
      }

      @Override
      public void handleRequestFailure(SyncStorageResponse response) {
        Logger.warn(LOG_TAG, "Got request failure " + response.getStatusCode() + " in wipeServer.");
        // Process HTTP failures here to pick up backoffs, etc.
        session.interpretHTTPFailure(response.httpResponse());
        // The exception thrown should not need the body of the response.
        BaseResource.consumeEntity(response);
        final HTTPFailureException failure = new HTTPFailureException(response);
        wipeDelegate.onWipeFailed(failure);
      }
    };

    deleteRequest.delete();
  }

  /**
   * Synchronously wipe the server.
   * <p>
   * Stores {@code session} in this stage, runs the asynchronous wipe on a
   * freshly started thread, and blocks until its delegate reports an outcome.
   * <p>
   * Logs and re-throws an exception on failure. If the calling thread is
   * interrupted while waiting, the interrupt status is restored and the
   * InterruptedException is thrown, rather than reporting a successful wipe.
   *
   * @param session the session whose configuration and auth are used.
   * @throws Exception the failure reported by the wipe, or
   *           InterruptedException if the wait was interrupted.
   */
  public void wipeServer(final GlobalSession session) throws Exception {
    this.session = session;

    final WipeWaiter monitor = new WipeWaiter();
    // Completion flag, read and written only while holding the monitor's lock.
    // It lets the waiting loop below tell a real completion from a spurious
    // wakeup.
    final boolean[] finished = { false };

    final Runnable doWipe = new Runnable() {
      @Override
      public void run() {
        wipeServer(session.getAuthHeaderProvider(), new WipeServerDelegate() {
          @Override
          public void onWiped(long timestamp) {
            synchronized (monitor) {
              finished[0] = true;
              monitor.notify();
            }
          }

          @Override
          public void onWipeFailed(Exception e) {
            synchronized (monitor) {
              finished[0] = true;
              monitor.notify(e, false);
            }
          }
        });
      }
    };

    final Thread wiping = new Thread(doWipe);
    synchronized (monitor) {
      // The thread is started while the monitor is held, so the delegate
      // cannot signal before this thread is waiting.
      wiping.start();
      try {
        // Loop on the flag: Object.wait() may return without a notify.
        while (!finished[0]) {
          monitor.wait();
        }
      } catch (InterruptedException e) {
        Logger.error(LOG_TAG, "Server wipe interrupted.");
        // The outcome of the wipe is unknown here, so do not report success.
        // Restore the interrupt status for callers that catch Exception.
        Thread.currentThread().interrupt();
        throw e;
      }
    }

    if (!monitor.wipeSucceeded) {
      Logger.error(LOG_TAG, "Failed to wipe server.");
      throw monitor.error;
    }

    Logger.info(LOG_TAG, "Wiping server complete.");
  }

  /**
   * Run this stage.
   * <p>
   * First decides whether the stage is enabled, handling the meta/global
   * problems that isEnabled() can report:
   * <ul>
   * <li>malformed engine syncID or version: record fresh engine settings for
   * meta/global upload and wipe the server;</li>
   * <li>stale client syncID: reset local state and adopt the server syncID;</li>
   * <li>engine state changed by the user: update meta/global and the declined
   * engines, wipe the server, and skip the stage if it is now disabled;</li>
   * <li>any other meta/global problem: abort the session.</li>
   * </ul>
   * If a server wipe fails, the session is aborted and the stage stops there
   * without synchronizing or advancing. Otherwise a synchronizer is configured
   * and started, with this stage as its delegate.
   *
   * @throws NoSuchStageException declared by the stage interface.
   */
  @Override
  public void execute() throws NoSuchStageException {
    final String name = getEngineName();
    Logger.debug(LOG_TAG, "Starting execute for " + name);

    stageStartTimestamp = System.currentTimeMillis();

    try {
      if (!this.isEnabled()) {
        Logger.info(LOG_TAG, "Skipping stage " + name + ".");
        session.advance();
        return;
      }
    } catch (MetaGlobalException.MetaGlobalMalformedSyncIDException e) {
      // Bad engine syncID. This should never happen. Wipe the server.
      try {
        session.recordForMetaGlobalUpdate(name, new EngineSettings(Utils.generateGuid(), this.getStorageVersion()));
        Logger.info(LOG_TAG, "Wiping server because malformed engine sync ID was found in meta/global.");
        wipeServer(session);
        Logger.info(LOG_TAG, "Wiped server after malformed engine sync ID found in meta/global.");
      } catch (Exception ex) {
        session.abort(ex, "Failed to wipe server after malformed engine sync ID found in meta/global.");
        // The session has been aborted: do not go on to synchronize.
        return;
      }
    } catch (MetaGlobalException.MetaGlobalMalformedVersionException e) {
      // Bad engine version. This should never happen. Wipe the server.
      try {
        session.recordForMetaGlobalUpdate(name, new EngineSettings(Utils.generateGuid(), this.getStorageVersion()));
        Logger.info(LOG_TAG, "Wiping server because malformed engine version was found in meta/global.");
        wipeServer(session);
        Logger.info(LOG_TAG, "Wiped server after malformed engine version found in meta/global.");
      } catch (Exception ex) {
        session.abort(ex, "Failed to wipe server after malformed engine version found in meta/global.");
        // The session has been aborted: do not go on to synchronize.
        return;
      }
    } catch (MetaGlobalException.MetaGlobalStaleClientSyncIDException e) {
      // Our syncID is wrong. Reset client and take the server syncID.
      Logger.warn(LOG_TAG, "Remote engine syncID different from local engine syncID:" +
                           " resetting local engine and assuming remote engine syncID.");
      this.resetLocalWithSyncID(e.serverSyncID);
    } catch (MetaGlobalException.MetaGlobalEngineStateChangedException e) {
      boolean isEnabled = e.isEnabled;
      if (!isEnabled) {
        // Engine has been disabled; update meta/global with engine removal for upload.
        session.removeEngineFromMetaGlobal(name);
        session.config.declinedEngineNames.add(name);
      } else {
        session.config.declinedEngineNames.remove(name);
        // Add engine with new syncID to meta/global for upload.
        String newSyncID = Utils.generateGuid();
        session.recordForMetaGlobalUpdate(name, new EngineSettings(newSyncID, this.getStorageVersion()));
        // Update SynchronizerConfiguration w/ new engine syncID.
        this.resetLocalWithSyncID(newSyncID);
      }
      try {
        // Engine sync status has changed. Wipe server.
        Logger.warn(LOG_TAG, "Wiping server because engine sync state changed.");
        wipeServer(session);
        Logger.warn(LOG_TAG, "Wiped server because engine sync state changed.");
      } catch (Exception ex) {
        session.abort(ex, "Failed to wipe server after engine sync state changed");
        // The session has been aborted: neither advance nor synchronize.
        return;
      }
      if (!isEnabled) {
        Logger.warn(LOG_TAG, "Stage has been disabled. Advancing to next stage.");
        session.advance();
        return;
      }
    } catch (MetaGlobalException e) {
      session.abort(e, "Inappropriate meta/global; refusing to execute " + name + " stage.");
      return;
    }

    Synchronizer synchronizer;
    try {
      synchronizer = this.getConfiguredSynchronizer(session);
    } catch (NoCollectionKeysSetException e) {
      session.abort(e, "No CollectionKeys.");
      return;
    } catch (URISyntaxException e) {
      session.abort(e, "Invalid URI syntax for server repository.");
      return;
    } catch (NonObjectJSONException | IOException e) {
      session.abort(e, "Invalid persisted JSON for config.");
      return;
    }

    Logger.debug(LOG_TAG, "Invoking synchronizer.");
    // This stage is the synchronizer's delegate: the result arrives through
    // onSynchronized / onSynchronizeFailed.
    synchronizer.synchronize(session.getContext(), this);
    Logger.debug(LOG_TAG, "Reached end of execute.");
  }

  /**
   * Format the time between the recorded stage start and stage completion
   * timestamps as a String, like "0.56 seconds".
   *
   * @return formatted string.
   */
  protected String getStageDurationString() {
    final long startedAt = stageStartTimestamp;
    final long completedAt = stageCompleteTimestamp;
    return Utils.formatDuration(startedAt, completedAt);
  }

  /**
   * Success callback from the synchronizer: record the completion time,
   * persist the synchronizer's saved configuration if it returned one, log
   * the record counts and duration, and advance the session.
   *
   * @param synchronizer the <code>Synchronizer</code> that succeeded.
   */
  @Override
  public void onSynchronized(Synchronizer synchronizer) {
    stageCompleteTimestamp = System.currentTimeMillis();
    Logger.debug(LOG_TAG, "onSynchronized.");

    final SynchronizerConfiguration savedConfig = synchronizer.save();
    if (savedConfig == null) {
      Logger.warn(LOG_TAG, "Didn't get configuration from synchronizer after success.");
    } else {
      persistConfig(savedConfig);
    }

    final SynchronizerSession completed = synchronizer.getSynchronizerSession();
    final int received = completed.getInboundCount();
    final int sent = completed.getOutboundCount();

    final String summary = "Stage " + getEngineName() +
        " received " + received + " and sent " + sent +
        " records in " + getStageDurationString() + ".";
    Logger.info(LOG_TAG, summary);

    Logger.info(LOG_TAG, "Advancing session.");
    session.advance();
  }

  /**
   * Failure callback from the synchronizer. Timestamps are not persisted, so
   * the next sync will include this sync's data. The session is still
   * advanced, unless the failure was an HTTP failure carrying a positive
   * Retry-After value, in which case the session's HTTP error handling takes
   * over.
   *
   * @param synchronizer the <code>Synchronizer</code> that failed.
   * @param lastException the exception that caused the failure.
   * @param reason description of the failure, used in log output.
   */
  @Override
  public void onSynchronizeFailed(Synchronizer synchronizer,
                                  Exception lastException, String reason) {
    stageCompleteTimestamp = System.currentTimeMillis();
    Logger.warn(LOG_TAG, "Synchronize failed: " + reason, lastException);

    // This failure could be due to a 503 or a 401 and it could have headers.
    // Interrogate them, but only hand over to the session's error handling
    // when a Retry-After header is set.
    if (lastException instanceof HTTPFailureException) {
      final SyncStorageResponse failedResponse = ((HTTPFailureException) lastException).response;
      final boolean retryAfterRequested = failedResponse.retryAfterInSeconds() > 0;
      if (retryAfterRequested) {
        // Calls session.abort().
        session.handleHTTPError(failedResponse, reason);
        return;
      }
      // Does not call session.abort().
      session.interpretHTTPFailure(failedResponse.httpResponse());
    }

    Logger.info(LOG_TAG, "Advancing session even though stage failed (took " + getStageDurationString() +
        "). Timestamps not persisted.");
    session.advance();
  }
}