summaryrefslogtreecommitdiffstats
path: root/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/SynchronizerSession.java
blob: c4d244b4cca881939703500b5c9e619c4eb2cde8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

package org.mozilla.gecko.sync.synchronizer;


import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

import org.mozilla.gecko.background.common.log.Logger;
import org.mozilla.gecko.sync.repositories.InactiveSessionException;
import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException;
import org.mozilla.gecko.sync.repositories.RepositorySession;
import org.mozilla.gecko.sync.repositories.RepositorySessionBundle;
import org.mozilla.gecko.sync.repositories.delegates.DeferrableRepositorySessionCreationDelegate;
import org.mozilla.gecko.sync.repositories.delegates.DeferredRepositorySessionFinishDelegate;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFinishDelegate;

import android.content.Context;

/**
 * I coordinate the moving parts of a sync started by
 * {@link Synchronizer#synchronize}.
 *
 * I flow records twice: first from A to B, and then from B to A. I provide
 * fine-grained feedback by calling my delegate's callback methods.
 *
 * Initialize me by creating me with a Synchronizer and a
 * SynchronizerSessionDelegate. Kick things off by calling `init` with two
 * RepositorySessionBundles, and then call `synchronize` in your `onInitialized`
 * callback.
 *
 * I always call exactly one of my delegate's `onInitialized` or
 * `onSessionError` callback methods from `init`.
 *
 * I call my delegate's `onSynchronizeSkipped` callback method if there is no
 * data to be synchronized in `synchronize`.
 *
 * In addition, I call `onFetchError`, `onStoreError`, and `onSessionError` when
 * I encounter a fetch, store, or session error while synchronizing.
 *
 * Typically my delegate will call `abort` in its error callbacks, which will
 * call my delegate's `onSynchronizeAborted` method and halt the sync.
 *
 * I always call exactly one of my delegate's `onSynchronized` or
 * `onSynchronizeFailed` callback methods if I have not seen an error.
 */
public class SynchronizerSession
extends DeferrableRepositorySessionCreationDelegate
implements RecordsChannelDelegate,
           RepositorySessionFinishDelegate {

  protected static final String LOG_TAG = "SynchronizerSession";
  protected Synchronizer synchronizer;
  protected SynchronizerSessionDelegate delegate;
  protected Context context;

  /*
   * Computed during init.
   */
  private RepositorySession sessionA;
  private RepositorySession sessionB;
  private RepositorySessionBundle bundleA;
  private RepositorySessionBundle bundleB;

  // Bug 726054: just like desktop, we track our last interaction with the server,
  // not the last record timestamp that we fetched. This ensures that we don't re-
  // download the records we just uploaded, at the cost of skipping any records
  // that a concurrently syncing client has uploaded.
  private long pendingATimestamp = -1;
  private long pendingBTimestamp = -1;
  private long storeEndATimestamp = -1;
  private long storeEndBTimestamp = -1;
  private boolean flowAToBCompleted = false;
  private boolean flowBToACompleted = false;

  protected final AtomicInteger numInboundRecords = new AtomicInteger(-1);
  protected final AtomicInteger numOutboundRecords = new AtomicInteger(-1);

  /*
   * Public API: constructor, init, synchronize.
   */
  public SynchronizerSession(Synchronizer synchronizer, SynchronizerSessionDelegate delegate) {
    this.setSynchronizer(synchronizer);
    this.delegate = delegate;
  }

  public Synchronizer getSynchronizer() {
    return synchronizer;
  }

  public void setSynchronizer(Synchronizer synchronizer) {
    this.synchronizer = synchronizer;
  }

  public void init(Context context, RepositorySessionBundle bundleA, RepositorySessionBundle bundleB) {
    this.context = context;
    this.bundleA = bundleA;
    this.bundleB = bundleB;
    // Begin sessionA and sessionB, call onInitialized in callbacks.
    this.getSynchronizer().repositoryA.createSession(this, context);
  }

  /**
   * Get the number of records fetched from the first repository (usually the
   * server, hence inbound).
   * <p>
   * Valid only after first flow has completed.
   *
   * @return number of records, or -1 if not valid.
   */
  public int getInboundCount() {
    return numInboundRecords.get();
  }

  /**
   * Get the number of records fetched from the second repository (usually the
   * local store, hence outbound).
   * <p>
   * Valid only after second flow has completed.
   *
   * @return number of records, or -1 if not valid.
   */
  public int getOutboundCount() {
    return numOutboundRecords.get();
  }

  // These are accessed by `abort` and `synchronize`, both of which are synchronized.
  // Guarded by `this`.
  protected RecordsChannel channelAToB;
  protected RecordsChannel channelBToA;

  /**
   * Please don't call this until you've been notified with onInitialized.
   */
  public synchronized void synchronize() {
    numInboundRecords.set(-1);
    numOutboundRecords.set(-1);

    // First thing: decide whether we should.
    if (sessionA.shouldSkip() ||
        sessionB.shouldSkip()) {
      Logger.info(LOG_TAG, "Session requested skip. Short-circuiting sync.");
      sessionA.abort();
      sessionB.abort();
      this.delegate.onSynchronizeSkipped(this);
      return;
    }

    final SynchronizerSession session = this;

    // TODO: failed record handling.

    // This is the *second* record channel to flow.
    // I, SynchronizerSession, am the delegate for the *second* flow.
    channelBToA = new RecordsChannel(this.sessionB, this.sessionA, this);

    // This is the delegate for the *first* flow.
    RecordsChannelDelegate channelAToBDelegate = new RecordsChannelDelegate() {
      @Override
      public void onFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
        session.onFirstFlowCompleted(recordsChannel, fetchEnd, storeEnd);
      }

      @Override
      public void onFlowBeginFailed(RecordsChannel recordsChannel, Exception ex) {
        Logger.warn(LOG_TAG, "First RecordsChannel onFlowBeginFailed. Logging session error.", ex);
        session.delegate.onSynchronizeFailed(session, ex, "Failed to begin first flow.");
      }

      @Override
      public void onFlowFetchFailed(RecordsChannel recordsChannel, Exception ex) {
        Logger.warn(LOG_TAG, "First RecordsChannel onFlowFetchFailed. Logging remote fetch error.", ex);
      }

      @Override
      public void onFlowStoreFailed(RecordsChannel recordsChannel, Exception ex, String recordGuid) {
        Logger.warn(LOG_TAG, "First RecordsChannel onFlowStoreFailed. Logging local store error.", ex);
      }

      @Override
      public void onFlowFinishFailed(RecordsChannel recordsChannel, Exception ex) {
        Logger.warn(LOG_TAG, "First RecordsChannel onFlowFinishedFailed. Logging session error.", ex);
        session.delegate.onSynchronizeFailed(session, ex, "Failed to finish first flow.");
      }
    };

    // This is the *first* channel to flow.
    channelAToB = new RecordsChannel(this.sessionA, this.sessionB, channelAToBDelegate);

    Logger.trace(LOG_TAG, "Starting A to B flow. Channel is " + channelAToB);
    try {
      channelAToB.beginAndFlow();
    } catch (InvalidSessionTransitionException e) {
      onFlowBeginFailed(channelAToB, e);
    }
  }

  /**
   * Called after the first flow completes.
   * <p>
   * By default, any fetch and store failures are ignored.
   * @param recordsChannel the <code>RecordsChannel</code> (for error testing).
   * @param fetchEnd timestamp when fetches completed.
   * @param storeEnd timestamp when stores completed.
   */
  public void onFirstFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
    Logger.trace(LOG_TAG, "First RecordsChannel onFlowCompleted.");
    Logger.debug(LOG_TAG, "Fetch end is " + fetchEnd + ". Store end is " + storeEnd + ". Starting next.");
    pendingATimestamp = fetchEnd;
    storeEndBTimestamp = storeEnd;
    numInboundRecords.set(recordsChannel.getFetchCount());
    flowAToBCompleted = true;
    channelBToA.flow();
  }

  /**
   * Called after the second flow completes.
   * <p>
   * By default, any fetch and store failures are ignored.
   * @param recordsChannel the <code>RecordsChannel</code> (for error testing).
   * @param fetchEnd timestamp when fetches completed.
   * @param storeEnd timestamp when stores completed.
   */
  public void onSecondFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
    Logger.trace(LOG_TAG, "Second RecordsChannel onFlowCompleted.");
    Logger.debug(LOG_TAG, "Fetch end is " + fetchEnd + ". Store end is " + storeEnd + ". Finishing.");

    pendingBTimestamp = fetchEnd;
    storeEndATimestamp = storeEnd;
    numOutboundRecords.set(recordsChannel.getFetchCount());
    flowBToACompleted = true;

    // Finish the two sessions.
    try {
      this.sessionA.finish(this);
    } catch (InactiveSessionException e) {
      this.onFinishFailed(e);
      return;
    }
  }

  @Override
  public void onFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
    onSecondFlowCompleted(recordsChannel, fetchEnd, storeEnd);
  }

  @Override
  public void onFlowBeginFailed(RecordsChannel recordsChannel, Exception ex) {
    Logger.warn(LOG_TAG, "Second RecordsChannel onFlowBeginFailed. Logging session error.", ex);
    this.delegate.onSynchronizeFailed(this, ex, "Failed to begin second flow.");
  }

  @Override
  public void onFlowFetchFailed(RecordsChannel recordsChannel, Exception ex) {
    Logger.warn(LOG_TAG, "Second RecordsChannel onFlowFetchFailed. Logging local fetch error.", ex);
  }

  @Override
  public void onFlowStoreFailed(RecordsChannel recordsChannel, Exception ex, String recordGuid) {
    Logger.warn(LOG_TAG, "Second RecordsChannel onFlowStoreFailed. Logging remote store error.", ex);
  }

  @Override
  public void onFlowFinishFailed(RecordsChannel recordsChannel, Exception ex) {
    Logger.warn(LOG_TAG, "Second RecordsChannel onFlowFinishedFailed. Logging session error.", ex);
    this.delegate.onSynchronizeFailed(this, ex, "Failed to finish second flow.");
  }

  /*
   * RepositorySessionCreationDelegate methods.
   */

  /**
   * I could be called twice: once for sessionA and once for sessionB.
   *
   * I try to clean up sessionA if it is not null, since the creation of
   * sessionB must have failed.
   */
  @Override
  public void onSessionCreateFailed(Exception ex) {
    // Attempt to finish the first session, if the second is the one that failed.
    if (this.sessionA != null) {
      try {
        // We no longer need a reference to our context.
        this.context = null;
        this.sessionA.finish(this);
      } catch (Exception e) {
        // Never mind; best-effort finish.
      }
    }
    // We no longer need a reference to our context.
    this.context = null;
    this.delegate.onSynchronizeFailed(this, ex, "Failed to create session");
  }

  /**
   * I should be called twice: first for sessionA and second for sessionB.
   *
   * If I am called for sessionB, I call my delegate's `onInitialized` callback
   * method because my repository sessions are correctly initialized.
   */
  // TODO: some of this "finish and clean up" code can be refactored out.
  @Override
  public void onSessionCreated(RepositorySession session) {
    if (session == null ||
        this.sessionA == session) {
      // TODO: clean up sessionA.
      this.delegate.onSynchronizeFailed(this, new UnexpectedSessionException(session), "Failed to create session.");
      return;
    }
    if (this.sessionA == null) {
      this.sessionA = session;

      // Unbundle.
      try {
        this.sessionA.unbundle(this.bundleA);
      } catch (Exception e) {
        this.delegate.onSynchronizeFailed(this, new UnbundleError(e, sessionA), "Failed to unbundle first session.");
        // TODO: abort
        return;
      }
      this.getSynchronizer().repositoryB.createSession(this, this.context);
      return;
    }
    if (this.sessionB == null) {
      this.sessionB = session;
      // We no longer need a reference to our context.
      this.context = null;

      // Unbundle. We unbundled sessionA when that session was created.
      try {
        this.sessionB.unbundle(this.bundleB);
      } catch (Exception e) {
        this.delegate.onSynchronizeFailed(this, new UnbundleError(e, sessionA), "Failed to unbundle second session.");
        return;
      }

      this.delegate.onInitialized(this);
      return;
    }
    // TODO: need a way to make sure we don't call any more delegate methods.
    this.delegate.onSynchronizeFailed(this, new UnexpectedSessionException(session), "Failed to create session.");
  }

  /*
   * RepositorySessionFinishDelegate methods.
   */

  /**
   * I could be called twice: once for sessionA and once for sessionB.
   *
   * If sessionB couldn't be created, I don't fail again.
   */
  @Override
  public void onFinishFailed(Exception ex) {
    if (this.sessionB == null) {
      // Ah, it was a problem cleaning up. Never mind.
      Logger.warn(LOG_TAG, "Got exception cleaning up first after second session creation failed.", ex);
      return;
    }
    String session = (this.sessionA == null) ? "B" : "A";
    this.delegate.onSynchronizeFailed(this, ex, "Finish of session " + session + " failed.");
  }

  /**
   * I should be called twice: first for sessionA and second for sessionB.
   *
   * If I am called for sessionA, I try to finish sessionB.
   *
   * If I am called for sessionB, I call my delegate's `onSynchronized` callback
   * method because my flows should have completed.
   */
  @Override
  public void onFinishSucceeded(RepositorySession session,
                                RepositorySessionBundle bundle) {
    Logger.debug(LOG_TAG, "onFinishSucceeded. Flows? " + flowAToBCompleted + ", " + flowBToACompleted);

    if (session == sessionA) {
      if (flowAToBCompleted) {
        Logger.debug(LOG_TAG, "onFinishSucceeded: bumping session A's timestamp to " + pendingATimestamp + " or " + storeEndATimestamp);
        bundle.bumpTimestamp(Math.max(pendingATimestamp, storeEndATimestamp));
        this.synchronizer.bundleA = bundle;
      } else {
        // Should not happen!
        this.delegate.onSynchronizeFailed(this, new UnexpectedSessionException(sessionA), "Failed to finish first session.");
        return;
      }
      if (this.sessionB != null) {
        Logger.trace(LOG_TAG, "Finishing session B.");
        // On to the next.
        try {
          this.sessionB.finish(this);
        } catch (InactiveSessionException e) {
          this.onFinishFailed(e);
          return;
        }
      }
    } else if (session == sessionB) {
      if (flowBToACompleted) {
        Logger.debug(LOG_TAG, "onFinishSucceeded: bumping session B's timestamp to " + pendingBTimestamp + " or " + storeEndBTimestamp);
        bundle.bumpTimestamp(Math.max(pendingBTimestamp, storeEndBTimestamp));
        this.synchronizer.bundleB = bundle;
        Logger.trace(LOG_TAG, "Notifying delegate.onSynchronized.");
        this.delegate.onSynchronized(this);
      } else {
        // Should not happen!
        this.delegate.onSynchronizeFailed(this, new UnexpectedSessionException(sessionB), "Failed to finish second session.");
        return;
      }
    } else {
      // TODO: hurrrrrr...
    }

    if (this.sessionB == null) {
      this.sessionA = null; // We're done.
    }
  }

  @Override
  public RepositorySessionFinishDelegate deferredFinishDelegate(final ExecutorService executor) {
    return new DeferredRepositorySessionFinishDelegate(this, executor);
  }
}