Improved performance of SSH with ControlPersist

This commit is contained in:
Marc Aymerich 2015-05-10 17:33:36 +00:00
parent c6c58d7d97
commit 2877f64d9d
8 changed files with 113 additions and 77 deletions

View File

@ -343,7 +343,6 @@ TODO mount the filesystem with "nosuid" option
# virtdomains file is not ideal, prevent fake/error on domains there! and make sure this file is required! # virtdomains file is not ideal, prevent fake/error on domains there! and make sure this file is required!
# Deprecate restart/start/stop services (do touch wsgi.py and fuck celery) # Deprecate restart/start/stop services (do touch wsgi.py and fuck celery)
# orchestrate async stdout stderr (inspired on pangea managemengt commands)
orchestra-beat support for uwsgi cron orchestra-beat support for uwsgi cron
make django admin taskstate uncollapse fucking traceback, ( if exists ?) make django admin taskstate uncollapse fucking traceback, ( if exists ?)
@ -356,5 +355,7 @@ resorce monitoring more efficient, less mem an better queries for calc current d
# best_price rating method # best_price rating method
# paramiko arcfour cypher
# error reporting on periodic tasks ciphers=['arcfour128', 'aes256']
http://paramiko-docs.readthedocs.org/en/latest/api/transport.html

View File

@ -113,6 +113,6 @@ class Command(BaseCommand):
stderr = cstderr stderr = cstderr
if log.has_finished: if log.has_finished:
running.remove(log) running.remove(log)
time.sleep(0.1) time.sleep(0.05)
for log in logs: for log in logs:
self.stdout.write(' '.join((log.backend, log.state))) self.stdout.write(' '.join((log.backend, log.state)))

View File

@ -25,6 +25,7 @@ def keep_log(execute, log, operations):
""" send report """ """ send report """
# Remember that threads have their own connection pool # Remember that threads have their own connection pool
# No need to EVER tamper with the transaction here # No need to EVER tamper with the transaction here
log = kwargs['log']
try: try:
log = execute(*args, **kwargs) log = execute(*args, **kwargs)
if log.state != log.SUCCESS: if log.state != log.SUCCESS:
@ -116,11 +117,11 @@ def execute(scripts, serialize=False, async=None):
backend, operations = value backend, operations = value
args = (route.host,) args = (route.host,)
if async is None: if async is None:
async = not serialize and route.async is_async = not serialize and route.async
else: else:
async = not serialize and async is_async = not serialize and async
kwargs = { kwargs = {
'async': async, 'async': is_async,
} }
# we clone the connection just in case we are isolated inside a transaction # we clone the connection just in case we are isolated inside a transaction
with db.clone(model=BackendLog) as handle: with db.clone(model=BackendLog) as handle:
@ -136,7 +137,7 @@ def execute(scripts, serialize=False, async=None):
task = db.close_connection(task) task = db.close_connection(task)
thread = threading.Thread(target=task, args=args, kwargs=kwargs) thread = threading.Thread(target=task, args=args, kwargs=kwargs)
thread.start() thread.start()
if not async: if not is_async:
threads_to_join.append(thread) threads_to_join.append(thread)
logs.append(log) logs.append(log)
[ thread.join() for thread in threads_to_join ] [ thread.join() for thread in threads_to_join ]

View File

@ -10,82 +10,52 @@ import paramiko
from celery.datastructures import ExceptionInfo from celery.datastructures import ExceptionInfo
from django.conf import settings as djsettings from django.conf import settings as djsettings
from orchestra.utils.python import CaptureStdout from orchestra.utils.sys import sshrun
from orchestra.utils.python import CaptureStdout, import_class
from . import settings from . import settings
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
transports = {} paramiko_connections = {}
def SSH(backend, log, server, cmds, async=False): def Paramiko(backend, log, server, cmds, async=False):
""" """
Executes cmds to remote server using SSH Executes cmds to remote server using Paramiko
The script is first copied using SCP in order to overflood the channel with large scripts
Then the script is executed using the defined backend.script_executable
""" """
script = '\n'.join(cmds) script = '\n'.join(cmds)
script = script.replace('\r', '') script = script.replace('\r', '')
bscript = script.encode('utf-8')
digest = hashlib.md5(bscript).hexdigest()
path = os.path.join(settings.ORCHESTRATION_TEMP_SCRIPT_DIR, digest)
remote_path = "%s.remote" % path
# Ensure unique local paths for each file because of problems when os.remove(path)
path += '@%s' % str(server)
log.state = log.STARTED log.state = log.STARTED
log.script = '# %s\n%s' % (remote_path, script) log.script = script
log.save(update_fields=('script', 'state')) log.save(update_fields=('script', 'state'))
if not cmds: if not cmds:
return return
channel = None channel = None
ssh = None ssh = None
try: try:
# Avoid "Argument list too long" on large scripts by genereting a file
# and scping it to the remote server
with os.fdopen(os.open(path, os.O_WRONLY | os.O_CREAT, 0o600), 'wb') as handle:
handle.write(bscript)
# ssh connection
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
addr = server.get_address() addr = server.get_address()
key = settings.ORCHESTRATION_SSH_KEY_PATH # ssh connection
try: ssh = paramiko_connections.get(addr)
ssh.connect(addr, username='root', key_filename=key, timeout=10) if not ssh:
except socket.error as e: ssh = paramiko.SSHClient()
logger.error('%s timed out on %s' % (backend, addr)) ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
log.state = log.TIMEOUT key = settings.ORCHESTRATION_SSH_KEY_PATH
log.stderr = str(e) try:
log.save(update_fields=['state', 'stderr']) ssh.connect(addr, username='root', key_filename=key)
return except socket.error as e:
logger.error('%s timed out on %s' % (backend, addr))
log.state = log.TIMEOUT
log.stderr = str(e)
log.save(update_fields=['state', 'stderr'])
return
paramiko_connections[addr] = ssh
transport = ssh.get_transport() transport = ssh.get_transport()
# Copy script to remote server
sftp = paramiko.SFTPClient.from_transport(transport)
sftp.put(path, remote_path)
sftp.chmod(remote_path, 0o600)
sftp.close()
os.remove(path)
# Execute it
context = {
'executable': backend.script_executable,
'remote_path': remote_path,
'digest': digest,
'remove': '' if djsettings.DEBUG else "rm -fr %(remote_path)s\n",
}
cmd = (
"[[ $(md5sum %(remote_path)s|awk {'print $1'}) == %(digest)s ]] && %(executable)s %(remote_path)s\n"
"RETURN_CODE=$?\n"
"%(remove)s"
"exit $RETURN_CODE" % context
)
channel = transport.open_session() channel = transport.open_session()
channel.exec_command(cmd) channel.exec_command(backend.script_executable)
channel.sendall(script)
channel.shutdown_write()
# Log results # Log results
logger.debug('%s running on %s' % (backend, server)) logger.debug('%s running on %s' % (backend, server))
if async: if async:
@ -112,8 +82,8 @@ def SSH(backend, log, server, cmds, async=False):
log.stdout += channel.makefile('rb', -1).read().decode('utf-8') log.stdout += channel.makefile('rb', -1).read().decode('utf-8')
log.stderr += channel.makefile_stderr('rb', -1).read().decode('utf-8') log.stderr += channel.makefile_stderr('rb', -1).read().decode('utf-8')
log.exit_code = exit_code = channel.recv_exit_status() log.exit_code = channel.recv_exit_status()
log.state = log.SUCCESS if exit_code == 0 else log.FAILURE log.state = log.SUCCESS if log.exit_code == 0 else log.FAILURE
logger.debug('%s execution state on %s is %s' % (backend, server, log.state)) logger.debug('%s execution state on %s is %s' % (backend, server, log.state))
log.save() log.save()
except: except:
@ -128,8 +98,55 @@ def SSH(backend, log, server, cmds, async=False):
log.save(update_fields=['state']) log.save(update_fields=['state'])
if channel is not None: if channel is not None:
channel.close() channel.close()
if ssh is not None:
ssh.close()
def OpenSSH(backend, log, server, cmds, async=False):
"""
Executes cmds to remote server using SSH with connection resuse for maximum performance
"""
script = '\n'.join(cmds)
script = script.replace('\r', '')
log.state = log.STARTED
log.script = script
log.save(update_fields=('script', 'state'))
if not cmds:
return
channel = None
ssh = None
try:
ssh = sshrun(server.get_address(), script, executable=backend.script_executable,
persist=True, async=async)
logger.debug('%s running on %s' % (backend, server))
if async:
second = False
for state in ssh:
log.stdout += state.stdout.decode('utf8')
log.stderr += state.stderr.decode('utf8')
log.save()
log.exit_code = state.exit_code
else:
log.stdout = ssh.stdout
log.stderr = ssh.stderr
log.exit_code = ssh.exit_code
log.state = log.SUCCESS if log.exit_code == 0 else log.FAILURE
logger.debug('%s execution state on %s is %s' % (backend, server, log.state))
log.save()
except:
log.state = log.ERROR
log.traceback = ExceptionInfo(sys.exc_info()).traceback
logger.error('Exception while executing %s on %s' % (backend, server))
logger.debug(log.traceback)
log.save()
finally:
if log.state == log.STARTED:
log.state = log.ABORTED
log.save(update_fields=['state'])
def SSH(*args, **kwargs):
    """
    Facade that runs the SSH method selected by ORCHESTRATION_SSH_METHOD_BACKEND.

    Every positional and keyword argument is forwarded untouched to the
    imported method, and its return value is handed back to the caller.
    """
    return import_class(settings.ORCHESTRATION_SSH_METHOD_BACKEND)(*args, **kwargs)
def Python(backend, log, server, cmds, async=False): def Python(backend, log, server, cmds, async=False):

View File

@ -98,7 +98,6 @@ class BackendLog(models.Model):
def backend_class(self): def backend_class(self):
return ServiceBackend.get_backend(self.backend) return ServiceBackend.get_backend(self.backend)
class BackendOperation(models.Model): class BackendOperation(models.Model):

View File

@ -1,5 +1,7 @@
from os import path from os import path
from django.utils.translation import ugettext_lazy as _
from orchestra.contrib.settings import Setting from orchestra.contrib.settings import Setting
@ -28,10 +30,6 @@ ORCHESTRATION_ROUTER = Setting('ORCHESTRATION_ROUTER',
) )
ORCHESTRATION_TEMP_SCRIPT_DIR = Setting('ORCHESTRATION_TEMP_SCRIPT_DIR',
'/dev/shm'
)
ORCHESTRATION_DISABLE_EXECUTION = Setting('ORCHESTRATION_DISABLE_EXECUTION', ORCHESTRATION_DISABLE_EXECUTION = Setting('ORCHESTRATION_DISABLE_EXECUTION',
False False
@ -41,3 +39,13 @@ ORCHESTRATION_DISABLE_EXECUTION = Setting('ORCHESTRATION_DISABLE_EXECUTION',
ORCHESTRATION_BACKEND_CLEANUP_DAYS = Setting('ORCHESTRATION_BACKEND_CLEANUP_DAYS', ORCHESTRATION_BACKEND_CLEANUP_DAYS = Setting('ORCHESTRATION_BACKEND_CLEANUP_DAYS',
7 7
) )
ORCHESTRATION_SSH_METHOD_BACKEND = Setting('ORCHESTRATION_SSH_METHOD_BACKEND',
'orchestra.contrib.orchestration.methods.OpenSSH',
help_text=_("Two methods provided:<br>"
"<tt>orchestra.contrib.orchestration.methods.OpenSSH</tt> with ControlPersist.<br>"
"<tt>orchestra.contrib.orchestration.methods.Paramiko</tt> with connection pool.<br>"
"Both perform similarly, but OpenSSH has the advantage that the connections are shared between workers,<br>"
"Paramiko, in contrast, has a per worker connection pool.")
)

View File

@ -69,6 +69,8 @@ class ServiceMonitor(ServiceBackend):
except ValueError: except ValueError:
cls_name = self.__class__.__name__ cls_name = self.__class__.__name__
raise ValueError("%s expected '<id> <value>' got '%s'" % (cls_name, line)) raise ValueError("%s expected '<id> <value>' got '%s'" % (cls_name, line))
if isinstance(value, bytes):
value = value.decode('ascii')
MonitorData.objects.create(monitor=name, object_id=object_id, MonitorData.objects.create(monitor=name, object_id=object_id,
content_type=ct, value=value, created_at=self.current_date) content_type=ct, value=value, created_at=self.current_date)

View File

@ -102,6 +102,7 @@ def runiterator(command, display=False, stdin=b''):
p.stderr.close() p.stderr.close()
raise StopIteration raise StopIteration
def join(iterator, display=False, silent=False, valid_codes=(0,)): def join(iterator, display=False, silent=False, valid_codes=(0,)):
""" joins the iterator process """ """ joins the iterator process """
stdout = b'' stdout = b''
@ -136,13 +137,20 @@ def run(command, display=False, valid_codes=(0,), silent=False, stdin=b'', async
next(iterator) next(iterator)
if async: if async:
return iterator return iterator
return join(iterator, display=display, silent=silent, valid_codes=valie_codes) return join(iterator, display=display, silent=silent, valid_codes=valid_codes)
def sshrun(addr, command, *args, **kwargs): def sshrun(addr, command, *args, executable='bash', persist=False, **kwargs):
command = command.replace("'", """'"'"'""") options = ['stricthostkeychecking=no']
cmd = "ssh -o stricthostkeychecking=no -C root@%s '%s'" % (addr, command) if persist:
return run(cmd, *args, **kwargs) options.extend((
'ControlMaster=auto',
'ControlPersist=yes',
'ControlPath=~/.ssh/orchestra-%r-%h-%p',
))
cmd = 'ssh -o {options} -C root@{addr} {executable}'.format(options=' -o '.join(options),
addr=addr, executable=executable)
return run(cmd, *args, stdin=command.encode('utf8'), **kwargs)
def get_default_celeryd_username(): def get_default_celeryd_username():
@ -202,6 +210,6 @@ class LockFile(object):
self.release() self.release()
def touch_wsgi(): def touch_wsgi(delay=5):
from . import paths from . import paths
run('{ sleep 2 && touch %s/wsgi.py; } &' % paths.get_project_dir(), async=True) run('{ sleep %i && touch %s/wsgi.py; } &' % (delay, paths.get_project_dir()), async=True)