Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ordering fix, 3rd party scopes #52

Merged
merged 1 commit into from
Apr 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 47 additions & 1 deletion services_backend/models/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from enum import Enum

from fastapi_sqlalchemy import db
from sqlalchemy import Boolean
from sqlalchemy import Enum as DbEnum
from sqlalchemy import ForeignKey, Integer, String
from sqlalchemy.ext.hybrid import hybrid_property
Expand Down Expand Up @@ -63,9 +64,54 @@ class Button(Base):
link: Mapped[str] = mapped_column(String)
type: Mapped[Type] = mapped_column(DbEnum(Type, native_enum=False), nullable=False)

_scopes: Mapped[list[Scope]] = relationship("Scope", back_populates="button", lazy='joined', cascade='delete')

@hybrid_property
def required_scopes(self) -> set[str]:
return set(s.name for s in self._scopes if s.is_required is True)

@required_scopes.inplace.setter
def _required_scopes_setter(self, value: set[str]):
old_scopes = self.required_scopes
new_scopes = set(value)

# Удаляем более ненужные скоупы
for s in self._scopes:
if s.name in (old_scopes - new_scopes):
db.session.delete(s)

# Добавляем недостающие скоупы
for s in new_scopes - old_scopes:
new_scope = Scope(button=self, name=s, is_required=True)
db.session.add(new_scope)
self._scopes.append(new_scope)

@hybrid_property
def optional_scopes(self) -> set[str]:
return set(s.name for s in self._scopes if s.is_required is False)

@optional_scopes.inplace.setter
def _optional_scopes_setter(self, value: set[str]):
old_scopes = self.optional_scopes
new_scopes = set(value)

# Удаляем более ненужные скоупы
for s in self._scopes:
if s.name in (old_scopes - new_scopes):
db.session.delete(s)

# Добавляем недостающие скоупы
for s in new_scopes - old_scopes:
new_scope = Scope(button=self, name=s, is_required=False)
db.session.add(new_scope)
self._scopes.append(new_scope)


class Scope(Base):
id: Mapped[int] = mapped_column(Integer, primary_key=True)
name: Mapped[str] = mapped_column(String, nullable=True)
category_id: Mapped[int] = mapped_column(Integer, ForeignKey("category.id"))
category_id: Mapped[int] = mapped_column(Integer, ForeignKey("category.id"), nullable=True)
button_id: Mapped[int] = mapped_column(Integer, ForeignKey("button.id"), nullable=True)
is_required: Mapped[bool] = mapped_column(Boolean, default=True, nullable=True)
category: Mapped[Category] = relationship("Category", back_populates="_scopes", foreign_keys=[category_id])
button: Mapped[Category] = relationship("Button", back_populates="_scopes", foreign_keys=[button_id])
170 changes: 137 additions & 33 deletions services_backend/routes/button.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import logging
from enum import Enum

from auth_lib.fastapi import UnionAuth
from fastapi import APIRouter, Depends, HTTPException
from fastapi_sqlalchemy import db
from pydantic import Field
from pydantic import Field, conint

from services_backend.models.database import Button, Category, Type
from services_backend.schemas import Base
Expand All @@ -21,17 +22,26 @@ class ButtonCreate(Base):
name: str = Field(description='Название кнопки')
link: str = Field(description='Ссылка, на которую перенаправляет кнопка')
type: Type = Field(description='Тип открываемой ссылки (Ссылка приложения/Браузер в приложении/Браузер')
required_scopes: set[str] | None = Field(description='Каким скоупы нужны, чтобы кнопка была доступна', default=None)
optional_scopes: set[str] | None = Field(description='Каким скоупы желательны', default=None)


class ButtonUpdate(Base):
category_id: int | None = Field(description='Айди категории', default=None)
icon: str | None = Field(description='Иконка кнопки', default=None)
name: str | None = Field(description='Название кнопки', default=None)
order: int | None = Field(description='Порядок, в котором отображаются кнопки', default=None)
order: conint(gt=0) | None = Field(description='Порядок, в котором отображаются кнопки', default=None)
link: str | None = Field(description='Ссылка, на которую перенаправляет кнопка', default=None)
type: Type | None = Field(
description='Тип открываемой ссылки (Ссылка приложения/Браузер в приложении/Браузер', default=None
)
required_scopes: set[str] | None = Field(description='Каким скоупы нужны, чтобы кнопка была доступна', default=None)
optional_scopes: set[str] | None = Field(description='Каким скоупы желательны', default=None)


class ButtonView(Enum):
ACTIVE = "active"
BLOCKED = "blocked"


class ButtonGet(Base):
Expand All @@ -41,6 +51,10 @@ class ButtonGet(Base):
link: str | None = Field(description='Ссылка, на которую перенаправляет кнопка')
order: int | None = Field(description='Порядок, в котором отображаются кнопки')
type: Type | None = Field(description='Тип открываемой ссылки (Ссылка приложения/Браузер в приложении/Браузер')
view: ButtonView | None = Field(description='Доступна ли запрашиваемая кнопка', default=None)
required_scopes: list[str] | None = None
optional_scopes: list[str] | None = None
scopes: list[str] | None = Field(description='Скоупы, которые можно запросить', default=None)


class ButtonsGet(Base):
Expand All @@ -50,7 +64,7 @@ class ButtonsGet(Base):
# endregion


@button.post("", response_model=ButtonGet)
@button.post("", response_model=ButtonGet, response_model_exclude_none=True)
def create_button(
button_inp: ButtonCreate,
category_id: int,
Expand All @@ -67,16 +81,24 @@ def create_button(
last_button = (
db.session.query(Button).filter(Button.category_id == category_id).order_by(Button.order.desc()).first()
)
button = Button(**button_inp.dict(exclude_none=True))
required_scopes = button_inp.required_scopes
optional_scopes = button_inp.optional_scopes
button_inp.optional_scopes = None
button_inp.required_scopes = None
button = Button(**button_inp.model_dump(exclude_none=True))
button.category_id = category_id
if last_button:
button.order = last_button.order + 1
if required_scopes is not None:
button.required_scopes = required_scopes
if optional_scopes is not None:
button.optional_scopes = optional_scopes
db.session.add(button)
db.session.flush()
return button


@button.get("", response_model=ButtonsGet)
@button.get("", response_model=ButtonsGet, response_model_exclude_unset=True)
def get_buttons(
category_id: int,
user=Depends(UnionAuth(allow_none=True, auto_error=False)),
Expand All @@ -90,10 +112,37 @@ def get_buttons(
category = db.session.query(Category).filter(Category.id == category_id).one_or_none()
if not category:
raise HTTPException(status_code=404, detail="Category does not exist")
return category


@button.get("/{button_id}", response_model=ButtonGet)
result = {"buttons": []}
try:
user_scopes = {scope["name"] for scope in user["user_scopes"]}
except TypeError:
user_scopes = frozenset()
for button in category.buttons:
view = ButtonView.ACTIVE
scopes = set()
if button.required_scopes - user_scopes:
view = ButtonView.BLOCKED
else:
scopes |= button.required_scopes
scopes |= user_scopes & button.optional_scopes
to_add = {
"id": button.id,
"icon": button.icon,
"name": button.name,
"link": button.link,
"order": button.order,
"type": button.type,
"view": view.value,
"required_scopes": button.required_scopes,
"optional_scopes": button.optional_scopes,
}
if scopes:
to_add["scopes"] = list(scopes)
result["buttons"].append(to_add)
return result


@button.get("/{button_id}", response_model=ButtonGet, response_model_exclude_unset=True)
def get_button(
button_id: int,
category_id: int,
Expand All @@ -104,6 +153,10 @@ def get_button(
Необходимые scopes: `-`
"""
user_id = user.get('id') if user is not None else None
try:
user_scopes = {scope["name"] for scope in user["user_scopes"]}
except TypeError:
user_scopes = frozenset()
logger.info(f"User {user_id} triggered get_button")
category = db.session.query(Category).filter(Category.id == category_id).one_or_none()
if not category:
Expand All @@ -113,7 +166,27 @@ def get_button(
raise HTTPException(status_code=404, detail="Button does not exist")
if button.category_id != category_id:
raise HTTPException(status_code=404, detail="Button is not this category")
return button
view = ButtonView.ACTIVE
scopes = set()
if button.required_scopes - user_scopes:
view = ButtonView.BLOCKED
else:
scopes |= button.required_scopes
scopes |= user_scopes & button.optional_scopes
result = {
"id": button.id,
"icon": button.icon,
"name": button.name,
"link": button.link,
"order": button.order,
"type": button.type,
"view": view.value,
"required_scopes": button.required_scopes,
"optional_scopes": button.optional_scopes,
}
if scopes:
result["scopes"] = list(scopes)
return result


@button.delete("/{button_id}", response_model=None)
Expand Down Expand Up @@ -152,45 +225,52 @@ def update_button(
Необходимые scopes: `services.button.update`
"""
logger.info(f"User {user.get('id')} triggered create_category")
query = db.session.query(Button).filter(Category.id == category_id).filter(Button.id == button_id)
query = db.session.query(Button).filter(Button.category_id == category_id).filter(Button.id == button_id)
button = query.one_or_none()
last_button = (
db.session.query(Button)
.filter(Category.id == category_id)
.filter(Button.category_id == category_id)
.order_by(Button.order.desc())
.first()
db.session.query(Button).filter(Button.category_id == category_id).order_by(Button.order.desc()).first()
)
category = db.session.query(Category).filter(Category.id == category_id).one_or_none()

if not category:
raise HTTPException(status_code=404, detail="Category does not exist")
if not button:
raise HTTPException(status_code=404, detail="Button does not exist")
if not any(button_inp.dict().values()):
if not any(button_inp.model_dump().values()):
raise HTTPException(status_code=400, detail="Empty schema")
if button.category_id != category_id:
raise HTTPException(status_code=404, detail="Button is not this category")

if button_inp.order:
if last_button and (button_inp.order > last_button.order + 1):
if button_inp.required_scopes is not None:
button.required_scopes = button_inp.required_scopes
if button_inp.optional_scopes is not None:
button.optional_scopes = button_inp.optional_scopes
db.session.flush()

if button_inp.order and button.order != button_inp.order:
if last_button and (button_inp.order > last_button.order):
raise HTTPException(
status_code=400,
detail=f"Can`t create button with order {button_inp.order}. " f"Last category is {last_button.order}",
)
if button_inp.order < 1:
raise HTTPException(status_code=400, detail="Order can`t be less than 1")
if button.order > button_inp.order:
db.session.query(Button).filter(Category.id == category_id).filter(Button.order < button.order).update(
{"order": Button.order + 1}
)
elif button.order < button_inp.order:
db.session.query(Button).filter(Category.id == category_id).filter(Button.order > button.order).update(
{"order": Button.order - 1}
)

query.update(button_inp.dict(exclude_unset=True, exclude_none=True))
db.session.flush()
swapping_button = (
db.session.query(Button)
.filter(Button.category_id == category_id)
.filter(Button.order == button_inp.order)
.one()
)
swapping_button.order, button.order = button.order, swapping_button.order
required_scopes = button_inp.required_scopes
optional_scopes = button_inp.optional_scopes
button_inp.optional_scopes = None
button_inp.required_scopes = None
if dump := button_inp.model_dump(exclude_unset=True, exclude_none=True):
query.update(dump)
db.session.flush()
if required_scopes is not None:
button.required_scopes = required_scopes
if optional_scopes is not None:
button.optional_scopes = optional_scopes
return button


Expand All @@ -206,8 +286,32 @@ def get_service(
TODO: Переделать ручку, сделав сервис независимым от кнопки
"""
user_id = user.get('id') if user is not None else None
try:
user_scopes = {scope["name"] for scope in user["user_scopes"]}
except TypeError:
user_scopes = frozenset()
logger.info(f"User {user_id} triggered get_button")
button = db.session.query(Button).filter(Button.id == button_id).one_or_none()
if not button:
raise HTTPException(status_code=404, detail="Button does not exist")
return button
view = ButtonView.ACTIVE
scopes = set()
if button.required_scopes - user_scopes:
view = ButtonView.BLOCKED
else:
scopes |= button.required_scopes
scopes |= user_scopes & button.optional_scopes
result = {
"id": button.id,
"icon": button.icon,
"name": button.name,
"link": button.link,
"order": button.order,
"type": button.type,
"view": view.value,
"required_scopes": button.required_scopes,
"optional_scopes": button.optional_scopes,
}
if scopes:
result["scopes"] = list(scopes)
return scopes
Loading
Loading