django-orchestra/orchestra/contrib/orchestration/methods.py

186 lines
6.7 KiB
Python
Raw Normal View History

2015-07-09 13:04:26 +00:00
import inspect
2014-10-04 09:29:18 +00:00
import logging
2014-05-08 16:59:35 +00:00
import socket
import sys
import select
2015-07-09 13:04:26 +00:00
import textwrap
2014-05-08 16:59:35 +00:00
from celery.datastructures import ExceptionInfo
from orchestra.utils.sys import sshrun
from orchestra.utils.python import CaptureStdout, import_class
2015-03-23 15:36:51 +00:00
2014-05-08 16:59:35 +00:00
from . import settings
2014-10-04 09:29:18 +00:00
logger = logging.getLogger(__name__)
2014-10-04 13:23:04 +00:00
2015-07-10 13:00:51 +00:00
def Paramiko(backend, log, server, cmds, run_async=False, paramiko_connections={}, **kwargs):
    """
    Executes cmds on a remote server using Paramiko

    The commands are joined into one script which is written to the stdin of
    ``backend.script_executable``, started over an SSH session opened as root
    with the key at ``settings.ORCHESTRATION_SSH_KEY_PATH``. The script,
    stdout, stderr, exit code, traceback and resulting state are stored on
    ``log``; nothing is returned.

    ``run_async`` makes stdout/stderr be saved on ``log`` while the script is
    still running instead of only once it has finished. Callers may still pass
    the flag with its historical keyword ``async=...``; it is accepted through
    ``**kwargs`` because ``async`` is a reserved word since Python 3.7 and can
    no longer be used as a parameter name.

    ``paramiko_connections`` maps server addresses to connected SSH clients.
    The default dict is shared between calls on purpose: it is what makes
    connections get reused, so it must stay a mutable default.

    States written to ``log``: STARTED on entry, TIMEOUT when the connection
    raises ``socket.error``, SUCCESS/FAILURE depending on the exit code, ERROR
    on any other exception and ABORTED if execution stops while still STARTED.
    When ``cmds`` is empty the function returns right after saving STARTED.
    """
    import codecs
    import paramiko

    # Backwards compatible spelling of run_async (see docstring)
    run_async = kwargs.pop('async', run_async)
    if kwargs:
        raise TypeError(
            'Paramiko() got an unexpected keyword argument %r' % sorted(kwargs)[0])

    script = '\n'.join(cmds).replace('\r', '')
    log.state = log.STARTED
    log.script = script
    log.save(update_fields=('script', 'state', 'updated_at'))
    if not cmds:
        return
    channel = None
    try:
        addr = server.get_address()
        # ssh connection, reused when a live one is cached for this address
        ssh = paramiko_connections.get(addr)
        if ssh:
            transport = ssh.get_transport()
            if transport is None or not transport.is_active():
                # The cached connection is dead: evict it, otherwise every
                # subsequent call for this address would fail
                paramiko_connections.pop(addr, None)
                ssh.close()
                ssh = None
        if not ssh:
            ssh = paramiko.SSHClient()
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            key = settings.ORCHESTRATION_SSH_KEY_PATH
            try:
                ssh.connect(addr, username='root', key_filename=key)
            except socket.error as e:
                # Do not leak the half-initialized client
                ssh.close()
                logger.error('%s timed out on %s', backend, addr)
                log.state = log.TIMEOUT
                log.stderr = str(e)
                log.save(update_fields=('state', 'stderr', 'updated_at'))
                return
            paramiko_connections[addr] = ssh
        channel = ssh.get_transport().open_session()
        channel.exec_command(backend.script_executable)
        channel.sendall(script)
        # Close stdin so the executable sees the end of the script
        channel.shutdown_write()
        # Log results
        logger.debug('%s running on %s', backend, server)
        if run_async:
            # Incremental decoders keep a multi-byte UTF-8 character that is
            # split across two 1024 byte reads from raising UnicodeDecodeError
            new_decoder = codecs.getincrementaldecoder('utf-8')
            stdout_decoder = new_decoder()
            stderr_decoder = new_decoder()
            second = False
            while True:
                # Wait until the channel has something for us
                select.select([channel], [], [])
                if channel.recv_ready():
                    data = channel.recv(1024)
                    while data:
                        log.stdout += stdout_decoder.decode(data)
                        data = channel.recv(1024)
                if channel.recv_stderr_ready():
                    data = channel.recv_stderr(1024)
                    while data:
                        log.stderr += stderr_decoder.decode(data)
                        data = channel.recv_stderr(1024)
                log.save(update_fields=('stdout', 'stderr', 'updated_at'))
                if channel.exit_status_ready():
                    # One extra pass is made after the exit status first shows
                    # up, presumably to drain any output still in flight
                    if second:
                        break
                    second = True
            # Flush the decoders; still raises on genuinely truncated output
            log.stdout += stdout_decoder.decode(b'', final=True)
            log.stderr += stderr_decoder.decode(b'', final=True)
        else:
            log.stdout += channel.makefile('rb', -1).read().decode('utf-8')
            log.stderr += channel.makefile_stderr('rb', -1).read().decode('utf-8')
        log.exit_code = channel.recv_exit_status()
        log.state = log.SUCCESS if log.exit_code == 0 else log.FAILURE
        logger.debug('%s execution state on %s is %s', backend, server, log.state)
        log.save()
    except Exception:
        # Execution errors are recorded on the log instead of being raised.
        # KeyboardInterrupt/SystemExit are left to propagate; the finally
        # clause marks the log as ABORTED in that case.
        log.state = log.ERROR
        log.traceback = ExceptionInfo(sys.exc_info()).traceback
        logger.error('Exception while executing %s on %s', backend, server)
        logger.debug(log.traceback)
        log.save()
    finally:
        if log.state == log.STARTED:
            log.state = log.ABORTED
            log.save(update_fields=('state', 'updated_at'))
        if channel is not None:
            channel.close()
def OpenSSH(backend, log, server, cmds, run_async=False, **kwargs):
    """
    Executes cmds on a remote server using SSH with connection reuse for maximum performance

    The commands are joined into one script that is handed to ``sshrun``
    together with ``backend.script_executable``. The script is appended to
    ``log.script`` and the output to ``log.stdout``/``log.stderr``, so the
    same ``log`` can accumulate several invocations. Nothing is returned.

    ``run_async`` makes ``sshrun`` yield intermediate states whose output is
    saved on ``log`` as it arrives. Callers may still pass the flag with its
    historical keyword ``async=...``; it is accepted through ``**kwargs``
    because ``async`` is a reserved word since Python 3.7 and can no longer be
    used as a parameter name.

    States written to ``log``: STARTED on entry, TIMEOUT when the exit code is
    255 and stderr starts with 'ssh: connect to host', SUCCESS/FAILURE
    depending on the exit code, ERROR on any other exception and ABORTED if
    execution ends while the state is still STARTED. When ``cmds`` is empty
    the function returns right after saving STARTED.
    """
    # Backwards compatible spelling of run_async (see docstring)
    run_async = kwargs.pop('async', run_async)
    if kwargs:
        raise TypeError(
            'OpenSSH() got an unexpected keyword argument %r' % sorted(kwargs)[0])

    script = '\n'.join(cmds).replace('\r', '')
    log.state = log.STARTED
    log.script = '\n'.join((log.script, script))
    log.save(update_fields=('script', 'state', 'updated_at'))
    if not cmds:
        return
    try:
        # sshrun still names its flag 'async'; passing it through a dict keeps
        # this call valid syntax on Python >= 3.7
        ssh = sshrun(
            server.get_address(), script, executable=backend.script_executable,
            persist=True, silent=True, **{'async': run_async})
        logger.debug('%s running on %s', backend, server)
        if run_async:
            for state in ssh:
                log.stdout += state.stdout.decode('utf8')
                log.stderr += state.stderr.decode('utf8')
                log.save(update_fields=('stdout', 'stderr', 'updated_at'))
            # The exit code is read from the last yielded state
            exit_code = state.exit_code
        else:
            log.stdout += ssh.stdout.decode('utf8')
            log.stderr += ssh.stderr.decode('utf8')
            exit_code = ssh.exit_code
        # An exit code recorded by a previous invocation is never overwritten
        # NOTE(review): confirm the state update belongs under this guard
        if not log.exit_code:
            log.exit_code = exit_code
            if exit_code == 255 and log.stderr.startswith('ssh: connect to host'):
                log.state = log.TIMEOUT
            else:
                log.state = log.SUCCESS if exit_code == 0 else log.FAILURE
        logger.debug('%s execution state on %s is %s', backend, server, log.state)
        log.save()
    except Exception:
        # Execution errors are recorded on the log instead of being raised.
        # KeyboardInterrupt/SystemExit are left to propagate; the finally
        # clause marks the log as ABORTED in that case.
        log.state = log.ERROR
        log.traceback = ExceptionInfo(sys.exc_info()).traceback
        logger.error('Exception while executing %s on %s', backend, server)
        logger.debug(log.traceback)
        log.save()
    finally:
        if log.state == log.STARTED:
            log.state = log.ABORTED
            log.save(update_fields=('state', 'updated_at'))
def SSH(*args, **kwargs):
    """
    Facade that forwards the call, with all its arguments, to the SSH method
    named by settings.ORCHESTRATION_SSH_METHOD_BACKEND
    """
    backend_path = settings.ORCHESTRATION_SSH_METHOD_BACKEND
    ssh_method = import_class(backend_path)
    outcome = ssh_method(*args, **kwargs)
    return outcome
2014-05-08 16:59:35 +00:00
def Python(backend, log, server, cmds, run_async=False, **kwargs):
    """
    Executes cmds locally as Python callables, passing them the server

    Each element of ``cmds`` is called as ``cmd(server)`` and must expose
    ``.func`` and ``.args`` (e.g. a ``functools.partial``). The source of every
    distinct ``cmd.func`` followed by one ``# name args`` line per command is
    appended to ``log.script``. Whatever the commands print is captured and
    appended to ``log.stdout``, along with a ``# Result:`` line for truthy
    return values. Nothing is returned.

    ``run_async`` makes ``log.stdout`` be saved after every command. Callers
    may still pass the flag with its historical keyword ``async=...``; it is
    accepted through ``**kwargs`` because ``async`` is a reserved word since
    Python 3.7 and can no longer be used as a parameter name.

    On an exception the log gets exit code 1, state FAILURE and the traceback
    appended; remaining commands are not run. Otherwise exit code 0 and state
    SUCCESS are set, unless a non-zero exit code was already recorded on
    ``log``.
    """
    # Backwards compatible spelling of run_async (see docstring)
    run_async = kwargs.pop('async', run_async)
    if kwargs:
        raise TypeError(
            'Python() got an unexpected keyword argument %r' % sorted(kwargs)[0])

    # Source code of each distinct function, in order of first appearance
    sources = []
    functions = set()
    for cmd in cmds:
        if cmd.func not in functions:
            functions.add(cmd.func)
            sources.append(textwrap.dedent(''.join(inspect.getsourcelines(cmd.func)[0])))
    calls = ['# %s %s\n' % (cmd.func.__name__, cmd.args) for cmd in cmds]
    script = ''.join(sources) + '\n' + ''.join(calls)

    log.state = log.STARTED
    log.script = '\n'.join((log.script, script))
    log.save(update_fields=('script', 'state', 'updated_at'))
    stdout = ''
    try:
        for cmd in cmds:
            with CaptureStdout() as stdout:
                result = cmd(server)
            for line in stdout:
                log.stdout += line + '\n'
            if result:
                log.stdout += '# Result: %s\n' % result
            if run_async:
                log.save(update_fields=('stdout', 'updated_at'))
    except Exception:
        # Failures are recorded on the log instead of being raised;
        # KeyboardInterrupt/SystemExit are left to propagate
        log.exit_code = 1
        log.state = log.FAILURE
        # Keep what the failing command printed before it raised
        log.stdout += '\n'.join(stdout)
        log.traceback += ExceptionInfo(sys.exc_info()).traceback
        logger.error('Exception while executing %s on %s', backend, server)
    else:
        # An exit code recorded by a previous invocation is never overwritten
        # NOTE(review): confirm the state update belongs under this guard
        if not log.exit_code:
            log.exit_code = 0
            log.state = log.SUCCESS
    logger.debug('%s execution state on %s is %s', backend, server, log.state)
    log.save()