This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
authentik/authentik/managed/manager.py
Jens Langhammer c249b55ff5 *: use py3.10 syntax for unions, remove old Type[] import when possible
Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
2021-12-30 14:59:01 +01:00

71 lines
2 KiB
Python

"""Managed objects manager"""
from typing import Callable, Optional
from structlog.stdlib import get_logger
from authentik.managed.models import ManagedModel
LOGGER = get_logger()
class EnsureOp:
"""Ensure operation, executed as part of an ObjectManager run"""
_obj: type[ManagedModel]
_managed_uid: str
_kwargs: dict
def __init__(self, obj: type[ManagedModel], managed_uid: str, **kwargs) -> None:
self._obj = obj
self._managed_uid = managed_uid
self._kwargs = kwargs
def run(self):
"""Do the actual ensure action"""
raise NotImplementedError
class EnsureExists(EnsureOp):
"""Ensure object exists, with kwargs as given values"""
created_callback: Optional[Callable]
def __init__(
self,
obj: type[ManagedModel],
managed_uid: str,
created_callback: Optional[Callable] = None,
**kwargs,
) -> None:
super().__init__(obj, managed_uid, **kwargs)
self.created_callback = created_callback
def run(self):
self._kwargs.setdefault("managed", self._managed_uid)
obj, created = self._obj.objects.update_or_create(
**{
"managed": self._managed_uid,
"defaults": self._kwargs,
}
)
if created and self.created_callback is not None:
self.created_callback(obj)
class ObjectManager:
"""Base class for Apps Object manager"""
def run(self):
"""Main entrypoint for tasks, iterate through all implementation of this
and execute all operations"""
for sub in ObjectManager.__subclasses__():
sub_inst = sub()
ops = sub_inst.reconcile()
LOGGER.debug("Reconciling managed objects", manager=sub.__name__)
for operation in ops:
operation.run()
def reconcile(self) -> list[EnsureOp]:
"""Method which is implemented in subclass that returns a list of Operations"""
raise NotImplementedError