From 42005e7defea9c32e8299a23d1f4f54b40526554 Mon Sep 17 00:00:00 2001 From: Jens Langhammer Date: Tue, 15 Dec 2020 23:39:52 +0100 Subject: [PATCH] outposts: ensure all Service Connection state updates are done by the task --- authentik/outposts/models.py | 20 +++++++++++++------- authentik/outposts/tasks.py | 14 ++++++++------ 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/authentik/outposts/models.py b/authentik/outposts/models.py index 7aaed3380..56077bba8 100644 --- a/authentik/outposts/models.py +++ b/authentik/outposts/models.py @@ -35,6 +35,7 @@ from authentik.lib.models import InheritanceForeignKey from authentik.lib.sentry import SentryIgnoredException from authentik.lib.utils.template import render_to_string from authentik.outposts.docker_tls import DockerInlineTLS +from authentik.outposts.tasks import outpost_service_connection_state OUR_VERSION = parse(__version__) OUTPOST_HELLO_INTERVAL = 10 @@ -113,17 +114,22 @@ class OutpostServiceConnection(models.Model): objects = InheritanceManager() + @property + def state_key(self) -> str: + """Key used to save connection state in cache""" + return f"outpost_service_connection_{self.pk.hex}" + @property def state(self) -> OutpostServiceConnectionState: """Get state of service connection""" - state_key = f"outpost_service_connection_{self.pk.hex}" - state = cache.get(state_key, None) + state = cache.get(self.state_key, None) if not state: - state = self._get_state() - cache.set(state_key, state, timeout=0) + outpost_service_connection_state.delay(self.pk) + return OutpostServiceConnectionState("", False) return state - def _get_state(self) -> OutpostServiceConnectionState: + def fetch_state(self) -> OutpostServiceConnectionState: + """Fetch current Service Connection state""" raise NotImplementedError @property @@ -203,7 +209,7 @@ class DockerServiceConnection(OutpostServiceConnection): raise ServiceConnectionInvalid from exc return client - def _get_state(self) -> OutpostServiceConnectionState: + def fetch_state(self) -> OutpostServiceConnectionState: try: client = self.client() return OutpostServiceConnectionState( @@ -239,7 +245,7 @@ class KubernetesServiceConnection(OutpostServiceConnection): def __str__(self) -> str: return f"Kubernetes Service-Connection {self.name}" - def _get_state(self) -> OutpostServiceConnectionState: + def fetch_state(self) -> OutpostServiceConnectionState: try: client = self.client() api_instance = VersionApi(client) diff --git a/authentik/outposts/tasks.py b/authentik/outposts/tasks.py index d40efcbcc..b7f905108 100644 --- a/authentik/outposts/tasks.py +++ b/authentik/outposts/tasks.py @@ -35,21 +35,23 @@ def outpost_controller_all(): @CELERY_APP.task() -def outpost_service_connection_state(state_pk: Any): +def outpost_service_connection_state(connection_pk: Any): """Update cached state of a service connection""" connection: OutpostServiceConnection = ( - OutpostServiceConnection.objects.filter(pk=state_pk).select_subclasses().first() + OutpostServiceConnection.objects.filter(pk=connection_pk) + .select_subclasses() + .first() ) cache.delete(f"outpost_service_connection_{connection.pk.hex}") - _ = connection.state + state = connection.fetch_state() + cache.set(connection.state_key, state, timeout=0) @CELERY_APP.task(bind=True, base=MonitoredTask) def outpost_service_connection_monitor(self: MonitoredTask): """Regularly check the state of Outpost Service Connections""" - for connection in OutpostServiceConnection.objects.select_subclasses(): - cache.delete(f"outpost_service_connection_{connection.pk.hex}") - _ = connection.state + for connection in OutpostServiceConnection.objects.all(): + outpost_service_connection_state.delay(connection.pk) self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL))