summaryrefslogtreecommitdiffstats
path: root/testing/marionette/harness/marionette_harness/runner/mixins/browsermob-proxy-py/browsermobproxy/client.py
blob: 00b98ca5ee3f781fc8be8d3c8cc9425214d8e3a3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
import requests

try:
  from urllib.parse import urlencode, unquote
except ImportError:
  from urllib import urlencode, unquote
import json


class Client(object):
    """
    REST client for a single proxy instance on a BrowserMob Proxy server.

    Constructing a Client creates the proxy on the server; the other methods
    issue HTTP requests against ``<host>/proxy/<port>/...`` and, unless noted
    otherwise, return the HTTP status code of the response.
    """

    def __init__(self, url, params=None):
        """
        Initialises a new Client object

        A new proxy is created by POSTing to ``/proxy`` on the server, and
        the port reported in the JSON response is stored in ``self.port``.

        :param url: This is where the BrowserMob Proxy lives, given without
                    a scheme ("http://" is prepended)
        :param params: URL query (for example httpProxy and httpsProxy vars).
                       Defaults to None, meaning no query string
        """
        self.host = "http://" + url
        if params:
            # The encoded query is unquoted again, so values such as
            # "host:port" are sent without percent-escapes.
            urlparams = "?" + unquote(urlencode(params))
        else:
            urlparams = ""
        resp = requests.post('{}/proxy'.format(self.host + urlparams))
        jcontent = json.loads(resp.content.decode('utf-8'))
        self.port = jcontent['port']
        # self.host looks like "http://<name>[:<port>]"; splitting on ":"
        # leaves "//<name>" at index 1, and [2:] drops the two slashes.
        url_parts = self.host.split(":")
        self.proxy = url_parts[1][2:] + ":" + str(self.port)

    def _endpoint(self, path=''):
        """
        Builds the REST URL for this proxy instance

        :param path: suffix (including its leading slash) appended after
                     ``<host>/proxy/<port>``
        """
        return '{0}/proxy/{1}{2}'.format(self.host, self.port, path)

    @staticmethod
    def _translate_options(options, mapping):
        """
        Converts user-facing option names into the names sent to the server

        :param options: dictionary of option name -> value; every value is
                        passed through int()
        :param mapping: dictionary of accepted option name -> server name
        :raises KeyError: if a key is not in mapping, or options is empty
        """
        params = {}
        for key, value in options.items():
            if key not in mapping:
                raise KeyError('invalid key: {}'.format(key))
            params[mapping[key]] = int(value)

        if not params:
            raise KeyError("You need to specify one of the valid Keys")
        return params

    def close(self):
        """
        shuts down the proxy and closes the port
        """
        r = requests.delete(self._endpoint())
        return r.status_code

    # webdriver integration
    # ...as a proxy object
    def selenium_proxy(self):
        """
        Returns a Selenium WebDriver Proxy class with details of the HTTP Proxy
        """
        # Imported lazily so selenium is only needed when this method is used
        from selenium import webdriver
        return webdriver.Proxy({
            "httpProxy": self.proxy,
            "sslProxy": self.proxy,
        })

    def webdriver_proxy(self):
        """
        Alias of selenium_proxy(): returns a Selenium WebDriver Proxy class
        with details of the HTTP Proxy
        """
        return self.selenium_proxy()

    # ...as a capability
    def add_to_capabilities(self, capabilities):
        """
        Adds a 'proxy' entry to a desired capabilities dictionary with the
        BrowserMob proxy information. The dictionary is modified in place.


        :param capabilities: The Desired capabilities object from Selenium WebDriver
        """
        capabilities['proxy'] = {
            'proxyType': "MANUAL",
            'httpProxy': self.proxy,
            'sslProxy': self.proxy
        }

    def add_to_webdriver_capabilities(self, capabilities):
        """
        Alias of add_to_capabilities()


        :param capabilities: The Desired capabilities object from Selenium WebDriver
        """
        self.add_to_capabilities(capabilities)

    # browsermob proxy api
    @property
    def har(self):
        """
        Gets the HAR that has been recorded, as the decoded JSON response
        """
        r = requests.get(self._endpoint('/har'))

        return r.json()

    def new_har(self, ref=None, options=None):
        """
        This sets a new HAR to be recorded


        :param ref: A reference for the HAR. Defaults to None
        :param options: A dictionary that will be passed to BrowserMob Proxy \
                   with specific keywords. Keywords are: \
                   captureHeaders - Boolean, capture headers \
                   captureContent - Boolean, capture content bodies \
                   captureBinaryContent - Boolean, capture binary content
        :returns: a (status_code, body) tuple; body is the decoded JSON \
                   response when the status is 200 and None otherwise
        """
        payload = {"initialPageRef": ref} if ref else {}
        if options:
            payload.update(options)

        r = requests.put(self._endpoint('/har'), payload)
        if r.status_code == 200:
            return (r.status_code, r.json())
        return (r.status_code, None)

    def new_page(self, ref=None):
        """
        This sets a new page to be recorded


        :param ref: A reference for the new page. Defaults to None
        """
        payload = {"pageRef": ref} if ref else {}
        r = requests.put(self._endpoint('/har/pageRef'), payload)
        return r.status_code

    def blacklist(self, regexp, status_code):
        """
        Sets a list of URL patterns to blacklist


        :param regexp: a comma separated list of regular expressions
        :param status_code: the HTTP status code to return for URLs that \
                       match the blacklist
        """
        r = requests.put(self._endpoint('/blacklist'),
                         {'regex': regexp, 'status': status_code})
        return r.status_code

    def whitelist(self, regexp, status_code):
        """
        Sets a list of URL patterns to whitelist


        :param regexp: a comma separated list of regular expressions
        :param status_code: the HTTP status code to return for URLs that do not \
                       match the whitelist
        """
        r = requests.put(self._endpoint('/whitelist'),
                         {'regex': regexp, 'status': status_code})
        return r.status_code

    def basic_authentication(self, domain, username, password):
        """
        This adds automatic basic authentication


        :param domain: domain to set authentication credentials for
        :param username: valid username to use when authenticating
        :param password: valid password to use when authenticating
        """
        r = requests.post(url=self._endpoint('/auth/basic/{0}'.format(domain)),
                          data=json.dumps({'username': username, 'password': password}),
                          headers={'content-type': 'application/json'})
        return r.status_code

    def headers(self, headers):
        """
        This sets the headers that will be set by the proxy on all requests


        :param headers: this is a dictionary of the headers to be set
        :raises TypeError: if headers is not a dictionary
        """
        if not isinstance(headers, dict):
            raise TypeError("headers needs to be dictionary")

        r = requests.post(url=self._endpoint('/headers'),
                          data=json.dumps(headers),
                          headers={'content-type': 'application/json'})
        return r.status_code

    def response_interceptor(self, js):
        """
        Executes the javascript against each response


        :param js: the javascript to execute
        """
        r = requests.post(url=self._endpoint('/interceptor/response'),
                          data=js,
                          headers={'content-type': 'x-www-form-urlencoded'})
        return r.status_code

    def request_interceptor(self, js):
        """
        Executes the javascript against each request


        :param js: the javascript to execute
        """
        r = requests.post(url=self._endpoint('/interceptor/request'),
                          data=js,
                          headers={'content-type': 'x-www-form-urlencoded'})
        return r.status_code

    # Keys accepted by limits() -> parameter names sent to the server
    LIMITS = {
        'upstream_kbps': 'upstreamKbps',
        'downstream_kbps': 'downstreamKbps',
        'latency': 'latency'
    }

    def limits(self, options):
        """
        Limit the bandwidth through the proxy.


        :param options: A dictionary with all the details you want to set. \
                        Values are converted with int(). Valid keys are: \
                        downstream_kbps - Sets the downstream kbps \
                        upstream_kbps - Sets the upstream kbps \
                        latency - Add the given latency to each HTTP request
        :raises KeyError: if a key is not listed in LIMITS, or if options \
                        is empty
        """
        params = self._translate_options(options, self.LIMITS)
        r = requests.put(self._endpoint('/limit'), params)
        return r.status_code

    # Keys accepted by timeouts() -> parameter names sent to the server
    TIMEOUTS = {
        'request': 'requestTimeout',
        'read': 'readTimeout',
        'connection': 'connectionTimeout',
        'dns': 'dnsCacheTimeout'
    }

    def timeouts(self, options):
        """
        Configure various timeouts in the proxy


        :param options: A dictionary with all the details you want to set. \
                        Values are converted with int(). Valid keys are: \
                        request - request timeout (in seconds) \
                        read - read timeout (in seconds) \
                        connection - connection timeout (in seconds) \
                        dns - dns lookup timeout (in seconds)
        :raises KeyError: if a key is not listed in TIMEOUTS, or if options \
                        is empty
        """
        params = self._translate_options(options, self.TIMEOUTS)
        r = requests.put(self._endpoint('/timeout'), params)
        return r.status_code

    def remap_hosts(self, address, ip_address):
        """
        Remap the hosts for a specific URL


        :param address: url that you wish to remap
        :param ip_address: IP Address that will handle all traffic for the address passed in
        """
        # NOTE: assert statements are removed when Python runs with -O, so
        # this check is only active in non-optimised runs.
        assert address is not None and ip_address is not None
        r = requests.post(self._endpoint('/hosts'),
                          json.dumps({address: ip_address}),
                          headers={'content-type': 'application/json'})
        return r.status_code

    def wait_for_traffic_to_stop(self, quiet_period, timeout):
        """
        Waits for the network to be quiet


        :param quiet_period: number of milliseconds the network needs to be quiet for
        :param timeout: max number of milliseconds to wait
        """
        r = requests.put(self._endpoint('/wait'),
                         {'quietPeriodInMs': quiet_period, 'timeoutInMs': timeout})
        return r.status_code

    def clear_dns_cache(self):
        """
        Clears the DNS cache associated with the proxy instance
        """
        r = requests.delete(self._endpoint('/dns/cache'))
        return r.status_code

    def rewrite_url(self, match, replace):
        """
        Rewrites the requested url.


        :param match: a regex to match requests with
        :param replace: unicode \
                   a string to replace the matches with
        """
        params = {
            "matchRegex": match,
            "replace": replace
        }
        r = requests.put(self._endpoint('/rewrite'), params)
        return r.status_code

    def retry(self, retry_count):
        """
        Sets the retry count on the proxy (sent as 'retrycount')


        :param retry_count: the number of retries
        """
        r = requests.put(self._endpoint('/retry'),
                         {'retrycount': retry_count})
        return r.status_code