Improvements on resource monitoring and asynchronous orchestration

This commit is contained in:
Marc Aymerich 2014-11-18 17:47:26 +00:00
parent 9b87ef5e0d
commit 8feac931c6
11 changed files with 51 additions and 30 deletions

View file

@ -178,3 +178,9 @@ Remember that, as always with QuerySets, any subsequent chained methods which im
* Backendlog doesn't show during execution, transaction isolation or what? * Backendlog doesn't show during execution, transaction isolation or what?
* Resource used_list_display=True, allocated_list_displat=True, allow resources to show up on list_display * Resource used_list_display=True, allocated_list_displat=True, allow resources to show up on list_display
* Move plugins back from apps to orchestra main app
* BackendLog.updated_at (tasks that run over several minutes when finished they do not appear first on the changelist)

View file

@ -1,3 +1,5 @@
import os
from django.contrib.auth.hashers import make_password from django.contrib.auth.hashers import make_password
from django.core.validators import RegexValidator, ValidationError from django.core.validators import RegexValidator, ValidationError
from django.db import models from django.db import models
@ -52,7 +54,7 @@ class Mailbox(models.Model):
'name': self.name, 'name': self.name,
'username': self.name, 'username': self.name,
} }
return settings.MAILBOXES_HOME % context return os.path.normpath(settings.MAILBOXES_HOME % context)
def clean(self): def clean(self):
if self.custom_filtering and self.filtering != self.CUSTOM: if self.custom_filtering and self.filtering != self.CUSTOM:

View file

@ -61,7 +61,7 @@ This strategy considers two different implementations:
#### b. Synchronization Based Management #### b. Synchronization Based Management
When Orchestra is the configuration **authority** and also _the responsible of applying the changes_ on the servers (**push** flow). The configuration files are **regenerated** every time by Orchestra, deleting any existing manual configuration. This model is very consistent since it only depends on the current state of the system (_stateless_). Therefore, it makes sense to execute the synchronization operation in **asynchronous** fashion. When Orchestra is the configuration **authority** and also _the responsible of applying the changes_ on the servers (**push** flow). The configuration files are **regenerated** every time by Orchestra, deleting any existing manual configuration. This model is very consistent since it only depends on the current state of the system (_memoryless_). Therefore, it makes sense to execute the synchronization operation in **asynchronous** fashion.
In contrast to registry based management, synchronization management is _fully centralized_, all the management operations are driven by Orchestra so you don't need to install nor configure anything on your servers. In contrast to registry based management, synchronization management is _fully centralized_, all the management operations are driven by Orchestra so you don't need to install nor configure anything on your servers.

View file

@ -95,12 +95,12 @@ class ServiceBackend(plugins.Plugin):
time = timezone.now().strftime("%h %d, %Y %I:%M:%S") time = timezone.now().strftime("%h %d, %Y %I:%M:%S")
return "Generated by Orchestra at %s" % time return "Generated by Orchestra at %s" % time
def execute(self, server): def execute(self, server, async=False):
from .models import BackendLog from .models import BackendLog
state = BackendLog.STARTED if self.cmds else BackendLog.SUCCESS state = BackendLog.STARTED if self.cmds else BackendLog.SUCCESS
log = BackendLog.objects.create(backend=self.get_name(), state=state, server=server) log = BackendLog.objects.create(backend=self.get_name(), state=state, server=server)
for method, cmds in self.cmds: for method, cmds in self.cmds:
method(log, server, cmds) method(log, server, cmds, async)
if log.state != BackendLog.SUCCESS: if log.state != BackendLog.SUCCESS:
break break
return log return log

View file

@ -14,13 +14,9 @@ logger = logging.getLogger(__name__)
def as_task(execute): def as_task(execute):
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
""" failures on the backend execution doesn't fuck the request transaction atomicity """ """ send report """
db.transaction.set_autocommit(False) # Tasks run on a separate transaction pool (thread), no need to temper with the transaction
try: log = execute(*args, **kwargs)
log = execute(*args, **kwargs)
finally:
db.transaction.commit()
db.transaction.set_autocommit(True)
if log.state != log.SUCCESS: if log.state != log.SUCCESS:
send_report(execute, args, log) send_report(execute, args, log)
return log return log
@ -43,7 +39,7 @@ def close_connection(execute):
return wrapper return wrapper
def execute(operations): def execute(operations, async=False):
""" generates and executes the operations on the servers """ """ generates and executes the operations on the servers """
router = import_class(settings.ORCHESTRATION_ROUTER) router = import_class(settings.ORCHESTRATION_ROUTER)
scripts = {} scripts = {}
@ -71,7 +67,7 @@ def execute(operations):
backend.commit() backend.commit()
execute = as_task(backend.execute) execute = as_task(backend.execute)
execute = close_connection(execute) execute = close_connection(execute)
thread = threading.Thread(target=execute, args=(server,)) thread = threading.Thread(target=execute, args=(server,), kwargs={'async': async})
thread.start() thread.start()
threads.append(thread) threads.append(thread)
executions.append((execute, operations)) executions.append((execute, operations))

View file

@ -18,7 +18,7 @@ logger = logging.getLogger(__name__)
transports = {} transports = {}
def BashSSH(backend, log, server, cmds): def BashSSH(backend, log, server, cmds, async=False):
script = '\n'.join(['set -e', 'set -o pipefail'] + cmds + ['exit 0']) script = '\n'.join(['set -e', 'set -o pipefail'] + cmds + ['exit 0'])
script = script.replace('\r', '') script = script.replace('\r', '')
digest = hashlib.md5(script).hexdigest() digest = hashlib.md5(script).hexdigest()
@ -75,10 +75,7 @@ def BashSSH(backend, log, server, cmds):
# Log results # Log results
logger.debug('%s running on %s' % (backend, server)) logger.debug('%s running on %s' % (backend, server))
if True: # TODO if not async if async:
log.stdout += channel.makefile('rb', -1).read().decode('utf-8')
log.stderr += channel.makefile_stderr('rb', -1).read().decode('utf-8')
else:
while True: while True:
# Non-blocking is the secret ingridient in the async sauce # Non-blocking is the secret ingridient in the async sauce
select.select([channel], [], []) select.select([channel], [], [])
@ -89,6 +86,10 @@ def BashSSH(backend, log, server, cmds):
log.save(update_fields=['stdout', 'stderr']) log.save(update_fields=['stdout', 'stderr'])
if channel.exit_status_ready(): if channel.exit_status_ready():
break break
else:
log.stdout += channel.makefile('rb', -1).read().decode('utf-8')
log.stderr += channel.makefile_stderr('rb', -1).read().decode('utf-8')
log.exit_code = exit_code = channel.recv_exit_status() log.exit_code = exit_code = channel.recv_exit_status()
log.state = log.SUCCESS if exit_code == 0 else log.FAILURE log.state = log.SUCCESS if exit_code == 0 else log.FAILURE
logger.debug('%s execution state on %s is %s' % (backend, server, log.state)) logger.debug('%s execution state on %s is %s' % (backend, server, log.state))
@ -106,7 +107,7 @@ def BashSSH(backend, log, server, cmds):
ssh.close() ssh.close()
def Python(backend, log, server, cmds): def Python(backend, log, server, cmds, async=False):
script = [ str(cmd.func.func_name) + str(cmd.args) for cmd in cmds ] script = [ str(cmd.func.func_name) + str(cmd.args) for cmd in cmds ]
script = json.dumps(script, indent=4).replace('"', '') script = json.dumps(script, indent=4).replace('"', '')
log.script = '\n'.join([log.script, script]) log.script = '\n'.join([log.script, script])

View file

@ -138,8 +138,8 @@ class BackendOperation(models.Model):
return op return op
@classmethod @classmethod
def execute(cls, operations): def execute(cls, operations, async=False):
return manager.execute(operations) return manager.execute(operations, async=async)
@classmethod @classmethod
def execute_action(cls, instance, action): def execute_action(cls, instance, action):

View file

@ -68,7 +68,7 @@ class ServiceMonitor(ServiceBackend):
MonitorData.objects.create(monitor=name, object_id=object_id, MonitorData.objects.create(monitor=name, object_id=object_id,
content_type=ct, value=value, created_at=self.current_date) content_type=ct, value=value, created_at=self.current_date)
def execute(self, server): def execute(self, server, async=False):
log = super(ServiceMonitor, self).execute(server) log = super(ServiceMonitor, self).execute(server, async=async)
self.store(log) self.store(log)
return log return log

View file

@ -27,7 +27,8 @@ def monitor(resource_id, ids=None):
# Execute monitor # Execute monitor
for obj in model.objects.filter(**kwargs): for obj in model.objects.filter(**kwargs):
operations.append(Operation.create(backend, obj, Operation.MONITOR)) operations.append(Operation.create(backend, obj, Operation.MONITOR))
Operation.execute(operations) # TODO async=TRue only when running with celery
Operation.execute(operations, async=True)
kwargs = {'id__in': ids} if ids else {} kwargs = {'id__in': ids} if ids else {}
# Update used resources and trigger resource exceeded and revovery # Update used resources and trigger resource exceeded and revovery

View file

@ -58,6 +58,7 @@ class SystemUserBackend(ServiceController):
def get_context(self, user): def get_context(self, user):
context = { context = {
'object_id': user.pk,
'username': user.username, 'username': user.username,
'password': user.password if user.active else '*%s' % user.password, 'password': user.password if user.active else '*%s' % user.password,
'shell': user.shell, 'shell': user.shell,
@ -70,16 +71,29 @@ class SystemUserBackend(ServiceController):
class SystemUserDisk(ServiceMonitor): class SystemUserDisk(ServiceMonitor):
model = 'systemusers.SystemUser' model = 'systemusers.SystemUser'
resource = ServiceMonitor.DISK resource = ServiceMonitor.DISK
verbose_name = _('Main user disk') verbose_name = _('Systemuser disk')
def prepare(self):
""" slower """
self.append(textwrap.dedent("""\
function monitor () {
{ du -bs "$1" || echo 0; } | awk {'print $1'}
}"""
))
def monitor(self, user): def monitor(self, user):
context = self.get_context(user) context = self.get_context(user)
self.append("du -s %(home)s | cut -f1 | xargs echo %(object_id)s" % context) if user.is_main or os.path.normpath(user.home) == user.get_base_home():
self.append("echo %(object_id)s $(monitor %(home)s)" % context)
else:
# Home appears to be included in other user home
self.append("echo %(object_id)s 0" % context)
def get_context(self, user): def get_context(self, user):
context = SystemUserBackend().get_context(user) return {
context['object_id'] = user.pk 'object_id': user.pk,
return context 'home': user.home,
}
class FTPTraffic(ServiceMonitor): class FTPTraffic(ServiceMonitor):

View file

@ -75,6 +75,7 @@ class SystemUser(models.Model):
super(SystemUser, self).save(*args, **kwargs) super(SystemUser, self).save(*args, **kwargs)
def clean(self): def clean(self):
self.home = os.path.normpath(self.home)
if self.directory: if self.directory:
directory_error = None directory_error = None
if self.has_shell: if self.has_shell:
@ -118,7 +119,7 @@ class SystemUser(models.Model):
context = { context = {
'username': self.username, 'username': self.username,
} }
return settings.SYSTEMUSERS_HOME % context return os.path.normpath(settings.SYSTEMUSERS_HOME % context)
def get_home(self): def get_home(self):
return os.path.join( return os.path.join(