outposts: make state more consistent (#5403)
This commit is contained in:
parent
01663468de
commit
9d1ad104ec
|
@ -128,7 +128,7 @@ class OutpostServiceConnection(models.Model):
|
||||||
@property
|
@property
|
||||||
def state_key(self) -> str:
|
def state_key(self) -> str:
|
||||||
"""Key used to save connection state in cache"""
|
"""Key used to save connection state in cache"""
|
||||||
return f"outpost_service_connection_{self.pk.hex}"
|
return f"goauthentik.io/outposts/service_connection_state/{self.pk.hex}"
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def state(self) -> OutpostServiceConnectionState:
|
def state(self) -> OutpostServiceConnectionState:
|
||||||
|
@ -278,7 +278,7 @@ class Outpost(SerializerModel, ManagedModel):
|
||||||
@property
|
@property
|
||||||
def state_cache_prefix(self) -> str:
|
def state_cache_prefix(self) -> str:
|
||||||
"""Key by which the outposts status is saved"""
|
"""Key by which the outposts status is saved"""
|
||||||
return f"goauthentik.io/outposts/{self.uuid.hex}_state"
|
return f"goauthentik.io/outposts/state/{self.uuid.hex}"
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def state(self) -> list["OutpostState"]:
|
def state(self) -> list["OutpostState"]:
|
||||||
|
@ -433,19 +433,19 @@ class OutpostState:
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def for_outpost(outpost: Outpost) -> list["OutpostState"]:
|
def for_outpost(outpost: Outpost) -> list["OutpostState"]:
|
||||||
"""Get all states for an outpost"""
|
"""Get all states for an outpost"""
|
||||||
keys = cache.keys(f"{outpost.state_cache_prefix}_*")
|
keys = cache.keys(f"{outpost.state_cache_prefix}/*")
|
||||||
if not keys:
|
if not keys:
|
||||||
return []
|
return []
|
||||||
states = []
|
states = []
|
||||||
for key in keys:
|
for key in keys:
|
||||||
instance_uid = key.replace(f"{outpost.state_cache_prefix}_", "")
|
instance_uid = key.replace(f"{outpost.state_cache_prefix}/", "")
|
||||||
states.append(OutpostState.for_instance_uid(outpost, instance_uid))
|
states.append(OutpostState.for_instance_uid(outpost, instance_uid))
|
||||||
return states
|
return states
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def for_instance_uid(outpost: Outpost, uid: str) -> "OutpostState":
|
def for_instance_uid(outpost: Outpost, uid: str) -> "OutpostState":
|
||||||
"""Get state for a single instance"""
|
"""Get state for a single instance"""
|
||||||
key = f"{outpost.state_cache_prefix}_{uid}"
|
key = f"{outpost.state_cache_prefix}/{uid}"
|
||||||
default_data = {"uid": uid, "channel_ids": []}
|
default_data = {"uid": uid, "channel_ids": []}
|
||||||
data = cache.get(key, default_data)
|
data = cache.get(key, default_data)
|
||||||
if isinstance(data, str):
|
if isinstance(data, str):
|
||||||
|
@ -458,10 +458,10 @@ class OutpostState:
|
||||||
|
|
||||||
def save(self, timeout=OUTPOST_HELLO_INTERVAL):
|
def save(self, timeout=OUTPOST_HELLO_INTERVAL):
|
||||||
"""Save current state to cache"""
|
"""Save current state to cache"""
|
||||||
full_key = f"{self._outpost.state_cache_prefix}_{self.uid}"
|
full_key = f"{self._outpost.state_cache_prefix}/{self.uid}"
|
||||||
return cache.set(full_key, asdict(self), timeout=timeout)
|
return cache.set(full_key, asdict(self), timeout=timeout)
|
||||||
|
|
||||||
def delete(self):
|
def delete(self):
|
||||||
"""Manually delete from cache, used on channel disconnect"""
|
"""Manually delete from cache, used on channel disconnect"""
|
||||||
full_key = f"{self._outpost.state_cache_prefix}_{self.uid}"
|
full_key = f"{self._outpost.state_cache_prefix}/{self.uid}"
|
||||||
cache.delete(full_key)
|
cache.delete(full_key)
|
||||||
|
|
|
@ -45,7 +45,7 @@ from authentik.providers.proxy.controllers.kubernetes import ProxyKubernetesCont
|
||||||
from authentik.root.celery import CELERY_APP
|
from authentik.root.celery import CELERY_APP
|
||||||
|
|
||||||
LOGGER = get_logger()
|
LOGGER = get_logger()
|
||||||
CACHE_KEY_OUTPOST_DOWN = "outpost_teardown_%s"
|
CACHE_KEY_OUTPOST_DOWN = "goauthentik.io/outposts/teardown/%s"
|
||||||
|
|
||||||
|
|
||||||
def controller_for_outpost(outpost: Outpost) -> Optional[type[BaseController]]:
|
def controller_for_outpost(outpost: Outpost) -> Optional[type[BaseController]]:
|
||||||
|
@ -148,6 +148,8 @@ def outpost_controller(
|
||||||
except (ControllerException, ServiceConnectionInvalid) as exc:
|
except (ControllerException, ServiceConnectionInvalid) as exc:
|
||||||
self.set_status(TaskResult(TaskResultStatus.ERROR).with_error(exc))
|
self.set_status(TaskResult(TaskResultStatus.ERROR).with_error(exc))
|
||||||
else:
|
else:
|
||||||
|
if from_cache:
|
||||||
|
cache.delete(CACHE_KEY_OUTPOST_DOWN % outpost_pk)
|
||||||
self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, logs))
|
self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, logs))
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -208,7 +208,7 @@ class TestProviderProxyConnect(ChannelsLiveServerTestCase):
|
||||||
"""Test proxy connectivity over websocket"""
|
"""Test proxy connectivity over websocket"""
|
||||||
outpost_connection_discovery() # pylint: disable=no-value-for-parameter
|
outpost_connection_discovery() # pylint: disable=no-value-for-parameter
|
||||||
proxy: ProxyProvider = ProxyProvider.objects.create(
|
proxy: ProxyProvider = ProxyProvider.objects.create(
|
||||||
name="proxy_provider",
|
name=generate_id(),
|
||||||
authorization_flow=Flow.objects.get(
|
authorization_flow=Flow.objects.get(
|
||||||
slug="default-provider-authorization-implicit-consent"
|
slug="default-provider-authorization-implicit-consent"
|
||||||
),
|
),
|
||||||
|
@ -222,7 +222,7 @@ class TestProviderProxyConnect(ChannelsLiveServerTestCase):
|
||||||
Application.objects.create(name="proxy", slug="proxy", provider=proxy)
|
Application.objects.create(name="proxy", slug="proxy", provider=proxy)
|
||||||
service_connection = DockerServiceConnection.objects.get(local=True)
|
service_connection = DockerServiceConnection.objects.get(local=True)
|
||||||
outpost: Outpost = Outpost.objects.create(
|
outpost: Outpost = Outpost.objects.create(
|
||||||
name="proxy_outpost",
|
name=generate_id(),
|
||||||
type=OutpostType.PROXY,
|
type=OutpostType.PROXY,
|
||||||
service_connection=service_connection,
|
service_connection=service_connection,
|
||||||
_config=asdict(OutpostConfig(authentik_host=self.live_server_url, log_level="debug")),
|
_config=asdict(OutpostConfig(authentik_host=self.live_server_url, log_level="debug")),
|
||||||
|
@ -241,7 +241,7 @@ class TestProviderProxyConnect(ChannelsLiveServerTestCase):
|
||||||
sleep(0.5)
|
sleep(0.5)
|
||||||
|
|
||||||
state = outpost.state
|
state = outpost.state
|
||||||
self.assertEqual(len(state), 1)
|
self.assertGreaterEqual(len(state), 1)
|
||||||
|
|
||||||
# Make sure to delete the outpost to remove the container
|
# Make sure to delete the outpost to remove the container
|
||||||
outpost.delete()
|
outpost.delete()
|
||||||
|
|
Reference in a new issue