Asynchronous abstract methods for a service layer built on the repository pattern, so that SQLAlchemy and MongoDB can be used behind the same service layer.
Inherit the usual service methods:
pagination, create, get, update, delete, count
$ pip install service-repository
---> 100%
Create the model, as in the example song/models.py
Create your repository by extending BaseRepositorySqlalchemy, as in the example song/repositories.py
Create and extend your service, as in the example song/services.py
The following example shows one way to obtain the connection session; pass the session in whatever way your application requires.
import pytest
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlmodel import SQLModel
# Get your SQLAlchemy connection session. The fixture below is only an example; adapt it to your needs.
@pytest.fixture()
async def sqlalchemy():
    """Yield an async session factory bound to a SQLite test database.

    All tables registered on ``SQLModel.metadata`` are created before the
    factory is yielded and dropped again once the test has finished.

    Yields:
        The ``sessionmaker`` configured with ``AsyncSession``; call it to
        open a session (``async with sqlalchemy() as session: ...``).
    """
    SQLALCHEMY_DATABASE_URI = "sqlite+aiosqlite:///./tests/testing.db"
    engine = create_async_engine(
        SQLALCHEMY_DATABASE_URI, future=True, echo=True
    )
    async_session = sessionmaker(
        bind=engine, expire_on_commit=False, class_=AsyncSession
    )

    # Create the schema in its own short transaction so it is committed and
    # the connection released before any test session touches the database.
    async with engine.begin() as conn:
        await conn.run_sync(SQLModel.metadata.create_all)

    try:
        yield async_session
    finally:
        # Teardown needs a fresh connection: the one used for setup has
        # already been closed by its ``async with`` block.
        async with engine.begin() as conn:
            await conn.run_sync(SQLModel.metadata.drop_all)
        # Release the pooled connections held by the engine.
        await engine.dispose()
The following is an example fixture that creates a record.
import pytest
from tests.song.services import SongService
from tests.song.models import SongCreate
@pytest.fixture
async def song_one(sqlalchemy):
    """Create one song through ``SongService`` and return it."""
    payload = {
        "title": "Song title 1",
        "is_active": True,
    }
    async with sqlalchemy() as session:
        service = SongService(db=session)
        return await service.create(schema_in=SongCreate(**payload))
import pytest
from tests.song.services import SongService
@pytest.mark.asyncio
async def test_song_service_with_or_and_asc_filter_paginate_song(sqlalchemy):
    """Paginate songs with an ``or`` filter, sorted by title ascending."""
    # Simplest criteria form: a flat list of field/op/value conditions.
    # Shown for illustration only; it is not the one passed to paginate().
    simple_criteria = [
        {"field": "foo", "op": "ilike", "value": "%bar%"},
    ]

    # Alternative form: conditions grouped under an "or" key.
    title_options = [
        {"field": "title", "op": "==", "value": "Song title 2"},
        {"field": "title", "op": "==", "value": "Song title 3"},
    ]
    or_criteria = [{"or": title_options}]

    ordering = [
        {"field": "title", "direction": "asc"},
    ]

    async with sqlalchemy() as session:
        service = SongService(db=session)
        pagination = await service.paginate(
            page=1, per_page=5, criteria=or_criteria, sort=ordering
        )
Operator options for filtering are:
is_null, is_not_null, eq, ne, gt, lt, ge, le, like, ilike, not_ilike, in, not_in, any, not_any
import pytest
from tests.song.services import SongService
from tests.song.models import SongCreate
@pytest.mark.asyncio
async def test_song_service_create_song(sqlalchemy):
    """Create a song from a ``SongCreate`` schema via ``SongService``."""
    async with sqlalchemy() as session:
        service = SongService(db=session)
        new_song = SongCreate(title="Song title 1", is_active=True)
        song = await service.create(schema_in=new_song)
import pytest
from tests.song.services import SongService
from tests.song.models import SongUpdate
@pytest.mark.asyncio
async def test_song_service_update_song(sqlalchemy, song_one):
    """Update the ``song_one`` fixture instance with a ``SongUpdate`` schema."""
    async with sqlalchemy() as session:
        service = SongService(db=session)
        changes = SongUpdate(title="Song title 2", is_active=False)
        song = await service.update(instance=song_one, schema_in=changes)
import pytest
from tests.song.services import SongService
@pytest.mark.asyncio
async def test_song_service_get_song(sqlalchemy, song_one):
    """Fetch a single song by the id of the ``song_one`` fixture."""
    async with sqlalchemy() as session:
        service = SongService(db=session)
        song = await service.get(id=song_one.id)
import pytest
from tests.song.services import SongService
@pytest.mark.asyncio
async def test_song_service_delete_song(sqlalchemy, song_one):
    """Delete the song created by the ``song_one`` fixture, by id."""
    async with sqlalchemy() as session:
        service = SongService(db=session)
        await service.delete(id=song_one.id)
import pytest
from tests.song.services import SongService
@pytest.mark.asyncio
async def test_song_service_count_song(sqlalchemy):
    """Count songs through ``SongService`` without any filter."""
    async with sqlalchemy() as session:
        service = SongService(db=session)
        total = await service.count()
Create the model, as in the example product/models.py
Create your repository by extending BaseRepositoryMotor, as in the example product/repositories.py
Create and extend your service, as in the example product/services.py
The following example shows one way to obtain the connection session; pass the session in whatever way your application requires.
import pytest
from motor.motor_asyncio import AsyncIOMotorClient
# Get your Motor connection session. The fixture below is only an example; adapt it to your needs.
@pytest.fixture()
async def motor():
    """Yield a Motor database handle for the ``testing`` database.

    The ``testing`` database is dropped before the handle is yielded, and
    the client is closed once the test has finished.

    Yields:
        The database object returned by ``client.get_database("testing")``.
    """
    client = AsyncIOMotorClient(
        "localhost",
        maxPoolSize=10,
        minPoolSize=10,
        tz_aware=True,
    )
    await client.drop_database("testing")
    try:
        yield client.get_database("testing")
    finally:
        # Close the client so its pooled connections (minPoolSize=10) are
        # not leaked by every test that uses this fixture.
        client.close()
The following is an example fixture that creates a record.
import pytest
from tests.product.models import ProductCreate
from tests.product.services import ProductService
@pytest.fixture
async def product_one(motor, product_data_one):
    """Create one product through ``ProductService`` and return it."""
    service = ProductService(db=motor)
    payload = ProductCreate(title="Product title 1", is_active=True)
    return await service.create(schema_in=payload)
import pytest
from tests.product.services import ProductService
@pytest.mark.asyncio
async def test_product_service_with_or_and_asc_filter_paginate_product(
    app, motor, product_data_one
):
    """Paginate products matching a title, sorted by title with ``-1``."""
    title_filter = {"title": "Product title 1"}
    ordering = [("title", -1)]
    service = ProductService(db=motor)
    pagination = await service.paginate(
        page=1, per_page=5, criteria=title_filter, sort=ordering
    )
import pytest
from tests.product.models import ProductCreate
from tests.product.services import ProductService
@pytest.mark.asyncio
async def test_product_service_create_product(motor):
    """Create a product from a ``ProductCreate`` schema via ``ProductService``."""
    service = ProductService(db=motor)
    new_product = ProductCreate(title="Product title 1", is_active=True)
    product = await service.create(schema_in=new_product)
import pytest
from tests.product.models import ProductUpdate
from tests.product.services import ProductService
@pytest.mark.asyncio
async def test_product_service_update_product(motor, product_one):
    """Update the ``product_one`` fixture instance with a ``ProductUpdate`` schema."""
    service = ProductService(db=motor)
    changes = ProductUpdate(title="Product title 2", is_active=False)
    product = await service.update(instance=product_one, schema_in=changes)
import pytest
from tests.product.services import ProductService
@pytest.mark.asyncio
async def test_product_service_get_product(motor, product_one):
    """Fetch a single product by the id of the ``product_one`` fixture."""
    service = ProductService(db=motor)
    product = await service.get(_id=product_one.id)
import pytest
from tests.product.services import ProductService
@pytest.mark.asyncio
async def test_product_service_delete_product(motor, product_one):
    """Delete the product created by the ``product_one`` fixture, by id."""
    service = ProductService(db=motor)
    await service.delete(_id=product_one.id)
import pytest
from tests.product.services import ProductService
@pytest.mark.asyncio
async def test_product_service_count_product(motor):
    """Count products through ``ProductService`` without any filter."""
    service = ProductService(db=motor)
    total = await service.count()
You can create an inherited base service layer and add your own methods to it, as in this FastAPI example with convenience methods for 404 responses:
from fastapi import HTTPException, status
from pydantic import BaseModel
from service_repository.services import BaseService as Base
class BaseService(Base):
    """Base service extended with FastAPI-style 404 convenience methods."""

    def __init__(self, db) -> None:
        self.db = db

    async def get_or_404(self, **kwargs):
        """Return the instance matching the filter, or raise HTTP 404."""
        found = await self.get(**kwargs)
        if found:
            return found
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)

    async def update_or_404(self, schema_in: BaseModel, **kwargs):
        """Update the instance matching the filter, or raise HTTP 404."""
        current = await self.get_or_404(**kwargs)
        return await self.update(instance=current, schema_in=schema_in)

    async def delete_or_404(self, **kwargs):
        """Delete the instance matching the filter, or raise HTTP 404."""
        # The lookup is done only for its 404 side effect; the instance
        # itself is not needed for the delete call.
        await self.get_or_404(**kwargs)
        await self.delete(**kwargs)
You can create an inherited base service layer and add its methods, as in the example in Flask with convenience methods for 404:
from flask import abort
from pydantic import BaseModel
from service_repository.services import BaseService as Base
class BaseService(Base):
    """Class representing the base service, with Flask 404 helpers."""

    def __init__(self, db) -> None:
        self.db = db

    async def get_or_404(self, **kwargs):
        """Get one instance by filter or abort with 404 if it does not exist."""
        instance = await self.get(**kwargs)
        if not instance:
            # abort() raises the HTTP exception itself and never returns,
            # so it must be called, not used as the operand of ``raise``.
            abort(404)
        return instance

    async def update_or_404(self, schema_in: BaseModel, **kwargs):
        """Update the instance matching the filter or abort with 404."""
        instance = await self.get_or_404(**kwargs)
        instance = await self.update(instance=instance, schema_in=schema_in)
        return instance

    async def delete_or_404(self, **kwargs):
        """Delete one instance by filter or abort with 404 if it does not exist."""
        # The lookup is done only so that a missing instance triggers the 404.
        await self.get_or_404(**kwargs)
        await self.delete(**kwargs)
If you discover any security-related issues, please email fndmiranda@gmail.com instead of using the issue tracker.
The MIT License (MIT). Please see License File for more information.