Files
hankin 815cbf9d8c v0.2.0: CRM/ERP 系统升级 - 清理 .gitignore 并移除误提交的 venv/env/db 文件
- 更新 .gitignore:全面覆盖环境变量、数据库、日志、缓存、上传文件
- 移除误跟踪的 server/venv/、crm_data.db、.env 文件
- 新增 server/.env.example 模板
- 新增合同管理、利润核算、AI教练等功能模块
- 新增 Playwright e2e 测试套件
- 前后端多项功能升级和 bug 修复
2026-05-11 07:24:19 +00:00

288 lines
13 KiB
Python

"""
财务票据 Service 层
REST API 路由 和 MCP 工具 共用此层函数
"""
from __future__ import annotations
import uuid
from datetime import date, datetime
from sqlalchemy import func, select, update
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.exceptions import BizException, ForbiddenException, NotFoundException
from app.models.finance import FinExpenseDetail, FinExpenseRecord, FinInvoicePool
from app.models.sys import SysUser
from app.schemas.auth import CurrentUserPayload
from app.schemas.finance import (
ExpenseBriefResponse, ExpenseCreate, ExpenseDetailResponse,
ExpenseListResponse, ExpenseResponse, ExpenseStatusUpdate,
InvoiceCreate, InvoiceListResponse, InvoiceResponse,
)
def _inv_to_resp(inv: FinInvoicePool) -> InvoiceResponse:
    """Build an ``InvoiceResponse`` from a ``FinInvoicePool`` row.

    ``uploader_name`` is ``None`` when the row has no uploader object, a
    falsy ``amount`` is reported as ``0.0`` and falsy ``ai_extracted_data``
    as an empty dict.
    """
    uploader = inv.uploader
    fields = {
        "id": inv.id,
        "uploader_id": inv.uploader_id,
        "uploader_name": uploader.real_name if uploader else None,
        "file_url": inv.file_url,
        "merchant_name": inv.merchant_name,
        "amount": float(inv.amount or 0),
        "invoice_date": inv.invoice_date,
        "type": inv.type,
        "ai_extracted_data": inv.ai_extracted_data or {},
        "is_used": inv.is_used,
        "created_at": inv.created_at,
    }
    return InvoiceResponse(**fields)
def _detail_to_resp(d: FinExpenseDetail) -> ExpenseDetailResponse:
    """Build an ``ExpenseDetailResponse`` from a ``FinExpenseDetail`` row.

    When the detail has no linked invoice, ``invoice_merchant`` and
    ``invoice_amount`` are both ``None``. When an invoice is linked but its
    amount is empty, ``invoice_amount`` is reported as ``0.0`` (the same
    fallback ``_inv_to_resp`` applies) instead of raising ``TypeError`` from
    ``float(None)``.
    """
    invoice = d.invoice
    if invoice:
        invoice_merchant = invoice.merchant_name
        # Guard against an empty amount column; float(None) would raise.
        invoice_amount = float(invoice.amount or 0)
    else:
        invoice_merchant = None
        invoice_amount = None
    return ExpenseDetailResponse(
        id=d.id,
        invoice_id=d.invoice_id,
        invoice_merchant=invoice_merchant,
        invoice_amount=invoice_amount,
        expense_desc=d.expense_desc,
        original_type=d.original_type,
        offset_type=d.offset_type,
        amount=float(d.amount or 0),
    )
def _exp_to_resp(exp: FinExpenseRecord, with_details: bool = True) -> ExpenseResponse:
    """Build an ``ExpenseResponse`` from a ``FinExpenseRecord`` row.

    Args:
        exp: The expense record to convert.
        with_details: When true, every row in ``exp.details`` is converted
            with ``_detail_to_resp``; otherwise ``details`` is an empty list
            and ``exp.details`` is not touched.
    """
    if with_details:
        detail_items = [_detail_to_resp(row) for row in exp.details]
    else:
        detail_items = []

    applicant = exp.applicant
    approver = exp.approver
    return ExpenseResponse(
        id=exp.id,
        system_no=exp.system_no,
        applicant_id=exp.applicant_id,
        applicant_name=applicant.real_name if applicant else None,
        total_amount=float(exp.total_amount or 0),
        status=exp.status,
        remark=exp.remark,
        approved_by=exp.approved_by,
        approver_name=approver.real_name if approver else None,
        approved_at=exp.approved_at,
        details=detail_items,
        created_at=exp.created_at,
        updated_at=exp.updated_at,
    )
def _exp_to_brief(exp: FinExpenseRecord) -> ExpenseBriefResponse:
    """Build the list-view summary (``ExpenseBriefResponse``) of an expense.

    ``applicant_name`` is ``None`` when the record has no applicant object
    and a falsy ``total_amount`` is reported as ``0.0``.
    """
    applicant = exp.applicant
    summary = {
        "id": exp.id,
        "system_no": exp.system_no,
        "applicant_id": exp.applicant_id,
        "applicant_name": applicant.real_name if applicant else None,
        "total_amount": float(exp.total_amount or 0),
        "status": exp.status,
        "created_at": exp.created_at,
    }
    return ExpenseBriefResponse(**summary)
async def _generate_expense_no(db: AsyncSession) -> str:
    """Return the next expense number in the form ``EXP-YYYYMMDD-NNN``.

    The sequence part is one more than the number of existing records whose
    ``system_no`` starts with today's prefix, zero-padded to at least three
    digits. The date comes from ``date.today()``.
    """
    # NOTE(review): the count is read without reserving the number, so two
    # concurrent callers could presumably receive the same value - confirm
    # whether a unique constraint or retry exists elsewhere.
    day_stamp = date.today().strftime("%Y%m%d")
    prefix = f"EXP-{day_stamp}-"
    count_stmt = (
        select(func.count())
        .select_from(FinExpenseRecord)
        .where(FinExpenseRecord.system_no.like(f"{prefix}%"))
    )
    result = await db.execute(count_stmt)
    existing = result.scalar() or 0
    return f"{prefix}{existing + 1:03d}"
async def _release_invoices(db: AsyncSession, expense_id: uuid.UUID, now: datetime) -> None:
    """Mark every invoice referenced by an expense's details as unused.

    Detail rows without an invoice are ignored. If the expense references no
    invoices, no UPDATE is issued. The caller is responsible for committing.

    Args:
        db: Session the statements are executed on.
        expense_id: Expense whose detail rows are inspected.
        now: Timestamp written to ``updated_at`` of each released invoice.
    """
    linked_ids_stmt = select(FinExpenseDetail.invoice_id).where(
        FinExpenseDetail.expense_id == expense_id,
        FinExpenseDetail.invoice_id.is_not(None),
    )
    linked_ids = (await db.execute(linked_ids_stmt)).scalars().all()
    if not linked_ids:
        return

    release_stmt = (
        update(FinInvoicePool)
        .where(FinInvoicePool.id.in_(linked_ids))
        .values(is_used=False, updated_at=now)
    )
    await db.execute(release_stmt)
# ── Service Functions ────────────────────────────────────
async def create_invoice(db: AsyncSession, user: CurrentUserPayload, body: InvoiceCreate, company_id: uuid.UUID) -> InvoiceResponse:
    """Insert a new, unused invoice uploaded by ``user`` and return it.

    The row is committed and then refreshed from the database before being
    converted with ``_inv_to_resp``.

    Args:
        db: Session used for the insert.
        user: Current user; ``user.user_id`` becomes ``uploader_id``.
        body: Invoice fields supplied by the caller.
        company_id: Company the invoice is stored under.
    """
    fields = {
        "uploader_id": user.user_id,
        "file_url": body.file_url,
        "merchant_name": body.merchant_name,
        "amount": body.amount,
        "invoice_date": body.invoice_date,
        "type": body.type,
        "ai_extracted_data": body.ai_extracted_data,
        "is_used": False,  # a freshly uploaded invoice is not attached to any expense
        "company_id": company_id,
    }
    record = FinInvoicePool(**fields)
    db.add(record)
    await db.commit()
    await db.refresh(record)
    return _inv_to_resp(record)
async def list_invoices(
    db: AsyncSession, user: CurrentUserPayload,
    page: int = 1, size: int = 20,
    inv_type: str | None = None, is_used: bool | None = None,
    company_id: uuid.UUID | None = None,
) -> InvoiceListResponse:
    """Return one page of non-deleted invoices visible to ``user``.

    Visibility follows ``user.data_scope``:

    * ``"self"`` - only invoices uploaded by the user.
    * ``"dept_and_sub"`` - invoices uploaded by non-deleted users whose
      ``dept_id`` equals the user's. If the user has no ``dept_id`` the
      filter falls back to the user's own invoices instead of applying no
      restriction at all.
    * any other value - no uploader restriction.

    Args:
        db: Session used for the queries.
        user: Current user whose data scope restricts the result.
        page: 1-based page number.
        size: Page size.
        inv_type: Optional exact match on the invoice ``type``.
        is_used: Optional filter on the ``is_used`` flag.
        company_id: Optional company restriction.

    Returns:
        The total number of matching rows plus the requested page, newest
        first.
    """
    where = [FinInvoicePool.is_deleted.is_(False)]
    if company_id:
        where.append(FinInvoicePool.company_id == company_id)

    if user.data_scope == "self":
        where.append(FinInvoicePool.uploader_id == user.user_id)
    elif user.data_scope == "dept_and_sub":
        if user.dept_id is not None:
            # NOTE(review): only the user's own department is matched here;
            # the scope name suggests sub-departments too - confirm.
            dept_users = select(SysUser.id).where(
                SysUser.dept_id == user.dept_id, SysUser.is_deleted.is_(False),
            )
            where.append(FinInvoicePool.uploader_id.in_(dept_users))
        else:
            # Fail closed: a department-scoped user without a department
            # must not silently get unrestricted visibility.
            where.append(FinInvoicePool.uploader_id == user.user_id)

    if inv_type:
        where.append(FinInvoicePool.type == inv_type)
    if is_used is not None:
        where.append(FinInvoicePool.is_used == is_used)

    total = (await db.execute(
        select(func.count()).select_from(FinInvoicePool).where(*where)
    )).scalar() or 0
    stmt = (
        select(FinInvoicePool)
        .where(*where)
        .order_by(FinInvoicePool.created_at.desc())
        .offset((page - 1) * size)
        .limit(size)
    )
    invoices = (await db.execute(stmt)).scalars().all()
    return InvoiceListResponse(total=total, items=[_inv_to_resp(i) for i in invoices], page=page, size=size)
async def void_invoice(db: AsyncSession, user: CurrentUserPayload, invoice_id: uuid.UUID) -> None:
    """Soft-delete an invoice that is not attached to any expense.

    The invoice row is selected with ``with_for_update()``, the same lock
    ``create_expense`` takes before marking invoices as used, so the
    ``is_used`` check below cannot be invalidated by a concurrent expense
    creation before the soft-delete is written.

    Args:
        db: Session used for the lookup and update.
        user: Current user; unless ``data_scope`` is ``"all"`` the user may
            only void invoices they uploaded.
        invoice_id: Invoice to void.

    Raises:
        NotFoundException: The invoice does not exist or is already deleted.
        ForbiddenException: The user may not void another user's invoice.
        BizException: The invoice is currently used by an expense.
    """
    inv = (await db.execute(
        select(FinInvoicePool)
        .where(FinInvoicePool.id == invoice_id, FinInvoicePool.is_deleted.is_(False))
        .with_for_update()
    )).scalar_one_or_none()
    if inv is None:
        raise NotFoundException("票据不存在或已作废")
    if user.data_scope != "all" and inv.uploader_id != user.user_id:
        raise ForbiddenException("无权作废他人上传的票据")
    if inv.is_used:
        raise BizException(message="该票据已关联报销单,无法作废。请先撤回对应报销单。")

    await db.execute(
        update(FinInvoicePool)
        .where(FinInvoicePool.id == invoice_id)
        .values(is_deleted=True, updated_at=datetime.utcnow())
    )
    await db.commit()
async def create_expense(db: AsyncSession, user: CurrentUserPayload, body: ExpenseCreate, company_id: uuid.UUID) -> ExpenseResponse:
    """Create a submitted expense from ``body.items`` and mark its invoices used.

    Inside a savepoint the referenced, non-deleted invoices are selected with
    ``with_for_update()``, every item is checked against them, the expense
    record and one detail row per item are inserted, and the invoices are
    flagged ``is_used=True``. The session is then committed and the record
    re-read for the response.

    Args:
        db: Session used for the whole operation.
        user: Current user; becomes the applicant.
        body: Expense header (``total_amount``, ``remark``) and items.
        company_id: Company the expense is stored under.

    Raises:
        BizException: An item references an invoice that is missing, deleted
            or already used; or, with ``code=500``, any other failure while
            writing (the session is rolled back in both cases).
    """
    invoice_ids = [item.invoice_id for item in body.items]
    try:
        async with db.begin_nested():
            lock_stmt = select(FinInvoicePool).where(
                FinInvoicePool.id.in_(invoice_ids), FinInvoicePool.is_deleted.is_(False),
            ).with_for_update()
            locked_invs = (await db.execute(lock_stmt)).scalars().all()
            locked_map = {inv.id: inv for inv in locked_invs}

            for item in body.items:
                inv = locked_map.get(item.invoice_id)
                if inv is None:
                    raise BizException(message=f"发票 {item.invoice_id} 不存在或已被作废")
                if inv.is_used:
                    # The amount is wrapped in a balanced pair of parentheses;
                    # the previous message emitted only the closing one.
                    raise BizException(message=f"发票 {inv.merchant_name or inv.id}({inv.amount}) 已被其他报销单使用,禁止重复报销")

            system_no = await _generate_expense_no(db)
            expense = FinExpenseRecord(
                system_no=system_no, applicant_id=user.user_id,
                company_id=company_id,
                total_amount=body.total_amount, status="submitted", remark=body.remark,
            )
            db.add(expense)
            await db.flush()
            # Capture the key while the instance is still loaded: after
            # commit() the attribute may be expired, and reloading it
            # implicitly is not possible on an AsyncSession.
            expense_id = expense.id

            for item in body.items:
                db.add(FinExpenseDetail(
                    expense_id=expense_id, invoice_id=item.invoice_id,
                    expense_desc=item.expense_desc, original_type=item.original_type,
                    offset_type=item.offset_type, amount=item.amount,
                ))
            await db.execute(
                update(FinInvoicePool).where(FinInvoicePool.id.in_(invoice_ids))
                .values(is_used=True, updated_at=datetime.utcnow())
            )
        await db.commit()
    except BizException:
        await db.rollback()
        raise
    except Exception as e:
        await db.rollback()
        raise BizException(code=500, message=f"报销单创建事务失败: {e!s}") from e

    refreshed = (await db.execute(
        select(FinExpenseRecord).where(FinExpenseRecord.id == expense_id)
    )).scalar_one()
    return _exp_to_resp(refreshed)
async def list_expenses(
    db: AsyncSession, user: CurrentUserPayload,
    page: int = 1, size: int = 20,
    status: str | None = None, applicant_id: uuid.UUID | None = None,
    company_id: uuid.UUID | None = None,
) -> ExpenseListResponse:
    """Return one page of non-deleted expenses visible to ``user``.

    Visibility follows ``user.data_scope``:

    * ``"self"`` - only expenses the user applied for.
    * ``"dept_and_sub"`` - expenses of non-deleted users whose ``dept_id``
      equals the user's. If the user has no ``dept_id`` the filter falls
      back to the user's own expenses instead of applying no restriction.
    * any other value - no applicant restriction.

    Args:
        db: Session used for the queries.
        user: Current user whose data scope restricts the result.
        page: 1-based page number.
        size: Page size.
        status: Optional exact match on the expense status.
        applicant_id: Optional applicant filter; only honoured when the
            user's data scope is ``"all"``.
        company_id: Optional company restriction.

    Returns:
        The total number of matching rows plus the requested page of brief
        summaries, newest first.
    """
    where = [FinExpenseRecord.is_deleted.is_(False)]
    if company_id:
        where.append(FinExpenseRecord.company_id == company_id)

    if user.data_scope == "self":
        where.append(FinExpenseRecord.applicant_id == user.user_id)
    elif user.data_scope == "dept_and_sub":
        if user.dept_id is not None:
            # NOTE(review): only the user's own department is matched here;
            # the scope name suggests sub-departments too - confirm.
            dept_users = select(SysUser.id).where(
                SysUser.dept_id == user.dept_id, SysUser.is_deleted.is_(False),
            )
            where.append(FinExpenseRecord.applicant_id.in_(dept_users))
        else:
            # Fail closed: a department-scoped user without a department
            # must not silently get unrestricted visibility.
            where.append(FinExpenseRecord.applicant_id == user.user_id)

    if status:
        where.append(FinExpenseRecord.status == status)
    if applicant_id and user.data_scope == "all":
        where.append(FinExpenseRecord.applicant_id == applicant_id)

    total = (await db.execute(
        select(func.count()).select_from(FinExpenseRecord).where(*where)
    )).scalar() or 0
    stmt = (
        select(FinExpenseRecord)
        .where(*where)
        .order_by(FinExpenseRecord.created_at.desc())
        .offset((page - 1) * size)
        .limit(size)
    )
    expenses = (await db.execute(stmt)).scalars().all()
    return ExpenseListResponse(total=total, items=[_exp_to_brief(e) for e in expenses], page=page, size=size)
async def get_expense(db: AsyncSession, user: CurrentUserPayload, expense_id: uuid.UUID) -> ExpenseResponse:
    """Load one non-deleted expense, including its details.

    Args:
        db: Session used for the lookup.
        user: Current user; a ``"self"`` data scope restricts access to the
            user's own expenses.
        expense_id: Expense to load.

    Raises:
        NotFoundException: No non-deleted expense has this id.
        ForbiddenException: A ``"self"``-scoped user requested someone
            else's expense.
    """
    lookup = select(FinExpenseRecord).where(
        FinExpenseRecord.id == expense_id,
        FinExpenseRecord.is_deleted.is_(False),
    )
    result = await db.execute(lookup)
    record = result.scalar_one_or_none()
    if record is None:
        raise NotFoundException("报销单不存在")

    # NOTE(review): only the "self" scope is enforced here; other scopes are
    # not restricted by this function - confirm that is intended.
    if user.data_scope == "self":
        if record.applicant_id != user.user_id:
            raise ForbiddenException("无权查看他人的报销单")
    return _exp_to_resp(record)
async def _withdraw_expense(db: AsyncSession, user: CurrentUserPayload, exp: FinExpenseRecord, now: datetime) -> str:
    """Void the applicant's own submitted expense and release its invoices."""
    if exp.applicant_id != user.user_id:
        raise ForbiddenException("只能撤回自己的报销单")
    if exp.status != "submitted":
        raise BizException(message=f"当前状态 [{exp.status}] 不允许撤回,仅 submitted 状态可撤回")

    void_stmt = (
        update(FinExpenseRecord)
        .where(FinExpenseRecord.id == exp.id)
        .values(status="voided", updated_at=now)
    )
    try:
        async with db.begin_nested():
            await db.execute(void_stmt)
            await _release_invoices(db, exp.id, now)
        await db.commit()
    except BizException:
        await db.rollback()
        raise
    except Exception as exc:
        await db.rollback()
        raise BizException(code=500, message=f"撤回事务失败: {exc!s}") from exc
    return "报销单已撤回,关联发票已释放"


async def _approve_expense(db: AsyncSession, user: CurrentUserPayload, exp: FinExpenseRecord, now: datetime) -> str:
    """Approve a submitted expense; requires the ``"all"`` data scope."""
    if user.data_scope != "all":
        raise ForbiddenException("仅管理员/财务可审批")
    if exp.status != "submitted":
        raise BizException(message=f"当前状态 [{exp.status}] 不允许审批")

    approve_stmt = (
        update(FinExpenseRecord)
        .where(FinExpenseRecord.id == exp.id)
        .values(status="approved", approved_by=user.user_id, approved_at=now, updated_at=now)
    )
    await db.execute(approve_stmt)
    await db.commit()
    return "报销单已审批通过"


async def _reject_expense(db: AsyncSession, user: CurrentUserPayload, exp: FinExpenseRecord, now: datetime) -> str:
    """Reject a submitted expense and release its invoices; requires the ``"all"`` data scope."""
    if user.data_scope != "all":
        raise ForbiddenException("仅管理员/财务可驳回")
    if exp.status != "submitted":
        raise BizException(message=f"当前状态 [{exp.status}] 不允许驳回")

    reject_stmt = (
        update(FinExpenseRecord)
        .where(FinExpenseRecord.id == exp.id)
        .values(status="rejected", approved_by=user.user_id, approved_at=now, updated_at=now)
    )
    try:
        async with db.begin_nested():
            await db.execute(reject_stmt)
            await _release_invoices(db, exp.id, now)
        await db.commit()
    except BizException:
        await db.rollback()
        raise
    except Exception as exc:
        await db.rollback()
        raise BizException(code=500, message=f"驳回事务失败: {exc!s}") from exc
    return "报销单已驳回,关联发票已释放"


# Maps ``ExpenseStatusUpdate.action`` values to the coroutine that performs them.
_EXPENSE_ACTION_HANDLERS = {
    "withdraw": _withdraw_expense,
    "approve": _approve_expense,
    "reject": _reject_expense,
}


async def update_expense_status(
    db: AsyncSession, user: CurrentUserPayload,
    expense_id: uuid.UUID, body: ExpenseStatusUpdate,
) -> str:
    """Apply a ``withdraw``, ``approve`` or ``reject`` action to an expense.

    Args:
        db: Session used for the lookup and the status change.
        user: Current user performing the action.
        expense_id: Expense to change.
        body: Carries the requested ``action``.

    Returns:
        A message describing the result of the operation.

    Raises:
        NotFoundException: No non-deleted expense has this id.
        ForbiddenException: The user is not allowed to perform the action.
        BizException: The expense is not in ``submitted`` status, the action
            is unknown, or (``code=500``) a withdraw/reject write failed.
    """
    # NOTE(review): the status is read here and updated later without a row
    # lock or a status condition on the UPDATE, so concurrent actions on the
    # same expense are presumably not serialized - confirm.
    lookup = select(FinExpenseRecord).where(
        FinExpenseRecord.id == expense_id,
        FinExpenseRecord.is_deleted.is_(False),
    )
    exp = (await db.execute(lookup)).scalar_one_or_none()
    if exp is None:
        raise NotFoundException("报销单不存在")

    now = datetime.utcnow()
    handler = _EXPENSE_ACTION_HANDLERS.get(body.action)
    if handler is None:
        raise BizException(message=f"未知操作: {body.action}")
    return await handler(db, user, exp, now)