This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
authentik/authentik/outposts/controllers/base.py
Jens Langhammer 3e666de91d outposts: fix formatting of image name
Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
2021-04-30 16:52:28 +02:00

65 lines
1.9 KiB
Python

"""Base Controller"""
from dataclasses import dataclass
from structlog.stdlib import get_logger
from structlog.testing import capture_logs
from authentik import __version__
from authentik.lib.config import CONFIG
from authentik.lib.sentry import SentryIgnoredException
from authentik.outposts.models import Outpost, OutpostServiceConnection
# Identifier string shared by outpost controllers.
# NOTE(review): not referenced in this part of the file; presumably the
# "field manager" name reported when applying managed resources -- confirm
# against the concrete controllers that import it.
FIELD_MANAGER = "goauthentik.io"
class ControllerException(SentryIgnoredException):
    """Exception raised when anything fails during controller run.

    Derives from SentryIgnoredException and adds no behaviour of its own;
    it exists so controller failures can be raised and caught as one type.
    """
@dataclass
class DeploymentPort:
    """Info about deployment's single port.

    Plain dataclass; fields are positional in the order port, name, protocol.
    """

    # Port number.
    port: int
    # Name given to the port.
    name: str
    # Protocol of the port, as a string (accepted values are not defined here).
    protocol: str
class BaseController:
    """Base Outpost deployment controller.

    Stores the outpost and the service connection it is deployed through.
    ``up``, ``down`` and ``get_static_deployment`` raise NotImplementedError
    here and are meant to be overridden by concrete controllers.
    """

    # Ports of the deployment; initialised to an empty list in __init__.
    deployment_ports: list[DeploymentPort]
    # Outpost this controller manages.
    outpost: Outpost
    # Service connection used for the outpost.
    connection: OutpostServiceConnection

    def __init__(self, outpost: Outpost, connection: OutpostServiceConnection):
        """Store the outpost and connection and set up a logger.

        Args:
            outpost: Outpost to manage.
            connection: Service connection used for the outpost.
        """
        self.outpost = outpost
        self.connection = connection
        self.logger = get_logger()
        self.deployment_ports = []

    # pylint: disable=invalid-name
    def up(self):
        """Called by scheduled task to reconcile deployment/service/etc.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

    def up_with_logs(self) -> list[str]:
        """Call .up() but capture all log output and return it.

        Returns:
            The ``"event"`` value of every log entry captured while ``up()``
            was running, in capture order.

        Any exception raised by ``up()`` propagates to the caller; in that
        case nothing is returned.
        """
        with capture_logs() as logs:
            self.up()
        return [x["event"] for x in logs]

    def down(self):
        """Handler to delete everything we've created.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

    def get_static_deployment(self) -> str:
        """Return a static deployment configuration.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

    def get_container_image(self) -> str:
        """Get container image to use for this outpost.

        Reads the template from the ``outposts.docker_image_base`` config key
        and %-formats it with a mapping, so the template may reference
        ``%(type)s`` (the outpost's type) and ``%(version)s`` (authentik's
        ``__version__``).
        """
        image_name_template: str = CONFIG.y("outposts.docker_image_base")
        return image_name_template % {"type": self.outpost.type, "version": __version__}