"""Sales log service — CRUD plus asynchronous triggering of the Dify workflow."""

from __future__ import annotations

import asyncio
import uuid
from datetime import date
from typing import Any

import httpx
from sqlalchemy import select, func, desc, and_
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.ai import SalesLog
from app.schemas.auth import CurrentUserPayload


def _coerce_date(value: str | date) -> date:
    """Return *value* as a ``date``, parsing ISO-8601 strings.

    ``date`` (and ``datetime``) instances are passed through unchanged so
    callers may hand over either a raw query-string value or an already
    parsed object.

    Raises:
        ValueError: if *value* is a string that is not a valid ISO date.
    """
    if isinstance(value, date):
        return value
    return date.fromisoformat(value)


async def create_log(
    db: AsyncSession,
    user: CurrentUserPayload,
    content: str,
    customer_id: str | None = None,
    contact_ids: list[str] | None = None,
    log_date: date | None = None,
    company_ids: list[uuid.UUID] | None = None,
) -> dict:
    """Create a sales log owned by the current user.

    Args:
        db: Async session used to persist the log.
        user: Current user; ``user.user_id`` becomes the log's salesperson.
        content: Free-text body of the log.
        customer_id: Optional customer UUID as a string.
        contact_ids: Optional contact ids; stored as an empty list when omitted.
        log_date: Date of the log; defaults to today.
        company_ids: Companies involved in the log; empty list when omitted.

    Returns:
        The persisted log serialized by ``_to_dict``.

    Raises:
        ValueError: if ``customer_id`` is not a valid UUID string.
    """
    log = SalesLog(
        salesperson_id=user.user_id,
        customer_id=uuid.UUID(customer_id) if customer_id else None,
        contact_ids=contact_ids or [],
        content=content,
        log_date=log_date or date.today(),
        involved_company_ids=company_ids or [],
    )
    db.add(log)
    await db.commit()
    # Refresh so database-populated columns are loaded before serializing.
    await db.refresh(log)
    return _to_dict(log)


async def list_logs(
    db: AsyncSession,
    user: CurrentUserPayload,
    page: int = 1,
    size: int = 20,
    customer_id: str | None = None,
    user_id: str | None = None,
    start_date: str | date | None = None,
    end_date: str | date | None = None,
    company_id: uuid.UUID | None = None,
) -> dict:
    """List non-deleted sales logs, newest first, with pagination.

    Args:
        db: Async session used for the queries.
        user: Current user. When ``user.data_scope == "self"`` only the
            user's own logs are returned and ``user_id`` is ignored.
        page: 1-based page number; values below 1 are treated as 1.
        size: Page size; negative values are treated as 0.
        customer_id: Optional customer UUID string to filter on.
        user_id: Optional salesperson UUID string to filter on (only applied
            when the data scope is not ``"self"``).
        start_date: Inclusive lower bound on ``log_date`` (ISO string or date).
        end_date: Inclusive upper bound on ``log_date`` (ISO string or date).
        company_id: When given, only logs whose ``involved_company_ids``
            contains this company are returned.

    Returns:
        ``{"total", "page", "size", "items"}`` where each item is the
        ``_to_dict`` form of a log plus ``customer_name`` and ``author_name``.

    Raises:
        ValueError: if a UUID or date string cannot be parsed.
    """
    from sqlalchemy.orm import aliased
    from app.models.crm import CrmCustomer
    from app.models.sys import SysUser

    # A non-positive page or negative size would yield a negative
    # OFFSET/LIMIT, which the database rejects.
    page = max(page, 1)
    size = max(size, 0)

    conditions = [SalesLog.is_deleted.is_(False)]

    if company_id:
        # ARRAY contains: keep only logs that involve the given company.
        conditions.append(SalesLog.involved_company_ids.any(company_id))

    # Data-scope restriction.
    if user.data_scope == "self":
        conditions.append(SalesLog.salesperson_id == user.user_id)
    elif user_id:
        conditions.append(SalesLog.salesperson_id == uuid.UUID(user_id))

    # Bind real date objects rather than strings: the column is compared as a
    # date, and update_log already parses the same kind of input.
    if start_date:
        conditions.append(SalesLog.log_date >= _coerce_date(start_date))
    if end_date:
        conditions.append(SalesLog.log_date <= _coerce_date(end_date))
    if customer_id:
        conditions.append(SalesLog.customer_id == uuid.UUID(customer_id))

    where = and_(*conditions)

    # Total count for the pager.
    count_stmt = select(func.count()).select_from(SalesLog).where(where)
    total = (await db.execute(count_stmt)).scalar() or 0

    # Page of data — LEFT JOIN customer + user to get display names.
    Author = aliased(SysUser)
    stmt = (
        select(
            SalesLog,
            CrmCustomer.name.label("customer_name"),
            Author.real_name.label("author_name"),
        )
        .outerjoin(CrmCustomer, SalesLog.customer_id == CrmCustomer.id)
        .outerjoin(Author, SalesLog.salesperson_id == Author.id)
        .where(where)
        .order_by(desc(SalesLog.created_at))
        .offset((page - 1) * size)
        .limit(size)
    )
    rows = (await db.execute(stmt)).all()

    items = []
    for log, cust_name, auth_name in rows:
        item = _to_dict(log)
        item["customer_name"] = cust_name
        item["author_name"] = auth_name
        items.append(item)

    return {
        "total": total,
        "page": page,
        "size": size,
        "items": items,
    }


async def update_log(
    db: AsyncSession,
    user: CurrentUserPayload,
    log_id: uuid.UUID,
    content: str | None = None,
    customer_id: str | None = None,
    contact_ids: list[str] | None = None,
    log_date: str | date | None = None,
    company_id: uuid.UUID | None = None,
) -> dict:
    """Edit a sales log.

    A user whose ``data_scope`` is not ``"all"`` may only edit logs where
    they are the salesperson. Fields passed as ``None`` are left untouched.

    Args:
        db: Async session used to load and persist the log.
        user: Current user, used for the permission check.
        log_id: Primary key of the log to edit.
        content: New body text.
        customer_id: New customer UUID string; an empty string clears the
            customer link.
        contact_ids: New contact id list.
        log_date: New log date (ISO string or date).
        company_id: Company added to ``involved_company_ids`` when the
            customer link is updated.

    Returns:
        The updated log serialized by ``_to_dict``.

    Raises:
        Exception: if the log does not exist / is soft-deleted, or the user
            is not allowed to edit it.
        ValueError: if a UUID or date string cannot be parsed.
    """
    from app.models.crm import CrmCustomer
    from app.models.sys import SysUserCompany

    log = await db.get(SalesLog, log_id)
    if not log or log.is_deleted:
        raise Exception("日志不存在")

    # Permission check.
    if user.data_scope != "all" and log.salesperson_id != user.user_id:
        raise Exception("您无权编辑此日志")

    if content is not None:
        log.content = content
    if contact_ids is not None:
        log.contact_ids = contact_ids
    if log_date is not None:
        log.log_date = _coerce_date(log_date)

    # Update the customer link and extend involved_company_ids accordingly.
    if customer_id is not None:
        customer_uuid = uuid.UUID(customer_id) if customer_id else None
        log.customer_id = customer_uuid

        # Existing company ids are kept; the current company and the
        # companies of the customer's owner are unioned in.
        resolved = set(log.involved_company_ids or [])
        if company_id:
            resolved.add(company_id)
        if customer_uuid is not None:
            cust = await db.get(CrmCustomer, customer_uuid)
            if cust and cust.owner_id:
                stmt = select(SysUserCompany.company_id).where(
                    SysUserCompany.user_id == cust.owner_id
                )
                resolved.update((await db.execute(stmt)).scalars().all())
        log.involved_company_ids = list(resolved)

    await db.commit()
    await db.refresh(log)
    return _to_dict(log)


async def delete_log(
    db: AsyncSession,
    user: CurrentUserPayload,
    log_id: uuid.UUID,
) -> None:
    """Soft-delete a sales log by setting ``is_deleted``.

    A user whose ``data_scope`` is not ``"all"`` may only delete logs where
    they are the salesperson.

    Raises:
        Exception: if the log does not exist / is already soft-deleted, or
            the user is not allowed to delete it.
    """
    log = await db.get(SalesLog, log_id)
    if not log or log.is_deleted:
        raise Exception("日志不存在")
    if user.data_scope != "all" and log.salesperson_id != user.user_id:
        raise Exception("您无权删除此日志")
    log.is_deleted = True
    await db.commit()


async def _mark_ai_processed(log_id: uuid.UUID) -> None:
    """Set ``ai_processed=True`` on the given log using a fresh session."""
    # Imported lazily, as in the rest of this module's function-level imports.
    from app.db.database import async_session_factory
    from sqlalchemy import update as sa_update

    async with async_session_factory() as session:
        await session.execute(
            sa_update(SalesLog)
            .where(SalesLog.id == log_id)
            .values(ai_processed=True)
        )
        await session.commit()


async def trigger_persona_workflow(
    log_id: uuid.UUID,
    customer_id: uuid.UUID,
    content: str,
    salesperson_name: str = "",
    contact_ids: list[str] | None = None,
) -> None:
    """Trigger the Dify persona-extraction workflow (fire-and-forget).

    Posts the log content to ``{DIFY_API_BASE_URL}/v1/workflows/run`` in
    blocking mode. On HTTP 200 the log's ``ai_processed`` flag is written
    back and the function returns. Any other status or any exception is
    logged and retried, up to 3 attempts with 10 s and 20 s pauses.
    Failures are never propagated to the caller.

    Args:
        log_id: Log whose ``ai_processed`` flag is set on success.
        customer_id: Sent as the ``customer_id`` input and as the Dify ``user``.
        content: Log text sent to the workflow.
        salesperson_name: Sent as the ``salesperson_name`` input.
        contact_ids: Sent as a comma-joined string (empty when omitted).
    """
    from app.core.config import settings

    if not settings.DIFY_WORKFLOW_PERSONA_KEY or not settings.DIFY_API_BASE_URL:
        print("[Workflow] 画像提取 Workflow 未配置,跳过")
        return

    url = f"{settings.DIFY_API_BASE_URL}/v1/workflows/run"
    headers = {
        "Authorization": f"Bearer {settings.DIFY_WORKFLOW_PERSONA_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "inputs": {
            "customer_id": str(customer_id),
            "content": content,
            "salesperson_name": salesperson_name,
            # str() so non-string ids (e.g. UUID objects) do not break join().
            "contact_ids": ",".join(str(cid) for cid in contact_ids) if contact_ids else "",
        },
        "response_mode": "blocking",
        "user": str(customer_id),
    }

    max_retries = 3
    for attempt in range(max_retries):
        try:
            # The response body is fully read by post(), so the client can be
            # released before the database write-back.
            async with httpx.AsyncClient(timeout=300) as client:
                resp = await client.post(url, json=payload, headers=headers)
            print(f"[Workflow] 画像提取触发 status={resp.status_code}, body={resp.text[:200]}")

            if resp.status_code == 200:
                # Write ai_processed back; a failure here is logged but does
                # not trigger another workflow run.
                try:
                    await _mark_ai_processed(log_id)
                    print(f"[Workflow] ai_processed 已更新 log_id={log_id}")
                except Exception as db_err:
                    print(f"[Workflow] ai_processed 回写失败: {db_err}")
                return  # Success: leave the retry loop.

            print(f"[Workflow] HTTP {resp.status_code},第 {attempt+1}/{max_retries} 次")
        except Exception as e:
            # Fire-and-forget boundary: log and fall through to the retry.
            print(f"[Workflow] 画像提取触发失败 (第 {attempt+1}/{max_retries} 次): {e}")

        # Linear back-off before the next attempt (none after the last one).
        if attempt < max_retries - 1:
            await asyncio.sleep(10 * (attempt + 1))


def _to_dict(log: SalesLog) -> dict:
    """Serialize a ``SalesLog`` row into a JSON-friendly dict.

    UUIDs are rendered as strings, dates/datetimes as ISO-8601 strings, and
    missing list columns as empty lists.
    """
    return {
        "id": str(log.id),
        "salesperson_id": str(log.salesperson_id),
        "customer_id": str(log.customer_id) if log.customer_id else None,
        "contact_ids": log.contact_ids or [],
        "involved_company_ids": [str(c) for c in (log.involved_company_ids or [])],
        "content": log.content,
        "log_date": log.log_date.isoformat() if log.log_date else None,
        "ai_processed": log.ai_processed,
        "created_at": log.created_at.isoformat() if log.created_at else None,
    }