This repository has been archived by the owner on Feb 24, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
add utils for webhook signature verification (#139)
* add utils for webhook signature verification * improve error handling and add more test cases
- Loading branch information
Showing
3 changed files
with
213 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
import hashlib | ||
import hmac | ||
import json | ||
import logging | ||
|
||
from django.conf import settings | ||
from django.core.exceptions import ImproperlyConfigured | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
def webhook_signature_valid(request): | ||
"""Verifies the integrity of signed webhooks from Closeio. | ||
When a webhook subscription is created via the Closeio API you get a secret | ||
signature key in the response of the request. This is the key that Closeio | ||
uses to sign their webhooks. You can use the key to verify that the request | ||
is indeed coming from Closeio (only Closeio should have the signature key) | ||
and that the payload was not altered inbetween. | ||
This is a simple security mechanism to protect you from false requests to your | ||
endpoints that receive the webhooks. | ||
The signature key for each webhook subscription needs to be stored inside the | ||
environment variable `CLOSEIO_WEBHOOK_SIGNATURE_KEYS` as json string containing | ||
the webhook subscription ID as key and the signature key as value, e.g.:: | ||
CLOSEIO_WEBHOOK_SIGNATURE_KEYS = '{"whsub_1": "123", "whsub_2": "456"}' | ||
More information can be found in the `Closeio Docs`_ in the section "Webhook signatures". | ||
.. _Closeio Docs: https://developer.close.com/#webhook | ||
Args: | ||
request: an instance of Django's ``HttpRequest`` object | ||
""" | ||
payload = json.loads(request.body) | ||
subscription_id = payload.get('subscription_id') | ||
if not subscription_id: | ||
return False | ||
|
||
try: | ||
signature_keys = json.loads(settings.CLOSEIO_WEBHOOK_SIGNATURE_KEYS) | ||
except AttributeError: | ||
raise ImproperlyConfigured('CLOSEIO_WEBHOOK_SIGNATURE_KEYS setting not set.') | ||
except TypeError: | ||
raise ImproperlyConfigured('Cannot load value of CLOSEIO_WEBHOOK_SIGNATURE_KEYS to json.') | ||
|
||
try: | ||
signature_key = signature_keys[subscription_id] | ||
except TypeError: | ||
raise ImproperlyConfigured('Parsed value of CLOSEIO_WEBHOOK_SIGNATURE_KEYS is expected ' | ||
'to be a dictionary.') | ||
except KeyError: | ||
logger.error('No signature key set for Closeio webhook subscription with id %s.', | ||
subscription_id) | ||
return False | ||
|
||
close_sig_hash = request.META.get('HTTP_CLOSE_SIG_HASH') | ||
close_sig_timestamp = request.META.get('HTTP_CLOSE_SIG_TIMESTAMP') | ||
|
||
if not close_sig_hash or not close_sig_timestamp: | ||
return False | ||
|
||
data = close_sig_timestamp + request.body.decode('utf-8') | ||
try: | ||
signature = hmac.new( | ||
bytearray.fromhex(signature_key), | ||
data.encode('utf-8'), | ||
hashlib.sha256 | ||
).hexdigest() | ||
except (TypeError, ValueError): | ||
raise ImproperlyConfigured('The signature key for a Closeio webhook subscription must ' | ||
'be a valid string from a hexadecimal number.') | ||
valid = hmac.compare_digest(close_sig_hash, signature) | ||
|
||
return valid |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
import json | ||
|
||
import pytest | ||
from django.core.exceptions import ImproperlyConfigured | ||
|
||
from closeio.contrib.django.utils import webhook_signature_valid | ||
|
||
|
||
class TestCloseioWebhookPermission: | ||
@pytest.fixture | ||
def payload(self): | ||
return '{"event": {"id": "ev_4YUAT9G7PQNYEX0V7HpiZa", '\ | ||
'"date_created": "2019-01-01T13:33:00.842000", "data": {"object_type": '\ | ||
'"activity.note", "action": "created", "note": "test content"}}, '\ | ||
'"subscription_id": "whsub_mBTylJxRXaBOXcuQmgUdmL"}' | ||
|
||
@pytest.fixture | ||
def headers(self): | ||
return { | ||
'HTTP_CLOSE_SIG_HASH': | ||
'e6cf27a55de463dc46d8b5c16d2afe74d766e8f0c9892422c93573521cd377ed', | ||
'HTTP_CLOSE_SIG_TIMESTAMP': '1557751596', | ||
'content_type': 'application/json', | ||
} | ||
|
||
@pytest.fixture | ||
def webhook_settings(self, settings): | ||
webhook_id = 'whsub_mBTylJxRXaBOXcuQmgUdmL' | ||
signature_key = 'b9260244ef33625f9b4b26a27db08758cdd39478b852c73f2d33ab042eb8abb4' | ||
settings.CLOSEIO_WEBHOOK_SIGNATURE_KEYS = json.dumps({webhook_id: signature_key}) | ||
|
||
def test_valid(self, rf, webhook_settings, headers, payload): | ||
"""Test the signture check with a valid example.""" | ||
request = rf.post('/some/webhook/view', payload, **headers) | ||
assert webhook_signature_valid(request) is True | ||
|
||
def test_errors_because_no_signature_key_was_set(self, rf, headers, payload): | ||
"""Should raise an error if we don't find the signature key inside the settings.""" | ||
request = rf.post('/some/webhook/view', payload, **headers) | ||
with pytest.raises(ImproperlyConfigured): | ||
webhook_signature_valid(request) | ||
|
||
def test_errors_because_signature_key_is_not_in_a_dict(self, rf, settings, headers, payload): | ||
"""Should raise an error if the signature key is not a dictionary.""" | ||
settings.CLOSEIO_WEBHOOK_SIGNATURE_KEYS = json.dumps(['list', 'instead', 'of', 'dict']) | ||
|
||
request = rf.post('/some/webhook/view', payload, **headers) | ||
with pytest.raises(ImproperlyConfigured): | ||
webhook_signature_valid(request) | ||
|
||
def test_errors_because_signature_key_is_not_hex(self, rf, settings, headers, payload): | ||
"""Should fail if the signature key is not a valid hex string.""" | ||
webhook_id = 'whsub_mBTylJxRXaBOXcuQmgUdmL' | ||
signature_key = 'this is not a valid hexadecimal format' | ||
settings.CLOSEIO_WEBHOOK_SIGNATURE_KEYS = json.dumps({webhook_id: signature_key}) | ||
|
||
request = rf.post('/some/webhook/view', payload, **headers) | ||
with pytest.raises(ImproperlyConfigured): | ||
webhook_signature_valid(request) | ||
|
||
def test_invalid_because_signature_key_is_wrong(self, rf, settings, headers, payload): | ||
"""Should fail if the signature key is wrong.""" | ||
webhook_id = 'whsub_mBTylJxRXaBOXcuQmgUdmL' | ||
signature_key = '0000000000000000000000000000000000000000000000000000000000000000' | ||
settings.CLOSEIO_WEBHOOK_SIGNATURE_KEYS = json.dumps({webhook_id: signature_key}) | ||
|
||
request = rf.post('/some/webhook/view', payload, **headers) | ||
assert webhook_signature_valid(request) is False | ||
|
||
def test_invalid_because_payload_was_altered(self, rf, webhook_settings, headers, payload): | ||
"""Should fail if the payload was altered after signing.""" | ||
payload = payload.replace('test content', 'content altered by Dr. Evil') | ||
request = rf.post('/some/webhook/view', payload, **headers) | ||
assert webhook_signature_valid(request) is False | ||
|
||
def test_invalid_because_hash_was_not_provided(self, rf, webhook_settings, headers, payload): | ||
"""Should fail if the hash doesn't match.""" | ||
headers.pop('HTTP_CLOSE_SIG_HASH') | ||
request = rf.post('/some/webhook/view', payload, **headers) | ||
assert webhook_signature_valid(request) is False | ||
|
||
def test_invalid_because_hash_does_not_match(self, rf, webhook_settings, headers, payload): | ||
"""Should fail if the hash doesn't match.""" | ||
headers['HTTP_CLOSE_SIG_HASH'] = 'xxxxxx' | ||
request = rf.post('/some/webhook/view', payload, **headers) | ||
assert webhook_signature_valid(request) is False | ||
|
||
def test_invalid_because_timestamp_has_changed(self, rf, webhook_settings, headers, payload): | ||
"""Should fail if the timestamp has changed.""" | ||
headers['HTTP_CLOSE_SIG_TIMESTAMP'] = '1234567890' | ||
request = rf.post('/some/webhook/view', payload, **headers) | ||
assert webhook_signature_valid(request) is False |