FastAPI 依賴注入實戰:資料庫管理
基本資料庫連接管理
1. 同步資料庫連接
最簡單的資料庫連接模式是為每個請求創建一個新的連接,並在請求結束時關閉它。
- 優點:簡單易懂,每個請求都有獨立的連接,避免並發問題
- 缺點:頻繁創建和關閉連接可能影響性能,不適合高並發場景
from fastapi import FastAPI, Depends
import sqlite3
from contextlib import contextmanager
app = FastAPI()
@contextmanager
def get_db_connection():
conn = sqlite3.connect("example.db")
conn.row_factory = sqlite3.Row
try:
yield conn
finally:
conn.close()
def get_db():
with get_db_connection() as conn:
yield conn
@app.get("/users/{user_id}")
def read_user(user_id: int, db: sqlite3.Connection = Depends(get_db)):
user = db.execute(
"SELECT * FROM users WHERE id = ?", (user_id,)
).fetchone()
if user is None:
return {"error": "User not found"}
return dict(user)
2. 異步資料庫連接
對於支持異步的資料庫驅動,我們可以使用異步依賴實現更高效的連接管理。
- 優點:支持異步操作提高並發性能,連接在應用啟動時建立,避免頻繁創建連接
- 缺點:需要使用支持異步的資料庫驅動,可能需要更複雜的錯誤處理
from fastapi import FastAPI, Depends
from databases import Database
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String
# 資料庫配置
DATABASE_URL = "sqlite:///./test.db"
database = Database(DATABASE_URL)
# SQLAlchemy 模型定義
metadata = MetaData()
users = Table(
"users",
metadata,
Column("id", Integer, primary_key=True),
Column("name", String),
Column("email", String)
)
# 創建表(在實際應用中通常使用遷移工具)
engine = create_engine(DATABASE_URL)
metadata.create_all(engine)
app = FastAPI()
# 啟動和關閉事件
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
# 依賴函數
async def get_db():
return database
@app.get("/users/{user_id}")
async def read_user(user_id: int, db: Database = Depends(get_db)):
query = users.select().where(users.c.id == user_id)
user = await db.fetch_one(query)
if user is None:
return {"error": "User not found"}
return dict(user)
進階資料庫管理模式
1. 連接池管理
對於高並發應用,使用連接池可以顯著提高性能。
- 優點:高效重用資料庫連接,適合高並發場景,避免連接泄漏
- 缺點:需要謹慎配置池大小和超時,可能需要處理池耗盡的情況
from fastapi import FastAPI, Depends
import databases
import sqlalchemy
# 資料庫配置
DATABASE_URL = "postgresql://user:password@localhost/dbname"
database = databases.Database(DATABASE_URL)
# SQLAlchemy 模型
metadata = sqlalchemy.MetaData()
users = sqlalchemy.Table(
"users",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("name", sqlalchemy.String),
sqlalchemy.Column("email", sqlalchemy.String)
)
app = FastAPI()
@app.on_event("startup")
async def startup():
# 連接池配置
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
async def get_db():
return database
@app.get("/users/{user_id}")
async def read_user(user_id: int, db: databases.Database = Depends(get_db)):
query = users.select().where(users.c.id == user_id)
user = await db.fetch_one(query)
if user is None:
return {"error": "User not found"}
return dict(user)
2. 事務管理
對於需要原子性操作的場景,我們可以使用依賴注入實現事務管理。
- 優點: 確保操作的原子性,自動處理提交和回滾,提供一致的資料視圖
- 缺點: 需要謹慎管理事務範圍,長時間事務可能影響並發性能
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.ext.declarative import declarative_base
from pydantic import BaseModel
from typing import List
# 資料庫配置
DATABASE_URL = "postgresql://user:password@localhost/dbname"
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# SQLAlchemy 模型
class UserModel(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True)
email = Column(String, unique=True, index=True)
# Pydantic 模型
class UserCreate(BaseModel):
name: str
email: str
class User(UserCreate):
id: int
class Config:
orm_mode = True
app = FastAPI()
# 依賴函數
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# 使用事務的路由
@app.post("/users/", response_model=User)
def create_user(user: UserCreate, db: Session = Depends(get_db)):
# 檢查郵箱是否已存在
db_user = db.query(UserModel).filter(UserModel.email == user.email).first()
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
# 創建新用戶
db_user = UserModel(**user.dict())
# 在事務中執行操作
try:
db.add(db_user)
db.commit()
db.refresh(db_user)
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
return db_user
3. 資源庫模式 (Repository Pattern)
資源庫模式將資料庫操作封裝在專用類中,提供更好的關注點分離。
- 優點:更好的關注點分離,提高代碼可讀性和可維護性,便於單元測試
- 缺點:增加額外的抽象層,可能導致代碼量增加
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List, Optional
# 假設已定義 SQLAlchemy 模型和 get_db 依賴
# 用戶資源庫
class UserRepository:
def __init__(self, db: Session):
self.db = db
def get_by_id(self, user_id: int) -> Optional[UserModel]:
return self.db.query(UserModel).filter(UserModel.id == user_id).first()
def get_by_email(self, email: str) -> Optional[UserModel]:
return self.db.query(UserModel).filter(UserModel.email == email).first()
def create(self, user: UserCreate) -> UserModel:
db_user = UserModel(**user.dict())
self.db.add(db_user)
self.db.commit()
self.db.refresh(db_user)
return db_user
def list_all(self, skip: int = 0, limit: int = 100) -> List[UserModel]:
return self.db.query(UserModel).offset(skip).limit(limit).all()
# 依賴函數
def get_user_repository(db: Session = Depends(get_db)) -> UserRepository:
return UserRepository(db)
app = FastAPI()
@app.get("/users/{user_id}", response_model=User)
def read_user(
user_id: int,
user_repo: UserRepository = Depends(get_user_repository)
):
db_user = user_repo.get_by_id(user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post("/users/", response_model=User)
def create_user(
user: UserCreate,
user_repo: UserRepository = Depends(get_user_repository)
):
db_user = user_repo.get_by_email(user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return user_repo.create(user)
高級技巧與最佳實踐
1. 多資料庫支持
有時應用需要連接多個資料庫,我們可以使用依賴注入來管理多個資料庫連接。
- 優點:支持多資料庫架構,清晰區分不同資料庫的用途,便於實現讀寫分離
- 缺點:增加系統複雜性,需要處理跨資料庫一致性問題
from fastapi import FastAPI, Depends
from databases import Database
app = FastAPI()
# 多資料庫配置
main_db = Database("postgresql://user:password@localhost/main_db")
analytics_db = Database("postgresql://user:password@localhost/analytics_db")
@app.on_event("startup")
async def startup():
await main_db.connect()
await analytics_db.connect()
@app.on_event("shutdown")
async def shutdown():
await main_db.disconnect()
await analytics_db.disconnect()
# 依賴函數
async def get_main_db():
return main_db
async def get_analytics_db():
return analytics_db
@app.get("/user-stats/{user_id}")
async def get_user_stats(
user_id: int,
main_db: Database = Depends(get_main_db),
analytics_db: Database = Depends(get_analytics_db)
):
# 從主資料庫獲取用戶資訊
user = await main_db.fetch_one(
"SELECT id, name FROM users WHERE id = :id",
values={"id": user_id}
)
if not user:
return {"error": "User not found"}
# 從分析資料庫獲取用戶統計數據
stats = await analytics_db.fetch_one(
"SELECT user_id, visit_count, last_visit FROM user_stats WHERE user_id = :id",
values={"id": user_id}
)
# 組合結果
return {
"user": dict(user),
"stats": dict(stats) if stats else {"visit_count": 0}
}
2. 動態資料庫選擇
在某些場景下,我們可能需要根據請求動態選擇資料庫連接。
- 優點: 支持多租戶架構,根據請求上下文動態選擇資料庫,實現資料隔離
- 缺點: 需要管理多個資料庫連接,可能增加系統資源消耗
from fastapi import FastAPI, Depends, Header
from databases import Database
from typing import Dict, Optional
app = FastAPI()
# 資料庫連接池
db_pool: Dict[str, Database] = {
"tenant1": Database("postgresql://user:password@localhost/tenant1"),
"tenant2": Database("postgresql://user:password@localhost/tenant2"),
"default": Database("postgresql://user:password@localhost/default")
}
@app.on_event("startup")
async def startup():
for db in db_pool.values():
await db.connect()
@app.on_event("shutdown")
async def shutdown():
for db in db_pool.values():
await db.disconnect()
# 依賴函數
async def get_tenant_db(x_tenant_id: Optional[str] = Header(None)):
# 根據請求頭選擇租戶資料庫
tenant_id = x_tenant_id or "default"
return db_pool.get(tenant_id, db_pool["default"])
@app.get("/data")
async def read_data(db: Database = Depends(get_tenant_db)):
# 從選定的租戶資料庫讀取數據
data = await db.fetch_all("SELECT * FROM items LIMIT 10")
return [dict(item) for item in data]
測試資料庫依賴
測試資料庫依賴是確保應用穩定性的關鍵。以下是一些測試策略:
1. 使用測試資料庫
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# 測試資料庫配置
TEST_DATABASE_URL = "sqlite:///./test.db"
test_engine = create_engine(TEST_DATABASE_URL)
TestSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=test_engine)
# 創建測試資料庫和表
@pytest.fixture(scope="module")
def setup_test_db():
Base.metadata.create_all(bind=test_engine)
yield
Base.metadata.drop_all(bind=test_engine)
# 覆蓋依賴
@pytest.fixture
def client(setup_test_db):
def override_get_db():
db = TestSessionLocal()
try:
yield db
finally:
db.close()
app.dependency_overrides[get_db] = override_get_db
with TestClient(app) as client:
yield client
app.dependency_overrides = {}
# 測試用例
def test_create_user(client):
response = client.post(
"/users/",
json={"name": "Test User", "email": "test@example.com"}
)
assert response.status_code == 200
data = response.json()
assert data["name"] == "Test User"
assert data["email"] == "test@example.com"
assert "id" in data
2. 使用模擬對象
import pytest
from fastapi.testclient import TestClient
from unittest.mock import MagicMock, patch
@pytest.fixture
def mock_db():
# 創建模擬資料庫會話
mock_session = MagicMock()
# 配置模擬查詢結果
mock_user = MagicMock()
mock_user.id = 1
mock_user.name = "Test User"
mock_user.email = "test@example.com"
# 配置模擬查詢方法
mock_query = MagicMock()
mock_query.filter.return_value.first.return_value = mock_user
mock_session.query.return_value = mock_query
return mock_session
@pytest.fixture
def client(mock_db):
def override_get_db():
yield mock_db
app.dependency_overrides[get_db] = override_get_db
with TestClient(app) as client:
yield client
app.dependency_overrides = {}
def test_read_user(client, mock_db):
response = client.get("/users/1")
assert response.status_code == 200
data = response.json()
assert data["id"] == 1
assert data["name"] == "Test User"
assert data["email"] == "test@example.com"
# 驗證模擬對象的調用
mock_db.query.assert_called_once()
mock_db.query().filter.assert_called_once()
最佳實踐
- 使用連接池:對於生產環境,總是使用連接池來管理資料庫連接。
- 適當的錯誤處理:確保資料庫錯誤被正確捕獲和處理,避免暴露敏感信息。
- 使用環境變數配置:不要在代碼中硬編碼資料庫連接信息。
- 定期關閉閒置連接:避免資源泄漏。
- 使用事務管理上下文:確保資料庫操作的原子性。