summaryrefslogtreecommitdiffstats
path: root/testing/web-platform/tests/tools/pywebsocket/src/test/test_msgutil.py
diff options
context:
space:
mode:
Diffstat (limited to 'testing/web-platform/tests/tools/pywebsocket/src/test/test_msgutil.py')
-rwxr-xr-xtesting/web-platform/tests/tools/pywebsocket/src/test/test_msgutil.py1356
1 files changed, 1356 insertions, 0 deletions
diff --git a/testing/web-platform/tests/tools/pywebsocket/src/test/test_msgutil.py b/testing/web-platform/tests/tools/pywebsocket/src/test/test_msgutil.py
new file mode 100755
index 000000000..5fedcf92f
--- /dev/null
+++ b/testing/web-platform/tests/tools/pywebsocket/src/test/test_msgutil.py
@@ -0,0 +1,1356 @@
+#!/usr/bin/env python
+#
+# Copyright 2012, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
+"""Tests for msgutil module."""
+
+
+import array
+import Queue
+import random
+import struct
+import unittest
+import zlib
+
+import set_sys_path # Update sys.path to locate mod_pywebsocket module.
+
+from mod_pywebsocket import common
+from mod_pywebsocket.extensions import DeflateFrameExtensionProcessor
+from mod_pywebsocket.extensions import PerMessageCompressExtensionProcessor
+from mod_pywebsocket.extensions import PerMessageDeflateExtensionProcessor
+from mod_pywebsocket import msgutil
+from mod_pywebsocket.stream import InvalidUTF8Exception
+from mod_pywebsocket.stream import Stream
+from mod_pywebsocket.stream import StreamHixie75
+from mod_pywebsocket.stream import StreamOptions
+from mod_pywebsocket import util
+from test import mock
+
+
+# We use one fixed nonce for testing instead of cryptographically secure PRNG.
+_MASKING_NONCE = 'ABCD'
+
+
+def _mask_hybi(frame):
+ frame_key = map(ord, _MASKING_NONCE)
+ frame_key_len = len(frame_key)
+ result = array.array('B')
+ result.fromstring(frame)
+ count = 0
+ for i in xrange(len(result)):
+ result[i] ^= frame_key[count]
+ count = (count + 1) % frame_key_len
+ return _MASKING_NONCE + result.tostring()
+
+
+def _install_extension_processor(processor, request, stream_options):
+ response = processor.get_extension_response()
+ if response is not None:
+ processor.setup_stream_options(stream_options)
+ request.ws_extension_processors.append(processor)
+
+
+def _create_request_from_rawdata(
+ read_data,
+ deflate_frame_request=None,
+ permessage_compression_request=None,
+ permessage_deflate_request=None):
+ req = mock.MockRequest(connection=mock.MockConn(''.join(read_data)))
+ req.ws_version = common.VERSION_HYBI_LATEST
+ req.ws_extension_processors = []
+
+ processor = None
+ if deflate_frame_request is not None:
+ processor = DeflateFrameExtensionProcessor(deflate_frame_request)
+ elif permessage_compression_request is not None:
+ processor = PerMessageCompressExtensionProcessor(
+ permessage_compression_request)
+ elif permessage_deflate_request is not None:
+ processor = PerMessageDeflateExtensionProcessor(
+ permessage_deflate_request)
+
+ stream_options = StreamOptions()
+ if processor is not None:
+ _install_extension_processor(processor, req, stream_options)
+ req.ws_stream = Stream(req, stream_options)
+
+ return req
+
+
+def _create_request(*frames):
+ """Creates MockRequest using data given as frames.
+
+ frames will be returned on calling request.connection.read() where request
+ is MockRequest returned by this function.
+ """
+
+ read_data = []
+ for (header, body) in frames:
+ read_data.append(header + _mask_hybi(body))
+
+ return _create_request_from_rawdata(read_data)
+
+
+def _create_blocking_request():
+ """Creates MockRequest.
+
+ Data written to a MockRequest can be read out by calling
+ request.connection.written_data().
+ """
+
+ req = mock.MockRequest(connection=mock.MockBlockingConn())
+ req.ws_version = common.VERSION_HYBI_LATEST
+ stream_options = StreamOptions()
+ req.ws_stream = Stream(req, stream_options)
+ return req
+
+
+def _create_request_hixie75(read_data=''):
+ req = mock.MockRequest(connection=mock.MockConn(read_data))
+ req.ws_stream = StreamHixie75(req)
+ return req
+
+
+def _create_blocking_request_hixie75():
+ req = mock.MockRequest(connection=mock.MockBlockingConn())
+ req.ws_stream = StreamHixie75(req)
+ return req
+
+
+class BasicMessageTest(unittest.TestCase):
+ """Basic tests for Stream."""
+
+ def test_send_message(self):
+ request = _create_request()
+ msgutil.send_message(request, 'Hello')
+ self.assertEqual('\x81\x05Hello', request.connection.written_data())
+
+ payload = 'a' * 125
+ request = _create_request()
+ msgutil.send_message(request, payload)
+ self.assertEqual('\x81\x7d' + payload,
+ request.connection.written_data())
+
+ def test_send_medium_message(self):
+ payload = 'a' * 126
+ request = _create_request()
+ msgutil.send_message(request, payload)
+ self.assertEqual('\x81\x7e\x00\x7e' + payload,
+ request.connection.written_data())
+
+ payload = 'a' * ((1 << 16) - 1)
+ request = _create_request()
+ msgutil.send_message(request, payload)
+ self.assertEqual('\x81\x7e\xff\xff' + payload,
+ request.connection.written_data())
+
+ def test_send_large_message(self):
+ payload = 'a' * (1 << 16)
+ request = _create_request()
+ msgutil.send_message(request, payload)
+ self.assertEqual('\x81\x7f\x00\x00\x00\x00\x00\x01\x00\x00' + payload,
+ request.connection.written_data())
+
+ def test_send_message_unicode(self):
+ request = _create_request()
+ msgutil.send_message(request, u'\u65e5')
+ # U+65e5 is encoded as e6,97,a5 in UTF-8
+ self.assertEqual('\x81\x03\xe6\x97\xa5',
+ request.connection.written_data())
+
+ def test_send_message_fragments(self):
+ request = _create_request()
+ msgutil.send_message(request, 'Hello', False)
+ msgutil.send_message(request, ' ', False)
+ msgutil.send_message(request, 'World', False)
+ msgutil.send_message(request, '!', True)
+ self.assertEqual('\x01\x05Hello\x00\x01 \x00\x05World\x80\x01!',
+ request.connection.written_data())
+
+ def test_send_fragments_immediate_zero_termination(self):
+ request = _create_request()
+ msgutil.send_message(request, 'Hello World!', False)
+ msgutil.send_message(request, '', True)
+ self.assertEqual('\x01\x0cHello World!\x80\x00',
+ request.connection.written_data())
+
+ def test_receive_message(self):
+ request = _create_request(
+ ('\x81\x85', 'Hello'), ('\x81\x86', 'World!'))
+ self.assertEqual('Hello', msgutil.receive_message(request))
+ self.assertEqual('World!', msgutil.receive_message(request))
+
+ payload = 'a' * 125
+ request = _create_request(('\x81\xfd', payload))
+ self.assertEqual(payload, msgutil.receive_message(request))
+
+ def test_receive_medium_message(self):
+ payload = 'a' * 126
+ request = _create_request(('\x81\xfe\x00\x7e', payload))
+ self.assertEqual(payload, msgutil.receive_message(request))
+
+ payload = 'a' * ((1 << 16) - 1)
+ request = _create_request(('\x81\xfe\xff\xff', payload))
+ self.assertEqual(payload, msgutil.receive_message(request))
+
+ def test_receive_large_message(self):
+ payload = 'a' * (1 << 16)
+ request = _create_request(
+ ('\x81\xff\x00\x00\x00\x00\x00\x01\x00\x00', payload))
+ self.assertEqual(payload, msgutil.receive_message(request))
+
+ def test_receive_length_not_encoded_using_minimal_number_of_bytes(self):
+ # Log warning on receiving bad payload length field that doesn't use
+ # minimal number of bytes but continue processing.
+
+ payload = 'a'
+ # 1 byte can be represented without extended payload length field.
+ request = _create_request(
+ ('\x81\xff\x00\x00\x00\x00\x00\x00\x00\x01', payload))
+ self.assertEqual(payload, msgutil.receive_message(request))
+
+ def test_receive_message_unicode(self):
+ request = _create_request(('\x81\x83', '\xe6\x9c\xac'))
+ # U+672c is encoded as e6,9c,ac in UTF-8
+ self.assertEqual(u'\u672c', msgutil.receive_message(request))
+
+ def test_receive_message_erroneous_unicode(self):
+ # \x80 and \x81 are invalid as UTF-8.
+ request = _create_request(('\x81\x82', '\x80\x81'))
+ # Invalid characters should raise InvalidUTF8Exception
+ self.assertRaises(InvalidUTF8Exception,
+ msgutil.receive_message,
+ request)
+
+ def test_receive_fragments(self):
+ request = _create_request(
+ ('\x01\x85', 'Hello'),
+ ('\x00\x81', ' '),
+ ('\x00\x85', 'World'),
+ ('\x80\x81', '!'))
+ self.assertEqual('Hello World!', msgutil.receive_message(request))
+
+ def test_receive_fragments_unicode(self):
+ # UTF-8 encodes U+6f22 into e6bca2 and U+5b57 into e5ad97.
+ request = _create_request(
+ ('\x01\x82', '\xe6\xbc'),
+ ('\x00\x82', '\xa2\xe5'),
+ ('\x80\x82', '\xad\x97'))
+ self.assertEqual(u'\u6f22\u5b57', msgutil.receive_message(request))
+
+ def test_receive_fragments_immediate_zero_termination(self):
+ request = _create_request(
+ ('\x01\x8c', 'Hello World!'), ('\x80\x80', ''))
+ self.assertEqual('Hello World!', msgutil.receive_message(request))
+
+ def test_receive_fragments_duplicate_start(self):
+ request = _create_request(
+ ('\x01\x85', 'Hello'), ('\x01\x85', 'World'))
+ self.assertRaises(msgutil.InvalidFrameException,
+ msgutil.receive_message,
+ request)
+
+ def test_receive_fragments_intermediate_but_not_started(self):
+ request = _create_request(('\x00\x85', 'Hello'))
+ self.assertRaises(msgutil.InvalidFrameException,
+ msgutil.receive_message,
+ request)
+
+ def test_receive_fragments_end_but_not_started(self):
+ request = _create_request(('\x80\x85', 'Hello'))
+ self.assertRaises(msgutil.InvalidFrameException,
+ msgutil.receive_message,
+ request)
+
+ def test_receive_message_discard(self):
+ request = _create_request(
+ ('\x8f\x86', 'IGNORE'), ('\x81\x85', 'Hello'),
+ ('\x8f\x89', 'DISREGARD'), ('\x81\x86', 'World!'))
+ self.assertRaises(msgutil.UnsupportedFrameException,
+ msgutil.receive_message, request)
+ self.assertEqual('Hello', msgutil.receive_message(request))
+ self.assertRaises(msgutil.UnsupportedFrameException,
+ msgutil.receive_message, request)
+ self.assertEqual('World!', msgutil.receive_message(request))
+
+ def test_receive_close(self):
+ request = _create_request(
+ ('\x88\x8a', struct.pack('!H', 1000) + 'Good bye'))
+ self.assertEqual(None, msgutil.receive_message(request))
+ self.assertEqual(1000, request.ws_close_code)
+ self.assertEqual('Good bye', request.ws_close_reason)
+
+ def test_send_longest_close(self):
+ reason = 'a' * 123
+ request = _create_request(
+ ('\x88\xfd',
+ struct.pack('!H', common.STATUS_NORMAL_CLOSURE) + reason))
+ request.ws_stream.close_connection(common.STATUS_NORMAL_CLOSURE,
+ reason)
+ self.assertEqual(request.ws_close_code, common.STATUS_NORMAL_CLOSURE)
+ self.assertEqual(request.ws_close_reason, reason)
+
+ def test_send_close_too_long(self):
+ request = _create_request()
+ self.assertRaises(msgutil.BadOperationException,
+ Stream.close_connection,
+ request.ws_stream,
+ common.STATUS_NORMAL_CLOSURE,
+ 'a' * 124)
+
+ def test_send_close_inconsistent_code_and_reason(self):
+ request = _create_request()
+ # reason parameter must not be specified when code is None.
+ self.assertRaises(msgutil.BadOperationException,
+ Stream.close_connection,
+ request.ws_stream,
+ None,
+ 'a')
+
+ def test_send_ping(self):
+ request = _create_request()
+ msgutil.send_ping(request, 'Hello World!')
+ self.assertEqual('\x89\x0cHello World!',
+ request.connection.written_data())
+
+ def test_send_longest_ping(self):
+ request = _create_request()
+ msgutil.send_ping(request, 'a' * 125)
+ self.assertEqual('\x89\x7d' + 'a' * 125,
+ request.connection.written_data())
+
+ def test_send_ping_too_long(self):
+ request = _create_request()
+ self.assertRaises(msgutil.BadOperationException,
+ msgutil.send_ping,
+ request,
+ 'a' * 126)
+
+ def test_receive_ping(self):
+ """Tests receiving a ping control frame."""
+
+ def handler(request, message):
+ request.called = True
+
+ # Stream automatically respond to ping with pong without any action
+ # by application layer.
+ request = _create_request(
+ ('\x89\x85', 'Hello'), ('\x81\x85', 'World'))
+ self.assertEqual('World', msgutil.receive_message(request))
+ self.assertEqual('\x8a\x05Hello',
+ request.connection.written_data())
+
+ request = _create_request(
+ ('\x89\x85', 'Hello'), ('\x81\x85', 'World'))
+ request.on_ping_handler = handler
+ self.assertEqual('World', msgutil.receive_message(request))
+ self.assertTrue(request.called)
+
+ def test_receive_longest_ping(self):
+ request = _create_request(
+ ('\x89\xfd', 'a' * 125), ('\x81\x85', 'World'))
+ self.assertEqual('World', msgutil.receive_message(request))
+ self.assertEqual('\x8a\x7d' + 'a' * 125,
+ request.connection.written_data())
+
+ def test_receive_ping_too_long(self):
+ request = _create_request(('\x89\xfe\x00\x7e', 'a' * 126))
+ self.assertRaises(msgutil.InvalidFrameException,
+ msgutil.receive_message,
+ request)
+
+ def test_receive_pong(self):
+ """Tests receiving a pong control frame."""
+
+ def handler(request, message):
+ request.called = True
+
+ request = _create_request(
+ ('\x8a\x85', 'Hello'), ('\x81\x85', 'World'))
+ request.on_pong_handler = handler
+ msgutil.send_ping(request, 'Hello')
+ self.assertEqual('\x89\x05Hello',
+ request.connection.written_data())
+ # Valid pong is received, but receive_message won't return for it.
+ self.assertEqual('World', msgutil.receive_message(request))
+ # Check that nothing was written after receive_message call.
+ self.assertEqual('\x89\x05Hello',
+ request.connection.written_data())
+
+ self.assertTrue(request.called)
+
+ def test_receive_unsolicited_pong(self):
+ # Unsolicited pong is allowed from HyBi 07.
+ request = _create_request(
+ ('\x8a\x85', 'Hello'), ('\x81\x85', 'World'))
+ msgutil.receive_message(request)
+
+ request = _create_request(
+ ('\x8a\x85', 'Hello'), ('\x81\x85', 'World'))
+ msgutil.send_ping(request, 'Jumbo')
+ # Body mismatch.
+ msgutil.receive_message(request)
+
+ def test_ping_cannot_be_fragmented(self):
+ request = _create_request(('\x09\x85', 'Hello'))
+ self.assertRaises(msgutil.InvalidFrameException,
+ msgutil.receive_message,
+ request)
+
+ def test_ping_with_too_long_payload(self):
+ request = _create_request(('\x89\xfe\x01\x00', 'a' * 256))
+ self.assertRaises(msgutil.InvalidFrameException,
+ msgutil.receive_message,
+ request)
+
+
+class DeflateFrameTest(unittest.TestCase):
+ """Tests for checking deflate-frame extension."""
+
+ def test_send_message(self):
+ compress = zlib.compressobj(
+ zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS)
+
+ extension = common.ExtensionParameter(common.DEFLATE_FRAME_EXTENSION)
+ request = _create_request_from_rawdata(
+ '', deflate_frame_request=extension)
+ msgutil.send_message(request, 'Hello')
+ msgutil.send_message(request, 'World')
+
+ expected = ''
+
+ compressed_hello = compress.compress('Hello')
+ compressed_hello += compress.flush(zlib.Z_SYNC_FLUSH)
+ compressed_hello = compressed_hello[:-4]
+ expected += '\xc1%c' % len(compressed_hello)
+ expected += compressed_hello
+
+ compressed_world = compress.compress('World')
+ compressed_world += compress.flush(zlib.Z_SYNC_FLUSH)
+ compressed_world = compressed_world[:-4]
+ expected += '\xc1%c' % len(compressed_world)
+ expected += compressed_world
+
+ self.assertEqual(expected, request.connection.written_data())
+
+ def test_send_message_bfinal(self):
+ extension = common.ExtensionParameter(common.DEFLATE_FRAME_EXTENSION)
+ request = _create_request_from_rawdata(
+ '', deflate_frame_request=extension)
+ self.assertEquals(1, len(request.ws_extension_processors))
+ deflate_frame_processor = request.ws_extension_processors[0]
+ deflate_frame_processor.set_bfinal(True)
+ msgutil.send_message(request, 'Hello')
+ msgutil.send_message(request, 'World')
+
+ expected = ''
+
+ compress = zlib.compressobj(
+ zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS)
+ compressed_hello = compress.compress('Hello')
+ compressed_hello += compress.flush(zlib.Z_FINISH)
+ compressed_hello = compressed_hello + chr(0)
+ expected += '\xc1%c' % len(compressed_hello)
+ expected += compressed_hello
+
+ compress = zlib.compressobj(
+ zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS)
+ compressed_world = compress.compress('World')
+ compressed_world += compress.flush(zlib.Z_FINISH)
+ compressed_world = compressed_world + chr(0)
+ expected += '\xc1%c' % len(compressed_world)
+ expected += compressed_world
+
+ self.assertEqual(expected, request.connection.written_data())
+
+ def test_send_message_comp_bit(self):
+ compress = zlib.compressobj(
+ zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS)
+
+ extension = common.ExtensionParameter(common.DEFLATE_FRAME_EXTENSION)
+ request = _create_request_from_rawdata(
+ '', deflate_frame_request=extension)
+ self.assertEquals(1, len(request.ws_extension_processors))
+ deflate_frame_processor = request.ws_extension_processors[0]
+ msgutil.send_message(request, 'Hello')
+ deflate_frame_processor.disable_outgoing_compression()
+ msgutil.send_message(request, 'Hello')
+ deflate_frame_processor.enable_outgoing_compression()
+ msgutil.send_message(request, 'Hello')
+
+ expected = ''
+
+ compressed_hello = compress.compress('Hello')
+ compressed_hello += compress.flush(zlib.Z_SYNC_FLUSH)
+ compressed_hello = compressed_hello[:-4]
+ expected += '\xc1%c' % len(compressed_hello)
+ expected += compressed_hello
+
+ expected += '\x81\x05Hello'
+
+ compressed_2nd_hello = compress.compress('Hello')
+ compressed_2nd_hello += compress.flush(zlib.Z_SYNC_FLUSH)
+ compressed_2nd_hello = compressed_2nd_hello[:-4]
+ expected += '\xc1%c' % len(compressed_2nd_hello)
+ expected += compressed_2nd_hello
+
+ self.assertEqual(expected, request.connection.written_data())
+
+ def test_send_message_no_context_takeover_parameter(self):
+ compress = zlib.compressobj(
+ zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS)
+
+ extension = common.ExtensionParameter(common.DEFLATE_FRAME_EXTENSION)
+ extension.add_parameter('no_context_takeover', None)
+ request = _create_request_from_rawdata(
+ '', deflate_frame_request=extension)
+ for i in xrange(3):
+ msgutil.send_message(request, 'Hello')
+
+ compressed_message = compress.compress('Hello')
+ compressed_message += compress.flush(zlib.Z_SYNC_FLUSH)
+ compressed_message = compressed_message[:-4]
+ expected = '\xc1%c' % len(compressed_message)
+ expected += compressed_message
+
+ self.assertEqual(
+ expected + expected + expected, request.connection.written_data())
+
+ def test_bad_request_parameters(self):
+ """Tests that if there's anything wrong with deflate-frame extension
+ request, deflate-frame is rejected.
+ """
+
+ extension = common.ExtensionParameter(common.DEFLATE_FRAME_EXTENSION)
+ # max_window_bits less than 8 is illegal.
+ extension.add_parameter('max_window_bits', '7')
+ processor = DeflateFrameExtensionProcessor(extension)
+ self.assertEqual(None, processor.get_extension_response())
+
+ extension = common.ExtensionParameter(common.DEFLATE_FRAME_EXTENSION)
+ # max_window_bits greater than 15 is illegal.
+ extension.add_parameter('max_window_bits', '16')
+ processor = DeflateFrameExtensionProcessor(extension)
+ self.assertEqual(None, processor.get_extension_response())
+
+ extension = common.ExtensionParameter(common.DEFLATE_FRAME_EXTENSION)
+ # Non integer max_window_bits is illegal.
+ extension.add_parameter('max_window_bits', 'foobar')
+ processor = DeflateFrameExtensionProcessor(extension)
+ self.assertEqual(None, processor.get_extension_response())
+
+ extension = common.ExtensionParameter(common.DEFLATE_FRAME_EXTENSION)
+ # no_context_takeover must not have any value.
+ extension.add_parameter('no_context_takeover', 'foobar')
+ processor = DeflateFrameExtensionProcessor(extension)
+ self.assertEqual(None, processor.get_extension_response())
+
+ def test_response_parameters(self):
+ extension = common.ExtensionParameter(common.DEFLATE_FRAME_EXTENSION)
+ processor = DeflateFrameExtensionProcessor(extension)
+ processor.set_response_window_bits(8)
+ response = processor.get_extension_response()
+ self.assertTrue(response.has_parameter('max_window_bits'))
+ self.assertEqual('8', response.get_parameter_value('max_window_bits'))
+
+ extension = common.ExtensionParameter(common.DEFLATE_FRAME_EXTENSION)
+ processor = DeflateFrameExtensionProcessor(extension)
+ processor.set_response_no_context_takeover(True)
+ response = processor.get_extension_response()
+ self.assertTrue(response.has_parameter('no_context_takeover'))
+ self.assertTrue(
+ response.get_parameter_value('no_context_takeover') is None)
+
+ def test_receive_message(self):
+ compress = zlib.compressobj(
+ zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS)
+
+ data = ''
+
+ compressed_hello = compress.compress('Hello')
+ compressed_hello += compress.flush(zlib.Z_SYNC_FLUSH)
+ compressed_hello = compressed_hello[:-4]
+ data += '\xc1%c' % (len(compressed_hello) | 0x80)
+ data += _mask_hybi(compressed_hello)
+
+ compressed_websocket = compress.compress('WebSocket')
+ compressed_websocket += compress.flush(zlib.Z_FINISH)
+ compressed_websocket += '\x00'
+ data += '\xc1%c' % (len(compressed_websocket) | 0x80)
+ data += _mask_hybi(compressed_websocket)
+
+ compress = zlib.compressobj(
+ zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS)
+
+ compressed_world = compress.compress('World')
+ compressed_world += compress.flush(zlib.Z_SYNC_FLUSH)
+ compressed_world = compressed_world[:-4]
+ data += '\xc1%c' % (len(compressed_world) | 0x80)
+ data += _mask_hybi(compressed_world)
+
+ # Close frame
+ data += '\x88\x8a' + _mask_hybi(struct.pack('!H', 1000) + 'Good bye')
+
+ extension = common.ExtensionParameter(common.DEFLATE_FRAME_EXTENSION)
+ request = _create_request_from_rawdata(
+ data, deflate_frame_request=extension)
+ self.assertEqual('Hello', msgutil.receive_message(request))
+ self.assertEqual('WebSocket', msgutil.receive_message(request))
+ self.assertEqual('World', msgutil.receive_message(request))
+
+ self.assertEqual(None, msgutil.receive_message(request))
+
+ def test_receive_message_client_using_smaller_window(self):
+ """Test that frames coming from a client which is using smaller window
+ size that the server are correctly received.
+ """
+
+ # Using the smallest window bits of 8 for generating input frames.
+ compress = zlib.compressobj(
+ zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -8)
+
+ data = ''
+
+ # Use a frame whose content is bigger than the clients' DEFLATE window
+ # size before compression. The content mainly consists of 'a' but
+ # repetition of 'b' is put at the head and tail so that if the window
+ # size is big, the head is back-referenced but if small, not.
+ payload = 'b' * 64 + 'a' * 1024 + 'b' * 64
+ compressed_hello = compress.compress(payload)
+ compressed_hello += compress.flush(zlib.Z_SYNC_FLUSH)
+ compressed_hello = compressed_hello[:-4]
+ data += '\xc1%c' % (len(compressed_hello) | 0x80)
+ data += _mask_hybi(compressed_hello)
+
+ # Close frame
+ data += '\x88\x8a' + _mask_hybi(struct.pack('!H', 1000) + 'Good bye')
+
+ extension = common.ExtensionParameter(common.DEFLATE_FRAME_EXTENSION)
+ request = _create_request_from_rawdata(
+ data, deflate_frame_request=extension)
+ self.assertEqual(payload, msgutil.receive_message(request))
+
+ self.assertEqual(None, msgutil.receive_message(request))
+
+ def test_receive_message_comp_bit(self):
+ compress = zlib.compressobj(
+ zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS)
+
+ data = ''
+
+ compressed_hello = compress.compress('Hello')
+ compressed_hello += compress.flush(zlib.Z_SYNC_FLUSH)
+ compressed_hello = compressed_hello[:-4]
+ data += '\xc1%c' % (len(compressed_hello) | 0x80)
+ data += _mask_hybi(compressed_hello)
+
+ data += '\x81\x85' + _mask_hybi('Hello')
+
+ compress = zlib.compressobj(
+ zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS)
+
+ compressed_2nd_hello = compress.compress('Hello')
+ compressed_2nd_hello += compress.flush(zlib.Z_SYNC_FLUSH)
+ compressed_2nd_hello = compressed_2nd_hello[:-4]
+ data += '\xc1%c' % (len(compressed_2nd_hello) | 0x80)
+ data += _mask_hybi(compressed_2nd_hello)
+
+ extension = common.ExtensionParameter(common.DEFLATE_FRAME_EXTENSION)
+ request = _create_request_from_rawdata(
+ data, deflate_frame_request=extension)
+ for i in xrange(3):
+ self.assertEqual('Hello', msgutil.receive_message(request))
+
+ def test_receive_message_various_btype(self):
+ compress = zlib.compressobj(
+ zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS)
+
+ data = ''
+
+ compressed_hello = compress.compress('Hello')
+ compressed_hello += compress.flush(zlib.Z_SYNC_FLUSH)
+ compressed_hello = compressed_hello[:-4]
+ data += '\xc1%c' % (len(compressed_hello) | 0x80)
+ data += _mask_hybi(compressed_hello)
+
+ compressed_websocket = compress.compress('WebSocket')
+ compressed_websocket += compress.flush(zlib.Z_FINISH)
+ compressed_websocket += '\x00'
+ data += '\xc1%c' % (len(compressed_websocket) | 0x80)
+ data += _mask_hybi(compressed_websocket)
+
+ compress = zlib.compressobj(
+ zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS)
+
+ compressed_world = compress.compress('World')
+ compressed_world += compress.flush(zlib.Z_SYNC_FLUSH)
+ compressed_world = compressed_world[:-4]
+ data += '\xc1%c' % (len(compressed_world) | 0x80)
+ data += _mask_hybi(compressed_world)
+
+ # Close frame
+ data += '\x88\x8a' + _mask_hybi(struct.pack('!H', 1000) + 'Good bye')
+
+ extension = common.ExtensionParameter(common.DEFLATE_FRAME_EXTENSION)
+ request = _create_request_from_rawdata(
+ data, deflate_frame_request=extension)
+ self.assertEqual('Hello', msgutil.receive_message(request))
+ self.assertEqual('WebSocket', msgutil.receive_message(request))
+ self.assertEqual('World', msgutil.receive_message(request))
+
+ self.assertEqual(None, msgutil.receive_message(request))
+
+
+class PerMessageDeflateTest(unittest.TestCase):
+ """Tests for permessage-deflate extension."""
+
+ def test_send_message(self):
+ extension = common.ExtensionParameter(
+ common.PERMESSAGE_DEFLATE_EXTENSION)
+ request = _create_request_from_rawdata(
+ '', permessage_deflate_request=extension)
+ msgutil.send_message(request, 'Hello')
+
+ compress = zlib.compressobj(
+ zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS)
+ compressed_hello = compress.compress('Hello')
+ compressed_hello += compress.flush(zlib.Z_SYNC_FLUSH)
+ compressed_hello = compressed_hello[:-4]
+ expected = '\xc1%c' % len(compressed_hello)
+ expected += compressed_hello
+ self.assertEqual(expected, request.connection.written_data())
+
+ def test_send_empty_message(self):
+ """Test that an empty message is compressed correctly."""
+
+ extension = common.ExtensionParameter(
+ common.PERMESSAGE_DEFLATE_EXTENSION)
+ request = _create_request_from_rawdata(
+ '', permessage_deflate_request=extension)
+
+ msgutil.send_message(request, '')
+
+ # Payload in binary: 0b00000010 0b00000000
+ # From LSB,
+ # - 1 bit of BFINAL (0)
+ # - 2 bits of BTYPE (01 that means fixed Huffman)
+ # - 7 bits of the first code (0000000 that is the code for the
+ # end-of-block)
+ # - 1 bit of BFINAL (0)
+ # - 2 bits of BTYPE (no compression)
+ # - 3 bits of padding
+ self.assertEqual('\xc1\x02\x02\x00',
+ request.connection.written_data())
+
+ def test_send_message_with_null_character(self):
+ """Test that a simple payload (one null) is framed correctly."""
+
+ extension = common.ExtensionParameter(
+ common.PERMESSAGE_DEFLATE_EXTENSION)
+ request = _create_request_from_rawdata(
+ '', permessage_deflate_request=extension)
+
+ msgutil.send_message(request, '\x00')
+
+ # Payload in binary: 0b01100010 0b00000000 0b00000000
+ # From LSB,
+ # - 1 bit of BFINAL (0)
+ # - 2 bits of BTYPE (01 that means fixed Huffman)
+ # - 8 bits of the first code (00110000 that is the code for the literal
+ # alphabet 0x00)
+ # - 7 bits of the second code (0000000 that is the code for the
+ # end-of-block)
+ # - 1 bit of BFINAL (0)
+ # - 2 bits of BTYPE (no compression)
+ # - 2 bits of padding
+ self.assertEqual('\xc1\x03\x62\x00\x00',
+ request.connection.written_data())
+
+ def test_send_two_messages(self):
+ extension = common.ExtensionParameter(
+ common.PERMESSAGE_DEFLATE_EXTENSION)
+ request = _create_request_from_rawdata(
+ '', permessage_deflate_request=extension)
+ msgutil.send_message(request, 'Hello')
+ msgutil.send_message(request, 'World')
+
+ compress = zlib.compressobj(
+ zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS)
+
+ expected = ''
+
+ compressed_hello = compress.compress('Hello')
+ compressed_hello += compress.flush(zlib.Z_SYNC_FLUSH)
+ compressed_hello = compressed_hello[:-4]
+ expected += '\xc1%c' % len(compressed_hello)
+ expected += compressed_hello
+
+ compressed_world = compress.compress('World')
+ compressed_world += compress.flush(zlib.Z_SYNC_FLUSH)
+ compressed_world = compressed_world[:-4]
+ expected += '\xc1%c' % len(compressed_world)
+ expected += compressed_world
+
+ self.assertEqual(expected, request.connection.written_data())
+
+ def test_send_message_fragmented(self):
+ extension = common.ExtensionParameter(
+ common.PERMESSAGE_DEFLATE_EXTENSION)
+ request = _create_request_from_rawdata(
+ '', permessage_deflate_request=extension)
+ msgutil.send_message(request, 'Hello', end=False)
+ msgutil.send_message(request, 'Goodbye', end=False)
+ msgutil.send_message(request, 'World')
+
+ compress = zlib.compressobj(
+ zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS)
+ compressed_hello = compress.compress('Hello')
+ compressed_hello += compress.flush(zlib.Z_SYNC_FLUSH)
+ expected = '\x41%c' % len(compressed_hello)
+ expected += compressed_hello
+ compressed_goodbye = compress.compress('Goodbye')
+ compressed_goodbye += compress.flush(zlib.Z_SYNC_FLUSH)
+ expected += '\x00%c' % len(compressed_goodbye)
+ expected += compressed_goodbye
+ compressed_world = compress.compress('World')
+ compressed_world += compress.flush(zlib.Z_SYNC_FLUSH)
+ compressed_world = compressed_world[:-4]
+ expected += '\x80%c' % len(compressed_world)
+ expected += compressed_world
+ self.assertEqual(expected, request.connection.written_data())
+
+ def test_send_message_fragmented_empty_first_frame(self):
+ extension = common.ExtensionParameter(
+ common.PERMESSAGE_DEFLATE_EXTENSION)
+ request = _create_request_from_rawdata(
+ '', permessage_deflate_request=extension)
+ msgutil.send_message(request, '', end=False)
+ msgutil.send_message(request, 'Hello')
+
+ compress = zlib.compressobj(
+ zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS)
+ compressed_hello = compress.compress('')
+ compressed_hello += compress.flush(zlib.Z_SYNC_FLUSH)
+ expected = '\x41%c' % len(compressed_hello)
+ expected += compressed_hello
+ compressed_empty = compress.compress('Hello')
+ compressed_empty += compress.flush(zlib.Z_SYNC_FLUSH)
+ compressed_empty = compressed_empty[:-4]
+ expected += '\x80%c' % len(compressed_empty)
+ expected += compressed_empty
+ print '%r' % expected
+ self.assertEqual(expected, request.connection.written_data())
+
+ def test_send_message_fragmented_empty_last_frame(self):
+ extension = common.ExtensionParameter(
+ common.PERMESSAGE_DEFLATE_EXTENSION)
+ request = _create_request_from_rawdata(
+ '', permessage_deflate_request=extension)
+ msgutil.send_message(request, 'Hello', end=False)
+ msgutil.send_message(request, '')
+
+ compress = zlib.compressobj(
+ zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS)
+ compressed_hello = compress.compress('Hello')
+ compressed_hello += compress.flush(zlib.Z_SYNC_FLUSH)
+ expected = '\x41%c' % len(compressed_hello)
+ expected += compressed_hello
+ compressed_empty = compress.compress('')
+ compressed_empty += compress.flush(zlib.Z_SYNC_FLUSH)
+ compressed_empty = compressed_empty[:-4]
+ expected += '\x80%c' % len(compressed_empty)
+ expected += compressed_empty
+ self.assertEqual(expected, request.connection.written_data())
+
+ def test_send_message_using_small_window(self):
+ common_part = 'abcdefghijklmnopqrstuvwxyz'
+ test_message = common_part + '-' * 30000 + common_part
+
+ extension = common.ExtensionParameter(
+ common.PERMESSAGE_DEFLATE_EXTENSION)
+ extension.add_parameter('server_max_window_bits', '8')
+ request = _create_request_from_rawdata(
+ '', permessage_deflate_request=extension)
+ msgutil.send_message(request, test_message)
+
+ expected_websocket_header_size = 2
+ expected_websocket_payload_size = 91
+
+ actual_frame = request.connection.written_data()
+ self.assertEqual(expected_websocket_header_size +
+ expected_websocket_payload_size,
+ len(actual_frame))
+ actual_header = actual_frame[0:expected_websocket_header_size]
+ actual_payload = actual_frame[expected_websocket_header_size:]
+
+ self.assertEqual(
+ '\xc1%c' % expected_websocket_payload_size, actual_header)
+ decompress = zlib.decompressobj(-8)
+ decompressed_message = decompress.decompress(
+ actual_payload + '\x00\x00\xff\xff')
+ decompressed_message += decompress.flush()
+ self.assertEqual(test_message, decompressed_message)
+ self.assertEqual(0, len(decompress.unused_data))
+ self.assertEqual(0, len(decompress.unconsumed_tail))
+
+ def test_send_message_no_context_takeover_parameter(self):
+ extension = common.ExtensionParameter(
+ common.PERMESSAGE_DEFLATE_EXTENSION)
+ extension.add_parameter('server_no_context_takeover', None)
+ request = _create_request_from_rawdata(
+ '', permessage_deflate_request=extension)
+ for i in xrange(3):
+ msgutil.send_message(request, 'Hello', end=False)
+ msgutil.send_message(request, 'Hello', end=True)
+
+ compress = zlib.compressobj(
+ zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS)
+
+ first_hello = compress.compress('Hello')
+ first_hello += compress.flush(zlib.Z_SYNC_FLUSH)
+ expected = '\x41%c' % len(first_hello)
+ expected += first_hello
+ second_hello = compress.compress('Hello')
+ second_hello += compress.flush(zlib.Z_SYNC_FLUSH)
+ second_hello = second_hello[:-4]
+ expected += '\x80%c' % len(second_hello)
+ expected += second_hello
+
+ self.assertEqual(
+ expected + expected + expected,
+ request.connection.written_data())
+
+ def test_send_message_fragmented_bfinal(self):
+ extension = common.ExtensionParameter(
+ common.PERMESSAGE_DEFLATE_EXTENSION)
+ request = _create_request_from_rawdata(
+ '', permessage_deflate_request=extension)
+ self.assertEquals(1, len(request.ws_extension_processors))
+ request.ws_extension_processors[0].set_bfinal(True)
+ msgutil.send_message(request, 'Hello', end=False)
+ msgutil.send_message(request, 'World', end=True)
+
+ expected = ''
+
+ compress = zlib.compressobj(
+ zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS)
+ compressed_hello = compress.compress('Hello')
+ compressed_hello += compress.flush(zlib.Z_FINISH)
+ compressed_hello = compressed_hello + chr(0)
+ expected += '\x41%c' % len(compressed_hello)
+ expected += compressed_hello
+
+ compress = zlib.compressobj(
+ zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS)
+ compressed_world = compress.compress('World')
+ compressed_world += compress.flush(zlib.Z_FINISH)
+ compressed_world = compressed_world + chr(0)
+ expected += '\x80%c' % len(compressed_world)
+ expected += compressed_world
+
+ self.assertEqual(expected, request.connection.written_data())
+
+ def test_receive_message_deflate(self):
+ compress = zlib.compressobj(
+ zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS)
+
+ compressed_hello = compress.compress('Hello')
+ compressed_hello += compress.flush(zlib.Z_SYNC_FLUSH)
+ compressed_hello = compressed_hello[:-4]
+ data = '\xc1%c' % (len(compressed_hello) | 0x80)
+ data += _mask_hybi(compressed_hello)
+
+ # Close frame
+ data += '\x88\x8a' + _mask_hybi(struct.pack('!H', 1000) + 'Good bye')
+
+ extension = common.ExtensionParameter(
+ common.PERMESSAGE_DEFLATE_EXTENSION)
+ request = _create_request_from_rawdata(
+ data, permessage_deflate_request=extension)
+ self.assertEqual('Hello', msgutil.receive_message(request))
+
+ self.assertEqual(None, msgutil.receive_message(request))
+
+ def test_receive_message_random_section(self):
+ """Test that a compressed message fragmented into lots of chunks is
+ correctly received.
+ """
+
+ random.seed(a=0)
+ payload = ''.join(
+ [chr(random.randint(0, 255)) for i in xrange(1000)])
+
+ compress = zlib.compressobj(
+ zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS)
+ compressed_payload = compress.compress(payload)
+ compressed_payload += compress.flush(zlib.Z_SYNC_FLUSH)
+ compressed_payload = compressed_payload[:-4]
+
+ # Fragment the compressed payload into lots of frames.
+ bytes_chunked = 0
+ data = ''
+ frame_count = 0
+
+ chunk_sizes = []
+
+ while bytes_chunked < len(compressed_payload):
+ # Make sure that
+ # - the length of chunks are equal or less than 125 so that we can
+ # use 1 octet length header format for all frames.
+ # - at least 10 chunks are created.
+ chunk_size = random.randint(
+ 1, min(125,
+ len(compressed_payload) / 10,
+ len(compressed_payload) - bytes_chunked))
+ chunk_sizes.append(chunk_size)
+ chunk = compressed_payload[
+ bytes_chunked:bytes_chunked + chunk_size]
+ bytes_chunked += chunk_size
+
+ first_octet = 0x00
+ if len(data) == 0:
+ first_octet = first_octet | 0x42
+ if bytes_chunked == len(compressed_payload):
+ first_octet = first_octet | 0x80
+
+ data += '%c%c' % (first_octet, chunk_size | 0x80)
+ data += _mask_hybi(chunk)
+
+ frame_count += 1
+
+ print "Chunk sizes: %r" % chunk_sizes
+ self.assertTrue(len(chunk_sizes) > 10)
+
+ # Close frame
+ data += '\x88\x8a' + _mask_hybi(struct.pack('!H', 1000) + 'Good bye')
+
+ extension = common.ExtensionParameter(
+ common.PERMESSAGE_DEFLATE_EXTENSION)
+ request = _create_request_from_rawdata(
+ data, permessage_deflate_request=extension)
+ self.assertEqual(payload, msgutil.receive_message(request))
+
+ self.assertEqual(None, msgutil.receive_message(request))
+
+ def test_receive_two_messages(self):
+ compress = zlib.compressobj(
+ zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS)
+
+ data = ''
+
+ compressed_hello = compress.compress('HelloWebSocket')
+ compressed_hello += compress.flush(zlib.Z_SYNC_FLUSH)
+ compressed_hello = compressed_hello[:-4]
+ split_position = len(compressed_hello) / 2
+ data += '\x41%c' % (split_position | 0x80)
+ data += _mask_hybi(compressed_hello[:split_position])
+
+ data += '\x80%c' % ((len(compressed_hello) - split_position) | 0x80)
+ data += _mask_hybi(compressed_hello[split_position:])
+
+ compress = zlib.compressobj(
+ zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS)
+
+ compressed_world = compress.compress('World')
+ compressed_world += compress.flush(zlib.Z_SYNC_FLUSH)
+ compressed_world = compressed_world[:-4]
+ data += '\xc1%c' % (len(compressed_world) | 0x80)
+ data += _mask_hybi(compressed_world)
+
+ # Close frame
+ data += '\x88\x8a' + _mask_hybi(struct.pack('!H', 1000) + 'Good bye')
+
+ extension = common.ExtensionParameter(
+ common.PERMESSAGE_DEFLATE_EXTENSION)
+ request = _create_request_from_rawdata(
+ data, permessage_deflate_request=extension)
+ self.assertEqual('HelloWebSocket', msgutil.receive_message(request))
+ self.assertEqual('World', msgutil.receive_message(request))
+
+ self.assertEqual(None, msgutil.receive_message(request))
+
+ def test_receive_message_mixed_btype(self):
+ """Test that a message compressed using lots of DEFLATE blocks with
+ various flush mode is correctly received.
+ """
+
+ random.seed(a=0)
+ payload = ''.join(
+ [chr(random.randint(0, 255)) for i in xrange(1000)])
+
+ compress = None
+
+ # Fragment the compressed payload into lots of frames.
+ bytes_chunked = 0
+ compressed_payload = ''
+
+ chunk_sizes = []
+ methods = []
+ sync_used = False
+ finish_used = False
+
+ while bytes_chunked < len(payload):
+ # Make sure at least 10 chunks are created.
+ chunk_size = random.randint(
+ 1, min(100, len(payload) - bytes_chunked))
+ chunk_sizes.append(chunk_size)
+ chunk = payload[bytes_chunked:bytes_chunked + chunk_size]
+
+ bytes_chunked += chunk_size
+
+ if compress is None:
+ compress = zlib.compressobj(
+ zlib.Z_DEFAULT_COMPRESSION,
+ zlib.DEFLATED,
+ -zlib.MAX_WBITS)
+
+ if bytes_chunked == len(payload):
+ compressed_payload += compress.compress(chunk)
+ compressed_payload += compress.flush(zlib.Z_SYNC_FLUSH)
+ compressed_payload = compressed_payload[:-4]
+ else:
+ method = random.randint(0, 1)
+ methods.append(method)
+ if method == 0:
+ compressed_payload += compress.compress(chunk)
+ compressed_payload += compress.flush(zlib.Z_SYNC_FLUSH)
+ sync_used = True
+ else:
+ compressed_payload += compress.compress(chunk)
+ compressed_payload += compress.flush(zlib.Z_FINISH)
+ compress = None
+ finish_used = True
+
+ print "Chunk sizes: %r" % chunk_sizes
+ self.assertTrue(len(chunk_sizes) > 10)
+ print "Methods: %r" % methods
+ self.assertTrue(sync_used)
+ self.assertTrue(finish_used)
+
+ self.assertTrue(125 < len(compressed_payload))
+ self.assertTrue(len(compressed_payload) < 65536)
+ data = '\xc2\xfe' + struct.pack('!H', len(compressed_payload))
+ data += _mask_hybi(compressed_payload)
+
+ # Close frame
+ data += '\x88\x8a' + _mask_hybi(struct.pack('!H', 1000) + 'Good bye')
+
+ extension = common.ExtensionParameter(
+ common.PERMESSAGE_DEFLATE_EXTENSION)
+ request = _create_request_from_rawdata(
+ data, permessage_deflate_request=extension)
+ self.assertEqual(payload, msgutil.receive_message(request))
+
+ self.assertEqual(None, msgutil.receive_message(request))
+
+
+class PerMessageCompressTest(unittest.TestCase):
+ """Tests for checking permessage-compression extension."""
+
+ def test_deflate_response_parameters(self):
+ extension = common.ExtensionParameter(
+ common.PERMESSAGE_COMPRESSION_EXTENSION)
+ extension.add_parameter('method', 'deflate')
+ processor = PerMessageCompressExtensionProcessor(extension)
+ response = processor.get_extension_response()
+ self.assertEqual('deflate',
+ response.get_parameter_value('method'))
+
+ extension = common.ExtensionParameter(
+ common.PERMESSAGE_COMPRESSION_EXTENSION)
+ extension.add_parameter('method', 'deflate')
+ processor = PerMessageCompressExtensionProcessor(extension)
+
+ def _compression_processor_hook(compression_processor):
+ compression_processor.set_client_max_window_bits(8)
+ compression_processor.set_client_no_context_takeover(True)
+ processor.set_compression_processor_hook(
+ _compression_processor_hook)
+ response = processor.get_extension_response()
+ self.assertEqual(
+ 'deflate; client_max_window_bits=8; client_no_context_takeover',
+ response.get_parameter_value('method'))
+
+
+class MessageTestHixie75(unittest.TestCase):
+ """Tests for draft-hixie-thewebsocketprotocol-76 stream class."""
+
+ def test_send_message(self):
+ request = _create_request_hixie75()
+ msgutil.send_message(request, 'Hello')
+ self.assertEqual('\x00Hello\xff', request.connection.written_data())
+
+ def test_send_message_unicode(self):
+ request = _create_request_hixie75()
+ msgutil.send_message(request, u'\u65e5')
+ # U+65e5 is encoded as e6,97,a5 in UTF-8
+ self.assertEqual('\x00\xe6\x97\xa5\xff',
+ request.connection.written_data())
+
+ def test_receive_message(self):
+ request = _create_request_hixie75('\x00Hello\xff\x00World!\xff')
+ self.assertEqual('Hello', msgutil.receive_message(request))
+ self.assertEqual('World!', msgutil.receive_message(request))
+
+ def test_receive_message_unicode(self):
+ request = _create_request_hixie75('\x00\xe6\x9c\xac\xff')
+ # U+672c is encoded as e6,9c,ac in UTF-8
+ self.assertEqual(u'\u672c', msgutil.receive_message(request))
+
+ def test_receive_message_erroneous_unicode(self):
+ # \x80 and \x81 are invalid as UTF-8.
+ request = _create_request_hixie75('\x00\x80\x81\xff')
+ # Invalid characters should be replaced with
+ # U+fffd REPLACEMENT CHARACTER
+ self.assertEqual(u'\ufffd\ufffd', msgutil.receive_message(request))
+
+ def test_receive_message_discard(self):
+ request = _create_request_hixie75('\x80\x06IGNORE\x00Hello\xff'
+ '\x01DISREGARD\xff\x00World!\xff')
+ self.assertEqual('Hello', msgutil.receive_message(request))
+ self.assertEqual('World!', msgutil.receive_message(request))
+
+
+class MessageReceiverTest(unittest.TestCase):
+ """Tests the Stream class using MessageReceiver."""
+
+ def test_queue(self):
+ request = _create_blocking_request()
+ receiver = msgutil.MessageReceiver(request)
+
+ self.assertEqual(None, receiver.receive_nowait())
+
+ request.connection.put_bytes('\x81\x86' + _mask_hybi('Hello!'))
+ self.assertEqual('Hello!', receiver.receive())
+
+ def test_onmessage(self):
+ onmessage_queue = Queue.Queue()
+
+ def onmessage_handler(message):
+ onmessage_queue.put(message)
+
+ request = _create_blocking_request()
+ receiver = msgutil.MessageReceiver(request, onmessage_handler)
+
+ request.connection.put_bytes('\x81\x86' + _mask_hybi('Hello!'))
+ self.assertEqual('Hello!', onmessage_queue.get())
+
+
+class MessageReceiverHixie75Test(unittest.TestCase):
+ """Tests the StreamHixie75 class using MessageReceiver."""
+
+ def test_queue(self):
+ request = _create_blocking_request_hixie75()
+ receiver = msgutil.MessageReceiver(request)
+
+ self.assertEqual(None, receiver.receive_nowait())
+
+ request.connection.put_bytes('\x00Hello!\xff')
+ self.assertEqual('Hello!', receiver.receive())
+
+ def test_onmessage(self):
+ onmessage_queue = Queue.Queue()
+
+ def onmessage_handler(message):
+ onmessage_queue.put(message)
+
+ request = _create_blocking_request_hixie75()
+ receiver = msgutil.MessageReceiver(request, onmessage_handler)
+
+ request.connection.put_bytes('\x00Hello!\xff')
+ self.assertEqual('Hello!', onmessage_queue.get())
+
+
+class MessageSenderTest(unittest.TestCase):
+ """Tests the Stream class using MessageSender."""
+
+ def test_send(self):
+ request = _create_blocking_request()
+ sender = msgutil.MessageSender(request)
+
+ sender.send('World')
+ self.assertEqual('\x81\x05World', request.connection.written_data())
+
+ def test_send_nowait(self):
+ # Use a queue to check the bytes written by MessageSender.
+ # request.connection.written_data() cannot be used here because
+ # MessageSender runs in a separate thread.
+ send_queue = Queue.Queue()
+
+ def write(bytes):
+ send_queue.put(bytes)
+
+ request = _create_blocking_request()
+ request.connection.write = write
+
+ sender = msgutil.MessageSender(request)
+
+ sender.send_nowait('Hello')
+ sender.send_nowait('World')
+ self.assertEqual('\x81\x05Hello', send_queue.get())
+ self.assertEqual('\x81\x05World', send_queue.get())
+
+
+class MessageSenderHixie75Test(unittest.TestCase):
+ """Tests the StreamHixie75 class using MessageSender."""
+
+ def test_send(self):
+ request = _create_blocking_request_hixie75()
+ sender = msgutil.MessageSender(request)
+
+ sender.send('World')
+ self.assertEqual('\x00World\xff', request.connection.written_data())
+
+ def test_send_nowait(self):
+ # Use a queue to check the bytes written by MessageSender.
+ # request.connection.written_data() cannot be used here because
+ # MessageSender runs in a separate thread.
+ send_queue = Queue.Queue()
+
+ def write(bytes):
+ send_queue.put(bytes)
+
+ request = _create_blocking_request_hixie75()
+ request.connection.write = write
+
+ sender = msgutil.MessageSender(request)
+
+ sender.send_nowait('Hello')
+ sender.send_nowait('World')
+ self.assertEqual('\x00Hello\xff', send_queue.get())
+ self.assertEqual('\x00World\xff', send_queue.get())
+
+
+if __name__ == '__main__':
+ unittest.main()
+
+
+# vi:sts=4 sw=4 et