"""Structured invoice parser for OFD / XML files (no model inference needed).

An OFD file is a ZIP archive that contains XML documents, so invoice fields
are extracted by unpacking the archive and scanning the XML directly. XML
e-invoices (数电票) are scanned the same way, by matching known tag names.
"""

from __future__ import annotations

import io
import os
import re
import zipfile
from xml.etree import ElementTree as ET
from typing import Optional, Union

# Maximum number of characters of raw XML handed over to the LLM fallback.
_RAW_TEXT_LIMIT = 8000


def parse_ofd_invoice(file_bytes: bytes) -> dict:
    """Parse an OFD e-invoice.

    The archive is opened in memory and every ``.xml`` entry (suffix compared
    case-insensitively) is scanned for invoice fields. Fields found in later
    entries overwrite fields found in earlier ones.

    Args:
        file_bytes: Raw content of the ``.ofd`` file.

    Returns:
        ``{"success": True, "data": {...}}`` when a merchant or an amount was
        extracted; ``{"success": True, "data": {"raw_text": ...},
        "needs_llm": True}`` when XML was present but no key field matched;
        ``{"success": False, "data": {}, "error": ...}`` otherwise. This
        function does not raise: every failure is reported via ``error``.
    """
    result: dict = {}
    try:
        with zipfile.ZipFile(io.BytesIO(file_bytes)) as zf:
            xml_texts = []
            for name in zf.namelist():
                if not name.lower().endswith(".xml"):
                    continue
                try:
                    # NOTE(review): entries are read fully into memory with no
                    # size cap; consider bounding ZipInfo.file_size if this is
                    # fed untrusted uploads.
                    xml_text = _decode_xml_bytes(zf.read(name))
                    xml_texts.append(xml_text)
                    extracted = _extract_from_xml_text(xml_text)
                    if extracted:
                        result.update(extracted)
                except Exception:
                    # Best effort: one unreadable entry must not abort the
                    # whole archive.
                    continue

        if _has_key_fields(result):
            return {"success": True, "data": result}

        # Fallback: hand the concatenated XML to the LLM as plain text.
        all_text = "".join(f"{text}\n" for text in xml_texts)
        if all_text.strip():
            return {
                "success": True,
                "data": {"raw_text": all_text[:_RAW_TEXT_LIMIT]},
                "needs_llm": True,
            }
        return {"success": False, "data": {}, "error": "OFD 文件中未找到有效 XML 内容"}
    except zipfile.BadZipFile:
        return {"success": False, "data": {}, "error": "OFD 文件格式损坏或不是有效的 OFD 文件"}
    except Exception as e:
        return {"success": False, "data": {}, "error": f"OFD 解析失败: {e}"}


def parse_xml_invoice(file_bytes: bytes) -> dict:
    """Parse a standalone XML e-invoice.

    Args:
        file_bytes: Raw content of the XML file. UTF-8 is tried first; if that
            fails, a UTF-16 BOM or the encoding named in the XML declaration
            is honoured.

    Returns:
        Same shape as :func:`parse_ofd_invoice`: structured ``data`` when a
        merchant or an amount was found, a ``raw_text`` / ``needs_llm``
        fallback when the XML is non-empty but unmatched, or a failure dict
        with ``error``. This function does not raise.
    """
    try:
        xml_text = _decode_xml_bytes(file_bytes)
        result = _extract_from_xml_text(xml_text)
        if result and _has_key_fields(result):
            return {"success": True, "data": result}

        # Fallback: the XML did not use any known tag, let the LLM read it.
        if xml_text.strip():
            return {
                "success": True,
                "data": {"raw_text": xml_text[:_RAW_TEXT_LIMIT]},
                "needs_llm": True,
            }
        return {"success": False, "data": {}, "error": "XML 文件内容为空"}
    except Exception as e:
        return {"success": False, "data": {}, "error": f"XML 解析失败: {e}"}


def parse_zip_invoices(file_bytes: bytes) -> list[dict]:
    """Parse every XML invoice contained in a ZIP archive.

    macOS resource-fork entries (anything under ``__MACOSX/`` or whose base
    name starts with ``._``) are skipped because they are archive metadata,
    not XML documents.

    Args:
        file_bytes: Raw content of the ZIP file.

    Returns:
        One dict per XML entry, each being the result of
        :func:`parse_xml_invoice` plus a ``"filename"`` key (base name of the
        entry). If the archive is invalid or contains no XML entry, a
        single-element list describing the failure with filename ``"(zip)"``.
    """
    results: list[dict] = []
    try:
        with zipfile.ZipFile(io.BytesIO(file_bytes)) as zf:
            xml_names = [n for n in zf.namelist() if _is_invoice_xml_entry(n)]
            if not xml_names:
                return [{"filename": "(zip)", "success": False, "data": {}, "error": "ZIP 包中未找到 XML 文件"}]
            for name in xml_names:
                filename = os.path.basename(name)
                try:
                    result = parse_xml_invoice(zf.read(name))
                    result["filename"] = filename
                    results.append(result)
                except Exception as e:
                    results.append({"filename": filename, "success": False, "data": {}, "error": str(e)})
    except zipfile.BadZipFile:
        return [{"filename": "(zip)", "success": False, "data": {}, "error": "不是有效的 ZIP 文件"}]
    except Exception as e:
        return [{"filename": "(zip)", "success": False, "data": {}, "error": f"ZIP 解析失败: {e}"}]
    return results


# ── Internal helpers ──────────────────────────────────────

# Known invoice XML tag names per output field (covers several e-invoice XML
# dialects). Within a field, earlier names take priority over later ones.
_FIELD_PATTERNS = {
    "merchant": [
        "SalesName", "SellerName", "销售方名称", "销方名称", "开票方", "Seller", "salername", "xfmc",
    ],
    "buyer": [
        "BuyerName", "PurchaserName", "购买方名称", "购方名称", "Buyer", "buyername", "gfmc",
    ],
    "amount": [
        "TotalAmount", "Amount", "InvoiceAmount", "金额", "合计金额", "价税合计", "jshj", "hjje",
    ],
    "tax_amount": [
        "TotalTax", "TaxAmount", "Tax", "税额", "合计税额", "hjse",
    ],
    "date": [
        "IssueDate", "InvoiceDate", "BillingDate", "开票日期", "kprq",
    ],
    "invoice_code": [
        "InvoiceCode", "发票代码", "fpdm",
    ],
    "invoice_number": [
        "InvoiceNumber", "InvoiceNo", "发票号码", "fphm",
    ],
    "items": [
        "GoodsName", "ItemName", "商品名称", "货物名称", "spmc",
    ],
    "tax_rate": [
        "TaxRate", "税率", "sl",
    ],
    "remark": [
        "Remark", "备注", "bz",
    ],
}

# Fields whose text is converted to float when it looks numeric.
_NUMERIC_FIELDS = frozenset({"amount", "tax_amount"})

# Amount with an optional yen/yuan sign and optional 3-digit comma grouping,
# e.g. "¥1,234.56" or "￥88.00". The strict grouping keeps "1,5" from being
# misread as 15.
_DECORATED_AMOUNT = re.compile(
    r"[¥￥]?\s*([+-]?(?:\d{1,3}(?:,\d{3})+|\d+)(?:\.\d+)?)"
)

# The five predefined XML entities plus numeric character references.
_XML_ENTITY = re.compile(r"&(#[0-9]+|#[xX][0-9a-fA-F]+|amp|lt|gt|quot|apos);")
_XML_NAMED_ENTITIES = {"amp": "&", "lt": "<", "gt": ">", "quot": '"', "apos": "'"}

# Encoding pseudo-attribute of a leading XML declaration, matched on raw bytes.
_XML_ENCODING_DECL = re.compile(
    rb"\s*<\?xml[^>]*?\bencoding\s*=\s*[\"']([A-Za-z][A-Za-z0-9._-]*)[\"']"
)


def _compile_tag_regex(tag: str, *, exact: bool) -> "re.Pattern[str]":
    """Build the regex that captures the text following an opening ``tag``.

    With ``exact=True`` the tag name must be followed by whitespace (start of
    attributes) or ``>``; with ``exact=False`` any longer tag name that merely
    starts with ``tag`` matches too. Self-closing tags never match, so text
    that follows ``<Tag/>`` is not mistaken for its content.
    """
    after_name = r"(?:\s[^>]*)?" if exact else r"[^>]*"
    return re.compile(
        rf"<(?:\w+:)?{re.escape(tag)}{after_name}(?<!/)>([^<]+)",
        re.IGNORECASE,
    )


# Compiled once at import time instead of on every extraction call.
_EXACT_TAG_REGEXES = {
    field: tuple(_compile_tag_regex(tag, exact=True) for tag in tags)
    for field, tags in _FIELD_PATTERNS.items()
}
_PREFIX_TAG_REGEXES = {
    field: tuple(_compile_tag_regex(tag, exact=False) for tag in tags)
    for field, tags in _FIELD_PATTERNS.items()
}
_FIELD_TAGS_LOWER = {
    field: frozenset(tag.lower() for tag in tags)
    for field, tags in _FIELD_PATTERNS.items()
}


def _has_key_fields(data: dict) -> bool:
    """Return True when ``data`` holds a truthy merchant or amount."""
    return bool(data.get("merchant") or data.get("amount"))


def _is_invoice_xml_entry(name: str) -> bool:
    """Return True for ZIP entry names that should be parsed as invoice XML."""
    if not name.lower().endswith(".xml"):
        return False
    base_name = name.rsplit("/", 1)[-1]
    return not (name.startswith("__MACOSX/") or base_name.startswith("._"))


def _decode_xml_bytes(xml_bytes: bytes) -> str:
    """Decode XML bytes to text.

    Strict UTF-8 (BOM removed) is tried first, so every input that is valid
    UTF-8 decodes exactly as a plain UTF-8 decode would. Only when that fails
    are a UTF-16 BOM and the encoding named in the XML declaration tried. The
    last resort is UTF-8 with replacement characters.
    """
    try:
        return xml_bytes.decode("utf-8-sig")
    except UnicodeDecodeError:
        pass

    if xml_bytes.startswith((b"\xff\xfe", b"\xfe\xff")):
        try:
            return xml_bytes.decode("utf-16")
        except UnicodeDecodeError:
            pass

    declared = _XML_ENCODING_DECL.match(xml_bytes)
    if declared:
        try:
            text = xml_bytes.decode(declared.group(1).decode("ascii"))
        except (LookupError, UnicodeDecodeError):
            pass
        else:
            # The declaration was readable as ASCII bytes, so a correct
            # decoding must still begin with it; otherwise the declared
            # encoding is wrong for this data.
            if text.lstrip().startswith("<?xml"):
                return text

    return xml_bytes.decode("utf-8-sig", errors="replace")


def _unescape_xml(text: str) -> str:
    """Replace predefined XML entities and numeric character references."""
    if "&" not in text:
        return text

    def _replace(match: "re.Match[str]") -> str:
        ref = match.group(1)
        if ref[0] != "#":
            return _XML_NAMED_ENTITIES[ref]
        try:
            code = int(ref[2:], 16) if ref[1] in "xX" else int(ref[1:])
            return chr(code)
        except (ValueError, OverflowError):
            # Out-of-range code point: leave the reference untouched.
            return match.group(0)

    return _XML_ENTITY.sub(_replace, text)


def _coerce_field_value(field: str, text: str) -> Union[float, str]:
    """Convert ``text`` to float for numeric fields when possible.

    Non-numeric fields, and numeric fields whose text cannot be interpreted
    as a number, are returned unchanged as strings.
    """
    if field not in _NUMERIC_FIELDS:
        return text
    try:
        return float(text)
    except ValueError:
        pass
    decorated = _DECORATED_AMOUNT.fullmatch(text)
    if decorated:
        return float(decorated.group(1).replace(",", ""))
    return text


def _first_tag_text(regexes, xml_text: str) -> Optional[str]:
    """Return the first non-blank text captured by ``regexes``, in priority order."""
    for regex in regexes:
        for match in regex.finditer(xml_text):
            value = _unescape_xml(match.group(1)).strip()
            if value:
                return value
    return None


def _extract_from_xml_text(xml_text: str) -> Optional[dict]:
    """Extract invoice fields from XML text using two strategies.

    Strategy 1 scans the text with regexes for ``<Tag>value</Tag>`` /
    ``<ns:Tag>value</ns:Tag>``. Exact tag names are tried first; a prefix
    match (tag merely starts with a known name) is only a fallback, so
    ``<TaxRate>`` no longer shadows a real ``<Tax>`` element.
    NOTE(review): the prefix fallback is kept for backward compatibility and
    can still map e.g. a lone ``<TaxRate>`` to ``tax_amount``.

    Strategy 2 runs only when strategy 1 found nothing: the text is parsed
    with ElementTree and walked recursively (this also covers CDATA content).

    Returns:
        Dict of extracted fields, or ``None`` when nothing was found.
    """
    result: dict = {}

    # Strategy 1: regex scan.
    for field in _FIELD_PATTERNS:
        value = _first_tag_text(_EXACT_TAG_REGEXES[field], xml_text)
        if value is None:
            value = _first_tag_text(_PREFIX_TAG_REGEXES[field], xml_text)
        if value is not None:
            result[field] = _coerce_field_value(field, value)

    # Strategy 2: ElementTree walk.
    if not result:
        try:
            # Drop the XML declaration so a stale encoding attribute cannot
            # interfere with parsing already-decoded text.
            cleaned = re.sub(r'<\?xml[^?]*\?>', '', xml_text).strip()
            if cleaned:
                # NOTE(review): xml.etree.ElementTree is documented as not
                # secure against maliciously constructed XML; switch to a
                # hardened parser if inputs are untrusted.
                root = ET.fromstring(cleaned)
                _extract_from_element(root, result)
        except ET.ParseError:
            pass

    return result if result else None


def _extract_from_element(elem: ET.Element, result: dict, depth: int = 0) -> None:
    """Recursively walk an element tree and fill ``result`` in place.

    A field is set from the first element (in document order) whose local tag
    name equals one of the field's known names, case-insensitively, and whose
    text is non-blank. Fields already present in ``result`` are left alone.
    Recursion stops below depth 10.
    """
    if depth > 10:
        return

    tag = elem.tag
    if isinstance(tag, str):
        # Strip a "{namespace}" prefix if present.
        tag_local = tag.rsplit("}", 1)[-1].lower()
        text = (elem.text or "").strip()
        if text:
            for field, known_tags in _FIELD_TAGS_LOWER.items():
                if field not in result and tag_local in known_tags:
                    result[field] = _coerce_field_value(field, text)

    for child in elem:
        _extract_from_element(child, result, depth + 1)