diff --git a/passbook/admin/templates/administration/outpost/list.html b/passbook/admin/templates/administration/outpost/list.html
index 6fb976943..9d26dd4a2 100644
--- a/passbook/admin/templates/administration/outpost/list.html
+++ b/passbook/admin/templates/administration/outpost/list.html
@@ -48,28 +48,41 @@
{{ outpost.providers.all.select_subclasses|join:", " }}
-
- {% with health=outpost.deployment_health %}
- {% if health %}
- {{ health|naturaltime }}
- {% else %}
- Unhealthy
- {% endif %}
- {% endwith %}
- |
-
-
- {% with ver=outpost.deployment_version %}
- {% if not ver.version %}
+ {% with states=outpost.state %}
+ {% if states|length > 1 %}
+ |
+ {% for state in states %}
+
+ {% if state.last_seen %}
+ {{ state.last_seen|naturaltime }}
+ {% else %}
+ {% trans 'Unhealthy' %}
+ {% endif %}
+
+ {% endfor %}
+ |
+
+ {% for state in states %}
+
+ {% if not state.version %}
+
+ {% elif state.version_outdated %}
+ {% blocktrans with is=state.version should=state.version_should %}{{ is }}, should be {{ should }}{% endblocktrans %}
+ {% else %}
+ {{ state.version }}
+ {% endif %}
+
+ {% endfor %}
+ |
+ {% else %}
+
- {% elif ver.outdated %}
- {% blocktrans with is=ver.version should=ver.should %}{{ is }}, should be {{ should }}{% endblocktrans %}
- {% else %}
- {{ ver.version }}
- {% endif %}
- {% endwith %}
-
- |
+
+
+
+ |
+ {% endif %}
+ {% endwith %}
{% trans 'Edit' %}
{% trans 'Delete' %}
diff --git a/passbook/core/channels.py b/passbook/core/channels.py
index 1f1d732f4..ba5eaef12 100644
--- a/passbook/core/channels.py
+++ b/passbook/core/channels.py
@@ -18,6 +18,7 @@ class AuthJsonConsumer(JsonWebsocketConsumer):
if b"authorization" not in headers:
LOGGER.warning("WS Request without authorization header")
self.close()
+ return False
token = headers[b"authorization"]
try:
diff --git a/passbook/outposts/channels.py b/passbook/outposts/channels.py
index 9c4b83c4c..5ddaa7a8e 100644
--- a/passbook/outposts/channels.py
+++ b/passbook/outposts/channels.py
@@ -1,17 +1,16 @@
"""Outpost websocket handler"""
from dataclasses import asdict, dataclass, field
+from datetime import datetime
from enum import IntEnum
-from time import time
from typing import Any, Dict
from dacite import from_dict
from dacite.data import Data
-from django.core.cache import cache
from guardian.shortcuts import get_objects_for_user
from structlog import get_logger
from passbook.core.channels import AuthJsonConsumer
-from passbook.outposts.models import Outpost
+from passbook.outposts.models import OUTPOST_HELLO_INTERVAL, Outpost, OutpostState
LOGGER = get_logger()
@@ -54,24 +53,26 @@ class OutpostConsumer(AuthJsonConsumer):
return
self.accept()
self.outpost = outpost.first()
- self.outpost.channels.append(self.channel_name)
- LOGGER.debug("added channel to outpost", channel_name=self.channel_name)
- self.outpost.save()
+ OutpostState(
+ uid=self.channel_name, last_seen=datetime.now(), _outpost=self.outpost
+ ).save(timeout=OUTPOST_HELLO_INTERVAL * 2)
+ LOGGER.debug("added channel to cache", channel_name=self.channel_name)
# pylint: disable=unused-argument
def disconnect(self, close_code):
- self.outpost.channels.remove(self.channel_name)
- self.outpost.save()
- LOGGER.debug("removed channel from outpost", channel_name=self.channel_name)
+ OutpostState.for_channel(self.outpost, self.channel_name).delete()
+ LOGGER.debug("removed channel from cache", channel_name=self.channel_name)
def receive_json(self, content: Data):
msg = from_dict(WebsocketMessage, content)
+ state = OutpostState(
+ uid=self.channel_name,
+ last_seen=datetime.now(),
+ _outpost=self.outpost,
+ )
if msg.instruction == WebsocketMessageInstruction.HELLO:
- cache.set(self.outpost.state_cache_prefix("health"), time(), timeout=60)
- if "version" in msg.args:
- cache.set(
- self.outpost.state_cache_prefix("version"), msg.args["version"]
- )
+ state.version = msg.args.get("version", None)
+ state.save(timeout=OUTPOST_HELLO_INTERVAL * 2)
elif msg.instruction == WebsocketMessageInstruction.ACK:
return
@@ -80,7 +81,7 @@ class OutpostConsumer(AuthJsonConsumer):
# pylint: disable=unused-argument
def event_update(self, event):
- """Event handler which is called by post_save signals"""
+ """Event handler which is called by post_save signals, Send update instruction"""
self.send_json(
asdict(
WebsocketMessage(instruction=WebsocketMessageInstruction.TRIGGER_UPDATE)
diff --git a/passbook/outposts/migrations/0007_remove_outpost_channels.py b/passbook/outposts/migrations/0007_remove_outpost_channels.py
new file mode 100644
index 000000000..793a59be6
--- /dev/null
+++ b/passbook/outposts/migrations/0007_remove_outpost_channels.py
@@ -0,0 +1,17 @@
+# Generated by Django 3.1.2 on 2020-10-14 08:32
+
+from django.db import migrations
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ("passbook_outposts", "0006_auto_20201003_2239"),
+ ]
+
+ operations = [
+ migrations.RemoveField(
+ model_name="outpost",
+ name="channels",
+ ),
+ ]
diff --git a/passbook/outposts/models.py b/passbook/outposts/models.py
index be9f44343..49fffe6ef 100644
--- a/passbook/outposts/models.py
+++ b/passbook/outposts/models.py
@@ -1,20 +1,18 @@
"""Outpost models"""
-from dataclasses import asdict, dataclass
+from dataclasses import asdict, dataclass, field
from datetime import datetime
-from typing import Any, Dict, Iterable, Optional
+from typing import Iterable, List, Optional, Union
from uuid import uuid4
from dacite import from_dict
-from django.contrib.postgres.fields import ArrayField
from django.core.cache import cache
from django.db import models, transaction
from django.db.models.base import Model
from django.http import HttpRequest
-from django.utils import version
from django.utils.translation import gettext_lazy as _
from guardian.models import UserObjectPermission
from guardian.shortcuts import assign_perm
-from packaging.version import InvalidVersion, parse
+from packaging.version import LegacyVersion, Version, parse
from passbook import __version__
from passbook.core.models import Provider, Token, TokenIntents, User
@@ -22,6 +20,7 @@ from passbook.lib.config import CONFIG
from passbook.lib.utils.template import render_to_string
OUR_VERSION = parse(__version__)
+OUTPOST_HELLO_INTERVAL = 10
@dataclass
@@ -87,8 +86,6 @@ class Outpost(models.Model):
providers = models.ManyToManyField(Provider)
- channels = ArrayField(models.TextField(), default=list)
-
@property
def config(self) -> OutpostConfig:
"""Load config as OutpostConfig object"""
@@ -99,36 +96,15 @@ class Outpost(models.Model):
"""Dump config into json"""
self._config = asdict(value)
- def state_cache_prefix(self, suffix: str) -> str:
+ @property
+ def state_cache_prefix(self) -> str:
"""Key by which the outposts status is saved"""
- return f"outpost_{self.uuid.hex}_state_{suffix}"
+ return f"outpost_{self.uuid.hex}_state"
@property
- def deployment_health(self) -> Optional[datetime]:
+ def state(self) -> List["OutpostState"]:
"""Get outpost's health status"""
- key = self.state_cache_prefix("health")
- value = cache.get(key, None)
- if value:
- return datetime.fromtimestamp(value)
- return None
-
- @property
- def deployment_version(self) -> Dict[str, Any]:
- """Get deployed outposts version, and if the version is behind ours.
- Returns a dict with keys version and outdated."""
- key = self.state_cache_prefix("version")
- value = cache.get(key, None)
- if not value:
- return {"version": None, "outdated": False, "should": OUR_VERSION}
- try:
- outpost_version = parse(value)
- return {
- "version": value,
- "outdated": outpost_version < OUR_VERSION,
- "should": OUR_VERSION,
- }
- except InvalidVersion:
- return {"version": version, "outdated": False, "should": OUR_VERSION}
+ return OutpostState.for_outpost(self)
@property
def user(self) -> User:
@@ -189,3 +165,53 @@ class Outpost(models.Model):
def __str__(self) -> str:
return f"Outpost {self.name}"
+
+
+@dataclass
+class OutpostState:
+ """Outpost instance state, last_seen and version"""
+
+ uid: str
+ last_seen: Optional[datetime] = field(default=None)
+ version: Optional[str] = field(default=None)
+ version_should: Union[Version, LegacyVersion] = field(default=OUR_VERSION)
+
+ _outpost: Optional[Outpost] = field(default=None)
+
+ @property
+ def version_outdated(self) -> bool:
+ """Check if outpost version matches our version"""
+ if not self.version:
+ return False
+ return parse(self.version) < OUR_VERSION
+
+ @staticmethod
+ def for_outpost(outpost: Outpost) -> List["OutpostState"]:
+ """Get all states for an outpost"""
+ keys = cache.keys(f"{outpost.state_cache_prefix}_*")
+ states = []
+ for key in keys:
+ channel = key.replace(f"{outpost.state_cache_prefix}_", "")
+ states.append(OutpostState.for_channel(outpost, channel))
+ return states
+
+ @staticmethod
+ def for_channel(outpost: Outpost, channel: str) -> "OutpostState":
+ """Get state for a single channel"""
+ key = f"{outpost.state_cache_prefix}_{channel}"
+ data = cache.get(key, {"uid": channel})
+ state = from_dict(OutpostState, data)
+ state.uid = channel
+ # pylint: disable=protected-access
+ state._outpost = outpost
+ return state
+
+ def save(self, timeout=OUTPOST_HELLO_INTERVAL):
+ """Save current state to cache"""
+ full_key = f"{self._outpost.state_cache_prefix}_{self.uid}"
+ return cache.set(full_key, asdict(self), timeout=timeout)
+
+ def delete(self):
+ """Manually delete from cache, used on channel disconnect"""
+ full_key = f"{self._outpost.state_cache_prefix}_{self.uid}"
+ cache.delete(full_key)
diff --git a/passbook/outposts/settings.py b/passbook/outposts/settings.py
index 62589e1c9..09e4a14fc 100644
--- a/passbook/outposts/settings.py
+++ b/passbook/outposts/settings.py
@@ -2,9 +2,9 @@
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
- "outposts_k8s": {
+ "outposts_controller": {
"task": "passbook.outposts.tasks.outpost_controller",
- "schedule": crontab(minute="*/5"), # Run every 5 minutes
+ "schedule": crontab(minute="*/5"),
"options": {"queue": "passbook_scheduled"},
- }
+ },
}
diff --git a/passbook/outposts/signals.py b/passbook/outposts/signals.py
index 373f03bfd..8c16a3e4c 100644
--- a/passbook/outposts/signals.py
+++ b/passbook/outposts/signals.py
@@ -23,8 +23,10 @@ def ensure_user_and_token(sender, instance: Model, **_):
def post_save_update(sender, instance: Model, **_):
"""If an OutpostModel, or a model that is somehow connected to an OutpostModel is saved,
we send a message down the relevant OutpostModels WS connection to trigger an update"""
- if isinstance(instance, OutpostModel):
- LOGGER.debug("triggering outpost update from outpostmodel", instance=instance)
+ if isinstance(instance, (OutpostModel, Outpost)):
+ LOGGER.debug(
+ "triggering outpost update from outpostmodel/outpost", instance=instance
+ )
outpost_send_update.delay(class_to_path(instance.__class__), instance.pk)
return
diff --git a/passbook/outposts/tasks.py b/passbook/outposts/tasks.py
index 9b88cec36..05d342e74 100644
--- a/passbook/outposts/tasks.py
+++ b/passbook/outposts/tasks.py
@@ -10,6 +10,7 @@ from passbook.outposts.models import (
Outpost,
OutpostDeploymentType,
OutpostModel,
+ OutpostState,
OutpostType,
)
from passbook.providers.proxy.controllers.docker import ProxyDockerController
@@ -19,9 +20,8 @@ from passbook.root.celery import CELERY_APP
LOGGER = get_logger()
-@CELERY_APP.task(bind=True)
-# pylint: disable=unused-argument
-def outpost_controller(self):
+@CELERY_APP.task()
+def outpost_controller():
"""Launch Controller for all Outposts which support it"""
for outpost in Outpost.objects.exclude(
deployment_type=OutpostDeploymentType.CUSTOM
@@ -31,17 +31,14 @@ def outpost_controller(self):
)
-@CELERY_APP.task(bind=True)
-# pylint: disable=unused-argument
-def outpost_controller_single(
- self, outpost: str, deployment_type: str, outpost_type: str
-):
+@CELERY_APP.task()
+def outpost_controller_single(outpost_pk: str, deployment_type: str, outpost_type: str):
"""Launch controller and reconcile deployment/service/etc"""
if outpost_type == OutpostType.PROXY:
if deployment_type == OutpostDeploymentType.KUBERNETES:
- ProxyKubernetesController(outpost).run()
+ ProxyKubernetesController(outpost_pk).run()
if deployment_type == OutpostDeploymentType.DOCKER:
- ProxyDockerController(outpost).run()
+ ProxyDockerController(outpost_pk).run()
@CELERY_APP.task()
@@ -49,9 +46,19 @@ def outpost_send_update(model_class: str, model_pk: Any):
"""Send outpost update to all registered outposts, irregardless to which passbook
instance they are connected"""
model = path_to_class(model_class)
- outpost_model: OutpostModel = model.objects.get(pk=model_pk)
- for outpost in outpost_model.outpost_set.all():
- channel_layer = get_channel_layer()
- for channel in outpost.channels:
- LOGGER.debug("sending update", channel=channel)
- async_to_sync(channel_layer.send)(channel, {"type": "event.update"})
+ model_instace = model.objects.get(pk=model_pk)
+ channel_layer = get_channel_layer()
+ if isinstance(model_instace, OutpostModel):
+ for outpost in model_instace.outpost_set.all():
+ _outpost_single_update(outpost, channel_layer)
+ elif isinstance(model_instace, Outpost):
+ _outpost_single_update(model_instace, channel_layer)
+
+
+def _outpost_single_update(outpost: Outpost, layer=None):
+ """Update outpost instances connected to a single outpost"""
+ if not layer: # pragma: no cover
+ layer = get_channel_layer()
+ for state in OutpostState.for_outpost(outpost):
+ LOGGER.debug("sending update", channel=state.uid, outpost=outpost)
+ async_to_sync(layer.send)(state.uid, {"type": "event.update"})
|