非同步測試
非同步測試的基本概念
FastAPI 是基於 ASGI 的框架,它的核心是非同步(async/await)編程模型。測試非同步代碼需要特殊的技術和工具,以確保異步操作能夠正確執行和驗證。
概念 | 說明 |
---|---|
非同步函數 | 使用 async def 定義的函數,可以使用 await 暫停執行 |
事件循環 | 管理和調度異步任務的執行環境 |
協程 | 可以暫停和恢復執行的函數,是非同步編程的基本單位 |
任務 | 在事件循環中調度的協程,可以並行執行 |
等待點 | 使用 await 的位置,函數在此暫停,讓出控制權 |
非同步測試的挑戰與解決方案
挑戰 | 解決方案 |
---|---|
事件循環管理 | 使用 pytest-asyncio 等工具管理測試的事件循環 |
非同步夾具 | 創建和使用非同步夾具(async fixtures) |
測試超時 | 設置測試超時,避免無限等待 |
並發控制 | 使用信號量和鎖控制並發級別 |
模擬非同步依賴 | 創建非同步模擬對象和替身 |
設置非同步測試環境
安裝必要的工具
配置 pytest-asyncio
或者在測試模塊中標記:
# tests/api/test_async_endpoints.py
import pytest
pytestmark = pytest.mark.asyncio # 標記整個模塊使用 asyncio
測試非同步 API 端點
使用 TestClient 測試非同步端點
雖然 FastAPI 的 TestClient 提供了同步接口,但它內部處理了非同步調用,使其適用於測試非同步端點:
# tests/api/test_async_endpoints.py
from fastapi.testclient import TestClient
from app.main import app
client = TestClient(app)
def test_async_endpoint():
response = client.get("/async-endpoint")
assert response.status_code == 200
assert response.json() == {"message": "This is an async endpoint"}
使用 HTTPX 直接測試非同步端點
對於更複雜的非同步測試場景,可以使用 HTTPX 的非同步客戶端:
# tests/api/test_async_endpoints_with_httpx.py
import pytest
import httpx
from app.main import app
@pytest.mark.asyncio
async def test_async_endpoint_with_httpx():
async with httpx.AsyncClient(app=app, base_url="http://test") as client:
response = await client.get("/async-endpoint")
assert response.status_code == 200
assert response.json() == {"message": "This is an async endpoint"}
非同步夾具(Async Fixtures)
pytest-asyncio 允許創建非同步夾具,這對於準備需要非同步操作的測試環境非常有用。
基本的非同步夾具
# tests/conftest.py
import pytest
import asyncio
from app.database import async_engine, AsyncSessionLocal
from app.models import Base
@pytest.fixture(scope="session")
def event_loop():
"""創建一個實例會話範圍的事件循環"""
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
@pytest.fixture(scope="function")
async def async_db():
"""為測試提供非同步數據庫會話"""
# 創建所有表
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
# 創建會話
async with AsyncSessionLocal() as session:
yield session
# 清理 - 刪除所有表
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
@pytest.fixture
async def async_client():
"""提供非同步 HTTP 客戶端"""
async with httpx.AsyncClient(app=app, base_url="http://test") as client:
yield client
使用非同步夾具
# tests/api/test_async_users.py
import pytest
@pytest.mark.asyncio
async def test_create_user(async_client, async_db):
# 準備測試數據
user_data = {
"username": "asyncuser",
"email": "async@example.com",
"password": "password123"
}
# 發送請求
response = await async_client.post("/users/", json=user_data)
# 驗證結果
assert response.status_code == 201
data = response.json()
assert data["username"] == user_data["username"]
assert data["email"] == user_data["email"]
assert "id" in data
測試非同步存儲庫
非同步存儲庫使用 async/await
語法進行數據庫操作,需要特殊的測試方式。
非同步存儲庫示例
# app/repositories/async_user_repository.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from app.models import User
class AsyncUserRepository:
def __init__(self, db: AsyncSession):
self.db = db
async def create(self, user_data):
"""創建新用戶"""
user = User(**user_data)
self.db.add(user)
await self.db.commit()
await self.db.refresh(user)
return user
async def get_by_id(self, user_id):
"""根據 ID 獲取用戶"""
result = await self.db.execute(
select(User).where(User.id == user_id)
)
return result.scalars().first()
async def get_all(self, skip=0, limit=100):
"""獲取所有用戶"""
result = await self.db.execute(
select(User).offset(skip).limit(limit)
)
return result.scalars().all()
測試非同步存儲庫
# tests/repositories/test_async_user_repository.py
import pytest
from app.repositories.async_user_repository import AsyncUserRepository
@pytest.fixture
async def async_user_repo(async_db):
return AsyncUserRepository(async_db)
@pytest.mark.asyncio
async def test_create_user(async_user_repo):
# 創建用戶
user_data = {
"username": "asyncrepouser",
"email": "asyncrepo@example.com",
"hashed_password": "hashedpassword"
}
user = await async_user_repo.create(user_data)
# 驗證用戶被創建
assert user.id is not None
assert user.username == user_data["username"]
assert user.email == user_data["email"]
@pytest.mark.asyncio
async def test_get_user_by_id(async_user_repo):
# 創建用戶
user_data = {
"username": "getasyncuser",
"email": "getasync@example.com",
"hashed_password": "hashedpassword"
}
created_user = await async_user_repo.create(user_data)
# 獲取用戶
user = await async_user_repo.get_by_id(created_user.id)
# 驗證
assert user is not None
assert user.id == created_user.id
assert user.username == user_data["username"]
測試非同步服務
非同步服務通常依賴於其他非同步組件,如存儲庫或外部 API。
非同步服務示例
# app/services/async_user_service.py
from app.repositories.async_user_repository import AsyncUserRepository
from app.utils.security import get_password_hash
class AsyncUserService:
def __init__(self, user_repository: AsyncUserRepository):
self.user_repository = user_repository
async def create_user(self, user_data):
"""創建新用戶"""
# 檢查電子郵件是否已存在
existing_user = await self.user_repository.get_by_email(user_data.email)
if existing_user:
raise ValueError("電子郵件已被註冊")
# 哈希密碼
hashed_password = get_password_hash(user_data.password)
# 創建用戶
user_dict = user_data.dict()
user_dict.pop("password")
user_dict["hashed_password"] = hashed_password
return await self.user_repository.create(user_dict)
測試非同步服務
# tests/services/test_async_user_service.py
import pytest
from unittest.mock import AsyncMock, patch
from app.services.async_user_service import AsyncUserService
from app.schemas.user import UserCreate
@pytest.fixture
def mock_async_user_repo():
return AsyncMock()
@pytest.fixture
def async_user_service(mock_async_user_repo):
return AsyncUserService(mock_async_user_repo)
@pytest.mark.asyncio
async def test_create_user(async_user_service, mock_async_user_repo):
# 設置模擬
mock_async_user_repo.get_by_email.return_value = None
mock_async_user_repo.create.return_value = AsyncMock(
id=1,
username="testuser",
email="test@example.com",
hashed_password="hashedpw"
)
# 創建用戶數據
user_data = UserCreate(
username="testuser",
email="test@example.com",
password="password123"
)
# 調用服務
with patch("app.services.async_user_service.get_password_hash", return_value="hashedpw"):
user = await async_user_service.create_user(user_data)
# 驗證結果
assert user.id == 1
assert user.username == "testuser"
assert user.email == "test@example.com"
# 驗證模擬調用
mock_async_user_repo.get_by_email.assert_called_once_with("test@example.com")
mock_async_user_repo.create.assert_called_once()
測試非同步背景任務
FastAPI 允許創建背景任務,這些任務在請求處理完成後異步執行。測試這些任務需要特殊的技術。
背景任務示例
# app/tasks/email_tasks.py
import aiosmtplib
from email.message import EmailMessage
from app.config import settings
async def send_email_async(to_email: str, subject: str, content: str):
"""非同步發送電子郵件"""
message = EmailMessage()
message.set_content(content)
message["Subject"] = subject
message["From"] = settings.SMTP_USERNAME
message["To"] = to_email
try:
await aiosmtplib.send(
message,
hostname=settings.SMTP_SERVER,
port=settings.SMTP_PORT,
username=settings.SMTP_USERNAME,
password=settings.SMTP_PASSWORD,
use_tls=True
)
return True
except Exception as e:
print(f"發送郵件失敗: {str(e)}")
return False
測試背景任務
# tests/tasks/test_email_tasks.py
import pytest
from unittest.mock import patch, AsyncMock
from app.tasks.email_tasks import send_email_async
@pytest.mark.asyncio
async def test_send_email_async_success():
# 模擬 aiosmtplib.send
with patch("app.tasks.email_tasks.aiosmtplib.send", new_callable=AsyncMock) as mock_send:
mock_send.return_value = True
# 調用函數
result = await send_email_async(
"recipient@example.com",
"測試郵件",
"這是一封測試郵件"
)
# 驗證結果
assert result is True
mock_send.assert_called_once()
測試非同步 WebSocket 端點
FastAPI 支持 WebSocket 連接,這是一種非同步的長連接通信方式。
WebSocket 端點示例
# app/routers/websocket.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from typing import List
router = APIRouter()
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
@router.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await manager.send_personal_message(f"You wrote: {data}", websocket)
await manager.broadcast(f"Client #{client_id} says: {data}")
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f"Client #{client_id} left the chat")
測試 WebSocket 端點
# tests/api/test_websocket.py
import pytest
from fastapi.testclient import TestClient
from app.main import app
client = TestClient(app)
def test_websocket_connection():
with client.websocket_connect("/ws/1") as websocket:
# 發送消息
websocket.send_text("Hello WebSocket")
# 接收個人消息
data = websocket.receive_text()
assert data == "You wrote: Hello WebSocket"
# 接收廣播消息
data = websocket.receive_text()
assert data == "Client #1 says: Hello WebSocket"
測試非同步依賴注入
FastAPI 的依賴注入系統支持非同步依賴,測試這些依賴需要特殊的技術。
非同步依賴示例
# app/dependencies/async_auth.py
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from app.services.async_user_service import AsyncUserService
from app.services.async_token_service import AsyncTokenService
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
async def get_current_user(
token: str = Depends(oauth2_scheme),
token_service: AsyncTokenService = Depends(),
user_service: AsyncUserService = Depends()
):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="無效的認證憑證",
headers={"WWW-Authenticate": "Bearer"},
)
# 驗證令牌
user_id = token_service.verify_token(token)
if user_id is None:
raise credentials_exception
# 獲取用戶
user = await user_service.get_user_by_id(user_id)
if user is None:
raise credentials_exception
return user
測試非同步依賴
# tests/dependencies/test_async_auth.py
import pytest
from unittest.mock import AsyncMock, patch
from fastapi import HTTPException
from app.dependencies.async_auth import get_current_user
@pytest.mark.asyncio
async def test_get_current_user_valid_token():
# 模擬依賴
token_service = AsyncMock()
user_service = AsyncMock()
# 設置模擬返回值
token_service.verify_token.return_value = 1
user_service.get_user_by_id.return_value = AsyncMock(
id=1,
username="testuser",
is_active=True
)
# 調用依賴
user = await get_current_user("valid_token", token_service, user_service)
# 驗證結果
assert user.id == 1
assert user.username == "testuser"
token_service.verify_token.assert_called_once_with("valid_token")
user_service.get_user_by_id.assert_called_once_with(1)
測試非同步中間件
中間件在 FastAPI 中是非同步的,它們處理請求和響應的流程。
非同步中間件示例
# app/middleware/async_logging.py
import time
import uuid
from fastapi import Request
import logging
logger = logging.getLogger(__name__)
async def async_logging_middleware(request: Request, call_next):
# 生成請求 ID
request_id = str(uuid.uuid4())
# 記錄請求開始
start_time = time.time()
logger.info(f"Request started: {request_id} - {request.method} {request.url.path}")
# 處理請求
try:
response = await call_next(request)
# 記錄請求完成
process_time = time.time() - start_time
logger.info(
f"Request completed: {request_id} - {request.method} {request.url.path} - "
f"Status: {response.status_code} - Duration: {process_time:.4f}s"
)
# 添加自定義響應頭
response.headers["X-Request-ID"] = request_id
return response
except Exception as e:
# 記錄異常
logger.error(f"Request failed: {request_id} - Error: {str(e)}")
raise
測試非同步中間件
# tests/middleware/test_async_logging.py
import pytest
from unittest.mock import patch, AsyncMock
from fastapi import FastAPI
from fastapi.testclient import TestClient
from app.middleware.async_logging import async_logging_middleware
@pytest.fixture
def app_with_middleware():
app = FastAPI()
@app.middleware("http")
async def logging_middleware(request, call_next):
return await async_logging_middleware(request, call_next)
@app.get("/test")
async def test_route():
return {"message": "Test route"}
return app
@pytest.fixture
def client(app_with_middleware):
return TestClient(app_with_middleware)
def test_logging_middleware_success(client):
with patch("app.middleware.async_logging.logger") as mock_logger:
# 發送請求
response = client.get("/test")
# 驗證響應
assert response.status_code == 200
assert response.json() == {"message": "Test route"}
assert "X-Request-ID" in response.headers
# 驗證日誌
mock_logger.info.assert_called()
assert "Request started" in mock_logger.info.call_args_list[0][0][0]
assert "Request completed" in mock_logger.info.call_args_list[1][0][0]
測試非同步並發
測試非同步代碼的並發行為是確保系統在高負載下正常工作的重要部分。
簡單的並發測試
# tests/concurrency/test_async_concurrency.py
import pytest
import asyncio
from app.services.async_counter_service import AsyncCounterService
@pytest.fixture
def counter_service():
return AsyncCounterService()
@pytest.mark.asyncio
async def test_concurrent_increment(counter_service):
# 創建多個並發任務
tasks = [counter_service.increment() for _ in range(10)]
# 等待所有任務完成
await asyncio.gather(*tasks)
# 驗證計數器值
assert await counter_service.get_count() == 10
非同步測試的模式和技巧
模擬非同步函數
使用 AsyncMock
來模擬非同步函數和方法:
# tests/utils/test_async_utils.py
import pytest
from unittest.mock import AsyncMock, patch
from app.utils.async_utils import fetch_external_data
@pytest.mark.asyncio
async def test_fetch_external_data():
# 創建模擬的非同步響應
mock_response = AsyncMock()
mock_response.json.return_value = {"data": "test_data"}
mock_response.status_code = 200
# 模擬 httpx.AsyncClient.get 方法
with patch("httpx.AsyncClient.get", new_callable=AsyncMock) as mock_get:
mock_get.return_value = mock_response
# 調用被測試的函數
result = await fetch_external_data("https://api.example.com/data")
# 驗證結果
assert result == {"data": "test_data"}
mock_get.assert_called_once_with("https://api.example.com/data")
測試非同步超時
測試函數在超時情況下的行為:
# tests/utils/test_async_timeout.py
import pytest
import asyncio
from app.utils.async_timeout import fetch_with_timeout
@pytest.mark.asyncio
async def test_fetch_with_timeout_success():
# 模擬快速完成的非同步函數
async def mock_fetch():
await asyncio.sleep(0.1)
return "data"
# 調用帶有足夠超時的函數
result = await fetch_with_timeout(mock_fetch(), timeout=1.0)
assert result == "data"
@pytest.mark.asyncio
async def test_fetch_with_timeout_timeout():
# 模擬慢速完成的非同步函數
async def mock_slow_fetch():
await asyncio.sleep(2.0)
return "data"
# 調用帶有較短超時的函數
with pytest.raises(asyncio.TimeoutError):
await fetch_with_timeout(mock_slow_fetch(), timeout=0.5)
測試非同步上下文管理器
測試實現了 __aenter__
和 __aexit__
方法的非同步上下文管理器:
# tests/utils/test_async_context_manager.py
import pytest
from app.utils.async_context import AsyncResourceManager
@pytest.mark.asyncio
async def test_async_context_manager():
manager = AsyncResourceManager()
# 驗證初始狀態
assert manager.is_acquired is False
# 使用上下文管理器
async with manager as resource:
# 驗證資源已獲取
assert manager.is_acquired is True
assert resource is not None
# 驗證資源已釋放
assert manager.is_acquired is False
非同步測試的最佳實踐
避免混合同步和非同步代碼
在測試中,應該避免在同一個函數中混合同步和非同步代碼,這可能導致事件循環問題:
# 不好的做法
@pytest.mark.asyncio
async def test_mixed_sync_async():
sync_result = do_something_sync() # 同步調用
async_result = await do_something_async() # 非同步調用
assert sync_result == async_result
# 好的做法
@pytest.mark.asyncio
async def test_async_wrapper():
# 將同步調用包裝在非同步函數中
async def sync_wrapper():
return do_something_sync()
sync_result = await sync_wrapper()
async_result = await do_something_async()
assert sync_result == async_result
使用非同步夾具鏈
將多個非同步夾具組合在一起,創建複雜的測試環境:
# tests/conftest.py
import pytest
@pytest.fixture
async def async_user(async_db):
# 創建測試用戶
user = await create_test_user(async_db)
yield user
# 清理
await delete_test_user(async_db, user.id)
@pytest.fixture
async def async_token(async_user, token_service):
# 為測試用戶創建令牌
token = await token_service.create_token(async_user.id)
return token
@pytest.fixture
async def authenticated_client(async_client, async_token):
# 創建已認證的客戶端
async_client.headers.update({"Authorization": f"Bearer {async_token}"})
return async_client
設置測試超時
為非同步測試設置超時,避免測試無限等待:
# tests/slow/test_slow_async.py
import pytest
@pytest.mark.asyncio
@pytest.mark.timeout(5) # 5秒超時
async def test_slow_operation():
result = await potentially_slow_operation()
assert result is not None
非同步測試的常見問題和解決方案
問題:事件循環已關閉
解決方案:確保在測試會話中正確管理事件循環:
# tests/conftest.py
import pytest
import asyncio
@pytest.fixture(scope="session")
def event_loop():
"""創建一個會話範圍的事件循環"""
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
問題:非同步測試無法並行運行
解決方案:確保每個測試使用獨立的資源,如數據庫連接:
# tests/conftest.py
import pytest
import asyncio
from app.database import create_async_engine
@pytest.fixture
async def async_db():
"""為每個測試創建獨立的數據庫連接"""
# 創建唯一的數據庫 URL
db_url = f"sqlite+aiosqlite:///:memory:"
engine = create_async_engine(db_url)
# 設置數據庫
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
# 創建會話
async_session = AsyncSession(engine)
yield async_session
# 清理
await async_session.close()
await engine.dispose()
問題:模擬非同步方法
解決方案:使用 AsyncMock
並設置適當的返回值或副作用:
# tests/services/test_async_service_with_mocks.py
import pytest
from unittest.mock import AsyncMock, patch
from app.services.async_notification_service import AsyncNotificationService
@pytest.mark.asyncio
async def test_send_notification():
# 創建模擬的電子郵件和推送服務
mock_email_service = AsyncMock()
mock_push_service = AsyncMock()
# 設置返回值
mock_email_service.send.return_value = True
mock_push_service.send.return_value = True
# 創建通知服務
notification_service = AsyncNotificationService(
email_service=mock_email_service,
push_service=mock_push_service
)
# 發送通知
result = await notification_service.send_notification(
user_id=1,
message="Test notification"
)
# 驗證結果
assert result is True
mock_email_service.send.assert_called_once()
mock_push_service.send.assert_called_once()
總結
非同步測試是測試 FastAPI 應用程序的關鍵部分,它確保非同步代碼能夠正確執行並按預期工作。通過使用適當的工具和技術,可以有效地測試非同步 API 端點、服務、存儲庫和其他組件。
方面 | 關鍵點 |
---|---|
測試環境 | 使用 pytest-asyncio 管理事件循環 創建非同步夾具準備測試環境 |
測試客戶端 | 使用 TestClient 或 httpx.AsyncClient 測試 API 端點 處理 WebSocket 和長連接測試 |
模擬技術 | 使用 AsyncMock 模擬非同步依賴 模擬外部服務和 API |
並發測試 | 測試代碼在並發環境下的行為 使用 asyncio.gather 執行並發任務 |
測試超時 | 設置適當的超時避免測試無限等待 測試超時處理邏輯 |
最佳實踐
- 隔離測試環境:每個測試應該有自己的隔離環境,避免測試之間的相互干擾。
- 正確管理事件循環:確保事件循環在測試會話中正確創建和關閉。
- 使用非同步夾具:創建和使用非同步夾具來準備測試數據和環境。
- 模擬非同步依賴:使用 AsyncMock 模擬非同步依賴,簡化測試並提高測試速度。
- 測試並發行為:測試代碼在並發環境下的行為,確保沒有競態條件和死鎖。
- 設置適當的超時:為非同步測試設置超時,避免測試無限等待。
- 測試錯誤處理:測試非同步代碼的錯誤處理邏輯,確保系統在出現錯誤時能夠正確恢復。
- 避免混合同步和非同步代碼:在測試中避免混合同步和非同步代碼,這可能導致事件循環問題。
非同步測試的工具和庫
工具/庫 | 用途 |
---|---|
pytest-asyncio | 為 pytest 提供非同步測試支持 |
httpx | 提供同步和非同步 HTTP 客戶端 |
AsyncMock | 用於模擬非同步函數和方法 |
aioresponses | 模擬非同步 HTTP 請求 |
pytest-timeout | 為測試設置超時 |
非同步測試的未來趨勢
隨著非同步編程在 Python 生態系統中的普及,非同步測試工具和技術也在不斷發展。未來的趨勢包括:
- 更好的測試工具:更完善的非同步測試工具,提供更好的調試和報告功能。
- 更高級的模擬技術:更高級的非同步模擬技術,更容易模擬複雜的非同步行為。
- 更好的並發測試支持:更好的工具來測試並發行為,識別潛在的競態條件和死鎖。
- 更好的性能測試:更好的工具來測試非同步代碼的性能,識別性能瓶頸。
通過掌握非同步測試的技術和最佳實踐,你可以確保 FastAPI 應用程序的非同步代碼能夠正確執行,提供高性能、可靠的服務。