flows: Correctly check initial policies on flow with context

# Conflicts:
#	passbook/flows/planner.py
#	passbook/flows/tests/test_planner.py
#	passbook/flows/tests/test_views.py
#	passbook/flows/views.py
This commit is contained in:
Jens Langhammer 2020-05-23 20:23:09 +02:00
parent d4fa60f509
commit 3f92d1c420
6 changed files with 47 additions and 24 deletions

View file

@ -20,7 +20,7 @@ def create_default_user(apps: Apps, schema_editor: BaseDatabaseSchemaEditor):
class Migration(migrations.Migration):
dependencies = [
("passbook_core", "0002_auto_20200523_1133"),
("passbook_core", "0001_initial"),
]
operations = [

View file

@ -3,12 +3,16 @@ from typing import Optional
from uuid import uuid4
from django.db import models
from django.http import HttpRequest
from django.utils.translation import gettext_lazy as _
from model_utils.managers import InheritanceManager
from structlog import get_logger
from passbook.core.types import UIUserSettings
from passbook.policies.models import PolicyBindingModel
LOGGER = get_logger()
class FlowDesignation(models.TextChoices):
"""Designation of what a Flow should be used for. At a later point, this
@ -62,10 +66,29 @@ class Flow(PolicyBindingModel):
PolicyBindingModel, parent_link=True, on_delete=models.CASCADE, related_name="+"
)
def related_flow(self, designation: str) -> Optional["Flow"]:
@staticmethod
def with_policy(request: HttpRequest, **flow_filter) -> Optional["Flow"]:
"""Get a Flow by `**flow_filter` and check if the request from `request` can access it."""
from passbook.policies.engine import PolicyEngine
flows = Flow.objects.filter(**flow_filter)
for flow in flows:
engine = PolicyEngine(flow, request.user, request)
engine.build()
result = engine.result
if result.passing:
LOGGER.debug("with_policy: flow passing", flow=flow)
return flow
LOGGER.warning(
"with_policy: flow not passing", flow=flow, messages=result.messages
)
LOGGER.debug("with_policy: no flow found", filters=flow_filter)
return None
def related_flow(self, designation: str, request: HttpRequest) -> Optional["Flow"]:
"""Get a related flow with `designation`. Currently this only queries
Flows by `designation`, but will eventually use `self` for related lookups."""
return Flow.objects.filter(designation=designation).first()
return Flow.with_policy(request, designation=designation)
def __str__(self) -> str:
return f"Flow {self.name} ({self.slug})"

View file

@ -11,7 +11,6 @@ from passbook.core.models import User
from passbook.flows.exceptions import EmptyFlowException, FlowNonApplicableException
from passbook.flows.models import Flow, Stage
from passbook.policies.engine import PolicyEngine
from passbook.policies.types import PolicyResult
LOGGER = get_logger()
@ -52,22 +51,12 @@ class FlowPlanner:
self.use_cache = True
self.flow = flow
def _check_flow_root_policies(self, request: HttpRequest) -> PolicyResult:
engine = PolicyEngine(self.flow, request.user, request)
engine.build()
return engine.result
def plan(
self, request: HttpRequest, default_context: Optional[Dict[str, Any]] = None
) -> FlowPlan:
"""Check each of the flows' policies, check policies for each stage with PolicyBinding
and return ordered list"""
LOGGER.debug("f(plan): Starting planning process", flow=self.flow)
# First off, check the flow's direct policy bindings
# to make sure the user even has access to the flow
root_result = self._check_flow_root_policies(request)
if not root_result.passing:
raise FlowNonApplicableException(*root_result.messages)
# Bit of a workaround here, if there is a pending user set in the default context
# we use that user for our cache key
# to make sure they don't get the generic response
@ -75,6 +64,16 @@ class FlowPlanner:
user = default_context[PLAN_CONTEXT_PENDING_USER]
else:
user = request.user
# First off, check the flow's direct policy bindings
# to make sure the user even has access to the flow
engine = PolicyEngine(self.flow, user, request)
if default_context:
engine.request.context = default_context
engine.build()
result = engine.result
if not result.passing:
raise FlowNonApplicableException(result.messages)
# User is passing so far, check if we have a cached plan
cached_plan_key = cache_key(self.flow, user)
cached_plan = cache.get(cached_plan_key, None)
if cached_plan and self.use_cache:
@ -82,6 +81,7 @@ class FlowPlanner:
"f(plan): Taking plan from cache", flow=self.flow, key=cached_plan_key
)
return cached_plan
LOGGER.debug("f(plan): building plan", flow=self.flow)
plan = self._build_plan(user, request, default_context)
cache.set(cache_key(self.flow, user), plan)
if not plan.stages:

View file

@ -1,5 +1,5 @@
"""flow planner tests"""
from unittest.mock import MagicMock, patch
from unittest.mock import MagicMock, PropertyMock, patch
from django.core.cache import cache
from django.shortcuts import reverse
@ -13,7 +13,7 @@ from passbook.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlanner, cache
from passbook.policies.types import PolicyResult
from passbook.stages.dummy.models import DummyStage
POLICY_RESULT_MOCK = MagicMock(return_value=PolicyResult(False))
POLICY_RESULT_MOCK = PropertyMock(return_value=PolicyResult(False))
TIME_NOW_MOCK = MagicMock(return_value=3)
@ -40,8 +40,7 @@ class TestFlowPlanner(TestCase):
planner.plan(request)
@patch(
"passbook.flows.planner.FlowPlanner._check_flow_root_policies",
POLICY_RESULT_MOCK,
"passbook.policies.engine.PolicyEngine.result", POLICY_RESULT_MOCK,
)
def test_non_applicable_plan(self):
"""Test that empty plan raises exception"""

View file

@ -1,5 +1,5 @@
"""flow views tests"""
from unittest.mock import MagicMock, patch
from unittest.mock import MagicMock, PropertyMock, patch
from django.shortcuts import reverse
from django.test import Client, TestCase
@ -12,7 +12,7 @@ from passbook.lib.config import CONFIG
from passbook.policies.types import PolicyResult
from passbook.stages.dummy.models import DummyStage
POLICY_RESULT_MOCK = MagicMock(return_value=PolicyResult(False))
POLICY_RESULT_MOCK = PropertyMock(return_value=PolicyResult(False))
class TestFlowExecutor(TestCase):
@ -45,8 +45,7 @@ class TestFlowExecutor(TestCase):
self.assertEqual(cancel_mock.call_count, 1)
@patch(
"passbook.flows.planner.FlowPlanner._check_flow_root_policies",
POLICY_RESULT_MOCK,
"passbook.policies.engine.PolicyEngine.result", POLICY_RESULT_MOCK,
)
def test_invalid_non_applicable_flow(self):
"""Tests that a non-applicable flow returns the correct error message"""

View file

@ -1,7 +1,7 @@
"""passbook multi-stage authentication engine"""
from typing import Any, Dict, Optional
from django.http import HttpRequest, HttpResponse
from django.http import Http404, HttpRequest, HttpResponse
from django.shortcuts import get_object_or_404, redirect, reverse
from django.utils.decorators import method_decorator
from django.views.decorators.clickjacking import xframe_options_sameorigin
@ -164,7 +164,9 @@ class ToDefaultFlow(View):
designation: Optional[FlowDesignation] = None
def dispatch(self, request: HttpRequest) -> HttpResponse:
flow = get_object_or_404(Flow, designation=self.designation)
flow = Flow.with_policy(request, designation=self.designation)
if not flow:
raise Http404
# If user already has a pending plan, clear it so we don't have to later.
if SESSION_KEY_PLAN in self.request.session:
plan: FlowPlan = self.request.session[SESSION_KEY_PLAN]