path: root/services/sync/modules/record.js
Diffstat (limited to 'services/sync/modules/record.js')
-rw-r--r--  services/sync/modules/record.js | 469
1 file changed, 29 insertions(+), 440 deletions(-)
diff --git a/services/sync/modules/record.js b/services/sync/modules/record.js
index f7a69d9ef..5dc1c012c 100644
--- a/services/sync/modules/record.js
+++ b/services/sync/modules/record.js
@@ -23,7 +23,6 @@ Cu.import("resource://services-sync/constants.js");
Cu.import("resource://services-sync/keys.js");
Cu.import("resource://services-sync/resource.js");
Cu.import("resource://services-sync/util.js");
-Cu.import("resource://services-common/async.js");
this.WBORecord = function WBORecord(collection, id) {
this.data = {};
@@ -196,9 +195,7 @@ CryptoWrapper.prototype = {
},
// The custom setter below masks the parent's getter, so explicitly call it :(
- get id() {
- return WBORecord.prototype.__lookupGetter__("id").call(this);
- },
+ get id() WBORecord.prototype.__lookupGetter__("id").call(this),
// Keep both plaintext and encrypted versions of the id to verify integrity
set id(val) {
@@ -238,11 +235,8 @@ RecordManager.prototype = {
record.deserialize(this.response);
return this.set(url, record);
- } catch (ex) {
- if (Async.isShutdownException(ex)) {
- throw ex;
- }
- this._log.debug("Failed to import record", ex);
+ } catch(ex) {
+ this._log.debug("Failed to import record: ", ex);
return null;
}
},
@@ -281,10 +275,10 @@ RecordManager.prototype = {
* You can update this thing simply by giving it /info/collections. It'll
* use the last modified time to bring itself up to date.
*/
-this.CollectionKeyManager = function CollectionKeyManager(lastModified, default_, collections) {
- this.lastModified = lastModified || 0;
- this._default = default_ || null;
- this._collections = collections || {};
+this.CollectionKeyManager = function CollectionKeyManager() {
+ this.lastModified = 0;
+ this._collections = {};
+ this._default = null;
this._log = Log.repository.getLogger("Sync.CollectionKeyManager");
}
@@ -293,19 +287,6 @@ this.CollectionKeyManager = function CollectionKeyManager(lastModified, default_
// Note that the last modified time needs to be preserved.
CollectionKeyManager.prototype = {
- /**
- * Generate a new CollectionKeyManager that has the same attributes
- * as this one.
- */
- clone() {
- const newCollections = {};
- for (let c in this._collections) {
- newCollections[c] = this._collections[c];
- }
-
- return new CollectionKeyManager(this.lastModified, this._default, newCollections);
- },
-
// Return information about old vs new keys:
// * same: true if two collections are equal
// * changed: an array of collection names that changed.
@@ -374,15 +355,15 @@ CollectionKeyManager.prototype = {
/**
* Create a WBO for the current keys.
*/
- asWBO: function(collection, id) {
- return this._makeWBO(this._collections, this._default);
- },
+ asWBO: function(collection, id)
+ this._makeWBO(this._collections, this._default),
/**
* Compute a new default key, and new keys for any specified collections.
*/
newKeys: function(collections) {
- let newDefaultKeyBundle = this.newDefaultKeyBundle();
+ let newDefaultKey = new BulkKeyBundle(DEFAULT_KEYBUNDLE_NAME);
+ newDefaultKey.generateRandom();
let newColls = {};
if (collections) {
@@ -392,7 +373,7 @@ CollectionKeyManager.prototype = {
newColls[c] = b;
});
}
- return [newDefaultKeyBundle, newColls];
+ return [newDefaultKey, newColls];
},
/**
@@ -406,57 +387,6 @@ CollectionKeyManager.prototype = {
return this._makeWBO(newColls, newDefaultKey);
},
- /**
- * Create a new default key.
- *
- * @returns {BulkKeyBundle}
- */
- newDefaultKeyBundle() {
- const key = new BulkKeyBundle(DEFAULT_KEYBUNDLE_NAME);
- key.generateRandom();
- return key;
- },
-
- /**
- * Create a new default key and store it as this._default, since without one you cannot use setContents.
- */
- generateDefaultKey() {
- this._default = this.newDefaultKeyBundle();
- },
-
- /**
- * Return true if keys are already present for each of the given
- * collections.
- */
- hasKeysFor(collections) {
- // We can't use filter() here because sometimes collections is an iterator.
- for (let collection of collections) {
- if (!this._collections[collection]) {
- return false;
- }
- }
- return true;
- },
-
- /**
- * Return a new CollectionKeyManager that has keys for each of the
- * given collections (creating new ones for collections where we
- * don't already have keys).
- */
- ensureKeysFor(collections) {
- const newKeys = Object.assign({}, this._collections);
- for (let c of collections) {
- if (newKeys[c]) {
- continue; // don't replace existing keys
- }
-
- const b = new BulkKeyBundle(c);
- b.generateRandom();
- newKeys[c] = b;
- }
- return new CollectionKeyManager(this.lastModified, this._default, newKeys);
- },
-
// Take the fetched info/collections WBO, checking the change
// time of the crypto collection.
updateNeeded: function(info_collections) {
@@ -487,6 +417,9 @@ CollectionKeyManager.prototype = {
//
setContents: function setContents(payload, modified) {
+ if (!modified)
+ throw "No modified time provided to setContents.";
+
let self = this;
this._log.info("Setting collection keys contents. Our last modified: " +
@@ -516,7 +449,9 @@ CollectionKeyManager.prototype = {
if (v) {
let keyObj = new BulkKeyBundle(k);
keyObj.keyPairB64 = v;
- newCollections[k] = keyObj;
+ if (keyObj) {
+ newCollections[k] = keyObj;
+ }
}
}
}
@@ -527,11 +462,8 @@ CollectionKeyManager.prototype = {
let sameColls = collComparison.same;
if (sameDefault && sameColls) {
- self._log.info("New keys are the same as our old keys!");
- if (modified) {
- self._log.info("Bumped local modified time.");
- self.lastModified = modified;
- }
+ self._log.info("New keys are the same as our old keys! Bumped local modified time.");
+ self.lastModified = modified;
return false;
}
@@ -543,10 +475,8 @@ CollectionKeyManager.prototype = {
this._collections = newCollections;
// Always trust the server.
- if (modified) {
- self._log.info("Bumping last modified to " + modified);
- self.lastModified = modified;
- }
+ self._log.info("Bumping last modified to " + modified);
+ self.lastModified = modified;
return sameDefault ? collComparison.changed : true;
},
@@ -594,12 +524,6 @@ this.Collection = function Collection(uri, recordObj, service) {
this._older = 0;
this._newer = 0;
this._data = [];
- // optional members used by batch upload operations.
- this._batch = null;
- this._commit = false;
- // Used for batch download operations -- note that this is explicitly an
- // opaque value and not (necessarily) a number.
- this._offset = null;
}
Collection.prototype = {
__proto__: Resource.prototype,
@@ -623,12 +547,6 @@ Collection.prototype = {
args.push("ids=" + this.ids);
if (this.limit > 0 && this.limit != Infinity)
args.push("limit=" + this.limit);
- if (this._batch)
- args.push("batch=" + encodeURIComponent(this._batch));
- if (this._commit)
- args.push("commit=true");
- if (this._offset)
- args.push("offset=" + encodeURIComponent(this._offset));
this.uri.query = (args.length > 0)? '?' + args.join('&') : '';
},
@@ -641,14 +559,14 @@ Collection.prototype = {
},
// Apply the action to a certain set of ids
- get ids() { return this._ids; },
+ get ids() this._ids,
set ids(value) {
this._ids = value;
this._rebuildURL();
},
// Limit how many records to get
- get limit() { return this._limit; },
+ get limit() this._limit,
set limit(value) {
this._limit = value;
this._rebuildURL();
@@ -678,100 +596,12 @@ Collection.prototype = {
this._rebuildURL();
},
- get offset() { return this._offset; },
- set offset(value) {
- this._offset = value;
- this._rebuildURL();
- },
-
- // Set information about the batch for this request.
- get batch() { return this._batch; },
- set batch(value) {
- this._batch = value;
- this._rebuildURL();
- },
-
- get commit() { return this._commit; },
- set commit(value) {
- this._commit = value && true;
- this._rebuildURL();
+ pushData: function Coll_pushData(data) {
+ this._data.push(data);
},
- // Similar to get(), but will page through the items `batchSize` at a time,
- // deferring calling the record handler until we've gotten them all.
- //
- // Returns the last response processed, and doesn't run the record handler
- // on any items if a non-success status is received while downloading the
- // records (or if a network error occurs).
- getBatched(batchSize = DEFAULT_DOWNLOAD_BATCH_SIZE) {
- let totalLimit = Number(this.limit) || Infinity;
- if (batchSize <= 0 || batchSize >= totalLimit) {
- // Invalid batch sizes should arguably be an error, but they're easy to handle
- return this.get();
- }
-
- if (!this.full) {
- throw new Error("getBatched is unimplemented for guid-only GETs");
- }
-
- // _onComplete and _onProgress are reset after each `get` by AsyncResource.
- // We overwrite _onRecord to something that stores the data in an array
- // until the end.
- let { _onComplete, _onProgress, _onRecord } = this;
- let recordBuffer = [];
- let resp;
- try {
- this._onRecord = r => recordBuffer.push(r);
- let lastModifiedTime;
- this.limit = batchSize;
-
- do {
- this._onProgress = _onProgress;
- this._onComplete = _onComplete;
- if (batchSize + recordBuffer.length > totalLimit) {
- this.limit = totalLimit - recordBuffer.length;
- }
- this._log.trace("Performing batched GET", { limit: this.limit, offset: this.offset });
- // Actually perform the request
- resp = this.get();
- if (!resp.success) {
- break;
- }
-
- // Initialize last modified, or check that something broken isn't happening.
- let lastModified = resp.headers["x-last-modified"];
- if (!lastModifiedTime) {
- lastModifiedTime = lastModified;
- this.setHeader("X-If-Unmodified-Since", lastModified);
- } else if (lastModified != lastModifiedTime) {
- // Should be impossible -- We'd get a 412 in this case.
- throw new Error("X-Last-Modified changed in the middle of a download batch! " +
- `${lastModified} => ${lastModifiedTime}`)
- }
-
- // If this is missing, we're finished.
- this.offset = resp.headers["x-weave-next-offset"];
- } while (this.offset && totalLimit > recordBuffer.length);
- } finally {
- // Ensure we undo any temporary state so that subsequent calls to get()
- // or getBatched() work properly. We do this before calling the record
- // handler so that we can more convincingly pretend to be a normal get()
- // call. Note: we're resetting these to the values they had before this
- // function was called.
- this._onRecord = _onRecord;
- this._limit = totalLimit;
- this._offset = null;
- delete this._headers["x-if-unmodified-since"];
- this._rebuildURL();
- }
- if (resp.success && Async.checkAppReady()) {
- // call the original _onRecord (e.g. the user supplied record handler)
- // for each record we've stored
- for (let record of recordBuffer) {
- this._onRecord(record);
- }
- }
- return resp;
+ clearRecords: function Coll_clearRecords() {
+ this._data = [];
},
set recordHandler(onRecord) {
@@ -781,8 +611,6 @@ Collection.prototype = {
// Switch to newline separated records for incremental parsing
coll.setHeader("Accept", "application/newlines");
- this._onRecord = onRecord;
-
this._onProgress = function() {
let newline;
while ((newline = this._data.indexOf("\n")) > 0) {
@@ -793,247 +621,8 @@ Collection.prototype = {
// Deserialize a record from json and give it to the callback
let record = new coll._recordObj();
record.deserialize(json);
- coll._onRecord(record);
+ onRecord(record);
}
};
},
-
- // This object only supports posting via the postQueue object.
- post() {
- throw new Error("Don't directly post to a collection - use newPostQueue instead");
- },
-
- newPostQueue(log, timestamp, postCallback) {
- let poster = (data, headers, batch, commit) => {
- this.batch = batch;
- this.commit = commit;
- for (let [header, value] of headers) {
- this.setHeader(header, value);
- }
- return Resource.prototype.post.call(this, data);
- }
- let getConfig = (name, defaultVal) => {
- if (this._service.serverConfiguration && this._service.serverConfiguration.hasOwnProperty(name)) {
- return this._service.serverConfiguration[name];
- }
- return defaultVal;
- }
-
- let config = {
- max_post_bytes: getConfig("max_post_bytes", MAX_UPLOAD_BYTES),
- max_post_records: getConfig("max_post_records", MAX_UPLOAD_RECORDS),
-
- max_batch_bytes: getConfig("max_total_bytes", Infinity),
- max_batch_records: getConfig("max_total_records", Infinity),
- }
-
- // Handle config edge cases
- if (config.max_post_records <= 0) { config.max_post_records = MAX_UPLOAD_RECORDS; }
- if (config.max_batch_records <= 0) { config.max_batch_records = Infinity; }
- if (config.max_post_bytes <= 0) { config.max_post_bytes = MAX_UPLOAD_BYTES; }
- if (config.max_batch_bytes <= 0) { config.max_batch_bytes = Infinity; }
-
- // Max size of BSO payload is 256k. This assumes at most 4k of overhead,
- // which sounds like plenty. If the server says it can't handle this, we
- // might have valid records we can't sync, so we give up on syncing.
- let requiredMax = 260 * 1024;
- if (config.max_post_bytes < requiredMax) {
- this._log.error("Server configuration max_post_bytes is too low", config);
- throw new Error("Server configuration max_post_bytes is too low");
- }
-
- return new PostQueue(poster, timestamp, config, log, postCallback);
- },
};
-
-/* A helper to manage the posting of records while respecting the various
- size limits.
-
- This supports the concept of a server-side "batch". The general idea is:
- * We queue as many records as allowed in memory, then make a single POST.
- * This first POST (optionally) gives us a batch ID, which we use for
- all subsequent posts, until...
- * At some point we hit a batch-maximum, and jump through a few hoops to
- commit the current batch (ie, all previous POSTs) and start a new one.
- * Eventually commit the final batch.
-
- In most cases we expect there to be exactly 1 batch consisting of possibly
- multiple POSTs.
-*/
-function PostQueue(poster, timestamp, config, log, postCallback) {
- // The "post" function we should use when it comes time to do the post.
- this.poster = poster;
- this.log = log;
-
- // The config we use. We expect it to have fields "max_post_records",
- // "max_batch_records", "max_post_bytes", and "max_batch_bytes"
- this.config = config;
-
- // The callback we make with the response when we do get around to making the
- // post (which could be during any of the enqueue() calls or the final flush())
- // This callback may be called multiple times and must not add new items to
- // the queue.
- // The second argument passed to this callback is a boolean value that is true
- // if we're in the middle of a batch, and false if either the batch is
- // complete, or it's a post to a server that does not understand batching.
- this.postCallback = postCallback;
-
- // The string where we are capturing the stringified version of the records
- // queued so far. It will always be invalid JSON as it is always missing the
- // closing bracket.
- this.queued = "";
-
- // The number of records we've queued so far but are yet to POST.
- this.numQueued = 0;
-
- // The number of records/bytes we've processed in previous POSTs for our
- // current batch. Does *not* include records currently queued for the next POST.
- this.numAlreadyBatched = 0;
- this.bytesAlreadyBatched = 0;
-
- // The ID of our current batch. Can be undefined (meaning we are yet to make
- // the first post of a patch, so don't know if we have a batch), null (meaning
- // we've made the first post but the server response indicated no batching
- // semantics), otherwise we have made the first post and it holds the batch ID
- // returned from the server.
- this.batchID = undefined;
-
- // Time used for X-If-Unmodified-Since -- should be the timestamp from the last GET.
- this.lastModified = timestamp;
-}
-
-PostQueue.prototype = {
- enqueue(record) {
- // We want to ensure the record has a .toJSON() method defined - even
- // though JSON.stringify() would implicitly call it, the stringify might
- // still work even if it isn't defined, which isn't what we want.
- let jsonRepr = record.toJSON();
- if (!jsonRepr) {
- throw new Error("You must only call this with objects that explicitly support JSON");
- }
- let bytes = JSON.stringify(jsonRepr);
-
- // Do a flush if we can't add this record without exceeding our single-request
- // limits, or without exceeding the total limit for a single batch.
- let newLength = this.queued.length + bytes.length + 2; // extras for leading "[" / "," and trailing "]"
-
- let maxAllowedBytes = Math.min(256 * 1024, this.config.max_post_bytes);
-
- let postSizeExceeded = this.numQueued >= this.config.max_post_records ||
- newLength >= maxAllowedBytes;
-
- let batchSizeExceeded = (this.numQueued + this.numAlreadyBatched) >= this.config.max_batch_records ||
- (newLength + this.bytesAlreadyBatched) >= this.config.max_batch_bytes;
-
- let singleRecordTooBig = bytes.length + 2 > maxAllowedBytes;
-
- if (postSizeExceeded || batchSizeExceeded) {
- this.log.trace(`PostQueue flushing due to postSizeExceeded=${postSizeExceeded}, batchSizeExceeded=${batchSizeExceeded}` +
- `, max_batch_bytes: ${this.config.max_batch_bytes}, max_post_bytes: ${this.config.max_post_bytes}`);
-
- if (singleRecordTooBig) {
- return { enqueued: false, error: new Error("Single record too large to submit to server") };
- }
-
- // We need to write the queue out before handling this one, but we only
- // commit the batch (and thus start a new one) if the batch is full.
- // Note that if a single record is too big for the batch or post, then
- // the batch may be empty, and so we don't flush in that case.
- if (this.numQueued) {
- this.flush(batchSizeExceeded || singleRecordTooBig);
- }
- }
- // Either a ',' or a '[' depending on whether this is the first record.
- this.queued += this.numQueued ? "," : "[";
- this.queued += bytes;
- this.numQueued++;
- return { enqueued: true };
- },
-
- flush(finalBatchPost) {
- if (!this.queued) {
- // nothing queued - we can't be in a batch, and something has gone very
- // bad if we think we are.
- if (this.batchID) {
- throw new Error(`Flush called when no queued records but we are in a batch ${this.batchID}`);
- }
- return;
- }
- // the batch query-param and headers we'll send.
- let batch;
- let headers = [];
- if (this.batchID === undefined) {
- // First commit in a (possible) batch.
- batch = "true";
- } else if (this.batchID) {
- // We have an existing batch.
- batch = this.batchID;
- } else {
- // Not the first post and we know we have no batch semantics.
- batch = null;
- }
-
- headers.push(["x-if-unmodified-since", this.lastModified]);
-
- this.log.info(`Posting ${this.numQueued} records of ${this.queued.length+1} bytes with batch=${batch}`);
- let queued = this.queued + "]";
- if (finalBatchPost) {
- this.bytesAlreadyBatched = 0;
- this.numAlreadyBatched = 0;
- } else {
- this.bytesAlreadyBatched += queued.length;
- this.numAlreadyBatched += this.numQueued;
- }
- this.queued = "";
- this.numQueued = 0;
- let response = this.poster(queued, headers, batch, !!(finalBatchPost && this.batchID !== null));
-
- if (!response.success) {
- this.log.trace("Server error response during a batch", response);
- // not clear what we should do here - we expect the consumer of this to
- // abort by throwing in the postCallback below.
- return this.postCallback(response, !finalBatchPost);
- }
-
- if (finalBatchPost) {
- this.log.trace("Committed batch", this.batchID);
- this.batchID = undefined; // we are now in "first post for the batch" state.
- this.lastModified = response.headers["x-last-modified"];
- return this.postCallback(response, false);
- }
-
- if (response.status != 202) {
- if (this.batchID) {
- throw new Error("Server responded non-202 success code while a batch was in progress");
- }
- this.batchID = null; // no batch semantics are in place.
- this.lastModified = response.headers["x-last-modified"];
- return this.postCallback(response, false);
- }
-
- // this response is saying the server has batch semantics - we should
- // always have a batch ID in the response.
- let responseBatchID = response.obj.batch;
- this.log.trace("Server responsed 202 with batch", responseBatchID);
- if (!responseBatchID) {
- this.log.error("Invalid server response: 202 without a batch ID", response);
- throw new Error("Invalid server response: 202 without a batch ID");
- }
-
- if (this.batchID === undefined) {
- this.batchID = responseBatchID;
- if (!this.lastModified) {
- this.lastModified = response.headers["x-last-modified"];
- if (!this.lastModified) {
- throw new Error("Batch response without x-last-modified");
- }
- }
- }
-
- if (this.batchID != responseBatchID) {
- throw new Error(`Invalid client/server batch state - client has ${this.batchID}, server has ${responseBatchID}`);
- }
-
- this.postCallback(response, true);
- },
-}
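
For context, the PostQueue removed above implements the server-side batch protocol described in its block comment: queue records, POST a chunk with batch=true, reuse the batch ID the server returns, and send commit=true on the final POST. The sketch below is a rough, hypothetical illustration of that flow, not a drop-in replacement; the poster(data, headers, batch, commit) callback mirrors the one built in the removed newPostQueue(), while the chunking, size limits and error handling are simplified assumptions.

// Hypothetical sketch of the removed upload flow (assumptions noted above).
function uploadInBatches(poster, records, lastModified, maxPostRecords) {
  let batchID; // undefined until the first response tells us about batching
  for (let i = 0; i < records.length; i += maxPostRecords) {
    let chunk = records.slice(i, i + maxPostRecords);
    let isLast = i + maxPostRecords >= records.length;
    let headers = [["x-if-unmodified-since", lastModified]];
    // First POST asks the server to open a batch; later POSTs reuse its ID.
    let batch = batchID === undefined ? "true" : batchID;
    // Commit only on the final POST, and only if an earlier response did not
    // show that the server lacks batch semantics (batchID === null).
    let commit = isLast && batchID !== null;
    let response = poster(JSON.stringify(chunk), headers, batch, commit);
    if (response.status == 202 && response.obj.batch) {
      // 202 plus a batch ID: the POST was accepted into an open batch.
      batchID = response.obj.batch;
    } else {
      // Any other success (e.g. 200): no batch semantics, records land
      // immediately, and the server's modified time moves forward.
      batchID = null;
      lastModified = response.headers["x-last-modified"];
    }
  }
}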