summaryrefslogtreecommitdiffstats
path: root/services/sync/modules/stages
diff options
context:
space:
mode:
Diffstat (limited to 'services/sync/modules/stages')
-rw-r--r--services/sync/modules/stages/cluster.js113
-rw-r--r--services/sync/modules/stages/declined.js76
-rw-r--r--services/sync/modules/stages/enginesync.js449
3 files changed, 638 insertions, 0 deletions
diff --git a/services/sync/modules/stages/cluster.js b/services/sync/modules/stages/cluster.js
new file mode 100644
index 000000000..7665ce825
--- /dev/null
+++ b/services/sync/modules/stages/cluster.js
@@ -0,0 +1,113 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+this.EXPORTED_SYMBOLS = ["ClusterManager"];
+
+var {utils: Cu} = Components;
+
+Cu.import("resource://gre/modules/Log.jsm");
+Cu.import("resource://services-sync/constants.js");
+Cu.import("resource://services-sync/policies.js");
+Cu.import("resource://services-sync/util.js");
+
+/**
+ * Contains code for managing the Sync cluster we are in.
+ */
+this.ClusterManager = function ClusterManager(service) {
+ this._log = Log.repository.getLogger("Sync.Service");
+ this._log.level = Log.Level[Svc.Prefs.get("log.logger.service.main")];
+
+ this.service = service;
+}
+ClusterManager.prototype = {
+ get identity() {
+ return this.service.identity;
+ },
+
+ /**
+ * Obtain the cluster for the current user.
+ *
+ * Returns the string URL of the cluster or null on error.
+ */
+ _findCluster: function _findCluster() {
+ this._log.debug("Finding cluster for user " + this.identity.username);
+
+ // This should ideally use UserAPI10Client but the legacy hackiness is
+ // strong with this code.
+ let fail;
+ let url = this.service.userAPIURI + this.identity.username + "/node/weave";
+ let res = this.service.resource(url);
+ try {
+ let node = res.get();
+ switch (node.status) {
+ case 400:
+ this.service.status.login = LOGIN_FAILED_LOGIN_REJECTED;
+ fail = "Find cluster denied: " + this.service.errorHandler.errorStr(node);
+ break;
+ case 404:
+ this._log.debug("Using serverURL as data cluster (multi-cluster support disabled)");
+ return this.service.serverURL;
+ case 0:
+ case 200:
+ if (node == "null") {
+ node = null;
+ }
+ this._log.trace("_findCluster successfully returning " + node);
+ return node;
+ default:
+ this.service.errorHandler.checkServerError(node);
+ fail = "Unexpected response code: " + node.status;
+ break;
+ }
+ } catch (e) {
+ this._log.debug("Network error on findCluster");
+ this.service.status.login = LOGIN_FAILED_NETWORK_ERROR;
+ this.service.errorHandler.checkServerError(e);
+ fail = e;
+ }
+ throw fail;
+ },
+
+ /**
+ * Determine the cluster for the current user and update state.
+ */
+ setCluster: function setCluster() {
+ // Make sure we didn't get some unexpected response for the cluster.
+ let cluster = this._findCluster();
+ this._log.debug("Cluster value = " + cluster);
+ if (cluster == null) {
+ return false;
+ }
+
+ // Convert from the funky "String object with additional properties" that
+ // resource.js returns to a plain-old string.
+ cluster = cluster.toString();
+ // Don't update stuff if we already have the right cluster
+ if (cluster == this.service.clusterURL) {
+ return false;
+ }
+
+ this._log.debug("Setting cluster to " + cluster);
+ this.service.clusterURL = cluster;
+
+ return true;
+ },
+
+ getUserBaseURL: function getUserBaseURL() {
+ // Legacy Sync and FxA Sync construct the userBaseURL differently. Legacy
+ // Sync appends path components onto an empty path, and in FxA Sync, the
+ // token server constructs this for us in an opaque manner. Since the
+ // cluster manager already sets the clusterURL on Service and also has
+ // access to the current identity, we added this functionality here.
+
+ // If the clusterURL hasn't been set, the userBaseURL shouldn't be set
+ // either. Some tests expect "undefined" to be returned here.
+ if (!this.service.clusterURL) {
+ return undefined;
+ }
+ let storageAPI = this.service.clusterURL + SYNC_API_VERSION + "/";
+ return storageAPI + this.identity.username + "/";
+ }
+};
+Object.freeze(ClusterManager.prototype);
diff --git a/services/sync/modules/stages/declined.js b/services/sync/modules/stages/declined.js
new file mode 100644
index 000000000..ff8a14181
--- /dev/null
+++ b/services/sync/modules/stages/declined.js
@@ -0,0 +1,76 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+/**
+ * This file contains code for maintaining the set of declined engines,
+ * in conjunction with EngineManager.
+ */
+
+"use strict";
+
+this.EXPORTED_SYMBOLS = ["DeclinedEngines"];
+
+var {utils: Cu} = Components;
+
+Cu.import("resource://services-sync/constants.js");
+Cu.import("resource://gre/modules/Log.jsm");
+Cu.import("resource://services-common/utils.js");
+Cu.import("resource://services-common/observers.js");
+Cu.import("resource://gre/modules/Preferences.jsm");
+
+
+
+this.DeclinedEngines = function (service) {
+ this._log = Log.repository.getLogger("Sync.Declined");
+ this._log.level = Log.Level[new Preferences(PREFS_BRANCH).get("log.logger.declined")];
+
+ this.service = service;
+}
+this.DeclinedEngines.prototype = {
+ updateDeclined: function (meta, engineManager=this.service.engineManager) {
+ let enabled = new Set(engineManager.getEnabled().map(e => e.name));
+ let known = new Set(engineManager.getAll().map(e => e.name));
+ let remoteDeclined = new Set(meta.payload.declined || []);
+ let localDeclined = new Set(engineManager.getDeclined());
+
+ this._log.debug("Handling remote declined: " + JSON.stringify([...remoteDeclined]));
+ this._log.debug("Handling local declined: " + JSON.stringify([...localDeclined]));
+
+ // Any engines that are locally enabled should be removed from the remote
+ // declined list.
+ //
+ // Any engines that are locally declined should be added to the remote
+ // declined list.
+ let newDeclined = CommonUtils.union(localDeclined, CommonUtils.difference(remoteDeclined, enabled));
+
+ // If our declined set has changed, put it into the meta object and mark
+ // it as changed.
+ let declinedChanged = !CommonUtils.setEqual(newDeclined, remoteDeclined);
+ this._log.debug("Declined changed? " + declinedChanged);
+ if (declinedChanged) {
+ meta.changed = true;
+ meta.payload.declined = [...newDeclined];
+ }
+
+ // Update the engine manager regardless.
+ engineManager.setDeclined(newDeclined);
+
+ // Any engines that are locally known, locally disabled, and not remotely
+ // or locally declined, are candidates for enablement.
+ let undecided = CommonUtils.difference(CommonUtils.difference(known, enabled), newDeclined);
+ if (undecided.size) {
+ let subject = {
+ declined: newDeclined,
+ enabled: enabled,
+ known: known,
+ undecided: undecided,
+ };
+ CommonUtils.nextTick(() => {
+ Observers.notify("weave:engines:notdeclined", subject);
+ });
+ }
+
+ return declinedChanged;
+ },
+};
diff --git a/services/sync/modules/stages/enginesync.js b/services/sync/modules/stages/enginesync.js
new file mode 100644
index 000000000..a00a2f48b
--- /dev/null
+++ b/services/sync/modules/stages/enginesync.js
@@ -0,0 +1,449 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+/**
+ * This file contains code for synchronizing engines.
+ */
+
+this.EXPORTED_SYMBOLS = ["EngineSynchronizer"];
+
+var {utils: Cu} = Components;
+
+Cu.import("resource://gre/modules/Log.jsm");
+Cu.import("resource://services-sync/constants.js");
+Cu.import("resource://services-sync/engines.js");
+Cu.import("resource://services-sync/policies.js");
+Cu.import("resource://services-sync/util.js");
+Cu.import("resource://services-common/observers.js");
+Cu.import("resource://services-common/async.js");
+Cu.import("resource://gre/modules/Task.jsm");
+
+/**
+ * Perform synchronization of engines.
+ *
+ * This was originally split out of service.js. The API needs lots of love.
+ */
+this.EngineSynchronizer = function EngineSynchronizer(service) {
+ this._log = Log.repository.getLogger("Sync.Synchronizer");
+ this._log.level = Log.Level[Svc.Prefs.get("log.logger.synchronizer")];
+
+ this.service = service;
+
+ this.onComplete = null;
+}
+
+EngineSynchronizer.prototype = {
+ sync: function sync(engineNamesToSync) {
+ if (!this.onComplete) {
+ throw new Error("onComplete handler not installed.");
+ }
+
+ let startTime = Date.now();
+
+ this.service.status.resetSync();
+
+ // Make sure we should sync or record why we shouldn't.
+ let reason = this.service._checkSync();
+ if (reason) {
+ if (reason == kSyncNetworkOffline) {
+ this.service.status.sync = LOGIN_FAILED_NETWORK_ERROR;
+ }
+
+ // this is a purposeful abort rather than a failure, so don't set
+ // any status bits
+ reason = "Can't sync: " + reason;
+ this.onComplete(new Error("Can't sync: " + reason));
+ return;
+ }
+
+ // If we don't have a node, get one. If that fails, retry in 10 minutes.
+ if (!this.service.clusterURL && !this.service._clusterManager.setCluster()) {
+ this.service.status.sync = NO_SYNC_NODE_FOUND;
+ this._log.info("No cluster URL found. Cannot sync.");
+ this.onComplete(null);
+ return;
+ }
+
+ // Ping the server with a special info request once a day.
+ let infoURL = this.service.infoURL;
+ let now = Math.floor(Date.now() / 1000);
+ let lastPing = Svc.Prefs.get("lastPing", 0);
+ if (now - lastPing > 86400) { // 60 * 60 * 24
+ infoURL += "?v=" + WEAVE_VERSION;
+ Svc.Prefs.set("lastPing", now);
+ }
+
+ let engineManager = this.service.engineManager;
+
+ // Figure out what the last modified time is for each collection
+ let info = this.service._fetchInfo(infoURL);
+
+ // Convert the response to an object and read out the modified times
+ for (let engine of [this.service.clientsEngine].concat(engineManager.getAll())) {
+ engine.lastModified = info.obj[engine.name] || 0;
+ }
+
+ if (!(this.service._remoteSetup(info))) {
+ this.onComplete(new Error("Aborting sync, remote setup failed"));
+ return;
+ }
+
+ // Make sure we have an up-to-date list of clients before sending commands
+ this._log.debug("Refreshing client list.");
+ if (!this._syncEngine(this.service.clientsEngine)) {
+ // Clients is an engine like any other; it can fail with a 401,
+ // and we can elect to abort the sync.
+ this._log.warn("Client engine sync failed. Aborting.");
+ this.onComplete(null);
+ return;
+ }
+
+ // We only honor the "hint" of what engines to Sync if this isn't
+ // a first sync.
+ let allowEnginesHint = false;
+ // Wipe data in the desired direction if necessary
+ switch (Svc.Prefs.get("firstSync")) {
+ case "resetClient":
+ this.service.resetClient(engineManager.enabledEngineNames);
+ break;
+ case "wipeClient":
+ this.service.wipeClient(engineManager.enabledEngineNames);
+ break;
+ case "wipeRemote":
+ this.service.wipeRemote(engineManager.enabledEngineNames);
+ break;
+ default:
+ allowEnginesHint = true;
+ break;
+ }
+
+ if (this.service.clientsEngine.localCommands) {
+ try {
+ if (!(this.service.clientsEngine.processIncomingCommands())) {
+ this.service.status.sync = ABORT_SYNC_COMMAND;
+ this.onComplete(new Error("Processed command aborted sync."));
+ return;
+ }
+
+ // Repeat remoteSetup in-case the commands forced us to reset
+ if (!(this.service._remoteSetup(info))) {
+ this.onComplete(new Error("Remote setup failed after processing commands."));
+ return;
+ }
+ }
+ finally {
+ // Always immediately attempt to push back the local client (now
+ // without commands).
+ // Note that we don't abort here; if there's a 401 because we've
+ // been reassigned, we'll handle it around another engine.
+ this._syncEngine(this.service.clientsEngine);
+ }
+ }
+
+ // Update engines because it might change what we sync.
+ try {
+ this._updateEnabledEngines();
+ } catch (ex) {
+ this._log.debug("Updating enabled engines failed", ex);
+ this.service.errorHandler.checkServerError(ex);
+ this.onComplete(ex);
+ return;
+ }
+
+ // If the engines to sync has been specified, we sync in the order specified.
+ let enginesToSync;
+ if (allowEnginesHint && engineNamesToSync) {
+ this._log.info("Syncing specified engines", engineNamesToSync);
+ enginesToSync = engineManager.get(engineNamesToSync).filter(e => e.enabled);
+ } else {
+ this._log.info("Syncing all enabled engines.");
+ enginesToSync = engineManager.getEnabled();
+ }
+ try {
+ // We don't bother validating engines that failed to sync.
+ let enginesToValidate = [];
+ for (let engine of enginesToSync) {
+ // If there's any problems with syncing the engine, report the failure
+ if (!(this._syncEngine(engine)) || this.service.status.enforceBackoff) {
+ this._log.info("Aborting sync for failure in " + engine.name);
+ break;
+ }
+ enginesToValidate.push(engine);
+ }
+
+ // If _syncEngine fails for a 401, we might not have a cluster URL here.
+ // If that's the case, break out of this immediately, rather than
+ // throwing an exception when trying to fetch metaURL.
+ if (!this.service.clusterURL) {
+ this._log.debug("Aborting sync, no cluster URL: " +
+ "not uploading new meta/global.");
+ this.onComplete(null);
+ return;
+ }
+
+ // Upload meta/global if any engines changed anything.
+ let meta = this.service.recordManager.get(this.service.metaURL);
+ if (meta.isNew || meta.changed) {
+ this._log.info("meta/global changed locally: reuploading.");
+ try {
+ this.service.uploadMetaGlobal(meta);
+ delete meta.isNew;
+ delete meta.changed;
+ } catch (error) {
+ this._log.error("Unable to upload meta/global. Leaving marked as new.");
+ }
+ }
+
+ Async.promiseSpinningly(this._tryValidateEngines(enginesToValidate));
+
+ // If there were no sync engine failures
+ if (this.service.status.service != SYNC_FAILED_PARTIAL) {
+ Svc.Prefs.set("lastSync", new Date().toString());
+ this.service.status.sync = SYNC_SUCCEEDED;
+ }
+ } finally {
+ Svc.Prefs.reset("firstSync");
+
+ let syncTime = ((Date.now() - startTime) / 1000).toFixed(2);
+ let dateStr = Utils.formatTimestamp(new Date());
+ this._log.info("Sync completed at " + dateStr
+ + " after " + syncTime + " secs.");
+ }
+
+ this.onComplete(null);
+ },
+
+ _tryValidateEngines: Task.async(function* (recentlySyncedEngines) {
+ if (!Services.telemetry.canRecordBase || !Svc.Prefs.get("validation.enabled", false)) {
+ this._log.info("Skipping validation: validation or telemetry reporting is disabled");
+ return;
+ }
+
+ let lastValidation = Svc.Prefs.get("validation.lastTime", 0);
+ let validationInterval = Svc.Prefs.get("validation.interval");
+ let nowSeconds = Math.floor(Date.now() / 1000);
+
+ if (nowSeconds - lastValidation < validationInterval) {
+ this._log.info("Skipping validation: too recent since last validation attempt");
+ return;
+ }
+ // Update the time now, even if we may return false still. We don't want to
+ // check the rest of these more frequently than once a day.
+ Svc.Prefs.set("validation.lastTime", nowSeconds);
+
+ // Validation only occurs a certain percentage of the time.
+ let validationProbability = Svc.Prefs.get("validation.percentageChance", 0) / 100.0;
+ if (validationProbability < Math.random()) {
+ this._log.info("Skipping validation: Probability threshold not met");
+ return;
+ }
+ let maxRecords = Svc.Prefs.get("validation.maxRecords");
+ if (!maxRecords) {
+ // Don't bother asking the server for the counts if we know validation
+ // won't happen anyway.
+ return;
+ }
+
+ // maxRecords of -1 means "any number", so we can skip asking the server.
+ // Used for tests.
+ let info;
+ if (maxRecords < 0) {
+ info = {};
+ for (let e of recentlySyncedEngines) {
+ info[e.name] = 1; // needs to be < maxRecords
+ }
+ maxRecords = 2;
+ } else {
+
+ let collectionCountsURL = this.service.userBaseURL + "info/collection_counts";
+ try {
+ let infoResp = this.service._fetchInfo(collectionCountsURL);
+ if (!infoResp.success) {
+ this._log.error("Can't run validation: request to info/collection_counts responded with "
+ + resp.status);
+ return;
+ }
+ info = infoResp.obj; // might throw because obj is a getter which parses json.
+ } catch (e) {
+ // Not running validation is totally fine, so we just write an error log and return.
+ this._log.error("Can't run validation: Caught error when fetching counts", e);
+ return;
+ }
+ }
+
+ if (!info) {
+ return;
+ }
+
+ let engineLookup = new Map(recentlySyncedEngines.map(e => [e.name, e]));
+ let toRun = [];
+ for (let [engineName, recordCount] of Object.entries(info)) {
+ let engine = engineLookup.get(engineName);
+ if (recordCount > maxRecords || !engine) {
+ this._log.debug(`Skipping validation for ${engineName} because it's not an engine or ` +
+ `the number of records (${recordCount}) is greater than the maximum allowed (${maxRecords}).`);
+ continue;
+ }
+ let validator = engine.getValidator();
+ if (!validator) {
+ continue;
+ }
+ // Put this in an array so that we know how many we're going to do, so we
+ // don't tell users we're going to run some validators when we aren't.
+ toRun.push({ engine, validator });
+ }
+
+ if (!toRun.length) {
+ return;
+ }
+ Services.console.logStringMessage(
+ "Sync is about to run a consistency check. This may be slow, and " +
+ "can be controlled using the pref \"services.sync.validation.enabled\".\n" +
+ "If you encounter any problems because of this, please file a bug.");
+ for (let { validator, engine } of toRun) {
+ try {
+ let result = yield validator.validate(engine);
+ Observers.notify("weave:engine:validate:finish", result, engine.name);
+ } catch (e) {
+ this._log.error(`Failed to run validation on ${engine.name}!`, e);
+ Observers.notify("weave:engine:validate:error", e, engine.name)
+ // Keep validating -- there's no reason to think that a failure for one
+ // validator would mean the others will fail.
+ }
+ }
+ }),
+
+ // Returns true if sync should proceed.
+ // false / no return value means sync should be aborted.
+ _syncEngine: function _syncEngine(engine) {
+ try {
+ engine.sync();
+ }
+ catch(e) {
+ if (e.status == 401) {
+ // Maybe a 401, cluster update perhaps needed?
+ // We rely on ErrorHandler observing the sync failure notification to
+ // schedule another sync and clear node assignment values.
+ // Here we simply want to muffle the exception and return an
+ // appropriate value.
+ return false;
+ }
+ }
+
+ return true;
+ },
+
+ _updateEnabledFromMeta: function (meta, numClients, engineManager=this.service.engineManager) {
+ this._log.info("Updating enabled engines: " +
+ numClients + " clients.");
+
+ if (meta.isNew || !meta.payload.engines) {
+ this._log.debug("meta/global isn't new, or is missing engines. Not updating enabled state.");
+ return;
+ }
+
+ // If we're the only client, and no engines are marked as enabled,
+ // thumb our noses at the server data: it can't be right.
+ // Belt-and-suspenders approach to Bug 615926.
+ let hasEnabledEngines = false;
+ for (let e in meta.payload.engines) {
+ if (e != "clients") {
+ hasEnabledEngines = true;
+ break;
+ }
+ }
+
+ if ((numClients <= 1) && !hasEnabledEngines) {
+ this._log.info("One client and no enabled engines: not touching local engine status.");
+ return;
+ }
+
+ this.service._ignorePrefObserver = true;
+
+ let enabled = engineManager.enabledEngineNames;
+
+ let toDecline = new Set();
+ let toUndecline = new Set();
+
+ for (let engineName in meta.payload.engines) {
+ if (engineName == "clients") {
+ // Clients is special.
+ continue;
+ }
+ let index = enabled.indexOf(engineName);
+ if (index != -1) {
+ // The engine is enabled locally. Nothing to do.
+ enabled.splice(index, 1);
+ continue;
+ }
+ let engine = engineManager.get(engineName);
+ if (!engine) {
+ // The engine doesn't exist locally. Nothing to do.
+ continue;
+ }
+
+ let attemptedEnable = false;
+ // If the engine was enabled remotely, enable it locally.
+ if (!Svc.Prefs.get("engineStatusChanged." + engine.prefName, false)) {
+ this._log.trace("Engine " + engineName + " was enabled. Marking as non-declined.");
+ toUndecline.add(engineName);
+ this._log.trace(engineName + " engine was enabled remotely.");
+ engine.enabled = true;
+ // Note that setting engine.enabled to true might not have worked for
+ // the password engine if a master-password is enabled. However, it's
+ // still OK that we added it to undeclined - the user *tried* to enable
+ // it remotely - so it still winds up as not being flagged as declined
+ // even though it's disabled remotely.
+ attemptedEnable = true;
+ }
+
+ // If either the engine was disabled locally or enabling the engine
+ // failed (see above re master-password) then wipe server data and
+ // disable it everywhere.
+ if (!engine.enabled) {
+ this._log.trace("Wiping data for " + engineName + " engine.");
+ engine.wipeServer();
+ delete meta.payload.engines[engineName];
+ meta.changed = true; // the new enabled state must propagate
+ // We also here mark the engine as declined, because the pref
+ // was explicitly changed to false - unless we tried, and failed,
+ // to enable it - in which case we leave the declined state alone.
+ if (!attemptedEnable) {
+ // This will be reflected in meta/global in the next stage.
+ this._log.trace("Engine " + engineName + " was disabled locally. Marking as declined.");
+ toDecline.add(engineName);
+ }
+ }
+ }
+
+ // Any remaining engines were either enabled locally or disabled remotely.
+ for (let engineName of enabled) {
+ let engine = engineManager.get(engineName);
+ if (Svc.Prefs.get("engineStatusChanged." + engine.prefName, false)) {
+ this._log.trace("The " + engineName + " engine was enabled locally.");
+ toUndecline.add(engineName);
+ } else {
+ this._log.trace("The " + engineName + " engine was disabled remotely.");
+
+ // Don't automatically mark it as declined!
+ engine.enabled = false;
+ }
+ }
+
+ engineManager.decline(toDecline);
+ engineManager.undecline(toUndecline);
+
+ Svc.Prefs.resetBranch("engineStatusChanged.");
+ this.service._ignorePrefObserver = false;
+ },
+
+ _updateEnabledEngines: function () {
+ let meta = this.service.recordManager.get(this.service.metaURL);
+ let numClients = this.service.scheduler.numClients;
+ let engineManager = this.service.engineManager;
+
+ this._updateEnabledFromMeta(meta, numClients, engineManager);
+ },
+};
+Object.freeze(EngineSynchronizer.prototype);