diff options
Diffstat (limited to 'services/sync/modules/stages')
-rw-r--r-- | services/sync/modules/stages/cluster.js | 113 | ||||
-rw-r--r-- | services/sync/modules/stages/declined.js | 76 | ||||
-rw-r--r-- | services/sync/modules/stages/enginesync.js | 449 |
3 files changed, 638 insertions, 0 deletions
diff --git a/services/sync/modules/stages/cluster.js b/services/sync/modules/stages/cluster.js new file mode 100644 index 000000000..7665ce825 --- /dev/null +++ b/services/sync/modules/stages/cluster.js @@ -0,0 +1,113 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +this.EXPORTED_SYMBOLS = ["ClusterManager"]; + +var {utils: Cu} = Components; + +Cu.import("resource://gre/modules/Log.jsm"); +Cu.import("resource://services-sync/constants.js"); +Cu.import("resource://services-sync/policies.js"); +Cu.import("resource://services-sync/util.js"); + +/** + * Contains code for managing the Sync cluster we are in. + */ +this.ClusterManager = function ClusterManager(service) { + this._log = Log.repository.getLogger("Sync.Service"); + this._log.level = Log.Level[Svc.Prefs.get("log.logger.service.main")]; + + this.service = service; +} +ClusterManager.prototype = { + get identity() { + return this.service.identity; + }, + + /** + * Obtain the cluster for the current user. + * + * Returns the string URL of the cluster or null on error. + */ + _findCluster: function _findCluster() { + this._log.debug("Finding cluster for user " + this.identity.username); + + // This should ideally use UserAPI10Client but the legacy hackiness is + // strong with this code. + let fail; + let url = this.service.userAPIURI + this.identity.username + "/node/weave"; + let res = this.service.resource(url); + try { + let node = res.get(); + switch (node.status) { + case 400: + this.service.status.login = LOGIN_FAILED_LOGIN_REJECTED; + fail = "Find cluster denied: " + this.service.errorHandler.errorStr(node); + break; + case 404: + this._log.debug("Using serverURL as data cluster (multi-cluster support disabled)"); + return this.service.serverURL; + case 0: + case 200: + if (node == "null") { + node = null; + } + this._log.trace("_findCluster successfully returning " + node); + return node; + default: + this.service.errorHandler.checkServerError(node); + fail = "Unexpected response code: " + node.status; + break; + } + } catch (e) { + this._log.debug("Network error on findCluster"); + this.service.status.login = LOGIN_FAILED_NETWORK_ERROR; + this.service.errorHandler.checkServerError(e); + fail = e; + } + throw fail; + }, + + /** + * Determine the cluster for the current user and update state. + */ + setCluster: function setCluster() { + // Make sure we didn't get some unexpected response for the cluster. + let cluster = this._findCluster(); + this._log.debug("Cluster value = " + cluster); + if (cluster == null) { + return false; + } + + // Convert from the funky "String object with additional properties" that + // resource.js returns to a plain-old string. + cluster = cluster.toString(); + // Don't update stuff if we already have the right cluster + if (cluster == this.service.clusterURL) { + return false; + } + + this._log.debug("Setting cluster to " + cluster); + this.service.clusterURL = cluster; + + return true; + }, + + getUserBaseURL: function getUserBaseURL() { + // Legacy Sync and FxA Sync construct the userBaseURL differently. Legacy + // Sync appends path components onto an empty path, and in FxA Sync, the + // token server constructs this for us in an opaque manner. Since the + // cluster manager already sets the clusterURL on Service and also has + // access to the current identity, we added this functionality here. + + // If the clusterURL hasn't been set, the userBaseURL shouldn't be set + // either. Some tests expect "undefined" to be returned here. + if (!this.service.clusterURL) { + return undefined; + } + let storageAPI = this.service.clusterURL + SYNC_API_VERSION + "/"; + return storageAPI + this.identity.username + "/"; + } +}; +Object.freeze(ClusterManager.prototype); diff --git a/services/sync/modules/stages/declined.js b/services/sync/modules/stages/declined.js new file mode 100644 index 000000000..ff8a14181 --- /dev/null +++ b/services/sync/modules/stages/declined.js @@ -0,0 +1,76 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +/** + * This file contains code for maintaining the set of declined engines, + * in conjunction with EngineManager. + */ + +"use strict"; + +this.EXPORTED_SYMBOLS = ["DeclinedEngines"]; + +var {utils: Cu} = Components; + +Cu.import("resource://services-sync/constants.js"); +Cu.import("resource://gre/modules/Log.jsm"); +Cu.import("resource://services-common/utils.js"); +Cu.import("resource://services-common/observers.js"); +Cu.import("resource://gre/modules/Preferences.jsm"); + + + +this.DeclinedEngines = function (service) { + this._log = Log.repository.getLogger("Sync.Declined"); + this._log.level = Log.Level[new Preferences(PREFS_BRANCH).get("log.logger.declined")]; + + this.service = service; +} +this.DeclinedEngines.prototype = { + updateDeclined: function (meta, engineManager=this.service.engineManager) { + let enabled = new Set(engineManager.getEnabled().map(e => e.name)); + let known = new Set(engineManager.getAll().map(e => e.name)); + let remoteDeclined = new Set(meta.payload.declined || []); + let localDeclined = new Set(engineManager.getDeclined()); + + this._log.debug("Handling remote declined: " + JSON.stringify([...remoteDeclined])); + this._log.debug("Handling local declined: " + JSON.stringify([...localDeclined])); + + // Any engines that are locally enabled should be removed from the remote + // declined list. + // + // Any engines that are locally declined should be added to the remote + // declined list. + let newDeclined = CommonUtils.union(localDeclined, CommonUtils.difference(remoteDeclined, enabled)); + + // If our declined set has changed, put it into the meta object and mark + // it as changed. + let declinedChanged = !CommonUtils.setEqual(newDeclined, remoteDeclined); + this._log.debug("Declined changed? " + declinedChanged); + if (declinedChanged) { + meta.changed = true; + meta.payload.declined = [...newDeclined]; + } + + // Update the engine manager regardless. + engineManager.setDeclined(newDeclined); + + // Any engines that are locally known, locally disabled, and not remotely + // or locally declined, are candidates for enablement. + let undecided = CommonUtils.difference(CommonUtils.difference(known, enabled), newDeclined); + if (undecided.size) { + let subject = { + declined: newDeclined, + enabled: enabled, + known: known, + undecided: undecided, + }; + CommonUtils.nextTick(() => { + Observers.notify("weave:engines:notdeclined", subject); + }); + } + + return declinedChanged; + }, +}; diff --git a/services/sync/modules/stages/enginesync.js b/services/sync/modules/stages/enginesync.js new file mode 100644 index 000000000..a00a2f48b --- /dev/null +++ b/services/sync/modules/stages/enginesync.js @@ -0,0 +1,449 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +/** + * This file contains code for synchronizing engines. + */ + +this.EXPORTED_SYMBOLS = ["EngineSynchronizer"]; + +var {utils: Cu} = Components; + +Cu.import("resource://gre/modules/Log.jsm"); +Cu.import("resource://services-sync/constants.js"); +Cu.import("resource://services-sync/engines.js"); +Cu.import("resource://services-sync/policies.js"); +Cu.import("resource://services-sync/util.js"); +Cu.import("resource://services-common/observers.js"); +Cu.import("resource://services-common/async.js"); +Cu.import("resource://gre/modules/Task.jsm"); + +/** + * Perform synchronization of engines. + * + * This was originally split out of service.js. The API needs lots of love. + */ +this.EngineSynchronizer = function EngineSynchronizer(service) { + this._log = Log.repository.getLogger("Sync.Synchronizer"); + this._log.level = Log.Level[Svc.Prefs.get("log.logger.synchronizer")]; + + this.service = service; + + this.onComplete = null; +} + +EngineSynchronizer.prototype = { + sync: function sync(engineNamesToSync) { + if (!this.onComplete) { + throw new Error("onComplete handler not installed."); + } + + let startTime = Date.now(); + + this.service.status.resetSync(); + + // Make sure we should sync or record why we shouldn't. + let reason = this.service._checkSync(); + if (reason) { + if (reason == kSyncNetworkOffline) { + this.service.status.sync = LOGIN_FAILED_NETWORK_ERROR; + } + + // this is a purposeful abort rather than a failure, so don't set + // any status bits + reason = "Can't sync: " + reason; + this.onComplete(new Error("Can't sync: " + reason)); + return; + } + + // If we don't have a node, get one. If that fails, retry in 10 minutes. + if (!this.service.clusterURL && !this.service._clusterManager.setCluster()) { + this.service.status.sync = NO_SYNC_NODE_FOUND; + this._log.info("No cluster URL found. Cannot sync."); + this.onComplete(null); + return; + } + + // Ping the server with a special info request once a day. + let infoURL = this.service.infoURL; + let now = Math.floor(Date.now() / 1000); + let lastPing = Svc.Prefs.get("lastPing", 0); + if (now - lastPing > 86400) { // 60 * 60 * 24 + infoURL += "?v=" + WEAVE_VERSION; + Svc.Prefs.set("lastPing", now); + } + + let engineManager = this.service.engineManager; + + // Figure out what the last modified time is for each collection + let info = this.service._fetchInfo(infoURL); + + // Convert the response to an object and read out the modified times + for (let engine of [this.service.clientsEngine].concat(engineManager.getAll())) { + engine.lastModified = info.obj[engine.name] || 0; + } + + if (!(this.service._remoteSetup(info))) { + this.onComplete(new Error("Aborting sync, remote setup failed")); + return; + } + + // Make sure we have an up-to-date list of clients before sending commands + this._log.debug("Refreshing client list."); + if (!this._syncEngine(this.service.clientsEngine)) { + // Clients is an engine like any other; it can fail with a 401, + // and we can elect to abort the sync. + this._log.warn("Client engine sync failed. Aborting."); + this.onComplete(null); + return; + } + + // We only honor the "hint" of what engines to Sync if this isn't + // a first sync. + let allowEnginesHint = false; + // Wipe data in the desired direction if necessary + switch (Svc.Prefs.get("firstSync")) { + case "resetClient": + this.service.resetClient(engineManager.enabledEngineNames); + break; + case "wipeClient": + this.service.wipeClient(engineManager.enabledEngineNames); + break; + case "wipeRemote": + this.service.wipeRemote(engineManager.enabledEngineNames); + break; + default: + allowEnginesHint = true; + break; + } + + if (this.service.clientsEngine.localCommands) { + try { + if (!(this.service.clientsEngine.processIncomingCommands())) { + this.service.status.sync = ABORT_SYNC_COMMAND; + this.onComplete(new Error("Processed command aborted sync.")); + return; + } + + // Repeat remoteSetup in-case the commands forced us to reset + if (!(this.service._remoteSetup(info))) { + this.onComplete(new Error("Remote setup failed after processing commands.")); + return; + } + } + finally { + // Always immediately attempt to push back the local client (now + // without commands). + // Note that we don't abort here; if there's a 401 because we've + // been reassigned, we'll handle it around another engine. + this._syncEngine(this.service.clientsEngine); + } + } + + // Update engines because it might change what we sync. + try { + this._updateEnabledEngines(); + } catch (ex) { + this._log.debug("Updating enabled engines failed", ex); + this.service.errorHandler.checkServerError(ex); + this.onComplete(ex); + return; + } + + // If the engines to sync has been specified, we sync in the order specified. + let enginesToSync; + if (allowEnginesHint && engineNamesToSync) { + this._log.info("Syncing specified engines", engineNamesToSync); + enginesToSync = engineManager.get(engineNamesToSync).filter(e => e.enabled); + } else { + this._log.info("Syncing all enabled engines."); + enginesToSync = engineManager.getEnabled(); + } + try { + // We don't bother validating engines that failed to sync. + let enginesToValidate = []; + for (let engine of enginesToSync) { + // If there's any problems with syncing the engine, report the failure + if (!(this._syncEngine(engine)) || this.service.status.enforceBackoff) { + this._log.info("Aborting sync for failure in " + engine.name); + break; + } + enginesToValidate.push(engine); + } + + // If _syncEngine fails for a 401, we might not have a cluster URL here. + // If that's the case, break out of this immediately, rather than + // throwing an exception when trying to fetch metaURL. + if (!this.service.clusterURL) { + this._log.debug("Aborting sync, no cluster URL: " + + "not uploading new meta/global."); + this.onComplete(null); + return; + } + + // Upload meta/global if any engines changed anything. + let meta = this.service.recordManager.get(this.service.metaURL); + if (meta.isNew || meta.changed) { + this._log.info("meta/global changed locally: reuploading."); + try { + this.service.uploadMetaGlobal(meta); + delete meta.isNew; + delete meta.changed; + } catch (error) { + this._log.error("Unable to upload meta/global. Leaving marked as new."); + } + } + + Async.promiseSpinningly(this._tryValidateEngines(enginesToValidate)); + + // If there were no sync engine failures + if (this.service.status.service != SYNC_FAILED_PARTIAL) { + Svc.Prefs.set("lastSync", new Date().toString()); + this.service.status.sync = SYNC_SUCCEEDED; + } + } finally { + Svc.Prefs.reset("firstSync"); + + let syncTime = ((Date.now() - startTime) / 1000).toFixed(2); + let dateStr = Utils.formatTimestamp(new Date()); + this._log.info("Sync completed at " + dateStr + + " after " + syncTime + " secs."); + } + + this.onComplete(null); + }, + + _tryValidateEngines: Task.async(function* (recentlySyncedEngines) { + if (!Services.telemetry.canRecordBase || !Svc.Prefs.get("validation.enabled", false)) { + this._log.info("Skipping validation: validation or telemetry reporting is disabled"); + return; + } + + let lastValidation = Svc.Prefs.get("validation.lastTime", 0); + let validationInterval = Svc.Prefs.get("validation.interval"); + let nowSeconds = Math.floor(Date.now() / 1000); + + if (nowSeconds - lastValidation < validationInterval) { + this._log.info("Skipping validation: too recent since last validation attempt"); + return; + } + // Update the time now, even if we may return false still. We don't want to + // check the rest of these more frequently than once a day. + Svc.Prefs.set("validation.lastTime", nowSeconds); + + // Validation only occurs a certain percentage of the time. + let validationProbability = Svc.Prefs.get("validation.percentageChance", 0) / 100.0; + if (validationProbability < Math.random()) { + this._log.info("Skipping validation: Probability threshold not met"); + return; + } + let maxRecords = Svc.Prefs.get("validation.maxRecords"); + if (!maxRecords) { + // Don't bother asking the server for the counts if we know validation + // won't happen anyway. + return; + } + + // maxRecords of -1 means "any number", so we can skip asking the server. + // Used for tests. + let info; + if (maxRecords < 0) { + info = {}; + for (let e of recentlySyncedEngines) { + info[e.name] = 1; // needs to be < maxRecords + } + maxRecords = 2; + } else { + + let collectionCountsURL = this.service.userBaseURL + "info/collection_counts"; + try { + let infoResp = this.service._fetchInfo(collectionCountsURL); + if (!infoResp.success) { + this._log.error("Can't run validation: request to info/collection_counts responded with " + + resp.status); + return; + } + info = infoResp.obj; // might throw because obj is a getter which parses json. + } catch (e) { + // Not running validation is totally fine, so we just write an error log and return. + this._log.error("Can't run validation: Caught error when fetching counts", e); + return; + } + } + + if (!info) { + return; + } + + let engineLookup = new Map(recentlySyncedEngines.map(e => [e.name, e])); + let toRun = []; + for (let [engineName, recordCount] of Object.entries(info)) { + let engine = engineLookup.get(engineName); + if (recordCount > maxRecords || !engine) { + this._log.debug(`Skipping validation for ${engineName} because it's not an engine or ` + + `the number of records (${recordCount}) is greater than the maximum allowed (${maxRecords}).`); + continue; + } + let validator = engine.getValidator(); + if (!validator) { + continue; + } + // Put this in an array so that we know how many we're going to do, so we + // don't tell users we're going to run some validators when we aren't. + toRun.push({ engine, validator }); + } + + if (!toRun.length) { + return; + } + Services.console.logStringMessage( + "Sync is about to run a consistency check. This may be slow, and " + + "can be controlled using the pref \"services.sync.validation.enabled\".\n" + + "If you encounter any problems because of this, please file a bug."); + for (let { validator, engine } of toRun) { + try { + let result = yield validator.validate(engine); + Observers.notify("weave:engine:validate:finish", result, engine.name); + } catch (e) { + this._log.error(`Failed to run validation on ${engine.name}!`, e); + Observers.notify("weave:engine:validate:error", e, engine.name) + // Keep validating -- there's no reason to think that a failure for one + // validator would mean the others will fail. + } + } + }), + + // Returns true if sync should proceed. + // false / no return value means sync should be aborted. + _syncEngine: function _syncEngine(engine) { + try { + engine.sync(); + } + catch(e) { + if (e.status == 401) { + // Maybe a 401, cluster update perhaps needed? + // We rely on ErrorHandler observing the sync failure notification to + // schedule another sync and clear node assignment values. + // Here we simply want to muffle the exception and return an + // appropriate value. + return false; + } + } + + return true; + }, + + _updateEnabledFromMeta: function (meta, numClients, engineManager=this.service.engineManager) { + this._log.info("Updating enabled engines: " + + numClients + " clients."); + + if (meta.isNew || !meta.payload.engines) { + this._log.debug("meta/global isn't new, or is missing engines. Not updating enabled state."); + return; + } + + // If we're the only client, and no engines are marked as enabled, + // thumb our noses at the server data: it can't be right. + // Belt-and-suspenders approach to Bug 615926. + let hasEnabledEngines = false; + for (let e in meta.payload.engines) { + if (e != "clients") { + hasEnabledEngines = true; + break; + } + } + + if ((numClients <= 1) && !hasEnabledEngines) { + this._log.info("One client and no enabled engines: not touching local engine status."); + return; + } + + this.service._ignorePrefObserver = true; + + let enabled = engineManager.enabledEngineNames; + + let toDecline = new Set(); + let toUndecline = new Set(); + + for (let engineName in meta.payload.engines) { + if (engineName == "clients") { + // Clients is special. + continue; + } + let index = enabled.indexOf(engineName); + if (index != -1) { + // The engine is enabled locally. Nothing to do. + enabled.splice(index, 1); + continue; + } + let engine = engineManager.get(engineName); + if (!engine) { + // The engine doesn't exist locally. Nothing to do. + continue; + } + + let attemptedEnable = false; + // If the engine was enabled remotely, enable it locally. + if (!Svc.Prefs.get("engineStatusChanged." + engine.prefName, false)) { + this._log.trace("Engine " + engineName + " was enabled. Marking as non-declined."); + toUndecline.add(engineName); + this._log.trace(engineName + " engine was enabled remotely."); + engine.enabled = true; + // Note that setting engine.enabled to true might not have worked for + // the password engine if a master-password is enabled. However, it's + // still OK that we added it to undeclined - the user *tried* to enable + // it remotely - so it still winds up as not being flagged as declined + // even though it's disabled remotely. + attemptedEnable = true; + } + + // If either the engine was disabled locally or enabling the engine + // failed (see above re master-password) then wipe server data and + // disable it everywhere. + if (!engine.enabled) { + this._log.trace("Wiping data for " + engineName + " engine."); + engine.wipeServer(); + delete meta.payload.engines[engineName]; + meta.changed = true; // the new enabled state must propagate + // We also here mark the engine as declined, because the pref + // was explicitly changed to false - unless we tried, and failed, + // to enable it - in which case we leave the declined state alone. + if (!attemptedEnable) { + // This will be reflected in meta/global in the next stage. + this._log.trace("Engine " + engineName + " was disabled locally. Marking as declined."); + toDecline.add(engineName); + } + } + } + + // Any remaining engines were either enabled locally or disabled remotely. + for (let engineName of enabled) { + let engine = engineManager.get(engineName); + if (Svc.Prefs.get("engineStatusChanged." + engine.prefName, false)) { + this._log.trace("The " + engineName + " engine was enabled locally."); + toUndecline.add(engineName); + } else { + this._log.trace("The " + engineName + " engine was disabled remotely."); + + // Don't automatically mark it as declined! + engine.enabled = false; + } + } + + engineManager.decline(toDecline); + engineManager.undecline(toUndecline); + + Svc.Prefs.resetBranch("engineStatusChanged."); + this.service._ignorePrefObserver = false; + }, + + _updateEnabledEngines: function () { + let meta = this.service.recordManager.get(this.service.metaURL); + let numClients = this.service.scheduler.numClients; + let engineManager = this.service.engineManager; + + this._updateEnabledFromMeta(meta, numClients, engineManager); + }, +}; +Object.freeze(EngineSynchronizer.prototype); |