sources/ldap: improve unittests

This commit is contained in:
Jens Langhammer 2020-07-10 20:10:51 +02:00
parent c191b62245
commit 8de3c4fbd6
5 changed files with 104 additions and 46 deletions

View File

@ -1,6 +1,5 @@
[run]
source = passbook
branch = True
omit =
*/wsgi.py
manage.py

View File

@ -18,8 +18,7 @@ class LDAPBackend(ModelBackend):
return None
for source in LDAPSource.objects.filter(enabled=True):
LOGGER.debug("LDAP Auth attempt", source=source)
_ldap = Connector(source)
user = _ldap.auth_user(**kwargs)
user = Connector(source).auth_user(**kwargs)
if user:
return user
return None

View File

@ -53,18 +53,19 @@ class Connector:
)
for group in groups:
attributes = group.get("attributes", {})
if self._source.object_uniqueness_field not in attributes:
LOGGER.warning(
"Cannot find uniqueness Field in attributes", user=attributes.keys()
)
continue
uniq = attributes[self._source.object_uniqueness_field]
_, created = Group.objects.update_or_create(
attributes__ldap_uniq=attributes.get(
self._source.object_uniqueness_field, ""
),
attributes__ldap_uniq=uniq,
parent=self._source.sync_parent_group,
# defaults=self._build_object_properties(attributes),
defaults={
"name": attributes.get("name", ""),
"attributes": {
"ldap_uniq": attributes.get(
self._source.object_uniqueness_field, ""
),
"ldap_uniq": uniq,
"distinguishedName": attributes.get("distinguishedName"),
},
},
@ -86,11 +87,12 @@ class Connector:
)
for user in users:
attributes = user.get("attributes", {})
try:
uniq = attributes[self._source.object_uniqueness_field]
except KeyError:
LOGGER.warning("Cannot find uniqueness Field in attributes")
if self._source.object_uniqueness_field not in attributes:
LOGGER.warning(
"Cannot find uniqueness Field in attributes", user=user.keys()
)
continue
uniq = attributes[self._source.object_uniqueness_field]
try:
defaults = self._build_object_properties(attributes)
user, created = User.objects.update_or_create(
@ -180,7 +182,7 @@ class Connector:
)
return properties
def auth_user(self, password: str, **filters: Dict[str, str]) -> Optional[User]:
def auth_user(self, password: str, **filters: str) -> Optional[User]:
"""Try to bind as either user_dn or mail with password.
Returns True on success, otherwise False"""
users = User.objects.filter(**filters)

View File

@ -4,22 +4,6 @@ from passbook.sources.ldap.connector import Connector
from passbook.sources.ldap.models import LDAPSource
@CELERY_APP.task()
def sync_groups(source_pk: int):
"""Sync LDAP Groups on background worker"""
source = LDAPSource.objects.get(pk=source_pk)
connector = Connector(source)
connector.sync_groups()
@CELERY_APP.task()
def sync_users(source_pk: int):
"""Sync LDAP Users on background worker"""
source = LDAPSource.objects.get(pk=source_pk)
connector = Connector(source)
connector.sync_users()
@CELERY_APP.task()
def sync():
"""Sync all sources"""

View File

@ -1,12 +1,15 @@
"""LDAP Source tests"""
from unittest.mock import PropertyMock, patch
from unittest.mock import Mock, PropertyMock, patch
from django.test import TestCase
from ldap3 import MOCK_SYNC, OFFLINE_AD_2012_R2, Connection, Server
from oauth2_provider.generators import generate_client_secret
from passbook.core.models import User
from passbook.core.models import Group, User
from passbook.sources.ldap.auth import LDAPBackend
from passbook.sources.ldap.connector import Connector
from passbook.sources.ldap.models import LDAPPropertyMapping, LDAPSource
from passbook.sources.ldap.tasks import sync
def _build_mock_connection() -> Connection:
@ -20,30 +23,64 @@ def _build_mock_connection() -> Connection:
client_strategy=MOCK_SYNC,
)
connection.strategy.add_entry(
"cn=user0,ou=test,o=lab",
"cn=group1,ou=groups,ou=test,o=lab",
{
"userPassword": "test0000",
"sAMAccountName": "user0_sn",
"revision": 0,
"objectSid": "unique-test0000",
"objectCategory": "Person",
"name": "test-group",
"objectSid": "unique-test-group",
"objectCategory": "Group",
"distinguishedName": "cn=group1,ou=groups,ou=test,o=lab",
},
)
# Group without SID
connection.strategy.add_entry(
"cn=group2,ou=groups,ou=test,o=lab",
{
"name": "test-group",
"objectCategory": "Group",
"distinguishedName": "cn=group2,ou=groups,ou=test,o=lab",
},
)
connection.strategy.add_entry(
"cn=user1,ou=test,o=lab",
"cn=user0,ou=users,ou=test,o=lab",
{
"userPassword": LDAP_PASSWORD,
"sAMAccountName": "user0_sn",
"name": "user0_sn",
"revision": 0,
"objectSid": "user0",
"objectCategory": "Person",
"memberOf": "cn=group1,ou=groups,ou=test,o=lab",
},
)
# User without SID
connection.strategy.add_entry(
"cn=user1,ou=users,ou=test,o=lab",
{
"userPassword": "test1111",
"sAMAccountName": "user1_sn",
"sAMAccountName": "user2_sn",
"name": "user1_sn",
"revision": 0,
"objectSid": "unique-test1111",
"objectCategory": "Person",
},
)
# Duplicate users
connection.strategy.add_entry(
"cn=user2,ou=test,o=lab",
"cn=user2,ou=users,ou=test,o=lab",
{
"userPassword": "test2222",
"sAMAccountName": "user2_sn",
"name": "user2_sn",
"revision": 0,
"objectSid": "unique-test2222",
"objectCategory": "Person",
},
)
connection.strategy.add_entry(
"cn=user3,ou=users,ou=test,o=lab",
{
"userPassword": "test2222",
"sAMAccountName": "user2_sn",
"name": "user2_sn",
"revision": 0,
"objectSid": "unique-test2222",
"objectCategory": "Person",
@ -53,6 +90,7 @@ def _build_mock_connection() -> Connection:
return connection
LDAP_PASSWORD = generate_client_secret()
LDAP_CONNECTION_PATCH = PropertyMock(return_value=_build_mock_connection())
@ -61,7 +99,11 @@ class LDAPSourceTests(TestCase):
def setUp(self):
self.source = LDAPSource.objects.create(
name="ldap", slug="ldap", base_dn="o=lab"
name="ldap",
slug="ldap",
base_dn="ou=test,o=lab",
additional_user_dn="ou=users",
additional_group_dn="ou=groups",
)
self.source.property_mappings.set(LDAPPropertyMapping.objects.all())
self.source.save()
@ -71,5 +113,37 @@ class LDAPSourceTests(TestCase):
"""Test user sync"""
connector = Connector(self.source)
connector.sync_users()
user = User.objects.filter(username="user2_sn")
self.assertTrue(user.exists())
self.assertTrue(User.objects.filter(username="user0_sn").exists())
self.assertFalse(User.objects.filter(username="user1_sn").exists())
@patch("passbook.sources.ldap.models.LDAPSource.connection", LDAP_CONNECTION_PATCH)
def test_sync_groups(self):
"""Test group sync"""
connector = Connector(self.source)
connector.sync_groups()
connector.sync_membership()
group = Group.objects.filter(name="test-group")
self.assertTrue(group.exists())
@patch("passbook.sources.ldap.models.LDAPSource.connection", LDAP_CONNECTION_PATCH)
def test_auth(self):
"""Test Cached auth"""
connector = Connector(self.source)
connector.sync_users()
user = User.objects.get(username="user0_sn")
auth_user_by_bind = Mock(return_value=user)
with patch(
"passbook.sources.ldap.connector.Connector.auth_user_by_bind",
auth_user_by_bind,
):
backend = LDAPBackend()
self.assertEqual(
backend.authenticate(None, username="user0_sn", password=LDAP_PASSWORD),
user,
)
@patch("passbook.sources.ldap.models.LDAPSource.connection", LDAP_CONNECTION_PATCH)
def test_tasks(self):
"""Test Scheduled tasks"""
sync()