From 8feac931c6bea9f9ad8a3a4f23c0fc3538fe9476 Mon Sep 17 00:00:00 2001 From: Marc Aymerich Date: Tue, 18 Nov 2014 17:47:26 +0000 Subject: [PATCH] Improvements on resource monitoring and asynchronous orchestration --- TODO.md | 6 ++++++ orchestra/apps/mailboxes/models.py | 4 +++- orchestra/apps/orchestration/README.md | 2 +- orchestra/apps/orchestration/backends.py | 4 ++-- orchestra/apps/orchestration/manager.py | 14 +++++--------- orchestra/apps/orchestration/methods.py | 13 +++++++------ orchestra/apps/orchestration/models.py | 4 ++-- orchestra/apps/resources/backends.py | 4 ++-- orchestra/apps/resources/tasks.py | 3 ++- orchestra/apps/systemusers/backends.py | 24 +++++++++++++++++++----- orchestra/apps/systemusers/models.py | 3 ++- 11 files changed, 51 insertions(+), 30 deletions(-) diff --git a/TODO.md b/TODO.md index 02e008fa..82ed92c6 100644 --- a/TODO.md +++ b/TODO.md @@ -178,3 +178,9 @@ Remember that, as always with QuerySets, any subsequent chained methods which im * Backendlog doesn't show during execution, transaction isolation or what? 
* Resource used_list_display=True, allocated_list_displat=True, allow resources to show up on list_display + + +* Move plugins back from apps to orchestra main app + + +* BackendLog.updated_at (tasks that run over several minutes when finished they do not appear first on the changelist) diff --git a/orchestra/apps/mailboxes/models.py b/orchestra/apps/mailboxes/models.py index 508484cc..798c194e 100644 --- a/orchestra/apps/mailboxes/models.py +++ b/orchestra/apps/mailboxes/models.py @@ -1,3 +1,5 @@ +import os + from django.contrib.auth.hashers import make_password from django.core.validators import RegexValidator, ValidationError from django.db import models @@ -52,7 +54,7 @@ class Mailbox(models.Model): 'name': self.name, 'username': self.name, } - return settings.MAILBOXES_HOME % context + return os.path.normpath(settings.MAILBOXES_HOME % context) def clean(self): if self.custom_filtering and self.filtering != self.CUSTOM: diff --git a/orchestra/apps/orchestration/README.md b/orchestra/apps/orchestration/README.md index da635563..4008a0b9 100644 --- a/orchestra/apps/orchestration/README.md +++ b/orchestra/apps/orchestration/README.md @@ -61,7 +61,7 @@ This strategy considers two different implementations: #### b. Synchronization Based Management -When Orchestra is the configuration **authority** and also _the responsible of applying the changes_ on the servers (**push** flow). The configuration files are **regenerated** every time by Orchestra, deleting any existing manual configuration. This model is very consistent since it only depends on the current state of the system (_stateless_). Therefore, it makes sense to execute the synchronization operation in **asynchronous** fashion. +When Orchestra is the configuration **authority** and also _the responsible of applying the changes_ on the servers (**push** flow). The configuration files are **regenerated** every time by Orchestra, deleting any existing manual configuration. 
This model is very consistent since it only depends on the current state of the system (_memoryless_). Therefore, it makes sense to execute the synchronization operation in **asynchronous** fashion. In contrast to registry based management, synchronization management is _fully centralized_, all the management operations are driven by Orchestra so you don't need to install nor configure anything on your servers. diff --git a/orchestra/apps/orchestration/backends.py b/orchestra/apps/orchestration/backends.py index effa64db..2a02fa49 100644 --- a/orchestra/apps/orchestration/backends.py +++ b/orchestra/apps/orchestration/backends.py @@ -95,12 +95,12 @@ class ServiceBackend(plugins.Plugin): time = timezone.now().strftime("%h %d, %Y %I:%M:%S") return "Generated by Orchestra at %s" % time - def execute(self, server): + def execute(self, server, async=False): from .models import BackendLog state = BackendLog.STARTED if self.cmds else BackendLog.SUCCESS log = BackendLog.objects.create(backend=self.get_name(), state=state, server=server) for method, cmds in self.cmds: - method(log, server, cmds) + method(log, server, cmds, async) if log.state != BackendLog.SUCCESS: break return log diff --git a/orchestra/apps/orchestration/manager.py b/orchestra/apps/orchestration/manager.py index 86284f74..3451ab37 100644 --- a/orchestra/apps/orchestration/manager.py +++ b/orchestra/apps/orchestration/manager.py @@ -14,13 +14,9 @@ logger = logging.getLogger(__name__) def as_task(execute): def wrapper(*args, **kwargs): - """ failures on the backend execution doesn't fuck the request transaction atomicity """ - db.transaction.set_autocommit(False) - try: - log = execute(*args, **kwargs) - finally: - db.transaction.commit() - db.transaction.set_autocommit(True) + """ send report """ + # Tasks run on a separate transaction pool (thread), no need to tamper with the transaction + log = execute(*args, **kwargs) if log.state != log.SUCCESS: send_report(execute, args, log) return log @@ -43,7 +39,7 
@@ def close_connection(execute): return wrapper -def execute(operations): +def execute(operations, async=False): """ generates and executes the operations on the servers """ router = import_class(settings.ORCHESTRATION_ROUTER) scripts = {} @@ -71,7 +67,7 @@ def execute(operations): backend.commit() execute = as_task(backend.execute) execute = close_connection(execute) - thread = threading.Thread(target=execute, args=(server,)) + thread = threading.Thread(target=execute, args=(server,), kwargs={'async': async}) thread.start() threads.append(thread) executions.append((execute, operations)) diff --git a/orchestra/apps/orchestration/methods.py b/orchestra/apps/orchestration/methods.py index ec7b3fab..637d6ec7 100644 --- a/orchestra/apps/orchestration/methods.py +++ b/orchestra/apps/orchestration/methods.py @@ -18,7 +18,7 @@ logger = logging.getLogger(__name__) transports = {} -def BashSSH(backend, log, server, cmds): +def BashSSH(backend, log, server, cmds, async=False): script = '\n'.join(['set -e', 'set -o pipefail'] + cmds + ['exit 0']) script = script.replace('\r', '') digest = hashlib.md5(script).hexdigest() @@ -75,10 +75,7 @@ def BashSSH(backend, log, server, cmds): # Log results logger.debug('%s running on %s' % (backend, server)) - if True: # TODO if not async - log.stdout += channel.makefile('rb', -1).read().decode('utf-8') - log.stderr += channel.makefile_stderr('rb', -1).read().decode('utf-8') - else: + if async: while True: # Non-blocking is the secret ingridient in the async sauce select.select([channel], [], []) @@ -89,6 +86,10 @@ def BashSSH(backend, log, server, cmds): log.save(update_fields=['stdout', 'stderr']) if channel.exit_status_ready(): break + else: + log.stdout += channel.makefile('rb', -1).read().decode('utf-8') + log.stderr += channel.makefile_stderr('rb', -1).read().decode('utf-8') + log.exit_code = exit_code = channel.recv_exit_status() log.state = log.SUCCESS if exit_code == 0 else log.FAILURE logger.debug('%s execution state on %s is 
%s' % (backend, server, log.state)) @@ -106,7 +107,7 @@ def BashSSH(backend, log, server, cmds): ssh.close() -def Python(backend, log, server, cmds): +def Python(backend, log, server, cmds, async=False): script = [ str(cmd.func.func_name) + str(cmd.args) for cmd in cmds ] script = json.dumps(script, indent=4).replace('"', '') log.script = '\n'.join([log.script, script]) diff --git a/orchestra/apps/orchestration/models.py b/orchestra/apps/orchestration/models.py index 0c461ed8..d34293de 100644 --- a/orchestra/apps/orchestration/models.py +++ b/orchestra/apps/orchestration/models.py @@ -138,8 +138,8 @@ class BackendOperation(models.Model): return op @classmethod - def execute(cls, operations): - return manager.execute(operations) + def execute(cls, operations, async=False): + return manager.execute(operations, async=async) @classmethod def execute_action(cls, instance, action): diff --git a/orchestra/apps/resources/backends.py b/orchestra/apps/resources/backends.py index ba5bb77e..04f70409 100644 --- a/orchestra/apps/resources/backends.py +++ b/orchestra/apps/resources/backends.py @@ -68,7 +68,7 @@ class ServiceMonitor(ServiceBackend): MonitorData.objects.create(monitor=name, object_id=object_id, content_type=ct, value=value, created_at=self.current_date) - def execute(self, server): - log = super(ServiceMonitor, self).execute(server) + def execute(self, server, async=False): + log = super(ServiceMonitor, self).execute(server, async=async) self.store(log) return log diff --git a/orchestra/apps/resources/tasks.py b/orchestra/apps/resources/tasks.py index 6c650a63..2f1b2de5 100644 --- a/orchestra/apps/resources/tasks.py +++ b/orchestra/apps/resources/tasks.py @@ -27,7 +27,8 @@ def monitor(resource_id, ids=None): # Execute monitor for obj in model.objects.filter(**kwargs): operations.append(Operation.create(backend, obj, Operation.MONITOR)) - Operation.execute(operations) + # TODO async=True only when running with celery + Operation.execute(operations, async=True) 
kwargs = {'id__in': ids} if ids else {} # Update used resources and trigger resource exceeded and revovery diff --git a/orchestra/apps/systemusers/backends.py b/orchestra/apps/systemusers/backends.py index c150d721..527a579b 100644 --- a/orchestra/apps/systemusers/backends.py +++ b/orchestra/apps/systemusers/backends.py @@ -58,6 +58,7 @@ class SystemUserBackend(ServiceController): def get_context(self, user): context = { + 'object_id': user.pk, 'username': user.username, 'password': user.password if user.active else '*%s' % user.password, 'shell': user.shell, @@ -70,16 +71,29 @@ class SystemUserBackend(ServiceController): class SystemUserDisk(ServiceMonitor): model = 'systemusers.SystemUser' resource = ServiceMonitor.DISK - verbose_name = _('Main user disk') + verbose_name = _('Systemuser disk') + + def prepare(self): + """ slower """ + self.append(textwrap.dedent("""\ + function monitor () { + { du -bs "$1" || echo 0; } | awk {'print $1'} + }""" + )) def monitor(self, user): context = self.get_context(user) - self.append("du -s %(home)s | cut -f1 | xargs echo %(object_id)s" % context) + if user.is_main or os.path.normpath(user.home) == user.get_base_home(): + self.append("echo %(object_id)s $(monitor %(home)s)" % context) + else: + # Home appears to be included in other user home + self.append("echo %(object_id)s 0" % context) def get_context(self, user): - context = SystemUserBackend().get_context(user) - context['object_id'] = user.pk - return context + return { + 'object_id': user.pk, + 'home': user.home, + } class FTPTraffic(ServiceMonitor): diff --git a/orchestra/apps/systemusers/models.py b/orchestra/apps/systemusers/models.py index 5f03532c..dd0e8de7 100644 --- a/orchestra/apps/systemusers/models.py +++ b/orchestra/apps/systemusers/models.py @@ -75,6 +75,7 @@ class SystemUser(models.Model): super(SystemUser, self).save(*args, **kwargs) def clean(self): + self.home = os.path.normpath(self.home) if self.directory: directory_error = None if self.has_shell: @@ 
-118,7 +119,7 @@ class SystemUser(models.Model): context = { 'username': self.username, } - return settings.SYSTEMUSERS_HOME % context + return os.path.normpath(settings.SYSTEMUSERS_HOME % context) def get_home(self): return os.path.join(