Skip to content

Commit

Permalink
RSA PKCS 1.5 SHA1 signing support and der helper functions (#609)
Browse files Browse the repository at this point in the history
Co-authored-by: Michael Graeb <[email protected]>
  • Loading branch information
DmitriyMusatkin and graebm authored Nov 19, 2024
1 parent 084e141 commit e856566
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 8 deletions.
25 changes: 24 additions & 1 deletion awscrt/crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,12 @@ class RSASignatureAlgorithm(IntEnum):
PKCSv1.5 padding with sha256 hash function
"""

PSS_SHA256 = 1
PKCS1_5_SHA1 = 1
"""
PKCSv1.5 padding with sha1 hash function
"""

PSS_SHA256 = 2
"""
PSS padding with sha256 hash function
"""
Expand All @@ -118,6 +123,24 @@ def new_public_key_from_pem_data(pem_data: Union[str, bytes, bytearray, memoryvi
"""
return RSA(binding=_awscrt.rsa_public_key_from_pem_data(pem_data))

@staticmethod
def new_private_key_from_der_data(der_data: Union[bytes, bytearray, memoryview]) -> 'RSA':
"""
Creates a new instance of private RSA key pair from der data.
Expects key in PKCS1 format.
Raises ValueError if pem does not have private key object.
"""
return RSA(binding=_awscrt.rsa_private_key_from_der_data(der_data))

@staticmethod
def new_public_key_from_der_data(der_data: Union[bytes, bytearray, memoryview]) -> 'RSA':
"""
Creates a new instance of public RSA key pair from der data.
Expects key in PKCS1 format.
Raises ValueError if pem does not have public key object.
"""
return RSA(binding=_awscrt.rsa_public_key_from_der_data(der_data))

def encrypt(self, encryption_algorithm: RSAEncryptionAlgorithm,
plaintext: Union[bytes, bytearray, memoryview]) -> bytes:
"""
Expand Down
57 changes: 57 additions & 0 deletions source/crypto.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "aws/cal/hash.h"
#include "aws/cal/hmac.h"
#include "aws/cal/rsa.h"
#include "aws/common/encoding.h"
#include "aws/io/pem.h"

const char *s_capsule_name_hash = "aws_hash";
Expand Down Expand Up @@ -350,6 +351,62 @@ PyObject *aws_py_rsa_public_key_from_pem_data(PyObject *self, PyObject *args) {
return capsule;
}

PyObject *aws_py_rsa_private_key_from_der_data(PyObject *self, PyObject *args) {
(void)self;

struct aws_byte_cursor der_data_cur;
if (!PyArg_ParseTuple(args, "y#", &der_data_cur.ptr, &der_data_cur.len)) {
return NULL;
}

PyObject *capsule = NULL;
struct aws_allocator *allocator = aws_py_get_allocator();

struct aws_rsa_key_pair *key_pair = aws_rsa_key_pair_new_from_private_key_pkcs1(allocator, der_data_cur);

if (key_pair == NULL) {
PyErr_AwsLastError();
goto on_done;
}

capsule = PyCapsule_New(key_pair, s_capsule_name_rsa, s_rsa_destructor);

if (capsule == NULL) {
aws_rsa_key_pair_release(key_pair);
}

on_done:
return capsule;
}

PyObject *aws_py_rsa_public_key_from_der_data(PyObject *self, PyObject *args) {
(void)self;

struct aws_byte_cursor der_data_cur;
if (!PyArg_ParseTuple(args, "y#", &der_data_cur.ptr, &der_data_cur.len)) {
return NULL;
}

PyObject *capsule = NULL;
struct aws_allocator *allocator = aws_py_get_allocator();

struct aws_rsa_key_pair *key_pair = aws_rsa_key_pair_new_from_public_key_pkcs1(allocator, der_data_cur);

if (key_pair == NULL) {
PyErr_AwsLastError();
goto on_done;
}

capsule = PyCapsule_New(key_pair, s_capsule_name_rsa, s_rsa_destructor);

if (capsule == NULL) {
aws_rsa_key_pair_release(key_pair);
}

on_done:
return capsule;
}

PyObject *aws_py_rsa_encrypt(PyObject *self, PyObject *args) {
(void)self;

Expand Down
3 changes: 3 additions & 0 deletions source/crypto.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ PyObject *aws_py_sha256_hmac_compute(PyObject *self, PyObject *args);
PyObject *aws_py_rsa_private_key_from_pem_data(PyObject *self, PyObject *args);
PyObject *aws_py_rsa_public_key_from_pem_data(PyObject *self, PyObject *args);

PyObject *aws_py_rsa_private_key_from_der_data(PyObject *self, PyObject *args);
PyObject *aws_py_rsa_public_key_from_der_data(PyObject *self, PyObject *args);

PyObject *aws_py_rsa_encrypt(PyObject *self, PyObject *args);
PyObject *aws_py_rsa_decrypt(PyObject *self, PyObject *args);
PyObject *aws_py_rsa_sign(PyObject *self, PyObject *args);
Expand Down
2 changes: 2 additions & 0 deletions source/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,8 @@ static PyMethodDef s_module_methods[] = {
/* RSA crypto primitives */
AWS_PY_METHOD_DEF(rsa_private_key_from_pem_data, METH_VARARGS),
AWS_PY_METHOD_DEF(rsa_public_key_from_pem_data, METH_VARARGS),
AWS_PY_METHOD_DEF(rsa_private_key_from_der_data, METH_VARARGS),
AWS_PY_METHOD_DEF(rsa_public_key_from_der_data, METH_VARARGS),
AWS_PY_METHOD_DEF(rsa_encrypt, METH_VARARGS),
AWS_PY_METHOD_DEF(rsa_decrypt, METH_VARARGS),
AWS_PY_METHOD_DEF(rsa_sign, METH_VARARGS),
Expand Down
95 changes: 89 additions & 6 deletions test/test_crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from test import NativeResourceTest
from awscrt.crypto import Hash, RSA, RSAEncryptionAlgorithm, RSASignatureAlgorithm
import base64
import unittest

RSA_PRIVATE_KEY_PEM = """
Expand Down Expand Up @@ -47,6 +48,41 @@
-----END RSA PUBLIC KEY-----
"""

RSA_PUBLIC_KEY_DER_BASE64 = (
'MIIBCgKCAQEAxaEsLWE2t3kJqsF1sFHYk7rSCGfGTSDa+3r5typT0cb/TtJ989C8'
'dLcfInx4Dxq0ewo6NOxQ/TD8JevUda86jSh1UKEQUOl7qy+QwOhFMpwHq/uOgMy5'
'khDDLlkxD5U32RrDfqLK+4WUDapHlQ6g+E6wS1j1yDRoTZJk3WnTpR0sJHsttLWV'
'+mb2wPC7TkhGMbFMzbt6v0ahF7abVOOGiHVZ77uhS66hgP9nfgMHug8EN/xmVc/T'
'xgMJci1Irh66xVZQ9aT2OZwb0TXglULm+b8HM+GKHgoTMwr9gAGpFDoYi22PvxC/'
'cqKHKIaYw7KNOPwImzQ6cp5oQJTAPQKRUwIDAQAB')

RSA_PRIVATE_KEY_DER_BASE64 = (
'MIIEowIBAAKCAQEAxaEsLWE2t3kJqsF1sFHYk7rSCGfGTSDa+3r5typT0cb/TtJ9'
'89C8dLcfInx4Dxq0ewo6NOxQ/TD8JevUda86jSh1UKEQUOl7qy+QwOhFMpwHq/uO'
'gMy5khDDLlkxD5U32RrDfqLK+4WUDapHlQ6g+E6wS1j1yDRoTZJk3WnTpR0sJHst'
'tLWV+mb2wPC7TkhGMbFMzbt6v0ahF7abVOOGiHVZ77uhS66hgP9nfgMHug8EN/xm'
'Vc/TxgMJci1Irh66xVZQ9aT2OZwb0TXglULm+b8HM+GKHgoTMwr9gAGpFDoYi22P'
'vxC/cqKHKIaYw7KNOPwImzQ6cp5oQJTAPQKRUwIDAQABAoIBACcuUfTZPiDX1UvO'
'OQfw4hA/zJ4v/MeTyPZspg9jS+TeIAW/g4sQChzVpU2QAbl04O031NxjMZdQ29yk'
'yaVfTStpJwEKPZLdB1CkCH3GTtm+x2KYZ+MvM2c6/Yc11Z0yRzU6siFsIvQEwpqG'
'9NQfZ1hzOU5m36uGgFtIt8iRz4z/RxpZUOXpaEosb0uMK3VPBuZBu8uVQBFdyAA7'
'xAGtJphxQ5u0Ct9aidPjD7MhCVzcb2XbgCgxb2hbCmDMOgeNVYrTo2fdBzNxLcXv'
'j4sUNmO+mLbUMFOePuP8JZaGNTTmznZkavskozfdbubuS3/4/0HH1goytFheVt1B'
'vfxzpgkCgYEA9QgEMKny0knDHV7BC2uAd7Vvd+5iikA3WdJ9i11zas9AbMMmf9cX'
'E3xNt6DO42hnVCNN4uAWH5uGWltWZ8pmGKk6mesqZfYPsyTz1cK6fP6KyQrkWRNT'
'V3nRMEMbziAWxFD5hxP9p1KlqI2Py+W4fJ0LGZ4Mwvn3dKYOilxK+50CgYEAznny'
'ZxQiJGt8/FtH9f/GDIY24Cz53Cuj+BWG2EH4kLo24ET2QTVvohFJVCm3Hf8Qe4cA'
'ASabRUg1vS4Tr2FmIqD2Iw/ogSmDcJdYuwhdtWKa8fDbehCN5hmXjn2WKYvjvZNv'
'Gcx6gfqULD9SaQv+N7lL8eJxKiLLBeVYD7qoha8CgYA8udnf/Z5yQ1mZw8vv+pqC'
'EHMps+iz/qo5FpOKoIRkKiz7R3oZIMNVTu8r3Syo600Aayd4XLTe7HplllFZs62N'
'2xLs5n1Be7P0X+oWRgZVx/e5T3u8H6/98/DGFzui4A0EZlURBwFMII1xsnO6wpnw'
'ODNyC9t5zt1nCWh9HdZveQKBgAm4+E8eRZVNcm83pSXSS3Mfhsn7lDBn5aqy6Mya'
'HqhB/H+G/8mGSKFrCvbpl/PTpOUMMFXdiYYzpkQoPUkO3w5WYgC4qQwb9lKA7e6w'
'sCjwYbduzgbrbKMfJWHSTBXcvnaY0Kx4UnR4Zi3HNYw4wlnBYfAb55RCWykF6aWj'
'9neFAoGBAMqQA2YWCHhnRtjn4iGMrTk8iOHBd8AGBBzX9rPKXDqWlOr/iQq90qX0'
'59309stR/bAhMzxOx31777XEPO1md854iXXr0XDMQlwCYkWyWb6hp4JlsqFBPMjn'
'nGXWA0Gp6UWgpg4Hvjdsu+0FQ3AhDMBKZZ8fBFb4EW+HRQIHPnbH')


class TestCredentials(NativeResourceTest):

Expand Down Expand Up @@ -134,21 +170,68 @@ def test_rsa_encryption_roundtrip(self):
pt_pub = rsa.decrypt(p, ct_pub)
self.assertEqual(test_pt, pt_pub)

def test_rsa_signing_roundtrip(self):
h = Hash.sha256_new()
h.update(b'totally original test string')
digest = h.digest()
def test_rsa_encryption_roundtrip_der(self):
param_list = [RSAEncryptionAlgorithm.PKCS1_5,
RSAEncryptionAlgorithm.OAEP_SHA256,
RSAEncryptionAlgorithm.OAEP_SHA512]

for p in param_list:
with self.subTest(msg="RSA Encryption Roundtrip using algo p", p=p):
test_pt = b'totally original test string'
private_key_der_bytes = base64.b64decode(RSA_PRIVATE_KEY_DER_BASE64)
rsa = RSA.new_private_key_from_der_data(private_key_der_bytes)
ct = rsa.encrypt(p, test_pt)
pt = rsa.decrypt(p, ct)
self.assertEqual(test_pt, pt)

public_key_der_bytes = base64.b64decode(RSA_PUBLIC_KEY_DER_BASE64)
rsa_pub = RSA.new_public_key_from_der_data(public_key_der_bytes)
ct_pub = rsa_pub.encrypt(p, test_pt)
pt_pub = rsa.decrypt(p, ct_pub)
self.assertEqual(test_pt, pt_pub)

def test_rsa_signing_roundtrip(self):
param_list = [RSASignatureAlgorithm.PKCS1_5_SHA256,
RSASignatureAlgorithm.PSS_SHA256]
RSASignatureAlgorithm.PSS_SHA256,
RSASignatureAlgorithm.PKCS1_5_SHA1]

for p in param_list:
with self.subTest(msg="RSA Signing Roundtrip using algo p", p=p):
if (p == RSASignatureAlgorithm.PKCS1_5_SHA1):
h = Hash.sha1_new()
else:
h = Hash.sha256_new()
h.update(b'totally original test string')
digest = h.digest()

rsa = RSA.new_private_key_from_pem_data(RSA_PRIVATE_KEY_PEM)
signature = rsa.sign(p, digest)
self.assertTrue(rsa.verify(p, digest, signature))

rsa_pub = RSA.new_private_key_from_pem_data(RSA_PRIVATE_KEY_PEM)
rsa_pub = RSA.new_public_key_from_pem_data(RSA_PUBLIC_KEY_PEM)
self.assertTrue(rsa_pub.verify(p, digest, signature))

def test_rsa_signing_roundtrip_der(self):
param_list = [RSASignatureAlgorithm.PKCS1_5_SHA256,
RSASignatureAlgorithm.PSS_SHA256,
RSASignatureAlgorithm.PKCS1_5_SHA1]

for p in param_list:
with self.subTest(msg="RSA Signing Roundtrip using algo p", p=p):
if (p == RSASignatureAlgorithm.PKCS1_5_SHA1):
h = Hash.sha1_new()
else:
h = Hash.sha256_new()
h.update(b'totally original test string')
digest = h.digest()

private_key_der_bytes = base64.b64decode(RSA_PRIVATE_KEY_DER_BASE64)
rsa = RSA.new_private_key_from_der_data(private_key_der_bytes)
signature = rsa.sign(p, digest)
self.assertTrue(rsa.verify(p, digest, signature))

public_key_der_bytes = base64.b64decode(RSA_PUBLIC_KEY_DER_BASE64)
rsa_pub = RSA.new_public_key_from_der_data(public_key_der_bytes)
self.assertTrue(rsa_pub.verify(p, digest, signature))

def test_rsa_load_error(self):
Expand Down

0 comments on commit e856566

Please sign in to comment.