-
Notifications
You must be signed in to change notification settings - Fork 0
/
sql_alchemy_dal_example.py
136 lines (108 loc) · 3.56 KB
/
sql_alchemy_dal_example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
from contextlib import asynccontextmanager
from quart import Quart
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import Column, Integer, String
from typing import List, Optional
from sqlalchemy.orm import Session
from sqlalchemy.future import select
from sqlalchemy import update
#
# SA Engine init
#
DATABASE_URL = "sqlite+aiosqlite:///./test.db"
engine = create_async_engine(DATABASE_URL, future=True, echo=True)
async_session = sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)
Base = declarative_base()
#
# Data Model
#
class Book(Base):
__tablename__ = "books"
id = Column(Integer, primary_key=True)
name = Column(String, nullable=False)
author = Column(String, nullable=False)
release_year = Column(Integer, nullable=False)
def json(self):
return {
"id": self.id,
"name": self.name,
"author": self.author,
"release_year": self.release_year,
}
#
# Data Access Layer
#
class BookDAL:
def __init__(self, db_session: Session):
self.db_session = db_session
async def create_book(self, name: str, author: str, release_year: int):
new_book = Book(name=name, author=author, release_year=release_year)
self.db_session.add(new_book)
await self.db_session.flush()
return new_book.json()
async def get_all_books(self) -> List[Book]:
q = await self.db_session.execute(select(Book).order_by(Book.id))
return {"books": [b.json() for b in q.scalars().all()]}
async def get_book(self, book_id) -> Book:
q = select(Book).where(Book.id == book_id)
q = await self.db_session.execute(q)
b = q.one()
return b[0].json()
async def update_book(
self,
book_id: int,
name: Optional[str],
author: Optional[str],
release_year: Optional[int],
):
q = update(Book).where(Book.id == book_id)
if name:
q = q.values(name=name)
if author:
q = q.values(author=author)
if release_year:
q = q.values(release_year=release_year)
q.execution_option
await self.db_session.execute(q)
return get_book(book_id)
#
# Quart App
#
app = Quart(__name__)
@app.before_serving
async def startup():
# create db tables
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await conn.run_sync(Base.metadata.create_all)
async with book_dal() as bd:
await bd.create_book("name", "author", 2010)
@asynccontextmanager
async def book_dal():
async with async_session() as session:
async with session.begin():
yield BookDAL(session)
@app.post("/books")
async def create_book(name: str, author: str, release_year: int):
async with book_dal() as bd:
await bd.create_book(name, author, release_year)
@app.get("/books/<int:book_id>")
async def get_book(book_id: int):
async with book_dal() as bd:
return await bd.get_book(book_id)
@app.get("/books")
async def get_all_books() -> List[Book]:
async with book_dal() as bd:
return await bd.get_all_books()
@app.put("/books/<int:book_id>")
async def update_book(
book_id: int,
name: Optional[str] = None,
author: Optional[str] = None,
release_year: Optional[int] = None,
):
async with book_dal() as bd:
return await bd.update_book(book_id, name, author, release_year)
if __name__ == "__main__":
app.run(port=1111, host="127.0.0.1")