This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
authentik/authentik/outposts/consumer.py
Jens L 240cf6dd94
enterprise/providers: Add RAC [AUTH-15] (#7291)
* add basic guacamole

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* make everything mostly work

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* add rac build to CI

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix resize, fix web lint, sendSize correctly

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* pre-send connection from client, format

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* improve throughput

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* cleanup

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* rework TokenOutpostConsumer into middleware

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix some layout issues

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* add outpost controllers

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* start testing audio things

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix a bunch of things

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* add deps

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix to work with outpost group

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* add simple loadbalancing

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* add simple reconnect

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* show reconnecting text

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix error when checking ports

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* move to providers

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* add flow check to interface

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix go lint

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix rac app label

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix audio

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* add logging

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* cleanup

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* allow overriding all settings

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix duplicate keyboard, debug high DPI

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* re-add deps

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix lint

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix missing __init__.py breaking model loading

I love python

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix tests

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* bump successful ws connection to info

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* hide cursor since guac draws that

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* add clipboard support (bidirectional)

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* make codespell not want to break the code

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* run pr comment in separate task

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* start endpoint and property mapping stuff

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* more endpoint things

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* unrelated: fix event model_pk filtering with ints

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* unrelated: improve event display for changelog

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* rebuild endpoint stuff again

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* idk special url

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* more stuff, connect token with session

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* add disconnect

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* rework disconnect

cleanly disconnect from guacd instead of just letting the connection timeout

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* clear cache when creating outpost

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* support host:port and fix protocol

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* center smaller viewport

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* rework connection to wait more and stop after some time

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* add policy control to endpoints

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* remove provider protocol

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* don't switch to different outpost connection when already chosen

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* start using property mappings, add static settings

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* add some RAC mapping settings

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix lint

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* start adding tests

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* add tests for event changes

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* add tests and fix issues found by said tests

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* add preview banner, move endpoints to main page

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* add locale

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* auto-select endpoint if only one is available

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* backport https://github.com/goauthentik/authentik/pull/7831 to rac

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* dont select property mappings on endpoints

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* make table modal only load when opened

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* only auto-redirect when open

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix web deps

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* check for token expiry and terminate session

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* re-add endpoint name to title

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* disconnect connection when token is manually deleted

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* add initial RAC docs

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* add connection expiry setting to provider

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix flaky tests

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

---------

Signed-off-by: Jens Langhammer <jens@goauthentik.io>
2023-12-30 21:33:14 +01:00

149 lines
5.4 KiB
Python

"""Outpost websocket handler"""
from dataclasses import asdict, dataclass, field
from datetime import datetime
from enum import IntEnum
from typing import Any, Optional
from asgiref.sync import async_to_sync
from channels.exceptions import DenyConnection
from channels.generic.websocket import JsonWebsocketConsumer
from dacite.core import from_dict
from dacite.data import Data
from django.http.request import QueryDict
from guardian.shortcuts import get_objects_for_user
from structlog.stdlib import BoundLogger, get_logger
from authentik.outposts.apps import GAUGE_OUTPOSTS_CONNECTED, GAUGE_OUTPOSTS_LAST_UPDATE
from authentik.outposts.models import OUTPOST_HELLO_INTERVAL, Outpost, OutpostState
OUTPOST_GROUP = "group_outpost_%(outpost_pk)s"
OUTPOST_GROUP_INSTANCE = "group_outpost_%(outpost_pk)s_%(instance)s"
class WebsocketMessageInstruction(IntEnum):
"""Commands which can be triggered over Websocket"""
# Simple message used by either side when a message is acknowledged
ACK = 0
# Message used by outposts to report their alive status
HELLO = 1
# Message sent by us to trigger an Update
TRIGGER_UPDATE = 2
# Provider specific message
PROVIDER_SPECIFIC = 3
@dataclass(slots=True)
class WebsocketMessage:
"""Complete Websocket Message that is being sent"""
instruction: int
args: dict[str, Any] = field(default_factory=dict)
class OutpostConsumer(JsonWebsocketConsumer):
"""Handler for Outposts that connect over websockets for health checks and live updates"""
outpost: Optional[Outpost] = None
logger: BoundLogger
instance_uid: Optional[str] = None
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.logger = get_logger()
def connect(self):
uuid = self.scope["url_route"]["kwargs"]["pk"]
user = self.scope["user"]
outpost = (
get_objects_for_user(user, "authentik_outposts.view_outpost").filter(pk=uuid).first()
)
if not outpost:
raise DenyConnection()
self.logger = self.logger.bind(outpost=outpost)
try:
self.accept()
except RuntimeError as exc:
self.logger.warning("runtime error during accept", exc=exc)
raise DenyConnection()
self.outpost = outpost
query = QueryDict(self.scope["query_string"].decode())
self.instance_uid = query.get("instance_uuid", self.channel_name)
async_to_sync(self.channel_layer.group_add)(
OUTPOST_GROUP % {"outpost_pk": str(self.outpost.pk)}, self.channel_name
)
async_to_sync(self.channel_layer.group_add)(
OUTPOST_GROUP_INSTANCE
% {"outpost_pk": str(self.outpost.pk), "instance": self.instance_uid},
self.channel_name,
)
GAUGE_OUTPOSTS_CONNECTED.labels(
outpost=self.outpost.name,
uid=self.instance_uid,
expected=self.outpost.config.kubernetes_replicas,
).inc()
def disconnect(self, code):
if self.outpost:
async_to_sync(self.channel_layer.group_discard)(
OUTPOST_GROUP % {"outpost_pk": str(self.outpost.pk)}, self.channel_name
)
if self.instance_uid:
async_to_sync(self.channel_layer.group_discard)(
OUTPOST_GROUP_INSTANCE
% {"outpost_pk": str(self.outpost.pk), "instance": self.instance_uid},
self.channel_name,
)
if self.outpost and self.instance_uid:
GAUGE_OUTPOSTS_CONNECTED.labels(
outpost=self.outpost.name,
uid=self.instance_uid,
expected=self.outpost.config.kubernetes_replicas,
).dec()
def receive_json(self, content: Data, **kwargs):
msg = from_dict(WebsocketMessage, content)
if not self.outpost:
raise DenyConnection()
state = OutpostState.for_instance_uid(self.outpost, self.instance_uid)
state.last_seen = datetime.now()
state.hostname = msg.args.pop("hostname", "")
if msg.instruction == WebsocketMessageInstruction.HELLO:
state.version = msg.args.pop("version", None)
state.build_hash = msg.args.pop("buildHash", "")
state.args.update(msg.args)
elif msg.instruction == WebsocketMessageInstruction.ACK:
return
GAUGE_OUTPOSTS_LAST_UPDATE.labels(
outpost=self.outpost.name,
uid=self.instance_uid or "",
version=state.version or "",
).set_to_current_time()
state.save(timeout=OUTPOST_HELLO_INTERVAL * 1.5)
response = WebsocketMessage(instruction=WebsocketMessageInstruction.ACK)
self.send_json(asdict(response))
def event_update(self, event): # pragma: no cover
"""Event handler which is called by post_save signals, Send update instruction"""
self.send_json(
asdict(WebsocketMessage(instruction=WebsocketMessageInstruction.TRIGGER_UPDATE))
)
def event_provider_specific(self, event):
"""Event handler which can be called by provider-specific
implementations to send specific messages to the outpost"""
self.send_json(
asdict(
WebsocketMessage(
instruction=WebsocketMessageInstruction.PROVIDER_SPECIFIC, args=event
)
)
)