summaryrefslogtreecommitdiffstats
path: root/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloader.java
blob: 5c3037e4d7fda4a504914317cf99850952e3337d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

package org.mozilla.gecko.sync.repositories.downloaders;

import android.support.annotation.Nullable;
import android.support.annotation.VisibleForTesting;

import org.mozilla.gecko.background.common.log.Logger;
import org.mozilla.gecko.sync.CryptoRecord;
import org.mozilla.gecko.sync.DelayedWorkTracker;
import org.mozilla.gecko.sync.net.SyncResponse;
import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest;
import org.mozilla.gecko.sync.net.SyncStorageResponse;
import org.mozilla.gecko.sync.repositories.Server11Repository;
import org.mozilla.gecko.sync.repositories.Server11RepositorySession;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;

import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/**
 * Batching Downloader, which implements batching protocol as supported by Sync 1.5.
 *
 * Downloader's batching behaviour is configured via two parameters, obtained from the repository:
 * - Per-batch limit, which specified how many records may be fetched in an individual GET request.
 * - Total limit, which controls number of batch GET requests we will make.
 *
 *
 * Batching is implemented via specifying a 'limit' GET parameter, and looking for an 'offset' token
 * in the response. If offset token is present, this indicates that there are more records than what
 * we've received so far, and we perform an additional fetch. Batching stops when either we hit a total
 * limit, or offset token is no longer present (indicating that we're done).
 *
 * For unlimited repositories (such as passwords), both of these value will be -1. Downloader will not
 * specify a limit parameter in this case, and the response will contain every record available and no
 * offset token, thus fully completing in one go.
 *
 * In between batches, we maintain a Last-Modified timestamp, based off the value return in the header
 * of the first response. Every response will have a Last-Modified header, indicating when the collection
 * was modified last. We pass along this header in our subsequent requests in a X-If-Unmodified-Since
 * header. Server will ensure that our collection did not change while we are batching, if it did it will
 * fail our fetch with a 412 (Consequent Modification) error. Additionally, we perform the same checks
 * locally.
 */
public class BatchingDownloader {
    public static final String LOG_TAG = "BatchingDownloader";

    protected final Server11Repository repository;
    private final Server11RepositorySession repositorySession;
    private final DelayedWorkTracker workTracker = new DelayedWorkTracker();
    // Used to track outstanding requests, so that we can abort them as needed.
    @VisibleForTesting
    protected final Set<SyncStorageCollectionRequest> pending = Collections.synchronizedSet(new HashSet<SyncStorageCollectionRequest>());
    /* @GuardedBy("this") */ private String lastModified;
    /* @GuardedBy("this") */ private long numRecords = 0;

    public BatchingDownloader(final Server11Repository repository, final Server11RepositorySession repositorySession) {
        this.repository = repository;
        this.repositorySession = repositorySession;
    }

    @VisibleForTesting
    protected static String flattenIDs(String[] guids) {
        // Consider using Utils.toDelimitedString if and when the signature changes
        // to Collection<String> guids.
        if (guids.length == 0) {
            return "";
        }
        if (guids.length == 1) {
            return guids[0];
        }
        // Assuming 12-char GUIDs. There should be a -1 in there, but we accumulate one comma too many.
        StringBuilder b = new StringBuilder(guids.length * 12 + guids.length);
        for (String guid : guids) {
            b.append(guid);
            b.append(",");
        }
        return b.substring(0, b.length() - 1);
    }

    @VisibleForTesting
    protected void fetchWithParameters(long newer,
                                    long batchLimit,
                                    boolean full,
                                    String sort,
                                    String ids,
                                    SyncStorageCollectionRequest request,
                                    RepositorySessionFetchRecordsDelegate fetchRecordsDelegate)
            throws URISyntaxException, UnsupportedEncodingException {
        if (batchLimit > repository.getDefaultTotalLimit()) {
            throw new IllegalArgumentException("Batch limit should not be greater than total limit");
        }

        request.delegate = new BatchingDownloaderDelegate(this, fetchRecordsDelegate, request,
                newer, batchLimit, full, sort, ids);
        this.pending.add(request);
        request.get();
    }

    @VisibleForTesting
    @Nullable
    protected String encodeParam(String param) throws UnsupportedEncodingException {
        if (param != null) {
            return URLEncoder.encode(param, "UTF-8");
        }
        return null;
    }

    @VisibleForTesting
    protected SyncStorageCollectionRequest makeSyncStorageCollectionRequest(long newer,
                                                  long batchLimit,
                                                  boolean full,
                                                  String sort,
                                                  String ids,
                                                  String offset)
            throws URISyntaxException, UnsupportedEncodingException {
        URI collectionURI = repository.collectionURI(full, newer, batchLimit, sort, ids, encodeParam(offset));
        Logger.debug(LOG_TAG, collectionURI.toString());

        return new SyncStorageCollectionRequest(collectionURI);
    }

    public void fetchSince(long timestamp, RepositorySessionFetchRecordsDelegate fetchRecordsDelegate) {
        this.fetchSince(timestamp, null, fetchRecordsDelegate);
    }

    private void fetchSince(long timestamp, String offset,
                           RepositorySessionFetchRecordsDelegate fetchRecordsDelegate) {
        long batchLimit = repository.getDefaultBatchLimit();
        String sort = repository.getDefaultSort();

        try {
            SyncStorageCollectionRequest request = makeSyncStorageCollectionRequest(timestamp,
                    batchLimit, true, sort, null, offset);
            this.fetchWithParameters(timestamp, batchLimit, true, sort, null, request, fetchRecordsDelegate);
        } catch (URISyntaxException | UnsupportedEncodingException e) {
            fetchRecordsDelegate.onFetchFailed(e, null);
        }
    }

    public void fetch(String[] guids, RepositorySessionFetchRecordsDelegate fetchRecordsDelegate) {
        String ids = flattenIDs(guids);
        String index = "index";

        try {
            SyncStorageCollectionRequest request = makeSyncStorageCollectionRequest(
                    -1, -1, true, index, ids, null);
            this.fetchWithParameters(-1, -1, true, index, ids, request, fetchRecordsDelegate);
        } catch (URISyntaxException | UnsupportedEncodingException e) {
            fetchRecordsDelegate.onFetchFailed(e, null);
        }
    }

    public Server11Repository getServerRepository() {
        return this.repository;
    }

    public void onFetchCompleted(SyncStorageResponse response,
                                 final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate,
                                 final SyncStorageCollectionRequest request, long newer,
                                 long limit, boolean full, String sort, String ids) {
        removeRequestFromPending(request);

        // When we process our first request, we get back a X-Last-Modified header indicating when collection was modified last.
        // We pass it to the server with every subsequent request (if we need to make more) as the X-If-Unmodified-Since header,
        // and server is supposed to ensure that this pre-condition is met, and fail our request with a 412 error code otherwise.
        // So, if all of this happens, these checks should never fail.
        // However, we also track this header in client side, and can defensively validate against it here as well.
        final String currentLastModifiedTimestamp = response.lastModified();
        Logger.debug(LOG_TAG, "Last modified timestamp " + currentLastModifiedTimestamp);

        // Sanity check. We also did a null check in delegate before passing it into here.
        if (currentLastModifiedTimestamp == null) {
            this.abort(fetchRecordsDelegate, "Last modified timestamp is missing");
            return;
        }

        final boolean lastModifiedChanged;
        synchronized (this) {
            if (this.lastModified == null) {
                // First time seeing last modified timestamp.
                this.lastModified = currentLastModifiedTimestamp;
            }
            lastModifiedChanged = !this.lastModified.equals(currentLastModifiedTimestamp);
        }

        if (lastModifiedChanged) {
            this.abort(fetchRecordsDelegate, "Last modified timestamp has changed unexpectedly");
            return;
        }

        final boolean hasNotReachedLimit;
        synchronized (this) {
            this.numRecords += response.weaveRecords();
            hasNotReachedLimit = this.numRecords < repository.getDefaultTotalLimit();
        }

        final String offset = response.weaveOffset();
        final SyncStorageCollectionRequest newRequest;
        try {
            newRequest = makeSyncStorageCollectionRequest(newer,
                    limit, full, sort, ids, offset);
        } catch (final URISyntaxException | UnsupportedEncodingException e) {
            this.workTracker.delayWorkItem(new Runnable() {
                @Override
                public void run() {
                    Logger.debug(LOG_TAG, "Delayed onFetchCompleted running.");
                    fetchRecordsDelegate.onFetchFailed(e, null);
                }
            });
            return;
        }

        if (offset != null && hasNotReachedLimit) {
            try {
                this.fetchWithParameters(newer, limit, full, sort, ids, newRequest, fetchRecordsDelegate);
            } catch (final URISyntaxException | UnsupportedEncodingException e) {
                this.workTracker.delayWorkItem(new Runnable() {
                    @Override
                    public void run() {
                        Logger.debug(LOG_TAG, "Delayed onFetchCompleted running.");
                        fetchRecordsDelegate.onFetchFailed(e, null);
                    }
                });
            }
            return;
        }

        final long normalizedTimestamp = response.normalizedTimestampForHeader(SyncResponse.X_LAST_MODIFIED);
        Logger.debug(LOG_TAG, "Fetch completed. Timestamp is " + normalizedTimestamp);

        this.workTracker.delayWorkItem(new Runnable() {
            @Override
            public void run() {
                Logger.debug(LOG_TAG, "Delayed onFetchCompleted running.");
                fetchRecordsDelegate.onFetchCompleted(normalizedTimestamp);
            }
        });
    }

    public void onFetchFailed(final Exception ex,
                              final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate,
                              final SyncStorageCollectionRequest request) {
        removeRequestFromPending(request);
        this.workTracker.delayWorkItem(new Runnable() {
            @Override
            public void run() {
                Logger.debug(LOG_TAG, "Running onFetchFailed.");
                fetchRecordsDelegate.onFetchFailed(ex, null);
            }
        });
    }

    public void onFetchedRecord(CryptoRecord record,
                                RepositorySessionFetchRecordsDelegate fetchRecordsDelegate) {
        this.workTracker.incrementOutstanding();
        try {
            fetchRecordsDelegate.onFetchedRecord(record);
        } catch (Exception ex) {
            Logger.warn(LOG_TAG, "Got exception calling onFetchedRecord with WBO.", ex);
            throw new RuntimeException(ex);
        } finally {
            this.workTracker.decrementOutstanding();
        }
    }

    private void removeRequestFromPending(SyncStorageCollectionRequest request) {
        if (request == null) {
            return;
        }
        this.pending.remove(request);
    }

    @VisibleForTesting
    protected void abortRequests() {
        this.repositorySession.abort();
        synchronized (this.pending) {
            for (SyncStorageCollectionRequest request : this.pending) {
                request.abort();
            }
            this.pending.clear();
        }
    }

    @Nullable
    protected synchronized String getLastModified() {
        return this.lastModified;
    }

    private void abort(final RepositorySessionFetchRecordsDelegate delegate, final String msg) {
        Logger.error(LOG_TAG, msg);
        this.abortRequests();
        this.workTracker.delayWorkItem(new Runnable() {
            @Override
            public void run() {
                Logger.debug(LOG_TAG, "Delayed onFetchCompleted running.");
                delegate.onFetchFailed(
                        new IllegalStateException(msg),
                        null);
            }
        });
    }
}