Skip to content
Snippets Groups Projects
auth.py 2.13 KiB
Newer Older
  • Learn to ignore specific revisions
  • import logging
    
    from typing import Optional, List
    
    
    from fastapi import Header, HTTPException
    
    from pilab.events import reservation
    
    
    logger = logging.getLogger(__name__)
    
    ADMIN_GROUPS = ["/admin"]
    
    
    def is_admin(x_forwarded_groups: str):
        admin = False
        if x_forwarded_groups and x_forwarded_groups is not None:
            for group in ADMIN_GROUPS:
                if group in x_forwarded_groups:
                    admin = True
        return admin
    
    
    def get_username(usernames: List[str]):
        for name in usernames:
            if name and name is not None:
                return name
        return None
    
    
    async def get_user(x_forwarded_user: Optional[str] = Header(None),
                       x_forwarded_preferred_username: Optional[str] = Header(None),
                       x_forwarded_groups: Optional[str] = Header(None)):
        """
        Extract the username and admin status from the http headers oauth2-proxy provides
        """
    
        logger.debug([
                         "X-Forwarded-Preferred-Username: " + x_forwarded_preferred_username if x_forwarded_preferred_username else ""] +
    
                     ["X-Forwarded-User: " + x_forwarded_user if x_forwarded_user else ""] +
                     ["X-Forwarded-Groups: " + x_forwarded_groups if x_forwarded_groups else ""])
    
        admin = is_admin(x_forwarded_groups)
        username = get_username([x_forwarded_preferred_username, x_forwarded_user])
        return username, admin
    
    
    async def verify_user(cube_id: int, x_forwarded_preferred_username: Optional[str] = Header(None),
                          x_forwarded_user: Optional[str] = Header(None),
                          x_forwarded_groups: Optional[str] = Header(None)):
        admin = is_admin(x_forwarded_groups)
        username = get_username([x_forwarded_preferred_username, x_forwarded_user])
        if admin:
            return True
    
        if username is not None and username in reservation.get_active_users(cube_id):
    
    
        raise HTTPException(status_code=401, detail="Unauthorized")
    
    
    
    def verify_user_pi(cube_id: int, username: str, admin: bool):
        if admin:
            return True
    
        if username is not None and username in reservation.get_active_users(cube_id):