Refactored orchestration backends to support multiple executables

This commit is contained in:
Marc Aymerich 2014-11-21 13:53:39 +00:00
parent be00ab533c
commit 4b15c742ff
13 changed files with 114 additions and 44 deletions

View file

@ -174,16 +174,12 @@ Remember that, as always with QuerySets, any subsequent chained methods which im
* admin systemuser home/directory, add default home and empty directory with has_shell on admin
* Resource used_list_display=True, allocated_list_displat=True, allow resources to show up on list_display
* Move plugins back from apps to orchestra main app
* BackendLog.updated_at (tasks that run over several minutes when finished they do not appear first on the changelist) (like celery tasks.when)
* Resource.monitor(async=True) admin action
* Validate a model path exists between resource.content_type and backend.model
* Periodic task for cleaning old monitoring data

View file

@ -109,7 +109,7 @@ class MysqlDisk(ServiceMonitor):
))
def prepare(self):
""" slower """
super(MysqlDisk, self).prepare()
self.append(textwrap.dedent("""\
function monitor () {
{ du -bs "/var/lib/mysql/$1" || echo 0; } | awk {'print $1'}

View file

@ -146,6 +146,7 @@ class MailmanTraffic(ServiceMonitor):
verbose_name = _("Mailman traffic")
def prepare(self):
super(MailmanTraffic, self).prepare()
current_date = timezone.localtime(self.current_date)
current_date = current_date.strftime("%b %d %H:%M:%S")
self.append(textwrap.dedent("""\

View file

@ -216,6 +216,7 @@ class MaildirDisk(ServiceMonitor):
verbose_name = _("Maildir disk usage")
def prepare(self):
super(MaildirDisk, self).prepare()
current_date = self.current_date.strftime("%Y-%m-%d %H:%M:%S %Z")
self.append(textwrap.dedent("""\
function monitor () {

View file

@ -2,6 +2,7 @@ from functools import partial
from django.db.models.loading import get_model
from django.utils import timezone
from django.utils.functional import cached_property
from django.utils.translation import ugettext_lazy as _
from orchestra.apps import plugins
@ -19,14 +20,13 @@ class ServiceBackend(plugins.Plugin):
"""
model = None
related_models = () # ((model, accessor__attribute),)
script_method = methods.BashSSH
script_method = methods.SSH
script_executable = '/bin/bash'
function_method = methods.Python
type = 'task' # 'sync'
ignore_fields = []
actions = []
# TODO type: 'script', execution:'task'
__metaclass__ = plugins.PluginMount
def __unicode__(self):
@ -36,7 +36,28 @@ class ServiceBackend(plugins.Plugin):
return unicode(self)
def __init__(self):
self.cmds = []
self.head = []
self.content = []
self.tail = []
def __getattribute__(self, attr):
""" Select head, content or tail section depending on the method name """
IGNORE_ATTRS = (
'append',
'cmd_section',
'head',
'tail',
'content',
'script_method',
'function_method'
)
if attr == 'prepare':
self.cmd_section = self.head
elif attr == 'commit':
self.cmd_section = self.tail
elif attr not in IGNORE_ATTRS:
self.cmd_section = self.content
return super(ServiceBackend, self).__getattribute__(attr)
@classmethod
def get_actions(cls):
@ -91,16 +112,34 @@ class ServiceBackend(plugins.Plugin):
def model_class(cls):
return get_model(cls.model)
@property
def scripts(self):
""" group commands based on their method """
if not self.content:
return []
scripts = {}
for method, cmd in self.content:
scripts[method] = []
for method, commands in self.head + self.content + self.tail:
try:
scripts[method] += commands
except KeyError:
pass
return list(scripts.iteritems())
def get_banner(self):
time = timezone.now().strftime("%h %d, %Y %I:%M:%S")
return "Generated by Orchestra at %s" % time
def execute(self, server, async=False):
from .models import BackendLog
state = BackendLog.STARTED if self.cmds else BackendLog.SUCCESS
scripts = self.scripts
state = BackendLog.STARTED
if not scripts:
state = BackendLog.SUCCESS
log = BackendLog.objects.create(backend=self.get_name(), state=state, server=server)
for method, cmds in self.cmds:
method(log, server, cmds, async)
for method, commands in scripts:
method(log, server, commands, async)
if log.state != BackendLog.SUCCESS:
break
return log
@ -113,22 +152,29 @@ class ServiceBackend(plugins.Plugin):
else:
method = self.function_method
cmd = partial(*cmd)
if not self.cmds or self.cmds[-1][0] != method:
self.cmds.append((method, [cmd]))
if not self.cmd_section or self.cmd_section[-1][0] != method:
self.cmd_section.append((method, [cmd]))
else:
self.cmds[-1][1].append(cmd)
self.cmd_section[-1][1].append(cmd)
def prepare(self):
""" hook for executing something at the beging """
pass
"""
hook for executing something at the beging
define functions or initialize state
"""
self.append(
'set -e\n'
'set -o pipefail'
)
def commit(self):
"""
hook for executing something at the end
apply the configuration, usually reloading a service
reloading a service is done in a separated method in order to reload
the service once in bulk operations
"""
pass
self.append('exit 0')
class ServiceController(ServiceBackend):

View file

@ -18,8 +18,14 @@ logger = logging.getLogger(__name__)
transports = {}
def BashSSH(backend, log, server, cmds, async=False):
script = '\n'.join(['set -e', 'set -o pipefail'] + cmds + ['exit 0'])
def SSH(backend, log, server, cmds, async=False):
"""
Executes cmds to remote server using SSH
The script is first copied using SCP in order to overflood the channel with large scripts
Then the script is executed using the defined backend.script_executable
"""
script = '\n'.join(cmds)
script = script.replace('\r', '')
digest = hashlib.md5(script).hexdigest()
path = os.path.join(settings.ORCHESTRATION_TEMP_SCRIPT_PATH, digest)
@ -41,9 +47,9 @@ def BashSSH(backend, log, server, cmds, async=False):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
addr = server.get_address()
key = settings.ORCHESTRATION_SSH_KEY_PATH
try:
# TODO timeout
ssh.connect(addr, username='root', key_filename=settings.ORCHESTRATION_SSH_KEY_PATH, timeout=10)
ssh.connect(addr, username='root', key_filename=key, timeout=10)
except socket.error:
logger.error('%s timed out on %s' % (backend, server))
log.state = log.TIMEOUT
@ -60,12 +66,13 @@ def BashSSH(backend, log, server, cmds, async=False):
# Execute it
context = {
'executable': backend.script_executable,
'remote_path': remote_path,
'digest': digest,
'remove': '' if djsettings.DEBUG else "rm -fr %(remote_path)s\n",
}
cmd = (
"[[ $(md5sum %(remote_path)s|awk {'print $1'}) == %(digest)s ]] && bash %(remote_path)s\n"
"[[ $(md5sum %(remote_path)s|awk {'print $1'}) == %(digest)s ]] && %(executable)s %(remote_path)s\n"
"RETURN_CODE=$?\n"
"%(remove)s"
"exit $RETURN_CODE" % context
@ -108,6 +115,7 @@ def BashSSH(backend, log, server, cmds, async=False):
def Python(backend, log, server, cmds, async=False):
# TODO collect stdout?
script = [ str(cmd.func.func_name) + str(cmd.args) for cmd in cmds ]
script = json.dumps(script, indent=4).replace('"', '')
log.script = '\n'.join([log.script, script])

View file

@ -1,6 +1,8 @@
from django.contrib import messages
from django.core.urlresolvers import reverse
from django.db import transaction
from django.shortcuts import redirect
from django.utils.safestring import mark_safe
from django.utils.translation import ungettext, ugettext_lazy as _
@ -10,23 +12,33 @@ def run_monitor(modeladmin, request, queryset):
if not queryset:
modeladmin.message_user(request, _("No resource has been selected,"))
return redirect(referer)
async = modeladmin.model.monitor.func_defaults[0]
results = []
for resource in queryset:
resource.monitor()
result = resource.monitor()
if not async:
results += result
modeladmin.log_change(request, resource, _("Run monitors"))
num = len(queryset)
async = resource.monitor.func_defaults[0]
if async:
# TODO schedulet link to celery taskstate page
link = reverse('admin:djcelery_taskstate_changelist')
msg = ungettext(
_("One selected resource has been scheduled for monitoring."),
_("%s selected resource have been scheduled for monitoring.") % num,
_("One selected resource has been <a href='%s'>scheduled for monitoring</a>.") % link,
_("%s selected resource have been <a href='%s'>scheduled for monitoring</a>.") % (num, link),
num)
else:
msg = ungettext(
_("One selected resource has been monitored."),
_("%s selected resource have been monitored.") % num,
num)
modeladmin.message_user(request, msg)
if len(results) == 1:
log = results[0].log
link = reverse('admin:orchestration_backendlog_change', args=(log.pk,))
msg = _("One selected resource has <a href='%s'>been monitored</a>.") % link
elif len(results) >= 1:
logs = [str(result.log.pk) for result in results]
link = reverse('admin:orchestration_backendlog_changelist')
link += '?id__in=%s' % ','.join(logs)
msg = _("%s selected resources have <a href='%s'>been monitored</a>.") % (num, link)
else:
msg = _("No related monitors have been executed.")
modeladmin.message_user(request, mark_safe(msg))
if referer:
return redirect(referer)
run_monitor.url_name = 'monitor'

View file

@ -130,7 +130,7 @@ class Resource(models.Model):
def monitor(self, async=True):
if async:
return tasks.monitor.delay(self.pk, async=async)
tasks.monitor(self.pk, async=async)
return tasks.monitor(self.pk, async=async)
class ResourceData(models.Model):

View file

@ -12,6 +12,7 @@ def monitor(resource_id, ids=None, async=True):
resource = Resource.objects.get(pk=resource_id)
resource_model = resource.content_type.model_class()
operations = []
# Execute monitors
for monitor_name in resource.monitors:
backend = ServiceMonitor.get_backend(monitor_name)
@ -23,16 +24,18 @@ def monitor(resource_id, ids=None, async=True):
kwargs = {
path: ids
}
operations = []
# Execute monitor
monitorings = []
for obj in model.objects.filter(**kwargs):
operations.append(Operation.create(backend, obj, Operation.MONITOR))
op = Operation.create(backend, obj, Operation.MONITOR)
operations.append(op)
monitorings.append(op)
# TODO async=TRue only when running with celery
Operation.execute(operations, async=async)
Operation.execute(monitorings, async=async)
kwargs = {'id__in': ids} if ids else {}
# Update used resources and trigger resource exceeded and revovery
operations = []
triggers = []
model = resource.content_type.model_class()
for obj in model.objects.filter(**kwargs):
data = ResourceData.get_or_create(obj, resource)
@ -40,8 +43,9 @@ def monitor(resource_id, ids=None, async=True):
if not resource.disable_trigger:
if data.used > data.allocated:
op = Operation.create(backend, obj, Operation.EXCEED)
operations.append(op)
triggers.append(op)
elif data.used < data.allocated:
op = Operation.create(backend, obj, Operation.RECOVERY)
operations.append(op)
Operation.execute(operations)
triggers.append(op)
Operation.execute(triggers)
return operations

View file

@ -74,7 +74,7 @@ class SystemUserDisk(ServiceMonitor):
verbose_name = _('Systemuser disk')
def prepare(self):
""" slower """
super(SystemUserDisk, self).prepare()
self.append(textwrap.dedent("""\
function monitor () {
{ du -bs "$1" || echo 0; } | awk {'print $1'}
@ -102,6 +102,7 @@ class FTPTraffic(ServiceMonitor):
verbose_name = _('Systemuser FTP traffic')
def prepare(self):
super(FTPTraffic, self).prepare()
current_date = self.current_date.strftime("%Y-%m-%d %H:%M:%S %Z")
self.append(textwrap.dedent("""\
function monitor () {

View file

@ -37,7 +37,7 @@ class WebApp(models.Model):
return self.name or settings.WEBAPPS_BLANK_NAME
def get_fpm_port(self):
return settings.WEBAPPS_FPM_START_PORT + self.account.pk
return settings.WEBAPPS_FPM_START_PORT + self.account_id
def get_directive(self):
directive = settings.WEBAPPS_TYPES[self.type]['directive']

View file

@ -218,6 +218,7 @@ class Apache2Traffic(ServiceMonitor):
verbose_name = _("Apache 2 Traffic")
def prepare(self):
super(Apache2Traffic, self).prepare()
ignore_hosts = '\\|'.join(settings.WEBSITES_TRAFFIC_IGNORE_HOSTS)
context = {
'current_date': self.current_date.strftime("%Y-%m-%d %H:%M:%S %Z"),

View file

@ -99,7 +99,7 @@ class BaseLiveServerTestCase(AppDependencyMixin, LiveServerTestCase):
def admin_login(self):
session = SessionStore()
session[SESSION_KEY] = self.account.pk
session[SESSION_KEY] = self.account_id
session[BACKEND_SESSION_KEY] = settings.AUTHENTICATION_BACKENDS[0]
session.save()
## to set a cookie we need to first visit the domain.