This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
authentik/passbook/outposts/controllers/kubernetes.py

68 lines
2.3 KiB
Python
Raw Normal View History

2020-09-02 22:04:12 +00:00
"""Kubernetes deployment controller"""
from io import StringIO
from typing import Dict, List, Type
2020-09-02 22:04:12 +00:00
from kubernetes.client import OpenApiException
2020-10-14 15:49:09 +00:00
from kubernetes.config import load_incluster_config, load_kube_config
from kubernetes.config.config_exception import ConfigException
2020-09-02 22:04:12 +00:00
from yaml import dump_all
from passbook.outposts.controllers.base import BaseController, ControllerException
from passbook.outposts.controllers.k8s.base import KubernetesObjectReconciler
2020-10-14 15:49:09 +00:00
from passbook.outposts.controllers.k8s.deployment import DeploymentReconciler
from passbook.outposts.controllers.k8s.secret import SecretReconciler
from passbook.outposts.controllers.k8s.service import ServiceReconciler
from passbook.outposts.models import Outpost
2020-09-02 22:04:12 +00:00
class KubernetesController(BaseController):
"""Manage deployment of outpost in kubernetes"""
reconcilers: Dict[str, Type[KubernetesObjectReconciler]]
reconcile_order: List[str]
def __init__(self, outpost: Outpost) -> None:
super().__init__(outpost)
2020-10-14 15:49:09 +00:00
try:
load_incluster_config()
except ConfigException:
load_kube_config()
self.reconcilers = {
"secret": SecretReconciler,
"deployment": DeploymentReconciler,
"service": ServiceReconciler,
}
self.reconcile_order = ["secret", "deployment", "service"]
2020-09-02 22:04:12 +00:00
def up(self):
try:
for reconcile_key in self.reconcile_order:
reconciler = self.reconcilers[reconcile_key](self)
reconciler.up()
2020-10-14 15:49:09 +00:00
except OpenApiException as exc:
raise ControllerException from exc
def down(self):
try:
for reconcile_key in self.reconcile_order:
reconciler = self.reconcilers[reconcile_key](self)
reconciler.down()
except OpenApiException as exc:
raise ControllerException from exc
2020-09-02 22:04:12 +00:00
def get_static_deployment(self) -> str:
documents = []
for reconcile_key in self.reconcile_order:
reconciler = self.reconcilers[reconcile_key](self)
documents.append(reconciler.get_reference_object().to_dict())
2020-10-14 15:49:09 +00:00
2020-09-02 22:04:12 +00:00
with StringIO() as _str:
dump_all(
documents,
2020-09-02 22:04:12 +00:00
stream=_str,
default_flow_style=False,
)
return _str.getvalue()