policies/hibp: check in prompt data (#2845)
Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
This commit is contained in:
parent
2fe553785e
commit
ec67b60219
|
@ -9,6 +9,7 @@ from structlog.stdlib import get_logger
|
||||||
from authentik.lib.utils.http import get_http_session
|
from authentik.lib.utils.http import get_http_session
|
||||||
from authentik.policies.models import Policy, PolicyResult
|
from authentik.policies.models import Policy, PolicyResult
|
||||||
from authentik.policies.types import PolicyRequest
|
from authentik.policies.types import PolicyRequest
|
||||||
|
from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT
|
||||||
|
|
||||||
LOGGER = get_logger()
|
LOGGER = get_logger()
|
||||||
|
|
||||||
|
@ -38,14 +39,17 @@ class HaveIBeenPwendPolicy(Policy):
|
||||||
"""Check if password is in HIBP DB. Hashes given Password with SHA1, uses the first 5
|
"""Check if password is in HIBP DB. Hashes given Password with SHA1, uses the first 5
|
||||||
characters of Password in request and checks if full hash is in response. Returns 0
|
characters of Password in request and checks if full hash is in response. Returns 0
|
||||||
if Password is not in result otherwise the count of how many times it was used."""
|
if Password is not in result otherwise the count of how many times it was used."""
|
||||||
if self.password_field not in request.context:
|
password = request.context.get(PLAN_CONTEXT_PROMPT, {}).get(
|
||||||
|
self.password_field, request.context.get(self.password_field)
|
||||||
|
)
|
||||||
|
if not password:
|
||||||
LOGGER.warning(
|
LOGGER.warning(
|
||||||
"Password field not set in Policy Request",
|
"Password field not set in Policy Request",
|
||||||
field=self.password_field,
|
field=self.password_field,
|
||||||
fields=request.context.keys(),
|
fields=request.context.keys(),
|
||||||
)
|
)
|
||||||
return PolicyResult(False, _("Password not set in context"))
|
return PolicyResult(False, _("Password not set in context"))
|
||||||
password = str(request.context[self.password_field])
|
password = str(password)
|
||||||
|
|
||||||
pw_hash = sha1(password.encode("utf-8")).hexdigest() # nosec
|
pw_hash = sha1(password.encode("utf-8")).hexdigest() # nosec
|
||||||
url = f"https://api.pwnedpasswords.com/range/{pw_hash[:5]}"
|
url = f"https://api.pwnedpasswords.com/range/{pw_hash[:5]}"
|
||||||
|
|
|
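Review bot: The warning emitted when no password is found (the `LOGGER.warning` call in `passes`) still reports `fields=request.context.keys()`, which after this change lists only the top-level context keys and not the keys of the prompt data, so a misconfigured `password_field` becomes harder to diagnose from the log; consider adding `prompt_fields=request.context.get(PLAN_CONTEXT_PROMPT, {}).keys()` to that call (key names only, never values). The new lookup gives prompt data precedence over a same-named top-level context key; a comment such as `# prompt data takes precedence over a same-named top-level context key` on the `password = request.context.get(PLAN_CONTEXT_PROMPT, {}).get(` line would make that intent explicit. If a caller ever stored a non-mapping value under PLAN_CONTEXT_PROMPT the chained `.get` would raise instead of falling through to the context lookup; whether that can occur is not visible from this hunk. The `passes` method starts before and ends after this hunk.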
@ -5,6 +5,7 @@ from guardian.shortcuts import get_anonymous_user
|
||||||
from authentik.lib.generators import generate_key
|
from authentik.lib.generators import generate_key
|
||||||
from authentik.policies.hibp.models import HaveIBeenPwendPolicy
|
from authentik.policies.hibp.models import HaveIBeenPwendPolicy
|
||||||
from authentik.policies.types import PolicyRequest, PolicyResult
|
from authentik.policies.types import PolicyRequest, PolicyResult
|
||||||
|
from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT
|
||||||
|
|
||||||
|
|
||||||
class TestHIBPPolicy(TestCase):
|
class TestHIBPPolicy(TestCase):
|
||||||
|
@ -26,7 +27,7 @@ class TestHIBPPolicy(TestCase):
|
||||||
name="test_false",
|
name="test_false",
|
||||||
)
|
)
|
||||||
request = PolicyRequest(get_anonymous_user())
|
request = PolicyRequest(get_anonymous_user())
|
||||||
request.context["password"] = "password" # nosec
|
request.context[PLAN_CONTEXT_PROMPT] = {"password": "password"} # nosec
|
||||||
result: PolicyResult = policy.passes(request)
|
result: PolicyResult = policy.passes(request)
|
||||||
self.assertFalse(result.passing)
|
self.assertFalse(result.passing)
|
||||||
self.assertTrue(result.messages[0].startswith("Password exists on "))
|
self.assertTrue(result.messages[0].startswith("Password exists on "))
|
||||||
|
@ -37,7 +38,7 @@ class TestHIBPPolicy(TestCase):
|
||||||
name="test_true",
|
name="test_true",
|
||||||
)
|
)
|
||||||
request = PolicyRequest(get_anonymous_user())
|
request = PolicyRequest(get_anonymous_user())
|
||||||
request.context["password"] = generate_key()
|
request.context[PLAN_CONTEXT_PROMPT] = {"password": generate_key()}
|
||||||
result: PolicyResult = policy.passes(request)
|
result: PolicyResult = policy.passes(request)
|
||||||
self.assertTrue(result.passing)
|
self.assertTrue(result.passing)
|
||||||
self.assertEqual(result.messages, tuple())
|
self.assertEqual(result.messages, tuple())
|
||||||
|
|
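Review bot: Both tests now place the password only under `PLAN_CONTEXT_PROMPT`, so the fallback to a top-level `request.context["password"]` that the policy keeps supporting (see the models hunk above) and the "Password not set in context" branch are no longer exercised by this file; keep one case using `request.context["password"] = generate_key()` and add one with an empty context asserting `result.passing` is false. The test methods in both hunks start outside the hunk, so the policy construction is not visible here.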