58 lines
1.9 KiB
Python
58 lines
1.9 KiB
Python
"""Kubernetes Service Reconciler"""
|
|
from typing import Dict
|
|
|
|
from kubernetes.client import CoreV1Api, V1Service, V1ServicePort, V1ServiceSpec
|
|
|
|
from passbook.outposts.controllers.k8s.base import (
|
|
KubernetesObjectReconciler,
|
|
NeedsUpdate,
|
|
)
|
|
from passbook.outposts.models import Outpost
|
|
|
|
|
|
class ServiceReconciler(KubernetesObjectReconciler[V1Service]):
    """Kubernetes Service Reconciler.

    Creates, reads, patches and deletes the namespaced Service named
    ``passbook-outpost-<outpost name>`` through ``CoreV1Api``.
    """

    # Maps a port name to the port number exposed by the Service. Starts out
    # empty; presumably filled in by the owning controller before the
    # reference object is built -- TODO confirm against the caller.
    deployment_ports: Dict[str, int]

    def __init__(self, outpost: Outpost) -> None:
        """Set up the CoreV1 API client and an empty port mapping."""
        super().__init__(outpost)
        self.api = CoreV1Api()
        self.deployment_ports = {}

    def reconcile(self, current: V1Service, reference: V1Service):
        """Compare the ports of the live Service with the reference Service.

        Only the fields the reference actually defines (port name and port
        number) are compared. Comparing whole ``V1ServicePort`` objects would
        never match, because the object returned by the API server carries
        server-defaulted fields (e.g. ``protocol``, ``target_port``) which the
        reference leaves unset.

        Args:
            current: Service as currently returned by the cluster.
            reference: Service as built by ``get_reference_object``.

        Raises:
            NeedsUpdate: if the number of ports differs, or a reference port
                (name, port number) is missing from the live Service.
        """
        # ``ports`` may be None on a Service without ports; treat as empty.
        current_ports = current.spec.ports or []
        reference_ports = reference.spec.ports or []
        if len(current_ports) != len(reference_ports):
            raise NeedsUpdate()
        live_ports = {(port.name, port.port) for port in current_ports}
        for port in reference_ports:
            if (port.name, port.port) not in live_ports:
                raise NeedsUpdate()

    def get_reference_object(self) -> V1Service:
        """Build the desired Service object for the outpost.

        Returns:
            A ``ClusterIP`` Service with one port per ``deployment_ports``
            entry, whose selector is the label set of its own metadata.
        """
        meta = self.get_object_meta(name=f"passbook-outpost-{self.outpost.name}")
        ports = [
            V1ServicePort(name=port_name, port=port)
            for port_name, port in self.deployment_ports.items()
        ]
        return V1Service(
            metadata=meta,
            spec=V1ServiceSpec(ports=ports, selector=meta.labels, type="ClusterIP"),
        )

    def create(self, reference: V1Service):
        """Create the reference Service in this reconciler's namespace."""
        return self.api.create_namespaced_service(self.namespace, reference)

    def delete(self, reference: V1Service):
        """Delete the Service named like the reference from the namespace."""
        return self.api.delete_namespaced_service(
            reference.metadata.name, self.namespace
        )

    def retrieve(self) -> V1Service:
        """Read the outpost's Service from this reconciler's namespace."""
        return self.api.read_namespaced_service(
            f"passbook-outpost-{self.outpost.name}", self.namespace
        )

    def update(self, current: V1Service, reference: V1Service):
        """Patch the live Service (looked up by its current name) with the reference."""
        return self.api.patch_namespaced_service(
            current.metadata.name, self.namespace, reference
        )
|