Source code for app.auth.auth_controller

from functools import wraps
from http import HTTPStatus

from flask import session, abort, g, redirect

from app.auth.permission_strategies import Ctx, indexed_permission_evaluators, indexed_endpoint_validators
from app.auth.scopes.system_scopes import SystemScopes

from app.repositories.member_repository import MemberRepository
from app.repositories.project_participation_repository import ProjectParticipationRepository
from app.repositories.project_repository import ProjectRepository

from app.schemas.member_schema import MemberSchema


[docs] class AuthController: """ Enforces authentication and authorization on controllers using `flask-session`. This class provides decorators to: - Log in members via :func:`login_member`. - Log out members via :func:`logout_member`. - Enforce authentication on controllers via :func:`requires_login` and populate the ``current_member`` global proxy. - Enforce authorization checks on controllers via ``requires_permission``. This also enforces authentication by using :func`requires_login`, making ``current_member`` also available. :param enabled: Flag to enable or disable access control enforcement. :type enabled: bool :param member_repo: Repository interface to retrieve member data. :type member_repo: ``app.repositories.member_repository.MemberRepository`` :param project_repo: Repository interface to retrieve member data. :type project_repo: ``app.repositories.project_repository.ProjectRepository`` :param participation_repo: Repository interface to retrieve member data. :type participation_repo: ``app.repositories.project_participation_repository.ProjectParticipationRepository`` :param system_scopes: Class with system scopes. :type participation_repo: ``app.auth.scopes.system_scopes.SystemScopes`` """ def __init__(self, *, enabled: bool, member_repo: MemberRepository, project_repo: ProjectRepository, participation_repo: ProjectParticipationRepository,system_scopes: SystemScopes): self.enabled = enabled self.member_repo = member_repo self.project_repo = project_repo self.participation_repo = participation_repo self.system_scopes = system_scopes
[docs] def login_member(self, fn): """ This is meant to act as the controller to start member sessions. Should be used to decorate functions that authenticate a member and return it to start the session, an optional URL for a redirect value can also be returned. It's not a controller decorator, it is the controller, the functions should simply provide the member model. Example:: @app.route("/login") @login_member def login(): return Member(), None # return the authenticated member and doesn't redirect @app.route("/oauth-callback") @login_member def oauth_callback(): return Member(), "https://frontend.com/dashboard" # redirects to frontend with params ?login=success&username=member_username :param fn: Function that authenticates a user and returns its model. :type fn: function """ @wraps(fn) def wrapper(*args, **kwargs): if not self.enabled: return abort(HTTPStatus.NOT_IMPLEMENTED) member, redirect_uri = fn(*args, **kwargs) session.clear() if redirect_uri: if member is None: return redirect_uri(redirect_uri + f"?login=fail") session["id"] = member.id return redirect(redirect_uri + f"?login=success&username={member.username}") if member is None: return abort(HTTPStatus.UNAUTHORIZED, description=f"Failed authentication") session["id"] = member.id return {"description": "Logged in successfully!", "member": MemberSchema.from_member(member).model_dump(exclude="password")} return wrapper
[docs] def requires_login(self, fn): """ Decorate controllers that require a logged-in user. This decorator enables ``current_member`` global to be accessed in controllers. Example:: from app.access import current_member @bp.route("/members/<username>", methods=["PUT"]) @requires_login def update_member(username): if current_member.username == username: pass :param fn: Decorated controller. :type fn: function """ @wraps(fn) def wrapper(*args, **kwargs): if not self.enabled: return fn(*args, **kwargs) if "id" not in session: return abort(HTTPStatus.UNAUTHORIZED, description="You are not logged in") member = self.member_repo.get_member_by_id(session["id"]) if member is None: # member deleted while session was still valid return abort(HTTPStatus.UNAUTHORIZED, description="You are not logged in") g.current_member = member return fn(*args, **kwargs) return wrapper
[docs] def logout_member(self, fn): """ Decorate controllers meant to end a user session. Example:: @app.route("/login") @access_controller.logout_member def logout(): return {"message": "Logged out successfully!"} :param fn: Decorated controller. :type fn: function """ @wraps(fn) @self.requires_login def wrapper(*args, **kwargs): if not self.enabled: return abort(HTTPStatus.NOT_IMPLEMENTED, description="Access control disabled") r = fn(*args, **kwargs) session.clear() g.current_member = None return r return wrapper
[docs] def requires_permission(self, **scoped_permissions): """ Decorator to enforce scoped permission checks on route handlers. Example:: @bp.route("/projects/<name>", methods=["PUT"]) @requires_permission(general="project:update", project="edit") def update_project(name): pass Each keyword argument represents a scope, and its value is the required permission for that scope. If any one scope grants the required permission, access is allowed. :param scoped_permissions: Mapping of scope names to required permission strings. :type scoped_permissions: dict[str, str] :raises ValueError: If an undefined scope is passed. :raises ValueError: If and undefined permission is passed for the corresponding scope. """ for scope in scoped_permissions: if self.system_scopes.get_scope(scope) is None or scope not in indexed_permission_evaluators: raise ValueError(f"Undefined scope or permission evaluator for scope '{scope}'") permission = scoped_permissions[scope] for role in self.system_scopes.get_scope(scope).roles: if permission in role.permissions: break else: raise ValueError(f"Undefined permission '{permission}' in any role for scope '{scope}'") def decorator(fn): for scope in scoped_permissions: assert_valid_endpoint = indexed_endpoint_validators[scope] assert_valid_endpoint(fn) # raises error if invalid endpoint signature @wraps(fn) @self.requires_login def wrapper(*args, **kwargs): # skip authorization if access control is disabled if not self.enabled: return fn(*args, **kwargs) # check if user has at least permissions in one scope for scope in scoped_permissions: has_perm_eval = indexed_permission_evaluators[scope] if has_perm_eval(Ctx(authCtx=self, permission=scoped_permissions[scope], args=args, kwargs=kwargs)): return fn(*args, **kwargs) return abort(HTTPStatus.FORBIDDEN, description="You don't have permissions to perform this action") return wrapper return decorator