Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import logging
from typing import Optional, List, Dict
from fastapi import Header, HTTPException
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Group names that is_admin() looks for in the X-Forwarded-Groups header.
ADMIN_GROUPS = ["/admin"]
# Per-cube access list consulted by verify_user(): cube id -> usernames allowed
# to use that cube. Starts empty.
# NOTE(review): nothing in this part of the file populates it - presumably
# filled in elsewhere at runtime; confirm.
CUBE_ACCESS_LIST: Dict[int, List[str]] = {}
def is_admin(x_forwarded_groups: Optional[str],
             admin_groups: Optional[List[str]] = None) -> bool:
    """Return True if the forwarded group header names an admin group.

    The header value is treated as a comma-separated list of group names.
    Each entry is compared as a whole against the admin groups; a plain
    substring test on the raw header would also accept unrelated groups
    such as ``/administrators`` or ``/team/admin-tools`` for ``/admin``.

    Args:
        x_forwarded_groups: Raw ``X-Forwarded-Groups`` header value, or
            ``None``/empty when the header is absent.
        admin_groups: Group names that grant admin rights. Defaults to the
            module-level ``ADMIN_GROUPS`` when omitted.

    Returns:
        True when at least one forwarded group is an admin group.
    """
    if not x_forwarded_groups:
        return False
    if admin_groups is None:
        admin_groups = ADMIN_GROUPS
    # Tolerate whitespace around the separators and ignore empty entries.
    member_of = {
        entry.strip()
        for entry in x_forwarded_groups.split(",")
        if entry.strip()
    }
    return any(group in member_of for group in admin_groups)
def get_username(usernames: List[str]):
    """Pick the first usable name from *usernames*.

    Candidates are tried in order; ``None`` and empty strings are skipped.
    Returns ``None`` when no candidate qualifies.
    """
    return next((candidate for candidate in usernames if candidate), None)
async def get_user(x_forwarded_user: Optional[str] = Header(None),
                   x_forwarded_preferred_username: Optional[str] = Header(None),
                   x_forwarded_groups: Optional[str] = Header(None)):
    """
    Resolve the caller's identity from the HTTP headers oauth2-proxy provides.

    The preferred username wins over the plain user header when both are
    present. Returns a ``(username, admin)`` tuple, where ``username`` may be
    ``None`` if neither header carries a value.
    """
    # Logged in a fixed order; a header that is missing or empty contributes
    # an empty string so the list always has three entries.
    labelled_headers = (
        ("X-Forwarded-Preferred-Username: ", x_forwarded_preferred_username),
        ("X-Forwarded-User: ", x_forwarded_user),
        ("X-Forwarded-Groups: ", x_forwarded_groups),
    )
    logger.debug([prefix + value if value else ""
                  for prefix, value in labelled_headers])

    admin = is_admin(x_forwarded_groups)
    username = get_username([x_forwarded_preferred_username, x_forwarded_user])
    return username, admin
async def verify_user(cube_id: int, x_forwarded_preferred_username: Optional[str] = Header(None),
                      x_forwarded_user: Optional[str] = Header(None),
                      x_forwarded_groups: Optional[str] = Header(None)):
    """
    Check that the caller may access the cube identified by *cube_id*.

    Admins are always allowed. Any other caller must have a username that
    appears in ``CUBE_ACCESS_LIST`` for this cube.

    Returns:
        True when access is granted.

    Raises:
        HTTPException: status 401 when the caller is not an admin and is not
            listed for the cube (or no username could be determined).
    """
    has_admin_rights = is_admin(x_forwarded_groups)
    caller = get_username([x_forwarded_preferred_username, x_forwarded_user])
    if has_admin_rights:
        return True

    # Only consult the access list when there is a username to look for.
    permitted = CUBE_ACCESS_LIST.get(cube_id) if caller is not None else None
    if permitted is not None and caller in permitted:
        return True

    raise HTTPException(status_code=401, detail="Unauthorized")