From 7a4e8af1aec7af337804e420116c82ece0785fc8 Mon Sep 17 00:00:00 2001 From: Jens Langhammer Date: Sun, 13 Sep 2020 14:29:40 +0200 Subject: [PATCH] outpost: fix outpost update signal only being sent to outposts connected to the same passbook instance --- passbook/outposts/models.py | 7 ++++++- passbook/outposts/signals.py | 21 ++++++--------------- passbook/outposts/tasks.py | 29 ++++++++++++++++++++++++++++- 3 files changed, 40 insertions(+), 17 deletions(-) diff --git a/passbook/outposts/models.py b/passbook/outposts/models.py index 432bc4ea7..e52081d18 100644 --- a/passbook/outposts/models.py +++ b/passbook/outposts/models.py @@ -9,6 +9,7 @@ from dacite import from_dict from django.contrib.postgres.fields import ArrayField from django.core.cache import cache from django.db import models +from django.db.models.base import Model from django.utils.translation import gettext_lazy as _ from guardian.shortcuts import assign_perm @@ -30,13 +31,17 @@ class OutpostConfig: ) -class OutpostModel: +class OutpostModel(Model): """Base model for providers that need more objects than just themselves""" def get_required_objects(self) -> Iterable[models.Model]: """Return a list of all required objects""" return [self] + class Meta: + + abstract = True + class OutpostType(models.TextChoices): """Outpost types, currently only the reverse proxy is available""" diff --git a/passbook/outposts/signals.py b/passbook/outposts/signals.py index 880aa4b53..373f03bfd 100644 --- a/passbook/outposts/signals.py +++ b/passbook/outposts/signals.py @@ -1,31 +1,31 @@ """passbook outpost signals""" -from asgiref.sync import async_to_sync -from channels.layers import get_channel_layer from django.db.models import Model from django.db.models.signals import post_save from django.dispatch import receiver from structlog import get_logger +from passbook.lib.utils.reflection import class_to_path from passbook.outposts.models import Outpost, OutpostModel +from passbook.outposts.tasks import outpost_send_update LOGGER = get_logger() @receiver(post_save, sender=Outpost) # pylint: disable=unused-argument -def ensure_user_and_token(sender, instance, **_): +def ensure_user_and_token(sender, instance: Model, **_): """Ensure that token is created/updated on save""" _ = instance.token @receiver(post_save) # pylint: disable=unused-argument -def post_save_update(sender, instance, **_): +def post_save_update(sender, instance: Model, **_): """If an OutpostModel, or a model that is somehow connected to an OutpostModel is saved, we send a message down the relevant OutpostModels WS connection to trigger an update""" if isinstance(instance, OutpostModel): LOGGER.debug("triggering outpost update from outpostmodel", instance=instance) - _send_update(instance) + outpost_send_update.delay(class_to_path(instance.__class__), instance.pk) return for field in instance._meta.get_fields(): @@ -46,13 +46,4 @@ def post_save_update(sender, instance, **_): # Because the Outpost Model has an M2M to Provider, # we have to iterate over the entire QS for reverse in getattr(instance, field_name).all(): - _send_update(reverse) - - -def _send_update(outpost_model: Model): - """Send update trigger for each channel of an outpost model""" - for outpost in outpost_model.outpost_set.all(): - channel_layer = get_channel_layer() - for channel in outpost.channels: - LOGGER.debug("sending update", channel=channel) - async_to_sync(channel_layer.send)(channel, {"type": "event.update"}) + outpost_send_update(class_to_path(reverse.__class__), reverse.pk) diff --git a/passbook/outposts/tasks.py b/passbook/outposts/tasks.py index 8b4d33e47..e7d7fe717 100644 --- a/passbook/outposts/tasks.py +++ b/passbook/outposts/tasks.py @@ -1,8 +1,22 @@ """outpost tasks""" -from passbook.outposts.models import Outpost, OutpostDeploymentType, OutpostType +from typing import Any + +from asgiref.sync import async_to_sync +from channels.layers import get_channel_layer +from structlog import get_logger + +from passbook.lib.utils.reflection import path_to_class +from passbook.outposts.models import ( + Outpost, + OutpostDeploymentType, + OutpostModel, + OutpostType, +) from passbook.providers.proxy.controllers.kubernetes import ProxyKubernetesController from passbook.root.celery import CELERY_APP +LOGGER = get_logger() + @CELERY_APP.task(bind=True) # pylint: disable=unused-argument @@ -20,3 +34,16 @@ def outpost_k8s_controller_single(self, outpost: str, outpost_type: str): """Launch Kubernetes manager and reconcile deployment/service/etc""" if outpost_type == OutpostType.PROXY: ProxyKubernetesController(outpost).run() + + +@CELERY_APP.task() +def outpost_send_update(model_class: str, model_pk: Any): + """Send outpost update to all registered outposts, irregardless to which passbook + instance they are connected""" + model = path_to_class(model_class) + outpost_model: OutpostModel = model.objects.get(model_pk) + for outpost in outpost_model.outpost_set.all(): + channel_layer = get_channel_layer() + for channel in outpost.channels: + LOGGER.debug("sending update", channel=channel) + async_to_sync(channel_layer.send)(channel, {"type": "event.update"})