providers/saml: fix AuthnRequest Signature validation, add unittests

This commit is contained in:
Jens Langhammer 2020-07-12 16:17:53 +02:00
parent ff6e270886
commit 0ff4545bab
3 changed files with 113 additions and 14 deletions

View File

@ -1,8 +1,12 @@
"""SAML AuthNRequest Parser and dataclass""" """SAML AuthNRequest Parser and dataclass"""
from base64 import b64decode
from dataclasses import dataclass from dataclasses import dataclass
from typing import Optional from typing import Optional
from urllib.parse import quote_plus
from cryptography.exceptions import InvalidSignature from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from defusedxml import ElementTree from defusedxml import ElementTree
from signxml import XMLVerifier from signxml import XMLVerifier
from structlog import get_logger from structlog import get_logger
@ -21,7 +25,7 @@ class AuthNRequest:
# pylint: disable=invalid-name # pylint: disable=invalid-name
id: Optional[str] = None id: Optional[str] = None
relay_state: str = "" relay_state: Optional[str] = None
class AuthNRequestParser: class AuthNRequestParser:
@ -32,18 +36,7 @@ class AuthNRequestParser:
def __init__(self, provider: SAMLProvider): def __init__(self, provider: SAMLProvider):
self.provider = provider self.provider = provider
def parse(self, saml_request: str, relay_state: str) -> AuthNRequest: def _parse_xml(self, decoded_xml: str, relay_state: Optional[str]) -> AuthNRequest:
"""Parses various parameters from _request_xml into _request_params."""
decoded_xml = decode_base64_and_inflate(saml_request)
if self.provider.require_signing and self.provider.signing_kp:
try:
XMLVerifier().verify(
decoded_xml, x509_cert=self.provider.signing_kp.certificate_data
)
except InvalidSignature as exc:
raise CannotHandleAssertion("Failed to verify signature") from exc
root = ElementTree.fromstring(decoded_xml) root = ElementTree.fromstring(decoded_xml)
@ -60,6 +53,53 @@ class AuthNRequestParser:
auth_n_request = AuthNRequest(id=root.attrib["ID"], relay_state=relay_state) auth_n_request = AuthNRequest(id=root.attrib["ID"], relay_state=relay_state)
return auth_n_request return auth_n_request
def parse(self, saml_request: str, relay_state: Optional[str]) -> AuthNRequest:
"""Validate and parse raw request with enveloped signautre."""
decoded_xml = decode_base64_and_inflate(saml_request)
if self.provider.signing_kp:
try:
XMLVerifier().verify(
decoded_xml, x509_cert=self.provider.signing_kp.certificate_data
)
except InvalidSignature as exc:
raise CannotHandleAssertion("Failed to verify signature") from exc
return self._parse_xml(decoded_xml, relay_state)
def parse_detached(
self,
saml_request: str,
relay_state: Optional[str],
signature: Optional[str] = None,
sig_alg: Optional[str] = None,
) -> AuthNRequest:
"""Validate and parse raw request with detached signature"""
decoded_xml = decode_base64_and_inflate(saml_request)
if signature and sig_alg:
# if sig_alg == "http://www.w3.org/2000/09/xmldsig#rsa-sha1":
sig_hash = hashes.SHA1() # nosec
querystring = f"SAMLRequest={quote_plus(saml_request)}&"
if relay_state is not None:
querystring += f"RelayState={quote_plus(relay_state)}&"
querystring += f"SigAlg={sig_alg}"
public_key = self.provider.signing_kp.private_key.public_key()
try:
public_key.verify(
b64decode(signature),
querystring.encode(),
padding.PSS(
mgf=padding.MGF1(sig_hash), salt_length=padding.PSS.MAX_LENGTH
),
sig_hash,
)
except InvalidSignature as exc:
raise CannotHandleAssertion("Failed to verify signature") from exc
return self._parse_xml(decoded_xml, relay_state)
def idp_initiated(self) -> AuthNRequest: def idp_initiated(self) -> AuthNRequest:
"""Create IdP Initiated AuthNRequest""" """Create IdP Initiated AuthNRequest"""
return AuthNRequest() return AuthNRequest()

View File

@ -0,0 +1,55 @@
"""Test AuthN Request generator and parser"""
from django.test import RequestFactory, TestCase
from passbook.crypto.models import CertificateKeyPair
from passbook.flows.models import Flow
from passbook.providers.saml.models import SAMLProvider
from passbook.providers.saml.processors.request_parser import AuthNRequestParser
from passbook.providers.saml.utils.encoding import deflate_and_base64_encode
from passbook.sources.saml.models import SAMLSource
from passbook.sources.saml.processors.request import RequestProcessor
class TestAuthNRequest(TestCase):
"""Test AuthN Request generator and parser"""
def setUp(self):
self.provider = SAMLProvider.objects.create(
authorization_flow=Flow.objects.get(
slug="default-provider-authorization-implicit-consent"
),
acs_url="http://testserver/source/saml/provider/acs/",
signing_kp=CertificateKeyPair.objects.first(),
)
self.source = SAMLSource.objects.create(
slug="provider",
issuer="passbook",
signing_kp=CertificateKeyPair.objects.first(),
)
self.factory = RequestFactory()
def test_signed_valid(self):
"""Test generated AuthNRequest with valid signature"""
http_request = self.factory.get("/")
# First create an AuthNRequest
request_proc = RequestProcessor(self.source, http_request, "test_state")
request = request_proc.build_auth_n()
# Now we check the ID and signature
parsed_request = AuthNRequestParser(self.provider).parse(
deflate_and_base64_encode(request), "test_state"
)
self.assertEqual(parsed_request.id, request_proc.request_id)
self.assertEqual(parsed_request.relay_state, "test_state")
def test_signed_valid_detached(self):
"""Test generated AuthNRequest with valid signature (detached)"""
http_request = self.factory.get("/")
# First create an AuthNRequest
request_proc = RequestProcessor(self.source, http_request, "test_state")
params = request_proc.build_auth_n_detached()
# Now we check the ID and signature
parsed_request = AuthNRequestParser(self.provider).parse_detached(
params["SAMLRequest"], "test_state", params["Signature"], params["SigAlg"]
)
self.assertEqual(parsed_request.id, request_proc.request_id)
self.assertEqual(parsed_request.relay_state, "test_state")

View File

@ -39,6 +39,8 @@ from passbook.stages.consent.stage import PLAN_CONTEXT_CONSENT_TEMPLATE
LOGGER = get_logger() LOGGER = get_logger()
URL_VALIDATOR = URLValidator(schemes=("http", "https")) URL_VALIDATOR = URLValidator(schemes=("http", "https"))
SESSION_KEY_SAML_REQUEST = "SAMLRequest" SESSION_KEY_SAML_REQUEST = "SAMLRequest"
SESSION_KEY_SAML_SIGNATURE = "Signature"
SESSION_KEY_SAML_SIG_ALG = "SigAlg"
SESSION_KEY_SAML_RESPONSE = "SAMLResponse" SESSION_KEY_SAML_RESPONSE = "SAMLResponse"
SESSION_KEY_RELAY_STATE = "RelayState" SESSION_KEY_RELAY_STATE = "RelayState"
SESSION_KEY_AUTH_N_REQUEST = "authn_request" SESSION_KEY_AUTH_N_REQUEST = "authn_request"
@ -102,9 +104,11 @@ class SAMLSSOBindingRedirectView(SAMLSSOView):
) )
try: try:
auth_n_request = AuthNRequestParser(self.provider).parse( auth_n_request = AuthNRequestParser(self.provider).parse_detached(
request.GET[SESSION_KEY_SAML_REQUEST], request.GET[SESSION_KEY_SAML_REQUEST],
request.GET.get(SESSION_KEY_RELAY_STATE, ""), request.GET.get(SESSION_KEY_RELAY_STATE, ""),
request.GET.get(SESSION_KEY_SAML_SIGNATURE),
request.GET.get(SESSION_KEY_SAML_SIG_ALG),
) )
self.request.session[SESSION_KEY_AUTH_N_REQUEST] = auth_n_request self.request.session[SESSION_KEY_AUTH_N_REQUEST] = auth_n_request
except CannotHandleAssertion as exc: except CannotHandleAssertion as exc: