# -*- coding: utf-8 -*- """ 销售数据分析服务 (Dify BaaS 版本) 基于 SQL 预聚合的销售漏斗统计 + Dify AI 驱动的复盘报告生成。 """ import logging from datetime import datetime, timezone from sqlalchemy import func, select, extract, case from sqlalchemy.ext.asyncio import AsyncSession from app.core.config import settings from app.core.dify_client import dify_client from app.models.crm_business import SalesOpportunity logger = logging.getLogger("analytics") # ============================================================ # 1. SQL 预聚合:销售漏斗指标 # ============================================================ async def get_sales_metrics(db: AsyncSession) -> list[dict]: """ 按当月统计各销售阶段的机会数量和总金额。 使用 SQLAlchemy GROUP BY 聚合查询,在数据库层面完成计算。 """ now = datetime.now(timezone.utc) stmt = ( select( SalesOpportunity.stage, func.count(SalesOpportunity.id).label("count"), func.coalesce(func.sum(SalesOpportunity.amount), 0).label("total_amount"), ) .where( extract("year", SalesOpportunity.created_at) == now.year, extract("month", SalesOpportunity.created_at) == now.month, ) .group_by(SalesOpportunity.stage) .order_by( case( (SalesOpportunity.stage == "意向", 1), (SalesOpportunity.stage == "谈判", 2), (SalesOpportunity.stage == "成交", 3), (SalesOpportunity.stage == "流失", 4), else_=5, ) ) ) result = await db.execute(stmt) rows = result.all() metrics = [ { "stage": row.stage, "count": row.count, "total_amount": float(row.total_amount), } for row in rows ] logger.info("当月销售指标: %s", metrics) return metrics # ============================================================ # 2. Dify AI 复盘报告生成 # ============================================================ async def generate_monthly_report(db: AsyncSession) -> dict: """ 生成当月销售复盘报告: 1. SQL 预聚合获取真实数据指标 2. 将结构化数据通过 inputs 传给 Dify 报告 App 3. 直接返回 Dify 生成的报告文本 :return: {"metrics": [...], "report": "Dify 生成的报告文本"} """ # Step 1: 获取真实销售数据 metrics = await get_sales_metrics(db) if not metrics: return { "metrics": [], "report": "当月暂无销售机会数据,无法生成复盘报告。", } # Step 2: 将数据序列化为 Dify 可消费的文本格式 # *** inputs 的键名必须与 Dify 后台配置的变量名对齐 *** # 在 Dify 后台的报告 App 编排中,需定义输入变量 "metrics_data" metrics_text = "\n".join( f"- {m['stage']}: {m['count']} 个机会, 总金额 ¥{m['total_amount']:,.2f}" for m in metrics ) total_count = sum(m["count"] for m in metrics) total_amount = sum(m["total_amount"] for m in metrics) metrics_text += f"\n- 合计: {total_count} 个机会, 总金额 ¥{total_amount:,.2f}" # Step 3: 调用 Dify 报告 App if not settings.DIFY_REPORT_APP_API_KEY: logger.error("DIFY_REPORT_APP_API_KEY 未配置") return { "metrics": metrics, "report": "AI 报告服务未配置,请联系管理员。", } # 动态生成 Dify 后台所需的必填参数 report_period now = datetime.now(timezone.utc) report_period = f"{now.year}年{now.month:02d}月" report_text = await dify_client.call_text_generator( api_key=settings.DIFY_REPORT_APP_API_KEY, inputs={"metrics_data": metrics_text, "report_period": report_period}, ) if not report_text: report_text = "AI 报告生成失败,请稍后重试或检查 Dify 服务状态。" logger.info("月度复盘报告生成完成 (%d 字)", len(report_text)) return { "metrics": metrics, "report": report_text, }