-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
attempted to fix the email injection issue #9139
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -1,9 +1,9 @@ | ||||||
import logging | ||||||
import random | ||||||
import string | ||||||
import secrets | ||||||
from dataclasses import dataclass | ||||||
from typing import Optional | ||||||
|
||||||
import requests | ||||||
|
||||||
from app.api.helpers.db import get_new_identifier, get_or_create | ||||||
|
@@ -14,6 +14,8 @@ | |||||
from app.models.video_stream import VideoStream | ||||||
from app.settings import get_settings | ||||||
|
||||||
from typing import Union | ||||||
|
||||||
logger = logging.getLogger(__name__) | ||||||
|
||||||
|
||||||
|
@@ -39,8 +41,8 @@ class RocketChat: | |||||
def login_url(self) -> str: | ||||||
return self.api_url + '/api/v1/login' | ||||||
|
||||||
def login(self, user: User, event: Optional[Event] = None, method: str = 'login'): | ||||||
def save_token(token): | ||||||
def login(self, user: User, event: Optional[Event] = None, method: str = 'login') -> Union[dict, None]: | ||||||
def save_token(token)->None: | ||||||
user.rocket_chat_token = token | ||||||
db.session.add(user) | ||||||
db.session.commit() | ||||||
|
@@ -66,7 +68,7 @@ def register( | |||||
user: User, | ||||||
event: Optional[Event] = None, | ||||||
username_suffix='', | ||||||
): | ||||||
)->Union[dict,None]: | ||||||
settings = get_settings() | ||||||
register_url = self.api_url + '/api/v1/users.register' | ||||||
register_data = { | ||||||
|
@@ -104,7 +106,7 @@ def get_token( | |||||
event: Optional[Event] = None, | ||||||
retried=False, | ||||||
microlocation: Optional[Microlocation] = None, | ||||||
): | ||||||
)->Union[dict,None]: | ||||||
if user.rocket_chat_token: | ||||||
res = requests.post(self.login_url, json=dict(resume=user.rocket_chat_token)) | ||||||
|
||||||
|
@@ -149,7 +151,7 @@ def get_token_virtual_room( | |||||
event: Optional[Event] = None, | ||||||
retried=False, | ||||||
videoStream: Optional[VideoStream] = None, | ||||||
): | ||||||
)->Union[dict,None]: | ||||||
if user.rocket_chat_token: | ||||||
res = requests.post(self.login_url, json=dict(resume=user.rocket_chat_token)) | ||||||
|
||||||
|
@@ -190,7 +192,7 @@ def get_token_virtual_room( | |||||
|
||||||
return self.register(user, event) | ||||||
|
||||||
def check_or_create_bot(self): | ||||||
def check_or_create_bot(self)->User: | ||||||
bot_email = 'open-event-bot@open-event.invalid' | ||||||
bot_user, _ = get_or_create( | ||||||
User, | ||||||
|
@@ -200,7 +202,7 @@ def check_or_create_bot(self): | |||||
|
||||||
return bot_user | ||||||
|
||||||
def create_room(self, event: Event, microlocation: Optional[Microlocation], data): | ||||||
def create_room(self, event: Event, microlocation: Optional[Microlocation], data)->None: | ||||||
bot_token = data['token'] | ||||||
bot_id = data['res']['data']['userId'] | ||||||
if microlocation: | ||||||
|
@@ -234,7 +236,7 @@ def create_room(self, event: Event, microlocation: Optional[Microlocation], data | |||||
|
||||||
def create_room_virtual_room( | ||||||
self, event: Event, videoStream: Optional[VideoStream], data | ||||||
): | ||||||
)->None: | ||||||
bot_token = data['token'] | ||||||
bot_id = data['res']['data']['userId'] | ||||||
if videoStream: | ||||||
|
@@ -268,7 +270,7 @@ def create_room_virtual_room( | |||||
|
||||||
def add_in_room( | ||||||
self, event: Event, rocket_user_id, microlocation: Optional[Microlocation] = None | ||||||
): | ||||||
)->None: | ||||||
bot = self.check_or_create_bot() | ||||||
data = self.get_token(bot) | ||||||
|
||||||
|
@@ -299,7 +301,7 @@ def add_in_room( | |||||
|
||||||
def add_in_room_virtual_room( | ||||||
self, event: Event, rocket_user_id, videoStream: Optional[VideoStream] = None | ||||||
): | ||||||
)->None: | ||||||
bot = self.check_or_create_bot() | ||||||
data = self.get_token_virtual_room(bot, videoStream=videoStream) | ||||||
if (not event.chat_room_id) or (videoStream and not videoStream.chat_room_id): | ||||||
|
@@ -328,15 +330,21 @@ def add_in_room_virtual_room( | |||||
raise RocketChatException('Error while adding user', response=res) | ||||||
|
||||||
|
||||||
def generate_pass(size=10, chars=string.ascii_lowercase + string.digits): | ||||||
return ''.join(random.choice(chars) for _ in range(size)) | ||||||
def generate_pass(size=10, chars=string.digits + string.ascii_letters + string.punctuation)->str: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🚨 suggestion (security): Consider using a more limited set of special characters for password generation While including punctuation increases password strength, it might cause compatibility issues with some systems. Consider using a subset of special characters that are widely accepted.
Suggested change
|
||||||
# Error handled cases for negative size and no password characters. | ||||||
# Secrets used instead of random, due to the code being used by company. | ||||||
if size < 0: | ||||||
raise ValueError("Negative length for password not allowed") | ||||||
elif not chars: | ||||||
raise ValueError("Password cannot be empty") | ||||||
return ''.join(secrets.choice(chars) for _ in range(size)) | ||||||
|
||||||
|
||||||
def get_rocket_chat_token( | ||||||
user: User, | ||||||
event: Optional[Event] = None, | ||||||
microlocation: Optional[Microlocation] = None, | ||||||
): | ||||||
)->dict: | ||||||
settings = get_settings() | ||||||
if not (api_url := settings['rocket_chat_url']): | ||||||
raise RocketChatException( | ||||||
|
@@ -351,7 +359,7 @@ def get_rocket_chat_token_virtual_room( | |||||
user: User, | ||||||
event: Optional[Event] = None, | ||||||
videoStream: Optional[VideoStream] = None, | ||||||
): | ||||||
)->Union[dict,None]: | ||||||
settings = get_settings() | ||||||
if not (api_url := settings['rocket_chat_url']): | ||||||
raise RocketChatException( | ||||||
|
@@ -362,7 +370,7 @@ def get_rocket_chat_token_virtual_room( | |||||
return rocket_chat.get_token_virtual_room(user, event, videoStream=videoStream) | ||||||
|
||||||
|
||||||
def rename_rocketchat_room(event: Event): | ||||||
def rename_rocketchat_room(event: Event)-> None: | ||||||
settings = get_settings() | ||||||
if not event.chat_room_id or not (api_url := settings['rocket_chat_url']): | ||||||
return | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion: Email regex might be too restrictive for some valid email addresses
The current regex doesn't allow for some valid email formats, such as addresses with subdomains or certain allowed special characters. Consider using a more comprehensive regex or a dedicated email validation library.