423baff73b
- Docker bridge 网络隔离(8000 端口封死) - Gunicorn 4 Worker 多进程 - Alembic 数据库迁移基线 - 日志轮转 20m×3 - JWT 密钥 + DB 密码 + CORS 收紧 - 3-2-1 备份链路(NAS + R740-B 冷备) - 连接池 pool_pre_ping + pool_recycle=3600
206 lines
9.4 KiB
Python
206 lines
9.4 KiB
Python
"""
|
|
Dify 专用工具路由 —— /api/dify/tools
|
|
每个 MCP 工具对应一个独立 REST 端点,方便 Dify 通过 OpenAPI Schema 自动发现
|
|
注意:这些端点由 Dify Agent 内部调用,使用 API Key 认证而非 JWT
|
|
"""
|
|
from __future__ import annotations
|
|
import uuid
|
|
from typing import Any
|
|
from fastapi import APIRouter, Depends, Header, HTTPException
|
|
from pydantic import BaseModel, Field
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from app.db.database import get_db
|
|
from app.schemas.auth import CurrentUserPayload
|
|
from app.schemas.response import ok
|
|
from app.mcp.registry import execute_tool
|
|
import app.mcp.tools # noqa: F401
|
|
from app.core.action_card_queue import push_card
|
|
|
|
router = APIRouter(prefix="/dify/tools", tags=["Dify 工具接口"])
|
|
|
|
|
|
# ── Dify 工具专用认证:跳过 JWT,使用 Dify API Key 或直接放行 ──
|
|
async def get_dify_user(
|
|
authorization: str = Header("", description="可选的 Bearer <token>"),
|
|
db: AsyncSession = Depends(get_db),
|
|
) -> CurrentUserPayload:
|
|
"""
|
|
Dify 调用工具时不携带 JWT,这里提供一个管理员级上下文。
|
|
生产环境应增加 API Key 校验。
|
|
"""
|
|
# TODO: 生产环境加 Dify API Secret 校验
|
|
return CurrentUserPayload(
|
|
user_id=uuid.UUID("c0000000-0000-0000-0000-000000000001"), # admin
|
|
username="admin",
|
|
real_name="系统管理员",
|
|
dept_id=uuid.UUID("a0000000-0000-0000-0000-000000000001"),
|
|
dept_name="总部",
|
|
role_id=uuid.UUID("b0000000-0000-0000-0000-000000000001"),
|
|
role_name="超级管理员",
|
|
data_scope="all",
|
|
menu_keys=[],
|
|
)
|
|
|
|
|
|
# ── 1. 搜索客户 ──────────────────────────────────────────
|
|
class SearchCustomersInput(BaseModel):
|
|
keyword: str | None = Field(None, description="客户名称关键词")
|
|
level: str | None = Field(None, description="客户等级: A / B / C")
|
|
page: int = Field(1, description="页码")
|
|
size: int = Field(10, description="每页数量")
|
|
|
|
|
|
@router.post("/search_customers", summary="搜索客户列表,支持按名称模糊搜索和等级过滤")
|
|
async def dify_search_customers(
|
|
body: SearchCustomersInput,
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: CurrentUserPayload = Depends(get_dify_user),
|
|
) -> dict:
|
|
result = await execute_tool("search_customers", db, current_user, body.model_dump(exclude_none=True))
|
|
return ok(data={"response_type": result.response_type, "result": result.data, "message": result.message})
|
|
|
|
|
|
# ── 2. 创建客户 ──────────────────────────────────────────
|
|
class CreateCustomerInput(BaseModel):
|
|
name: str = Field(..., description="客户名称")
|
|
level: str = Field("C", description="客户等级: A / B / C")
|
|
contact: str | None = Field(None, description="联系人")
|
|
phone: str | None = Field(None, description="电话")
|
|
|
|
|
|
@router.post("/create_customer", summary="创建新客户(返回确认卡片,需用户在前端确认后执行)")
|
|
async def dify_create_customer(
|
|
body: CreateCustomerInput,
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: CurrentUserPayload = Depends(get_dify_user),
|
|
) -> dict:
|
|
result = await execute_tool("create_customer", db, current_user, body.model_dump(exclude_none=True))
|
|
# action_card 结果推入共享队列,供 SSE 生成器注入前端
|
|
if result.response_type == "action_card" and isinstance(result.data, dict):
|
|
await push_card(result.data)
|
|
return ok(data={"response_type": result.response_type, "result": result.data, "message": result.message})
|
|
|
|
|
|
# ── 3. 查询客户专属报价 ──────────────────────────────────
|
|
class CalculatePriceInput(BaseModel):
|
|
customer_id: str = Field(..., description="客户 UUID")
|
|
sku_id: str = Field(..., description="产品 SKU UUID")
|
|
|
|
|
|
@router.post("/calculate_price", summary="查询客户专属报价:历史成交价追溯,无历史则标准价兜底")
|
|
async def dify_calculate_price(
|
|
body: CalculatePriceInput,
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: CurrentUserPayload = Depends(get_dify_user),
|
|
) -> dict:
|
|
result = await execute_tool("calculate_price", db, current_user, body.model_dump())
|
|
return ok(data={"response_type": result.response_type, "result": result.data, "message": result.message})
|
|
|
|
|
|
# ── 4. 创建销售订单 ──────────────────────────────────────
|
|
class OrderItemInput(BaseModel):
|
|
sku_id: str = Field(..., description="产品 SKU UUID")
|
|
qty: float = Field(..., description="数量")
|
|
unit_price: float = Field(..., description="单价")
|
|
|
|
|
|
class CreateOrderInput(BaseModel):
|
|
customer_id: str = Field(..., description="客户 UUID")
|
|
items: list[OrderItemInput] = Field(..., description="订单明细行列表")
|
|
remark: str | None = Field(None, description="备注")
|
|
|
|
|
|
@router.post("/create_order", summary="创建销售订单(返回确认卡片,需用户确认后执行)")
|
|
async def dify_create_order(
|
|
body: CreateOrderInput,
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: CurrentUserPayload = Depends(get_dify_user),
|
|
) -> dict:
|
|
params = body.model_dump()
|
|
params["items"] = [i.model_dump() for i in body.items]
|
|
result = await execute_tool("create_order", db, current_user, params)
|
|
if result.response_type == "action_card" and isinstance(result.data, dict):
|
|
await push_card(result.data)
|
|
return ok(data={"response_type": result.response_type, "result": result.data, "message": result.message})
|
|
|
|
|
|
# ── 5. 搜索订单 ──────────────────────────────────────────
|
|
class SearchOrdersInput(BaseModel):
|
|
keyword: str | None = Field(None, description="客户名称关键词")
|
|
order_no: str | None = Field(None, description="订单号模糊搜索")
|
|
shipping_state: str | None = Field(None, description="发货状态: pending / partial / shipped")
|
|
payment_state: str | None = Field(None, description="付款状态: unpaid / partial / cleared")
|
|
page: int = Field(1, description="页码")
|
|
size: int = Field(10, description="每页数量")
|
|
|
|
|
|
@router.post("/search_orders", summary="搜索订单列表,支持按客户名称、订单号、发货/付款状态筛选")
|
|
async def dify_search_orders(
|
|
body: SearchOrdersInput,
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: CurrentUserPayload = Depends(get_dify_user),
|
|
) -> dict:
|
|
result = await execute_tool("search_orders", db, current_user, body.model_dump(exclude_none=True))
|
|
return ok(data={"response_type": result.response_type, "result": result.data, "message": result.message})
|
|
|
|
|
|
# ── 6. 双轨画像回写 (V5.0) ──────────────────────────────
|
|
class ContactUpdate(BaseModel):
|
|
contact_id: str = Field(..., description="联系人 UUID")
|
|
role: dict | None = Field(None, description="决策角色更新 {decision_role, authority_level}")
|
|
kpi: dict | None = Field(None, description="KPI 更新 {core_goals, pain_points}")
|
|
preference: dict | None = Field(None, description="偏好更新 {comm_style, meeting_preference, topics_of_interest}")
|
|
|
|
|
|
class UpdatePersonaInput(BaseModel):
|
|
customer_id: str = Field(..., description="客户 UUID")
|
|
company_updates: dict | None = Field(None, description="企业级画像增量 {firmographics, dynamic_status}")
|
|
contact_updates: list[ContactUpdate] | None = Field(None, description="联系人级画像增量列表")
|
|
|
|
|
|
@router.post("/update_persona", summary="双轨画像回写:分别更新企业画像与联系人画像 (V5.0)")
|
|
async def dify_update_persona(
|
|
body: UpdatePersonaInput,
|
|
db: AsyncSession = Depends(get_db),
|
|
_: CurrentUserPayload = Depends(get_dify_user),
|
|
) -> dict:
|
|
"""Dify Agent 画像提取 Workflow 调用此工具,分类写入企业/联系人画像"""
|
|
import httpx
|
|
from app.core.config import settings
|
|
|
|
# 内部转调 PUT /api/customers/{id}/persona
|
|
payload: dict[str, Any] = {}
|
|
if body.company_updates:
|
|
payload["company_updates"] = body.company_updates
|
|
if body.contact_updates:
|
|
payload["contact_updates"] = [cu.model_dump(exclude_none=True) for cu in body.contact_updates]
|
|
|
|
from app.models.crm import CrmCustomer, CrmContact
|
|
from app.api.customers import _deep_merge
|
|
from sqlalchemy import update as sa_update
|
|
|
|
cid = uuid.UUID(body.customer_id)
|
|
|
|
if body.company_updates:
|
|
customer = await db.get(CrmCustomer, cid)
|
|
if customer:
|
|
merged = _deep_merge(customer.ai_persona or {}, body.company_updates)
|
|
stmt = sa_update(CrmCustomer).where(CrmCustomer.id == cid).values(ai_persona=merged)
|
|
await db.execute(stmt)
|
|
|
|
if body.contact_updates:
|
|
for cu in body.contact_updates:
|
|
try:
|
|
contact = await db.get(CrmContact, uuid.UUID(cu.contact_id))
|
|
except (ValueError, TypeError):
|
|
continue
|
|
if not contact:
|
|
continue
|
|
updates = cu.model_dump(exclude_none=True, exclude={"contact_id"})
|
|
merged = _deep_merge(contact.ai_buyer_persona or {}, updates)
|
|
contact.ai_buyer_persona = merged
|
|
|
|
await db.commit()
|
|
return ok(message="双轨画像回写成功")
|
|
|