""" 财务票据域路由 —— /api/finance 薄路由层:参数解析 + 调用 Service + 包装响应 """ from __future__ import annotations import uuid import os import time import base64 from fastapi import APIRouter, Depends, Query, Body, File, UploadFile, Form from sqlalchemy.ext.asyncio import AsyncSession from app.api.deps import get_current_user, get_current_company_id from app.db.database import get_db from app.schemas.auth import CurrentUserPayload from app.schemas.finance import ExpenseCreate, ExpenseStatusUpdate, InvoiceCreate from app.schemas.response import ok from app.core.exceptions import BizException from app.services import finance_service as svc router = APIRouter(prefix="/finance", tags=["财务票据"]) @router.post("/ocr", summary="上传票据图片并做 AI 发票/名片 OCR 识别") async def ocr_recognize( file: UploadFile = File(...), scene: str = Form("invoice"), current_user: CurrentUserPayload = Depends(get_current_user), ) -> dict: from app.services.ocr_service import ocr_image # 读取并在本地保存原始文件 file_bytes = await file.read() upload_dir = "uploads/finance" os.makedirs(upload_dir, exist_ok=True) ext = os.path.splitext(file.filename or "")[1].lower() or ".png" ts = int(time.time()) safe_filename = f"{ts}_{current_user.user_id}{ext}" file_path = os.path.join(upload_dir, safe_filename) with open(file_path, "wb") as f: f.write(file_bytes) file_url = f"/uploads/finance/{safe_filename}" # 支持的格式:结构化零算力 > 文本 LLM > 图片 Vision supported = {".png", ".jpg", ".jpeg", ".pdf", ".md", ".ofd", ".xml", ".zip"} if ext not in supported: raise BizException(message=f"不支持的文件格式 {ext},仅支持: {', '.join(sorted(supported))}") # ── 策略 A0: ZIP → 解包所有 XML 并逐个解析 ── if ext == ".zip": from app.services.invoice_parser import parse_zip_invoices results = parse_zip_invoices(file_bytes) return ok(data={"zip_results": [ {"filename": r.get("filename", ""), "success": r.get("success", False), "ocr_data": r.get("data", {}), "needs_llm": r.get("needs_llm", False), "error": r.get("error")} for r in results ], "file_url": file_url}, message=f"ZIP 解析完成:{sum(1 for r in results if r.get('success'))}/{len(results)} 成功") # ── 策略 A: OFD / XML → 结构化零算力提取(最快最准)── if ext in (".ofd", ".xml"): from app.services.invoice_parser import parse_ofd_invoice, parse_xml_invoice parser = parse_ofd_invoice if ext == ".ofd" else parse_xml_invoice result = parser(file_bytes) print(f"[OCR] {ext.upper()} 解析: success={result.get('success')}") if result.get("success"): # 如果解析器提取到 raw_text 且标记 needs_llm,交给 LLM 做字段提取 if result.get("needs_llm") and result["data"].get("raw_text"): from app.services.ocr_service import extract_invoice_from_text llm_result = await extract_invoice_from_text(result["data"]["raw_text"], scene) if llm_result.get("success"): return ok(data={"ocr_data": llm_result["data"], "file_url": file_url}, message=f"AI 发票识别成功({ext.upper()} → LLM)") return ok(data={"ocr_data": llm_result.get("data", {}), "file_url": file_url}, message=llm_result.get("error", "LLM 解析失败")) return ok(data={"ocr_data": result["data"], "file_url": file_url}, message=f"发票识别成功({ext.upper()} 结构化提取)") return ok(data={"ocr_data": {}, "file_url": file_url}, message=result.get("error", f"{ext.upper()} 解析失败")) # ── 策略 B: MD → 纯文本 LLM 理解(零 GPU Vision)── if ext == ".md": text = file_bytes.decode("utf-8", errors="replace").strip() print(f"[OCR] MD 文本: {len(text)} 字符") if len(text) < 20: return ok(data={"ocr_data": {}, "file_url": file_url}, message="MD 文件内容过少,无法识别") from app.services.ocr_service import extract_invoice_from_text result = await extract_invoice_from_text(text, scene) if result.get("success"): return ok(data={"ocr_data": result["data"], "file_url": file_url}, message="AI 发票识别成功(MD 文本解析)") return ok(data={"ocr_data": result.get("data", {}), "file_url": file_url}, message=result.get("error", "MD 文本解析失败")) # ── 策略 C: PDF → PyMuPDF 提取文本 → LLM(零 GPU Vision)── if ext == ".pdf": try: import fitz # PyMuPDF doc = fitz.open(stream=file_bytes, filetype="pdf") text = "" for page in doc: text += page.get_text() + "\n" doc.close() text = text.strip() print(f"[OCR] PDF 文本提取: {len(text)} 字符") if len(text) > 50: # 有足够文本内容 from app.services.ocr_service import extract_invoice_from_text result = await extract_invoice_from_text(text, scene) if result.get("success"): return ok(data={"ocr_data": result["data"], "file_url": file_url}, message="AI 发票识别成功(PDF 文本解析)") return ok(data={"ocr_data": result.get("data", {}), "file_url": file_url}, message=result.get("error", "PDF 文本提取失败")) else: # PDF 是扫描件(无文字层),降级到图片 OCR print(f"[OCR] PDF 无文本层(仅 {len(text)} 字符),降级到图片 OCR") page = fitz.open(stream=file_bytes, filetype="pdf")[0] pix = page.get_pixmap(dpi=150) ocr_bytes = pix.tobytes("png") print(f"[OCR] PDF 转 PNG 成功: {len(ocr_bytes)} bytes") except Exception as e: print(f"[OCR] PDF 处理失败: {e}") return ok(data={"ocr_data": {}, "file_url": file_url}, message=f"PDF 处理失败: {e}") else: ocr_bytes = file_bytes # ── 策略 D: 图片/扫描PDF → Vision OCR(需要视觉模型)── from app.services.ocr_service import ocr_image image_base64 = base64.b64encode(ocr_bytes).decode("utf-8") result = await ocr_image(image_base64, scene) if result.get("success"): return ok(data={"ocr_data": result["data"], "file_url": file_url}, message="AI OCR 识别成功") # Vision 失败时友好提示 error_msg = result.get("error", "OCR 识别失败") if "模型进程崩溃" in error_msg or "unexpectedly stopped" in error_msg or "服务异常" in error_msg: error_msg += "。建议:请上传电子版 PDF/OFD/XML 发票,系统可零算力直接提取数据" return ok(data={"ocr_data": {}, "file_url": file_url}, message=error_msg) @router.post("/invoices", summary="上传票据入池(含 AI/OCR JSONB 数据)") async def create_invoice( body: InvoiceCreate, db: AsyncSession = Depends(get_db), current_user: CurrentUserPayload = Depends(get_current_user), company_id: uuid.UUID = Depends(get_current_company_id), ) -> dict: result = await svc.create_invoice(db, current_user, body, company_id) return ok(data=result.model_dump(mode="json"), message="票据入池成功") @router.get("/invoices", summary="票据池列表(数据权限隔离)") async def list_invoices( page: int = Query(1, ge=1), size: int = Query(20, ge=1, le=100), type: str | None = Query(None, alias="category", pattern=r"^(expense|customer)$"), is_used: bool | None = Query(None), db: AsyncSession = Depends(get_db), current_user: CurrentUserPayload = Depends(get_current_user), company_id: uuid.UUID = Depends(get_current_company_id), ) -> dict: result = await svc.list_invoices(db, current_user, page, size, type, is_used, company_id) return ok(data=result.model_dump(mode="json")) @router.delete("/invoices/{invoice_id}", summary="作废票据(软删除)") async def void_invoice( invoice_id: uuid.UUID, db: AsyncSession = Depends(get_db), current_user: CurrentUserPayload = Depends(get_current_user), ) -> dict: await svc.void_invoice(db, current_user, invoice_id) return ok(message="票据已作废") @router.post("/expenses", summary="生成报销单(防重锁定 + 强事务)") async def create_expense( body: ExpenseCreate, db: AsyncSession = Depends(get_db), current_user: CurrentUserPayload = Depends(get_current_user), company_id: uuid.UUID = Depends(get_current_company_id), ) -> dict: result = await svc.create_expense(db, current_user, body, company_id) return ok(data=result.model_dump(mode="json"), message=f"报销单 {result.system_no} 提交成功") @router.get("/expenses", summary="报销单大盘(多角色数据权限)") async def list_expenses( page: int = Query(1, ge=1), size: int = Query(20, ge=1, le=100), status: str | None = Query(None, pattern=r"^(submitted|approved|rejected|voided)$"), applicant_id: uuid.UUID | None = Query(None, description="按申请人过滤(管理员用)"), db: AsyncSession = Depends(get_db), current_user: CurrentUserPayload = Depends(get_current_user), company_id: uuid.UUID = Depends(get_current_company_id), ) -> dict: result = await svc.list_expenses(db, current_user, page, size, status, applicant_id, company_id) return ok(data=result.model_dump(mode="json")) @router.get("/expenses/{expense_id}", summary="报销单详情(含明细行)") async def get_expense( expense_id: uuid.UUID, db: AsyncSession = Depends(get_db), current_user: CurrentUserPayload = Depends(get_current_user), ) -> dict: result = await svc.get_expense(db, current_user, expense_id) return ok(data=result.model_dump(mode="json")) @router.put("/expenses/{expense_id}/status", summary="审批/撤回报销单(含发票释放)") async def update_expense_status( expense_id: uuid.UUID, body: ExpenseStatusUpdate, db: AsyncSession = Depends(get_db), current_user: CurrentUserPayload = Depends(get_current_user), ) -> dict: msg = await svc.update_expense_status(db, current_user, expense_id, body) return ok(message=msg) # ════════════════════════════════════════════════════════════ # 批量上传 + OCR 任务队列 API # ════════════════════════════════════════════════════════════ @router.post("/upload-batch", summary="批量上传发票(ZIP/XML 即时入池,图片PDF 入队列)") async def upload_batch( files: list[UploadFile] = File(...), scene: str = Form("invoice"), inv_type: str = Form("expense"), db: AsyncSession = Depends(get_db), current_user: CurrentUserPayload = Depends(get_current_user), company_id: uuid.UUID = Depends(get_current_company_id), ) -> dict: from app.services.invoice_parser import parse_xml_invoice, parse_ofd_invoice, parse_zip_invoices from app.services.ocr_service import extract_invoice_from_text from app.models.finance import FinInvoicePool, FinOcrTask upload_dir = "uploads/finance" os.makedirs(upload_dir, exist_ok=True) results = [] # 返回给前端 for file in files: file_bytes = await file.read() ext = os.path.splitext(file.filename or "")[1].lower() or ".bin" ts = int(time.time()) safe_fn = f"{ts}_{uuid.uuid4().hex[:8]}{ext}" file_path = os.path.join(upload_dir, safe_fn) with open(file_path, "wb") as f: f.write(file_bytes) file_url = f"/uploads/finance/{safe_fn}" # ── ZIP: 解压内部 XML,逐个即时入池 ── if ext == ".zip": zip_results = parse_zip_invoices(file_bytes) for zr in zip_results: if zr.get("success") and not zr.get("needs_llm"): ai_data = zr.get("data", {}) # 需要 LLM 的 zip 中的 xml 也立刻处理 merchant = ai_data.get("merchant") or ai_data.get("merchant_name") or "(ZIP)" amount = float(ai_data.get("amount", 0) or 0) inv_date_str = ai_data.get("date") inv_date = None if inv_date_str: try: from datetime import date as d inv_date = d.fromisoformat(inv_date_str) except ValueError: pass inv = FinInvoicePool( uploader_id=current_user.user_id, company_id=company_id, file_url=file_url, merchant_name=merchant, amount=amount, invoice_date=inv_date, type=inv_type, ai_extracted_data=ai_data, ) db.add(inv) results.append({"filename": zr.get("filename", file.filename), "action": "pooled", "status": "success", "message": f"✅ {merchant} ¥{amount}"}) elif zr.get("needs_llm") and zr.get("data", {}).get("raw_text"): # LLM 文本理解(即时,<5s) try: llm_r = await extract_invoice_from_text(zr["data"]["raw_text"], scene) if llm_r.get("success"): ai_data = llm_r["data"] merchant = ai_data.get("merchant") or "(LLM)" amount = float(ai_data.get("amount", 0) or 0) inv = FinInvoicePool( uploader_id=current_user.user_id, company_id=company_id, file_url=file_url, merchant_name=merchant, amount=amount, type=inv_type, ai_extracted_data=ai_data, ) db.add(inv) results.append({"filename": zr.get("filename"), "action": "pooled", "status": "success", "message": f"✅ {merchant} ¥{amount} (LLM)"}) else: results.append({"filename": zr.get("filename"), "action": "failed", "status": "error", "message": llm_r.get("error", "LLM 解析失败")}) except Exception as e: results.append({"filename": zr.get("filename"), "action": "failed", "status": "error", "message": str(e)}) else: results.append({"filename": zr.get("filename", file.filename), "action": "failed", "status": "error", "message": zr.get("error", "解析失败")}) continue # ── XML / OFD: 零算力即时入池 ── if ext in (".xml", ".ofd"): parser = parse_xml_invoice if ext == ".xml" else parse_ofd_invoice r = parser(file_bytes) if r.get("success") and not r.get("needs_llm"): ai_data = r.get("data", {}) merchant = ai_data.get("merchant") or ai_data.get("merchant_name") or "(解析)" amount = float(ai_data.get("amount", 0) or 0) inv_date_str = ai_data.get("date") inv_date = None if inv_date_str: try: from datetime import date as d inv_date = d.fromisoformat(inv_date_str) except ValueError: pass inv = FinInvoicePool( uploader_id=current_user.user_id, company_id=company_id, file_url=file_url, merchant_name=merchant, amount=amount, invoice_date=inv_date, type=inv_type, ai_extracted_data=ai_data, ) db.add(inv) results.append({"filename": file.filename, "action": "pooled", "status": "success", "message": f"✅ {merchant} ¥{amount}"}) elif r.get("needs_llm") and r.get("data", {}).get("raw_text"): try: llm_r = await extract_invoice_from_text(r["data"]["raw_text"], scene) if llm_r.get("success"): ai_data = llm_r["data"] merchant = ai_data.get("merchant") or "(LLM)" amount = float(ai_data.get("amount", 0) or 0) inv = FinInvoicePool( uploader_id=current_user.user_id, company_id=company_id, file_url=file_url, merchant_name=merchant, amount=amount, type=inv_type, ai_extracted_data=ai_data, ) db.add(inv) results.append({"filename": file.filename, "action": "pooled", "status": "success", "message": f"✅ {merchant} ¥{amount} (LLM)"}) else: results.append({"filename": file.filename, "action": "failed", "status": "error", "message": llm_r.get("error", "LLM 失败")}) except Exception as e: results.append({"filename": file.filename, "action": "failed", "status": "error", "message": str(e)}) else: results.append({"filename": file.filename, "action": "failed", "status": "error", "message": r.get("error", "解析失败")}) continue # ── 图片 / PDF : 写入 DB 任务队列 ── task = FinOcrTask( file_url=file_url, file_ext=ext, original_name=file.filename or "unknown", uploader_id=current_user.user_id, company_id=company_id, inv_type=inv_type, priority=50 if ext == ".pdf" else 100, # PDF 优先(可能有文字层) ) db.add(task) await db.flush() results.append({"filename": file.filename, "action": "queued", "status": "pending", "task_id": str(task.id), "message": "🕐 已加入 OCR 处理队列"}) await db.commit() pooled = sum(1 for r in results if r["action"] == "pooled") queued = sum(1 for r in results if r["action"] == "queued") failed = sum(1 for r in results if r["action"] == "failed") return ok(data={"results": results}, message=f"批量处理完成:{pooled} 即时入池,{queued} 排队中,{failed} 失败") @router.get("/ocr-tasks", summary="OCR 任务队列列表") async def list_ocr_tasks( page: int = Query(1, ge=1), size: int = Query(20, ge=1, le=100), status: str | None = Query(None, description="pending/processing/success/failed/manual"), db: AsyncSession = Depends(get_db), current_user: CurrentUserPayload = Depends(get_current_user), company_id: uuid.UUID = Depends(get_current_company_id), ) -> dict: from sqlalchemy import func, select from app.models.finance import FinOcrTask where = [FinOcrTask.company_id == company_id, FinOcrTask.is_deleted.is_(False)] if current_user.data_scope == "self": where.append(FinOcrTask.uploader_id == current_user.user_id) if status: where.append(FinOcrTask.status == status) total = (await db.execute(select(func.count()).select_from(FinOcrTask).where(*where))).scalar() or 0 stmt = ( select(FinOcrTask).where(*where) .order_by(FinOcrTask.priority, FinOcrTask.created_at.desc()) .offset((page - 1) * size).limit(size) ) tasks = (await db.execute(stmt)).scalars().all() return ok(data={ "total": total, "page": page, "size": size, "items": [{ "id": str(t.id), "original_name": t.original_name, "file_ext": t.file_ext, "file_url": t.file_url, "status": t.status, "priority": t.priority, "retry_count": t.retry_count, "max_retries": t.max_retries, "error_message": t.error_message, "ocr_result": t.ocr_result, "invoice_pool_id": str(t.invoice_pool_id) if t.invoice_pool_id else None, "uploader_name": t.uploader.real_name if t.uploader else None, "inv_type": t.inv_type, "created_at": str(t.created_at), "updated_at": str(t.updated_at), } for t in tasks], }) @router.post("/ocr-tasks/{task_id}/retry", summary="重试失败的 OCR 任务") async def retry_ocr_task( task_id: uuid.UUID, db: AsyncSession = Depends(get_db), current_user: CurrentUserPayload = Depends(get_current_user), ) -> dict: from sqlalchemy import select, update from app.models.finance import FinOcrTask task = (await db.execute( select(FinOcrTask).where(FinOcrTask.id == task_id, FinOcrTask.is_deleted.is_(False)) )).scalar_one_or_none() if not task: raise BizException(message="任务不存在") if task.status not in ("failed", "manual"): raise BizException(message=f"当前状态 [{task.status}] 不允许重试") task.status = "pending" task.retry_count = 0 task.error_message = None await db.commit() return ok(message="任务已重新入队") @router.post("/ocr-tasks/{task_id}/manual", summary="手动录入 OCR 结果并入池") async def manual_ocr_task( task_id: uuid.UUID, body: dict, db: AsyncSession = Depends(get_db), current_user: CurrentUserPayload = Depends(get_current_user), ) -> dict: from sqlalchemy import select from app.models.finance import FinOcrTask, FinInvoicePool task = (await db.execute( select(FinOcrTask).where(FinOcrTask.id == task_id, FinOcrTask.is_deleted.is_(False)) )).scalar_one_or_none() if not task: raise BizException(message="任务不存在") merchant = body.get("merchant_name", "手动录入") amount = float(body.get("amount", 0)) inv_date_str = body.get("invoice_date") inv_date = None if inv_date_str: try: from datetime import date as d inv_date = d.fromisoformat(inv_date_str) except ValueError: pass inv = FinInvoicePool( uploader_id=task.uploader_id, company_id=task.company_id, file_url=task.file_url, merchant_name=merchant, amount=amount, invoice_date=inv_date, type=task.inv_type, ai_extracted_data=body, ) db.add(inv) await db.flush() task.status = "manual" task.invoice_pool_id = inv.id task.ocr_result = body task.error_message = None await db.commit() return ok(data={"invoice_pool_id": str(inv.id)}, message="手动录入成功,发票已入池") @router.put("/ocr-tasks/{task_id}/priority", summary="调整 OCR 任务优先级") async def update_ocr_task_priority( task_id: uuid.UUID, body: dict, db: AsyncSession = Depends(get_db), current_user: CurrentUserPayload = Depends(get_current_user), ) -> dict: from sqlalchemy import select from app.models.finance import FinOcrTask task = (await db.execute( select(FinOcrTask).where(FinOcrTask.id == task_id, FinOcrTask.is_deleted.is_(False)) )).scalar_one_or_none() if not task: raise BizException(message="任务不存在") if task.status not in ("pending",): raise BizException(message="仅待处理任务可调整优先级") new_priority = body.get("priority", task.priority) task.priority = int(new_priority) await db.commit() return ok(message=f"优先级已调整为 {task.priority}") @router.delete("/ocr-tasks/{task_id}", summary="取消/删除 OCR 任务") async def delete_ocr_task( task_id: uuid.UUID, db: AsyncSession = Depends(get_db), current_user: CurrentUserPayload = Depends(get_current_user), ) -> dict: from sqlalchemy import select from app.models.finance import FinOcrTask task = (await db.execute( select(FinOcrTask).where(FinOcrTask.id == task_id, FinOcrTask.is_deleted.is_(False)) )).scalar_one_or_none() if not task: raise BizException(message="任务不存在") if task.status == "processing": raise BizException(message="正在处理中的任务无法取消") task.is_deleted = True await db.commit() return ok(message="任务已取消")