diff options
Diffstat (limited to 'testing/web-platform/tests/tools/pywebsocket/src/test/test_msgutil.py')
-rwxr-xr-x | testing/web-platform/tests/tools/pywebsocket/src/test/test_msgutil.py | 1356 |
1 files changed, 1356 insertions, 0 deletions
diff --git a/testing/web-platform/tests/tools/pywebsocket/src/test/test_msgutil.py b/testing/web-platform/tests/tools/pywebsocket/src/test/test_msgutil.py new file mode 100755 index 000000000..5fedcf92f --- /dev/null +++ b/testing/web-platform/tests/tools/pywebsocket/src/test/test_msgutil.py @@ -0,0 +1,1356 @@ +#!/usr/bin/env python +# +# Copyright 2012, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + +"""Tests for msgutil module.""" + + +import array +import Queue +import random +import struct +import unittest +import zlib + +import set_sys_path # Update sys.path to locate mod_pywebsocket module. + +from mod_pywebsocket import common +from mod_pywebsocket.extensions import DeflateFrameExtensionProcessor +from mod_pywebsocket.extensions import PerMessageCompressExtensionProcessor +from mod_pywebsocket.extensions import PerMessageDeflateExtensionProcessor +from mod_pywebsocket import msgutil +from mod_pywebsocket.stream import InvalidUTF8Exception +from mod_pywebsocket.stream import Stream +from mod_pywebsocket.stream import StreamHixie75 +from mod_pywebsocket.stream import StreamOptions +from mod_pywebsocket import util +from test import mock + + +# We use one fixed nonce for testing instead of cryptographically secure PRNG. +_MASKING_NONCE = 'ABCD' + + +def _mask_hybi(frame): + frame_key = map(ord, _MASKING_NONCE) + frame_key_len = len(frame_key) + result = array.array('B') + result.fromstring(frame) + count = 0 + for i in xrange(len(result)): + result[i] ^= frame_key[count] + count = (count + 1) % frame_key_len + return _MASKING_NONCE + result.tostring() + + +def _install_extension_processor(processor, request, stream_options): + response = processor.get_extension_response() + if response is not None: + processor.setup_stream_options(stream_options) + request.ws_extension_processors.append(processor) + + +def _create_request_from_rawdata( + read_data, + deflate_frame_request=None, + permessage_compression_request=None, + permessage_deflate_request=None): + req = mock.MockRequest(connection=mock.MockConn(''.join(read_data))) + req.ws_version = common.VERSION_HYBI_LATEST + req.ws_extension_processors = [] + + processor = None + if deflate_frame_request is not None: + processor = DeflateFrameExtensionProcessor(deflate_frame_request) + elif permessage_compression_request is not None: + processor = PerMessageCompressExtensionProcessor( + permessage_compression_request) + elif permessage_deflate_request is not None: + processor = PerMessageDeflateExtensionProcessor( + permessage_deflate_request) + + stream_options = StreamOptions() + if processor is not None: + _install_extension_processor(processor, req, stream_options) + req.ws_stream = Stream(req, stream_options) + + return req + + +def _create_request(*frames): + """Creates MockRequest using data given as frames. + + frames will be returned on calling request.connection.read() where request + is MockRequest returned by this function. + """ + + read_data = [] + for (header, body) in frames: + read_data.append(header + _mask_hybi(body)) + + return _create_request_from_rawdata(read_data) + + +def _create_blocking_request(): + """Creates MockRequest. + + Data written to a MockRequest can be read out by calling + request.connection.written_data(). + """ + + req = mock.MockRequest(connection=mock.MockBlockingConn()) + req.ws_version = common.VERSION_HYBI_LATEST + stream_options = StreamOptions() + req.ws_stream = Stream(req, stream_options) + return req + + +def _create_request_hixie75(read_data=''): + req = mock.MockRequest(connection=mock.MockConn(read_data)) + req.ws_stream = StreamHixie75(req) + return req + + +def _create_blocking_request_hixie75(): + req = mock.MockRequest(connection=mock.MockBlockingConn()) + req.ws_stream = StreamHixie75(req) + return req + + +class BasicMessageTest(unittest.TestCase): + """Basic tests for Stream.""" + + def test_send_message(self): + request = _create_request() + msgutil.send_message(request, 'Hello') + self.assertEqual('\x81\x05Hello', request.connection.written_data()) + + payload = 'a' * 125 + request = _create_request() + msgutil.send_message(request, payload) + self.assertEqual('\x81\x7d' + payload, + request.connection.written_data()) + + def test_send_medium_message(self): + payload = 'a' * 126 + request = _create_request() + msgutil.send_message(request, payload) + self.assertEqual('\x81\x7e\x00\x7e' + payload, + request.connection.written_data()) + + payload = 'a' * ((1 << 16) - 1) + request = _create_request() + msgutil.send_message(request, payload) + self.assertEqual('\x81\x7e\xff\xff' + payload, + request.connection.written_data()) + + def test_send_large_message(self): + payload = 'a' * (1 << 16) + request = _create_request() + msgutil.send_message(request, payload) + self.assertEqual('\x81\x7f\x00\x00\x00\x00\x00\x01\x00\x00' + payload, + request.connection.written_data()) + + def test_send_message_unicode(self): + request = _create_request() + msgutil.send_message(request, u'\u65e5') + # U+65e5 is encoded as e6,97,a5 in UTF-8 + self.assertEqual('\x81\x03\xe6\x97\xa5', + request.connection.written_data()) + + def test_send_message_fragments(self): + request = _create_request() + msgutil.send_message(request, 'Hello', False) + msgutil.send_message(request, ' ', False) + msgutil.send_message(request, 'World', False) + msgutil.send_message(request, '!', True) + self.assertEqual('\x01\x05Hello\x00\x01 \x00\x05World\x80\x01!', + request.connection.written_data()) + + def test_send_fragments_immediate_zero_termination(self): + request = _create_request() + msgutil.send_message(request, 'Hello World!', False) + msgutil.send_message(request, '', True) + self.assertEqual('\x01\x0cHello World!\x80\x00', + request.connection.written_data()) + + def test_receive_message(self): + request = _create_request( + ('\x81\x85', 'Hello'), ('\x81\x86', 'World!')) + self.assertEqual('Hello', msgutil.receive_message(request)) + self.assertEqual('World!', msgutil.receive_message(request)) + + payload = 'a' * 125 + request = _create_request(('\x81\xfd', payload)) + self.assertEqual(payload, msgutil.receive_message(request)) + + def test_receive_medium_message(self): + payload = 'a' * 126 + request = _create_request(('\x81\xfe\x00\x7e', payload)) + self.assertEqual(payload, msgutil.receive_message(request)) + + payload = 'a' * ((1 << 16) - 1) + request = _create_request(('\x81\xfe\xff\xff', payload)) + self.assertEqual(payload, msgutil.receive_message(request)) + + def test_receive_large_message(self): + payload = 'a' * (1 << 16) + request = _create_request( + ('\x81\xff\x00\x00\x00\x00\x00\x01\x00\x00', payload)) + self.assertEqual(payload, msgutil.receive_message(request)) + + def test_receive_length_not_encoded_using_minimal_number_of_bytes(self): + # Log warning on receiving bad payload length field that doesn't use + # minimal number of bytes but continue processing. + + payload = 'a' + # 1 byte can be represented without extended payload length field. + request = _create_request( + ('\x81\xff\x00\x00\x00\x00\x00\x00\x00\x01', payload)) + self.assertEqual(payload, msgutil.receive_message(request)) + + def test_receive_message_unicode(self): + request = _create_request(('\x81\x83', '\xe6\x9c\xac')) + # U+672c is encoded as e6,9c,ac in UTF-8 + self.assertEqual(u'\u672c', msgutil.receive_message(request)) + + def test_receive_message_erroneous_unicode(self): + # \x80 and \x81 are invalid as UTF-8. + request = _create_request(('\x81\x82', '\x80\x81')) + # Invalid characters should raise InvalidUTF8Exception + self.assertRaises(InvalidUTF8Exception, + msgutil.receive_message, + request) + + def test_receive_fragments(self): + request = _create_request( + ('\x01\x85', 'Hello'), + ('\x00\x81', ' '), + ('\x00\x85', 'World'), + ('\x80\x81', '!')) + self.assertEqual('Hello World!', msgutil.receive_message(request)) + + def test_receive_fragments_unicode(self): + # UTF-8 encodes U+6f22 into e6bca2 and U+5b57 into e5ad97. + request = _create_request( + ('\x01\x82', '\xe6\xbc'), + ('\x00\x82', '\xa2\xe5'), + ('\x80\x82', '\xad\x97')) + self.assertEqual(u'\u6f22\u5b57', msgutil.receive_message(request)) + + def test_receive_fragments_immediate_zero_termination(self): + request = _create_request( + ('\x01\x8c', 'Hello World!'), ('\x80\x80', '')) + self.assertEqual('Hello World!', msgutil.receive_message(request)) + + def test_receive_fragments_duplicate_start(self): + request = _create_request( + ('\x01\x85', 'Hello'), ('\x01\x85', 'World')) + self.assertRaises(msgutil.InvalidFrameException, + msgutil.receive_message, + request) + + def test_receive_fragments_intermediate_but_not_started(self): + request = _create_request(('\x00\x85', 'Hello')) + self.assertRaises(msgutil.InvalidFrameException, + msgutil.receive_message, + request) + + def test_receive_fragments_end_but_not_started(self): + request = _create_request(('\x80\x85', 'Hello')) + self.assertRaises(msgutil.InvalidFrameException, + msgutil.receive_message, + request) + + def test_receive_message_discard(self): + request = _create_request( + ('\x8f\x86', 'IGNORE'), ('\x81\x85', 'Hello'), + ('\x8f\x89', 'DISREGARD'), ('\x81\x86', 'World!')) + self.assertRaises(msgutil.UnsupportedFrameException, + msgutil.receive_message, request) + self.assertEqual('Hello', msgutil.receive_message(request)) + self.assertRaises(msgutil.UnsupportedFrameException, + msgutil.receive_message, request) + self.assertEqual('World!', msgutil.receive_message(request)) + + def test_receive_close(self): + request = _create_request( + ('\x88\x8a', struct.pack('!H', 1000) + 'Good bye')) + self.assertEqual(None, msgutil.receive_message(request)) + self.assertEqual(1000, request.ws_close_code) + self.assertEqual('Good bye', request.ws_close_reason) + + def test_send_longest_close(self): + reason = 'a' * 123 + request = _create_request( + ('\x88\xfd', + struct.pack('!H', common.STATUS_NORMAL_CLOSURE) + reason)) + request.ws_stream.close_connection(common.STATUS_NORMAL_CLOSURE, + reason) + self.assertEqual(request.ws_close_code, common.STATUS_NORMAL_CLOSURE) + self.assertEqual(request.ws_close_reason, reason) + + def test_send_close_too_long(self): + request = _create_request() + self.assertRaises(msgutil.BadOperationException, + Stream.close_connection, + request.ws_stream, + common.STATUS_NORMAL_CLOSURE, + 'a' * 124) + + def test_send_close_inconsistent_code_and_reason(self): + request = _create_request() + # reason parameter must not be specified when code is None. + self.assertRaises(msgutil.BadOperationException, + Stream.close_connection, + request.ws_stream, + None, + 'a') + + def test_send_ping(self): + request = _create_request() + msgutil.send_ping(request, 'Hello World!') + self.assertEqual('\x89\x0cHello World!', + request.connection.written_data()) + + def test_send_longest_ping(self): + request = _create_request() + msgutil.send_ping(request, 'a' * 125) + self.assertEqual('\x89\x7d' + 'a' * 125, + request.connection.written_data()) + + def test_send_ping_too_long(self): + request = _create_request() + self.assertRaises(msgutil.BadOperationException, + msgutil.send_ping, + request, + 'a' * 126) + + def test_receive_ping(self): + """Tests receiving a ping control frame.""" + + def handler(request, message): + request.called = True + + # Stream automatically respond to ping with pong without any action + # by application layer. + request = _create_request( + ('\x89\x85', 'Hello'), ('\x81\x85', 'World')) + self.assertEqual('World', msgutil.receive_message(request)) + self.assertEqual('\x8a\x05Hello', + request.connection.written_data()) + + request = _create_request( + ('\x89\x85', 'Hello'), ('\x81\x85', 'World')) + request.on_ping_handler = handler + self.assertEqual('World', msgutil.receive_message(request)) + self.assertTrue(request.called) + + def test_receive_longest_ping(self): + request = _create_request( + ('\x89\xfd', 'a' * 125), ('\x81\x85', 'World')) + self.assertEqual('World', msgutil.receive_message(request)) + self.assertEqual('\x8a\x7d' + 'a' * 125, + request.connection.written_data()) + + def test_receive_ping_too_long(self): + request = _create_request(('\x89\xfe\x00\x7e', 'a' * 126)) + self.assertRaises(msgutil.InvalidFrameException, + msgutil.receive_message, + request) + + def test_receive_pong(self): + """Tests receiving a pong control frame.""" + + def handler(request, message): + request.called = True + + request = _create_request( + ('\x8a\x85', 'Hello'), ('\x81\x85', 'World')) + request.on_pong_handler = handler + msgutil.send_ping(request, 'Hello') + self.assertEqual('\x89\x05Hello', + request.connection.written_data()) + # Valid pong is received, but receive_message won't return for it. + self.assertEqual('World', msgutil.receive_message(request)) + # Check that nothing was written after receive_message call. + self.assertEqual('\x89\x05Hello', + request.connection.written_data()) + + self.assertTrue(request.called) + + def test_receive_unsolicited_pong(self): + # Unsolicited pong is allowed from HyBi 07. + request = _create_request( + ('\x8a\x85', 'Hello'), ('\x81\x85', 'World')) + msgutil.receive_message(request) + + request = _create_request( + ('\x8a\x85', 'Hello'), ('\x81\x85', 'World')) + msgutil.send_ping(request, 'Jumbo') + # Body mismatch. + msgutil.receive_message(request) + + def test_ping_cannot_be_fragmented(self): + request = _create_request(('\x09\x85', 'Hello')) + self.assertRaises(msgutil.InvalidFrameException, + msgutil.receive_message, + request) + + def test_ping_with_too_long_payload(self): + request = _create_request(('\x89\xfe\x01\x00', 'a' * 256)) + self.assertRaises(msgutil.InvalidFrameException, + msgutil.receive_message, + request) + + +class DeflateFrameTest(unittest.TestCase): + """Tests for checking deflate-frame extension.""" + + def test_send_message(self): + compress = zlib.compressobj( + zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS) + + extension = common.ExtensionParameter(common.DEFLATE_FRAME_EXTENSION) + request = _create_request_from_rawdata( + '', deflate_frame_request=extension) + msgutil.send_message(request, 'Hello') + msgutil.send_message(request, 'World') + + expected = '' + + compressed_hello = compress.compress('Hello') + compressed_hello += compress.flush(zlib.Z_SYNC_FLUSH) + compressed_hello = compressed_hello[:-4] + expected += '\xc1%c' % len(compressed_hello) + expected += compressed_hello + + compressed_world = compress.compress('World') + compressed_world += compress.flush(zlib.Z_SYNC_FLUSH) + compressed_world = compressed_world[:-4] + expected += '\xc1%c' % len(compressed_world) + expected += compressed_world + + self.assertEqual(expected, request.connection.written_data()) + + def test_send_message_bfinal(self): + extension = common.ExtensionParameter(common.DEFLATE_FRAME_EXTENSION) + request = _create_request_from_rawdata( + '', deflate_frame_request=extension) + self.assertEquals(1, len(request.ws_extension_processors)) + deflate_frame_processor = request.ws_extension_processors[0] + deflate_frame_processor.set_bfinal(True) + msgutil.send_message(request, 'Hello') + msgutil.send_message(request, 'World') + + expected = '' + + compress = zlib.compressobj( + zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS) + compressed_hello = compress.compress('Hello') + compressed_hello += compress.flush(zlib.Z_FINISH) + compressed_hello = compressed_hello + chr(0) + expected += '\xc1%c' % len(compressed_hello) + expected += compressed_hello + + compress = zlib.compressobj( + zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS) + compressed_world = compress.compress('World') + compressed_world += compress.flush(zlib.Z_FINISH) + compressed_world = compressed_world + chr(0) + expected += '\xc1%c' % len(compressed_world) + expected += compressed_world + + self.assertEqual(expected, request.connection.written_data()) + + def test_send_message_comp_bit(self): + compress = zlib.compressobj( + zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS) + + extension = common.ExtensionParameter(common.DEFLATE_FRAME_EXTENSION) + request = _create_request_from_rawdata( + '', deflate_frame_request=extension) + self.assertEquals(1, len(request.ws_extension_processors)) + deflate_frame_processor = request.ws_extension_processors[0] + msgutil.send_message(request, 'Hello') + deflate_frame_processor.disable_outgoing_compression() + msgutil.send_message(request, 'Hello') + deflate_frame_processor.enable_outgoing_compression() + msgutil.send_message(request, 'Hello') + + expected = '' + + compressed_hello = compress.compress('Hello') + compressed_hello += compress.flush(zlib.Z_SYNC_FLUSH) + compressed_hello = compressed_hello[:-4] + expected += '\xc1%c' % len(compressed_hello) + expected += compressed_hello + + expected += '\x81\x05Hello' + + compressed_2nd_hello = compress.compress('Hello') + compressed_2nd_hello += compress.flush(zlib.Z_SYNC_FLUSH) + compressed_2nd_hello = compressed_2nd_hello[:-4] + expected += '\xc1%c' % len(compressed_2nd_hello) + expected += compressed_2nd_hello + + self.assertEqual(expected, request.connection.written_data()) + + def test_send_message_no_context_takeover_parameter(self): + compress = zlib.compressobj( + zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS) + + extension = common.ExtensionParameter(common.DEFLATE_FRAME_EXTENSION) + extension.add_parameter('no_context_takeover', None) + request = _create_request_from_rawdata( + '', deflate_frame_request=extension) + for i in xrange(3): + msgutil.send_message(request, 'Hello') + + compressed_message = compress.compress('Hello') + compressed_message += compress.flush(zlib.Z_SYNC_FLUSH) + compressed_message = compressed_message[:-4] + expected = '\xc1%c' % len(compressed_message) + expected += compressed_message + + self.assertEqual( + expected + expected + expected, request.connection.written_data()) + + def test_bad_request_parameters(self): + """Tests that if there's anything wrong with deflate-frame extension + request, deflate-frame is rejected. + """ + + extension = common.ExtensionParameter(common.DEFLATE_FRAME_EXTENSION) + # max_window_bits less than 8 is illegal. + extension.add_parameter('max_window_bits', '7') + processor = DeflateFrameExtensionProcessor(extension) + self.assertEqual(None, processor.get_extension_response()) + + extension = common.ExtensionParameter(common.DEFLATE_FRAME_EXTENSION) + # max_window_bits greater than 15 is illegal. + extension.add_parameter('max_window_bits', '16') + processor = DeflateFrameExtensionProcessor(extension) + self.assertEqual(None, processor.get_extension_response()) + + extension = common.ExtensionParameter(common.DEFLATE_FRAME_EXTENSION) + # Non integer max_window_bits is illegal. + extension.add_parameter('max_window_bits', 'foobar') + processor = DeflateFrameExtensionProcessor(extension) + self.assertEqual(None, processor.get_extension_response()) + + extension = common.ExtensionParameter(common.DEFLATE_FRAME_EXTENSION) + # no_context_takeover must not have any value. + extension.add_parameter('no_context_takeover', 'foobar') + processor = DeflateFrameExtensionProcessor(extension) + self.assertEqual(None, processor.get_extension_response()) + + def test_response_parameters(self): + extension = common.ExtensionParameter(common.DEFLATE_FRAME_EXTENSION) + processor = DeflateFrameExtensionProcessor(extension) + processor.set_response_window_bits(8) + response = processor.get_extension_response() + self.assertTrue(response.has_parameter('max_window_bits')) + self.assertEqual('8', response.get_parameter_value('max_window_bits')) + + extension = common.ExtensionParameter(common.DEFLATE_FRAME_EXTENSION) + processor = DeflateFrameExtensionProcessor(extension) + processor.set_response_no_context_takeover(True) + response = processor.get_extension_response() + self.assertTrue(response.has_parameter('no_context_takeover')) + self.assertTrue( + response.get_parameter_value('no_context_takeover') is None) + + def test_receive_message(self): + compress = zlib.compressobj( + zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS) + + data = '' + + compressed_hello = compress.compress('Hello') + compressed_hello += compress.flush(zlib.Z_SYNC_FLUSH) + compressed_hello = compressed_hello[:-4] + data += '\xc1%c' % (len(compressed_hello) | 0x80) + data += _mask_hybi(compressed_hello) + + compressed_websocket = compress.compress('WebSocket') + compressed_websocket += compress.flush(zlib.Z_FINISH) + compressed_websocket += '\x00' + data += '\xc1%c' % (len(compressed_websocket) | 0x80) + data += _mask_hybi(compressed_websocket) + + compress = zlib.compressobj( + zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS) + + compressed_world = compress.compress('World') + compressed_world += compress.flush(zlib.Z_SYNC_FLUSH) + compressed_world = compressed_world[:-4] + data += '\xc1%c' % (len(compressed_world) | 0x80) + data += _mask_hybi(compressed_world) + + # Close frame + data += '\x88\x8a' + _mask_hybi(struct.pack('!H', 1000) + 'Good bye') + + extension = common.ExtensionParameter(common.DEFLATE_FRAME_EXTENSION) + request = _create_request_from_rawdata( + data, deflate_frame_request=extension) + self.assertEqual('Hello', msgutil.receive_message(request)) + self.assertEqual('WebSocket', msgutil.receive_message(request)) + self.assertEqual('World', msgutil.receive_message(request)) + + self.assertEqual(None, msgutil.receive_message(request)) + + def test_receive_message_client_using_smaller_window(self): + """Test that frames coming from a client which is using smaller window + size that the server are correctly received. + """ + + # Using the smallest window bits of 8 for generating input frames. + compress = zlib.compressobj( + zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -8) + + data = '' + + # Use a frame whose content is bigger than the clients' DEFLATE window + # size before compression. The content mainly consists of 'a' but + # repetition of 'b' is put at the head and tail so that if the window + # size is big, the head is back-referenced but if small, not. + payload = 'b' * 64 + 'a' * 1024 + 'b' * 64 + compressed_hello = compress.compress(payload) + compressed_hello += compress.flush(zlib.Z_SYNC_FLUSH) + compressed_hello = compressed_hello[:-4] + data += '\xc1%c' % (len(compressed_hello) | 0x80) + data += _mask_hybi(compressed_hello) + + # Close frame + data += '\x88\x8a' + _mask_hybi(struct.pack('!H', 1000) + 'Good bye') + + extension = common.ExtensionParameter(common.DEFLATE_FRAME_EXTENSION) + request = _create_request_from_rawdata( + data, deflate_frame_request=extension) + self.assertEqual(payload, msgutil.receive_message(request)) + + self.assertEqual(None, msgutil.receive_message(request)) + + def test_receive_message_comp_bit(self): + compress = zlib.compressobj( + zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS) + + data = '' + + compressed_hello = compress.compress('Hello') + compressed_hello += compress.flush(zlib.Z_SYNC_FLUSH) + compressed_hello = compressed_hello[:-4] + data += '\xc1%c' % (len(compressed_hello) | 0x80) + data += _mask_hybi(compressed_hello) + + data += '\x81\x85' + _mask_hybi('Hello') + + compress = zlib.compressobj( + zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS) + + compressed_2nd_hello = compress.compress('Hello') + compressed_2nd_hello += compress.flush(zlib.Z_SYNC_FLUSH) + compressed_2nd_hello = compressed_2nd_hello[:-4] + data += '\xc1%c' % (len(compressed_2nd_hello) | 0x80) + data += _mask_hybi(compressed_2nd_hello) + + extension = common.ExtensionParameter(common.DEFLATE_FRAME_EXTENSION) + request = _create_request_from_rawdata( + data, deflate_frame_request=extension) + for i in xrange(3): + self.assertEqual('Hello', msgutil.receive_message(request)) + + def test_receive_message_various_btype(self): + compress = zlib.compressobj( + zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS) + + data = '' + + compressed_hello = compress.compress('Hello') + compressed_hello += compress.flush(zlib.Z_SYNC_FLUSH) + compressed_hello = compressed_hello[:-4] + data += '\xc1%c' % (len(compressed_hello) | 0x80) + data += _mask_hybi(compressed_hello) + + compressed_websocket = compress.compress('WebSocket') + compressed_websocket += compress.flush(zlib.Z_FINISH) + compressed_websocket += '\x00' + data += '\xc1%c' % (len(compressed_websocket) | 0x80) + data += _mask_hybi(compressed_websocket) + + compress = zlib.compressobj( + zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS) + + compressed_world = compress.compress('World') + compressed_world += compress.flush(zlib.Z_SYNC_FLUSH) + compressed_world = compressed_world[:-4] + data += '\xc1%c' % (len(compressed_world) | 0x80) + data += _mask_hybi(compressed_world) + + # Close frame + data += '\x88\x8a' + _mask_hybi(struct.pack('!H', 1000) + 'Good bye') + + extension = common.ExtensionParameter(common.DEFLATE_FRAME_EXTENSION) + request = _create_request_from_rawdata( + data, deflate_frame_request=extension) + self.assertEqual('Hello', msgutil.receive_message(request)) + self.assertEqual('WebSocket', msgutil.receive_message(request)) + self.assertEqual('World', msgutil.receive_message(request)) + + self.assertEqual(None, msgutil.receive_message(request)) + + +class PerMessageDeflateTest(unittest.TestCase): + """Tests for permessage-deflate extension.""" + + def test_send_message(self): + extension = common.ExtensionParameter( + common.PERMESSAGE_DEFLATE_EXTENSION) + request = _create_request_from_rawdata( + '', permessage_deflate_request=extension) + msgutil.send_message(request, 'Hello') + + compress = zlib.compressobj( + zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS) + compressed_hello = compress.compress('Hello') + compressed_hello += compress.flush(zlib.Z_SYNC_FLUSH) + compressed_hello = compressed_hello[:-4] + expected = '\xc1%c' % len(compressed_hello) + expected += compressed_hello + self.assertEqual(expected, request.connection.written_data()) + + def test_send_empty_message(self): + """Test that an empty message is compressed correctly.""" + + extension = common.ExtensionParameter( + common.PERMESSAGE_DEFLATE_EXTENSION) + request = _create_request_from_rawdata( + '', permessage_deflate_request=extension) + + msgutil.send_message(request, '') + + # Payload in binary: 0b00000010 0b00000000 + # From LSB, + # - 1 bit of BFINAL (0) + # - 2 bits of BTYPE (01 that means fixed Huffman) + # - 7 bits of the first code (0000000 that is the code for the + # end-of-block) + # - 1 bit of BFINAL (0) + # - 2 bits of BTYPE (no compression) + # - 3 bits of padding + self.assertEqual('\xc1\x02\x02\x00', + request.connection.written_data()) + + def test_send_message_with_null_character(self): + """Test that a simple payload (one null) is framed correctly.""" + + extension = common.ExtensionParameter( + common.PERMESSAGE_DEFLATE_EXTENSION) + request = _create_request_from_rawdata( + '', permessage_deflate_request=extension) + + msgutil.send_message(request, '\x00') + + # Payload in binary: 0b01100010 0b00000000 0b00000000 + # From LSB, + # - 1 bit of BFINAL (0) + # - 2 bits of BTYPE (01 that means fixed Huffman) + # - 8 bits of the first code (00110000 that is the code for the literal + # alphabet 0x00) + # - 7 bits of the second code (0000000 that is the code for the + # end-of-block) + # - 1 bit of BFINAL (0) + # - 2 bits of BTYPE (no compression) + # - 2 bits of padding + self.assertEqual('\xc1\x03\x62\x00\x00', + request.connection.written_data()) + + def test_send_two_messages(self): + extension = common.ExtensionParameter( + common.PERMESSAGE_DEFLATE_EXTENSION) + request = _create_request_from_rawdata( + '', permessage_deflate_request=extension) + msgutil.send_message(request, 'Hello') + msgutil.send_message(request, 'World') + + compress = zlib.compressobj( + zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS) + + expected = '' + + compressed_hello = compress.compress('Hello') + compressed_hello += compress.flush(zlib.Z_SYNC_FLUSH) + compressed_hello = compressed_hello[:-4] + expected += '\xc1%c' % len(compressed_hello) + expected += compressed_hello + + compressed_world = compress.compress('World') + compressed_world += compress.flush(zlib.Z_SYNC_FLUSH) + compressed_world = compressed_world[:-4] + expected += '\xc1%c' % len(compressed_world) + expected += compressed_world + + self.assertEqual(expected, request.connection.written_data()) + + def test_send_message_fragmented(self): + extension = common.ExtensionParameter( + common.PERMESSAGE_DEFLATE_EXTENSION) + request = _create_request_from_rawdata( + '', permessage_deflate_request=extension) + msgutil.send_message(request, 'Hello', end=False) + msgutil.send_message(request, 'Goodbye', end=False) + msgutil.send_message(request, 'World') + + compress = zlib.compressobj( + zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS) + compressed_hello = compress.compress('Hello') + compressed_hello += compress.flush(zlib.Z_SYNC_FLUSH) + expected = '\x41%c' % len(compressed_hello) + expected += compressed_hello + compressed_goodbye = compress.compress('Goodbye') + compressed_goodbye += compress.flush(zlib.Z_SYNC_FLUSH) + expected += '\x00%c' % len(compressed_goodbye) + expected += compressed_goodbye + compressed_world = compress.compress('World') + compressed_world += compress.flush(zlib.Z_SYNC_FLUSH) + compressed_world = compressed_world[:-4] + expected += '\x80%c' % len(compressed_world) + expected += compressed_world + self.assertEqual(expected, request.connection.written_data()) + + def test_send_message_fragmented_empty_first_frame(self): + extension = common.ExtensionParameter( + common.PERMESSAGE_DEFLATE_EXTENSION) + request = _create_request_from_rawdata( + '', permessage_deflate_request=extension) + msgutil.send_message(request, '', end=False) + msgutil.send_message(request, 'Hello') + + compress = zlib.compressobj( + zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS) + compressed_hello = compress.compress('') + compressed_hello += compress.flush(zlib.Z_SYNC_FLUSH) + expected = '\x41%c' % len(compressed_hello) + expected += compressed_hello + compressed_empty = compress.compress('Hello') + compressed_empty += compress.flush(zlib.Z_SYNC_FLUSH) + compressed_empty = compressed_empty[:-4] + expected += '\x80%c' % len(compressed_empty) + expected += compressed_empty + print '%r' % expected + self.assertEqual(expected, request.connection.written_data()) + + def test_send_message_fragmented_empty_last_frame(self): + extension = common.ExtensionParameter( + common.PERMESSAGE_DEFLATE_EXTENSION) + request = _create_request_from_rawdata( + '', permessage_deflate_request=extension) + msgutil.send_message(request, 'Hello', end=False) + msgutil.send_message(request, '') + + compress = zlib.compressobj( + zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS) + compressed_hello = compress.compress('Hello') + compressed_hello += compress.flush(zlib.Z_SYNC_FLUSH) + expected = '\x41%c' % len(compressed_hello) + expected += compressed_hello + compressed_empty = compress.compress('') + compressed_empty += compress.flush(zlib.Z_SYNC_FLUSH) + compressed_empty = compressed_empty[:-4] + expected += '\x80%c' % len(compressed_empty) + expected += compressed_empty + self.assertEqual(expected, request.connection.written_data()) + + def test_send_message_using_small_window(self): + common_part = 'abcdefghijklmnopqrstuvwxyz' + test_message = common_part + '-' * 30000 + common_part + + extension = common.ExtensionParameter( + common.PERMESSAGE_DEFLATE_EXTENSION) + extension.add_parameter('server_max_window_bits', '8') + request = _create_request_from_rawdata( + '', permessage_deflate_request=extension) + msgutil.send_message(request, test_message) + + expected_websocket_header_size = 2 + expected_websocket_payload_size = 91 + + actual_frame = request.connection.written_data() + self.assertEqual(expected_websocket_header_size + + expected_websocket_payload_size, + len(actual_frame)) + actual_header = actual_frame[0:expected_websocket_header_size] + actual_payload = actual_frame[expected_websocket_header_size:] + + self.assertEqual( + '\xc1%c' % expected_websocket_payload_size, actual_header) + decompress = zlib.decompressobj(-8) + decompressed_message = decompress.decompress( + actual_payload + '\x00\x00\xff\xff') + decompressed_message += decompress.flush() + self.assertEqual(test_message, decompressed_message) + self.assertEqual(0, len(decompress.unused_data)) + self.assertEqual(0, len(decompress.unconsumed_tail)) + + def test_send_message_no_context_takeover_parameter(self): + extension = common.ExtensionParameter( + common.PERMESSAGE_DEFLATE_EXTENSION) + extension.add_parameter('server_no_context_takeover', None) + request = _create_request_from_rawdata( + '', permessage_deflate_request=extension) + for i in xrange(3): + msgutil.send_message(request, 'Hello', end=False) + msgutil.send_message(request, 'Hello', end=True) + + compress = zlib.compressobj( + zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS) + + first_hello = compress.compress('Hello') + first_hello += compress.flush(zlib.Z_SYNC_FLUSH) + expected = '\x41%c' % len(first_hello) + expected += first_hello + second_hello = compress.compress('Hello') + second_hello += compress.flush(zlib.Z_SYNC_FLUSH) + second_hello = second_hello[:-4] + expected += '\x80%c' % len(second_hello) + expected += second_hello + + self.assertEqual( + expected + expected + expected, + request.connection.written_data()) + + def test_send_message_fragmented_bfinal(self): + extension = common.ExtensionParameter( + common.PERMESSAGE_DEFLATE_EXTENSION) + request = _create_request_from_rawdata( + '', permessage_deflate_request=extension) + self.assertEquals(1, len(request.ws_extension_processors)) + request.ws_extension_processors[0].set_bfinal(True) + msgutil.send_message(request, 'Hello', end=False) + msgutil.send_message(request, 'World', end=True) + + expected = '' + + compress = zlib.compressobj( + zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS) + compressed_hello = compress.compress('Hello') + compressed_hello += compress.flush(zlib.Z_FINISH) + compressed_hello = compressed_hello + chr(0) + expected += '\x41%c' % len(compressed_hello) + expected += compressed_hello + + compress = zlib.compressobj( + zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS) + compressed_world = compress.compress('World') + compressed_world += compress.flush(zlib.Z_FINISH) + compressed_world = compressed_world + chr(0) + expected += '\x80%c' % len(compressed_world) + expected += compressed_world + + self.assertEqual(expected, request.connection.written_data()) + + def test_receive_message_deflate(self): + compress = zlib.compressobj( + zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS) + + compressed_hello = compress.compress('Hello') + compressed_hello += compress.flush(zlib.Z_SYNC_FLUSH) + compressed_hello = compressed_hello[:-4] + data = '\xc1%c' % (len(compressed_hello) | 0x80) + data += _mask_hybi(compressed_hello) + + # Close frame + data += '\x88\x8a' + _mask_hybi(struct.pack('!H', 1000) + 'Good bye') + + extension = common.ExtensionParameter( + common.PERMESSAGE_DEFLATE_EXTENSION) + request = _create_request_from_rawdata( + data, permessage_deflate_request=extension) + self.assertEqual('Hello', msgutil.receive_message(request)) + + self.assertEqual(None, msgutil.receive_message(request)) + + def test_receive_message_random_section(self): + """Test that a compressed message fragmented into lots of chunks is + correctly received. + """ + + random.seed(a=0) + payload = ''.join( + [chr(random.randint(0, 255)) for i in xrange(1000)]) + + compress = zlib.compressobj( + zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS) + compressed_payload = compress.compress(payload) + compressed_payload += compress.flush(zlib.Z_SYNC_FLUSH) + compressed_payload = compressed_payload[:-4] + + # Fragment the compressed payload into lots of frames. + bytes_chunked = 0 + data = '' + frame_count = 0 + + chunk_sizes = [] + + while bytes_chunked < len(compressed_payload): + # Make sure that + # - the length of chunks are equal or less than 125 so that we can + # use 1 octet length header format for all frames. + # - at least 10 chunks are created. + chunk_size = random.randint( + 1, min(125, + len(compressed_payload) / 10, + len(compressed_payload) - bytes_chunked)) + chunk_sizes.append(chunk_size) + chunk = compressed_payload[ + bytes_chunked:bytes_chunked + chunk_size] + bytes_chunked += chunk_size + + first_octet = 0x00 + if len(data) == 0: + first_octet = first_octet | 0x42 + if bytes_chunked == len(compressed_payload): + first_octet = first_octet | 0x80 + + data += '%c%c' % (first_octet, chunk_size | 0x80) + data += _mask_hybi(chunk) + + frame_count += 1 + + print "Chunk sizes: %r" % chunk_sizes + self.assertTrue(len(chunk_sizes) > 10) + + # Close frame + data += '\x88\x8a' + _mask_hybi(struct.pack('!H', 1000) + 'Good bye') + + extension = common.ExtensionParameter( + common.PERMESSAGE_DEFLATE_EXTENSION) + request = _create_request_from_rawdata( + data, permessage_deflate_request=extension) + self.assertEqual(payload, msgutil.receive_message(request)) + + self.assertEqual(None, msgutil.receive_message(request)) + + def test_receive_two_messages(self): + compress = zlib.compressobj( + zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS) + + data = '' + + compressed_hello = compress.compress('HelloWebSocket') + compressed_hello += compress.flush(zlib.Z_SYNC_FLUSH) + compressed_hello = compressed_hello[:-4] + split_position = len(compressed_hello) / 2 + data += '\x41%c' % (split_position | 0x80) + data += _mask_hybi(compressed_hello[:split_position]) + + data += '\x80%c' % ((len(compressed_hello) - split_position) | 0x80) + data += _mask_hybi(compressed_hello[split_position:]) + + compress = zlib.compressobj( + zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS) + + compressed_world = compress.compress('World') + compressed_world += compress.flush(zlib.Z_SYNC_FLUSH) + compressed_world = compressed_world[:-4] + data += '\xc1%c' % (len(compressed_world) | 0x80) + data += _mask_hybi(compressed_world) + + # Close frame + data += '\x88\x8a' + _mask_hybi(struct.pack('!H', 1000) + 'Good bye') + + extension = common.ExtensionParameter( + common.PERMESSAGE_DEFLATE_EXTENSION) + request = _create_request_from_rawdata( + data, permessage_deflate_request=extension) + self.assertEqual('HelloWebSocket', msgutil.receive_message(request)) + self.assertEqual('World', msgutil.receive_message(request)) + + self.assertEqual(None, msgutil.receive_message(request)) + + def test_receive_message_mixed_btype(self): + """Test that a message compressed using lots of DEFLATE blocks with + various flush mode is correctly received. + """ + + random.seed(a=0) + payload = ''.join( + [chr(random.randint(0, 255)) for i in xrange(1000)]) + + compress = None + + # Fragment the compressed payload into lots of frames. + bytes_chunked = 0 + compressed_payload = '' + + chunk_sizes = [] + methods = [] + sync_used = False + finish_used = False + + while bytes_chunked < len(payload): + # Make sure at least 10 chunks are created. + chunk_size = random.randint( + 1, min(100, len(payload) - bytes_chunked)) + chunk_sizes.append(chunk_size) + chunk = payload[bytes_chunked:bytes_chunked + chunk_size] + + bytes_chunked += chunk_size + + if compress is None: + compress = zlib.compressobj( + zlib.Z_DEFAULT_COMPRESSION, + zlib.DEFLATED, + -zlib.MAX_WBITS) + + if bytes_chunked == len(payload): + compressed_payload += compress.compress(chunk) + compressed_payload += compress.flush(zlib.Z_SYNC_FLUSH) + compressed_payload = compressed_payload[:-4] + else: + method = random.randint(0, 1) + methods.append(method) + if method == 0: + compressed_payload += compress.compress(chunk) + compressed_payload += compress.flush(zlib.Z_SYNC_FLUSH) + sync_used = True + else: + compressed_payload += compress.compress(chunk) + compressed_payload += compress.flush(zlib.Z_FINISH) + compress = None + finish_used = True + + print "Chunk sizes: %r" % chunk_sizes + self.assertTrue(len(chunk_sizes) > 10) + print "Methods: %r" % methods + self.assertTrue(sync_used) + self.assertTrue(finish_used) + + self.assertTrue(125 < len(compressed_payload)) + self.assertTrue(len(compressed_payload) < 65536) + data = '\xc2\xfe' + struct.pack('!H', len(compressed_payload)) + data += _mask_hybi(compressed_payload) + + # Close frame + data += '\x88\x8a' + _mask_hybi(struct.pack('!H', 1000) + 'Good bye') + + extension = common.ExtensionParameter( + common.PERMESSAGE_DEFLATE_EXTENSION) + request = _create_request_from_rawdata( + data, permessage_deflate_request=extension) + self.assertEqual(payload, msgutil.receive_message(request)) + + self.assertEqual(None, msgutil.receive_message(request)) + + +class PerMessageCompressTest(unittest.TestCase): + """Tests for checking permessage-compression extension.""" + + def test_deflate_response_parameters(self): + extension = common.ExtensionParameter( + common.PERMESSAGE_COMPRESSION_EXTENSION) + extension.add_parameter('method', 'deflate') + processor = PerMessageCompressExtensionProcessor(extension) + response = processor.get_extension_response() + self.assertEqual('deflate', + response.get_parameter_value('method')) + + extension = common.ExtensionParameter( + common.PERMESSAGE_COMPRESSION_EXTENSION) + extension.add_parameter('method', 'deflate') + processor = PerMessageCompressExtensionProcessor(extension) + + def _compression_processor_hook(compression_processor): + compression_processor.set_client_max_window_bits(8) + compression_processor.set_client_no_context_takeover(True) + processor.set_compression_processor_hook( + _compression_processor_hook) + response = processor.get_extension_response() + self.assertEqual( + 'deflate; client_max_window_bits=8; client_no_context_takeover', + response.get_parameter_value('method')) + + +class MessageTestHixie75(unittest.TestCase): + """Tests for draft-hixie-thewebsocketprotocol-76 stream class.""" + + def test_send_message(self): + request = _create_request_hixie75() + msgutil.send_message(request, 'Hello') + self.assertEqual('\x00Hello\xff', request.connection.written_data()) + + def test_send_message_unicode(self): + request = _create_request_hixie75() + msgutil.send_message(request, u'\u65e5') + # U+65e5 is encoded as e6,97,a5 in UTF-8 + self.assertEqual('\x00\xe6\x97\xa5\xff', + request.connection.written_data()) + + def test_receive_message(self): + request = _create_request_hixie75('\x00Hello\xff\x00World!\xff') + self.assertEqual('Hello', msgutil.receive_message(request)) + self.assertEqual('World!', msgutil.receive_message(request)) + + def test_receive_message_unicode(self): + request = _create_request_hixie75('\x00\xe6\x9c\xac\xff') + # U+672c is encoded as e6,9c,ac in UTF-8 + self.assertEqual(u'\u672c', msgutil.receive_message(request)) + + def test_receive_message_erroneous_unicode(self): + # \x80 and \x81 are invalid as UTF-8. + request = _create_request_hixie75('\x00\x80\x81\xff') + # Invalid characters should be replaced with + # U+fffd REPLACEMENT CHARACTER + self.assertEqual(u'\ufffd\ufffd', msgutil.receive_message(request)) + + def test_receive_message_discard(self): + request = _create_request_hixie75('\x80\x06IGNORE\x00Hello\xff' + '\x01DISREGARD\xff\x00World!\xff') + self.assertEqual('Hello', msgutil.receive_message(request)) + self.assertEqual('World!', msgutil.receive_message(request)) + + +class MessageReceiverTest(unittest.TestCase): + """Tests the Stream class using MessageReceiver.""" + + def test_queue(self): + request = _create_blocking_request() + receiver = msgutil.MessageReceiver(request) + + self.assertEqual(None, receiver.receive_nowait()) + + request.connection.put_bytes('\x81\x86' + _mask_hybi('Hello!')) + self.assertEqual('Hello!', receiver.receive()) + + def test_onmessage(self): + onmessage_queue = Queue.Queue() + + def onmessage_handler(message): + onmessage_queue.put(message) + + request = _create_blocking_request() + receiver = msgutil.MessageReceiver(request, onmessage_handler) + + request.connection.put_bytes('\x81\x86' + _mask_hybi('Hello!')) + self.assertEqual('Hello!', onmessage_queue.get()) + + +class MessageReceiverHixie75Test(unittest.TestCase): + """Tests the StreamHixie75 class using MessageReceiver.""" + + def test_queue(self): + request = _create_blocking_request_hixie75() + receiver = msgutil.MessageReceiver(request) + + self.assertEqual(None, receiver.receive_nowait()) + + request.connection.put_bytes('\x00Hello!\xff') + self.assertEqual('Hello!', receiver.receive()) + + def test_onmessage(self): + onmessage_queue = Queue.Queue() + + def onmessage_handler(message): + onmessage_queue.put(message) + + request = _create_blocking_request_hixie75() + receiver = msgutil.MessageReceiver(request, onmessage_handler) + + request.connection.put_bytes('\x00Hello!\xff') + self.assertEqual('Hello!', onmessage_queue.get()) + + +class MessageSenderTest(unittest.TestCase): + """Tests the Stream class using MessageSender.""" + + def test_send(self): + request = _create_blocking_request() + sender = msgutil.MessageSender(request) + + sender.send('World') + self.assertEqual('\x81\x05World', request.connection.written_data()) + + def test_send_nowait(self): + # Use a queue to check the bytes written by MessageSender. + # request.connection.written_data() cannot be used here because + # MessageSender runs in a separate thread. + send_queue = Queue.Queue() + + def write(bytes): + send_queue.put(bytes) + + request = _create_blocking_request() + request.connection.write = write + + sender = msgutil.MessageSender(request) + + sender.send_nowait('Hello') + sender.send_nowait('World') + self.assertEqual('\x81\x05Hello', send_queue.get()) + self.assertEqual('\x81\x05World', send_queue.get()) + + +class MessageSenderHixie75Test(unittest.TestCase): + """Tests the StreamHixie75 class using MessageSender.""" + + def test_send(self): + request = _create_blocking_request_hixie75() + sender = msgutil.MessageSender(request) + + sender.send('World') + self.assertEqual('\x00World\xff', request.connection.written_data()) + + def test_send_nowait(self): + # Use a queue to check the bytes written by MessageSender. + # request.connection.written_data() cannot be used here because + # MessageSender runs in a separate thread. + send_queue = Queue.Queue() + + def write(bytes): + send_queue.put(bytes) + + request = _create_blocking_request_hixie75() + request.connection.write = write + + sender = msgutil.MessageSender(request) + + sender.send_nowait('Hello') + sender.send_nowait('World') + self.assertEqual('\x00Hello\xff', send_queue.get()) + self.assertEqual('\x00World\xff', send_queue.get()) + + +if __name__ == '__main__': + unittest.main() + + +# vi:sts=4 sw=4 et |