diff options
Diffstat (limited to 'python/rsa')
41 files changed, 4674 insertions, 0 deletions
diff --git a/python/rsa/LICENSE b/python/rsa/LICENSE new file mode 100644 index 000000000..da76c9d7f --- /dev/null +++ b/python/rsa/LICENSE @@ -0,0 +1,13 @@ +Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu> + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/python/rsa/MANIFEST.in b/python/rsa/MANIFEST.in new file mode 100644 index 000000000..8cf0021b9 --- /dev/null +++ b/python/rsa/MANIFEST.in @@ -0,0 +1,5 @@ +include README +include LICENSE +include *.py +recursive-include rsa *.py +recursive-include tests *.py diff --git a/python/rsa/PKG-INFO b/python/rsa/PKG-INFO new file mode 100644 index 000000000..399ba7b3e --- /dev/null +++ b/python/rsa/PKG-INFO @@ -0,0 +1,18 @@ +Metadata-Version: 1.1 +Name: rsa +Version: 3.1.4 +Summary: Pure-Python RSA implementation +Home-page: http://stuvel.eu/rsa +Author: Sybren A. Stuvel +Author-email: sybren@stuvel.eu +License: ASL 2 +Description: UNKNOWN +Platform: UNKNOWN +Classifier: Development Status :: 5 - Production/Stable +Classifier: Intended Audience :: Developers +Classifier: Intended Audience :: Education +Classifier: Intended Audience :: Information Technology +Classifier: License :: OSI Approved :: Apache Software License +Classifier: Operating System :: OS Independent +Classifier: Programming Language :: Python +Classifier: Topic :: Security :: Cryptography diff --git a/python/rsa/README.rst b/python/rsa/README.rst new file mode 100644 index 000000000..9f348636d --- /dev/null +++ b/python/rsa/README.rst @@ -0,0 +1,31 @@ +Pure Python RSA implementation +============================== + +`Python-RSA`_ is a pure-Python RSA implementation. It supports +encryption and decryption, signing and verifying signatures, and key +generation according to PKCS#1 version 1.5. It can be used as a Python +library as well as on the commandline. The code was mostly written by +Sybren A. Stüvel. + +Documentation can be found at the Python-RSA homepage: +http://stuvel.eu/rsa + +Download and install using:: + + pip install rsa + +or:: + + easy_install rsa + +or download it from the `Python Package Index`_. + +The source code is maintained in a `Mercurial repository`_ and is +licensed under the `Apache License, version 2.0`_ + + +.. _`Python-RSA`: http://stuvel.eu/rsa +.. _`Mercurial repository`: https://bitbucket.org/sybren/python-rsa +.. _`Python Package Index`: http://pypi.python.org/pypi/rsa +.. _`Apache License, version 2.0`: http://www.apache.org/licenses/LICENSE-2.0 + diff --git a/python/rsa/create_timing_table.py b/python/rsa/create_timing_table.py new file mode 100755 index 000000000..b1b2871b3 --- /dev/null +++ b/python/rsa/create_timing_table.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python + +import time +import rsa + +poolsize = 8 +accurate = True + +def run_speed_test(bitsize): + + iterations = 0 + start = end = time.time() + + # At least a number of iterations, and at least 2 seconds + while iterations < 10 or end - start < 2: + iterations += 1 + rsa.newkeys(bitsize, accurate=accurate, poolsize=poolsize) + end = time.time() + + duration = end - start + dur_per_call = duration / iterations + + print '%5i bit: %9.3f sec. (%i iterations over %.1f seconds)' % (bitsize, + dur_per_call, iterations, duration) + +for bitsize in (128, 256, 384, 512, 1024, 2048, 3072, 4096): + run_speed_test(bitsize) + + diff --git a/python/rsa/playstuff.py b/python/rsa/playstuff.py new file mode 100755 index 000000000..bfb941b88 --- /dev/null +++ b/python/rsa/playstuff.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python + +import re +import rsa + +def _logon( username, password ): + # Retrive the public key + # network stuff # req = urllib2.Request(AAA_GET_KEY, headers={'User-Agent': CLIENT_ID}) + # network stuff # response = urllib2.urlopen(req) + # network stuff # html = response.read() + # network stuff # print response.info() # DEBUG + # network stuff # print html # DEBUG + + # replacement for network stuff # + html="<x509PublicKey>30820122300d06092a864886f70d01010105000382010f003082010a0282010100dad8e3c084137bab285e869ae99a5de9752a095753680e9128adbe981e8141225704e558b8ee437836ec8c5460514efae61550bfdd883549981458bae388c9490b5ab43475068b169b32da446b0aae2dfbb3a5f425c74b284ced3f57ed33b30ec7b4b95a8216f8b063e34af2c84fef58bab381f3b79b80d06b687e0b5fc7aaeb311a88389ab7aa1422ae0b58956bb9e91c5cbf2b98422b05e1eacb82e29938566f6f05274294a8c596677c950ce97dcd003709d008f1ae6418ce5bf55ad2bf921318c6e31b324bdda4b4f12ff1fd86b5b71e647d1fc175aea137ba0ff869d5fbcf9ed0289fe7da3619c1204fc42d616462ac1b6a4e6ca2655d44bce039db519d0203010001</x509PublicKey>" + # end replacement for network stuff # + + # This shall pick the key + hexstring = re.compile('<x509PublicKey[^>]*>([0-9a-fA-F]+)</x509PublicKey>') + + # pick the key and convert it to der format + hex_pub_der = hexstring.search(html).group(1) + pub_der = hex_pub_der.decode('hex') + + # Convert it to a public key + pub_key = rsa.PublicKey.load_pkcs1_openssl_der(pub_der) + + # encode the password + enc_pass = rsa.encrypt(password, pub_key) + + # and hex-encode it + hex_pass = enc_pass.encode('hex') + +# _logon('me', 'MyPass') + +import timeit +timeit.timeit('_logon( "me", "MyPass" )', + setup='from __main__ import _logon', + number=1000) + + diff --git a/python/rsa/rsa/__init__.py b/python/rsa/rsa/__init__.py new file mode 100644 index 000000000..2d01c12e0 --- /dev/null +++ b/python/rsa/rsa/__init__.py @@ -0,0 +1,45 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""RSA module + +Module for calculating large primes, and RSA encryption, decryption, signing +and verification. Includes generating public and private keys. + +WARNING: this implementation does not use random padding, compression of the +cleartext input to prevent repetitions, or other common security improvements. +Use with care. + +If you want to have a more secure implementation, use the functions from the +``rsa.pkcs1`` module. + +""" + +__author__ = "Sybren Stuvel, Barry Mead and Yesudeep Mangalapilly" +__date__ = "2014-02-22" +__version__ = '3.1.4' + +from rsa.key import newkeys, PrivateKey, PublicKey +from rsa.pkcs1 import encrypt, decrypt, sign, verify, DecryptionError, \ + VerificationError + +# Do doctest if we're run directly +if __name__ == "__main__": + import doctest + doctest.testmod() + +__all__ = ["newkeys", "encrypt", "decrypt", "sign", "verify", 'PublicKey', + 'PrivateKey', 'DecryptionError', 'VerificationError'] + diff --git a/python/rsa/rsa/_compat.py b/python/rsa/rsa/_compat.py new file mode 100644 index 000000000..3c4eb81b1 --- /dev/null +++ b/python/rsa/rsa/_compat.py @@ -0,0 +1,160 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Python compatibility wrappers.""" + + +from __future__ import absolute_import + +import sys +from struct import pack + +try: + MAX_INT = sys.maxsize +except AttributeError: + MAX_INT = sys.maxint + +MAX_INT64 = (1 << 63) - 1 +MAX_INT32 = (1 << 31) - 1 +MAX_INT16 = (1 << 15) - 1 + +# Determine the word size of the processor. +if MAX_INT == MAX_INT64: + # 64-bit processor. + MACHINE_WORD_SIZE = 64 +elif MAX_INT == MAX_INT32: + # 32-bit processor. + MACHINE_WORD_SIZE = 32 +else: + # Else we just assume 64-bit processor keeping up with modern times. + MACHINE_WORD_SIZE = 64 + + +try: + # < Python3 + unicode_type = unicode + have_python3 = False +except NameError: + # Python3. + unicode_type = str + have_python3 = True + +# Fake byte literals. +if str is unicode_type: + def byte_literal(s): + return s.encode('latin1') +else: + def byte_literal(s): + return s + +# ``long`` is no more. Do type detection using this instead. +try: + integer_types = (int, long) +except NameError: + integer_types = (int,) + +b = byte_literal + +try: + # Python 2.6 or higher. + bytes_type = bytes +except NameError: + # Python 2.5 + bytes_type = str + + +# To avoid calling b() multiple times in tight loops. +ZERO_BYTE = b('\x00') +EMPTY_BYTE = b('') + + +def is_bytes(obj): + """ + Determines whether the given value is a byte string. + + :param obj: + The value to test. + :returns: + ``True`` if ``value`` is a byte string; ``False`` otherwise. + """ + return isinstance(obj, bytes_type) + + +def is_integer(obj): + """ + Determines whether the given value is an integer. + + :param obj: + The value to test. + :returns: + ``True`` if ``value`` is an integer; ``False`` otherwise. + """ + return isinstance(obj, integer_types) + + +def byte(num): + """ + Converts a number between 0 and 255 (both inclusive) to a base-256 (byte) + representation. + + Use it as a replacement for ``chr`` where you are expecting a byte + because this will work on all current versions of Python:: + + :param num: + An unsigned integer between 0 and 255 (both inclusive). + :returns: + A single byte. + """ + return pack("B", num) + + +def get_word_alignment(num, force_arch=64, + _machine_word_size=MACHINE_WORD_SIZE): + """ + Returns alignment details for the given number based on the platform + Python is running on. + + :param num: + Unsigned integral number. + :param force_arch: + If you don't want to use 64-bit unsigned chunks, set this to + anything other than 64. 32-bit chunks will be preferred then. + Default 64 will be used when on a 64-bit machine. + :param _machine_word_size: + (Internal) The machine word size used for alignment. + :returns: + 4-tuple:: + + (word_bits, word_bytes, + max_uint, packing_format_type) + """ + max_uint64 = 0xffffffffffffffff + max_uint32 = 0xffffffff + max_uint16 = 0xffff + max_uint8 = 0xff + + if force_arch == 64 and _machine_word_size >= 64 and num > max_uint32: + # 64-bit unsigned integer. + return 64, 8, max_uint64, "Q" + elif num > max_uint16: + # 32-bit unsigned integer + return 32, 4, max_uint32, "L" + elif num > max_uint8: + # 16-bit unsigned integer. + return 16, 2, max_uint16, "H" + else: + # 8-bit unsigned integer. + return 8, 1, max_uint8, "B" diff --git a/python/rsa/rsa/_version133.py b/python/rsa/rsa/_version133.py new file mode 100644 index 000000000..230a03c84 --- /dev/null +++ b/python/rsa/rsa/_version133.py @@ -0,0 +1,442 @@ +"""RSA module +pri = k[1] //Private part of keys d,p,q + +Module for calculating large primes, and RSA encryption, decryption, +signing and verification. Includes generating public and private keys. + +WARNING: this code implements the mathematics of RSA. It is not suitable for +real-world secure cryptography purposes. It has not been reviewed by a security +expert. It does not include padding of data. There are many ways in which the +output of this module, when used without any modification, can be sucessfully +attacked. +""" + +__author__ = "Sybren Stuvel, Marloes de Boer and Ivo Tamboer" +__date__ = "2010-02-05" +__version__ = '1.3.3' + +# NOTE: Python's modulo can return negative numbers. We compensate for +# this behaviour using the abs() function + +from cPickle import dumps, loads +import base64 +import math +import os +import random +import sys +import types +import zlib + +from rsa._compat import byte + +# Display a warning that this insecure version is imported. +import warnings +warnings.warn('Insecure version of the RSA module is imported as %s, be careful' + % __name__) + +def gcd(p, q): + """Returns the greatest common divisor of p and q + + + >>> gcd(42, 6) + 6 + """ + if p<q: return gcd(q, p) + if q == 0: return p + return gcd(q, abs(p%q)) + +def bytes2int(bytes): + """Converts a list of bytes or a string to an integer + + >>> (128*256 + 64)*256 + + 15 + 8405007 + >>> l = [128, 64, 15] + >>> bytes2int(l) + 8405007 + """ + + if not (type(bytes) is types.ListType or type(bytes) is types.StringType): + raise TypeError("You must pass a string or a list") + + # Convert byte stream to integer + integer = 0 + for byte in bytes: + integer *= 256 + if type(byte) is types.StringType: byte = ord(byte) + integer += byte + + return integer + +def int2bytes(number): + """Converts a number to a string of bytes + + >>> bytes2int(int2bytes(123456789)) + 123456789 + """ + + if not (type(number) is types.LongType or type(number) is types.IntType): + raise TypeError("You must pass a long or an int") + + string = "" + + while number > 0: + string = "%s%s" % (byte(number & 0xFF), string) + number /= 256 + + return string + +def fast_exponentiation(a, p, n): + """Calculates r = a^p mod n + """ + result = a % n + remainders = [] + while p != 1: + remainders.append(p & 1) + p = p >> 1 + while remainders: + rem = remainders.pop() + result = ((a ** rem) * result ** 2) % n + return result + +def read_random_int(nbits): + """Reads a random integer of approximately nbits bits rounded up + to whole bytes""" + + nbytes = ceil(nbits/8.) + randomdata = os.urandom(nbytes) + return bytes2int(randomdata) + +def ceil(x): + """ceil(x) -> int(math.ceil(x))""" + + return int(math.ceil(x)) + +def randint(minvalue, maxvalue): + """Returns a random integer x with minvalue <= x <= maxvalue""" + + # Safety - get a lot of random data even if the range is fairly + # small + min_nbits = 32 + + # The range of the random numbers we need to generate + range = maxvalue - minvalue + + # Which is this number of bytes + rangebytes = ceil(math.log(range, 2) / 8.) + + # Convert to bits, but make sure it's always at least min_nbits*2 + rangebits = max(rangebytes * 8, min_nbits * 2) + + # Take a random number of bits between min_nbits and rangebits + nbits = random.randint(min_nbits, rangebits) + + return (read_random_int(nbits) % range) + minvalue + +def fermat_little_theorem(p): + """Returns 1 if p may be prime, and something else if p definitely + is not prime""" + + a = randint(1, p-1) + return fast_exponentiation(a, p-1, p) + +def jacobi(a, b): + """Calculates the value of the Jacobi symbol (a/b) + """ + + if a % b == 0: + return 0 + result = 1 + while a > 1: + if a & 1: + if ((a-1)*(b-1) >> 2) & 1: + result = -result + b, a = a, b % a + else: + if ((b ** 2 - 1) >> 3) & 1: + result = -result + a = a >> 1 + return result + +def jacobi_witness(x, n): + """Returns False if n is an Euler pseudo-prime with base x, and + True otherwise. + """ + + j = jacobi(x, n) % n + f = fast_exponentiation(x, (n-1)/2, n) + + if j == f: return False + return True + +def randomized_primality_testing(n, k): + """Calculates whether n is composite (which is always correct) or + prime (which is incorrect with error probability 2**-k) + + Returns False if the number if composite, and True if it's + probably prime. + """ + + q = 0.5 # Property of the jacobi_witness function + + # t = int(math.ceil(k / math.log(1/q, 2))) + t = ceil(k / math.log(1/q, 2)) + for i in range(t+1): + x = randint(1, n-1) + if jacobi_witness(x, n): return False + + return True + +def is_prime(number): + """Returns True if the number is prime, and False otherwise. + + >>> is_prime(42) + 0 + >>> is_prime(41) + 1 + """ + + """ + if not fermat_little_theorem(number) == 1: + # Not prime, according to Fermat's little theorem + return False + """ + + if randomized_primality_testing(number, 5): + # Prime, according to Jacobi + return True + + # Not prime + return False + + +def getprime(nbits): + """Returns a prime number of max. 'math.ceil(nbits/8)*8' bits. In + other words: nbits is rounded up to whole bytes. + + >>> p = getprime(8) + >>> is_prime(p-1) + 0 + >>> is_prime(p) + 1 + >>> is_prime(p+1) + 0 + """ + + nbytes = int(math.ceil(nbits/8.)) + + while True: + integer = read_random_int(nbits) + + # Make sure it's odd + integer |= 1 + + # Test for primeness + if is_prime(integer): break + + # Retry if not prime + + return integer + +def are_relatively_prime(a, b): + """Returns True if a and b are relatively prime, and False if they + are not. + + >>> are_relatively_prime(2, 3) + 1 + >>> are_relatively_prime(2, 4) + 0 + """ + + d = gcd(a, b) + return (d == 1) + +def find_p_q(nbits): + """Returns a tuple of two different primes of nbits bits""" + + p = getprime(nbits) + while True: + q = getprime(nbits) + if not q == p: break + + return (p, q) + +def extended_euclid_gcd(a, b): + """Returns a tuple (d, i, j) such that d = gcd(a, b) = ia + jb + """ + + if b == 0: + return (a, 1, 0) + + q = abs(a % b) + r = long(a / b) + (d, k, l) = extended_euclid_gcd(b, q) + + return (d, l, k - l*r) + +# Main function: calculate encryption and decryption keys +def calculate_keys(p, q, nbits): + """Calculates an encryption and a decryption key for p and q, and + returns them as a tuple (e, d)""" + + n = p * q + phi_n = (p-1) * (q-1) + + while True: + # Make sure e has enough bits so we ensure "wrapping" through + # modulo n + e = getprime(max(8, nbits/2)) + if are_relatively_prime(e, n) and are_relatively_prime(e, phi_n): break + + (d, i, j) = extended_euclid_gcd(e, phi_n) + + if not d == 1: + raise Exception("e (%d) and phi_n (%d) are not relatively prime" % (e, phi_n)) + + if not (e * i) % phi_n == 1: + raise Exception("e (%d) and i (%d) are not mult. inv. modulo phi_n (%d)" % (e, i, phi_n)) + + return (e, i) + + +def gen_keys(nbits): + """Generate RSA keys of nbits bits. Returns (p, q, e, d). + + Note: this can take a long time, depending on the key size. + """ + + while True: + (p, q) = find_p_q(nbits) + (e, d) = calculate_keys(p, q, nbits) + + # For some reason, d is sometimes negative. We don't know how + # to fix it (yet), so we keep trying until everything is shiny + if d > 0: break + + return (p, q, e, d) + +def gen_pubpriv_keys(nbits): + """Generates public and private keys, and returns them as (pub, + priv). + + The public key consists of a dict {e: ..., , n: ....). The private + key consists of a dict {d: ...., p: ...., q: ....). + """ + + (p, q, e, d) = gen_keys(nbits) + + return ( {'e': e, 'n': p*q}, {'d': d, 'p': p, 'q': q} ) + +def encrypt_int(message, ekey, n): + """Encrypts a message using encryption key 'ekey', working modulo + n""" + + if type(message) is types.IntType: + return encrypt_int(long(message), ekey, n) + + if not type(message) is types.LongType: + raise TypeError("You must pass a long or an int") + + if message > 0 and \ + math.floor(math.log(message, 2)) > math.floor(math.log(n, 2)): + raise OverflowError("The message is too long") + + return fast_exponentiation(message, ekey, n) + +def decrypt_int(cyphertext, dkey, n): + """Decrypts a cypher text using the decryption key 'dkey', working + modulo n""" + + return encrypt_int(cyphertext, dkey, n) + +def sign_int(message, dkey, n): + """Signs 'message' using key 'dkey', working modulo n""" + + return decrypt_int(message, dkey, n) + +def verify_int(signed, ekey, n): + """verifies 'signed' using key 'ekey', working modulo n""" + + return encrypt_int(signed, ekey, n) + +def picklechops(chops): + """Pickles and base64encodes it's argument chops""" + + value = zlib.compress(dumps(chops)) + encoded = base64.encodestring(value) + return encoded.strip() + +def unpicklechops(string): + """base64decodes and unpickes it's argument string into chops""" + + return loads(zlib.decompress(base64.decodestring(string))) + +def chopstring(message, key, n, funcref): + """Splits 'message' into chops that are at most as long as n, + converts these into integers, and calls funcref(integer, key, n) + for each chop. + + Used by 'encrypt' and 'sign'. + """ + + msglen = len(message) + mbits = msglen * 8 + nbits = int(math.floor(math.log(n, 2))) + nbytes = nbits / 8 + blocks = msglen / nbytes + + if msglen % nbytes > 0: + blocks += 1 + + cypher = [] + + for bindex in range(blocks): + offset = bindex * nbytes + block = message[offset:offset+nbytes] + value = bytes2int(block) + cypher.append(funcref(value, key, n)) + + return picklechops(cypher) + +def gluechops(chops, key, n, funcref): + """Glues chops back together into a string. calls + funcref(integer, key, n) for each chop. + + Used by 'decrypt' and 'verify'. + """ + message = "" + + chops = unpicklechops(chops) + + for cpart in chops: + mpart = funcref(cpart, key, n) + message += int2bytes(mpart) + + return message + +def encrypt(message, key): + """Encrypts a string 'message' with the public key 'key'""" + + return chopstring(message, key['e'], key['n'], encrypt_int) + +def sign(message, key): + """Signs a string 'message' with the private key 'key'""" + + return chopstring(message, key['d'], key['p']*key['q'], decrypt_int) + +def decrypt(cypher, key): + """Decrypts a cypher with the private key 'key'""" + + return gluechops(cypher, key['d'], key['p']*key['q'], decrypt_int) + +def verify(cypher, key): + """Verifies a cypher with the public key 'key'""" + + return gluechops(cypher, key['e'], key['n'], encrypt_int) + +# Do doctest if we're not imported +if __name__ == "__main__": + import doctest + doctest.testmod() + +__all__ = ["gen_pubpriv_keys", "encrypt", "decrypt", "sign", "verify"] + diff --git a/python/rsa/rsa/_version200.py b/python/rsa/rsa/_version200.py new file mode 100644 index 000000000..f91565385 --- /dev/null +++ b/python/rsa/rsa/_version200.py @@ -0,0 +1,529 @@ +"""RSA module + +Module for calculating large primes, and RSA encryption, decryption, +signing and verification. Includes generating public and private keys. + +WARNING: this implementation does not use random padding, compression of the +cleartext input to prevent repetitions, or other common security improvements. +Use with care. + +""" + +__author__ = "Sybren Stuvel, Marloes de Boer, Ivo Tamboer, and Barry Mead" +__date__ = "2010-02-08" +__version__ = '2.0' + +import math +import os +import random +import sys +import types +from rsa._compat import byte + +# Display a warning that this insecure version is imported. +import warnings +warnings.warn('Insecure version of the RSA module is imported as %s' % __name__) + + +def bit_size(number): + """Returns the number of bits required to hold a specific long number""" + + return int(math.ceil(math.log(number,2))) + +def gcd(p, q): + """Returns the greatest common divisor of p and q + >>> gcd(48, 180) + 12 + """ + # Iterateive Version is faster and uses much less stack space + while q != 0: + if p < q: (p,q) = (q,p) + (p,q) = (q, p % q) + return p + + +def bytes2int(bytes): + """Converts a list of bytes or a string to an integer + + >>> (((128 * 256) + 64) * 256) + 15 + 8405007 + >>> l = [128, 64, 15] + >>> bytes2int(l) #same as bytes2int('\x80@\x0f') + 8405007 + """ + + if not (type(bytes) is types.ListType or type(bytes) is types.StringType): + raise TypeError("You must pass a string or a list") + + # Convert byte stream to integer + integer = 0 + for byte in bytes: + integer *= 256 + if type(byte) is types.StringType: byte = ord(byte) + integer += byte + + return integer + +def int2bytes(number): + """ + Converts a number to a string of bytes + """ + + if not (type(number) is types.LongType or type(number) is types.IntType): + raise TypeError("You must pass a long or an int") + + string = "" + + while number > 0: + string = "%s%s" % (byte(number & 0xFF), string) + number /= 256 + + return string + +def to64(number): + """Converts a number in the range of 0 to 63 into base 64 digit + character in the range of '0'-'9', 'A'-'Z', 'a'-'z','-','_'. + + >>> to64(10) + 'A' + """ + + if not (type(number) is types.LongType or type(number) is types.IntType): + raise TypeError("You must pass a long or an int") + + if 0 <= number <= 9: #00-09 translates to '0' - '9' + return byte(number + 48) + + if 10 <= number <= 35: + return byte(number + 55) #10-35 translates to 'A' - 'Z' + + if 36 <= number <= 61: + return byte(number + 61) #36-61 translates to 'a' - 'z' + + if number == 62: # 62 translates to '-' (minus) + return byte(45) + + if number == 63: # 63 translates to '_' (underscore) + return byte(95) + + raise ValueError('Invalid Base64 value: %i' % number) + + +def from64(number): + """Converts an ordinal character value in the range of + 0-9,A-Z,a-z,-,_ to a number in the range of 0-63. + + >>> from64(49) + 1 + """ + + if not (type(number) is types.LongType or type(number) is types.IntType): + raise TypeError("You must pass a long or an int") + + if 48 <= number <= 57: #ord('0') - ord('9') translates to 0-9 + return(number - 48) + + if 65 <= number <= 90: #ord('A') - ord('Z') translates to 10-35 + return(number - 55) + + if 97 <= number <= 122: #ord('a') - ord('z') translates to 36-61 + return(number - 61) + + if number == 45: #ord('-') translates to 62 + return(62) + + if number == 95: #ord('_') translates to 63 + return(63) + + raise ValueError('Invalid Base64 value: %i' % number) + + +def int2str64(number): + """Converts a number to a string of base64 encoded characters in + the range of '0'-'9','A'-'Z,'a'-'z','-','_'. + + >>> int2str64(123456789) + '7MyqL' + """ + + if not (type(number) is types.LongType or type(number) is types.IntType): + raise TypeError("You must pass a long or an int") + + string = "" + + while number > 0: + string = "%s%s" % (to64(number & 0x3F), string) + number /= 64 + + return string + + +def str642int(string): + """Converts a base64 encoded string into an integer. + The chars of this string in in the range '0'-'9','A'-'Z','a'-'z','-','_' + + >>> str642int('7MyqL') + 123456789 + """ + + if not (type(string) is types.ListType or type(string) is types.StringType): + raise TypeError("You must pass a string or a list") + + integer = 0 + for byte in string: + integer *= 64 + if type(byte) is types.StringType: byte = ord(byte) + integer += from64(byte) + + return integer + +def read_random_int(nbits): + """Reads a random integer of approximately nbits bits rounded up + to whole bytes""" + + nbytes = int(math.ceil(nbits/8.)) + randomdata = os.urandom(nbytes) + return bytes2int(randomdata) + +def randint(minvalue, maxvalue): + """Returns a random integer x with minvalue <= x <= maxvalue""" + + # Safety - get a lot of random data even if the range is fairly + # small + min_nbits = 32 + + # The range of the random numbers we need to generate + range = (maxvalue - minvalue) + 1 + + # Which is this number of bytes + rangebytes = ((bit_size(range) + 7) / 8) + + # Convert to bits, but make sure it's always at least min_nbits*2 + rangebits = max(rangebytes * 8, min_nbits * 2) + + # Take a random number of bits between min_nbits and rangebits + nbits = random.randint(min_nbits, rangebits) + + return (read_random_int(nbits) % range) + minvalue + +def jacobi(a, b): + """Calculates the value of the Jacobi symbol (a/b) + where both a and b are positive integers, and b is odd + """ + + if a == 0: return 0 + result = 1 + while a > 1: + if a & 1: + if ((a-1)*(b-1) >> 2) & 1: + result = -result + a, b = b % a, a + else: + if (((b * b) - 1) >> 3) & 1: + result = -result + a >>= 1 + if a == 0: return 0 + return result + +def jacobi_witness(x, n): + """Returns False if n is an Euler pseudo-prime with base x, and + True otherwise. + """ + + j = jacobi(x, n) % n + f = pow(x, (n-1)/2, n) + + if j == f: return False + return True + +def randomized_primality_testing(n, k): + """Calculates whether n is composite (which is always correct) or + prime (which is incorrect with error probability 2**-k) + + Returns False if the number is composite, and True if it's + probably prime. + """ + + # 50% of Jacobi-witnesses can report compositness of non-prime numbers + + for i in range(k): + x = randint(1, n-1) + if jacobi_witness(x, n): return False + + return True + +def is_prime(number): + """Returns True if the number is prime, and False otherwise. + + >>> is_prime(42) + 0 + >>> is_prime(41) + 1 + """ + + if randomized_primality_testing(number, 6): + # Prime, according to Jacobi + return True + + # Not prime + return False + + +def getprime(nbits): + """Returns a prime number of max. 'math.ceil(nbits/8)*8' bits. In + other words: nbits is rounded up to whole bytes. + + >>> p = getprime(8) + >>> is_prime(p-1) + 0 + >>> is_prime(p) + 1 + >>> is_prime(p+1) + 0 + """ + + while True: + integer = read_random_int(nbits) + + # Make sure it's odd + integer |= 1 + + # Test for primeness + if is_prime(integer): break + + # Retry if not prime + + return integer + +def are_relatively_prime(a, b): + """Returns True if a and b are relatively prime, and False if they + are not. + + >>> are_relatively_prime(2, 3) + 1 + >>> are_relatively_prime(2, 4) + 0 + """ + + d = gcd(a, b) + return (d == 1) + +def find_p_q(nbits): + """Returns a tuple of two different primes of nbits bits""" + pbits = nbits + (nbits/16) #Make sure that p and q aren't too close + qbits = nbits - (nbits/16) #or the factoring programs can factor n + p = getprime(pbits) + while True: + q = getprime(qbits) + #Make sure p and q are different. + if not q == p: break + return (p, q) + +def extended_gcd(a, b): + """Returns a tuple (r, i, j) such that r = gcd(a, b) = ia + jb + """ + # r = gcd(a,b) i = multiplicitive inverse of a mod b + # or j = multiplicitive inverse of b mod a + # Neg return values for i or j are made positive mod b or a respectively + # Iterateive Version is faster and uses much less stack space + x = 0 + y = 1 + lx = 1 + ly = 0 + oa = a #Remember original a/b to remove + ob = b #negative values from return results + while b != 0: + q = long(a/b) + (a, b) = (b, a % b) + (x, lx) = ((lx - (q * x)),x) + (y, ly) = ((ly - (q * y)),y) + if (lx < 0): lx += ob #If neg wrap modulo orignal b + if (ly < 0): ly += oa #If neg wrap modulo orignal a + return (a, lx, ly) #Return only positive values + +# Main function: calculate encryption and decryption keys +def calculate_keys(p, q, nbits): + """Calculates an encryption and a decryption key for p and q, and + returns them as a tuple (e, d)""" + + n = p * q + phi_n = (p-1) * (q-1) + + while True: + # Make sure e has enough bits so we ensure "wrapping" through + # modulo n + e = max(65537,getprime(nbits/4)) + if are_relatively_prime(e, n) and are_relatively_prime(e, phi_n): break + + (d, i, j) = extended_gcd(e, phi_n) + + if not d == 1: + raise Exception("e (%d) and phi_n (%d) are not relatively prime" % (e, phi_n)) + if (i < 0): + raise Exception("New extended_gcd shouldn't return negative values") + if not (e * i) % phi_n == 1: + raise Exception("e (%d) and i (%d) are not mult. inv. modulo phi_n (%d)" % (e, i, phi_n)) + + return (e, i) + + +def gen_keys(nbits): + """Generate RSA keys of nbits bits. Returns (p, q, e, d). + + Note: this can take a long time, depending on the key size. + """ + + (p, q) = find_p_q(nbits) + (e, d) = calculate_keys(p, q, nbits) + + return (p, q, e, d) + +def newkeys(nbits): + """Generates public and private keys, and returns them as (pub, + priv). + + The public key consists of a dict {e: ..., , n: ....). The private + key consists of a dict {d: ...., p: ...., q: ....). + """ + nbits = max(9,nbits) # Don't let nbits go below 9 bits + (p, q, e, d) = gen_keys(nbits) + + return ( {'e': e, 'n': p*q}, {'d': d, 'p': p, 'q': q} ) + +def encrypt_int(message, ekey, n): + """Encrypts a message using encryption key 'ekey', working modulo n""" + + if type(message) is types.IntType: + message = long(message) + + if not type(message) is types.LongType: + raise TypeError("You must pass a long or int") + + if message < 0 or message > n: + raise OverflowError("The message is too long") + + #Note: Bit exponents start at zero (bit counts start at 1) this is correct + safebit = bit_size(n) - 2 #compute safe bit (MSB - 1) + message += (1 << safebit) #add safebit to ensure folding + + return pow(message, ekey, n) + +def decrypt_int(cyphertext, dkey, n): + """Decrypts a cypher text using the decryption key 'dkey', working + modulo n""" + + message = pow(cyphertext, dkey, n) + + safebit = bit_size(n) - 2 #compute safe bit (MSB - 1) + message -= (1 << safebit) #remove safebit before decode + + return message + +def encode64chops(chops): + """base64encodes chops and combines them into a ',' delimited string""" + + chips = [] #chips are character chops + + for value in chops: + chips.append(int2str64(value)) + + #delimit chops with comma + encoded = ','.join(chips) + + return encoded + +def decode64chops(string): + """base64decodes and makes a ',' delimited string into chops""" + + chips = string.split(',') #split chops at commas + + chops = [] + + for string in chips: #make char chops (chips) into chops + chops.append(str642int(string)) + + return chops + +def chopstring(message, key, n, funcref): + """Chops the 'message' into integers that fit into n, + leaving room for a safebit to be added to ensure that all + messages fold during exponentiation. The MSB of the number n + is not independant modulo n (setting it could cause overflow), so + use the next lower bit for the safebit. Therefore reserve 2-bits + in the number n for non-data bits. Calls specified encryption + function for each chop. + + Used by 'encrypt' and 'sign'. + """ + + msglen = len(message) + mbits = msglen * 8 + #Set aside 2-bits so setting of safebit won't overflow modulo n. + nbits = bit_size(n) - 2 # leave room for safebit + nbytes = nbits / 8 + blocks = msglen / nbytes + + if msglen % nbytes > 0: + blocks += 1 + + cypher = [] + + for bindex in range(blocks): + offset = bindex * nbytes + block = message[offset:offset+nbytes] + value = bytes2int(block) + cypher.append(funcref(value, key, n)) + + return encode64chops(cypher) #Encode encrypted ints to base64 strings + +def gluechops(string, key, n, funcref): + """Glues chops back together into a string. calls + funcref(integer, key, n) for each chop. + + Used by 'decrypt' and 'verify'. + """ + message = "" + + chops = decode64chops(string) #Decode base64 strings into integer chops + + for cpart in chops: + mpart = funcref(cpart, key, n) #Decrypt each chop + message += int2bytes(mpart) #Combine decrypted strings into a msg + + return message + +def encrypt(message, key): + """Encrypts a string 'message' with the public key 'key'""" + if 'n' not in key: + raise Exception("You must use the public key with encrypt") + + return chopstring(message, key['e'], key['n'], encrypt_int) + +def sign(message, key): + """Signs a string 'message' with the private key 'key'""" + if 'p' not in key: + raise Exception("You must use the private key with sign") + + return chopstring(message, key['d'], key['p']*key['q'], encrypt_int) + +def decrypt(cypher, key): + """Decrypts a string 'cypher' with the private key 'key'""" + if 'p' not in key: + raise Exception("You must use the private key with decrypt") + + return gluechops(cypher, key['d'], key['p']*key['q'], decrypt_int) + +def verify(cypher, key): + """Verifies a string 'cypher' with the public key 'key'""" + if 'n' not in key: + raise Exception("You must use the public key with verify") + + return gluechops(cypher, key['e'], key['n'], decrypt_int) + +# Do doctest if we're not imported +if __name__ == "__main__": + import doctest + doctest.testmod() + +__all__ = ["newkeys", "encrypt", "decrypt", "sign", "verify"] + diff --git a/python/rsa/rsa/asn1.py b/python/rsa/rsa/asn1.py new file mode 100644 index 000000000..706e6cf22 --- /dev/null +++ b/python/rsa/rsa/asn1.py @@ -0,0 +1,35 @@ +'''ASN.1 definitions. + +Not all ASN.1-handling code use these definitions, but when it does, they should be here. +''' + +from pyasn1.type import univ, namedtype, tag + +class PubKeyHeader(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('oid', univ.ObjectIdentifier()), + namedtype.NamedType('parameters', univ.Null()), + ) + +class OpenSSLPubKey(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('header', PubKeyHeader()), + + # This little hack (the implicit tag) allows us to get a Bit String as Octet String + namedtype.NamedType('key', univ.OctetString().subtype( + implicitTag=tag.Tag(tagClass=0, tagFormat=0, tagId=3))), + ) + + +class AsnPubKey(univ.Sequence): + '''ASN.1 contents of DER encoded public key: + + RSAPublicKey ::= SEQUENCE { + modulus INTEGER, -- n + publicExponent INTEGER, -- e + ''' + + componentType = namedtype.NamedTypes( + namedtype.NamedType('modulus', univ.Integer()), + namedtype.NamedType('publicExponent', univ.Integer()), + ) diff --git a/python/rsa/rsa/bigfile.py b/python/rsa/rsa/bigfile.py new file mode 100644 index 000000000..516cf56b5 --- /dev/null +++ b/python/rsa/rsa/bigfile.py @@ -0,0 +1,87 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +'''Large file support + + - break a file into smaller blocks, and encrypt them, and store the + encrypted blocks in another file. + + - take such an encrypted files, decrypt its blocks, and reconstruct the + original file. + +The encrypted file format is as follows, where || denotes byte concatenation: + + FILE := VERSION || BLOCK || BLOCK ... + + BLOCK := LENGTH || DATA + + LENGTH := varint-encoded length of the subsequent data. Varint comes from + Google Protobuf, and encodes an integer into a variable number of bytes. + Each byte uses the 7 lowest bits to encode the value. The highest bit set + to 1 indicates the next byte is also part of the varint. The last byte will + have this bit set to 0. + +This file format is called the VARBLOCK format, in line with the varint format +used to denote the block sizes. + +''' + +from rsa import key, common, pkcs1, varblock +from rsa._compat import byte + +def encrypt_bigfile(infile, outfile, pub_key): + '''Encrypts a file, writing it to 'outfile' in VARBLOCK format. + + :param infile: file-like object to read the cleartext from + :param outfile: file-like object to write the crypto in VARBLOCK format to + :param pub_key: :py:class:`rsa.PublicKey` to encrypt with + + ''' + + if not isinstance(pub_key, key.PublicKey): + raise TypeError('Public key required, but got %r' % pub_key) + + key_bytes = common.bit_size(pub_key.n) // 8 + blocksize = key_bytes - 11 # keep space for PKCS#1 padding + + # Write the version number to the VARBLOCK file + outfile.write(byte(varblock.VARBLOCK_VERSION)) + + # Encrypt and write each block + for block in varblock.yield_fixedblocks(infile, blocksize): + crypto = pkcs1.encrypt(block, pub_key) + + varblock.write_varint(outfile, len(crypto)) + outfile.write(crypto) + +def decrypt_bigfile(infile, outfile, priv_key): + '''Decrypts an encrypted VARBLOCK file, writing it to 'outfile' + + :param infile: file-like object to read the crypto in VARBLOCK format from + :param outfile: file-like object to write the cleartext to + :param priv_key: :py:class:`rsa.PrivateKey` to decrypt with + + ''' + + if not isinstance(priv_key, key.PrivateKey): + raise TypeError('Private key required, but got %r' % priv_key) + + for block in varblock.yield_varblocks(infile): + cleartext = pkcs1.decrypt(block, priv_key) + outfile.write(cleartext) + +__all__ = ['encrypt_bigfile', 'decrypt_bigfile'] + diff --git a/python/rsa/rsa/cli.py b/python/rsa/rsa/cli.py new file mode 100644 index 000000000..527cc4979 --- /dev/null +++ b/python/rsa/rsa/cli.py @@ -0,0 +1,379 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +'''Commandline scripts. + +These scripts are called by the executables defined in setup.py. +''' + +from __future__ import with_statement, print_function + +import abc +import sys +from optparse import OptionParser + +import rsa +import rsa.bigfile +import rsa.pkcs1 + +HASH_METHODS = sorted(rsa.pkcs1.HASH_METHODS.keys()) + +def keygen(): + '''Key generator.''' + + # Parse the CLI options + parser = OptionParser(usage='usage: %prog [options] keysize', + description='Generates a new RSA keypair of "keysize" bits.') + + parser.add_option('--pubout', type='string', + help='Output filename for the public key. The public key is ' + 'not saved if this option is not present. You can use ' + 'pyrsa-priv2pub to create the public key file later.') + + parser.add_option('-o', '--out', type='string', + help='Output filename for the private key. The key is ' + 'written to stdout if this option is not present.') + + parser.add_option('--form', + help='key format of the private and public keys - default PEM', + choices=('PEM', 'DER'), default='PEM') + + (cli, cli_args) = parser.parse_args(sys.argv[1:]) + + if len(cli_args) != 1: + parser.print_help() + raise SystemExit(1) + + try: + keysize = int(cli_args[0]) + except ValueError: + parser.print_help() + print('Not a valid number: %s' % cli_args[0], file=sys.stderr) + raise SystemExit(1) + + print('Generating %i-bit key' % keysize, file=sys.stderr) + (pub_key, priv_key) = rsa.newkeys(keysize) + + + # Save public key + if cli.pubout: + print('Writing public key to %s' % cli.pubout, file=sys.stderr) + data = pub_key.save_pkcs1(format=cli.form) + with open(cli.pubout, 'wb') as outfile: + outfile.write(data) + + # Save private key + data = priv_key.save_pkcs1(format=cli.form) + + if cli.out: + print('Writing private key to %s' % cli.out, file=sys.stderr) + with open(cli.out, 'wb') as outfile: + outfile.write(data) + else: + print('Writing private key to stdout', file=sys.stderr) + sys.stdout.write(data) + + +class CryptoOperation(object): + '''CLI callable that operates with input, output, and a key.''' + + __metaclass__ = abc.ABCMeta + + keyname = 'public' # or 'private' + usage = 'usage: %%prog [options] %(keyname)s_key' + description = None + operation = 'decrypt' + operation_past = 'decrypted' + operation_progressive = 'decrypting' + input_help = 'Name of the file to %(operation)s. Reads from stdin if ' \ + 'not specified.' + output_help = 'Name of the file to write the %(operation_past)s file ' \ + 'to. Written to stdout if this option is not present.' + expected_cli_args = 1 + has_output = True + + key_class = rsa.PublicKey + + def __init__(self): + self.usage = self.usage % self.__class__.__dict__ + self.input_help = self.input_help % self.__class__.__dict__ + self.output_help = self.output_help % self.__class__.__dict__ + + @abc.abstractmethod + def perform_operation(self, indata, key, cli_args=None): + '''Performs the program's operation. + + Implement in a subclass. + + :returns: the data to write to the output. + ''' + + def __call__(self): + '''Runs the program.''' + + (cli, cli_args) = self.parse_cli() + + key = self.read_key(cli_args[0], cli.keyform) + + indata = self.read_infile(cli.input) + + print(self.operation_progressive.title(), file=sys.stderr) + outdata = self.perform_operation(indata, key, cli_args) + + if self.has_output: + self.write_outfile(outdata, cli.output) + + def parse_cli(self): + '''Parse the CLI options + + :returns: (cli_opts, cli_args) + ''' + + parser = OptionParser(usage=self.usage, description=self.description) + + parser.add_option('-i', '--input', type='string', help=self.input_help) + + if self.has_output: + parser.add_option('-o', '--output', type='string', help=self.output_help) + + parser.add_option('--keyform', + help='Key format of the %s key - default PEM' % self.keyname, + choices=('PEM', 'DER'), default='PEM') + + (cli, cli_args) = parser.parse_args(sys.argv[1:]) + + if len(cli_args) != self.expected_cli_args: + parser.print_help() + raise SystemExit(1) + + return (cli, cli_args) + + def read_key(self, filename, keyform): + '''Reads a public or private key.''' + + print('Reading %s key from %s' % (self.keyname, filename), file=sys.stderr) + with open(filename, 'rb') as keyfile: + keydata = keyfile.read() + + return self.key_class.load_pkcs1(keydata, keyform) + + def read_infile(self, inname): + '''Read the input file''' + + if inname: + print('Reading input from %s' % inname, file=sys.stderr) + with open(inname, 'rb') as infile: + return infile.read() + + print('Reading input from stdin', file=sys.stderr) + return sys.stdin.read() + + def write_outfile(self, outdata, outname): + '''Write the output file''' + + if outname: + print('Writing output to %s' % outname, file=sys.stderr) + with open(outname, 'wb') as outfile: + outfile.write(outdata) + else: + print('Writing output to stdout', file=sys.stderr) + sys.stdout.write(outdata) + +class EncryptOperation(CryptoOperation): + '''Encrypts a file.''' + + keyname = 'public' + description = ('Encrypts a file. The file must be shorter than the key ' + 'length in order to be encrypted. For larger files, use the ' + 'pyrsa-encrypt-bigfile command.') + operation = 'encrypt' + operation_past = 'encrypted' + operation_progressive = 'encrypting' + + + def perform_operation(self, indata, pub_key, cli_args=None): + '''Encrypts files.''' + + return rsa.encrypt(indata, pub_key) + +class DecryptOperation(CryptoOperation): + '''Decrypts a file.''' + + keyname = 'private' + description = ('Decrypts a file. The original file must be shorter than ' + 'the key length in order to have been encrypted. For larger ' + 'files, use the pyrsa-decrypt-bigfile command.') + operation = 'decrypt' + operation_past = 'decrypted' + operation_progressive = 'decrypting' + key_class = rsa.PrivateKey + + def perform_operation(self, indata, priv_key, cli_args=None): + '''Decrypts files.''' + + return rsa.decrypt(indata, priv_key) + +class SignOperation(CryptoOperation): + '''Signs a file.''' + + keyname = 'private' + usage = 'usage: %%prog [options] private_key hash_method' + description = ('Signs a file, outputs the signature. Choose the hash ' + 'method from %s' % ', '.join(HASH_METHODS)) + operation = 'sign' + operation_past = 'signature' + operation_progressive = 'Signing' + key_class = rsa.PrivateKey + expected_cli_args = 2 + + output_help = ('Name of the file to write the signature to. Written ' + 'to stdout if this option is not present.') + + def perform_operation(self, indata, priv_key, cli_args): + '''Decrypts files.''' + + hash_method = cli_args[1] + if hash_method not in HASH_METHODS: + raise SystemExit('Invalid hash method, choose one of %s' % + ', '.join(HASH_METHODS)) + + return rsa.sign(indata, priv_key, hash_method) + +class VerifyOperation(CryptoOperation): + '''Verify a signature.''' + + keyname = 'public' + usage = 'usage: %%prog [options] public_key signature_file' + description = ('Verifies a signature, exits with status 0 upon success, ' + 'prints an error message and exits with status 1 upon error.') + operation = 'verify' + operation_past = 'verified' + operation_progressive = 'Verifying' + key_class = rsa.PublicKey + expected_cli_args = 2 + has_output = False + + def perform_operation(self, indata, pub_key, cli_args): + '''Decrypts files.''' + + signature_file = cli_args[1] + + with open(signature_file, 'rb') as sigfile: + signature = sigfile.read() + + try: + rsa.verify(indata, signature, pub_key) + except rsa.VerificationError: + raise SystemExit('Verification failed.') + + print('Verification OK', file=sys.stderr) + + +class BigfileOperation(CryptoOperation): + '''CryptoOperation that doesn't read the entire file into memory.''' + + def __init__(self): + CryptoOperation.__init__(self) + + self.file_objects = [] + + def __del__(self): + '''Closes any open file handles.''' + + for fobj in self.file_objects: + fobj.close() + + def __call__(self): + '''Runs the program.''' + + (cli, cli_args) = self.parse_cli() + + key = self.read_key(cli_args[0], cli.keyform) + + # Get the file handles + infile = self.get_infile(cli.input) + outfile = self.get_outfile(cli.output) + + # Call the operation + print(self.operation_progressive.title(), file=sys.stderr) + self.perform_operation(infile, outfile, key, cli_args) + + def get_infile(self, inname): + '''Returns the input file object''' + + if inname: + print('Reading input from %s' % inname, file=sys.stderr) + fobj = open(inname, 'rb') + self.file_objects.append(fobj) + else: + print('Reading input from stdin', file=sys.stderr) + fobj = sys.stdin + + return fobj + + def get_outfile(self, outname): + '''Returns the output file object''' + + if outname: + print('Will write output to %s' % outname, file=sys.stderr) + fobj = open(outname, 'wb') + self.file_objects.append(fobj) + else: + print('Will write output to stdout', file=sys.stderr) + fobj = sys.stdout + + return fobj + +class EncryptBigfileOperation(BigfileOperation): + '''Encrypts a file to VARBLOCK format.''' + + keyname = 'public' + description = ('Encrypts a file to an encrypted VARBLOCK file. The file ' + 'can be larger than the key length, but the output file is only ' + 'compatible with Python-RSA.') + operation = 'encrypt' + operation_past = 'encrypted' + operation_progressive = 'encrypting' + + def perform_operation(self, infile, outfile, pub_key, cli_args=None): + '''Encrypts files to VARBLOCK.''' + + return rsa.bigfile.encrypt_bigfile(infile, outfile, pub_key) + +class DecryptBigfileOperation(BigfileOperation): + '''Decrypts a file in VARBLOCK format.''' + + keyname = 'private' + description = ('Decrypts an encrypted VARBLOCK file that was encrypted ' + 'with pyrsa-encrypt-bigfile') + operation = 'decrypt' + operation_past = 'decrypted' + operation_progressive = 'decrypting' + key_class = rsa.PrivateKey + + def perform_operation(self, infile, outfile, priv_key, cli_args=None): + '''Decrypts a VARBLOCK file.''' + + return rsa.bigfile.decrypt_bigfile(infile, outfile, priv_key) + + +encrypt = EncryptOperation() +decrypt = DecryptOperation() +sign = SignOperation() +verify = VerifyOperation() +encrypt_bigfile = EncryptBigfileOperation() +decrypt_bigfile = DecryptBigfileOperation() + diff --git a/python/rsa/rsa/common.py b/python/rsa/rsa/common.py new file mode 100644 index 000000000..39feb8c22 --- /dev/null +++ b/python/rsa/rsa/common.py @@ -0,0 +1,185 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +'''Common functionality shared by several modules.''' + + +def bit_size(num): + ''' + Number of bits needed to represent a integer excluding any prefix + 0 bits. + + As per definition from http://wiki.python.org/moin/BitManipulation and + to match the behavior of the Python 3 API. + + Usage:: + + >>> bit_size(1023) + 10 + >>> bit_size(1024) + 11 + >>> bit_size(1025) + 11 + + :param num: + Integer value. If num is 0, returns 0. Only the absolute value of the + number is considered. Therefore, signed integers will be abs(num) + before the number's bit length is determined. + :returns: + Returns the number of bits in the integer. + ''' + if num == 0: + return 0 + if num < 0: + num = -num + + # Make sure this is an int and not a float. + num & 1 + + hex_num = "%x" % num + return ((len(hex_num) - 1) * 4) + { + '0':0, '1':1, '2':2, '3':2, + '4':3, '5':3, '6':3, '7':3, + '8':4, '9':4, 'a':4, 'b':4, + 'c':4, 'd':4, 'e':4, 'f':4, + }[hex_num[0]] + + +def _bit_size(number): + ''' + Returns the number of bits required to hold a specific long number. + ''' + if number < 0: + raise ValueError('Only nonnegative numbers possible: %s' % number) + + if number == 0: + return 0 + + # This works, even with very large numbers. When using math.log(number, 2), + # you'll get rounding errors and it'll fail. + bits = 0 + while number: + bits += 1 + number >>= 1 + + return bits + + +def byte_size(number): + ''' + Returns the number of bytes required to hold a specific long number. + + The number of bytes is rounded up. + + Usage:: + + >>> byte_size(1 << 1023) + 128 + >>> byte_size((1 << 1024) - 1) + 128 + >>> byte_size(1 << 1024) + 129 + + :param number: + An unsigned integer + :returns: + The number of bytes required to hold a specific long number. + ''' + quanta, mod = divmod(bit_size(number), 8) + if mod or number == 0: + quanta += 1 + return quanta + #return int(math.ceil(bit_size(number) / 8.0)) + + +def extended_gcd(a, b): + '''Returns a tuple (r, i, j) such that r = gcd(a, b) = ia + jb + ''' + # r = gcd(a,b) i = multiplicitive inverse of a mod b + # or j = multiplicitive inverse of b mod a + # Neg return values for i or j are made positive mod b or a respectively + # Iterateive Version is faster and uses much less stack space + x = 0 + y = 1 + lx = 1 + ly = 0 + oa = a #Remember original a/b to remove + ob = b #negative values from return results + while b != 0: + q = a // b + (a, b) = (b, a % b) + (x, lx) = ((lx - (q * x)),x) + (y, ly) = ((ly - (q * y)),y) + if (lx < 0): lx += ob #If neg wrap modulo orignal b + if (ly < 0): ly += oa #If neg wrap modulo orignal a + return (a, lx, ly) #Return only positive values + + +def inverse(x, n): + '''Returns x^-1 (mod n) + + >>> inverse(7, 4) + 3 + >>> (inverse(143, 4) * 143) % 4 + 1 + ''' + + (divider, inv, _) = extended_gcd(x, n) + + if divider != 1: + raise ValueError("x (%d) and n (%d) are not relatively prime" % (x, n)) + + return inv + + +def crt(a_values, modulo_values): + '''Chinese Remainder Theorem. + + Calculates x such that x = a[i] (mod m[i]) for each i. + + :param a_values: the a-values of the above equation + :param modulo_values: the m-values of the above equation + :returns: x such that x = a[i] (mod m[i]) for each i + + + >>> crt([2, 3], [3, 5]) + 8 + + >>> crt([2, 3, 2], [3, 5, 7]) + 23 + + >>> crt([2, 3, 0], [7, 11, 15]) + 135 + ''' + + m = 1 + x = 0 + + for modulo in modulo_values: + m *= modulo + + for (m_i, a_i) in zip(modulo_values, a_values): + M_i = m // m_i + inv = inverse(M_i, m_i) + + x = (x + a_i * M_i * inv) % m + + return x + +if __name__ == '__main__': + import doctest + doctest.testmod() + diff --git a/python/rsa/rsa/core.py b/python/rsa/rsa/core.py new file mode 100644 index 000000000..90dfee8e5 --- /dev/null +++ b/python/rsa/rsa/core.py @@ -0,0 +1,58 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +'''Core mathematical operations. + +This is the actual core RSA implementation, which is only defined +mathematically on integers. +''' + + +from rsa._compat import is_integer + +def assert_int(var, name): + + if is_integer(var): + return + + raise TypeError('%s should be an integer, not %s' % (name, var.__class__)) + +def encrypt_int(message, ekey, n): + '''Encrypts a message using encryption key 'ekey', working modulo n''' + + assert_int(message, 'message') + assert_int(ekey, 'ekey') + assert_int(n, 'n') + + if message < 0: + raise ValueError('Only non-negative numbers are supported') + + if message > n: + raise OverflowError("The message %i is too long for n=%i" % (message, n)) + + return pow(message, ekey, n) + +def decrypt_int(cyphertext, dkey, n): + '''Decrypts a cypher text using the decryption key 'dkey', working + modulo n''' + + assert_int(cyphertext, 'cyphertext') + assert_int(dkey, 'dkey') + assert_int(n, 'n') + + message = pow(cyphertext, dkey, n) + return message + diff --git a/python/rsa/rsa/key.py b/python/rsa/rsa/key.py new file mode 100644 index 000000000..b6de7b3f3 --- /dev/null +++ b/python/rsa/rsa/key.py @@ -0,0 +1,612 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +'''RSA key generation code. + +Create new keys with the newkeys() function. It will give you a PublicKey and a +PrivateKey object. + +Loading and saving keys requires the pyasn1 module. This module is imported as +late as possible, such that other functionality will remain working in absence +of pyasn1. + +''' + +import logging +from rsa._compat import b, bytes_type + +import rsa.prime +import rsa.pem +import rsa.common + +log = logging.getLogger(__name__) + + + +class AbstractKey(object): + '''Abstract superclass for private and public keys.''' + + @classmethod + def load_pkcs1(cls, keyfile, format='PEM'): + r'''Loads a key in PKCS#1 DER or PEM format. + + :param keyfile: contents of a DER- or PEM-encoded file that contains + the public key. + :param format: the format of the file to load; 'PEM' or 'DER' + + :return: a PublicKey object + + ''' + + methods = { + 'PEM': cls._load_pkcs1_pem, + 'DER': cls._load_pkcs1_der, + } + + if format not in methods: + formats = ', '.join(sorted(methods.keys())) + raise ValueError('Unsupported format: %r, try one of %s' % (format, + formats)) + + method = methods[format] + return method(keyfile) + + def save_pkcs1(self, format='PEM'): + '''Saves the public key in PKCS#1 DER or PEM format. + + :param format: the format to save; 'PEM' or 'DER' + :returns: the DER- or PEM-encoded public key. + + ''' + + methods = { + 'PEM': self._save_pkcs1_pem, + 'DER': self._save_pkcs1_der, + } + + if format not in methods: + formats = ', '.join(sorted(methods.keys())) + raise ValueError('Unsupported format: %r, try one of %s' % (format, + formats)) + + method = methods[format] + return method() + +class PublicKey(AbstractKey): + '''Represents a public RSA key. + + This key is also known as the 'encryption key'. It contains the 'n' and 'e' + values. + + Supports attributes as well as dictionary-like access. Attribute accesss is + faster, though. + + >>> PublicKey(5, 3) + PublicKey(5, 3) + + >>> key = PublicKey(5, 3) + >>> key.n + 5 + >>> key['n'] + 5 + >>> key.e + 3 + >>> key['e'] + 3 + + ''' + + __slots__ = ('n', 'e') + + def __init__(self, n, e): + self.n = n + self.e = e + + def __getitem__(self, key): + return getattr(self, key) + + def __repr__(self): + return 'PublicKey(%i, %i)' % (self.n, self.e) + + def __eq__(self, other): + if other is None: + return False + + if not isinstance(other, PublicKey): + return False + + return self.n == other.n and self.e == other.e + + def __ne__(self, other): + return not (self == other) + + @classmethod + def _load_pkcs1_der(cls, keyfile): + r'''Loads a key in PKCS#1 DER format. + + @param keyfile: contents of a DER-encoded file that contains the public + key. + @return: a PublicKey object + + First let's construct a DER encoded key: + + >>> import base64 + >>> b64der = 'MAwCBQCNGmYtAgMBAAE=' + >>> der = base64.decodestring(b64der) + + This loads the file: + + >>> PublicKey._load_pkcs1_der(der) + PublicKey(2367317549, 65537) + + ''' + + from pyasn1.codec.der import decoder + from rsa.asn1 import AsnPubKey + + (priv, _) = decoder.decode(keyfile, asn1Spec=AsnPubKey()) + return cls(n=int(priv['modulus']), e=int(priv['publicExponent'])) + + def _save_pkcs1_der(self): + '''Saves the public key in PKCS#1 DER format. + + @returns: the DER-encoded public key. + ''' + + from pyasn1.codec.der import encoder + from rsa.asn1 import AsnPubKey + + # Create the ASN object + asn_key = AsnPubKey() + asn_key.setComponentByName('modulus', self.n) + asn_key.setComponentByName('publicExponent', self.e) + + return encoder.encode(asn_key) + + @classmethod + def _load_pkcs1_pem(cls, keyfile): + '''Loads a PKCS#1 PEM-encoded public key file. + + The contents of the file before the "-----BEGIN RSA PUBLIC KEY-----" and + after the "-----END RSA PUBLIC KEY-----" lines is ignored. + + @param keyfile: contents of a PEM-encoded file that contains the public + key. + @return: a PublicKey object + ''' + + der = rsa.pem.load_pem(keyfile, 'RSA PUBLIC KEY') + return cls._load_pkcs1_der(der) + + def _save_pkcs1_pem(self): + '''Saves a PKCS#1 PEM-encoded public key file. + + @return: contents of a PEM-encoded file that contains the public key. + ''' + + der = self._save_pkcs1_der() + return rsa.pem.save_pem(der, 'RSA PUBLIC KEY') + + @classmethod + def load_pkcs1_openssl_pem(cls, keyfile): + '''Loads a PKCS#1.5 PEM-encoded public key file from OpenSSL. + + These files can be recognised in that they start with BEGIN PUBLIC KEY + rather than BEGIN RSA PUBLIC KEY. + + The contents of the file before the "-----BEGIN PUBLIC KEY-----" and + after the "-----END PUBLIC KEY-----" lines is ignored. + + @param keyfile: contents of a PEM-encoded file that contains the public + key, from OpenSSL. + @return: a PublicKey object + ''' + + der = rsa.pem.load_pem(keyfile, 'PUBLIC KEY') + return cls.load_pkcs1_openssl_der(der) + + @classmethod + def load_pkcs1_openssl_der(cls, keyfile): + '''Loads a PKCS#1 DER-encoded public key file from OpenSSL. + + @param keyfile: contents of a DER-encoded file that contains the public + key, from OpenSSL. + @return: a PublicKey object + ''' + + from rsa.asn1 import OpenSSLPubKey + from pyasn1.codec.der import decoder + from pyasn1.type import univ + + (keyinfo, _) = decoder.decode(keyfile, asn1Spec=OpenSSLPubKey()) + + if keyinfo['header']['oid'] != univ.ObjectIdentifier('1.2.840.113549.1.1.1'): + raise TypeError("This is not a DER-encoded OpenSSL-compatible public key") + + return cls._load_pkcs1_der(keyinfo['key'][1:]) + + + + +class PrivateKey(AbstractKey): + '''Represents a private RSA key. + + This key is also known as the 'decryption key'. It contains the 'n', 'e', + 'd', 'p', 'q' and other values. + + Supports attributes as well as dictionary-like access. Attribute accesss is + faster, though. + + >>> PrivateKey(3247, 65537, 833, 191, 17) + PrivateKey(3247, 65537, 833, 191, 17) + + exp1, exp2 and coef don't have to be given, they will be calculated: + + >>> pk = PrivateKey(3727264081, 65537, 3349121513, 65063, 57287) + >>> pk.exp1 + 55063 + >>> pk.exp2 + 10095 + >>> pk.coef + 50797 + + If you give exp1, exp2 or coef, they will be used as-is: + + >>> pk = PrivateKey(1, 2, 3, 4, 5, 6, 7, 8) + >>> pk.exp1 + 6 + >>> pk.exp2 + 7 + >>> pk.coef + 8 + + ''' + + __slots__ = ('n', 'e', 'd', 'p', 'q', 'exp1', 'exp2', 'coef') + + def __init__(self, n, e, d, p, q, exp1=None, exp2=None, coef=None): + self.n = n + self.e = e + self.d = d + self.p = p + self.q = q + + # Calculate the other values if they aren't supplied + if exp1 is None: + self.exp1 = int(d % (p - 1)) + else: + self.exp1 = exp1 + + if exp1 is None: + self.exp2 = int(d % (q - 1)) + else: + self.exp2 = exp2 + + if coef is None: + self.coef = rsa.common.inverse(q, p) + else: + self.coef = coef + + def __getitem__(self, key): + return getattr(self, key) + + def __repr__(self): + return 'PrivateKey(%(n)i, %(e)i, %(d)i, %(p)i, %(q)i)' % self + + def __eq__(self, other): + if other is None: + return False + + if not isinstance(other, PrivateKey): + return False + + return (self.n == other.n and + self.e == other.e and + self.d == other.d and + self.p == other.p and + self.q == other.q and + self.exp1 == other.exp1 and + self.exp2 == other.exp2 and + self.coef == other.coef) + + def __ne__(self, other): + return not (self == other) + + @classmethod + def _load_pkcs1_der(cls, keyfile): + r'''Loads a key in PKCS#1 DER format. + + @param keyfile: contents of a DER-encoded file that contains the private + key. + @return: a PrivateKey object + + First let's construct a DER encoded key: + + >>> import base64 + >>> b64der = 'MC4CAQACBQDeKYlRAgMBAAECBQDHn4npAgMA/icCAwDfxwIDANcXAgInbwIDAMZt' + >>> der = base64.decodestring(b64der) + + This loads the file: + + >>> PrivateKey._load_pkcs1_der(der) + PrivateKey(3727264081, 65537, 3349121513, 65063, 57287) + + ''' + + from pyasn1.codec.der import decoder + (priv, _) = decoder.decode(keyfile) + + # ASN.1 contents of DER encoded private key: + # + # RSAPrivateKey ::= SEQUENCE { + # version Version, + # modulus INTEGER, -- n + # publicExponent INTEGER, -- e + # privateExponent INTEGER, -- d + # prime1 INTEGER, -- p + # prime2 INTEGER, -- q + # exponent1 INTEGER, -- d mod (p-1) + # exponent2 INTEGER, -- d mod (q-1) + # coefficient INTEGER, -- (inverse of q) mod p + # otherPrimeInfos OtherPrimeInfos OPTIONAL + # } + + if priv[0] != 0: + raise ValueError('Unable to read this file, version %s != 0' % priv[0]) + + as_ints = tuple(int(x) for x in priv[1:9]) + return cls(*as_ints) + + def _save_pkcs1_der(self): + '''Saves the private key in PKCS#1 DER format. + + @returns: the DER-encoded private key. + ''' + + from pyasn1.type import univ, namedtype + from pyasn1.codec.der import encoder + + class AsnPrivKey(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('version', univ.Integer()), + namedtype.NamedType('modulus', univ.Integer()), + namedtype.NamedType('publicExponent', univ.Integer()), + namedtype.NamedType('privateExponent', univ.Integer()), + namedtype.NamedType('prime1', univ.Integer()), + namedtype.NamedType('prime2', univ.Integer()), + namedtype.NamedType('exponent1', univ.Integer()), + namedtype.NamedType('exponent2', univ.Integer()), + namedtype.NamedType('coefficient', univ.Integer()), + ) + + # Create the ASN object + asn_key = AsnPrivKey() + asn_key.setComponentByName('version', 0) + asn_key.setComponentByName('modulus', self.n) + asn_key.setComponentByName('publicExponent', self.e) + asn_key.setComponentByName('privateExponent', self.d) + asn_key.setComponentByName('prime1', self.p) + asn_key.setComponentByName('prime2', self.q) + asn_key.setComponentByName('exponent1', self.exp1) + asn_key.setComponentByName('exponent2', self.exp2) + asn_key.setComponentByName('coefficient', self.coef) + + return encoder.encode(asn_key) + + @classmethod + def _load_pkcs1_pem(cls, keyfile): + '''Loads a PKCS#1 PEM-encoded private key file. + + The contents of the file before the "-----BEGIN RSA PRIVATE KEY-----" and + after the "-----END RSA PRIVATE KEY-----" lines is ignored. + + @param keyfile: contents of a PEM-encoded file that contains the private + key. + @return: a PrivateKey object + ''' + + der = rsa.pem.load_pem(keyfile, b('RSA PRIVATE KEY')) + return cls._load_pkcs1_der(der) + + def _save_pkcs1_pem(self): + '''Saves a PKCS#1 PEM-encoded private key file. + + @return: contents of a PEM-encoded file that contains the private key. + ''' + + der = self._save_pkcs1_der() + return rsa.pem.save_pem(der, b('RSA PRIVATE KEY')) + +def find_p_q(nbits, getprime_func=rsa.prime.getprime, accurate=True): + ''''Returns a tuple of two different primes of nbits bits each. + + The resulting p * q has exacty 2 * nbits bits, and the returned p and q + will not be equal. + + :param nbits: the number of bits in each of p and q. + :param getprime_func: the getprime function, defaults to + :py:func:`rsa.prime.getprime`. + + *Introduced in Python-RSA 3.1* + + :param accurate: whether to enable accurate mode or not. + :returns: (p, q), where p > q + + >>> (p, q) = find_p_q(128) + >>> from rsa import common + >>> common.bit_size(p * q) + 256 + + When not in accurate mode, the number of bits can be slightly less + + >>> (p, q) = find_p_q(128, accurate=False) + >>> from rsa import common + >>> common.bit_size(p * q) <= 256 + True + >>> common.bit_size(p * q) > 240 + True + + ''' + + total_bits = nbits * 2 + + # Make sure that p and q aren't too close or the factoring programs can + # factor n. + shift = nbits // 16 + pbits = nbits + shift + qbits = nbits - shift + + # Choose the two initial primes + log.debug('find_p_q(%i): Finding p', nbits) + p = getprime_func(pbits) + log.debug('find_p_q(%i): Finding q', nbits) + q = getprime_func(qbits) + + def is_acceptable(p, q): + '''Returns True iff p and q are acceptable: + + - p and q differ + - (p * q) has the right nr of bits (when accurate=True) + ''' + + if p == q: + return False + + if not accurate: + return True + + # Make sure we have just the right amount of bits + found_size = rsa.common.bit_size(p * q) + return total_bits == found_size + + # Keep choosing other primes until they match our requirements. + change_p = False + while not is_acceptable(p, q): + # Change p on one iteration and q on the other + if change_p: + p = getprime_func(pbits) + else: + q = getprime_func(qbits) + + change_p = not change_p + + # We want p > q as described on + # http://www.di-mgt.com.au/rsa_alg.html#crt + return (max(p, q), min(p, q)) + +def calculate_keys(p, q, nbits): + '''Calculates an encryption and a decryption key given p and q, and + returns them as a tuple (e, d) + + ''' + + phi_n = (p - 1) * (q - 1) + + # A very common choice for e is 65537 + e = 65537 + + try: + d = rsa.common.inverse(e, phi_n) + except ValueError: + raise ValueError("e (%d) and phi_n (%d) are not relatively prime" % + (e, phi_n)) + + if (e * d) % phi_n != 1: + raise ValueError("e (%d) and d (%d) are not mult. inv. modulo " + "phi_n (%d)" % (e, d, phi_n)) + + return (e, d) + +def gen_keys(nbits, getprime_func, accurate=True): + '''Generate RSA keys of nbits bits. Returns (p, q, e, d). + + Note: this can take a long time, depending on the key size. + + :param nbits: the total number of bits in ``p`` and ``q``. Both ``p`` and + ``q`` will use ``nbits/2`` bits. + :param getprime_func: either :py:func:`rsa.prime.getprime` or a function + with similar signature. + ''' + + (p, q) = find_p_q(nbits // 2, getprime_func, accurate) + (e, d) = calculate_keys(p, q, nbits // 2) + + return (p, q, e, d) + +def newkeys(nbits, accurate=True, poolsize=1): + '''Generates public and private keys, and returns them as (pub, priv). + + The public key is also known as the 'encryption key', and is a + :py:class:`rsa.PublicKey` object. The private key is also known as the + 'decryption key' and is a :py:class:`rsa.PrivateKey` object. + + :param nbits: the number of bits required to store ``n = p*q``. + :param accurate: when True, ``n`` will have exactly the number of bits you + asked for. However, this makes key generation much slower. When False, + `n`` may have slightly less bits. + :param poolsize: the number of processes to use to generate the prime + numbers. If set to a number > 1, a parallel algorithm will be used. + This requires Python 2.6 or newer. + + :returns: a tuple (:py:class:`rsa.PublicKey`, :py:class:`rsa.PrivateKey`) + + The ``poolsize`` parameter was added in *Python-RSA 3.1* and requires + Python 2.6 or newer. + + ''' + + if nbits < 16: + raise ValueError('Key too small') + + if poolsize < 1: + raise ValueError('Pool size (%i) should be >= 1' % poolsize) + + # Determine which getprime function to use + if poolsize > 1: + from rsa import parallel + import functools + + getprime_func = functools.partial(parallel.getprime, poolsize=poolsize) + else: getprime_func = rsa.prime.getprime + + # Generate the key components + (p, q, e, d) = gen_keys(nbits, getprime_func) + + # Create the key objects + n = p * q + + return ( + PublicKey(n, e), + PrivateKey(n, e, d, p, q) + ) + +__all__ = ['PublicKey', 'PrivateKey', 'newkeys'] + +if __name__ == '__main__': + import doctest + + try: + for count in range(100): + (failures, tests) = doctest.testmod() + if failures: + break + + if (count and count % 10 == 0) or count == 1: + print('%i times' % count) + except KeyboardInterrupt: + print('Aborted') + else: + print('Doctests done') diff --git a/python/rsa/rsa/parallel.py b/python/rsa/rsa/parallel.py new file mode 100644 index 000000000..e5034ac70 --- /dev/null +++ b/python/rsa/rsa/parallel.py @@ -0,0 +1,94 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +'''Functions for parallel computation on multiple cores. + +Introduced in Python-RSA 3.1. + +.. note:: + + Requires Python 2.6 or newer. + +''' + +from __future__ import print_function + +import multiprocessing as mp + +import rsa.prime +import rsa.randnum + +def _find_prime(nbits, pipe): + while True: + integer = rsa.randnum.read_random_int(nbits) + + # Make sure it's odd + integer |= 1 + + # Test for primeness + if rsa.prime.is_prime(integer): + pipe.send(integer) + return + +def getprime(nbits, poolsize): + '''Returns a prime number that can be stored in 'nbits' bits. + + Works in multiple threads at the same time. + + >>> p = getprime(128, 3) + >>> rsa.prime.is_prime(p-1) + False + >>> rsa.prime.is_prime(p) + True + >>> rsa.prime.is_prime(p+1) + False + + >>> from rsa import common + >>> common.bit_size(p) == 128 + True + + ''' + + (pipe_recv, pipe_send) = mp.Pipe(duplex=False) + + # Create processes + procs = [mp.Process(target=_find_prime, args=(nbits, pipe_send)) + for _ in range(poolsize)] + [p.start() for p in procs] + + result = pipe_recv.recv() + + [p.terminate() for p in procs] + + return result + +__all__ = ['getprime'] + + +if __name__ == '__main__': + print('Running doctests 1000x or until failure') + import doctest + + for count in range(100): + (failures, tests) = doctest.testmod() + if failures: + break + + if count and count % 10 == 0: + print('%i times' % count) + + print('Doctests done') + diff --git a/python/rsa/rsa/pem.py b/python/rsa/rsa/pem.py new file mode 100644 index 000000000..b1c3a0edb --- /dev/null +++ b/python/rsa/rsa/pem.py @@ -0,0 +1,120 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +'''Functions that load and write PEM-encoded files.''' + +import base64 +from rsa._compat import b, is_bytes + +def _markers(pem_marker): + ''' + Returns the start and end PEM markers + ''' + + if is_bytes(pem_marker): + pem_marker = pem_marker.decode('utf-8') + + return (b('-----BEGIN %s-----' % pem_marker), + b('-----END %s-----' % pem_marker)) + +def load_pem(contents, pem_marker): + '''Loads a PEM file. + + @param contents: the contents of the file to interpret + @param pem_marker: the marker of the PEM content, such as 'RSA PRIVATE KEY' + when your file has '-----BEGIN RSA PRIVATE KEY-----' and + '-----END RSA PRIVATE KEY-----' markers. + + @return the base64-decoded content between the start and end markers. + + @raise ValueError: when the content is invalid, for example when the start + marker cannot be found. + + ''' + + (pem_start, pem_end) = _markers(pem_marker) + + pem_lines = [] + in_pem_part = False + + for line in contents.splitlines(): + line = line.strip() + + # Skip empty lines + if not line: + continue + + # Handle start marker + if line == pem_start: + if in_pem_part: + raise ValueError('Seen start marker "%s" twice' % pem_start) + + in_pem_part = True + continue + + # Skip stuff before first marker + if not in_pem_part: + continue + + # Handle end marker + if in_pem_part and line == pem_end: + in_pem_part = False + break + + # Load fields + if b(':') in line: + continue + + pem_lines.append(line) + + # Do some sanity checks + if not pem_lines: + raise ValueError('No PEM start marker "%s" found' % pem_start) + + if in_pem_part: + raise ValueError('No PEM end marker "%s" found' % pem_end) + + # Base64-decode the contents + pem = b('').join(pem_lines) + return base64.decodestring(pem) + + +def save_pem(contents, pem_marker): + '''Saves a PEM file. + + @param contents: the contents to encode in PEM format + @param pem_marker: the marker of the PEM content, such as 'RSA PRIVATE KEY' + when your file has '-----BEGIN RSA PRIVATE KEY-----' and + '-----END RSA PRIVATE KEY-----' markers. + + @return the base64-encoded content between the start and end markers. + + ''' + + (pem_start, pem_end) = _markers(pem_marker) + + b64 = base64.encodestring(contents).replace(b('\n'), b('')) + pem_lines = [pem_start] + + for block_start in range(0, len(b64), 64): + block = b64[block_start:block_start + 64] + pem_lines.append(block) + + pem_lines.append(pem_end) + pem_lines.append(b('')) + + return b('\n').join(pem_lines) + diff --git a/python/rsa/rsa/pkcs1.py b/python/rsa/rsa/pkcs1.py new file mode 100644 index 000000000..15e4cf639 --- /dev/null +++ b/python/rsa/rsa/pkcs1.py @@ -0,0 +1,391 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +'''Functions for PKCS#1 version 1.5 encryption and signing + +This module implements certain functionality from PKCS#1 version 1.5. For a +very clear example, read http://www.di-mgt.com.au/rsa_alg.html#pkcs1schemes + +At least 8 bytes of random padding is used when encrypting a message. This makes +these methods much more secure than the ones in the ``rsa`` module. + +WARNING: this module leaks information when decryption or verification fails. +The exceptions that are raised contain the Python traceback information, which +can be used to deduce where in the process the failure occurred. DO NOT PASS +SUCH INFORMATION to your users. +''' + +import hashlib +import os + +from rsa._compat import b +from rsa import common, transform, core, varblock + +# ASN.1 codes that describe the hash algorithm used. +HASH_ASN1 = { + 'MD5': b('\x30\x20\x30\x0c\x06\x08\x2a\x86\x48\x86\xf7\x0d\x02\x05\x05\x00\x04\x10'), + 'SHA-1': b('\x30\x21\x30\x09\x06\x05\x2b\x0e\x03\x02\x1a\x05\x00\x04\x14'), + 'SHA-256': b('\x30\x31\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x01\x05\x00\x04\x20'), + 'SHA-384': b('\x30\x41\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x02\x05\x00\x04\x30'), + 'SHA-512': b('\x30\x51\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x03\x05\x00\x04\x40'), +} + +HASH_METHODS = { + 'MD5': hashlib.md5, + 'SHA-1': hashlib.sha1, + 'SHA-256': hashlib.sha256, + 'SHA-384': hashlib.sha384, + 'SHA-512': hashlib.sha512, +} + +class CryptoError(Exception): + '''Base class for all exceptions in this module.''' + +class DecryptionError(CryptoError): + '''Raised when decryption fails.''' + +class VerificationError(CryptoError): + '''Raised when verification fails.''' + +def _pad_for_encryption(message, target_length): + r'''Pads the message for encryption, returning the padded message. + + :return: 00 02 RANDOM_DATA 00 MESSAGE + + >>> block = _pad_for_encryption('hello', 16) + >>> len(block) + 16 + >>> block[0:2] + '\x00\x02' + >>> block[-6:] + '\x00hello' + + ''' + + max_msglength = target_length - 11 + msglength = len(message) + + if msglength > max_msglength: + raise OverflowError('%i bytes needed for message, but there is only' + ' space for %i' % (msglength, max_msglength)) + + # Get random padding + padding = b('') + padding_length = target_length - msglength - 3 + + # We remove 0-bytes, so we'll end up with less padding than we've asked for, + # so keep adding data until we're at the correct length. + while len(padding) < padding_length: + needed_bytes = padding_length - len(padding) + + # Always read at least 8 bytes more than we need, and trim off the rest + # after removing the 0-bytes. This increases the chance of getting + # enough bytes, especially when needed_bytes is small + new_padding = os.urandom(needed_bytes + 5) + new_padding = new_padding.replace(b('\x00'), b('')) + padding = padding + new_padding[:needed_bytes] + + assert len(padding) == padding_length + + return b('').join([b('\x00\x02'), + padding, + b('\x00'), + message]) + + +def _pad_for_signing(message, target_length): + r'''Pads the message for signing, returning the padded message. + + The padding is always a repetition of FF bytes. + + :return: 00 01 PADDING 00 MESSAGE + + >>> block = _pad_for_signing('hello', 16) + >>> len(block) + 16 + >>> block[0:2] + '\x00\x01' + >>> block[-6:] + '\x00hello' + >>> block[2:-6] + '\xff\xff\xff\xff\xff\xff\xff\xff' + + ''' + + max_msglength = target_length - 11 + msglength = len(message) + + if msglength > max_msglength: + raise OverflowError('%i bytes needed for message, but there is only' + ' space for %i' % (msglength, max_msglength)) + + padding_length = target_length - msglength - 3 + + return b('').join([b('\x00\x01'), + padding_length * b('\xff'), + b('\x00'), + message]) + + +def encrypt(message, pub_key): + '''Encrypts the given message using PKCS#1 v1.5 + + :param message: the message to encrypt. Must be a byte string no longer than + ``k-11`` bytes, where ``k`` is the number of bytes needed to encode + the ``n`` component of the public key. + :param pub_key: the :py:class:`rsa.PublicKey` to encrypt with. + :raise OverflowError: when the message is too large to fit in the padded + block. + + >>> from rsa import key, common + >>> (pub_key, priv_key) = key.newkeys(256) + >>> message = 'hello' + >>> crypto = encrypt(message, pub_key) + + The crypto text should be just as long as the public key 'n' component: + + >>> len(crypto) == common.byte_size(pub_key.n) + True + + ''' + + keylength = common.byte_size(pub_key.n) + padded = _pad_for_encryption(message, keylength) + + payload = transform.bytes2int(padded) + encrypted = core.encrypt_int(payload, pub_key.e, pub_key.n) + block = transform.int2bytes(encrypted, keylength) + + return block + +def decrypt(crypto, priv_key): + r'''Decrypts the given message using PKCS#1 v1.5 + + The decryption is considered 'failed' when the resulting cleartext doesn't + start with the bytes 00 02, or when the 00 byte between the padding and + the message cannot be found. + + :param crypto: the crypto text as returned by :py:func:`rsa.encrypt` + :param priv_key: the :py:class:`rsa.PrivateKey` to decrypt with. + :raise DecryptionError: when the decryption fails. No details are given as + to why the code thinks the decryption fails, as this would leak + information about the private key. + + + >>> import rsa + >>> (pub_key, priv_key) = rsa.newkeys(256) + + It works with strings: + + >>> crypto = encrypt('hello', pub_key) + >>> decrypt(crypto, priv_key) + 'hello' + + And with binary data: + + >>> crypto = encrypt('\x00\x00\x00\x00\x01', pub_key) + >>> decrypt(crypto, priv_key) + '\x00\x00\x00\x00\x01' + + Altering the encrypted information will *likely* cause a + :py:class:`rsa.pkcs1.DecryptionError`. If you want to be *sure*, use + :py:func:`rsa.sign`. + + + .. warning:: + + Never display the stack trace of a + :py:class:`rsa.pkcs1.DecryptionError` exception. It shows where in the + code the exception occurred, and thus leaks information about the key. + It's only a tiny bit of information, but every bit makes cracking the + keys easier. + + >>> crypto = encrypt('hello', pub_key) + >>> crypto = crypto[0:5] + 'X' + crypto[6:] # change a byte + >>> decrypt(crypto, priv_key) + Traceback (most recent call last): + ... + DecryptionError: Decryption failed + + ''' + + blocksize = common.byte_size(priv_key.n) + encrypted = transform.bytes2int(crypto) + decrypted = core.decrypt_int(encrypted, priv_key.d, priv_key.n) + cleartext = transform.int2bytes(decrypted, blocksize) + + # If we can't find the cleartext marker, decryption failed. + if cleartext[0:2] != b('\x00\x02'): + raise DecryptionError('Decryption failed') + + # Find the 00 separator between the padding and the message + try: + sep_idx = cleartext.index(b('\x00'), 2) + except ValueError: + raise DecryptionError('Decryption failed') + + return cleartext[sep_idx+1:] + +def sign(message, priv_key, hash): + '''Signs the message with the private key. + + Hashes the message, then signs the hash with the given key. This is known + as a "detached signature", because the message itself isn't altered. + + :param message: the message to sign. Can be an 8-bit string or a file-like + object. If ``message`` has a ``read()`` method, it is assumed to be a + file-like object. + :param priv_key: the :py:class:`rsa.PrivateKey` to sign with + :param hash: the hash method used on the message. Use 'MD5', 'SHA-1', + 'SHA-256', 'SHA-384' or 'SHA-512'. + :return: a message signature block. + :raise OverflowError: if the private key is too small to contain the + requested hash. + + ''' + + # Get the ASN1 code for this hash method + if hash not in HASH_ASN1: + raise ValueError('Invalid hash method: %s' % hash) + asn1code = HASH_ASN1[hash] + + # Calculate the hash + hash = _hash(message, hash) + + # Encrypt the hash with the private key + cleartext = asn1code + hash + keylength = common.byte_size(priv_key.n) + padded = _pad_for_signing(cleartext, keylength) + + payload = transform.bytes2int(padded) + encrypted = core.encrypt_int(payload, priv_key.d, priv_key.n) + block = transform.int2bytes(encrypted, keylength) + + return block + +def verify(message, signature, pub_key): + '''Verifies that the signature matches the message. + + The hash method is detected automatically from the signature. + + :param message: the signed message. Can be an 8-bit string or a file-like + object. If ``message`` has a ``read()`` method, it is assumed to be a + file-like object. + :param signature: the signature block, as created with :py:func:`rsa.sign`. + :param pub_key: the :py:class:`rsa.PublicKey` of the person signing the message. + :raise VerificationError: when the signature doesn't match the message. + + .. warning:: + + Never display the stack trace of a + :py:class:`rsa.pkcs1.VerificationError` exception. It shows where in + the code the exception occurred, and thus leaks information about the + key. It's only a tiny bit of information, but every bit makes cracking + the keys easier. + + ''' + + blocksize = common.byte_size(pub_key.n) + encrypted = transform.bytes2int(signature) + decrypted = core.decrypt_int(encrypted, pub_key.e, pub_key.n) + clearsig = transform.int2bytes(decrypted, blocksize) + + # If we can't find the signature marker, verification failed. + if clearsig[0:2] != b('\x00\x01'): + raise VerificationError('Verification failed') + + # Find the 00 separator between the padding and the payload + try: + sep_idx = clearsig.index(b('\x00'), 2) + except ValueError: + raise VerificationError('Verification failed') + + # Get the hash and the hash method + (method_name, signature_hash) = _find_method_hash(clearsig[sep_idx+1:]) + message_hash = _hash(message, method_name) + + # Compare the real hash to the hash in the signature + if message_hash != signature_hash: + raise VerificationError('Verification failed') + + return True + +def _hash(message, method_name): + '''Returns the message digest. + + :param message: the signed message. Can be an 8-bit string or a file-like + object. If ``message`` has a ``read()`` method, it is assumed to be a + file-like object. + :param method_name: the hash method, must be a key of + :py:const:`HASH_METHODS`. + + ''' + + if method_name not in HASH_METHODS: + raise ValueError('Invalid hash method: %s' % method_name) + + method = HASH_METHODS[method_name] + hasher = method() + + if hasattr(message, 'read') and hasattr(message.read, '__call__'): + # read as 1K blocks + for block in varblock.yield_fixedblocks(message, 1024): + hasher.update(block) + else: + # hash the message object itself. + hasher.update(message) + + return hasher.digest() + + +def _find_method_hash(method_hash): + '''Finds the hash method and the hash itself. + + :param method_hash: ASN1 code for the hash method concatenated with the + hash itself. + + :return: tuple (method, hash) where ``method`` is the used hash method, and + ``hash`` is the hash itself. + + :raise VerificationFailed: when the hash method cannot be found + + ''' + + for (hashname, asn1code) in HASH_ASN1.items(): + if not method_hash.startswith(asn1code): + continue + + return (hashname, method_hash[len(asn1code):]) + + raise VerificationError('Verification failed') + + +__all__ = ['encrypt', 'decrypt', 'sign', 'verify', + 'DecryptionError', 'VerificationError', 'CryptoError'] + +if __name__ == '__main__': + print('Running doctests 1000x or until failure') + import doctest + + for count in range(1000): + (failures, tests) = doctest.testmod() + if failures: + break + + if count and count % 100 == 0: + print('%i times' % count) + + print('Doctests done') diff --git a/python/rsa/rsa/prime.py b/python/rsa/rsa/prime.py new file mode 100644 index 000000000..7422eb1d2 --- /dev/null +++ b/python/rsa/rsa/prime.py @@ -0,0 +1,166 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +'''Numerical functions related to primes. + +Implementation based on the book Algorithm Design by Michael T. Goodrich and +Roberto Tamassia, 2002. +''' + +__all__ = [ 'getprime', 'are_relatively_prime'] + +import rsa.randnum + +def gcd(p, q): + '''Returns the greatest common divisor of p and q + + >>> gcd(48, 180) + 12 + ''' + + while q != 0: + if p < q: (p,q) = (q,p) + (p,q) = (q, p % q) + return p + + +def jacobi(a, b): + '''Calculates the value of the Jacobi symbol (a/b) where both a and b are + positive integers, and b is odd + + :returns: -1, 0 or 1 + ''' + + assert a > 0 + assert b > 0 + + if a == 0: return 0 + result = 1 + while a > 1: + if a & 1: + if ((a-1)*(b-1) >> 2) & 1: + result = -result + a, b = b % a, a + else: + if (((b * b) - 1) >> 3) & 1: + result = -result + a >>= 1 + if a == 0: return 0 + return result + +def jacobi_witness(x, n): + '''Returns False if n is an Euler pseudo-prime with base x, and + True otherwise. + ''' + + j = jacobi(x, n) % n + + f = pow(x, n >> 1, n) + + if j == f: return False + return True + +def randomized_primality_testing(n, k): + '''Calculates whether n is composite (which is always correct) or + prime (which is incorrect with error probability 2**-k) + + Returns False if the number is composite, and True if it's + probably prime. + ''' + + # 50% of Jacobi-witnesses can report compositness of non-prime numbers + + # The implemented algorithm using the Jacobi witness function has error + # probability q <= 0.5, according to Goodrich et. al + # + # q = 0.5 + # t = int(math.ceil(k / log(1 / q, 2))) + # So t = k / log(2, 2) = k / 1 = k + # this means we can use range(k) rather than range(t) + + for _ in range(k): + x = rsa.randnum.randint(n-1) + if jacobi_witness(x, n): return False + + return True + +def is_prime(number): + '''Returns True if the number is prime, and False otherwise. + + >>> is_prime(42) + False + >>> is_prime(41) + True + ''' + + return randomized_primality_testing(number, 6) + +def getprime(nbits): + '''Returns a prime number that can be stored in 'nbits' bits. + + >>> p = getprime(128) + >>> is_prime(p-1) + False + >>> is_prime(p) + True + >>> is_prime(p+1) + False + + >>> from rsa import common + >>> common.bit_size(p) == 128 + True + + ''' + + while True: + integer = rsa.randnum.read_random_int(nbits) + + # Make sure it's odd + integer |= 1 + + # Test for primeness + if is_prime(integer): + return integer + + # Retry if not prime + + +def are_relatively_prime(a, b): + '''Returns True if a and b are relatively prime, and False if they + are not. + + >>> are_relatively_prime(2, 3) + 1 + >>> are_relatively_prime(2, 4) + 0 + ''' + + d = gcd(a, b) + return (d == 1) + +if __name__ == '__main__': + print('Running doctests 1000x or until failure') + import doctest + + for count in range(1000): + (failures, tests) = doctest.testmod() + if failures: + break + + if count and count % 100 == 0: + print('%i times' % count) + + print('Doctests done') diff --git a/python/rsa/rsa/randnum.py b/python/rsa/rsa/randnum.py new file mode 100644 index 000000000..0e782744c --- /dev/null +++ b/python/rsa/rsa/randnum.py @@ -0,0 +1,85 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +'''Functions for generating random numbers.''' + +# Source inspired by code by Yesudeep Mangalapilly <yesudeep@gmail.com> + +import os + +from rsa import common, transform +from rsa._compat import byte + +def read_random_bits(nbits): + '''Reads 'nbits' random bits. + + If nbits isn't a whole number of bytes, an extra byte will be appended with + only the lower bits set. + ''' + + nbytes, rbits = divmod(nbits, 8) + + # Get the random bytes + randomdata = os.urandom(nbytes) + + # Add the remaining random bits + if rbits > 0: + randomvalue = ord(os.urandom(1)) + randomvalue >>= (8 - rbits) + randomdata = byte(randomvalue) + randomdata + + return randomdata + + +def read_random_int(nbits): + '''Reads a random integer of approximately nbits bits. + ''' + + randomdata = read_random_bits(nbits) + value = transform.bytes2int(randomdata) + + # Ensure that the number is large enough to just fill out the required + # number of bits. + value |= 1 << (nbits - 1) + + return value + +def randint(maxvalue): + '''Returns a random integer x with 1 <= x <= maxvalue + + May take a very long time in specific situations. If maxvalue needs N bits + to store, the closer maxvalue is to (2 ** N) - 1, the faster this function + is. + ''' + + bit_size = common.bit_size(maxvalue) + + tries = 0 + while True: + value = read_random_int(bit_size) + if value <= maxvalue: + break + + if tries and tries % 10 == 0: + # After a lot of tries to get the right number of bits but still + # smaller than maxvalue, decrease the number of bits by 1. That'll + # dramatically increase the chances to get a large enough number. + bit_size -= 1 + tries += 1 + + return value + + diff --git a/python/rsa/rsa/transform.py b/python/rsa/rsa/transform.py new file mode 100644 index 000000000..c740b2d27 --- /dev/null +++ b/python/rsa/rsa/transform.py @@ -0,0 +1,220 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +'''Data transformation functions. + +From bytes to a number, number to bytes, etc. +''' + +from __future__ import absolute_import + +try: + # We'll use psyco if available on 32-bit architectures to speed up code. + # Using psyco (if available) cuts down the execution time on Python 2.5 + # at least by half. + import psyco + psyco.full() +except ImportError: + pass + +import binascii +from struct import pack +from rsa import common +from rsa._compat import is_integer, b, byte, get_word_alignment, ZERO_BYTE, EMPTY_BYTE + + +def bytes2int(raw_bytes): + r'''Converts a list of bytes or an 8-bit string to an integer. + + When using unicode strings, encode it to some encoding like UTF8 first. + + >>> (((128 * 256) + 64) * 256) + 15 + 8405007 + >>> bytes2int('\x80@\x0f') + 8405007 + + ''' + + return int(binascii.hexlify(raw_bytes), 16) + + +def _int2bytes(number, block_size=None): + r'''Converts a number to a string of bytes. + + Usage:: + + >>> _int2bytes(123456789) + '\x07[\xcd\x15' + >>> bytes2int(_int2bytes(123456789)) + 123456789 + + >>> _int2bytes(123456789, 6) + '\x00\x00\x07[\xcd\x15' + >>> bytes2int(_int2bytes(123456789, 128)) + 123456789 + + >>> _int2bytes(123456789, 3) + Traceback (most recent call last): + ... + OverflowError: Needed 4 bytes for number, but block size is 3 + + @param number: the number to convert + @param block_size: the number of bytes to output. If the number encoded to + bytes is less than this, the block will be zero-padded. When not given, + the returned block is not padded. + + @throws OverflowError when block_size is given and the number takes up more + bytes than fit into the block. + ''' + # Type checking + if not is_integer(number): + raise TypeError("You must pass an integer for 'number', not %s" % + number.__class__) + + if number < 0: + raise ValueError('Negative numbers cannot be used: %i' % number) + + # Do some bounds checking + if number == 0: + needed_bytes = 1 + raw_bytes = [ZERO_BYTE] + else: + needed_bytes = common.byte_size(number) + raw_bytes = [] + + # You cannot compare None > 0 in Python 3x. It will fail with a TypeError. + if block_size and block_size > 0: + if needed_bytes > block_size: + raise OverflowError('Needed %i bytes for number, but block size ' + 'is %i' % (needed_bytes, block_size)) + + # Convert the number to bytes. + while number > 0: + raw_bytes.insert(0, byte(number & 0xFF)) + number >>= 8 + + # Pad with zeroes to fill the block + if block_size and block_size > 0: + padding = (block_size - needed_bytes) * ZERO_BYTE + else: + padding = EMPTY_BYTE + + return padding + EMPTY_BYTE.join(raw_bytes) + + +def bytes_leading(raw_bytes, needle=ZERO_BYTE): + ''' + Finds the number of prefixed byte occurrences in the haystack. + + Useful when you want to deal with padding. + + :param raw_bytes: + Raw bytes. + :param needle: + The byte to count. Default \000. + :returns: + The number of leading needle bytes. + ''' + leading = 0 + # Indexing keeps compatibility between Python 2.x and Python 3.x + _byte = needle[0] + for x in raw_bytes: + if x == _byte: + leading += 1 + else: + break + return leading + + +def int2bytes(number, fill_size=None, chunk_size=None, overflow=False): + ''' + Convert an unsigned integer to bytes (base-256 representation):: + + Does not preserve leading zeros if you don't specify a chunk size or + fill size. + + .. NOTE: + You must not specify both fill_size and chunk_size. Only one + of them is allowed. + + :param number: + Integer value + :param fill_size: + If the optional fill size is given the length of the resulting + byte string is expected to be the fill size and will be padded + with prefix zero bytes to satisfy that length. + :param chunk_size: + If optional chunk size is given and greater than zero, pad the front of + the byte string with binary zeros so that the length is a multiple of + ``chunk_size``. + :param overflow: + ``False`` (default). If this is ``True``, no ``OverflowError`` + will be raised when the fill_size is shorter than the length + of the generated byte sequence. Instead the byte sequence will + be returned as is. + :returns: + Raw bytes (base-256 representation). + :raises: + ``OverflowError`` when fill_size is given and the number takes up more + bytes than fit into the block. This requires the ``overflow`` + argument to this function to be set to ``False`` otherwise, no + error will be raised. + ''' + if number < 0: + raise ValueError("Number must be an unsigned integer: %d" % number) + + if fill_size and chunk_size: + raise ValueError("You can either fill or pad chunks, but not both") + + # Ensure these are integers. + number & 1 + + raw_bytes = b('') + + # Pack the integer one machine word at a time into bytes. + num = number + word_bits, _, max_uint, pack_type = get_word_alignment(num) + pack_format = ">%s" % pack_type + while num > 0: + raw_bytes = pack(pack_format, num & max_uint) + raw_bytes + num >>= word_bits + # Obtain the index of the first non-zero byte. + zero_leading = bytes_leading(raw_bytes) + if number == 0: + raw_bytes = ZERO_BYTE + # De-padding. + raw_bytes = raw_bytes[zero_leading:] + + length = len(raw_bytes) + if fill_size and fill_size > 0: + if not overflow and length > fill_size: + raise OverflowError( + "Need %d bytes for number, but fill size is %d" % + (length, fill_size) + ) + raw_bytes = raw_bytes.rjust(fill_size, ZERO_BYTE) + elif chunk_size and chunk_size > 0: + remainder = length % chunk_size + if remainder: + padding_size = chunk_size - remainder + raw_bytes = raw_bytes.rjust(length + padding_size, ZERO_BYTE) + return raw_bytes + + +if __name__ == '__main__': + import doctest + doctest.testmod() + diff --git a/python/rsa/rsa/util.py b/python/rsa/rsa/util.py new file mode 100644 index 000000000..5bbb70be1 --- /dev/null +++ b/python/rsa/rsa/util.py @@ -0,0 +1,81 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +'''Utility functions.''' + +from __future__ import with_statement, print_function + +import sys +from optparse import OptionParser + +import rsa.key + +def private_to_public(): + '''Reads a private key and outputs the corresponding public key.''' + + # Parse the CLI options + parser = OptionParser(usage='usage: %prog [options]', + description='Reads a private key and outputs the ' + 'corresponding public key. Both private and public keys use ' + 'the format described in PKCS#1 v1.5') + + parser.add_option('-i', '--input', dest='infilename', type='string', + help='Input filename. Reads from stdin if not specified') + parser.add_option('-o', '--output', dest='outfilename', type='string', + help='Output filename. Writes to stdout of not specified') + + parser.add_option('--inform', dest='inform', + help='key format of input - default PEM', + choices=('PEM', 'DER'), default='PEM') + + parser.add_option('--outform', dest='outform', + help='key format of output - default PEM', + choices=('PEM', 'DER'), default='PEM') + + (cli, cli_args) = parser.parse_args(sys.argv) + + # Read the input data + if cli.infilename: + print('Reading private key from %s in %s format' % \ + (cli.infilename, cli.inform), file=sys.stderr) + with open(cli.infilename, 'rb') as infile: + in_data = infile.read() + else: + print('Reading private key from stdin in %s format' % cli.inform, + file=sys.stderr) + in_data = sys.stdin.read().encode('ascii') + + assert type(in_data) == bytes, type(in_data) + + + # Take the public fields and create a public key + priv_key = rsa.key.PrivateKey.load_pkcs1(in_data, cli.inform) + pub_key = rsa.key.PublicKey(priv_key.n, priv_key.e) + + # Save to the output file + out_data = pub_key.save_pkcs1(cli.outform) + + if cli.outfilename: + print('Writing public key to %s in %s format' % \ + (cli.outfilename, cli.outform), file=sys.stderr) + with open(cli.outfilename, 'wb') as outfile: + outfile.write(out_data) + else: + print('Writing public key to stdout in %s format' % cli.outform, + file=sys.stderr) + sys.stdout.write(out_data.decode('ascii')) + + diff --git a/python/rsa/rsa/varblock.py b/python/rsa/rsa/varblock.py new file mode 100644 index 000000000..c7d96ae6a --- /dev/null +++ b/python/rsa/rsa/varblock.py @@ -0,0 +1,155 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +'''VARBLOCK file support + +The VARBLOCK file format is as follows, where || denotes byte concatenation: + + FILE := VERSION || BLOCK || BLOCK ... + + BLOCK := LENGTH || DATA + + LENGTH := varint-encoded length of the subsequent data. Varint comes from + Google Protobuf, and encodes an integer into a variable number of bytes. + Each byte uses the 7 lowest bits to encode the value. The highest bit set + to 1 indicates the next byte is also part of the varint. The last byte will + have this bit set to 0. + +This file format is called the VARBLOCK format, in line with the varint format +used to denote the block sizes. + +''' + +from rsa._compat import byte, b + + +ZERO_BYTE = b('\x00') +VARBLOCK_VERSION = 1 + +def read_varint(infile): + '''Reads a varint from the file. + + When the first byte to be read indicates EOF, (0, 0) is returned. When an + EOF occurs when at least one byte has been read, an EOFError exception is + raised. + + @param infile: the file-like object to read from. It should have a read() + method. + @returns (varint, length), the read varint and the number of read bytes. + ''' + + varint = 0 + read_bytes = 0 + + while True: + char = infile.read(1) + if len(char) == 0: + if read_bytes == 0: + return (0, 0) + raise EOFError('EOF while reading varint, value is %i so far' % + varint) + + byte = ord(char) + varint += (byte & 0x7F) << (7 * read_bytes) + + read_bytes += 1 + + if not byte & 0x80: + return (varint, read_bytes) + + +def write_varint(outfile, value): + '''Writes a varint to a file. + + @param outfile: the file-like object to write to. It should have a write() + method. + @returns the number of written bytes. + ''' + + # there is a big difference between 'write the value 0' (this case) and + # 'there is nothing left to write' (the false-case of the while loop) + + if value == 0: + outfile.write(ZERO_BYTE) + return 1 + + written_bytes = 0 + while value > 0: + to_write = value & 0x7f + value = value >> 7 + + if value > 0: + to_write |= 0x80 + + outfile.write(byte(to_write)) + written_bytes += 1 + + return written_bytes + + +def yield_varblocks(infile): + '''Generator, yields each block in the input file. + + @param infile: file to read, is expected to have the VARBLOCK format as + described in the module's docstring. + @yields the contents of each block. + ''' + + # Check the version number + first_char = infile.read(1) + if len(first_char) == 0: + raise EOFError('Unable to read VARBLOCK version number') + + version = ord(first_char) + if version != VARBLOCK_VERSION: + raise ValueError('VARBLOCK version %i not supported' % version) + + while True: + (block_size, read_bytes) = read_varint(infile) + + # EOF at block boundary, that's fine. + if read_bytes == 0 and block_size == 0: + break + + block = infile.read(block_size) + + read_size = len(block) + if read_size != block_size: + raise EOFError('Block size is %i, but could read only %i bytes' % + (block_size, read_size)) + + yield block + + +def yield_fixedblocks(infile, blocksize): + '''Generator, yields each block of ``blocksize`` bytes in the input file. + + :param infile: file to read and separate in blocks. + :returns: a generator that yields the contents of each block + ''' + + while True: + block = infile.read(blocksize) + + read_bytes = len(block) + if read_bytes == 0: + break + + yield block + + if read_bytes < blocksize: + break + diff --git a/python/rsa/run_tests.py b/python/rsa/run_tests.py new file mode 100644 index 000000000..e0f249081 --- /dev/null +++ b/python/rsa/run_tests.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import os +import sys +import unittest2 as unittest + +current_path = os.path.abspath(os.path.dirname(__file__)) +tests_path = os.path.join(current_path, 'tests') +sys.path[0:0] = [ + current_path, + tests_path, +] + +all_tests = [f[:-3] for f in os.listdir(tests_path) + if f.startswith('test_') and f.endswith(".py")] + +def get_suite(tests): + tests = sorted(tests) + suite = unittest.TestSuite() + loader = unittest.TestLoader() + for test in tests: + suite.addTest(loader.loadTestsFromName(test)) + return suite + +if __name__ == '__main__': + """ + To run all tests: + $ python run_tests.py + To run a single test: + $ python run_tests.py app + To run a couple of tests: + $ python run_tests.py app config sessions + To run code coverage: + $ coverage run run_tests.py + $ coverage report -m + """ + tests = sys.argv[1:] + if not tests: + tests = all_tests + tests = ['%s' % t for t in tests] + suite = get_suite(tests) + unittest.TextTestRunner(verbosity=1).run(suite) diff --git a/python/rsa/setup.cfg b/python/rsa/setup.cfg new file mode 100644 index 000000000..2675c2767 --- /dev/null +++ b/python/rsa/setup.cfg @@ -0,0 +1,8 @@ +[nosetests] +verbosity = 2 + +[egg_info] +tag_date = 0 +tag_build = +tag_svn_revision = 0 + diff --git a/python/rsa/setup.py b/python/rsa/setup.py new file mode 100755 index 000000000..8a2df8d1f --- /dev/null +++ b/python/rsa/setup.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python + +from setuptools import setup + +import rsa + +setup(name='rsa', + version=rsa.__version__, + description='Pure-Python RSA implementation', + author='Sybren A. Stuvel', + author_email='sybren@stuvel.eu', + maintainer='Sybren A. Stuvel', + maintainer_email='sybren@stuvel.eu', + url='http://stuvel.eu/rsa', + packages=['rsa'], + license='ASL 2', + classifiers=[ + 'Development Status :: 5 - Production/Stable', + 'Intended Audience :: Developers', + 'Intended Audience :: Education', + 'Intended Audience :: Information Technology', + 'License :: OSI Approved :: Apache Software License', + 'Operating System :: OS Independent', + 'Programming Language :: Python', + 'Topic :: Security :: Cryptography', + ], + install_requires=[ + 'pyasn1 >= 0.1.3', + ], + entry_points={ 'console_scripts': [ + 'pyrsa-priv2pub = rsa.util:private_to_public', + 'pyrsa-keygen = rsa.cli:keygen', + 'pyrsa-encrypt = rsa.cli:encrypt', + 'pyrsa-decrypt = rsa.cli:decrypt', + 'pyrsa-sign = rsa.cli:sign', + 'pyrsa-verify = rsa.cli:verify', + 'pyrsa-encrypt-bigfile = rsa.cli:encrypt_bigfile', + 'pyrsa-decrypt-bigfile = rsa.cli:decrypt_bigfile', + ]}, + +) diff --git a/python/rsa/tests/__init__.py b/python/rsa/tests/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/python/rsa/tests/__init__.py diff --git a/python/rsa/tests/constants.py b/python/rsa/tests/constants.py new file mode 100644 index 000000000..6a0d08183 --- /dev/null +++ b/python/rsa/tests/constants.py @@ -0,0 +1,9 @@ +# -*- coding: utf-8 -*- + +from rsa._compat import have_python3 + +if have_python3: + from py3kconstants import * +else: + from py2kconstants import * + diff --git a/python/rsa/tests/py2kconstants.py b/python/rsa/tests/py2kconstants.py new file mode 100644 index 000000000..5f695dd22 --- /dev/null +++ b/python/rsa/tests/py2kconstants.py @@ -0,0 +1,3 @@ +# -*- coding: utf-8 -*- + +unicode_string = u"Euro=\u20ac ABCDEFGHIJKLMNOPQRSTUVWXYZ" diff --git a/python/rsa/tests/py3kconstants.py b/python/rsa/tests/py3kconstants.py new file mode 100644 index 000000000..83b67129c --- /dev/null +++ b/python/rsa/tests/py3kconstants.py @@ -0,0 +1,3 @@ +# -*- coding: utf-8 -*- + +unicode_string = "Euro=\u20ac ABCDEFGHIJKLMNOPQRSTUVWXYZ" diff --git a/python/rsa/tests/test_bigfile.py b/python/rsa/tests/test_bigfile.py new file mode 100644 index 000000000..86bcbbac6 --- /dev/null +++ b/python/rsa/tests/test_bigfile.py @@ -0,0 +1,60 @@ +'''Tests block operations.''' +from rsa._compat import b + +try: + from StringIO import StringIO as BytesIO +except ImportError: + from io import BytesIO +import unittest2 + +import rsa +from rsa import bigfile, varblock, pkcs1 + +class BigfileTest(unittest2.TestCase): + + def test_encrypt_decrypt_bigfile(self): + + # Expected block size + 11 bytes padding + pub_key, priv_key = rsa.newkeys((6 + 11) * 8) + + # Encrypt the file + message = b('123456Sybren') + infile = BytesIO(message) + outfile = BytesIO() + + bigfile.encrypt_bigfile(infile, outfile, pub_key) + + # Test + crypto = outfile.getvalue() + + cryptfile = BytesIO(crypto) + clearfile = BytesIO() + + bigfile.decrypt_bigfile(cryptfile, clearfile, priv_key) + self.assertEquals(clearfile.getvalue(), message) + + # We have 2x6 bytes in the message, so that should result in two + # bigfile. + cryptfile.seek(0) + varblocks = list(varblock.yield_varblocks(cryptfile)) + self.assertEqual(2, len(varblocks)) + + + def test_sign_verify_bigfile(self): + + # Large enough to store MD5-sum and ASN.1 code for MD5 + pub_key, priv_key = rsa.newkeys((34 + 11) * 8) + + # Sign the file + msgfile = BytesIO(b('123456Sybren')) + signature = pkcs1.sign(msgfile, priv_key, 'MD5') + + # Check the signature + msgfile.seek(0) + self.assertTrue(pkcs1.verify(msgfile, signature, pub_key)) + + # Alter the message, re-check + msgfile = BytesIO(b('123456sybren')) + self.assertRaises(pkcs1.VerificationError, + pkcs1.verify, msgfile, signature, pub_key) + diff --git a/python/rsa/tests/test_common.py b/python/rsa/tests/test_common.py new file mode 100644 index 000000000..d105dc020 --- /dev/null +++ b/python/rsa/tests/test_common.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import unittest2 +import struct +from rsa._compat import byte, b +from rsa.common import byte_size, bit_size, _bit_size + + +class Test_byte(unittest2.TestCase): + def test_values(self): + self.assertEqual(byte(0), b('\x00')) + self.assertEqual(byte(255), b('\xff')) + + def test_struct_error_when_out_of_bounds(self): + self.assertRaises(struct.error, byte, 256) + self.assertRaises(struct.error, byte, -1) + +class Test_byte_size(unittest2.TestCase): + def test_values(self): + self.assertEqual(byte_size(1 << 1023), 128) + self.assertEqual(byte_size((1 << 1024) - 1), 128) + self.assertEqual(byte_size(1 << 1024), 129) + self.assertEqual(byte_size(255), 1) + self.assertEqual(byte_size(256), 2) + self.assertEqual(byte_size(0xffff), 2) + self.assertEqual(byte_size(0xffffff), 3) + self.assertEqual(byte_size(0xffffffff), 4) + self.assertEqual(byte_size(0xffffffffff), 5) + self.assertEqual(byte_size(0xffffffffffff), 6) + self.assertEqual(byte_size(0xffffffffffffff), 7) + self.assertEqual(byte_size(0xffffffffffffffff), 8) + + def test_zero(self): + self.assertEqual(byte_size(0), 1) + + def test_bad_type(self): + self.assertRaises(TypeError, byte_size, []) + self.assertRaises(TypeError, byte_size, ()) + self.assertRaises(TypeError, byte_size, dict()) + self.assertRaises(TypeError, byte_size, "") + self.assertRaises(TypeError, byte_size, None) + +class Test_bit_size(unittest2.TestCase): + def test_zero(self): + self.assertEqual(bit_size(0), 0) + + def test_values(self): + self.assertEqual(bit_size(1023), 10) + self.assertEqual(bit_size(1024), 11) + self.assertEqual(bit_size(1025), 11) + self.assertEqual(bit_size(1 << 1024), 1025) + self.assertEqual(bit_size((1 << 1024) + 1), 1025) + self.assertEqual(bit_size((1 << 1024) - 1), 1024) + + self.assertEqual(_bit_size(1023), 10) + self.assertEqual(_bit_size(1024), 11) + self.assertEqual(_bit_size(1025), 11) + self.assertEqual(_bit_size(1 << 1024), 1025) + self.assertEqual(_bit_size((1 << 1024) + 1), 1025) + self.assertEqual(_bit_size((1 << 1024) - 1), 1024) diff --git a/python/rsa/tests/test_compat.py b/python/rsa/tests/test_compat.py new file mode 100644 index 000000000..3652c82d5 --- /dev/null +++ b/python/rsa/tests/test_compat.py @@ -0,0 +1,17 @@ +# -*- coding: utf-8 -*- + +import unittest2 +import struct + +from rsa._compat import is_bytes, byte + +class Test_byte(unittest2.TestCase): + def test_byte(self): + for i in range(256): + byt = byte(i) + self.assertTrue(is_bytes(byt)) + self.assertEqual(ord(byt), i) + + def test_raises_StructError_on_overflow(self): + self.assertRaises(struct.error, byte, 256) + self.assertRaises(struct.error, byte, -1) diff --git a/python/rsa/tests/test_integers.py b/python/rsa/tests/test_integers.py new file mode 100644 index 000000000..0a712aa0f --- /dev/null +++ b/python/rsa/tests/test_integers.py @@ -0,0 +1,36 @@ +'''Tests integer operations.''' + +import unittest2 + +import rsa.core + +class IntegerTest(unittest2.TestCase): + + def setUp(self): + (self.pub, self.priv) = rsa.newkeys(64) + + def test_enc_dec(self): + + message = 42 + print("\tMessage: %d" % message) + + encrypted = rsa.core.encrypt_int(message, self.pub.e, self.pub.n) + print("\tEncrypted: %d" % encrypted) + + decrypted = rsa.core.decrypt_int(encrypted, self.priv.d, self.pub.n) + print("\tDecrypted: %d" % decrypted) + + self.assertEqual(message, decrypted) + + def test_sign_verify(self): + + message = 42 + + signed = rsa.core.encrypt_int(message,self.priv.d, self.pub.n) + print("\tSigned: %d" % signed) + + verified = rsa.core.decrypt_int(signed, self.pub.e,self.pub.n) + print("\tVerified: %d" % verified) + + self.assertEqual(message, verified) + diff --git a/python/rsa/tests/test_load_save_keys.py b/python/rsa/tests/test_load_save_keys.py new file mode 100644 index 000000000..fc1a1aaae --- /dev/null +++ b/python/rsa/tests/test_load_save_keys.py @@ -0,0 +1,127 @@ +'''Unittest for saving and loading keys.''' + +import base64 +import unittest2 +from rsa._compat import b + +import rsa.key + +B64PRIV_DER = b('MC4CAQACBQDeKYlRAgMBAAECBQDHn4npAgMA/icCAwDfxwIDANcXAgInbwIDAMZt') +PRIVATE_DER = base64.decodestring(B64PRIV_DER) + +B64PUB_DER = b('MAwCBQDeKYlRAgMBAAE=') +PUBLIC_DER = base64.decodestring(B64PUB_DER) + +PRIVATE_PEM = b(''' +-----BEGIN CONFUSING STUFF----- +Cruft before the key + +-----BEGIN RSA PRIVATE KEY----- +Comment: something blah + +%s +-----END RSA PRIVATE KEY----- + +Stuff after the key +-----END CONFUSING STUFF----- +''' % B64PRIV_DER.decode("utf-8")) + +CLEAN_PRIVATE_PEM = b('''\ +-----BEGIN RSA PRIVATE KEY----- +%s +-----END RSA PRIVATE KEY----- +''' % B64PRIV_DER.decode("utf-8")) + +PUBLIC_PEM = b(''' +-----BEGIN CONFUSING STUFF----- +Cruft before the key + +-----BEGIN RSA PUBLIC KEY----- +Comment: something blah + +%s +-----END RSA PUBLIC KEY----- + +Stuff after the key +-----END CONFUSING STUFF----- +''' % B64PUB_DER.decode("utf-8")) + +CLEAN_PUBLIC_PEM = b('''\ +-----BEGIN RSA PUBLIC KEY----- +%s +-----END RSA PUBLIC KEY----- +''' % B64PUB_DER.decode("utf-8")) + + +class DerTest(unittest2.TestCase): + '''Test saving and loading DER keys.''' + + def test_load_private_key(self): + '''Test loading private DER keys.''' + + key = rsa.key.PrivateKey.load_pkcs1(PRIVATE_DER, 'DER') + expected = rsa.key.PrivateKey(3727264081, 65537, 3349121513, 65063, 57287) + + self.assertEqual(expected, key) + + def test_save_private_key(self): + '''Test saving private DER keys.''' + + key = rsa.key.PrivateKey(3727264081, 65537, 3349121513, 65063, 57287) + der = key.save_pkcs1('DER') + + self.assertEqual(PRIVATE_DER, der) + + def test_load_public_key(self): + '''Test loading public DER keys.''' + + key = rsa.key.PublicKey.load_pkcs1(PUBLIC_DER, 'DER') + expected = rsa.key.PublicKey(3727264081, 65537) + + self.assertEqual(expected, key) + + def test_save_public_key(self): + '''Test saving public DER keys.''' + + key = rsa.key.PublicKey(3727264081, 65537) + der = key.save_pkcs1('DER') + + self.assertEqual(PUBLIC_DER, der) + +class PemTest(unittest2.TestCase): + '''Test saving and loading PEM keys.''' + + + def test_load_private_key(self): + '''Test loading private PEM files.''' + + key = rsa.key.PrivateKey.load_pkcs1(PRIVATE_PEM, 'PEM') + expected = rsa.key.PrivateKey(3727264081, 65537, 3349121513, 65063, 57287) + + self.assertEqual(expected, key) + + def test_save_private_key(self): + '''Test saving private PEM files.''' + + key = rsa.key.PrivateKey(3727264081, 65537, 3349121513, 65063, 57287) + pem = key.save_pkcs1('PEM') + + self.assertEqual(CLEAN_PRIVATE_PEM, pem) + + def test_load_public_key(self): + '''Test loading public PEM files.''' + + key = rsa.key.PublicKey.load_pkcs1(PUBLIC_PEM, 'PEM') + expected = rsa.key.PublicKey(3727264081, 65537) + + self.assertEqual(expected, key) + + def test_save_public_key(self): + '''Test saving public PEM files.''' + + key = rsa.key.PublicKey(3727264081, 65537) + pem = key.save_pkcs1('PEM') + + self.assertEqual(CLEAN_PUBLIC_PEM, pem) + + diff --git a/python/rsa/tests/test_pem.py b/python/rsa/tests/test_pem.py new file mode 100644 index 000000000..867f678a0 --- /dev/null +++ b/python/rsa/tests/test_pem.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + + +import unittest2 +from rsa._compat import b +from rsa.pem import _markers + + +class Test__markers(unittest2.TestCase): + def test_values(self): + self.assertEqual(_markers('RSA PRIVATE KEY'), + (b('-----BEGIN RSA PRIVATE KEY-----'), + b('-----END RSA PRIVATE KEY-----'))) diff --git a/python/rsa/tests/test_pkcs1.py b/python/rsa/tests/test_pkcs1.py new file mode 100644 index 000000000..d5882dfd1 --- /dev/null +++ b/python/rsa/tests/test_pkcs1.py @@ -0,0 +1,94 @@ +'''Tests string operations.''' + +import struct +import unittest2 + +import rsa +from rsa import pkcs1 +from rsa._compat import byte, is_integer, b, is_bytes + +class BinaryTest(unittest2.TestCase): + + def setUp(self): + (self.pub, self.priv) = rsa.newkeys(256) + + def test_enc_dec(self): + + message = struct.pack('>IIII', 0, 0, 0, 1) + print("\tMessage: %r" % message) + + encrypted = pkcs1.encrypt(message, self.pub) + print("\tEncrypted: %r" % encrypted) + + decrypted = pkcs1.decrypt(encrypted, self.priv) + print("\tDecrypted: %r" % decrypted) + + self.assertEqual(message, decrypted) + + def test_decoding_failure(self): + + message = struct.pack('>IIII', 0, 0, 0, 1) + encrypted = pkcs1.encrypt(message, self.pub) + + # Alter the encrypted stream + a = encrypted[5] + if is_bytes(a): + a = ord(a) + encrypted = encrypted[:5] + byte(a + 1) + encrypted[6:] + + self.assertRaises(pkcs1.DecryptionError, pkcs1.decrypt, encrypted, + self.priv) + + def test_randomness(self): + '''Encrypting the same message twice should result in different + cryptos. + ''' + + message = struct.pack('>IIII', 0, 0, 0, 1) + encrypted1 = pkcs1.encrypt(message, self.pub) + encrypted2 = pkcs1.encrypt(message, self.pub) + + self.assertNotEqual(encrypted1, encrypted2) + +class SignatureTest(unittest2.TestCase): + + def setUp(self): + (self.pub, self.priv) = rsa.newkeys(512) + + def test_sign_verify(self): + '''Test happy flow of sign and verify''' + + message = b('je moeder') + print("\tMessage: %r" % message) + + signature = pkcs1.sign(message, self.priv, 'SHA-256') + print("\tSignature: %r" % signature) + + self.assertTrue(pkcs1.verify(message, signature, self.pub)) + + def test_alter_message(self): + '''Altering the message should let the verification fail.''' + + signature = pkcs1.sign(b('je moeder'), self.priv, 'SHA-256') + self.assertRaises(pkcs1.VerificationError, pkcs1.verify, + b('mijn moeder'), signature, self.pub) + + def test_sign_different_key(self): + '''Signing with another key should let the verification fail.''' + + (otherpub, _) = rsa.newkeys(512) + + message = b('je moeder') + signature = pkcs1.sign(message, self.priv, 'SHA-256') + self.assertRaises(pkcs1.VerificationError, pkcs1.verify, + message, signature, otherpub) + + def test_multiple_signings(self): + '''Signing the same message twice should return the same signatures.''' + + message = struct.pack('>IIII', 0, 0, 0, 1) + signature1 = pkcs1.sign(message, self.priv, 'SHA-1') + signature2 = pkcs1.sign(message, self.priv, 'SHA-1') + + self.assertEqual(signature1, signature2) + diff --git a/python/rsa/tests/test_strings.py b/python/rsa/tests/test_strings.py new file mode 100644 index 000000000..4af06291d --- /dev/null +++ b/python/rsa/tests/test_strings.py @@ -0,0 +1,28 @@ +'''Tests string operations.''' + +from __future__ import absolute_import + +import unittest2 + +import rsa + +from constants import unicode_string + +class StringTest(unittest2.TestCase): + + def setUp(self): + (self.pub, self.priv) = rsa.newkeys(384) + + def test_enc_dec(self): + + message = unicode_string.encode('utf-8') + print("\tMessage: %s" % message) + + encrypted = rsa.encrypt(message, self.pub) + print("\tEncrypted: %s" % encrypted) + + decrypted = rsa.decrypt(encrypted, self.priv) + print("\tDecrypted: %s" % decrypted) + + self.assertEqual(message, decrypted) + diff --git a/python/rsa/tests/test_transform.py b/python/rsa/tests/test_transform.py new file mode 100644 index 000000000..ffd9ec892 --- /dev/null +++ b/python/rsa/tests/test_transform.py @@ -0,0 +1,67 @@ +# -*- coding: utf-8 -*- + + +import unittest2 +from rsa._compat import b +from rsa.transform import int2bytes, bytes2int, _int2bytes + + +class Test_int2bytes(unittest2.TestCase): + def test_accuracy(self): + self.assertEqual(int2bytes(123456789), b('\x07[\xcd\x15')) + self.assertEqual(_int2bytes(123456789), b('\x07[\xcd\x15')) + + def test_codec_identity(self): + self.assertEqual(bytes2int(int2bytes(123456789, 128)), 123456789) + self.assertEqual(bytes2int(_int2bytes(123456789, 128)), 123456789) + + def test_chunk_size(self): + self.assertEqual(int2bytes(123456789, 6), b('\x00\x00\x07[\xcd\x15')) + self.assertEqual(int2bytes(123456789, 7), + b('\x00\x00\x00\x07[\xcd\x15')) + + self.assertEqual(_int2bytes(123456789, 6), + b('\x00\x00\x07[\xcd\x15')) + self.assertEqual(_int2bytes(123456789, 7), + b('\x00\x00\x00\x07[\xcd\x15')) + + def test_zero(self): + self.assertEqual(int2bytes(0, 4), b('\x00') * 4) + self.assertEqual(int2bytes(0, 7), b('\x00') * 7) + self.assertEqual(int2bytes(0), b('\x00')) + + self.assertEqual(_int2bytes(0, 4), b('\x00') * 4) + self.assertEqual(_int2bytes(0, 7), b('\x00') * 7) + self.assertEqual(_int2bytes(0), b('\x00')) + + def test_correctness_against_base_implementation(self): + # Slow test. + values = [ + 1 << 512, + 1 << 8192, + 1 << 77, + ] + for value in values: + self.assertEqual(int2bytes(value), _int2bytes(value), + "Boom %d" % value) + self.assertEqual(bytes2int(int2bytes(value)), + value, + "Boom %d" % value) + self.assertEqual(bytes2int(_int2bytes(value)), + value, + "Boom %d" % value) + + def test_raises_OverflowError_when_chunk_size_is_insufficient(self): + self.assertRaises(OverflowError, int2bytes, 123456789, 3) + self.assertRaises(OverflowError, int2bytes, 299999999999, 4) + + self.assertRaises(OverflowError, _int2bytes, 123456789, 3) + self.assertRaises(OverflowError, _int2bytes, 299999999999, 4) + + def test_raises_ValueError_when_negative_integer(self): + self.assertRaises(ValueError, int2bytes, -1) + self.assertRaises(ValueError, _int2bytes, -1) + + def test_raises_TypeError_when_not_integer(self): + self.assertRaises(TypeError, int2bytes, None) + self.assertRaises(TypeError, _int2bytes, None) diff --git a/python/rsa/tests/test_varblock.py b/python/rsa/tests/test_varblock.py new file mode 100644 index 000000000..24ea50f1f --- /dev/null +++ b/python/rsa/tests/test_varblock.py @@ -0,0 +1,82 @@ +'''Tests varblock operations.''' + + +try: + from StringIO import StringIO as BytesIO +except ImportError: + from io import BytesIO +import unittest + +import rsa +from rsa._compat import b +from rsa import varblock + +class VarintTest(unittest.TestCase): + + def test_read_varint(self): + + encoded = b('\xac\x02crummy') + infile = BytesIO(encoded) + + (decoded, read) = varblock.read_varint(infile) + + # Test the returned values + self.assertEqual(300, decoded) + self.assertEqual(2, read) + + # The rest of the file should be untouched + self.assertEqual(b('crummy'), infile.read()) + + def test_read_zero(self): + + encoded = b('\x00crummy') + infile = BytesIO(encoded) + + (decoded, read) = varblock.read_varint(infile) + + # Test the returned values + self.assertEqual(0, decoded) + self.assertEqual(1, read) + + # The rest of the file should be untouched + self.assertEqual(b('crummy'), infile.read()) + + def test_write_varint(self): + + expected = b('\xac\x02') + outfile = BytesIO() + + written = varblock.write_varint(outfile, 300) + + # Test the returned values + self.assertEqual(expected, outfile.getvalue()) + self.assertEqual(2, written) + + + def test_write_zero(self): + + outfile = BytesIO() + written = varblock.write_varint(outfile, 0) + + # Test the returned values + self.assertEqual(b('\x00'), outfile.getvalue()) + self.assertEqual(1, written) + + +class VarblockTest(unittest.TestCase): + + def test_yield_varblock(self): + infile = BytesIO(b('\x01\x0512345\x06Sybren')) + + varblocks = list(varblock.yield_varblocks(infile)) + self.assertEqual([b('12345'), b('Sybren')], varblocks) + +class FixedblockTest(unittest.TestCase): + + def test_yield_fixedblock(self): + + infile = BytesIO(b('123456Sybren')) + + fixedblocks = list(varblock.yield_fixedblocks(infile, 6)) + self.assertEqual([b('123456'), b('Sybren')], fixedblocks) + |