"""Dify tool routes — /api/dify/tools.

Every MCP tool is exposed as its own REST endpoint so that Dify can discover
the tools automatically from the OpenAPI schema.

Note: these endpoints are called internally by the Dify Agent and are meant
to be authenticated with an API key rather than a JWT.
"""
from __future__ import annotations

import hmac
import os
import uuid
from typing import Any

from fastapi import APIRouter, Depends, Header, HTTPException
from pydantic import BaseModel, Field
from sqlalchemy.ext.asyncio import AsyncSession

from app.db.database import get_db
from app.schemas.auth import CurrentUserPayload
from app.schemas.response import ok
from app.mcp.registry import execute_tool
import app.mcp.tools  # noqa: F401
from app.core.action_card_queue import push_card

router = APIRouter(prefix="/dify/tools", tags=["Dify 工具接口"])

# Name of the environment variable that holds the shared secret Dify must
# present. Leaving it unset (or empty) disables the check entirely.
_DIFY_API_KEY_ENV = "DIFY_TOOLS_API_KEY"


# ── Dify tool authentication: skip JWT; check the Dify API key or let the request through ──
async def get_dify_user(
    authorization: str = Header("", description="可选的 Bearer "),
    db: AsyncSession = Depends(get_db),
) -> CurrentUserPayload:
    """Return the user context under which Dify tool calls are executed.

    Dify does not send a JWT, so every tool call runs with a fixed,
    administrator-level context.

    When the environment variable ``DIFY_TOOLS_API_KEY`` is set to a
    non-empty value, the request must carry
    ``Authorization: Bearer <that value>``. When the variable is unset the
    request is let through unchecked, which is the previous behaviour.

    Args:
        authorization: Raw ``Authorization`` header; empty when absent.
        db: Injected database session. Not used by this function.

    Returns:
        A hard-coded ``CurrentUserPayload`` with ``data_scope="all"``.

    Raises:
        HTTPException: 401 when a key is configured and the header does not
            carry a matching bearer token.
    """
    expected_key = os.environ.get(_DIFY_API_KEY_ENV, "")
    if expected_key:
        scheme, _, token = authorization.partition(" ")
        presented_key = token.strip() if scheme.lower() == "bearer" else ""
        # Compare as bytes: compare_digest rejects non-ASCII str arguments,
        # and a constant-time comparison avoids leaking the key via timing.
        keys_match = hmac.compare_digest(
            presented_key.encode("utf-8"),
            expected_key.encode("utf-8"),
        )
        if not keys_match:
            raise HTTPException(
                status_code=401,
                detail="Dify API Key 无效或缺失",
                headers={"WWW-Authenticate": "Bearer"},
            )

    return CurrentUserPayload(
        user_id=uuid.UUID("c0000000-0000-0000-0000-000000000001"),  # admin
        username="admin",
        real_name="系统管理员",
        dept_id=uuid.UUID("a0000000-0000-0000-0000-000000000001"),
        dept_name="总部",
        role_id=uuid.UUID("b0000000-0000-0000-0000-000000000001"),
        role_name="超级管理员",
        data_scope="all",
        menu_keys=[],
    )


# ── 1.
# Search customers ──────────────────────────────────────────
# NOTE: the models and endpoints below are documented with comments rather
# than docstrings, because FastAPI/Pydantic would publish a docstring as the
# description in the OpenAPI schema that Dify reads.


# Request body of the search_customers tool; every filter is optional.
class SearchCustomersInput(BaseModel):
    keyword: str | None = Field(None, description="客户名称关键词")
    level: str | None = Field(None, description="客户等级: A / B / C")
    page: int = Field(1, description="页码")
    size: int = Field(10, description="每页数量")


# Runs the "search_customers" tool with the fields the caller actually set
# (None values are dropped) and wraps the tool result in the ok() envelope.
@router.post("/search_customers", summary="搜索客户列表,支持按名称模糊搜索和等级过滤")
async def dify_search_customers(
    body: SearchCustomersInput,
    db: AsyncSession = Depends(get_db),
    current_user: CurrentUserPayload = Depends(get_dify_user),
) -> dict:
    tool_args = body.model_dump(exclude_none=True)
    outcome = await execute_tool("search_customers", db, current_user, tool_args)
    envelope = {
        "response_type": outcome.response_type,
        "result": outcome.data,
        "message": outcome.message,
    }
    return ok(data=envelope)


# ── 2. Create customer ──────────────────────────────────────────


# Request body of the create_customer tool; only the name is required.
class CreateCustomerInput(BaseModel):
    name: str = Field(..., description="客户名称")
    level: str = Field("C", description="客户等级: A / B / C")
    contact: str | None = Field(None, description="联系人")
    phone: str | None = Field(None, description="电话")


# Runs the "create_customer" tool. When the tool answers with an action card
# (a dict payload), the card is also pushed onto the shared queue so the SSE
# generator can inject it into the front end.
@router.post("/create_customer", summary="创建新客户(返回确认卡片,需用户在前端确认后执行)")
async def dify_create_customer(
    body: CreateCustomerInput,
    db: AsyncSession = Depends(get_db),
    current_user: CurrentUserPayload = Depends(get_dify_user),
) -> dict:
    tool_args = body.model_dump(exclude_none=True)
    outcome = await execute_tool("create_customer", db, current_user, tool_args)

    is_action_card = outcome.response_type == "action_card"
    if is_action_card and isinstance(outcome.data, dict):
        await push_card(outcome.data)

    envelope = {
        "response_type": outcome.response_type,
        "result": outcome.data,
        "message": outcome.message,
    }
    return ok(data=envelope)


# ── 3.
# Customer-specific price lookup ──────────────────────────────────
# NOTE: the models and endpoints below are documented with comments rather
# than docstrings, because FastAPI/Pydantic would publish a docstring as the
# description in the OpenAPI schema that Dify reads.


# Request body of the calculate_price tool: one customer, one SKU.
class CalculatePriceInput(BaseModel):
    customer_id: str = Field(..., description="客户 UUID")
    sku_id: str = Field(..., description="产品 SKU UUID")


# Runs the "calculate_price" tool with the full request body and wraps the
# tool result in the ok() envelope.
@router.post("/calculate_price", summary="查询客户专属报价:历史成交价追溯,无历史则标准价兜底")
async def dify_calculate_price(
    body: CalculatePriceInput,
    db: AsyncSession = Depends(get_db),
    current_user: CurrentUserPayload = Depends(get_dify_user),
) -> dict:
    tool_args = body.model_dump()
    outcome = await execute_tool("calculate_price", db, current_user, tool_args)
    envelope = {
        "response_type": outcome.response_type,
        "result": outcome.data,
        "message": outcome.message,
    }
    return ok(data=envelope)


# ── 4. Create sales order ──────────────────────────────────────


# One order line: which SKU, how many, at what unit price.
class OrderItemInput(BaseModel):
    sku_id: str = Field(..., description="产品 SKU UUID")
    qty: float = Field(..., description="数量")
    unit_price: float = Field(..., description="单价")


# Request body of the create_order tool.
class CreateOrderInput(BaseModel):
    customer_id: str = Field(..., description="客户 UUID")
    items: list[OrderItemInput] = Field(..., description="订单明细行列表")
    remark: str | None = Field(None, description="备注")


# Runs the "create_order" tool. The order lines are dumped to plain dicts
# explicitly before the call. When the tool answers with an action card
# (a dict payload), the card is also pushed onto the shared queue.
@router.post("/create_order", summary="创建销售订单(返回确认卡片,需用户确认后执行)")
async def dify_create_order(
    body: CreateOrderInput,
    db: AsyncSession = Depends(get_db),
    current_user: CurrentUserPayload = Depends(get_dify_user),
) -> dict:
    line_items = [line.model_dump() for line in body.items]
    tool_args = {**body.model_dump(), "items": line_items}
    outcome = await execute_tool("create_order", db, current_user, tool_args)

    is_action_card = outcome.response_type == "action_card"
    if is_action_card and isinstance(outcome.data, dict):
        await push_card(outcome.data)

    envelope = {
        "response_type": outcome.response_type,
        "result": outcome.data,
        "message": outcome.message,
    }
    return ok(data=envelope)


# ── 5.
# Search orders ──────────────────────────────────────────
# NOTE: the models and endpoints below are documented with comments rather
# than new docstrings, because FastAPI/Pydantic would publish a docstring as
# the description in the OpenAPI schema that Dify reads.


# Request body of the search_orders tool; every filter is optional.
class SearchOrdersInput(BaseModel):
    keyword: str | None = Field(None, description="客户名称关键词")
    order_no: str | None = Field(None, description="订单号模糊搜索")
    shipping_state: str | None = Field(None, description="发货状态: pending / partial / shipped")
    payment_state: str | None = Field(None, description="付款状态: unpaid / partial / cleared")
    page: int = Field(1, description="页码")
    size: int = Field(10, description="每页数量")


# Runs the "search_orders" tool with the fields the caller actually set
# (None values are dropped) and wraps the tool result in the ok() envelope.
@router.post("/search_orders", summary="搜索订单列表,支持按客户名称、订单号、发货/付款状态筛选")
async def dify_search_orders(
    body: SearchOrdersInput,
    db: AsyncSession = Depends(get_db),
    current_user: CurrentUserPayload = Depends(get_dify_user),
) -> dict:
    result = await execute_tool("search_orders", db, current_user, body.model_dump(exclude_none=True))
    return ok(data={"response_type": result.response_type, "result": result.data, "message": result.message})


# ── 6. Dual-track persona write-back (V5.0) ──────────────────────────────


# Incremental persona update for one contact; only the sections that are set
# (role / kpi / preference) are merged into the stored persona.
class ContactUpdate(BaseModel):
    contact_id: str = Field(..., description="联系人 UUID")
    role: dict | None = Field(None, description="决策角色更新 {decision_role, authority_level}")
    kpi: dict | None = Field(None, description="KPI 更新 {core_goals, pain_points}")
    preference: dict | None = Field(None, description="偏好更新 {comm_style, meeting_preference, topics_of_interest}")


# Request body of the update_persona tool: a company-level delta and/or a
# list of contact-level deltas for the same customer.
class UpdatePersonaInput(BaseModel):
    customer_id: str = Field(..., description="客户 UUID")
    company_updates: dict | None = Field(None, description="企业级画像增量 {firmographics, dynamic_status}")
    contact_updates: list[ContactUpdate] | None = Field(None, description="联系人级画像增量列表")


# Merges the supplied deltas straight into the database (no internal HTTP
# call is made) and commits once at the end:
#   * company_updates  -> deep-merged into CrmCustomer.ai_persona
#   * contact_updates  -> deep-merged into CrmContact.ai_buyer_persona
# A malformed customer_id is rejected with HTTP 422. A missing customer, a
# malformed contact_id, or a missing contact is skipped silently, so the
# success message does not guarantee that every delta was applied.
# NOTE(review): contacts are looked up by id only; nothing here checks that a
# contact belongs to customer_id — confirm whether that is intended.
# The docstring below is left untouched because FastAPI publishes it as the
# operation description in the OpenAPI schema.
@router.post("/update_persona", summary="双轨画像回写:分别更新企业画像与联系人画像 (V5.0)")
async def dify_update_persona(
    body: UpdatePersonaInput,
    db: AsyncSession = Depends(get_db),
    _: CurrentUserPayload = Depends(get_dify_user),
) -> dict:
    """Dify Agent 画像提取 Workflow 调用此工具,分类写入企业/联系人画像"""
    # Kept as function-scope imports, as in the original
    # (presumably to avoid an import cycle with app.api.customers — confirm).
    from app.models.crm import CrmCustomer, CrmContact
    from app.api.customers import _deep_merge
    from sqlalchemy import update as sa_update

    # A bad UUID is a client error; surface it as 422 instead of letting the
    # ValueError bubble up as an HTTP 500.
    try:
        cid = uuid.UUID(body.customer_id)
    except ValueError as exc:
        raise HTTPException(status_code=422, detail="customer_id 不是合法的 UUID") from exc

    if body.company_updates:
        customer = await db.get(CrmCustomer, cid)
        if customer:
            merged = _deep_merge(customer.ai_persona or {}, body.company_updates)
            stmt = sa_update(CrmCustomer).where(CrmCustomer.id == cid).values(ai_persona=merged)
            await db.execute(stmt)

    for cu in body.contact_updates or ():
        # Best effort: an unparsable or unknown contact id is skipped.
        try:
            contact_uuid = uuid.UUID(cu.contact_id)
        except (ValueError, TypeError):
            continue
        contact = await db.get(CrmContact, contact_uuid)
        if not contact:
            continue
        updates = cu.model_dump(exclude_none=True, exclude={"contact_id"})
        # NOTE(review): relies on _deep_merge returning a new object; if it
        # mutates its first argument in place the ORM may not detect the
        # change on assignment — confirm.
        contact.ai_buyer_persona = _deep_merge(contact.ai_buyer_persona or {}, updates)

    await db.commit()
    return ok(message="双轨画像回写成功")