Skip to content

Commit

Permalink
Merge pull request #24 from tomato42/add-black2
Browse files Browse the repository at this point in the history
format using black
  • Loading branch information
GiacomoPope authored Jul 19, 2024
2 parents 4c246b5 + c8a0117 commit c90c505
Show file tree
Hide file tree
Showing 13 changed files with 525 additions and 324 deletions.
32 changes: 32 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,38 @@ on:
pull_request:

jobs:
code-format:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
with:
fetch-depth: 10
- name: Verify git status
run: |
git status
git remote -v
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: Display Python version
run: python -c "import sys; print(sys.version)"
- name: Display installed python package versions
run: |
pip list || :
- name: Install build dependencies
run: |
pip install -r requirements.txt
- name: Install black
run: |
pip install black==24.4.2
- name: Display installed python package versions
run: |
pip list || :
- name: Check formatting
run: |
black --check --line-length 79 .
test:
runs-on: ${{ matrix.os }}
strategy:
Expand Down
59 changes: 35 additions & 24 deletions aes256_ctr_drbg.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@
from utils import xor_bytes
from Crypto.Cipher import AES


class AES256_CTR_DRBG:
def __init__(self, seed=None, personalization=b""):
self.seed_length = 48
self.reseed_interval = 2**48
self.key = bytes([0])*32
self.V = bytes([0])*16
self.key = bytes([0]) * 32
self.V = bytes([0]) * 16
self.entropy_input = self.__check_entropy_input(seed)

seed_material = self.__instantiate(personalization=personalization)
self.ctr_drbg_update(seed_material)
self.reseed_ctr = 1

def __check_entropy_input(self, entropy_input):
"""
If no entropy given, us os.urandom, else
Expand All @@ -22,76 +23,86 @@ def __check_entropy_input(self, entropy_input):
if entropy_input is None:
return os.urandom(self.seed_length)
elif len(entropy_input) != self.seed_length:
raise ValueError(f"The entropy input must be of length: {self.seed_length}. Input has length {len(entropy_input)}")
raise ValueError(
f"The entropy input must be of length: {self.seed_length}. "
f"Input has length {len(entropy_input)}"
)
return entropy_input

def __instantiate(self, personalization=b""):
"""
Combine the input seed and optional personalisation
string into the seed material for the DRBG
"""
if len(personalization) > self.seed_length:
raise ValueError(f"The Personalization String must be at most length: {self.seed_length}. Input has length {len(personalization)}")
raise ValueError(
f"The Personalization String must be at most length: "
f"{self.seed_length}. Input has length {len(personalization)}"
)
elif len(personalization) < self.seed_length:
personalization += bytes([0]) * (self.seed_length - len(personalization))
personalization += bytes([0]) * (
self.seed_length - len(personalization)
)
# debugging
assert len(personalization) == self.seed_length
return xor_bytes(self.entropy_input, personalization)

def __increment_counter(self):
int_V = int.from_bytes(self.V, 'big')
new_V = (int_V + 1) % 2**(8*16)
self.V = new_V.to_bytes(16, byteorder='big')
int_V = int.from_bytes(self.V, "big")
new_V = (int_V + 1) % 2 ** (8 * 16)
self.V = new_V.to_bytes(16, byteorder="big")

def ctr_drbg_update(self, provided_data):
tmp = b""
cipher = AES.new(self.key, AES.MODE_ECB)
# Collect bytes from AES ECB
while len(tmp) != self.seed_length:
self.__increment_counter()
tmp += cipher.encrypt(self.V)
tmp += cipher.encrypt(self.V)

# Take the first 48 bytes
tmp = tmp[:self.seed_length]
tmp = tmp[: self.seed_length]
tmp = xor_bytes(tmp, provided_data)

# Set the new values of key and V
self.key = tmp[:32]
self.V = tmp[32:]

def reseed(self, additional_information=b""):
"""
Reseed the DRBG for when reseed_ctr hits the
Reseed the DRBG for when reseed_ctr hits the
limit.
"""
seed_material = self.__instantiate(additional_information)
self.ctr_drbg_update(seed_material)
self.reseed_ctr = 1

def random_bytes(self, num_bytes, additional=None):
if self.reseed_ctr >= self.reseed_interval:
raise Warning("The DRBG has been exhausted! Reseed!")

# Set the optional additional information
if additional is None:
additional = bytes([0]) * self.seed_length
else:
if len(additional) > self.seed_length:
raise ValueError(f"The additional input must be of length at most: {self.seed_length}. Input has length {len(seed)}")
raise ValueError(
f"The additional input must be of length at most: "
f"{self.seed_length}. Input has length {len(seed)}"
)
elif len(additional) < self.seed_length:
additional += bytes([0]) * (self.seed_length - len(additional))
self.ctr_drbg_update(additional)

# Collect bytes!
tmp = b""
cipher = AES.new(self.key, AES.MODE_ECB)
while len(tmp) < num_bytes:
self.__increment_counter()
tmp += cipher.encrypt(self.V)

# Collect only the requested number of bits
output_bytes = tmp[:num_bytes]
self.ctr_drbg_update(additional)
self.reseed_ctr += 1
return output_bytes

63 changes: 42 additions & 21 deletions benchmark_kyber.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,55 +2,76 @@
import cProfile
from time import time


def profile_kyber(Kyber):
pk, sk = Kyber.keygen()
c, key = Kyber.enc(pk)

gvars = {}
lvars = {"Kyber": Kyber, "c": c, "pk": pk, "sk": sk}

cProfile.runctx("[Kyber.keygen() for _ in range(100)]", globals=gvars, locals=lvars, sort=1)
cProfile.runctx("[Kyber.enc(pk) for _ in range(100)]", globals=gvars, locals=lvars, sort=1)
cProfile.runctx("[Kyber.dec(c, sk) for _ in range(100)]", globals=gvars, locals=lvars, sort=1)


cProfile.runctx(
"[Kyber.keygen() for _ in range(100)]",
globals=gvars,
locals=lvars,
sort=1,
)
cProfile.runctx(
"[Kyber.enc(pk) for _ in range(100)]",
globals=gvars,
locals=lvars,
sort=1,
)
cProfile.runctx(
"[Kyber.dec(c, sk) for _ in range(100)]",
globals=gvars,
locals=lvars,
sort=1,
)


def benchmark_kyber(Kyber, name, count):
keygen_times = []
enc_times = []
dec_times = []

for _ in range(count):
t0 = time()
pk, sk = Kyber.keygen()
keygen_times.append(time() - t0)

t1 = time()
c, key = Kyber.enc(pk)
enc_times.append(time() - t1)

t2 = time()
dec = Kyber.dec(c, sk)
dec_times.append(time() - t2)

avg_keygen = sum(keygen_times)/count
avg_enc = sum(enc_times)/count
avg_dec = sum(dec_times)/count
print(f" {name:11} |"
f"{avg_keygen*1000:8.2f}ms {1/avg_keygen:11.2f}"
f"{avg_enc*1000:8.2f}ms {1/avg_enc:10.2f}"
f"{avg_dec*1000:8.2f}ms {1/avg_dec:8.2f}")
avg_keygen = sum(keygen_times) / count
avg_enc = sum(enc_times) / count
avg_dec = sum(dec_times) / count
print(
f" {name:11} |"
f"{avg_keygen*1000:8.2f}ms {1/avg_keygen:11.2f}"
f"{avg_enc*1000:8.2f}ms {1/avg_enc:10.2f}"
f"{avg_dec*1000:8.2f}ms {1/avg_dec:8.2f}"
)


if __name__ == '__main__':
if __name__ == "__main__":
# profile_kyber(Kyber512)
# profile_kyber(Kyber768)
# profile_kyber(Kyber1024)

count = 1000
# common banner
print("-" * 80)
print(" Params | keygen | keygen/s | encap | encap/s "
"| decap | decap/s")
print(
" Params | keygen | keygen/s | encap | encap/s "
"| decap | decap/s"
)
print("-" * 80)
benchmark_kyber(Kyber512, "Kyber512", count)
benchmark_kyber(Kyber768, "Kyber768", count)
benchmark_kyber(Kyber768, "Kyber768", count)
benchmark_kyber(Kyber1024, "Kyber1024", count)
Loading

0 comments on commit c90c505

Please sign in to comment.