Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to override SSL certificate checking #3

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions usosapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ class USOSAPIConnection():
(check them with test_connection function), you may already use a subset
of USOS API services that don't require a valid access key.

To prevent the 'self signed certificate in certificate chain' error
for development environments you can disable the certificate checking by
passing verify_certificate=False to the constructor.

To log in as a specific user you need to get an URL address with
get_authorization_url and somehow display it to the user (this module
doesn't provide any UI). On the web page, after accepting
Expand All @@ -89,7 +93,7 @@ class USOSAPIConnection():
authorize_with_pin function, you will have an authorized_session.
"""
def __init__(self, api_base_address: str, consumer_key: str,
consumer_secret: str):
consumer_secret: str, verify_certificate: bool = True):
self.base_address = str(api_base_address)
if not self.base_address:
raise ValueError('Empty USOS API address.')
Expand Down Expand Up @@ -121,10 +125,11 @@ def __init__(self, api_base_address: str, consumer_key: str,
_LOGGER.info('New connection to {} created with key: {} '
'and secret: {}.'.format(api_base_address,
consumer_key, consumer_secret))
self.verify_certificate = verify_certificate

def _generate_request_token(self):
params = {'oauth_callback': 'oob', 'scopes': SCOPES}
token_tuple = self._service.get_request_token(params=params)
token_tuple = self._service.get_request_token(params=params, verify=self.verify_certificate)
self._request_token, self._request_token_secret = token_tuple
_LOGGER.info("New request token generated: {}".format(token_tuple[0]))
return
Expand Down Expand Up @@ -160,7 +165,7 @@ def test_connection(self) -> bool:
time_re = '^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{6}$'
try:
anonymous_session = self._service.get_session()
now = anonymous_session.get('services/apisrv/now')
now = anonymous_session.get('services/apisrv/now', verify=self.verify_certificate)
now = now.json()
return bool(re.match(time_re, now))
except Exception as e:
Expand All @@ -176,7 +181,7 @@ def get_authorization_url(self) -> str:
successful authorization.
"""
self._generate_request_token()
url = self._service.get_authorize_url(self._request_token)
url = self._service.get_authorize_url(self._request_token, verify=self.verify_certificate)
_LOGGER.info('New authorization URL generated: {}'.format(url))
return url

Expand Down Expand Up @@ -263,7 +268,7 @@ def get(self, service: str, **kwargs):
session = self._authorized_session

start = time.time()
response = session.post(service, params=kwargs, data={})
response = session.post(service, params=kwargs, data={}, verify=self.verify_certificate)
ex_time = time.time() - start

if not response.ok:
Expand Down Expand Up @@ -310,7 +315,7 @@ def current_identity(self):
If current session is anonymous it will raise USOSAPIException.
"""
try:
data = self.get('services/users/user')
data = self.get('services/users/user', self.verify_certificate)
return data
except USOSAPIException:
raise USOSAPIException('Trying to get identity of an unauthorized'
Expand Down