From cc445559d0df544fd20425c211eb070411c6997a Mon Sep 17 00:00:00 2001 From: Marc Date: Thu, 10 Jul 2014 10:03:22 +0000 Subject: [PATCH] Resource monitoring implementation --- orchestra/apps/accounts/models.py | 5 ++ orchestra/apps/orchestration/admin.py | 4 +- orchestra/apps/orchestration/backends.py | 7 ++ orchestra/apps/orchestration/models.py | 34 ++++++---- orchestra/apps/resources/admin.py | 12 +++- orchestra/apps/resources/apps.py | 8 +-- orchestra/apps/resources/forms.py | 4 +- orchestra/apps/resources/models.py | 83 ++++++++++++++++++------ orchestra/apps/resources/tasks.py | 42 +++++++++--- orchestra/utils/python.py | 20 +++--- 10 files changed, 156 insertions(+), 63 deletions(-) diff --git a/orchestra/apps/accounts/models.py b/orchestra/apps/accounts/models.py index a62a6b6c..6eda0dfc 100644 --- a/orchestra/apps/accounts/models.py +++ b/orchestra/apps/accounts/models.py @@ -2,6 +2,8 @@ from django.conf import settings as django_settings from django.db import models from django.utils.translation import ugettext_lazy as _ +from orchestra.core import services + from . import settings @@ -23,3 +25,6 @@ class Account(models.Model): def name(self): self._cached_name = getattr(self, '_cached_name', self.user.username) return self._cached_name + + +services.register(Account, menu=False) diff --git a/orchestra/apps/orchestration/admin.py b/orchestra/apps/orchestration/admin.py index b0c2acbe..72d93b1c 100644 --- a/orchestra/apps/orchestration/admin.py +++ b/orchestra/apps/orchestration/admin.py @@ -30,7 +30,7 @@ class RouteAdmin(admin.ModelAdmin): def display_model(self, route): try: - return route.get_backend().model + return route.backend_class().model except KeyError: return "NOT AVAILABLE" display_model.short_description = _("model") @@ -38,7 +38,7 @@ class RouteAdmin(admin.ModelAdmin): def display_actions(self, route): try: - return '
'.join(route.get_backend().get_actions()) + return '
'.join(route.backend_class().get_actions()) except KeyError: return "NOT AVAILABLE" display_actions.short_description = _("actions") diff --git a/orchestra/apps/orchestration/backends.py b/orchestra/apps/orchestration/backends.py index 6dc673e1..70f97b88 100644 --- a/orchestra/apps/orchestration/backends.py +++ b/orchestra/apps/orchestration/backends.py @@ -67,6 +67,13 @@ class ServiceBackend(object): def get_backends(cls): return cls.plugins + @classmethod + def get_backend(cls, name): + for backend in ServiceMonitor.get_backends(): + if backend.get_name() == name: + return backend + raise KeyError('This backend is not registered') + @classmethod def get_choices(cls): backends = cls.get_backends() diff --git a/orchestra/apps/orchestration/models.py b/orchestra/apps/orchestration/models.py index 16bfcb59..e2468507 100644 --- a/orchestra/apps/orchestration/models.py +++ b/orchestra/apps/orchestration/models.py @@ -5,6 +5,7 @@ from django.db import models from django.utils.translation import ugettext_lazy as _ from orchestra.utils.apps import autodiscover +from orchestra.utils.functional import cached from . import settings, manager from .backends import ServiceBackend @@ -145,31 +146,40 @@ class Route(models.Model): # msg = _("%s backend is not compatible with %s method") # raise ValidationError(msg % (self.backend, self.method) + @classmethod + @cached + def get_routing_table(cls): + table = {} + for route in cls.objects.filter(is_active=True): + for action in route.backend_class().get_actions(): + key = (route.backend, action) + try: + table[key].append(route) + except KeyError: + table[key] = [route] + return table + @classmethod def get_servers(cls, operation): - # TODO use cached data sctructure and refactor - backend = operation.backend + table = cls.get_routing_table() servers = [] + key = (operation.backend.get_name(), operation.action) try: - routes = cls.objects.filter(is_active=True, backend=backend.get_name()) - except cls.DoesNotExist: + routes = table[key] + except KeyError: return servers safe_locals = { 'instance': operation.instance } - actions = backend.get_actions() for route in routes: - if operation.action in actions and eval(route.match, safe_locals): + if eval(route.match, safe_locals): servers.append(route.host) return servers - def get_backend(self): - for backend in ServiceBackend.get_backends(): - if backend.get_name() == self.backend: - return backend - raise KeyError('This backend is not registered') + def backend_class(self): + return ServiceBackend.get_backend(self.backend) -# def get_method_class(self): +# def method_class(self): # for method in MethodBackend.get_backends(): # if method.get_name() == self.method: # return method diff --git a/orchestra/apps/resources/admin.py b/orchestra/apps/resources/admin.py index 221b371d..48ed4217 100644 --- a/orchestra/apps/resources/admin.py +++ b/orchestra/apps/resources/admin.py @@ -5,6 +5,7 @@ from django.utils.translation import ugettext_lazy as _ from orchestra.admin.filters import UsedContentTypeFilter from orchestra.admin.utils import insertattr, get_modeladmin +from orchestra.core import services from orchestra.utils import running_syncdb from .forms import ResourceForm @@ -12,12 +13,14 @@ from .models import Resource, ResourceData, MonitorData class ResourceAdmin(admin.ModelAdmin): + # TODO warning message server/celery should be restarted when creating things + list_display = ( 'name', 'verbose_name', 'content_type', 'period', 'ondemand', 'default_allocation', 'disable_trigger' ) list_filter = (UsedContentTypeFilter, 'period', 'ondemand', 'disable_trigger') - + def save_model(self, request, obj, form, change): super(ResourceAdmin, self).save_model(request, obj, form, change) model = obj.content_type.model_class() @@ -29,6 +32,13 @@ class ResourceAdmin(admin.ModelAdmin): inline = resource_inline_factory(resources) inlines.append(inline) modeladmin.inlines = inlines + + def formfield_for_dbfield(self, db_field, **kwargs): + """ filter service content_types """ + if db_field.name == 'content_type': + models = [ model._meta.model_name for model in services.get().keys() ] + kwargs['queryset'] = db_field.rel.to.objects.filter(model__in=models) + return super(ResourceAdmin, self).formfield_for_dbfield(db_field, **kwargs) class ResourceDataAdmin(admin.ModelAdmin): diff --git a/orchestra/apps/resources/apps.py b/orchestra/apps/resources/apps.py index bfadaa38..36fc3aed 100644 --- a/orchestra/apps/resources/apps.py +++ b/orchestra/apps/resources/apps.py @@ -9,10 +9,6 @@ class ResourcesConfig(AppConfig): verbose_name = 'Resources' def ready(self): - from .models import Resource - # TODO execute on Resource.save() if not running_syncdb(): - relation = generic.GenericRelation('resources.ResourceData') - for resources in Resource.group_by_content_type(): - model = resources[0].content_type.model_class() - model.add_to_class('resources', relation) + from .models import create_resource_relation + create_resource_relation() diff --git a/orchestra/apps/resources/forms.py b/orchestra/apps/resources/forms.py index c3645f1a..8bcd545c 100644 --- a/orchestra/apps/resources/forms.py +++ b/orchestra/apps/resources/forms.py @@ -9,7 +9,7 @@ class ResourceForm(forms.ModelForm): required=False) used = forms.IntegerField(label=_("Used"), widget=ShowTextWidget(), required=False) - last_update = forms.CharField(label=_("Last update"), widget=ShowTextWidget(), + last_update = forms.DateTimeField(label=_("Last update"), widget=ShowTextWidget(), required=False) allocated = forms.IntegerField(label=_("Allocated")) @@ -21,7 +21,7 @@ class ResourceForm(forms.ModelForm): super(ResourceForm, self).__init__(*args, **kwargs) if self.resource: self.fields['verbose_name'].initial = self.resource.verbose_name - self.fields['used'].initial = self.resource.get_current() + self.fields['used'].initial = self.resource.get_used() # TODO if self.resource.ondemand: self.fields['allocated'].required = False self.fields['allocated'].widget = ReadOnlyWidget(None, '') diff --git a/orchestra/apps/resources/models.py b/orchestra/apps/resources/models.py index 284cfe94..c2a5a136 100644 --- a/orchestra/apps/resources/models.py +++ b/orchestra/apps/resources/models.py @@ -42,12 +42,35 @@ class Resource(models.Model): null=True, blank=True) is_active = models.BooleanField(_("is active"), default=True) disable_trigger = models.BooleanField(_("disable trigger"), default=False) + crontab = models.ForeignKey(CrontabSchedule, verbose_name=_("crontab"), + help_text=_("Crontab for periodic execution")) + # TODO create custom field that returns backend python objects monitors = MultiSelectField(_("monitors"), max_length=256, choices=ServiceMonitor.get_choices()) def __unicode__(self): return self.name + def save(self, *args, **kwargs): + super(Resource, self).save(*args, **kwargs) + # Create Celery periodic task + name = 'monitor.%s' % str(self) + try: + task = PeriodicTask.objects.get(name=name) + except PeriodicTask.DoesNotExist: + PeriodicTask.objects.create(name=name, task='resources.Monitor', + args=[self.pk], crontab=self.crontab) + else: + if task.crontab != self.crontab: + task.crontab = self.crontab + task.save() + + def delete(self, *args, **kwargs): + super(Resource, self).delete(*args, **kwargs) + name = 'monitor.%s' % str(self) + PeriodicTask.objects.filter(name=name, task='resources.Monitor', + args=[self.pk]).delete() + @classmethod def group_by_content_type(cls): prev = None @@ -63,14 +86,40 @@ class Resource(models.Model): prev = ct if group: yield group + + +class ResourceData(models.Model): + """ Stores computed resource usage and allocation """ + resource = models.ForeignKey(Resource, related_name='dataset') + content_type = models.ForeignKey(ContentType) + object_id = models.PositiveIntegerField() + used = models.PositiveIntegerField(null=True) + last_update = models.DateTimeField(null=True) + allocated = models.PositiveIntegerField(null=True) - def get_current(self): + content_object = generic.GenericForeignKey() + + class Meta: + unique_together = ('resource', 'content_type', 'object_id') + verbose_name_plural = _("resource data") + + @classmethod + def get_or_create(cls, obj, resource): + try: + return cls.objects.get(content_object=obj, resource=resource) + except cls.DoesNotExists: + return cls.objects.create(content_object=obj, resource=resource, + allocated=resource.defalt_allocation) + + def get_used(self): + resource = self.resource today = datetime.date.today() result = 0 has_result = False - for monitor in self.monitors: - dataset = MonitorData.objects.filter(monitor=monitor) - if self.period == self.MONTHLY_AVG: + for monitor in resource.monitors: + dataset = MonitorData.objects.filter(monitor=monitor, + content_type=self.content_type, object_id=self.object_id) + if resource.period == resource.MONTHLY_AVG: try: last = dataset.latest() except MonitorData.DoesNotExist: @@ -83,14 +132,14 @@ class Resource(models.Model): for data in dataset: slot = (previous-data.date).total_seconds() result += data.value * slot/total - elif self.period == self.MONTHLY_SUM: + elif resource.period == resource.MONTHLY_SUM: data = dataset.filter(date__year=today.year, date__month=today.month) value = data.aggregate(models.Sum('value'))['value__sum'] if value: has_result = True result += value - elif self.period == self.LAST: + elif resource.period == resource.LAST: try: result += dataset.latest().value except MonitorData.DoesNotExist: @@ -101,21 +150,6 @@ class Resource(models.Model): return result if has_result else None -class ResourceData(models.Model): - """ Stores computed resource usage and allocation """ - resource = models.ForeignKey(Resource) - content_type = models.ForeignKey(ContentType) - object_id = models.PositiveIntegerField() - used = models.PositiveIntegerField(null=True) - last_update = models.DateTimeField(null=True) - allocated = models.PositiveIntegerField(null=True) - - content_object = generic.GenericForeignKey() - - class Meta: - unique_together = ('resource', 'content_type', 'object_id') - verbose_name_plural = _("resource data") - class MonitorData(models.Model): """ Stores monitored data """ monitor = models.CharField(_("monitor"), max_length=256, @@ -133,3 +167,10 @@ class MonitorData(models.Model): def __unicode__(self): return str(self.monitor) + + +def create_resource_relation(): + relation = generic.GenericRelation('resources.ResourceData') + for resources in Resource.group_by_content_type(): + model = resources[0].content_type.model_class() + model.add_to_class('resources', relation) diff --git a/orchestra/apps/resources/tasks.py b/orchestra/apps/resources/tasks.py index 046b33a9..9a870132 100644 --- a/orchestra/apps/resources/tasks.py +++ b/orchestra/apps/resources/tasks.py @@ -1,14 +1,38 @@ from celery import shared_task +from orchestra.apps.orchestration.models import BackendOperation as Operation + from .backends import ServiceMonitor +from .models import MonitorData -@shared_task -def monitor(backend_name): - routes = Route.objects.filter(is_active=True, backend=backend_name) - for route in routes: - pass - for backend in ServiceMonitor.get_backends(): - if backend.get_name() == backend_name: - # TODO execute monitor BackendOperation - pass +@shared_task(name='resources.Monitor') +def monitor(resource_id): + resource = Resource.objects.get(pk=resource_id) + + # Execute monitors + for monitor_name in resource.monitors: + backend = ServiceMonitor.get_backend(monitor_name) + model = backend.model + operations = [] + # Execute monitor + for obj in model.objects.all(): + operations.append(Operation.create(backend, obj, Operation.MONITOR)) + Operation.execute(operations) + + # Update used resources and trigger resource exceeded and revovery + operations = [] + model = resource.model + for obj in model.objects.all(): + data = MonitorData.get_or_create(obj, resource) + current = data.get_used() + if data.used < data.allocated and current > data.allocated: + op = Operation.create(backend, data.content_object, Operation.EXCEED) + operations.append(op) + elif res.used > res.allocated and current < res.allocated: + op = Operation.create(backend, data.content_object, Operation.RECOVERY) + operation.append(op) + data.used = current + data.las_update = datetime.now() + data.save() + Operation.execute(operations) diff --git a/orchestra/utils/python.py b/orchestra/utils/python.py index c90a0ba1..c963918d 100644 --- a/orchestra/utils/python.py +++ b/orchestra/utils/python.py @@ -15,51 +15,51 @@ class OrderedSet(collections.MutableSet): self.map = {} # key --> [key, prev, next] if iterable is not None: self |= iterable - + def __len__(self): return len(self.map) - + def __contains__(self, key): return key in self.map - + def add(self, key): if key not in self.map: end = self.end curr = end[1] curr[2] = end[1] = self.map[key] = [key, curr, end] - + def discard(self, key): - if key in self.map: + if key in self.map: key, prev, next = self.map.pop(key) prev[2] = next next[1] = prev - + def __iter__(self): end = self.end curr = end[2] while curr is not end: yield curr[0] curr = curr[2] - + def __reversed__(self): end = self.end curr = end[1] while curr is not end: yield curr[0] curr = curr[1] - + def pop(self, last=True): if not self: raise KeyError('set is empty') key = self.end[1][0] if last else self.end[2][0] self.discard(key) return key - + def __repr__(self): if not self: return '%s()' % (self.__class__.__name__,) return '%s(%r)' % (self.__class__.__name__, list(self)) - + def __eq__(self, other): if isinstance(other, OrderedSet): return len(self) == len(other) and list(self) == list(other)