events: revert to @prefill_task decorator since base class doesn't get executed until task runs

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
This commit is contained in:
Jens Langhammer 2021-12-09 10:18:00 +01:00
parent db316b59c5
commit a9bd34f3c5
8 changed files with 85 additions and 51 deletions

View File

@ -11,7 +11,12 @@ from structlog.stdlib import get_logger
from authentik import ENV_GIT_HASH_KEY, __version__ from authentik import ENV_GIT_HASH_KEY, __version__
from authentik.events.models import Event, EventAction, Notification from authentik.events.models import Event, EventAction, Notification
from authentik.events.monitored_tasks import PrefilledMonitoredTask, TaskResult, TaskResultStatus from authentik.events.monitored_tasks import (
MonitoredTask,
TaskResult,
TaskResultStatus,
prefill_task,
)
from authentik.lib.config import CONFIG from authentik.lib.config import CONFIG
from authentik.lib.utils.http import get_http_session from authentik.lib.utils.http import get_http_session
from authentik.root.celery import CELERY_APP from authentik.root.celery import CELERY_APP
@ -48,8 +53,9 @@ def clear_update_notifications():
notification.delete() notification.delete()
@CELERY_APP.task(bind=True, base=PrefilledMonitoredTask) @CELERY_APP.task(bind=True, base=MonitoredTask)
def update_latest_version(self: PrefilledMonitoredTask): @prefill_task
def update_latest_version(self: MonitoredTask):
"""Update latest version info""" """Update latest version info"""
if CONFIG.y_bool("disable_update_check"): if CONFIG.y_bool("disable_update_check"):
cache.set(VERSION_CACHE_KEY, "0.0.0", VERSION_CACHE_TIMEOUT) cache.set(VERSION_CACHE_KEY, "0.0.0", VERSION_CACHE_TIMEOUT)

View File

@ -16,15 +16,21 @@ from kubernetes.config.incluster_config import SERVICE_HOST_ENV_NAME
from structlog.stdlib import get_logger from structlog.stdlib import get_logger
from authentik.core.models import AuthenticatedSession, ExpiringModel from authentik.core.models import AuthenticatedSession, ExpiringModel
from authentik.events.monitored_tasks import PrefilledMonitoredTask, TaskResult, TaskResultStatus from authentik.events.monitored_tasks import (
MonitoredTask,
TaskResult,
TaskResultStatus,
prefill_task,
)
from authentik.lib.config import CONFIG from authentik.lib.config import CONFIG
from authentik.root.celery import CELERY_APP from authentik.root.celery import CELERY_APP
LOGGER = get_logger() LOGGER = get_logger()
@CELERY_APP.task(bind=True, base=PrefilledMonitoredTask) @CELERY_APP.task(bind=True, base=MonitoredTask)
def clean_expired_models(self: PrefilledMonitoredTask): @prefill_task
def clean_expired_models(self: MonitoredTask):
"""Remove expired objects""" """Remove expired objects"""
messages = [] messages = []
for cls in ExpiringModel.__subclasses__(): for cls in ExpiringModel.__subclasses__():
@ -62,8 +68,9 @@ def should_backup() -> bool:
return True return True
@CELERY_APP.task(bind=True, base=PrefilledMonitoredTask) @CELERY_APP.task(bind=True, base=MonitoredTask)
def backup_database(self: PrefilledMonitoredTask): # pragma: no cover @prefill_task
def backup_database(self: MonitoredTask): # pragma: no cover
"""Database backup""" """Database backup"""
self.result_timeout_hours = 25 self.result_timeout_hours = 25
if not should_backup(): if not should_backup():

View File

@ -6,7 +6,12 @@ from django.utils.translation import gettext_lazy as _
from structlog.stdlib import get_logger from structlog.stdlib import get_logger
from authentik.crypto.models import CertificateKeyPair from authentik.crypto.models import CertificateKeyPair
from authentik.events.monitored_tasks import PrefilledMonitoredTask, TaskResult, TaskResultStatus from authentik.events.monitored_tasks import (
MonitoredTask,
TaskResult,
TaskResultStatus,
prefill_task,
)
from authentik.lib.config import CONFIG from authentik.lib.config import CONFIG
from authentik.root.celery import CELERY_APP from authentik.root.celery import CELERY_APP
@ -15,8 +20,9 @@ LOGGER = get_logger()
MANAGED_DISCOVERED = "goauthentik.io/crypto/discovered/%s" MANAGED_DISCOVERED = "goauthentik.io/crypto/discovered/%s"
@CELERY_APP.task(bind=True, base=PrefilledMonitoredTask) @CELERY_APP.task(bind=True, base=MonitoredTask)
def certificate_discovery(self: PrefilledMonitoredTask): @prefill_task
def certificate_discovery(self: MonitoredTask):
"""Discover and update certificates form the filesystem""" """Discover and update certificates form the filesystem"""
certs = {} certs = {}
private_keys = {} private_keys = {}

View File

@ -186,27 +186,21 @@ class MonitoredTask(Task):
raise NotImplementedError raise NotImplementedError
class PrefilledMonitoredTask(MonitoredTask): def prefill_task(func):
"""Subclass of MonitoredTask, but create entry in cache if task hasn't been run """Ensure a task's details are always in cache, so it can always be triggered via API"""
Does not support UID""" status = TaskInfo.by_name(func.__name__)
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
status = TaskInfo.by_name(self.__name__)
if status: if status:
return return func
TaskInfo( TaskInfo(
task_name=self.__name__, task_name=func.__name__,
task_description=self.__doc__, task_description=func.__doc__,
result=TaskResult(TaskResultStatus.UNKNOWN, messages=[_("Task has not been run yet.")]), result=TaskResult(TaskResultStatus.UNKNOWN, messages=[_("Task has not been run yet.")]),
task_call_module=self.__module__, task_call_module=func.__module__,
task_call_func=self.__name__, task_call_func=func.__name__,
# We don't have real values for these attributes but they cannot be null # We don't have real values for these attributes but they cannot be null
start_timestamp=default_timer(), start_timestamp=default_timer(),
finish_timestamp=default_timer(), finish_timestamp=default_timer(),
finish_time=datetime.now(), finish_time=datetime.now(),
).save(86400) ).save(86400)
LOGGER.debug("prefilled task", task_name=self.__name__) LOGGER.debug("prefilled task", task_name=func.__name__)
return func
def run(self, *args, **kwargs):
raise NotImplementedError

View File

@ -2,12 +2,18 @@
from django.db import DatabaseError from django.db import DatabaseError
from authentik.core.tasks import CELERY_APP from authentik.core.tasks import CELERY_APP
from authentik.events.monitored_tasks import PrefilledMonitoredTask, TaskResult, TaskResultStatus from authentik.events.monitored_tasks import (
MonitoredTask,
TaskResult,
TaskResultStatus,
prefill_task,
)
from authentik.managed.manager import ObjectManager from authentik.managed.manager import ObjectManager
@CELERY_APP.task(bind=True, base=PrefilledMonitoredTask) @CELERY_APP.task(bind=True, base=MonitoredTask)
def managed_reconcile(self: PrefilledMonitoredTask): @prefill_task
def managed_reconcile(self: MonitoredTask):
"""Run ObjectManager to ensure objects are up-to-date""" """Run ObjectManager to ensure objects are up-to-date"""
try: try:
ObjectManager().run() ObjectManager().run()

View File

@ -19,9 +19,9 @@ from structlog.stdlib import get_logger
from authentik.events.monitored_tasks import ( from authentik.events.monitored_tasks import (
MonitoredTask, MonitoredTask,
PrefilledMonitoredTask,
TaskResult, TaskResult,
TaskResultStatus, TaskResultStatus,
prefill_task,
) )
from authentik.lib.utils.reflection import path_to_class from authentik.lib.utils.reflection import path_to_class
from authentik.outposts.controllers.base import BaseController, ControllerException from authentik.outposts.controllers.base import BaseController, ControllerException
@ -75,8 +75,9 @@ def outpost_service_connection_state(connection_pk: Any):
cache.set(connection.state_key, state, timeout=None) cache.set(connection.state_key, state, timeout=None)
@CELERY_APP.task(bind=True, base=PrefilledMonitoredTask) @CELERY_APP.task(bind=True, base=MonitoredTask)
def outpost_service_connection_monitor(self: PrefilledMonitoredTask): @prefill_task
def outpost_service_connection_monitor(self: MonitoredTask):
"""Regularly check the state of Outpost Service Connections""" """Regularly check the state of Outpost Service Connections"""
connections = OutpostServiceConnection.objects.all() connections = OutpostServiceConnection.objects.all()
for connection in connections.iterator(): for connection in connections.iterator():
@ -124,8 +125,9 @@ def outpost_controller(
self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, logs)) self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, logs))
@CELERY_APP.task(bind=True, base=PrefilledMonitoredTask) @CELERY_APP.task(bind=True, base=MonitoredTask)
def outpost_token_ensurer(self: PrefilledMonitoredTask): @prefill_task
def outpost_token_ensurer(self: MonitoredTask):
"""Periodically ensure that all Outposts have valid Service Accounts """Periodically ensure that all Outposts have valid Service Accounts
and Tokens""" and Tokens"""
all_outposts = Outpost.objects.all() all_outposts = Outpost.objects.all()

View File

@ -2,7 +2,12 @@
from django.core.cache import cache from django.core.cache import cache
from structlog.stdlib import get_logger from structlog.stdlib import get_logger
from authentik.events.monitored_tasks import PrefilledMonitoredTask, TaskResult, TaskResultStatus from authentik.events.monitored_tasks import (
MonitoredTask,
TaskResult,
TaskResultStatus,
prefill_task,
)
from authentik.policies.reputation.models import IPReputation, UserReputation from authentik.policies.reputation.models import IPReputation, UserReputation
from authentik.policies.reputation.signals import CACHE_KEY_IP_PREFIX, CACHE_KEY_USER_PREFIX from authentik.policies.reputation.signals import CACHE_KEY_IP_PREFIX, CACHE_KEY_USER_PREFIX
from authentik.root.celery import CELERY_APP from authentik.root.celery import CELERY_APP
@ -10,8 +15,9 @@ from authentik.root.celery import CELERY_APP
LOGGER = get_logger() LOGGER = get_logger()
@CELERY_APP.task(bind=True, base=PrefilledMonitoredTask) @CELERY_APP.task(bind=True, base=MonitoredTask)
def save_ip_reputation(self: PrefilledMonitoredTask): @prefill_task
def save_ip_reputation(self: MonitoredTask):
"""Save currently cached reputation to database""" """Save currently cached reputation to database"""
objects_to_update = [] objects_to_update = []
for key, score in cache.get_many(cache.keys(CACHE_KEY_IP_PREFIX + "*")).items(): for key, score in cache.get_many(cache.keys(CACHE_KEY_IP_PREFIX + "*")).items():
@ -23,8 +29,9 @@ def save_ip_reputation(self: PrefilledMonitoredTask):
self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, ["Successfully updated IP Reputation"])) self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, ["Successfully updated IP Reputation"]))
@CELERY_APP.task(bind=True, base=PrefilledMonitoredTask) @CELERY_APP.task(bind=True, base=MonitoredTask)
def save_user_reputation(self: PrefilledMonitoredTask): @prefill_task
def save_user_reputation(self: MonitoredTask):
"""Save currently cached reputation to database""" """Save currently cached reputation to database"""
objects_to_update = [] objects_to_update = []
for key, score in cache.get_many(cache.keys(CACHE_KEY_USER_PREFIX + "*")).items(): for key, score in cache.get_many(cache.keys(CACHE_KEY_USER_PREFIX + "*")).items():

View File

@ -3,7 +3,12 @@ from django.utils.timezone import now
from structlog.stdlib import get_logger from structlog.stdlib import get_logger
from authentik.core.models import AuthenticatedSession, User from authentik.core.models import AuthenticatedSession, User
from authentik.events.monitored_tasks import PrefilledMonitoredTask, TaskResult, TaskResultStatus from authentik.events.monitored_tasks import (
MonitoredTask,
TaskResult,
TaskResultStatus,
prefill_task,
)
from authentik.lib.utils.time import timedelta_from_string from authentik.lib.utils.time import timedelta_from_string
from authentik.root.celery import CELERY_APP from authentik.root.celery import CELERY_APP
from authentik.sources.saml.models import SAMLSource from authentik.sources.saml.models import SAMLSource
@ -11,8 +16,9 @@ from authentik.sources.saml.models import SAMLSource
LOGGER = get_logger() LOGGER = get_logger()
@CELERY_APP.task(bind=True, base=PrefilledMonitoredTask) @CELERY_APP.task(bind=True, base=MonitoredTask)
def clean_temporary_users(self: PrefilledMonitoredTask): @prefill_task
def clean_temporary_users(self: MonitoredTask):
"""Remove temporary users created by SAML Sources""" """Remove temporary users created by SAML Sources"""
_now = now() _now = now()
messages = [] messages = []