summaryrefslogtreecommitdiffstats
path: root/services/sync/modules/engines/clients.js
diff options
context:
space:
mode:
Diffstat (limited to 'services/sync/modules/engines/clients.js')
-rw-r--r--services/sync/modules/engines/clients.js442
1 files changed, 68 insertions, 374 deletions
diff --git a/services/sync/modules/engines/clients.js b/services/sync/modules/engines/clients.js
index 3dd679570..6c8e37a7b 100644
--- a/services/sync/modules/engines/clients.js
+++ b/services/sync/modules/engines/clients.js
@@ -2,24 +2,6 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-/**
- * How does the clients engine work?
- *
- * - We use 2 files - commands.json and commands-syncing.json.
- *
- * - At sync upload time, we attempt a rename of commands.json to
- * commands-syncing.json, and ignore errors (helps for crash during sync!).
- * - We load commands-syncing.json and stash the contents in
- * _currentlySyncingCommands which lives for the duration of the upload process.
- * - We use _currentlySyncingCommands to build the outgoing records
- * - Immediately after successful upload, we delete commands-syncing.json from
- * disk (and clear _currentlySyncingCommands). We reconcile our local records
- * with what we just wrote in the server, and add failed IDs commands
- * back in commands.json
- * - Any time we need to "save" a command for future syncs, we load
- * commands.json, update it, and write it back out.
- */
-
this.EXPORTED_SYMBOLS = [
"ClientEngine",
"ClientsRec"
@@ -27,32 +9,17 @@ this.EXPORTED_SYMBOLS = [
var {classes: Cc, interfaces: Ci, utils: Cu} = Components;
-Cu.import("resource://services-common/async.js");
Cu.import("resource://services-common/stringbundle.js");
Cu.import("resource://services-sync/constants.js");
Cu.import("resource://services-sync/engines.js");
Cu.import("resource://services-sync/record.js");
-Cu.import("resource://services-sync/resource.js");
Cu.import("resource://services-sync/util.js");
-Cu.import("resource://gre/modules/Services.jsm");
-
-XPCOMUtils.defineLazyModuleGetter(this, "fxAccounts",
- "resource://gre/modules/FxAccounts.jsm");
const CLIENTS_TTL = 1814400; // 21 days
const CLIENTS_TTL_REFRESH = 604800; // 7 days
-const STALE_CLIENT_REMOTE_AGE = 604800; // 7 days
const SUPPORTED_PROTOCOL_VERSIONS = ["1.1", "1.5"];
-function hasDupeCommand(commands, action) {
- if (!commands) {
- return false;
- }
- return commands.some(other => other.command == action.command &&
- Utils.deepEquals(other.args, action.args));
-}
-
this.ClientsRec = function ClientsRec(collection, id) {
CryptoWrapper.call(this, collection, id);
}
@@ -66,27 +33,23 @@ Utils.deferGetSet(ClientsRec,
"cleartext",
["name", "type", "commands",
"version", "protocols",
- "formfactor", "os", "appPackage", "application", "device",
- "fxaDeviceId"]);
+ "formfactor", "os", "appPackage", "application", "device"]);
this.ClientEngine = function ClientEngine(service) {
SyncEngine.call(this, "Clients", service);
- // Reset the last sync timestamp on every startup so that we fetch all clients
- this.resetLastSync();
+ // Reset the client on every startup so that we fetch recent clients
+ this._resetClient();
}
ClientEngine.prototype = {
__proto__: SyncEngine.prototype,
_storeObj: ClientStore,
_recordObj: ClientsRec,
_trackerObj: ClientsTracker,
- allowSkippedRecord: false,
// Always sync client data as it controls other sync behavior
- get enabled() {
- return true;
- },
+ get enabled() true,
get lastRecordUpload() {
return Svc.Prefs.get(this.name + ".lastRecordUpload", 0);
@@ -95,20 +58,10 @@ ClientEngine.prototype = {
Svc.Prefs.set(this.name + ".lastRecordUpload", Math.floor(value));
},
- get remoteClients() {
- // return all non-stale clients for external consumption.
- return Object.values(this._store._remoteClients).filter(v => !v.stale);
- },
-
- remoteClientExists(id) {
- let client = this._store._remoteClients[id];
- return !!(client && !client.stale);
- },
-
// Aggregate some stats on the composition of clients on this account
get stats() {
let stats = {
- hasMobile: this.localType == DEVICE_TYPE_MOBILE,
+ hasMobile: this.localType == "mobile",
names: [this.localName],
numClients: 1,
};
@@ -156,9 +109,7 @@ ClientEngine.prototype = {
let localID = Svc.Prefs.get("client.GUID", "");
return localID == "" ? this.localID = Utils.makeGUID() : localID;
},
- set localID(value) {
- Svc.Prefs.set("client.GUID", value);
- },
+ set localID(value) Svc.Prefs.set("client.GUID", value),
get brandName() {
let brand = new StringBundle("chrome://branding/locale/brand.properties");
@@ -166,97 +117,23 @@ ClientEngine.prototype = {
},
get localName() {
- let name = Utils.getDeviceName();
- // If `getDeviceName` returns the default name, set the pref. FxA registers
- // the device before syncing, so we don't need to update the registration
- // in this case.
- Svc.Prefs.set("client.name", name);
- return name;
- },
- set localName(value) {
- Svc.Prefs.set("client.name", value);
- // Update the registration in the background.
- fxAccounts.updateDeviceRegistration().catch(error => {
- this._log.warn("failed to update fxa device registration", error);
- });
- },
+ let localName = Svc.Prefs.get("client.name", "");
+ if (localName != "")
+ return localName;
- get localType() {
- return Utils.getDeviceType();
- },
- set localType(value) {
- Svc.Prefs.set("client.type", value);
+ return this.localName = Utils.getDefaultDeviceName();
},
+ set localName(value) Svc.Prefs.set("client.name", value),
- getClientName(id) {
- if (id == this.localID) {
- return this.localName;
- }
- let client = this._store._remoteClients[id];
- return client ? client.name : "";
- },
-
- getClientFxaDeviceId(id) {
- if (this._store._remoteClients[id]) {
- return this._store._remoteClients[id].fxaDeviceId;
- }
- return null;
- },
+ get localType() Svc.Prefs.get("client.type", "desktop"),
+ set localType(value) Svc.Prefs.set("client.type", value),
isMobile: function isMobile(id) {
if (this._store._remoteClients[id])
- return this._store._remoteClients[id].type == DEVICE_TYPE_MOBILE;
+ return this._store._remoteClients[id].type == "mobile";
return false;
},
- _readCommands() {
- let cb = Async.makeSpinningCallback();
- Utils.jsonLoad("commands", this, commands => cb(null, commands));
- return cb.wait() || {};
- },
-
- /**
- * Low level function, do not use directly (use _addClientCommand instead).
- */
- _saveCommands(commands) {
- let cb = Async.makeSpinningCallback();
- Utils.jsonSave("commands", this, commands, error => {
- if (error) {
- this._log.error("Failed to save JSON outgoing commands", error);
- }
- cb();
- });
- cb.wait();
- },
-
- _prepareCommandsForUpload() {
- let cb = Async.makeSpinningCallback();
- Utils.jsonMove("commands", "commands-syncing", this).catch(() => {}) // Ignore errors
- .then(() => {
- Utils.jsonLoad("commands-syncing", this, commands => cb(null, commands));
- });
- return cb.wait() || {};
- },
-
- _deleteUploadedCommands() {
- delete this._currentlySyncingCommands;
- Async.promiseSpinningly(
- Utils.jsonRemove("commands-syncing", this).catch(err => {
- this._log.error("Failed to delete syncing-commands file", err);
- })
- );
- },
-
- _addClientCommand(clientId, command) {
- const allCommands = this._readCommands();
- const clientCommands = allCommands[clientId] || [];
- if (hasDupeCommand(clientCommands, command)) {
- return;
- }
- allCommands[clientId] = clientCommands.concat(command);
- this._saveCommands(allCommands);
- },
-
_syncStartup: function _syncStartup() {
// Reupload new client record periodically.
if (Date.now() / 1000 - this.lastRecordUpload > CLIENTS_TTL_REFRESH) {
@@ -266,157 +143,9 @@ ClientEngine.prototype = {
SyncEngine.prototype._syncStartup.call(this);
},
- _processIncoming() {
- // Fetch all records from the server.
- this.lastSync = 0;
- this._incomingClients = {};
- try {
- SyncEngine.prototype._processIncoming.call(this);
- // Since clients are synced unconditionally, any records in the local store
- // that don't exist on the server must be for disconnected clients. Remove
- // them, so that we don't upload records with commands for clients that will
- // never see them. We also do this to filter out stale clients from the
- // tabs collection, since showing their list of tabs is confusing.
- for (let id in this._store._remoteClients) {
- if (!this._incomingClients[id]) {
- this._log.info(`Removing local state for deleted client ${id}`);
- this._removeRemoteClient(id);
- }
- }
- // Bug 1264498: Mobile clients don't remove themselves from the clients
- // collection when the user disconnects Sync, so we mark as stale clients
- // with the same name that haven't synced in over a week.
- // (Note we can't simply delete them, or we re-apply them next sync - see
- // bug 1287687)
- delete this._incomingClients[this.localID];
- let names = new Set([this.localName]);
- for (let id in this._incomingClients) {
- let record = this._store._remoteClients[id];
- if (!names.has(record.name)) {
- names.add(record.name);
- continue;
- }
- let remoteAge = AsyncResource.serverTime - this._incomingClients[id];
- if (remoteAge > STALE_CLIENT_REMOTE_AGE) {
- this._log.info(`Hiding stale client ${id} with age ${remoteAge}`);
- record.stale = true;
- }
- }
- } finally {
- this._incomingClients = null;
- }
- },
-
- _uploadOutgoing() {
- this._currentlySyncingCommands = this._prepareCommandsForUpload();
- const clientWithPendingCommands = Object.keys(this._currentlySyncingCommands);
- for (let clientId of clientWithPendingCommands) {
- if (this._store._remoteClients[clientId] || this.localID == clientId) {
- this._modified.set(clientId, 0);
- }
- }
- SyncEngine.prototype._uploadOutgoing.call(this);
- },
-
- _onRecordsWritten(succeeded, failed) {
- // Reconcile the status of the local records with what we just wrote on the
- // server
- for (let id of succeeded) {
- const commandChanges = this._currentlySyncingCommands[id];
- if (id == this.localID) {
- if (this.localCommands) {
- this.localCommands = this.localCommands.filter(command => !hasDupeCommand(commandChanges, command));
- }
- } else {
- const clientRecord = this._store._remoteClients[id];
- if (!commandChanges || !clientRecord) {
- // should be impossible, else we wouldn't have been writing it.
- this._log.warn("No command/No record changes for a client we uploaded");
- continue;
- }
- // fixup the client record, so our copy of _remoteClients matches what we uploaded.
- clientRecord.commands = this._store.createRecord(id);
- // we could do better and pass the reference to the record we just uploaded,
- // but this will do for now
- }
- }
-
- // Re-add failed commands
- for (let id of failed) {
- const commandChanges = this._currentlySyncingCommands[id];
- if (!commandChanges) {
- continue;
- }
- this._addClientCommand(id, commandChanges);
- }
-
- this._deleteUploadedCommands();
-
- // Notify other devices that their own client collection changed
- const idsToNotify = succeeded.reduce((acc, id) => {
- if (id == this.localID) {
- return acc;
- }
- const fxaDeviceId = this.getClientFxaDeviceId(id);
- return fxaDeviceId ? acc.concat(fxaDeviceId) : acc;
- }, []);
- if (idsToNotify.length > 0) {
- this._notifyCollectionChanged(idsToNotify);
- }
- },
-
- _notifyCollectionChanged(ids) {
- const message = {
- version: 1,
- command: "sync:collection_changed",
- data: {
- collections: ["clients"]
- }
- };
- fxAccounts.notifyDevices(ids, message, NOTIFY_TAB_SENT_TTL_SECS);
- },
-
- _syncFinish() {
- // Record histograms for our device types, and also write them to a pref
- // so non-histogram telemetry (eg, UITelemetry) has easy access to them.
- for (let [deviceType, count] of this.deviceTypes) {
- let hid;
- let prefName = this.name + ".devices.";
- switch (deviceType) {
- case "desktop":
- hid = "WEAVE_DEVICE_COUNT_DESKTOP";
- prefName += "desktop";
- break;
- case "mobile":
- hid = "WEAVE_DEVICE_COUNT_MOBILE";
- prefName += "mobile";
- break;
- default:
- this._log.warn(`Unexpected deviceType "${deviceType}" recording device telemetry.`);
- continue;
- }
- Services.telemetry.getHistogramById(hid).add(count);
- Svc.Prefs.set(prefName, count);
- }
- SyncEngine.prototype._syncFinish.call(this);
- },
-
- _reconcile: function _reconcile(item) {
- // Every incoming record is reconciled, so we use this to track the
- // contents of the collection on the server.
- this._incomingClients[item.id] = item.modified;
-
- if (!this._store.itemExists(item.id)) {
- return true;
- }
- // Clients are synced unconditionally, so we'll always have new records.
- // Unfortunately, this will cause the scheduler to use the immediate sync
- // interval for the multi-device case, instead of the active interval. We
- // work around this by updating the record during reconciliation, and
- // returning false to indicate that the record doesn't need to be applied
- // later.
- this._store.update(item);
- return false;
+ // Always process incoming items because they might have commands
+ _reconcile: function _reconcile() {
+ return true;
},
// Treat reset the same as wiping for locally cached clients
@@ -426,13 +155,7 @@ ClientEngine.prototype = {
_wipeClient: function _wipeClient() {
SyncEngine.prototype._resetClient.call(this);
- delete this.localCommands;
this._store.wipe();
- const logRemoveError = err => this._log.warn("Could not delete json file", err);
- Async.promiseSpinningly(
- Utils.jsonRemove("commands", this).catch(logRemoveError)
- .then(Utils.jsonRemove("commands-syncing", this).catch(logRemoveError))
- );
},
removeClientData: function removeClientData() {
@@ -471,6 +194,14 @@ ClientEngine.prototype = {
},
/**
+ * Remove any commands for the local client and mark it for upload.
+ */
+ clearCommands: function clearCommands() {
+ delete this.localCommands;
+ this._tracker.addChangedID(this.localID);
+ },
+
+ /**
* Sends a command+args pair to a specific client.
*
* @param command Command string
@@ -484,17 +215,30 @@ ClientEngine.prototype = {
if (!client) {
throw new Error("Unknown remote client ID: '" + clientId + "'.");
}
- if (client.stale) {
- throw new Error("Stale remote client ID: '" + clientId + "'.");
- }
+
+ // notDupe compares two commands and returns if they are not equal.
+ let notDupe = function(other) {
+ return other.command != command || !Utils.deepEquals(other.args, args);
+ };
let action = {
command: command,
args: args,
};
+ if (!client.commands) {
+ client.commands = [action];
+ }
+ // Add the new action if there are no duplicates.
+ else if (client.commands.every(notDupe)) {
+ client.commands.push(action);
+ }
+ // It must be a dupe. Skip.
+ else {
+ return;
+ }
+
this._log.trace("Client " + clientId + " got a new action: " + [command, args]);
- this._addClientCommand(clientId, action);
this._tracker.addChangedID(clientId);
},
@@ -505,17 +249,13 @@ ClientEngine.prototype = {
*/
processIncomingCommands: function processIncomingCommands() {
return this._notify("clients:process-commands", "", function() {
- if (!this.localCommands) {
- return true;
- }
+ let commands = this.localCommands;
- const clearedCommands = this._readCommands()[this.localID];
- const commands = this.localCommands.filter(command => !hasDupeCommand(clearedCommands, command));
+ // Immediately clear out the commands as we've got them locally.
+ this.clearCommands();
- let URIsToDisplay = [];
// Process each command in order.
- for (let rawCommand of commands) {
- let {command, args} = rawCommand;
+ for each (let {command, args} in commands) {
this._log.debug("Processing command: " + command + "(" + args + ")");
let engines = [args[0]];
@@ -536,20 +276,12 @@ ClientEngine.prototype = {
this.service.logout();
return false;
case "displayURI":
- let [uri, clientId, title] = args;
- URIsToDisplay.push({ uri, clientId, title });
+ this._handleDisplayURI.apply(this, args);
break;
default:
this._log.debug("Received an unknown command: " + command);
break;
}
- // Add the command to the "cleared" commands list
- this._addClientCommand(this.localID, rawCommand)
- }
- this._tracker.addChangedID(this.localID);
-
- if (URIsToDisplay.length) {
- this._handleDisplayURIs(URIsToDisplay);
}
return true;
@@ -588,10 +320,8 @@ ClientEngine.prototype = {
if (clientId) {
this._sendCommandToClient(command, args, clientId);
} else {
- for (let [id, record] of Object.entries(this._store._remoteClients)) {
- if (!record.stale) {
- this._sendCommandToClient(command, args, id);
- }
+ for (let id in this._store._remoteClients) {
+ this._sendCommandToClient(command, args, id);
}
}
},
@@ -622,11 +352,11 @@ ClientEngine.prototype = {
},
/**
- * Handle a bunch of received 'displayURI' commands.
+ * Handle a single received 'displayURI' command.
*
- * Interested parties should observe the "weave:engine:clients:display-uris"
- * topic. The callback will receive an array as the subject parameter
- * containing objects with the following keys:
+ * Interested parties should observe the "weave:engine:clients:display-uri"
+ * topic. The callback will receive an object as the subject parameter with
+ * the following keys:
*
* uri URI (string) that is requested for display.
* clientId ID of client that sent the command.
@@ -634,24 +364,21 @@ ClientEngine.prototype = {
*
* The 'data' parameter to the callback will not be defined.
*
- * @param uris
- * An array containing URI objects to display
- * @param uris[].uri
+ * @param uri
* String URI that was received
- * @param uris[].clientId
+ * @param clientId
* ID of client that sent URI
- * @param uris[].title
+ * @param title
* String title of page that URI corresponds to. Older clients may not
* send this.
*/
- _handleDisplayURIs: function _handleDisplayURIs(uris) {
- Svc.Obs.notify("weave:engine:clients:display-uris", uris);
- },
+ _handleDisplayURI: function _handleDisplayURI(uri, clientId, title) {
+ this._log.info("Received a URI for display: " + uri + " (" + title +
+ ") from " + clientId);
- _removeRemoteClient(id) {
- delete this._store._remoteClients[id];
- this._tracker.removeChangedID(id);
- },
+ let subject = {uri: uri, client: clientId, title: title};
+ Svc.Obs.notify("weave:engine:clients:display-uri", subject);
+ }
};
function ClientStore(name, engine) {
@@ -660,48 +387,29 @@ function ClientStore(name, engine) {
ClientStore.prototype = {
__proto__: Store.prototype,
- _remoteClients: {},
-
create(record) {
- this.update(record);
+ this.update(record)
},
update: function update(record) {
- if (record.id == this.engine.localID) {
- // Only grab commands from the server; local name/type always wins
+ // Only grab commands from the server; local name/type always wins
+ if (record.id == this.engine.localID)
this.engine.localCommands = record.commands;
- } else {
+ else
this._remoteClients[record.id] = record.cleartext;
- }
},
createRecord: function createRecord(id, collection) {
let record = new ClientsRec(collection, id);
- const commandsChanges = this.engine._currentlySyncingCommands ?
- this.engine._currentlySyncingCommands[id] :
- [];
-
// Package the individual components into a record for the local client
if (id == this.engine.localID) {
- let cb = Async.makeSpinningCallback();
- fxAccounts.getDeviceId().then(id => cb(null, id), cb);
- try {
- record.fxaDeviceId = cb.wait();
- } catch(error) {
- this._log.warn("failed to get fxa device id", error);
- }
record.name = this.engine.localName;
record.type = this.engine.localType;
+ record.commands = this.engine.localCommands;
record.version = Services.appinfo.version;
record.protocols = SUPPORTED_PROTOCOL_VERSIONS;
- // Substract the commands we recorded that we've already executed
- if (commandsChanges && commandsChanges.length &&
- this.engine.localCommands && this.engine.localCommands.length) {
- record.commands = this.engine.localCommands.filter(command => !hasDupeCommand(commandsChanges, command));
- }
-
// Optional fields.
record.os = Services.appinfo.OS; // "Darwin"
record.appPackage = Services.appinfo.ID;
@@ -712,20 +420,6 @@ ClientStore.prototype = {
// record.formfactor = ""; // Bug 1100722
} else {
record.cleartext = this._remoteClients[id];
-
- // Add the commands we have to send
- if (commandsChanges && commandsChanges.length) {
- const recordCommands = record.cleartext.commands || [];
- const newCommands = commandsChanges.filter(command => !hasDupeCommand(recordCommands, command));
- record.cleartext.commands = recordCommands.concat(newCommands);
- }
-
- if (record.cleartext.stale) {
- // It's almost certainly a logic error for us to upload a record we
- // consider stale, so make log noise, but still remove the flag.
- this._log.error(`Preparing to upload record ${id} that we consider stale`);
- delete record.cleartext.stale;
- }
}
return record;
@@ -768,7 +462,7 @@ ClientsTracker.prototype = {
break;
case "weave:engine:stop-tracking":
if (this._enabled) {
- Svc.Prefs.ignore("client.name", this);
+ Svc.Prefs.ignore("clients.name", this);
this._enabled = false;
}
break;