# -*- coding: utf-8 -*-
"""
SHBL-CRM Integration Tests
Module 3: Business Logic + AI Workflow

Test 1: POST /api/v1/logs - Event-driven AI background task
Test 2: GET /api/v1/reports/monthly - SQL pre-aggregation + AI report

Prerequisites:
  - Backend running: uvicorn app.main:app (port 8000)
  - PostgreSQL + Alembic migration applied
  - Ollama node reachable (for AI tests)

Run:
  pytest tests/test_api_integration.py -v -s
"""

import asyncio
import logging
import uuid

import httpx
import pytest
import pytest_asyncio

# ---- Logging setup for test output ----
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(name)-12s | %(levelname)-5s | %(message)s",
)
logger = logging.getLogger("test_integration")

# ---- Config ----
BASE_URL = "http://127.0.0.1:8000/api/v1"

# Test fixtures: pre-seeded client & auth
# Must insert a test client into DB before running,
# or use the seed script. This UUID will be used for all tests.
TEST_CLIENT_ID: str | None = None  # Populated by fixture
TEST_TOKEN: str | None = None  # Populated by fixture

# Connection settings shared by the seeding and cleanup phases of the fixture.
# NOTE(review): credentials are hard-coded; consider loading them from the
# environment so they do not live in the repository.
_DB_PARAMS = {
    "host": "192.168.1.85",
    "port": 5432,
    "user": "admin",
    "password": "admin_password_2026",
    "dbname": "lubrication_crm",
}


# ============================================================
# Fixtures
# ============================================================

@pytest_asyncio.fixture(scope="module")
async def seed_test_data():
    """
    Seed a test client and admin user, then log in to obtain a JWT token.

    Setup:
      1. Upsert the ``test_admin`` user (password ``admin123``) and insert a
         fresh ``clients`` row via a direct database connection.
      2. Store the new client id in ``TEST_CLIENT_ID``.
      3. POST ``/auth/login`` and store ``access_token`` in ``TEST_TOKEN``.

    Teardown deletes the seeded client and the rows that reference it in
    ``customer_tags``, ``follow_up_todos``, ``customer_logs`` and
    ``sales_opportunities``. The ``test_admin`` user is left in place.

    Raises:
        AssertionError: if the login request does not return HTTP 200.
    """
    global TEST_CLIENT_ID, TEST_TOKEN

    # Create admin user + test client via direct DB insert
    import psycopg2
    import bcrypt

    admin_hash = bcrypt.hashpw(b"admin123", bcrypt.gensalt()).decode("utf-8")
    test_client_uuid = str(uuid.uuid4())

    conn = psycopg2.connect(**_DB_PARAMS)
    try:
        with conn.cursor() as cur:
            # Seed admin user (idempotent)
            cur.execute("""
                INSERT INTO users (id, username, password_hash, role, is_active)
                VALUES (%s, 'test_admin', %s, 'admin', true)
                ON CONFLICT (username) DO UPDATE SET password_hash = EXCLUDED.password_hash
            """, (str(uuid.uuid4()), admin_hash))

            # Seed test client
            cur.execute("""
                INSERT INTO clients (id, name, contact_person, phone)
                VALUES (%s, 'Test_Integration_Client', 'Zhang San', '13800138000')
            """, (test_client_uuid,))
        conn.commit()
    finally:
        # Close the connection even when an INSERT or the commit fails.
        conn.close()

    TEST_CLIENT_ID = test_client_uuid

    # From here on the client row exists in the database. pytest skips the
    # code after ``yield`` when setup raises, so the login step and the yield
    # share one try/finally to guarantee the seeded rows are always removed.
    try:
        # Authenticate to get JWT token
        async with httpx.AsyncClient(base_url=BASE_URL, timeout=10.0) as client:
            resp = await client.post("/auth/login", json={
                "username": "test_admin",
                "password": "admin123",
            })
        assert resp.status_code == 200, f"Login failed: {resp.text}"
        TEST_TOKEN = resp.json()["access_token"]
        logger.info("Auth token acquired for test_admin")

        yield
    finally:
        # Cleanup: remove test data (dependent rows first, then the client)
        conn = psycopg2.connect(**_DB_PARAMS)
        try:
            with conn.cursor() as cur:
                cur.execute("DELETE FROM customer_tags WHERE customer_id = %s", (test_client_uuid,))
                cur.execute("DELETE FROM follow_up_todos WHERE customer_id = %s", (test_client_uuid,))
                cur.execute("DELETE FROM customer_logs WHERE customer_id = %s", (test_client_uuid,))
                cur.execute("DELETE FROM sales_opportunities WHERE customer_id = %s", (test_client_uuid,))
                cur.execute("DELETE FROM clients WHERE id = %s", (test_client_uuid,))
            conn.commit()
        finally:
            conn.close()
        logger.info("Test data cleaned up")
# ============================================================
# Test Case 1: POST /api/v1/logs + AI Background Task
# ============================================================

@pytest.mark.asyncio
async def test_create_log_and_ai_background_task(seed_test_data):
    """
    Test the event-driven AI processing pipeline:
    1. POST log → 200 OK (immediate)
    2. Wait for BackgroundTasks to call Ollama
    3. Verify tags & todos were written to DB

    The DB checks are soft: when no tags or todos appear before the
    deadline, a warning is logged and the test still passes, because the
    Ollama node may be unreachable in the test environment.
    """
    assert TEST_TOKEN, "Auth token missing"
    assert TEST_CLIENT_ID, "Test client ID missing"

    headers = {"Authorization": f"Bearer {TEST_TOKEN}"}
    log_content = "今天拜访了张总,客户对价格敏感,要求下周二前给折扣方案"

    # ---- Step 1: Submit log ----
    async with httpx.AsyncClient(base_url=BASE_URL, timeout=15.0) as client:
        resp = await client.post(
            "/logs",
            json={
                "customer_id": TEST_CLIENT_ID,
                "content": log_content,
            },
            headers=headers,
        )

    # ---- Assert 1: Immediate response ----
    assert resp.status_code == 200, f"Expected 200, got {resp.status_code}: {resp.text}"
    data = resp.json()
    assert "id" in data, "Response missing 'id' field"
    log_id = data["id"]
    logger.info("Log submitted successfully: id=%s", log_id)

    # ---- Step 2: Wait for AI background task ----
    # BackgroundTasks calls Ollama (qwen3:14b), may take 5-30s. Poll until
    # both tags and todos show up or the deadline passes, instead of sleeping
    # a fixed time that is shorter than the documented worst case.
    max_wait_s = 30.0
    poll_interval_s = 2.0
    logger.info("Waiting up to %.0fs for AI background processing...", max_wait_s)

    # ---- Assert 2: Verify DB side effects ----
    import psycopg2

    tags = []
    todos = []
    conn = psycopg2.connect(
        host="192.168.1.85",
        port=5432,
        user="admin",
        password="admin_password_2026",
        dbname="lubrication_crm",
    )
    try:
        loop = asyncio.get_running_loop()
        deadline = loop.time() + max_wait_s
        while True:
            with conn.cursor() as cur:
                # Check customer_tags
                cur.execute(
                    "SELECT tag_name FROM customer_tags WHERE customer_id = %s",
                    (TEST_CLIENT_ID,),
                )
                tags = [row[0] for row in cur.fetchall()]

                # Check follow_up_todos
                cur.execute(
                    "SELECT task_desc, status FROM follow_up_todos WHERE customer_id = %s",
                    (TEST_CLIENT_ID,),
                )
                todos = cur.fetchall()
            # End the implicit read transaction so the next poll starts a
            # fresh one and no transaction is held open while sleeping.
            conn.rollback()

            if (tags and todos) or loop.time() >= deadline:
                break
            await asyncio.sleep(poll_interval_s)
    finally:
        # Close the connection even when a query raises.
        conn.close()

    logger.info("Generated tags: %s", tags)
    logger.info("Generated todos: %s", todos)

    # AI may or may not succeed (Ollama connectivity),
    # but if it did, we should see results.
    # Use soft assertion: log warning if empty, don't fail hard
    # (Ollama node might be unreachable in test env)
    if tags:
        assert len(tags) <= 3, f"Expected at most 3 tags, got {len(tags)}"
        logger.info("PASS: AI generated %d tag(s)", len(tags))
    else:
        logger.warning(
            "WARNING: No tags generated. "
            "Check Ollama connectivity and backend logs for errors."
        )

    if todos:
        assert todos[0][1] == "pending", "Todo status should be 'pending'"
        logger.info("PASS: AI generated todo: %s", todos[0][0][:80])
    else:
        logger.warning(
            "WARNING: No todos generated. "
            "Check Ollama connectivity and backend logs for errors."
        )


# ============================================================
# Test Case 2: GET /api/v1/reports/monthly
# ============================================================

@pytest.mark.asyncio
async def test_monthly_sales_report_generation(seed_test_data):
    """
    Test SQL pre-aggregation + AI report generation:
    1. GET /reports/monthly → 200
    2. Response contains metrics list + report string
    """
    assert TEST_TOKEN, "Auth token missing"

    headers = {"Authorization": f"Bearer {TEST_TOKEN}"}

    # ---- Step 1: Request monthly report ----
    # This is a synchronous wait for AI generation, set generous timeout
    async with httpx.AsyncClient(base_url=BASE_URL, timeout=90.0) as client:
        logger.info("Requesting monthly report (may take 30-60s for AI generation)...")
        resp = await client.get("/reports/monthly", headers=headers)

    # ---- Assert 1: Status code ----
    assert resp.status_code == 200, f"Expected 200, got {resp.status_code}: {resp.text}"
    data = resp.json()
    logger.info("Report response received (%d bytes)", len(resp.text))

    # ---- Assert 2: Response structure ----
    assert "metrics" in data, "Response missing 'metrics' field"
    assert "report" in data, "Response missing 'report' field"
    assert isinstance(data["metrics"], list), "'metrics' should be a list"
    assert isinstance(data["report"], str), "'report' should be a string"
    assert data["report"], "'report' should be non-empty"

    logger.info("Metrics: %s", data["metrics"])
    logger.info("Report preview: %s...", data["report"][:200])
    logger.info("PASS: Monthly report generated (%d chars)", len(data["report"]))