Files
hankin 815cbf9d8c v0.2.0: CRM/ERP 系统升级 - 清理 .gitignore 并移除误提交的 venv/env/db 文件
- 更新 .gitignore:全面覆盖环境变量、数据库、日志、缓存、上传文件
- 移除误跟踪的 server/venv/、crm_data.db、.env 文件
- 新增 server/.env.example 模板
- 新增合同管理、利润核算、AI教练等功能模块
- 新增 Playwright e2e 测试套件
- 前后端多项功能升级和 bug 修复
2026-05-11 07:24:19 +00:00

227 lines
7.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
库存与利润核算 Service 层
- MWA 入库事务(悲观锁 FOR UPDATE + 零元隔离)
- 订单利润快照
- 利润报表聚合
"""
from __future__ import annotations
import uuid
from datetime import datetime
from sqlalchemy import func, select, update, text
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.exceptions import BizException, NotFoundException
from app.models.erp import ErpSkuInventory, InventoryFlow, ProductSku
from app.models.cost import ErpOrderItemCost
from app.models.order import ErpOrder, ErpOrderItem
from app.schemas.auth import CurrentUserPayload
# ── MWA 入库事务 ────────────────────────────────────────
async def process_inbound_with_mwa(
db: AsyncSession,
sku_id: uuid.UUID,
company_id: uuid.UUID,
qty: float,
purchase_unit_price: float,
operator_id: uuid.UUID | None = None,
remark: str | None = None,
is_special_zero_cost: bool = False,
) -> dict:
"""
入库事务(悲观锁 + MWA
1. SELECT ... FOR UPDATE 锁定库存行
2. 如果非零元特殊,计算新 MWA
3. 更新库存 + 记录流水
"""
# 悲观锁获取库存记录
inv_stmt = (
select(ErpSkuInventory)
.where(
ErpSkuInventory.sku_id == sku_id,
ErpSkuInventory.company_id == company_id,
)
.with_for_update()
)
inv = (await db.execute(inv_stmt)).scalar_one_or_none()
if inv is None:
# 首次入库,创建库存记录
inv = ErpSkuInventory(
sku_id=sku_id,
company_id=company_id,
stock_qty=0,
mwa_unit_cost=0,
)
db.add(inv)
await db.flush()
# 重新锁定
inv = (await db.execute(inv_stmt)).scalar_one()
old_qty = float(inv.stock_qty or 0)
old_mwa = float(inv.mwa_unit_cost or 0)
new_qty = old_qty + qty
# MWA 计算(零元特殊入库不参与)
if is_special_zero_cost or purchase_unit_price == 0:
new_mwa = old_mwa # 保持原有 MWA
else:
if new_qty > 0:
new_mwa = (old_qty * old_mwa + qty * purchase_unit_price) / new_qty
else:
new_mwa = purchase_unit_price
# 更新库存
inv.stock_qty = new_qty
inv.mwa_unit_cost = round(new_mwa, 4)
inv.updated_at = datetime.utcnow()
# 记录流水
flow = InventoryFlow(
sku_id=sku_id,
company_id=company_id,
flow_type="in",
change_qty=qty,
reason="purchase_in",
purchase_unit_price=purchase_unit_price,
is_special_zero_cost=is_special_zero_cost,
operator_id=operator_id,
remark=remark or f"入库 {qty} 件 @ ¥{purchase_unit_price}",
)
db.add(flow)
await db.commit()
return {
"sku_id": str(sku_id),
"old_qty": old_qty,
"new_qty": new_qty,
"old_mwa": old_mwa,
"new_mwa": round(new_mwa, 4),
"is_special_zero_cost": is_special_zero_cost,
}
# ── 订单明细成本快照 ────────────────────────────────────
async def snapshot_order_item_costs(
db: AsyncSession,
order_id: uuid.UUID,
company_id: uuid.UUID,
) -> list[dict]:
"""为订单的所有明细行锚定 MWA 成本快照"""
items_stmt = select(ErpOrderItem).where(
ErpOrderItem.order_id == order_id,
ErpOrderItem.is_deleted.is_(False),
)
items = (await db.execute(items_stmt)).scalars().all()
results = []
for item in items:
# 查当前 MWA
inv = (await db.execute(
select(ErpSkuInventory).where(
ErpSkuInventory.sku_id == item.sku_id,
ErpSkuInventory.company_id == company_id,
)
)).scalar_one_or_none()
mwa_cost = float(inv.mwa_unit_cost or 0) if inv else 0
sell_price = float(item.unit_price or 0)
qty = float(item.qty or 0)
profit = (sell_price - mwa_cost) * qty
profit_rate = (sell_price - mwa_cost) / sell_price if sell_price > 0 else 0
# 检查是否已有快照
existing = (await db.execute(
select(ErpOrderItemCost).where(
ErpOrderItemCost.order_item_id == item.id
)
)).scalar_one_or_none()
if existing:
existing.purchase_unit_price = mwa_cost
existing.profit_amount = round(profit, 2)
existing.profit_rate = round(profit_rate, 4)
else:
cost_snap = ErpOrderItemCost(
order_item_id=item.id,
purchase_unit_price=mwa_cost,
profit_amount=round(profit, 2),
profit_rate=round(profit_rate, 4),
)
db.add(cost_snap)
results.append({
"sku_id": str(item.sku_id),
"qty": qty,
"sell_price": sell_price,
"mwa_cost": mwa_cost,
"profit": round(profit, 2),
"profit_rate": round(profit_rate * 100, 2),
})
await db.commit()
return results
# ── 利润报表 ────────────────────────────────────────────
async def get_profit_report(
db: AsyncSession,
company_id: uuid.UUID,
start_date: str | None = None,
end_date: str | None = None,
) -> dict:
"""聚合利润报表"""
base_where = [
ErpOrder.company_id == company_id,
ErpOrder.is_deleted.is_(False),
]
if start_date:
base_where.append(ErpOrder.order_date >= start_date)
if end_date:
base_where.append(ErpOrder.order_date <= end_date)
# 聚合:每笔订单的利润
stmt = (
select(
ErpOrder.id.label("order_id"),
ErpOrder.order_no,
ErpOrder.order_date,
ErpOrder.total_amount,
func.sum(ErpOrderItemCost.profit_amount).label("total_profit"),
)
.join(ErpOrderItem, ErpOrderItem.order_id == ErpOrder.id)
.join(ErpOrderItemCost, ErpOrderItemCost.order_item_id == ErpOrderItem.id)
.where(*base_where)
.group_by(ErpOrder.id, ErpOrder.order_no, ErpOrder.order_date, ErpOrder.total_amount)
.order_by(ErpOrder.order_date.desc())
)
rows = (await db.execute(stmt)).all()
orders = []
total_revenue = 0
total_profit = 0
for r in rows:
revenue = float(r.total_amount or 0)
profit = float(r.total_profit or 0)
total_revenue += revenue
total_profit += profit
orders.append({
"order_id": str(r.order_id),
"order_no": r.order_no,
"order_date": r.order_date.isoformat() if r.order_date else None,
"revenue": revenue,
"profit": profit,
"profit_rate": round(profit / revenue * 100, 2) if revenue > 0 else 0,
})
return {
"total_revenue": round(total_revenue, 2),
"total_profit": round(total_profit, 2),
"overall_profit_rate": round(total_profit / total_revenue * 100, 2) if total_revenue > 0 else 0,
"orders": orders,
}