"""Basic SAML Processor""" import time import uuid from defusedxml import ElementTree from structlog import get_logger from passbook.saml_idp import exceptions, utils, xml_render MINUTES = 60 HOURS = 60 * MINUTES def get_random_id(): """Random hex id""" # It is very important that these random IDs NOT start with a number. random_id = '_' + uuid.uuid4().hex return random_id def get_time_string(delta=0): """Get Data formatted in SAML format""" return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(time.time() + delta)) # Design note: I've tried to make this easy to sub-class and override # just the bits you need to override. I've made use of object properties, # so that your sub-classes have access to all information: use wisely. # Formatting note: These methods are alphabetized. # pylint: disable=too-many-instance-attributes class Processor: """Base SAML 2.0 AuthnRequest to Response Processor. Sub-classes should provide Service Provider-specific functionality.""" is_idp_initiated = False _audience = '' _assertion_params = None _assertion_xml = None _assertion_id = None _django_request = None _relay_state = None _request = None _request_id = None _request_xml = None _request_params = None _response_id = None _response_xml = None _response_params = None _saml_request = None _saml_response = None _session_index = None _subject = None _subject_format = 'urn:oasis:names:tc:SAML:2.0:nameid-format:persistent' _system_params = {} @property def dotted_path(self): """Return a dotted path to this class""" return '{module}.{class_name}'.format( module=self.__module__, class_name=self.__class__.__name__) def __init__(self, remote): self.name = remote.name self._remote = remote self._logger = get_logger(__name__) self._system_params['ISSUER'] = self._remote.issuer self._logger.debug('processor configured') def _build_assertion(self): """Builds _assertion_params.""" self._determine_assertion_id() self._determine_audience() self._determine_subject() self._determine_session_index() self._assertion_params = { 'ASSERTION_ID': self._assertion_id, 'ASSERTION_SIGNATURE': '', # it's unsigned 'AUDIENCE': self._audience, 'AUTH_INSTANT': get_time_string(), 'ISSUE_INSTANT': get_time_string(), 'NOT_BEFORE': get_time_string(-1 * HOURS), # TODO: Make these settings. 'NOT_ON_OR_AFTER': get_time_string(86400 * MINUTES), 'SESSION_INDEX': self._session_index, 'SESSION_NOT_ON_OR_AFTER': get_time_string(8 * HOURS), 'SP_NAME_QUALIFIER': self._audience, 'SUBJECT': self._subject, 'SUBJECT_FORMAT': self._subject_format, } self._assertion_params.update(self._system_params) self._assertion_params.update(self._request_params) def _build_response(self): """Builds _response_params.""" self._determine_response_id() self._response_params = { 'ASSERTION': self._assertion_xml, 'ISSUE_INSTANT': get_time_string(), 'RESPONSE_ID': self._response_id, 'RESPONSE_SIGNATURE': '', # initially unsigned } self._response_params.update(self._system_params) self._response_params.update(self._request_params) def _decode_request(self): """Decodes _request_xml from _saml_request.""" self._request_xml = utils.decode_base64_and_inflate(self._saml_request).decode('utf-8') self._logger.debug('SAML request decoded') def _determine_assertion_id(self): """Determines the _assertion_id.""" self._assertion_id = get_random_id() def _determine_audience(self): """Determines the _audience.""" self._audience = self._remote.audience self._logger.info('determined audience') def _determine_response_id(self): """Determines _response_id.""" self._response_id = get_random_id() def _determine_session_index(self): self._session_index = self._django_request.session.session_key def _determine_subject(self): """Determines _subject and _subject_type for Assertion Subject.""" self._subject = self._django_request.user.email def _encode_response(self): """Encodes _response_xml to _encoded_xml.""" self._saml_response = utils.nice64(str.encode(self._response_xml)) def _extract_saml_request(self): """Retrieves the _saml_request AuthnRequest from the _django_request.""" self._saml_request = self._django_request.session['SAMLRequest'] self._relay_state = self._django_request.session['RelayState'] def _format_assertion(self): """Formats _assertion_params as _assertion_xml.""" self._assertion_params['ATTRIBUTES'] = [ { 'FriendlyName': 'eduPersonPrincipalName', 'Name': 'urn:oid:1.3.6.1.4.1.5923.1.1.1.6', 'Value': self._django_request.user.email, }, { 'FriendlyName': 'cn', 'Name': 'urn:oid:2.5.4.3', 'Value': self._django_request.user.name, }, { 'FriendlyName': 'mail', 'Name': 'urn:oid:0.9.2342.19200300.100.1.3', 'Value': self._django_request.user.email, }, { 'FriendlyName': 'displayName', 'Name': 'urn:oid:2.16.840.1.113730.3.1.241', 'Value': self._django_request.user.username, }, ] from passbook.saml_idp.models import SAMLPropertyMapping for mapping in self._remote.property_mappings.all().select_subclasses(): if isinstance(mapping, SAMLPropertyMapping): mapping_payload = { 'Name': mapping.saml_name, 'ValueArray': [], 'FriendlyName': mapping.friendly_name } for value in mapping.values: mapping_payload['ValueArray'].append(value.format( user=self._django_request.user, request=self._django_request )) self._assertion_params['ATTRIBUTES'].append(mapping_payload) self._assertion_xml = xml_render.get_assertion_xml( 'saml/xml/assertions/generic.xml', self._assertion_params, signed=True) def _format_response(self): """Formats _response_params as _response_xml.""" assertion_id = self._assertion_params['ASSERTION_ID'] self._response_xml = xml_render.get_response_xml(self._response_params, saml_provider=self._remote, assertion_id=assertion_id) def _get_django_response_params(self): """Returns a dictionary of parameters for the response template.""" return { 'acs_url': self._request_params['ACS_URL'], 'saml_response': self._saml_response, 'relay_state': self._relay_state, 'autosubmit': self._remote.application.skip_authorization, } def _parse_request(self): """Parses various parameters from _request_xml into _request_params.""" # Minimal test to verify that it's not binarily encoded still: if not str(self._request_xml.strip()).startswith('<'): raise Exception('RequestXML is not valid XML; ' 'it may need to be decoded or decompressed.') root = ElementTree.fromstring(self._request_xml) params = {} params['ACS_URL'] = root.attrib['AssertionConsumerServiceURL'] params['REQUEST_ID'] = root.attrib['ID'] params['DESTINATION'] = root.attrib.get('Destination', '') params['PROVIDER_NAME'] = root.attrib.get('ProviderName', '') self._request_params = params def _reset(self, django_request, sp_config=None): """Initialize (and reset) object properties, so we don't risk carrying over anything from the last authentication. If provided, use sp_config throughout; otherwise, it will be set in _validate_request(). """ self._assertion_params = sp_config self._assertion_xml = sp_config self._assertion_id = sp_config self._django_request = django_request self._relay_state = sp_config self._request = sp_config self._request_id = sp_config self._request_xml = sp_config self._request_params = sp_config self._response_id = sp_config self._response_xml = sp_config self._response_params = sp_config self._saml_request = sp_config self._saml_response = sp_config self._session_index = sp_config self._subject = sp_config self._subject_format = 'urn:oasis:names:tc:SAML:2.0:nameid-format:persistent' self._system_params = { 'ISSUER': self._remote.issuer } def _validate_request(self): """ Validates the SAML request against the SP configuration of this processor. Sub-classes should override this and raise a `CannotHandleAssertion` exception if the validation fails. Raises: CannotHandleAssertion: if the ACS URL specified in the SAML request doesn't match the one specified in the processor config. """ request_acs_url = self._request_params['ACS_URL'] if self._remote.acs_url != request_acs_url: msg = ("couldn't find ACS url '{}' in SAML2IDP_REMOTES " "setting.".format(request_acs_url)) self._logger.info(msg) raise exceptions.CannotHandleAssertion(msg) def _validate_user(self): """Validates the User. Sub-classes should override this and throw an CannotHandleAssertion Exception if the validation does not succeed.""" def can_handle(self, request): """Returns true if this processor can handle this request.""" self._reset(request) # Read the request. try: self._extract_saml_request() except Exception as exc: msg = "can't find SAML request in user session: %s" % exc self._logger.info(msg) raise exceptions.CannotHandleAssertion(msg) try: self._decode_request() except Exception as exc: msg = "can't decode SAML request: %s" % exc self._logger.info(msg) raise exceptions.CannotHandleAssertion(msg) try: self._parse_request() except Exception as exc: msg = "can't parse SAML request: %s" % exc self._logger.info(msg) raise exceptions.CannotHandleAssertion(msg) self._validate_request() return True def generate_response(self): """Processes request and returns template variables suitable for a response.""" # Build the assertion and response. # Only call can_handle if SP initiated Request, otherwise we have no Request if not self.is_idp_initiated: self.can_handle(self._django_request) self._validate_user() self._build_assertion() self._format_assertion() self._build_response() self._format_response() self._encode_response() # Return proper template params. return self._get_django_response_params() def init_deep_link(self, request, url): """Initialize this Processor to make an IdP-initiated call to the SP's deep-linked URL.""" self._reset(request) acs_url = self._remote.acs_url # NOTE: The following request params are made up. Some are blank, # because they comes over in the AuthnRequest, but we don't have an # AuthnRequest in this case: # - Destination: Should be this IdP's SSO endpoint URL. Not used in the response? # - ProviderName: According to the spec, this is optional. self._request_params = { 'ACS_URL': acs_url, 'DESTINATION': '', 'PROVIDER_NAME': '', } self._relay_state = url