diff options
Diffstat (limited to 'services/sync/modules/engines/clients.js')
-rw-r--r-- | services/sync/modules/engines/clients.js | 442 |
1 files changed, 68 insertions, 374 deletions
diff --git a/services/sync/modules/engines/clients.js b/services/sync/modules/engines/clients.js index 3dd679570..6c8e37a7b 100644 --- a/services/sync/modules/engines/clients.js +++ b/services/sync/modules/engines/clients.js @@ -2,24 +2,6 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ -/** - * How does the clients engine work? - * - * - We use 2 files - commands.json and commands-syncing.json. - * - * - At sync upload time, we attempt a rename of commands.json to - * commands-syncing.json, and ignore errors (helps for crash during sync!). - * - We load commands-syncing.json and stash the contents in - * _currentlySyncingCommands which lives for the duration of the upload process. - * - We use _currentlySyncingCommands to build the outgoing records - * - Immediately after successful upload, we delete commands-syncing.json from - * disk (and clear _currentlySyncingCommands). We reconcile our local records - * with what we just wrote in the server, and add failed IDs commands - * back in commands.json - * - Any time we need to "save" a command for future syncs, we load - * commands.json, update it, and write it back out. - */ - this.EXPORTED_SYMBOLS = [ "ClientEngine", "ClientsRec" @@ -27,32 +9,17 @@ this.EXPORTED_SYMBOLS = [ var {classes: Cc, interfaces: Ci, utils: Cu} = Components; -Cu.import("resource://services-common/async.js"); Cu.import("resource://services-common/stringbundle.js"); Cu.import("resource://services-sync/constants.js"); Cu.import("resource://services-sync/engines.js"); Cu.import("resource://services-sync/record.js"); -Cu.import("resource://services-sync/resource.js"); Cu.import("resource://services-sync/util.js"); -Cu.import("resource://gre/modules/Services.jsm"); - -XPCOMUtils.defineLazyModuleGetter(this, "fxAccounts", - "resource://gre/modules/FxAccounts.jsm"); const CLIENTS_TTL = 1814400; // 21 days const CLIENTS_TTL_REFRESH = 604800; // 7 days -const STALE_CLIENT_REMOTE_AGE = 604800; // 7 days const SUPPORTED_PROTOCOL_VERSIONS = ["1.1", "1.5"]; -function hasDupeCommand(commands, action) { - if (!commands) { - return false; - } - return commands.some(other => other.command == action.command && - Utils.deepEquals(other.args, action.args)); -} - this.ClientsRec = function ClientsRec(collection, id) { CryptoWrapper.call(this, collection, id); } @@ -66,27 +33,23 @@ Utils.deferGetSet(ClientsRec, "cleartext", ["name", "type", "commands", "version", "protocols", - "formfactor", "os", "appPackage", "application", "device", - "fxaDeviceId"]); + "formfactor", "os", "appPackage", "application", "device"]); this.ClientEngine = function ClientEngine(service) { SyncEngine.call(this, "Clients", service); - // Reset the last sync timestamp on every startup so that we fetch all clients - this.resetLastSync(); + // Reset the client on every startup so that we fetch recent clients + this._resetClient(); } ClientEngine.prototype = { __proto__: SyncEngine.prototype, _storeObj: ClientStore, _recordObj: ClientsRec, _trackerObj: ClientsTracker, - allowSkippedRecord: false, // Always sync client data as it controls other sync behavior - get enabled() { - return true; - }, + get enabled() true, get lastRecordUpload() { return Svc.Prefs.get(this.name + ".lastRecordUpload", 0); @@ -95,20 +58,10 @@ ClientEngine.prototype = { Svc.Prefs.set(this.name + ".lastRecordUpload", Math.floor(value)); }, - get remoteClients() { - // return all non-stale clients for external consumption. - return Object.values(this._store._remoteClients).filter(v => !v.stale); - }, - - remoteClientExists(id) { - let client = this._store._remoteClients[id]; - return !!(client && !client.stale); - }, - // Aggregate some stats on the composition of clients on this account get stats() { let stats = { - hasMobile: this.localType == DEVICE_TYPE_MOBILE, + hasMobile: this.localType == "mobile", names: [this.localName], numClients: 1, }; @@ -156,9 +109,7 @@ ClientEngine.prototype = { let localID = Svc.Prefs.get("client.GUID", ""); return localID == "" ? this.localID = Utils.makeGUID() : localID; }, - set localID(value) { - Svc.Prefs.set("client.GUID", value); - }, + set localID(value) Svc.Prefs.set("client.GUID", value), get brandName() { let brand = new StringBundle("chrome://branding/locale/brand.properties"); @@ -166,97 +117,23 @@ ClientEngine.prototype = { }, get localName() { - let name = Utils.getDeviceName(); - // If `getDeviceName` returns the default name, set the pref. FxA registers - // the device before syncing, so we don't need to update the registration - // in this case. - Svc.Prefs.set("client.name", name); - return name; - }, - set localName(value) { - Svc.Prefs.set("client.name", value); - // Update the registration in the background. - fxAccounts.updateDeviceRegistration().catch(error => { - this._log.warn("failed to update fxa device registration", error); - }); - }, + let localName = Svc.Prefs.get("client.name", ""); + if (localName != "") + return localName; - get localType() { - return Utils.getDeviceType(); - }, - set localType(value) { - Svc.Prefs.set("client.type", value); + return this.localName = Utils.getDefaultDeviceName(); }, + set localName(value) Svc.Prefs.set("client.name", value), - getClientName(id) { - if (id == this.localID) { - return this.localName; - } - let client = this._store._remoteClients[id]; - return client ? client.name : ""; - }, - - getClientFxaDeviceId(id) { - if (this._store._remoteClients[id]) { - return this._store._remoteClients[id].fxaDeviceId; - } - return null; - }, + get localType() Svc.Prefs.get("client.type", "desktop"), + set localType(value) Svc.Prefs.set("client.type", value), isMobile: function isMobile(id) { if (this._store._remoteClients[id]) - return this._store._remoteClients[id].type == DEVICE_TYPE_MOBILE; + return this._store._remoteClients[id].type == "mobile"; return false; }, - _readCommands() { - let cb = Async.makeSpinningCallback(); - Utils.jsonLoad("commands", this, commands => cb(null, commands)); - return cb.wait() || {}; - }, - - /** - * Low level function, do not use directly (use _addClientCommand instead). - */ - _saveCommands(commands) { - let cb = Async.makeSpinningCallback(); - Utils.jsonSave("commands", this, commands, error => { - if (error) { - this._log.error("Failed to save JSON outgoing commands", error); - } - cb(); - }); - cb.wait(); - }, - - _prepareCommandsForUpload() { - let cb = Async.makeSpinningCallback(); - Utils.jsonMove("commands", "commands-syncing", this).catch(() => {}) // Ignore errors - .then(() => { - Utils.jsonLoad("commands-syncing", this, commands => cb(null, commands)); - }); - return cb.wait() || {}; - }, - - _deleteUploadedCommands() { - delete this._currentlySyncingCommands; - Async.promiseSpinningly( - Utils.jsonRemove("commands-syncing", this).catch(err => { - this._log.error("Failed to delete syncing-commands file", err); - }) - ); - }, - - _addClientCommand(clientId, command) { - const allCommands = this._readCommands(); - const clientCommands = allCommands[clientId] || []; - if (hasDupeCommand(clientCommands, command)) { - return; - } - allCommands[clientId] = clientCommands.concat(command); - this._saveCommands(allCommands); - }, - _syncStartup: function _syncStartup() { // Reupload new client record periodically. if (Date.now() / 1000 - this.lastRecordUpload > CLIENTS_TTL_REFRESH) { @@ -266,157 +143,9 @@ ClientEngine.prototype = { SyncEngine.prototype._syncStartup.call(this); }, - _processIncoming() { - // Fetch all records from the server. - this.lastSync = 0; - this._incomingClients = {}; - try { - SyncEngine.prototype._processIncoming.call(this); - // Since clients are synced unconditionally, any records in the local store - // that don't exist on the server must be for disconnected clients. Remove - // them, so that we don't upload records with commands for clients that will - // never see them. We also do this to filter out stale clients from the - // tabs collection, since showing their list of tabs is confusing. - for (let id in this._store._remoteClients) { - if (!this._incomingClients[id]) { - this._log.info(`Removing local state for deleted client ${id}`); - this._removeRemoteClient(id); - } - } - // Bug 1264498: Mobile clients don't remove themselves from the clients - // collection when the user disconnects Sync, so we mark as stale clients - // with the same name that haven't synced in over a week. - // (Note we can't simply delete them, or we re-apply them next sync - see - // bug 1287687) - delete this._incomingClients[this.localID]; - let names = new Set([this.localName]); - for (let id in this._incomingClients) { - let record = this._store._remoteClients[id]; - if (!names.has(record.name)) { - names.add(record.name); - continue; - } - let remoteAge = AsyncResource.serverTime - this._incomingClients[id]; - if (remoteAge > STALE_CLIENT_REMOTE_AGE) { - this._log.info(`Hiding stale client ${id} with age ${remoteAge}`); - record.stale = true; - } - } - } finally { - this._incomingClients = null; - } - }, - - _uploadOutgoing() { - this._currentlySyncingCommands = this._prepareCommandsForUpload(); - const clientWithPendingCommands = Object.keys(this._currentlySyncingCommands); - for (let clientId of clientWithPendingCommands) { - if (this._store._remoteClients[clientId] || this.localID == clientId) { - this._modified.set(clientId, 0); - } - } - SyncEngine.prototype._uploadOutgoing.call(this); - }, - - _onRecordsWritten(succeeded, failed) { - // Reconcile the status of the local records with what we just wrote on the - // server - for (let id of succeeded) { - const commandChanges = this._currentlySyncingCommands[id]; - if (id == this.localID) { - if (this.localCommands) { - this.localCommands = this.localCommands.filter(command => !hasDupeCommand(commandChanges, command)); - } - } else { - const clientRecord = this._store._remoteClients[id]; - if (!commandChanges || !clientRecord) { - // should be impossible, else we wouldn't have been writing it. - this._log.warn("No command/No record changes for a client we uploaded"); - continue; - } - // fixup the client record, so our copy of _remoteClients matches what we uploaded. - clientRecord.commands = this._store.createRecord(id); - // we could do better and pass the reference to the record we just uploaded, - // but this will do for now - } - } - - // Re-add failed commands - for (let id of failed) { - const commandChanges = this._currentlySyncingCommands[id]; - if (!commandChanges) { - continue; - } - this._addClientCommand(id, commandChanges); - } - - this._deleteUploadedCommands(); - - // Notify other devices that their own client collection changed - const idsToNotify = succeeded.reduce((acc, id) => { - if (id == this.localID) { - return acc; - } - const fxaDeviceId = this.getClientFxaDeviceId(id); - return fxaDeviceId ? acc.concat(fxaDeviceId) : acc; - }, []); - if (idsToNotify.length > 0) { - this._notifyCollectionChanged(idsToNotify); - } - }, - - _notifyCollectionChanged(ids) { - const message = { - version: 1, - command: "sync:collection_changed", - data: { - collections: ["clients"] - } - }; - fxAccounts.notifyDevices(ids, message, NOTIFY_TAB_SENT_TTL_SECS); - }, - - _syncFinish() { - // Record histograms for our device types, and also write them to a pref - // so non-histogram telemetry (eg, UITelemetry) has easy access to them. - for (let [deviceType, count] of this.deviceTypes) { - let hid; - let prefName = this.name + ".devices."; - switch (deviceType) { - case "desktop": - hid = "WEAVE_DEVICE_COUNT_DESKTOP"; - prefName += "desktop"; - break; - case "mobile": - hid = "WEAVE_DEVICE_COUNT_MOBILE"; - prefName += "mobile"; - break; - default: - this._log.warn(`Unexpected deviceType "${deviceType}" recording device telemetry.`); - continue; - } - Services.telemetry.getHistogramById(hid).add(count); - Svc.Prefs.set(prefName, count); - } - SyncEngine.prototype._syncFinish.call(this); - }, - - _reconcile: function _reconcile(item) { - // Every incoming record is reconciled, so we use this to track the - // contents of the collection on the server. - this._incomingClients[item.id] = item.modified; - - if (!this._store.itemExists(item.id)) { - return true; - } - // Clients are synced unconditionally, so we'll always have new records. - // Unfortunately, this will cause the scheduler to use the immediate sync - // interval for the multi-device case, instead of the active interval. We - // work around this by updating the record during reconciliation, and - // returning false to indicate that the record doesn't need to be applied - // later. - this._store.update(item); - return false; + // Always process incoming items because they might have commands + _reconcile: function _reconcile() { + return true; }, // Treat reset the same as wiping for locally cached clients @@ -426,13 +155,7 @@ ClientEngine.prototype = { _wipeClient: function _wipeClient() { SyncEngine.prototype._resetClient.call(this); - delete this.localCommands; this._store.wipe(); - const logRemoveError = err => this._log.warn("Could not delete json file", err); - Async.promiseSpinningly( - Utils.jsonRemove("commands", this).catch(logRemoveError) - .then(Utils.jsonRemove("commands-syncing", this).catch(logRemoveError)) - ); }, removeClientData: function removeClientData() { @@ -471,6 +194,14 @@ ClientEngine.prototype = { }, /** + * Remove any commands for the local client and mark it for upload. + */ + clearCommands: function clearCommands() { + delete this.localCommands; + this._tracker.addChangedID(this.localID); + }, + + /** * Sends a command+args pair to a specific client. * * @param command Command string @@ -484,17 +215,30 @@ ClientEngine.prototype = { if (!client) { throw new Error("Unknown remote client ID: '" + clientId + "'."); } - if (client.stale) { - throw new Error("Stale remote client ID: '" + clientId + "'."); - } + + // notDupe compares two commands and returns if they are not equal. + let notDupe = function(other) { + return other.command != command || !Utils.deepEquals(other.args, args); + }; let action = { command: command, args: args, }; + if (!client.commands) { + client.commands = [action]; + } + // Add the new action if there are no duplicates. + else if (client.commands.every(notDupe)) { + client.commands.push(action); + } + // It must be a dupe. Skip. + else { + return; + } + this._log.trace("Client " + clientId + " got a new action: " + [command, args]); - this._addClientCommand(clientId, action); this._tracker.addChangedID(clientId); }, @@ -505,17 +249,13 @@ ClientEngine.prototype = { */ processIncomingCommands: function processIncomingCommands() { return this._notify("clients:process-commands", "", function() { - if (!this.localCommands) { - return true; - } + let commands = this.localCommands; - const clearedCommands = this._readCommands()[this.localID]; - const commands = this.localCommands.filter(command => !hasDupeCommand(clearedCommands, command)); + // Immediately clear out the commands as we've got them locally. + this.clearCommands(); - let URIsToDisplay = []; // Process each command in order. - for (let rawCommand of commands) { - let {command, args} = rawCommand; + for each (let {command, args} in commands) { this._log.debug("Processing command: " + command + "(" + args + ")"); let engines = [args[0]]; @@ -536,20 +276,12 @@ ClientEngine.prototype = { this.service.logout(); return false; case "displayURI": - let [uri, clientId, title] = args; - URIsToDisplay.push({ uri, clientId, title }); + this._handleDisplayURI.apply(this, args); break; default: this._log.debug("Received an unknown command: " + command); break; } - // Add the command to the "cleared" commands list - this._addClientCommand(this.localID, rawCommand) - } - this._tracker.addChangedID(this.localID); - - if (URIsToDisplay.length) { - this._handleDisplayURIs(URIsToDisplay); } return true; @@ -588,10 +320,8 @@ ClientEngine.prototype = { if (clientId) { this._sendCommandToClient(command, args, clientId); } else { - for (let [id, record] of Object.entries(this._store._remoteClients)) { - if (!record.stale) { - this._sendCommandToClient(command, args, id); - } + for (let id in this._store._remoteClients) { + this._sendCommandToClient(command, args, id); } } }, @@ -622,11 +352,11 @@ ClientEngine.prototype = { }, /** - * Handle a bunch of received 'displayURI' commands. + * Handle a single received 'displayURI' command. * - * Interested parties should observe the "weave:engine:clients:display-uris" - * topic. The callback will receive an array as the subject parameter - * containing objects with the following keys: + * Interested parties should observe the "weave:engine:clients:display-uri" + * topic. The callback will receive an object as the subject parameter with + * the following keys: * * uri URI (string) that is requested for display. * clientId ID of client that sent the command. @@ -634,24 +364,21 @@ ClientEngine.prototype = { * * The 'data' parameter to the callback will not be defined. * - * @param uris - * An array containing URI objects to display - * @param uris[].uri + * @param uri * String URI that was received - * @param uris[].clientId + * @param clientId * ID of client that sent URI - * @param uris[].title + * @param title * String title of page that URI corresponds to. Older clients may not * send this. */ - _handleDisplayURIs: function _handleDisplayURIs(uris) { - Svc.Obs.notify("weave:engine:clients:display-uris", uris); - }, + _handleDisplayURI: function _handleDisplayURI(uri, clientId, title) { + this._log.info("Received a URI for display: " + uri + " (" + title + + ") from " + clientId); - _removeRemoteClient(id) { - delete this._store._remoteClients[id]; - this._tracker.removeChangedID(id); - }, + let subject = {uri: uri, client: clientId, title: title}; + Svc.Obs.notify("weave:engine:clients:display-uri", subject); + } }; function ClientStore(name, engine) { @@ -660,48 +387,29 @@ function ClientStore(name, engine) { ClientStore.prototype = { __proto__: Store.prototype, - _remoteClients: {}, - create(record) { - this.update(record); + this.update(record) }, update: function update(record) { - if (record.id == this.engine.localID) { - // Only grab commands from the server; local name/type always wins + // Only grab commands from the server; local name/type always wins + if (record.id == this.engine.localID) this.engine.localCommands = record.commands; - } else { + else this._remoteClients[record.id] = record.cleartext; - } }, createRecord: function createRecord(id, collection) { let record = new ClientsRec(collection, id); - const commandsChanges = this.engine._currentlySyncingCommands ? - this.engine._currentlySyncingCommands[id] : - []; - // Package the individual components into a record for the local client if (id == this.engine.localID) { - let cb = Async.makeSpinningCallback(); - fxAccounts.getDeviceId().then(id => cb(null, id), cb); - try { - record.fxaDeviceId = cb.wait(); - } catch(error) { - this._log.warn("failed to get fxa device id", error); - } record.name = this.engine.localName; record.type = this.engine.localType; + record.commands = this.engine.localCommands; record.version = Services.appinfo.version; record.protocols = SUPPORTED_PROTOCOL_VERSIONS; - // Substract the commands we recorded that we've already executed - if (commandsChanges && commandsChanges.length && - this.engine.localCommands && this.engine.localCommands.length) { - record.commands = this.engine.localCommands.filter(command => !hasDupeCommand(commandsChanges, command)); - } - // Optional fields. record.os = Services.appinfo.OS; // "Darwin" record.appPackage = Services.appinfo.ID; @@ -712,20 +420,6 @@ ClientStore.prototype = { // record.formfactor = ""; // Bug 1100722 } else { record.cleartext = this._remoteClients[id]; - - // Add the commands we have to send - if (commandsChanges && commandsChanges.length) { - const recordCommands = record.cleartext.commands || []; - const newCommands = commandsChanges.filter(command => !hasDupeCommand(recordCommands, command)); - record.cleartext.commands = recordCommands.concat(newCommands); - } - - if (record.cleartext.stale) { - // It's almost certainly a logic error for us to upload a record we - // consider stale, so make log noise, but still remove the flag. - this._log.error(`Preparing to upload record ${id} that we consider stale`); - delete record.cleartext.stale; - } } return record; @@ -768,7 +462,7 @@ ClientsTracker.prototype = { break; case "weave:engine:stop-tracking": if (this._enabled) { - Svc.Prefs.ignore("client.name", this); + Svc.Prefs.ignore("clients.name", this); this._enabled = false; } break; |