""" Auth 路由 —— /api/auth/login & /api/auth/me """ from __future__ import annotations from datetime import datetime from fastapi import APIRouter, Depends from sqlalchemy import select, update from sqlalchemy.ext.asyncio import AsyncSession from app.api.deps import get_current_user from app.core.exceptions import BizException, UnauthorizedException from app.core.security import create_access_token, hash_password, verify_password from app.db.database import get_db from app.models.sys import SysUser from app.schemas.auth import ( CurrentUserPayload, LoginRequest, TokenResponse, UpdatePasswordRequest, ) from app.schemas.response import ok router = APIRouter(prefix="/auth", tags=["鉴权"]) @router.post("/login", summary="账号密码登录,签发 JWT") async def login(body: LoginRequest, db: AsyncSession = Depends(get_db)) -> dict: # 1. 查询用户 stmt = select(SysUser).where( SysUser.username == body.username, SysUser.is_deleted.is_(False), ) result = await db.execute(stmt) user = result.scalar_one_or_none() if user is None: raise BizException(code=401, message="用户名或密码错误") # 2. 校验密码 if not verify_password(body.password, user.password_hash): raise BizException(code=401, message="用户名或密码错误") # 3. 检查账号状态 if user.status != 1: raise BizException(code=403, message="账号已被禁用,请联系管理员") # 4. 签发 Token(sub 存 user_id 字符串) access_token = create_access_token(data={"sub": str(user.id)}) # 5. 刷新最后登录时间 await db.execute( update(SysUser) .where(SysUser.id == user.id) .values(last_login_at=datetime.utcnow()) ) await db.commit() return ok( data=TokenResponse(access_token=access_token).model_dump(), message="登录成功", ) @router.get("/me", summary="获取当前登录用户信息(验证 Token 有效性)") async def get_me( current_user: CurrentUserPayload = Depends(get_current_user), ) -> dict: return ok(data=current_user.model_dump(mode="json")) @router.put("/password", summary="当前用户修改密码") async def change_password( body: UpdatePasswordRequest, db: AsyncSession = Depends(get_db), current_user: CurrentUserPayload = Depends(get_current_user), ) -> dict: # 1. 查出用户记录 stmt = select(SysUser).where(SysUser.id == current_user.user_id) result = await db.execute(stmt) user = result.scalar_one_or_none() if user is None: raise BizException(code=404, message="用户不存在") # 2. 校验旧密码 if not verify_password(body.old_password, user.password_hash): raise BizException(code=400, message="旧密码错误") # 3. 哈希新密码并更新 await db.execute( update(SysUser) .where(SysUser.id == current_user.user_id) .values(password_hash=hash_password(body.new_password), updated_at=datetime.utcnow()) ) await db.commit() return ok(message="密码修改成功,请重新登录")