""" 安全与权限测试 —— 跨模块 覆盖: IDOR 防护 / 多租户隔离 / Token 过期 / ACL """ import uuid import pytest from datetime import timedelta from httpx import AsyncClient from tests.conftest import ( make_auth_headers, ADMIN_USER_ID, SALES_USER_ID, COMPANY_ID, ) from app.core.security import create_access_token class TestIDOR: """IDOR 防护: 用伪造的 company_id 访问其他租户数据""" async def test_fake_company_id_forbidden(self, client: AsyncClient, seed_data): """伪造 X-Company-Id → 403""" fake_company = uuid.uuid4() headers = make_auth_headers(ADMIN_USER_ID, company_id=fake_company) resp = await client.get("/api/orders", headers=headers) assert resp.status_code == 403 async def test_invalid_company_id_format(self, client: AsyncClient, seed_data): """非法 UUID 格式 → 401/422""" token = create_access_token(data={"sub": str(ADMIN_USER_ID)}) headers = { "Authorization": f"Bearer {token}", "X-Company-Id": "not-a-uuid", } resp = await client.get("/api/orders", headers=headers) assert resp.status_code in (401, 422) class TestTokenSecurity: """Token 安全""" async def test_expired_token(self, client: AsyncClient, seed_data): """过期 Token → 401""" expired_token = create_access_token( data={"sub": str(ADMIN_USER_ID)}, expires_delta=timedelta(seconds=-10) # 已过期 ) resp = await client.get("/api/auth/me", headers={ "Authorization": f"Bearer {expired_token}" }) assert resp.status_code == 401 async def test_malformed_bearer(self, client: AsyncClient, seed_data): """格式错误的 Authorization → 401""" resp = await client.get("/api/auth/me", headers={ "Authorization": "Basic some-basic-auth" }) assert resp.status_code == 401 async def test_empty_bearer(self, client: AsyncClient, seed_data): """空 Bearer → 401""" resp = await client.get("/api/auth/me", headers={ "Authorization": "Bearer " }) assert resp.status_code == 401 class TestACL: """角色访问控制""" async def test_sales_cannot_access_settings(self, client: AsyncClient, sales_headers): """普通销售无法访问系统设置 → 403""" endpoints = [ "/api/settings/departments/tree", "/api/settings/roles", "/api/settings/users", ] for ep in endpoints: resp = await client.get(ep, headers=sales_headers) assert resp.status_code == 403, f"Expected 403 for {ep}, got {resp.status_code}" async def test_sales_cannot_export_customers(self, client: AsyncClient, sales_headers): """普通销售无法导出客户 → 403""" resp = await client.get("/api/crm/export", headers=sales_headers) assert resp.status_code == 403