99 lines
3.4 KiB
Python
99 lines
3.4 KiB
Python
"""Docker SSH helper"""
|
|
import os
|
|
from pathlib import Path
|
|
from tempfile import gettempdir
|
|
|
|
from docker.errors import DockerException
|
|
|
|
from authentik.crypto.models import CertificateKeyPair
|
|
|
|
HEADER = "### Managed by authentik"
|
|
FOOTER = "### End Managed by authentik"
|
|
|
|
|
|
def opener(path, flags):
|
|
"""File opener to create files as 700 perms"""
|
|
return os.open(path, flags, 0o700)
|
|
|
|
|
|
class SSHManagedExternallyException(DockerException):
|
|
"""Raised when the ssh config file is managed externally."""
|
|
|
|
|
|
class DockerInlineSSH:
|
|
"""Create paramiko ssh config from CertificateKeyPair"""
|
|
|
|
host: str
|
|
keypair: CertificateKeyPair
|
|
|
|
key_path: str
|
|
config_path: Path
|
|
header: str
|
|
|
|
def __init__(self, host: str, keypair: CertificateKeyPair) -> None:
|
|
self.host = host
|
|
self.keypair = keypair
|
|
self.config_path = Path("~/.ssh/config").expanduser()
|
|
if self.config_path.exists() and HEADER not in self.config_path.read_text(encoding="utf-8"):
|
|
# SSH Config file already exists and there's no header from us, meaning that it's
|
|
# been externally mapped into the container for more complex configs
|
|
raise SSHManagedExternallyException(
|
|
"SSH Config exists and does not contain authentik header"
|
|
)
|
|
if not self.keypair:
|
|
raise DockerException("keypair must be set for SSH connections")
|
|
self.header = f"{HEADER} - {self.host}\n"
|
|
|
|
def write_config(self, key_path: str) -> bool:
|
|
"""Update the local user's ssh config file"""
|
|
with open(self.config_path, "a+", encoding="utf-8") as ssh_config:
|
|
if self.header in ssh_config.readlines():
|
|
return False
|
|
ssh_config.writelines(
|
|
[
|
|
self.header,
|
|
f"Host {self.host}\n",
|
|
f" IdentityFile {key_path}\n",
|
|
" StrictHostKeyChecking No\n",
|
|
" UserKnownHostsFile /dev/null\n",
|
|
f"{FOOTER}\n",
|
|
"\n",
|
|
]
|
|
)
|
|
return True
|
|
|
|
def write_key(self):
|
|
"""Write keypair's private key to a temporary file"""
|
|
path = Path(gettempdir(), f"{self.keypair.pk}_private.pem")
|
|
with open(path, "w", encoding="utf8", opener=opener) as _file:
|
|
_file.write(self.keypair.key_data)
|
|
return str(path)
|
|
|
|
def write(self):
|
|
"""Write keyfile and update ssh config"""
|
|
self.key_path = self.write_key()
|
|
was_written = self.write_config(self.key_path)
|
|
if not was_written:
|
|
self.cleanup()
|
|
|
|
def cleanup(self):
|
|
"""Cleanup when we're done"""
|
|
try:
|
|
os.unlink(self.key_path)
|
|
with open(self.config_path, "r", encoding="utf-8") as ssh_config:
|
|
start = 0
|
|
end = 0
|
|
lines = ssh_config.readlines()
|
|
for idx, line in enumerate(lines):
|
|
if line == self.header:
|
|
start = idx
|
|
if start != 0 and line == f"{FOOTER}\n":
|
|
end = idx
|
|
with open(self.config_path, "w+", encoding="utf-8") as ssh_config:
|
|
lines = lines[:start] + lines[end + 2 :]
|
|
ssh_config.writelines(lines)
|
|
except OSError:
|
|
# If we fail deleting a file it doesn't matter that much
|
|
# since we're just in a container
|
|
pass
|