Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

format using black #24

Merged
merged 1 commit into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,38 @@ on:
pull_request:

jobs:
code-format:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
with:
fetch-depth: 10
- name: Verify git status
run: |
git status
git remote -v
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: Display Python version
run: python -c "import sys; print(sys.version)"
- name: Display installed python package versions
run: |
pip list || :
- name: Install build dependencies
run: |
pip install -r requirements.txt
- name: Install black
run: |
pip install black==24.4.2
- name: Display installed python package versions
run: |
pip list || :
- name: Check formatting
run: |
black --check --line-length 79 .

test:
runs-on: ${{ matrix.os }}
strategy:
Expand Down
59 changes: 35 additions & 24 deletions aes256_ctr_drbg.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@
from utils import xor_bytes
from Crypto.Cipher import AES


class AES256_CTR_DRBG:
def __init__(self, seed=None, personalization=b""):
self.seed_length = 48
self.reseed_interval = 2**48
self.key = bytes([0])*32
self.V = bytes([0])*16
self.key = bytes([0]) * 32
self.V = bytes([0]) * 16
self.entropy_input = self.__check_entropy_input(seed)

seed_material = self.__instantiate(personalization=personalization)
self.ctr_drbg_update(seed_material)
self.reseed_ctr = 1

def __check_entropy_input(self, entropy_input):
"""
If no entropy given, us os.urandom, else
Expand All @@ -22,76 +23,86 @@ def __check_entropy_input(self, entropy_input):
if entropy_input is None:
return os.urandom(self.seed_length)
elif len(entropy_input) != self.seed_length:
raise ValueError(f"The entropy input must be of length: {self.seed_length}. Input has length {len(entropy_input)}")
raise ValueError(
f"The entropy input must be of length: {self.seed_length}. "
f"Input has length {len(entropy_input)}"
)
return entropy_input

def __instantiate(self, personalization=b""):
"""
Combine the input seed and optional personalisation
string into the seed material for the DRBG
"""
if len(personalization) > self.seed_length:
raise ValueError(f"The Personalization String must be at most length: {self.seed_length}. Input has length {len(personalization)}")
raise ValueError(
f"The Personalization String must be at most length: "
f"{self.seed_length}. Input has length {len(personalization)}"
)
elif len(personalization) < self.seed_length:
personalization += bytes([0]) * (self.seed_length - len(personalization))
personalization += bytes([0]) * (
self.seed_length - len(personalization)
)
# debugging
assert len(personalization) == self.seed_length
return xor_bytes(self.entropy_input, personalization)

def __increment_counter(self):
int_V = int.from_bytes(self.V, 'big')
new_V = (int_V + 1) % 2**(8*16)
self.V = new_V.to_bytes(16, byteorder='big')
int_V = int.from_bytes(self.V, "big")
new_V = (int_V + 1) % 2 ** (8 * 16)
self.V = new_V.to_bytes(16, byteorder="big")

def ctr_drbg_update(self, provided_data):
tmp = b""
cipher = AES.new(self.key, AES.MODE_ECB)
# Collect bytes from AES ECB
while len(tmp) != self.seed_length:
self.__increment_counter()
tmp += cipher.encrypt(self.V)
tmp += cipher.encrypt(self.V)

# Take the first 48 bytes
tmp = tmp[:self.seed_length]
tmp = tmp[: self.seed_length]
tmp = xor_bytes(tmp, provided_data)

# Set the new values of key and V
self.key = tmp[:32]
self.V = tmp[32:]

def reseed(self, additional_information=b""):
"""
Reseed the DRBG for when reseed_ctr hits the
Reseed the DRBG for when reseed_ctr hits the
limit.
"""
seed_material = self.__instantiate(additional_information)
self.ctr_drbg_update(seed_material)
self.reseed_ctr = 1

def random_bytes(self, num_bytes, additional=None):
if self.reseed_ctr >= self.reseed_interval:
raise Warning("The DRBG has been exhausted! Reseed!")

# Set the optional additional information
if additional is None:
additional = bytes([0]) * self.seed_length
else:
if len(additional) > self.seed_length:
raise ValueError(f"The additional input must be of length at most: {self.seed_length}. Input has length {len(seed)}")
raise ValueError(
f"The additional input must be of length at most: "
f"{self.seed_length}. Input has length {len(seed)}"
)
elif len(additional) < self.seed_length:
additional += bytes([0]) * (self.seed_length - len(additional))
self.ctr_drbg_update(additional)

# Collect bytes!
tmp = b""
cipher = AES.new(self.key, AES.MODE_ECB)
while len(tmp) < num_bytes:
self.__increment_counter()
tmp += cipher.encrypt(self.V)

# Collect only the requested number of bits
output_bytes = tmp[:num_bytes]
self.ctr_drbg_update(additional)
self.reseed_ctr += 1
return output_bytes

63 changes: 42 additions & 21 deletions benchmark_kyber.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,55 +2,76 @@
import cProfile
from time import time


def profile_kyber(Kyber):
pk, sk = Kyber.keygen()
c, key = Kyber.enc(pk)

gvars = {}
lvars = {"Kyber": Kyber, "c": c, "pk": pk, "sk": sk}

cProfile.runctx("[Kyber.keygen() for _ in range(100)]", globals=gvars, locals=lvars, sort=1)
cProfile.runctx("[Kyber.enc(pk) for _ in range(100)]", globals=gvars, locals=lvars, sort=1)
cProfile.runctx("[Kyber.dec(c, sk) for _ in range(100)]", globals=gvars, locals=lvars, sort=1)


cProfile.runctx(
"[Kyber.keygen() for _ in range(100)]",
globals=gvars,
locals=lvars,
sort=1,
)
cProfile.runctx(
"[Kyber.enc(pk) for _ in range(100)]",
globals=gvars,
locals=lvars,
sort=1,
)
cProfile.runctx(
"[Kyber.dec(c, sk) for _ in range(100)]",
globals=gvars,
locals=lvars,
sort=1,
)


def benchmark_kyber(Kyber, name, count):
keygen_times = []
enc_times = []
dec_times = []

for _ in range(count):
t0 = time()
pk, sk = Kyber.keygen()
keygen_times.append(time() - t0)

t1 = time()
c, key = Kyber.enc(pk)
enc_times.append(time() - t1)

t2 = time()
dec = Kyber.dec(c, sk)
dec_times.append(time() - t2)

avg_keygen = sum(keygen_times)/count
avg_enc = sum(enc_times)/count
avg_dec = sum(dec_times)/count
print(f" {name:11} |"
f"{avg_keygen*1000:8.2f}ms {1/avg_keygen:11.2f}"
f"{avg_enc*1000:8.2f}ms {1/avg_enc:10.2f}"
f"{avg_dec*1000:8.2f}ms {1/avg_dec:8.2f}")
avg_keygen = sum(keygen_times) / count
avg_enc = sum(enc_times) / count
avg_dec = sum(dec_times) / count
print(
f" {name:11} |"
f"{avg_keygen*1000:8.2f}ms {1/avg_keygen:11.2f}"
f"{avg_enc*1000:8.2f}ms {1/avg_enc:10.2f}"
f"{avg_dec*1000:8.2f}ms {1/avg_dec:8.2f}"
)


if __name__ == '__main__':
if __name__ == "__main__":
# profile_kyber(Kyber512)
# profile_kyber(Kyber768)
# profile_kyber(Kyber1024)

count = 1000
# common banner
print("-" * 80)
print(" Params | keygen | keygen/s | encap | encap/s "
"| decap | decap/s")
print(
" Params | keygen | keygen/s | encap | encap/s "
"| decap | decap/s"
)
print("-" * 80)
benchmark_kyber(Kyber512, "Kyber512", count)
benchmark_kyber(Kyber768, "Kyber768", count)
benchmark_kyber(Kyber768, "Kyber768", count)
benchmark_kyber(Kyber1024, "Kyber1024", count)
Loading
Loading