diff options
Diffstat (limited to 'services/sync/modules/engines.js')
-rw-r--r-- | services/sync/modules/engines.js | 442 |
1 files changed, 121 insertions, 321 deletions
diff --git a/services/sync/modules/engines.js b/services/sync/modules/engines.js index 1eaa1863a..4767a1103 100644 --- a/services/sync/modules/engines.js +++ b/services/sync/modules/engines.js @@ -7,8 +7,7 @@ this.EXPORTED_SYMBOLS = [ "Engine", "SyncEngine", "Tracker", - "Store", - "Changeset" + "Store" ]; var {classes: Cc, interfaces: Ci, results: Cr, utils: Cu} = Components; @@ -16,15 +15,13 @@ var {classes: Cc, interfaces: Ci, results: Cr, utils: Cu} = Components; Cu.import("resource://services-common/async.js"); Cu.import("resource://gre/modules/Log.jsm"); Cu.import("resource://services-common/observers.js"); +Cu.import("resource://services-common/utils.js"); Cu.import("resource://services-sync/constants.js"); Cu.import("resource://services-sync/identity.js"); Cu.import("resource://services-sync/record.js"); Cu.import("resource://services-sync/resource.js"); Cu.import("resource://services-sync/util.js"); -XPCOMUtils.defineLazyModuleGetter(this, "fxAccounts", - "resource://gre/modules/FxAccounts.jsm"); - /* * Trackers are associated with a single engine and deal with * listening for changes to their particular data type. @@ -132,12 +129,6 @@ Tracker.prototype = { this._ignored.splice(index, 1); }, - _saveChangedID(id, when) { - this._log.trace(`Adding changed ID: ${id}, ${JSON.stringify(when)}`); - this.changedIDs[id] = when; - this.saveChangedIDs(this.onSavedChangedIDs); - }, - addChangedID: function (id, when) { if (!id) { this._log.warn("Attempted to add undefined ID to tracker"); @@ -150,12 +141,14 @@ Tracker.prototype = { // Default to the current time in seconds if no time is provided. if (when == null) { - when = this._now(); + when = Math.floor(Date.now() / 1000); } // Add/update the entry if we have a newer time. if ((this.changedIDs[id] || -Infinity) < when) { - this._saveChangedID(id, when); + this._log.trace("Adding changed ID: " + id + ", " + when); + this.changedIDs[id] = when; + this.saveChangedIDs(this.onSavedChangedIDs); } return true; @@ -183,10 +176,6 @@ Tracker.prototype = { this.saveChangedIDs(); }, - _now() { - return Date.now() / 1000; - }, - _isTracking: false, // Override these in your subclasses. @@ -314,18 +303,14 @@ Store.prototype = { for (let record of records) { try { this.applyIncoming(record); + } catch (ex if (ex.code == Engine.prototype.eEngineAbortApplyIncoming)) { + // This kind of exception should have a 'cause' attribute, which is an + // originating exception. + // ex.cause will carry its stack with it when rethrown. + throw ex.cause; } catch (ex) { - if (ex.code == Engine.prototype.eEngineAbortApplyIncoming) { - // This kind of exception should have a 'cause' attribute, which is an - // originating exception. - // ex.cause will carry its stack with it when rethrown. - throw ex.cause; - } - if (Async.isShutdownException(ex)) { - throw ex; - } - this._log.warn("Failed to apply incoming record " + record.id, ex); - this.engine._noteApplyFailure(); + this._log.warn("Failed to apply incoming record " + record.id); + this._log.warn("Encountered exception: ", ex); failed.push(record.id); } }; @@ -593,11 +578,16 @@ EngineManager.prototype = { this._engines[name] = engine; } } catch (ex) { + this._log.error("Engine init error: ", ex); + + let mesg = ex.message ? ex.message : ex; let name = engineObject || ""; name = name.prototype || ""; name = name.name || ""; - this._log.error(`Could not initialize engine ${name}`, ex); + let out = "Could not initialize engine '" + name + "': " + mesg; + this._log.error(out); + return engineObject; } }, @@ -643,17 +633,16 @@ Engine.prototype = { // Signal to the engine that processing further records is pointless. eEngineAbortApplyIncoming: "error.engine.abort.applyincoming", - // Should we keep syncing if we find a record that cannot be uploaded (ever)? - // If this is false, we'll throw, otherwise, we'll ignore the record and - // continue. This currently can only happen due to the record being larger - // than the record upload limit. - allowSkippedRecord: true, - get prefName() { return this.name; }, get enabled() { + // XXX: Disable non-functional add-ons syncing for the time being + // This check can go away when add-on syncing is addressed + if (this.prefName == "addons") + return false; + return Svc.Prefs.get("engine." + this.prefName, false); }, @@ -711,15 +700,6 @@ Engine.prototype = { wipeClient: function () { this._notify("wipe-client", this.name, this._wipeClient)(); - }, - - /** - * If one exists, initialize and return a validator for this engine (which - * must have a `validate(engine)` method that returns a promise to an object - * with a getSummary method). Otherwise return null. - */ - getValidator: function () { - return null; } }; @@ -813,11 +793,7 @@ SyncEngine.prototype = { return this._toFetch; }, set toFetch(val) { - let cb = (error) => { - if (error) { - this._log.error("Failed to read JSON records to fetch", error); - } - } + let cb = (error) => this._log.error("Failed to read JSON records to fetch: ", error); // Coerce the array to a string for more efficient comparison. if (val + "" == this._toFetch) { return; @@ -881,8 +857,9 @@ SyncEngine.prototype = { }, /* - * Returns a changeset for this sync. Engine implementations can override this - * method to bypass the tracker for certain or all changed items. + * Returns a mapping of IDs -> changed timestamp. Engine implementations + * can override this method to bypass the tracker for certain or all + * changed items. */ getChangedIDs: function () { return this._tracker.changedIDs; @@ -950,16 +927,20 @@ SyncEngine.prototype = { // this._modified to the tracker. this.lastSyncLocal = Date.now(); if (this.lastSync) { - this._modified = this.pullNewChanges(); + this._modified = this.getChangedIDs(); } else { + // Mark all items to be uploaded, but treat them as changed from long ago this._log.debug("First sync, uploading all items"); - this._modified = this.pullAllChanges(); + this._modified = {}; + for (let id in this._store.getAllIDs()) { + this._modified[id] = 0; + } } // Clear the tracker now. If the sync fails we'll add the ones we failed // to upload back. this._tracker.clearChangedIDs(); - this._log.info(this._modified.count() + + this._log.info(Object.keys(this._modified).length + " outgoing items pre-reconciliation"); // Keep track of what to delete at the end of sync @@ -970,7 +951,7 @@ SyncEngine.prototype = { * A tiny abstraction to make it easier to test incoming record * application. */ - itemSource: function () { + _itemSource: function () { return new Collection(this.engineURL, this._recordObj, this.service); }, @@ -987,7 +968,7 @@ SyncEngine.prototype = { let isMobile = (Svc.Prefs.get("client.type") == "mobile"); if (!newitems) { - newitems = this.itemSource(); + newitems = this._itemSource(); } if (this._defaultSort) { @@ -1024,12 +1005,9 @@ SyncEngine.prototype = { try { failed = failed.concat(this._store.applyIncomingBatch(applyBatch)); } catch (ex) { - if (Async.isShutdownException(ex)) { - throw ex; - } // Catch any error that escapes from applyIncomingBatch. At present // those will all be abort events. - this._log.warn("Got exception, aborting processIncoming", ex); + this._log.warn("Got exception, aborting processIncoming. ", ex); aborting = ex; } this._tracker.ignoreAll = false; @@ -1074,10 +1052,7 @@ SyncEngine.prototype = { try { try { item.decrypt(key); - } catch (ex) { - if (!Utils.isHMACMismatch(ex)) { - throw ex; - } + } catch (ex if Utils.isHMACMismatch(ex)) { let strategy = self.handleHMACMismatch(item, true); if (strategy == SyncEngine.kRecoveryStrategy.retry) { // You only get one retry. @@ -1087,10 +1062,7 @@ SyncEngine.prototype = { key = self.service.collectionKeys.keyForCollection(self.name); item.decrypt(key); strategy = null; - } catch (ex) { - if (!Utils.isHMACMismatch(ex)) { - throw ex; - } + } catch (ex if Utils.isHMACMismatch(ex)) { strategy = self.handleHMACMismatch(item, false); } } @@ -1103,8 +1075,7 @@ SyncEngine.prototype = { self._log.debug("Ignoring second retry suggestion."); // Fall through to error case. case SyncEngine.kRecoveryStrategy.error: - self._log.warn("Error decrypting record", ex); - self._noteApplyFailure(); + self._log.warn("Error decrypting record: ", ex); failed.push(item.id); return; case SyncEngine.kRecoveryStrategy.ignore: @@ -1114,11 +1085,7 @@ SyncEngine.prototype = { } } } catch (ex) { - if (Async.isShutdownException(ex)) { - throw ex; - } - self._log.warn("Error decrypting record", ex); - self._noteApplyFailure(); + self._log.warn("Error decrypting record: ", ex); failed.push(item.id); return; } @@ -1126,20 +1093,15 @@ SyncEngine.prototype = { let shouldApply; try { shouldApply = self._reconcile(item); + } catch (ex if (ex.code == Engine.prototype.eEngineAbortApplyIncoming)) { + self._log.warn("Reconciliation failed: aborting incoming processing."); + failed.push(item.id); + aborting = ex.cause; } catch (ex) { - if (ex.code == Engine.prototype.eEngineAbortApplyIncoming) { - self._log.warn("Reconciliation failed: aborting incoming processing."); - self._noteApplyFailure(); - failed.push(item.id); - aborting = ex.cause; - } else if (!Async.isShutdownException(ex)) { - self._log.warn("Failed to reconcile incoming record " + item.id, ex); - self._noteApplyFailure(); - failed.push(item.id); - return; - } else { - throw ex; - } + self._log.warn("Failed to reconcile incoming record " + item.id); + self._log.warn("Encountered exception: ", ex); + failed.push(item.id); + return; } if (shouldApply) { @@ -1158,7 +1120,7 @@ SyncEngine.prototype = { // Only bother getting data from the server if there's new things if (this.lastModified == null || this.lastModified > this.lastSync) { - let resp = newitems.getBatched(); + let resp = newitems.get(); doApplyBatchAndPersistFailed.call(this); if (!resp.success) { resp.failureCode = ENGINE_DOWNLOAD_FAIL; @@ -1243,13 +1205,7 @@ SyncEngine.prototype = { // Apply remaining items. doApplyBatchAndPersistFailed.call(this); - count.newFailed = this.previousFailed.reduce((count, engine) => { - if (failedInPreviousSync.indexOf(engine) == -1) { - count++; - this._noteApplyNewFailure(); - } - return count; - }, 0); + count.newFailed = Utils.arraySub(this.previousFailed, failedInPreviousSync).length; count.succeeded = Math.max(0, count.applied - count.failed); this._log.info(["Records:", count.applied, "applied,", @@ -1260,14 +1216,6 @@ SyncEngine.prototype = { Observers.notify("weave:engine:sync:applied", count, this.name); }, - _noteApplyFailure: function () { - // here would be a good place to record telemetry... - }, - - _noteApplyNewFailure: function () { - // here would be a good place to record telemetry... - }, - /** * Find a GUID of an item that is a duplicate of the incoming item but happens * to have a different GUID @@ -1278,16 +1226,6 @@ SyncEngine.prototype = { // By default, assume there's no dupe items for the engine }, - // Called when the server has a record marked as deleted, but locally we've - // changed it more recently than the deletion. If we return false, the - // record will be deleted locally. If we return true, we'll reupload the - // record to the server -- any extra work that's needed as part of this - // process should be done at this point (such as mark the record's parent - // for reuploading in the case of bookmarks). - _shouldReviveRemotelyDeletedRecord(remoteItem) { - return true; - }, - _deleteId: function (id) { this._tracker.removeChangedID(id); @@ -1298,18 +1236,6 @@ SyncEngine.prototype = { this._delete.ids.push(id); }, - _switchItemToDupe(localDupeGUID, incomingItem) { - // The local, duplicate ID is always deleted on the server. - this._deleteId(localDupeGUID); - - // We unconditionally change the item's ID in case the engine knows of - // an item but doesn't expose it through itemExists. If the API - // contract were stronger, this could be changed. - this._log.debug("Switching local ID to incoming: " + localDupeGUID + " -> " + - incomingItem.id); - this._store.changeItemID(localDupeGUID, incomingItem.id); - }, - /** * Reconcile incoming record with local state. * @@ -1329,12 +1255,12 @@ SyncEngine.prototype = { // because some state may change during the course of this function and we // need to operate on the original values. let existsLocally = this._store.itemExists(item.id); - let locallyModified = this._modified.has(item.id); + let locallyModified = item.id in this._modified; // TODO Handle clock drift better. Tracked in bug 721181. let remoteAge = AsyncResource.serverTime - item.modified; let localAge = locallyModified ? - (Date.now() / 1000 - this._modified.getModifiedTimestamp(item.id)) : null; + (Date.now() / 1000 - this._modified[item.id]) : null; let remoteIsNewer = remoteAge < localAge; this._log.trace("Reconciling " + item.id + ". exists=" + @@ -1363,18 +1289,15 @@ SyncEngine.prototype = { "exists and isn't modified."); return true; } - this._log.trace("Incoming record is deleted but we had local changes."); - - if (remoteIsNewer) { - this._log.trace("Remote record is newer -- deleting local record."); - return true; - } - // If the local record is newer, we defer to individual engines for - // how to handle this. By default, we revive the record. - let willRevive = this._shouldReviveRemotelyDeletedRecord(item); - this._log.trace("Local record is newer -- reviving? " + willRevive); - return !willRevive; + // TODO As part of bug 720592, determine whether we should do more here. + // In the case where the local changes are newer, it is quite possible + // that the local client will restore data a remote client had tried to + // delete. There might be a good reason for that delete and it might be + // enexpected for this client to restore that data. + this._log.trace("Incoming record is deleted but we had local changes. " + + "Applying the youngest record."); + return remoteIsNewer; } // At this point the incoming record is not for a deletion and must have @@ -1386,32 +1309,40 @@ SyncEngine.prototype = { // refresh the metadata collected above. See bug 710448 for the history // of this logic. if (!existsLocally) { - let localDupeGUID = this._findDupe(item); - if (localDupeGUID) { - this._log.trace("Local item " + localDupeGUID + " is a duplicate for " + + let dupeID = this._findDupe(item); + if (dupeID) { + this._log.trace("Local item " + dupeID + " is a duplicate for " + "incoming item " + item.id); + // The local, duplicate ID is always deleted on the server. + this._deleteId(dupeID); + // The current API contract does not mandate that the ID returned by // _findDupe() actually exists. Therefore, we have to perform this // check. - existsLocally = this._store.itemExists(localDupeGUID); + existsLocally = this._store.itemExists(dupeID); + + // We unconditionally change the item's ID in case the engine knows of + // an item but doesn't expose it through itemExists. If the API + // contract were stronger, this could be changed. + this._log.debug("Switching local ID to incoming: " + dupeID + " -> " + + item.id); + this._store.changeItemID(dupeID, item.id); // If the local item was modified, we carry its metadata forward so // appropriate reconciling can be performed. - if (this._modified.has(localDupeGUID)) { + if (dupeID in this._modified) { locallyModified = true; - localAge = this._tracker._now() - this._modified.getModifiedTimestamp(localDupeGUID); + localAge = Date.now() / 1000 - this._modified[dupeID]; remoteIsNewer = remoteAge < localAge; - this._modified.swap(localDupeGUID, item.id); + this._modified[item.id] = this._modified[dupeID]; + delete this._modified[dupeID]; } else { locallyModified = false; localAge = null; } - // Tell the engine to do whatever it needs to switch the items. - this._switchItemToDupe(localDupeGUID, item); - this._log.debug("Local item after duplication: age=" + localAge + "; modified=" + locallyModified + "; exists=" + existsLocally); @@ -1440,7 +1371,7 @@ SyncEngine.prototype = { if (remoteIsNewer) { this._log.trace("Applying incoming because local item was deleted " + "before the incoming item was changed."); - this._modified.delete(item.id); + delete this._modified[item.id]; return true; } @@ -1466,7 +1397,7 @@ SyncEngine.prototype = { this._log.trace("Ignoring incoming item because the local item is " + "identical."); - this._modified.delete(item.id); + delete this._modified[item.id]; return false; } @@ -1491,97 +1422,69 @@ SyncEngine.prototype = { _uploadOutgoing: function () { this._log.trace("Uploading local changes to server."); - let modifiedIDs = this._modified.ids(); + let modifiedIDs = Object.keys(this._modified); if (modifiedIDs.length) { this._log.trace("Preparing " + modifiedIDs.length + " outgoing records"); - let counts = { sent: modifiedIDs.length, failed: 0 }; - // collection we'll upload let up = new Collection(this.engineURL, null, this.service); + let count = 0; - let failed = []; - let successful = []; - let handleResponse = (resp, batchOngoing = false) => { - // Note: We don't want to update this.lastSync, or this._modified until - // the batch is complete, however we want to remember success/failure - // indicators for when that happens. + // Upload what we've got so far in the collection + let doUpload = Utils.bind2(this, function(desc) { + this._log.info("Uploading " + desc + " of " + modifiedIDs.length + + " records"); + let resp = up.post(); if (!resp.success) { this._log.debug("Uploading records failed: " + resp); - resp.failureCode = resp.status == 412 ? ENGINE_BATCH_INTERRUPTED : ENGINE_UPLOAD_FAIL; + resp.failureCode = ENGINE_UPLOAD_FAIL; throw resp; } // Update server timestamp from the upload. - failed = failed.concat(Object.keys(resp.obj.failed)); - successful = successful.concat(resp.obj.success); - - if (batchOngoing) { - // Nothing to do yet - return; - } - // Advance lastSync since we've finished the batch. let modified = resp.headers["x-weave-timestamp"]; - if (modified > this.lastSync) { + if (modified > this.lastSync) this.lastSync = modified; - } - if (failed.length && this._log.level <= Log.Level.Debug) { + + let failed_ids = Object.keys(resp.obj.failed); + if (failed_ids.length) this._log.debug("Records that will be uploaded again because " + "the server couldn't store them: " - + failed.join(", ")); - } - - counts.failed += failed.length; + + failed_ids.join(", ")); - for (let id of successful) { - this._modified.delete(id); + // Clear successfully uploaded objects. + for (let id of resp.obj.success) { + delete this._modified[id]; } - this._onRecordsWritten(successful, failed); - - // clear for next batch - failed.length = 0; - successful.length = 0; - }; - - let postQueue = up.newPostQueue(this._log, this.lastSync, handleResponse); + up.clearRecords(); + }); for (let id of modifiedIDs) { - let out; - let ok = false; try { - out = this._createRecord(id); + let out = this._createRecord(id); if (this._log.level <= Log.Level.Trace) this._log.trace("Outgoing: " + out); out.encrypt(this.service.collectionKeys.keyForCollection(this.name)); - ok = true; - } catch (ex) { - if (Async.isShutdownException(ex)) { - throw ex; - } - this._log.warn("Error creating record", ex); + up.pushData(out); } - if (ok) { - let { enqueued, error } = postQueue.enqueue(out); - if (!enqueued) { - ++counts.failed; - if (!this.allowSkippedRecord) { - throw error; - } - } + catch(ex) { + this._log.warn("Error creating record: ", ex); } + + // Partial upload + if ((++count % MAX_UPLOAD_RECORDS) == 0) + doUpload((count - MAX_UPLOAD_RECORDS) + " - " + count + " out"); + this._store._sleep(0); } - postQueue.flush(true); - Observers.notify("weave:engine:sync:uploaded", counts, this.name); - } - }, - _onRecordsWritten(succeeded, failed) { - // Implement this method to take specific actions against successfully - // uploaded records and failed records. + // Final upload + if (count % MAX_UPLOAD_RECORDS > 0) + doUpload(count >= MAX_UPLOAD_RECORDS ? "last batch" : "all"); + } }, // Any cleanup necessary. @@ -1619,8 +1522,10 @@ SyncEngine.prototype = { } // Mark failed WBOs as changed again so they are reuploaded next time. - this.trackRemainingChanges(); - this._modified.clear(); + for (let [id, when] in Iterator(this._modified)) { + this._tracker.addChangedID(id, when); + } + this._modified = {}; }, _sync: function () { @@ -1656,11 +1561,9 @@ SyncEngine.prototype = { try { this._log.trace("Trying to decrypt a record from the server.."); test.get(); - } catch (ex) { - if (Async.isShutdownException(ex)) { - throw ex; - } - this._log.debug("Failed test decrypt", ex); + } + catch(ex) { + this._log.debug("Failed test decrypt: ", ex); } return canDecrypt; @@ -1706,108 +1609,5 @@ SyncEngine.prototype = { return (this.service.handleHMACEvent() && mayRetry) ? SyncEngine.kRecoveryStrategy.retry : SyncEngine.kRecoveryStrategy.error; - }, - - /** - * Returns a changeset containing all items in the store. The default - * implementation returns a changeset with timestamps from long ago, to - * ensure we always use the remote version if one exists. - * - * This function is only called for the first sync. Subsequent syncs call - * `pullNewChanges`. - * - * @return A `Changeset` object. - */ - pullAllChanges() { - let changeset = new Changeset(); - for (let id in this._store.getAllIDs()) { - changeset.set(id, 0); - } - return changeset; - }, - - /* - * Returns a changeset containing entries for all currently tracked items. - * The default implementation returns a changeset with timestamps indicating - * when the item was added to the tracker. - * - * @return A `Changeset` object. - */ - pullNewChanges() { - return new Changeset(this.getChangedIDs()); - }, - - /** - * Adds all remaining changeset entries back to the tracker, typically for - * items that failed to upload. This method is called at the end of each sync. - * - */ - trackRemainingChanges() { - for (let [id, change] of this._modified.entries()) { - this._tracker.addChangedID(id, change); - } - }, -}; - -/** - * A changeset is created for each sync in `Engine::get{Changed, All}IDs`, - * and stores opaque change data for tracked IDs. The default implementation - * only records timestamps, though engines can extend this to store additional - * data for each entry. - */ -class Changeset { - // Creates a changeset with an initial set of tracked entries. - constructor(changes = {}) { - this.changes = changes; - } - - // Returns the last modified time, in seconds, for an entry in the changeset. - // `id` is guaranteed to be in the set. - getModifiedTimestamp(id) { - return this.changes[id]; - } - - // Adds a change for a tracked ID to the changeset. - set(id, change) { - this.changes[id] = change; - } - - // Indicates whether an entry is in the changeset. - has(id) { - return id in this.changes; } - - // Deletes an entry from the changeset. Used to clean up entries for - // reconciled and successfully uploaded records. - delete(id) { - delete this.changes[id]; - } - - // Swaps two entries in the changeset. Used when reconciling duplicates that - // have local changes. - swap(oldID, newID) { - this.changes[newID] = this.changes[oldID]; - delete this.changes[oldID]; - } - - // Returns an array of all tracked IDs in this changeset. - ids() { - return Object.keys(this.changes); - } - - // Returns an array of `[id, change]` tuples. Used to repopulate the tracker - // with entries for failed uploads at the end of a sync. - entries() { - return Object.entries(this.changes); - } - - // Returns the number of entries in this changeset. - count() { - return this.ids().length; - } - - // Clears the changeset. - clear() { - this.changes = {}; - } -} +}; |