from django.contrib.contenttypes.fields import GenericForeignKey, GenericRelation
from django.contrib.contenttypes.models import ContentType
from django.apps import apps
from django.db import models
from django.utils import timezone
from django.utils.functional import cached_property
from django.utils.translation import gettext_lazy as _
from djcelery.models import PeriodicTask

from orchestra.core import validators
from orchestra.models import queryset, fields
from orchestra.models.utils import get_model_field_path

from . import tasks
from .backends import ServiceMonitor
from .aggregations import Aggregation
from .validators import validate_scale


class ResourceQuerySet(models.QuerySet):
    """ QuerySet for Resource exposing the shared ``group_by`` helper """
    group_by = queryset.group_by


class Resource(models.Model):
    """
    Defines a resource, a resource is basically an interpretation of data
    gathered by a Monitor
    """

    LAST = 'LAST'
    MONTHLY_SUM = 'MONTHLY_SUM'
    MONTHLY_AVG = 'MONTHLY_AVG'
    PERIODS = (
        (LAST, _("Last")),
        (MONTHLY_SUM, _("Monthly sum")),
        (MONTHLY_AVG, _("Monthly avg")),
    )
    _related = set()  # keeps track of related models for resource cleanup

    name = models.CharField(_("name"), max_length=32,
        help_text=_("Required. 32 characters or fewer. Lowercase letters, "
                    "digits and hyphen only."),
        validators=[validators.validate_name])
    verbose_name = models.CharField(_("verbose name"), max_length=256)
    content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE,
        help_text=_("Model where this resource will be hooked."))
    aggregation = models.CharField(_("aggregation"), max_length=16,
        choices=Aggregation.get_choices(),
        default=Aggregation.get_choices()[0][0],
        help_text=_("Method used for aggregating this resource monitored data."))
    on_demand = models.BooleanField(_("on demand"), default=False,
        help_text=_("If enabled the resource will not be pre-allocated, "
                    "but allocated under the application demand"))
    default_allocation = models.PositiveIntegerField(_("default allocation"),
        null=True, blank=True,
        help_text=_("Default allocation value used when this is not an "
                    "on demand resource"))
    unit = models.CharField(_("unit"), max_length=16,
        help_text=_("The unit in which this resource is represented. "
                    "For example GB, KB or subscribers"))
    scale = models.CharField(_("scale"), max_length=32,
        validators=[validate_scale],
        help_text=_("Scale in which this resource monitoring resoults should "
                    "be prorcessed to match with unit. e.g. <tt>10**9</tt>"))
    disable_trigger = models.BooleanField(_("disable trigger"), default=True,
        help_text=_("Disables monitors exeeded and recovery triggers"))
    crontab = models.ForeignKey('djcelery.CrontabSchedule',
        verbose_name=_("crontab"), null=True, blank=True,
        on_delete=models.SET_NULL,
        help_text=_("Crontab for periodic execution. "
                    "Leave it empty to disable periodic monitoring"))
    monitors = fields.MultiSelectField(_("monitors"), max_length=256, blank=True,
        choices=ServiceMonitor.get_choices(),
        help_text=_("Monitor backends used for monitoring this resource."))
    is_active = models.BooleanField(_("active"), default=True)

    objects = ResourceQuerySet.as_manager()

    class Meta:
        unique_together = (
            ('name', 'content_type'),
            ('verbose_name', 'content_type')
        )

    def __str__(self):
        return "%s-%s" % (self.content_type, self.name)

    @cached_property
    def aggregation_class(self):
        """ Aggregation class looked up from the ``aggregation`` field value """
        return Aggregation.get(self.aggregation)

    @cached_property
    def aggregation_instance(self):
        """ Per request lived type_instance """
        return self.aggregation_class(self)

    def clean(self):
        """
        Normalizes ``verbose_name`` and validates the resource.

        Raises ValidationError when a default allocation is combined with
        ``on_demand``, or when no model path can be resolved between the
        content type and the model of any selected monitor.
        """
        self.verbose_name = self.verbose_name.strip()
        if self.on_demand and self.default_allocation:
            raise validators.ValidationError({
                'default_allocation': _("Default allocation can not be set for 'on demand' services")
            })
        # Validate that model path exists between ct and each monitor.model
        monitor_errors = []
        for monitor in self.monitors:
            try:
                self.get_model_path(monitor)
            except (RuntimeError, LookupError):
                model = apps.get_model(ServiceMonitor.get_backend(monitor).model)
                monitor_errors.append(model._meta.model_name)
        if monitor_errors:
            model_name = self.content_type.model_class()._meta.model_name
            raise validators.ValidationError({
                'monitors': [
                    _("Path does not exists between '%s' and '%s'") % (
                        error,
                        model_name,
                    ) for error in monitor_errors
                ]})

    def save(self, *args, **kwargs):
        """ Saves the resource and asks the app config to reload relations """
        super(Resource, self).save(*args, **kwargs)
        # This only works on tests (multiprocessing used on real deployments)
        apps.get_app_config('resources').reload_relations()

    def sync_periodic_task(self, delete=False):
        """ sync periodic task on save/delete resource operations """
        name = 'monitor.%s' % self
        if delete or not self.crontab or not self.is_active:
            PeriodicTask.objects.filter(name=name).delete()
        elif self.pk:
            # Reaching this branch implies an active resource with a crontab,
            # so no further is_active check is needed before creating.
            try:
                task = PeriodicTask.objects.get(name=name)
            except PeriodicTask.DoesNotExist:
                PeriodicTask.objects.create(
                    name=name,
                    task='resources.Monitor',
                    args=[self.pk],
                    crontab=self.crontab
                )
            else:
                # Only touch the existing task when the schedule changed
                if task.crontab != self.crontab:
                    task.crontab = self.crontab
                    task.save(update_fields=['crontab'])

    def get_model_path(self, monitor):
        """ returns a model path between self.content_type and monitor.model """
        resource_model = self.content_type.model_class()
        monitor_model = ServiceMonitor.get_backend(monitor).model_class()
        return get_model_field_path(monitor_model, resource_model)

    def get_scale(self):
        """ Evaluates the ``scale`` expression (e.g. ``10**9``) and returns its value """
        # SECURITY NOTE(review): eval() on a stored value. The field declares
        # validate_scale as validator, but field validators only run through
        # full_clean()/forms; confirm no unvalidated write path exists.
        return eval(self.scale)

    def get_verbose_name(self):
        """ Returns ``verbose_name`` falling back to ``name`` when empty """
        return self.verbose_name or self.name

    def monitor(self, run_async=True):
        """
        Runs the monitor task for this resource.

        With ``run_async`` the task is queued through ``apply_async`` and its
        result is returned; otherwise the task is called directly.
        """
        if run_async:
            # apply_async expects positional task arguments as a sequence;
            # passing the bare pk is rejected by Celery.
            return tasks.monitor.apply_async(args=(self.pk,))
        return tasks.monitor(self.pk)


class ResourceDataQuerySet(models.QuerySet):
    """ QuerySet for ResourceData with an (obj, resource) based get_or_create """

    def get_or_create(self, obj, resource):
        """
        Returns a ``(resource_data, created)`` tuple for ``obj`` and ``resource``.

        A missing row is created with the resource default allocation.
        """
        ct = ContentType.objects.get_for_model(type(obj))
        try:
            return self.get(
                content_type=ct,
                object_id=obj.pk,
                resource=resource
            ), False
        except self.model.DoesNotExist:
            return self.create(
                content_object=obj,
                resource=resource,
                allocated=resource.default_allocation
            ), True


class ResourceData(models.Model):
    """ Stores computed resource usage and allocation """
    resource = models.ForeignKey(Resource, on_delete=models.CASCADE,
        related_name='dataset', verbose_name=_("resource"))
    content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE,
        verbose_name=_("content type"))
    object_id = models.PositiveIntegerField(_("object id"))
    used = models.DecimalField(_("used"), max_digits=16, decimal_places=3,
        null=True, editable=False)
    updated_at = models.DateTimeField(_("updated"), null=True, editable=False)
    allocated = models.PositiveIntegerField(_("allocated"), null=True, blank=True)
    content_object_repr = models.CharField(_("content object representation"),
        max_length=256, editable=False)

    content_object = GenericForeignKey()
    objects = ResourceDataQuerySet.as_manager()

    class Meta:
        unique_together = ('resource', 'content_type', 'object_id')
        verbose_name_plural = _("resource data")
        index_together = (
            ('content_type', 'object_id'),
        )

#    def __str__(self):
#        return "%s: %s" % (self.resource, self.content_object)

    def __str__(self):
        return "%s" % (self.content_object)

    @property
    def unit(self):
        """ Unit of the related resource """
        return self.resource.unit

    @property
    def verbose_name(self):
        """ Verbose name of the related resource """
        return self.resource.verbose_name

    def get_used(self):
        """
        Computes current usage by summing the aggregated usage of every
        monitor dataset and dividing by the resource scale.

        Returns None when no dataset produced a usage value.
        """
        resource = self.resource
        total = 0
        has_result = False
        for _monitor, dataset in self.get_monitor_datasets():
            dataset = resource.aggregation_instance.filter(dataset)
            usage = resource.aggregation_instance.compute_usage(dataset)
            if usage is not None:
                has_result = True
                total += usage
        return float(total)/resource.get_scale() if has_result else None

    def update(self, current=None):
        """
        Stores ``current`` (computed with get_used() when None) as ``used``,
        refreshing ``updated_at`` and ``content_object_repr``.
        """
        if current is None:
            current = self.get_used()
        # A None/zero usage is persisted as 0
        self.used = current or 0
        self.updated_at = timezone.now()
        self.content_object_repr = str(self.content_object)
        self.save(update_fields=('used', 'updated_at', 'content_object_repr'))

    def monitor(self, run_async=False):
        """ Runs the resource monitor task restricted to this object id """
        ids = (self.object_id,)
        if run_async:
            return tasks.monitor.delay(self.resource_id, ids=ids)
        return tasks.monitor(self.resource_id, ids=ids)

    def get_monitor_datasets(self):
        """
        Yields ``(monitor, dataset)`` pairs, one per monitor of the resource,
        where dataset is the MonitorData queryset related to this object.
        """
        resource = self.resource
        for monitor in resource.monitors:
            path = resource.get_model_path(monitor)
            if path == []:
                # Empty path: monitor data is filtered by this very object
                dataset = MonitorData.objects.filter(
                    monitor=monitor,
                    content_type=self.content_type_id,
                    object_id=self.object_id,
                )
            else:
                # Follow the field path from the monitored model to this object
                lookup = '__'.join(path)
                monitor_model = ServiceMonitor.get_backend(monitor).model_class()
                objects = monitor_model.objects.filter(**{lookup: self.object_id})
                pks = objects.values_list('id', flat=True)
                ct = ContentType.objects.get_for_model(monitor_model)
                dataset = MonitorData.objects.filter(
                    monitor=monitor,
                    content_type=ct,
                    object_id__in=pks,
                )
            yield monitor, dataset


class MonitorDataQuerySet(models.QuerySet):
    """ QuerySet for MonitorData exposing the shared ``group_by`` helper """
    group_by = queryset.group_by


class MonitorData(models.Model):
    """ Stores monitored data """
    monitor = models.CharField(_("monitor"), max_length=256, db_index=True,
        choices=ServiceMonitor.get_choices())
    content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE,
        verbose_name=_("content type"))
    object_id = models.PositiveIntegerField(_("object id"))
    created_at = models.DateTimeField(_("created"), default=timezone.now,
        db_index=True)
    value = models.DecimalField(_("value"), max_digits=16, decimal_places=2)
    state = models.DecimalField(_("state"), max_digits=16, decimal_places=2,
        null=True,
        help_text=_("Optional field used to store current state needed for diff-based monitoring."))
    content_object_repr = models.CharField(_("content object representation"),
        max_length=256, editable=False)

    content_object = GenericForeignKey()
    objects = MonitorDataQuerySet.as_manager()
    launch_id = models.PositiveIntegerField(_("launch id"), blank=True, null=True)

    class Meta:
        get_latest_by = 'id'
        verbose_name_plural = _("monitor data")
        index_together = (
            ('content_type', 'object_id'),
        )

    def __str__(self):
        return str(self.monitor)

    @cached_property
    def unit(self):
        # NOTE(review): no ``resource`` attribute is declared on this model in
        # the visible code, so this presumably raises AttributeError unless it
        # is attached elsewhere -- confirm before relying on it.
        return self.resource.unit


def create_resource_relation():
    """
    (Re)builds the resource accessors on every model that has resources.

    Models registered in ``Resource._related`` are first stripped of their
    ``resource_set`` / ``resources`` attributes; then each content type with
    resources gets a ``resource_set`` GenericRelation to ResourceData and a
    ``resources`` handler, and is recorded in ``Resource._related``.
    """
    class ResourceHandler(object):
        """ account.resources.web """
        def __getattr__(self, attr):
            """ get or build ResourceData """
            if attr.startswith('_'):
                raise AttributeError(attr)
            try:
                return self.obj.__resource_cache[attr]
            except AttributeError:
                # First access on this object: start an empty cache
                self.obj.__resource_cache = {}
            except KeyError:
                pass
            try:
                rdata = self.obj.resource_set.get(resource__name=attr)
            except ResourceData.DoesNotExist:
                # Build an unsaved ResourceData from the active resource
                model = self.obj._meta.model_name
                resource = Resource.objects.get(
                    content_type__model=model,
                    name=attr,
                    is_active=True
                )
                rdata = ResourceData(
                    content_object=self.obj,
                    content_object_repr=str(self.obj),
                    resource=resource,
                    allocated=resource.default_allocation
                )
            self.obj.__resource_cache[attr] = rdata
            return rdata

        def __get__(self, obj, cls):
            """ proxy handled object """
            self.obj = obj
            return self

        def __iter__(self):
            return iter(self.obj.resource_set.all())

    # Clean previous state
    for related in Resource._related:
        try:
            delattr(related, 'resource_set')
            delattr(related, 'resources')
        except AttributeError:
            pass
        else:
            # Private fields such as GenericForeignKey have remote_field set
            # to None, so the related model is read defensively.
            related._meta.private_fields = [
                field for field in related._meta.private_fields
                if getattr(field.remote_field, 'model', None) != ResourceData
            ]

    for ct, _resources in Resource.objects.group_by('content_type').items():
        model = ct.model_class()
        relation = GenericRelation('resources.ResourceData')
        model.add_to_class('resource_set', relation)
        model.resources = ResourceHandler()
        Resource._related.add(model)