Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

attempted to fix the email injection issue #9139

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions app/api/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,12 @@ def verify_email():
logging.info('Email Verified')
return make_response(jsonify(message="Email Verified"), 200)

# implemented regular expression to limit the character set to be used in the emails.
# used bleach library to prevent cross site scripting attacks
import re
from bleach import clean

EMAIL_REGEX = r"^[a-zA-Z0-9._-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Email regex might be too restrictive for some valid email addresses

The current regex doesn't allow for some valid email formats, such as addresses with subdomains or certain allowed special characters. Consider using a more comprehensive regex or a dedicated email validation library.

import email_validator

def validate_email(email):
    try:
        email_validator.validate_email(email)
        return True
    except email_validator.EmailNotValidError:
        return False


@auth_routes.route('/resend-verification-email', methods=['POST'])
def resend_verification_email():
Expand All @@ -317,6 +323,13 @@ def resend_verification_email():
logging.error('Bad Request')
raise BadRequestError({'source': ''}, 'Bad Request Error')

if not re.match(EMAIL_REGEX, email): # this portion matches the email with the fixed character set.
logging.error('Invalid email address')
raise BadRequestError({'source': ''}, 'Invalid email address')

sanitized_email = clean(email) # here the mail is cleaned from XSS attacks
email=sanitized_email

try:
user = User.query.filter_by(email=email).one()
except NoResultFound:
Expand Down
42 changes: 25 additions & 17 deletions app/api/chat/rocket_chat.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import logging
import random
import string
import secrets
from dataclasses import dataclass
from typing import Optional

import requests

from app.api.helpers.db import get_new_identifier, get_or_create
Expand All @@ -14,6 +14,8 @@
from app.models.video_stream import VideoStream
from app.settings import get_settings

from typing import Union

logger = logging.getLogger(__name__)


Expand All @@ -39,8 +41,8 @@ class RocketChat:
def login_url(self) -> str:
return self.api_url + '/api/v1/login'

def login(self, user: User, event: Optional[Event] = None, method: str = 'login'):
def save_token(token):
def login(self, user: User, event: Optional[Event] = None, method: str = 'login') -> Union[dict, None]:
def save_token(token)->None:
user.rocket_chat_token = token
db.session.add(user)
db.session.commit()
Expand All @@ -66,7 +68,7 @@ def register(
user: User,
event: Optional[Event] = None,
username_suffix='',
):
)->Union[dict,None]:
settings = get_settings()
register_url = self.api_url + '/api/v1/users.register'
register_data = {
Expand Down Expand Up @@ -104,7 +106,7 @@ def get_token(
event: Optional[Event] = None,
retried=False,
microlocation: Optional[Microlocation] = None,
):
)->Union[dict,None]:
if user.rocket_chat_token:
res = requests.post(self.login_url, json=dict(resume=user.rocket_chat_token))

Expand Down Expand Up @@ -149,7 +151,7 @@ def get_token_virtual_room(
event: Optional[Event] = None,
retried=False,
videoStream: Optional[VideoStream] = None,
):
)->Union[dict,None]:
if user.rocket_chat_token:
res = requests.post(self.login_url, json=dict(resume=user.rocket_chat_token))

Expand Down Expand Up @@ -190,7 +192,7 @@ def get_token_virtual_room(

return self.register(user, event)

def check_or_create_bot(self):
def check_or_create_bot(self)->User:
bot_email = 'open-event-bot@open-event.invalid'
bot_user, _ = get_or_create(
User,
Expand All @@ -200,7 +202,7 @@ def check_or_create_bot(self):

return bot_user

def create_room(self, event: Event, microlocation: Optional[Microlocation], data):
def create_room(self, event: Event, microlocation: Optional[Microlocation], data)->None:
bot_token = data['token']
bot_id = data['res']['data']['userId']
if microlocation:
Expand Down Expand Up @@ -234,7 +236,7 @@ def create_room(self, event: Event, microlocation: Optional[Microlocation], data

def create_room_virtual_room(
self, event: Event, videoStream: Optional[VideoStream], data
):
)->None:
bot_token = data['token']
bot_id = data['res']['data']['userId']
if videoStream:
Expand Down Expand Up @@ -268,7 +270,7 @@ def create_room_virtual_room(

def add_in_room(
self, event: Event, rocket_user_id, microlocation: Optional[Microlocation] = None
):
)->None:
bot = self.check_or_create_bot()
data = self.get_token(bot)

Expand Down Expand Up @@ -299,7 +301,7 @@ def add_in_room(

def add_in_room_virtual_room(
self, event: Event, rocket_user_id, videoStream: Optional[VideoStream] = None
):
)->None:
bot = self.check_or_create_bot()
data = self.get_token_virtual_room(bot, videoStream=videoStream)
if (not event.chat_room_id) or (videoStream and not videoStream.chat_room_id):
Expand Down Expand Up @@ -328,15 +330,21 @@ def add_in_room_virtual_room(
raise RocketChatException('Error while adding user', response=res)


def generate_pass(size=10, chars=string.ascii_lowercase + string.digits):
return ''.join(random.choice(chars) for _ in range(size))
def generate_pass(size=10, chars=string.digits + string.ascii_letters + string.punctuation)->str:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚨 suggestion (security): Consider using a more limited set of special characters for password generation

While including punctuation increases password strength, it might cause compatibility issues with some systems. Consider using a subset of special characters that are widely accepted.

Suggested change
def generate_pass(size=10, chars=string.digits + string.ascii_letters + string.punctuation)->str:
def generate_pass(size=10, chars=string.digits + string.ascii_letters + "!@#$%^&*()-_=+")->str:

# Error handled cases for negative size and no password characters.
# Secrets used instead of random, due to the code being used by company.
if size < 0:
raise ValueError("Negative length for password not allowed")
elif not chars:
raise ValueError("Password cannot be empty")
return ''.join(secrets.choice(chars) for _ in range(size))


def get_rocket_chat_token(
user: User,
event: Optional[Event] = None,
microlocation: Optional[Microlocation] = None,
):
)->dict:
settings = get_settings()
if not (api_url := settings['rocket_chat_url']):
raise RocketChatException(
Expand All @@ -351,7 +359,7 @@ def get_rocket_chat_token_virtual_room(
user: User,
event: Optional[Event] = None,
videoStream: Optional[VideoStream] = None,
):
)->Union[dict,None]:
settings = get_settings()
if not (api_url := settings['rocket_chat_url']):
raise RocketChatException(
Expand All @@ -362,7 +370,7 @@ def get_rocket_chat_token_virtual_room(
return rocket_chat.get_token_virtual_room(user, event, videoStream=videoStream)


def rename_rocketchat_room(event: Event):
def rename_rocketchat_room(event: Event)-> None:
settings = get_settings()
if not event.chat_room_id or not (api_url := settings['rocket_chat_url']):
return
Expand Down