Adds support for encrypted RSA private keys in decrypt_oaep and rsa_sign functions #23

Open
wants to merge 3 commits into
base: master
25 changes: 14 additions & 11 deletions jose.py
Expand Up @@ -475,7 +475,7 @@ def sign(claims, jwk, add_header=None, alg='HS256'):
header = dict((add_header or {}).items() + [(HEADER_ALG, alg)])
header, payload = map(b64encode_url, map(json_encode, (header, claims)))

sig = b64encode_url(hash_fn(_jws_hash_str(header, payload), jwk['k'],
sig = b64encode_url(hash_fn(_jws_hash_str(header, payload), jwk,
mod=mod))

return JWS(header, payload, sig)
Expand All @@ -499,6 +499,7 @@ def verify(jws, jwk, alg, validate_claims=True, expiry_seconds=None):
:raises: :class:`~jose.NotYetValid` if the JWT is not yet valid
:raises: :class:`~jose.Error` if there is an error decrypting the JWE
"""

header, payload, sig = map(b64decode_url, jws)
header = json_decode(header)
if alg != header[HEADER_ALG]:
Expand All @@ -507,7 +508,7 @@ def verify(jws, jwk, alg, validate_claims=True, expiry_seconds=None):
(_, verify_fn), mod = JWA[header[HEADER_ALG]]

if not verify_fn(_jws_hash_str(jws.header, jws.payload),
jwk['k'], sig, mod=mod):
jwk, sig, mod=mod):
raise Error('Mismatched signatures')

claims = json_decode(b64decode_url(jws.payload))
Expand Down Expand Up @@ -564,22 +565,23 @@ def encrypt_oaep(plaintext, jwk):

def decrypt_oaep(ciphertext, jwk):
try:
return PKCS1_OAEP.new(RSA.importKey(jwk['k'])).decrypt(ciphertext)
passphrase = jwk['passphrase'] if 'passphrase' in jwk else None
return PKCS1_OAEP.new(RSA.importKey(jwk['k'], passphrase)).decrypt(ciphertext)
except ValueError as e:
raise Error(e.args[0])


def hmac_sign(s, key, mod=SHA256):
hmac = HMAC.new(key, digestmod=mod)
def hmac_sign(s, jwk, mod=SHA256):
hmac = HMAC.new(jwk['k'], digestmod=mod)
if not isinstance(s, (tuple, list)):
s = (s,)
for item in s:
hmac.update(item)
return hmac.digest()


def hmac_verify(s, key, sig, mod=SHA256):
hmac = HMAC.new(key, digestmod=mod)
def hmac_verify(s, jwk, sig, mod=SHA256):
hmac = HMAC.new(jwk['k'], digestmod=mod)
if not isinstance(s, (tuple, list)):
s = (s,)
for item in s:
Expand All @@ -591,14 +593,15 @@ def hmac_verify(s, key, sig, mod=SHA256):
return True


def rsa_sign(s, key, mod=SHA256):
key = RSA.importKey(key)
def rsa_sign(s, jwk, mod=SHA256):
passphrase = jwk['passphrase'] if 'passphrase' in jwk else None
key = RSA.importKey(jwk['k'], passphrase)
hash = mod.new(s)
return PKCS1_v1_5_SIG.new(key).sign(hash)


def rsa_verify(s, key, sig, mod=SHA256):
key = RSA.importKey(key)
def rsa_verify(s, jwk, sig, mod=SHA256):
key = RSA.importKey(jwk['k'])
hash = mod.new(s)
return PKCS1_v1_5_SIG.new(key).verify(hash, sig)

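Review bot: In the last hunk, rsa_sign passes the passphrase to `RSA.importKey` but does not wrap the call the way decrypt_oaep does, so a wrong or missing passphrase presumably reaches callers of sign() as a raw ValueError rather than `jose.Error`. rsa_verify still calls `RSA.importKey(jwk['k'])` with no passphrase, so a JWK that holds the encrypted private key can sign but will likely fail at import when the same dict is used to verify. I would use `key = RSA.importKey(jwk['k'], jwk.get('passphrase'))` in both functions and add the same `except ValueError as e: raise Error(e.args[0])` handling around the import. Since the passphrase now sits in the same dict as the encrypted key, the docstrings for sign() and decrypt_oaep should state that the `'passphrase'` entry must never be serialized or logged with the JWK, otherwise encrypting the key at rest protects nothing.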