sources/ldap: extract vendor-specific functions

#1521

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
This commit is contained in:
Jens Langhammer 2021-10-18 21:36:51 +02:00
parent c3bd509eb8
commit 2b155964c2
6 changed files with 112 additions and 57 deletions

View file

@ -4,7 +4,7 @@ from re import split
from typing import Optional
import ldap3
import ldap3.core.exceptions
from ldap3.core.exceptions import LDAPAttributeError
from structlog.stdlib import get_logger
from authentik.core.models import User
@ -67,9 +67,9 @@ class LDAPPasswordChanger:
search_scope=ldap3.BASE,
attributes=["pwdProperties"],
)
except ldap3.core.exceptions.LDAPAttributeError:
root_attrs = list(root_attrs)[0]
except (LDAPAttributeError, KeyError, IndexError):
return False
root_attrs = list(root_attrs)[0]
raw_pwd_properties = root_attrs.get("attributes", {}).get("pwdProperties", None)
if raw_pwd_properties is None:
return False
@ -86,7 +86,10 @@ class LDAPPasswordChanger:
if not user_dn:
LOGGER.info(f"User has no {LDAP_DISTINGUISHED_NAME} set.")
return
self._source.connection.extend.microsoft.modify_password(user_dn, password)
try:
self._source.connection.extend.microsoft.modify_password(user_dn, password)
except LDAPAttributeError:
self._source.connection.extend.standard.modify_password(user_dn, new_password=password)
def _ad_check_password_existing(self, password: str, user_dn: str) -> bool:
"""Check if a password contains sAMAccount or displayName"""

View file

@ -1,16 +1,14 @@
"""Sync LDAP Users into authentik"""
from datetime import datetime
import ldap3
import ldap3.core.exceptions
from django.core.exceptions import FieldError
from django.db.utils import IntegrityError
from pytz import UTC
from authentik.core.models import User
from authentik.events.models import Event, EventAction
from authentik.sources.ldap.sync.base import LDAP_UNIQUENESS, BaseLDAPSynchronizer
from authentik.sources.ldap.sync.vendor.ad import UserAccountControl
from authentik.sources.ldap.sync.vendor.freeipa import FreeIPA
from authentik.sources.ldap.sync.vendor.ms_ad import MicrosoftActiveDirectory
class UserLDAPSynchronizer(BaseLDAPSynchronizer):
@ -64,20 +62,6 @@ class UserLDAPSynchronizer(BaseLDAPSynchronizer):
else:
self._logger.debug("Synced User", user=ak_user.username, created=created)
user_count += 1
pwd_last_set: datetime = attributes.get("pwdLastSet", datetime.now())
pwd_last_set = pwd_last_set.replace(tzinfo=UTC)
if created or pwd_last_set >= ak_user.password_change_date:
self.message(f"'{ak_user.username}': Reset user's password")
self._logger.debug(
"Reset user's password",
user=ak_user.username,
created=created,
pwd_last_set=pwd_last_set,
)
ak_user.set_unusable_password()
ak_user.save()
if "userAccountControl" in attributes:
uac = UserAccountControl(attributes.get("userAccountControl"))
ak_user.is_active = UserAccountControl.ACCOUNTDISABLE not in uac
ak_user.save()
MicrosoftActiveDirectory(self._source).sync(attributes, user, created)
FreeIPA(self._source).sync(attributes, user, created)
return user_count

View file

@ -1,32 +0,0 @@
"""Active Directory specific"""
from enum import IntFlag
class UserAccountControl(IntFlag):
"""UserAccountControl attribute for Active directory users"""
# https://docs.microsoft.com/en-us/troubleshoot/windows-server/identity
# /useraccountcontrol-manipulate-account-properties
SCRIPT = 1
ACCOUNTDISABLE = 2
HOMEDIR_REQUIRED = 8
LOCKOUT = 16
PASSWD_NOTREQD = 32
PASSWD_CANT_CHANGE = 64
ENCRYPTED_TEXT_PWD_ALLOWED = 128
TEMP_DUPLICATE_ACCOUNT = 256
NORMAL_ACCOUNT = 512
INTERDOMAIN_TRUST_ACCOUNT = 2048
WORKSTATION_TRUST_ACCOUNT = 4096
SERVER_TRUST_ACCOUNT = 8192
DONT_EXPIRE_PASSWORD = 65536
MNS_LOGON_ACCOUNT = 131072
SMARTCARD_REQUIRED = 262144
TRUSTED_FOR_DELEGATION = 524288
NOT_DELEGATED = 1048576
USE_DES_KEY_ONLY = 2097152
DONT_REQ_PREAUTH = 4194304
PASSWORD_EXPIRED = 8388608
TRUSTED_TO_AUTH_FOR_DELEGATION = 16777216
PARTIAL_SECRETS_ACCOUNT = 67108864

View file

@ -0,0 +1,30 @@
"""FreeIPA specific"""
from datetime import datetime
from typing import Any
from pytz import UTC
from authentik.core.models import User
from authentik.sources.ldap.sync.base import BaseLDAPSynchronizer
class FreeIPA(BaseLDAPSynchronizer):
"""FreeIPA-specific LDAP"""
def sync(self, attributes: dict[str, Any], user: User, created: bool):
self.check_pwd_last_set(attributes, user, created)
def check_pwd_last_set(self, attributes: dict[str, Any], user: User, created: bool):
"""Check krbLastPwdChange"""
pwd_last_set: datetime = attributes.get("krbLastPwdChange", datetime.now())
pwd_last_set = pwd_last_set.replace(tzinfo=UTC)
if created or pwd_last_set >= user.password_change_date:
self.message(f"'{user.username}': Reset user's password")
self._logger.debug(
"Reset user's password",
user=user.username,
created=created,
pwd_last_set=pwd_last_set,
)
user.set_unusable_password()
user.save()

View file

@ -0,0 +1,70 @@
"""Active Directory specific"""
from datetime import datetime
from enum import IntFlag
from typing import Any
from pytz import UTC
from authentik.core.models import User
from authentik.sources.ldap.sync.base import BaseLDAPSynchronizer
class UserAccountControl(IntFlag):
"""UserAccountControl attribute for Active directory users"""
# https://docs.microsoft.com/en-us/troubleshoot/windows-server/identity
# /useraccountcontrol-manipulate-account-properties
SCRIPT = 1
ACCOUNTDISABLE = 2
HOMEDIR_REQUIRED = 8
LOCKOUT = 16
PASSWD_NOTREQD = 32
PASSWD_CANT_CHANGE = 64
ENCRYPTED_TEXT_PWD_ALLOWED = 128
TEMP_DUPLICATE_ACCOUNT = 256
NORMAL_ACCOUNT = 512
INTERDOMAIN_TRUST_ACCOUNT = 2048
WORKSTATION_TRUST_ACCOUNT = 4096
SERVER_TRUST_ACCOUNT = 8192
DONT_EXPIRE_PASSWORD = 65536
MNS_LOGON_ACCOUNT = 131072
SMARTCARD_REQUIRED = 262144
TRUSTED_FOR_DELEGATION = 524288
NOT_DELEGATED = 1048576
USE_DES_KEY_ONLY = 2097152
DONT_REQ_PREAUTH = 4194304
PASSWORD_EXPIRED = 8388608
TRUSTED_TO_AUTH_FOR_DELEGATION = 16777216
PARTIAL_SECRETS_ACCOUNT = 67108864
class MicrosoftActiveDirectory(BaseLDAPSynchronizer):
"""Microsoft-specific LDAP"""
def sync(self, attributes: dict[str, Any], user: User, created: bool):
self.ms_check_uac(attributes, user, created)
self.ms_check_pwd_last_set(attributes, user)
def ms_check_pwd_last_set(self, attributes: dict[str, Any], user: User, created: bool):
"""Check pwdLastSet"""
pwd_last_set: datetime = attributes.get("pwdLastSet", datetime.now())
pwd_last_set = pwd_last_set.replace(tzinfo=UTC)
if created or pwd_last_set >= user.password_change_date:
self.message(f"'{user.username}': Reset user's password")
self._logger.debug(
"Reset user's password",
user=user.username,
created=created,
pwd_last_set=pwd_last_set,
)
user.set_unusable_password()
user.save()
def ms_check_uac(self, attributes: dict[str, Any], user: User):
"""Check userAccountControl"""
if uac_bit := attributes.get("userAccountControl", None):
# uac_bit: int = attributes.get("userAccountControl")
uac = UserAccountControl(uac_bit)
user.is_active = UserAccountControl.ACCOUNTDISABLE not in uac
user.save()

View file

@ -2,7 +2,7 @@
from ldap3 import MOCK_SYNC, OFFLINE_AD_2012_R2, Connection, Server
from authentik.sources.ldap.sync.vendor.ad import UserAccountControl
from authentik.sources.ldap.sync.vendor.ms_ad import UserAccountControl
def mock_ad_connection(password: str) -> Connection: