sources/ldap: divide connector into password, sync and auth, add unittests for password

This commit is contained in:
Jens Langhammer 2020-09-21 21:35:50 +02:00
parent 945d5bfaf6
commit 59e8dca499
12 changed files with 485 additions and 342 deletions

View File

@ -1,9 +1,12 @@
"""passbook LDAP Authentication Backend"""
from typing import Optional
import ldap3
from django.contrib.auth.backends import ModelBackend
from django.http import HttpRequest
from structlog import get_logger
from passbook.sources.ldap.connector import Connector
from passbook.core.models import User
from passbook.sources.ldap.models import LDAPSource
LOGGER = get_logger()
@ -18,7 +21,56 @@ class LDAPBackend(ModelBackend):
return None
for source in LDAPSource.objects.filter(enabled=True):
LOGGER.debug("LDAP Auth attempt", source=source)
user = Connector(source).auth_user(**kwargs)
user = self.auth_user(source, **kwargs)
if user:
return user
return None
def auth_user(
self, source: LDAPSource, password: str, **filters: str
) -> Optional[User]:
"""Try to bind as either user_dn or mail with password.
Returns True on success, otherwise False"""
users = User.objects.filter(**filters)
if not users.exists():
return None
user: User = users.first()
if "distinguishedName" not in user.attributes:
LOGGER.debug(
"User doesn't have DN set, assuming not LDAP imported.", user=user
)
return None
# Either has unusable password,
# or has a password, but couldn't be authenticated by ModelBackend.
# This means we check with a bind to see if the LDAP password has changed
if self.auth_user_by_bind(source, user, password):
# Password given successfully binds to LDAP, so we save it in our Database
LOGGER.debug("Updating user's password in DB", user=user)
user.set_password(password, signal=False)
user.save()
return user
# Password doesn't match
LOGGER.debug("Failed to bind, password invalid")
return None
def auth_user_by_bind(
self, source: LDAPSource, user: User, password: str
) -> Optional[User]:
"""Attempt authentication by binding to the LDAP server as `user`. This
method should be avoided as its slow to do the bind."""
# Try to bind as new user
LOGGER.debug("Attempting Binding as user", user=user)
try:
temp_connection = ldap3.Connection(
source.connection.server,
user=user.attributes.get("distinguishedName"),
password=password,
raise_exceptions=True,
)
temp_connection.bind()
return user
except ldap3.core.exceptions.LDAPInvalidCredentialsResult as exception:
LOGGER.debug("LDAPInvalidCredentialsResult", user=user, error=exception)
except ldap3.core.exceptions.LDAPException as exception:
LOGGER.warning(exception)
return None

View File

@ -0,0 +1,155 @@
"""Help validate and update passwords in LDAP"""
from enum import IntFlag
from re import split
from typing import Optional
import ldap3
import ldap3.core.exceptions
from structlog import get_logger
from passbook.core.models import User
from passbook.sources.ldap.models import LDAPSource
LOGGER = get_logger()
NON_ALPHA = r"~!@#$%^&*_-+=`|\(){}[]:;\"'<>,.?/"
RE_DISPLAYNAME_SEPARATORS = r",\.—_\s#\t"
class PwdProperties(IntFlag):
"""Possible values for the pwdProperties attribute"""
DOMAIN_PASSWORD_COMPLEX = 1
DOMAIN_PASSWORD_NO_ANON_CHANGE = 2
DOMAIN_PASSWORD_NO_CLEAR_CHANGE = 4
DOMAIN_LOCKOUT_ADMINS = 8
DOMAIN_PASSWORD_STORE_CLEARTEXT = 16
DOMAIN_REFUSE_PASSWORD_CHANGE = 32
class PasswordCategories(IntFlag):
"""Password categories as defined by Microsoft, a category can only be counted
once, hence intflag."""
NONE = 0
ALPHA_LOWER = 1
ALPHA_UPPER = 2
ALPHA_OTHER = 4
NUMERIC = 8
SYMBOL = 16
class LDAPPasswordChanger:
"""Help validate and update passwords in LDAP"""
_source: LDAPSource
def __init__(self, source: LDAPSource) -> None:
self._source = source
def get_domain_root_dn(self) -> str:
"""Attempt to get root DN via MS specific fields or generic LDAP fields"""
info = self._source.connection.server.info
if "rootDomainNamingContext" in info.other:
return info.other["rootDomainNamingContext"][0]
naming_contexts = info.naming_contexts
naming_contexts.sort(key=len)
return naming_contexts[0]
def check_ad_password_complexity_enabled(self) -> bool:
"""Check if DOMAIN_PASSWORD_COMPLEX is enabled"""
root_dn = self.get_domain_root_dn()
root_attrs = self._source.connection.extend.standard.paged_search(
search_base=root_dn,
search_filter="(objectClass=*)",
search_scope=ldap3.BASE,
attributes=["pwdProperties"],
)
root_attrs = list(root_attrs)[0]
pwd_properties = PwdProperties(root_attrs["attributes"]["pwdProperties"])
if PwdProperties.DOMAIN_PASSWORD_COMPLEX in pwd_properties:
return True
return False
def change_password(self, user: User, password: str):
"""Change user's password"""
user_dn = user.attributes.get("distinguishedName", None)
if not user_dn:
raise AttributeError("User has no distinguishedName set.")
self._source.connection.extend.microsoft.modify_password(user_dn, password)
def _ad_check_password_existing(self, password: str, user_dn: str) -> bool:
"""Check if a password contains sAMAccount or displayName"""
users = list(
self._source.connection.extend.standard.paged_search(
search_base=user_dn,
search_filter=self._source.user_object_filter,
search_scope=ldap3.BASE,
attributes=["displayName", "sAMAccountName"],
)
)
if len(users) != 1:
raise AssertionError()
user_attributes = users[0]["attributes"]
# If sAMAccountName is longer than 3 chars, check if its contained in password
if len(user_attributes["sAMAccountName"]) >= 3:
if password.lower() in user_attributes["sAMAccountName"].lower():
return False
display_name_tokens = split(
RE_DISPLAYNAME_SEPARATORS, user_attributes["displayName"]
)
for token in display_name_tokens:
# Ignore tokens under 3 chars
if len(token) < 3:
continue
if token.lower() in password.lower():
return False
return True
def ad_password_complexity(
self, password: str, user: Optional[User] = None
) -> bool:
"""Check if password matches Active direcotry password policies
https://docs.microsoft.com/en-us/windows/security/threat-protection/
security-policy-settings/password-must-meet-complexity-requirements
"""
if user:
# Check if password contains sAMAccountName or displayNames
if "distinguishedName" in user.attributes:
existing_user_check = self._ad_check_password_existing(
password, user.attributes.get("distinguishedName")
)
if not existing_user_check:
LOGGER.debug("Password failed name check", user=user)
return existing_user_check
# Step 2, match at least 3 of 5 categories
matched_categories = PasswordCategories.NONE
required = 3
for letter in password:
# Only match one category per letter,
if letter.islower():
matched_categories |= PasswordCategories.ALPHA_LOWER
elif letter.isupper():
matched_categories |= PasswordCategories.ALPHA_UPPER
elif not letter.isascii() and letter.isalpha():
# Not exactly matching microsoft's policy, but count it as "Other unicode" char
# when its alpha and not ascii
matched_categories |= PasswordCategories.ALPHA_OTHER
elif letter.isnumeric():
matched_categories |= PasswordCategories.NUMERIC
elif letter in NON_ALPHA:
matched_categories |= PasswordCategories.SYMBOL
if bin(matched_categories).count("1") < required:
LOGGER.debug(
"Password didn't match enough categories",
has=matched_categories,
must=required,
)
return False
LOGGER.debug(
"Password matched categories", has=matched_categories, must=required
)
return True

View File

@ -10,8 +10,8 @@ from ldap3.core.exceptions import LDAPException
from passbook.core.models import User
from passbook.core.signals import password_changed
from passbook.flows.planner import PLAN_CONTEXT_PENDING_USER
from passbook.sources.ldap.connector import Connector
from passbook.sources.ldap.models import LDAPSource
from passbook.sources.ldap.password import LDAPPasswordChanger
from passbook.sources.ldap.tasks import sync_single
from passbook.stages.prompt.signals import password_validate
@ -32,9 +32,9 @@ def ldap_password_validate(sender, password: str, plan_context: Dict[str, Any],
if not sources.exists():
return
source = sources.first()
connector = Connector(source)
if connector.check_ad_password_complexity_enabled():
passing = connector.ad_password_complexity(
changer = LDAPPasswordChanger(source)
if changer.check_ad_password_complexity_enabled():
passing = changer.ad_password_complexity(
password, plan_context.get(PLAN_CONTEXT_PENDING_USER, None)
)
if not passing:
@ -52,8 +52,8 @@ def ldap_sync_password(sender, user: User, password: str, **_):
if not sources.exists():
return
source = sources.first()
connector = Connector(source)
changer = LDAPPasswordChanger(source)
try:
connector.change_password(user, password)
changer.change_password(user, password)
except LDAPException as exc:
raise ValidationError("Failed to set password") from exc

View File

@ -1,7 +1,5 @@
"""Wrapper for ldap3 to easily manage user"""
from enum import IntFlag
from re import split
from typing import Any, Dict, Optional
"""Sync LDAP Users and groups into passbook"""
from typing import Any, Dict
import ldap3
import ldap3.core.exceptions
@ -14,23 +12,9 @@ from passbook.sources.ldap.models import LDAPPropertyMapping, LDAPSource
LOGGER = get_logger()
NON_ALPHA = r"~!@#$%^&*_-+=`|\(){}[]:;\"'<>,.?/"
RE_DISPLAYNAME_SEPARATORS = r",\.—_\s#\t"
class PwdProperties(IntFlag):
"""Possible values for the pwdProperties attribute"""
DOMAIN_PASSWORD_COMPLEX = 1
DOMAIN_PASSWORD_NO_ANON_CHANGE = 2
DOMAIN_PASSWORD_NO_CLEAR_CHANGE = 4
DOMAIN_LOCKOUT_ADMINS = 8
DOMAIN_PASSWORD_STORE_CLEARTEXT = 16
DOMAIN_REFUSE_PASSWORD_CHANGE = 32
class Connector:
"""Wrapper for ldap3 to easily manage user authentication and creation"""
class LDAPSynchronizer:
"""Sync LDAP Users and groups into passbook"""
_source: LDAPSource
@ -198,151 +182,3 @@ class Connector:
"distinguishedName"
)
return properties
def auth_user(self, password: str, **filters: str) -> Optional[User]:
"""Try to bind as either user_dn or mail with password.
Returns True on success, otherwise False"""
users = User.objects.filter(**filters)
if not users.exists():
return None
user: User = users.first()
if "distinguishedName" not in user.attributes:
LOGGER.debug(
"User doesn't have DN set, assuming not LDAP imported.", user=user
)
return None
# Either has unusable password,
# or has a password, but couldn't be authenticated by ModelBackend.
# This means we check with a bind to see if the LDAP password has changed
if self.auth_user_by_bind(user, password):
# Password given successfully binds to LDAP, so we save it in our Database
LOGGER.debug("Updating user's password in DB", user=user)
user.set_password(password, signal=False)
user.save()
return user
# Password doesn't match
LOGGER.debug("Failed to bind, password invalid")
return None
def auth_user_by_bind(self, user: User, password: str) -> Optional[User]:
"""Attempt authentication by binding to the LDAP server as `user`. This
method should be avoided as its slow to do the bind."""
# Try to bind as new user
LOGGER.debug("Attempting Binding as user", user=user)
try:
temp_connection = ldap3.Connection(
self._source.connection.server,
user=user.attributes.get("distinguishedName"),
password=password,
raise_exceptions=True,
)
temp_connection.bind()
return user
except ldap3.core.exceptions.LDAPInvalidCredentialsResult as exception:
LOGGER.debug("LDAPInvalidCredentialsResult", user=user, error=exception)
except ldap3.core.exceptions.LDAPException as exception:
LOGGER.warning(exception)
return None
def get_domain_root_dn(self) -> str:
"""Attempt to get root DN via MS specific fields or generic LDAP fields"""
info = self._source.connection.server.info
if "rootDomainNamingContext" in info.other:
return info.other["rootDomainNamingContext"][0]
naming_contexts = info.naming_contexts
naming_contexts.sort(key=len)
return naming_contexts[0]
def check_ad_password_complexity_enabled(self) -> bool:
"""Check if DOMAIN_PASSWORD_COMPLEX is enabled"""
root_dn = self.get_domain_root_dn()
root_attrs = self._source.connection.extend.standard.paged_search(
search_base=root_dn,
search_filter="(objectClass=*)",
search_scope=ldap3.BASE,
attributes=["pwdProperties"],
)
root_attrs = list(root_attrs)[0]
pwd_properties = PwdProperties(root_attrs["attributes"]["pwdProperties"])
if PwdProperties.DOMAIN_PASSWORD_COMPLEX in pwd_properties:
return True
return False
def change_password(self, user: User, password: str):
"""Change user's password"""
user_dn = user.attributes.get("distinguishedName", None)
if not user_dn:
raise AttributeError("User has no distinguishedName set.")
self._source.connection.extend.microsoft.modify_password(user_dn, password)
def _ad_check_password_existing(self, password: str, user_dn: str) -> bool:
"""Check if a password contains sAMAccount or displayName"""
users = self._source.connection.extend.standard.paged_search(
search_base=user_dn,
search_filter="(objectClass=*)",
search_scope=ldap3.BASE,
attributes=["displayName", "sAMAccountName"],
)
if len(users) != 1:
raise AssertionError()
user = users[0]
# If sAMAccountName is longer than 3 chars, check if its contained in password
if len(user.sAMAccountName.value) >= 3:
if password.lower() in user.sAMAccountName.value.lower():
return False
display_name_tokens = split(RE_DISPLAYNAME_SEPARATORS, user.displayName.value)
for token in display_name_tokens:
# Ignore tokens under 3 chars
if len(token) < 3:
continue
if token.lower() in password.lower():
return False
return True
def ad_password_complexity(
self, password: str, user: Optional[User] = None
) -> bool:
"""Check if password matches Active direcotry password policies
https://docs.microsoft.com/en-us/windows/security/threat-protection/
security-policy-settings/password-must-meet-complexity-requirements
"""
if user:
# Check if password contains sAMAccountName or displayNames
if "distinguishedName" in user.attributes:
existing_user_check = self._ad_check_password_existing(
password, user.attributes.get("distinguishedName")
)
if not existing_user_check:
LOGGER.debug("Password failed name check", user=user)
return existing_user_check
# Step 2, match at least 3 of 5 categories
matched_categories = 0
required = 3
for letter in password:
# Only match one category per letter,
if letter.islower():
matched_categories += 1
elif letter.isupper():
matched_categories += 1
elif not letter.isascii() and letter.isalpha():
# Not exactly matching microsoft's policy, but count it as "Other unicode" char
# when its alpha and not ascii
matched_categories += 1
elif letter.isnumeric():
matched_categories += 1
elif letter in NON_ALPHA:
matched_categories += 1
if matched_categories < required:
LOGGER.debug(
"Password didn't match enough categories",
has=matched_categories,
must=required,
)
return False
LOGGER.debug(
"Password matched categories", has=matched_categories, must=required
)
return True

View File

@ -4,8 +4,8 @@ from time import time
from django.core.cache import cache
from passbook.root.celery import CELERY_APP
from passbook.sources.ldap.connector import Connector
from passbook.sources.ldap.models import LDAPSource
from passbook.sources.ldap.sync import LDAPSynchronizer
@CELERY_APP.task()
@ -19,9 +19,9 @@ def sync():
def sync_single(source_pk):
"""Sync a single source"""
source: LDAPSource = LDAPSource.objects.get(pk=source_pk)
connector = Connector(source)
connector.sync_users()
connector.sync_groups()
connector.sync_membership()
syncer = LDAPSynchronizer(source)
syncer.sync_users()
syncer.sync_groups()
syncer.sync_membership()
cache_key = source.state_cache_prefix("last_sync")
cache.set(cache_key, time(), timeout=60 * 60)

View File

@ -1,149 +0,0 @@
"""LDAP Source tests"""
from unittest.mock import Mock, PropertyMock, patch
from django.test import TestCase
from ldap3 import MOCK_SYNC, OFFLINE_AD_2012_R2, Connection, Server
from passbook.core.models import Group, User
from passbook.providers.oauth2.generators import generate_client_secret
from passbook.sources.ldap.auth import LDAPBackend
from passbook.sources.ldap.connector import Connector
from passbook.sources.ldap.models import LDAPPropertyMapping, LDAPSource
from passbook.sources.ldap.tasks import sync
def _build_mock_connection() -> Connection:
"""Create mock connection"""
server = Server("my_fake_server", get_info=OFFLINE_AD_2012_R2)
_pass = "foo" # noqa # nosec
connection = Connection(
server,
user="cn=my_user,ou=test,o=lab",
password=_pass,
client_strategy=MOCK_SYNC,
)
connection.strategy.add_entry(
"cn=group1,ou=groups,ou=test,o=lab",
{
"name": "test-group",
"objectSid": "unique-test-group",
"objectCategory": "Group",
"distinguishedName": "cn=group1,ou=groups,ou=test,o=lab",
},
)
# Group without SID
connection.strategy.add_entry(
"cn=group2,ou=groups,ou=test,o=lab",
{
"name": "test-group",
"objectCategory": "Group",
"distinguishedName": "cn=group2,ou=groups,ou=test,o=lab",
},
)
connection.strategy.add_entry(
"cn=user0,ou=users,ou=test,o=lab",
{
"userPassword": LDAP_PASSWORD,
"sAMAccountName": "user0_sn",
"name": "user0_sn",
"revision": 0,
"objectSid": "user0",
"objectCategory": "Person",
"memberOf": "cn=group1,ou=groups,ou=test,o=lab",
},
)
# User without SID
connection.strategy.add_entry(
"cn=user1,ou=users,ou=test,o=lab",
{
"userPassword": "test1111",
"sAMAccountName": "user2_sn",
"name": "user1_sn",
"revision": 0,
"objectCategory": "Person",
},
)
# Duplicate users
connection.strategy.add_entry(
"cn=user2,ou=users,ou=test,o=lab",
{
"userPassword": "test2222",
"sAMAccountName": "user2_sn",
"name": "user2_sn",
"revision": 0,
"objectSid": "unique-test2222",
"objectCategory": "Person",
},
)
connection.strategy.add_entry(
"cn=user3,ou=users,ou=test,o=lab",
{
"userPassword": "test2222",
"sAMAccountName": "user2_sn",
"name": "user2_sn",
"revision": 0,
"objectSid": "unique-test2222",
"objectCategory": "Person",
},
)
connection.bind()
return connection
LDAP_PASSWORD = generate_client_secret()
LDAP_CONNECTION_PATCH = PropertyMock(return_value=_build_mock_connection())
class LDAPSourceTests(TestCase):
"""LDAP Source tests"""
def setUp(self):
self.source = LDAPSource.objects.create(
name="ldap",
slug="ldap",
base_dn="ou=test,o=lab",
additional_user_dn="ou=users",
additional_group_dn="ou=groups",
)
self.source.property_mappings.set(LDAPPropertyMapping.objects.all())
self.source.save()
@patch("passbook.sources.ldap.models.LDAPSource.connection", LDAP_CONNECTION_PATCH)
def test_sync_users(self):
"""Test user sync"""
connector = Connector(self.source)
connector.sync_users()
self.assertTrue(User.objects.filter(username="user0_sn").exists())
self.assertFalse(User.objects.filter(username="user1_sn").exists())
@patch("passbook.sources.ldap.models.LDAPSource.connection", LDAP_CONNECTION_PATCH)
def test_sync_groups(self):
"""Test group sync"""
connector = Connector(self.source)
connector.sync_groups()
connector.sync_membership()
group = Group.objects.filter(name="test-group")
self.assertTrue(group.exists())
@patch("passbook.sources.ldap.models.LDAPSource.connection", LDAP_CONNECTION_PATCH)
def test_auth(self):
"""Test Cached auth"""
connector = Connector(self.source)
connector.sync_users()
user = User.objects.get(username="user0_sn")
auth_user_by_bind = Mock(return_value=user)
with patch(
"passbook.sources.ldap.connector.Connector.auth_user_by_bind",
auth_user_by_bind,
):
backend = LDAPBackend()
self.assertEqual(
backend.authenticate(None, username="user0_sn", password=LDAP_PASSWORD),
user,
)
@patch("passbook.sources.ldap.models.LDAPSource.connection", LDAP_CONNECTION_PATCH)
def test_tasks(self):
"""Test Scheduled tasks"""
sync()

View File

View File

@ -0,0 +1,47 @@
"""LDAP Source tests"""
from unittest.mock import Mock, PropertyMock, patch
from django.test import TestCase
from passbook.core.models import User
from passbook.providers.oauth2.generators import generate_client_secret
from passbook.sources.ldap.auth import LDAPBackend
from passbook.sources.ldap.models import LDAPPropertyMapping, LDAPSource
from passbook.sources.ldap.sync import LDAPSynchronizer
from passbook.sources.ldap.tests.utils import _build_mock_connection
LDAP_PASSWORD = generate_client_secret()
LDAP_CONNECTION_PATCH = PropertyMock(return_value=_build_mock_connection(LDAP_PASSWORD))
class LDAPSyncTests(TestCase):
"""LDAP Sync tests"""
def setUp(self):
self.source = LDAPSource.objects.create(
name="ldap",
slug="ldap",
base_dn="DC=AD2012,DC=LAB",
additional_user_dn="ou=users",
additional_group_dn="ou=groups",
)
self.source.property_mappings.set(LDAPPropertyMapping.objects.all())
self.source.save()
@patch("passbook.sources.ldap.models.LDAPSource.connection", LDAP_CONNECTION_PATCH)
def test_auth_synced_user(self):
"""Test Cached auth"""
syncer = LDAPSynchronizer(self.source)
syncer.sync_users()
user = User.objects.get(username="user0_sn")
auth_user_by_bind = Mock(return_value=user)
with patch(
"passbook.sources.ldap.auth.LDAPBackend.auth_user_by_bind",
auth_user_by_bind,
):
backend = LDAPBackend()
self.assertEqual(
backend.authenticate(None, username="user0_sn", password=LDAP_PASSWORD),
user,
)

View File

@ -0,0 +1,54 @@
"""LDAP Source tests"""
from unittest.mock import PropertyMock, patch
from django.test import TestCase
from passbook.core.models import User
from passbook.providers.oauth2.generators import generate_client_secret
from passbook.sources.ldap.models import LDAPPropertyMapping, LDAPSource
from passbook.sources.ldap.password import LDAPPasswordChanger
from passbook.sources.ldap.tests.utils import _build_mock_connection
LDAP_PASSWORD = generate_client_secret()
LDAP_CONNECTION_PATCH = PropertyMock(return_value=_build_mock_connection(LDAP_PASSWORD))
class LDAPPasswordTests(TestCase):
"""LDAP Password tests"""
def setUp(self):
self.source = LDAPSource.objects.create(
name="ldap",
slug="ldap",
base_dn="DC=AD2012,DC=LAB",
additional_user_dn="ou=users",
additional_group_dn="ou=groups",
)
self.source.property_mappings.set(LDAPPropertyMapping.objects.all())
self.source.save()
@patch("passbook.sources.ldap.models.LDAPSource.connection", LDAP_CONNECTION_PATCH)
def test_password_complexity(self):
"""Test password without user"""
pwc = LDAPPasswordChanger(self.source)
self.assertFalse(pwc.ad_password_complexity("test")) # 1 category
self.assertFalse(pwc.ad_password_complexity("test1")) # 2 categories
self.assertTrue(pwc.ad_password_complexity("test1!")) # 2 categories
@patch("passbook.sources.ldap.models.LDAPSource.connection", LDAP_CONNECTION_PATCH)
def test_password_complexity_user(self):
"""test password with user"""
pwc = LDAPPasswordChanger(self.source)
user = User.objects.create(
username="test",
attributes={"distinguishedName": "cn=user,ou=users,DC=AD2012,DC=LAB"},
)
self.assertFalse(pwc.ad_password_complexity("test", user)) # 1 category
self.assertFalse(pwc.ad_password_complexity("test1", user)) # 2 categories
self.assertTrue(pwc.ad_password_complexity("test1!", user)) # 2 categories
self.assertFalse(
pwc.ad_password_complexity("erin!qewrqewr", user)
) # displayName token
self.assertFalse(
pwc.ad_password_complexity("hagens!qewrqewr", user)
) # displayName token

View File

@ -0,0 +1,51 @@
"""LDAP Source tests"""
from unittest.mock import PropertyMock, patch
from django.test import TestCase
from passbook.core.models import Group, User
from passbook.providers.oauth2.generators import generate_client_secret
from passbook.sources.ldap.models import LDAPPropertyMapping, LDAPSource
from passbook.sources.ldap.sync import LDAPSynchronizer
from passbook.sources.ldap.tasks import sync
from passbook.sources.ldap.tests.utils import _build_mock_connection
LDAP_PASSWORD = generate_client_secret()
LDAP_CONNECTION_PATCH = PropertyMock(return_value=_build_mock_connection(LDAP_PASSWORD))
class LDAPSyncTests(TestCase):
"""LDAP Sync tests"""
def setUp(self):
self.source = LDAPSource.objects.create(
name="ldap",
slug="ldap",
base_dn="DC=AD2012,DC=LAB",
additional_user_dn="ou=users",
additional_group_dn="ou=groups",
)
self.source.property_mappings.set(LDAPPropertyMapping.objects.all())
self.source.save()
@patch("passbook.sources.ldap.models.LDAPSource.connection", LDAP_CONNECTION_PATCH)
def test_sync_users(self):
"""Test user sync"""
syncer = LDAPSynchronizer(self.source)
syncer.sync_users()
self.assertTrue(User.objects.filter(username="user0_sn").exists())
self.assertFalse(User.objects.filter(username="user1_sn").exists())
@patch("passbook.sources.ldap.models.LDAPSource.connection", LDAP_CONNECTION_PATCH)
def test_sync_groups(self):
"""Test group sync"""
syncer = LDAPSynchronizer(self.source)
syncer.sync_groups()
syncer.sync_membership()
group = Group.objects.filter(name="test-group")
self.assertTrue(group.exists())
@patch("passbook.sources.ldap.models.LDAPSource.connection", LDAP_CONNECTION_PATCH)
def test_tasks(self):
"""Test Scheduled tasks"""
sync()

View File

@ -0,0 +1,93 @@
"""ldap testing utils"""
from ldap3 import MOCK_SYNC, OFFLINE_AD_2012_R2, Connection, Server
def _build_mock_connection(password: str) -> Connection:
"""Create mock connection"""
server = Server("my_fake_server", get_info=OFFLINE_AD_2012_R2)
_pass = "foo" # noqa # nosec
connection = Connection(
server,
user="cn=my_user,DC=AD2012,DC=LAB",
password=_pass,
client_strategy=MOCK_SYNC,
)
# Entry for password checking
connection.strategy.add_entry(
"cn=user,ou=users,DC=AD2012,DC=LAB",
{
"name": "test-user",
"objectSid": "unique-test-group",
"objectCategory": "Person",
"displayName": "Erin M. Hagens",
"sAMAccountName": "sAMAccountName",
"distinguishedName": "cn=user,ou=users,DC=AD2012,DC=LAB",
},
)
connection.strategy.add_entry(
"cn=group1,ou=groups,DC=AD2012,DC=LAB",
{
"name": "test-group",
"objectSid": "unique-test-group",
"objectCategory": "Group",
"distinguishedName": "cn=group1,ou=groups,DC=AD2012,DC=LAB",
},
)
# Group without SID
connection.strategy.add_entry(
"cn=group2,ou=groups,DC=AD2012,DC=LAB",
{
"name": "test-group",
"objectCategory": "Group",
"distinguishedName": "cn=group2,ou=groups,DC=AD2012,DC=LAB",
},
)
connection.strategy.add_entry(
"cn=user0,ou=users,DC=AD2012,DC=LAB",
{
"userPassword": password,
"sAMAccountName": "user0_sn",
"name": "user0_sn",
"revision": 0,
"objectSid": "user0",
"objectCategory": "Person",
"memberOf": "cn=group1,ou=groups,DC=AD2012,DC=LAB",
},
)
# User without SID
connection.strategy.add_entry(
"cn=user1,ou=users,DC=AD2012,DC=LAB",
{
"userPassword": "test1111",
"sAMAccountName": "user2_sn",
"name": "user1_sn",
"revision": 0,
"objectCategory": "Person",
},
)
# Duplicate users
connection.strategy.add_entry(
"cn=user2,ou=users,DC=AD2012,DC=LAB",
{
"userPassword": "test2222",
"sAMAccountName": "user2_sn",
"name": "user2_sn",
"revision": 0,
"objectSid": "unique-test2222",
"objectCategory": "Person",
},
)
connection.strategy.add_entry(
"cn=user3,ou=users,DC=AD2012,DC=LAB",
{
"userPassword": "test2222",
"sAMAccountName": "user2_sn",
"name": "user2_sn",
"revision": 0,
"objectSid": "unique-test2222",
"objectCategory": "Person",
},
)
connection.bind()
return connection

View File

@ -5836,18 +5836,22 @@ definitions:
title: Action
type: string
enum:
- LOGIN
- LOGIN_FAILED
- LOGOUT
- AUTHORIZE_APPLICATION
- SUSPICIOUS_REQUEST
- SIGN_UP
- PASSWORD_RESET
- INVITE_CREATED
- INVITE_USED
- IMPERSONATION_STARTED
- IMPERSONATION_ENDED
- CUSTOM
- login
- login_failed
- logout
- sign_up
- authorize_application
- suspicious_request
- password_set
- invitation_created
- invitation_used
- source_linked
- impersonation_started
- impersonation_ended
- model_created
- model_updated
- model_deleted
- custom_
date:
title: Date
type: string