Files
hankin 815cbf9d8c v0.2.0: CRM/ERP 系统升级 - 清理 .gitignore 并移除误提交的 venv/env/db 文件
- 更新 .gitignore:全面覆盖环境变量、数据库、日志、缓存、上传文件
- 移除误跟踪的 server/venv/、crm_data.db、.env 文件
- 新增 server/.env.example 模板
- 新增合同管理、利润核算、AI教练等功能模块
- 新增 Playwright e2e 测试套件
- 前后端多项功能升级和 bug 修复
2026-05-11 07:24:19 +00:00

84 lines
2.9 KiB
Python

"""
安全与权限测试 —— 跨模块
覆盖: IDOR 防护 / 多租户隔离 / Token 过期 / ACL
"""
import uuid
import pytest
from datetime import timedelta
from httpx import AsyncClient
from tests.conftest import (
make_auth_headers, ADMIN_USER_ID, SALES_USER_ID,
COMPANY_ID,
)
from app.core.security import create_access_token
class TestIDOR:
    """IDOR protection: requests carrying a bogus company id must be rejected."""

    async def test_fake_company_id_forbidden(self, client: AsyncClient, seed_data):
        """A forged (random) company id yields 403 on the orders endpoint.

        ``seed_data`` is requested only as a fixture and is not read here;
        presumably it populates the baseline records -- confirm in conftest.
        """
        # A freshly generated UUID stands in for a tenant the admin was never
        # associated with.
        forged_headers = make_auth_headers(ADMIN_USER_ID, company_id=uuid.uuid4())
        response = await client.get("/api/orders", headers=forged_headers)
        assert response.status_code == 403

    async def test_invalid_company_id_format(self, client: AsyncClient, seed_data):
        """A non-UUID ``X-Company-Id`` header yields 401 or 422."""
        bearer = create_access_token(data={"sub": str(ADMIN_USER_ID)})
        response = await client.get(
            "/api/orders",
            headers={
                "Authorization": f"Bearer {bearer}",
                "X-Company-Id": "not-a-uuid",
            },
        )
        # Either status is accepted by this test: 401 or 422.
        acceptable_statuses = (401, 422)
        assert response.status_code in acceptable_statuses
class TestTokenSecurity:
    """Token handling checks against the ``/api/auth/me`` endpoint."""

    async def test_expired_token(self, client: AsyncClient, seed_data):
        """A token created with a negative lifetime yields 401."""
        # A negative delta means the token is already expired when issued.
        already_elapsed = timedelta(seconds=-10)
        stale_token = create_access_token(
            data={"sub": str(ADMIN_USER_ID)},
            expires_delta=already_elapsed,
        )
        auth_headers = {"Authorization": f"Bearer {stale_token}"}
        response = await client.get("/api/auth/me", headers=auth_headers)
        assert response.status_code == 401

    async def test_malformed_bearer(self, client: AsyncClient, seed_data):
        """An Authorization header that is not a Bearer credential yields 401."""
        # The header uses the Basic scheme with a dummy value.
        auth_headers = {"Authorization": "Basic some-basic-auth"}
        response = await client.get("/api/auth/me", headers=auth_headers)
        assert response.status_code == 401

    async def test_empty_bearer(self, client: AsyncClient, seed_data):
        """A Bearer header with no token after the scheme yields 401."""
        auth_headers = {"Authorization": "Bearer "}
        response = await client.get("/api/auth/me", headers=auth_headers)
        assert response.status_code == 401
class TestACL:
    """Role-based access control checks using the sales user's headers."""

    async def test_sales_cannot_access_settings(self, client: AsyncClient, sales_headers):
        """Every listed settings endpoint must answer 403 for ``sales_headers``.

        All endpoints are probed before asserting, so a single offending
        endpoint does not hide the status of the ones after it; the failure
        message lists every endpoint that did not return 403.
        """
        endpoints = [
            "/api/settings/departments/tree",
            "/api/settings/roles",
            "/api/settings/users",
        ]
        violations = []
        for ep in endpoints:
            resp = await client.get(ep, headers=sales_headers)
            if resp.status_code != 403:
                violations.append(f"Expected 403 for {ep}, got {resp.status_code}")
        # One assertion at the end reports all offending endpoints together.
        assert not violations, "; ".join(violations)

    async def test_sales_cannot_export_customers(self, client: AsyncClient, sales_headers):
        """The customer export endpoint must answer 403 for ``sales_headers``."""
        resp = await client.get("/api/crm/export", headers=sales_headers)
        assert resp.status_code == 403