Files
hankin 423baff73b v0.1.0: CRM/ERP 系统内测版本 - 安全加固完成
- Docker bridge 网络隔离(8000 端口封死)
- Gunicorn 4 Worker 多进程
- Alembic 数据库迁移基线
- 日志轮转 20m×3
- JWT 密钥 + DB 密码 + CORS 收紧
- 3-2-1 备份链路(NAS + R740-B 冷备)
- 连接池 pool_pre_ping + pool_recycle=3600
2026-03-16 07:31:37 +00:00

101 lines
3.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Auth 路由 —— /api/auth/login & /api/auth/me
"""
from __future__ import annotations
from datetime import datetime
from fastapi import APIRouter, Depends
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.deps import get_current_user
from app.core.exceptions import BizException, UnauthorizedException
from app.core.security import create_access_token, hash_password, verify_password
from app.db.database import get_db
from app.models.sys import SysUser
from app.schemas.auth import (
CurrentUserPayload,
LoginRequest,
TokenResponse,
UpdatePasswordRequest,
)
from app.schemas.response import ok
router = APIRouter(prefix="/auth", tags=["鉴权"])
@router.post("/login", summary="账号密码登录,签发 JWT")
async def login(body: LoginRequest, db: AsyncSession = Depends(get_db)) -> dict:
# 1. 查询用户
stmt = select(SysUser).where(
SysUser.username == body.username,
SysUser.is_deleted.is_(False),
)
result = await db.execute(stmt)
user = result.scalar_one_or_none()
if user is None:
raise BizException(code=401, message="用户名或密码错误")
# 2. 校验密码
if not verify_password(body.password, user.password_hash):
raise BizException(code=401, message="用户名或密码错误")
# 3. 检查账号状态
if user.status != 1:
raise BizException(code=403, message="账号已被禁用,请联系管理员")
# 4. 签发 Tokensub 存 user_id 字符串)
access_token = create_access_token(data={"sub": str(user.id)})
# 5. 刷新最后登录时间
await db.execute(
update(SysUser)
.where(SysUser.id == user.id)
.values(last_login_at=datetime.utcnow())
)
await db.commit()
return ok(
data=TokenResponse(access_token=access_token).model_dump(),
message="登录成功",
)
@router.get("/me", summary="获取当前登录用户信息(验证 Token 有效性)")
async def get_me(
current_user: CurrentUserPayload = Depends(get_current_user),
) -> dict:
return ok(data=current_user.model_dump(mode="json"))
@router.put("/password", summary="当前用户修改密码")
async def change_password(
body: UpdatePasswordRequest,
db: AsyncSession = Depends(get_db),
current_user: CurrentUserPayload = Depends(get_current_user),
) -> dict:
# 1. 查出用户记录
stmt = select(SysUser).where(SysUser.id == current_user.user_id)
result = await db.execute(stmt)
user = result.scalar_one_or_none()
if user is None:
raise BizException(code=404, message="用户不存在")
# 2. 校验旧密码
if not verify_password(body.old_password, user.password_hash):
raise BizException(code=400, message="旧密码错误")
# 3. 哈希新密码并更新
await db.execute(
update(SysUser)
.where(SysUser.id == current_user.user_id)
.values(password_hash=hash_password(body.new_password), updated_at=datetime.utcnow())
)
await db.commit()
return ok(message="密码修改成功,请重新登录")