auth.py 2.22 KiB
import logging
from typing import Optional, List, Dict
from fastapi import Header, HTTPException

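logger = logging.getLogger(__name__)

# Group paths that grant administrator rights. Each one is compared by exact
# match against the individual entries of the X-Forwarded-Groups header.
ADMIN_GROUPS = ["/admin"]

# Per-cube allow-list: cube id -> usernames allowed to access that cube.
# Starts empty; it is populated outside the code shown here.
CUBE_ACCESS_LIST: Dict[int, List[str]] = {}


def is_admin(x_forwarded_groups: Optional[str]):
    """
    Decide whether the forwarded group list contains an administrator group.

    oauth2-proxy sends the groups as one comma-separated header value. The value
    is split into entries and each entry is compared for equality with
    ADMIN_GROUPS. A plain substring test on the raw header would also accept
    unrelated groups whose name merely contains an admin group path (for
    example "/administrators" or "/team/admin-readonly").

    :param x_forwarded_groups: raw X-Forwarded-Groups header value, or None
    :return: True if at least one presented group is listed in ADMIN_GROUPS
    """
    if not x_forwarded_groups:
        # Missing or empty header: no group membership was asserted.
        return False

    # Whitespace around entries is not significant; strip it before comparing.
    presented = {entry.strip() for entry in x_forwarded_groups.split(",")}

    # NOTE(review): nested group paths such as "/admin/ops" are not treated as
    # admin by this exact match; list them in ADMIN_GROUPS if they should be.
    return any(group in presented for group in ADMIN_GROUPS)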


def get_username(usernames: List[str]):
    """
    Pick the first usable name from an ordered list of candidates.

    Candidates that are None or empty are skipped, so the order of the list
    expresses the caller's preference.

    :param usernames: candidate names, most preferred first
    :return: the first non-empty candidate, or None if there is none
    """
    return next((candidate for candidate in usernames if candidate), None)


async def get_user(x_forwarded_user: Optional[str] = Header(None),
                   x_forwarded_preferred_username: Optional[str] = Header(None),
                   x_forwarded_groups: Optional[str] = Header(None)):
    """
    Derive the caller's username and admin flag from the oauth2-proxy headers.

    The header values are taken at face value; nothing here authenticates them.
    That is only sound when every request reaches this service through the
    proxy and the proxy overwrites any client-supplied X-Forwarded-* headers.

    :return: tuple (username or None, admin flag)
    """
    # Debug trace of the identity headers; absent headers appear as "".
    preferred_line = ("X-Forwarded-Preferred-Username: " + x_forwarded_preferred_username
                      if x_forwarded_preferred_username else "")
    user_line = "X-Forwarded-User: " + x_forwarded_user if x_forwarded_user else ""
    groups_line = "X-Forwarded-Groups: " + x_forwarded_groups if x_forwarded_groups else ""
    logger.debug([preferred_line, user_line, groups_line])

    admin_flag = is_admin(x_forwarded_groups)
    # The preferred username wins over the plain user header when both are set.
    # NOTE(review): OIDC does not guarantee preferred_username to be unique or
    # stable - confirm it is a safe identity key for this identity provider.
    resolved_name = get_username([x_forwarded_preferred_username, x_forwarded_user])
    return resolved_name, admin_flag


async def verify_user(cube_id: int, x_forwarded_preferred_username: Optional[str] = Header(None),
                      x_forwarded_user: Optional[str] = Header(None),
                      x_forwarded_groups: Optional[str] = Header(None)):
    """
    Authorize the caller for one cube, or reject the request.

    Identity and groups come from the oauth2-proxy headers and are trusted as
    sent, so this check is only meaningful when the service cannot be reached
    without passing through the proxy. Access is granted to admins and to
    usernames on the cube's entry in CUBE_ACCESS_LIST.

    :param cube_id: id of the cube being accessed
    :return: True when access is granted
    :raises HTTPException: status 401 when access is not granted
    """
    caller_is_admin = is_admin(x_forwarded_groups)
    caller_name = get_username([x_forwarded_preferred_username, x_forwarded_user])

    # Same decision rule as verify_user_pi, so delegate to keep one copy of it.
    if verify_user_pi(cube_id, caller_name, caller_is_admin):
        return True

    # NOTE(review): 401 is also returned to identified callers who merely lack
    # access; 403 is the usual code for that case, but changing it would alter
    # what clients see, so it is left as is.
    raise HTTPException(status_code=401, detail="Unauthorized")


def verify_user_pi(cube_id: int, username: str, admin: bool):
    """
    Report whether an already-resolved identity may access a cube.

    Unlike verify_user this never raises; the caller decides how to react.

    :param cube_id: id of the cube being accessed
    :param username: resolved username, may be None
    :param admin: whether the caller is an administrator
    :return: True for admins and for usernames on the cube's allow-list,
             otherwise False
    """
    if admin:
        return True

    # A cube with no entry in the allow-list is closed to non-admins.
    permitted = CUBE_ACCESS_LIST.get(cube_id)
    if username is None or permitted is None:
        return False
    return username in permitted