815cbf9d8c
- 更新 .gitignore:全面覆盖环境变量、数据库、日志、缓存、上传文件 - 移除误跟踪的 server/venv/、crm_data.db、.env 文件 - 新增 server/.env.example 模板 - 新增合同管理、利润核算、AI教练等功能模块 - 新增 Playwright e2e 测试套件 - 前后端多项功能升级和 bug 修复
256 lines
11 KiB
Python
256 lines
11 KiB
Python
"""
|
||
物流发货 Service 层
|
||
REST API 路由 和 MCP 工具 共用此层函数
|
||
"""
|
||
from __future__ import annotations
|
||
import uuid
|
||
from datetime import date, datetime
|
||
from decimal import Decimal
|
||
from typing import Any
|
||
from sqlalchemy import func, select, update
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
from app.core.exceptions import BizException, ForbiddenException, NotFoundException
|
||
from app.models.erp import ErpSkuInventory, InventoryFlow, ProductSku
|
||
from app.models.order import ErpOrder, ErpOrderItem
|
||
from app.models.shipping import ErpShippingItem, ErpShippingRecord
|
||
from app.models.sys import SysUser
|
||
from app.models.crm import CrmCustomer
|
||
from app.schemas.auth import CurrentUserPayload
|
||
from app.schemas.shipping import (
|
||
ShippingBriefResponse, ShippingCreate, ShippingItemResponse,
|
||
ShippingListResponse, ShippingResponse,
|
||
)
|
||
|
||
|
||
async def _generate_shipping_no(db: AsyncSession) -> str:
    """Build the next shipping number for today.

    The result has the form ``SHP-<YYYYMMDD>-<NNN>``. ``NNN`` is one more than
    the number of existing shipping records whose ``shipping_no`` starts with
    today's prefix, zero-padded to at least three digits.

    NOTE(review): counting and then incrementing is not atomic, so two
    concurrent callers could compute the same number — confirm that a unique
    constraint or another guard exists on ``shipping_no``.
    """
    day_stamp = date.today().strftime("%Y%m%d")
    prefix = f"SHP-{day_stamp}-"

    count_stmt = (
        select(func.count())
        .select_from(ErpShippingRecord)
        .where(ErpShippingRecord.shipping_no.like(f"{prefix}%"))
    )
    result = await db.execute(count_stmt)
    existing = result.scalar() or 0

    sequence = existing + 1
    return f"{prefix}{sequence:03d}"
|
||
|
||
|
||
def _ship_item_to_resp(si: ErpShippingItem) -> ShippingItemResponse:
    """Convert a shipping item row into its response schema.

    The SKU-derived fields (code, name, spec, unit) are ``None`` when the item
    has no related ``sku`` object.
    """
    sku = si.sku
    if sku:
        sku_code, sku_name, spec, unit = sku.sku_code, sku.name, sku.spec, sku.unit
    else:
        sku_code = sku_name = spec = unit = None

    return ShippingItemResponse(
        id=si.id,
        order_item_id=si.order_item_id,
        sku_id=si.sku_id,
        sku_code=sku_code,
        sku_name=sku_name,
        spec=spec,
        unit=unit,
        shipped_qty=float(si.shipped_qty),
    )
|
||
|
||
|
||
def _ship_to_resp(sr: ErpShippingRecord, with_items: bool = True) -> ShippingResponse:
    """Convert a shipping record into the full response schema.

    Order number, customer name and operator name are ``None`` when the
    corresponding related object is missing. When ``with_items`` is false the
    record's ``items`` relationship is not touched and an empty list is
    returned instead.
    """
    order = sr.order
    customer = order.customer if order else None
    operator = sr.operator

    if with_items:
        item_responses = [_ship_item_to_resp(row) for row in sr.items]
    else:
        item_responses = []

    return ShippingResponse(
        id=sr.id,
        shipping_no=sr.shipping_no,
        order_id=sr.order_id,
        order_no=order.order_no if order else None,
        customer_name=customer.name if customer else None,
        carrier=sr.carrier,
        tracking_no=sr.tracking_no,
        status=sr.status,
        ship_date=sr.ship_date,
        remark=sr.remark,
        operator_name=operator.real_name if operator else None,
        items=item_responses,
        created_at=sr.created_at,
    )
|
||
|
||
|
||
def _ship_to_brief(sr: ErpShippingRecord) -> ShippingBriefResponse:
    """Convert a shipping record into the brief (list-view) response schema.

    Order number, customer name and operator name are ``None`` when the
    corresponding related object is missing. Line items and the remark are not
    part of the brief response.
    """
    order = sr.order
    customer = order.customer if order else None
    operator = sr.operator

    return ShippingBriefResponse(
        id=sr.id,
        shipping_no=sr.shipping_no,
        order_id=sr.order_id,
        order_no=order.order_no if order else None,
        customer_name=customer.name if customer else None,
        carrier=sr.carrier,
        tracking_no=sr.tracking_no,
        status=sr.status,
        ship_date=sr.ship_date,
        operator_name=operator.real_name if operator else None,
        created_at=sr.created_at,
    )
|
||
|
||
|
||
def _check_shipping_access(order: ErpOrder, user: CurrentUserPayload) -> None:
|
||
if user.data_scope == "all":
|
||
return
|
||
if user.data_scope == "self" and order.salesperson_id != user.user_id:
|
||
raise ForbiddenException("无权访问该订单的发货记录(数据权限:仅本人)")
|
||
|
||
|
||
async def create_shipping(
    db: AsyncSession, user: CurrentUserPayload, body: ShippingCreate,
    company_id: uuid.UUID,
) -> tuple[ShippingResponse, str]:
    """Create a shipping record for an order and deduct stock.

    Stock is deducted from ``erp_sku_inventory``; an ``InventoryFlow`` row is
    written for every shipped line, the shipped quantity of each order item is
    increased, and the order's ``shipping_state`` is set to ``"shipped"`` when
    every non-deleted order item is fully shipped, otherwise ``"partial"``.

    Args:
        db: Async database session.
        user: Current user; used for the access check and as operator.
        body: Shipment payload (order id, carrier data and line items).
        company_id: Company the order and the inventory must belong to.

    Returns:
        ``(response, new_shipping_state)``.

    Raises:
        NotFoundException: The order does not exist for this company.
        ForbiddenException: The user may not access the order.
        BizException: The order is already fully shipped, a line references an
            unknown order item, a quantity exceeds the unshipped remainder or
            the available stock, or the transaction failed (``code=500``).
    """
    order = (await db.execute(
        select(ErpOrder).where(
            ErpOrder.id == body.order_id,
            ErpOrder.is_deleted.is_(False),
            ErpOrder.company_id == company_id,
        )
    )).scalar_one_or_none()
    if order is None:
        raise NotFoundException("订单不存在")
    if order.shipping_state == "shipped":
        raise BizException(message="该订单已全部发完,无法再次发货")
    _check_shipping_access(order, user)

    order_item_ids = [item.order_item_id for item in body.items]
    oi_rows = (await db.execute(
        select(ErpOrderItem).where(
            ErpOrderItem.id.in_(order_item_ids),
            ErpOrderItem.order_id == body.order_id,
            ErpOrderItem.is_deleted.is_(False),
        )
    )).scalars().all()
    oi_map: dict[uuid.UUID, ErpOrderItem] = {oi.id: oi for oi in oi_rows}

    # Validate requested quantities. Quantities are compared as Decimal so that
    # exact decimal amounts are not rejected because of binary float rounding,
    # and the running total per order item is tracked so that several lines
    # targeting the same order item cannot exceed the remainder together.
    # NOTE(review): this check runs without a row lock on the order items, so
    # two concurrent shipments for the same order could still both pass it.
    requested: dict[uuid.UUID, Decimal] = {}
    for item in body.items:
        oi = oi_map.get(item.order_item_id)
        if oi is None:
            raise BizException(message=f"订单明细行 {item.order_item_id} 不存在或不属于该订单")
        qty = Decimal(str(item.shipped_qty))
        already_requested = requested.get(item.order_item_id, Decimal(0))
        remaining = (
            Decimal(str(oi.qty)) - Decimal(str(oi.shipped_qty or 0)) - already_requested
        )
        if qty > remaining:
            raise BizException(message=f"SKU {item.sku_id} 发货数量超出未发余量:本次 {item.shipped_qty},剩余可发 {remaining}")
        requested[item.order_item_id] = already_requested + qty

    new_state = "partial"
    try:
        async with db.begin_nested():
            now = datetime.utcnow()
            shipping_no = await _generate_shipping_no(db)
            record = ErpShippingRecord(
                shipping_no=shipping_no, order_id=body.order_id,
                carrier=body.carrier, tracking_no=body.tracking_no,
                status="transit", ship_date=body.ship_date or date.today(),
                remark=body.remark, operator_id=user.user_id,
                company_id=company_id,
            )
            db.add(record)
            await db.flush()
            # Capture the primary key now; after commit the instance may be
            # expired and attribute access would need another load.
            record_id = record.id

            for item in body.items:
                qty = Decimal(str(item.shipped_qty))
                db.add(ErpShippingItem(
                    shipping_id=record.id, order_item_id=item.order_item_id,
                    sku_id=item.sku_id, shipped_qty=item.shipped_qty,
                ))

                # Deduct stock from erp_sku_inventory under a row lock.
                # populate_existing makes the locked read overwrite any state
                # already held in the session for this row.
                inv = (
                    await db.execute(
                        select(ErpSkuInventory)
                        .where(
                            ErpSkuInventory.sku_id == item.sku_id,
                            ErpSkuInventory.company_id == company_id,
                        )
                        .with_for_update()
                        .execution_options(populate_existing=True)
                    )
                ).scalar_one_or_none()

                # Check for a missing inventory row first; otherwise it would
                # be reported as "insufficient stock" with a stock of 0.
                if inv is None:
                    raise BizException(message=f"SKU {item.sku_id} 在当前公司无库存记录")

                current_stock = Decimal(str(inv.stock_qty))
                if current_stock < qty:
                    raise BizException(
                        message=f"库存不足无法发货: SKU {item.sku_id},"
                        f"当前库存 {current_stock},请求出库 {item.shipped_qty}"
                    )

                await db.execute(
                    update(ErpSkuInventory)
                    .where(ErpSkuInventory.id == inv.id)
                    .values(
                        stock_qty=ErpSkuInventory.stock_qty - qty,
                        updated_at=now,
                    )
                )

                db.add(InventoryFlow(
                    sku_id=item.sku_id, company_id=company_id,
                    change_qty=-item.shipped_qty,
                    reason="shipment", remark=f"订单发货出库 - 发货单 {shipping_no}",
                    operator_id=user.user_id,
                ))

                # coalesce: NULL + qty would stay NULL and the shipment would
                # never be counted against the order item.
                await db.execute(
                    update(ErpOrderItem).where(ErpOrderItem.id == item.order_item_id)
                    .values(
                        shipped_qty=func.coalesce(ErpOrderItem.shipped_qty, 0) + qty,
                        updated_at=now,
                    )
                )

            await db.flush()
            all_items = (await db.execute(
                select(ErpOrderItem).where(
                    ErpOrderItem.order_id == body.order_id,
                    ErpOrderItem.is_deleted.is_(False),
                )
            )).scalars().all()
            all_shipped = all(
                Decimal(str(i.shipped_qty or 0)) >= Decimal(str(i.qty)) for i in all_items
            )
            new_state = "shipped" if all_shipped else "partial"
            await db.execute(
                update(ErpOrder).where(ErpOrder.id == body.order_id)
                .values(shipping_state=new_state, updated_at=now)
            )
        # Commit the enclosing transaction only after the savepoint context
        # has been released.
        await db.commit()
    except BizException:
        await db.rollback()
        raise
    except Exception as e:
        await db.rollback()
        raise BizException(code=500, message=f"发货事务失败: {e!s}") from e

    refreshed = (await db.execute(
        select(ErpShippingRecord).where(ErpShippingRecord.id == record_id)
    )).scalar_one()
    return _ship_to_resp(refreshed), new_state
|
||
|
||
|
||
async def list_shipping(
    db: AsyncSession, user: CurrentUserPayload,
    page: int = 1, size: int = 20,
    order_no: str | None = None, tracking_no: str | None = None,
    company_id: uuid.UUID | None = None,
) -> ShippingListResponse:
    """Return one page of non-deleted shipping records, newest first.

    Filtering:
        * ``company_id`` (when given) restricts records to that company.
        * Data scope ``"self"`` restricts records to orders whose salesperson
          is the current user.
        * Data scope ``"dept_and_sub"`` restricts records to orders whose
          salesperson is a non-deleted user with the same ``dept_id`` as the
          current user; no restriction is added when the user has no
          ``dept_id``.
        * ``order_no`` / ``tracking_no`` are case-insensitive substring
          matches.

    ``page`` values below 1 are treated as 1 and negative ``size`` values as 0,
    so the query never receives a negative OFFSET or LIMIT. The response
    echoes the effective ``page`` and ``size``.

    NOTE(review): a ``"dept_and_sub"`` user without ``dept_id`` currently sees
    every record allowed by the other filters — confirm this is intended.
    """
    page = max(page, 1)
    size = max(size, 0)

    where: list[Any] = [ErpShippingRecord.is_deleted.is_(False)]
    if company_id:
        where.append(ErpShippingRecord.company_id == company_id)

    if user.data_scope == "self":
        my_orders = select(ErpOrder.id).where(
            ErpOrder.salesperson_id == user.user_id,
            ErpOrder.is_deleted.is_(False),
        )
        where.append(ErpShippingRecord.order_id.in_(my_orders))
    elif user.data_scope == "dept_and_sub":
        if user.dept_id is not None:
            dept_users = select(SysUser.id).where(
                SysUser.dept_id == user.dept_id,
                SysUser.is_deleted.is_(False),
            )
            dept_orders = select(ErpOrder.id).where(
                ErpOrder.salesperson_id.in_(dept_users),
                ErpOrder.is_deleted.is_(False),
            )
            where.append(ErpShippingRecord.order_id.in_(dept_orders))

    if order_no:
        matched = select(ErpOrder.id).where(ErpOrder.order_no.ilike(f"%{order_no}%"))
        where.append(ErpShippingRecord.order_id.in_(matched))
    if tracking_no:
        where.append(ErpShippingRecord.tracking_no.ilike(f"%{tracking_no}%"))

    total = (await db.execute(
        select(func.count()).select_from(ErpShippingRecord).where(*where)
    )).scalar() or 0
    stmt = (
        select(ErpShippingRecord)
        .where(*where)
        .order_by(ErpShippingRecord.created_at.desc())
        .offset((page - 1) * size)
        .limit(size)
    )
    records = (await db.execute(stmt)).scalars().all()
    return ShippingListResponse(
        total=total, items=[_ship_to_brief(r) for r in records], page=page, size=size,
    )
|
||
|
||
|
||
async def get_shipping_by_order(
    db: AsyncSession, user: CurrentUserPayload, order_id: uuid.UUID,
    company_id: uuid.UUID | None = None,
) -> dict[str, Any]:
    """Return an order's shipping summary together with all its shipments.

    The order must exist, be non-deleted and (when ``company_id`` is given)
    belong to that company; otherwise ``NotFoundException`` is raised. The
    access check may raise ``ForbiddenException``. Shipments are the order's
    non-deleted shipping records, newest first, serialised in JSON mode.
    """
    order_filters = [ErpOrder.id == order_id, ErpOrder.is_deleted.is_(False)]
    if company_id:
        order_filters.append(ErpOrder.company_id == company_id)

    order_result = await db.execute(select(ErpOrder).where(*order_filters))
    order = order_result.scalar_one_or_none()
    if order is None:
        raise NotFoundException("订单不存在")
    _check_shipping_access(order, user)

    shipments_stmt = (
        select(ErpShippingRecord)
        .where(
            ErpShippingRecord.order_id == order_id,
            ErpShippingRecord.is_deleted.is_(False),
        )
        .order_by(ErpShippingRecord.created_at.desc())
    )
    shipment_result = await db.execute(shipments_stmt)
    records = shipment_result.scalars().all()

    summary: dict[str, Any] = {
        "order_id": str(order_id),
        "order_no": order.order_no,
        "shipping_state": order.shipping_state,
        "total_shipments": len(records),
    }
    summary["shipments"] = [
        _ship_to_resp(rec).model_dump(mode="json") for rec in records
    ]
    return summary
|