Skip to content
This repository has been archived by the owner on Dec 2, 2024. It is now read-only.

Commit

Permalink
feat: simpler signed tokens implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
tnix100 committed Aug 23, 2024
1 parent c7b3801 commit d54ec93
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 25 deletions.
52 changes: 32 additions & 20 deletions database.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@
import redis
import os
import secrets
import time
from radix import Radix
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey, Ed25519PublicKey
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey

from utils import log, create_ed25519_keys, import_priv_ed25519_key, import_pub_ed25519_key
from signing import SigningKeys
from utils import log

CURRENT_DB_VERSION = 9

Expand Down Expand Up @@ -219,24 +217,38 @@


# Load existing signing keys or create new ones
# The active private key should be rotated every 10 days by the background thread
# and public keys older than 90 days should be invalidated.
_priv_key = Ed25519PrivateKey.generate()
if db.config.count_documents({"_id": "signing_key"}, limit=1):
_priv_key = Ed25519PrivateKey.from_private_bytes(db.config.find_one({"_id": "signing_key"})["raw"])
signing_keys = {}
if db.config.count_documents({"_id": "signing_keys"}, limit=1):
data = db.config.count_documents({"_id": "signing_keys"}, limit=1)

acc_priv = Ed25519PrivateKey.from_private_bytes(data["acc_priv"])
email_priv = Ed25519PrivateKey.from_private_bytes(data["email_priv"])

signing_keys.update({
"acc_priv": acc_priv,
"acc_pub": acc_priv.public_key(),

"email_priv": email_priv,
"email_pub": email_priv.public_key()
})
else:
db.config.update_one({"_id": "signing_key"}, {"$set": {
"raw": _priv_key.private_bytes_raw(),
"rotated_at": int(time.time())
}}, upsert=True)
db.pub_signing_keys.insert_one({
"raw": _priv_key.public_key().public_bytes_raw(),
"created_at": int(time.time())
acc_priv = Ed25519PrivateKey.generate()
email_priv = Ed25519PrivateKey.generate()

signing_keys.update({
"acc_priv": acc_priv,
"acc_pub": acc_priv.public_key(),

"email_priv": email_priv,
"email_pub": email_priv.public_key()
})
signing_keys = SigningKeys(_priv_key, [
Ed25519PublicKey.from_public_bytes(pub_signing_key["raw"])
for pub_signing_key in db.pub_signing_keys.find({})
])

data = {
"_id": "signing_keys",
"acc_priv": acc_priv.private_bytes_raw(),
"email_priv": email_priv.private_bytes_raw()
}
db.confing.insert_one(signing_keys)


# Load netblocks
Expand Down
29 changes: 29 additions & 0 deletions rest_api/v0/me.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import qrcode, qrcode.image.svg
import uuid
import secrets
import os

import security
from database import db, rdb, get_total_pages
Expand Down Expand Up @@ -45,6 +46,10 @@ class Config:
validate_assignment = True
str_strip_whitespace = True

class UpdateEmailBody(BaseModel):
password: str = Field(min_length=1, max_length=255) # change in API v1
email: Optional[str] = Field(default=None, max_length=255)

class ChangePasswordBody(BaseModel):
old: str = Field(min_length=1, max_length=255) # change in API v1
new: str = Field(min_length=8, max_length=72)
Expand Down Expand Up @@ -200,6 +205,30 @@ async def get_relationships():
}, 200


@me_bp.patch("/email")
@validate_request(UpdateEmailBody)
async def update_email(data: UpdateEmailBody):
# Make sure email is enabled
if not os.getenv("EMAIL_SMTP_HOST"):
return {"error": True, "type": "featureDisabled"}, 503

# Check authorization
if not request.user:
abort(401)

# Check ratelimits
if security.ratelimited(f"login:u:{request.user}") or security.ratelimited(f"emailch:{request.user}"):
abort(429)

# Check password
account = db.usersv0.find_one({"_id": request.user}, projection={"email": 1, "pswd": 1})
if not security.check_password_hash(data.old, account["pswd"]):
security.ratelimit(f"login:u:{request.user}", 5, 60)
return {"error": True, "type": "invalidCredentials"}, 401

# Send email


@me_bp.patch("/password")
@validate_request(ChangePasswordBody)
async def change_password(data: ChangePasswordBody):
Expand Down
42 changes: 37 additions & 5 deletions security.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from hashlib import sha256
from typing import Optional, Any
from typing import Optional, Any, Literal
from base64 import urlsafe_b64encode, urlsafe_b64decode
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formataddr
Expand Down Expand Up @@ -47,6 +48,12 @@
TOKEN_BYTES = 64


TOKEN_TYPES = Literal[
"acc", # account authorization
"email", # email actions (such as email verification or account recovery)
]


email_file_loader = jinja2.FileSystemLoader("email_templates")
email_env = jinja2.Environment(loader=email_file_loader)

Expand Down Expand Up @@ -142,6 +149,7 @@ def create_account(username: str, password: str, ip: str):
"avatar": "",
"avatar_color": "000000",
"quote": "",
"email": "",
"pswd": hash_password(password),
"mfa_recovery_code": secrets.token_hex(5),
"tokens": [],
Expand Down Expand Up @@ -262,12 +270,36 @@ def create_user_token(username: str, ip: str, used_token: Optional[str] = None)
return new_token


def create_token(ttype: int, subject: Any, scopes: int, expires_in: Optional[int] = None) -> str:
pass
def create_token(ttype: TOKEN_TYPES, claims: Any, expires_in: Optional[int] = None) -> str:
token = b"miau_" + ttype.encode()

# Add claims
token += b"." + urlsafe_b64encode(msgpack(claims))

# Add expiration
token += b"." + urlsafe_b64encode(str(int(time.time())+expires_in).encode())

# Sign token and add signature to token
token += b"." + urlsafe_b64encode(signing_keys[ttype + "_priv"].sign(token))

return token.decode()


def extract_token(token: str, expected_type: TOKEN_TYPES) -> Optional[Any]:
# Extract data from the token
ttype, claims, expires_at, signature = token.split(".")

# Check type
if ttype.replace("miau_", "") != expected_type:
return None

# Check signature
signing_keys[ttype.replace("miau_", "") + "_pub"].verify(
urlsafe_b64decode(signature),
(ttype.encode() + b"." + claims.encode() + b"." + expires_at.encode())
)

def extract_token(token: str) -> tuple[int, Any, int]:
pass
return msgpack.unpack(urlsafe_b64decode(claims))


def update_settings(username, newdata):
Expand Down

0 comments on commit d54ec93

Please sign in to comment.