summaryrefslogtreecommitdiffstats
path: root/mobile/android/services/src/main/java/org/mozilla/gecko/sync/net/SyncStorageCollectionRequest.java
blob: 3ae672f21e7cda4c8c601acc2921da63ed138495 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

package org.mozilla.gecko.sync.net;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;

import org.mozilla.gecko.background.common.log.Logger;

import ch.boye.httpclientandroidlib.Header;
import ch.boye.httpclientandroidlib.HttpEntity;
import ch.boye.httpclientandroidlib.HttpResponse;
import ch.boye.httpclientandroidlib.client.methods.HttpRequestBase;
import ch.boye.httpclientandroidlib.impl.client.DefaultHttpClient;

/**
 * A request class that handles line-by-line responses. Eventually this will
 * handle real stream processing; for now, just parse the returned body
 * line-by-line.
 *
 * @author rnewman
 *
 */
public class SyncStorageCollectionRequest extends SyncStorageRequest {
  private static final String LOG_TAG = "CollectionRequest";

  public SyncStorageCollectionRequest(URI uri) {
    super(uri);
  }

  protected volatile boolean aborting = false;

  /**
   * Instruct the request that it should process no more records,
   * and decline to notify any more delegate callbacks.
   */
  public void abort() {
    aborting = true;
    try {
      this.resource.request.abort();
    } catch (Exception e) {
      // Just in case.
      Logger.warn(LOG_TAG, "Got exception in abort: " + e);
    }
  }

  @Override
  protected BaseResourceDelegate makeResourceDelegate(SyncStorageRequest request) {
    return new SyncCollectionResourceDelegate((SyncStorageCollectionRequest) request);
  }

  // TODO: this is awful.
  public class SyncCollectionResourceDelegate extends
      SyncStorageResourceDelegate {

    private static final String CONTENT_TYPE_INCREMENTAL = "application/newlines";
    private static final int FETCH_BUFFER_SIZE = 16 * 1024;   // 16K chars.

    SyncCollectionResourceDelegate(SyncStorageCollectionRequest request) {
      super(request);
    }

    @Override
    public void addHeaders(HttpRequestBase request, DefaultHttpClient client) {
      super.addHeaders(request, client);
      request.setHeader("Accept", CONTENT_TYPE_INCREMENTAL);
      // Caller is responsible for setting full=1.
    }

    @Override
    public void handleHttpResponse(HttpResponse response) {
      if (aborting) {
        return;
      }

      if (response.getStatusLine().getStatusCode() != 200) {
        super.handleHttpResponse(response);
        return;
      }

      HttpEntity entity = response.getEntity();
      Header contentType = entity.getContentType();
      if (!contentType.getValue().startsWith(CONTENT_TYPE_INCREMENTAL)) {
        // Not incremental!
        super.handleHttpResponse(response);
        return;
      }

      // TODO: at this point we can access X-Weave-Timestamp, compare
      // that to our local timestamp, and compute an estimate of clock
      // skew. We can provide this to the incremental delegate, which
      // will allow it to seamlessly correct timestamps on the records
      // it processes. Bug 721887.

      // Line-by-line processing, then invoke success.
      SyncStorageCollectionRequestDelegate delegate = (SyncStorageCollectionRequestDelegate) this.request.delegate;
      InputStream content = null;
      BufferedReader br = null;
      try {
        content = entity.getContent();
        br = new BufferedReader(new InputStreamReader(content), FETCH_BUFFER_SIZE);
        String line;

        // This relies on connection timeouts at the HTTP layer.
        while (!aborting &&
               null != (line = br.readLine())) {
          try {
            delegate.handleRequestProgress(line);
          } catch (Exception ex) {
            delegate.handleRequestError(new HandleProgressException(ex));
            BaseResource.consumeEntity(entity);
            return;
          }
        }
        if (aborting) {
          // So we don't hit the success case below.
          return;
        }
      } catch (IOException ex) {
        if (!aborting) {
          delegate.handleRequestError(ex);
        }
        BaseResource.consumeEntity(entity);
        return;
      } finally {
        // Attempt to close the stream and reader.
        if (br != null) {
          try {
            br.close();
          } catch (IOException e) {
            // We don't care if this fails.
          }
        }
      }
      // We're done processing the entity. Don't let fetching the body succeed!
      BaseResource.consumeEntity(entity);
      delegate.handleRequestSuccess(new SyncStorageResponse(response));
    }
  }
}