1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
|
import json, os, urllib, urlparse
def redirect(url, response):
response.add_required_headers = False
response.writer.write_status(301)
response.writer.write_header("access-control-allow-origin", "*")
response.writer.write_header("location", url)
response.writer.end_headers()
response.writer.write("")
def create_redirect_url(request, swap_scheme = False):
parsed = urlparse.urlsplit(request.url)
destination_netloc = parsed.netloc
scheme = parsed.scheme
if swap_scheme:
scheme = "http" if parsed.scheme == "https" else "https"
hostname = parsed.netloc.split(':')[0]
port = request.server.config["ports"][scheme][0]
destination_netloc = ":".join([hostname, str(port)])
# Remove "redirection" from query to avoid redirect loops.
parsed_query = dict(urlparse.parse_qsl(parsed.query))
assert "redirection" in parsed_query
del parsed_query["redirection"]
destination_url = urlparse.urlunsplit(urlparse.SplitResult(
scheme = scheme,
netloc = destination_netloc,
path = parsed.path,
query = urllib.urlencode(parsed_query),
fragment = None))
return destination_url
def main(request, response):
if "redirection" in request.GET:
redirection = request.GET["redirection"]
if redirection == "no-redirect":
pass
elif redirection == "keep-scheme-redirect":
redirect(create_redirect_url(request, swap_scheme=False), response)
return
elif redirection == "swap-scheme-redirect":
redirect(create_redirect_url(request, swap_scheme=True), response)
return
else:
raise ValueError ("Invalid redirect type: %s" % redirection)
content_type = "text/plain"
response_data = ""
if "action" in request.GET:
action = request.GET["action"]
if "content_type" in request.GET:
content_type = request.GET["content_type"]
key = request.GET["key"]
stash = request.server.stash
path = request.GET.get("path", request.url.split('?'))[0]
if action == "put":
value = request.GET["value"]
stash.take(key=key, path=path)
stash.put(key=key, value=value, path=path)
response_data = json.dumps({"status": "success", "result": key})
elif action == "purge":
value = stash.take(key=key, path=path)
if content_type == "image/png":
response_data = open(os.path.join(request.doc_root,
"images",
"smiley.png"), "rb").read()
elif content_type == "audio/mpeg":
response_data = open(os.path.join(request.doc_root,
"media",
"sound_5.oga"), "rb").read()
elif content_type == "video/mp4":
response_data = open(os.path.join(request.doc_root,
"media",
"movie_5.mp4"), "rb").read()
elif content_type == "application/javascript":
response_data = open(os.path.join(request.doc_root,
"mixed-content",
"generic",
"worker.js"), "rb").read()
else:
response_data = "/* purged */"
elif action == "take":
value = stash.take(key=key, path=path)
if value is None:
status = "allowed"
else:
status = "blocked"
response_data = json.dumps({"status": status, "result": value})
response.add_required_headers = False
response.writer.write_status(200)
response.writer.write_header("content-type", content_type)
response.writer.write_header("cache-control", "no-cache; must-revalidate")
response.writer.end_headers()
response.writer.write(response_data)
|