import logging
from typing import List, Optional, Tuple

from fastapi import Header, HTTPException

from pilab.events import reservation

logger = logging.getLogger(__name__)

# Group names that grant administrative rights when present in the
# X-Forwarded-Groups header.
ADMIN_GROUPS = ["/admin"]


def is_admin(x_forwarded_groups: Optional[str]) -> bool:
    """Return True if the forwarded groups header contains an admin group.

    The header value is treated as a comma-separated list of group names
    (the format oauth2-proxy uses for X-Forwarded-Groups). Each entry is
    compared for exact equality against ``ADMIN_GROUPS``.

    Args:
        x_forwarded_groups: Raw header value, or None/empty when absent.

    Returns:
        True when at least one listed group is in ``ADMIN_GROUPS``.
    """
    if not x_forwarded_groups:
        return False
    # Exact membership, not substring search: a substring test would treat
    # a group such as "/administrators" or "/team/admin-ro" as "/admin".
    memberships = {entry.strip() for entry in x_forwarded_groups.split(",")}
    return any(group in memberships for group in ADMIN_GROUPS)


def get_username(usernames: List[Optional[str]]) -> Optional[str]:
    """Return the first non-empty name in *usernames*, or None if there is none.

    Args:
        usernames: Candidate names in priority order; entries may be None
            or empty strings.
    """
    return next((name for name in usernames if name), None)


async def get_user(
    x_forwarded_user: Optional[str] = Header(None),
    x_forwarded_preferred_username: Optional[str] = Header(None),
    x_forwarded_groups: Optional[str] = Header(None),
) -> Tuple[Optional[str], bool]:
    """
    Extract the username and admin status from the http headers oauth2-proxy provides

    The preferred username takes priority over the plain user header.

    Returns:
        A ``(username, admin)`` tuple; ``username`` is None when neither
        user header carries a value.
    """
    # One entry per header; an absent header is logged as an empty string.
    logger.debug([
        f"X-Forwarded-Preferred-Username: {x_forwarded_preferred_username}"
        if x_forwarded_preferred_username else "",
        f"X-Forwarded-User: {x_forwarded_user}" if x_forwarded_user else "",
        f"X-Forwarded-Groups: {x_forwarded_groups}" if x_forwarded_groups else "",
    ])
    admin = is_admin(x_forwarded_groups)
    username = get_username([x_forwarded_preferred_username, x_forwarded_user])
    return username, admin


async def verify_user(
    cube_id: int,
    x_forwarded_preferred_username: Optional[str] = Header(None),
    x_forwarded_user: Optional[str] = Header(None),
    x_forwarded_groups: Optional[str] = Header(None),
) -> bool:
    """Allow the request only for admins or active users of the cube.

    Args:
        cube_id: Identifier passed to ``reservation.get_active_users``.
        x_forwarded_preferred_username: Preferred username header, if any.
        x_forwarded_user: User header, used when no preferred username is set.
        x_forwarded_groups: Groups header used to determine admin status.

    Returns:
        True when access is granted.

    Raises:
        HTTPException: With status 401 when the caller is neither an admin
            nor among the active users returned for ``cube_id``.
    """
    admin = is_admin(x_forwarded_groups)
    username = get_username([x_forwarded_preferred_username, x_forwarded_user])
    # Share the authorization rule with verify_user_pi so the two cannot drift.
    if verify_user_pi(cube_id, username, admin):
        return True
    raise HTTPException(status_code=401, detail="Unauthorized")


def verify_user_pi(cube_id: int, username: Optional[str], admin: bool) -> bool:
    """Report whether a user may access the given cube.

    Args:
        cube_id: Identifier passed to ``reservation.get_active_users``.
        username: Name of the requesting user, or None if unknown.
        admin: Whether the user has admin rights; admins are always allowed
            and the reservation lookup is skipped for them.

    Returns:
        True for admins and for users contained in
        ``reservation.get_active_users(cube_id)``; False otherwise.
    """
    if admin:
        return True
    return username is not None and username in reservation.get_active_users(cube_id)