815cbf9d8c
- 更新 .gitignore:全面覆盖环境变量、数据库、日志、缓存、上传文件 - 移除误跟踪的 server/venv/、crm_data.db、.env 文件 - 新增 server/.env.example 模板 - 新增合同管理、利润核算、AI教练等功能模块 - 新增 Playwright e2e 测试套件 - 前后端多项功能升级和 bug 修复
259 lines
8.5 KiB
Python
259 lines
8.5 KiB
Python
"""
Sales log service — CRUD plus asynchronous triggering of the Dify workflow.
"""
|
||
from __future__ import annotations
|
||
|
||
import uuid
|
||
from datetime import date
|
||
from typing import Any
|
||
|
||
import httpx
|
||
from sqlalchemy import select, func, desc, and_
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
||
from app.models.ai import SalesLog
|
||
from app.schemas.auth import CurrentUserPayload
|
||
|
||
|
||
async def create_log(
    db: AsyncSession,
    user: CurrentUserPayload,
    content: str,
    customer_id: str | None = None,
    contact_ids: list[str] | None = None,
    log_date: date | None = None,
    company_ids: list[uuid.UUID] | None = None,
) -> dict:
    """Create a sales log for the current user and return it as a dict.

    Args:
        db: Async session used to add, commit and refresh the new row.
        user: Current user; ``user.user_id`` is stored as ``salesperson_id``.
        content: Text of the log entry.
        customer_id: Optional customer id as a UUID string; a falsy value
            stores no customer.
        contact_ids: Optional contact ids; a falsy value stores ``[]``.
        log_date: Date of the entry; a falsy value stores ``date.today()``.
        company_ids: Optional company ids stored as
            ``involved_company_ids``; a falsy value stores ``[]``.

    Returns:
        The refreshed row serialized by ``_to_dict``.

    Raises:
        ValueError: If ``customer_id`` is non-empty but not a valid UUID.
    """
    customer_uuid = None
    if customer_id:
        customer_uuid = uuid.UUID(customer_id)

    entry = SalesLog(
        salesperson_id=user.user_id,
        customer_id=customer_uuid,
        contact_ids=contact_ids if contact_ids else [],
        content=content,
        log_date=log_date if log_date else date.today(),
        involved_company_ids=company_ids if company_ids else [],
    )

    db.add(entry)
    await db.commit()
    # Reload the row after the commit before serializing it.
    await db.refresh(entry)
    return _to_dict(entry)
|
||
|
||
|
||
def _coerce_log_date(value: str | date | None) -> date | None:
    """Return *value* as a ``date``, parsing ISO ``YYYY-MM-DD`` strings."""
    if isinstance(value, str):
        return date.fromisoformat(value)
    return value


async def list_logs(
    db: AsyncSession,
    user: CurrentUserPayload,
    page: int = 1,
    size: int = 20,
    customer_id: str | None = None,
    user_id: str | None = None,
    start_date: str | None = None,
    end_date: str | None = None,
    company_id: uuid.UUID | None = None,
) -> dict:
    """Return one page of non-deleted sales logs with customer/author names.

    Args:
        db: Async session used for the count and data queries.
        user: Current user. When ``user.data_scope == "self"`` only the
            user's own logs are returned and ``user_id`` is ignored.
        page: 1-based page number.
        size: Page size (rows per page).
        customer_id: Optional customer filter as a UUID string.
        user_id: Optional salesperson filter as a UUID string; only applied
            when the data scope is not ``"self"``.
        start_date: Optional inclusive lower bound on ``log_date``; an ISO
            ``YYYY-MM-DD`` string (``date`` objects are accepted too).
        end_date: Optional inclusive upper bound on ``log_date``; same
            format as ``start_date``.
        company_id: Optional company filter; keeps logs whose
            ``involved_company_ids`` array contains this id.

    Returns:
        ``{"total", "page", "size", "items"}`` where each item is the
        ``_to_dict`` form of a log plus ``customer_name`` and
        ``author_name`` (either may be ``None`` because of the outer joins).

    Raises:
        ValueError: If ``customer_id`` / ``user_id`` is not a valid UUID, or
            a date bound is not a valid ISO date string.
    """
    from sqlalchemy.orm import aliased
    from app.models.crm import CrmCustomer
    from app.models.sys import SysUser

    conditions = [SalesLog.is_deleted.is_(False)]
    if company_id:
        # ARRAY contains: keep only logs that involve the given company.
        conditions.append(SalesLog.involved_company_ids.any(company_id))

    # Data permission: "self" scope is pinned to the caller's own logs.
    if user.data_scope == "self":
        conditions.append(SalesLog.salesperson_id == user.user_id)
    elif user_id:
        conditions.append(SalesLog.salesperson_id == uuid.UUID(user_id))

    # Parse string bounds into real dates so the comparison binds a date
    # value, the same way update_log parses log_date before storing it.
    if start_date:
        conditions.append(SalesLog.log_date >= _coerce_log_date(start_date))
    if end_date:
        conditions.append(SalesLog.log_date <= _coerce_log_date(end_date))
    if customer_id:
        conditions.append(SalesLog.customer_id == uuid.UUID(customer_id))

    where = and_(*conditions)

    # Total number of matching rows (ignores pagination).
    count_stmt = select(func.count()).select_from(SalesLog).where(where)
    total = (await db.execute(count_stmt)).scalar() or 0

    # Data query: LEFT JOIN customer and user to fetch display names.
    Author = aliased(SysUser)
    stmt = (
        select(
            SalesLog,
            CrmCustomer.name.label("customer_name"),
            Author.real_name.label("author_name"),
        )
        .outerjoin(CrmCustomer, SalesLog.customer_id == CrmCustomer.id)
        .outerjoin(Author, SalesLog.salesperson_id == Author.id)
        .where(where)
        # The id tie-breaker keeps page boundaries stable when several rows
        # share the same created_at value.
        .order_by(desc(SalesLog.created_at), desc(SalesLog.id))
        .offset((page - 1) * size)
        .limit(size)
    )
    rows = (await db.execute(stmt)).all()

    items = []
    for log, cust_name, auth_name in rows:
        item = _to_dict(log)
        item["customer_name"] = cust_name
        item["author_name"] = auth_name
        items.append(item)

    return {
        "total": total,
        "page": page,
        "size": size,
        "items": items,
    }
|
||
|
||
|
||
async def update_log(
    db: AsyncSession,
    user: CurrentUserPayload,
    log_id: uuid.UUID,
    content: str | None = None,
    customer_id: str | None = None,
    contact_ids: list[str] | None = None,
    log_date: str | None = None,
    company_id: uuid.UUID | None = None,
) -> dict:
    """Edit a sales log and return its updated dict form.

    Users whose ``data_scope`` is not ``"all"`` may only edit logs whose
    ``salesperson_id`` equals their own ``user_id``. Every field argument
    left as ``None`` is not modified.

    Args:
        db: Async session used to load, commit and refresh the row.
        user: Current user, used for the permission check.
        log_id: Primary key of the log to edit.
        content: New text, if given.
        customer_id: New customer as a UUID string; an empty string clears
            the customer. Passing any non-``None`` value also rebuilds
            ``involved_company_ids``.
        contact_ids: New contact id list, if given.
        log_date: New date as an ISO string, parsed with
            ``date.fromisoformat``.
        company_id: Company added to ``involved_company_ids`` when the
            customer association is being updated.

    Returns:
        The refreshed row serialized by ``_to_dict``.

    Raises:
        Exception: If the log does not exist, is soft-deleted, or the user
            is not allowed to edit it.
        ValueError: If ``customer_id`` or ``log_date`` cannot be parsed.
    """
    from app.models.crm import CrmCustomer
    from app.models.sys import SysUserCompany

    entry = await db.get(SalesLog, log_id)
    if not entry or entry.is_deleted:
        raise Exception("日志不存在")

    # Permission check: full-scope users may edit anything, others only
    # their own logs.
    if not (user.data_scope == "all" or entry.salesperson_id == user.user_id):
        raise Exception("您无权编辑此日志")

    if content is not None:
        entry.content = content
    if contact_ids is not None:
        entry.contact_ids = contact_ids
    if log_date is not None:
        entry.log_date = date.fromisoformat(log_date)

    # Update the customer link and recompute involved_company_ids.
    if customer_id is not None:
        customer_uuid = uuid.UUID(customer_id) if customer_id else None
        entry.customer_id = customer_uuid

        # Start from the companies already recorded on the log, then add the
        # given company and every company of the customer's owner.
        companies = set(entry.involved_company_ids or [])
        if company_id:
            companies.add(company_id)
        if customer_uuid is not None:
            customer = await db.get(CrmCustomer, customer_uuid)
            if customer and customer.owner_id:
                owner_companies_stmt = select(SysUserCompany.company_id).where(
                    SysUserCompany.user_id == customer.owner_id
                )
                result = await db.execute(owner_companies_stmt)
                companies.update(result.scalars().all())
        entry.involved_company_ids = list(companies)

    await db.commit()
    await db.refresh(entry)
    return _to_dict(entry)
|
||
|
||
|
||
async def delete_log(
    db: AsyncSession,
    user: CurrentUserPayload,
    log_id: uuid.UUID,
) -> None:
    """Soft-delete a sales log by setting its ``is_deleted`` flag.

    Users whose ``data_scope`` is not ``"all"`` may only delete logs whose
    ``salesperson_id`` equals their own ``user_id``.

    Args:
        db: Async session used to load the row and commit the change.
        user: Current user, used for the permission check.
        log_id: Primary key of the log to delete.

    Raises:
        Exception: If the log does not exist, is already soft-deleted, or
            the user is not allowed to delete it.
    """
    target = await db.get(SalesLog, log_id)
    if not target or target.is_deleted:
        raise Exception("日志不存在")

    may_delete = user.data_scope == "all" or target.salesperson_id == user.user_id
    if not may_delete:
        raise Exception("您无权删除此日志")

    # The row is kept; only the flag changes.
    target.is_deleted = True
    await db.commit()
|
||
|
||
|
||
async def _mark_log_ai_processed(log_id: uuid.UUID) -> None:
    """Set ``ai_processed=True`` on the given log; failures are only printed."""
    try:
        from app.db.database import async_session_factory
        from sqlalchemy import update as sa_update

        async with async_session_factory() as session:
            await session.execute(
                sa_update(SalesLog)
                .where(SalesLog.id == log_id)
                .values(ai_processed=True)
            )
            await session.commit()
        print(f"[Workflow] ai_processed 已更新 log_id={log_id}")
    except Exception as db_err:
        print(f"[Workflow] ai_processed 回写失败: {db_err}")


async def trigger_persona_workflow(
    log_id: uuid.UUID,
    customer_id: uuid.UUID,
    content: str,
    salesperson_name: str = "",
    contact_ids: list[str] | None = None,
) -> None:
    """Call the Dify persona-extraction workflow for a sales log.

    Does nothing (beyond printing a notice) when the workflow key or the
    API base URL is not configured. Otherwise POSTs to
    ``/v1/workflows/run`` up to three times, sleeping 10s and then 20s
    between attempts. On HTTP 200 the log is flagged ``ai_processed`` in a
    separate session and the function returns. All errors are printed
    rather than raised.
    NOTE(review): the original description calls this fire-and-forget —
    presumably the caller schedules it as a background task; confirm.

    Args:
        log_id: Id of the sales log to flag after a successful run.
        customer_id: Customer id, sent as a workflow input and as ``user``.
        content: Log text sent as a workflow input.
        salesperson_name: Salesperson name sent as a workflow input.
        contact_ids: Contact ids, sent as one comma-joined string.
    """
    import asyncio

    from app.core.config import settings

    if not (settings.DIFY_WORKFLOW_PERSONA_KEY and settings.DIFY_API_BASE_URL):
        print("[Workflow] 画像提取 Workflow 未配置,跳过")
        return

    endpoint = f"{settings.DIFY_API_BASE_URL}/v1/workflows/run"
    request_headers = {
        "Authorization": f"Bearer {settings.DIFY_WORKFLOW_PERSONA_KEY}",
        "Content-Type": "application/json",
    }
    workflow_inputs = {
        "customer_id": str(customer_id),
        "content": content,
        "salesperson_name": salesperson_name,
        "contact_ids": ",".join(contact_ids or []),
    }
    request_body = {
        "inputs": workflow_inputs,
        "response_mode": "blocking",
        "user": str(customer_id),
    }

    max_retries = 3
    for attempt in range(max_retries):
        try:
            async with httpx.AsyncClient(timeout=300) as client:
                resp = await client.post(endpoint, json=request_body, headers=request_headers)
                print(f"[Workflow] 画像提取触发 status={resp.status_code}, body={resp.text[:200]}")

            if resp.status_code != 200:
                print(f"[Workflow] HTTP {resp.status_code},第 {attempt+1}/{max_retries} 次")
            else:
                # Success: write ai_processed back, then leave the retry loop.
                await _mark_log_ai_processed(log_id)
                return
        except Exception as e:
            print(f"[Workflow] 画像提取触发失败 (第 {attempt+1}/{max_retries} 次): {e}")

        # Wait before the next attempt; no wait after the last one.
        if attempt + 1 < max_retries:
            await asyncio.sleep(10 * (attempt + 1))
|
||
|
||
|
||
def _to_dict(log: SalesLog) -> dict:
    """Serialize a ``SalesLog`` row into a plain dict.

    UUID fields are rendered as strings and date/datetime fields as ISO
    strings. Missing optional values become ``None`` (or ``[]`` for the two
    list fields) rather than the literal string ``"None"``.

    Args:
        log: The row to serialize.

    Returns:
        A dict with keys ``id``, ``salesperson_id``, ``customer_id``,
        ``contact_ids``, ``involved_company_ids``, ``content``,
        ``log_date``, ``ai_processed`` and ``created_at``.
    """
    return {
        "id": str(log.id),
        # Guarded like customer_id so a missing value is None, not "None".
        "salesperson_id": str(log.salesperson_id) if log.salesperson_id else None,
        "customer_id": str(log.customer_id) if log.customer_id else None,
        "contact_ids": log.contact_ids or [],
        "involved_company_ids": [str(c) for c in (log.involved_company_ids or [])],
        "content": log.content,
        "log_date": log.log_date.isoformat() if log.log_date else None,
        "ai_processed": log.ai_processed,
        "created_at": log.created_at.isoformat() if log.created_at else None,
    }
|