summaryrefslogtreecommitdiffstats
path: root/testing/marionette/harness/marionette_harness/runner/mixins/browsermob-proxy-py/browsermobproxy/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'testing/marionette/harness/marionette_harness/runner/mixins/browsermob-proxy-py/browsermobproxy/client.py')
-rw-r--r--testing/marionette/harness/marionette_harness/runner/mixins/browsermob-proxy-py/browsermobproxy/client.py326
1 files changed, 326 insertions, 0 deletions
diff --git a/testing/marionette/harness/marionette_harness/runner/mixins/browsermob-proxy-py/browsermobproxy/client.py b/testing/marionette/harness/marionette_harness/runner/mixins/browsermob-proxy-py/browsermobproxy/client.py
new file mode 100644
index 000000000..00b98ca5e
--- /dev/null
+++ b/testing/marionette/harness/marionette_harness/runner/mixins/browsermob-proxy-py/browsermobproxy/client.py
@@ -0,0 +1,326 @@
+import requests
+
+try:
+ from urllib.parse import urlencode, unquote
+except ImportError:
+ from urllib import urlencode, unquote
+import json
+
+
+class Client(object):
+ def __init__(self, url, params={}):
+ """
+ Initialises a new Client object
+
+
+ :param url: This is where the BrowserMob Proxy lives
+ :param params: URL query (for example httpProxy and httpsProxy vars)
+ """
+ self.host = "http://" + url
+ if params:
+ urlparams = "?" + unquote(urlencode(params))
+ else:
+ urlparams = ""
+ resp = requests.post('{}/proxy'.format(self.host + urlparams))
+ jcontent = json.loads(resp.content.decode('utf-8'))
+ self.port = jcontent['port']
+ url_parts = self.host.split(":")
+ self.proxy = url_parts[1][2:] + ":" + str(self.port)
+
+ def close(self):
+ """
+ shuts down the proxy and closes the port
+ """
+ r = requests.delete('{0}/proxy/{1}'.format(self.host, self.port))
+ return r.status_code
+
+ # webdriver integration
+ # ...as a proxy object
+ def selenium_proxy(self):
+ """
+ Returns a Selenium WebDriver Proxy class with details of the HTTP Proxy
+ """
+ from selenium import webdriver
+ return webdriver.Proxy({
+ "httpProxy": self.proxy,
+ "sslProxy": self.proxy,
+ })
+
+ def webdriver_proxy(self):
+ """
+ Returns a Selenium WebDriver Proxy class with details of the HTTP Proxy
+ """
+ return self.selenium_proxy()
+
+ # ...as a capability
+ def add_to_capabilities(self, capabilities):
+ """
+ Adds an 'proxy' entry to a desired capabilities dictionary with the
+ BrowserMob proxy information
+
+
+ :param capabilities: The Desired capabilities object from Selenium WebDriver
+ """
+ capabilities['proxy'] = {
+ 'proxyType': "MANUAL",
+ 'httpProxy': self.proxy,
+ 'sslProxy': self.proxy
+ }
+
+ def add_to_webdriver_capabilities(self, capabilities):
+ self.add_to_capabilities(capabilities)
+
+ # browsermob proxy api
+ @property
+ def har(self):
+ """
+ Gets the HAR that has been recorded
+ """
+ r = requests.get('{0}/proxy/{1}/har'.format(self.host, self.port))
+
+ return r.json()
+
+ def new_har(self, ref=None, options={}):
+ """
+ This sets a new HAR to be recorded
+
+
+ :param ref: A reference for the HAR. Defaults to None
+ :param options: A dictionary that will be passed to BrowserMob Proxy \
+ with specific keywords. Keywords are: \
+ captureHeaders - Boolean, capture headers \
+ captureContent - Boolean, capture content bodies \
+ captureBinaryContent - Boolean, capture binary content
+ """
+ if ref:
+ payload = {"initialPageRef": ref}
+ else:
+ payload = {}
+ if options:
+ payload.update(options)
+
+ r = requests.put('{0}/proxy/{1}/har'.format(self.host, self.port), payload)
+ if r.status_code == 200:
+ return (r.status_code, r.json())
+ else:
+ return (r.status_code, None)
+
+ def new_page(self, ref=None):
+ """
+ This sets a new page to be recorded
+
+
+ :param ref: A reference for the new page. Defaults to None
+ """
+ if ref:
+ payload = {"pageRef": ref}
+ else:
+ payload = {}
+ r = requests.put('{0}/proxy/{1}/har/pageRef'.format(self.host, self.port),
+ payload)
+ return r.status_code
+
+ def blacklist(self, regexp, status_code):
+ """
+ Sets a list of URL patterns to blacklist
+
+
+ :param regex: a comma separated list of regular expressions
+ :param status_code: the HTTP status code to return for URLs that do not \
+ match the blacklist
+
+ """
+ r = requests.put('{0}/proxy/{1}/blacklist'.format(self.host, self.port),
+ {'regex': regexp, 'status': status_code})
+ return r.status_code
+
+ def whitelist(self, regexp, status_code):
+ """
+ Sets a list of URL patterns to whitelist
+
+
+ :param regex: a comma separated list of regular expressions
+ :param status_code: the HTTP status code to return for URLs that do not \
+ match the whitelist
+ """
+ r = requests.put('{0}/proxy/{1}/whitelist'.format(self.host, self.port),
+ {'regex': regexp, 'status': status_code})
+ return r.status_code
+
+ def basic_authentication(self, domain, username, password):
+ """
+ This add automatic basic authentication
+
+
+ :param domain: domain to set authentication credentials for
+ :param username: valid username to use when authenticating
+ :param password: valid password to use when authenticating
+ """
+ r = requests.post(url='{0}/proxy/{1}/auth/basic/{2}'.format(self.host, self.port, domain),
+ data=json.dumps({'username': username, 'password': password}),
+ headers={'content-type': 'application/json'})
+ return r.status_code
+
+ def headers(self, headers):
+ """
+ This sets the headers that will set by the proxy on all requests
+
+
+ :param headers: this is a dictionary of the headers to be set
+ """
+ if not isinstance(headers, dict):
+ raise TypeError("headers needs to be dictionary")
+
+ r = requests.post(url='{0}/proxy/{1}/headers'.format(self.host, self.port),
+ data=json.dumps(headers),
+ headers={'content-type': 'application/json'})
+ return r.status_code
+
+ def response_interceptor(self, js):
+ """
+ Executes the javascript against each response
+
+
+ :param js: the javascript to execute
+ """
+ r = requests.post(url='{0}/proxy/{1}/interceptor/response'.format(self.host, self.port),
+ data=js,
+ headers={'content-type': 'x-www-form-urlencoded'})
+ return r.status_code
+
+ def request_interceptor(self, js):
+ """
+ Executes the javascript against each request
+
+
+ :param js: the javascript to execute
+ """
+ r = requests.post(url='{0}/proxy/{1}/interceptor/request'.format(self.host, self.port),
+ data=js,
+ headers={'content-type': 'x-www-form-urlencoded'})
+ return r.status_code
+
+ LIMITS = {
+ 'upstream_kbps': 'upstreamKbps',
+ 'downstream_kbps': 'downstreamKbps',
+ 'latency': 'latency'
+ }
+
+ def limits(self, options):
+ """
+ Limit the bandwidth through the proxy.
+
+
+ :param options: A dictionary with all the details you want to set. \
+ downstreamKbps - Sets the downstream kbps \
+ upstreamKbps - Sets the upstream kbps \
+ latency - Add the given latency to each HTTP request
+ """
+ params = {}
+
+ for (k, v) in list(options.items()):
+ if k not in self.LIMITS:
+ raise KeyError('invalid key: {}'.format(k))
+
+ params[self.LIMITS[k]] = int(v)
+
+ if len(list(params.items())) == 0:
+ raise KeyError("You need to specify one of the valid Keys")
+
+ r = requests.put('{0}/proxy/{1}/limit'.format(self.host, self.port),
+ params)
+ return r.status_code
+
+ TIMEOUTS = {
+ 'request': 'requestTimeout',
+ 'read': 'readTimeout',
+ 'connection': 'connectionTimeout',
+ 'dns': 'dnsCacheTimeout'
+ }
+
+ def timeouts(self, options):
+ """
+ Configure various timeouts in the proxy
+
+
+ :param options: A dictionary with all the details you want to set. \
+ request - request timeout (in seconds) \
+ read - read timeout (in seconds) \
+ connection - connection timeout (in seconds) \
+ dns - dns lookup timeout (in seconds)
+ """
+ params = {}
+
+ for (k, v) in list(options.items()):
+ if k not in self.TIMEOUTS:
+ raise KeyError('invalid key: {}'.format(k))
+
+ params[self.TIMEOUTS[k]] = int(v)
+
+ if len(list(params.items())) == 0:
+ raise KeyError("You need to specify one of the valid Keys")
+
+ r = requests.put('{0}/proxy/{1}/timeout'.format(self.host, self.port),
+ params)
+ return r.status_code
+
+ def remap_hosts(self, address, ip_address):
+ """
+ Remap the hosts for a specific URL
+
+
+ :param address: url that you wish to remap
+ :param ip_address: IP Address that will handle all traffic for the address passed in
+ """
+ assert address is not None and ip_address is not None
+ r = requests.post('{0}/proxy/{1}/hosts'.format(self.host, self.port),
+ json.dumps({address: ip_address}),
+ headers={'content-type': 'application/json'})
+ return r.status_code
+
+ def wait_for_traffic_to_stop(self, quiet_period, timeout):
+ """
+ Waits for the network to be quiet
+
+
+ :param quiet_period: number of miliseconds the network needs to be quiet for
+ :param timeout: max number of miliseconds to wait
+ """
+ r = requests.put('{0}/proxy/{1}/wait'.format(self.host, self.port),
+ {'quietPeriodInMs': quiet_period, 'timeoutInMs': timeout})
+ return r.status_code
+
+ def clear_dns_cache(self):
+ """
+ Clears the DNS cache associated with the proxy instance
+ """
+ r = requests.delete('{0}/proxy/{1}/dns/cache'.format(self.host, self.port))
+ return r.status_code
+
+ def rewrite_url(self, match, replace):
+ """
+ Rewrites the requested url.
+
+
+ :param match: a regex to match requests with
+ :param replace: unicode \
+ a string to replace the matches with
+ """
+ params = {
+ "matchRegex": match,
+ "replace": replace
+ }
+ r = requests.put('{0}/proxy/{1}/rewrite'.format(self.host, self.port),
+ params)
+ return r.status_code
+
+ def retry(self, retry_count):
+ """
+ Retries. No idea what its used for, but its in the API...
+
+
+ :param retry_count: the number of retries
+ """
+ r = requests.put('{0}/proxy/{1}/retry'.format(self.host, self.port),
+ {'retrycount': retry_count})
+ return r.status_code