This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
authentik/authentik/admin/tasks.py
Jens Langhammer 2bd29e2fdd *: improve error handling for startup tasks
Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
2022-08-01 23:31:47 +02:00

96 lines
3.5 KiB
Python

"""authentik admin tasks"""
import re
from django.core.cache import cache
from django.core.validators import URLValidator
from django.db import DatabaseError, InternalError, ProgrammingError
from packaging.version import parse
from requests import RequestException
from structlog.stdlib import get_logger
from authentik import __version__, get_build_hash
from authentik.admin.apps import PROM_INFO
from authentik.events.models import Event, EventAction, Notification
from authentik.events.monitored_tasks import (
MonitoredTask,
TaskResult,
TaskResultStatus,
prefill_task,
)
from authentik.lib.config import CONFIG
from authentik.lib.utils.http import get_http_session
from authentik.root.celery import CELERY_APP
LOGGER = get_logger()
VERSION_CACHE_KEY = "authentik_latest_version"
VERSION_CACHE_TIMEOUT = 8 * 60 * 60 # 8 hours
# Chop of the first ^ because we want to search the entire string
URL_FINDER = URLValidator.regex.pattern[1:]
LOCAL_VERSION = parse(__version__)
def _set_prom_info():
"""Set prometheus info for version"""
PROM_INFO.info(
{
"version": __version__,
"latest": cache.get(VERSION_CACHE_KEY, ""),
"build_hash": get_build_hash(),
}
)
@CELERY_APP.task(
throws=(DatabaseError, ProgrammingError, InternalError),
)
def clear_update_notifications():
"""Clear update notifications on startup if the notification was for the version
we're running now."""
for notification in Notification.objects.filter(event__action=EventAction.UPDATE_AVAILABLE):
if "new_version" not in notification.event.context:
continue
notification_version = notification.event.context["new_version"]
if LOCAL_VERSION >= parse(notification_version):
notification.delete()
@CELERY_APP.task(bind=True, base=MonitoredTask)
@prefill_task
def update_latest_version(self: MonitoredTask):
"""Update latest version info"""
if CONFIG.y_bool("disable_update_check"):
cache.set(VERSION_CACHE_KEY, "0.0.0", VERSION_CACHE_TIMEOUT)
self.set_status(TaskResult(TaskResultStatus.WARNING, messages=["Version check disabled."]))
return
try:
response = get_http_session().get(
"https://version.goauthentik.io/version.json",
)
response.raise_for_status()
data = response.json()
upstream_version = data.get("stable", {}).get("version")
cache.set(VERSION_CACHE_KEY, upstream_version, VERSION_CACHE_TIMEOUT)
self.set_status(
TaskResult(TaskResultStatus.SUCCESSFUL, ["Successfully updated latest Version"])
)
_set_prom_info()
# Check if upstream version is newer than what we're running,
# and if no event exists yet, create one.
if LOCAL_VERSION < parse(upstream_version):
# Event has already been created, don't create duplicate
if Event.objects.filter(
action=EventAction.UPDATE_AVAILABLE,
context__new_version=upstream_version,
).exists():
return
event_dict = {"new_version": upstream_version}
if match := re.search(URL_FINDER, data.get("stable", {}).get("changelog", "")):
event_dict["message"] = f"Changelog: {match.group()}"
Event.new(EventAction.UPDATE_AVAILABLE, **event_dict).save()
except (RequestException, IndexError) as exc:
cache.set(VERSION_CACHE_KEY, "0.0.0", VERSION_CACHE_TIMEOUT)
self.set_status(TaskResult(TaskResultStatus.ERROR).with_error(exc))
_set_prom_info()