"""System settings and permission routes -- /api/settings.

Highlights:
1. Recursive assembly of the department tree
2. Role CRUD with JSONB ``menu_keys``
3. Employee management with bcrypt password hashing
4. Administrator permission check enforced on every endpoint
"""

from __future__ import annotations

import uuid
from datetime import datetime, timezone

from fastapi import APIRouter, Depends, Query
from sqlalchemy import func, select, update
from sqlalchemy.ext.asyncio import AsyncSession

from app.api.deps import get_current_user
from app.core.exceptions import BizException, ForbiddenException, NotFoundException
from app.core.security import hash_password
from app.db.database import get_db
from app.models.sys import SysDepartment, SysRole, SysUser
from app.schemas.auth import CurrentUserPayload
from app.schemas.sys import (
    DeptNode,
    RoleCreate,
    RoleResponse,
    RoleUpdate,
    UserCreate,
    UserListResponse,
    UserResetPassword,
    UserResponse,
    UserUpdate,
)
from app.schemas.response import ok

router = APIRouter(prefix="/settings", tags=["系统设置"])


# ── Admin permission guard ───────────────────────────────────────
def _require_admin(user: CurrentUserPayload) -> None:
    """Reject any caller whose ``data_scope`` is not ``'all'``.

    Raises:
        ForbiddenException: when ``user.data_scope != "all"``.
    """
    if user.data_scope != "all":
        raise ForbiddenException("系统设置仅限管理员操作")


# ── Helper: assemble the department tree ─────────────────────────
def _build_dept_tree(
    items: list[SysDepartment], parent_id: uuid.UUID | None = None
) -> list[DeptNode]:
    """Turn a flat department list into nested ``DeptNode`` objects.

    Args:
        items: flat list of departments; each one is linked to its parent
            through ``parent_id``.
        parent_id: id whose direct children form the returned level
            (``None`` selects the top-level departments).

    Returns:
        The children of ``parent_id`` ordered by ``sort_order`` (ties keep
        their order from ``items``), each with its own ``children`` filled
        in the same way.
    """
    # Index children by parent once instead of rescanning ``items`` for
    # every node.
    children_of: dict[uuid.UUID | None, list[SysDepartment]] = {}
    for item in items:
        children_of.setdefault(item.parent_id, []).append(item)

    def build(pid: uuid.UUID | None, ancestors: frozenset) -> list[DeptNode]:
        nodes = [
            DeptNode(
                id=child.id,
                parent_id=child.parent_id,
                name=child.name,
                sort_order=child.sort_order,
                status=child.status,
                children=build(child.id, ancestors | {child.id}),
            )
            for child in children_of.get(pid, [])
            # A department already on the current path means the parent
            # links form a cycle; skip it instead of recursing forever.
            if child.id not in ancestors
        ]
        nodes.sort(key=lambda n: n.sort_order)
        return nodes

    start_path = frozenset() if parent_id is None else frozenset({parent_id})
    return build(parent_id, start_path)


# ── Helper: collect a department and all of its descendants ──────
def _collect_dept_ids(
    items: list[SysDepartment], root_id: uuid.UUID
) -> list[uuid.UUID]:
    """Collect ``root_id`` plus the ids of all its descendants.

    Args:
        items: flat list of departments linked through ``parent_id``.
        root_id: id of the department to start from; it is always the first
            element of the result, even if it does not occur in ``items``.

    Returns:
        Ids in depth-first pre-order, children in their ``items`` order.
        Each id appears at most once, so cyclic parent links cannot cause
        endless traversal.
    """
    children_of: dict[uuid.UUID | None, list[uuid.UUID]] = {}
    for item in items:
        children_of.setdefault(item.parent_id, []).append(item.id)

    collected: list[uuid.UUID] = []
    seen: set[uuid.UUID] = set()
    stack = [root_id]
    while stack:
        current = stack.pop()
        if current in seen:
            continue
        seen.add(current)
        collected.append(current)
        # Reverse so the first child is popped (and therefore visited) first.
        stack.extend(reversed(children_of.get(current, [])))
    return collected


# ── Helper: User ORM → Response ──────────────────────────────────
def _user_to_resp(u: SysUser) -> UserResponse:
    """Map a ``SysUser`` row to a ``UserResponse``.

    Department name, role name and data scope are ``None`` when the
    corresponding relationship is empty.
    """
    # NOTE(review): this reads ``u.department`` and ``u.role``; with an
    # AsyncSession those relationships presumably need to be loaded eagerly
    # -- confirm against the model definition.
    return UserResponse(
        id=u.id,
        username=u.username,
        real_name=u.real_name,
        phone=u.phone,
        email=u.email,
        dept_id=u.dept_id,
        dept_name=u.department.name if u.department else None,
        role_id=u.role_id,
        role_name=u.role.role_name if u.role else None,
        data_scope=u.role.data_scope if u.role else None,
        status=u.status,
        last_login_at=u.last_login_at,
        created_at=u.created_at,
    )


# ================================================================
# 1. GET /api/settings/departments/tree -- department tree
# ================================================================
@router.get("/departments/tree", summary="获取组织架构树")
async def get_dept_tree(
    db: AsyncSession = Depends(get_db),
    current_user: CurrentUserPayload = Depends(get_current_user),
) -> dict:
    # Admin-only: returns every non-deleted department as a nested tree.
    _require_admin(current_user)

    stmt = (
        select(SysDepartment)
        .where(SysDepartment.is_deleted.is_(False))
        .order_by(SysDepartment.sort_order)
    )
    depts = list((await db.execute(stmt)).scalars().all())
    tree = _build_dept_tree(depts, parent_id=None)
    return ok(data=[n.model_dump(mode="json") for n in tree])


# ================================================================
# 2. GET /api/settings/roles -- role list
# ================================================================
@router.get("/roles", summary="角色列表")
async def list_roles(
    db: AsyncSession = Depends(get_db),
    current_user: CurrentUserPayload = Depends(get_current_user),
) -> dict:
    # Admin-only: returns all non-deleted roles ordered by creation time.
    _require_admin(current_user)

    stmt = (
        select(SysRole)
        .where(SysRole.is_deleted.is_(False))
        .order_by(SysRole.created_at)
    )
    roles = (await db.execute(stmt)).scalars().all()
    return ok(
        data=[
            RoleResponse(
                id=r.id,
                role_name=r.role_name,
                data_scope=r.data_scope,
                # A NULL menu_keys column is reported as an empty list.
                menu_keys=r.menu_keys or [],
                description=r.description,
                status=r.status,
                created_at=r.created_at,
            ).model_dump(mode="json")
            for r in roles
        ]
    )


# ================================================================
# 3.
# POST /api/settings/roles -- create a role
# ================================================================
@router.post("/roles", summary="新增角色")
async def create_role(
    body: RoleCreate,
    db: AsyncSession = Depends(get_db),
    current_user: CurrentUserPayload = Depends(get_current_user),
) -> dict:
    # Admin-only: creates a role after checking that no other non-deleted
    # role already uses the requested name, then returns the stored row.
    _require_admin(current_user)

    # Name uniqueness check among non-deleted roles.
    name_lookup = select(SysRole.id).where(
        SysRole.role_name == body.role_name,
        SysRole.is_deleted.is_(False),
    )
    existing_id = (await db.execute(name_lookup)).scalar_one_or_none()
    if existing_id:
        raise BizException(message=f"角色名称 '{body.role_name}' 已存在")

    new_role = SysRole(
        role_name=body.role_name,
        data_scope=body.data_scope,
        menu_keys=body.menu_keys,
        description=body.description,
        status=body.status,
    )
    db.add(new_role)
    await db.commit()
    # Reload the row so the attributes read below reflect what was stored.
    await db.refresh(new_role)

    payload = RoleResponse(
        id=new_role.id,
        role_name=new_role.role_name,
        data_scope=new_role.data_scope,
        menu_keys=new_role.menu_keys or [],
        description=new_role.description,
        status=new_role.status,
        created_at=new_role.created_at,
    ).model_dump(mode="json")
    return ok(data=payload, message="角色创建成功")


# ================================================================
# 4.
# PUT /api/settings/roles/{id} -- update a role
# ================================================================
@router.put("/roles/{role_id}", summary="修改角色(含 JSONB menu_keys)")
async def update_role(
    role_id: uuid.UUID,
    body: RoleUpdate,
    db: AsyncSession = Depends(get_db),
    current_user: CurrentUserPayload = Depends(get_current_user),
) -> dict:
    # Admin-only: applies the fields explicitly present in the request body
    # to a non-deleted role and returns the re-read row.
    # Raises NotFoundException when the role does not exist (or is deleted)
    # and BizException when the body is empty or the new name is taken.
    _require_admin(current_user)

    role = (
        await db.execute(
            select(SysRole).where(
                SysRole.id == role_id, SysRole.is_deleted.is_(False)
            )
        )
    ).scalar_one_or_none()
    if role is None:
        raise NotFoundException("角色不存在")

    # Only fields the client actually sent are updated.
    update_data = body.model_dump(exclude_unset=True)
    if not update_data:
        raise BizException(message="未提供任何需要更新的字段")

    # Uniqueness check, only when the name is actually being changed.
    if "role_name" in update_data and update_data["role_name"] != role.role_name:
        dup = (
            await db.execute(
                select(SysRole.id).where(
                    SysRole.role_name == update_data["role_name"],
                    SysRole.id != role_id,
                    SysRole.is_deleted.is_(False),
                )
            )
        ).scalar_one_or_none()
        if dup:
            raise BizException(message=f"角色名称 '{update_data['role_name']}' 已存在")

    # datetime.utcnow() is deprecated since Python 3.12. This expression
    # yields the same naive UTC timestamp, so the stored value is unchanged.
    update_data["updated_at"] = datetime.now(timezone.utc).replace(tzinfo=None)

    await db.execute(
        update(SysRole).where(SysRole.id == role_id).values(**update_data)
    )
    await db.commit()

    # Re-read the row so the response reflects the committed state.
    refreshed = (
        await db.execute(select(SysRole).where(SysRole.id == role_id))
    ).scalar_one()
    return ok(
        data=RoleResponse(
            id=refreshed.id,
            role_name=refreshed.role_name,
            data_scope=refreshed.data_scope,
            menu_keys=refreshed.menu_keys or [],
            description=refreshed.description,
            status=refreshed.status,
            created_at=refreshed.created_at,
        ).model_dump(mode="json"),
        message="角色信息已更新",
    )


# ================================================================
# 5.
# GET /api/settings/users -- paginated employee list
# ================================================================
def _escape_like(term: str, escape: str = "/") -> str:
    """Escape LIKE wildcards in ``term`` so it is matched literally.

    The escape character itself is doubled first, then ``%`` and ``_`` are
    prefixed with it. Pass the same character as ``escape=`` to the
    ``like`` / ``ilike`` call that uses the result.
    """
    return (
        term.replace(escape, escape * 2)
        .replace("%", f"{escape}%")
        .replace("_", f"{escape}_")
    )


@router.get("/users", summary="员工分页列表(支持部门树递归过滤)")
async def list_users(
    page: int = Query(1, ge=1),
    size: int = Query(20, ge=1, le=100),
    dept_id: uuid.UUID | None = Query(None, description="部门 ID(含所有子部门)"),
    keyword: str | None = Query(None, description="姓名/手机号模糊搜索"),
    status: int | None = Query(None, ge=0, le=1),
    db: AsyncSession = Depends(get_db),
    current_user: CurrentUserPayload = Depends(get_current_user),
) -> dict:
    # Admin-only: returns one page of non-deleted users, optionally filtered
    # by department (including all sub-departments), by a case-insensitive
    # substring of real name / phone, and by status.
    _require_admin(current_user)

    where = [SysUser.is_deleted.is_(False)]

    # Department filter: expand the requested department to its whole subtree.
    if dept_id is not None:
        all_depts = list(
            (
                await db.execute(
                    select(SysDepartment).where(SysDepartment.is_deleted.is_(False))
                )
            ).scalars().all()
        )
        target_ids = _collect_dept_ids(all_depts, dept_id)
        where.append(SysUser.dept_id.in_(target_ids))

    if keyword:
        # Escape % and _ so user input is matched literally instead of
        # acting as wildcards (e.g. searching "_" used to match everyone).
        pattern = f"%{_escape_like(keyword)}%"
        where.append(
            SysUser.real_name.ilike(pattern, escape="/")
            | SysUser.phone.ilike(pattern, escape="/")
        )

    if status is not None:
        where.append(SysUser.status == status)

    total = (
        await db.execute(select(func.count()).select_from(SysUser).where(*where))
    ).scalar() or 0

    stmt = (
        select(SysUser)
        .where(*where)
        # The id tiebreaker keeps OFFSET/LIMIT pages stable when several
        # rows share the same created_at.
        .order_by(SysUser.created_at.desc(), SysUser.id)
        .offset((page - 1) * size)
        .limit(size)
    )
    users = (await db.execute(stmt)).scalars().all()

    return ok(
        data=UserListResponse(
            total=total,
            items=[_user_to_resp(u) for u in users],
            page=page,
            size=size,
        ).model_dump(mode="json")
    )


# ================================================================
# 6.
# POST /api/settings/users -- open an employee account
# ================================================================
@router.post("/users", summary="开通员工账号(bcrypt 密码哈希)")
async def create_user(
    body: UserCreate,
    db: AsyncSession = Depends(get_db),
    current_user: CurrentUserPayload = Depends(get_current_user),
) -> dict:
    # Admin-only: creates a user whose password is stored as the result of
    # hash_password(); raises BizException when the username is already
    # used by a non-deleted user.
    _require_admin(current_user)

    # Username uniqueness check among non-deleted users.
    exists = (
        await db.execute(
            select(SysUser.id).where(
                SysUser.username == body.username,
                SysUser.is_deleted.is_(False),
            )
        )
    ).scalar_one_or_none()
    if exists:
        raise BizException(message=f"用户名 '{body.username}' 已被占用")

    user = SysUser(
        username=body.username,
        password_hash=hash_password(body.password),
        real_name=body.real_name,
        phone=body.phone,
        email=body.email,
        dept_id=body.dept_id,
        role_id=body.role_id,
        status=body.status,
    )
    db.add(user)
    await db.commit()
    # Reload the row so the attributes read for the response reflect what
    # was stored.
    await db.refresh(user)

    return ok(data=_user_to_resp(user).model_dump(mode="json"), message="账号创建成功")


# ================================================================
# 7. PUT /api/settings/users/{id} -- edit employee information
# ================================================================
@router.put("/users/{user_id}", summary="编辑员工信息(不含密码)")
async def update_user(
    user_id: uuid.UUID,
    body: UserUpdate,
    db: AsyncSession = Depends(get_db),
    current_user: CurrentUserPayload = Depends(get_current_user),
) -> dict:
    # Admin-only: applies the fields explicitly present in the request body
    # to a non-deleted user and returns the re-read row.
    # Raises NotFoundException when the user does not exist (or is deleted)
    # and BizException when the body carries no fields.
    _require_admin(current_user)

    user = (
        await db.execute(
            select(SysUser).where(
                SysUser.id == user_id, SysUser.is_deleted.is_(False)
            )
        )
    ).scalar_one_or_none()
    if user is None:
        raise NotFoundException("用户不存在")

    # Only fields the client actually sent are updated.
    update_data = body.model_dump(exclude_unset=True)
    if not update_data:
        raise BizException(message="未提供任何需要更新的字段")

    # datetime.utcnow() is deprecated since Python 3.12. This expression
    # yields the same naive UTC timestamp, so the stored value is unchanged.
    update_data["updated_at"] = datetime.now(timezone.utc).replace(tzinfo=None)

    await db.execute(
        update(SysUser).where(SysUser.id == user_id).values(**update_data)
    )
    await db.commit()

    # Re-read the row so the response reflects the committed state.
    refreshed = (
        await db.execute(select(SysUser).where(SysUser.id == user_id))
    ).scalar_one()
    return ok(data=_user_to_resp(refreshed).model_dump(mode="json"), message="员工信息已更新")
# ================================================================
# 8. PUT /api/settings/users/{id}/reset-password -- force password reset
# ================================================================
@router.put("/users/{user_id}/reset-password", summary="强制重置密码(bcrypt)")
async def reset_password(
    user_id: uuid.UUID,
    body: UserResetPassword,
    db: AsyncSession = Depends(get_db),
    current_user: CurrentUserPayload = Depends(get_current_user),
) -> dict:
    # Admin-only: replaces the stored password hash of a non-deleted user
    # with hash_password(body.new_password).
    # Raises NotFoundException when the user does not exist (or is deleted).
    _require_admin(current_user)

    user = (
        await db.execute(
            select(SysUser).where(
                SysUser.id == user_id, SysUser.is_deleted.is_(False)
            )
        )
    ).scalar_one_or_none()
    if user is None:
        raise NotFoundException("用户不存在")

    await db.execute(
        update(SysUser)
        .where(SysUser.id == user_id)
        .values(
            password_hash=hash_password(body.new_password),
            # datetime.utcnow() is deprecated since Python 3.12. This
            # expression yields the same naive UTC timestamp.
            updated_at=datetime.now(timezone.utc).replace(tzinfo=None),
        )
    )
    await db.commit()

    return ok(message="密码已重置")