/* -*- indent-tabs-mode: nil; js-indent-level: 2 -*- */
/* vim: set ft= javascript ts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

"use strict";

const {CC, Ci, Cu, Cc} = require("chrome");

const ArrayBufferInputStream = CC("@mozilla.org/io/arraybuffer-input-stream;1",
                                  "nsIArrayBufferInputStream");
const BinaryInputStream = CC("@mozilla.org/binaryinputstream;1",
                             "nsIBinaryInputStream", "setInputStream");

loader.lazyServiceGetter(this, "gActivityDistributor",
                         "@mozilla.org/network/http-activity-distributor;1",
                         "nsIHttpActivityDistributor");

const {XPCOMUtils} = require("resource://gre/modules/XPCOMUtils.jsm");
const {setTimeout} = Cu.import("resource://gre/modules/Timer.jsm", {});

/**
 * Construct a new nsIStreamListener that buffers data and provides a
 * method to notify another listener when data is available.  This is
 * used to throttle network data on a per-channel basis.
 *
 * After construction, @see setOriginalListener must be called on the
 * new object.
 *
 * @param {NetworkThrottleQueue} queue the NetworkThrottleQueue to
 *        which status changes should be reported
 */
function NetworkThrottleListener(queue) {
  this.queue = queue;
  this.pendingData = [];
  this.pendingException = null;
  this.offset = 0;
  this.responseStarted = false;
  this.activities = {};
}

NetworkThrottleListener.prototype = {
  QueryInterface:
    XPCOMUtils.generateQI([Ci.nsIStreamListener, Ci.nsIInterfaceRequestor,
                           Ci.nsISupports]),

  /**
   * Set the original listener for this object.  The original listener
   * will receive requests from this object when the queue allows data
   * through.
   *
   * @param {nsIStreamListener} originalListener the original listener
   *        for the channel, to which all requests will be sent
   */
  setOriginalListener: function (originalListener) {
    this.originalListener = originalListener;
  },

  /**
   * @see nsIStreamListener.onStartRequest.
   */
  onStartRequest: function (request, context) {
    this.originalListener.onStartRequest(request, context);
    this.queue.start(this);
  },

  /**
   * @see nsIStreamListener.onStopRequest.
   */
  onStopRequest: function (request, context, statusCode) {
    this.pendingData.push({request, context, statusCode});
    this.queue.dataAvailable(this);
  },

  /**
   * @see nsIStreamListener.onDataAvailable.
   */
  onDataAvailable: function (request, context, inputStream, offset, count) {
    if (this.pendingException) {
      throw this.pendingException;
    }

    const bin = new BinaryInputStream(inputStream);
    const bytes = new ArrayBuffer(count);
    bin.readArrayBuffer(count, bytes);

    const stream = new ArrayBufferInputStream();
    stream.setData(bytes, 0, count);

    this.pendingData.push({request, context, stream, count});
    this.queue.dataAvailable(this);
  },

  /**
   * Allow some buffered data from this object to be forwarded to this
   * object's originalListener.
   *
   * @param {Number} bytesPermitted The maximum number of bytes
   *        permitted to be sent.
   * @return {Object} an object of the form {length, done}, where
   *         |length| is the number of bytes actually forwarded, and
   *         |done| is a boolean indicating whether this particular
   *         request has been completed.  (A NetworkThrottleListener
   *         may be queued multiple times, so this does not mean that
   *         all available data has been sent.)
   */
  sendSomeData: function (bytesPermitted) {
    if (this.pendingData.length === 0) {
      // Shouldn't happen.
      return {length: 0, done: true};
    }

    const {request, context, stream, count, statusCode} = this.pendingData[0];

    if (statusCode !== undefined) {
      this.pendingData.shift();
      this.originalListener.onStopRequest(request, context, statusCode);
      return {length: 0, done: true};
    }

    if (bytesPermitted > count) {
      bytesPermitted = count;
    }

    try {
      this.originalListener.onDataAvailable(request, context, stream,
                                            this.offset, bytesPermitted);
    } catch (e) {
      this.pendingException = e;
    }

    let done = false;
    if (bytesPermitted === count) {
      this.pendingData.shift();
      done = true;
    } else {
      this.pendingData[0].count -= bytesPermitted;
    }

    this.offset += bytesPermitted;
    // Maybe our state has changed enough to emit an event.
    this.maybeEmitEvents();

    return {length: bytesPermitted, done};
  },

  /**
   * Return the number of pending data requests available for this
   * listener.
   */
  pendingCount: function () {
    return this.pendingData.length;
  },

  /**
   * This is called when an http activity event is delivered.  This
   * object delays the event until the appropriate moment.
   */
  addActivityCallback: function (callback, httpActivity, channel, activityType,
                                 activitySubtype, timestamp, extraSizeData,
                                 extraStringData) {
    let datum = {callback, httpActivity, channel, activityType,
                 activitySubtype, extraSizeData,
                 extraStringData};
    this.activities[activitySubtype] = datum;

    if (activitySubtype ===
        gActivityDistributor.ACTIVITY_SUBTYPE_RESPONSE_COMPLETE) {
      this.totalSize = extraSizeData;
    }

    this.maybeEmitEvents();
  },

  /**
   * This is called for a download throttler when the latency timeout
   * has ended.
   */
  responseStart: function () {
    this.responseStarted = true;
    this.maybeEmitEvents();
  },

  /**
   * Check our internal state and emit any http activity events as
   * needed.  Note that we wait until both our internal state has
   * changed and we've received the real http activity event from
   * platform.  This approach ensures we can both pass on the correct
   * data from the original event, and update the reported time to be
   * consistent with the delay we're introducing.
   */
  maybeEmitEvents: function () {
    if (this.responseStarted) {
      this.maybeEmit(gActivityDistributor.ACTIVITY_SUBTYPE_RESPONSE_START);
      this.maybeEmit(gActivityDistributor.ACTIVITY_SUBTYPE_RESPONSE_HEADER);
    }

    if (this.totalSize !== undefined && this.offset >= this.totalSize) {
      this.maybeEmit(gActivityDistributor.ACTIVITY_SUBTYPE_RESPONSE_COMPLETE);
      this.maybeEmit(gActivityDistributor.ACTIVITY_SUBTYPE_TRANSACTION_CLOSE);
    }
  },

  /**
   * Emit an event for |code|, if the appropriate entry in
   * |activities| is defined.
   */
  maybeEmit: function (code) {
    if (this.activities[code] !== undefined) {
      let {callback, httpActivity, channel, activityType,
           activitySubtype, extraSizeData,
           extraStringData} = this.activities[code];
      let now = Date.now() * 1000;
      callback(httpActivity, channel, activityType, activitySubtype,
               now, extraSizeData, extraStringData);
      this.activities[code] = undefined;
    }
  },
};

/**
 * Construct a new queue that can be used to throttle the network for
 * a group of related network requests.
 *
 * meanBPS {Number} Mean bytes per second.
 * maxBPS {Number} Maximum bytes per second.
 * roundTripTimeMean {Number} Mean round trip time in milliseconds.
 * roundTripTimeMax {Number} Maximum round trip time in milliseconds.
 */
function NetworkThrottleQueue(meanBPS, maxBPS,
                              roundTripTimeMean, roundTripTimeMax) {
  this.meanBPS = meanBPS;
  this.maxBPS = maxBPS;
  this.roundTripTimeMean = roundTripTimeMean;
  this.roundTripTimeMax = roundTripTimeMax;

  this.pendingRequests = new Set();
  this.downloadQueue = [];
  this.previousReads = [];

  this.pumping = false;
}

NetworkThrottleQueue.prototype = {
  /**
   * A helper function that, given a mean and a maximum, returns a
   * random integer between (mean - (max - mean)) and max.
   */
  random: function (mean, max) {
    return mean - (max - mean) + Math.floor(2 * (max - mean) * Math.random());
  },

  /**
   * A helper function that lets the indicating listener start sending
   * data.  This is called after the initial round trip time for the
   * listener has elapsed.
   */
  allowDataFrom: function (throttleListener) {
    throttleListener.responseStart();
    this.pendingRequests.delete(throttleListener);
    const count = throttleListener.pendingCount();
    for (let i = 0; i < count; ++i) {
      this.downloadQueue.push(throttleListener);
    }
    this.pump();
  },

  /**
   * Notice a new listener object.  This is called by the
   * NetworkThrottleListener when the request has started.  Initially
   * a new listener object is put into a "pending" state, until the
   * round-trip time has elapsed.  This is used to simulate latency.
   *
   * @param {NetworkThrottleListener} throttleListener the new listener
   */
  start: function (throttleListener) {
    this.pendingRequests.add(throttleListener);
    let delay = this.random(this.roundTripTimeMean, this.roundTripTimeMax);
    if (delay > 0) {
      setTimeout(() => this.allowDataFrom(throttleListener), delay);
    } else {
      this.allowDataFrom(throttleListener);
    }
  },

  /**
   * Note that new data is available for a given listener.  Each time
   * data is available, the listener will be re-queued.
   *
   * @param {NetworkThrottleListener} throttleListener the listener
   *        which has data available.
   */
  dataAvailable: function (throttleListener) {
    if (!this.pendingRequests.has(throttleListener)) {
      this.downloadQueue.push(throttleListener);
      this.pump();
    }
  },

  /**
   * An internal function that permits individual listeners to send
   * data.
   */
  pump: function () {
    // A redirect will cause two NetworkThrottleListeners to be on a
    // listener chain.  In this case, we might recursively call into
    // this method.  Avoid infinite recursion here.
    if (this.pumping) {
      return;
    }
    this.pumping = true;

    const now = Date.now();
    const oneSecondAgo = now - 1000;

    while (this.previousReads.length &&
           this.previousReads[0].when < oneSecondAgo) {
      this.previousReads.shift();
    }

    const totalBytes = this.previousReads.reduce((sum, elt) => {
      return sum + elt.numBytes;
    }, 0);

    let thisSliceBytes = this.random(this.meanBPS, this.maxBPS);
    if (totalBytes < thisSliceBytes) {
      thisSliceBytes -= totalBytes;
      let readThisTime = 0;
      while (thisSliceBytes > 0 && this.downloadQueue.length) {
        let {length, done} = this.downloadQueue[0].sendSomeData(thisSliceBytes);
        thisSliceBytes -= length;
        readThisTime += length;
        if (done) {
          this.downloadQueue.shift();
        }
      }
      this.previousReads.push({when: now, numBytes: readThisTime});
    }

    // If there is more data to download, then schedule ourselves for
    // one second after the oldest previous read.
    if (this.downloadQueue.length) {
      const when = this.previousReads[0].when + 1000;
      setTimeout(this.pump.bind(this), when - now);
    }

    this.pumping = false;
  },
};

/**
 * Construct a new object that can be used to throttle the network for
 * a group of related network requests.
 *
 * @param {Object} An object with the following attributes:
 * roundTripTimeMean {Number} Mean round trip time in milliseconds.
 * roundTripTimeMax {Number} Maximum round trip time in milliseconds.
 * downloadBPSMean {Number} Mean bytes per second for downloads.
 * downloadBPSMax {Number} Maximum bytes per second for downloads.
 * uploadBPSMean {Number} Mean bytes per second for uploads.
 * uploadBPSMax {Number} Maximum bytes per second for uploads.
 *
 * Download throttling will not be done if downloadBPSMean and
 * downloadBPSMax are <= 0.  Upload throttling will not be done if
 * uploadBPSMean and uploadBPSMax are <= 0.
 */
function NetworkThrottleManager({roundTripTimeMean, roundTripTimeMax,
                                 downloadBPSMean, downloadBPSMax,
                                 uploadBPSMean, uploadBPSMax}) {
  if (downloadBPSMax <= 0 && downloadBPSMean <= 0) {
    this.downloadQueue = null;
  } else {
    this.downloadQueue =
      new NetworkThrottleQueue(downloadBPSMean, downloadBPSMax,
                               roundTripTimeMean, roundTripTimeMax);
  }
  if (uploadBPSMax <= 0 && uploadBPSMean <= 0) {
    this.uploadQueue = null;
  } else {
    this.uploadQueue = Cc["@mozilla.org/network/throttlequeue;1"]
      .createInstance(Ci.nsIInputChannelThrottleQueue);
    this.uploadQueue.init(uploadBPSMean, uploadBPSMax);
  }
}
exports.NetworkThrottleManager = NetworkThrottleManager;

NetworkThrottleManager.prototype = {
  /**
   * Create a new NetworkThrottleListener for a given channel and
   * install it using |setNewListener|.
   *
   * @param {nsITraceableChannel} channel the channel to manage
   * @return {NetworkThrottleListener} the new listener, or null if
   *         download throttling is not being done.
   */
  manage: function (channel) {
    if (this.downloadQueue) {
      let listener = new NetworkThrottleListener(this.downloadQueue);
      let originalListener = channel.setNewListener(listener);
      listener.setOriginalListener(originalListener);
      return listener;
    }
    return null;
  },

  /**
   * Throttle uploads taking place on the given channel.
   *
   * @param {nsITraceableChannel} channel the channel to manage
   */
  manageUpload: function (channel) {
    if (this.uploadQueue) {
      channel = channel.QueryInterface(Ci.nsIThrottledInputChannel);
      channel.throttleQueue = this.uploadQueue;
    }
  },
};