summaryrefslogtreecommitdiffstats
path: root/services/sync/tests/unit/test_syncengine_sync.js
diff options
context:
space:
mode:
Diffstat (limited to 'services/sync/tests/unit/test_syncengine_sync.js')
-rw-r--r--services/sync/tests/unit/test_syncengine_sync.js1855
1 files changed, 1855 insertions, 0 deletions
diff --git a/services/sync/tests/unit/test_syncengine_sync.js b/services/sync/tests/unit/test_syncengine_sync.js
new file mode 100644
index 000000000..97289962f
--- /dev/null
+++ b/services/sync/tests/unit/test_syncengine_sync.js
@@ -0,0 +1,1855 @@
+/* Any copyright is dedicated to the Public Domain.
+ * http://creativecommons.org/publicdomain/zero/1.0/ */
+
+Cu.import("resource://services-sync/constants.js");
+Cu.import("resource://services-sync/engines.js");
+Cu.import("resource://services-sync/policies.js");
+Cu.import("resource://services-sync/record.js");
+Cu.import("resource://services-sync/resource.js");
+Cu.import("resource://services-sync/service.js");
+Cu.import("resource://services-sync/util.js");
+Cu.import("resource://testing-common/services/sync/rotaryengine.js");
+Cu.import("resource://testing-common/services/sync/utils.js");
+
+function makeRotaryEngine() {
+ return new RotaryEngine(Service);
+}
+
+function clean() {
+ Svc.Prefs.resetBranch("");
+ Svc.Prefs.set("log.logger.engine.rotary", "Trace");
+ Service.recordManager.clearCache();
+}
+
+function cleanAndGo(server) {
+ clean();
+ server.stop(run_next_test);
+}
+
+function promiseClean(server) {
+ clean();
+ return new Promise(resolve => server.stop(resolve));
+}
+
+function configureService(server, username, password) {
+ Service.clusterURL = server.baseURI;
+
+ Service.identity.account = username || "foo";
+ Service.identity.basicPassword = password || "password";
+}
+
+function createServerAndConfigureClient() {
+ let engine = new RotaryEngine(Service);
+
+ let contents = {
+ meta: {global: {engines: {rotary: {version: engine.version,
+ syncID: engine.syncID}}}},
+ crypto: {},
+ rotary: {}
+ };
+
+ const USER = "foo";
+ let server = new SyncServer();
+ server.registerUser(USER, "password");
+ server.createContents(USER, contents);
+ server.start();
+
+ Service.serverURL = server.baseURI;
+ Service.clusterURL = server.baseURI;
+ Service.identity.username = USER;
+ Service._updateCachedURLs();
+
+ return [engine, server, USER];
+}
+
+function run_test() {
+ generateNewKeys(Service.collectionKeys);
+ Svc.Prefs.set("log.logger.engine.rotary", "Trace");
+ run_next_test();
+}
+
+/*
+ * Tests
+ *
+ * SyncEngine._sync() is divided into four rather independent steps:
+ *
+ * - _syncStartup()
+ * - _processIncoming()
+ * - _uploadOutgoing()
+ * - _syncFinish()
+ *
+ * In the spirit of unit testing, these are tested individually for
+ * different scenarios below.
+ */
+
+add_test(function test_syncStartup_emptyOrOutdatedGlobalsResetsSync() {
+ _("SyncEngine._syncStartup resets sync and wipes server data if there's no or an outdated global record");
+
+ // Some server side data that's going to be wiped
+ let collection = new ServerCollection();
+ collection.insert('flying',
+ encryptPayload({id: 'flying',
+ denomination: "LNER Class A3 4472"}));
+ collection.insert('scotsman',
+ encryptPayload({id: 'scotsman',
+ denomination: "Flying Scotsman"}));
+
+ let server = sync_httpd_setup({
+ "/1.1/foo/storage/rotary": collection.handler()
+ });
+
+ let syncTesting = new SyncTestingInfrastructure(server);
+ Service.identity.username = "foo";
+
+ let engine = makeRotaryEngine();
+ engine._store.items = {rekolok: "Rekonstruktionslokomotive"};
+ try {
+
+ // Confirm initial environment
+ do_check_eq(engine._tracker.changedIDs["rekolok"], undefined);
+ let metaGlobal = Service.recordManager.get(engine.metaURL);
+ do_check_eq(metaGlobal.payload.engines, undefined);
+ do_check_true(!!collection.payload("flying"));
+ do_check_true(!!collection.payload("scotsman"));
+
+ engine.lastSync = Date.now() / 1000;
+ engine.lastSyncLocal = Date.now();
+
+ // Trying to prompt a wipe -- we no longer track CryptoMeta per engine,
+ // so it has nothing to check.
+ engine._syncStartup();
+
+ // The meta/global WBO has been filled with data about the engine
+ let engineData = metaGlobal.payload.engines["rotary"];
+ do_check_eq(engineData.version, engine.version);
+ do_check_eq(engineData.syncID, engine.syncID);
+
+ // Sync was reset and server data was wiped
+ do_check_eq(engine.lastSync, 0);
+ do_check_eq(collection.payload("flying"), undefined);
+ do_check_eq(collection.payload("scotsman"), undefined);
+
+ } finally {
+ cleanAndGo(server);
+ }
+});
+
+add_test(function test_syncStartup_serverHasNewerVersion() {
+ _("SyncEngine._syncStartup ");
+
+ let global = new ServerWBO('global', {engines: {rotary: {version: 23456}}});
+ let server = httpd_setup({
+ "/1.1/foo/storage/meta/global": global.handler()
+ });
+
+ let syncTesting = new SyncTestingInfrastructure(server);
+ Service.identity.username = "foo";
+
+ let engine = makeRotaryEngine();
+ try {
+
+ // The server has a newer version of the data and our engine can
+ // handle. That should give us an exception.
+ let error;
+ try {
+ engine._syncStartup();
+ } catch (ex) {
+ error = ex;
+ }
+ do_check_eq(error.failureCode, VERSION_OUT_OF_DATE);
+
+ } finally {
+ cleanAndGo(server);
+ }
+});
+
+
+add_test(function test_syncStartup_syncIDMismatchResetsClient() {
+ _("SyncEngine._syncStartup resets sync if syncIDs don't match");
+
+ let server = sync_httpd_setup({});
+ let syncTesting = new SyncTestingInfrastructure(server);
+ Service.identity.username = "foo";
+
+ // global record with a different syncID than our engine has
+ let engine = makeRotaryEngine();
+ let global = new ServerWBO('global',
+ {engines: {rotary: {version: engine.version,
+ syncID: 'foobar'}}});
+ server.registerPathHandler("/1.1/foo/storage/meta/global", global.handler());
+
+ try {
+
+ // Confirm initial environment
+ do_check_eq(engine.syncID, 'fake-guid-00');
+ do_check_eq(engine._tracker.changedIDs["rekolok"], undefined);
+
+ engine.lastSync = Date.now() / 1000;
+ engine.lastSyncLocal = Date.now();
+ engine._syncStartup();
+
+ // The engine has assumed the server's syncID
+ do_check_eq(engine.syncID, 'foobar');
+
+ // Sync was reset
+ do_check_eq(engine.lastSync, 0);
+
+ } finally {
+ cleanAndGo(server);
+ }
+});
+
+
+add_test(function test_processIncoming_emptyServer() {
+ _("SyncEngine._processIncoming working with an empty server backend");
+
+ let collection = new ServerCollection();
+ let server = sync_httpd_setup({
+ "/1.1/foo/storage/rotary": collection.handler()
+ });
+
+ let syncTesting = new SyncTestingInfrastructure(server);
+ Service.identity.username = "foo";
+
+ let engine = makeRotaryEngine();
+ try {
+
+ // Merely ensure that this code path is run without any errors
+ engine._processIncoming();
+ do_check_eq(engine.lastSync, 0);
+
+ } finally {
+ cleanAndGo(server);
+ }
+});
+
+
+add_test(function test_processIncoming_createFromServer() {
+ _("SyncEngine._processIncoming creates new records from server data");
+
+ // Some server records that will be downloaded
+ let collection = new ServerCollection();
+ collection.insert('flying',
+ encryptPayload({id: 'flying',
+ denomination: "LNER Class A3 4472"}));
+ collection.insert('scotsman',
+ encryptPayload({id: 'scotsman',
+ denomination: "Flying Scotsman"}));
+
+ // Two pathological cases involving relative URIs gone wrong.
+ let pathologicalPayload = encryptPayload({id: '../pathological',
+ denomination: "Pathological Case"});
+ collection.insert('../pathological', pathologicalPayload);
+
+ let server = sync_httpd_setup({
+ "/1.1/foo/storage/rotary": collection.handler(),
+ "/1.1/foo/storage/rotary/flying": collection.wbo("flying").handler(),
+ "/1.1/foo/storage/rotary/scotsman": collection.wbo("scotsman").handler()
+ });
+
+ let syncTesting = new SyncTestingInfrastructure(server);
+ Service.identity.username = "foo";
+
+ generateNewKeys(Service.collectionKeys);
+
+ let engine = makeRotaryEngine();
+ let meta_global = Service.recordManager.set(engine.metaURL,
+ new WBORecord(engine.metaURL));
+ meta_global.payload.engines = {rotary: {version: engine.version,
+ syncID: engine.syncID}};
+
+ try {
+
+ // Confirm initial environment
+ do_check_eq(engine.lastSync, 0);
+ do_check_eq(engine.lastModified, null);
+ do_check_eq(engine._store.items.flying, undefined);
+ do_check_eq(engine._store.items.scotsman, undefined);
+ do_check_eq(engine._store.items['../pathological'], undefined);
+
+ engine._syncStartup();
+ engine._processIncoming();
+
+ // Timestamps of last sync and last server modification are set.
+ do_check_true(engine.lastSync > 0);
+ do_check_true(engine.lastModified > 0);
+
+ // Local records have been created from the server data.
+ do_check_eq(engine._store.items.flying, "LNER Class A3 4472");
+ do_check_eq(engine._store.items.scotsman, "Flying Scotsman");
+ do_check_eq(engine._store.items['../pathological'], "Pathological Case");
+
+ } finally {
+ cleanAndGo(server);
+ }
+});
+
+
+add_test(function test_processIncoming_reconcile() {
+ _("SyncEngine._processIncoming updates local records");
+
+ let collection = new ServerCollection();
+
+ // This server record is newer than the corresponding client one,
+ // so it'll update its data.
+ collection.insert('newrecord',
+ encryptPayload({id: 'newrecord',
+ denomination: "New stuff..."}));
+
+ // This server record is newer than the corresponding client one,
+ // so it'll update its data.
+ collection.insert('newerserver',
+ encryptPayload({id: 'newerserver',
+ denomination: "New data!"}));
+
+ // This server record is 2 mins older than the client counterpart
+ // but identical to it, so we're expecting the client record's
+ // changedID to be reset.
+ collection.insert('olderidentical',
+ encryptPayload({id: 'olderidentical',
+ denomination: "Older but identical"}));
+ collection._wbos.olderidentical.modified -= 120;
+
+ // This item simply has different data than the corresponding client
+ // record (which is unmodified), so it will update the client as well
+ collection.insert('updateclient',
+ encryptPayload({id: 'updateclient',
+ denomination: "Get this!"}));
+
+ // This is a dupe of 'original'.
+ collection.insert('duplication',
+ encryptPayload({id: 'duplication',
+ denomination: "Original Entry"}));
+
+ // This record is marked as deleted, so we're expecting the client
+ // record to be removed.
+ collection.insert('nukeme',
+ encryptPayload({id: 'nukeme',
+ denomination: "Nuke me!",
+ deleted: true}));
+
+ let server = sync_httpd_setup({
+ "/1.1/foo/storage/rotary": collection.handler()
+ });
+
+ let syncTesting = new SyncTestingInfrastructure(server);
+ Service.identity.username = "foo";
+
+ let engine = makeRotaryEngine();
+ engine._store.items = {newerserver: "New data, but not as new as server!",
+ olderidentical: "Older but identical",
+ updateclient: "Got data?",
+ original: "Original Entry",
+ long_original: "Long Original Entry",
+ nukeme: "Nuke me!"};
+ // Make this record 1 min old, thus older than the one on the server
+ engine._tracker.addChangedID('newerserver', Date.now()/1000 - 60);
+ // This record has been changed 2 mins later than the one on the server
+ engine._tracker.addChangedID('olderidentical', Date.now()/1000);
+
+ let meta_global = Service.recordManager.set(engine.metaURL,
+ new WBORecord(engine.metaURL));
+ meta_global.payload.engines = {rotary: {version: engine.version,
+ syncID: engine.syncID}};
+
+ try {
+
+ // Confirm initial environment
+ do_check_eq(engine._store.items.newrecord, undefined);
+ do_check_eq(engine._store.items.newerserver, "New data, but not as new as server!");
+ do_check_eq(engine._store.items.olderidentical, "Older but identical");
+ do_check_eq(engine._store.items.updateclient, "Got data?");
+ do_check_eq(engine._store.items.nukeme, "Nuke me!");
+ do_check_true(engine._tracker.changedIDs['olderidentical'] > 0);
+
+ engine._syncStartup();
+ engine._processIncoming();
+
+ // Timestamps of last sync and last server modification are set.
+ do_check_true(engine.lastSync > 0);
+ do_check_true(engine.lastModified > 0);
+
+ // The new record is created.
+ do_check_eq(engine._store.items.newrecord, "New stuff...");
+
+ // The 'newerserver' record is updated since the server data is newer.
+ do_check_eq(engine._store.items.newerserver, "New data!");
+
+ // The data for 'olderidentical' is identical on the server, so
+ // it's no longer marked as changed anymore.
+ do_check_eq(engine._store.items.olderidentical, "Older but identical");
+ do_check_eq(engine._tracker.changedIDs['olderidentical'], undefined);
+
+ // Updated with server data.
+ do_check_eq(engine._store.items.updateclient, "Get this!");
+
+ // The incoming ID is preferred.
+ do_check_eq(engine._store.items.original, undefined);
+ do_check_eq(engine._store.items.duplication, "Original Entry");
+ do_check_neq(engine._delete.ids.indexOf("original"), -1);
+
+ // The 'nukeme' record marked as deleted is removed.
+ do_check_eq(engine._store.items.nukeme, undefined);
+ } finally {
+ cleanAndGo(server);
+ }
+});
+
+add_test(function test_processIncoming_reconcile_local_deleted() {
+ _("Ensure local, duplicate ID is deleted on server.");
+
+ // When a duplicate is resolved, the local ID (which is never taken) should
+ // be deleted on the server.
+ let [engine, server, user] = createServerAndConfigureClient();
+
+ let now = Date.now() / 1000 - 10;
+ engine.lastSync = now;
+ engine.lastModified = now + 1;
+
+ let record = encryptPayload({id: "DUPE_INCOMING", denomination: "incoming"});
+ let wbo = new ServerWBO("DUPE_INCOMING", record, now + 2);
+ server.insertWBO(user, "rotary", wbo);
+
+ record = encryptPayload({id: "DUPE_LOCAL", denomination: "local"});
+ wbo = new ServerWBO("DUPE_LOCAL", record, now - 1);
+ server.insertWBO(user, "rotary", wbo);
+
+ engine._store.create({id: "DUPE_LOCAL", denomination: "local"});
+ do_check_true(engine._store.itemExists("DUPE_LOCAL"));
+ do_check_eq("DUPE_LOCAL", engine._findDupe({id: "DUPE_INCOMING"}));
+
+ engine._sync();
+
+ do_check_attribute_count(engine._store.items, 1);
+ do_check_true("DUPE_INCOMING" in engine._store.items);
+
+ let collection = server.getCollection(user, "rotary");
+ do_check_eq(1, collection.count());
+ do_check_neq(undefined, collection.wbo("DUPE_INCOMING"));
+
+ cleanAndGo(server);
+});
+
+add_test(function test_processIncoming_reconcile_equivalent() {
+ _("Ensure proper handling of incoming records that match local.");
+
+ let [engine, server, user] = createServerAndConfigureClient();
+
+ let now = Date.now() / 1000 - 10;
+ engine.lastSync = now;
+ engine.lastModified = now + 1;
+
+ let record = encryptPayload({id: "entry", denomination: "denomination"});
+ let wbo = new ServerWBO("entry", record, now + 2);
+ server.insertWBO(user, "rotary", wbo);
+
+ engine._store.items = {entry: "denomination"};
+ do_check_true(engine._store.itemExists("entry"));
+
+ engine._sync();
+
+ do_check_attribute_count(engine._store.items, 1);
+
+ cleanAndGo(server);
+});
+
+add_test(function test_processIncoming_reconcile_locally_deleted_dupe_new() {
+ _("Ensure locally deleted duplicate record newer than incoming is handled.");
+
+ // This is a somewhat complicated test. It ensures that if a client receives
+ // a modified record for an item that is deleted locally but with a different
+ // ID that the incoming record is ignored. This is a corner case for record
+ // handling, but it needs to be supported.
+ let [engine, server, user] = createServerAndConfigureClient();
+
+ let now = Date.now() / 1000 - 10;
+ engine.lastSync = now;
+ engine.lastModified = now + 1;
+
+ let record = encryptPayload({id: "DUPE_INCOMING", denomination: "incoming"});
+ let wbo = new ServerWBO("DUPE_INCOMING", record, now + 2);
+ server.insertWBO(user, "rotary", wbo);
+
+ // Simulate a locally-deleted item.
+ engine._store.items = {};
+ engine._tracker.addChangedID("DUPE_LOCAL", now + 3);
+ do_check_false(engine._store.itemExists("DUPE_LOCAL"));
+ do_check_false(engine._store.itemExists("DUPE_INCOMING"));
+ do_check_eq("DUPE_LOCAL", engine._findDupe({id: "DUPE_INCOMING"}));
+
+ engine._sync();
+
+ // After the sync, the server's payload for the original ID should be marked
+ // as deleted.
+ do_check_empty(engine._store.items);
+ let collection = server.getCollection(user, "rotary");
+ do_check_eq(1, collection.count());
+ wbo = collection.wbo("DUPE_INCOMING");
+ do_check_neq(null, wbo);
+ let payload = JSON.parse(JSON.parse(wbo.payload).ciphertext);
+ do_check_true(payload.deleted);
+
+ cleanAndGo(server);
+});
+
+add_test(function test_processIncoming_reconcile_locally_deleted_dupe_old() {
+ _("Ensure locally deleted duplicate record older than incoming is restored.");
+
+ // This is similar to the above test except it tests the condition where the
+ // incoming record is newer than the local deletion, therefore overriding it.
+
+ let [engine, server, user] = createServerAndConfigureClient();
+
+ let now = Date.now() / 1000 - 10;
+ engine.lastSync = now;
+ engine.lastModified = now + 1;
+
+ let record = encryptPayload({id: "DUPE_INCOMING", denomination: "incoming"});
+ let wbo = new ServerWBO("DUPE_INCOMING", record, now + 2);
+ server.insertWBO(user, "rotary", wbo);
+
+ // Simulate a locally-deleted item.
+ engine._store.items = {};
+ engine._tracker.addChangedID("DUPE_LOCAL", now + 1);
+ do_check_false(engine._store.itemExists("DUPE_LOCAL"));
+ do_check_false(engine._store.itemExists("DUPE_INCOMING"));
+ do_check_eq("DUPE_LOCAL", engine._findDupe({id: "DUPE_INCOMING"}));
+
+ engine._sync();
+
+ // Since the remote change is newer, the incoming item should exist locally.
+ do_check_attribute_count(engine._store.items, 1);
+ do_check_true("DUPE_INCOMING" in engine._store.items);
+ do_check_eq("incoming", engine._store.items.DUPE_INCOMING);
+
+ let collection = server.getCollection(user, "rotary");
+ do_check_eq(1, collection.count());
+ wbo = collection.wbo("DUPE_INCOMING");
+ let payload = JSON.parse(JSON.parse(wbo.payload).ciphertext);
+ do_check_eq("incoming", payload.denomination);
+
+ cleanAndGo(server);
+});
+
+add_test(function test_processIncoming_reconcile_changed_dupe() {
+ _("Ensure that locally changed duplicate record is handled properly.");
+
+ let [engine, server, user] = createServerAndConfigureClient();
+
+ let now = Date.now() / 1000 - 10;
+ engine.lastSync = now;
+ engine.lastModified = now + 1;
+
+ // The local record is newer than the incoming one, so it should be retained.
+ let record = encryptPayload({id: "DUPE_INCOMING", denomination: "incoming"});
+ let wbo = new ServerWBO("DUPE_INCOMING", record, now + 2);
+ server.insertWBO(user, "rotary", wbo);
+
+ engine._store.create({id: "DUPE_LOCAL", denomination: "local"});
+ engine._tracker.addChangedID("DUPE_LOCAL", now + 3);
+ do_check_true(engine._store.itemExists("DUPE_LOCAL"));
+ do_check_eq("DUPE_LOCAL", engine._findDupe({id: "DUPE_INCOMING"}));
+
+ engine._sync();
+
+ // The ID should have been changed to incoming.
+ do_check_attribute_count(engine._store.items, 1);
+ do_check_true("DUPE_INCOMING" in engine._store.items);
+
+ // On the server, the local ID should be deleted and the incoming ID should
+ // have its payload set to what was in the local record.
+ let collection = server.getCollection(user, "rotary");
+ do_check_eq(1, collection.count());
+ wbo = collection.wbo("DUPE_INCOMING");
+ do_check_neq(undefined, wbo);
+ let payload = JSON.parse(JSON.parse(wbo.payload).ciphertext);
+ do_check_eq("local", payload.denomination);
+
+ cleanAndGo(server);
+});
+
+add_test(function test_processIncoming_reconcile_changed_dupe_new() {
+ _("Ensure locally changed duplicate record older than incoming is ignored.");
+
+ // This test is similar to the above except the incoming record is younger
+ // than the local record. The incoming record should be authoritative.
+ let [engine, server, user] = createServerAndConfigureClient();
+
+ let now = Date.now() / 1000 - 10;
+ engine.lastSync = now;
+ engine.lastModified = now + 1;
+
+ let record = encryptPayload({id: "DUPE_INCOMING", denomination: "incoming"});
+ let wbo = new ServerWBO("DUPE_INCOMING", record, now + 2);
+ server.insertWBO(user, "rotary", wbo);
+
+ engine._store.create({id: "DUPE_LOCAL", denomination: "local"});
+ engine._tracker.addChangedID("DUPE_LOCAL", now + 1);
+ do_check_true(engine._store.itemExists("DUPE_LOCAL"));
+ do_check_eq("DUPE_LOCAL", engine._findDupe({id: "DUPE_INCOMING"}));
+
+ engine._sync();
+
+ // The ID should have been changed to incoming.
+ do_check_attribute_count(engine._store.items, 1);
+ do_check_true("DUPE_INCOMING" in engine._store.items);
+
+ // On the server, the local ID should be deleted and the incoming ID should
+ // have its payload retained.
+ let collection = server.getCollection(user, "rotary");
+ do_check_eq(1, collection.count());
+ wbo = collection.wbo("DUPE_INCOMING");
+ do_check_neq(undefined, wbo);
+ let payload = JSON.parse(JSON.parse(wbo.payload).ciphertext);
+ do_check_eq("incoming", payload.denomination);
+ cleanAndGo(server);
+});
+
+add_test(function test_processIncoming_mobile_batchSize() {
+ _("SyncEngine._processIncoming doesn't fetch everything at once on mobile clients");
+
+ Svc.Prefs.set("client.type", "mobile");
+ Service.identity.username = "foo";
+
+ // A collection that logs each GET
+ let collection = new ServerCollection();
+ collection.get_log = [];
+ collection._get = collection.get;
+ collection.get = function (options) {
+ this.get_log.push(options);
+ return this._get(options);
+ };
+
+ // Let's create some 234 server side records. They're all at least
+ // 10 minutes old.
+ for (let i = 0; i < 234; i++) {
+ let id = 'record-no-' + i;
+ let payload = encryptPayload({id: id, denomination: "Record No. " + i});
+ let wbo = new ServerWBO(id, payload);
+ wbo.modified = Date.now()/1000 - 60*(i+10);
+ collection.insertWBO(wbo);
+ }
+
+ let server = sync_httpd_setup({
+ "/1.1/foo/storage/rotary": collection.handler()
+ });
+
+ let syncTesting = new SyncTestingInfrastructure(server);
+
+ let engine = makeRotaryEngine();
+ let meta_global = Service.recordManager.set(engine.metaURL,
+ new WBORecord(engine.metaURL));
+ meta_global.payload.engines = {rotary: {version: engine.version,
+ syncID: engine.syncID}};
+
+ try {
+
+ _("On a mobile client, we get new records from the server in batches of 50.");
+ engine._syncStartup();
+ engine._processIncoming();
+ do_check_attribute_count(engine._store.items, 234);
+ do_check_true('record-no-0' in engine._store.items);
+ do_check_true('record-no-49' in engine._store.items);
+ do_check_true('record-no-50' in engine._store.items);
+ do_check_true('record-no-233' in engine._store.items);
+
+ // Verify that the right number of GET requests with the right
+ // kind of parameters were made.
+ do_check_eq(collection.get_log.length,
+ Math.ceil(234 / MOBILE_BATCH_SIZE) + 1);
+ do_check_eq(collection.get_log[0].full, 1);
+ do_check_eq(collection.get_log[0].limit, MOBILE_BATCH_SIZE);
+ do_check_eq(collection.get_log[1].full, undefined);
+ do_check_eq(collection.get_log[1].limit, undefined);
+ for (let i = 1; i <= Math.floor(234 / MOBILE_BATCH_SIZE); i++) {
+ do_check_eq(collection.get_log[i+1].full, 1);
+ do_check_eq(collection.get_log[i+1].limit, undefined);
+ if (i < Math.floor(234 / MOBILE_BATCH_SIZE))
+ do_check_eq(collection.get_log[i+1].ids.length, MOBILE_BATCH_SIZE);
+ else
+ do_check_eq(collection.get_log[i+1].ids.length, 234 % MOBILE_BATCH_SIZE);
+ }
+
+ } finally {
+ cleanAndGo(server);
+ }
+});
+
+
+add_task(function *test_processIncoming_store_toFetch() {
+ _("If processIncoming fails in the middle of a batch on mobile, state is saved in toFetch and lastSync.");
+ Service.identity.username = "foo";
+ Svc.Prefs.set("client.type", "mobile");
+
+ // A collection that throws at the fourth get.
+ let collection = new ServerCollection();
+ collection._get_calls = 0;
+ collection._get = collection.get;
+ collection.get = function() {
+ this._get_calls += 1;
+ if (this._get_calls > 3) {
+ throw "Abort on fourth call!";
+ }
+ return this._get.apply(this, arguments);
+ };
+
+ // Let's create three batches worth of server side records.
+ for (var i = 0; i < MOBILE_BATCH_SIZE * 3; i++) {
+ let id = 'record-no-' + i;
+ let payload = encryptPayload({id: id, denomination: "Record No. " + id});
+ let wbo = new ServerWBO(id, payload);
+ wbo.modified = Date.now()/1000 + 60 * (i - MOBILE_BATCH_SIZE * 3);
+ collection.insertWBO(wbo);
+ }
+
+ let engine = makeRotaryEngine();
+ engine.enabled = true;
+
+ let server = sync_httpd_setup({
+ "/1.1/foo/storage/rotary": collection.handler()
+ });
+
+ let syncTesting = new SyncTestingInfrastructure(server);
+
+ let meta_global = Service.recordManager.set(engine.metaURL,
+ new WBORecord(engine.metaURL));
+ meta_global.payload.engines = {rotary: {version: engine.version,
+ syncID: engine.syncID}};
+ try {
+
+ // Confirm initial environment
+ do_check_eq(engine.lastSync, 0);
+ do_check_empty(engine._store.items);
+
+ let error;
+ try {
+ yield sync_engine_and_validate_telem(engine, true);
+ } catch (ex) {
+ error = ex;
+ }
+
+ // Only the first two batches have been applied.
+ do_check_eq(Object.keys(engine._store.items).length,
+ MOBILE_BATCH_SIZE * 2);
+
+ // The third batch is stuck in toFetch. lastSync has been moved forward to
+ // the last successful item's timestamp.
+ do_check_eq(engine.toFetch.length, MOBILE_BATCH_SIZE);
+ do_check_eq(engine.lastSync, collection.wbo("record-no-99").modified);
+
+ } finally {
+ yield promiseClean(server);
+ }
+});
+
+
+add_test(function test_processIncoming_resume_toFetch() {
+ _("toFetch and previousFailed items left over from previous syncs are fetched on the next sync, along with new items.");
+ Service.identity.username = "foo";
+
+ const LASTSYNC = Date.now() / 1000;
+
+ // Server records that will be downloaded
+ let collection = new ServerCollection();
+ collection.insert('flying',
+ encryptPayload({id: 'flying',
+ denomination: "LNER Class A3 4472"}));
+ collection.insert('scotsman',
+ encryptPayload({id: 'scotsman',
+ denomination: "Flying Scotsman"}));
+ collection.insert('rekolok',
+ encryptPayload({id: 'rekolok',
+ denomination: "Rekonstruktionslokomotive"}));
+ for (let i = 0; i < 3; i++) {
+ let id = 'failed' + i;
+ let payload = encryptPayload({id: id, denomination: "Record No. " + i});
+ let wbo = new ServerWBO(id, payload);
+ wbo.modified = LASTSYNC - 10;
+ collection.insertWBO(wbo);
+ }
+
+ collection.wbo("flying").modified =
+ collection.wbo("scotsman").modified = LASTSYNC - 10;
+ collection._wbos.rekolok.modified = LASTSYNC + 10;
+
+ // Time travel 10 seconds into the future but still download the above WBOs.
+ let engine = makeRotaryEngine();
+ engine.lastSync = LASTSYNC;
+ engine.toFetch = ["flying", "scotsman"];
+ engine.previousFailed = ["failed0", "failed1", "failed2"];
+
+ let server = sync_httpd_setup({
+ "/1.1/foo/storage/rotary": collection.handler()
+ });
+
+ let syncTesting = new SyncTestingInfrastructure(server);
+
+ let meta_global = Service.recordManager.set(engine.metaURL,
+ new WBORecord(engine.metaURL));
+ meta_global.payload.engines = {rotary: {version: engine.version,
+ syncID: engine.syncID}};
+ try {
+
+ // Confirm initial environment
+ do_check_eq(engine._store.items.flying, undefined);
+ do_check_eq(engine._store.items.scotsman, undefined);
+ do_check_eq(engine._store.items.rekolok, undefined);
+
+ engine._syncStartup();
+ engine._processIncoming();
+
+ // Local records have been created from the server data.
+ do_check_eq(engine._store.items.flying, "LNER Class A3 4472");
+ do_check_eq(engine._store.items.scotsman, "Flying Scotsman");
+ do_check_eq(engine._store.items.rekolok, "Rekonstruktionslokomotive");
+ do_check_eq(engine._store.items.failed0, "Record No. 0");
+ do_check_eq(engine._store.items.failed1, "Record No. 1");
+ do_check_eq(engine._store.items.failed2, "Record No. 2");
+ do_check_eq(engine.previousFailed.length, 0);
+ } finally {
+ cleanAndGo(server);
+ }
+});
+
+
+add_test(function test_processIncoming_applyIncomingBatchSize_smaller() {
+ _("Ensure that a number of incoming items less than applyIncomingBatchSize is still applied.");
+ Service.identity.username = "foo";
+
+ // Engine that doesn't like the first and last record it's given.
+ const APPLY_BATCH_SIZE = 10;
+ let engine = makeRotaryEngine();
+ engine.applyIncomingBatchSize = APPLY_BATCH_SIZE;
+ engine._store._applyIncomingBatch = engine._store.applyIncomingBatch;
+ engine._store.applyIncomingBatch = function (records) {
+ let failed1 = records.shift();
+ let failed2 = records.pop();
+ this._applyIncomingBatch(records);
+ return [failed1.id, failed2.id];
+ };
+
+ // Let's create less than a batch worth of server side records.
+ let collection = new ServerCollection();
+ for (let i = 0; i < APPLY_BATCH_SIZE - 1; i++) {
+ let id = 'record-no-' + i;
+ let payload = encryptPayload({id: id, denomination: "Record No. " + id});
+ collection.insert(id, payload);
+ }
+
+ let server = sync_httpd_setup({
+ "/1.1/foo/storage/rotary": collection.handler()
+ });
+
+ let syncTesting = new SyncTestingInfrastructure(server);
+
+ let meta_global = Service.recordManager.set(engine.metaURL,
+ new WBORecord(engine.metaURL));
+ meta_global.payload.engines = {rotary: {version: engine.version,
+ syncID: engine.syncID}};
+ try {
+
+ // Confirm initial environment
+ do_check_empty(engine._store.items);
+
+ engine._syncStartup();
+ engine._processIncoming();
+
+ // Records have been applied and the expected failures have failed.
+ do_check_attribute_count(engine._store.items, APPLY_BATCH_SIZE - 1 - 2);
+ do_check_eq(engine.toFetch.length, 0);
+ do_check_eq(engine.previousFailed.length, 2);
+ do_check_eq(engine.previousFailed[0], "record-no-0");
+ do_check_eq(engine.previousFailed[1], "record-no-8");
+
+ } finally {
+ cleanAndGo(server);
+ }
+});
+
+
+add_test(function test_processIncoming_applyIncomingBatchSize_multiple() {
+ _("Ensure that incoming items are applied according to applyIncomingBatchSize.");
+ Service.identity.username = "foo";
+
+ const APPLY_BATCH_SIZE = 10;
+
+ // Engine that applies records in batches.
+ let engine = makeRotaryEngine();
+ engine.applyIncomingBatchSize = APPLY_BATCH_SIZE;
+ let batchCalls = 0;
+ engine._store._applyIncomingBatch = engine._store.applyIncomingBatch;
+ engine._store.applyIncomingBatch = function (records) {
+ batchCalls += 1;
+ do_check_eq(records.length, APPLY_BATCH_SIZE);
+ this._applyIncomingBatch.apply(this, arguments);
+ };
+
+ // Let's create three batches worth of server side records.
+ let collection = new ServerCollection();
+ for (let i = 0; i < APPLY_BATCH_SIZE * 3; i++) {
+ let id = 'record-no-' + i;
+ let payload = encryptPayload({id: id, denomination: "Record No. " + id});
+ collection.insert(id, payload);
+ }
+
+ let server = sync_httpd_setup({
+ "/1.1/foo/storage/rotary": collection.handler()
+ });
+
+ let syncTesting = new SyncTestingInfrastructure(server);
+
+ let meta_global = Service.recordManager.set(engine.metaURL,
+ new WBORecord(engine.metaURL));
+ meta_global.payload.engines = {rotary: {version: engine.version,
+ syncID: engine.syncID}};
+ try {
+
+ // Confirm initial environment
+ do_check_empty(engine._store.items);
+
+ engine._syncStartup();
+ engine._processIncoming();
+
+ // Records have been applied in 3 batches.
+ do_check_eq(batchCalls, 3);
+ do_check_attribute_count(engine._store.items, APPLY_BATCH_SIZE * 3);
+
+ } finally {
+ cleanAndGo(server);
+ }
+});
+
+
+add_test(function test_processIncoming_notify_count() {
+ _("Ensure that failed records are reported only once.");
+ Service.identity.username = "foo";
+
+ const APPLY_BATCH_SIZE = 5;
+ const NUMBER_OF_RECORDS = 15;
+
+ // Engine that fails the first record.
+ let engine = makeRotaryEngine();
+ engine.applyIncomingBatchSize = APPLY_BATCH_SIZE;
+ engine._store._applyIncomingBatch = engine._store.applyIncomingBatch;
+ engine._store.applyIncomingBatch = function (records) {
+ engine._store._applyIncomingBatch(records.slice(1));
+ return [records[0].id];
+ };
+
+ // Create a batch of server side records.
+ let collection = new ServerCollection();
+ for (var i = 0; i < NUMBER_OF_RECORDS; i++) {
+ let id = 'record-no-' + i;
+ let payload = encryptPayload({id: id, denomination: "Record No. " + id});
+ collection.insert(id, payload);
+ }
+
+ let server = sync_httpd_setup({
+ "/1.1/foo/storage/rotary": collection.handler()
+ });
+
+ let syncTesting = new SyncTestingInfrastructure(server);
+
+ let meta_global = Service.recordManager.set(engine.metaURL,
+ new WBORecord(engine.metaURL));
+ meta_global.payload.engines = {rotary: {version: engine.version,
+ syncID: engine.syncID}};
+ try {
+ // Confirm initial environment.
+ do_check_eq(engine.lastSync, 0);
+ do_check_eq(engine.toFetch.length, 0);
+ do_check_eq(engine.previousFailed.length, 0);
+ do_check_empty(engine._store.items);
+
+ let called = 0;
+ let counts;
+ function onApplied(count) {
+ _("Called with " + JSON.stringify(counts));
+ counts = count;
+ called++;
+ }
+ Svc.Obs.add("weave:engine:sync:applied", onApplied);
+
+ // Do sync.
+ engine._syncStartup();
+ engine._processIncoming();
+
+ // Confirm failures.
+ do_check_attribute_count(engine._store.items, 12);
+ do_check_eq(engine.previousFailed.length, 3);
+ do_check_eq(engine.previousFailed[0], "record-no-0");
+ do_check_eq(engine.previousFailed[1], "record-no-5");
+ do_check_eq(engine.previousFailed[2], "record-no-10");
+
+ // There are newly failed records and they are reported.
+ do_check_eq(called, 1);
+ do_check_eq(counts.failed, 3);
+ do_check_eq(counts.applied, 15);
+ do_check_eq(counts.newFailed, 3);
+ do_check_eq(counts.succeeded, 12);
+
+ // Sync again, 1 of the failed items are the same, the rest didn't fail.
+ engine._processIncoming();
+
+ // Confirming removed failures.
+ do_check_attribute_count(engine._store.items, 14);
+ do_check_eq(engine.previousFailed.length, 1);
+ do_check_eq(engine.previousFailed[0], "record-no-0");
+
+ do_check_eq(called, 2);
+ do_check_eq(counts.failed, 1);
+ do_check_eq(counts.applied, 3);
+ do_check_eq(counts.newFailed, 0);
+ do_check_eq(counts.succeeded, 2);
+
+ Svc.Obs.remove("weave:engine:sync:applied", onApplied);
+ } finally {
+ cleanAndGo(server);
+ }
+});
+
+
+add_test(function test_processIncoming_previousFailed() {
+ _("Ensure that failed records are retried.");
+ Service.identity.username = "foo";
+ Svc.Prefs.set("client.type", "mobile");
+
+ const APPLY_BATCH_SIZE = 4;
+ const NUMBER_OF_RECORDS = 14;
+
+ // Engine that fails the first 2 records.
+ let engine = makeRotaryEngine();
+ engine.mobileGUIDFetchBatchSize = engine.applyIncomingBatchSize = APPLY_BATCH_SIZE;
+ engine._store._applyIncomingBatch = engine._store.applyIncomingBatch;
+ engine._store.applyIncomingBatch = function (records) {
+ engine._store._applyIncomingBatch(records.slice(2));
+ return [records[0].id, records[1].id];
+ };
+
+ // Create a batch of server side records.
+ let collection = new ServerCollection();
+ for (var i = 0; i < NUMBER_OF_RECORDS; i++) {
+ let id = 'record-no-' + i;
+ let payload = encryptPayload({id: id, denomination: "Record No. " + i});
+ collection.insert(id, payload);
+ }
+
+ let server = sync_httpd_setup({
+ "/1.1/foo/storage/rotary": collection.handler()
+ });
+
+ let syncTesting = new SyncTestingInfrastructure(server);
+
+ let meta_global = Service.recordManager.set(engine.metaURL,
+ new WBORecord(engine.metaURL));
+ meta_global.payload.engines = {rotary: {version: engine.version,
+ syncID: engine.syncID}};
+ try {
+ // Confirm initial environment.
+ do_check_eq(engine.lastSync, 0);
+ do_check_eq(engine.toFetch.length, 0);
+ do_check_eq(engine.previousFailed.length, 0);
+ do_check_empty(engine._store.items);
+
+ // Initial failed items in previousFailed to be reset.
+ let previousFailed = [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()];
+ engine.previousFailed = previousFailed;
+ do_check_eq(engine.previousFailed, previousFailed);
+
+ // Do sync.
+ engine._syncStartup();
+ engine._processIncoming();
+
+ // Expected result: 4 sync batches with 2 failures each => 8 failures
+ do_check_attribute_count(engine._store.items, 6);
+ do_check_eq(engine.previousFailed.length, 8);
+ do_check_eq(engine.previousFailed[0], "record-no-0");
+ do_check_eq(engine.previousFailed[1], "record-no-1");
+ do_check_eq(engine.previousFailed[2], "record-no-4");
+ do_check_eq(engine.previousFailed[3], "record-no-5");
+ do_check_eq(engine.previousFailed[4], "record-no-8");
+ do_check_eq(engine.previousFailed[5], "record-no-9");
+ do_check_eq(engine.previousFailed[6], "record-no-12");
+ do_check_eq(engine.previousFailed[7], "record-no-13");
+
+ // Sync again with the same failed items (records 0, 1, 8, 9).
+ engine._processIncoming();
+
+ // A second sync with the same failed items should not add the same items again.
+ // Items that did not fail a second time should no longer be in previousFailed.
+ do_check_attribute_count(engine._store.items, 10);
+ do_check_eq(engine.previousFailed.length, 4);
+ do_check_eq(engine.previousFailed[0], "record-no-0");
+ do_check_eq(engine.previousFailed[1], "record-no-1");
+ do_check_eq(engine.previousFailed[2], "record-no-8");
+ do_check_eq(engine.previousFailed[3], "record-no-9");
+
+ // Refetched items that didn't fail the second time are in engine._store.items.
+ do_check_eq(engine._store.items['record-no-4'], "Record No. 4");
+ do_check_eq(engine._store.items['record-no-5'], "Record No. 5");
+ do_check_eq(engine._store.items['record-no-12'], "Record No. 12");
+ do_check_eq(engine._store.items['record-no-13'], "Record No. 13");
+ } finally {
+ cleanAndGo(server);
+ }
+});
+
+
+add_test(function test_processIncoming_failed_records() {
+ _("Ensure that failed records from _reconcile and applyIncomingBatch are refetched.");
+ Service.identity.username = "foo";
+
+ // Let's create three and a bit batches worth of server side records.
+ let collection = new ServerCollection();
+ const NUMBER_OF_RECORDS = MOBILE_BATCH_SIZE * 3 + 5;
+ for (let i = 0; i < NUMBER_OF_RECORDS; i++) {
+ let id = 'record-no-' + i;
+ let payload = encryptPayload({id: id, denomination: "Record No. " + id});
+ let wbo = new ServerWBO(id, payload);
+ wbo.modified = Date.now()/1000 + 60 * (i - MOBILE_BATCH_SIZE * 3);
+ collection.insertWBO(wbo);
+ }
+
+ // Engine that batches but likes to throw on a couple of records,
+ // two in each batch: the even ones fail in reconcile, the odd ones
+ // in applyIncoming.
+ const BOGUS_RECORDS = ["record-no-" + 42,
+ "record-no-" + 23,
+ "record-no-" + (42 + MOBILE_BATCH_SIZE),
+ "record-no-" + (23 + MOBILE_BATCH_SIZE),
+ "record-no-" + (42 + MOBILE_BATCH_SIZE * 2),
+ "record-no-" + (23 + MOBILE_BATCH_SIZE * 2),
+ "record-no-" + (2 + MOBILE_BATCH_SIZE * 3),
+ "record-no-" + (1 + MOBILE_BATCH_SIZE * 3)];
+ let engine = makeRotaryEngine();
+ engine.applyIncomingBatchSize = MOBILE_BATCH_SIZE;
+
+ engine.__reconcile = engine._reconcile;
+ engine._reconcile = function _reconcile(record) {
+ if (BOGUS_RECORDS.indexOf(record.id) % 2 == 0) {
+ throw "I don't like this record! Baaaaaah!";
+ }
+ return this.__reconcile.apply(this, arguments);
+ };
+ engine._store._applyIncoming = engine._store.applyIncoming;
+ engine._store.applyIncoming = function (record) {
+ if (BOGUS_RECORDS.indexOf(record.id) % 2 == 1) {
+ throw "I don't like this record! Baaaaaah!";
+ }
+ return this._applyIncoming.apply(this, arguments);
+ };
+
+ // Keep track of requests made of a collection.
+ let count = 0;
+ let uris = [];
+ function recording_handler(collection) {
+ let h = collection.handler();
+ return function(req, res) {
+ ++count;
+ uris.push(req.path + "?" + req.queryString);
+ return h(req, res);
+ };
+ }
+ let server = sync_httpd_setup({
+ "/1.1/foo/storage/rotary": recording_handler(collection)
+ });
+
+ let syncTesting = new SyncTestingInfrastructure(server);
+
+ let meta_global = Service.recordManager.set(engine.metaURL,
+ new WBORecord(engine.metaURL));
+ meta_global.payload.engines = {rotary: {version: engine.version,
+ syncID: engine.syncID}};
+
+ try {
+
+ // Confirm initial environment
+ do_check_eq(engine.lastSync, 0);
+ do_check_eq(engine.toFetch.length, 0);
+ do_check_eq(engine.previousFailed.length, 0);
+ do_check_empty(engine._store.items);
+
+ let observerSubject;
+ let observerData;
+ Svc.Obs.add("weave:engine:sync:applied", function onApplied(subject, data) {
+ Svc.Obs.remove("weave:engine:sync:applied", onApplied);
+ observerSubject = subject;
+ observerData = data;
+ });
+
+ engine._syncStartup();
+ engine._processIncoming();
+
+ // Ensure that all records but the bogus 4 have been applied.
+ do_check_attribute_count(engine._store.items,
+ NUMBER_OF_RECORDS - BOGUS_RECORDS.length);
+
+ // Ensure that the bogus records will be fetched again on the next sync.
+ do_check_eq(engine.previousFailed.length, BOGUS_RECORDS.length);
+ engine.previousFailed.sort();
+ BOGUS_RECORDS.sort();
+ for (let i = 0; i < engine.previousFailed.length; i++) {
+ do_check_eq(engine.previousFailed[i], BOGUS_RECORDS[i]);
+ }
+
+ // Ensure the observer was notified
+ do_check_eq(observerData, engine.name);
+ do_check_eq(observerSubject.failed, BOGUS_RECORDS.length);
+ do_check_eq(observerSubject.newFailed, BOGUS_RECORDS.length);
+
+ // Testing batching of failed item fetches.
+ // Try to sync again. Ensure that we split the request into chunks to avoid
+ // URI length limitations.
+ function batchDownload(batchSize) {
+ count = 0;
+ uris = [];
+ engine.guidFetchBatchSize = batchSize;
+ engine._processIncoming();
+ _("Tried again. Requests: " + count + "; URIs: " + JSON.stringify(uris));
+ return count;
+ }
+
+ // There are 8 bad records, so this needs 3 fetches.
+ _("Test batching with ID batch size 3, normal mobile batch size.");
+ do_check_eq(batchDownload(3), 3);
+
+ // Now see with a more realistic limit.
+ _("Test batching with sufficient ID batch size.");
+ do_check_eq(batchDownload(BOGUS_RECORDS.length), 1);
+
+ // If we're on mobile, that limit is used by default.
+ _("Test batching with tiny mobile batch size.");
+ Svc.Prefs.set("client.type", "mobile");
+ engine.mobileGUIDFetchBatchSize = 2;
+ do_check_eq(batchDownload(BOGUS_RECORDS.length), 4);
+
+ } finally {
+ cleanAndGo(server);
+ }
+});
+
+
+add_task(function *test_processIncoming_decrypt_failed() {
+ _("Ensure that records failing to decrypt are either replaced or refetched.");
+
+ Service.identity.username = "foo";
+
+ // Some good and some bogus records. One doesn't contain valid JSON,
+ // the other will throw during decrypt.
+ let collection = new ServerCollection();
+ collection._wbos.flying = new ServerWBO(
+ 'flying', encryptPayload({id: 'flying',
+ denomination: "LNER Class A3 4472"}));
+ collection._wbos.nojson = new ServerWBO("nojson", "This is invalid JSON");
+ collection._wbos.nojson2 = new ServerWBO("nojson2", "This is invalid JSON");
+ collection._wbos.scotsman = new ServerWBO(
+ 'scotsman', encryptPayload({id: 'scotsman',
+ denomination: "Flying Scotsman"}));
+ collection._wbos.nodecrypt = new ServerWBO("nodecrypt", "Decrypt this!");
+ collection._wbos.nodecrypt2 = new ServerWBO("nodecrypt2", "Decrypt this!");
+
+ // Patch the fake crypto service to throw on the record above.
+ Svc.Crypto._decrypt = Svc.Crypto.decrypt;
+ Svc.Crypto.decrypt = function (ciphertext) {
+ if (ciphertext == "Decrypt this!") {
+ throw "Derp! Cipher finalized failed. Im ur crypto destroyin ur recordz.";
+ }
+ return this._decrypt.apply(this, arguments);
+ };
+
+ // Some broken records also exist locally.
+ let engine = makeRotaryEngine();
+ engine.enabled = true;
+ engine._store.items = {nojson: "Valid JSON",
+ nodecrypt: "Valid ciphertext"};
+
+ let server = sync_httpd_setup({
+ "/1.1/foo/storage/rotary": collection.handler()
+ });
+
+ let syncTesting = new SyncTestingInfrastructure(server);
+
+ let meta_global = Service.recordManager.set(engine.metaURL,
+ new WBORecord(engine.metaURL));
+ meta_global.payload.engines = {rotary: {version: engine.version,
+ syncID: engine.syncID}};
+ try {
+
+ // Confirm initial state
+ do_check_eq(engine.toFetch.length, 0);
+ do_check_eq(engine.previousFailed.length, 0);
+
+ let observerSubject;
+ let observerData;
+ Svc.Obs.add("weave:engine:sync:applied", function onApplied(subject, data) {
+ Svc.Obs.remove("weave:engine:sync:applied", onApplied);
+ observerSubject = subject;
+ observerData = data;
+ });
+
+ engine.lastSync = collection.wbo("nojson").modified - 1;
+ let ping = yield sync_engine_and_validate_telem(engine, true);
+ do_check_eq(ping.engines[0].incoming.applied, 2);
+ do_check_eq(ping.engines[0].incoming.failed, 4);
+ do_check_eq(ping.engines[0].incoming.newFailed, 4);
+
+ do_check_eq(engine.previousFailed.length, 4);
+ do_check_eq(engine.previousFailed[0], "nojson");
+ do_check_eq(engine.previousFailed[1], "nojson2");
+ do_check_eq(engine.previousFailed[2], "nodecrypt");
+ do_check_eq(engine.previousFailed[3], "nodecrypt2");
+
+ // Ensure the observer was notified
+ do_check_eq(observerData, engine.name);
+ do_check_eq(observerSubject.applied, 2);
+ do_check_eq(observerSubject.failed, 4);
+
+ } finally {
+ yield promiseClean(server);
+ }
+});
+
+
+add_test(function test_uploadOutgoing_toEmptyServer() {
+ _("SyncEngine._uploadOutgoing uploads new records to server");
+
+ Service.identity.username = "foo";
+ let collection = new ServerCollection();
+ collection._wbos.flying = new ServerWBO('flying');
+ collection._wbos.scotsman = new ServerWBO('scotsman');
+
+ let server = sync_httpd_setup({
+ "/1.1/foo/storage/rotary": collection.handler(),
+ "/1.1/foo/storage/rotary/flying": collection.wbo("flying").handler(),
+ "/1.1/foo/storage/rotary/scotsman": collection.wbo("scotsman").handler()
+ });
+
+ let syncTesting = new SyncTestingInfrastructure(server);
+ generateNewKeys(Service.collectionKeys);
+
+ let engine = makeRotaryEngine();
+ engine.lastSync = 123; // needs to be non-zero so that tracker is queried
+ engine._store.items = {flying: "LNER Class A3 4472",
+ scotsman: "Flying Scotsman"};
+ // Mark one of these records as changed
+ engine._tracker.addChangedID('scotsman', 0);
+
+ let meta_global = Service.recordManager.set(engine.metaURL,
+ new WBORecord(engine.metaURL));
+ meta_global.payload.engines = {rotary: {version: engine.version,
+ syncID: engine.syncID}};
+
+ try {
+
+ // Confirm initial environment
+ do_check_eq(engine.lastSyncLocal, 0);
+ do_check_eq(collection.payload("flying"), undefined);
+ do_check_eq(collection.payload("scotsman"), undefined);
+
+ engine._syncStartup();
+ engine._uploadOutgoing();
+
+ // Local timestamp has been set.
+ do_check_true(engine.lastSyncLocal > 0);
+
+ // Ensure the marked record ('scotsman') has been uploaded and is
+ // no longer marked.
+ do_check_eq(collection.payload("flying"), undefined);
+ do_check_true(!!collection.payload("scotsman"));
+ do_check_eq(JSON.parse(collection.wbo("scotsman").data.ciphertext).id,
+ "scotsman");
+ do_check_eq(engine._tracker.changedIDs["scotsman"], undefined);
+
+ // The 'flying' record wasn't marked so it wasn't uploaded
+ do_check_eq(collection.payload("flying"), undefined);
+
+ } finally {
+ cleanAndGo(server);
+ }
+});
+
+
+add_task(function *test_uploadOutgoing_failed() {
+ _("SyncEngine._uploadOutgoing doesn't clear the tracker of objects that failed to upload.");
+
+ Service.identity.username = "foo";
+ let collection = new ServerCollection();
+ // We only define the "flying" WBO on the server, not the "scotsman"
+ // and "peppercorn" ones.
+ collection._wbos.flying = new ServerWBO('flying');
+
+ let server = sync_httpd_setup({
+ "/1.1/foo/storage/rotary": collection.handler()
+ });
+
+ let syncTesting = new SyncTestingInfrastructure(server);
+
+ let engine = makeRotaryEngine();
+ engine.lastSync = 123; // needs to be non-zero so that tracker is queried
+ engine._store.items = {flying: "LNER Class A3 4472",
+ scotsman: "Flying Scotsman",
+ peppercorn: "Peppercorn Class"};
+ // Mark these records as changed
+ const FLYING_CHANGED = 12345;
+ const SCOTSMAN_CHANGED = 23456;
+ const PEPPERCORN_CHANGED = 34567;
+ engine._tracker.addChangedID('flying', FLYING_CHANGED);
+ engine._tracker.addChangedID('scotsman', SCOTSMAN_CHANGED);
+ engine._tracker.addChangedID('peppercorn', PEPPERCORN_CHANGED);
+
+ let meta_global = Service.recordManager.set(engine.metaURL,
+ new WBORecord(engine.metaURL));
+ meta_global.payload.engines = {rotary: {version: engine.version,
+ syncID: engine.syncID}};
+
+ try {
+
+ // Confirm initial environment
+ do_check_eq(engine.lastSyncLocal, 0);
+ do_check_eq(collection.payload("flying"), undefined);
+ do_check_eq(engine._tracker.changedIDs['flying'], FLYING_CHANGED);
+ do_check_eq(engine._tracker.changedIDs['scotsman'], SCOTSMAN_CHANGED);
+ do_check_eq(engine._tracker.changedIDs['peppercorn'], PEPPERCORN_CHANGED);
+
+ engine.enabled = true;
+ yield sync_engine_and_validate_telem(engine, true);
+
+ // Local timestamp has been set.
+ do_check_true(engine.lastSyncLocal > 0);
+
+ // Ensure the 'flying' record has been uploaded and is no longer marked.
+ do_check_true(!!collection.payload("flying"));
+ do_check_eq(engine._tracker.changedIDs['flying'], undefined);
+
+ // The 'scotsman' and 'peppercorn' records couldn't be uploaded so
+ // they weren't cleared from the tracker.
+ do_check_eq(engine._tracker.changedIDs['scotsman'], SCOTSMAN_CHANGED);
+ do_check_eq(engine._tracker.changedIDs['peppercorn'], PEPPERCORN_CHANGED);
+
+ } finally {
+ yield promiseClean(server);
+ }
+});
+
+/* A couple of "functional" tests to ensure we split records into appropriate
+ POST requests. More comprehensive unit-tests for this "batching" are in
+ test_postqueue.js.
+*/
+add_test(function test_uploadOutgoing_MAX_UPLOAD_RECORDS() {
+ _("SyncEngine._uploadOutgoing uploads in batches of MAX_UPLOAD_RECORDS");
+
+ Service.identity.username = "foo";
+ let collection = new ServerCollection();
+
+ // Let's count how many times the client posts to the server
+ var noOfUploads = 0;
+ collection.post = (function(orig) {
+ return function(data, request) {
+ // This test doesn't arrange for batch semantics - so we expect the
+ // first request to come in with batch=true and the others to have no
+ // batch related headers at all (as the first response did not provide
+ // a batch ID)
+ if (noOfUploads == 0) {
+ do_check_eq(request.queryString, "batch=true");
+ } else {
+ do_check_eq(request.queryString, "");
+ }
+ noOfUploads++;
+ return orig.call(this, data, request);
+ };
+ }(collection.post));
+
+ // Create a bunch of records (and server side handlers)
+ let engine = makeRotaryEngine();
+ for (var i = 0; i < 234; i++) {
+ let id = 'record-no-' + i;
+ engine._store.items[id] = "Record No. " + i;
+ engine._tracker.addChangedID(id, 0);
+ collection.insert(id);
+ }
+
+ let meta_global = Service.recordManager.set(engine.metaURL,
+ new WBORecord(engine.metaURL));
+ meta_global.payload.engines = {rotary: {version: engine.version,
+ syncID: engine.syncID}};
+
+ let server = sync_httpd_setup({
+ "/1.1/foo/storage/rotary": collection.handler()
+ });
+
+ let syncTesting = new SyncTestingInfrastructure(server);
+
+ try {
+
+ // Confirm initial environment.
+ do_check_eq(noOfUploads, 0);
+
+ engine._syncStartup();
+ engine._uploadOutgoing();
+
+ // Ensure all records have been uploaded.
+ for (i = 0; i < 234; i++) {
+ do_check_true(!!collection.payload('record-no-' + i));
+ }
+
+ // Ensure that the uploads were performed in batches of MAX_UPLOAD_RECORDS.
+ do_check_eq(noOfUploads, Math.ceil(234/MAX_UPLOAD_RECORDS));
+
+ } finally {
+ cleanAndGo(server);
+ }
+});
+
+add_test(function test_uploadOutgoing_largeRecords() {
+ _("SyncEngine._uploadOutgoing throws on records larger than MAX_UPLOAD_BYTES");
+
+ Service.identity.username = "foo";
+ let collection = new ServerCollection();
+
+ let engine = makeRotaryEngine();
+ engine.allowSkippedRecord = false;
+ engine._store.items["large-item"] = "Y".repeat(MAX_UPLOAD_BYTES*2);
+ engine._tracker.addChangedID("large-item", 0);
+ collection.insert("large-item");
+
+
+ let meta_global = Service.recordManager.set(engine.metaURL,
+ new WBORecord(engine.metaURL));
+ meta_global.payload.engines = {rotary: {version: engine.version,
+ syncID: engine.syncID}};
+
+ let server = sync_httpd_setup({
+ "/1.1/foo/storage/rotary": collection.handler()
+ });
+
+ let syncTesting = new SyncTestingInfrastructure(server);
+
+ try {
+ engine._syncStartup();
+ let error = null;
+ try {
+ engine._uploadOutgoing();
+ } catch (e) {
+ error = e;
+ }
+ ok(!!error);
+ } finally {
+ cleanAndGo(server);
+ }
+});
+
+
+add_test(function test_syncFinish_noDelete() {
+ _("SyncEngine._syncFinish resets tracker's score");
+
+ let server = httpd_setup({});
+
+ let syncTesting = new SyncTestingInfrastructure(server);
+ let engine = makeRotaryEngine();
+ engine._delete = {}; // Nothing to delete
+ engine._tracker.score = 100;
+
+ // _syncFinish() will reset the engine's score.
+ engine._syncFinish();
+ do_check_eq(engine.score, 0);
+ server.stop(run_next_test);
+});
+
+
+add_test(function test_syncFinish_deleteByIds() {
+ _("SyncEngine._syncFinish deletes server records slated for deletion (list of record IDs).");
+
+ Service.identity.username = "foo";
+ let collection = new ServerCollection();
+ collection._wbos.flying = new ServerWBO(
+ 'flying', encryptPayload({id: 'flying',
+ denomination: "LNER Class A3 4472"}));
+ collection._wbos.scotsman = new ServerWBO(
+ 'scotsman', encryptPayload({id: 'scotsman',
+ denomination: "Flying Scotsman"}));
+ collection._wbos.rekolok = new ServerWBO(
+ 'rekolok', encryptPayload({id: 'rekolok',
+ denomination: "Rekonstruktionslokomotive"}));
+
+ let server = httpd_setup({
+ "/1.1/foo/storage/rotary": collection.handler()
+ });
+ let syncTesting = new SyncTestingInfrastructure(server);
+
+ let engine = makeRotaryEngine();
+ try {
+ engine._delete = {ids: ['flying', 'rekolok']};
+ engine._syncFinish();
+
+ // The 'flying' and 'rekolok' records were deleted while the
+ // 'scotsman' one wasn't.
+ do_check_eq(collection.payload("flying"), undefined);
+ do_check_true(!!collection.payload("scotsman"));
+ do_check_eq(collection.payload("rekolok"), undefined);
+
+ // The deletion todo list has been reset.
+ do_check_eq(engine._delete.ids, undefined);
+
+ } finally {
+ cleanAndGo(server);
+ }
+});
+
+
+add_test(function test_syncFinish_deleteLotsInBatches() {
+ _("SyncEngine._syncFinish deletes server records in batches of 100 (list of record IDs).");
+
+ Service.identity.username = "foo";
+ let collection = new ServerCollection();
+
+ // Let's count how many times the client does a DELETE request to the server
+ var noOfUploads = 0;
+ collection.delete = (function(orig) {
+ return function() {
+ noOfUploads++;
+ return orig.apply(this, arguments);
+ };
+ }(collection.delete));
+
+ // Create a bunch of records on the server
+ let now = Date.now();
+ for (var i = 0; i < 234; i++) {
+ let id = 'record-no-' + i;
+ let payload = encryptPayload({id: id, denomination: "Record No. " + i});
+ let wbo = new ServerWBO(id, payload);
+ wbo.modified = now / 1000 - 60 * (i + 110);
+ collection.insertWBO(wbo);
+ }
+
+ let server = httpd_setup({
+ "/1.1/foo/storage/rotary": collection.handler()
+ });
+
+ let syncTesting = new SyncTestingInfrastructure(server);
+
+ let engine = makeRotaryEngine();
+ try {
+
+ // Confirm initial environment
+ do_check_eq(noOfUploads, 0);
+
+ // Declare what we want to have deleted: all records no. 100 and
+ // up and all records that are less than 200 mins old (which are
+ // records 0 thru 90).
+ engine._delete = {ids: [],
+ newer: now / 1000 - 60 * 200.5};
+ for (i = 100; i < 234; i++) {
+ engine._delete.ids.push('record-no-' + i);
+ }
+
+ engine._syncFinish();
+
+ // Ensure that the appropriate server data has been wiped while
+ // preserving records 90 thru 200.
+ for (i = 0; i < 234; i++) {
+ let id = 'record-no-' + i;
+ if (i <= 90 || i >= 100) {
+ do_check_eq(collection.payload(id), undefined);
+ } else {
+ do_check_true(!!collection.payload(id));
+ }
+ }
+
+ // The deletion was done in batches
+ do_check_eq(noOfUploads, 2 + 1);
+
+ // The deletion todo list has been reset.
+ do_check_eq(engine._delete.ids, undefined);
+
+ } finally {
+ cleanAndGo(server);
+ }
+});
+
+
+add_task(function *test_sync_partialUpload() {
+ _("SyncEngine.sync() keeps changedIDs that couldn't be uploaded.");
+
+ Service.identity.username = "foo";
+
+ let collection = new ServerCollection();
+ let server = sync_httpd_setup({
+ "/1.1/foo/storage/rotary": collection.handler()
+ });
+ let syncTesting = new SyncTestingInfrastructure(server);
+ generateNewKeys(Service.collectionKeys);
+
+ let engine = makeRotaryEngine();
+ engine.lastSync = 123; // needs to be non-zero so that tracker is queried
+ engine.lastSyncLocal = 456;
+
+ // Let the third upload fail completely
+ var noOfUploads = 0;
+ collection.post = (function(orig) {
+ return function() {
+ if (noOfUploads == 2)
+ throw "FAIL!";
+ noOfUploads++;
+ return orig.apply(this, arguments);
+ };
+ }(collection.post));
+
+ // Create a bunch of records (and server side handlers)
+ for (let i = 0; i < 234; i++) {
+ let id = 'record-no-' + i;
+ engine._store.items[id] = "Record No. " + i;
+ engine._tracker.addChangedID(id, i);
+ // Let two items in the first upload batch fail.
+ if ((i != 23) && (i != 42)) {
+ collection.insert(id);
+ }
+ }
+
+ let meta_global = Service.recordManager.set(engine.metaURL,
+ new WBORecord(engine.metaURL));
+ meta_global.payload.engines = {rotary: {version: engine.version,
+ syncID: engine.syncID}};
+
+ try {
+
+ engine.enabled = true;
+ let error;
+ try {
+ yield sync_engine_and_validate_telem(engine, true);
+ } catch (ex) {
+ error = ex;
+ }
+
+ ok(!!error);
+
+ // The timestamp has been updated.
+ do_check_true(engine.lastSyncLocal > 456);
+
+ for (let i = 0; i < 234; i++) {
+ let id = 'record-no-' + i;
+ // Ensure failed records are back in the tracker:
+ // * records no. 23 and 42 were rejected by the server,
+ // * records no. 200 and higher couldn't be uploaded because we failed
+ // hard on the 3rd upload.
+ if ((i == 23) || (i == 42) || (i >= 200))
+ do_check_eq(engine._tracker.changedIDs[id], i);
+ else
+ do_check_false(id in engine._tracker.changedIDs);
+ }
+
+ } finally {
+ yield promiseClean(server);
+ }
+});
+
+add_test(function test_canDecrypt_noCryptoKeys() {
+ _("SyncEngine.canDecrypt returns false if the engine fails to decrypt items on the server, e.g. due to a missing crypto key collection.");
+ Service.identity.username = "foo";
+
+ // Wipe collection keys so we can test the desired scenario.
+ Service.collectionKeys.clear();
+
+ let collection = new ServerCollection();
+ collection._wbos.flying = new ServerWBO(
+ 'flying', encryptPayload({id: 'flying',
+ denomination: "LNER Class A3 4472"}));
+
+ let server = sync_httpd_setup({
+ "/1.1/foo/storage/rotary": collection.handler()
+ });
+
+ let syncTesting = new SyncTestingInfrastructure(server);
+ let engine = makeRotaryEngine();
+ try {
+
+ do_check_false(engine.canDecrypt());
+
+ } finally {
+ cleanAndGo(server);
+ }
+});
+
+add_test(function test_canDecrypt_true() {
+ _("SyncEngine.canDecrypt returns true if the engine can decrypt the items on the server.");
+ Service.identity.username = "foo";
+
+ generateNewKeys(Service.collectionKeys);
+
+ let collection = new ServerCollection();
+ collection._wbos.flying = new ServerWBO(
+ 'flying', encryptPayload({id: 'flying',
+ denomination: "LNER Class A3 4472"}));
+
+ let server = sync_httpd_setup({
+ "/1.1/foo/storage/rotary": collection.handler()
+ });
+
+ let syncTesting = new SyncTestingInfrastructure(server);
+ let engine = makeRotaryEngine();
+ try {
+
+ do_check_true(engine.canDecrypt());
+
+ } finally {
+ cleanAndGo(server);
+ }
+
+});
+
+add_test(function test_syncapplied_observer() {
+ Service.identity.username = "foo";
+
+ const NUMBER_OF_RECORDS = 10;
+
+ let engine = makeRotaryEngine();
+
+ // Create a batch of server side records.
+ let collection = new ServerCollection();
+ for (var i = 0; i < NUMBER_OF_RECORDS; i++) {
+ let id = 'record-no-' + i;
+ let payload = encryptPayload({id: id, denomination: "Record No. " + id});
+ collection.insert(id, payload);
+ }
+
+ let server = httpd_setup({
+ "/1.1/foo/storage/rotary": collection.handler()
+ });
+
+ let syncTesting = new SyncTestingInfrastructure(server);
+
+ let meta_global = Service.recordManager.set(engine.metaURL,
+ new WBORecord(engine.metaURL));
+ meta_global.payload.engines = {rotary: {version: engine.version,
+ syncID: engine.syncID}};
+
+ let numApplyCalls = 0;
+ let engine_name;
+ let count;
+ function onApplied(subject, data) {
+ numApplyCalls++;
+ engine_name = data;
+ count = subject;
+ }
+
+ Svc.Obs.add("weave:engine:sync:applied", onApplied);
+
+ try {
+ Service.scheduler.hasIncomingItems = false;
+
+ // Do sync.
+ engine._syncStartup();
+ engine._processIncoming();
+
+ do_check_attribute_count(engine._store.items, 10);
+
+ do_check_eq(numApplyCalls, 1);
+ do_check_eq(engine_name, "rotary");
+ do_check_eq(count.applied, 10);
+
+ do_check_true(Service.scheduler.hasIncomingItems);
+ } finally {
+ cleanAndGo(server);
+ Service.scheduler.hasIncomingItems = false;
+ Svc.Obs.remove("weave:engine:sync:applied", onApplied);
+ }
+});