2020-09-02 22:04:12 +00:00
|
|
|
"""outpost tasks"""
|
2020-09-13 12:29:40 +00:00
|
|
|
from typing import Any
|
|
|
|
|
|
|
|
from asgiref.sync import async_to_sync
|
|
|
|
from channels.layers import get_channel_layer
|
2020-10-14 16:41:16 +00:00
|
|
|
from django.db.models.base import Model
|
2020-10-16 19:31:12 +00:00
|
|
|
from django.utils.text import slugify
|
2020-09-13 12:29:40 +00:00
|
|
|
from structlog import get_logger
|
|
|
|
|
2020-10-16 09:38:49 +00:00
|
|
|
from passbook.lib.tasks import MonitoredTask, TaskResult, TaskResultStatus
|
2020-09-13 12:29:40 +00:00
|
|
|
from passbook.lib.utils.reflection import path_to_class
|
2020-10-16 09:38:49 +00:00
|
|
|
from passbook.outposts.controllers.base import ControllerException
|
2020-09-13 12:29:40 +00:00
|
|
|
from passbook.outposts.models import (
|
|
|
|
Outpost,
|
|
|
|
OutpostDeploymentType,
|
|
|
|
OutpostModel,
|
2020-10-14 08:44:17 +00:00
|
|
|
OutpostState,
|
2020-09-13 12:29:40 +00:00
|
|
|
OutpostType,
|
|
|
|
)
|
2020-10-03 22:36:12 +00:00
|
|
|
from passbook.providers.proxy.controllers.docker import ProxyDockerController
|
2020-09-02 22:04:12 +00:00
|
|
|
from passbook.providers.proxy.controllers.kubernetes import ProxyKubernetesController
|
|
|
|
from passbook.root.celery import CELERY_APP
|
|
|
|
|
2020-09-13 12:29:40 +00:00
|
|
|
# Module-level structlog logger used by the tasks and helpers in this file.
LOGGER = get_logger()
|
|
|
|
|
2020-09-02 22:04:12 +00:00
|
|
|
|
2020-10-14 08:44:17 +00:00
|
|
|
@CELERY_APP.task()
def outpost_controller_all():
    """Queue an `outpost_controller` run for every Outpost that is not CUSTOM-deployed"""
    # Outposts with a CUSTOM deployment type are excluded; every other outpost
    # gets its own task, addressed by the hex form of its primary key.
    managed_outposts = Outpost.objects.exclude(
        deployment_type=OutpostDeploymentType.CUSTOM
    )
    for managed in managed_outposts:
        outpost_controller.delay(managed.pk.hex)
|
2020-09-02 22:04:12 +00:00
|
|
|
|
|
|
|
|
2020-10-16 09:38:49 +00:00
|
|
|
@CELERY_APP.task(bind=True, base=MonitoredTask)
def outpost_controller(self: MonitoredTask, outpost_pk: str):
    """Run the deployment controller for one Outpost and record the outcome.

    The outpost is looked up by `outpost_pk` and the task UID is set to the
    slugified outpost name. For PROXY outposts the controller matching the
    deployment type has `up_with_logs()` called on it. A ControllerException
    is stored as an ERROR result; otherwise a SUCCESSFUL result carrying the
    collected logs (empty when no controller matched) is stored."""
    # Deployment type -> controller class, checked in this order.
    proxy_controllers = (
        (OutpostDeploymentType.KUBERNETES, ProxyKubernetesController),
        (OutpostDeploymentType.DOCKER, ProxyDockerController),
    )
    logs = []
    outpost: Outpost = Outpost.objects.get(pk=outpost_pk)
    task_uid = slugify(outpost.name)
    self.set_uid(task_uid)
    try:
        if outpost.type == OutpostType.PROXY:
            # Every entry is compared, there is deliberately no short-circuit
            for deployment_type, controller_class in proxy_controllers:
                if outpost.deployment_type == deployment_type:
                    logs = controller_class(outpost).up_with_logs()
    except ControllerException as exc:
        failure = TaskResult(TaskResultStatus.ERROR).with_error(exc)
        self.set_status(failure)
    else:
        success = TaskResult(TaskResultStatus.SUCCESSFUL, logs)
        self.set_status(success)
|
2020-09-13 12:29:40 +00:00
|
|
|
|
|
|
|
|
2020-10-16 20:26:47 +00:00
|
|
|
@CELERY_APP.task()
def outpost_pre_delete(outpost_pk: str):
    """Call `down()` on the matching proxy controller before the Outpost row is deleted"""
    doomed = Outpost.objects.get(pk=outpost_pk)
    # Only PROXY outposts have controllers handled here
    if doomed.type != OutpostType.PROXY:
        return
    # Deployment type -> controller class, checked in this order.
    proxy_controllers = (
        (OutpostDeploymentType.KUBERNETES, ProxyKubernetesController),
        (OutpostDeploymentType.DOCKER, ProxyDockerController),
    )
    for deployment_type, controller_class in proxy_controllers:
        if doomed.deployment_type == deployment_type:
            controller_class(doomed).down()
|
|
|
|
|
|
|
|
|
2020-09-13 12:29:40 +00:00
|
|
|
@CELERY_APP.task()
def outpost_post_save(model_class: str, model_pk: Any):
    """React to a saved model that may affect one or more Outposts.

    The instance is re-loaded using `model_class` (a class path resolved with
    `path_to_class`) and `model_pk`. If it no longer exists, a warning is
    logged and nothing else happens. Otherwise:

    - Outpost: its token is ensured and a reconcile via `outpost_controller`
      is queued.
    - OutpostModel: an update is sent to the outposts attached to it.
    - Anything else: for every field whose related model is an OutpostModel
      subclass, an update is sent for each object in the instance's
      `<field name>_set` manager, when the instance has such an attribute.
    """
    model: Model = path_to_class(model_class)
    try:
        instance = model.objects.get(pk=model_pk)
    except model.DoesNotExist:
        LOGGER.warning("Model does not exist", model=model, pk=model_pk)
        return

    if isinstance(instance, Outpost):
        LOGGER.debug("Ensuring token for outpost", instance=instance)
        # Accessing the property is what ensures the token; the value is unused
        _ = instance.token
        LOGGER.debug("Trigger reconcile for outpost")
        # Pass the hex string, matching `outpost_controller_all` and the
        # `outpost_pk: str` signature of `outpost_controller`.
        outpost_controller.delay(instance.pk.hex)
        return

    # Outpost instances have already returned above, so only OutpostModel
    # needs to be checked here.
    if isinstance(instance, OutpostModel):
        LOGGER.debug(
            "triggering outpost update from outpostmodel/outpost", instance=instance
        )
        outpost_send_update(instance)
        return

    for field in instance._meta.get_fields():
        # Only fields that point at another model (ForeignKeys or M2Ms) carry a
        # non-empty `related_model`; everything else is skipped.
        related_model = getattr(field, "related_model", None)
        if not related_model:
            continue
        if not issubclass(related_model, OutpostModel):
            continue

        field_name = f"{field.name}_set"
        if not hasattr(instance, field_name):
            continue

        LOGGER.debug("triggering outpost update from from field", field=field.name)
        # Because the Outpost Model has an M2M to Provider,
        # we have to iterate over the entire QS
        for reverse in getattr(instance, field_name).all():
            outpost_send_update(reverse)
|
|
|
|
|
|
|
|
|
|
|
|
def outpost_send_update(model_instace: Model):
    """Send an update to the outposts affected by `model_instace`.

    An OutpostModel fans out to every outpost in its `outpost_set`; an Outpost
    is updated directly; any other object results in no update being sent."""
    shared_layer = get_channel_layer()
    if isinstance(model_instace, OutpostModel):
        targets = model_instace.outpost_set.all()
    elif isinstance(model_instace, Outpost):
        targets = [model_instace]
    else:
        targets = []
    # The same channel layer is reused for every target
    for target in targets:
        _outpost_single_update(target, shared_layer)
|
|
|
|
|
|
|
|
|
|
|
|
def _outpost_single_update(outpost: Outpost, layer=None):
    """Send an `event.update` message to every state channel of a single outpost"""
    # The token is ensured again here: this helper runs whenever anything related
    # to an OutpostModel is saved, so we can be sure permissions are right
    _ = outpost.token
    # Fall back to the default channel layer when the caller did not supply one
    layer = layer or get_channel_layer()
    for connection in OutpostState.for_outpost(outpost):
        channel_name = connection.uid
        LOGGER.debug("sending update", channel=channel_name, outpost=outpost)
        send_blocking = async_to_sync(layer.send)
        send_blocking(channel_name, {"type": "event.update"})
|