From 8ca23451c649fd9147d5da12a671a8ae7d5757f1 Mon Sep 17 00:00:00 2001 From: Jens Langhammer Date: Wed, 14 Oct 2020 10:44:17 +0200 Subject: [PATCH] outposts: rewrite state logic, use cache to expire old channels, support multiple instances --- .../administration/outpost/list.html | 55 ++++++----- passbook/core/channels.py | 1 + passbook/outposts/channels.py | 31 ++++--- .../0007_remove_outpost_channels.py | 17 ++++ passbook/outposts/models.py | 92 ++++++++++++------- passbook/outposts/settings.py | 6 +- passbook/outposts/signals.py | 6 +- passbook/outposts/tasks.py | 39 ++++---- 8 files changed, 157 insertions(+), 90 deletions(-) create mode 100644 passbook/outposts/migrations/0007_remove_outpost_channels.py diff --git a/passbook/admin/templates/administration/outpost/list.html b/passbook/admin/templates/administration/outpost/list.html index 6fb976943..9d26dd4a2 100644 --- a/passbook/admin/templates/administration/outpost/list.html +++ b/passbook/admin/templates/administration/outpost/list.html @@ -48,28 +48,41 @@ {{ outpost.providers.all.select_subclasses|join:", " }} - - {% with health=outpost.deployment_health %} - {% if health %} - {{ health|naturaltime }} - {% else %} - Unhealthy - {% endif %} - {% endwith %} - - - - {% with ver=outpost.deployment_version %} - {% if not ver.version %} + {% with states=outpost.state %} + {% if states|length > 1 %} + + {% for state in states %} +
+ {% if state.last_seen %} + {{ state.last_seen|naturaltime }} + {% else %} + {% trans 'Unhealthy' %} + {% endif %} +
+ {% endfor %} + + + {% for state in states %} +
+ {% if not state.version %} + + {% elif state.version_outdated %} + {% blocktrans with is=state.version should=state.version_should %}{{ is }}, should be {{ should }}{% endblocktrans %} + {% else %} + {{ state.version }} + {% endif %} +
+ {% endfor %} + + {% else %} + - {% elif ver.outdated %} - {% blocktrans with is=ver.version should=ver.should %}{{ is }}, should be {{ should }}{% endblocktrans %} - {% else %} - {{ ver.version }} - {% endif %} - {% endwith %} -
- + + + + + {% endif %} + {% endwith %} {% trans 'Edit' %} {% trans 'Delete' %} diff --git a/passbook/core/channels.py b/passbook/core/channels.py index 1f1d732f4..ba5eaef12 100644 --- a/passbook/core/channels.py +++ b/passbook/core/channels.py @@ -18,6 +18,7 @@ class AuthJsonConsumer(JsonWebsocketConsumer): if b"authorization" not in headers: LOGGER.warning("WS Request without authorization header") self.close() + return False token = headers[b"authorization"] try: diff --git a/passbook/outposts/channels.py b/passbook/outposts/channels.py index 9c4b83c4c..5ddaa7a8e 100644 --- a/passbook/outposts/channels.py +++ b/passbook/outposts/channels.py @@ -1,17 +1,16 @@ """Outpost websocket handler""" from dataclasses import asdict, dataclass, field +from datetime import datetime from enum import IntEnum -from time import time from typing import Any, Dict from dacite import from_dict from dacite.data import Data -from django.core.cache import cache from guardian.shortcuts import get_objects_for_user from structlog import get_logger from passbook.core.channels import AuthJsonConsumer -from passbook.outposts.models import Outpost +from passbook.outposts.models import OUTPOST_HELLO_INTERVAL, Outpost, OutpostState LOGGER = get_logger() @@ -54,24 +53,26 @@ class OutpostConsumer(AuthJsonConsumer): return self.accept() self.outpost = outpost.first() - self.outpost.channels.append(self.channel_name) - LOGGER.debug("added channel to outpost", channel_name=self.channel_name) - self.outpost.save() + OutpostState( + uid=self.channel_name, last_seen=datetime.now(), _outpost=self.outpost + ).save(timeout=OUTPOST_HELLO_INTERVAL * 2) + LOGGER.debug("added channel to cache", channel_name=self.channel_name) # pylint: disable=unused-argument def disconnect(self, close_code): - self.outpost.channels.remove(self.channel_name) - self.outpost.save() - LOGGER.debug("removed channel from outpost", channel_name=self.channel_name) + OutpostState.for_channel(self.outpost, self.channel_name).delete() + LOGGER.debug("removed channel from cache", channel_name=self.channel_name) def receive_json(self, content: Data): msg = from_dict(WebsocketMessage, content) + state = OutpostState( + uid=self.channel_name, + last_seen=datetime.now(), + _outpost=self.outpost, + ) if msg.instruction == WebsocketMessageInstruction.HELLO: - cache.set(self.outpost.state_cache_prefix("health"), time(), timeout=60) - if "version" in msg.args: - cache.set( - self.outpost.state_cache_prefix("version"), msg.args["version"] - ) + state.version = msg.args.get("version", None) + state.save(timeout=OUTPOST_HELLO_INTERVAL * 2) elif msg.instruction == WebsocketMessageInstruction.ACK: return @@ -80,7 +81,7 @@ class OutpostConsumer(AuthJsonConsumer): # pylint: disable=unused-argument def event_update(self, event): - """Event handler which is called by post_save signals""" + """Event handler which is called by post_save signals, Send update instruction""" self.send_json( asdict( WebsocketMessage(instruction=WebsocketMessageInstruction.TRIGGER_UPDATE) diff --git a/passbook/outposts/migrations/0007_remove_outpost_channels.py b/passbook/outposts/migrations/0007_remove_outpost_channels.py new file mode 100644 index 000000000..793a59be6 --- /dev/null +++ b/passbook/outposts/migrations/0007_remove_outpost_channels.py @@ -0,0 +1,17 @@ +# Generated by Django 3.1.2 on 2020-10-14 08:32 + +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ("passbook_outposts", "0006_auto_20201003_2239"), + ] + + operations = [ + migrations.RemoveField( + model_name="outpost", + name="channels", + ), + ] diff --git a/passbook/outposts/models.py b/passbook/outposts/models.py index be9f44343..49fffe6ef 100644 --- a/passbook/outposts/models.py +++ b/passbook/outposts/models.py @@ -1,20 +1,18 @@ """Outpost models""" -from dataclasses import asdict, dataclass +from dataclasses import asdict, dataclass, field from datetime import datetime -from typing import Any, Dict, Iterable, Optional +from typing import Iterable, List, Optional, Union from uuid import uuid4 from dacite import from_dict -from django.contrib.postgres.fields import ArrayField from django.core.cache import cache from django.db import models, transaction from django.db.models.base import Model from django.http import HttpRequest -from django.utils import version from django.utils.translation import gettext_lazy as _ from guardian.models import UserObjectPermission from guardian.shortcuts import assign_perm -from packaging.version import InvalidVersion, parse +from packaging.version import LegacyVersion, Version, parse from passbook import __version__ from passbook.core.models import Provider, Token, TokenIntents, User @@ -22,6 +20,7 @@ from passbook.lib.config import CONFIG from passbook.lib.utils.template import render_to_string OUR_VERSION = parse(__version__) +OUTPOST_HELLO_INTERVAL = 10 @dataclass @@ -87,8 +86,6 @@ class Outpost(models.Model): providers = models.ManyToManyField(Provider) - channels = ArrayField(models.TextField(), default=list) - @property def config(self) -> OutpostConfig: """Load config as OutpostConfig object""" @@ -99,36 +96,15 @@ class Outpost(models.Model): """Dump config into json""" self._config = asdict(value) - def state_cache_prefix(self, suffix: str) -> str: + @property + def state_cache_prefix(self) -> str: """Key by which the outposts status is saved""" - return f"outpost_{self.uuid.hex}_state_{suffix}" + return f"outpost_{self.uuid.hex}_state" @property - def deployment_health(self) -> Optional[datetime]: + def state(self) -> List["OutpostState"]: """Get outpost's health status""" - key = self.state_cache_prefix("health") - value = cache.get(key, None) - if value: - return datetime.fromtimestamp(value) - return None - - @property - def deployment_version(self) -> Dict[str, Any]: - """Get deployed outposts version, and if the version is behind ours. - Returns a dict with keys version and outdated.""" - key = self.state_cache_prefix("version") - value = cache.get(key, None) - if not value: - return {"version": None, "outdated": False, "should": OUR_VERSION} - try: - outpost_version = parse(value) - return { - "version": value, - "outdated": outpost_version < OUR_VERSION, - "should": OUR_VERSION, - } - except InvalidVersion: - return {"version": version, "outdated": False, "should": OUR_VERSION} + return OutpostState.for_outpost(self) @property def user(self) -> User: @@ -189,3 +165,53 @@ class Outpost(models.Model): def __str__(self) -> str: return f"Outpost {self.name}" + + +@dataclass +class OutpostState: + """Outpost instance state, last_seen and version""" + + uid: str + last_seen: Optional[datetime] = field(default=None) + version: Optional[str] = field(default=None) + version_should: Union[Version, LegacyVersion] = field(default=OUR_VERSION) + + _outpost: Optional[Outpost] = field(default=None) + + @property + def version_outdated(self) -> bool: + """Check if outpost version matches our version""" + if not self.version: + return False + return parse(self.version) < OUR_VERSION + + @staticmethod + def for_outpost(outpost: Outpost) -> List["OutpostState"]: + """Get all states for an outpost""" + keys = cache.keys(f"{outpost.state_cache_prefix}_*") + states = [] + for key in keys: + channel = key.replace(f"{outpost.state_cache_prefix}_", "") + states.append(OutpostState.for_channel(outpost, channel)) + return states + + @staticmethod + def for_channel(outpost: Outpost, channel: str) -> "OutpostState": + """Get state for a single channel""" + key = f"{outpost.state_cache_prefix}_{channel}" + data = cache.get(key, {"uid": channel}) + state = from_dict(OutpostState, data) + state.uid = channel + # pylint: disable=protected-access + state._outpost = outpost + return state + + def save(self, timeout=OUTPOST_HELLO_INTERVAL): + """Save current state to cache""" + full_key = f"{self._outpost.state_cache_prefix}_{self.uid}" + return cache.set(full_key, asdict(self), timeout=timeout) + + def delete(self): + """Manually delete from cache, used on channel disconnect""" + full_key = f"{self._outpost.state_cache_prefix}_{self.uid}" + cache.delete(full_key) diff --git a/passbook/outposts/settings.py b/passbook/outposts/settings.py index 62589e1c9..09e4a14fc 100644 --- a/passbook/outposts/settings.py +++ b/passbook/outposts/settings.py @@ -2,9 +2,9 @@ from celery.schedules import crontab CELERY_BEAT_SCHEDULE = { - "outposts_k8s": { + "outposts_controller": { "task": "passbook.outposts.tasks.outpost_controller", - "schedule": crontab(minute="*/5"), # Run every 5 minutes + "schedule": crontab(minute="*/5"), "options": {"queue": "passbook_scheduled"}, - } + }, } diff --git a/passbook/outposts/signals.py b/passbook/outposts/signals.py index 373f03bfd..8c16a3e4c 100644 --- a/passbook/outposts/signals.py +++ b/passbook/outposts/signals.py @@ -23,8 +23,10 @@ def ensure_user_and_token(sender, instance: Model, **_): def post_save_update(sender, instance: Model, **_): """If an OutpostModel, or a model that is somehow connected to an OutpostModel is saved, we send a message down the relevant OutpostModels WS connection to trigger an update""" - if isinstance(instance, OutpostModel): - LOGGER.debug("triggering outpost update from outpostmodel", instance=instance) + if isinstance(instance, (OutpostModel, Outpost)): + LOGGER.debug( + "triggering outpost update from outpostmodel/outpost", instance=instance + ) outpost_send_update.delay(class_to_path(instance.__class__), instance.pk) return diff --git a/passbook/outposts/tasks.py b/passbook/outposts/tasks.py index 9b88cec36..05d342e74 100644 --- a/passbook/outposts/tasks.py +++ b/passbook/outposts/tasks.py @@ -10,6 +10,7 @@ from passbook.outposts.models import ( Outpost, OutpostDeploymentType, OutpostModel, + OutpostState, OutpostType, ) from passbook.providers.proxy.controllers.docker import ProxyDockerController @@ -19,9 +20,8 @@ from passbook.root.celery import CELERY_APP LOGGER = get_logger() -@CELERY_APP.task(bind=True) -# pylint: disable=unused-argument -def outpost_controller(self): +@CELERY_APP.task() +def outpost_controller(): """Launch Controller for all Outposts which support it""" for outpost in Outpost.objects.exclude( deployment_type=OutpostDeploymentType.CUSTOM @@ -31,17 +31,14 @@ def outpost_controller(self): ) -@CELERY_APP.task(bind=True) -# pylint: disable=unused-argument -def outpost_controller_single( - self, outpost: str, deployment_type: str, outpost_type: str -): +@CELERY_APP.task() +def outpost_controller_single(outpost_pk: str, deployment_type: str, outpost_type: str): """Launch controller and reconcile deployment/service/etc""" if outpost_type == OutpostType.PROXY: if deployment_type == OutpostDeploymentType.KUBERNETES: - ProxyKubernetesController(outpost).run() + ProxyKubernetesController(outpost_pk).run() if deployment_type == OutpostDeploymentType.DOCKER: - ProxyDockerController(outpost).run() + ProxyDockerController(outpost_pk).run() @CELERY_APP.task() @@ -49,9 +46,19 @@ def outpost_send_update(model_class: str, model_pk: Any): """Send outpost update to all registered outposts, irregardless to which passbook instance they are connected""" model = path_to_class(model_class) - outpost_model: OutpostModel = model.objects.get(pk=model_pk) - for outpost in outpost_model.outpost_set.all(): - channel_layer = get_channel_layer() - for channel in outpost.channels: - LOGGER.debug("sending update", channel=channel) - async_to_sync(channel_layer.send)(channel, {"type": "event.update"}) + model_instace = model.objects.get(pk=model_pk) + channel_layer = get_channel_layer() + if isinstance(model_instace, OutpostModel): + for outpost in model_instace.outpost_set.all(): + _outpost_single_update(outpost, channel_layer) + elif isinstance(model_instace, Outpost): + _outpost_single_update(model_instace, channel_layer) + + +def _outpost_single_update(outpost: Outpost, layer=None): + """Update outpost instances connected to a single outpost""" + if not layer: # pragma: no cover + layer = get_channel_layer() + for state in OutpostState.for_outpost(outpost): + LOGGER.debug("sending update", channel=state.uid, outpost=outpost) + async_to_sync(layer.send)(state.uid, {"type": "event.update"})