django-orchestra/orchestra/apps/orchestration/methods.py

128 lines
4.3 KiB
Python
Raw Normal View History

2014-05-08 16:59:35 +00:00
import hashlib
import json
2014-10-04 09:29:18 +00:00
import logging
2014-05-08 16:59:35 +00:00
import os
import socket
import sys
import select
import paramiko
from celery.datastructures import ExceptionInfo
2014-10-27 15:15:22 +00:00
from django.conf import settings as djsettings
2014-05-08 16:59:35 +00:00
from . import settings
2014-10-04 09:29:18 +00:00
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
2014-10-04 13:23:04 +00:00
# Module-level dict, created empty. Nothing in the visible part of this file
# reads or writes it -- presumably intended as a cache of SSH transports;
# TODO confirm against the rest of the package.
transports = {}
2014-10-04 09:29:18 +00:00
2014-05-08 16:59:35 +00:00
def BashSSH(backend, log, server, cmds):
    """Run ``cmds`` on ``server`` as one bash script over SSH.

    The commands are joined into a single script (prefixed with ``set -e``
    and ``set -o pipefail``, suffixed with ``exit 0``), written to a local
    temporary file named after the script's MD5 digest, copied to the remote
    host through SFTP and executed there once the remote copy's checksum has
    been verified.

    Args:
        backend: Only used in log messages.
        log: Record updated in place. ``script``, ``stdout``, ``stderr``,
            ``exit_code``, ``state`` and ``traceback`` are written and the
            record is persisted through ``log.save()``.
        server: Object providing ``get_address()``.
        cmds: List of shell command strings. When empty, only ``log.script``
            is saved and nothing is executed.

    Returns:
        None. Failures are recorded on ``log`` (``TIMEOUT`` when the
        connection raises ``socket.error``, ``ERROR`` for any other
        exception) instead of being raised.
    """
    script = '\n'.join(['set -e', 'set -o pipefail'] + cmds + ['exit 0'])
    script = script.replace('\r', '')
    digest = hashlib.md5(script).hexdigest()
    path = os.path.join(settings.ORCHESTRATION_TEMP_SCRIPT_PATH, digest)
    remote_path = "%s.remote" % path
    log.script = '# %s\n%s' % (remote_path, script)
    log.save(update_fields=['script'])
    if not cmds:
        return
    channel = None
    ssh = None
    sftp = None
    try:
        logger.debug('%s is going to be executed on %s' % (backend, server))
        # Avoid "Argument list too long" on large scripts by generating a file
        # and copying it to the remote server.
        # The file is created with owner-only permissions.
        with os.fdopen(os.open(path, os.O_WRONLY | os.O_CREAT, 0o600), 'w') as handle:
            handle.write(script)

        # ssh connection
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        addr = server.get_address()
        try:
            ssh.connect(addr, username='root',
                        key_filename=settings.ORCHESTRATION_SSH_KEY_PATH, timeout=10)
        except socket.error:
            logger.error('%s timed out on %s' % (backend, server))
            log.state = log.TIMEOUT
            log.save(update_fields=['state'])
            return
        transport = ssh.get_transport()

        # Copy script to remote server
        sftp = paramiko.SFTPClient.from_transport(transport)
        sftp.put(path, remote_path)
        sftp.chmod(remote_path, 0o600)
        sftp.close()
        sftp = None

        # Execute it. The cleanup command is rendered up front: %-formatting
        # does not re-interpolate placeholders found inside substituted
        # values, so a nested "%(remote_path)s" would reach the shell verbatim.
        # In DEBUG mode the remote script is kept for inspection.
        remove = '' if djsettings.DEBUG else "rm -fr %s\n" % remote_path
        context = {
            'remote_path': remote_path,
            'digest': digest,
            'remove': remove,
        }
        cmd = (
            "[[ $(md5sum %(remote_path)s|awk {'print $1'}) == %(digest)s ]] && bash %(remote_path)s\n"
            "RETURN_CODE=$?\n"
            "%(remove)s"
            "exit $RETURN_CODE" % context
        )
        channel = transport.open_session()
        channel.exec_command(cmd)

        # Log results
        logger.debug('%s running on %s' % (backend, server))
        if True:  # TODO if not async
            log.stdout += channel.makefile('rb', -1).read().decode('utf-8')
            log.stderr += channel.makefile_stderr('rb', -1).read().decode('utf-8')
        else:
            # NOTE: currently unreachable; kept as the sketch of the
            # incremental (async) output collection.
            while True:
                # Non-blocking is the secret ingredient in the async sauce
                select.select([channel], [], [])
                if channel.recv_ready():
                    log.stdout += channel.recv(1024)
                if channel.recv_stderr_ready():
                    log.stderr += channel.recv_stderr(1024)
                log.save(update_fields=['stdout', 'stderr'])
                if channel.exit_status_ready():
                    break
        log.exit_code = exit_code = channel.recv_exit_status()
        log.state = log.SUCCESS if exit_code == 0 else log.FAILURE
        logger.debug('%s execution state on %s is %s' % (backend, server, log.state))
        log.save()
    except Exception:
        # Record the failure on the log instead of propagating it.
        # KeyboardInterrupt/SystemExit are deliberately not swallowed.
        log.state = log.ERROR
        log.traceback = ExceptionInfo(sys.exc_info()).traceback
        logger.error('Exception while executing %s on %s' % (backend, server))
        logger.debug(log.traceback)
        log.save()
    finally:
        if sftp is not None:
            sftp.close()
        if channel is not None:
            channel.close()
        if ssh is not None:
            ssh.close()
        # Always drop the local copy of the script, including on the
        # timeout and error paths; it may not exist if creation failed.
        try:
            os.remove(path)
        except OSError:
            pass
2014-05-08 16:59:35 +00:00
def Python(backend, log, server, cmds):
    """Call every command in ``cmds`` with ``server`` and record the outcome.

    Args:
        backend: Not used by this function.
        log: Record updated in place. A rendering of the calls is appended
            to ``log.script``, the stringified results are appended to
            ``log.stdout``, and ``exit_code``/``state`` (plus ``traceback``
            on failure) are set before ``log.save()``.
        server: Passed as the single argument to each command.
        cmds: Iterable of callables exposing ``func`` and ``args``
            (as ``functools.partial`` objects do).

    Returns:
        None. An exception raised by a command stops the run and is
        recorded on ``log`` as a failure instead of being raised; output
        produced by the commands that already ran is still kept.
    """
    # ``__name__`` is available on both Python 2 and 3 (``func_name`` is
    # Python 2 only).
    calls = [str(cmd.func.__name__) + str(cmd.args) for cmd in cmds]
    script = json.dumps(calls, indent=4).replace('"', '')
    log.script = '\n'.join([log.script, script])
    log.save(update_fields=['script'])

    outputs = []
    try:
        for cmd in cmds:
            outputs.append(str(cmd(server)))
    except Exception:
        log.exit_code = 1
        log.state = log.FAILURE
        log.traceback = ExceptionInfo(sys.exc_info()).traceback
    else:
        log.exit_code = 0
        log.state = log.SUCCESS
    log.stdout += ''.join(outputs)
    log.save()