summaryrefslogtreecommitdiffstats
path: root/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/SerialRecordConsumer.java
diff options
context:
space:
mode:
Diffstat (limited to 'mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/SerialRecordConsumer.java')
-rw-r--r--mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/SerialRecordConsumer.java131
1 files changed, 0 insertions, 131 deletions
diff --git a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/SerialRecordConsumer.java b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/SerialRecordConsumer.java
deleted file mode 100644
index 6ee44ea2b..000000000
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/SerialRecordConsumer.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-package org.mozilla.gecko.sync.synchronizer;
-
-import org.mozilla.gecko.background.common.log.Logger;
-import org.mozilla.gecko.sync.repositories.domain.Record;
-
-/**
- * Consume records from a queue inside a RecordsChannel, storing them serially.
- * @author rnewman
- *
- */
-class SerialRecordConsumer extends RecordConsumer {
- private static final String LOG_TAG = "SerialRecordConsumer";
- protected boolean stopEventually = false;
- private volatile long counter = 0;
-
- public SerialRecordConsumer(RecordsConsumerDelegate delegate) {
- this.delegate = delegate;
- }
-
- private final Object monitor = new Object();
- @Override
- public void doNotify() {
- synchronized (monitor) {
- monitor.notify();
- }
- }
-
- @Override
- public void queueFilled() {
- Logger.debug(LOG_TAG, "Queue filled.");
- synchronized (monitor) {
- this.stopEventually = true;
- monitor.notify();
- }
- }
-
- @Override
- public void halt() {
- Logger.debug(LOG_TAG, "Halting.");
- synchronized (monitor) {
- this.stopEventually = true;
- this.stopImmediately = true;
- monitor.notify();
- }
- }
-
- private final Object storeSerializer = new Object();
- @Override
- public void stored() {
- Logger.debug(LOG_TAG, "Record stored. Notifying.");
- synchronized (storeSerializer) {
- Logger.debug(LOG_TAG, "stored() took storeSerializer.");
- counter++;
- storeSerializer.notify();
- Logger.debug(LOG_TAG, "stored() dropped storeSerializer.");
- }
- }
- private void storeSerially(Record record) {
- Logger.debug(LOG_TAG, "New record to store.");
- synchronized (storeSerializer) {
- Logger.debug(LOG_TAG, "storeSerially() took storeSerializer.");
- Logger.debug(LOG_TAG, "Storing...");
- try {
- this.delegate.store(record);
- } catch (Exception e) {
- Logger.warn(LOG_TAG, "Got exception in store. Not waiting.", e);
- return; // So we don't block for a stored() that never comes.
- }
- try {
- Logger.debug(LOG_TAG, "Waiting...");
- storeSerializer.wait();
- } catch (InterruptedException e) {
- // TODO
- }
- Logger.debug(LOG_TAG, "storeSerially() dropped storeSerializer.");
- }
- }
-
- private void consumerIsDone() {
- long counterNow = this.counter;
- Logger.info(LOG_TAG, "Consumer is done. Processed " + counterNow + ((counterNow == 1) ? " record." : " records."));
- delegate.consumerIsDone(stopImmediately);
- }
-
- @Override
- public void run() {
- while (true) {
- synchronized (monitor) {
- Logger.debug(LOG_TAG, "run() took monitor.");
- if (stopImmediately) {
- Logger.debug(LOG_TAG, "Stopping immediately. Clearing queue.");
- delegate.getQueue().clear();
- Logger.debug(LOG_TAG, "Notifying consumer.");
- consumerIsDone();
- return;
- }
- Logger.debug(LOG_TAG, "run() dropped monitor.");
- }
- // The queue is concurrent-safe.
- while (!delegate.getQueue().isEmpty()) {
- Logger.debug(LOG_TAG, "Grabbing record...");
- Record record = delegate.getQueue().remove();
- // Block here, allowing us to process records
- // serially.
- Logger.debug(LOG_TAG, "Invoking storeSerially...");
- this.storeSerially(record);
- Logger.debug(LOG_TAG, "Done with record.");
- }
- synchronized (monitor) {
- Logger.debug(LOG_TAG, "run() took monitor.");
-
- if (stopEventually) {
- Logger.debug(LOG_TAG, "Done with records and told to stop. Notifying consumer.");
- consumerIsDone();
- return;
- }
- try {
- Logger.debug(LOG_TAG, "Not told to stop but no records. Waiting.");
- monitor.wait(10000);
- } catch (InterruptedException e) {
- // TODO
- }
- Logger.debug(LOG_TAG, "run() dropped monitor.");
- }
- }
- }
-}