Files
crm_project/server/app/api/contracts.py
T
hankin 815cbf9d8c v0.2.0: CRM/ERP 系统升级 - 清理 .gitignore 并移除误提交的 venv/env/db 文件
- 更新 .gitignore:全面覆盖环境变量、数据库、日志、缓存、上传文件
- 移除误跟踪的 server/venv/、crm_data.db、.env 文件
- 新增 server/.env.example 模板
- 新增合同管理、利润核算、AI教练等功能模块
- 新增 Playwright e2e 测试套件
- 前后端多项功能升级和 bug 修复
2026-05-11 07:24:19 +00:00

157 lines
5.7 KiB
Python

"""
合同管理路由 —— /api/contracts
"""
from __future__ import annotations
import uuid
from fastapi import APIRouter, Body, Depends, Query, UploadFile, File
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.deps import get_current_user, get_current_company_id
from app.db.database import get_db
from app.schemas.auth import CurrentUserPayload
from app.schemas.contract import ContractCreate, ContractUpdate
from app.schemas.response import ok
from app.services import contract_service as svc
router = APIRouter(prefix="/contracts", tags=["合同管理"])
@router.post("", summary="新增合同")
async def create_contract(
    body: ContractCreate,
    db: AsyncSession = Depends(get_db),
    current_user: CurrentUserPayload = Depends(get_current_user),
    company_id: uuid.UUID = Depends(get_current_company_id),
) -> dict:
    """Create a new contract.

    Args:
        body: Validated contract creation payload.
        db: Async database session supplied by ``get_db``.
        current_user: Payload resolved by ``get_current_user``; forwarded
            to the service layer.
        company_id: Company id resolved by ``get_current_company_id``.

    Returns:
        The ``ok`` response wrapping the created contract serialised with
        ``model_dump(mode="json")``.
    """
    created = await svc.create_contract(db, current_user, company_id, body)
    payload = created.model_dump(mode="json")
    return ok(data=payload, message="合同创建成功")
@router.get("", summary="合同列表(分页)")
async def list_contracts(
    page: int = Query(1, ge=1),
    size: int = Query(20, ge=1, le=100),
    keyword: str | None = Query(None, description="合同编号搜索"),
    status: str | None = Query(None, description="状态筛选"),
    db: AsyncSession = Depends(get_db),
    current_user: CurrentUserPayload = Depends(get_current_user),
    company_id: uuid.UUID = Depends(get_current_company_id),
) -> dict:
    """Return one page of contracts for the current company.

    Args:
        page: Page number, minimum 1 (default 1).
        size: Page size, between 1 and 100 (default 20).
        keyword: Optional search term (contract number search).
        status: Optional status filter.
        db: Async database session supplied by ``get_db``.
        current_user: Not used in the body; declared so the
            ``get_current_user`` dependency is resolved for this route.
        company_id: Company id resolved by ``get_current_company_id``.

    Returns:
        The ``ok`` response wrapping the service result serialised with
        ``model_dump(mode="json")``.
    """
    page_result = await svc.list_contracts(
        db, company_id, page, size, keyword, status
    )
    payload = page_result.model_dump(mode="json")
    return ok(data=payload)
@router.get("/{contract_id}", summary="合同详情(含执行进度)")
async def get_contract(
    contract_id: uuid.UUID,
    db: AsyncSession = Depends(get_db),
    current_user: CurrentUserPayload = Depends(get_current_user),
    company_id: uuid.UUID = Depends(get_current_company_id),
) -> dict:
    """Fetch a single contract by id within the current company.

    Args:
        contract_id: Id of the contract, taken from the URL path.
        db: Async database session supplied by ``get_db``.
        current_user: Not used in the body; declared so the
            ``get_current_user`` dependency is resolved for this route.
        company_id: Company id resolved by ``get_current_company_id``.

    Returns:
        The ``ok`` response wrapping the contract detail serialised with
        ``model_dump(mode="json")``.
    """
    detail = await svc.get_contract(db, contract_id, company_id)
    payload = detail.model_dump(mode="json")
    return ok(data=payload)
@router.put("/{contract_id}", summary="编辑合同")
async def update_contract(
    contract_id: uuid.UUID,
    body: ContractUpdate,
    db: AsyncSession = Depends(get_db),
    current_user: CurrentUserPayload = Depends(get_current_user),
    company_id: uuid.UUID = Depends(get_current_company_id),
) -> dict:
    """Update an existing contract.

    Args:
        contract_id: Id of the contract, taken from the URL path.
        body: Validated contract update payload.
        db: Async database session supplied by ``get_db``.
        current_user: Not used in the body; declared so the
            ``get_current_user`` dependency is resolved for this route.
        company_id: Company id resolved by ``get_current_company_id``.

    Returns:
        The ``ok`` response wrapping the updated contract serialised with
        ``model_dump(mode="json")``.
    """
    updated = await svc.update_contract(db, contract_id, company_id, body)
    payload = updated.model_dump(mode="json")
    return ok(data=payload, message="合同已更新")
@router.delete("/{contract_id}", summary="删除合同")
async def delete_contract(
    contract_id: uuid.UUID,
    db: AsyncSession = Depends(get_db),
    current_user: CurrentUserPayload = Depends(get_current_user),
    company_id: uuid.UUID = Depends(get_current_company_id),
) -> dict:
    """Delete a contract via the contract service.

    Args:
        contract_id: Id of the contract, taken from the URL path.
        db: Async database session supplied by ``get_db``.
        current_user: Not used in the body; declared so the
            ``get_current_user`` dependency is resolved for this route.
        company_id: Company id resolved by ``get_current_company_id``.

    Returns:
        The ``ok`` response carrying only a confirmation message.
    """
    # The service call's return value is intentionally not used.
    await svc.delete_contract(db, contract_id, company_id)
    confirmation = ok(message="合同已删除")
    return confirmation
@router.post("/{contract_id}/generate-order", summary="一键从合同生成订单")
async def generate_order_from_contract(
    contract_id: uuid.UUID,
    db: AsyncSession = Depends(get_db),
    current_user: CurrentUserPayload = Depends(get_current_user),
    company_id: uuid.UUID = Depends(get_current_company_id),
) -> dict:
    """Generate an order from an existing contract.

    Args:
        contract_id: Id of the source contract, taken from the URL path.
        db: Async database session supplied by ``get_db``.
        current_user: Payload resolved by ``get_current_user``; forwarded
            to the service layer.
        company_id: Company id resolved by ``get_current_company_id``.

    Returns:
        The ``ok`` response whose ``data`` is the service result passed
        through as-is (no ``model_dump`` is applied here).
    """
    order_data = await svc.generate_order_from_contract(
        db, current_user, contract_id, company_id
    )
    return ok(data=order_data, message="订单生成成功")
@router.get("/{contract_id}/generate", summary="生成合同 Word 文档下载")
async def generate_contract_document(
    contract_id: uuid.UUID,
    db: AsyncSession = Depends(get_db),
    current_user: CurrentUserPayload = Depends(get_current_user),
    company_id: uuid.UUID = Depends(get_current_company_id),
):
    """Render a contract as a Word document and return it as a download.

    Args:
        contract_id: Id of the contract, taken from the URL path.
        db: Async database session supplied by ``get_db``.
        current_user: Not used in the body; declared so the
            ``get_current_user`` dependency is resolved for this route.
        company_id: Company id resolved by ``get_current_company_id``.

    Returns:
        A raw ``Response`` with the .docx media type and an attachment
        ``Content-Disposition`` header naming the file
        ``contract_<contract_id>.docx``.
    """
    from fastapi.responses import Response

    document = await svc.generate_contract_docx(db, contract_id, company_id)

    docx_media_type = (
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
    )
    download_headers = {
        "Content-Disposition": f"attachment; filename=contract_{contract_id}.docx",
    }
    return Response(
        content=document,
        media_type=docx_media_type,
        headers=download_headers,
    )
@router.post("/{contract_id}/upload-signed", summary="上传双签盖章版")
async def upload_signed_copy(
    contract_id: uuid.UUID,
    file: UploadFile = File(...),
    db: AsyncSession = Depends(get_db),
    current_user: CurrentUserPayload = Depends(get_current_user),
    company_id: uuid.UUID = Depends(get_current_company_id),
) -> dict:
    """Store an uploaded signed copy and mark the contract as signed.

    The file is written under ``uploads/contracts/<contract_id>/``, an
    ``ErpContractAttachment`` row of type ``signed_copy`` is added, and the
    contract's ``is_signed`` / ``signed_file_url`` columns are updated, all
    committed in one transaction.

    Args:
        contract_id: Id of the contract, taken from the URL path.
        file: The uploaded signed document.
        db: Async database session supplied by ``get_db``.
        current_user: Payload resolved by ``get_current_user``; its
            ``user_id`` is recorded as the uploader.
        company_id: Company id resolved by ``get_current_company_id``.

    Returns:
        The ``ok`` response with ``data={"file_url": ...}``.

    Raises:
        HTTPException: 404 when no non-deleted contract with this id exists
            for the company.
    """
    import os

    from fastapi import HTTPException
    from sqlalchemy import select as sa_select
    from sqlalchemy import update as sa_update

    from app.models.contract import ErpContract, ErpContractAttachment

    # Verify the contract exists, belongs to the company and is not deleted.
    contract = (
        await db.execute(
            sa_select(ErpContract).where(
                ErpContract.id == contract_id,
                ErpContract.company_id == company_id,
                ErpContract.is_deleted.is_(False),
            )
        )
    ).scalar_one_or_none()
    if contract is None:
        # A missing resource is a client error, not an internal failure.
        raise HTTPException(status_code=404, detail="合同不存在")

    # The client controls ``file.filename``; keep only its final path
    # component so names such as "../../x" or "C:\\x" cannot escape the
    # upload directory. Backslashes are normalised first because
    # os.path.basename does not split on them on POSIX.
    raw_name = (file.filename or "").replace("\\", "/").replace("\x00", "")
    safe_name = os.path.basename(raw_name)
    if safe_name in ("", ".", ".."):
        safe_name = "signed_copy"

    # Read the upload before touching the filesystem so a failed read does
    # not leave an empty file behind.
    content = await file.read()

    upload_dir = f"uploads/contracts/{contract_id}"
    os.makedirs(upload_dir, exist_ok=True)
    file_path = f"{upload_dir}/{safe_name}"
    with open(file_path, "wb") as out:
        out.write(content)
    file_url = f"/{file_path}"

    # Record the attachment.
    attachment = ErpContractAttachment(
        contract_id=contract_id,
        file_name=safe_name,
        file_url=file_url,
        file_type="signed_copy",
        uploader_id=current_user.user_id,
    )
    db.add(attachment)

    # Update the contract's signing state; scoped by company as well so the
    # write matches the existence check above.
    await db.execute(
        sa_update(ErpContract)
        .where(
            ErpContract.id == contract_id,
            ErpContract.company_id == company_id,
        )
        .values(is_signed=True, signed_file_url=file_url)
    )
    await db.commit()
    return ok(message="双签盖章版上传成功", data={"file_url": file_url})