Files
hankin 815cbf9d8c v0.2.0: CRM/ERP 系统升级 - 清理 .gitignore 并移除误提交的 venv/env/db 文件
- 更新 .gitignore:全面覆盖环境变量、数据库、日志、缓存、上传文件
- 移除误跟踪的 server/venv/、crm_data.db、.env 文件
- 新增 server/.env.example 模板
- 新增合同管理、利润核算、AI教练等功能模块
- 新增 Playwright e2e 测试套件
- 前后端多项功能升级和 bug 修复
2026-05-11 07:24:19 +00:00

259 lines
8.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
销售日志服务 — CRUD + Dify 工作流异步触发
"""
from __future__ import annotations
import uuid
from datetime import date
from typing import Any
import httpx
from sqlalchemy import select, func, desc, and_
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.ai import SalesLog
from app.schemas.auth import CurrentUserPayload
async def create_log(
    db: AsyncSession,
    user: CurrentUserPayload,
    content: str,
    customer_id: str | None = None,
    contact_ids: list[str] | None = None,
    log_date: date | None = None,
    company_ids: list[uuid.UUID] | None = None,
) -> dict:
    """Create a sales log owned by the current user and return it as a dict.

    Args:
        db: Async session used to persist the new row.
        user: Authenticated user; ``user.user_id`` becomes the salesperson.
        content: Free-text body of the log.
        customer_id: Optional customer UUID in string form; a falsy value
            stores no customer.
        contact_ids: Optional contact identifiers; stored as ``[]`` if falsy.
        log_date: Date the log refers to; defaults to ``date.today()``.
        company_ids: Companies involved in the log; stored as ``[]`` if falsy.

    Returns:
        The persisted log serialized by ``_to_dict``.

    Raises:
        ValueError: If ``customer_id`` is a non-empty, malformed UUID string.
    """
    fields: dict[str, Any] = {"salesperson_id": user.user_id}
    fields["customer_id"] = uuid.UUID(customer_id) if customer_id else None
    fields["contact_ids"] = contact_ids or []
    fields["content"] = content
    fields["log_date"] = log_date or date.today()
    fields["involved_company_ids"] = company_ids or []

    entry = SalesLog(**fields)
    db.add(entry)
    await db.commit()
    # Reload the row so database-populated columns are available to serialize.
    await db.refresh(entry)
    return _to_dict(entry)
async def list_logs(
    db: AsyncSession,
    user: CurrentUserPayload,
    page: int = 1,
    size: int = 20,
    customer_id: str | None = None,
    user_id: str | None = None,
    start_date: str | None = None,
    end_date: str | None = None,
    company_id: uuid.UUID | None = None,
) -> dict:
    """Return one page of non-deleted sales logs matching the given filters.

    Args:
        db: Async session used for the count and data queries.
        user: Authenticated user. When ``user.data_scope == "self"`` only the
            user's own logs are returned and ``user_id`` is ignored.
        page: 1-based page number.
        size: Page size (number of rows per page).
        customer_id: Optional customer UUID (string) to filter on.
        user_id: Optional salesperson UUID (string); only applied when the
            caller's data scope is not ``"self"``.
        start_date: Optional inclusive lower bound for ``log_date`` as an ISO
            ``YYYY-MM-DD`` string.
        end_date: Optional inclusive upper bound for ``log_date`` as an ISO
            ``YYYY-MM-DD`` string.
        company_id: Optional company; keeps only logs whose
            ``involved_company_ids`` array contains it.

    Returns:
        A dict with ``total``, ``page``, ``size`` and ``items``; every item is
        the ``_to_dict`` form of a log plus ``customer_name`` and
        ``author_name`` (either may be ``None`` because of the outer joins).

    Raises:
        ValueError: If a UUID string or a date string is malformed.
    """
    from sqlalchemy.orm import aliased

    from app.models.crm import CrmCustomer
    from app.models.sys import SysUser

    def _as_date(value: str | date) -> date:
        # Bind a real ``date`` rather than a raw string: strict drivers refuse
        # to compare a date column against a str parameter. This also matches
        # how ``update_log`` parses its ``log_date`` argument.
        return date.fromisoformat(value) if isinstance(value, str) else value

    conditions = [SalesLog.is_deleted.is_(False)]
    if company_id:
        # ARRAY contains: keep logs that involve the given company.
        conditions.append(SalesLog.involved_company_ids.any(company_id))

    # Data-scope restriction: "self" users only ever see their own logs.
    if user.data_scope == "self":
        conditions.append(SalesLog.salesperson_id == user.user_id)
    elif user_id:
        conditions.append(SalesLog.salesperson_id == uuid.UUID(user_id))

    if start_date:
        conditions.append(SalesLog.log_date >= _as_date(start_date))
    if end_date:
        conditions.append(SalesLog.log_date <= _as_date(end_date))
    if customer_id:
        conditions.append(SalesLog.customer_id == uuid.UUID(customer_id))
    where = and_(*conditions)

    # Total number of matching rows, independent of pagination.
    count_stmt = select(func.count()).select_from(SalesLog).where(where)
    total = (await db.execute(count_stmt)).scalar() or 0

    # Page of data; LEFT JOIN customer and user to pick up display names.
    Author = aliased(SysUser)
    stmt = (
        select(
            SalesLog,
            CrmCustomer.name.label("customer_name"),
            Author.real_name.label("author_name"),
        )
        .outerjoin(CrmCustomer, SalesLog.customer_id == CrmCustomer.id)
        .outerjoin(Author, SalesLog.salesperson_id == Author.id)
        .where(where)
        # The id tiebreaker keeps OFFSET/LIMIT pages stable when several logs
        # share the same created_at value.
        .order_by(desc(SalesLog.created_at), desc(SalesLog.id))
        .offset((page - 1) * size)
        .limit(size)
    )
    rows = (await db.execute(stmt)).all()

    items = [
        {**_to_dict(log), "customer_name": cust_name, "author_name": auth_name}
        for log, cust_name, auth_name in rows
    ]
    return {
        "total": total,
        "page": page,
        "size": size,
        "items": items,
    }
async def update_log(
    db: AsyncSession,
    user: CurrentUserPayload,
    log_id: uuid.UUID,
    content: str | None = None,
    customer_id: str | None = None,
    contact_ids: list[str] | None = None,
    log_date: str | None = None,
    company_id: uuid.UUID | None = None,
) -> dict:
    """Edit a sales log; only fields passed as non-``None`` are changed.

    Users whose ``data_scope`` is not ``"all"`` may only edit logs where they
    are the salesperson.

    Args:
        db: Async session used to load and persist the log.
        user: Authenticated user performing the edit.
        log_id: Primary key of the log to edit.
        content: New body text, if given.
        customer_id: New customer UUID (string); an empty string clears the
            customer link. Supplying it also extends ``involved_company_ids``.
        contact_ids: Replacement list of contact identifiers, if given.
        log_date: New date as an ISO ``YYYY-MM-DD`` string, if given.
        company_id: Company to add to ``involved_company_ids`` while the
            customer link is being updated.

    Returns:
        The updated log serialized by ``_to_dict``.

    Raises:
        Exception: If the log is missing or soft-deleted, or the user is not
            allowed to edit it.
        ValueError: If ``customer_id`` or ``log_date`` is malformed.
    """
    from app.models.crm import CrmCustomer
    from app.models.sys import SysUserCompany

    log = await db.get(SalesLog, log_id)
    if not log or log.is_deleted:
        raise Exception("日志不存在")

    # Permission check: full-scope users may edit anything, others only
    # their own logs.
    if not (user.data_scope == "all" or log.salesperson_id == user.user_id):
        raise Exception("您无权编辑此日志")

    if content is not None:
        log.content = content
    if contact_ids is not None:
        log.contact_ids = contact_ids
    if log_date is not None:
        log.log_date = date.fromisoformat(log_date)

    # Update the customer link and extend involved_company_ids accordingly.
    # NOTE(review): the company recomputation is assumed to run only when
    # customer_id is supplied — confirm against the intended nesting.
    if customer_id is not None:
        customer_uuid = uuid.UUID(customer_id) if customer_id else None
        log.customer_id = customer_uuid

        # Existing companies are kept; new ones are only ever added.
        company_set = set(log.involved_company_ids or [])
        if company_id:
            company_set.add(company_id)
        if customer_uuid is not None:
            customer = await db.get(CrmCustomer, customer_uuid)
            if customer and customer.owner_id:
                # Pull in every company the customer's owner belongs to.
                owner_companies = select(SysUserCompany.company_id).where(
                    SysUserCompany.user_id == customer.owner_id
                )
                result = await db.execute(owner_companies)
                company_set.update(result.scalars().all())
        log.involved_company_ids = list(company_set)

    await db.commit()
    await db.refresh(log)
    return _to_dict(log)
async def delete_log(
    db: AsyncSession,
    user: CurrentUserPayload,
    log_id: uuid.UUID,
) -> None:
    """Soft-delete a sales log by setting its ``is_deleted`` flag.

    Users whose ``data_scope`` is not ``"all"`` may only delete logs where
    they are the salesperson.

    Args:
        db: Async session used to load the log and commit the change.
        user: Authenticated user performing the deletion.
        log_id: Primary key of the log to delete.

    Raises:
        Exception: If the log is missing or already soft-deleted, or the
            user is not allowed to delete it.
    """
    target = await db.get(SalesLog, log_id)
    if not target or target.is_deleted:
        raise Exception("日志不存在")

    may_delete = (
        user.data_scope == "all" or target.salesperson_id == user.user_id
    )
    if not may_delete:
        raise Exception("您无权删除此日志")

    # The row is kept; it is only flagged as deleted.
    target.is_deleted = True
    await db.commit()
async def trigger_persona_workflow(
    log_id: uuid.UUID,
    customer_id: uuid.UUID,
    content: str,
    salesperson_name: str = "",
    contact_ids: list[str] | None = None,
) -> None:
    """Call the Dify persona-extraction workflow for a sales log.

    The workflow is POSTed to in ``blocking`` response mode, with up to three
    attempts. On an HTTP 200 the log's ``ai_processed`` flag is set to
    ``True`` in a separate session. All failures are printed, never raised.

    Args:
        log_id: Log whose ``ai_processed`` flag is set on success.
        customer_id: Customer the log belongs to; sent as a workflow input
            and also used as the ``user`` field of the request.
        content: Log text passed to the workflow.
        salesperson_name: Author name passed to the workflow.
        contact_ids: Contact identifiers; sent as a comma-joined string.
    """
    import asyncio

    from app.core.config import settings

    # Without both the API key and the base URL there is nothing to call.
    if not (settings.DIFY_WORKFLOW_PERSONA_KEY and settings.DIFY_API_BASE_URL):
        print("[Workflow] 画像提取 Workflow 未配置,跳过")
        return

    url = f"{settings.DIFY_API_BASE_URL}/v1/workflows/run"
    headers = {
        "Authorization": f"Bearer {settings.DIFY_WORKFLOW_PERSONA_KEY}",
        "Content-Type": "application/json",
    }
    joined_contacts = ",".join(contact_ids) if contact_ids else ""
    workflow_inputs = {
        "customer_id": str(customer_id),
        "content": content,
        "salesperson_name": salesperson_name,
        "contact_ids": joined_contacts,
    }
    payload = {
        "inputs": workflow_inputs,
        "response_mode": "blocking",
        "user": str(customer_id),
    }

    max_retries = 3
    for attempt in range(max_retries):
        try:
            async with httpx.AsyncClient(timeout=300) as client:
                resp = await client.post(url, json=payload, headers=headers)
                print(f"[Workflow] 画像提取触发 status={resp.status_code}, body={resp.text[:200]}")
                if resp.status_code != 200:
                    print(f"[Workflow] HTTP {resp.status_code},第 {attempt+1}/{max_retries}")
                else:
                    # Success: record it on the log. A failure of this
                    # write-back is reported but does not trigger a retry.
                    try:
                        from app.db.database import async_session_factory
                        from sqlalchemy import update as sa_update

                        async with async_session_factory() as session:
                            await session.execute(
                                sa_update(SalesLog)
                                .where(SalesLog.id == log_id)
                                .values(ai_processed=True)
                            )
                            await session.commit()
                        print(f"[Workflow] ai_processed 已更新 log_id={log_id}")
                    except Exception as db_err:
                        print(f"[Workflow] ai_processed 回写失败: {db_err}")
                    return  # done, leave the retry loop
        except Exception as e:
            print(f"[Workflow] 画像提取触发失败 (第 {attempt+1}/{max_retries} 次): {e}")

        # Wait 10s, then 20s, between attempts; no wait after the last one.
        if attempt + 1 < max_retries:
            await asyncio.sleep(10 * (attempt + 1))
def _to_dict(log: SalesLog) -> dict:
return {
"id": str(log.id),
"salesperson_id": str(log.salesperson_id),
"customer_id": str(log.customer_id) if log.customer_id else None,
"contact_ids": log.contact_ids or [],
"involved_company_ids": [str(c) for c in (log.involved_company_ids or [])],
"content": log.content,
"log_date": log.log_date.isoformat() if log.log_date else None,
"ai_processed": log.ai_processed,
"created_at": log.created_at.isoformat() if log.created_at else None,
}