diff --git a/dlunch/__init__.py b/dlunch/__init__.py
index bdc2c0f..f53c67d 100755
--- a/dlunch/__init__.py
+++ b/dlunch/__init__.py
@@ -10,8 +10,7 @@
from . import core
from .core import __version__
from . import gui
-from . import auth
-from .auth import pn_user
+from .auth import AuthUser
# LOGGER ----------------------------------------------------------------------
log: logging.Logger = logging.getLogger(__name__)
@@ -41,16 +40,21 @@ def create_app(config: DictConfig) -> pn.Template:
"""
log.info("starting initialization process")
+ # Create an instance of AuthUser (which has also an instance of AuthContext
+ # among its attributes)
+ auth_user = AuthUser(config=config)
+
log.info("initialize database")
# Create tables
models.create_database(
- config, add_basic_auth_users=auth.is_basic_auth_active(config=config)
+ config,
+ add_basic_auth_users=auth_user.auth_context.is_basic_auth_active(),
)
log.info("initialize support variables")
# Generate a random password only if requested (check on flag)
log.debug("config guest user")
- guest_password = auth.set_guest_user_password(config)
+ guest_password = auth_user.auth_context.set_guest_user_password()
log.info("instantiate app")
@@ -62,13 +66,11 @@ def create_app(config: DictConfig) -> pn.Template:
# Set guest override flag if it is None (not found in flags table)
# Guest override flag is per-user and is not set for guests
if (
- models.get_flag(config=config, id=f"{pn_user(config)}_guest_override")
+ models.get_flag(config=config, id=f"{auth_user.name}_guest_override")
is None
- ) and not auth.is_guest(
- user=pn_user(config), config=config, allow_override=False
- ):
+ ) and not auth_user.is_guest():
models.set_flag(
- config=config, id=f"{pn_user(config)}_guest_override", value=False
+ config=config, id=f"{auth_user.name}_guest_override", value=False
)
# DASHBOARD BASE TEMPLATE
@@ -100,6 +102,7 @@ def create_app(config: DictConfig) -> pn.Template:
app=app,
person=person,
guest_password=guest_password,
+ auth_user=auth_user,
)
# DASHBOARD
@@ -127,7 +130,7 @@ def create_app(config: DictConfig) -> pn.Template:
gi.reload_on_guest_override(
toggle=models.get_flag(
config=config,
- id=f"{pn_user(config)}_guest_override",
+ id=f"{auth_user.name}_guest_override",
value_if_missing=False,
),
reload=False,
@@ -156,10 +159,15 @@ def create_backend(config: DictConfig) -> pn.Template:
log.info("starting initialization process")
+ # Create an instance of AuthUser (which has also an instance of AuthContext
+ # among its attributes)
+ auth_user = AuthUser(config=config)
+
log.info("initialize database")
# Create tables
models.create_database(
- config, add_basic_auth_users=auth.is_basic_auth_active(config=config)
+ config,
+ add_basic_auth_users=auth_user.auth_context.is_basic_auth_active(),
)
log.info("instantiate backend")
@@ -180,7 +188,7 @@ def create_backend(config: DictConfig) -> pn.Template:
)
# CONFIGURABLE OBJECTS
- backend_gi = gui.BackendInterface(config)
+ backend_gi = gui.BackendInterface(config, auth_user=auth_user)
# DASHBOARD
# Build dashboard
diff --git a/dlunch/__main__.py b/dlunch/__main__.py
index b94ebe7..3044db1 100755
--- a/dlunch/__main__.py
+++ b/dlunch/__main__.py
@@ -43,11 +43,12 @@ def run_app(config: DictConfig) -> None:
waiter.set_config(config)
# Set auth configurations
- log.info("set auth config and encryption")
+ log.info("set auth context and encryption")
+ auth_context = auth.AuthContext(config=config)
# Auth encryption
- auth.set_app_auth_and_encryption(config)
+ auth_context.set_app_auth_and_encryption()
log.debug(
- f'authentication {"" if auth.is_auth_active(config) else "not "}active'
+ f'authentication {"" if auth_context.is_auth_active() else "not "}active'
)
log.info("set panel config")
@@ -68,12 +69,12 @@ def run_app(config: DictConfig) -> None:
pn.config.auth_template = config.auth.auth_error_template
# If basic auth is used the database and users credentials shall be created here
- if auth.is_basic_auth_active:
+ if auth_context.is_basic_auth_active():
log.info("initialize database and users credentials for basic auth")
# Create tables
models.create_database(
config,
- add_basic_auth_users=auth.is_basic_auth_active(config=config),
+ add_basic_auth_users=auth_context.is_basic_auth_active(),
)
# Starting scheduled tasks
@@ -94,14 +95,14 @@ def run_app(config: DictConfig) -> None:
# Health is an endpoint for app health assessments
# Pass a dictionary for a multipage app
pages = {"": lambda: create_app(config=config)}
- if auth.is_auth_active(config=config):
+ if auth_context.is_auth_active():
pages["backend"] = lambda: create_backend(config=config)
# If basic authentication is active, instantiate ta special auth object
# otherwise leave an empty dict
# This step is done before panel.serve because auth_provider requires that
# the whole config is passed as an input
- if auth.is_basic_auth_active(config=config):
+ if auth_context.is_basic_auth_active():
auth_object = {
"auth_provider": hydra.utils.instantiate(
config.basic_auth.auth_provider, config
diff --git a/dlunch/auth.py b/dlunch/auth.py
index b73d9b5..d96cd1b 100644
--- a/dlunch/auth.py
+++ b/dlunch/auth.py
@@ -97,13 +97,14 @@ def check_permission(self, user: str, password: str) -> bool:
Returns:
bool: user authentication flag (`True` if authenticated)
"""
- password_hash = get_hash_from_user(user, self.config)
- if password_hash == password:
+ auth_user = AuthUser(config=self.config, name=user)
+ password_hash = auth_user.password_hash
+ if auth_user.password_hash == password:
# Check if hash needs update
valid, new_hash = password_hash.verify_and_update(password)
if valid and new_hash:
# Update to new hash
- add_user_hashed_password(user, password, config=self.config)
+ auth_user.add_user_hashed_password(password)
# Return the OK value
return True
# Return the NOT OK value
@@ -395,699 +396,625 @@ def from_str(cls, password: str) -> Self:
return cls(cls.encrypt(password))
-# FUNCTIONS -------------------------------------------------------------------
-
-
-def pn_user(config: DictConfig) -> str:
- """Return the user from Panel state object.
-
- If `config.auth.remove_email_domain` is `True`, remove the email domain from username.
-
- Args:
- config (DictConfig): Hydra configuration dictionary.
-
- Returns:
- str: username.
- """
- # Store user
- user = pn.state.user
-
- if user:
- # Check if username is an email
- if re.fullmatch(r"[^@]+@[^@]+\.[^@]+", user):
- # Remove domain from username
- if config.auth.remove_email_domain:
- user = user.split("@")[0]
-
- return user
-
-
-def is_basic_auth_active(config: DictConfig) -> bool:
- """Check configuration object and return `True` if basic authentication is active.
- Return `False` otherwise.
-
- Args:
- config (DictConfig): Hydra configuration dictionary.
-
- Returns:
- bool: `True` if basic authentication is active, `False` otherwise.
- """
-
- # Check if a valid auth key exists
- auth_provider = config.get("basic_auth", None)
-
- return auth_provider is not None
-
-
-def is_auth_active(config: DictConfig) -> bool:
- """Check configuration object and return `True` if basic authentication or OAuth is active.
- Return `False` otherwise.
+class AuthContext:
+ """Class to handle authentication context and related operations.
Args:
config (DictConfig): Hydra configuration dictionary.
-
- Returns:
- bool: `True` if authentication (basic or OAuth) is active, `False` otherwise.
"""
- # Check if a valid auth key exists
- auth_provider = is_basic_auth_active(config=config)
- oauth_provider = config.server.get("oauth_provider", None) is not None
+ def __init__(self, config: DictConfig) -> None:
+ self.config = config
- return auth_provider or oauth_provider
+ def is_basic_auth_active(self) -> bool:
+ """Check configuration object and return `True` if basic authentication is active.
+ Return `False` otherwise.
+ Returns:
+ bool: `True` if basic authentication is active, `False` otherwise.
+ """
+ # Check if a valid basic_auth key exists
+ auth_provider = self.config.get("basic_auth", None)
+ return auth_provider is not None
-def auth_type(config: DictConfig) -> str | None:
- """Check configuration object and return authentication type.
+ def is_auth_active(self) -> bool:
+ """Check configuration object and return `True` if basic authentication or OAuth is active.
+ Return `False` otherwise.
- Args:
- config (DictConfig): Hydra configuration dictionary.
+ Returns:
+ bool: `True` if authentication (basic or OAuth) is active, `False` otherwise.
+ """
+ # Check if a valid oauth key exists
+ auth_provider = self.is_basic_auth_active()
+ oauth_provider = (
+ self.config.server.get("oauth_provider", None) is not None
+ )
+ return auth_provider or oauth_provider
- Returns:
- str | None: authentication type. None if no authentication is active.
- """
+ def auth_type(self) -> str | None:
+ """Check configuration object and return authentication type.
- # Check if a valid auth key exists
- if is_basic_auth_active(config=config):
- auth_type = "basic"
- elif config.server.get("oauth_provider", None) is not None:
- auth_type = config.server.oauth_provider
- else:
- auth_type = None
+ Returns:
+ str | None: authentication type. None if no authentication is active.
+ """
+ if self.is_basic_auth_active():
+ auth_type = "basic"
+ elif self.config.server.get("oauth_provider", None) is not None:
+ auth_type = self.config.server.oauth_provider
+ else:
+ auth_type = None
- return auth_type
+ return auth_type
+ def set_app_auth_and_encryption(self) -> None:
+ """Setup Panel authorization and encryption.
-def authorize(
- config: DictConfig,
- user_info: dict,
- target_path: str,
- authorize_guest_users=False,
-) -> bool:
- """Authorization callback: read config, user info and the target path of the
- requested resource.
+ Namely:
+ - Encryption key
+ - Cookie expiry date
- Return `True` (authorized) or `False` (not authorized) by checking current user
- and target path.
+ Raises:
+ ImportError: missing library (cryptography).
+ """
+ # Encryption key
+ try:
+ if self.config.auth.oauth_encryption_key:
+ pn.config.oauth_encryption_key = (
+ self.config.auth.oauth_encryption_key.encode("ascii")
+ )
+ pn.state.encryption = Fernet(pn.config.oauth_encryption_key)
+ except ConfigAttributeError:
+ log.warning(
+ "missing authentication encryption key, generate a key with the `panel oauth-secret` CLI command and then provide it to hydra using the DATA_LUNCH_OAUTH_ENC_KEY environment variable"
+ )
+ # Cookie expiry date
+ try:
+ if self.config.auth.oauth_expiry:
+ pn.config.oauth_expiry = self.config.auth.oauth_expiry
+ except ConfigAttributeError:
+ log.warning(
+ "missing explicit authentication expiry date for cookies, defaults to 1 day"
+ )
- Args:
- config (DictConfig): Hydra configuration dictionary.
- user_info (dict): dictionary with user info passed by Panel to the authorization handle.
- target_path (str): path of the requested resource.
- authorize_guest_users (bool, optional): Set to `True` to enable the main page to guest users.
- Defaults to `False`.
+ def list_privileged_users(self) -> list[str]:
+ """List only privileged users (from `privileged_users` table).
- Returns:
- bool: authorization flag. `True` if authorized.
- """
+ Returns:
+ list[str]: list of usernames.
+ """
+ session = models.create_session(self.config)
- # If authorization is not active authorize every user
- if not is_auth_active(config=config):
- return True
+ with session:
+ privileged_users = session.scalars(
+ select(models.PrivilegedUsers)
+ ).all()
- # Set current user from panel state
- current_user = pn_user(config)
- privileged_users = list_privileged_users(config=config)
- log.debug(f"target path: {target_path}")
- # If user is not authenticated block it
- if not current_user:
- return False
- # All privileged users can reach backend (but the backend will have
- # controls only for admins)
- if current_user in privileged_users:
- return True
- # If the target is the mainpage always authorized (if authenticated)
- if authorize_guest_users and (target_path == "/"):
- return True
+ # Return users
+ users_list = [u.user for u in privileged_users]
+ users_list.sort()
- # In all other cases, don't authorize and logout
- pn.state.location.pathname.split("/")[0] + "/logout"
- return False
+ return users_list
+ def list_users_guests_and_privileges(self) -> pd.DataFrame:
+ """Join `privileged_users` and `credentials` tables to list normal users,
+ admins and guests.
-def set_app_auth_and_encryption(config: DictConfig) -> None:
- """Setup Panel authorization and encryption.
+ Returns a dataframe.
- Args:
- config (DictConfig): Hydra configuration dictionary.
+ Returns:
+ pd.DataFrame: dataframe with users and privileges.
+ """
- Raises:
- ImportError: missing library (cryptography).
- """
- try:
- if config.auth.oauth_encryption_key:
- pn.config.oauth_encryption_key = (
- config.auth.oauth_encryption_key.encode("ascii")
- )
- pn.state.encryption = Fernet(pn.config.oauth_encryption_key)
- except ConfigAttributeError:
- log.warning(
- "missing authentication encryption key, generate a key with the `panel oauth-secret` CLI command and then provide it to hydra using the DATA_LUNCH_OAUTH_ENC_KEY environment variable"
- )
- # Cookie expiry date
- try:
- if config.auth.oauth_expiry:
- pn.config.oauth_expiry = config.auth.oauth_expiry
- except ConfigAttributeError:
- log.warning(
- "missing explicit authentication expiry date for cookies, defaults to 1 day"
+ # Query tables required to understand users and guests
+ df_privileged_users = models.PrivilegedUsers.read_as_df(
+ config=self.config,
+ index_col="user",
)
+ # Leave credentials table empty if basic auth is not active
+ if self.is_basic_auth_active():
+ df_credentials = models.Credentials.read_as_df(
+ config=self.config,
+ index_col="user",
+ )
+ else:
+ df_credentials = pd.DataFrame()
+ # Change admin column to privileges (used after join)
+ df_privileged_users["group"] = df_privileged_users.admin.map(
+ {True: "admin", False: "user"}
+ )
+ df_user_guests_privileges = df_privileged_users.join(
+ df_credentials, how="outer"
+ )[["group"]]
+ df_user_guests_privileges = df_user_guests_privileges.fillna("guest")
-def get_hash_from_user(user: str, config: DictConfig) -> PasswordHash | None:
- """Query the database to retrieve the hashed password for a certain user.
-
- Args:
- user (str): username.
- config (DictConfig): Hydra configuration dictionary.
-
- Returns:
- PasswordHash | None: returns password object if the user exist, `None` otherwise.
- """
- # Create session
- session = models.create_session(config)
- # Load user from database
- with session:
- user_credential = session.get(models.Credentials, user)
-
- # Get the hashed password
- if user_credential:
- hash = user_credential.password_hash or None
- else:
- hash = None
+ return df_user_guests_privileges
- return hash
+ @staticmethod
+ def generate_password(
+ alphabet: str | None = None,
+ special_chars: str | None = "",
+ length: int = 12,
+ ) -> str:
+ """Generate a random password.
+ Args:
+ alphabet (str | None, optional): list of characters to use as alphabet to generate the password.
+ Defaults to None.
+ special_chars (str | None, optional): special characters to include inside the password string.
+ Defaults to "".
+ length (int, optional): length of the random password.
+ Defaults to 12.
-def add_privileged_user(user: str, is_admin: bool, config: DictConfig) -> None:
- """Add user id to `privileged_users` table.
+ Returns:
+ str: random password.
+ """
+ # If alphabet is not avilable use a default one
+ if alphabet is None:
+ alphabet = string.ascii_letters + string.digits + special_chars
+ # Infinite loop for finding a valid password
+ while True:
+ password = "".join(secrets.choice(alphabet) for i in range(length))
+ # Create special chars condition only if special chars is non-empty
+ if special_chars:
+ special_chars_condition = any(
+ c in special_chars for c in password
+ )
+ else:
+ special_chars_condition = True
+ if (
+ any(c.islower() for c in password)
+ and any(c.isupper() for c in password)
+ and any(c.isdigit() for c in password)
+ and special_chars_condition
+ ):
+ break
+ return password
- The table is used by every authentication methods to understand which users are
- privileged and which ones are guests.
+ def set_guest_user_password(self) -> str:
+ """If guest user is active return a password, otherwise return an empty string.
- Args:
- user (str): username.
- is_admin (bool): admin flag.
- Set to `True` if the new user has admin privileges.
- config (DictConfig): Hydra configuration dictionary.
- """
- # Create session
- session = models.create_session(config)
- # New credentials
- new_privileged_user = models.PrivilegedUsers(user=user, admin=is_admin)
+ This function always returns an empty string if basic authentication is not active.
- # Update credentials
- # Use an upsert for postgresql, a simple session add otherwise
- models.session_add_with_upsert(
- session=session,
- constraint="privileged_users_pkey",
- new_record=new_privileged_user,
- )
- session.commit()
+ Guest user and basic authentication are handled through configuration files.
+ If the flag `reset_guest_user_password` is set to `True` the password is created
+ and uploaded to database. Otherwise the existing password is queried from database
+ `credentials` table.
-def add_user_hashed_password(
- user: str, password: str, config: DictConfig
-) -> None:
- """Add user credentials to `credentials` table.
+ Returns:
+ str: guest user password or empty string if basic authentication is not active.
+ """
+ # Check if basic auth is active
+ if self.is_basic_auth_active():
+ # If active basic_auth.guest_user is true if guest user is active
+ is_guest_user_active = self.config.basic_auth.guest_user
+ log.debug("guest user flag is {is_guest_user_active}")
+ else:
+ # Otherwise the guest user feature is not applicable
+ is_guest_user_active = False
+ log.debug("guest user not applicable")
+
+ # Set the guest password variable
+ if is_guest_user_active:
+ # If flag for resetting the password does not exist use the default
+ # value
+ if (
+ models.get_flag(
+ config=self.config, id="reset_guest_user_password"
+ )
+ is None
+ ):
+ models.set_flag(
+ config=self.config,
+ id="reset_guest_user_password",
+ value=self.config.basic_auth.default_reset_guest_user_password_flag,
+ )
+ # Generate a random password only if requested (check on flag)
+ # otherwise load from database
+ if models.get_flag(
+ config=self.config, id="reset_guest_user_password"
+ ):
+ # Turn off reset user password (in order to reset it only once)
+ # This statement also acquire a lock on database (so it is
+ # called first)
+ models.set_flag(
+ config=self.config,
+ id="reset_guest_user_password",
+ value=False,
+ )
+ # Create password
+ guest_password = self.generate_password(
+ special_chars=self.config.basic_auth.psw_special_chars,
+ length=self.config.basic_auth.generated_psw_length,
+ )
+ # Add hashed password to database
+ AuthUser(
+ config=self.config, auth_context=self, name="guest"
+ ).add_user_hashed_password(guest_password)
+ else:
+ # Load from database
+ session = models.create_session(self.config)
+ with session:
+ try:
+ guest_password = session.get(
+ models.Credentials, "guest"
+ ).password_encrypted.decrypt()
+ except InvalidToken:
+ # Notify exception and suggest to reset guest user password
+ guest_password = ""
+ log.warning(
+ "Unable to decrypt 'guest' user password because an invalid token has been detected: reset password from backend"
+ )
+ pn.state.notifications.warning(
+ "Unable to decrypt 'guest' user password
Invalid token detected: reset password from backend",
+ duration=self.config.panel.notifications.duration,
+ )
+ else:
+ guest_password = ""
- Used only by basic authentication.
+ return guest_password
- Args:
- user (str): username
- password (str): plain password (not hashed).
- config (DictConfig): Hydra configuration dictionary.
- """
- # Create session
- session = models.create_session(config)
- # New credentials
- # For the user named "guest" add also the encrypted password so that panel
- # can show the decrypted guest password to logged users
- # Can't use is_guest to determine the user that need encription, because
- # only the user named guest is shown in the guest user password widget
- if user == "guest":
- new_user_credential = models.Credentials(
- user=user, password_hash=password, password_encrypted=password
- )
- else:
- new_user_credential = models.Credentials(
- user=user, password_hash=password
- )
+ def submit_password(self, gi: gui.GraphicInterface) -> bool:
+ """Same as backend_submit_password with an additional check on old password.
- # Update credentials
- # Use an upsert for postgresql, a simple session add otherwise
- models.session_add_with_upsert(
- session=session,
- constraint="credentials_pkey",
- new_record=new_user_credential,
- )
- session.commit()
+ Args:
+ gi (gui.GraphicInterface): graphic interface object (used to interact with Panel widgets).
+ Returns:
+ bool: true if successful, false otherwise.
+ """
+ # Get authenticated user from Panel state
+ auth_user = AuthUser(config=self.config, auth_context=self)
+ # Get username, updated updated at each key press
+ old_password_key_press = gi.password_widget._widgets[
+ "old_password"
+ ].value_input
+ # Check if old password is correct
+ if auth_user.password_hash == old_password_key_press:
+ # Then run the same checks used for backend
+ return self.backend_submit_password(
+ gi=gi, user=auth_user.name, logout_on_success=True
+ )
+ else:
+ pn.state.notifications.error(
+ "Incorrect old password!",
+ duration=self.config.panel.notifications.duration,
+ )
+ return False
-def remove_user(user: str, config: DictConfig) -> dict:
- """Remove user from the database.
+ def backend_submit_password(
+ self,
+ gi: gui.GraphicInterface | gui.BackendInterface,
+ user: str = None,
+ user_is_guest: bool | None = None,
+ user_is_admin: bool | None = None,
+ logout_on_success: bool = False,
+ ) -> bool:
+ """Submit password to database from backend but used also from frontend as part of `submit_password` function.
- User is removed from `privileged_users` and `credentials` tables.
+ Args:
+ gi (gui.GraphicInterface | gui.BackendInterface): graphic interface object (used to interact with Panel widgets).
+ user (str, optional): username. Defaults to None.
+ user_is_guest (bool | None, optional): guest flag (true if guest). Defaults to None.
+ user_is_admin (bool | None, optional): admin flag (true if admin). Defaults to None.
+ logout_on_success (bool, optional): set to true to force logout once the new password is set. Defaults to False.
- Args:
- user (str): username.
- config (DictConfig): Hydra configuration dictionary.
+ Returns:
+ bool: true if successful, false otherwise.
+ """
+ # Check if user is passed, otherwise check if backend widget
+ # (password_widget.object.user) is available
+ if not user:
+ username = gi.password_widget._widgets["user"].value_input
+ else:
+ username = user
+ # Get all passwords, updated at each key press
+ new_password_key_press = gi.password_widget._widgets[
+ "new_password"
+ ].value_input
+ repeat_new_password_key_press = gi.password_widget._widgets[
+ "repeat_new_password"
+ ].value_input
+ # Check if new password match repeat password
+ if username:
+ if new_password_key_press == repeat_new_password_key_press:
+ # Check if new password is valid with regex
+ if re.fullmatch(
+ self.config.basic_auth.psw_regex,
+ new_password_key_press,
+ ):
+ auth_user = AuthUser(
+ config=self.config, auth_context=self, name=username
+ )
+ # If is_guest and is_admin are None (not passed) use the ones
+ # already set for the user
+ if user_is_guest is None:
+ user_is_guest = auth_user.is_guest()
+ if user_is_admin is None:
+ user_is_admin = auth_user.is_admin()
+ # First remove user from both 'privileged_users' and
+ # 'credentials' tables.
+ deleted_data = auth_user.remove_user()
+ if (deleted_data["privileged_users_deleted"] > 0) or (
+ deleted_data["credentials_deleted"] > 0
+ ):
+ pn.state.notifications.success(
+ f"Removed old data for
'{username}'
auth: {deleted_data['privileged_users_deleted']}
cred: {deleted_data['credentials_deleted']}",
+ duration=self.config.panel.notifications.duration,
+ )
+ else:
+ pn.state.notifications.warning(
+ f"Creating new user
'{username}' does not exist",
+ duration=self.config.panel.notifications.duration,
+ )
+ # Add a privileged users only if guest option is not active
+ if not user_is_guest:
+ auth_user.add_privileged_user(is_admin=user_is_admin)
+ # Green light: update the password!
+ auth_user.add_user_hashed_password(
+ password=new_password_key_press
+ )
- Returns:
- dict: dictionary with `privileged_users_deleted` and `credentials_deleted`
- with deleted rows from each table.
- """
- # Create session
- session = models.create_session(config)
-
- with session:
- # Delete user from privileged_users table
- privileged_users_deleted = session.execute(
- delete(models.PrivilegedUsers).where(
- models.PrivilegedUsers.user == user
+ # Logout if requested
+ if logout_on_success:
+ pn.state.notifications.success(
+ "Password updated
Logging out",
+ duration=self.config.panel.notifications.duration,
+ )
+ sleep(4)
+ gi.force_logout()
+ else:
+ pn.state.notifications.success(
+ "Password updated",
+ duration=self.config.panel.notifications.duration,
+ )
+ return True
+ else:
+ pn.state.notifications.error(
+ "Password requirements not satisfied
Check again!",
+ duration=self.config.panel.notifications.duration,
+ )
+ else:
+ pn.state.notifications.error(
+ "Passwords are different!",
+ duration=self.config.panel.notifications.duration,
+ )
+ else:
+ pn.state.notifications.error(
+ "Missing user!",
+ duration=self.config.panel.notifications.duration,
)
- )
- session.commit()
- # Delete user from credentials table
- credentials_deleted = session.execute(
- delete(models.Credentials).where(models.Credentials.user == user)
- )
- session.commit()
-
- return {
- "privileged_users_deleted": privileged_users_deleted.rowcount,
- "credentials_deleted": credentials_deleted.rowcount,
- }
+ return False
-def list_privileged_users(config: DictConfig) -> list[str]:
- """List only privileged users (from `privileged_users` table).
+class AuthUser:
+ """Class to handle user authentication and management.
Args:
config (DictConfig): Hydra configuration dictionary.
-
- Returns:
- list[str]: list of usernames.
+ name (str | None, optional): username. Defaults to None.
+ auth_context (AuthContext | None, optional): authentication context. Defaults to None.
"""
- # Create session
- session = models.create_session(config)
-
- with session:
- privileged_users = session.scalars(
- select(models.PrivilegedUsers)
- ).all()
-
- # Return users
- users_list = [u.user for u in privileged_users]
- users_list.sort()
-
- return users_list
+ def __init__(
+ self,
+ config: DictConfig,
+ name: str | None = None,
+ auth_context: AuthContext | None = None,
+ ) -> None:
+ self.config = config
+ self.auth_context = auth_context or AuthContext(config)
+ # Take username from Panel state if not provided
+ self.name = name or self.get_user_from_panel_state()
-def list_users_guests_and_privileges(config: DictConfig) -> pd.DataFrame:
- """Join `privileged_users` and `credentials` tables to list normal users,
- admins and guests.
+ def get_user_from_panel_state(self) -> str:
+ """Return the user from Panel state object.
- `credentials` table is populated only if basic authetication is active (in configuration files).
- A is considered a guest if it is not listed in `privileged_users` table
- but it is available in `credentials` table.
+ If `config.auth.remove_email_domain` is `True`, remove the email domain from username.
- Returns a dataframe.
+ Returns:
+ str: username.
+ """
+ user = pn.state.user
+ # Check if username is an email
+ if user and re.fullmatch(r"[^@]+@[^@]+\.[^@]+", user):
+ # Remove domain from username
+ if self.config.auth.remove_email_domain:
+ user = user.split("@")[0]
+ return user
- Args:
- config (DictConfig): Hydra configuration dictionary.
+ def is_guest(self, allow_override: bool = True) -> bool:
+ """Check if a user is a guest by checking if it is listed inside the `privileged_users` table.
- Returns:
- pd.DataFrame: dataframe with users and privileges.
- """
+ Args:
+ allow_override (bool, optional): override enablement flag. Defaults to True.
- # Query tables required to understand users and guests
- df_privileged_users = models.PrivilegedUsers.read_as_df(
- config=config,
- index_col="user",
- )
- # Leave credentials table empty if basic auth is not active
- if is_basic_auth_active(config=config):
- df_credentials = models.Credentials.read_as_df(
- config=config,
- index_col="user",
+ Returns:
+ bool: guest flag. `True` if the user is a guest.
+ """
+ # If authorization is not active always return false (user is not guest)
+ if not self.auth_context.is_auth_active():
+ return False
+
+ # Load guest override from flag table (if the button is pressed its value
+ # is True). If not available use False.
+ guest_override = models.get_flag(
+ config=self.config,
+ id=f"{self.name}_guest_override",
+ value_if_missing=False,
)
- else:
- df_credentials = pd.DataFrame()
- # Change admin column to privileges (used after join)
- df_privileged_users["group"] = df_privileged_users.admin.map(
- {True: "admin", False: "user"}
- )
- df_user_guests_privileges = df_privileged_users.join(
- df_credentials, how="outer"
- )[["group"]]
- df_user_guests_privileges = df_user_guests_privileges.fillna("guest")
-
- return df_user_guests_privileges
-
-
-def is_guest(
- user: str, config: DictConfig, allow_override: bool = True
-) -> bool:
- """Check if a user is a guest by checking if it is listed inside the `privileged_users` table.
-
- The guest override chached value (stored in `flags` table, on a per-user basis) can force
- the function to always return True.
-
- If `allow_override` is set to `False` the guest override value is ignored.
-
- Args:
- user (str): username.
- config (DictConfig): Hydra configuration dictionary.
- allow_override (bool, optional): override enablement flag, set to `False` to ignore guest override value.
- Defaults to True.
- Returns:
- bool: guest flag. `True` if the user is a guest.
- """
-
- # If authorization is not active always return false (user is not guest)
- if not is_auth_active(config=config):
- return False
-
- # Load guest override from flag table (if the button is pressed its value
- # is True). If not available use False.
- guest_override = models.get_flag(
- config=config,
- id=f"{pn_user(config)}_guest_override",
- value_if_missing=False,
- )
+ # If guest override is active always return true (user act like guest)
+ if guest_override and allow_override:
+ return True
- # If guest override is active always return true (user act like guest)
- if guest_override and allow_override:
- return True
+ # Otherwise check if user is not included in privileged users
+ privileged_users = self.auth_context.list_privileged_users()
- # Otherwise check if user is not included in privileged users
- privileged_users = list_privileged_users(config)
+ return self.name not in privileged_users
- is_guest = user not in privileged_users
+ def is_admin(self) -> bool:
+ """Check if a user is an admin by checking the `privileged_users` table.
- return is_guest
+ Returns:
+ bool: admin flag. `True` if the user is an admin.
+ """
+ # If authorization is not active always return false (ther is no admin)
+ if not self.auth_context.is_auth_active():
+ return False
+ session = models.create_session(self.config)
+ with session:
+ admin_users = session.scalars(
+ select(models.PrivilegedUsers).where(
+ models.PrivilegedUsers.admin == sql_true()
+ )
+ ).all()
+ return self.name in [u.user for u in admin_users]
-def is_admin(user: str, config: DictConfig) -> bool:
- """Check if a user is an admin by checking the `privileged_users` table
+ @property
+ def password_hash(self) -> PasswordHash | None:
+ """Query the database to retrieve the hashed password for the user.
- Args:
- user (str): username.
- config (DictConfig): Hydra configuration dictionary.
+ Returns:
+ PasswordHash | None: returns password object if the user exists, `None` otherwise.
+ """
+ session = models.create_session(self.config)
+ # Get the hashed password if user exists
+ with session:
+ user_credential = session.get(models.Credentials, self.name)
+ return user_credential.password_hash if user_credential else None
- Returns:
- bool: admin flag. `True` if the user is an admin.
- """
+ def add_privileged_user(self, is_admin: bool) -> None:
+ """Add user id to `privileged_users` table.
- # If authorization is not active always return false (ther is no admin)
- if not is_auth_active(config=config):
- return False
+ Args:
+ is_admin (bool): admin flag.
+ """
+ session = models.create_session(self.config)
+ # New credentials
+ new_privileged_user = models.PrivilegedUsers(
+ user=self.name, admin=is_admin
+ )
+ # Update credentials
+ # Use an upsert for postgresql, a simple session add otherwise
+ models.session_add_with_upsert(
+ session=session,
+ constraint="privileged_users_pkey",
+ new_record=new_privileged_user,
+ )
+ session.commit()
- # Create session
- session = models.create_session(config)
+ def add_user_hashed_password(self, password: str) -> None:
+ """Add user credentials to `credentials` table.
- with session:
- admin_users = session.scalars(
- select(models.PrivilegedUsers).where(
- models.PrivilegedUsers.admin == sql_true()
+ Args:
+ password (str): plain password (not hashed).
+ """
+ session = models.create_session(self.config)
+ # New credentials
+ # For the user named "guest" add also the encrypted password so that panel
+ # can show the decrypted guest password to logged users
+ # Can't use is_guest to determine the user that need encription, because
+ # only the user named guest is shown in the guest user password widget
+ if self.name == "guest":
+ new_user_credential = models.Credentials(
+ user=self.name,
+ password_hash=password,
+ password_encrypted=password,
)
- ).all()
- admin_list = [u.user for u in admin_users]
-
- is_admin = user in admin_list
-
- return is_admin
-
-
-def generate_password(
- alphabet: str | None = None,
- special_chars: str | None = "",
- length: int = 12,
-) -> str:
- """_summary_
-
- Args:
- alphabet (str | None, optional): list of charachters to use as alphabet to generate the password.
- Defaults to None.
- special_chars (str | None, optional): special charachters to include inside the password string.
- Defaults to "".
- length (int, optional): length of the random password.
- Defaults to 12.
-
- Returns:
- str: random password.
- """
- # If alphabet is not avilable use a default one
- if alphabet is None:
- alphabet = string.ascii_letters + string.digits + special_chars
- # Infinite loop for finding a valid password
- while True:
- password = "".join(secrets.choice(alphabet) for i in range(length))
- # Create special chars condition only if special chars is non-empty
- if special_chars:
- special_chars_condition = any(c in special_chars for c in password)
else:
- special_chars_condition = True
- if (
- any(c.islower() for c in password)
- and any(c.isupper() for c in password)
- and any(c.isdigit() for c in password)
- and special_chars_condition
- ):
- break
-
- return password
-
-
-def set_guest_user_password(config: DictConfig) -> str:
- """If guest user is active return a password, otherwise return an empty string.
-
- This function always returns an empty string if basic authentication is not active.
-
- Guest user and basic authentication are handled through configuration files.
+ new_user_credential = models.Credentials(
+ user=self.name, password_hash=password
+ )
+ # Update credentials
+ # Use an upsert for postgresql, a simple session add otherwise
+ models.session_add_with_upsert(
+ session=session,
+ constraint="credentials_pkey",
+ new_record=new_user_credential,
+ )
+ session.commit()
- If the flag `reset_guest_user_password` is set to `True` the password is created
- and uploaded to database. Otherwise the existing password is queried from database
- `credentials` table.
+ def remove_user(self) -> dict:
+ """Remove user from the database.
- Args:
- config (DictConfig): Hydra configuration dictionary.
+ Returns:
+ dict: dictionary with `privileged_users_deleted` and `credentials_deleted`
+ with deleted rows from each table.
+ """
+ session = models.create_session(self.config)
- Returns:
- str: guest user password or empty string if basic authentication is not active.
- """
- # Check if basic auth is active
- if is_basic_auth_active(config=config):
- # If active basic_auth.guest_user is true if guest user is active
- is_guest_user_active = config.basic_auth.guest_user
- log.debug("guest user flag is {is_guest_user_active}")
- else:
- # Otherwise the guest user feature is not applicable
- is_guest_user_active = False
- log.debug("guest user not applicable")
-
- # Set the guest password variable
- if is_guest_user_active:
- # If flag for resetting the password does not exist use the default
- # value
- if (
- models.get_flag(config=config, id="reset_guest_user_password")
- is None
- ):
- models.set_flag(
- config=config,
- id="reset_guest_user_password",
- value=config.basic_auth.default_reset_guest_user_password_flag,
- )
- # Generate a random password only if requested (check on flag)
- # otherwise load from pickle
- if models.get_flag(config=config, id="reset_guest_user_password"):
- # Turn off reset user password (in order to reset it only once)
- # This statement also acquire a lock on database (so it is
- # called first)
- models.set_flag(
- config=config,
- id="reset_guest_user_password",
- value=False,
- )
- # Create password
- guest_password = generate_password(
- special_chars=config.basic_auth.psw_special_chars,
- length=config.basic_auth.generated_psw_length,
+ with session:
+ # Delete user from privileged_users table
+ privileged_users_deleted = session.execute(
+ delete(models.PrivilegedUsers).where(
+ models.PrivilegedUsers.user == self.name
+ )
)
- # Add hashed password to database
- add_user_hashed_password("guest", guest_password, config=config)
- else:
- # Load from database
- session = models.create_session(config)
- with session:
- try:
- guest_password = session.get(
- models.Credentials, "guest"
- ).password_encrypted.decrypt()
- except InvalidToken:
- # Notify exception and suggest to reset guest user password
- guest_password = ""
- log.warning(
- "Unable to decrypt 'guest' user password because an invalid token has been detected: reset password from backend"
- )
- pn.state.notifications.warning(
- "Unable to decrypt 'guest' user password
Invalid token detected: reset password from backend",
- duration=config.panel.notifications.duration,
- )
- else:
- guest_password = ""
-
- return guest_password
-
+ session.commit()
-def submit_password(gi: gui.GraphicInterface, config: DictConfig) -> bool:
- """Same as backend_submit_password with an additional check on old
- password.
+ # Delete user from credentials table
+ credentials_deleted = session.execute(
+ delete(models.Credentials).where(
+ models.Credentials.user == self.name
+ )
+ )
+ session.commit()
- Args:
- config (DictConfig): Hydra configuration dictionary.
- gi (gui.GraphicInterface): graphic interface object (used to interact with Panel widgets).
+ return {
+ "privileged_users_deleted": privileged_users_deleted.rowcount,
+ "credentials_deleted": credentials_deleted.rowcount,
+ }
- Returns:
- bool: true if successful, false otherwise.
- """
- # Get user's password hash
- password_hash = get_hash_from_user(pn_user(config), config=config)
- # Get username, updated updated at each key press
- old_password_key_press = gi.password_widget._widgets[
- "old_password"
- ].value_input
- # Check if old password is correct
- if password_hash == old_password_key_press:
- # Check if new password match repeat password
- return backend_submit_password(
- gi=gi, config=config, user=pn_user(config), logout_on_success=True
- )
- else:
- pn.state.notifications.error(
- "Incorrect old password!",
- duration=config.panel.notifications.duration,
- )
- return False
+# FUNCTIONS -------------------------------------------------------------------
-def backend_submit_password(
- gi: gui.GraphicInterface | gui.BackendInterface,
+def authorize(
config: DictConfig,
- user: str = None,
- user_is_guest: bool | None = None,
- user_is_admin: bool | None = None,
- logout_on_success: bool = False,
+ user_info: dict,
+ target_path: str,
+ authorize_guest_users=False,
) -> bool:
- """Submit password to database from backend but used also from frontend as
- part of `submit_password` function.
+ """Authorization callback: read config, user info and the target path of the
+ requested resource.
- When used for backend `is_guest` and `is_admin` are selected from a widget.
- When called from frontend they are `None` and the function read them from
- database using the user input.
+ Return `True` (authorized) or `False` (not authorized) by checking current user
+ and target path.
Args:
- gi (gui.GraphicInterface | gui.BackendInterface): graphic interface object (used to interact with Panel widgets).
config (DictConfig): Hydra configuration dictionary.
- user (str, optional): username. Defaults to None.
- user_is_guest (bool | None, optional): guest flag (true if guest). Defaults to None.
- user_is_admin (bool | None, optional): admin flag (true if admin). Defaults to None.
- logout_on_success (bool, optional): set to true to force logout once the new password is set. Defaults to False.
+ user_info (dict): dictionary with user info passed by Panel to the authorization handle.
+ target_path (str): path of the requested resource.
+ authorize_guest_users (bool, optional): Set to `True` to enable the main page to guest users.
+ Defaults to `False`.
Returns:
- bool: true if successful, false otherwise.
+ bool: authorization flag. `True` if authorized.
"""
- # Check if user is passed, otherwise check if backend widget
- # (password_widget.object.user) is available
- if not user:
- username = gi.password_widget._widgets["user"].value_input
- else:
- username = user
- # Get all passwords, updated at each key press
- new_password_key_press = gi.password_widget._widgets[
- "new_password"
- ].value_input
- repeat_new_password_key_press = gi.password_widget._widgets[
- "repeat_new_password"
- ].value_input
- # Check if new password match repeat password
- if username:
- if new_password_key_press == repeat_new_password_key_press:
- # Check if new password is valid with regex
- if re.fullmatch(
- config.basic_auth.psw_regex,
- new_password_key_press,
- ):
- # If is_guest and is_admin are None (not passed) use the ones
- # already set for the user
- if user_is_guest is None:
- user_is_guest = is_guest(user=user, config=config)
- if user_is_admin is None:
- user_is_admin = is_admin(user=user, config=config)
- # First remove user from both 'privileged_users' and
- # 'credentials' tables.
- deleted_data = remove_user(user=username, config=config)
- if (deleted_data["privileged_users_deleted"] > 0) or (
- deleted_data["credentials_deleted"] > 0
- ):
- pn.state.notifications.success(
- f"Removed old data for
'{username}'
auth: {deleted_data['privileged_users_deleted']}
cred: {deleted_data['credentials_deleted']}",
- duration=config.panel.notifications.duration,
- )
- else:
- pn.state.notifications.warning(
- f"Creating new user
'{username}' does not exist",
- duration=config.panel.notifications.duration,
- )
- # Add a privileged users only if guest option is not active
- if not user_is_guest:
- add_privileged_user(
- user=username,
- is_admin=user_is_admin,
- config=config,
- )
- # Green light: update the password!
- add_user_hashed_password(
- user=username,
- password=new_password_key_press,
- config=config,
- )
-
- # Logout if requested
- if logout_on_success:
- pn.state.notifications.success(
- "Password updated
Logging out",
- duration=config.panel.notifications.duration,
- )
- sleep(4)
- gi.force_logout()
- else:
- pn.state.notifications.success(
- "Password updated",
- duration=config.panel.notifications.duration,
- )
- return True
-
- else:
- pn.state.notifications.error(
- "Password requirements not satisfied
Check again!",
- duration=config.panel.notifications.duration,
- )
-
- else:
- pn.state.notifications.error(
- "Passwords are different!",
- duration=config.panel.notifications.duration,
- )
- else:
- pn.state.notifications.error(
- "Missing user!",
- duration=config.panel.notifications.duration,
- )
+ # Set authenticated user from panel state (authentication context is
+ # instantiated automatically)
+ auth_user = AuthUser(config=config)
+ # If authorization is not active authorize every user
+ if not auth_user.auth_context.is_auth_active():
+ return True
+ # Get privileged users
+ privileged_users = auth_user.auth_context.list_privileged_users()
+ log.debug(f"target path: {target_path}")
+ # If user is not authenticated block it
+ if not auth_user.name:
+ return False
+ # All privileged users can reach backend (but the backend will have
+ # controls only for admins)
+ if auth_user.name in privileged_users:
+ return True
+ # If the target is the mainpage always authorized (if authenticated)
+ if authorize_guest_users and (target_path == "/"):
+ return True
+ # In all other cases, don't authorize and logout
+ pn.state.location.pathname.split("/")[0] + "/logout"
return False
diff --git a/dlunch/cli.py b/dlunch/cli.py
index 2921a38..92a7cb5 100755
--- a/dlunch/cli.py
+++ b/dlunch/cli.py
@@ -47,13 +47,20 @@ def cli(ctx, hydra_overrides: tuple | None):
config_path="conf", job_name="data_lunch_cli", version_base="1.3"
)
config = compose(config_name="config", overrides=hydra_overrides)
- ctx.obj = {"config": config}
- # Set waiter config
+ # Instance auth context and waiter
+ auth_context = auth.AuthContext(config=config)
waiter.set_config(config)
+ # Store common objects in context
+ ctx.obj = {
+ "config": config,
+ "auth_context": auth_context,
+ "waiter": waiter,
+ }
+
# Auth encryption
- auth.set_app_auth_and_encryption(config)
+ auth_context.set_app_auth_and_encryption()
@cli.group()
@@ -79,18 +86,18 @@ def _left_justify(df):
return df.str.ljust(df.str.len().max())
# Auth settings
- auth_type = auth.auth_type(config=obj["config"]) or "not active"
+ auth_type = obj["auth_context"].auth_type() or "not active"
click.secho("AUTH SETTINGS", fg="yellow", bold=True)
click.secho(f"authentication: {auth_type}\n")
# List user
click.secho("USERS", fg="yellow", bold=True)
if list_only_privileged_users:
- users = auth.list_privileged_users(config=obj["config"])
+ users = obj["auth_context"].list_privileged_users()
click.secho("user", fg="cyan")
click.secho("\n".join(users))
else:
- df_users = auth.list_users_guests_and_privileges(config=obj["config"])
+ df_users = obj["auth_context"].list_users_guests_and_privileges()
df_users = (
df_users.reset_index()
.apply(_left_justify)
@@ -110,11 +117,8 @@ def add_privileged_user(obj, user, is_admin):
"""Add privileged users (with or without admin privileges)."""
# Add privileged user to 'privileged_users' table
- auth.add_privileged_user(
- user=user,
- is_admin=is_admin,
- config=obj["config"],
- )
+ auth_user = auth.AuthUser(config=obj["config"], name=user)
+ auth_user.add_privileged_user(is_admin=is_admin)
click.secho(f"User '{user}' added (admin: {is_admin})", fg="green")
@@ -127,7 +131,7 @@ def remove_privileged_user(obj, user):
"""Remove user from both privileged users and basic login credentials table."""
# Clear action
- deleted_data = auth.remove_user(user, config=obj["config"])
+ deleted_data = auth.AuthUser(config=obj["config"], name=user).remove_user()
if (deleted_data["privileged_users_deleted"] > 0) or (
deleted_data["credentials_deleted"] > 0
@@ -162,14 +166,11 @@ def add_user_credential(obj, user, password, is_admin, is_guest):
and to privileged users (if not guest)."""
# Add a privileged users only if guest option is not active
+ auth_user = auth.AuthUser(config=obj["config"], name=user)
if not is_guest:
- auth.add_privileged_user(
- user=user,
- is_admin=is_admin,
- config=obj["config"],
- )
+ auth_user.add_privileged_user(is_admin=is_admin)
# Add hashed password to credentials table
- auth.add_user_hashed_password(user, password, config=obj["config"])
+ auth_user.add_user_hashed_password(password)
click.secho(f"User '{user}' added", fg="green")
@@ -182,7 +183,7 @@ def remove_user_credential(obj, user):
"""Remove user from both privileged users and basic login credentials table."""
# Clear action
- deleted_data = auth.remove_user(user, config=obj["config"])
+ deleted_data = auth.AuthUser(config=obj["config"], name=user).remove_user()
if (deleted_data["privileged_users_deleted"] > 0) or (
deleted_data["credentials_deleted"] > 0
diff --git a/dlunch/core.py b/dlunch/core.py
index 000010f..a68868c 100644
--- a/dlunch/core.py
+++ b/dlunch/core.py
@@ -27,8 +27,7 @@
# Graphic interface imports (after class definition)
from . import models
from . import gui
-from . import auth
-from .auth import pn_user
+from .auth import AuthUser
# APP METADATA ----------------------------------------------------------------
__version__: str = "3.4.0"
@@ -308,6 +307,10 @@ def reload_menu(
event (param.parameterized.Event): Panel button event.
gi (gui.GraphicInterface): graphic interface object (used to interact with Panel widgets).
"""
+
+ # Read user from Panel state
+ auth_user = AuthUser(config=self.config)
+
# Create session
session = models.create_session(self.config)
@@ -329,17 +332,13 @@ def reload_menu(
# Check guest override button status (if not in table use False)
gi.toggle_guest_override_button.value = models.get_flag(
config=self.config,
- id=f"{pn_user(self.config)}_guest_override",
+ id=f"{auth_user.name}_guest_override",
value_if_missing=False,
)
# Set no more orders toggle button and the change order time button
# visibility and activation
- if auth.is_guest(
- user=pn_user(self.config),
- config=self.config,
- allow_override=False,
- ):
+ if auth_user.is_guest(allow_override=False):
# Deactivate the no_more_orders_button for guest users
gi.toggle_no_more_order_button.disabled = True
gi.toggle_no_more_order_button.visible = False
@@ -355,7 +354,7 @@ def reload_menu(
gi.change_order_time_takeaway_button.visible = True
# Guest graphic configuration
- if auth.is_guest(user=pn_user(self.config), config=self.config):
+ if auth_user.is_guest():
# If guest show guest type selection group
gi.person_widget.widgets["guest"].disabled = False
gi.person_widget.widgets["guest"].visible = True
@@ -584,9 +583,9 @@ def reload_menu(
)
# Stats top text
stats_and_info_text = gi.build_stats_and_info_text(
- config=self.config,
+ auth_user=auth_user,
df_stats=df_stats,
- user=pn_user(self.config),
+ user=auth_user.name,
version=__version__,
host_name=self.get_host_name(),
stylesheets=[self.config.panel.gui.css_files.stats_info_path],
@@ -637,6 +636,10 @@ def send_order(
person (gui.Person): class that collect order data for the user that is the target of the order.
gi (gui.GraphicInterface): graphic interface object (used to interact with Panel widgets).
"""
+
+ # Read user from Panel state
+ auth_user = AuthUser(config=self.config)
+
# Get username updated at each key press
username_key_press = gi.person_widget._widgets["username"].value_input
@@ -665,9 +668,12 @@ def send_order(
# If auth is active, check if a guests is using a name reserved to a
# privileged user
if (
- auth.is_guest(user=pn_user(self.config), config=self.config)
- and (username_key_press in auth.list_users(config=self.config))
- and (auth.is_auth_active(config=self.config))
+ auth_user.is_guest()
+ and (
+ username_key_press
+ in auth_user.auth_context.list_privileged_users()
+ )
+ and (auth_user.auth_context.is_auth_active())
):
pn.state.notifications.error(
f"{username_key_press} is a reserved name
Please choose a different one",
@@ -684,20 +690,16 @@ def send_order(
# Check if a privileged user is ordering for an invalid name
if (
- not auth.is_guest(
- user=pn_user(self.config), config=self.config
- )
+ not auth_user.is_guest()
and (
username_key_press
not in (
name
- for name in auth.list_privileged_users(
- config=self.config
- )
+ for name in auth_user.auth_context.list_privileged_users()
if name != "guest"
)
)
- and (auth.is_auth_active(config=self.config))
+ and (auth_user.auth_context.is_auth_active())
):
pn.state.notifications.error(
f"{username_key_press} is not a valid name
for a privileged user
Please choose a different one",
@@ -731,9 +733,7 @@ def send_order(
try:
# Add User
# Do not pass guest for privileged users (default to NotAGuest)
- if auth.is_guest(
- user=pn_user(self.config), config=self.config
- ):
+ if auth_user.is_guest():
new_user = models.Users(
id=username_key_press,
guest=person.guest,
@@ -817,6 +817,10 @@ def delete_order(
app (pn.Template): Panel app template (used to open modal windows in case of database errors).
gi (gui.GraphicInterface): graphic interface object (used to interact with Panel widgets).
"""
+
+ # Read user from Panel state
+ auth_user = AuthUser(config=self.config)
+
# Get username, updated on every keypress
username_key_press = gi.person_widget._widgets["username"].value_input
@@ -846,14 +850,12 @@ def delete_order(
# If auth is active, check if a guests is deleting an order of a
# privileged user
if (
- auth.is_guest(
- user=pn_user(self.config), config=self.config
- )
+ auth_user.is_guest()
and (
username_key_press
- in auth.list_privileged_users(config=self.config)
+ in auth_user.auth_context.list_privileged_users()
)
- and (auth.is_auth_active(config=self.config))
+ and (auth_user.auth_context.is_auth_active())
):
pn.state.notifications.error(
f"You do not have enough privileges
to delete
{username_key_press}'s order",
diff --git a/dlunch/gui.py b/dlunch/gui.py
index b004e36..37d1f70 100644
--- a/dlunch/gui.py
+++ b/dlunch/gui.py
@@ -23,8 +23,7 @@
from . import models
# Auth
-from . import auth
-from .auth import pn_user
+from .auth import AuthUser
# Import used only for type checking, that have problems with circular imports
# TYPE_CHECKING is False at runtime (thus the import is not executed)
@@ -93,11 +92,11 @@ def __init__(self, config: OmegaConf, **params):
self.param.guest.default = config.panel.guest_types[0]
self.guest = config.panel.guest_types[0]
# Check user (a username is already set for privileged users)
- username = pn_user(config)
- if not auth.is_guest(
- user=username, config=config, allow_override=False
- ) and (username is not None):
- self.username = username
+ auth_user = AuthUser(config=config)
+ if not auth_user.is_guest(allow_override=False) and (
+ auth_user.name is not None
+ ):
+ self.username = auth_user.name
def __str__(self):
"""String representation of this object.
@@ -265,9 +264,9 @@ class GraphicInterface:
waiter (core.Waiter): Waiter object with methods to handle user requests.
app (pn.Template): App panel template (see `Panel docs `__).
person (Person): Object with user data and preferences for the lunch order.
+ auth_user (AuthUser): AuthUser object with authenticated user data.
guest_password (str, optional): guest password to show in password tab. Used only if basic authentication is active.
Defaults to empty string (`""`).
-
"""
def __init__(
@@ -276,8 +275,13 @@ def __init__(
waiter: core.Waiter,
app: pn.Template,
person: Person,
+ auth_user: AuthUser,
guest_password: str = "",
):
+ # CONTEXT VARIABLES ---------------------------------------------------
+ # Store authentication context
+ self.auth_context = auth_user.auth_context
+
# HEADER SECTION ------------------------------------------------------
# WIDGET
# Create PNG pane with app icon
@@ -325,15 +329,13 @@ def __init__(
if config.panel.gui.header_object:
self.header_row.append(self.header_object)
# Append a controls to the right side of header
- if auth.is_auth_active(config=config):
+ if self.auth_context.is_auth_active():
self.header_row.append(pn.HSpacer())
# Backend only for admin
- if auth.is_admin(user=pn_user(config), config=config):
+ if auth_user.is_admin():
self.header_row.append(self.backend_button)
# Guest override only for non guests
- if not auth.is_guest(
- user=pn_user(config), config=config, allow_override=False
- ):
+ if not auth_user.is_guest(allow_override=False):
self.header_row.append(self.toggle_guest_override_button)
self.header_row.append(self.logout_button)
self.header_row.append(
@@ -353,13 +355,11 @@ def reload_on_guest_override_callback(
):
# Update global variable that control guest override
# Only non guest can store this value in 'flags' table (guest users
- # are always guests, there is no use in sotring a flag for them)
- if not auth.is_guest(
- user=pn_user(config), config=config, allow_override=False
- ):
+ # are always guests, there is no use in sorting a flag for them)
+ if not auth_user.is_guest(allow_override=False):
models.set_flag(
config=config,
- id=f"{pn_user(config)}_guest_override",
+ id=f"{auth_user.name}_guest_override",
value=toggle,
)
# Show banner if override is active
@@ -810,9 +810,9 @@ def reload_on_no_more_order_callback(
self.sidebar_tabs = pn.Tabs(
width=sidebar_content_width,
)
- # Reload tabs according to auth.is_guest results and guest_override
+ # Reload tabs according to auth_user.is_guest results and guest_override
# flag (no need to cleans, tabs are already empty)
- self.load_sidebar_tabs(config=config, clear_before_loading=False)
+ self.load_sidebar_tabs(auth_user=auth_user, clear_before_loading=False)
# CALLBACKS
# Build menu button callback
@@ -825,7 +825,7 @@ def reload_on_no_more_order_callback(
)
# Submit password button callback
self.submit_password_button.on_click(
- lambda e: auth.submit_password(gi=self, config=config)
+ lambda e: self.auth_context.submit_password(gi=self)
)
# UTILITY METHODS ---------------------------------------------------------
@@ -935,7 +935,7 @@ def build_time_label(
# SIDEBAR SECTION
def load_sidebar_tabs(
- self, config: DictConfig, clear_before_loading: bool = True
+ self, auth_user: AuthUser, clear_before_loading: bool = True
) -> None:
"""Append tabs to the app template sidebar.
@@ -943,7 +943,7 @@ def load_sidebar_tabs(
Use the default value during normal operation to avoid tabs duplication.
Args:
- config (DictConfig): Hydra configuration dictionary.
+ auth_user (AuthUser): AuthUser object with authenticated user data.
clear_before_loading (bool, optional): Set to true to remove all tabs before appending the new ones. Defaults to True.
"""
# Clean tabs
@@ -953,18 +953,16 @@ def load_sidebar_tabs(
self.sidebar_tabs.append(self.sidebar_person_column)
# Append upload, download and stats only for non-guest
# Append password only for non-guest users if auth is active
- if not auth.is_guest(
- user=pn_user(config), config=config, allow_override=False
- ):
+ if not auth_user.is_guest(allow_override=False):
self.sidebar_tabs.append(self.sidebar_menu_upload_col)
self.sidebar_tabs.append(self.sidebar_download_orders_col)
self.sidebar_tabs.append(self.sidebar_stats_col)
- if auth.is_basic_auth_active(config=config):
+ if self.auth_context.is_basic_auth_active():
self.sidebar_tabs.append(self.sidebar_password)
def build_stats_and_info_text(
self,
- config: DictConfig,
+ auth_user: AuthUser,
df_stats: pd.DataFrame,
user: str,
version: str,
@@ -976,7 +974,7 @@ def build_stats_and_info_text(
This functions needs Data-Lunch version and the name of the hosting machine to populate the info section.
Args:
- config (DictConfig): Hydra configuration dictionary.
+ auth_user (AuthUser): AuthUser object with authenticated user data.
df_stats (pd.DataFrame): dataframe with statistics.
user (str): username.
version (str): Data-Lunch version.
@@ -1006,9 +1004,9 @@ def build_stats_and_info_text(
stylesheets=stylesheets,
)
# Define user group
- if auth.is_guest(user=user, config=config, allow_override=False):
+ if auth_user.is_guest(allow_override=False):
user_group = "guest"
- elif auth.is_admin(user=user, config=config):
+ elif auth_user.is_admin():
user_group = "admin"
else:
user_group = "user"
@@ -1088,12 +1086,18 @@ class BackendInterface:
Args:
config (DictConfig): Hydra configuration dictionary.
+ auth_user (AuthUser): AuthUser object with authenticated user data.
"""
def __init__(
self,
config: DictConfig,
+ auth_user: AuthUser,
):
+ # CONTEXT VARIABLES ---------------------------------------------------
+ # Store authentication context
+ self.auth_context = auth_user.auth_context
+
# HEADER SECTION ------------------------------------------------------
# WIDGET
@@ -1177,7 +1181,7 @@ def __init__(
)
# User list
self.users_tabulator = pn.widgets.Tabulator(
- value=auth.list_users_guests_and_privileges(config),
+ value=self.auth_context.list_users_guests_and_privileges(),
sizing_mode="stretch_height",
)
# Flags content (use empty dataframe to instantiate)
@@ -1283,13 +1287,13 @@ def __init__(
min_height=backend_min_height,
)
# Add controls only for admin users
- if not auth.is_admin(user=pn_user(config), config=config):
+ if not auth_user.is_admin():
self.backend_controls.append(self.access_denied_text)
self.backend_controls.append(pn.Spacer(height=15))
else:
# For basic auth use a password renewer, for oauth a widget for
# adding privileged users
- if auth.is_basic_auth_active(config=config):
+ if self.auth_context.is_basic_auth_active():
self.backend_controls.append(self.add_update_user_column)
else:
self.backend_controls.append(self.add_privileged_user_column)
@@ -1321,11 +1325,10 @@ def __init__(
# CALLBACKS
# Submit password button callback
def submit_password_button_callback(self, config):
- success = auth.backend_submit_password(
+ success = self.auth_context.backend_submit_password(
gi=self,
user_is_admin=self.password_widget.object.admin,
user_is_guest=self.password_widget.object.guest,
- config=config,
)
if success:
self.reload_backend(config)
@@ -1341,10 +1344,10 @@ def add_privileged_user_button_callback(self):
"user"
].value_input
# Add user
- auth.add_privileged_user(
- username_key_press,
+ AuthUser(
+ config=config, name=username_key_press
+ ).add_privileged_user(
is_admin=self.add_privileged_user_widget.object.admin,
- config=config,
)
self.reload_backend(config)
@@ -1362,9 +1365,9 @@ def delete_user_button_callback(self):
# Get username, updated at each key press
username_key_press = self.user_eraser._widgets["user"].value_input
# Delete user
- deleted_data = auth.remove_user(
- user=username_key_press, config=config
- )
+ deleted_data = AuthUser(
+ config=config, name=username_key_press
+ ).remove_user()
if (deleted_data["privileged_users_deleted"] > 0) or (
deleted_data["credentials_deleted"] > 0
):
@@ -1421,8 +1424,8 @@ def reload_backend(self, config: DictConfig) -> None:
config (DictConfig): Hydra configuration dictionary.
"""
# Users and guests lists
- self.users_tabulator.value = auth.list_users_guests_and_privileges(
- config
+ self.users_tabulator.value = (
+ self.auth_context.list_users_guests_and_privileges()
)
# Flags table content
df_flags = models.Flags.read_as_df(
diff --git a/dlunch/models.py b/dlunch/models.py
index b414d56..be669f8 100755
--- a/dlunch/models.py
+++ b/dlunch/models.py
@@ -3,7 +3,6 @@
Helper classes and utility functions for data management are defined here.
"""
-from datetime import datetime
import hydra
import logging
from omegaconf import DictConfig
@@ -1021,16 +1020,9 @@ def _create_database_with_retries(config: DictConfig) -> None:
# Check if admin exists
if session.get(Credentials, "admin") is None:
# Add authorization and credentials for admin
- auth.add_privileged_user(
- user="admin",
- is_admin=True,
- config=config,
- )
- auth.add_user_hashed_password(
- user="admin",
- password="admin",
- config=config,
- )
+ auth_user = auth.AuthUser(config=config, name="admin")
+ auth_user.add_privileged_user(is_admin=True)
+ auth_user.add_user_hashed_password(password="admin")
log.warning(
"admin user created, remember to change the default password"
)
@@ -1040,11 +1032,8 @@ def _create_database_with_retries(config: DictConfig) -> None:
) and config.basic_auth.guest_user:
# Add only credentials for guest (guest users are not included
# in privileged_users table)
- auth.add_user_hashed_password(
- user="guest",
- password="guest",
- config=config,
- )
+ auth_user = auth.AuthUser(config=config, name="guest")
+ auth_user.add_user_hashed_password(password="guest")
log.warning(
"guest user created, remember to change the default password"
)
diff --git a/dlunch/scheduled_tasks.py b/dlunch/scheduled_tasks.py
index 9a3fe6c..992d986 100644
--- a/dlunch/scheduled_tasks.py
+++ b/dlunch/scheduled_tasks.py
@@ -53,6 +53,9 @@ def reset_guest_user_password(config: DictConfig) -> callable:
callable: function to be scheduled.
"""
+ # Create instance of AuthContext
+ auth_context = auth.AuthContext(config=config)
+
async def scheduled_function() -> None:
"""Reset guest user password."""
log.info(f"reset guest user password executed at {dt.datetime.now()}")
@@ -61,7 +64,7 @@ async def scheduled_function() -> None:
config=config, id="reset_guest_user_password", value=True
)
# Reset password
- auth.set_guest_user_password(config)
+ auth_context.set_guest_user_password()
return scheduled_function
diff --git a/scripts/create_users_from_list.py b/scripts/create_users_from_list.py
index 3935a76..cfcf34e 100644
--- a/scripts/create_users_from_list.py
+++ b/scripts/create_users_from_list.py
@@ -10,7 +10,6 @@
# Email client
import ssl
import smtplib
-import html
from jinja2 import Environment, FileSystemLoader, select_autoescape
from email.message import EmailMessage
from email.utils import formatdate, make_msgid
@@ -75,11 +74,12 @@
# MIME
for user in new_users_names_df.itertuples():
# Generate a random password
- password = auth.generate_password(
+ auth_user = auth.AuthUser(config=config, name=user.name)
+ password = auth_user.auth_context.generate_password(
special_chars=config.basic_auth.psw_special_chars
)
# Add hashed password to credentials file
- auth.add_user_hashed_password(user.name, password, config=config)
+ auth_user.add_user_hashed_password(password=password)
# Send email to user: recipient (built from username)
send_to = user.email