This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
authentik/authentik/managed/manager.py

60 lines
1.7 KiB
Python
Raw Normal View History

"""Managed objects manager"""
from typing import Type
from structlog.stdlib import get_logger
from authentik.managed.models import ManagedModel
# Module-level structlog logger; used by ObjectManager.run to report each reconcile pass.
LOGGER = get_logger()
class EnsureOp:
    """A single "ensure" step that is executed during an ObjectManager run.

    This base class only stores the operation's inputs; concrete subclasses
    implement :meth:`run`.
    """

    # Model class the operation acts on.
    _obj: Type[ManagedModel]
    # Names of the keyword arguments that identify an existing object.
    _match_fields: tuple[str, ...]
    # Field values handed to the operation.
    _kwargs: dict

    def __init__(self, obj: Type[ManagedModel], *match_fields: str, **kwargs) -> None:
        """Remember the target model, the match field names and the field values."""
        self._obj, self._match_fields, self._kwargs = obj, match_fields, kwargs

    def run(self):
        """Perform the ensure action; must be overridden by subclasses."""
        raise NotImplementedError
class EnsureExists(EnsureOp):
    """Ensure object exists, with kwargs as given values.

    The lookup always contains ``managed=True`` plus every match field for
    which a value (anything other than ``None``) was supplied in the keyword
    arguments. The complete keyword arguments are passed as ``defaults``.
    """

    def run(self):
        """Create or update the object through ``objects.update_or_create``.

        Match fields that are missing from the keyword arguments, or whose
        value is ``None``, are left out of the lookup.
        """
        update_kwargs = {
            "managed": True,
            "defaults": self._kwargs,
        }
        for field in self._match_fields:
            value = self._kwargs.get(field)
            # Test against None instead of truthiness: values such as 0, False
            # or "" are legitimate identifiers, and dropping them would widen
            # the lookup to "any managed object".
            if value is not None:
                update_kwargs[field] = value
        # "defaults" above refers to this very dict, so the flag set here is
        # also part of the values passed to update_or_create.
        self._kwargs.setdefault("managed", True)
        self._obj.objects.update_or_create(**update_kwargs)
class ObjectManager:
    """Base class for per-app managers of managed objects."""

    def run(self):
        """Task entrypoint: reconcile every direct subclass of ObjectManager.

        Each subclass is instantiated, asked for its operations via
        ``reconcile()``, and those operations are then run in order.
        """
        for manager_cls in ObjectManager.__subclasses__():
            manager = manager_cls()
            operations = manager.reconcile()
            LOGGER.debug("Reconciling managed objects", manager=manager_cls.__name__)
            for op in operations:
                op.run()

    def reconcile(self) -> list[EnsureOp]:
        """Return the list of operations to execute; implemented by subclasses."""
        raise NotImplementedError