From 2877f64d9db25d64a47fc2e07225e4340e73e002 Mon Sep 17 00:00:00 2001 From: Marc Aymerich Date: Sun, 10 May 2015 17:33:36 +0000 Subject: [PATCH] Improved performance of SSH with ControlPersist --- TODO.md | 5 +- .../management/commands/orchestrate.py | 2 +- orchestra/contrib/orchestration/manager.py | 9 +- orchestra/contrib/orchestration/methods.py | 133 ++++++++++-------- orchestra/contrib/orchestration/models.py | 1 - orchestra/contrib/orchestration/settings.py | 16 ++- orchestra/contrib/resources/backends.py | 2 + orchestra/utils/sys.py | 22 ++- 8 files changed, 113 insertions(+), 77 deletions(-) diff --git a/TODO.md b/TODO.md index 78d2cfdd..59614868 100644 --- a/TODO.md +++ b/TODO.md @@ -343,7 +343,6 @@ TODO mount the filesystem with "nosuid" option # virtdomains file is not ideal, prevent fake/error on domains there! and make sure this file is required! # Deprecate restart/start/stop services (do touch wsgi.py and fuck celery) -# orchestrate async stdout stderr (inspired on pangea managemengt commands) orchestra-beat support for uwsgi cron make django admin taskstate uncollapse fucking traceback, ( if exists ?) @@ -356,5 +355,7 @@ resorce monitoring more efficient, less mem an better queries for calc current d # best_price rating method +# paramiko arcfour cypher -# error reporting on periodic tasks +ciphers=['arcfour128', 'aes256'] +http://paramiko-docs.readthedocs.org/en/latest/api/transport.html diff --git a/orchestra/contrib/orchestration/management/commands/orchestrate.py b/orchestra/contrib/orchestration/management/commands/orchestrate.py index 3eb204d6..33cf0d0b 100644 --- a/orchestra/contrib/orchestration/management/commands/orchestrate.py +++ b/orchestra/contrib/orchestration/management/commands/orchestrate.py @@ -113,6 +113,6 @@ class Command(BaseCommand): stderr = cstderr if log.has_finished: running.remove(log) - time.sleep(0.1) + time.sleep(0.05) for log in logs: self.stdout.write(' '.join((log.backend, log.state))) diff --git a/orchestra/contrib/orchestration/manager.py b/orchestra/contrib/orchestration/manager.py index fc980c16..3136353e 100644 --- a/orchestra/contrib/orchestration/manager.py +++ b/orchestra/contrib/orchestration/manager.py @@ -25,6 +25,7 @@ def keep_log(execute, log, operations): """ send report """ # Remember that threads have their oun connection poll # No need to EVER temper with the transaction here + log = kwargs['log'] try: log = execute(*args, **kwargs) if log.state != log.SUCCESS: @@ -116,11 +117,11 @@ def execute(scripts, serialize=False, async=None): backend, operations = value args = (route.host,) if async is None: - async = not serialize and route.async + is_async = not serialize and route.async else: - async = not serialize and async + is_async = not serialize and async kwargs = { - 'async': async, + 'async': is_async, } # we clone the connection just in case we are isolated inside a transaction with db.clone(model=BackendLog) as handle: @@ -136,7 +137,7 @@ def execute(scripts, serialize=False, async=None): task = db.close_connection(task) thread = threading.Thread(target=task, args=args, kwargs=kwargs) thread.start() - if not async: + if not is_async: threads_to_join.append(thread) logs.append(log) [ thread.join() for thread in threads_to_join ] diff --git a/orchestra/contrib/orchestration/methods.py b/orchestra/contrib/orchestration/methods.py index 6141dcb5..938db0e5 100644 --- a/orchestra/contrib/orchestration/methods.py +++ b/orchestra/contrib/orchestration/methods.py @@ -10,82 +10,52 @@ import paramiko from celery.datastructures import ExceptionInfo from django.conf import settings as djsettings -from orchestra.utils.python import CaptureStdout +from orchestra.utils.sys import sshrun +from orchestra.utils.python import CaptureStdout, import_class from . import settings logger = logging.getLogger(__name__) -transports = {} +paramiko_connections = {} -def SSH(backend, log, server, cmds, async=False): +def Paramiko(backend, log, server, cmds, async=False): """ - Executes cmds to remote server using SSH - - The script is first copied using SCP in order to overflood the channel with large scripts - Then the script is executed using the defined backend.script_executable + Executes cmds to remote server using Pramaiko """ script = '\n'.join(cmds) script = script.replace('\r', '') - bscript = script.encode('utf-8') - digest = hashlib.md5(bscript).hexdigest() - path = os.path.join(settings.ORCHESTRATION_TEMP_SCRIPT_DIR, digest) - remote_path = "%s.remote" % path - # Ensure unique local paths for each file because of problems when os.remove(path) - path += '@%s' % str(server) log.state = log.STARTED - log.script = '# %s\n%s' % (remote_path, script) + log.script = script log.save(update_fields=('script', 'state')) if not cmds: return channel = None ssh = None try: - # Avoid "Argument list too long" on large scripts by genereting a file - # and scping it to the remote server - with os.fdopen(os.open(path, os.O_WRONLY | os.O_CREAT, 0o600), 'wb') as handle: - handle.write(bscript) - - # ssh connection - ssh = paramiko.SSHClient() - ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) addr = server.get_address() - key = settings.ORCHESTRATION_SSH_KEY_PATH - try: - ssh.connect(addr, username='root', key_filename=key, timeout=10) - except socket.error as e: - logger.error('%s timed out on %s' % (backend, addr)) - log.state = log.TIMEOUT - log.stderr = str(e) - log.save(update_fields=['state', 'stderr']) - return + # ssh connection + ssh = paramiko_connections.get(addr) + if not ssh: + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + key = settings.ORCHESTRATION_SSH_KEY_PATH + try: + ssh.connect(addr, username='root', key_filename=key) + except socket.error as e: + logger.error('%s timed out on %s' % (backend, addr)) + log.state = log.TIMEOUT + log.stderr = str(e) + log.save(update_fields=['state', 'stderr']) + return + paramiko_connections[addr] = ssh transport = ssh.get_transport() - - # Copy script to remote server - sftp = paramiko.SFTPClient.from_transport(transport) - sftp.put(path, remote_path) - sftp.chmod(remote_path, 0o600) - sftp.close() - os.remove(path) - - # Execute it - context = { - 'executable': backend.script_executable, - 'remote_path': remote_path, - 'digest': digest, - 'remove': '' if djsettings.DEBUG else "rm -fr %(remote_path)s\n", - } - cmd = ( - "[[ $(md5sum %(remote_path)s|awk {'print $1'}) == %(digest)s ]] && %(executable)s %(remote_path)s\n" - "RETURN_CODE=$?\n" - "%(remove)s" - "exit $RETURN_CODE" % context - ) channel = transport.open_session() - channel.exec_command(cmd) - + channel.exec_command(backend.script_executable) + channel.sendall(script) + channel.shutdown_write() # Log results logger.debug('%s running on %s' % (backend, server)) if async: @@ -112,8 +82,8 @@ def SSH(backend, log, server, cmds, async=False): log.stdout += channel.makefile('rb', -1).read().decode('utf-8') log.stderr += channel.makefile_stderr('rb', -1).read().decode('utf-8') - log.exit_code = exit_code = channel.recv_exit_status() - log.state = log.SUCCESS if exit_code == 0 else log.FAILURE + log.exit_code = channel.recv_exit_status() + log.state = log.SUCCESS if log.exit_code == 0 else log.FAILURE logger.debug('%s execution state on %s is %s' % (backend, server, log.state)) log.save() except: @@ -128,8 +98,55 @@ def SSH(backend, log, server, cmds, async=False): log.save(update_fields=['state']) if channel is not None: channel.close() - if ssh is not None: - ssh.close() + + +def OpenSSH(backend, log, server, cmds, async=False): + """ + Executes cmds to remote server using SSH with connection resuse for maximum performance + """ + script = '\n'.join(cmds) + script = script.replace('\r', '') + log.state = log.STARTED + log.script = script + log.save(update_fields=('script', 'state')) + if not cmds: + return + channel = None + ssh = None + try: + ssh = sshrun(server.get_address(), script, executable=backend.script_executable, + persist=True, async=async) + logger.debug('%s running on %s' % (backend, server)) + if async: + second = False + for state in ssh: + log.stdout += state.stdout.decode('utf8') + log.stderr += state.stderr.decode('utf8') + log.save() + log.exit_code = state.exit_code + else: + log.stdout = ssh.stdout + log.stderr = ssh.stderr + log.exit_code = ssh.exit_code + log.state = log.SUCCESS if log.exit_code == 0 else log.FAILURE + logger.debug('%s execution state on %s is %s' % (backend, server, log.state)) + log.save() + except: + log.state = log.ERROR + log.traceback = ExceptionInfo(sys.exc_info()).traceback + logger.error('Exception while executing %s on %s' % (backend, server)) + logger.debug(log.traceback) + log.save() + finally: + if log.state == log.STARTED: + log.state = log.ABORTED + log.save(update_fields=['state']) + + +def SSH(*args, **kwargs): + """ facade function enabling to chose between multiple SSH backends""" + method = import_class(settings.ORCHESTRATION_SSH_METHOD_BACKEND) + return method(*args, **kwargs) def Python(backend, log, server, cmds, async=False): diff --git a/orchestra/contrib/orchestration/models.py b/orchestra/contrib/orchestration/models.py index b8281b78..a5c735d9 100644 --- a/orchestra/contrib/orchestration/models.py +++ b/orchestra/contrib/orchestration/models.py @@ -98,7 +98,6 @@ class BackendLog(models.Model): def backend_class(self): return ServiceBackend.get_backend(self.backend) - class BackendOperation(models.Model): diff --git a/orchestra/contrib/orchestration/settings.py b/orchestra/contrib/orchestration/settings.py index cea71474..7539dc8b 100644 --- a/orchestra/contrib/orchestration/settings.py +++ b/orchestra/contrib/orchestration/settings.py @@ -1,5 +1,7 @@ from os import path +from django.utils.translation import ugettext_lazy as _ + from orchestra.contrib.settings import Setting @@ -28,10 +30,6 @@ ORCHESTRATION_ROUTER = Setting('ORCHESTRATION_ROUTER', ) -ORCHESTRATION_TEMP_SCRIPT_DIR = Setting('ORCHESTRATION_TEMP_SCRIPT_DIR', - '/dev/shm' -) - ORCHESTRATION_DISABLE_EXECUTION = Setting('ORCHESTRATION_DISABLE_EXECUTION', False @@ -41,3 +39,13 @@ ORCHESTRATION_DISABLE_EXECUTION = Setting('ORCHESTRATION_DISABLE_EXECUTION', ORCHESTRATION_BACKEND_CLEANUP_DAYS = Setting('ORCHESTRATION_BACKEND_CLEANUP_DAYS', 7 ) + + +ORCHESTRATION_SSH_METHOD_BACKEND = Setting('ORCHESTRATION_SSH_METHOD_BACKEND', + 'orchestra.contrib.orchestration.methods.OpenSSH', + help_text=_("Two methods provided:
" + "orchestra.contrib.orchestration.methods.OpenSSH with ControlPersist.
" + "orchestra.contrib.orchestration.methods.Paramiko with connection pool.
" + "Both perform similarly, but OpenSSH has the advantage that the connections are shared between workers,
" + "Paramiko, in contrast, has a per worker connection pool.") +) diff --git a/orchestra/contrib/resources/backends.py b/orchestra/contrib/resources/backends.py index 41dd3f39..644e5c3e 100644 --- a/orchestra/contrib/resources/backends.py +++ b/orchestra/contrib/resources/backends.py @@ -69,6 +69,8 @@ class ServiceMonitor(ServiceBackend): except ValueError: cls_name = self.__class__.__name__ raise ValueError("%s expected ' ' got '%s'" % (cls_name, line)) + if isinstance(value, bytes): + value = value.decode('ascii') MonitorData.objects.create(monitor=name, object_id=object_id, content_type=ct, value=value, created_at=self.current_date) diff --git a/orchestra/utils/sys.py b/orchestra/utils/sys.py index 1865f4e9..abaef876 100644 --- a/orchestra/utils/sys.py +++ b/orchestra/utils/sys.py @@ -102,6 +102,7 @@ def runiterator(command, display=False, stdin=b''): p.stderr.close() raise StopIteration + def join(iterator, display=False, silent=False, valid_codes=(0,)): """ joins the iterator process """ stdout = b'' @@ -136,13 +137,20 @@ def run(command, display=False, valid_codes=(0,), silent=False, stdin=b'', async next(iterator) if async: return iterator - return join(iterator, display=display, silent=silent, valid_codes=valie_codes) + return join(iterator, display=display, silent=silent, valid_codes=valid_codes) -def sshrun(addr, command, *args, **kwargs): - command = command.replace("'", """'"'"'""") - cmd = "ssh -o stricthostkeychecking=no -C root@%s '%s'" % (addr, command) - return run(cmd, *args, **kwargs) +def sshrun(addr, command, *args, executable='bash', persist=False, **kwargs): + options = ['stricthostkeychecking=no'] + if persist: + options.extend(( + 'ControlMaster=auto', + 'ControlPersist=yes', + 'ControlPath=~/.ssh/orchestra-%r-%h-%p', + )) + cmd = 'ssh -o {options} -C root@{addr} {executable}'.format(options=' -o '.join(options), + addr=addr, executable=executable) + return run(cmd, *args, stdin=command.encode('utf8'), **kwargs) def get_default_celeryd_username(): @@ -202,6 +210,6 @@ class LockFile(object): self.release() -def touch_wsgi(): +def touch_wsgi(delay=5): from . import paths - run('{ sleep 2 && touch %s/wsgi.py; } &' % paths.get_project_dir(), async=True) + run('{ sleep %i && touch %s/wsgi.py; } &' % (delay, paths.get_project_dir()), async=True)