Files
crm_project/server/app/services/invoice_parser.py
T
hankin 815cbf9d8c v0.2.0: CRM/ERP 系统升级 - 清理 .gitignore 并移除误提交的 venv/env/db 文件
- 更新 .gitignore:全面覆盖环境变量、数据库、日志、缓存、上传文件
- 移除误跟踪的 server/venv/、crm_data.db、.env 文件
- 新增 server/.env.example 模板
- 新增合同管理、利润核算、AI教练等功能模块
- 新增 Playwright e2e 测试套件
- 前后端多项功能升级和 bug 修复
2026-05-11 07:24:19 +00:00

212 lines
7.6 KiB
Python

"""
发票结构化解析器 — OFD / XML 零算力提取
OFD 文件本质是 ZIP 包含 XML,直接解包提取发票字段。
XML 电子发票(数电票)直接 XPath 提取。
"""
from __future__ import annotations
import io
import os
import re
import zipfile
from xml.etree import ElementTree as ET
from typing import Optional
def parse_ofd_invoice(file_bytes: bytes) -> dict:
    """Parse an OFD e-invoice and return its structured fields.

    An OFD file is a ZIP archive containing XML documents. Every XML entry
    is decoded as UTF-8 and passed to ``_extract_from_xml_text``; the fields
    found in each entry are merged into one dict (entries processed later
    overwrite fields set by earlier ones).

    Args:
        file_bytes: Raw content of the OFD file.

    Returns:
        One of the following dicts:

        * ``{"success": True, "data": {...}}`` when a ``merchant`` or
          ``amount`` field was extracted.
        * ``{"success": True, "data": {"raw_text": ...}, "needs_llm": True}``
          when no such field was found but XML text exists; ``raw_text``
          holds the first 8000 characters of all XML entries concatenated.
        * ``{"success": False, "data": {}, "error": ...}`` when the archive
          is invalid, has no XML content, or parsing failed unexpectedly.
    """
    fields: dict = {}
    xml_chunks: list[str] = []
    try:
        with zipfile.ZipFile(io.BytesIO(file_bytes)) as zf:
            for name in zf.namelist():
                # Case-insensitive match, consistent with parse_zip_invoices,
                # so entries such as "DOCUMENT.XML" are not skipped.
                if not name.lower().endswith(".xml"):
                    continue
                try:
                    xml_text = zf.read(name).decode("utf-8", errors="replace")
                    # Keep the text before extraction so it is still
                    # available for the raw-text fallback if extraction fails.
                    xml_chunks.append(xml_text)
                    extracted = _extract_from_xml_text(xml_text)
                except Exception:
                    # Deliberate best-effort: one unreadable or malformed
                    # entry must not abort processing of the other entries.
                    continue
                if extracted:
                    fields.update(extracted)

            # Structured fields were found: return them directly.
            if fields.get("merchant") or fields.get("amount"):
                return {"success": True, "data": fields}

            # Fallback: hand the raw XML text to the LLM stage.
            all_text = "".join(f"{chunk}\n" for chunk in xml_chunks)
            if all_text.strip():
                return {"success": True, "data": {"raw_text": all_text[:8000]}, "needs_llm": True}
            return {"success": False, "data": {}, "error": "OFD 文件中未找到有效 XML 内容"}
    except zipfile.BadZipFile:
        return {"success": False, "data": {}, "error": "OFD 文件格式损坏或不是有效的 OFD 文件"}
    except Exception as e:
        return {"success": False, "data": {}, "error": f"OFD 解析失败: {e}"}
def parse_xml_invoice(file_bytes: bytes) -> dict:
    """Parse a standalone XML e-invoice and return its structured fields.

    The bytes are decoded with ``_decode_xml_bytes`` (which honours a byte
    order mark and the encoding named in the XML declaration) and the text
    is passed to ``_extract_from_xml_text``.

    Args:
        file_bytes: Raw content of the XML file.

    Returns:
        One of the following dicts:

        * ``{"success": True, "data": {...}}`` when a ``merchant`` or
          ``amount`` field was extracted.
        * ``{"success": True, "data": {"raw_text": ...}, "needs_llm": True}``
          when no known tag matched; ``raw_text`` is the first 8000
          characters of the document.
        * ``{"success": False, "data": {}, "error": ...}`` when the document
          is empty or parsing raised an unexpected error.
    """
    try:
        xml_text = _decode_xml_bytes(file_bytes)
        result = _extract_from_xml_text(xml_text)
        if result and (result.get("merchant") or result.get("amount")):
            return {"success": True, "data": result}
        # Fallback: no preset tag matched, so hand the raw text to the LLM.
        if xml_text.strip():
            return {"success": True, "data": {"raw_text": xml_text[:8000]}, "needs_llm": True}
        return {"success": False, "data": {}, "error": "XML 文件内容为空"}
    except Exception as e:
        return {"success": False, "data": {}, "error": f"XML 解析失败: {e}"}


def _decode_xml_bytes(file_bytes: bytes) -> str:
    """Decode XML bytes, honouring a BOM or the declared encoding.

    Decoding everything as UTF-8 turns the text of documents written in
    another encoding (for example GBK) into replacement characters, so the
    byte order mark and the ``encoding`` pseudo-attribute of the XML
    declaration are checked first. UTF-8 with ``errors="replace"`` remains
    the fallback, so this function never raises on undecodable input.
    """
    if file_bytes.startswith(b"\xef\xbb\xbf"):
        # UTF-8 BOM: drop it so it does not end up in the decoded text.
        return file_bytes[3:].decode("utf-8", errors="replace")
    if file_bytes.startswith((b"\xff\xfe", b"\xfe\xff")):
        # UTF-16 BOM: the codec consumes the BOM and picks the byte order.
        return file_bytes.decode("utf-16", errors="replace")

    declaration = re.match(
        rb'\s*<\?xml[^>]*?\bencoding\s*=\s*["\']([A-Za-z][A-Za-z0-9._-]*)["\']',
        file_bytes[:200],
    )
    if declaration:
        codec = declaration.group(1).decode("ascii")
        if codec.lower().replace("_", "-") not in ("utf-8", "utf8"):
            try:
                return file_bytes.decode(codec)
            except (LookupError, UnicodeError):
                # Unknown codec name, or the bytes do not match the declared
                # encoding: fall through to the lenient UTF-8 decode.
                pass
    return file_bytes.decode("utf-8", errors="replace")
def parse_zip_invoices(file_bytes: bytes) -> list[dict]:
    """Parse every XML invoice contained in a ZIP archive.

    Each XML entry is parsed with ``parse_xml_invoice``. Archive metadata
    entries written by macOS (``__MACOSX/`` folders and ``._*`` AppleDouble
    files) are skipped, because they end in ``.xml`` but are not XML.

    Args:
        file_bytes: Raw content of the ZIP archive.

    Returns:
        A list with one dict per XML entry. Each dict is the result of
        ``parse_xml_invoice`` plus a ``"filename"`` key holding the entry's
        base name. If the archive is invalid, unreadable, or contains no XML
        entry, the list holds a single failure dict whose ``"filename"`` is
        ``"(zip)"``.
    """
    results: list[dict] = []
    try:
        with zipfile.ZipFile(io.BytesIO(file_bytes)) as zf:
            xml_names = [n for n in zf.namelist() if _is_invoice_xml_entry(n)]
            if not xml_names:
                return [{"filename": "(zip)", "success": False, "data": {}, "error": "ZIP 包中未找到 XML 文件"}]
            for name in xml_names:
                filename = os.path.basename(name)
                try:
                    result = parse_xml_invoice(zf.read(name))
                    result["filename"] = filename
                    results.append(result)
                except Exception as e:
                    # Best-effort: a broken entry is reported as a failure
                    # and does not stop the remaining entries.
                    results.append({"filename": filename, "success": False, "data": {}, "error": str(e)})
    except zipfile.BadZipFile:
        return [{"filename": "(zip)", "success": False, "data": {}, "error": "不是有效的 ZIP 文件"}]
    except Exception as e:
        return [{"filename": "(zip)", "success": False, "data": {}, "error": f"ZIP 解析失败: {e}"}]
    return results


def _is_invoice_xml_entry(name: str) -> bool:
    """Return True if the archive entry *name* is a candidate XML invoice."""
    parts = name.replace("\\", "/").split("/")
    # macOS stores resource forks as "__MACOSX/.../._<file>"; they keep the
    # original extension but contain binary AppleDouble data.
    if "__MACOSX" in parts or parts[-1].startswith("._"):
        return False
    return name.lower().endswith(".xml")
# ── Internal helpers ──────────────────────────────────────
# Mapping from output field name to the XML tag names that may carry that
# field, covering several e-invoice XML dialects (English names, Chinese
# names, and short lowercase abbreviations).
#
# The extractors in this file compare tag names case-insensitively. The
# regex extractor tries the candidates of a field in list order, so earlier
# entries take priority over later ones — keep the order when editing.
_FIELD_PATTERNS = {
    # Seller / issuing party name.
    "merchant": [
        "SalesName", "SellerName", "销售方名称", "销方名称",
        "开票方", "Seller", "salername", "xfmc",
    ],
    # Buyer / purchasing party name.
    "buyer": [
        "BuyerName", "PurchaserName", "购买方名称", "购方名称",
        "Buyer", "buyername", "gfmc",
    ],
    # Invoice amount; the extractors convert it to float when possible.
    "amount": [
        "TotalAmount", "Amount", "InvoiceAmount", "金额",
        "合计金额", "价税合计", "jshj", "hjje",
    ],
    # Tax amount; the extractors convert it to float when possible.
    "tax_amount": [
        "TotalTax", "TaxAmount", "Tax", "税额",
        "合计税额", "hjse",
    ],
    # Issue date, kept as the string found in the document.
    "date": [
        "IssueDate", "InvoiceDate", "BillingDate", "开票日期",
        "kprq",
    ],
    "invoice_code": [
        "InvoiceCode", "发票代码", "fpdm",
    ],
    "invoice_number": [
        "InvoiceNumber", "InvoiceNo", "发票号码", "fphm",
    ],
    # Goods / item name; only a single value is stored per document.
    "items": [
        "GoodsName", "ItemName", "商品名称", "货物名称", "spmc",
    ],
    # NOTE(review): "sl" is presumably an abbreviation of 税率 (tax rate);
    # confirm it cannot collide with a quantity tag in the supported dialects.
    "tax_rate": [
        "TaxRate", "税率", "sl",
    ],
    "remark": [
        "Remark", "备注", "bz",
    ],
}
def _extract_from_xml_text(xml_text: str) -> Optional[dict]:
    """Extract invoice fields from XML text using two strategies.

    Strategy 1 searches the text with one regular expression per candidate
    tag name from ``_FIELD_PATTERNS`` (``<Tag>value</Tag>``, optionally with
    a namespace prefix, matched case-insensitively). For each field, the
    first candidate that yields a non-empty value wins.

    Strategy 2 runs only when strategy 1 found nothing: the text is parsed
    with ElementTree and walked by ``_extract_from_element``. This covers
    values the regex cannot see, such as CDATA sections.

    Args:
        xml_text: The decoded XML document.

    Returns:
        A dict of the fields found, or ``None`` when nothing was extracted.
        ``amount`` and ``tax_amount`` are floats when the value parses as a
        number; otherwise the original string is kept.
    """
    # Local import: the module-level import block is not part of this function.
    from html import unescape

    result: dict = {}

    # Strategy 1: regex match on <TagName>value</TagName>.
    for field, tag_names in _FIELD_PATTERNS.items():
        for tag in tag_names:
            escaped_tag = re.escape(tag)
            # Matches <Tag>value</Tag> or <ns:Tag>value</ns:Tag>.
            pattern = rf'<(?:\w+:)?{escaped_tag}[^>]*>([^<]+)</(?:\w+:)?{escaped_tag}>'
            match = re.search(pattern, xml_text, re.IGNORECASE)
            if not match:
                continue
            # The regex sees raw markup, so entity and character references
            # (&amp;, &#x4E2D;, ...) must be decoded here. The ElementTree
            # fallback below already returns decoded text.
            value = unescape(match.group(1)).strip()
            if not value:
                continue
            if field in ("amount", "tax_amount"):
                # Numeric fields become floats; keep the text if unparsable.
                try:
                    result[field] = float(value)
                except ValueError:
                    result[field] = value
            else:
                result[field] = value
            break  # First hit wins; move on to the next field.

    # Strategy 2: fall back to a real XML parse.
    if not result:
        try:
            # Drop the XML declaration before parsing the already-decoded text.
            cleaned = re.sub(r'<\?xml[^?]*\?>', '', xml_text).strip()
            if cleaned:
                root = ET.fromstring(cleaned)
                _extract_from_element(root, result)
        except ET.ParseError:
            # Not well-formed XML: the caller falls back to the raw text.
            pass

    return result if result else None
def _extract_from_element(elem: ET.Element, result: dict, depth: int = 0):
    """Walk an element tree depth-first and collect invoice fields.

    The namespace part of each tag (``{uri}Tag``) is dropped and the local
    name is compared case-insensitively with the candidates listed in
    ``_FIELD_PATTERNS``. Matches are written into ``result`` in place; a
    field that is already present is never overwritten. ``amount`` and
    ``tax_amount`` are stored as floats when the text parses as a number.

    Args:
        elem: Element to inspect; its children are visited recursively.
        result: Dict that receives the extracted fields (mutated in place).
        depth: Current nesting level; elements deeper than 10 are ignored.
    """
    if depth > 10:
        return

    raw_tag = elem.tag
    local_name = raw_tag.split("}")[-1] if "}" in raw_tag else raw_tag
    lowered = local_name.lower()

    for field, candidates in _FIELD_PATTERNS.items():
        if field in result:
            continue
        if not any(lowered == candidate.lower() for candidate in candidates):
            continue
        text = (elem.text or "").strip()
        if not text:
            continue
        if field in ("amount", "tax_amount"):
            try:
                result[field] = float(text)
            except ValueError:
                result[field] = text
        else:
            result[field] = text

    for child in elem:
        _extract_from_element(child, result, depth + 1)