""" AI 教练引擎 — 事件总线 + Dify 回调 CQRS 解耦模式: 1. 业务端 POST /api/sales-logs → 立即 200 OK → 发消息到 Redis Streams 2. Worker 消费消息 → 调用 Dify Workflow → 写回 ai_coaching_feedback 3. 前端通过 SSE /api/notifications/stream 接收推送 """ from __future__ import annotations import json import uuid from datetime import datetime from sqlalchemy import select, update from sqlalchemy.ext.asyncio import AsyncSession from app.models.ai import SalesLog from app.models.crm import CrmCustomer from app.schemas.auth import CurrentUserPayload # ── Redis 事件发布 ─────────────────────────────────────── async def publish_coaching_event( sales_log_id: uuid.UUID, content: str, customer_id: uuid.UUID | None = None, salesperson_id: uuid.UUID | None = None, ) -> None: """将销售日志推送到 Redis Streams,供 Worker 异步消费""" try: import redis.asyncio as aioredis import os redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/0") r = aioredis.from_url(redis_url, decode_responses=True) await r.xadd( "coaching:sales_logs", { "sales_log_id": str(sales_log_id), "content": content[:2000], # 限长 "customer_id": str(customer_id) if customer_id else "", "salesperson_id": str(salesperson_id) if salesperson_id else "", "timestamp": datetime.utcnow().isoformat(), }, ) await r.aclose() except Exception as e: # Redis 不可用时降级——不阻塞主流程 print(f"[AI EventBus] Redis 推送失败(降级): {e}") # ── Dify 回调处理 ─────────────────────────────────────── async def handle_dify_coaching_callback( db: AsyncSession, sales_log_id: uuid.UUID, feedback: dict, ) -> None: """Dify Workflow 回调 → 写回 SalesLog.ai_coaching_feedback""" await db.execute( update(SalesLog) .where(SalesLog.id == sales_log_id) .values( ai_coaching_feedback=feedback, ai_processed=True, updated_at=datetime.utcnow(), ) ) # 如果反馈中包含客户健康评分,同步更新 CrmCustomer health_score = feedback.get("health_score") meddic_status = feedback.get("meddic_status") if health_score is not None or meddic_status is not None: log = (await db.execute( select(SalesLog).where(SalesLog.id == sales_log_id) )).scalar_one_or_none() if log and log.customer_id: update_vals: dict = {} if health_score is not None: update_vals["health_score"] = float(health_score) if meddic_status is not None: update_vals["meddic_status"] = meddic_status if update_vals: await db.execute( update(CrmCustomer) .where(CrmCustomer.id == log.customer_id) .values(**update_vals) ) await db.commit() # ── SSE 通知流 ────────────────────────────────────────── async def sse_notification_generator(user_id: uuid.UUID): """服务端推送事件流(SSE)—— 监听 Redis PubSub 频道""" import asyncio try: import redis.asyncio as aioredis import os redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/0") r = aioredis.from_url(redis_url, decode_responses=True) pubsub = r.pubsub() channel = f"notifications:{user_id}" await pubsub.subscribe(channel) async for message in pubsub.listen(): if message["type"] == "message": yield f"data: {message['data']}\n\n" except Exception as e: yield f"data: {json.dumps({'error': str(e)})}\n\n"