flows: introduce FlowPlan markers, which indicate when a stage needs … (#79)

* flows: introduce FlowPlan markers, which indicate when a stage needs re-evaluation

Implement re_evaluate_policies
add unittests for several different scenarios
closes #78

* flows: move markers to separate files, cleanup formatting

* flows: fix self.next is not callable
This commit is contained in:
Jens L 2020-06-18 22:43:51 +02:00 committed by GitHub
parent 5b8bdac84b
commit 6a4086c490
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 482 additions and 40 deletions

49
passbook/flows/markers.py Normal file
View File

@ -0,0 +1,49 @@
"""Stage Markers"""
from dataclasses import dataclass
from typing import TYPE_CHECKING, Optional
from structlog import get_logger
from passbook.core.models import User
from passbook.flows.models import Stage
from passbook.policies.engine import PolicyEngine
from passbook.policies.models import PolicyBinding
if TYPE_CHECKING:
from passbook.flows.planner import FlowPlan
LOGGER = get_logger()
@dataclass
class StageMarker:
"""Base stage marker class, no extra attributes, and has no special handler."""
# pylint: disable=unused-argument
def process(self, plan: "FlowPlan", stage: Stage) -> Optional[Stage]:
"""Process callback for this marker. This should be overridden by sub-classes.
If a stage should be removed, return None."""
return stage
@dataclass
class ReevaluateMarker(StageMarker):
"""Reevaluate Marker, forces stage's policies to be evaluated again."""
binding: PolicyBinding
user: User
def process(self, plan: "FlowPlan", stage: Stage) -> Optional[Stage]:
"""Re-evaluate policies bound to stage, and if they fail, remove from plan"""
engine = PolicyEngine(self.binding, self.user)
engine.request.context = plan.context
engine.build()
result = engine.result
if result.passing:
return stage
LOGGER.warning(
"f(plan_inst)[re-eval marker]: stage failed re-evaluation",
stage=stage,
messages=result.messages,
)
return None

View File

@ -9,7 +9,8 @@ from structlog import get_logger
from passbook.core.models import User
from passbook.flows.exceptions import EmptyFlowException, FlowNonApplicableException
from passbook.flows.models import Flow, Stage
from passbook.flows.markers import ReevaluateMarker, StageMarker
from passbook.flows.models import Flow, FlowStageBinding, Stage
from passbook.policies.engine import PolicyEngine
LOGGER = get_logger()
@ -33,12 +34,39 @@ class FlowPlan:
of all Stages that should be run."""
flow_pk: str
stages: List[Stage] = field(default_factory=list)
context: Dict[str, Any] = field(default_factory=dict)
markers: List[StageMarker] = field(default_factory=list)
def next(self) -> Stage:
def next(self) -> Optional[Stage]:
"""Return next pending stage from the bottom of the list"""
return self.stages[0]
if not self.has_stages:
return None
stage = self.stages[0]
marker = self.markers[0]
LOGGER.debug("f(plan_inst): stage has marker", stage=stage, marker=marker)
marked_stage = marker.process(self, stage)
if not marked_stage:
LOGGER.debug("f(plan_inst): marker returned none, next stage", stage=stage)
self.stages.remove(stage)
self.markers.remove(marker)
if not self.has_stages:
return None
# pylint: disable=not-callable
return self.next()
return marked_stage
def pop(self):
"""Pop next pending stage from bottom of list"""
self.markers.pop(0)
self.stages.pop(0)
@property
def has_stages(self) -> bool:
"""Check if there are any stages left in this plan"""
return len(self.markers) + len(self.stages) > 0
class FlowPlanner:
@ -100,7 +128,8 @@ class FlowPlanner:
request: HttpRequest,
default_context: Optional[Dict[str, Any]],
) -> FlowPlan:
"""Actually build flow plan"""
"""Build flow plan by checking each stage in their respective
order and checking the applied policies"""
start_time = time()
plan = FlowPlan(flow_pk=self.flow.pk.hex)
if default_context:
@ -111,13 +140,24 @@ class FlowPlanner:
.select_subclasses()
.select_related()
):
binding = stage.flowstagebinding_set.get(flow__pk=self.flow.pk)
binding: FlowStageBinding = stage.flowstagebinding_set.get(
flow__pk=self.flow.pk
)
engine = PolicyEngine(binding, user, request)
engine.request.context = plan.context
engine.build()
if engine.passing:
LOGGER.debug("f(plan): Stage passing", stage=stage, flow=self.flow)
plan.stages.append(stage)
marker = StageMarker()
if binding.re_evaluate_policies:
LOGGER.debug(
"f(plan): Stage has re-evaluate marker",
stage=stage,
flow=self.flow,
)
marker = ReevaluateMarker(binding=binding, user=user)
plan.markers.append(marker)
end_time = time()
LOGGER.debug(
"f(plan): Finished building",

View File

@ -1,6 +1,7 @@
"""flow planner tests"""
from unittest.mock import MagicMock, PropertyMock, patch
from django.contrib.sessions.middleware import SessionMiddleware
from django.core.cache import cache
from django.shortcuts import reverse
from django.test import RequestFactory, TestCase
@ -8,14 +9,19 @@ from guardian.shortcuts import get_anonymous_user
from passbook.core.models import User
from passbook.flows.exceptions import EmptyFlowException, FlowNonApplicableException
from passbook.flows.markers import ReevaluateMarker, StageMarker
from passbook.flows.models import Flow, FlowDesignation, FlowStageBinding
from passbook.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlanner, cache_key
from passbook.policies.dummy.models import DummyPolicy
from passbook.policies.models import PolicyBinding
from passbook.policies.types import PolicyResult
from passbook.stages.dummy.models import DummyStage
POLICY_RESULT_MOCK = PropertyMock(return_value=PolicyResult(False))
POLICY_RETURN_FALSE = PropertyMock(return_value=PolicyResult(False))
TIME_NOW_MOCK = MagicMock(return_value=3)
POLICY_RETURN_TRUE = MagicMock(return_value=PolicyResult(True))
class TestFlowPlanner(TestCase):
"""Test planner logic"""
@ -40,7 +46,7 @@ class TestFlowPlanner(TestCase):
planner.plan(request)
@patch(
"passbook.policies.engine.PolicyEngine.result", POLICY_RESULT_MOCK,
"passbook.policies.engine.PolicyEngine.result", POLICY_RETURN_FALSE,
)
def test_non_applicable_plan(self):
"""Test that empty plan raises exception"""
@ -103,3 +109,71 @@ class TestFlowPlanner(TestCase):
planner.plan(request, default_context={PLAN_CONTEXT_PENDING_USER: user})
key = cache_key(flow, user)
self.assertTrue(cache.get(key) is not None)
def test_planner_marker_reevaluate(self):
"""Test that the planner creates the proper marker"""
flow = Flow.objects.create(
name="test-default-context",
slug="test-default-context",
designation=FlowDesignation.AUTHENTICATION,
)
FlowStageBinding.objects.create(
flow=flow,
stage=DummyStage.objects.create(name="dummy1"),
order=0,
re_evaluate_policies=True,
)
request = self.request_factory.get(
reverse("passbook_flows:flow-executor", kwargs={"flow_slug": flow.slug}),
)
request.user = get_anonymous_user()
planner = FlowPlanner(flow)
plan = planner.plan(request)
self.assertIsInstance(plan.markers[0], ReevaluateMarker)
def test_planner_reevaluate_actual(self):
"""Test planner with re-evaluate"""
flow = Flow.objects.create(
name="test-default-context",
slug="test-default-context",
designation=FlowDesignation.AUTHENTICATION,
)
false_policy = DummyPolicy.objects.create(result=False, wait_min=1, wait_max=2)
binding = FlowStageBinding.objects.create(
flow=flow, stage=DummyStage.objects.create(name="dummy1"), order=0
)
binding2 = FlowStageBinding.objects.create(
flow=flow,
stage=DummyStage.objects.create(name="dummy2"),
order=1,
re_evaluate_policies=True,
)
PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)
request = self.request_factory.get(
reverse("passbook_flows:flow-executor", kwargs={"flow_slug": flow.slug}),
)
request.user = get_anonymous_user()
middleware = SessionMiddleware()
middleware.process_request(request)
request.session.save()
# Here we patch the dummy policy to evaluate to true so the stage is included
with patch(
"passbook.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE
):
planner = FlowPlanner(flow)
plan = planner.plan(request)
self.assertEqual(plan.stages[0], binding.stage)
self.assertEqual(plan.stages[1], binding2.stage)
self.assertIsInstance(plan.markers[0], StageMarker)
self.assertIsInstance(plan.markers[1], ReevaluateMarker)

View File

@ -3,16 +3,21 @@ from unittest.mock import MagicMock, PropertyMock, patch
from django.shortcuts import reverse
from django.test import Client, TestCase
from django.utils.encoding import force_text
from passbook.flows.exceptions import EmptyFlowException, FlowNonApplicableException
from passbook.flows.markers import ReevaluateMarker, StageMarker
from passbook.flows.models import Flow, FlowDesignation, FlowStageBinding
from passbook.flows.planner import FlowPlan
from passbook.flows.views import NEXT_ARG_NAME, SESSION_KEY_PLAN
from passbook.lib.config import CONFIG
from passbook.policies.dummy.models import DummyPolicy
from passbook.policies.models import PolicyBinding
from passbook.policies.types import PolicyResult
from passbook.stages.dummy.models import DummyStage
POLICY_RESULT_MOCK = PropertyMock(return_value=PolicyResult(False))
POLICY_RETURN_FALSE = PropertyMock(return_value=PolicyResult(False))
POLICY_RETURN_TRUE = MagicMock(return_value=PolicyResult(True))
class TestFlowExecutor(TestCase):
@ -29,7 +34,9 @@ class TestFlowExecutor(TestCase):
designation=FlowDesignation.AUTHENTICATION,
)
stage = DummyStage.objects.create(name="dummy")
plan = FlowPlan(flow_pk=flow.pk.hex + "a", stages=[stage])
plan = FlowPlan(
flow_pk=flow.pk.hex + "a", stages=[stage], markers=[StageMarker()]
)
session = self.client.session
session[SESSION_KEY_PLAN] = plan
session.save()
@ -45,7 +52,7 @@ class TestFlowExecutor(TestCase):
self.assertEqual(cancel_mock.call_count, 1)
@patch(
"passbook.policies.engine.PolicyEngine.result", POLICY_RESULT_MOCK,
"passbook.policies.engine.PolicyEngine.result", POLICY_RETURN_FALSE,
)
def test_invalid_non_applicable_flow(self):
"""Tests that a non-applicable flow returns the correct error message"""
@ -125,3 +132,197 @@ class TestFlowExecutor(TestCase):
session = self.client.session
plan: FlowPlan = session[SESSION_KEY_PLAN]
self.assertEqual(len(plan.stages), 1)
def test_reevaluate_remove_last(self):
"""Test planner with re-evaluate (last stage is removed)"""
flow = Flow.objects.create(
name="test-default-context",
slug="test-default-context",
designation=FlowDesignation.AUTHENTICATION,
)
false_policy = DummyPolicy.objects.create(result=False, wait_min=1, wait_max=2)
binding = FlowStageBinding.objects.create(
flow=flow, stage=DummyStage.objects.create(name="dummy1"), order=0
)
binding2 = FlowStageBinding.objects.create(
flow=flow,
stage=DummyStage.objects.create(name="dummy2"),
order=1,
re_evaluate_policies=True,
)
PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)
# Here we patch the dummy policy to evaluate to true so the stage is included
with patch(
"passbook.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE
):
exec_url = reverse(
"passbook_flows:flow-executor", kwargs={"flow_slug": flow.slug}
)
# First request, run the planner
response = self.client.get(exec_url)
self.assertEqual(response.status_code, 200)
plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
self.assertEqual(plan.stages[0], binding.stage)
self.assertEqual(plan.stages[1], binding2.stage)
self.assertIsInstance(plan.markers[0], StageMarker)
self.assertIsInstance(plan.markers[1], ReevaluateMarker)
# Second request, this passes the first dummy stage
response = self.client.post(exec_url)
self.assertEqual(response.status_code, 302)
# third request, this should trigger the re-evaluate
# We do this request without the patch, so the policy results in false
response = self.client.post(exec_url)
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, reverse("passbook_core:overview"))
def test_reevaluate_remove_middle(self):
"""Test planner with re-evaluate (middle stage is removed)"""
flow = Flow.objects.create(
name="test-default-context",
slug="test-default-context",
designation=FlowDesignation.AUTHENTICATION,
)
false_policy = DummyPolicy.objects.create(result=False, wait_min=1, wait_max=2)
binding = FlowStageBinding.objects.create(
flow=flow, stage=DummyStage.objects.create(name="dummy1"), order=0
)
binding2 = FlowStageBinding.objects.create(
flow=flow,
stage=DummyStage.objects.create(name="dummy2"),
order=1,
re_evaluate_policies=True,
)
binding3 = FlowStageBinding.objects.create(
flow=flow, stage=DummyStage.objects.create(name="dummy3"), order=2
)
PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)
# Here we patch the dummy policy to evaluate to true so the stage is included
with patch(
"passbook.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE
):
exec_url = reverse(
"passbook_flows:flow-executor", kwargs={"flow_slug": flow.slug}
)
# First request, run the planner
response = self.client.get(exec_url)
self.assertEqual(response.status_code, 200)
plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
self.assertEqual(plan.stages[0], binding.stage)
self.assertEqual(plan.stages[1], binding2.stage)
self.assertEqual(plan.stages[2], binding3.stage)
self.assertIsInstance(plan.markers[0], StageMarker)
self.assertIsInstance(plan.markers[1], ReevaluateMarker)
self.assertIsInstance(plan.markers[2], StageMarker)
# Second request, this passes the first dummy stage
response = self.client.post(exec_url)
self.assertEqual(response.status_code, 302)
plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
self.assertEqual(plan.stages[0], binding2.stage)
self.assertEqual(plan.stages[1], binding3.stage)
self.assertIsInstance(plan.markers[0], StageMarker)
self.assertIsInstance(plan.markers[1], StageMarker)
# third request, this should trigger the re-evaluate
# We do this request without the patch, so the policy results in false
response = self.client.post(exec_url)
self.assertEqual(response.status_code, 200)
self.assertJSONEqual(
force_text(response.content),
{"type": "redirect", "to": reverse("passbook_core:overview")},
)
def test_reevaluate_remove_consecutive(self):
"""Test planner with re-evaluate (consecutive stages are removed)"""
flow = Flow.objects.create(
name="test-default-context",
slug="test-default-context",
designation=FlowDesignation.AUTHENTICATION,
)
false_policy = DummyPolicy.objects.create(result=False, wait_min=1, wait_max=2)
binding = FlowStageBinding.objects.create(
flow=flow, stage=DummyStage.objects.create(name="dummy1"), order=0
)
binding2 = FlowStageBinding.objects.create(
flow=flow,
stage=DummyStage.objects.create(name="dummy2"),
order=1,
re_evaluate_policies=True,
)
binding3 = FlowStageBinding.objects.create(
flow=flow,
stage=DummyStage.objects.create(name="dummy3"),
order=2,
re_evaluate_policies=True,
)
binding4 = FlowStageBinding.objects.create(
flow=flow, stage=DummyStage.objects.create(name="dummy4"), order=2
)
PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)
PolicyBinding.objects.create(policy=false_policy, target=binding3, order=0)
# Here we patch the dummy policy to evaluate to true so the stage is included
with patch(
"passbook.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE
):
exec_url = reverse(
"passbook_flows:flow-executor", kwargs={"flow_slug": flow.slug}
)
# First request, run the planner
response = self.client.get(exec_url)
self.assertEqual(response.status_code, 200)
self.assertIn("dummy1", force_text(response.content))
plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
self.assertEqual(plan.stages[0], binding.stage)
self.assertEqual(plan.stages[1], binding2.stage)
self.assertEqual(plan.stages[2], binding3.stage)
self.assertEqual(plan.stages[3], binding4.stage)
self.assertIsInstance(plan.markers[0], StageMarker)
self.assertIsInstance(plan.markers[1], ReevaluateMarker)
self.assertIsInstance(plan.markers[2], ReevaluateMarker)
self.assertIsInstance(plan.markers[3], StageMarker)
# Second request, this passes the first dummy stage
response = self.client.post(exec_url)
self.assertEqual(response.status_code, 302)
# third request, this should trigger the re-evaluate
# A get request will evaluate the policies and this will return stage 4
# but it won't save it, hence we cant' check the plan
response = self.client.get(exec_url)
self.assertEqual(response.status_code, 200)
self.assertIn("dummy4", force_text(response.content))
# fourth request, this confirms the last stage (dummy4)
# We do this request without the patch, so the policy results in false
response = self.client.post(exec_url)
self.assertEqual(response.status_code, 200)
self.assertJSONEqual(
force_text(response.content),
{"type": "redirect", "to": reverse("passbook_core:overview")},
)

View File

@ -26,7 +26,7 @@ class TestHelperView(TestCase):
def test_default_view_invalid_plan(self):
"""Test that ToDefaultFlow returns the expected URL (with an invalid plan)"""
flow = Flow.objects.filter(designation=FlowDesignation.INVALIDATION,).first()
plan = FlowPlan(flow_pk=flow.pk.hex + "aa", stages=[])
plan = FlowPlan(flow_pk=flow.pk.hex + "aa")
session = self.client.session
session[SESSION_KEY_PLAN] = plan
session.save()

View File

@ -86,6 +86,9 @@ class FlowExecutorView(View):
current_stage=self.current_stage,
flow_slug=self.flow.slug,
)
if not self.current_stage:
LOGGER.debug("f(exec): no more stages, flow is done.")
return self._flow_done()
stage_cls = path_to_class(self.current_stage.type)
self.current_stage_view = stage_cls(self)
self.current_stage_view.args = self.args
@ -98,6 +101,7 @@ class FlowExecutorView(View):
LOGGER.debug(
"f(exec): Passing GET",
view_class=class_to_path(self.current_stage_view.__class__),
stage=self.current_stage,
flow_slug=self.flow.slug,
)
stage_response = self.current_stage_view.get(request, *args, **kwargs)
@ -108,6 +112,7 @@ class FlowExecutorView(View):
LOGGER.debug(
"f(exec): Passing POST",
view_class=class_to_path(self.current_stage_view.__class__),
stage=self.current_stage,
flow_slug=self.flow.slug,
)
stage_response = self.current_stage_view.post(request, *args, **kwargs)
@ -133,7 +138,7 @@ class FlowExecutorView(View):
stage_class=class_to_path(self.current_stage_view.__class__),
flow_slug=self.flow.slug,
)
self.plan.stages.pop(0)
self.plan.pop()
self.request.session[SESSION_KEY_PLAN] = self.plan
if self.plan.stages:
LOGGER.debug(

View File

@ -5,6 +5,7 @@ from django.test import Client, TestCase
from django.utils.encoding import force_text
from passbook.core.models import User
from passbook.flows.markers import StageMarker
from passbook.flows.models import Flow, FlowDesignation, FlowStageBinding
from passbook.flows.planner import FlowPlan
from passbook.flows.views import SESSION_KEY_PLAN
@ -35,7 +36,9 @@ class TestCaptchaStage(TestCase):
def test_valid(self):
"""Test valid captcha"""
plan = FlowPlan(flow_pk=self.flow.pk.hex, stages=[self.stage])
plan = FlowPlan(
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
)
session = self.client.session
session[SESSION_KEY_PLAN] = plan
session.save()

View File

@ -4,6 +4,7 @@ from django.test import Client, TestCase
from django.utils.encoding import force_text
from passbook.core.models import User
from passbook.flows.markers import StageMarker
from passbook.flows.models import Flow, FlowDesignation, FlowStageBinding
from passbook.flows.planner import FlowPlan
from passbook.flows.views import SESSION_KEY_PLAN
@ -30,7 +31,9 @@ class TestConsentStage(TestCase):
def test_valid(self):
"""Test valid consent"""
plan = FlowPlan(flow_pk=self.flow.pk.hex, stages=[self.stage])
plan = FlowPlan(
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
)
session = self.client.session
session[SESSION_KEY_PLAN] = plan
session.save()

View File

@ -1,4 +1,6 @@
"""passbook multi-stage authentication engine"""
from typing import Any, Dict
from django.http import HttpRequest
from passbook.flows.stage import StageView
@ -10,3 +12,8 @@ class DummyStage(StageView):
def post(self, request: HttpRequest):
"""Just redirect to next stage"""
return self.executor.stage_ok()
def get_context_data(self, **kwargs: Dict[str, Any]) -> Dict[str, Any]:
kwargs = super().get_context_data(**kwargs)
kwargs["title"] = self.executor.current_stage.name
return kwargs

View File

@ -7,6 +7,7 @@ from django.test import Client, TestCase
from django.utils.encoding import force_text
from passbook.core.models import Token, User
from passbook.flows.markers import StageMarker
from passbook.flows.models import Flow, FlowDesignation, FlowStageBinding
from passbook.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan
from passbook.flows.views import SESSION_KEY_PLAN
@ -34,7 +35,9 @@ class TestEmailStage(TestCase):
def test_rendering(self):
"""Test with pending user"""
plan = FlowPlan(flow_pk=self.flow.pk.hex, stages=[self.stage])
plan = FlowPlan(
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
)
plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
session = self.client.session
session[SESSION_KEY_PLAN] = plan
@ -48,7 +51,9 @@ class TestEmailStage(TestCase):
def test_without_user(self):
"""Test without pending user"""
plan = FlowPlan(flow_pk=self.flow.pk.hex, stages=[self.stage])
plan = FlowPlan(
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
)
session = self.client.session
session[SESSION_KEY_PLAN] = plan
session.save()
@ -61,7 +66,9 @@ class TestEmailStage(TestCase):
def test_pending_user(self):
"""Test with pending user"""
plan = FlowPlan(flow_pk=self.flow.pk.hex, stages=[self.stage])
plan = FlowPlan(
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
)
plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
session = self.client.session
session[SESSION_KEY_PLAN] = plan
@ -82,7 +89,9 @@ class TestEmailStage(TestCase):
"""Test with token"""
# Make sure token exists
self.test_pending_user()
plan = FlowPlan(flow_pk=self.flow.pk.hex, stages=[self.stage])
plan = FlowPlan(
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
)
session = self.client.session
session[SESSION_KEY_PLAN] = plan
session.save()

View File

@ -7,6 +7,7 @@ from django.utils.encoding import force_text
from guardian.shortcuts import get_anonymous_user
from passbook.core.models import User
from passbook.flows.markers import StageMarker
from passbook.flows.models import Flow, FlowDesignation, FlowStageBinding
from passbook.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan
from passbook.flows.views import SESSION_KEY_PLAN
@ -39,7 +40,9 @@ class TestUserLoginStage(TestCase):
def test_without_invitation_fail(self):
"""Test without any invitation, continue_flow_without_invitation not set."""
plan = FlowPlan(flow_pk=self.flow.pk.hex, stages=[self.stage])
plan = FlowPlan(
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
)
plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
plan.context[
PLAN_CONTEXT_AUTHENTICATION_BACKEND
@ -64,7 +67,9 @@ class TestUserLoginStage(TestCase):
"""Test without any invitation, continue_flow_without_invitation is set."""
self.stage.continue_flow_without_invitation = True
self.stage.save()
plan = FlowPlan(flow_pk=self.flow.pk.hex, stages=[self.stage])
plan = FlowPlan(
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
)
plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
plan.context[
PLAN_CONTEXT_AUTHENTICATION_BACKEND
@ -90,7 +95,9 @@ class TestUserLoginStage(TestCase):
def test_with_invitation(self):
"""Test with invitation, check data in session"""
plan = FlowPlan(flow_pk=self.flow.pk.hex, stages=[self.stage])
plan = FlowPlan(
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
)
plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
plan.context[
PLAN_CONTEXT_AUTHENTICATION_BACKEND

View File

@ -9,6 +9,7 @@ from django.test import Client, TestCase
from django.utils.encoding import force_text
from passbook.core.models import User
from passbook.flows.markers import StageMarker
from passbook.flows.models import Flow, FlowDesignation, FlowStageBinding
from passbook.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan
from passbook.flows.views import SESSION_KEY_PLAN
@ -43,7 +44,9 @@ class TestPasswordStage(TestCase):
def test_without_user(self):
"""Test without user"""
plan = FlowPlan(flow_pk=self.flow.pk.hex, stages=[self.stage])
plan = FlowPlan(
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
)
session = self.client.session
session[SESSION_KEY_PLAN] = plan
session.save()
@ -68,7 +71,9 @@ class TestPasswordStage(TestCase):
designation=FlowDesignation.RECOVERY, slug="qewrqerqr"
)
plan = FlowPlan(flow_pk=self.flow.pk.hex, stages=[self.stage])
plan = FlowPlan(
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
)
session = self.client.session
session[SESSION_KEY_PLAN] = plan
session.save()
@ -83,7 +88,9 @@ class TestPasswordStage(TestCase):
def test_valid_password(self):
"""Test with a valid pending user and valid password"""
plan = FlowPlan(flow_pk=self.flow.pk.hex, stages=[self.stage])
plan = FlowPlan(
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
)
plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
session = self.client.session
session[SESSION_KEY_PLAN] = plan
@ -105,7 +112,9 @@ class TestPasswordStage(TestCase):
def test_invalid_password(self):
"""Test with a valid pending user and invalid password"""
plan = FlowPlan(flow_pk=self.flow.pk.hex, stages=[self.stage])
plan = FlowPlan(
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
)
plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
session = self.client.session
session[SESSION_KEY_PLAN] = plan
@ -127,7 +136,9 @@ class TestPasswordStage(TestCase):
def test_permission_denied(self):
"""Test with a valid pending user and valid password.
Backend is patched to return PermissionError"""
plan = FlowPlan(flow_pk=self.flow.pk.hex, stages=[self.stage])
plan = FlowPlan(
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
)
plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
session = self.client.session
session[SESSION_KEY_PLAN] = plan

View File

@ -6,6 +6,7 @@ from django.test import Client, TestCase
from django.utils.encoding import force_text
from passbook.core.models import User
from passbook.flows.markers import StageMarker
from passbook.flows.models import Flow, FlowDesignation, FlowStageBinding
from passbook.flows.planner import FlowPlan
from passbook.flows.views import SESSION_KEY_PLAN
@ -96,7 +97,9 @@ class TestPromptStage(TestCase):
def test_render(self):
"""Test render of form, check if all prompts are rendered correctly"""
plan = FlowPlan(flow_pk=self.flow.pk.hex, stages=[self.stage])
plan = FlowPlan(
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
)
session = self.client.session
session[SESSION_KEY_PLAN] = plan
session.save()
@ -114,7 +117,9 @@ class TestPromptStage(TestCase):
def test_valid_form_with_policy(self) -> PromptForm:
"""Test form validation"""
plan = FlowPlan(flow_pk=self.flow.pk.hex, stages=[self.stage])
plan = FlowPlan(
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
)
expr = "return request.context['password_prompt'] == request.context['password2_prompt']"
expr_policy = ExpressionPolicy.objects.create(
name="validate-form", expression=expr
@ -126,7 +131,9 @@ class TestPromptStage(TestCase):
def test_invalid_form(self) -> PromptForm:
"""Test form validation"""
plan = FlowPlan(flow_pk=self.flow.pk.hex, stages=[self.stage])
plan = FlowPlan(
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
)
expr = "False"
expr_policy = ExpressionPolicy.objects.create(
name="validate-form", expression=expr
@ -138,7 +145,9 @@ class TestPromptStage(TestCase):
def test_valid_form_request(self):
"""Test a request with valid form data"""
plan = FlowPlan(flow_pk=self.flow.pk.hex, stages=[self.stage])
plan = FlowPlan(
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
)
session = self.client.session
session[SESSION_KEY_PLAN] = plan
session.save()

View File

@ -4,6 +4,7 @@ from django.test import Client, TestCase
from django.utils.encoding import force_text
from passbook.core.models import User
from passbook.flows.markers import StageMarker
from passbook.flows.models import Flow, FlowDesignation, FlowStageBinding
from passbook.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan
from passbook.flows.views import SESSION_KEY_PLAN
@ -29,7 +30,9 @@ class TestUserDeleteStage(TestCase):
def test_no_user(self):
"""Test without user set"""
plan = FlowPlan(flow_pk=self.flow.pk.hex, stages=[self.stage])
plan = FlowPlan(
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
)
session = self.client.session
session[SESSION_KEY_PLAN] = plan
session.save()
@ -47,7 +50,9 @@ class TestUserDeleteStage(TestCase):
def test_user_delete_get(self):
"""Test Form render"""
plan = FlowPlan(flow_pk=self.flow.pk.hex, stages=[self.stage])
plan = FlowPlan(
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
)
plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
session = self.client.session
session[SESSION_KEY_PLAN] = plan
@ -62,7 +67,9 @@ class TestUserDeleteStage(TestCase):
def test_user_delete_post(self):
"""Test User delete (actual)"""
plan = FlowPlan(flow_pk=self.flow.pk.hex, stages=[self.stage])
plan = FlowPlan(
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
)
plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
session = self.client.session
session[SESSION_KEY_PLAN] = plan

View File

@ -4,6 +4,7 @@ from django.test import Client, TestCase
from django.utils.encoding import force_text
from passbook.core.models import User
from passbook.flows.markers import StageMarker
from passbook.flows.models import Flow, FlowDesignation, FlowStageBinding
from passbook.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan
from passbook.flows.views import SESSION_KEY_PLAN
@ -30,7 +31,9 @@ class TestUserLoginStage(TestCase):
def test_valid_password(self):
"""Test with a valid pending user and backend"""
plan = FlowPlan(flow_pk=self.flow.pk.hex, stages=[self.stage])
plan = FlowPlan(
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
)
plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
plan.context[
PLAN_CONTEXT_AUTHENTICATION_BACKEND
@ -53,7 +56,9 @@ class TestUserLoginStage(TestCase):
def test_without_user(self):
"""Test a plan without any pending user, resulting in a denied"""
plan = FlowPlan(flow_pk=self.flow.pk.hex, stages=[self.stage])
plan = FlowPlan(
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
)
session = self.client.session
session[SESSION_KEY_PLAN] = plan
session.save()
@ -72,7 +77,9 @@ class TestUserLoginStage(TestCase):
def test_without_backend(self):
"""Test a plan with pending user, without backend, resulting in a denied"""
plan = FlowPlan(flow_pk=self.flow.pk.hex, stages=[self.stage])
plan = FlowPlan(
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
)
plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
session = self.client.session
session[SESSION_KEY_PLAN] = plan

View File

@ -4,6 +4,7 @@ from django.test import Client, TestCase
from django.utils.encoding import force_text
from passbook.core.models import User
from passbook.flows.markers import StageMarker
from passbook.flows.models import Flow, FlowDesignation, FlowStageBinding
from passbook.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan
from passbook.flows.views import SESSION_KEY_PLAN
@ -30,7 +31,9 @@ class TestUserLogoutStage(TestCase):
def test_valid_password(self):
"""Test with a valid pending user and backend"""
plan = FlowPlan(flow_pk=self.flow.pk.hex, stages=[self.stage])
plan = FlowPlan(
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
)
plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
plan.context[
PLAN_CONTEXT_AUTHENTICATION_BACKEND

View File

@ -7,6 +7,7 @@ from django.test import Client, TestCase
from django.utils.encoding import force_text
from passbook.core.models import User
from passbook.flows.markers import StageMarker
from passbook.flows.models import Flow, FlowDesignation, FlowStageBinding
from passbook.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan
from passbook.flows.views import SESSION_KEY_PLAN
@ -37,7 +38,9 @@ class TestUserWriteStage(TestCase):
for _ in range(8)
)
plan = FlowPlan(flow_pk=self.flow.pk.hex, stages=[self.stage])
plan = FlowPlan(
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
)
plan.context[PLAN_CONTEXT_PROMPT] = {
"username": "test-user",
"name": "name",
@ -71,7 +74,9 @@ class TestUserWriteStage(TestCase):
SystemRandom().choice(string.ascii_uppercase + string.digits)
for _ in range(8)
)
plan = FlowPlan(flow_pk=self.flow.pk.hex, stages=[self.stage])
plan = FlowPlan(
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
)
plan.context[PLAN_CONTEXT_PENDING_USER] = User.objects.create(
username="unittest", email="test@beryju.org"
)
@ -104,7 +109,9 @@ class TestUserWriteStage(TestCase):
def test_without_data(self):
"""Test without data results in error"""
plan = FlowPlan(flow_pk=self.flow.pk.hex, stages=[self.stage])
plan = FlowPlan(
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
)
session = self.client.session
session[SESSION_KEY_PLAN] = plan
session.save()