From da1a37f65c0ac817511372e1fb97ec504970f436 Mon Sep 17 00:00:00 2001 From: Micah Lee Date: Mon, 26 Aug 2024 12:34:06 -0700 Subject: [PATCH 1/9] Temporarily store encrypted message content in session when the CAPTCHA is failed, to repopulate the form on the next load --- hushline/routes.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/hushline/routes.py b/hushline/routes.py index 73aaf6d0..3434a5ca 100644 --- a/hushline/routes.py +++ b/hushline/routes.py @@ -21,7 +21,7 @@ from wtforms import Field, Form, PasswordField, StringField, TextAreaField from wtforms.validators import DataRequired, Length, Optional, ValidationError -from .crypto import encrypt_message +from .crypto import decrypt_field, encrypt_field, encrypt_message from .db import db from .forms import ComplexPassword from .model import AuthenticationLog, InviteCode, Message, SMTPEncryption, User @@ -118,6 +118,14 @@ def profile(username: str) -> Response | str: flash("🫥 User not found.") return redirect(url_for("index")) + # If the encrypted message is stored in the session, use it to populate the form + if "submit_contact_method" in session: + form.contact_method.data = decrypt_field(session["submit_contact_method"]) + session.pop("submit_contact_method", None) + if "submit_content" in session: + form.content.data = decrypt_field(session["submit_content"]) + session.pop("submit_content", None) + # Generate a simple math problem using secrets module (e.g., "What is 6 + 7?") num1 = secrets.randbelow(10) + 1 # To get a number between 1 and 10 num2 = secrets.randbelow(10) + 1 # To get a number between 1 and 10 @@ -154,6 +162,10 @@ def submit_message(username: str) -> Response | str: captcha_answer = request.form.get("captcha_answer", "") if not validate_captcha(captcha_answer): + # Encrypt the message and store it in the session + session["submit_contact_method"] = encrypt_field(form.contact_method.data) + session["submit_content"] = encrypt_field(form.content.data) + return redirect(url_for("profile", username=username)) content = form.content.data From e7b0df90d3006a362f2396364bca54cdd7eaff2d Mon Sep 17 00:00:00 2001 From: Micah Lee Date: Mon, 26 Aug 2024 12:39:34 -0700 Subject: [PATCH 2/9] Wrap the decrypt_field calls in try/except block, so that if the decryption fails for any reason it just simply doesn't re-popular that field --- hushline/routes.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/hushline/routes.py b/hushline/routes.py index 3434a5ca..2a551185 100644 --- a/hushline/routes.py +++ b/hushline/routes.py @@ -120,10 +120,16 @@ def profile(username: str) -> Response | str: # If the encrypted message is stored in the session, use it to populate the form if "submit_contact_method" in session: - form.contact_method.data = decrypt_field(session["submit_contact_method"]) + try: + form.contact_method.data = decrypt_field(session["submit_contact_method"]) + except Exception: + app.logger.error("Error decrypting contact method", exc_info=True) session.pop("submit_contact_method", None) if "submit_content" in session: - form.content.data = decrypt_field(session["submit_content"]) + try: + form.content.data = decrypt_field(session["submit_content"]) + except Exception: + app.logger.error("Error decrypting content", exc_info=True) session.pop("submit_content", None) # Generate a simple math problem using secrets module (e.g., "What is 6 + 7?") From cdc689063548451cbab03ee620a8607029849a08 Mon Sep 17 00:00:00 2001 From: Micah Lee Date: Tue, 27 Aug 2024 12:20:01 -0700 Subject: [PATCH 3/9] Add support for a scope to the encrypt_field and decrypt_field functions. If scope is None, it uses the encryption key as-is. If a scope is passed in, it derives a new encryption key based on the original key and that scope. --- hushline/crypto.py | 34 ++++++++++++++++++++++++++++++---- hushline/routes.py | 12 ++++++++---- 2 files changed, 38 insertions(+), 8 deletions(-) diff --git a/hushline/crypto.py b/hushline/crypto.py index 9d25a603..ea7d2c6c 100644 --- a/hushline/crypto.py +++ b/hushline/crypto.py @@ -1,21 +1,45 @@ import os +from base64 import b64encode +from hashlib import shake_256 from cryptography.fernet import Fernet from flask import current_app from pysequoia import Cert, encrypt + +def get_encryption_key(scope: bytes | str | None = None) -> Fernet: + encryption_key = os.environ.get("ENCRYPTION_KEY") + if encryption_key is None: + raise ValueError("Encryption key not found. Please check your .env file.") + + # If a scope is provided, we will use it to derive a unique encryption key + if scope is not None: + if isinstance(scope, str): + scope_bytes = scope.encode() + elif isinstance(scope, bytes): + scope_bytes = scope + + # Use SHAKE-256 to derive a unique encryption key based on the scope + shake = shake_256() + shake.update(encryption_key.encode()) + shake.update(scope_bytes) + encryption_key = b64encode(shake.digest(32)).decode() + + return Fernet(encryption_key) + + encryption_key = os.environ.get("ENCRYPTION_KEY") if encryption_key is None: raise ValueError("Encryption key not found. Please check your .env file.") -fernet = Fernet(encryption_key) - -def encrypt_field(data: bytes | str | None) -> str | None: +def encrypt_field(data: bytes | str | None, scope: bytes | str | None = None) -> str | None: if data is None: return None + fernet = get_encryption_key(scope) + # Check if data is already a bytes object if not isinstance(data, bytes): # If data is a string, encode it to bytes @@ -26,9 +50,11 @@ def encrypt_field(data: bytes | str | None) -> str | None: return fernet.encrypt_at_time(data, current_time=0).decode() -def decrypt_field(data: str | None) -> str | None: +def decrypt_field(data: str | None, scope: bytes | str | None = None) -> str | None: if data is None: return None + + fernet = get_encryption_key(scope) return fernet.decrypt(data.encode()).decode() diff --git a/hushline/routes.py b/hushline/routes.py index 2a551185..face4cd4 100644 --- a/hushline/routes.py +++ b/hushline/routes.py @@ -121,13 +121,15 @@ def profile(username: str) -> Response | str: # If the encrypted message is stored in the session, use it to populate the form if "submit_contact_method" in session: try: - form.contact_method.data = decrypt_field(session["submit_contact_method"]) + form.contact_method.data = decrypt_field( + session["submit_contact_method"], "submit_message" + ) except Exception: app.logger.error("Error decrypting contact method", exc_info=True) session.pop("submit_contact_method", None) if "submit_content" in session: try: - form.content.data = decrypt_field(session["submit_content"]) + form.content.data = decrypt_field(session["submit_content"], "submit_message") except Exception: app.logger.error("Error decrypting content", exc_info=True) session.pop("submit_content", None) @@ -169,8 +171,10 @@ def submit_message(username: str) -> Response | str: captcha_answer = request.form.get("captcha_answer", "") if not validate_captcha(captcha_answer): # Encrypt the message and store it in the session - session["submit_contact_method"] = encrypt_field(form.contact_method.data) - session["submit_content"] = encrypt_field(form.content.data) + session["submit_contact_method"] = encrypt_field( + form.contact_method.data, "submit_message" + ) + session["submit_content"] = encrypt_field(form.content.data, "submit_message") return redirect(url_for("profile", username=username)) From f56e3df702f785850e2e806a620158b381873cbc Mon Sep 17 00:00:00 2001 From: Micah Lee Date: Tue, 27 Aug 2024 14:01:57 -0700 Subject: [PATCH 4/9] Successfully base64 decode the original encryption key before using it to derive a new one --- hushline/crypto.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/hushline/crypto.py b/hushline/crypto.py index ea7d2c6c..9ec366a6 100644 --- a/hushline/crypto.py +++ b/hushline/crypto.py @@ -1,5 +1,5 @@ import os -from base64 import b64encode +from base64 import urlsafe_b64decode, urlsafe_b64encode from hashlib import shake_256 from cryptography.fernet import Fernet @@ -19,11 +19,13 @@ def get_encryption_key(scope: bytes | str | None = None) -> Fernet: elif isinstance(scope, bytes): scope_bytes = scope + encryption_key_bytes = urlsafe_b64decode(encryption_key) + # Use SHAKE-256 to derive a unique encryption key based on the scope shake = shake_256() - shake.update(encryption_key.encode()) + shake.update(encryption_key_bytes) shake.update(scope_bytes) - encryption_key = b64encode(shake.digest(32)).decode() + encryption_key = urlsafe_b64encode(shake.digest(32)).decode() return Fernet(encryption_key) From bc8199d363f3914c506d2df0b263d6de1ada56e9 Mon Sep 17 00:00:00 2001 From: Micah Lee Date: Tue, 27 Aug 2024 14:31:49 -0700 Subject: [PATCH 5/9] Switch KDF to Scrypt, and include a salt in the session --- hushline/crypto.py | 57 +++++++++++++++++++++++++++++++++++----------- hushline/routes.py | 34 ++++++++++++++++----------- 2 files changed, 65 insertions(+), 26 deletions(-) diff --git a/hushline/crypto.py b/hushline/crypto.py index 9ec366a6..c245d287 100644 --- a/hushline/crypto.py +++ b/hushline/crypto.py @@ -1,31 +1,50 @@ import os from base64 import urlsafe_b64decode, urlsafe_b64encode -from hashlib import shake_256 from cryptography.fernet import Fernet +from cryptography.hazmat.primitives.kdf.scrypt import Scrypt from flask import current_app from pysequoia import Cert, encrypt -def get_encryption_key(scope: bytes | str | None = None) -> Fernet: +def generate_salt() -> str: + """ + Generate a random salt for use in encryption key derivation. + """ + return urlsafe_b64encode(os.urandom(16)).decode() + + +def get_encryption_key(scope: bytes | str | None = None, salt: str | None = None) -> Fernet: + """ + Return the default Fernet encryption key. If a scope and salt are provided, a unique encryption + key will be derived based on the scope and salt. + """ encryption_key = os.environ.get("ENCRYPTION_KEY") if encryption_key is None: raise ValueError("Encryption key not found. Please check your .env file.") # If a scope is provided, we will use it to derive a unique encryption key - if scope is not None: + if scope is not None and salt is not None: + # Convert the scope to bytes if it is a string if isinstance(scope, str): scope_bytes = scope.encode() elif isinstance(scope, bytes): scope_bytes = scope + # Convert the encryption key and salt to bytes encryption_key_bytes = urlsafe_b64decode(encryption_key) - - # Use SHAKE-256 to derive a unique encryption key based on the scope - shake = shake_256() - shake.update(encryption_key_bytes) - shake.update(scope_bytes) - encryption_key = urlsafe_b64encode(shake.digest(32)).decode() + salt_bytes = urlsafe_b64decode(salt) + + # Use Scrypt to derive a unique encryption key based on the scope + kdf = Scrypt( + salt=salt_bytes, + length=32, + n=2**14, + r=8, + p=1, + ) + new_encryption_key_bytes = kdf.derive(encryption_key_bytes + scope_bytes) + encryption_key = urlsafe_b64encode(new_encryption_key_bytes).decode() return Fernet(encryption_key) @@ -36,11 +55,17 @@ def get_encryption_key(scope: bytes | str | None = None) -> Fernet: raise ValueError("Encryption key not found. Please check your .env file.") -def encrypt_field(data: bytes | str | None, scope: bytes | str | None = None) -> str | None: +def encrypt_field( + data: bytes | str | None, scope: bytes | str | None = None, salt: str | None = None +) -> str | None: + """ + Encrypts the data with the default encryption key. If both scope and salt are provided, + a unique encryption key will be derived based on the scope and salt. + """ if data is None: return None - fernet = get_encryption_key(scope) + fernet = get_encryption_key(scope, salt) # Check if data is already a bytes object if not isinstance(data, bytes): @@ -52,11 +77,17 @@ def encrypt_field(data: bytes | str | None, scope: bytes | str | None = None) -> return fernet.encrypt_at_time(data, current_time=0).decode() -def decrypt_field(data: str | None, scope: bytes | str | None = None) -> str | None: +def decrypt_field( + data: str | None, scope: bytes | str | None = None, salt: str | None = None +) -> str | None: + """ + Decrypts the data with the default encryption key. If both scope and salt are provided, + a unique encryption key will be derived based on the scope and salt. + """ if data is None: return None - fernet = get_encryption_key(scope) + fernet = get_encryption_key(scope, salt) return fernet.decrypt(data.encode()).decode() diff --git a/hushline/routes.py b/hushline/routes.py index face4cd4..07784b14 100644 --- a/hushline/routes.py +++ b/hushline/routes.py @@ -21,7 +21,7 @@ from wtforms import Field, Form, PasswordField, StringField, TextAreaField from wtforms.validators import DataRequired, Length, Optional, ValidationError -from .crypto import decrypt_field, encrypt_field, encrypt_message +from .crypto import decrypt_field, encrypt_field, encrypt_message, generate_salt from .db import db from .forms import ComplexPassword from .model import AuthenticationLog, InviteCode, Message, SMTPEncryption, User @@ -119,20 +119,25 @@ def profile(username: str) -> Response | str: return redirect(url_for("index")) # If the encrypted message is stored in the session, use it to populate the form - if "submit_contact_method" in session: + scope = "submit_message" + if ( + f"{scope}:salt" in session + and f"{scope}:contact_method" in session + and f"{scope}:content" in session + ): try: form.contact_method.data = decrypt_field( - session["submit_contact_method"], "submit_message" + session[f"{scope}:contact_method"], scope, session[f"{scope}:salt"] + ) + form.content.data = decrypt_field( + session[f"{scope}:content"], scope, session[f"{scope}:salt"] ) - except Exception: - app.logger.error("Error decrypting contact method", exc_info=True) - session.pop("submit_contact_method", None) - if "submit_content" in session: - try: - form.content.data = decrypt_field(session["submit_content"], "submit_message") except Exception: app.logger.error("Error decrypting content", exc_info=True) - session.pop("submit_content", None) + + session.pop(f"{scope}:contact_method", None) + session.pop(f"{scope}:content", None) + session.pop(f"{scope}:salt", None) # Generate a simple math problem using secrets module (e.g., "What is 6 + 7?") num1 = secrets.randbelow(10) + 1 # To get a number between 1 and 10 @@ -171,10 +176,13 @@ def submit_message(username: str) -> Response | str: captcha_answer = request.form.get("captcha_answer", "") if not validate_captcha(captcha_answer): # Encrypt the message and store it in the session - session["submit_contact_method"] = encrypt_field( - form.contact_method.data, "submit_message" + scope = "submit_message" + salt = generate_salt() + session[f"{scope}:contact_method"] = encrypt_field( + form.contact_method.data, scope, salt ) - session["submit_content"] = encrypt_field(form.content.data, "submit_message") + session[f"{scope}:content"] = encrypt_field(form.content.data, scope, salt) + session[f"{scope}:salt"] = salt return redirect(url_for("profile", username=username)) From 4e65a6c3b8c4d246ae75ffc8ffdf870a8d36408f Mon Sep 17 00:00:00 2001 From: Micah Lee Date: Tue, 27 Aug 2024 14:32:14 -0700 Subject: [PATCH 6/9] Write test to ensure that the content is pre-populated if the captcha is failed --- tests/test_profile.py | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/tests/test_profile.py b/tests/test_profile.py index c442bc48..2a8956af 100644 --- a/tests/test_profile.py +++ b/tests/test_profile.py @@ -164,3 +164,42 @@ def test_profile_extra_fields(client: FlaskClient, app: Flask) -> None: # XSS should be escaped assert b"" not in response.data assert b"<script>alert('xss')</script>" in response.data + + +def test_profile_submit_message_with_invalid_captcha(client: FlaskClient) -> None: + # Register a user + user = register_user(client, "test_user_concat", "Secure-Test-Pass123") + assert user is not None + + # Log in the user + login_success = login_user(client, "test_user_concat", "Secure-Test-Pass123") + assert login_success + + # Prepare the message and contact method data + message_content = "This is a test message." + contact_method = "email@example.com" + message_data = { + "content": message_content, + "contact_method": contact_method, + "client_side_encrypted": "false", + "captcha_answer": 0, # the answer is never 0 + } + + # Send a POST request to submit the message + response = client.post( + f"/to/{user.primary_username}", + data=message_data, + follow_redirects=True, + ) + + # Make sure there's a CAPTCHA error + assert response.status_code == 200 + assert b"Incorrect CAPTCHA." in response.data + + # Make sure the contact method and message content are there + assert contact_method.encode() in response.data + assert message_content.encode() in response.data + + # Verify that the message is not saved in the database + message = db.session.scalars(db.select(Message).filter_by(user_id=user.id).limit(1)).first() + assert message is None From 601a729400e85df54444f4c2d17cec0a4fabd22e Mon Sep 17 00:00:00 2001 From: Micah Lee Date: Wed, 28 Aug 2024 14:29:28 -0700 Subject: [PATCH 7/9] Improve how the key and scope are concatenated --- hushline/crypto.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/hushline/crypto.py b/hushline/crypto.py index c245d287..15d7daf8 100644 --- a/hushline/crypto.py +++ b/hushline/crypto.py @@ -43,7 +43,14 @@ def get_encryption_key(scope: bytes | str | None = None, salt: str | None = None r=8, p=1, ) - new_encryption_key_bytes = kdf.derive(encryption_key_bytes + scope_bytes) + + # Concatenate the encryption key with the scope + items = (encryption_key_bytes, scope_bytes) + result = len(items).to_bytes(8, "big") + result += b"".join(len(item).to_bytes(8, "big") + item for item in items) + + # Derive the new key + new_encryption_key_bytes = kdf.derive(result) encryption_key = urlsafe_b64encode(new_encryption_key_bytes).decode() return Fernet(encryption_key) From 740aa15eded55e3d699124a6a2752b77cf4eb92b Mon Sep 17 00:00:00 2001 From: Micah Lee Date: Wed, 28 Aug 2024 14:35:05 -0700 Subject: [PATCH 8/9] Define the scrypt literals as constants --- hushline/crypto.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/hushline/crypto.py b/hushline/crypto.py index 15d7daf8..5a428a1a 100644 --- a/hushline/crypto.py +++ b/hushline/crypto.py @@ -6,6 +6,12 @@ from flask import current_app from pysequoia import Cert, encrypt +# https://cryptography.io/en/latest/hazmat/primitives/key-derivation-functions/#scrypt +SCRYPT_LENGTH = 32 # The desired length of the derived key in bytes. +SCRYPT_N = 2**14 # CPU/Memory cost parameter. It must be larger than 1 and be a power of 2. +SCRYPT_R = 8 # Block size parameter. +SCRYPT_P = 1 # Parallelization parameter. + def generate_salt() -> str: """ @@ -38,10 +44,10 @@ def get_encryption_key(scope: bytes | str | None = None, salt: str | None = None # Use Scrypt to derive a unique encryption key based on the scope kdf = Scrypt( salt=salt_bytes, - length=32, - n=2**14, - r=8, - p=1, + length=SCRYPT_LENGTH, + n=SCRYPT_N, + r=SCRYPT_R, + p=SCRYPT_P, ) # Concatenate the encryption key with the scope From d4417126c7f9d0101c199c4f0e0a7a8054fd13fa Mon Sep 17 00:00:00 2001 From: Micah Lee Date: Fri, 30 Aug 2024 10:20:59 -0700 Subject: [PATCH 9/9] Increase salt size to 32 bytes --- hushline/crypto.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hushline/crypto.py b/hushline/crypto.py index 5a428a1a..f72d6173 100644 --- a/hushline/crypto.py +++ b/hushline/crypto.py @@ -17,7 +17,7 @@ def generate_salt() -> str: """ Generate a random salt for use in encryption key derivation. """ - return urlsafe_b64encode(os.urandom(16)).decode() + return urlsafe_b64encode(os.urandom(32)).decode() def get_encryption_key(scope: bytes | str | None = None, salt: str | None = None) -> Fernet: