diff options
Diffstat (limited to 'services/sync/modules/record.js')
-rw-r--r-- | services/sync/modules/record.js | 481 |
1 files changed, 35 insertions, 446 deletions
diff --git a/services/sync/modules/record.js b/services/sync/modules/record.js index f7a69d9ef..04ccd2dd2 100644 --- a/services/sync/modules/record.js +++ b/services/sync/modules/record.js @@ -10,10 +10,10 @@ this.EXPORTED_SYMBOLS = [ "Collection", ]; -var Cc = Components.classes; -var Ci = Components.interfaces; -var Cr = Components.results; -var Cu = Components.utils; +const Cc = Components.classes; +const Ci = Components.interfaces; +const Cr = Components.results; +const Cu = Components.utils; const CRYPTO_COLLECTION = "crypto"; const KEYS_WBO = "keys"; @@ -23,7 +23,6 @@ Cu.import("resource://services-sync/constants.js"); Cu.import("resource://services-sync/keys.js"); Cu.import("resource://services-sync/resource.js"); Cu.import("resource://services-sync/util.js"); -Cu.import("resource://services-common/async.js"); this.WBORecord = function WBORecord(collection, id) { this.data = {}; @@ -86,7 +85,7 @@ WBORecord.prototype = { toJSON: function toJSON() { // Copy fields from data to be stringified, making sure payload is a string let obj = {}; - for (let [key, val] of Object.entries(this.data)) + for (let [key, val] in Iterator(this.data)) obj[key] = key == "payload" ? JSON.stringify(val) : val; if (this.ttl) obj.ttl = this.ttl; @@ -196,9 +195,7 @@ CryptoWrapper.prototype = { }, // The custom setter below masks the parent's getter, so explicitly call it :( - get id() { - return WBORecord.prototype.__lookupGetter__("id").call(this); - }, + get id() WBORecord.prototype.__lookupGetter__("id").call(this), // Keep both plaintext and encrypted versions of the id to verify integrity set id(val) { @@ -238,11 +235,8 @@ RecordManager.prototype = { record.deserialize(this.response); return this.set(url, record); - } catch (ex) { - if (Async.isShutdownException(ex)) { - throw ex; - } - this._log.debug("Failed to import record", ex); + } catch(ex) { + this._log.debug("Failed to import record: " + Utils.exceptionStr(ex)); return null; } }, @@ -281,10 +275,10 @@ RecordManager.prototype = { * You can update this thing simply by giving it /info/collections. It'll * use the last modified time to bring itself up to date. */ -this.CollectionKeyManager = function CollectionKeyManager(lastModified, default_, collections) { - this.lastModified = lastModified || 0; - this._default = default_ || null; - this._collections = collections || {}; +this.CollectionKeyManager = function CollectionKeyManager() { + this.lastModified = 0; + this._collections = {}; + this._default = null; this._log = Log.repository.getLogger("Sync.CollectionKeyManager"); } @@ -293,19 +287,6 @@ this.CollectionKeyManager = function CollectionKeyManager(lastModified, default_ // Note that the last modified time needs to be preserved. CollectionKeyManager.prototype = { - /** - * Generate a new CollectionKeyManager that has the same attributes - * as this one. - */ - clone() { - const newCollections = {}; - for (let c in this._collections) { - newCollections[c] = this._collections[c]; - } - - return new CollectionKeyManager(this.lastModified, this._default, newCollections); - }, - // Return information about old vs new keys: // * same: true if two collections are equal // * changed: an array of collection names that changed. @@ -328,7 +309,7 @@ CollectionKeyManager.prototype = { // Return a sorted, unique array. changed.sort(); let last; - changed = changed.filter(x => (x != last) && (last = x)); + changed = [x for each (x in changed) if ((x != last) && (last = x))]; return {same: changed.length == 0, changed: changed}; }, @@ -374,15 +355,15 @@ CollectionKeyManager.prototype = { /** * Create a WBO for the current keys. */ - asWBO: function(collection, id) { - return this._makeWBO(this._collections, this._default); - }, + asWBO: function(collection, id) + this._makeWBO(this._collections, this._default), /** * Compute a new default key, and new keys for any specified collections. */ newKeys: function(collections) { - let newDefaultKeyBundle = this.newDefaultKeyBundle(); + let newDefaultKey = new BulkKeyBundle(DEFAULT_KEYBUNDLE_NAME); + newDefaultKey.generateRandom(); let newColls = {}; if (collections) { @@ -392,7 +373,7 @@ CollectionKeyManager.prototype = { newColls[c] = b; }); } - return [newDefaultKeyBundle, newColls]; + return [newDefaultKey, newColls]; }, /** @@ -406,57 +387,6 @@ CollectionKeyManager.prototype = { return this._makeWBO(newColls, newDefaultKey); }, - /** - * Create a new default key. - * - * @returns {BulkKeyBundle} - */ - newDefaultKeyBundle() { - const key = new BulkKeyBundle(DEFAULT_KEYBUNDLE_NAME); - key.generateRandom(); - return key; - }, - - /** - * Create a new default key and store it as this._default, since without one you cannot use setContents. - */ - generateDefaultKey() { - this._default = this.newDefaultKeyBundle(); - }, - - /** - * Return true if keys are already present for each of the given - * collections. - */ - hasKeysFor(collections) { - // We can't use filter() here because sometimes collections is an iterator. - for (let collection of collections) { - if (!this._collections[collection]) { - return false; - } - } - return true; - }, - - /** - * Return a new CollectionKeyManager that has keys for each of the - * given collections (creating new ones for collections where we - * don't already have keys). - */ - ensureKeysFor(collections) { - const newKeys = Object.assign({}, this._collections); - for (let c of collections) { - if (newKeys[c]) { - continue; // don't replace existing keys - } - - const b = new BulkKeyBundle(c); - b.generateRandom(); - newKeys[c] = b; - } - return new CollectionKeyManager(this.lastModified, this._default, newKeys); - }, - // Take the fetched info/collections WBO, checking the change // time of the crypto collection. updateNeeded: function(info_collections) { @@ -487,6 +417,9 @@ CollectionKeyManager.prototype = { // setContents: function setContents(payload, modified) { + if (!modified) + throw "No modified time provided to setContents."; + let self = this; this._log.info("Setting collection keys contents. Our last modified: " + @@ -516,7 +449,9 @@ CollectionKeyManager.prototype = { if (v) { let keyObj = new BulkKeyBundle(k); keyObj.keyPairB64 = v; - newCollections[k] = keyObj; + if (keyObj) { + newCollections[k] = keyObj; + } } } } @@ -527,11 +462,8 @@ CollectionKeyManager.prototype = { let sameColls = collComparison.same; if (sameDefault && sameColls) { - self._log.info("New keys are the same as our old keys!"); - if (modified) { - self._log.info("Bumped local modified time."); - self.lastModified = modified; - } + self._log.info("New keys are the same as our old keys! Bumped local modified time."); + self.lastModified = modified; return false; } @@ -543,10 +475,8 @@ CollectionKeyManager.prototype = { this._collections = newCollections; // Always trust the server. - if (modified) { - self._log.info("Bumping last modified to " + modified); - self.lastModified = modified; - } + self._log.info("Bumping last modified to " + modified); + self.lastModified = modified; return sameDefault ? collComparison.changed : true; }, @@ -594,12 +524,6 @@ this.Collection = function Collection(uri, recordObj, service) { this._older = 0; this._newer = 0; this._data = []; - // optional members used by batch upload operations. - this._batch = null; - this._commit = false; - // Used for batch download operations -- note that this is explicitly an - // opaque value and not (necessarily) a number. - this._offset = null; } Collection.prototype = { __proto__: Resource.prototype, @@ -623,12 +547,6 @@ Collection.prototype = { args.push("ids=" + this.ids); if (this.limit > 0 && this.limit != Infinity) args.push("limit=" + this.limit); - if (this._batch) - args.push("batch=" + encodeURIComponent(this._batch)); - if (this._commit) - args.push("commit=true"); - if (this._offset) - args.push("offset=" + encodeURIComponent(this._offset)); this.uri.query = (args.length > 0)? '?' + args.join('&') : ''; }, @@ -641,14 +559,14 @@ Collection.prototype = { }, // Apply the action to a certain set of ids - get ids() { return this._ids; }, + get ids() this._ids, set ids(value) { this._ids = value; this._rebuildURL(); }, // Limit how many records to get - get limit() { return this._limit; }, + get limit() this._limit, set limit(value) { this._limit = value; this._rebuildURL(); @@ -678,100 +596,12 @@ Collection.prototype = { this._rebuildURL(); }, - get offset() { return this._offset; }, - set offset(value) { - this._offset = value; - this._rebuildURL(); - }, - - // Set information about the batch for this request. - get batch() { return this._batch; }, - set batch(value) { - this._batch = value; - this._rebuildURL(); - }, - - get commit() { return this._commit; }, - set commit(value) { - this._commit = value && true; - this._rebuildURL(); + pushData: function Coll_pushData(data) { + this._data.push(data); }, - // Similar to get(), but will page through the items `batchSize` at a time, - // deferring calling the record handler until we've gotten them all. - // - // Returns the last response processed, and doesn't run the record handler - // on any items if a non-success status is received while downloading the - // records (or if a network error occurs). - getBatched(batchSize = DEFAULT_DOWNLOAD_BATCH_SIZE) { - let totalLimit = Number(this.limit) || Infinity; - if (batchSize <= 0 || batchSize >= totalLimit) { - // Invalid batch sizes should arguably be an error, but they're easy to handle - return this.get(); - } - - if (!this.full) { - throw new Error("getBatched is unimplemented for guid-only GETs"); - } - - // _onComplete and _onProgress are reset after each `get` by AsyncResource. - // We overwrite _onRecord to something that stores the data in an array - // until the end. - let { _onComplete, _onProgress, _onRecord } = this; - let recordBuffer = []; - let resp; - try { - this._onRecord = r => recordBuffer.push(r); - let lastModifiedTime; - this.limit = batchSize; - - do { - this._onProgress = _onProgress; - this._onComplete = _onComplete; - if (batchSize + recordBuffer.length > totalLimit) { - this.limit = totalLimit - recordBuffer.length; - } - this._log.trace("Performing batched GET", { limit: this.limit, offset: this.offset }); - // Actually perform the request - resp = this.get(); - if (!resp.success) { - break; - } - - // Initialize last modified, or check that something broken isn't happening. - let lastModified = resp.headers["x-last-modified"]; - if (!lastModifiedTime) { - lastModifiedTime = lastModified; - this.setHeader("X-If-Unmodified-Since", lastModified); - } else if (lastModified != lastModifiedTime) { - // Should be impossible -- We'd get a 412 in this case. - throw new Error("X-Last-Modified changed in the middle of a download batch! " + - `${lastModified} => ${lastModifiedTime}`) - } - - // If this is missing, we're finished. - this.offset = resp.headers["x-weave-next-offset"]; - } while (this.offset && totalLimit > recordBuffer.length); - } finally { - // Ensure we undo any temporary state so that subsequent calls to get() - // or getBatched() work properly. We do this before calling the record - // handler so that we can more convincingly pretend to be a normal get() - // call. Note: we're resetting these to the values they had before this - // function was called. - this._onRecord = _onRecord; - this._limit = totalLimit; - this._offset = null; - delete this._headers["x-if-unmodified-since"]; - this._rebuildURL(); - } - if (resp.success && Async.checkAppReady()) { - // call the original _onRecord (e.g. the user supplied record handler) - // for each record we've stored - for (let record of recordBuffer) { - this._onRecord(record); - } - } - return resp; + clearRecords: function Coll_clearRecords() { + this._data = []; }, set recordHandler(onRecord) { @@ -781,8 +611,6 @@ Collection.prototype = { // Switch to newline separated records for incremental parsing coll.setHeader("Accept", "application/newlines"); - this._onRecord = onRecord; - this._onProgress = function() { let newline; while ((newline = this._data.indexOf("\n")) > 0) { @@ -793,247 +621,8 @@ Collection.prototype = { // Deserialize a record from json and give it to the callback let record = new coll._recordObj(); record.deserialize(json); - coll._onRecord(record); + onRecord(record); } }; }, - - // This object only supports posting via the postQueue object. - post() { - throw new Error("Don't directly post to a collection - use newPostQueue instead"); - }, - - newPostQueue(log, timestamp, postCallback) { - let poster = (data, headers, batch, commit) => { - this.batch = batch; - this.commit = commit; - for (let [header, value] of headers) { - this.setHeader(header, value); - } - return Resource.prototype.post.call(this, data); - } - let getConfig = (name, defaultVal) => { - if (this._service.serverConfiguration && this._service.serverConfiguration.hasOwnProperty(name)) { - return this._service.serverConfiguration[name]; - } - return defaultVal; - } - - let config = { - max_post_bytes: getConfig("max_post_bytes", MAX_UPLOAD_BYTES), - max_post_records: getConfig("max_post_records", MAX_UPLOAD_RECORDS), - - max_batch_bytes: getConfig("max_total_bytes", Infinity), - max_batch_records: getConfig("max_total_records", Infinity), - } - - // Handle config edge cases - if (config.max_post_records <= 0) { config.max_post_records = MAX_UPLOAD_RECORDS; } - if (config.max_batch_records <= 0) { config.max_batch_records = Infinity; } - if (config.max_post_bytes <= 0) { config.max_post_bytes = MAX_UPLOAD_BYTES; } - if (config.max_batch_bytes <= 0) { config.max_batch_bytes = Infinity; } - - // Max size of BSO payload is 256k. This assumes at most 4k of overhead, - // which sounds like plenty. If the server says it can't handle this, we - // might have valid records we can't sync, so we give up on syncing. - let requiredMax = 260 * 1024; - if (config.max_post_bytes < requiredMax) { - this._log.error("Server configuration max_post_bytes is too low", config); - throw new Error("Server configuration max_post_bytes is too low"); - } - - return new PostQueue(poster, timestamp, config, log, postCallback); - }, }; - -/* A helper to manage the posting of records while respecting the various - size limits. - - This supports the concept of a server-side "batch". The general idea is: - * We queue as many records as allowed in memory, then make a single POST. - * This first POST (optionally) gives us a batch ID, which we use for - all subsequent posts, until... - * At some point we hit a batch-maximum, and jump through a few hoops to - commit the current batch (ie, all previous POSTs) and start a new one. - * Eventually commit the final batch. - - In most cases we expect there to be exactly 1 batch consisting of possibly - multiple POSTs. -*/ -function PostQueue(poster, timestamp, config, log, postCallback) { - // The "post" function we should use when it comes time to do the post. - this.poster = poster; - this.log = log; - - // The config we use. We expect it to have fields "max_post_records", - // "max_batch_records", "max_post_bytes", and "max_batch_bytes" - this.config = config; - - // The callback we make with the response when we do get around to making the - // post (which could be during any of the enqueue() calls or the final flush()) - // This callback may be called multiple times and must not add new items to - // the queue. - // The second argument passed to this callback is a boolean value that is true - // if we're in the middle of a batch, and false if either the batch is - // complete, or it's a post to a server that does not understand batching. - this.postCallback = postCallback; - - // The string where we are capturing the stringified version of the records - // queued so far. It will always be invalid JSON as it is always missing the - // closing bracket. - this.queued = ""; - - // The number of records we've queued so far but are yet to POST. - this.numQueued = 0; - - // The number of records/bytes we've processed in previous POSTs for our - // current batch. Does *not* include records currently queued for the next POST. - this.numAlreadyBatched = 0; - this.bytesAlreadyBatched = 0; - - // The ID of our current batch. Can be undefined (meaning we are yet to make - // the first post of a patch, so don't know if we have a batch), null (meaning - // we've made the first post but the server response indicated no batching - // semantics), otherwise we have made the first post and it holds the batch ID - // returned from the server. - this.batchID = undefined; - - // Time used for X-If-Unmodified-Since -- should be the timestamp from the last GET. - this.lastModified = timestamp; -} - -PostQueue.prototype = { - enqueue(record) { - // We want to ensure the record has a .toJSON() method defined - even - // though JSON.stringify() would implicitly call it, the stringify might - // still work even if it isn't defined, which isn't what we want. - let jsonRepr = record.toJSON(); - if (!jsonRepr) { - throw new Error("You must only call this with objects that explicitly support JSON"); - } - let bytes = JSON.stringify(jsonRepr); - - // Do a flush if we can't add this record without exceeding our single-request - // limits, or without exceeding the total limit for a single batch. - let newLength = this.queued.length + bytes.length + 2; // extras for leading "[" / "," and trailing "]" - - let maxAllowedBytes = Math.min(256 * 1024, this.config.max_post_bytes); - - let postSizeExceeded = this.numQueued >= this.config.max_post_records || - newLength >= maxAllowedBytes; - - let batchSizeExceeded = (this.numQueued + this.numAlreadyBatched) >= this.config.max_batch_records || - (newLength + this.bytesAlreadyBatched) >= this.config.max_batch_bytes; - - let singleRecordTooBig = bytes.length + 2 > maxAllowedBytes; - - if (postSizeExceeded || batchSizeExceeded) { - this.log.trace(`PostQueue flushing due to postSizeExceeded=${postSizeExceeded}, batchSizeExceeded=${batchSizeExceeded}` + - `, max_batch_bytes: ${this.config.max_batch_bytes}, max_post_bytes: ${this.config.max_post_bytes}`); - - if (singleRecordTooBig) { - return { enqueued: false, error: new Error("Single record too large to submit to server") }; - } - - // We need to write the queue out before handling this one, but we only - // commit the batch (and thus start a new one) if the batch is full. - // Note that if a single record is too big for the batch or post, then - // the batch may be empty, and so we don't flush in that case. - if (this.numQueued) { - this.flush(batchSizeExceeded || singleRecordTooBig); - } - } - // Either a ',' or a '[' depending on whether this is the first record. - this.queued += this.numQueued ? "," : "["; - this.queued += bytes; - this.numQueued++; - return { enqueued: true }; - }, - - flush(finalBatchPost) { - if (!this.queued) { - // nothing queued - we can't be in a batch, and something has gone very - // bad if we think we are. - if (this.batchID) { - throw new Error(`Flush called when no queued records but we are in a batch ${this.batchID}`); - } - return; - } - // the batch query-param and headers we'll send. - let batch; - let headers = []; - if (this.batchID === undefined) { - // First commit in a (possible) batch. - batch = "true"; - } else if (this.batchID) { - // We have an existing batch. - batch = this.batchID; - } else { - // Not the first post and we know we have no batch semantics. - batch = null; - } - - headers.push(["x-if-unmodified-since", this.lastModified]); - - this.log.info(`Posting ${this.numQueued} records of ${this.queued.length+1} bytes with batch=${batch}`); - let queued = this.queued + "]"; - if (finalBatchPost) { - this.bytesAlreadyBatched = 0; - this.numAlreadyBatched = 0; - } else { - this.bytesAlreadyBatched += queued.length; - this.numAlreadyBatched += this.numQueued; - } - this.queued = ""; - this.numQueued = 0; - let response = this.poster(queued, headers, batch, !!(finalBatchPost && this.batchID !== null)); - - if (!response.success) { - this.log.trace("Server error response during a batch", response); - // not clear what we should do here - we expect the consumer of this to - // abort by throwing in the postCallback below. - return this.postCallback(response, !finalBatchPost); - } - - if (finalBatchPost) { - this.log.trace("Committed batch", this.batchID); - this.batchID = undefined; // we are now in "first post for the batch" state. - this.lastModified = response.headers["x-last-modified"]; - return this.postCallback(response, false); - } - - if (response.status != 202) { - if (this.batchID) { - throw new Error("Server responded non-202 success code while a batch was in progress"); - } - this.batchID = null; // no batch semantics are in place. - this.lastModified = response.headers["x-last-modified"]; - return this.postCallback(response, false); - } - - // this response is saying the server has batch semantics - we should - // always have a batch ID in the response. - let responseBatchID = response.obj.batch; - this.log.trace("Server responsed 202 with batch", responseBatchID); - if (!responseBatchID) { - this.log.error("Invalid server response: 202 without a batch ID", response); - throw new Error("Invalid server response: 202 without a batch ID"); - } - - if (this.batchID === undefined) { - this.batchID = responseBatchID; - if (!this.lastModified) { - this.lastModified = response.headers["x-last-modified"]; - if (!this.lastModified) { - throw new Error("Batch response without x-last-modified"); - } - } - } - - if (this.batchID != responseBatchID) { - throw new Error(`Invalid client/server batch state - client has ${this.batchID}, server has ${responseBatchID}`); - } - - this.postCallback(response, true); - }, -} |