This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
authentik/authentik/outposts/docker_ssh.py
Jens Langhammer 3b6497cd51 outposts: ensure keypair is set for SSH connections
Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
2022-02-13 15:39:37 +01:00

87 lines
2.8 KiB
Python

"""Docker SSH helper"""
import os
from pathlib import Path
from tempfile import gettempdir
from docker.errors import DockerException
from authentik.crypto.models import CertificateKeyPair
# Marker lines that bracket every ssh config entry written by DockerInlineSSH.
# The header is extended with the host name when the entry is written; both
# markers are searched for again when the entry is removed on cleanup.
HEADER = "### Managed by authentik"
FOOTER = "### End Managed by authentik"
def opener(path, flags):
    """File opener to create files as 600 perms

    Meant to be passed as the ``opener`` argument of the built-in ``open``.
    The mode only applies when the call actually creates the file (and is
    still subject to the process umask); an existing file keeps its
    permissions.

    :param path: path of the file to open
    :param flags: ``os.open`` flags as supplied by ``open``
    :returns: the open file descriptor
    """
    # Owner read/write only: this is used for private key files, which never
    # need the execute bit.
    return os.open(path, flags, 0o600)
class DockerInlineSSH:
    """Create paramiko ssh config from CertificateKeyPair

    The keypair's private key is written to a file in the temp directory and
    referenced as ``IdentityFile`` for ``host`` in the local user's ssh config.
    The config entry is wrapped in marker lines so that it can be found and
    removed again by ``cleanup``.
    """

    # Hostname the config entry applies to
    host: str
    # Provides the private key (``key_data``) and the key file name (``pk``)
    keypair: "CertificateKeyPair"
    # Path of the written private key file; empty until write() has run
    key_path: str
    # Location of the ssh config file that is edited
    config_path: Path
    # Host-specific marker line that opens the managed config entry
    header: str

    def __init__(self, host: str, keypair: "CertificateKeyPair") -> None:
        """Store host and keypair, derive the config path and marker line

        :raises DockerException: if no keypair is given
        """
        self.host = host
        self.keypair = keypair
        if not self.keypair:
            raise DockerException("keypair must be set for SSH connections")
        # Lets cleanup() run safely even when write() was never called
        self.key_path = ""
        self.config_path = Path("~/.ssh/config").expanduser()
        self.header = f"{HEADER} - {self.host}\n"

    def write_config(self, key_path: str) -> bool:
        """Update the local user's ssh config file

        Appends a managed entry that makes ``key_path`` the identity file for
        this host.

        :param key_path: path of the private key file to reference
        :returns: ``True`` if the entry was appended, ``False`` if an entry
            for this host is already present (nothing is written then)
        """
        # The ssh directory may not exist yet, e.g. in a fresh container
        Path(self.config_path).parent.mkdir(mode=0o700, parents=True, exist_ok=True)
        with open(self.config_path, "a+", encoding="utf-8") as ssh_config:
            # "a+" positions the stream at the end of the file; rewind first,
            # otherwise nothing is read and the duplicate check never matches
            ssh_config.seek(0)
            existing = ssh_config.readlines()
            if self.header in existing:
                return False
            entry = [
                self.header,
                f"Host {self.host}\n",
                f" IdentityFile {key_path}\n",
                f"{FOOTER}\n",
                "\n",
            ]
            if existing and not existing[-1].endswith("\n"):
                # Keep the marker on a line of its own when the existing file
                # does not end with a newline
                entry.insert(0, "\n")
            # In append mode the data always lands at the end of the file
            ssh_config.writelines(entry)
        return True

    def write_key(self) -> str:
        """Write keypair's private key to a temporary file

        The file is named after the keypair's primary key and opened through
        the module-level ``opener`` so that it is created with restricted
        permissions.

        :returns: path of the written key file
        """
        path = Path(gettempdir(), f"{self.keypair.pk}_private.pem")
        with open(path, "w", encoding="utf8", opener=opener) as _file:
            _file.write(self.keypair.key_data)
        return str(path)

    def write(self) -> None:
        """Write keyfile and update ssh config

        If the config already holds a managed entry for this host (for example
        one left behind by a run that never reached ``cleanup``), that entry
        is replaced so the config points at the key file written here.
        """
        self.key_path = self.write_key()
        if self.write_config(self.key_path):
            return
        # Replace the existing entry; removing the fresh key as well would
        # leave the host without any identity
        self._remove_config_entry()
        self.write_config(self.key_path)

    def cleanup(self) -> None:
        """Cleanup when we're done

        Removes the key file and the managed config entry. Both steps are
        best-effort and independent: a failure in one does not skip the other.
        """
        if self.key_path:
            try:
                os.unlink(self.key_path)
            except OSError:
                # If we fail deleting a file it doesn't matter that much
                # since we're just in a container
                pass
        try:
            self._remove_config_entry()
        except OSError:
            # Same reasoning as above: a missing or unwritable config file
            # is not worth failing over
            pass

    def _remove_config_entry(self) -> None:
        """Rewrite the ssh config without the managed entry for this host"""
        with open(self.config_path, "r", encoding="utf-8") as ssh_config:
            lines = ssh_config.readlines()
        remaining = self._strip_managed_block(lines, self.header, f"{FOOTER}\n")
        if remaining == lines:
            # Nothing of ours in the file, leave it alone
            return
        with open(self.config_path, "w", encoding="utf-8") as ssh_config:
            ssh_config.writelines(remaining)

    @staticmethod
    def _strip_managed_block(lines: list, header: str, footer: str) -> list:
        """Return ``lines`` without the blocks that start with ``header``

        A block runs from a line equal to ``header`` up to the next line equal
        to ``footer``; a single blank line directly after the footer (the
        separator emitted by ``write_config``) is dropped with it. All other
        lines, including blocks with a different header, are kept in order.
        A header without a matching footer is kept as-is rather than guessing
        where the block ends.
        """
        kept = []
        total = len(lines)
        idx = 0
        while idx < total:
            if lines[idx] != header:
                kept.append(lines[idx])
                idx += 1
                continue
            try:
                end = lines.index(footer, idx + 1)
            except ValueError:
                # Unterminated block: do not delete content we cannot delimit
                kept.extend(lines[idx:])
                break
            idx = end + 1
            if idx < total and lines[idx] == "\n":
                idx += 1
        return kept