198 lines
7.9 KiB
Python
198 lines
7.9 KiB
Python
"""passbook saml source processor"""
|
|
from typing import TYPE_CHECKING, Dict
|
|
|
|
from defusedxml import ElementTree
|
|
from django.core.cache import cache
|
|
from django.core.exceptions import SuspiciousOperation
|
|
from django.http import HttpRequest, HttpResponse
|
|
from signxml import XMLVerifier
|
|
from structlog import get_logger
|
|
|
|
from passbook.core.models import User
|
|
from passbook.flows.models import Flow
|
|
from passbook.flows.planner import (
|
|
PLAN_CONTEXT_PENDING_USER,
|
|
PLAN_CONTEXT_SSO,
|
|
FlowPlanner,
|
|
)
|
|
from passbook.flows.views import SESSION_KEY_PLAN
|
|
from passbook.lib.utils.urls import redirect_with_qs
|
|
from passbook.policies.utils import delete_none_keys
|
|
from passbook.providers.saml.utils.encoding import decode_base64_and_inflate
|
|
from passbook.sources.saml.exceptions import (
|
|
MismatchedRequestID,
|
|
MissingSAMLResponse,
|
|
UnsupportedNameIDFormat,
|
|
)
|
|
from passbook.sources.saml.models import SAMLSource
|
|
from passbook.sources.saml.processors.constants import (
|
|
SAML_NAME_ID_FORMAT_EMAIL,
|
|
SAML_NAME_ID_FORMAT_PERSISTENT,
|
|
SAML_NAME_ID_FORMAT_TRANSIENT,
|
|
SAML_NAME_ID_FORMAT_WINDOWS,
|
|
SAML_NAME_ID_FORMAT_X509,
|
|
)
|
|
from passbook.sources.saml.processors.request import SESSION_REQUEST_ID
|
|
from passbook.stages.password.stage import PLAN_CONTEXT_AUTHENTICATION_BACKEND
|
|
from passbook.stages.prompt.stage import PLAN_CONTEXT_PROMPT
|
|
|
|
LOGGER = get_logger()
|
|
if TYPE_CHECKING:
|
|
from xml.etree.ElementTree import Element # nosec
|
|
|
|
CACHE_SEEN_REQUEST_ID = "passbook_saml_seen_ids_%s"
|
|
DEFAULT_BACKEND = "django.contrib.auth.backends.ModelBackend"
|
|
|
|
|
|
class ResponseProcessor:
|
|
"""SAML Response Processor"""
|
|
|
|
_source: SAMLSource
|
|
|
|
_root: "Element"
|
|
_root_xml: str
|
|
|
|
def __init__(self, source: SAMLSource):
|
|
self._source = source
|
|
|
|
def parse(self, request: HttpRequest):
|
|
"""Check if `request` contains SAML Response data, parse and validate it."""
|
|
# First off, check if we have any SAML Data at all.
|
|
raw_response = request.POST.get("SAMLResponse", None)
|
|
if not raw_response:
|
|
raise MissingSAMLResponse("Request does not contain 'SAMLResponse'")
|
|
# relay_state = request.POST.get('RelayState', None)
|
|
# Check if response is compressed, b64 decode it
|
|
self._root_xml = decode_base64_and_inflate(raw_response)
|
|
self._root = ElementTree.fromstring(self._root_xml)
|
|
|
|
self._verify_signed()
|
|
self._verify_request_id(request)
|
|
|
|
def _verify_signed(self):
|
|
"""Verify SAML Response's Signature"""
|
|
verifier = XMLVerifier()
|
|
verifier.verify(
|
|
self._root_xml, x509_cert=self._source.signing_kp.certificate_data
|
|
)
|
|
LOGGER.debug("Successfully verified signautre")
|
|
|
|
def _verify_request_id(self, request: HttpRequest):
|
|
if self._source.allow_idp_initiated:
|
|
# If IdP-initiated SSO flows are enabled, we want to cache the Response ID
|
|
# somewhat mitigate replay attacks
|
|
seen_ids = cache.get(CACHE_SEEN_REQUEST_ID % self._source.pk, [])
|
|
if self._root.attrib["ID"] in seen_ids:
|
|
raise SuspiciousOperation("Replay attack detected")
|
|
seen_ids.append(self._root.attrib["ID"])
|
|
cache.set(CACHE_SEEN_REQUEST_ID % self._source.pk, seen_ids)
|
|
return
|
|
if (
|
|
SESSION_REQUEST_ID not in request.session
|
|
or "InResponseTo" not in self._root.attrib
|
|
):
|
|
raise MismatchedRequestID(
|
|
"Missing InResponseTo and IdP-initiated Logins are not allowed"
|
|
)
|
|
if request.session[SESSION_REQUEST_ID] != self._root.attrib["InResponseTo"]:
|
|
raise MismatchedRequestID("Mismatched request ID")
|
|
|
|
def _handle_name_id_transient(self, request: HttpRequest) -> HttpResponse:
|
|
"""Handle a NameID with the Format of Transient. This is a bit more complex than other
|
|
formats, as we need to create a temporary User that is used in the session. This
|
|
user has an attribute that refers to our Source for cleanup. The user is also deleted
|
|
on logout and periodically."""
|
|
# Create a temporary User
|
|
name_id = self._get_name_id().text
|
|
user: User = User.objects.create(
|
|
username=name_id,
|
|
attributes={
|
|
"saml": {"source": self._source.pk.hex, "delete_on_logout": True}
|
|
},
|
|
)
|
|
LOGGER.debug("Created temporary user for NameID Transient", username=name_id)
|
|
user.set_unusable_password()
|
|
user.save()
|
|
return self._flow_response(
|
|
request,
|
|
self._source.authentication_flow,
|
|
**{
|
|
PLAN_CONTEXT_PENDING_USER: user,
|
|
PLAN_CONTEXT_AUTHENTICATION_BACKEND: DEFAULT_BACKEND,
|
|
},
|
|
)
|
|
|
|
def _get_name_id(self) -> "Element":
|
|
"""Get NameID Element"""
|
|
assertion = self._root.find("{urn:oasis:names:tc:SAML:2.0:assertion}Assertion")
|
|
subject = assertion.find("{urn:oasis:names:tc:SAML:2.0:assertion}Subject")
|
|
name_id = subject.find("{urn:oasis:names:tc:SAML:2.0:assertion}NameID")
|
|
if name_id is None:
|
|
raise ValueError("NameID Element not found!")
|
|
return name_id
|
|
|
|
def _get_name_id_filter(self) -> Dict[str, str]:
|
|
"""Returns the subject's NameID as a Filter for the `User`"""
|
|
name_id_el = self._get_name_id()
|
|
name_id = name_id_el.text
|
|
if not name_id:
|
|
raise UnsupportedNameIDFormat("Subject's NameID is empty.")
|
|
_format = name_id_el.attrib["Format"]
|
|
if _format == SAML_NAME_ID_FORMAT_EMAIL:
|
|
return {"email": name_id}
|
|
if _format == SAML_NAME_ID_FORMAT_PERSISTENT:
|
|
return {"username": name_id}
|
|
if _format == SAML_NAME_ID_FORMAT_X509:
|
|
# This attribute is statically set by the LDAP source
|
|
return {"attributes__distinguishedName": name_id}
|
|
if _format == SAML_NAME_ID_FORMAT_WINDOWS:
|
|
if "\\" in name_id:
|
|
name_id = name_id.split("\\")[1]
|
|
return {"username": name_id}
|
|
raise UnsupportedNameIDFormat(
|
|
f"Assertion contains NameID with unsupported format {_format}."
|
|
)
|
|
|
|
def prepare_flow(self, request: HttpRequest) -> HttpResponse:
|
|
"""Prepare flow plan depending on whether or not the user exists"""
|
|
name_id = self._get_name_id()
|
|
# Sanity check, show a warning if NameIDPolicy doesn't match what we go
|
|
if self._source.name_id_policy != name_id.attrib["Format"]:
|
|
LOGGER.warning(
|
|
"NameID from IdP doesn't match our policy",
|
|
expected=self._source.name_id_policy,
|
|
got=name_id.attrib["Format"],
|
|
)
|
|
# transient NameIDs are handeled seperately as they don't have to go through flows.
|
|
if name_id.attrib["Format"] == SAML_NAME_ID_FORMAT_TRANSIENT:
|
|
return self._handle_name_id_transient(request)
|
|
|
|
name_id_filter = self._get_name_id_filter()
|
|
matching_users = User.objects.filter(**name_id_filter)
|
|
if matching_users.exists():
|
|
# User exists already, switch to authentication flow
|
|
return self._flow_response(
|
|
request,
|
|
self._source.authentication_flow,
|
|
**{
|
|
PLAN_CONTEXT_PENDING_USER: matching_users.first(),
|
|
PLAN_CONTEXT_AUTHENTICATION_BACKEND: DEFAULT_BACKEND,
|
|
},
|
|
)
|
|
return self._flow_response(
|
|
request,
|
|
self._source.enrollment_flow,
|
|
**{PLAN_CONTEXT_PROMPT: delete_none_keys(name_id_filter)},
|
|
)
|
|
|
|
def _flow_response(
|
|
self, request: HttpRequest, flow: Flow, **kwargs
|
|
) -> HttpResponse:
|
|
kwargs[PLAN_CONTEXT_SSO] = True
|
|
request.session[SESSION_KEY_PLAN] = FlowPlanner(flow).plan(request, kwargs)
|
|
return redirect_with_qs(
|
|
"passbook_flows:flow-executor-shell",
|
|
request.GET,
|
|
flow_slug=flow.slug,
|
|
)
|