This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
authentik/passbook/policy/engine.py

97 lines
3.4 KiB
Python

"""passbook policy engine"""
from multiprocessing import Pipe
from multiprocessing.connection import Connection
from typing import List, Tuple
from django.core.cache import cache
from django.http import HttpRequest
from structlog import get_logger
from passbook.core.models import Policy, User
from passbook.policy.struct import PolicyRequest
from passbook.policy.process import PolicyProcess
LOGGER = get_logger()
def _cache_key(policy, user):
return f"policy_{policy.pk}#{user.pk}"
class PolicyEngine:
"""Orchestrate policy checking, launch tasks and return result"""
policies: List[Policy] = []
__request: HttpRequest
__user: User
__proc_list: List[Tuple[Connection, PolicyProcess]] = []
def __init__(self, policies, user: User = None, request: HttpRequest = None):
self.policies = policies
self.__request = request
self.__user = user
def for_user(self, user: User) -> 'PolicyEngine':
"""Check policies for user"""
self.__user = user
return self
def with_request(self, request: HttpRequest) -> 'PolicyEngine':
"""Set request"""
self.__request = request
return self
def _select_subclasses(self) -> List[Policy]:
"""Make sure all Policies are their respective classes"""
return Policy.objects \
.filter(pk__in=[x.pk for x in self.policies]) \
.select_subclasses() \
.order_by('order')
def build(self) -> 'PolicyEngine':
"""Build task group"""
if not self.__user:
raise ValueError("User not set.")
cached_policies = []
request = PolicyRequest(self.__user)
request.http_request = self.__request
for policy in self._select_subclasses():
cached_policy = cache.get(_cache_key(policy, self.__user), None)
if cached_policy:
LOGGER.debug("Taking result from cache", policy=policy.pk.hex)
cached_policies.append(cached_policy)
else:
LOGGER.debug("Evaluating policy", policy=policy.pk.hex)
our_end, task_end = Pipe(False)
task = PolicyProcess()
task.ret = task_end
task.request = request
task.policy = policy
LOGGER.debug("Starting Process", class_name=task.__class__.__name__)
task.start()
self.__proc_list.append((our_end, task))
# If all policies are cached, we have an empty list here.
if self.__proc_list:
for _, running_proc in self.__proc_list:
running_proc.join()
return self
@property
def result(self) -> Tuple[bool, List[str]]:
"""Get policy-checking result"""
messages: List[str] = []
for our_end, _ in self.__proc_list:
policy_result = our_end.recv()
# passing = (policy_action == Policy.ACTION_ALLOW and policy_result) or \
# (policy_action == Policy.ACTION_DENY and not policy_result)
LOGGER.debug('Result=%r => %r', policy_result, policy_result.passing)
if policy_result.messages:
messages += policy_result.messages
if not policy_result.passing:
return False, messages
return True, messages
@property
def passing(self) -> bool:
"""Only get true/false if user passes"""
return self.result[0]