Files
hankin 815cbf9d8c v0.2.0: CRM/ERP 系统升级 - 清理 .gitignore 并移除误提交的 venv/env/db 文件
- 更新 .gitignore:全面覆盖环境变量、数据库、日志、缓存、上传文件
- 移除误跟踪的 server/venv/、crm_data.db、.env 文件
- 新增 server/.env.example 模板
- 新增合同管理、利润核算、AI教练等功能模块
- 新增 Playwright e2e 测试套件
- 前后端多项功能升级和 bug 修复
2026-05-11 07:24:19 +00:00

537 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
财务票据域路由 —— /api/finance
薄路由层:参数解析 + 调用 Service + 包装响应
"""
from __future__ import annotations
import uuid
import os
import time
import base64
from fastapi import APIRouter, Depends, Query, Body, File, UploadFile, Form
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.deps import get_current_user, get_current_company_id
from app.db.database import get_db
from app.schemas.auth import CurrentUserPayload
from app.schemas.finance import ExpenseCreate, ExpenseStatusUpdate, InvoiceCreate
from app.schemas.response import ok
from app.core.exceptions import BizException
from app.services import finance_service as svc
router = APIRouter(prefix="/finance", tags=["财务票据"])
@router.post("/ocr", summary="上传票据图片并做 AI 发票/名片 OCR 识别")
async def ocr_recognize(
file: UploadFile = File(...),
scene: str = Form("invoice"),
current_user: CurrentUserPayload = Depends(get_current_user),
) -> dict:
from app.services.ocr_service import ocr_image
# 读取并在本地保存原始文件
file_bytes = await file.read()
upload_dir = "uploads/finance"
os.makedirs(upload_dir, exist_ok=True)
ext = os.path.splitext(file.filename or "")[1].lower() or ".png"
ts = int(time.time())
safe_filename = f"{ts}_{current_user.user_id}{ext}"
file_path = os.path.join(upload_dir, safe_filename)
with open(file_path, "wb") as f:
f.write(file_bytes)
file_url = f"/uploads/finance/{safe_filename}"
# 支持的格式:结构化零算力 > 文本 LLM > 图片 Vision
supported = {".png", ".jpg", ".jpeg", ".pdf", ".md", ".ofd", ".xml", ".zip"}
if ext not in supported:
raise BizException(message=f"不支持的文件格式 {ext},仅支持: {', '.join(sorted(supported))}")
# ── 策略 A0: ZIP → 解包所有 XML 并逐个解析 ──
if ext == ".zip":
from app.services.invoice_parser import parse_zip_invoices
results = parse_zip_invoices(file_bytes)
return ok(data={"zip_results": [
{"filename": r.get("filename", ""), "success": r.get("success", False),
"ocr_data": r.get("data", {}), "needs_llm": r.get("needs_llm", False),
"error": r.get("error")}
for r in results
], "file_url": file_url}, message=f"ZIP 解析完成:{sum(1 for r in results if r.get('success'))}/{len(results)} 成功")
# ── 策略 A: OFD / XML → 结构化零算力提取(最快最准)──
if ext in (".ofd", ".xml"):
from app.services.invoice_parser import parse_ofd_invoice, parse_xml_invoice
parser = parse_ofd_invoice if ext == ".ofd" else parse_xml_invoice
result = parser(file_bytes)
print(f"[OCR] {ext.upper()} 解析: success={result.get('success')}")
if result.get("success"):
# 如果解析器提取到 raw_text 且标记 needs_llm,交给 LLM 做字段提取
if result.get("needs_llm") and result["data"].get("raw_text"):
from app.services.ocr_service import extract_invoice_from_text
llm_result = await extract_invoice_from_text(result["data"]["raw_text"], scene)
if llm_result.get("success"):
return ok(data={"ocr_data": llm_result["data"], "file_url": file_url}, message=f"AI 发票识别成功({ext.upper()} → LLM")
return ok(data={"ocr_data": llm_result.get("data", {}), "file_url": file_url}, message=llm_result.get("error", "LLM 解析失败"))
return ok(data={"ocr_data": result["data"], "file_url": file_url}, message=f"发票识别成功({ext.upper()} 结构化提取)")
return ok(data={"ocr_data": {}, "file_url": file_url}, message=result.get("error", f"{ext.upper()} 解析失败"))
# ── 策略 B: MD → 纯文本 LLM 理解(零 GPU Vision)──
if ext == ".md":
text = file_bytes.decode("utf-8", errors="replace").strip()
print(f"[OCR] MD 文本: {len(text)} 字符")
if len(text) < 20:
return ok(data={"ocr_data": {}, "file_url": file_url}, message="MD 文件内容过少,无法识别")
from app.services.ocr_service import extract_invoice_from_text
result = await extract_invoice_from_text(text, scene)
if result.get("success"):
return ok(data={"ocr_data": result["data"], "file_url": file_url}, message="AI 发票识别成功(MD 文本解析)")
return ok(data={"ocr_data": result.get("data", {}), "file_url": file_url}, message=result.get("error", "MD 文本解析失败"))
# ── 策略 C: PDF → PyMuPDF 提取文本 → LLM(零 GPU Vision)──
if ext == ".pdf":
try:
import fitz # PyMuPDF
doc = fitz.open(stream=file_bytes, filetype="pdf")
text = ""
for page in doc:
text += page.get_text() + "\n"
doc.close()
text = text.strip()
print(f"[OCR] PDF 文本提取: {len(text)} 字符")
if len(text) > 50: # 有足够文本内容
from app.services.ocr_service import extract_invoice_from_text
result = await extract_invoice_from_text(text, scene)
if result.get("success"):
return ok(data={"ocr_data": result["data"], "file_url": file_url}, message="AI 发票识别成功(PDF 文本解析)")
return ok(data={"ocr_data": result.get("data", {}), "file_url": file_url}, message=result.get("error", "PDF 文本提取失败"))
else:
# PDF 是扫描件(无文字层),降级到图片 OCR
print(f"[OCR] PDF 无文本层(仅 {len(text)} 字符),降级到图片 OCR")
page = fitz.open(stream=file_bytes, filetype="pdf")[0]
pix = page.get_pixmap(dpi=150)
ocr_bytes = pix.tobytes("png")
print(f"[OCR] PDF 转 PNG 成功: {len(ocr_bytes)} bytes")
except Exception as e:
print(f"[OCR] PDF 处理失败: {e}")
return ok(data={"ocr_data": {}, "file_url": file_url}, message=f"PDF 处理失败: {e}")
else:
ocr_bytes = file_bytes
# ── 策略 D: 图片/扫描PDF → Vision OCR(需要视觉模型)──
from app.services.ocr_service import ocr_image
image_base64 = base64.b64encode(ocr_bytes).decode("utf-8")
result = await ocr_image(image_base64, scene)
if result.get("success"):
return ok(data={"ocr_data": result["data"], "file_url": file_url}, message="AI OCR 识别成功")
# Vision 失败时友好提示
error_msg = result.get("error", "OCR 识别失败")
if "模型进程崩溃" in error_msg or "unexpectedly stopped" in error_msg or "服务异常" in error_msg:
error_msg += "。建议:请上传电子版 PDF/OFD/XML 发票,系统可零算力直接提取数据"
return ok(data={"ocr_data": {}, "file_url": file_url}, message=error_msg)
@router.post("/invoices", summary="上传票据入池(含 AI/OCR JSONB 数据)")
async def create_invoice(
body: InvoiceCreate,
db: AsyncSession = Depends(get_db),
current_user: CurrentUserPayload = Depends(get_current_user),
company_id: uuid.UUID = Depends(get_current_company_id),
) -> dict:
result = await svc.create_invoice(db, current_user, body, company_id)
return ok(data=result.model_dump(mode="json"), message="票据入池成功")
@router.get("/invoices", summary="票据池列表(数据权限隔离)")
async def list_invoices(
page: int = Query(1, ge=1),
size: int = Query(20, ge=1, le=100),
type: str | None = Query(None, alias="category", pattern=r"^(expense|customer)$"),
is_used: bool | None = Query(None),
db: AsyncSession = Depends(get_db),
current_user: CurrentUserPayload = Depends(get_current_user),
company_id: uuid.UUID = Depends(get_current_company_id),
) -> dict:
result = await svc.list_invoices(db, current_user, page, size, type, is_used, company_id)
return ok(data=result.model_dump(mode="json"))
@router.delete("/invoices/{invoice_id}", summary="作废票据(软删除)")
async def void_invoice(
invoice_id: uuid.UUID,
db: AsyncSession = Depends(get_db),
current_user: CurrentUserPayload = Depends(get_current_user),
) -> dict:
await svc.void_invoice(db, current_user, invoice_id)
return ok(message="票据已作废")
@router.post("/expenses", summary="生成报销单(防重锁定 + 强事务)")
async def create_expense(
body: ExpenseCreate,
db: AsyncSession = Depends(get_db),
current_user: CurrentUserPayload = Depends(get_current_user),
company_id: uuid.UUID = Depends(get_current_company_id),
) -> dict:
result = await svc.create_expense(db, current_user, body, company_id)
return ok(data=result.model_dump(mode="json"), message=f"报销单 {result.system_no} 提交成功")
@router.get("/expenses", summary="报销单大盘(多角色数据权限)")
async def list_expenses(
page: int = Query(1, ge=1),
size: int = Query(20, ge=1, le=100),
status: str | None = Query(None, pattern=r"^(submitted|approved|rejected|voided)$"),
applicant_id: uuid.UUID | None = Query(None, description="按申请人过滤(管理员用)"),
db: AsyncSession = Depends(get_db),
current_user: CurrentUserPayload = Depends(get_current_user),
company_id: uuid.UUID = Depends(get_current_company_id),
) -> dict:
result = await svc.list_expenses(db, current_user, page, size, status, applicant_id, company_id)
return ok(data=result.model_dump(mode="json"))
@router.get("/expenses/{expense_id}", summary="报销单详情(含明细行)")
async def get_expense(
expense_id: uuid.UUID,
db: AsyncSession = Depends(get_db),
current_user: CurrentUserPayload = Depends(get_current_user),
) -> dict:
result = await svc.get_expense(db, current_user, expense_id)
return ok(data=result.model_dump(mode="json"))
@router.put("/expenses/{expense_id}/status", summary="审批/撤回报销单(含发票释放)")
async def update_expense_status(
expense_id: uuid.UUID,
body: ExpenseStatusUpdate,
db: AsyncSession = Depends(get_db),
current_user: CurrentUserPayload = Depends(get_current_user),
) -> dict:
msg = await svc.update_expense_status(db, current_user, expense_id, body)
return ok(message=msg)
# ════════════════════════════════════════════════════════════
# 批量上传 + OCR 任务队列 API
# ════════════════════════════════════════════════════════════
@router.post("/upload-batch", summary="批量上传发票(ZIP/XML 即时入池,图片PDF 入队列)")
async def upload_batch(
files: list[UploadFile] = File(...),
scene: str = Form("invoice"),
inv_type: str = Form("expense"),
db: AsyncSession = Depends(get_db),
current_user: CurrentUserPayload = Depends(get_current_user),
company_id: uuid.UUID = Depends(get_current_company_id),
) -> dict:
from app.services.invoice_parser import parse_xml_invoice, parse_ofd_invoice, parse_zip_invoices
from app.services.ocr_service import extract_invoice_from_text
from app.models.finance import FinInvoicePool, FinOcrTask
upload_dir = "uploads/finance"
os.makedirs(upload_dir, exist_ok=True)
results = [] # 返回给前端
for file in files:
file_bytes = await file.read()
ext = os.path.splitext(file.filename or "")[1].lower() or ".bin"
ts = int(time.time())
safe_fn = f"{ts}_{uuid.uuid4().hex[:8]}{ext}"
file_path = os.path.join(upload_dir, safe_fn)
with open(file_path, "wb") as f:
f.write(file_bytes)
file_url = f"/uploads/finance/{safe_fn}"
# ── ZIP: 解压内部 XML,逐个即时入池 ──
if ext == ".zip":
zip_results = parse_zip_invoices(file_bytes)
for zr in zip_results:
if zr.get("success") and not zr.get("needs_llm"):
ai_data = zr.get("data", {})
# 需要 LLM 的 zip 中的 xml 也立刻处理
merchant = ai_data.get("merchant") or ai_data.get("merchant_name") or "(ZIP)"
amount = float(ai_data.get("amount", 0) or 0)
inv_date_str = ai_data.get("date")
inv_date = None
if inv_date_str:
try:
from datetime import date as d
inv_date = d.fromisoformat(inv_date_str)
except ValueError:
pass
inv = FinInvoicePool(
uploader_id=current_user.user_id, company_id=company_id,
file_url=file_url, merchant_name=merchant, amount=amount,
invoice_date=inv_date, type=inv_type, ai_extracted_data=ai_data,
)
db.add(inv)
results.append({"filename": zr.get("filename", file.filename), "action": "pooled",
"status": "success", "message": f"{merchant} ¥{amount}"})
elif zr.get("needs_llm") and zr.get("data", {}).get("raw_text"):
# LLM 文本理解(即时,<5s
try:
llm_r = await extract_invoice_from_text(zr["data"]["raw_text"], scene)
if llm_r.get("success"):
ai_data = llm_r["data"]
merchant = ai_data.get("merchant") or "(LLM)"
amount = float(ai_data.get("amount", 0) or 0)
inv = FinInvoicePool(
uploader_id=current_user.user_id, company_id=company_id,
file_url=file_url, merchant_name=merchant, amount=amount,
type=inv_type, ai_extracted_data=ai_data,
)
db.add(inv)
results.append({"filename": zr.get("filename"), "action": "pooled",
"status": "success", "message": f"{merchant} ¥{amount} (LLM)"})
else:
results.append({"filename": zr.get("filename"), "action": "failed",
"status": "error", "message": llm_r.get("error", "LLM 解析失败")})
except Exception as e:
results.append({"filename": zr.get("filename"), "action": "failed",
"status": "error", "message": str(e)})
else:
results.append({"filename": zr.get("filename", file.filename), "action": "failed",
"status": "error", "message": zr.get("error", "解析失败")})
continue
# ── XML / OFD: 零算力即时入池 ──
if ext in (".xml", ".ofd"):
parser = parse_xml_invoice if ext == ".xml" else parse_ofd_invoice
r = parser(file_bytes)
if r.get("success") and not r.get("needs_llm"):
ai_data = r.get("data", {})
merchant = ai_data.get("merchant") or ai_data.get("merchant_name") or "(解析)"
amount = float(ai_data.get("amount", 0) or 0)
inv_date_str = ai_data.get("date")
inv_date = None
if inv_date_str:
try:
from datetime import date as d
inv_date = d.fromisoformat(inv_date_str)
except ValueError:
pass
inv = FinInvoicePool(
uploader_id=current_user.user_id, company_id=company_id,
file_url=file_url, merchant_name=merchant, amount=amount,
invoice_date=inv_date, type=inv_type, ai_extracted_data=ai_data,
)
db.add(inv)
results.append({"filename": file.filename, "action": "pooled",
"status": "success", "message": f"{merchant} ¥{amount}"})
elif r.get("needs_llm") and r.get("data", {}).get("raw_text"):
try:
llm_r = await extract_invoice_from_text(r["data"]["raw_text"], scene)
if llm_r.get("success"):
ai_data = llm_r["data"]
merchant = ai_data.get("merchant") or "(LLM)"
amount = float(ai_data.get("amount", 0) or 0)
inv = FinInvoicePool(
uploader_id=current_user.user_id, company_id=company_id,
file_url=file_url, merchant_name=merchant, amount=amount,
type=inv_type, ai_extracted_data=ai_data,
)
db.add(inv)
results.append({"filename": file.filename, "action": "pooled",
"status": "success", "message": f"{merchant} ¥{amount} (LLM)"})
else:
results.append({"filename": file.filename, "action": "failed",
"status": "error", "message": llm_r.get("error", "LLM 失败")})
except Exception as e:
results.append({"filename": file.filename, "action": "failed",
"status": "error", "message": str(e)})
else:
results.append({"filename": file.filename, "action": "failed",
"status": "error", "message": r.get("error", "解析失败")})
continue
# ── 图片 / PDF : 写入 DB 任务队列 ──
task = FinOcrTask(
file_url=file_url, file_ext=ext,
original_name=file.filename or "unknown",
uploader_id=current_user.user_id,
company_id=company_id,
inv_type=inv_type,
priority=50 if ext == ".pdf" else 100, # PDF 优先(可能有文字层)
)
db.add(task)
await db.flush()
results.append({"filename": file.filename, "action": "queued",
"status": "pending", "task_id": str(task.id),
"message": "🕐 已加入 OCR 处理队列"})
await db.commit()
pooled = sum(1 for r in results if r["action"] == "pooled")
queued = sum(1 for r in results if r["action"] == "queued")
failed = sum(1 for r in results if r["action"] == "failed")
return ok(data={"results": results},
message=f"批量处理完成:{pooled} 即时入池,{queued} 排队中,{failed} 失败")
@router.get("/ocr-tasks", summary="OCR 任务队列列表")
async def list_ocr_tasks(
page: int = Query(1, ge=1),
size: int = Query(20, ge=1, le=100),
status: str | None = Query(None, description="pending/processing/success/failed/manual"),
db: AsyncSession = Depends(get_db),
current_user: CurrentUserPayload = Depends(get_current_user),
company_id: uuid.UUID = Depends(get_current_company_id),
) -> dict:
from sqlalchemy import func, select
from app.models.finance import FinOcrTask
where = [FinOcrTask.company_id == company_id, FinOcrTask.is_deleted.is_(False)]
if current_user.data_scope == "self":
where.append(FinOcrTask.uploader_id == current_user.user_id)
if status:
where.append(FinOcrTask.status == status)
total = (await db.execute(select(func.count()).select_from(FinOcrTask).where(*where))).scalar() or 0
stmt = (
select(FinOcrTask).where(*where)
.order_by(FinOcrTask.priority, FinOcrTask.created_at.desc())
.offset((page - 1) * size).limit(size)
)
tasks = (await db.execute(stmt)).scalars().all()
return ok(data={
"total": total, "page": page, "size": size,
"items": [{
"id": str(t.id),
"original_name": t.original_name,
"file_ext": t.file_ext,
"file_url": t.file_url,
"status": t.status,
"priority": t.priority,
"retry_count": t.retry_count,
"max_retries": t.max_retries,
"error_message": t.error_message,
"ocr_result": t.ocr_result,
"invoice_pool_id": str(t.invoice_pool_id) if t.invoice_pool_id else None,
"uploader_name": t.uploader.real_name if t.uploader else None,
"inv_type": t.inv_type,
"created_at": str(t.created_at),
"updated_at": str(t.updated_at),
} for t in tasks],
})
@router.post("/ocr-tasks/{task_id}/retry", summary="重试失败的 OCR 任务")
async def retry_ocr_task(
task_id: uuid.UUID,
db: AsyncSession = Depends(get_db),
current_user: CurrentUserPayload = Depends(get_current_user),
) -> dict:
from sqlalchemy import select, update
from app.models.finance import FinOcrTask
task = (await db.execute(
select(FinOcrTask).where(FinOcrTask.id == task_id, FinOcrTask.is_deleted.is_(False))
)).scalar_one_or_none()
if not task:
raise BizException(message="任务不存在")
if task.status not in ("failed", "manual"):
raise BizException(message=f"当前状态 [{task.status}] 不允许重试")
task.status = "pending"
task.retry_count = 0
task.error_message = None
await db.commit()
return ok(message="任务已重新入队")
@router.post("/ocr-tasks/{task_id}/manual", summary="手动录入 OCR 结果并入池")
async def manual_ocr_task(
task_id: uuid.UUID,
body: dict,
db: AsyncSession = Depends(get_db),
current_user: CurrentUserPayload = Depends(get_current_user),
) -> dict:
from sqlalchemy import select
from app.models.finance import FinOcrTask, FinInvoicePool
task = (await db.execute(
select(FinOcrTask).where(FinOcrTask.id == task_id, FinOcrTask.is_deleted.is_(False))
)).scalar_one_or_none()
if not task:
raise BizException(message="任务不存在")
merchant = body.get("merchant_name", "手动录入")
amount = float(body.get("amount", 0))
inv_date_str = body.get("invoice_date")
inv_date = None
if inv_date_str:
try:
from datetime import date as d
inv_date = d.fromisoformat(inv_date_str)
except ValueError:
pass
inv = FinInvoicePool(
uploader_id=task.uploader_id, company_id=task.company_id,
file_url=task.file_url, merchant_name=merchant, amount=amount,
invoice_date=inv_date, type=task.inv_type, ai_extracted_data=body,
)
db.add(inv)
await db.flush()
task.status = "manual"
task.invoice_pool_id = inv.id
task.ocr_result = body
task.error_message = None
await db.commit()
return ok(data={"invoice_pool_id": str(inv.id)}, message="手动录入成功,发票已入池")
@router.put("/ocr-tasks/{task_id}/priority", summary="调整 OCR 任务优先级")
async def update_ocr_task_priority(
task_id: uuid.UUID,
body: dict,
db: AsyncSession = Depends(get_db),
current_user: CurrentUserPayload = Depends(get_current_user),
) -> dict:
from sqlalchemy import select
from app.models.finance import FinOcrTask
task = (await db.execute(
select(FinOcrTask).where(FinOcrTask.id == task_id, FinOcrTask.is_deleted.is_(False))
)).scalar_one_or_none()
if not task:
raise BizException(message="任务不存在")
if task.status not in ("pending",):
raise BizException(message="仅待处理任务可调整优先级")
new_priority = body.get("priority", task.priority)
task.priority = int(new_priority)
await db.commit()
return ok(message=f"优先级已调整为 {task.priority}")
@router.delete("/ocr-tasks/{task_id}", summary="取消/删除 OCR 任务")
async def delete_ocr_task(
task_id: uuid.UUID,
db: AsyncSession = Depends(get_db),
current_user: CurrentUserPayload = Depends(get_current_user),
) -> dict:
from sqlalchemy import select
from app.models.finance import FinOcrTask
task = (await db.execute(
select(FinOcrTask).where(FinOcrTask.id == task_id, FinOcrTask.is_deleted.is_(False))
)).scalar_one_or_none()
if not task:
raise BizException(message="任务不存在")
if task.status == "processing":
raise BizException(message="正在处理中的任务无法取消")
task.is_deleted = True
await db.commit()
return ok(message="任务已取消")