使用 Encode/Databases 的非同步 SQL (關聯式) 資料庫 (已棄用)¶
「已棄用」
本教學已棄用,並將在未來版本中移除。
您也可以在 FastAPI 中使用 encode/databases
搭配 async
和 await
連線到資料庫。
它相容於
- PostgreSQL
- MySQL
- SQLite
在本範例中,我們將使用 SQLite,因為它使用單一檔案且 Python 已內建支援。因此,您可以複製此範例並照原樣執行。
之後,對於您的正式應用程式,您可能需要使用像是 PostgreSQL 的資料庫伺服器。
提示
您可以參考關於 SQLAlchemy ORM 的章節(SQL (關聯式) 資料庫)中的概念,例如使用工具函式在資料庫中執行操作,使其獨立於您的 FastAPI 程式碼。
本節並未應用這些概念,以便與 Starlette 中的對應部分保持一致。
匯入並設定 SQLAlchemy
¶
- 匯入
SQLAlchemy
。 - 建立一個
metadata
物件。 - 使用
metadata
物件建立一個名為notes
的資料表。
from typing import List
import databases
import sqlalchemy
from fastapi import FastAPI
from pydantic import BaseModel
# SQLAlchemy specific code, as with any other app
DATABASE_URL = "sqlite:///./test.db"
# DATABASE_URL = "postgresql://user:password@postgresserver/db"
database = databases.Database(DATABASE_URL)
metadata = sqlalchemy.MetaData()
notes = sqlalchemy.Table(
"notes",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("text", sqlalchemy.String),
sqlalchemy.Column("completed", sqlalchemy.Boolean),
)
engine = sqlalchemy.create_engine(
DATABASE_URL, connect_args={"check_same_thread": False}
)
metadata.create_all(engine)
class NoteIn(BaseModel):
text: str
completed: bool
class Note(BaseModel):
id: int
text: str
completed: bool
app = FastAPI()
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
@app.get("/notes/", response_model=List[Note])
async def read_notes():
query = notes.select()
return await database.fetch_all(query)
@app.post("/notes/", response_model=Note)
async def create_note(note: NoteIn):
query = notes.insert().values(text=note.text, completed=note.completed)
last_record_id = await database.execute(query)
return {**note.dict(), "id": last_record_id}
提示
請注意,所有這些程式碼都是純 SQLAlchemy Core。
databases
目前尚未執行任何操作。
匯入並設定 databases
¶
- 匯入
databases
。 - 建立
DATABASE_URL
。 - 建立一個
database
物件。
from typing import List
import databases
import sqlalchemy
from fastapi import FastAPI
from pydantic import BaseModel
# SQLAlchemy specific code, as with any other app
DATABASE_URL = "sqlite:///./test.db"
# DATABASE_URL = "postgresql://user:password@postgresserver/db"
database = databases.Database(DATABASE_URL)
metadata = sqlalchemy.MetaData()
notes = sqlalchemy.Table(
"notes",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("text", sqlalchemy.String),
sqlalchemy.Column("completed", sqlalchemy.Boolean),
)
engine = sqlalchemy.create_engine(
DATABASE_URL, connect_args={"check_same_thread": False}
)
metadata.create_all(engine)
class NoteIn(BaseModel):
text: str
completed: bool
class Note(BaseModel):
id: int
text: str
completed: bool
app = FastAPI()
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
@app.get("/notes/", response_model=List[Note])
async def read_notes():
query = notes.select()
return await database.fetch_all(query)
@app.post("/notes/", response_model=Note)
async def create_note(note: NoteIn):
query = notes.insert().values(text=note.text, completed=note.completed)
last_record_id = await database.execute(query)
return {**note.dict(), "id": last_record_id}
提示
如果您要連線到不同的資料庫(例如 PostgreSQL),則需要更改 DATABASE_URL
。
建立資料表¶
在這種情況下,我們在同一個 Python 檔案中建立資料表,但在正式環境中,您可能希望使用 Alembic 建立它們,並整合遷移等功能。
這裡,這個部分會在啟動您的 FastAPI 應用程式之前直接執行。
- 建立一個
engine
。 - 從
metadata
物件建立所有資料表。
from typing import List
import databases
import sqlalchemy
from fastapi import FastAPI
from pydantic import BaseModel
# SQLAlchemy specific code, as with any other app
DATABASE_URL = "sqlite:///./test.db"
# DATABASE_URL = "postgresql://user:password@postgresserver/db"
database = databases.Database(DATABASE_URL)
metadata = sqlalchemy.MetaData()
notes = sqlalchemy.Table(
"notes",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("text", sqlalchemy.String),
sqlalchemy.Column("completed", sqlalchemy.Boolean),
)
engine = sqlalchemy.create_engine(
DATABASE_URL, connect_args={"check_same_thread": False}
)
metadata.create_all(engine)
class NoteIn(BaseModel):
text: str
completed: bool
class Note(BaseModel):
id: int
text: str
completed: bool
app = FastAPI()
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
@app.get("/notes/", response_model=List[Note])
async def read_notes():
query = notes.select()
return await database.fetch_all(query)
@app.post("/notes/", response_model=Note)
async def create_note(note: NoteIn):
query = notes.insert().values(text=note.text, completed=note.completed)
last_record_id = await database.execute(query)
return {**note.dict(), "id": last_record_id}
建立模型¶
建立 Pydantic 模型以用於
- 要建立的筆記(
NoteIn
)。 - 要返回的筆記(
Note
)。
from typing import List
import databases
import sqlalchemy
from fastapi import FastAPI
from pydantic import BaseModel
# SQLAlchemy specific code, as with any other app
DATABASE_URL = "sqlite:///./test.db"
# DATABASE_URL = "postgresql://user:password@postgresserver/db"
database = databases.Database(DATABASE_URL)
metadata = sqlalchemy.MetaData()
notes = sqlalchemy.Table(
"notes",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("text", sqlalchemy.String),
sqlalchemy.Column("completed", sqlalchemy.Boolean),
)
engine = sqlalchemy.create_engine(
DATABASE_URL, connect_args={"check_same_thread": False}
)
metadata.create_all(engine)
class NoteIn(BaseModel):
text: str
completed: bool
class Note(BaseModel):
id: int
text: str
completed: bool
app = FastAPI()
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
@app.get("/notes/", response_model=List[Note])
async def read_notes():
query = notes.select()
return await database.fetch_all(query)
@app.post("/notes/", response_model=Note)
async def create_note(note: NoteIn):
query = notes.insert().values(text=note.text, completed=note.completed)
last_record_id = await database.execute(query)
return {**note.dict(), "id": last_record_id}
透過建立這些 Pydantic 模型,輸入資料將會被驗證、序列化(轉換)和註釋(記錄)。
因此,您將能夠在互動式 API 文件中看到所有內容。
連線和斷線¶
- 建立您的
FastAPI
應用程式。 - 建立事件處理常式以連線和斷開資料庫。
from typing import List
import databases
import sqlalchemy
from fastapi import FastAPI
from pydantic import BaseModel
# SQLAlchemy specific code, as with any other app
DATABASE_URL = "sqlite:///./test.db"
# DATABASE_URL = "postgresql://user:password@postgresserver/db"
database = databases.Database(DATABASE_URL)
metadata = sqlalchemy.MetaData()
notes = sqlalchemy.Table(
"notes",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("text", sqlalchemy.String),
sqlalchemy.Column("completed", sqlalchemy.Boolean),
)
engine = sqlalchemy.create_engine(
DATABASE_URL, connect_args={"check_same_thread": False}
)
metadata.create_all(engine)
class NoteIn(BaseModel):
text: str
completed: bool
class Note(BaseModel):
id: int
text: str
completed: bool
app = FastAPI()
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
@app.get("/notes/", response_model=List[Note])
async def read_notes():
query = notes.select()
return await database.fetch_all(query)
@app.post("/notes/", response_model=Note)
async def create_note(note: NoteIn):
query = notes.insert().values(text=note.text, completed=note.completed)
last_record_id = await database.execute(query)
return {**note.dict(), "id": last_record_id}
讀取筆記¶
建立用於讀取筆記的路徑操作函式
from typing import List
import databases
import sqlalchemy
from fastapi import FastAPI
from pydantic import BaseModel
# SQLAlchemy specific code, as with any other app
DATABASE_URL = "sqlite:///./test.db"
# DATABASE_URL = "postgresql://user:password@postgresserver/db"
database = databases.Database(DATABASE_URL)
metadata = sqlalchemy.MetaData()
notes = sqlalchemy.Table(
"notes",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("text", sqlalchemy.String),
sqlalchemy.Column("completed", sqlalchemy.Boolean),
)
engine = sqlalchemy.create_engine(
DATABASE_URL, connect_args={"check_same_thread": False}
)
metadata.create_all(engine)
class NoteIn(BaseModel):
text: str
completed: bool
class Note(BaseModel):
id: int
text: str
completed: bool
app = FastAPI()
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
@app.get("/notes/", response_model=List[Note])
async def read_notes():
query = notes.select()
return await database.fetch_all(query)
@app.post("/notes/", response_model=Note)
async def create_note(note: NoteIn):
query = notes.insert().values(text=note.text, completed=note.completed)
last_record_id = await database.execute(query)
return {**note.dict(), "id": last_record_id}
筆記
請注意,由於我們使用 await
與資料庫通訊,因此路徑操作函式是用 async
宣告的。
請注意 response_model=List[Note]
¶
它使用 typing.List
。
這會將輸出資料記錄(並驗證、序列化、篩選)為 Note
的 list
(列表)。
建立筆記¶
建立用於建立筆記的路徑操作函式
from typing import List
import databases
import sqlalchemy
from fastapi import FastAPI
from pydantic import BaseModel
# SQLAlchemy specific code, as with any other app
DATABASE_URL = "sqlite:///./test.db"
# DATABASE_URL = "postgresql://user:password@postgresserver/db"
database = databases.Database(DATABASE_URL)
metadata = sqlalchemy.MetaData()
notes = sqlalchemy.Table(
"notes",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("text", sqlalchemy.String),
sqlalchemy.Column("completed", sqlalchemy.Boolean),
)
engine = sqlalchemy.create_engine(
DATABASE_URL, connect_args={"check_same_thread": False}
)
metadata.create_all(engine)
class NoteIn(BaseModel):
text: str
completed: bool
class Note(BaseModel):
id: int
text: str
completed: bool
app = FastAPI()
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
@app.get("/notes/", response_model=List[Note])
async def read_notes():
query = notes.select()
return await database.fetch_all(query)
@app.post("/notes/", response_model=Note)
async def create_note(note: NoteIn):
query = notes.insert().values(text=note.text, completed=note.completed)
last_record_id = await database.execute(query)
return {**note.dict(), "id": last_record_id}
資訊
在 Pydantic v1 中,該方法稱為 .dict()
,在 Pydantic v2 中已棄用(但仍支援),並重新命名為 .model_dump()
。
此處的範例使用 .dict()
以與 Pydantic v1 相容,但如果您可以使用 Pydantic v2,則應改用 .model_dump()
。
筆記
請注意,由於我們使用 await
與資料庫通訊,因此路徑操作函式是用 async
宣告的。
關於 {**note.dict(), "id": last_record_id}
¶
note
是一個 Pydantic Note
物件。
note.dict()
會傳回一個包含其資料的 dict
(字典),類似
{
"text": "Some note",
"completed": False,
}
但它沒有 id
欄位。
因此,我們建立一個新的 dict
,其中包含來自 note.dict()
的鍵值對,使用
{**note.dict()}
**note.dict()
會直接「解包」鍵值對,因此,{**note.dict()}
或多或少會是 note.dict()
的副本。
然後,我們擴展該副本 dict
,新增另一個鍵值對:"id": last_record_id
{**note.dict(), "id": last_record_id}
因此,傳回的最終結果會類似
{
"id": 1,
"text": "Some note",
"completed": False,
}
檢查它¶
您可以照原樣複製此程式碼,並在 http://127.0.0.1:8000/docs 查看文件。
您可以在那裡看到所有 API 文件並與其互動
更多資訊¶
您可以在其 GitHub 頁面上閱讀更多關於 encode/databases
的資訊。